Fennel: /home/pub/open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/tuple/StandardTypeDescriptor.h" 00024 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h" 00025 #include "fennel/exec/ExecStreamBufAccessor.h" 00026 #include "fennel/common/SearchEndpoint.h" 00027 #include <math.h> 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $"); 00030 00031 int32_t LcsRowScanExecStreamParams::defaultSystemSamplingClumps = 10; 00032 00033 LcsRowScanExecStream::LcsRowScanExecStream() 00034 : 00035 LcsRowScanBaseExecStream(), 00036 ridRunIter(&ridRuns) 00037 { 00038 ridRuns.resize(4000); 00039 } 00040 00041 void LcsRowScanExecStream::prepareResidualFilters( 00042 LcsRowScanExecStreamParams const &params) 00043 { 00044 nFilters = params.residualFilterCols.size(); 00045 00046
00047 00048 00049 VectorOfUint valueCols; 00050 uint j, k = 0; 00051 for (uint i = 0; i < nFilters; i++) { 00052 for (j = 0; j < params.outputProj.size(); j++) { 00053 if (params.outputProj[j] == params.residualFilterCols[i]) { 00054 valueCols.push_back(j); 00055 break; 00056 } 00057 } 00058 00059 if (j >= params.outputProj.size()) { 00060 valueCols.push_back(params.outputProj.size() + k); 00061 k++; 00062 } 00063 } 00064 00065
00066 00067 00068 uint valueClus; 00069 uint clusterPos; 00070 uint clusterStart = 0; 00071 uint realClusterStart = 0; 00072 00073 filters.reset(new PLcsResidualColumnFilters[nFilters]); 00074 00075 for (uint i = 0; i < nClusters; i++) { 00076 uint clusterEnd = clusterStart + 00077 params.lcsClusterScanDefs[i].clusterTupleDesc.size() - 1; 00078 00079 for (uint j = 0; j < nFilters; j++) { 00080 if (params.residualFilterCols[j] >= clusterStart && 00081 params.residualFilterCols[j] <= clusterEnd) 00082 { 00083 valueClus = i; 00084 00085
00086 00087 00088 for (uint k = 0; k < projMap.size(); k++) { 00089 if (projMap[k] == valueCols[j]) { 00090 clusterPos = k - realClusterStart - 00091 nonClusterCols.size(); 00092 00093 LcsResidualColumnFilters &filter = 00094 pClusters[valueClus]-> 00095 clusterCols[clusterPos]. 00096 getFilters(); 00097 00098 filters[j] = &filter; 00099 00100 filter.hasResidualFilters = true; 00101 00102 filter.readerKeyProj.push_back(valueCols[j]); 00103 filter.inputKeyDesc.projectFrom( 00104 projDescriptor, 00105 filter.readerKeyProj); 00106 filter.attrAccessor.compute( 00107 filter.inputKeyDesc[0]); 00108 00109 filter.lowerBoundProj.push_back(1); 00110 filter.upperBoundProj.push_back(3); 00111 filter.readerKeyData.computeAndAllocate( 00112 filter.inputKeyDesc); 00113 00114 break; 00115 } 00116 } 00117
00118 } 00119 } 00120
00121 clusterStart = clusterEnd + 1; 00122 realClusterStart += pClusters[i]->nColsToRead; 00123 } 00124 } 00125 00126 void LcsRowScanExecStream::prepare(LcsRowScanExecStreamParams const &params) 00127 { 00128 LcsRowScanBaseExecStream::prepare(params); 00129 00130 isFullScan = params.isFullScan; 00131 hasExtraFilter = params.hasExtraFilter; 00132 00133
00134 ridTupleData.compute(inAccessors[0]->getTupleDesc()); 00135 00136
00137 TupleDescriptor inputDesc = inAccessors[0]->getTupleDesc(); 00138 assert(inputDesc.size() == 3); 00139 StandardTypeDescriptorFactory stdTypeFactory; 00140 TupleAttributeDescriptor expectedRidDesc( 00141 stdTypeFactory.newDataType(STANDARD_TYPE_RECORDNUM)); 00142 assert(inputDesc[0] == expectedRidDesc); 00143 00144 assert(hasExtraFilter == (inAccessors.size() > 1)); 00145 00146 if (hasExtraFilter) { 00147 prepareResidualFilters(params); 00148 } else { 00149 nFilters = 0; 00150 } 00151 00152
00153 00154 00155 for (uint i = 0; i < params.outputProj.size(); i++) { 00156 outputProj.push_back(i); 00157 } 00158 00159 pOutAccessor->setTupleShape(pOutAccessor->getTupleDesc()); 00160 outputTupleData.computeAndAllocate(projDescriptor); 00161 00162
00163 00164 00165 00166 00167 projOutputTupleData.compute(pOutAccessor->getTupleDesc()); 00168 00169 attrAccessors.resize(projDescriptor.size()); 00170 for (uint i = 0; i < projDescriptor.size(); ++i) { 00171 attrAccessors[i].compute(projDescriptor[i]); 00172 } 00173 00174
00175 samplingMode = params.samplingMode; 00176 00177 if (samplingMode != SAMPLING_OFF) { 00178 samplingRate = params.samplingRate; 00179 rowCount = params.samplingRowCount; 00180 00181 if (samplingMode == SAMPLING_BERNOULLI) { 00182 isSamplingRepeatable = params.samplingIsRepeatable; 00183 repeatableSeed = params.samplingRepeatableSeed; 00184 samplingClumps = -1; 00185 00186 samplingRng.reset(new BernoulliRng(samplingRate)); 00187 } else { 00188 assert(isFullScan); 00189 00190 samplingClumps = params.samplingClumps; 00191 assert(samplingClumps > 0); 00192 00193 isSamplingRepeatable = false; 00194 } 00195 } 00196 } 00197 00198 void LcsRowScanExecStream::open(bool restart) 00199 { 00200 LcsRowScanBaseExecStream::open(restart); 00201 producePending = false; 00202 tupleFound = false; 00203 nRidsRead = 0; 00204 ridRunsBuilt = false; 00205 currRidRun.startRid = LcsRid(MAXU); 00206 currRidRun.nRids = 0; 00207 ridRuns.clear(); 00208 ridRunIter.reset(); 00209 00210 if (isFullScan) { 00211 inputRid = LcsRid(0); 00212 readDeletedRid = true; 00213 deletedRidEos = false; 00214 } 00215 nextRid = LcsRid(0); 00216 ridReader.init(inAccessors[0], ridTupleData); 00217 00218
00219 00220 00221 00222 00223 00224 00225 if (!restart) { 00226 iFilterToInitialize = 0; 00227 } else if (iFilterToInitialize < nFilters) { 00228 if (filters[iFilterToInitialize]->filterDataInitialized) { 00229 filters[iFilterToInitialize]->filterData.clear(); 00230 } 00231 } 00232 00233 if (samplingMode == SAMPLING_BERNOULLI) { 00234 if (isSamplingRepeatable) { 00235 samplingRng->reseed(repeatableSeed); 00236 } else if (!restart) { 00237 samplingRng->reseed(static_cast(time(0))); 00238 } 00239 } else if (samplingMode == SAMPLING_SYSTEM) { 00240 clumpSize = 0; 00241 clumpDistance = 0; 00242 clumpPos = 0; 00243 numClumpsBuilt = 0; 00244 00245 initializeSystemSampling(); 00246 } 00247 } 00248 00249 void LcsRowScanExecStream::getResourceRequirements( 00250 ExecStreamResourceQuantity &minQuantity, 00251 ExecStreamResourceQuantity &optQuantity) 00252 { 00253 LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity); 00254 } 00255 00256 bool LcsRowScanExecStream::initializeFiltersIfNeeded() 00257 { 00258
00259 00260 00261 for (; iFilterToInitialize < nFilters; iFilterToInitialize++) { 00262 SharedExecStreamBufAccessor &pInAccessor = 00263 inAccessors[iFilterToInitialize + 1]; 00264 TupleAccessor &inputAccessor = 00265 pInAccessor->getConsumptionTupleAccessor(); 00266 00267 if (pInAccessor->getState() != EXECBUF_EOS) { 00268 PLcsResidualColumnFilters filter = filters[iFilterToInitialize]; 00269 00270 while (pInAccessor->demandData()) { 00271 SharedLcsResidualFilter filterData(new LcsResidualFilter); 00272 00273 pInAccessor->accessConsumptionTuple(); 00274 00275
00276 00277 00278 filterData->boundData.compute(pInAccessor->getTupleDesc()); 00279 filterData->boundBuf.reset( 00280 new FixedBuffer[inputAccessor.getCurrentByteCount()]); 00281 00282 memcpy( 00283 filterData->boundBuf.get(), 00284 pInAccessor->getConsumptionStart(), 00285 inputAccessor.getCurrentByteCount()); 00286 00287
00288 00289 00290 00291 00292 PConstBuffer tmpBuf; 00293 tmpBuf = inputAccessor.getCurrentTupleBuf(); 00294 inputAccessor.setCurrentTupleBuf(filterData->boundBuf.get()); 00295 inputAccessor.unmarshal(filterData->boundData); 00296 inputAccessor.setCurrentTupleBuf(tmpBuf); 00297 00298
00299 00300 00301 filterData->lowerBoundDirective = 00302 SearchEndpoint(*filterData->boundData[0].pData); 00303 filterData->upperBoundDirective = 00304 SearchEndpoint(*filterData->boundData[2].pData); 00305 00306 filter->filterData.push_back(filterData); 00307 00308 pInAccessor->consumeTuple(); 00309 } 00310 00311 if (pInAccessor->getState() != EXECBUF_EOS) { 00312 return false; 00313 } 00314 } 00315 filters[iFilterToInitialize]->filterDataInitialized = true; 00316 } 00317 return true; 00318 } 00319 00320 00321 void LcsRowScanExecStream::initializeSystemSampling() 00322 { 00323 clumpPos = 0; 00324 clumpSkipPos = 0; 00325 00326 FENNEL_TRACE(TRACE_FINE, "rowCount = " << rowCount); 00327 FENNEL_TRACE( 00328 TRACE_FINE, "samplingRate = " << static_cast(samplingRate)); 00329 00330 if (rowCount <= 0) { 00331
00332 clumpSize = 1; 00333 clumpDistance = 0; 00334 numClumps = 0; 00335 return; 00336 } 00337 00338
00339
00340 numClumps = samplingClumps; 00341 00342
00343 int64_t sampleSize = 00344 static_cast( 00345 round( 00346 static_cast(rowCount) * 00347 static_cast(samplingRate))); 00348 if (sampleSize < numClumps) { 00349
00350
00351 sampleSize = numClumps; 00352 } 00353 00354 if (sampleSize > rowCount) { 00355
00356
00357 sampleSize = rowCount; 00358 numClumps = 1; 00359 } 00360 00361 FENNEL_TRACE(TRACE_FINE, "sampleSize = " << sampleSize); 00362 00363 clumpSize = 00364 static_cast( 00365 round( 00366 static_cast(sampleSize) / 00367 static_cast(numClumps))); 00368 assert(sampleSize >= clumpSize); 00369 assert(clumpSize >= 1); 00370 00371 FENNEL_TRACE(TRACE_FINE, "clumpSize = " << clumpSize); 00372 00373 if (numClumps > 1) { 00374
00375 clumpDistance = 00376 static_cast( 00377 round( 00378 static_cast(rowCount - sampleSize) / 00379 static_cast(numClumps - 1))); 00380 00381
00382
00383 uint64_t rowsRequired = 00384 (clumpSize + clumpDistance) * (numClumps - 1) + clumpSize; 00385 if (rowsRequired > rowCount && clumpDistance > 0) { 00386 clumpDistance--; 00387 } 00388 } else { 00389
00390 clumpDistance = (rowCount - sampleSize); 00391 } 00392 00393 FENNEL_TRACE(TRACE_FINE, "clumpDistance = " << clumpDistance); 00394 } 00395 00396 00397 ExecStreamResult LcsRowScanExecStream::execute(ExecStreamQuantum const &quantum) 00398 { 00399 if (initializeFiltersIfNeeded()) { 00400 return EXECRC_BUF_UNDERFLOW; 00401 } 00402 00403 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00404 uint iClu; 00405 bool passedFilter; 00406 00407 while (producePending) { 00408
00409 if (ridRunsBuilt && ridRuns.nFreeSpace() > 100) { 00410 ExecStreamResult rc = fillRidRunBuffer(); 00411 if (rc != EXECRC_YIELD) { 00412 return rc; 00413 } 00414 } 00415 00416
00417
00418 LcsRid rid = 00419 LcsClusterReader::getFetchRids(ridRunIter, nextRid, true); 00420 if (rid == LcsRid(MAXU)) { 00421 assert(ridRunIter.done()); 00422 pOutAccessor->markEOS(); 00423 return EXECRC_EOS; 00424 } 00425 00426 uint prevClusterEnd = 0; 00427
00428 outputTupleData.resetBuffer(); 00429 00430
00431 for (uint j = 0; j < nonClusterCols.size(); j++) { 00432 if (nonClusterCols[j] == LCS_RID_COLUMN_ID) { 00433 memcpy( 00434 const_cast(outputTupleData[projMap[j]].pData), 00435 (PBuffer) &rid, sizeof(LcsRid)); 00436 prevClusterEnd++; 00437 } else { 00438 permAssert(false); 00439 } 00440 } 00441 00442
00443 for (iClu = 0, passedFilter = true; iClu < nClusters; iClu++) { 00444 SharedLcsClusterReader &pScan = pClusters[iClu]; 00445 00446
00447 pScan->catchUp(ridRunIter.getCurrPos(), nextRid); 00448 00449
00450
00451 00452 if (!pScan->isPositioned() || rid >= pScan->getRangeEndRid()) { 00453 bool rc = pScan->position(rid); 00454 00455
00456
00457 if (rc == false) 00458 break; 00459 00460 assert(rid >= pScan->getRangeStartRid() 00461 && rid < pScan->getRangeEndRid()); 00462 00463
00464 syncColumns(pScan); 00465 } else { 00466
00467 assert(rid > pScan->getRangeStartRid()); 00468 00469
00470
00471
00472 pScan->advanceWithinBatch( 00473 opaqueToInt(rid - pScan->getCurrentRid())); 00474 } 00475 00476 passedFilter = 00477 readColVals( 00478 pScan, 00479 outputTupleData, 00480 prevClusterEnd); 00481 if (!passedFilter) { 00482 break; 00483 } 00484 prevClusterEnd += pScan->nColsToRead; 00485 } 00486 00487 if (!passedFilter) { 00488 continue; 00489 } 00490 if (iClu == nClusters) { 00491 tupleFound = true; 00492 } 00493 producePending = true; 00494 } 00495 00496
00497 projOutputTupleData.projectFrom(outputTupleData, outputProj); 00498 if (tupleFound) { 00499 if (pOutAccessor->produceTuple(projOutputTupleData)) { 00500 return EXECRC_BUF_OVERFLOW; 00501 } 00502 } 00503 producePending = false; 00504 00505 if (isFullScan) { 00506
00507 if (tupleFound) { 00508 pOutAccessor->markEOS(); 00509 return EXECRC_EOS; 00510 } 00511 } 00512 00513 tupleFound = false; 00514 nRidsRead++; 00515 } 00516 00517 return EXECRC_QUANTUM_EXPIRED; 00518 } 00519 00520 ExecStreamResult LcsRowScanExecStream::fillRidRunBuffer() 00521 { 00522 ExecStreamResult rc; 00523 RecordNum nRows; 00524 00525 do { 00526 if (isFullScan) { 00527 rc = ridReader.readRidAndAdvance(inputRid); 00528 if (rc == EXECRC_EOS) { 00529 ridRunsBuilt = true; 00530 break; 00531 } 00532 if (rc != EXECRC_YIELD) { 00533 return rc; 00534 } 00535 nRows = 1; 00536 00537 } else { 00538 if (deletedRidEos && readDeletedRid) { 00539 rc = ridReader.readRidAndAdvance(deletedRid); 00540 if (rc == EXECRC_EOS) { 00541 deletedRidEos = true; 00542 if (samplingMode == SAMPLING_OFF) { 00543 ridRunsBuilt = true; 00544 } else if (samplingMode == SAMPLING_SYSTEM && 00545 numClumps == 0) 00546 { 00547 ridRunsBuilt = true; 00548 break; 00549 } 00550 } else if (rc != EXECRC_YIELD) { 00551 return rc; 00552 } else { 00553 readDeletedRid = false; 00554 } 00555 } 00556
00557 if (deletedRidEos && inputRid == deletedRid) { 00558 inputRid++; 00559 readDeletedRid = true; 00560 continue; 00561 } else { 00562 if (deletedRidEos) { 00563 nRows = MAXU; 00564 } else { 00565 nRows = opaqueToInt(deletedRid - inputRid); 00566 } 00567 } 00568 } 00569 00570 if (samplingMode != SAMPLING_OFF) { 00571 if (samplingMode == SAMPLING_SYSTEM) { 00572 if (clumpSkipPos > 0) { 00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585 if (deletedRidEos) { 00586
00587 inputRid += clumpSkipPos; 00588 clumpSkipPos = 0; 00589 } else if (readDeletedRid) { 00590 if (deletedRid > inputRid + clumpSkipPos) { 00591
00592 inputRid += clumpSkipPos; 00593 clumpSkipPos = 0; 00594 nRows = opaqueToInt(deletedRid - inputRid); 00595 } else { 00596
00597 clumpSkipPos -= opaqueToInt(deletedRid - inputRid); 00598 inputRid = deletedRid; 00599 continue; 00600 } 00601 } else { 00602
00603 clumpSkipPos--; 00604 inputRid++; 00605 continue; 00606 } 00607 } 00608 00609 if (nRows >= clumpSize - clumpPos) { 00610
00611
00612 nRows = clumpSize - clumpPos; 00613 clumpPos = 0; 00614 clumpSkipPos = clumpDistance; 00615 if (++numClumpsBuilt == numClumps) { 00616 ridRunsBuilt = true; 00617 } 00618 } else { 00619
00620 clumpPos += nRows; 00621 } 00622 } else { 00623
00624 if (opaqueToInt(inputRid) >= opaqueToInt(rowCount)) { 00625 ridRunsBuilt = true; 00626 break; 00627 } 00628 if (samplingRng->nextValue()) { 00629 inputRid++; 00630 continue; 00631 } 00632 nRows = 1; 00633 } 00634 } 00635 00636 if (currRidRun.startRid == LcsRid(MAXU)) { 00637 currRidRun.startRid = inputRid; 00638 currRidRun.nRids = nRows; 00639 } else if (currRidRun.startRid + currRidRun.nRids == inputRid) { 00640
00641
00642 if (nRows == RecordNum(MAXU)) { 00643 currRidRun.nRids = MAXU; 00644 } else { 00645 currRidRun.nRids += nRows; 00646 } 00647 } else { 00648
00649 ridRuns.push_back(currRidRun); 00650 00651
00652 currRidRun.startRid = inputRid; 00653 currRidRun.nRids = nRows; 00654 } 00655 00656 if (isFullScan) { 00657 inputRid += nRows; 00658 } 00659 } while (ridRuns.spaceAvailable() && ridRunsBuilt); 00660 00661
00662 if (ridRunsBuilt && currRidRun.startRid != LcsRid(MAXU)) { 00663 ridRuns.push_back(currRidRun); 00664 } 00665 00666 if (ridRunsBuilt) { 00667 ridRuns.setReadOnly(); 00668 } 00669 return EXECRC_YIELD; 00670 } 00671 00672 void LcsRowScanExecStream::closeImpl() 00673 { 00674 LcsRowScanBaseExecStream::closeImpl(); 00675 00676 for (uint i = 0; i < nFilters; i++) { 00677 filters[i]->filterData.clear(); 00678 } 00679 } 00680 00681 void LcsRowScanExecStream::buildOutputProj( 00682 TupleProjection &outputProj, 00683 LcsRowScanBaseExecStreamParams const &params) 00684 { 00685 LcsRowScanExecStreamParams const &rowScanParams = 00686 dynamic_cast<const LcsRowScanExecStreamParams&>(params); 00687 00688
00689 00690 00691 for (uint i = 0; i < rowScanParams.outputProj.size(); i++) { 00692 outputProj.push_back(rowScanParams.outputProj[i]); 00693 } 00694 for (uint i = 0; i < rowScanParams.residualFilterCols.size(); i++) { 00695 uint j; 00696 for (j = 0; j < rowScanParams.outputProj.size(); j++) { 00697 if (rowScanParams.outputProj[j] == 00698 rowScanParams.residualFilterCols[i]) 00699 { 00700 break; 00701 } 00702 } 00703 00704 if (j >= rowScanParams.outputProj.size()) { 00705 outputProj.push_back(rowScanParams.residualFilterCols[i]); 00706 } 00707 } 00708 } 00709 00710 00711 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsRowScanExecStream.cpp#28 $"); 00712 00713