Fennel: DfsTreeExecStreamScheduler Class Reference (original) (raw)
DfsTreeExecStreamScheduler is a reference implementation of the ExecStreamScheduler interface. More...
#include <[DfsTreeExecStreamScheduler.h](DfsTreeExecStreamScheduler%5F8h-source.html)>
Inheritance diagram for DfsTreeExecStreamScheduler:

| Public Member Functions | |
|---|---|
| DfsTreeExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name) | |
| Constructs a new scheduler. | |
| virtual | ~DfsTreeExecStreamScheduler () |
| virtual void | addGraph (SharedExecStreamGraph pGraph) |
| Adds a graph to be scheduled. | |
| virtual void | removeGraph (SharedExecStreamGraph pGraph) |
| Removes a graph currently being scheduled. | |
| virtual void | start () |
| Starts this scheduler, preparing it to execute streams. | |
| virtual void | setRunnable (ExecStream &stream, bool) |
| Sets whether that a specific stream should be considered for execution. | |
| virtual void | abort (ExecStreamGraph &graph) |
| Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling. | |
| virtual void | checkAbort () const |
| Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn. | |
| virtual void | stop () |
| Shuts down this scheduler, preventing any further streams from being scheduled. | |
| virtual ExecStreamBufAccessor & | readStream (ExecStream &stream) |
| Reads data from a stream, first performing any scheduling necessary to make output available. | |
| virtual void | traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel) |
| Traces the contents of a stream buffer. | |
| void | makeRunnable (ExecStream &stream) |
| Requests that a specific stream be considered for execution. | |
| virtual SharedExecStreamBufAccessor | newBufAccessor () |
| Creates a new ExecStreamBufAccessor suitable for use with this scheduler. | |
| virtual void | createBufferProvisionAdapter (ExecStreamEmbryo &embryo) |
| Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER. | |
| virtual void | createCopyProvisionAdapter (ExecStreamEmbryo &embryo) |
| Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER. | |
| virtual uint | getDegreeOfParallelism () |
| **Returns:**the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler | |
| virtual void | initTraceSource (SharedTraceTarget pTraceTarget, std::string name) |
| For use when initialization has to be deferred until after construction. | |
| void | trace (TraceLevel level, std::string message) const |
| Records a trace message. | |
| bool | isTracing () const |
| **Returns:**true iff tracing is enabled for this source | |
| bool | isTracingLevel (TraceLevel level) const |
| Determines whether a particular level is being traced. | |
| TraceTarget & | getTraceTarget () const |
| **Returns:**the TraceTarget for this source | |
| SharedTraceTarget | getSharedTraceTarget () const |
| **Returns:**the SharedTraceTarget for this source | |
| std::string | getTraceSourceName () const |
| Gets the name of this source. | |
| void | setTraceSourceName (std::string const &n) |
| Sets the name of this source. | |
| TraceLevel | getMinimumTraceLevel () const |
| void | disableTracing () |
| Protected Member Functions | |
| ExecStreamResult | executeStream (ExecStream &stream, ExecStreamQuantum const &quantum) |
| Executes one stream, performing tracing if enabled. | |
| virtual void | tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum) |
| Traces before execution of a stream. | |
| virtual void | tracePostExecution (ExecStream &stream, ExecStreamResult rc) |
| Traces after execution of a stream. | |
| virtual void | traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel) |
| Traces the states of the input and output buffers adjacent to a stream. | |
| Protected Attributes | |
| bool | tracingFine |
| Private Member Functions | |
| bool | findNextConsumer (ExecStreamGraphImpl &graphImpl, const ExecStreamGraphImpl::GraphRep &graphRep, const ExecStream &stream, ExecStreamGraphImpl::Edge &edge, ExecStreamId ¤t, ExecStreamBufState skipState) |
| Finds the next consumer to execute for a given producer. | |
| Private Attributes | |
| volatile bool | aborted |
| SharedExecStreamGraph | pGraph |
Detailed Description
DfsTreeExecStreamScheduler is a reference implementation of the ExecStreamScheduler interface.
See SchedulerDesign for more details.
Version:
Id
//open/dev/fennel/exec/DfsTreeExecStreamScheduler.h#16
Definition at line 42 of file DfsTreeExecStreamScheduler.h.
Constructor & Destructor Documentation
| DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler | ( | SharedTraceTarget | pTraceTarget, |
|---|---|---|---|
| std::string | name | ||
| ) | [explicit] |
Constructs a new scheduler.
Parameters:
| pTraceTarget | the TraceTarget to which messages will be sent, or NULL to disable tracing entirely |
|---|---|
| name | the name to use for tracing this scheduler |
Definition at line 33 of file DfsTreeExecStreamScheduler.cpp.
| DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler | ( | | ) | [virtual] | | -------------------------------------------------------- | - | | - | ----------- |
Member Function Documentation
Finds the next consumer to execute for a given producer.
Parameters:
| graphImpl | current stream graph |
|---|---|
| graphRep | graph representation of current stream graph |
| stream | currrent execution stream |
| edge | returns edge to consumer to execute next |
| current | returns id of consumer to execute next |
| skipState | state to skip when looking for next consumer |
Returns:
false if reached sink vertex, else true
Definition at line 183 of file DfsTreeExecStreamScheduler.cpp.
References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), and TRACE_FINE.
Referenced by readStream().
00190 {
00191 ExecStreamGraphImpl::OutEdgeIterPair outEdges =
00192 boost::out_edges(current,graphRep);
00193
00194 bool emptyFound = false;
00195
00196 ExecStreamGraphImpl::Edge emptyEdge = edge;
00197 ExecStreamId emptyStreamId = current;
00198
00199 for (; outEdges.first != outEdges.second; ++(outEdges.first)) {
00200 edge = *(outEdges.first);
00201 current = boost::target(edge,graphRep);
00202 if (boost::out_degree(current,graphRep) == 0) {
00203
00204 assert(!graphImpl.getStreamFromVertex(current));
00205 FENNEL_TRACE(
00206 TRACE_FINE,
00207 "leaving readStream " << stream.getName());
00208 return false;
00209 }
00210
00211 ExecStreamBufAccessor &bufAccessor =
00212 graphImpl.getBufAccessorFromEdge(edge);
00213
00214
00215
00216
00217
00218
00219 if (bufAccessor.getState() == EXECBUF_EMPTY) {
00220 if (!emptyFound) {
00221 emptyFound = true;
00222 emptyEdge = edge;
00223 emptyStreamId = current;
00224 }
00225 continue;
00226 }
00227
00228 if (bufAccessor.getState() != skipState) {
00229 break;
00230 }
00231 assert(!(skipState == EXECBUF_UNDERFLOW &&
00232 bufAccessor.getState() == EXECBUF_EOS));
00233 }
00234
00235 if (outEdges.first == outEdges.second && emptyFound) {
00236 edge = emptyEdge;
00237 current = emptyStreamId;
00238 } else {
00239 assert(!(skipState == EXECBUF_UNDERFLOW &&
00240 outEdges.first == outEdges.second));
00241 }
00242
00243 return true;
00244 }
| void DfsTreeExecStreamScheduler::start | ( | | ) | [virtual] | | -------------------------------------- | - | | - | ----------- |
| void DfsTreeExecStreamScheduler::setRunnable | ( | ExecStream & | stream, |
|---|---|---|---|
| bool | |||
| ) | [virtual] |
| void DfsTreeExecStreamScheduler::abort | ( | ExecStreamGraph & | graph | ) | [virtual] |
|---|
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
Returns immediately, not waiting for abort request to be fully processed.
Parameters:
| graph | graph to abort; must be one of the graphs associated with this scheduler |
|---|
Implements ExecStreamScheduler.
Definition at line 81 of file DfsTreeExecStreamScheduler.cpp.
References aborted, and TRACE_FINE.
00082 { 00083 FENNEL_TRACE(TRACE_FINE,"abort requested"); 00084 00085 aborted = true; 00086 }
| void DfsTreeExecStreamScheduler::checkAbort | ( | | ) | const [virtual] | | ------------------------------------------- | - | | - | ----------------- |
| void DfsTreeExecStreamScheduler::stop | ( | | ) | [virtual] | | ------------------------------------- | - | | - | ----------- |
Reads data from a stream, first performing any scheduling necessary to make output available.
Parameters:
| stream | the stream from which to read |
|---|
Returns:
accessor for output data buffer
Implements ExecStreamScheduler.
Definition at line 104 of file DfsTreeExecStreamScheduler.cpp.
References checkAbort(), EXECBUF_EOS, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamScheduler::executeStream(), findNextConsumer(), ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), and TRACE_FINE.
00106 {
00107 FENNEL_TRACE(
00108 TRACE_FINE,
00109 "entering readStream " << stream.getName());
00110
00111 ExecStreamId current = stream.getStreamId();
00112 ExecStreamQuantum quantum;
00113
00114 ExecStreamGraphImpl &graphImpl =
00115 dynamic_cast<ExecStreamGraphImpl&>(*pGraph);
00116 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep();
00117
00118
00119 assert(boost::out_degree(current,graphRep) == 1);
00120 assert(!graphImpl.getStreamFromVertex(
00121 boost::target(
00122 *(boost::out_edges(current,graphRep).first),
00123 graphRep)));
00124
00125
00126
00127 for (;;) {
00128 ExecStreamGraphImpl::InEdgeIterPair inEdges =
00129 boost::in_edges(current,graphRep);
00130 for (; inEdges.first != inEdges.second; ++(inEdges.first)) {
00131 ExecStreamGraphImpl::Edge edge = *(inEdges.first);
00132 ExecStreamBufAccessor &bufAccessor =
00133 graphImpl.getBufAccessorFromEdge(edge);
00134 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) {
00135
00136 current = boost::source(edge,graphRep);
00137 break;
00138 }
00139 }
00140 if (inEdges.first != inEdges.second) {
00141
00142 continue;
00143 }
00144
00145 SharedExecStream pStream = graphImpl.getStreamFromVertex(current);
00146 ExecStreamResult rc = executeStream(*pStream, quantum);
00147
00148 checkAbort();
00149
00150 ExecStreamGraphImpl::Edge edge;
00151
00152 switch (rc) {
00153 case EXECRC_EOS:
00154
00155 if ((
00156 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS))
00157 {
00158 return graphImpl.getBufAccessorFromEdge(edge);
00159 }
00160
00161 break;
00162 case EXECRC_BUF_OVERFLOW:
00163
00164
00165 if ((
00166 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW))
00167 {
00168 return graphImpl.getBufAccessorFromEdge(edge);
00169 }
00170 break;
00171 case EXECRC_BUF_UNDERFLOW:
00172
00173
00174 break;
00175 case EXECRC_QUANTUM_EXPIRED:
00176 break;
00177 default:
00178 permAssert(false);
00179 }
00180 }
00181 }
| void ExecStreamScheduler::traceStreamBuffers | ( | ExecStream & | stream, |
|---|---|---|---|
| TraceLevel | inputTupleTraceLevel, | ||
| TraceLevel | outputTupleTraceLevel | ||
| ) | [protected, virtual, inherited] |
Traces the states of the input and output buffers adjacent to a stream.
Parameters:
| stream | stream whose buffers are to be traced |
|---|---|
| inputTupleTraceLevel | trace level at which tuple contents of input buffers are to be traced |
| outputTupleTraceLevel | trace level at which tuple contents of output buffers are to be traced |
Definition at line 140 of file ExecStreamScheduler.cpp.
References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().
Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().
00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 }
| void ExecStreamScheduler::makeRunnable | ( | ExecStream & | stream | ) | [inline, inherited] |
|---|
| void ExecStreamScheduler::createBufferProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual, inherited] |
|---|
| void ExecStreamScheduler::createCopyProvisionAdapter | ( | ExecStreamEmbryo & | embryo | ) | [virtual, inherited] |
|---|
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.
Parameters:
| embryo | receives new adapter stream |
|---|
Definition at line 92 of file ExecStreamScheduler.cpp.
References ExecStreamEmbryo::init().
| uint ExecStreamScheduler::getDegreeOfParallelism | ( | | ) | [virtual, inherited] | | ------------------------------------------------------------------------------------------------------------- | - | | - | ---------------------- |
| void TraceSource::initTraceSource | ( | SharedTraceTarget | pTraceTarget, |
|---|---|---|---|
| std::string | name | ||
| ) | [virtual, inherited] |
| void TraceSource::trace | ( | TraceLevel | level, |
|---|---|---|---|
| std::string | message | ||
| ) | const [inherited] |
| bool TraceSource::isTracing | ( | | ) | const [inline, inherited] | | --------------------------- | - | | - | --------------------------- |
| bool TraceSource::isTracingLevel | ( | TraceLevel | level | ) | const [inline, inherited] |
|---|
| TraceTarget& TraceSource::getTraceTarget | ( | | ) | const [inline, inherited] | | ----------------------------------------------------------------- | - | | - | --------------------------- |
| std::string TraceSource::getTraceSourceName | ( | | ) | const [inline, inherited] | | ------------------------------------------- | - | | - | --------------------------- |
| void TraceSource::setTraceSourceName | ( | std::string const & | n | ) | [inline, inherited] |
|---|
Sets the name of this source.
Useful to construct dynamic names for fine-grained filtering.
Definition at line 136 of file TraceSource.h.
00137 { 00138 name = n; 00139 }
| TraceLevel TraceSource::getMinimumTraceLevel | ( | | ) | const [inline, inherited] | | ------------------------------------------------------------------------------------------------------ | - | | - | --------------------------- |
| void TraceSource::disableTracing | ( | | ) | [inherited] | | -------------------------------- | - | | - | ------------- |
Member Data Documentation
The documentation for this class was generated from the following files:
- /home/pub/open/dev/fennel/exec/DfsTreeExecStreamScheduler.h
- /home/pub/open/dev/fennel/exec/DfsTreeExecStreamScheduler.cpp
