Fennel: /home/pub/open/dev/fennel/lucidera/bitmap/LbmMinusExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/exec/ExecStreamBufAccessor.h" 00024 #include "fennel/exec/ExecStreamGraphImpl.h" 00025 #include "fennel/lucidera/bitmap/LbmMinusExecStream.h" 00026 00027 FENNEL_BEGIN_CPPFILE("$Id:"); 00028 00029 void LbmMinusExecStream::prepare(LbmMinusExecStreamParams const &params) 00030 { 00031 LbmBitOpExecStream::prepare(params); 00032 00033 if (nFields) { 00034
00035 prefixedBitmapTuple.compute(inAccessors[0]->getTupleDesc()); 00036 00037
00038
00039
00040 TupleDescriptor prevTupleDesc; 00041 TupleDescriptor const &inputDesc = inAccessors[0]->getTupleDesc(); 00042 for (int i = 0; i < nFields; i ++) { 00043 prevTupleDesc.push_back(inputDesc[i]); 00044 } 00045 prevTuple.computeAndAllocate(prevTupleDesc); 00046 } 00047 subtrahendBitmap.resize(0); 00048 } 00049 00050 void LbmMinusExecStream::open(bool restart) 00051 { 00052 LbmBitOpExecStream::open(restart); 00053 subtrahendsDone = false; 00054 needToRead = true; 00055 minSubtrahendRid = LcsRid(0); 00056 maxSubtrahendRid = LcsRid(0); 00057 baseRid = LcsRid(0); 00058 advancePending = false; 00059
00060 rowLimit = 0; 00061 inputType = UNKNOWN_INPUT; 00062 copyPrefixPending = false; 00063 prevTupleValid = false; 00064 minuendReader.init(inAccessors[0], bitmapSegTuples[0]); 00065 00066 if (nFields > 0) { 00067 subtrahendBitmap.resize(SUBTRAHEND_BITMAP_SIZE); 00068 } 00069 restartSubtrahends(); 00070 } 00071 00072 ExecStreamResult LbmMinusExecStream::execute(ExecStreamQuantum const &quantum) 00073 { 00074 ExecStreamResult rc; 00075 00076
00077 if (inputType == UNKNOWN_INPUT) { 00078 rc = advanceSubtrahends(LcsRid(0)); 00079 if (rc != EXECRC_YIELD) { 00080 return rc; 00081 } 00082 int dummy; 00083 rc = findMinInput(dummy); 00084 if (rc == EXECRC_EOS) { 00085 inputType = EMPTY_INPUT; 00086 if (nFields == 0) { 00087 subtrahendsDone = true; 00088 } 00089 } else { 00090 inputType = NONEMPTY_INPUT; 00091 } 00092 } 00093 00094 if (producePending) { 00095 rc = producePendingOutput(0); 00096 if (rc != EXECRC_YIELD) { 00097 return rc; 00098 } 00099 } 00100 00101 bool skipMinus = false; 00102 if (copyPrefixPending) { 00103 copyPrefix(); 00104 copyPrefixPending = false; 00105 needToRead = false; 00106
00107
00108 skipMinus = checkNeedForRestart(); 00109 } 00110 00111 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00112
00113
00114 if (needToRead) { 00115 rc = readMinuendInputAndFlush(baseRid, baseByteSeg, baseLen); 00116 if (rc != EXECRC_YIELD) { 00117 return rc; 00118 } 00119 00120
00121 skipMinus = checkNeedForRestart(); 00122 } 00123 00124
00125
00126
00127 if ((nFields == 0 && subtrahendsDone) || !skipMinus) { 00128 if (advancePending) { 00129 rc = 00130 advanceSingleSubtrahend( 00131 advanceSubtrahendInputNo, 00132 advanceSubtrahendRid); 00133 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) { 00134 return rc; 00135 } 00136 advancePending = false; 00137 } else { 00138 rc = advanceSubtrahends(baseRid); 00139 if (rc != EXECRC_YIELD) { 00140 return rc; 00141 } 00142 } 00143 00144 rc = minusSegments(baseRid, baseByteSeg, baseLen); 00145 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) { 00146 return rc; 00147 } 00148 } 00149 00150
00151
00152 needToRead = true; 00153 startRid = baseRid + baseLen * LbmSegment::LbmOneByteSize; 00154 addRid = baseRid; 00155 addByteSeg = pByteSegBuf; 00156 addLen = baseLen; 00157 if (addSegments()) { 00158 return EXECRC_BUF_OVERFLOW; 00159 } 00160 00161
00162 } 00163 00164 return EXECRC_QUANTUM_EXPIRED; 00165 } 00166 00167 ExecStreamResult LbmMinusExecStream::readMinuendInputAndFlush( 00168 LcsRid &currRid, PBuffer &currByteSeg, uint &currLen) 00169 { 00170 ExecStreamResult rc; 00171 bool unordered = false; 00172 00173
00174
00175 if (nFields == 0) { 00176 rc = readInput(0, currRid, currByteSeg, currLen); 00177 } else { 00178 rc = readMinuendInput(currRid, currByteSeg, currLen); 00179 if (currRid < startRid) { 00180 unordered = true; 00181 } 00182 } 00183 if (rc != EXECRC_YIELD) { 00184 return rc; 00185 } 00186 00187
00188 memcpy(pByteSegBuf, baseByteSeg - baseLen + 1, baseLen); 00189 needToRead = false; 00190
00191
00192
00193 startRid = baseRid; 00194 writeStartRidParamValue(); 00195 iInput = 1; 00196 00197
00198 if (nFields == 0) { 00199 return rc; 00200 } 00201 00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218 if (prevTupleValid) { 00219 if (minuendReader.getTupleChange()) { 00220 minuendReader.resetChangeListener(); 00221 int keyComp = comparePrefixes(); 00222 if (keyComp != 0 || unordered) { 00223 needSubtrahendRestart = true; 00224 if (flush()) { 00225 copyPrefixPending = true; 00226 return EXECRC_BUF_OVERFLOW; 00227 } 00228 copyPrefix(); 00229 } 00230 } 00231 } else { 00232 prevTupleValid = true; 00233 copyPrefix(); 00234 minuendReader.resetChangeListener(); 00235 } 00236 return rc; 00237 } 00238 00239 ExecStreamResult LbmMinusExecStream::readMinuendInput( 00240 LcsRid &currRid, PBuffer &currByteSeg, uint &currLen) 00241 { 00242 LbmByteNumber byteNumber; 00243 ExecStreamResult rc = minuendReader.readSegmentAndAdvance( 00244 byteNumber, currByteSeg, currLen); 00245 currRid = byteNumberToRid(byteNumber); 00246 if (rc == EXECRC_EOS) { 00247
00248 if (! flush()) { 00249 return EXECRC_BUF_OVERFLOW; 00250 } 00251 pOutAccessor->markEOS(); 00252 return EXECRC_EOS; 00253 } else if (rc != EXECRC_YIELD) { 00254 return rc; 00255 } 00256 00257
00258
00259 assert(currLen <= bitmapBufSize); 00260 00261 return EXECRC_YIELD; 00262 } 00263 00264 int LbmMinusExecStream::comparePrefixes() 00265 { 00266 int ret = 00267 (inAccessors[0]->getTupleDesc()).compareTuplesKey( 00268 prevTuple, 00269 bitmapSegTuples[0], 00270 nFields); 00271 return ret; 00272 } 00273 00274 void LbmMinusExecStream::restartSubtrahends() 00275 { 00276 minSubtrahendRid = LcsRid(0); 00277 advancePending = false; 00278 for (uint i = 1; i < nInputs; i++) { 00279 pGraph->getStreamInput(getStreamId(), i)->open(true); 00280 segmentReaders[i].init( 00281 inAccessors[i], 00282 bitmapSegTuples[i], 00283 (subtrahendsDone && nFields > 0), 00284 &subtrahendBitmap); 00285 } 00286 iInput = 1; 00287 needSubtrahendRestart = false; 00288 } 00289 00290 void LbmMinusExecStream::copyPrefix() 00291 { 00292
00293 00294 00295 00296 prevTuple.resetBuffer(); 00297 00298 for (int i = 0; i < nFields; i ++) { 00299 prevTuple[i].memCopyFrom(bitmapSegTuples[0][i]); 00300 } 00301 } 00302 00303 ExecStreamResult LbmMinusExecStream::advanceSingleSubtrahend( 00304 int inputNo, 00305 LcsRid rid) 00306 { 00307 ExecStreamResult rc = segmentReaders[inputNo].advanceToRid(rid); 00308 return rc; 00309 } 00310 00311 ExecStreamResult LbmMinusExecStream::advanceSubtrahends(LcsRid baseRid) 00312 { 00313
00314
00315 if (minSubtrahendRid > baseRid) { 00316 return EXECRC_YIELD; 00317 } 00318 00319
00320 for (; iInput < nInputs; iInput++) { 00321 ExecStreamResult rc = segmentReaders[iInput].advanceToRid(baseRid); 00322 if (rc == EXECRC_EOS) { 00323 continue; 00324 } 00325 if (rc != EXECRC_YIELD) { 00326 return rc; 00327 } 00328 } 00329 00330 return EXECRC_YIELD; 00331 } 00332 00333 bool LbmMinusExecStream::checkNeedForRestart() 00334 { 00335
00336
00337 00338 if (nFields == 0) { 00339 return false; 00340 } else if (inputType == EMPTY_INPUT) { 00341
00342 return true; 00343 } else if (needSubtrahendRestart) { 00344 bool skipMinus = canSkipMinus(); 00345
00346
00347 if (!skipMinus && minSubtrahendRid > startRid) { 00348 restartSubtrahends(); 00349 } 00350 return skipMinus; 00351 } else { 00352 return canSkipMinus(); 00353 } 00354 } 00355 00356 bool LbmMinusExecStream::canSkipMinus() 00357 { 00358 LcsRid rid = baseRid; 00359 LcsRid endRid = baseRid + baseLen * LbmSegment::LbmOneByteSize - 1; 00360 00361
00362
00363 if (subtrahendsDone) { 00364
00365
00366 if (rid > maxSubtrahendRid) { 00367 return true; 00368 } 00369 } else { 00370
00371
00372
00373 for (uint i = 1; i < nInputs; i++) { 00374 if (endRid > segmentReaders[i].getMaxRidSet()) { 00375 return false; 00376 } 00377 } 00378 } 00379 00380 PBuffer seg = baseByteSeg; 00381 for (uint i = 0; i < baseLen; i++) { 00382 uint8_t byte = *((uint8_t *) seg); 00383 for (uint j = 0; j < LbmSegment::LbmOneByteSize; j++) { 00384 if (byte & 1) { 00385
00386 if (subtrahendBitmap.test( 00387 opaqueToInt(rid % SUBTRAHEND_BITMAP_SIZE))) 00388 { 00389 return false; 00390 } 00391 } 00392 byte = byte >> 1; 00393 rid++; 00394 } 00395 seg--; 00396 } 00397 return true; 00398 } 00399 00400 ExecStreamResult LbmMinusExecStream::minusSegments( 00401 LcsRid baseRid, PBuffer baseByteSeg, uint baseLen) 00402 { 00403 while (true) { 00404
00405
00406 int minInput; 00407 ExecStreamResult rc = findMinInput(minInput); 00408 if (rc == EXECRC_EOS) { 00409 return rc; 00410 } 00411 00412 LcsRid currRid; 00413 PBuffer currByteSeg; 00414 uint currLen; 00415 segmentReaders[minInput].readCurrentByteSegment( 00416 currRid, currByteSeg, currLen); 00417 00418
00419
00420 uint offset = 00421 opaqueToInt(currRid - baseRid) / LbmSegment::LbmOneByteSize; 00422 if (offset >= baseLen) { 00423 break; 00424 } 00425 00426
00427
00428 currLen = std::min(currLen, baseLen - offset); 00429 00430
00431
00432 PBuffer out = pByteSegBuf + baseLen - 1 - offset; 00433 uint len = currLen; 00434 while (len--) { 00435 *out-- &= ~(*currByteSeg--); 00436 } 00437 00438
00439
00440
00441 rc = segmentReaders[minInput].advanceToRid( 00442 currRid + currLen * LbmSegment::LbmOneByteSize); 00443 if (rc != EXECRC_YIELD && rc != EXECRC_EOS) { 00444 advancePending = true; 00445 advanceSubtrahendRid = 00446 currRid + currLen * LbmSegment::LbmOneByteSize; 00447 advanceSubtrahendInputNo = minInput; 00448 return rc; 00449 } 00450 } 00451 00452 return EXECRC_YIELD; 00453 } 00454 00455 ExecStreamResult LbmMinusExecStream::findMinInput(int &minInput) 00456 { 00457 minInput = -1; 00458 00459 for (uint i = 1; i < nInputs; i++) { 00460 if (inAccessors[i]->getState() == EXECBUF_EOS) { 00461 continue; 00462 } 00463 00464 LcsRid currRid; 00465 PBuffer currByteSeg; 00466 uint currLen; 00467 segmentReaders[i].readCurrentByteSegment( 00468 currRid, currByteSeg, currLen); 00469 00470 if (minInput == -1 || currRid < minSubtrahendRid) { 00471 minInput = i; 00472 minSubtrahendRid = currRid; 00473 } 00474 } 00475 00476 if (minInput == -1) { 00477
00478
00479
00480
00481 subtrahendsDone = true; 00482 for (uint i = 1; i < nInputs; i++) { 00483 LcsRid rid = segmentReaders[i].getMaxRidSet(); 00484 if (rid > maxSubtrahendRid) { 00485 maxSubtrahendRid = rid; 00486 } 00487 } 00488 minSubtrahendRid = maxSubtrahendRid + 1; 00489 return EXECRC_EOS; 00490 } else { 00491 return EXECRC_YIELD; 00492 } 00493 } 00494 00495 bool LbmMinusExecStream::produceTuple(TupleData bitmapTuple) 00496 { 00497
00498
00499 if (nFields) { 00500 for (uint i = 0; i < nFields; i++) { 00501 prefixedBitmapTuple[i].copyFrom(prevTuple[i]); 00502 } 00503 assert (prefixedBitmapTuple.size() == nFields + bitmapTuple.size()); 00504 for (uint i = 0; i < 3; i++) { 00505 prefixedBitmapTuple[nFields + i].copyFrom(bitmapTuple[i]); 00506 } 00507 return pOutAccessor->produceTuple(prefixedBitmapTuple); 00508 } 00509 return pOutAccessor->produceTuple(bitmapTuple); 00510 } 00511 00512 void LbmMinusExecStream::closeImpl() 00513 { 00514 subtrahendBitmap.resize(0); 00515 LbmBitOpExecStream::closeImpl(); 00516 } 00517 00518 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmMinusExecStream.cpp#16 $"); 00519 00520