Fennel: /home/pub/open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/DfsTreeExecStreamScheduler.h" 00026 #include "fennel/exec/ExecStreamGraphImpl.h" 00027 #include "fennel/exec/ExecStream.h" 00028 #include "fennel/exec/ExecStreamBufAccessor.h" 00029 #include "fennel/common/AbortExcn.h" 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $"); 00032 00033 DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler( 00034 SharedTraceTarget pTraceTargetInit, 00035 std::string nameInit) 00036 : TraceSource(pTraceTargetInit, nameInit), 00037 ExecStreamScheduler(pTraceTargetInit, nameInit) 00038 { 00039 } 00040 00041 DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler() 00042 { 00043 } 00044 00045 void DfsTreeExecStreamScheduler::addGraph(SharedExecStreamGraph pGraphInit) 00046 { 00047 assert(pGraph); 00048 00049 ExecStreamScheduler::addGraph(pGraphInit); 00050 pGraph = pGraphInit; 00051 } 00052 00053 void DfsTreeExecStreamScheduler::removeGraph(SharedExecStreamGraph pGraphInit) 00054 { 00055 assert(pGraph == pGraphInit); 00056 00057 pGraph.reset(); 00058 ExecStreamScheduler::removeGraph(pGraphInit); 00059 } 00060 00061 void DfsTreeExecStreamScheduler::start() 00062 { 00063 FENNEL_TRACE(TRACE_FINE,"start"); 00064 00065
00066
00067
00068 00069
00070
00071
00072 assert(pGraph->isAcyclic()); 00073 aborted = false; 00074 } 00075 00076 void DfsTreeExecStreamScheduler::setRunnable(ExecStream &, bool) 00077 { 00078 permAssert(false); 00079 } 00080 00081 void DfsTreeExecStreamScheduler::abort(ExecStreamGraph &) 00082 { 00083 FENNEL_TRACE(TRACE_FINE,"abort requested"); 00084 00085 aborted = true; 00086 } 00087 00088 void DfsTreeExecStreamScheduler::checkAbort() const 00089 { 00090 if (aborted) { 00091 FENNEL_TRACE(TRACE_FINE,"abort detected"); 00092 throw AbortExcn(); 00093 } 00094 } 00095 00096 void DfsTreeExecStreamScheduler::stop() 00097 { 00098 FENNEL_TRACE(TRACE_FINE,"stop"); 00099 00100
00101 aborted = false; 00102 } 00103 00104 ExecStreamBufAccessor &DfsTreeExecStreamScheduler::readStream( 00105 ExecStream &stream) 00106 { 00107 FENNEL_TRACE( 00108 TRACE_FINE, 00109 "entering readStream " << stream.getName()); 00110 00111 ExecStreamId current = stream.getStreamId(); 00112 ExecStreamQuantum quantum; 00113 00114 ExecStreamGraphImpl &graphImpl = 00115 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00116 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00117 00118
00119 assert(boost::out_degree(current,graphRep) == 1); 00120 assert(!graphImpl.getStreamFromVertex( 00121 boost::target( 00122 *(boost::out_edges(current,graphRep).first), 00123 graphRep))); 00124 00125
00126 00127 for (;;) { 00128 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00129 boost::in_edges(current,graphRep); 00130 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00131 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00132 ExecStreamBufAccessor &bufAccessor = 00133 graphImpl.getBufAccessorFromEdge(edge); 00134 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) { 00135
00136 current = boost::source(edge,graphRep); 00137 break; 00138 } 00139 } 00140 if (inEdges.first != inEdges.second) { 00141
00142 continue; 00143 } 00144 00145 SharedExecStream pStream = graphImpl.getStreamFromVertex(current); 00146 ExecStreamResult rc = executeStream(*pStream, quantum); 00147 00148 checkAbort(); 00149 00150 ExecStreamGraphImpl::Edge edge; 00151 00152 switch (rc) { 00153 case EXECRC_EOS: 00154
00155 if (findNextConsumer( 00156 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS)) 00157 { 00158 return graphImpl.getBufAccessorFromEdge(edge); 00159 } 00160
00161 break; 00162 case EXECRC_BUF_OVERFLOW: 00163
00164
00165 if (findNextConsumer( 00166 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW)) 00167 { 00168 return graphImpl.getBufAccessorFromEdge(edge); 00169 } 00170 break; 00171 case EXECRC_BUF_UNDERFLOW: 00172
00173
00174 break; 00175 case EXECRC_QUANTUM_EXPIRED: 00176 break; 00177 default: 00178 permAssert(false); 00179 } 00180 } 00181 } 00182 00183 bool DfsTreeExecStreamScheduler::findNextConsumer( 00184 ExecStreamGraphImpl &graphImpl, 00185 const ExecStreamGraphImpl::GraphRep &graphRep, 00186 const ExecStream &stream, 00187 ExecStreamGraphImpl::Edge &edge, 00188 ExecStreamId &current, 00189 ExecStreamBufState skipState) 00190 { 00191 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00192 boost::out_edges(current,graphRep); 00193 00194 bool emptyFound = false; 00195
00196 ExecStreamGraphImpl::Edge emptyEdge = edge; 00197 ExecStreamId emptyStreamId = current; 00198 00199 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00200 edge = *(outEdges.first); 00201 current = boost::target(edge,graphRep); 00202 if (boost::out_degree(current,graphRep) == 0) { 00203
00204 assert(!graphImpl.getStreamFromVertex(current)); 00205 FENNEL_TRACE( 00206 TRACE_FINE, 00207 "leaving readStream " << stream.getName()); 00208 return false; 00209 } 00210 00211 ExecStreamBufAccessor &bufAccessor = 00212 graphImpl.getBufAccessorFromEdge(edge); 00213 00214
00215
00216
00217
00218
00219 if (bufAccessor.getState() == EXECBUF_EMPTY) { 00220 if (!emptyFound) { 00221 emptyFound = true; 00222 emptyEdge = edge; 00223 emptyStreamId = current; 00224 } 00225 continue; 00226 } 00227 00228 if (bufAccessor.getState() != skipState) { 00229 break; 00230 } 00231 assert(!(skipState == EXECBUF_UNDERFLOW && 00232 bufAccessor.getState() == EXECBUF_EOS)); 00233 } 00234 00235 if (outEdges.first == outEdges.second && emptyFound) { 00236 edge = emptyEdge; 00237 current = emptyStreamId; 00238 } else { 00239 assert(!(skipState == EXECBUF_UNDERFLOW && 00240 outEdges.first == outEdges.second)); 00241 } 00242 00243 return true; 00244 } 00245 00246 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp#20 $"); 00247 00248