Fennel: /home/pub/open/dev/fennel/exec/ParallelExecStreamScheduler.h Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #ifndef Fennel_ParallelExecStreamScheduler_Included 00024 #define Fennel_ParallelExecStreamScheduler_Included 00025 00026 #include "fennel/exec/ExecStreamScheduler.h" 00027 #include "fennel/exec/ExecStreamGraphImpl.h" 00028 #include "fennel/synch/ThreadPool.h" 00029 #include "fennel/synch/SynchMonitoredObject.h" 00030 #include "fennel/common/FennelExcn.h" 00031 00032 #include 00033 #include 00034 #include <boost/scoped_ptr.hpp> 00035 00036 FENNEL_BEGIN_NAMESPACE 00037 00038 class ExecStreamGraphImpl; 00039 class ParallelExecStreamScheduler; 00040 class ThreadTracker; 00041 00046 class FENNEL_EXEC_EXPORT ParallelExecTask 00047 { 00048 ParallelExecStreamScheduler &scheduler; 00049 ExecStream *pStream; 00050 00051 public: 00052 explicit ParallelExecTask( 00053 ParallelExecStreamScheduler &scheduler, 00054 ExecStream *pStream); 00055 00056 inline ExecStreamId getStreamId() const 00057 { 00058 return pStream->getStreamId(); 00059 } 00060 00061 void execute(); 00062 }; 00063 00068 class FENNEL_EXEC_EXPORT ParallelExecResult 00069 { 00070 ExecStreamId streamId; 00071 ExecStreamResult rc; 00072 00073 public: 00074 explicit ParallelExecResult( 00075 ExecStreamId streamId, 00076 ExecStreamResult rc); 00077 00078 inline ExecStreamId getStreamId() const 00079 { 00080 return streamId; 00081 } 00082 00083 inline ExecStreamResult getResultCode() const 00084 { 00085 return rc; 00086 } 00087 }; 00088 00098 class FENNEL_EXEC_EXPORT ParallelExecStreamScheduler 00099 : public ExecStreamScheduler, public SynchMonitoredObject 00100 { 00101 enum StreamState 00102 { 00103 SS_SLEEPING, 00104 SS_RUNNING, 00105 SS_INHIBITED 00106 }; 00107 00108 struct StreamStateMapEntry 00109 { 00110 StreamState state; 00111 int inhibitionCount; 00112 }; 00113 00114 enum ManagerState { 00115 MGR_RUNNING, 00116 MGR_STOPPING, 00117 MGR_STOPPED 00118 }; 00119 00120 typedef std::hash_map<ExecStreamId, StreamStateMapEntry> 00121 StreamStateMap; 00122 typedef std::deque InhibitedQueue; 00123 00124 friend class ParallelExecTask; 00125 00126 SharedExecStreamGraph pGraph; 00127 00128 ThreadPool threadPool; 00129 std::deque completedQueue; 00130 00131 ThreadTracker &threadTracker; 00132 00133 StreamStateMap streamStateMap; 00134 ManagerState mgrState; 00135 00136 InhibitedQueue inhibitedQueue; 00137 InhibitedQueue transitQueue; 00138 LocalCondition sentinelCondition; 00139 00140 uint degreeOfParallelism; 00141 00142 boost::scoped_ptr pPendingExcn; 00143 00144 void tryExecuteManager(); 00145 void executeManager(); 00146 void tryExecuteTask(ExecStream &); 00147 void executeTask(ExecStream &); 00148 bool addToQueue(ExecStreamId streamId); 00149 void signalSentinel(ExecStreamId sentinelId); 00150 void retryInhibitedQueue(); 00151 void processCompletedTask(ParallelExecResult const &task); 00152 inline bool isInhibited(ExecStreamId streamId); 00153 inline void alterNeighborInhibition(ExecStreamId streamId, int delta); 00154 00155 public: 00169 explicit ParallelExecStreamScheduler( 00170 SharedTraceTarget pTraceTarget, 00171 std::string name, 00172 ThreadTracker &threadTracker, 00173 uint degreeOfParallelism); 00174 00175 virtual ~ParallelExecStreamScheduler(); 00176 00177
00178 virtual void addGraph(SharedExecStreamGraph pGraph); 00179 virtual void removeGraph(SharedExecStreamGraph pGraph); 00180 virtual void start(); 00181 virtual void setRunnable(ExecStream &stream, bool); 00182 virtual void makeRunnable(ExecStream &stream); 00183 virtual void abort(ExecStreamGraph &graph); 00184 virtual void checkAbort() const; 00185 virtual void stop(); 00186 virtual ExecStreamBufAccessor &readStream(ExecStream &stream); 00187 virtual void createBufferProvisionAdapter( 00188 ExecStreamEmbryo &embryo); 00189 virtual uint getDegreeOfParallelism(); 00190 }; 00191 00192 FENNEL_END_NAMESPACE 00193 00194 #endif 00195 00196