Fennel: /home/pub/open/dev/fennel/exec/CorrelationJoinExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/CorrelationJoinExecStream.h" 00026 #include "fennel/exec/DynamicParam.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 #include "fennel/exec/ExecStreamGraph.h" 00029 #include "fennel/exec/ExecStreamScheduler.h" 00030 #include "fennel/tuple/TuplePrinter.h" 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $"); 00033 00034 void CorrelationJoinExecStream::prepare( 00035 CorrelationJoinExecStreamParams const &params) 00036 { 00037 assert(inAccessors.size() == 2); 00038 00039 pLeftBufAccessor = inAccessors[0]; 00040 assert(pLeftBufAccessor); 00041 00042 pRightBufAccessor = inAccessors[1]; 00043 assert(pRightBufAccessor); 00044 00045 TupleDescriptor const &leftDesc = pLeftBufAccessor->getTupleDesc(); 00046 TupleDescriptor const &rightDesc = pRightBufAccessor->getTupleDesc(); 00047 00048 TupleDescriptor outputDesc; 00049 outputDesc.insert(outputDesc.end(),leftDesc.begin(),leftDesc.end()); 00050 outputDesc.insert(outputDesc.end(),rightDesc.begin(),rightDesc.end()); 00051 outputData.compute(outputDesc); 00052 pOutAccessor->setTupleShape(outputDesc); 00053 00054 nLeftAttributes = leftDesc.size(); 00055 correlations.assign( 00056 params.correlations.begin(), 00057 params.correlations.end()); 00058
00059
00060 assert(correlations.size() <= nLeftAttributes); 00061 00062 ConfluenceExecStream::prepare(params); 00063 } 00064 00065 void CorrelationJoinExecStream::open(bool restart) 00066 { 00067 ConfluenceExecStream::open(restart); 00068 00069 if (!restart) { 00070 leftRowCount = 0; 00071 for (std::vector::iterator it = correlations.begin(); 00072 it != correlations.end(); ++it) 00073 { 00074 pDynamicParamManager->createParam( 00075 it->dynamicParamId, 00076 pLeftBufAccessor->getTupleDesc()[it->leftAttributeOrdinal]); 00077 00078
00079
00080
00081 const std::vector &readerStreamIds = 00082 pGraph->getDynamicParamReaders(it->dynamicParamId); 00083 for (std::vector::const_iterator it2 = 00084 readerStreamIds.begin(); 00085 it2 != readerStreamIds.end(); ++it2) 00086 { 00087 pGraph->getScheduler()->setRunnable( 00088 *pGraph->getStream(*it2), false); 00089 } 00090 } 00091 } 00092 } 00093 00094 void CorrelationJoinExecStream::close() 00095 { 00096 std::vector::iterator it = correlations.begin(); 00097 for ( ; it != correlations.end(); ++it) { 00098 pDynamicParamManager->deleteParam(it->dynamicParamId); 00099 } 00100 ConfluenceExecStream::closeImpl(); 00101 } 00102 00103 ExecStreamResult CorrelationJoinExecStream::execute( 00104 ExecStreamQuantum const &quantum) 00105 { 00106
00107 uint nTuplesProduced = 0; 00108 00109 for (;;) { 00110 if (pLeftBufAccessor->isTupleConsumptionPending()) { 00111 if (pLeftBufAccessor->getState() == EXECBUF_EOS) { 00112 pOutAccessor->markEOS(); 00113 return EXECRC_EOS; 00114 } 00115 if (pLeftBufAccessor->demandData()) { 00116 return EXECRC_BUF_UNDERFLOW; 00117 } 00118 pLeftBufAccessor->unmarshalTuple(outputData); 00119
00120 std::vector::iterator it = correlations.begin(); 00121 for ( ; it != correlations.end(); ++it) { 00122 pDynamicParamManager->writeParam( 00123 it->dynamicParamId, outputData[it->leftAttributeOrdinal]); 00124 } 00125 00126
00127 pGraph->getStreamInput(getStreamId(),1)->open(true); 00128 00129
00130 if (++leftRowCount == 1) { 00131 for (std::vector::iterator it = 00132 correlations.begin(); 00133 it != correlations.end(); ++it) 00134 { 00135
00136
00137
00138
00139 const std::vector &readerStreamIds = 00140 pGraph->getDynamicParamReaders(it->dynamicParamId); 00141 for (std::vector::const_iterator it2 = 00142 readerStreamIds.begin(); 00143 it2 != readerStreamIds.end(); ++it2) 00144 { 00145 pGraph->getScheduler()->setRunnable( 00146 *pGraph->getStream(*it2), true); 00147 } 00148 } 00149 } 00150 } 00151 for (;;) { 00152 if (pRightBufAccessor->isTupleConsumptionPending()) { 00153 if (pRightBufAccessor->getState() == EXECBUF_EOS) { 00154 pLeftBufAccessor->consumeTuple(); 00155 break; 00156 } 00157 if (pRightBufAccessor->demandData()) { 00158 return EXECRC_BUF_UNDERFLOW; 00159 } 00160 pRightBufAccessor->unmarshalTuple( 00161 outputData, nLeftAttributes); 00162 break; 00163 } 00164 00165 if (pOutAccessor->produceTuple(outputData)) { 00166 ++nTuplesProduced; 00167 } else { 00168 return EXECRC_BUF_OVERFLOW; 00169 } 00170 00171 pRightBufAccessor->consumeTuple(); 00172 00173 if (nTuplesProduced >= quantum.nTuplesMax) { 00174 return EXECRC_QUANTUM_EXPIRED; 00175 } 00176 } 00177 } 00178 } 00179 00180 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CorrelationJoinExecStream.cpp#3 $"); 00181 00182