Fennel: /home/pub/open/dev/fennel/exec/CartesianJoinExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/exec/CartesianJoinExecStream.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 #include "fennel/exec/ExecStreamGraph.h" 00028 00029 #include 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $"); 00032 00033 void CartesianJoinExecStream::prepare( 00034 CartesianJoinExecStreamParams const &params) 00035 { 00036 assert(checkNumInputs()); 00037 pLeftBufAccessor = inAccessors[0]; 00038 assert(pLeftBufAccessor); 00039 00040 pRightBufAccessor = inAccessors[1]; 00041 assert(pRightBufAccessor); 00042 00043 leftOuter = params.leftOuter; 00044 00045 SharedExecStream pLeftInput = pGraph->getStreamInput(getStreamId(), 0); 00046 assert(pLeftInput); 00047 pRightInput = pGraph->getStreamInput(getStreamId(), 1); 00048 assert(pRightInput); 00049 FENNEL_TRACE( 00050 TRACE_FINE, 00051 "left input " << pLeftInput->getStreamId() << 00052 ' ' << pLeftInput->getName() << 00053 ", right input " << pRightInput->getStreamId() << 00054 ' ' << pRightInput->getName()); 00055 00056 00057 TupleDescriptor const &leftDesc = pLeftBufAccessor->getTupleDesc(); 00058 TupleDescriptor const &rightDesc = pRightBufAccessor->getTupleDesc(); 00059 00060 TupleDescriptor outputDesc; 00061 outputDesc.insert(outputDesc.end(),leftDesc.begin(),leftDesc.end()); 00062 uint iFirstRight = outputDesc.size(); 00063 outputDesc.insert(outputDesc.end(),rightDesc.begin(),rightDesc.end()); 00064 if (leftOuter) { 00065
00066
00067 for (uint i = iFirstRight; i < outputDesc.size(); ++i) { 00068 outputDesc[i].isNullable = true; 00069 } 00070 } 00071 if (params.outputTupleDesc.size()) { 00072 assert(params.outputTupleDesc == outputDesc); 00073 } 00074 outputData.compute(outputDesc); 00075 pOutAccessor->setTupleShape(outputDesc); 00076 00077 nLeftAttributes = leftDesc.size(); 00078 00079 ConfluenceExecStream::prepare(params); 00080 } 00081 00082 bool CartesianJoinExecStream::checkNumInputs() 00083 { 00084 return (inAccessors.size() == 2); 00085 } 00086 00087 void CartesianJoinExecStream::open(bool restart) 00088 { 00089 ConfluenceExecStream::open(restart); 00090 } 00091 00092 00093 inline std::ostream& operator<< ( 00094 std::ostream& os, SharedExecStreamBufAccessor buf) 00095 { 00096 os << ExecStreamBufState_names[buf->getState()]; 00097 if (buf->hasPendingEOS()) { 00098 os << "(EOS pending)"; 00099 } 00100 return os; 00101 } 00102 00103 ExecStreamResult CartesianJoinExecStream::execute( 00104 ExecStreamQuantum const &quantum) 00105 { 00106
00107 00108
00109
00110
00111
00112
00113
00114 00115
00116
00117
00118 00119 uint nTuplesProduced = 0; 00120 00121 for (;;) { 00122 if (pLeftBufAccessor->isTupleConsumptionPending()) { 00123 if (pLeftBufAccessor->getState() == EXECBUF_EOS) { 00124 pOutAccessor->markEOS(); 00125 return EXECRC_EOS; 00126 } 00127 if (pLeftBufAccessor->demandData()) { 00128 FENNEL_TRACE_THREAD( 00129 TRACE_FINE, 00130 "left underflow; left input " << pLeftBufAccessor << 00131 " right input " << pRightBufAccessor); 00132 return EXECRC_BUF_UNDERFLOW; 00133 } 00134 pLeftBufAccessor->unmarshalTuple(outputData); 00135 processLeftInput(); 00136 rightInputEmpty = true; 00137 } 00138 ExecStreamResult rc = preProcessRightInput(); 00139 if (rc != EXECRC_YIELD) { 00140 return rc; 00141 } 00142 for (;;) { 00143 if (pRightBufAccessor->isTupleConsumptionPending()) { 00144 if (pRightBufAccessor->getState() == EXECBUF_EOS) { 00145 if (leftOuter && rightInputEmpty) { 00146
00147
00148 for (int i = nLeftAttributes; 00149 i < outputData.size(); ++i) 00150 { 00151 outputData[i].pData = NULL; 00152 } 00153 00154 if (pOutAccessor->produceTuple(outputData)) { 00155 ++nTuplesProduced; 00156 } else { 00157 return EXECRC_BUF_OVERFLOW; 00158 } 00159 00160 if (nTuplesProduced >= quantum.nTuplesMax) { 00161 return EXECRC_QUANTUM_EXPIRED; 00162 } 00163 } 00164 00165 pLeftBufAccessor->consumeTuple(); 00166
00167 pRightInput->open(true); 00168 FENNEL_TRACE_THREAD( 00169 TRACE_FINE, 00170 "re-opened right input " << pRightBufAccessor); 00171
00172
00173 break; 00174 } 00175 if (pRightBufAccessor->demandData()) { 00176 FENNEL_TRACE_THREAD( 00177 TRACE_FINE, 00178 "right underflow; left input " << pLeftBufAccessor << 00179 " right input " << pRightBufAccessor); 00180 return EXECRC_BUF_UNDERFLOW; 00181 } 00182 rightInputEmpty = false; 00183 pRightBufAccessor->unmarshalTuple( 00184 outputData, nLeftAttributes); 00185 break; 00186 } 00187 00188 if (pOutAccessor->produceTuple(outputData)) { 00189 ++nTuplesProduced; 00190 } else { 00191 return EXECRC_BUF_OVERFLOW; 00192 } 00193 00194 pRightBufAccessor->consumeTuple(); 00195 00196 if (nTuplesProduced >= quantum.nTuplesMax) { 00197 return EXECRC_QUANTUM_EXPIRED; 00198 } 00199 } 00200 } 00201 } 00202 00203 ExecStreamResult CartesianJoinExecStream::preProcessRightInput() 00204 { 00205 return EXECRC_YIELD; 00206 } 00207 00208 void CartesianJoinExecStream::processLeftInput() 00209 { 00210 } 00211 00212 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/CartesianJoinExecStream.cpp#14 $"); 00213 00214