Fennel: /home/pub/open/dev/fennel/exec/ParallelExecStreamScheduler.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/exec/ParallelExecStreamScheduler.h" 00025 #include "fennel/exec/ExecStreamGraphImpl.h" 00026 #include "fennel/exec/ExecStream.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 #include "fennel/exec/DoubleBufferExecStream.h" 00029 #include "fennel/exec/ExecStreamEmbryo.h" 00030 #include "fennel/common/AbortExcn.h" 00031 #include "fennel/synch/ThreadTracker.h" 00032 00033 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $"); 00034 00035 00036 00037 ParallelExecStreamScheduler::ParallelExecStreamScheduler( 00038 SharedTraceTarget pTraceTarget, 00039 std::string name, 00040 ThreadTracker &threadTrackerInit, 00041 uint degreeOfParallelismInit) 00042 : TraceSource(pTraceTarget, name), 00043 ExecStreamScheduler(pTraceTarget, name), 00044 threadTracker(threadTrackerInit) 00045 { 00046 degreeOfParallelism = degreeOfParallelismInit; 00047 assert(degreeOfParallelism > 0); 00048 threadPool.setThreadTracker(threadTracker); 00049 mgrState = MGR_STOPPED; 00050 } 00051 00052 ParallelExecStreamScheduler::~ParallelExecStreamScheduler() 00053 { 00054 } 00055 00056 inline void ParallelExecStreamScheduler::alterNeighborInhibition( 00057 ExecStreamId streamId, int delta) 00058 { 00059 ExecStreamGraphImpl &graphImpl = 00060 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00061 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00062 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00063 boost::out_edges(streamId, graphRep); 00064 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00065 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00066 ExecStreamId consumer = boost::target(edge, graphRep); 00067 streamStateMap[consumer].inhibitionCount += delta; 00068 assert(streamStateMap[consumer].inhibitionCount >= 0); 00069 } 00070 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00071 boost::in_edges(streamId, graphRep); 00072 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00073 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00074 ExecStreamId producer = boost::source(edge, graphRep); 00075 streamStateMap[producer].inhibitionCount += delta; 00076 assert(streamStateMap[producer].inhibitionCount >= 0); 00077 } 00078 } 00079 00080 inline bool ParallelExecStreamScheduler::isInhibited(ExecStreamId streamId) 00081 { 00082 return streamStateMap[streamId].inhibitionCount > 0; 00083 } 00084 00085 void ParallelExecStreamScheduler::addGraph( 00086 SharedExecStreamGraph pGraphInit) 00087 { 00088 assert(pGraph); 00089 00090 ExecStreamScheduler::addGraph(pGraphInit); 00091 pGraph = pGraphInit; 00092 } 00093 00094 void ParallelExecStreamScheduler::removeGraph( 00095 SharedExecStreamGraph pGraphInit) 00096 { 00097 assert(pGraph == pGraphInit); 00098 00099 pGraph.reset(); 00100 ExecStreamScheduler::removeGraph(pGraphInit); 00101 } 00102 00103 void ParallelExecStreamScheduler::start() 00104 { 00105 FENNEL_TRACE(TRACE_FINE,"start"); 00106 assert(pGraph->isAcyclic()); 00107 pPendingExcn.reset(); 00108 00109 ExecStreamGraphImpl &graphImpl = 00110 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00111 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00112 ExecStreamGraphImpl::VertexIterPair vertices = boost::vertices(graphRep); 00113 while (vertices.first != vertices.second) { 00114 ExecStreamId streamId = *vertices.first; 00115 streamStateMap[streamId].state = SS_SLEEPING; 00116 streamStateMap[streamId].inhibitionCount = 0; 00117 ++vertices.first; 00118 } 00119 00120 vertices = boost::vertices(graphRep); 00121 while (vertices.first != vertices.second) { 00122 ExecStreamId streamId = *vertices.first; 00123 if (!graphImpl.getStreamFromVertex(streamId)) { 00124
00125 alterNeighborInhibition(streamId, + 1); 00126 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00127 boost::in_edges(streamId, graphRep); 00128 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00129 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00130 ExecStreamBufAccessor &bufAccessor = 00131 graphImpl.getBufAccessorFromEdge(edge); 00132 bufAccessor.requestProduction(); 00133 } 00134 } 00135 ++vertices.first; 00136 } 00137 00138
00139 threadPool.start(degreeOfParallelism + 1); 00140 00141
00142 ParallelExecTask managerTask(*this, NULL); 00143 mgrState = MGR_RUNNING; 00144 threadPool.submitTask(managerTask); 00145 } 00146 00147 void ParallelExecStreamScheduler::setRunnable(ExecStream &stream, bool runnable) 00148 { 00149 permAssert(false); 00150 } 00151 00152 void ParallelExecStreamScheduler::makeRunnable(ExecStream &stream) 00153 { 00154 permAssert(false); 00155 } 00156 00157 void ParallelExecStreamScheduler::abort(ExecStreamGraph &graph) 00158 { 00159 StrictMutexGuard mutexGuard(mutex); 00160 FENNEL_TRACE(TRACE_FINE,"abort requested"); 00161 00162 if (pPendingExcn) { 00163 pPendingExcn.reset(new AbortExcn()); 00164 } 00165 condition.notify_one(); 00166 } 00167 00168 void ParallelExecStreamScheduler::checkAbort() const 00169 { 00170 if (pPendingExcn) { 00171 throw AbortExcn(); 00172 } 00173 } 00174 00175 void ParallelExecStreamScheduler::stop() 00176 { 00177 FENNEL_TRACE(TRACE_FINE,"stop"); 00178 00179 StrictMutexGuard mutexGuard(mutex); 00180 if (mgrState != MGR_STOPPED) { 00181 mgrState = MGR_STOPPING; 00182 condition.notify_one(); 00183 while (mgrState != MGR_STOPPED) { 00184 sentinelCondition.wait(mutexGuard); 00185 } 00186 } 00187 mutexGuard.unlock(); 00188 00189 threadPool.stop(); 00190 00191
00192
00193
00194
00195 pPendingExcn.reset(); 00196 00197 completedQueue.clear(); 00198 inhibitedQueue.clear(); 00199 } 00200 00201 void ParallelExecStreamScheduler::createBufferProvisionAdapter( 00202 ExecStreamEmbryo &embryo) 00203 { 00204
00205
00206 DoubleBufferExecStreamParams adapterParams; 00207 embryo.init( 00208 new DoubleBufferExecStream(), 00209 adapterParams); 00210 } 00211 00212 void ParallelExecStreamScheduler::executeManager() 00213 { 00214
00215 try { 00216 tryExecuteManager(); 00217 } catch (...) { 00218 StrictMutexGuard mutexGuard(mutex); 00219 mgrState = MGR_STOPPED; 00220 sentinelCondition.notify_all(); 00221 throw; 00222 } 00223 StrictMutexGuard mutexGuard(mutex); 00224 mgrState = MGR_STOPPED; 00225 sentinelCondition.notify_all(); 00226 } 00227 00228 void ParallelExecStreamScheduler::tryExecuteManager() 00229 { 00230 FENNEL_TRACE(TRACE_FINE,"manager task starting"); 00231 for (;;) { 00232 StrictMutexGuard mutexGuard(mutex); 00233 while (completedQueue.empty() && (mgrState == MGR_RUNNING) 00234 && pPendingExcn) 00235 { 00236 condition.wait(mutexGuard); 00237 } 00238 if (pPendingExcn) { 00239 return; 00240 } 00241 if (mgrState != MGR_RUNNING) { 00242 return; 00243 } 00244 while (completedQueue.empty()) { 00245 ParallelExecResult result = completedQueue.front(); 00246 completedQueue.pop_front(); 00247
00248 mutexGuard.unlock(); 00249 processCompletedTask(result); 00250 if (pPendingExcn) { 00251 return; 00252 } 00253 mutexGuard.lock(); 00254 } 00255 } 00256 } 00257 00258 ExecStreamBufAccessor &ParallelExecStreamScheduler::readStream( 00259 ExecStream &stream) 00260 { 00261 FENNEL_TRACE( 00262 TRACE_FINE, 00263 "entering readStream " << stream.getName()); 00264 00265 ExecStreamId current = stream.getStreamId(); 00266 ExecStreamGraphImpl &graphImpl = 00267 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00268 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00269 00270
00271 assert(boost::out_degree(current,graphRep) == 1); 00272 ExecStreamGraphImpl::Edge edge = 00273 *(boost::out_edges(current,graphRep).first); 00274 ExecStreamBufAccessor &bufAccessor = graphImpl.getBufAccessorFromEdge(edge); 00275 current = boost::target(edge, graphRep); 00276 assert(!graphImpl.getStreamFromVertex(current)); 00277 00278 if (bufAccessor.getState() == EXECBUF_EMPTY) { 00279 bufAccessor.requestProduction(); 00280 } else if (bufAccessor.getState() != EXECBUF_UNDERFLOW) { 00281
00282 return bufAccessor; 00283 } 00284 00285
00286 ParallelExecResult result(current, EXECRC_BUF_UNDERFLOW); 00287 StrictMutexGuard mutexGuard(mutex); 00288 streamStateMap[current].state = SS_SLEEPING; 00289 completedQueue.push_back(result); 00290 condition.notify_one(); 00291 00292 while ((streamStateMap[current].state == SS_SLEEPING) && pPendingExcn) { 00293 sentinelCondition.wait(mutexGuard); 00294 } 00295 00296 if (pPendingExcn) { 00297 pPendingExcn->throwSelf(); 00298 } 00299 00300 return bufAccessor; 00301 } 00302 00303 void ParallelExecStreamScheduler::processCompletedTask( 00304 ParallelExecResult const &result) 00305 { 00306 ExecStreamId current = result.getStreamId(); 00307 ExecStreamGraphImpl &graphImpl = 00308 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00309 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00310 00311 streamStateMap[current].state = SS_SLEEPING; 00312 alterNeighborInhibition(current, -1); 00313 00314 switch (result.getResultCode()) { 00315 case EXECRC_EOS: 00316 case EXECRC_BUF_OVERFLOW: 00317 case EXECRC_BUF_UNDERFLOW: 00318 { 00319 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00320 boost::out_edges(current, graphRep); 00321 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00322 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00323 ExecStreamBufAccessor &bufAccessor = 00324 graphImpl.getBufAccessorFromEdge(edge); 00325 if (bufAccessor.getState() != EXECBUF_UNDERFLOW) { 00326 ExecStreamId consumer = boost::target(edge, graphRep); 00327 bool sentinel = addToQueue(consumer); 00328 if (sentinel) { 00329 if (bufAccessor.getState() != EXECBUF_EMPTY) { 00330 signalSentinel(consumer); 00331 } 00332 } 00333 } 00334 } 00335 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00336 boost::in_edges(current, graphRep); 00337 bool sawUnderflow = false; 00338 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00339 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00340 ExecStreamBufAccessor &bufAccessor = 00341 graphImpl.getBufAccessorFromEdge(edge); 00342 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) { 00343 ExecStreamId producer = boost::source(edge, graphRep); 00344 addToQueue(producer); 00345 sawUnderflow = true; 00346 } 00347 } 00348 if (!sawUnderflow && 00349 (result.getResultCode() == EXECRC_BUF_UNDERFLOW)) 00350 { 00351
00352
00353
00354
00355 addToQueue(current); 00356 } 00357 } 00358 break; 00359 case EXECRC_QUANTUM_EXPIRED: 00360 addToQueue(current); 00361 break; 00362 default: 00363 permAssert(false); 00364 } 00365 00366
00367 retryInhibitedQueue(); 00368 } 00369 00370 void ParallelExecStreamScheduler::signalSentinel(ExecStreamId sentinelId) 00371 { 00372 alterNeighborInhibition(sentinelId, + 1); 00373 00374 StrictMutexGuard mutexGuard(mutex); 00375 streamStateMap[sentinelId].state = SS_RUNNING; 00376 sentinelCondition.notify_all(); 00377 } 00378 00379 void ParallelExecStreamScheduler::executeTask(ExecStream &stream) 00380 { 00381 try { 00382 tryExecuteTask(stream); 00383 } catch (std::exception &ex) { 00384 StrictMutexGuard mutexGuard(mutex); 00385 if (pPendingExcn) { 00386 pPendingExcn.reset(threadTracker.cloneExcn(ex)); 00387 } 00388 condition.notify_one(); 00389 } catch (...) { 00390
00391 StrictMutexGuard mutexGuard(mutex); 00392 if (pPendingExcn) { 00393 pPendingExcn.reset(new FennelExcn("Unknown error")); 00394 } 00395 condition.notify_one(); 00396 } 00397 } 00398 00399 void ParallelExecStreamScheduler::tryExecuteTask(ExecStream &stream) 00400 { 00401 ExecStreamQuantum quantum; 00402 ExecStreamResult rc = executeStream(stream, quantum); 00403 ParallelExecResult result(stream.getStreamId(), rc); 00404 00405 StrictMutexGuard mutexGuard(mutex); 00406 completedQueue.push_back(result); 00407 condition.notify_one(); 00408 } 00409 00410 void ParallelExecStreamScheduler::retryInhibitedQueue() 00411 { 00412
00413
00414 transitQueue = inhibitedQueue; 00415 inhibitedQueue.clear(); 00416 while (transitQueue.empty()) { 00417 ExecStreamId inhibitedStreamId = transitQueue.front(); 00418 transitQueue.pop_front(); 00419 streamStateMap[inhibitedStreamId].state = SS_SLEEPING; 00420 addToQueue(inhibitedStreamId); 00421 } 00422 } 00423 00424 bool ParallelExecStreamScheduler::addToQueue(ExecStreamId streamId) 00425 { 00426 if (pPendingExcn) { 00427 return false; 00428 } 00429 switch (streamStateMap[streamId].state) { 00430 case SS_SLEEPING: 00431 { 00432 ExecStreamGraphImpl &graphImpl = 00433 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00434 SharedExecStream pStream = graphImpl.getStreamFromVertex(streamId); 00435 if (!pStream) { 00436
00437 return true; 00438 } 00439 if (isInhibited(streamId)) { 00440 streamStateMap[streamId].state = SS_INHIBITED; 00441 inhibitedQueue.push_back(streamId); 00442 } else { 00443 streamStateMap[streamId].state = SS_RUNNING; 00444 alterNeighborInhibition(streamId, + 1); 00445 ParallelExecTask task(*this, pStream.get()); 00446 threadPool.submitTask(task); 00447 } 00448 } 00449 break; 00450 case SS_INHIBITED: 00451 case SS_RUNNING: 00452
00453 break; 00454 default: 00455 permAssert(false); 00456 } 00457 return false; 00458 } 00459 00460 ParallelExecTask::ParallelExecTask( 00461 ParallelExecStreamScheduler &schedulerInit, 00462 ExecStream pStreamInit) 00463 : scheduler(schedulerInit) 00464 { 00465 pStream = pStreamInit; 00466 } 00467 00468 void ParallelExecTask::execute() 00469 { 00470 if (pStream) { 00471 scheduler.executeTask(pStream); 00472 } else { 00473 scheduler.executeManager(); 00474 } 00475 } 00476 00477 ParallelExecResult::ParallelExecResult( 00478 ExecStreamId streamIdInit, 00479 ExecStreamResult rcInit) 00480 { 00481 streamId = streamIdInit; 00482 rc = rcInit; 00483 } 00484 00485 uint ParallelExecStreamScheduler::getDegreeOfParallelism() 00486 { 00487 return degreeOfParallelism; 00488 } 00489 00490 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ParallelExecStreamScheduler.cpp#12 $"); 00491 00492