Fennel: /home/pub/open/dev/fennel/exec/ExecStreamScheduler.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/ExecStreamScheduler.h" 00026 #include "fennel/exec/ExecStreamGraphImpl.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 #include "fennel/exec/CopyExecStream.h" 00029 #include "fennel/exec/ScratchBufferExecStream.h" 00030 #include "fennel/exec/ExecStreamEmbryo.h" 00031 #include "fennel/tuple/TupleData.h" 00032 #include "fennel/tuple/TuplePrinter.h" 00033 00034 #include 00035 00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $"); 00037 00038 ExecStreamScheduler::ExecStreamScheduler( 00039 SharedTraceTarget pTraceTargetInit, 00040 std::string nameInit) 00041 : TraceSource(pTraceTargetInit, nameInit) 00042 { 00043 tracingFine = isTracingLevel(TRACE_FINE); 00044 } 00045 00046 ExecStreamScheduler::~ExecStreamScheduler() 00047 { 00048 } 00049 00050 void ExecStreamScheduler::addGraph(SharedExecStreamGraph pGraph) 00051 { 00052 assert(!pGraph->pScheduler); 00053 pGraph->pScheduler = this; 00054 00055 if (tracingFine) { 00056 std::string dotFileName; 00057 const char *fennelHome = getenv("FENNEL_HOME"); 00058 if (fennelHome) { 00059 dotFileName += fennelHome; 00060 dotFileName += "/trace/"; 00061 } 00062 dotFileName += "ExecStreamGraph.dot"; 00063 std::ofstream dotStream(dotFileName.c_str()); 00064 pGraph->renderGraphviz(dotStream); 00065 } 00066 00067
00068
00069 std::vector streams = pGraph->getSortedStreams(); 00070 for (uint i = 0; i < streams.size(); ++i) { 00071 if (streams[i]->isTracingLevel(TRACE_FINE)) { 00072 tracingFine = true; 00073 return; 00074 } 00075 } 00076 } 00077 00078 SharedExecStreamBufAccessor ExecStreamScheduler::newBufAccessor() 00079 { 00080 return SharedExecStreamBufAccessor(new ExecStreamBufAccessor()); 00081 } 00082 00083 void ExecStreamScheduler::createBufferProvisionAdapter( 00084 ExecStreamEmbryo &embryo) 00085 { 00086 ScratchBufferExecStreamParams adapterParams; 00087 embryo.init( 00088 new ScratchBufferExecStream(), 00089 adapterParams); 00090 } 00091 00092 void ExecStreamScheduler::createCopyProvisionAdapter( 00093 ExecStreamEmbryo &embryo) 00094 { 00095 CopyExecStreamParams adapterParams; 00096 embryo.init( 00097 new CopyExecStream(), 00098 adapterParams); 00099 } 00100 00101 void ExecStreamScheduler::removeGraph(SharedExecStreamGraph pGraph) 00102 { 00103 assert(pGraph->pScheduler == this); 00104 pGraph->pScheduler = NULL; 00105 } 00106 00107 00108 00109 00110 00111 00112 void ExecStreamScheduler::tracePreExecution( 00113 ExecStream &stream, 00114 ExecStreamQuantum const &quantum) 00115 { 00116 FENNEL_TRACE( 00117 TRACE_FINE, 00118 "executing " << stream.getStreamId() << ' ' << stream.getName()); 00119 if (isMAXU(quantum.nTuplesMax)) { 00120 FENNEL_TRACE( 00121 TRACE_FINE, 00122 "nTuplesMax = " << quantum.nTuplesMax); 00123 } 00124 00125 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINEST); 00126 } 00127 00128 void ExecStreamScheduler::tracePostExecution( 00129 ExecStream &stream, 00130 ExecStreamResult rc) 00131 { 00132 FENNEL_TRACE( 00133 TRACE_FINE, 00134 "executed " << stream.getStreamId() << ' ' << stream.getName() 00135 << " with result " << ExecStreamResult_names[rc]); 00136 00137 traceStreamBuffers(stream, TRACE_FINEST, TRACE_FINER); 00138 } 00139 00140 void ExecStreamScheduler::traceStreamBuffers( 00141 ExecStream &stream, 00142 TraceLevel inputTupleTraceLevel, 00143 TraceLevel outputTupleTraceLevel) 00144 { 00145 ExecStreamGraphImpl &graphImpl = 00146 dynamic_cast<ExecStreamGraphImpl&>(stream.getGraph()); 00147 ExecStreamGraphImpl::GraphRep const &graphRep = graphImpl.getGraphRep(); 00148 00149 ExecStreamGraphImpl::InEdgeIterPair inEdges = 00150 boost::in_edges(stream.getStreamId(),graphRep); 00151 for (uint i = 0; inEdges.first != inEdges.second; 00152 ++(inEdges.first), ++i) 00153 { 00154 ExecStreamGraphImpl::Edge edge = *(inEdges.first); 00155 ExecStreamBufAccessor &bufAccessor = 00156 graphImpl.getBufAccessorFromEdge(edge); 00157 FENNEL_TRACE( 00158 TRACE_FINER, 00159 "input buffer " << i << ": " 00160 << ExecStreamBufState_names[bufAccessor.getState()] 00161 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00162 << ", consumption available = " 00163 << bufAccessor.getConsumptionAvailable()); 00164 if (stream.isTracingLevel(inputTupleTraceLevel)) { 00165 traceStreamBufferContents( 00166 stream, bufAccessor, inputTupleTraceLevel); 00167 } 00168 } 00169 00170 ExecStreamGraphImpl::OutEdgeIterPair outEdges = 00171 boost::out_edges(stream.getStreamId(),graphRep); 00172 for (uint i = 0; outEdges.first != outEdges.second; 00173 ++(outEdges.first), ++i) { 00174 ExecStreamGraphImpl::Edge edge = *(outEdges.first); 00175 ExecStreamBufAccessor &bufAccessor = 00176 graphImpl.getBufAccessorFromEdge(edge); 00177 FENNEL_TRACE( 00178 TRACE_FINER, 00179 "output buffer " << i << ": " 00180 << ExecStreamBufState_names[bufAccessor.getState()] 00181 << (bufAccessor.hasPendingEOS() ? ", EOS pending" : "") 00182 << ", consumption available = " 00183 << bufAccessor.getConsumptionAvailable() 00184 << ", production available = " 00185 << bufAccessor.getProductionAvailable()); 00186 if (stream.isTracingLevel(outputTupleTraceLevel)) { 00187 traceStreamBufferContents( 00188 stream, bufAccessor, outputTupleTraceLevel); 00189 } 00190 } 00191 } 00192 00193 void ExecStreamScheduler::traceStreamBufferContents( 00194 ExecStream &stream, 00195 ExecStreamBufAccessor &bufAccessor, 00196 TraceLevel traceLevel) 00197 { 00198 TupleDescriptor const &tupleDesc = bufAccessor.getTupleDesc(); 00199 TupleData tupleData(tupleDesc); 00200 TupleAccessor &tupleAccessor = bufAccessor.getScratchTupleAccessor(); 00201 00202 for (PConstBuffer pTuple = bufAccessor.getConsumptionStart(); 00203 pTuple != bufAccessor.getConsumptionEnd(); 00204 pTuple += tupleAccessor.getCurrentByteCount()) 00205 { 00206 tupleAccessor.setCurrentTupleBuf(pTuple); 00207
00208 assert(pTuple + tupleAccessor.getCurrentByteCount() 00209 <= bufAccessor.getConsumptionEnd()); 00210 tupleAccessor.unmarshal(tupleData); 00211
00212 std::ostringstream oss; 00213 TuplePrinter tuplePrinter; 00214 tuplePrinter.print(oss,tupleDesc,tupleData); 00215 stream.trace(traceLevel,oss.str()); 00216 } 00217 } 00218 00219 void ExecStreamScheduler::checkAbort() const 00220 { 00221 } 00222 00223 uint ExecStreamScheduler::getDegreeOfParallelism() 00224 { 00225 return 1; 00226 } 00227 00228 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamScheduler.cpp#17 $"); 00229 00230