Fennel: ParallelExecStreamScheduler Class Reference (original) (raw)

ParallelExecStreamScheduler is a parallel implementation of the ExecStreamScheduler interface. More...

#include <[ParallelExecStreamScheduler.h](ParallelExecStreamScheduler%5F8h-source.html)>

Inheritance diagram for ParallelExecStreamScheduler:

List of all members.

Public Member Functions
ParallelExecStreamScheduler (SharedTraceTarget pTraceTarget, std::string name, ThreadTracker &threadTracker, uint degreeOfParallelism)
Constructs a new scheduler.
virtual ~ParallelExecStreamScheduler ()
virtual void addGraph (SharedExecStreamGraph pGraph)
Adds a graph to be scheduled.
virtual void removeGraph (SharedExecStreamGraph pGraph)
Removes a graph currently being scheduled.
virtual void start ()
Starts this scheduler, preparing it to execute streams.
virtual void setRunnable (ExecStream &stream, bool)
Sets whether that a specific stream should be considered for execution.
virtual void makeRunnable (ExecStream &stream)
Requests that a specific stream be considered for execution.
virtual void abort (ExecStreamGraph &graph)
Asynchronously aborts execution of any scheduled streams contained by a particular graph and prevents further scheduling.
virtual void checkAbort () const
Checks whether there is an abort request for this scheduler, and if so, throws an AbortExcn.
virtual void stop ()
Shuts down this scheduler, preventing any further streams from being scheduled.
virtual ExecStreamBufAccessor & readStream (ExecStream &stream)
Reads data from a stream, first performing any scheduling necessary to make output available.
virtual void createBufferProvisionAdapter (ExecStreamEmbryo &embryo)
Creates a new adapter stream capable of buffering the output of a stream with BUFPROV_CONSUMER for use as input to a stream with BUFPROV_PRODUCER.
virtual uint getDegreeOfParallelism ()
**Returns:**the degree of parallelism implemented by this scheduler, or 1 for a non-parallel scheduler
virtual void traceStreamBufferContents (ExecStream &stream, ExecStreamBufAccessor &bufAccessor, TraceLevel traceLevel)
Traces the contents of a stream buffer.
virtual SharedExecStreamBufAccessor newBufAccessor ()
Creates a new ExecStreamBufAccessor suitable for use with this scheduler.
virtual void createCopyProvisionAdapter (ExecStreamEmbryo &embryo)
Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.
virtual void initTraceSource (SharedTraceTarget pTraceTarget, std::string name)
For use when initialization has to be deferred until after construction.
void trace (TraceLevel level, std::string message) const
Records a trace message.
bool isTracing () const
**Returns:**true iff tracing is enabled for this source
bool isTracingLevel (TraceLevel level) const
Determines whether a particular level is being traced.
TraceTarget & getTraceTarget () const
**Returns:**the TraceTarget for this source
SharedTraceTarget getSharedTraceTarget () const
**Returns:**the SharedTraceTarget for this source
std::string getTraceSourceName () const
Gets the name of this source.
void setTraceSourceName (std::string const &n)
Sets the name of this source.
TraceLevel getMinimumTraceLevel () const
void disableTracing ()
Protected Member Functions
ExecStreamResult executeStream (ExecStream &stream, ExecStreamQuantum const &quantum)
Executes one stream, performing tracing if enabled.
virtual void tracePreExecution (ExecStream &stream, ExecStreamQuantum const &quantum)
Traces before execution of a stream.
virtual void tracePostExecution (ExecStream &stream, ExecStreamResult rc)
Traces after execution of a stream.
virtual void traceStreamBuffers (ExecStream &stream, TraceLevel inputTupleTraceLevel, TraceLevel outputTupleTraceLevel)
Traces the states of the input and output buffers adjacent to a stream.
Protected Attributes
bool tracingFine
StrictMutex mutex
LocalCondition condition
Private Types
enum StreamState { SS_SLEEPING, SS_RUNNING, SS_INHIBITED }
enum ManagerState { MGR_RUNNING, MGR_STOPPING, MGR_STOPPED }
typedef std::hash_map< ExecStreamId, StreamStateMapEntry > StreamStateMap
typedef std::deque< ExecStreamId > InhibitedQueue
Private Member Functions
void tryExecuteManager ()
void executeManager ()
void tryExecuteTask (ExecStream &)
void executeTask (ExecStream &)
bool addToQueue (ExecStreamId streamId)
void signalSentinel (ExecStreamId sentinelId)
void retryInhibitedQueue ()
void processCompletedTask (ParallelExecResult const &task)
bool isInhibited (ExecStreamId streamId)
void alterNeighborInhibition (ExecStreamId streamId, int delta)
Private Attributes
SharedExecStreamGraph pGraph
ThreadPool< ParallelExecTask > threadPool
std::deque< ParallelExecResult > completedQueue
ThreadTracker & threadTracker
StreamStateMap streamStateMap
ManagerState mgrState
InhibitedQueue inhibitedQueue
InhibitedQueue transitQueue
LocalCondition sentinelCondition
uint degreeOfParallelism
boost::scoped_ptr< FennelExcn > pPendingExcn
Friends
class ParallelExecTask
Classes
struct StreamStateMapEntry

Detailed Description

ParallelExecStreamScheduler is a parallel implementation of the ExecStreamScheduler interface.

For more information, see the design doc in Eigenpedia.

Version:

Id

//open/dev/fennel/exec/ParallelExecStreamScheduler.h#12

Definition at line 98 of file ParallelExecStreamScheduler.h.


Member Typedef Documentation


Member Enumeration Documentation


Constructor & Destructor Documentation

ParallelExecStreamScheduler::ParallelExecStreamScheduler ( SharedTraceTarget pTraceTarget,
std::string name,
ThreadTracker & threadTracker,
uint degreeOfParallelism
) [explicit]

| ParallelExecStreamScheduler::~ParallelExecStreamScheduler | ( | | ) | [virtual] | | ---------------------------------------------------------- | - | | - | ----------- |


Member Function Documentation

| void ParallelExecStreamScheduler::tryExecuteManager | ( | | ) | [private] | | --------------------------------------------------- | - | | - | ----------- |

| void ParallelExecStreamScheduler::executeManager | ( | | ) | [private] | | ------------------------------------------------ | - | | - | ----------- |

| void ParallelExecStreamScheduler::tryExecuteTask | ( | ExecStream & | | ) | [private] | | ------------------------------------------------ | - | ------------------------------------ | | - | ----------- |

| void ParallelExecStreamScheduler::executeTask | ( | ExecStream & | | ) | [private] | | --------------------------------------------- | - | ------------------------------------ | | - | ----------- |

bool ParallelExecStreamScheduler::addToQueue ( ExecStreamId streamId ) [private]

Definition at line 424 of file ParallelExecStreamScheduler.cpp.

References alterNeighborInhibition(), ExecStreamGraphImpl::getStreamFromVertex(), inhibitedQueue, isInhibited(), pPendingExcn, SS_INHIBITED, SS_RUNNING, SS_SLEEPING, streamStateMap, ThreadPool< Task >::submitTask(), and threadPool.

Referenced by processCompletedTask(), and retryInhibitedQueue().

void ParallelExecStreamScheduler::signalSentinel ( ExecStreamId sentinelId ) [private]

| void ParallelExecStreamScheduler::retryInhibitedQueue | ( | | ) | [private] | | ----------------------------------------------------- | - | | - | ----------- |

void ParallelExecStreamScheduler::processCompletedTask ( ParallelExecResult const & task ) [private]

Definition at line 303 of file ParallelExecStreamScheduler.cpp.

References addToQueue(), alterNeighborInhibition(), EXECBUF_EMPTY, EXECBUF_UNDERFLOW, EXECRC_BUF_OVERFLOW, EXECRC_BUF_UNDERFLOW, EXECRC_EOS, EXECRC_QUANTUM_EXPIRED, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ParallelExecResult::getResultCode(), ExecStreamBufAccessor::getState(), ParallelExecResult::getStreamId(), retryInhibitedQueue(), signalSentinel(), SS_SLEEPING, and streamStateMap.

Referenced by tryExecuteManager().

bool ParallelExecStreamScheduler::isInhibited ( ExecStreamId streamId ) [inline, private]
void ParallelExecStreamScheduler::alterNeighborInhibition ( ExecStreamId streamId,
int delta
) [inline, private]

| void ParallelExecStreamScheduler::start | ( | | ) | [virtual] | | --------------------------------------- | - | | - | ----------- |

Starts this scheduler, preparing it to execute streams.

Implements ExecStreamScheduler.

Definition at line 103 of file ParallelExecStreamScheduler.cpp.

References alterNeighborInhibition(), degreeOfParallelism, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStreamGraphImpl::getStreamFromVertex(), MGR_RUNNING, mgrState, pGraph, pPendingExcn, ExecStreamBufAccessor::requestProduction(), SS_SLEEPING, ThreadPoolBase::start(), streamStateMap, ThreadPool< Task >::submitTask(), threadPool, and TRACE_FINE.

void ParallelExecStreamScheduler::setRunnable ( ExecStream & stream,
bool
) [virtual]
void ParallelExecStreamScheduler::makeRunnable ( ExecStream & stream ) [virtual]
void ParallelExecStreamScheduler::abort ( ExecStreamGraph & graph ) [virtual]

| void ParallelExecStreamScheduler::checkAbort | ( | | ) | const [virtual] | | -------------------------------------------- | - | | - | ----------------- |

| void ParallelExecStreamScheduler::stop | ( | | ) | [virtual] | | -------------------------------------- | - | | - | ----------- |

Shuts down this scheduler, preventing any further streams from being scheduled.

Implements ExecStreamScheduler.

Definition at line 175 of file ParallelExecStreamScheduler.cpp.

References completedQueue, SynchMonitoredObject::condition, inhibitedQueue, MGR_STOPPED, MGR_STOPPING, mgrState, SynchMonitoredObject::mutex, pPendingExcn, sentinelCondition, ThreadPoolBase::stop(), threadPool, and TRACE_FINE.

Reads data from a stream, first performing any scheduling necessary to make output available.

Parameters:

stream the stream from which to read

Returns:

accessor for output data buffer

Implements ExecStreamScheduler.

Definition at line 258 of file ParallelExecStreamScheduler.cpp.

References completedQueue, SynchMonitoredObject::condition, EXECBUF_EMPTY, EXECBUF_UNDERFLOW, EXECRC_BUF_UNDERFLOW, ExecStreamId, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamGraphImpl::getGraphRep(), ExecStream::getName(), ExecStreamBufAccessor::getState(), ExecStreamGraphImpl::getStreamFromVertex(), ExecStream::getStreamId(), SynchMonitoredObject::mutex, pPendingExcn, ExecStreamBufAccessor::requestProduction(), sentinelCondition, SS_SLEEPING, streamStateMap, and TRACE_FINE.

void ParallelExecStreamScheduler::createBufferProvisionAdapter ( ExecStreamEmbryo & embryo ) [virtual]

| uint ParallelExecStreamScheduler::getDegreeOfParallelism | ( | | ) | [virtual] | | --------------------------------------------------------------------------------------------------------------------- | - | | - | ----------- |

void ExecStreamScheduler::traceStreamBuffers ( ExecStream & stream,
TraceLevel inputTupleTraceLevel,
TraceLevel outputTupleTraceLevel
) [protected, virtual, inherited]

Traces the states of the input and output buffers adjacent to a stream.

Parameters:

stream stream whose buffers are to be traced
inputTupleTraceLevel trace level at which tuple contents of input buffers are to be traced
outputTupleTraceLevel trace level at which tuple contents of output buffers are to be traced

Definition at line 140 of file ExecStreamScheduler.cpp.

References ExecStreamBufState_names, ExecStreamGraphImpl::getBufAccessorFromEdge(), ExecStreamBufAccessor::getConsumptionAvailable(), ExecStream::getGraph(), ExecStreamGraphImpl::getGraphRep(), ExecStreamBufAccessor::getProductionAvailable(), ExecStreamBufAccessor::getState(), ExecStream::getStreamId(), ExecStreamBufAccessor::hasPendingEOS(), TraceSource::isTracingLevel(), TRACE_FINER, and ExecStreamScheduler::traceStreamBufferContents().

Referenced by ExecStreamScheduler::tracePostExecution(), and ExecStreamScheduler::tracePreExecution().

00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 }

void ExecStreamScheduler::createCopyProvisionAdapter ( ExecStreamEmbryo & embryo ) [virtual, inherited]

Creates a new adapter stream capable of copying the output of a stream with BUFPROV_PRODUCER into the input of a stream with BUFPROV_CONSUMER.

Default implementation is CopyExecStream. Caller is responsible for filling in generic ExecStreamParams after return.

Parameters:

embryo receives new adapter stream

Definition at line 92 of file ExecStreamScheduler.cpp.

References ExecStreamEmbryo::init().

void TraceSource::initTraceSource ( SharedTraceTarget pTraceTarget,
std::string name
) [virtual, inherited]
void TraceSource::trace ( TraceLevel level,
std::string message
) const [inherited]

| bool TraceSource::isTracing | ( | | ) | const [inline, inherited] | | --------------------------- | - | | - | --------------------------- |

bool TraceSource::isTracingLevel ( TraceLevel level ) const [inline, inherited]

| TraceTarget& TraceSource::getTraceTarget | ( | | ) | const [inline, inherited] | | ----------------------------------------------------------------- | - | | - | --------------------------- |

| std::string TraceSource::getTraceSourceName | ( | | ) | const [inline, inherited] | | ------------------------------------------- | - | | - | --------------------------- |

void TraceSource::setTraceSourceName ( std::string const & n ) [inline, inherited]

Sets the name of this source.

Useful to construct dynamic names for fine-grained filtering.

Definition at line 136 of file TraceSource.h.

00137 { 00138 name = n; 00139 }

| TraceLevel TraceSource::getMinimumTraceLevel | ( | | ) | const [inline, inherited] | | ------------------------------------------------------------------------------------------------------ | - | | - | --------------------------- |

| void TraceSource::disableTracing | ( | | ) | [inherited] | | -------------------------------- | - | | - | ------------- |



Member Data Documentation

Definition at line 38 of file SynchMonitoredObject.h.

Referenced by abort(), LogicalTxnLog::checkpoint(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxn(), executeManager(), executeTask(), LogicalTxnLog::getOldestActiveTxnId(), LogicalTxnLog::newLogicalTxn(), readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), LogicalTxnLog::rollbackTxn(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), SXMutex::setSchedulingPolicy(), TimerThread::signalImmediate(), signalSentinel(), ThreadPoolBase::start(), TimerThread::stop(), ThreadPoolBase::stop(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), tryExecuteManager(), tryExecuteTask(), SXMutex::tryUpgrade(), SXMutex::waitFor(), GroupLock::waitFor(), and Database::writeStats().

Definition at line 39 of file SynchMonitoredObject.h.

Referenced by abort(), Database::checkpointImpl(), CheckpointThread::closeImpl(), LogicalTxnLog::commitTxnWithGroup(), executeTask(), readStream(), SXMutex::release(), GroupLock::release(), Database::requestCheckpoint(), CheckpointThread::requestCheckpoint(), TimerThread::run(), CheckpointThread::run(), ThreadPoolBase::runPooledThread(), TimerThread::signalImmediate(), TimerThread::stop(), ThreadPoolBase::stop(), stop(), ThreadPool< RandomAccessRequest >::submitTask(), tryExecuteManager(), tryExecuteTask(), SXMutex::waitFor(), and GroupLock::waitFor().


The documentation for this class was generated from the following files:


Generated on Mon Jun 22 04:00:40 2009 for Fennel by doxygen 1.5.1