Fennel: /home/pub/open/dev/fennel/farrago/ExecStreamBuilder.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/farrago/ExecStreamBuilder.h" 00026 #include "fennel/exec/ExecStreamGraph.h" 00027 #include "fennel/exec/ExecStream.h" 00028 #include "fennel/db/Database.h" 00029 00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $"); 00031 00032 ExecStreamBuilder::ExecStreamBuilder( 00033 ExecStreamGraphEmbryo &graphEmbryoInit, 00034 ExecStreamFactory &streamFactoryInit) 00035 : graphEmbryo(graphEmbryoInit), 00036 streamFactory(streamFactoryInit) 00037 { 00038 } 00039 00040 ExecStreamBuilder::~ExecStreamBuilder() 00041 { 00042 } 00043 00044 void ExecStreamBuilder::buildStreamGraph( 00045 ProxyCmdPrepareExecutionStreamGraph &cmd, 00046 bool assumeOutputFromSinks) 00047 { 00048 streamFactory.setScratchAccessor(graphEmbryo.getScratchAccessor()); 00049 00050
00051 SharedProxyExecutionStreamDef pStreamDef = cmd.getStreamDefs(); 00052 for (; pStreamDef; ++pStreamDef) { 00053 buildStream(*pStreamDef); 00054 } 00055 00056
00057 pStreamDef = cmd.getStreamDefs(); 00058 for (; pStreamDef; ++pStreamDef) { 00059 buildStreamInputs(*pStreamDef); 00060 00061 if (getExplicitOutputCount(*pStreamDef) && assumeOutputFromSinks) { 00062
00063
00064 std::string name = pStreamDef->getName(); 00065 SharedExecStream pAdaptedStream = 00066 graphEmbryo.addAdapterFor(name, 0, BUFPROV_PRODUCER); 00067 graphEmbryo.getGraph().addOutputDataflow( 00068 pAdaptedStream->getStreamId()); 00069 } 00070 } 00071 00072
00073
00074 pStreamDef = cmd.getStreamDefs(); 00075 for (; pStreamDef; ++pStreamDef) { 00076 buildStreamOutputs(*pStreamDef); 00077 } 00078 00079
00080 graphEmbryo.prepareGraph( 00081 streamFactory.getDatabase()->getSharedTraceTarget(), 00082 "xo."); 00083 } 00084 00085 void ExecStreamBuilder::buildStream( 00086 ProxyExecutionStreamDef &streamDef) 00087 { 00088 ExecStreamEmbryo embryo = streamFactory.visitStream(streamDef); 00089 graphEmbryo.saveStreamEmbryo(embryo); 00090 SharedProxyDynamicParamUse pParamUse = streamDef.getDynamicParamUse(); 00091 for (; pParamUse; ++pParamUse) { 00092 DynamicParamId dynamicParamId(pParamUse->getDynamicParamId()); 00093 if (pParamUse->isRead()) { 00094 if (false) 00095 std::cout << "stream " << embryo.getStream()->getStreamId() 00096 << " reads param " << dynamicParamId << std::endl; 00097 graphEmbryo.getGraph().declareDynamicParamReader( 00098 embryo.getStream()->getStreamId(), 00099 dynamicParamId); 00100 } else { 00101 if (false) 00102 std::cout << "stream " << embryo.getStream()->getStreamId() 00103 << " writes param " << dynamicParamId << std::endl; 00104 graphEmbryo.getGraph().declareDynamicParamWriter( 00105 embryo.getStream()->getStreamId(), 00106 dynamicParamId); 00107 } 00108 } 00109 } 00110 00111 void ExecStreamBuilder::buildStreamInputs( 00112 ProxyExecutionStreamDef &streamDef) 00113 { 00114 std::string name = streamDef.getName(); 00115 SharedProxyExecStreamDataFlow pInputFlow = streamDef.getInputFlow(); 00116 for (; pInputFlow; ++pInputFlow) { 00117 SharedProxyExecutionStreamDef pInput = pInputFlow->getProducer(); 00118
00119
00120
00121
00122
00123
00124
00125
00126 if (getExplicitOutputCount(*pInput) > 1) { 00127 continue; 00128 } 00129 std::string inputName = pInput->getName(); 00130 graphEmbryo.addDataflow(inputName, name, pInputFlow->isImplicit()); 00131 } 00132 } 00133 00134 void ExecStreamBuilder::buildStreamOutputs( 00135 ProxyExecutionStreamDef &streamDef) 00136 { 00137 std::string name = streamDef.getName(); 00138 SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow(); 00139 if (!(getExplicitOutputCount(streamDef) > 1)) { 00140 return; 00141 } 00142 for (; pOutputFlow; ++pOutputFlow) { 00143 SharedProxyExecutionStreamDef pOutput = pOutputFlow->getConsumer(); 00144 std::string outputName = pOutput->getName(); 00145 graphEmbryo.addDataflow(name, outputName, pOutputFlow->isImplicit()); 00146 } 00147 } 00148 00149 int ExecStreamBuilder::getExplicitOutputCount( 00150 ProxyExecutionStreamDef &streamDef) 00151 { 00152 int nExplicitOutputs = 0; 00153 SharedProxyExecStreamDataFlow pOutputFlow = streamDef.getOutputFlow(); 00154 for (; pOutputFlow; ++pOutputFlow) { 00155 if (!pOutputFlow->isImplicit()) { 00156 ++nExplicitOutputs; 00157 } 00158 } 00159 return nExplicitOutputs; 00160 } 00161 00162 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamBuilder.cpp#16 $"); 00163 00164