Fennel: DfsTreeExecStreamScheduler Class Reference (original) (raw)

DfsTreeExecStreamScheduler is a reference implementation of the ExecStreamScheduler interface. More...

#include <[DfsTreeExecStreamScheduler.h](DfsTreeExecStreamScheduler%5F8h-source.html)>

Inheritance diagram for DfsTreeExecStreamScheduler:

List of all members.

Public Member Functions
DfsTreeExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name)
Constructs a new scheduler.
virtual ~DfsTreeExecStreamScheduler ()
virtual void addGraph (SharedExecStreamGraph pGraph)
Adds a graph to be scheduled.
virtual void removeGraph (SharedExecStreamGraph pGraph)
Removes a graph currently being scheduled.
virtual void start ()
Starts this scheduler, preparing it to execute streams.
virtual void setRunnable (ExecStream &stream, bool)
Sets whether that a specific stream should be considered for execution.
virtual void abort (ExecStreamGraph &graph)
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
virtual void checkAbort () const
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
virtual void stop ()
Shuts down this scheduler, preventing any further streams from being scheduled.
virtual ExecStreamBufAccessor & readStream (ExecStream &stream)
Reads data from a stream, first performing any scheduling necessary to make output available.
virtual void traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel)
Traces the contents of a stream buffer.
void makeRunnable (ExecStream &stream)
Requests that a specific stream be considered for execution.
virtual SharedExecStreamBufAccessor newBufAccessor ()
Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
virtual void createBufferProvisionAdapter (ExecStreamEmbryo &embryo)
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
virtual void createCopyProvisionAdapter (ExecStreamEmbryo &embryo)
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
virtual uint getDegreeOfParallelism ()
**Returns:**the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
Records a trace message.
bool isTracing () const
**Returns:**true iff tracing is enabled for this source
bool isTracingLevel (TraceLevel level) const
Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
**Returns:**the TraceTarget for this source
SharedTraceTarget getSharedTraceTarget () const
**Returns:**the SharedTraceTarget for this source
std::string getTraceSourceName () const
Gets the name of this source.
void setTraceSourceName (std::string const &n)
Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
Protected Member Functions
ExecStreamResult executeStream (ExecStream &stream, ExecStreamQuantum const &quantum)
Executes one stream, performing tracing if enabled.
virtual void tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum)
Traces before execution of a stream.
virtual void tracePostExecution (ExecStream &stream, ExecStreamResult rc)
Traces after execution of a stream.
virtual void traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel)
Traces the states of the input and output buffers adjacent to a stream.
Protected Attributes
bool tracingFine
Private Member Functions
bool findNextConsumer (ExecStreamGraphImpl &graphImpl, const ExecStreamGraphImpl::GraphRep &graphRep, const ExecStream &stream, ExecStreamGraphImpl::Edge &edge, ExecStreamId &current, ExecStreamBufState skipState)
Finds the next consumer to execute for a given producer.
Private Attributes
volatile bool aborted
SharedExecStreamGraph pGraph

Detailed Description

DfsTreeExecStreamScheduler is a reference implementation of the ExecStreamScheduler interface.

See SchedulerDesign for more details.

Version:

Id

//open/dev/fennel/exec/DfsTreeExecStreamScheduler.h#16

Definition at line 42 of file DfsTreeExecStreamScheduler.h.


Constructor & Destructor Documentation

DfsTreeExecStreamScheduler::DfsTreeExecStreamScheduler ( SharedTraceTarget pTraceTarget,
std::string name
) [explicit]

Constructs a new scheduler.

Parameters:

pTraceTarget the TraceTarget to which messages will be sent, or NULL to disable tracing entirely
name the name to use for tracing this scheduler

Definition at line 33 of file DfsTreeExecStreamScheduler.cpp.

| DfsTreeExecStreamScheduler::~DfsTreeExecStreamScheduler | ( | | ) | [virtual] | | -------------------------------------------------------- | - | | - | ----------- |


Member Function Documentation

Finds the next consumer to execute for a given producer.

Parameters:

graphImpl current stream graph
graphRep graph representation of current stream graph
stream currrent execution stream
edge returns edge to consumer to execute next
current returns id of consumer to execute next
skipState state to skip when looking for next consumer

Returns:

false if reached sink vertex, else true

Definition at line 183 of file DfsTreeExecStreamScheduler.cpp.

References EXECBUF_EMPTY, EXECBUF_EOS, EXECBUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), and TRACE_FINE.

Referenced by readStream().

00190 { 00191 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00192 boost::out_edges(current,graphRep); 00193 00194 bool emptyFound = false; 00195
00196 ExecStreamGraphImpl::Edge emptyEdge = edge; 00197 ExecStreamId emptyStreamId = current; 00198 00199 for (; outEdges.first != outEdges.second; ++(outEdges.first)) { 00200 edge = *(outEdges.first); 00201 current = boost::target(edge,graphRep); 00202 if (boost::out_degree(current,graphRep) == 0) { 00203
00204 assert(!graphImpl.getStreamFromVertex(current)); 00205 FENNEL_TRACE( 00206 TRACE_FINE, 00207 "leaving readStream " << stream.getName()); 00208 return false; 00209 } 00210 00211 ExecStreamBufAccessor &bufAccessor = 00212 graphImpl.getBufAccessorFromEdge(edge); 00213 00214
00215
00216
00217
00218
00219 if (bufAccessor.getState() == EXECBUF_EMPTY) { 00220 if (!emptyFound) { 00221 emptyFound = true; 00222 emptyEdge = edge; 00223 emptyStreamId = current; 00224 } 00225 continue; 00226 } 00227 00228 if (bufAccessor.getState() != skipState) { 00229 break; 00230 } 00231 assert(!(skipState == EXECBUF_UNDERFLOW && 00232 bufAccessor.getState() == EXECBUF_EOS)); 00233 } 00234 00235 if (outEdges.first == outEdges.second && emptyFound) { 00236 edge = emptyEdge; 00237 current = emptyStreamId; 00238 } else { 00239 assert(!(skipState == EXECBUF_UNDERFLOW && 00240 outEdges.first == outEdges.second)); 00241 } 00242 00243 return true; 00244 }

| void DfsTreeExecStreamScheduler::start | ( | | ) | [virtual] | | -------------------------------------- | - | | - | ----------- |

void DfsTreeExecStreamScheduler::setRunnable ( ExecStream & stream,
bool
) [virtual]
void DfsTreeExecStreamScheduler::abort ( ExecStreamGraph & graph ) [virtual]

Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.

Returns immediately, not waiting for abort request to be fully processed.

Parameters:

graph graph to abort; must be one of the graphs associated with this scheduler

Implements ExecStreamScheduler.

Definition at line 81 of file DfsTreeExecStreamScheduler.cpp.

References aborted, and TRACE_FINE.

00082 { 00083 FENNEL_TRACE(TRACE_FINE,"abort requested"); 00084 00085 aborted = true; 00086 }

| void DfsTreeExecStreamScheduler::checkAbort | ( | | ) | const [virtual] | | ------------------------------------------- | - | | - | ----------------- |

| void DfsTreeExecStreamScheduler::stop | ( | | ) | [virtual] | | ------------------------------------- | - | | - | ----------- |

Reads data from a stream, first performing any scheduling necessary to make output available.

Parameters:

stream the stream from which to read

Returns:

accessor for output data buffer

Implements ExecStreamScheduler.

Definition at line 104 of file DfsTreeExecStreamScheduler.cpp.

References checkAbort(), EXECBUF_EOS, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamScheduler::executeStream(), findNextConsumer(), ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), and TRACE_FINE.

00106 { 00107 FENNEL_TRACE( 00108 TRACE_FINE, 00109 "entering readStream " << stream.getName()); 00110 00111 ExecStreamId current = stream.getStreamId(); 00112 ExecStreamQuantum quantum; 00113 00114 ExecStreamGraphImpl &graphImpl = 00115 dynamic_cast<ExecStreamGraphImpl&>(*pGraph); 00116 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00117 00118
00119 assert(boost::out_degree(current,graphRep) == 1); 00120 assert(!graphImpl.getStreamFromVertex( 00121 boost::target( 00122 *(boost::out_edges(current,graphRep).first), 00123 graphRep))); 00124 00125
00126 00127 for (;;) { 00128 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00129 boost::in_edges(current,graphRep); 00130 for (; inEdges.first != inEdges.second; ++(inEdges.first)) { 00131 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00132 ExecStreamBufAccessor &bufAccessor = 00133 graphImpl.getBufAccessorFromEdge(edge); 00134 if (bufAccessor.getState() == EXECBUF_UNDERFLOW) { 00135
00136 current = boost::source(edge,graphRep); 00137 break; 00138 } 00139 } 00140 if (inEdges.first != inEdges.second) { 00141
00142 continue; 00143 } 00144 00145 SharedExecStream pStream = graphImpl.getStreamFromVertex(current); 00146 ExecStreamResult rc = executeStream(*pStream, quantum); 00147 00148 checkAbort(); 00149 00150 ExecStreamGraphImpl::Edge edge; 00151 00152 switch (rc) { 00153 case EXECRC_EOS: 00154
00155 if (findNextConsumer( 00156 graphImpl, graphRep, stream, edge, current, EXECBUF_EOS)) 00157 { 00158 return graphImpl.getBufAccessorFromEdge(edge); 00159 } 00160
00161 break; 00162 case EXECRC_BUF_OVERFLOW: 00163
00164
00165 if (findNextConsumer( 00166 graphImpl, graphRep, stream, edge, current, EXECBUF_UNDERFLOW)) 00167 { 00168 return graphImpl.getBufAccessorFromEdge(edge); 00169 } 00170 break; 00171 case EXECRC_BUF_UNDERFLOW: 00172
00173
00174 break; 00175 case EXECRC_QUANTUM_EXPIRED: 00176 break; 00177 default: 00178 permAssert(false); 00179 } 00180 } 00181 }

void ExecStreamScheduler::traceStreamBuffers ( ExecStream & stream,
TraceLevel inputTupleTraceLevel,
TraceLevel outputTupleTraceLevel
) [protected, virtual, inherited]

Traces the states of the input and output buffers adjacent to a stream.

Parameters:

stream stream whose buffers are to be traced
inputTupleTraceLevel trace level at which tuple contents of input buffers are to be traced
outputTupleTraceLevel trace level at which tuple contents of output buffers are to be traced

Definition at line 140 of file ExecStreamScheduler.cpp.

References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().

Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 }

void ExecStreamScheduler::makeRunnable ( ExecStream & stream ) [inline, inherited]
void ExecStreamScheduler::createBufferProvisionAdapter ( ExecStreamEmbryo & embryo ) [virtual, inherited]
void ExecStreamScheduler::createCopyProvisionAdapter ( ExecStreamEmbryo & embryo ) [virtual, inherited]

Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.

Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.

Parameters:

embryo receives new adapter stream

Definition at line 92 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

| uint ExecStreamScheduler::getDegreeOfParallelism | ( | | ) | [virtual, inherited] | | ------------------------------------------------------------------------------------------------------------- | - | | - | ---------------------- |

void TraceSource::initTraceSource ( SharedTraceTarget pTraceTarget,
std::string name
) [virtual, inherited]
void TraceSource::trace ( TraceLevel level,
std::string message
) const [inherited]

| bool TraceSource::isTracing | ( | | ) | const [inline, inherited] | | --------------------------- | - | | - | --------------------------- |

bool TraceSource::isTracingLevel ( TraceLevel level ) const [inline, inherited]

| TraceTarget& TraceSource::getTraceTarget | ( | | ) | const [inline, inherited] | | ----------------------------------------------------------------- | - | | - | --------------------------- |

| std::string TraceSource::getTraceSourceName | ( | | ) | const [inline, inherited] | | ------------------------------------------- | - | | - | --------------------------- |

void TraceSource::setTraceSourceName ( std::string const & n ) [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137 { 00138 name = n; 00139 }

| TraceLevel TraceSource::getMinimumTraceLevel | ( | | ) | const [inline, inherited] | | ------------------------------------------------------------------------------------------------------ | - | | - | --------------------------- |

| void TraceSource::disableTracing | ( | | ) | [inherited] | | -------------------------------- | - | | - | ------------- |


Member Data Documentation


The documentation for this class was generated from the following files:


Generated on Mon Jun 22 04:00:30 2009 for Fennel by doxygen 1.5.1