Fennel: /home/pub/open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/ExecStreamGraphEmbryo.h" 00026 #include "fennel/exec/ExecStreamGraph.h" 00027 #include "fennel/exec/ExecStream.h" 00028 #include "fennel/exec/ExecStreamEmbryo.h" 00029 #include "fennel/exec/ExecStreamBufAccessor.h" 00030 #include "fennel/exec/ExecStreamScheduler.h" 00031 #include "fennel/cache/QuotaCacheAccessor.h" 00032 #include "fennel/segment/SegmentAccessor.h" 00033 #include "fennel/segment/SegmentFactory.h" 00034 #include "fennel/cache/Cache.h" 00035 #include 00036 00037 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $"); 00038 00039 ExecStreamGraphEmbryo::ExecStreamGraphEmbryo( 00040 SharedExecStreamGraph pGraphInit, 00041 SharedExecStreamScheduler pSchedulerInit, 00042 SharedCache pCacheInit, 00043 SharedSegmentFactory pSegmentFactoryInit) 00044 { 00045 pGraph = pGraphInit; 00046 pScheduler = pSchedulerInit; 00047 pCacheAccessor = pCacheInit; 00048 scratchAccessor = 00049 pSegmentFactoryInit->newScratchSegment(pCacheInit); 00050 00051 pGraph->setScratchSegment(scratchAccessor.pSegment); 00052 } 00053 00054 ExecStreamGraphEmbryo::~ExecStreamGraphEmbryo() 00055 { 00056 } 00057 00058 SharedExecStream ExecStreamGraphEmbryo::addAdapterFor( 00059 const std::string &name, 00060 uint iOutput, 00061 ExecStreamBufProvision requiredDataflow) 00062 { 00063
00064
00065
00066 00067
00068 SharedExecStream pLastStream = pGraph->findLastStream(name, iOutput); 00069 ExecStreamBufProvision availableDataflow = 00070 pLastStream->getOutputBufProvision(); 00071 assert(availableDataflow != BUFPROV_NONE); 00072 00073
00074 std::string adapterName; 00075 { 00076 int id = pGraph->getOutputCount(pLastStream->getStreamId()); 00077 std::ostringstream oss; 00078 oss << pLastStream->getName() << "#" << id << ".provisioner"; 00079 adapterName = oss.str(); 00080 } 00081 00082
00083 switch (requiredDataflow) { 00084 case BUFPROV_CONSUMER: 00085 if (availableDataflow == BUFPROV_PRODUCER) { 00086 ExecStreamEmbryo embryo; 00087 pScheduler->createCopyProvisionAdapter(embryo); 00088 initializeAdapter(embryo, name, iOutput, adapterName); 00089 return embryo.getStream(); 00090 } 00091 break; 00092 case BUFPROV_PRODUCER: 00093 if (availableDataflow == BUFPROV_CONSUMER) { 00094 ExecStreamEmbryo embryo; 00095 pScheduler->createBufferProvisionAdapter(embryo); 00096 initializeAdapter(embryo, name, iOutput, adapterName); 00097 return embryo.getStream(); 00098 } 00099 break; 00100 default: 00101 permAssert(false); 00102 } 00103 return pLastStream; 00104 } 00105 00106 void ExecStreamGraphEmbryo::initializeAdapter( 00107 ExecStreamEmbryo &embryo, 00108 std::string const &streamName, 00109 uint iOutput, 00110 std::string const &adapterName) 00111 { 00112 initStreamParams(*(embryo.getParams())); 00113 embryo.getStream()->setName(adapterName); 00114 saveStreamEmbryo(embryo); 00115 pGraph->interposeStream( 00116 streamName, iOutput, embryo.getStream()->getStreamId()); 00117 } 00118 00119 void ExecStreamGraphEmbryo::saveStreamEmbryo(ExecStreamEmbryo &embryo) 00120 { 00121 allStreamEmbryos[embryo.getStream()->getName()] = embryo; 00122 pGraph->addStream(embryo.getStream()); 00123 } 00124 00125 ExecStreamEmbryo &ExecStreamGraphEmbryo::getStreamEmbryo( 00126 std::string const &name) 00127 { 00128 StreamMapIter pPair = allStreamEmbryos.find(name); 00129 assert(pPair != allStreamEmbryos.end()); 00130 return pPair->second; 00131 } 00132 00133 void ExecStreamGraphEmbryo::addDataflow( 00134 const std::string &source, 00135 const std::string &target, 00136 bool isImplicit) 00137 { 00138 SharedExecStream pSourceStream = 00139 pGraph->findStream(source); 00140 SharedExecStream pTargetStream = 00141 pGraph->findStream(target); 00142 SharedExecStream pInput; 00143 if (isImplicit) { 00144 pInput = pSourceStream; 00145 } else { 00146 uint iOutput = pGraph->getOutputCount(pSourceStream->getStreamId()); 00147 ExecStreamBufProvision requiredConversion = 00148 pSourceStream->getOutputBufConversion(); 00149 if (requiredConversion != BUFPROV_NONE) { 00150 addAdapterFor(source, iOutput, requiredConversion); 00151 } 00152 ExecStreamBufProvision requiredDataflow = 00153 pTargetStream->getInputBufProvision(); 00154 addAdapterFor(source, iOutput, requiredDataflow); 00155 pInput = pGraph->findLastStream(source, iOutput); 00156 } 00157 pGraph->addDataflow( 00158 pInput->getStreamId(), 00159 pTargetStream->getStreamId(), 00160 isImplicit); 00161 } 00162 00163 void ExecStreamGraphEmbryo::initStreamParams(ExecStreamParams &params) 00164 { 00165 params.pCacheAccessor = pCacheAccessor; 00166 params.scratchAccessor = scratchAccessor; 00167 00168
00169
00170 uint quota = 0; 00171 SharedQuotaCacheAccessor pQuotaAccessor( 00172 new QuotaCacheAccessor( 00173 SharedQuotaCacheAccessor(), 00174 params.pCacheAccessor, 00175 quota)); 00176 params.pCacheAccessor = pQuotaAccessor; 00177 00178
00179
00180 params.scratchAccessor.pCacheAccessor.reset( 00181 new QuotaCacheAccessor( 00182 pQuotaAccessor, 00183 params.scratchAccessor.pCacheAccessor, 00184 quota)); 00185 } 00186 00187 ExecStreamGraph &ExecStreamGraphEmbryo::getGraph() 00188 { 00189 return pGraph; 00190 } 00191 00192 SegmentAccessor &ExecStreamGraphEmbryo::getScratchAccessor() 00193 { 00194 return scratchAccessor; 00195 } 00196 00197 void ExecStreamGraphEmbryo::prepareGraph( 00198 SharedTraceTarget pTraceTarget, 00199 std::string const &tracePrefix) 00200 { 00201 pGraph->prepare(pScheduler); 00202 std::vector sortedStreams = 00203 pGraph->getSortedStreams(); 00204 std::vector::iterator pos; 00205 for (pos = sortedStreams.begin(); pos != sortedStreams.end(); pos++) { 00206 std::string name = (*pos)->getName(); 00207 ExecStreamEmbryo &embryo = getStreamEmbryo(name); 00208
00209
00210 std::string traceName = tracePrefix + name; 00211 ExecStreamId streamId = embryo.getStream()->getStreamId(); 00212 embryo.getStream()->initTraceSource( 00213 pTraceTarget, 00214 traceName); 00215 embryo.prepareStream(); 00216 00217
00218 uint outputCount = pGraph->getOutputCount(streamId); 00219 for (uint i = 0; i < outputCount; ++i) { 00220 SharedExecStreamBufAccessor outAccessor = 00221 pGraph->getStreamOutputAccessor(streamId, i); 00222 if (outAccessor->getTupleDesc().empty()) { 00223 permFail("Forgot to initialize output #" << i << "of stream '" 00224 << traceName << "'"); 00225 } 00226 } 00227 } 00228 00229 pScheduler->addGraph(pGraph); 00230 } 00231 00232 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ExecStreamGraphEmbryo.cpp#16 $"); 00233 00234