Fennel: /home/pub/open/dev/fennel/exec/ReshapeExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/tuple/StandardTypeDescriptor.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 #include "fennel/exec/ReshapeExecStream.h" 00028 00029 #include 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $"); 00032 00033 void ReshapeExecStream::prepare(ReshapeExecStreamParams const &params) 00034 { 00035 ConduitExecStream::prepare(params); 00036 00037 TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc(); 00038 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor(); 00039 dynamicParameters.assign( 00040 params.dynamicParameters.begin(), 00041 params.dynamicParameters.end()); 00042 00043 compOp = params.compOp; 00044 if (compOp != COMP_NOOP) { 00045 initCompareData(params, inputDesc, inputAccessor); 00046 } 00047 00048
00049 outputProjAccessor.bind(inputAccessor, params.outputProj); 00050 inputOutputDesc.projectFrom(inputDesc, params.outputProj); 00051 00052
00053 outputDesc = pOutAccessor->getTupleDesc(); 00054 outputData.compute(outputDesc); 00055 00056
00057
00058 uint numOutputDynParams = 0; 00059 for (uint i = 0; i < dynamicParameters.size(); i++) { 00060 if (dynamicParameters[i].outputParam) { 00061 numOutputDynParams++; 00062 } 00063 } 00064 00065
00066
00067 assert(inputOutputDesc.size() == outputDesc.size() - numOutputDynParams); 00068 TupleDescriptor partialOutputDesc; 00069 if (numOutputDynParams == 0) { 00070 partialOutputDesc = outputDesc; 00071 } else if (inputOutputDesc.size() > 0) { 00072 partialOutputDesc.resize(inputOutputDesc.size()); 00073 std::copy( 00074 outputDesc.begin(), 00075 outputDesc.end() - numOutputDynParams, 00076 partialOutputDesc.begin()); 00077 } 00078 00079
00080 castRequired = (inputOutputDesc != partialOutputDesc); 00081 if (castRequired) { 00082 TupleProjection proj; 00083 if (compOp == COMP_NE) { 00084 proj = params.inputCompareProj; 00085 } 00086 assert(checkCastTypes(proj, inputOutputDesc, partialOutputDesc)); 00087 inputOutputData.compute(inputOutputDesc); 00088 } 00089 } 00090 00091 void ReshapeExecStream::initCompareData( 00092 ReshapeExecStreamParams const &params, 00093 TupleDescriptor const &inputDesc, 00094 TupleAccessor const &inputAccessor) 00095 { 00096
00097 assert(params.inputCompareProj.size() > 0); 00098 TupleProjection inputCompareProj = params.inputCompareProj; 00099 compTupleDesc.projectFrom(inputDesc, inputCompareProj); 00100
00101 for (uint i = 0; i < compTupleDesc.size(); i++) { 00102 compTupleDesc[i].isNullable = true; 00103 } 00104 00105
00106 inputCompareProjAccessor.bind(inputAccessor, inputCompareProj); 00107 inputCompareData.compute(compTupleDesc); 00108 00109
00110
00111
00112 TupleDescriptor partialCompTupleDesc; 00113 numCompDynParams = 0; 00114 for (uint i = 0; i < dynamicParameters.size(); i++) { 00115 if (isMAXU(dynamicParameters[i].compareOffset)) { 00116 numCompDynParams++; 00117 } 00118 } 00119 if (numCompDynParams > 0) { 00120 partialCompTupleDesc.resize(compTupleDesc.size() - numCompDynParams); 00121 std::copy( 00122 compTupleDesc.begin(), 00123 compTupleDesc.end() - numCompDynParams, 00124 partialCompTupleDesc.begin()); 00125 } 00126 00127 paramCompareData.compute(compTupleDesc); 00128 if (numCompDynParams == 0) { 00129 copyCompareTuple( 00130 compTupleDesc, 00131 paramCompareData, 00132 params.pCompTupleBuffer.get()); 00133 } else if (partialCompTupleDesc.size() > 0) { 00134 TupleData partialCompareData; 00135 partialCompareData.compute(partialCompTupleDesc); 00136 copyCompareTuple( 00137 partialCompTupleDesc, 00138 partialCompareData, 00139 params.pCompTupleBuffer.get()); 00140 00141
00142
00143 std::copy( 00144 partialCompareData.begin(), 00145 partialCompareData.end(), 00146 paramCompareData.begin()); 00147 } 00148 00149
00150
00151 lastKey.push_back(paramCompareData.size() - 1); 00152 lastKeyDesc.projectFrom(compTupleDesc, lastKey); 00153 } 00154 00155 void ReshapeExecStream::copyCompareTuple( 00156 TupleDescriptor const &tupleDesc, 00157 TupleData &tupleData, 00158 PBuffer tupleBuffer) 00159 { 00160 TupleAccessor tupleAccessor; 00161 tupleAccessor.compute(tupleDesc); 00162 tupleAccessor.setCurrentTupleBuf(tupleBuffer); 00163 uint nBytes = tupleAccessor.getCurrentByteCount(); 00164 compTupleBuffer.reset(new FixedBuffer[nBytes]); 00165 memcpy(compTupleBuffer.get(), tupleBuffer, nBytes); 00166 tupleAccessor.setCurrentTupleBuf(compTupleBuffer.get()); 00167 tupleAccessor.unmarshal(tupleData); 00168 } 00169 00170 bool ReshapeExecStream::checkCastTypes( 00171 const TupleProjection &compareProj, 00172 const TupleDescriptor &inputTupleDesc, 00173 const TupleDescriptor &outputTupleDesc) 00174 { 00175 for (uint i = 0; i < inputTupleDesc.size(); i++) { 00176 if (!(inputTupleDesc[i] == outputTupleDesc[i])) { 00177
00178
00179 if (inputTupleDesc[i].isNullable && 00180 !outputTupleDesc[i].isNullable) 00181 { 00182 assert(nullFilter(compareProj, i)); 00183 } else { 00184 assert( 00185 (inputTupleDesc[i].isNullable == 00186 outputTupleDesc[i].isNullable) 00187 || (!inputTupleDesc[i].isNullable 00188 && outputTupleDesc[i].isNullable)); 00189 } 00190 StoredTypeDescriptor::Ordinal inputType = 00191 inputTupleDesc[i].pTypeDescriptor->getOrdinal(); 00192 StoredTypeDescriptor::Ordinal outputType = 00193 outputTupleDesc[i].pTypeDescriptor->getOrdinal(); 00194 00195
00196
00197
00198 bool inputUnicode = false; 00199 if (inputType == STANDARD_TYPE_UNICODE_CHAR) { 00200 inputType = STANDARD_TYPE_CHAR; 00201 inputUnicode = true; 00202 } 00203 if (inputType == STANDARD_TYPE_UNICODE_VARCHAR) { 00204 inputType = STANDARD_TYPE_VARCHAR; 00205 inputUnicode = true; 00206 } 00207 00208 bool outputUnicode = false; 00209 if (outputType == STANDARD_TYPE_UNICODE_CHAR) { 00210 outputType = STANDARD_TYPE_CHAR; 00211 outputUnicode = true; 00212 } 00213 if (outputType == STANDARD_TYPE_UNICODE_VARCHAR) { 00214 outputType = STANDARD_TYPE_VARCHAR; 00215 outputUnicode = true; 00216 } 00217 00218 if (inputUnicode || outputUnicode) { 00219 assert(inputUnicode && outputUnicode); 00220 } 00221 00222 if (inputType != outputType) { 00223
00224
00225 assert( 00226 (inputType == STANDARD_TYPE_CHAR) 00227 && (outputType == STANDARD_TYPE_VARCHAR)); 00228 } 00229 if (inputTupleDesc[i].cbStorage != outputTupleDesc[i].cbStorage) { 00230
00231
00232 assert( 00233 ((inputType == STANDARD_TYPE_VARCHAR) 00234 || (inputType == STANDARD_TYPE_CHAR)) 00235 && (outputType == STANDARD_TYPE_VARCHAR)); 00236 } 00237 } 00238 } 00239 return true; 00240 } 00241 00242 bool ReshapeExecStream::nullFilter( 00243 const TupleProjection &compareProj, uint colno) 00244 { 00245 for (uint i = 0; i < compareProj.size(); i++) { 00246 if (compareProj[i] == colno) { 00247 if (paramCompareData[i].pData) { 00248 return true; 00249 } else { 00250 break; 00251 } 00252 } 00253 } 00254 return false; 00255 } 00256 00257 void ReshapeExecStream::open(bool restart) 00258 { 00259 ConduitExecStream::open(restart); 00260 producePending = false; 00261 paramsRead = false; 00262 } 00263 00264 ExecStreamResult ReshapeExecStream::execute( 00265 ExecStreamQuantum const &quantum) 00266 { 00267 if (paramsRead) { 00268 readDynamicParams(); 00269 paramsRead = true; 00270 } 00271 00272 ExecStreamResult rc = precheckConduitBuffers(); 00273 if (rc != EXECRC_YIELD) { 00274 return rc; 00275 } 00276 00277 if (producePending) { 00278 if (pOutAccessor->produceTuple(outputData)) { 00279 return EXECRC_BUF_OVERFLOW; 00280 } 00281 pInAccessor->consumeTuple(); 00282 producePending = false; 00283 } 00284 00285 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00286 if (pInAccessor->demandData()) { 00287 return EXECRC_BUF_UNDERFLOW; 00288 } 00289 00290 pInAccessor->accessConsumptionTuple(); 00291 00292
00293 if (compOp != COMP_NOOP) { 00294 bool pass = compareInput(); 00295 if (!pass) { 00296 pInAccessor->consumeTuple(); 00297 continue; 00298 } 00299 } 00300 00301 if (castRequired) { 00302 castOutput(); 00303 } else { 00304 outputProjAccessor.unmarshal(outputData); 00305 } 00306 producePending = true; 00307 if (pOutAccessor->produceTuple(outputData)) { 00308 return EXECRC_BUF_OVERFLOW; 00309 } 00310 producePending = false; 00311 pInAccessor->consumeTuple(); 00312 } 00313 00314 return EXECRC_QUANTUM_EXPIRED; 00315 } 00316 00317 void ReshapeExecStream::readDynamicParams() 00318 { 00319 uint currCompIdx = paramCompareData.size() - numCompDynParams; 00320 uint currOutputIdx = inputOutputDesc.size(); 00321 for (uint i = 0; i < dynamicParameters.size(); i++) { 00322 if (isMAXU(dynamicParameters[i].compareOffset)) { 00323 TupleDatum const &param = 00324 pDynamicParamManager->getParam( 00325 dynamicParameters[i].dynamicParamId).getDatum(); 00326 paramCompareData[currCompIdx++] = param; 00327 } 00328 if (dynamicParameters[i].outputParam) { 00329 TupleDatum const &param = 00330 pDynamicParamManager->getParam( 00331 dynamicParameters[i].dynamicParamId).getDatum(); 00332 outputData[currOutputIdx++] = param; 00333 } 00334 } 00335 } 00336 00337 bool ReshapeExecStream::compareInput() 00338 { 00339 inputCompareProjAccessor.unmarshal(inputCompareData); 00340 int rc; 00341 00342
00343
00344
00345 if (compOp == COMP_EQ) { 00346 rc = compTupleDesc.compareTuples(inputCompareData, paramCompareData); 00347 } else { 00348 rc = 00349 compTupleDesc.compareTuplesKey( 00350 inputCompareData, paramCompareData, 00351 paramCompareData.size() - 1); 00352 if (rc != 0) { 00353 return false; 00354 } 00355
00356 if (paramCompareData[paramCompareData.size() - 1].pData) { 00357 rc = 00358 lastKeyDesc.compareTuples( 00359 inputCompareData, lastKey, paramCompareData, lastKey); 00360 } else { 00361 bool containsNullKey; 00362 rc = 00363 lastKeyDesc.compareTuples( 00364 inputCompareData, lastKey, paramCompareData, lastKey, 00365 &containsNullKey); 00366 if (containsNullKey) { 00367 return false; 00368 } 00369 } 00370 } 00371 00372 bool pass; 00373 switch (compOp) { 00374 case COMP_EQ: 00375 pass = (rc == 0); 00376 break; 00377 case COMP_NE: 00378 pass = (rc != 0); 00379 break; 00380 case COMP_LT: 00381 pass = (rc < 0); 00382 break; 00383 case COMP_LE: 00384 pass = (rc <= 0); 00385 break; 00386 case COMP_GT: 00387 pass = (rc > 0); 00388 break; 00389 case COMP_GE: 00390 pass = (rc >= 0); 00391 break; 00392 default: 00393 pass = false; 00394 permAssert(false); 00395 } 00396 return pass; 00397 } 00398 00399 void ReshapeExecStream::castOutput() 00400 { 00401 outputProjAccessor.unmarshal(inputOutputData); 00402 for (uint i = 0; i < inputOutputData.size(); i++) { 00403
00404 uint len = std::min( 00405 inputOutputData[i].cbData, outputDesc[i].cbStorage); 00406 outputData[i].cbData = len; 00407 if (inputOutputData[i].pData) { 00408 outputData[i].pData = inputOutputData[i].pData; 00409 } else { 00410 outputData[i].pData = NULL; 00411 } 00412 } 00413 } 00414 00415 void ReshapeExecStream::closeImpl() 00416 { 00417 ConduitExecStream::closeImpl(); 00418 } 00419 00420 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/ReshapeExecStream.cpp#10 $"); 00421 00422