// Fennel: /home/pub/open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/lucidera/bitmap/LbmGeneratorExecStream.h" 00024 #include "fennel/exec/ExecStreamBufAccessor.h" 00025 #include "fennel/tuple/UnalignedAttributeAccessor.h" 00026 00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $"); 00028 00029 void LbmGeneratorExecStream::prepare(LbmGeneratorExecStreamParams const &params) 00030 { 00031 BTreeExecStream::prepare(params); 00032 LcsRowScanBaseExecStream::prepare(params); 00033 00034 insertRowCountParamId = params.insertRowCountParamId; 00035 assert(opaqueToInt(insertRowCountParamId) > 0); 00036 00037 createIndex = params.createIndex; 00038 parameterIds.resize(nClusters); 00039 for (uint i = 0; i < nClusters; i++) { 00040 parameterIds[i] = params.lcsClusterScanDefs[i].rootPageIdParamId; 00041 } 00042 00043 scratchLock.accessSegment(scratchAccessor); 00044 scratchPageSize = scratchAccessor.pSegment->getUsablePageSize(); 00045 00046
00047 assert(inAccessors[0]->getTupleDesc().size() == 2); 00048 inputTuple.compute(inAccessors[0]->getTupleDesc()); 00049 00050
00051 bitmapTuple.computeAndAllocate(pOutAccessor->getTupleDesc()); 00052 00053
00054 assert(treeDescriptor.tupleDescriptor == pOutAccessor->getTupleDesc()); 00055 bitmapTupleDesc = treeDescriptor.tupleDescriptor; 00056 00057 attrAccessors.resize(bitmapTupleDesc.size()); 00058 for (int i = 0; i < bitmapTupleDesc.size(); ++i) { 00059 attrAccessors[i].compute(bitmapTupleDesc[i]); 00060 } 00061 00062 nIdxKeys = treeDescriptor.keyProjection.size() - 1; 00063 00064
00065 00066 LbmEntry::getSizeBounds( 00067 bitmapTupleDesc, 00068 treeDescriptor.segmentAccessor.pSegment->getUsablePageSize(), 00069 minBitmapSize, 00070 maxBitmapSize); 00071 00072 ridRuns.resize(1); 00073 } 00074 00075 void LbmGeneratorExecStream::open(bool restart) 00076 { 00077 BTreeExecStream::open(restart); 00078 LcsRowScanBaseExecStream::open(restart); 00079 nBitmapEntries = 0; 00080 flushIdx = 0; 00081 nBitmapBuffers = 0; 00082 nScratchPagesAllocated = 0; 00083 producePending = LBM_NOFLUSH_PENDING; 00084 skipRead = false; 00085 rowCount = 0; 00086 batchRead = false; 00087 doneReading = false; 00088 revertToSingletons = false; 00089 ridRuns.clear(); 00090 if (!restart) { 00091 pDynamicParamManager->createParam( 00092 insertRowCountParamId, inAccessors[0]->getTupleDesc()[0]); 00093 00094
00095
00096 if (parameterIds.size() > 0) { 00097 for (uint i = 0; i < nClusters; i++) { 00098 if (opaqueToInt(parameterIds[i]) > 0) { 00099 pClusters[i]->setRootPageId( 00100 *reinterpret_cast<PageId const *>( 00101 pDynamicParamManager->getParam(parameterIds[i]). 00102 getDatum().pData)); 00103 } 00104 } 00105 } 00106 } 00107 } 00108 00109 void LbmGeneratorExecStream::getResourceRequirements( 00110 ExecStreamResourceQuantity &minQuantity, 00111 ExecStreamResourceQuantity &optQuantity, 00112 ExecStreamResourceSettingType &optType) 00113 { 00114 BTreeExecStream::getResourceRequirements(minQuantity, optQuantity); 00115 LcsRowScanBaseExecStream::getResourceRequirements(minQuantity, optQuantity); 00116 numMiscScratchPages = minQuantity.nCachePages; 00117 00118
00119 minQuantity.nCachePages += 1; 00120 00121
00122
00123
00124
00125
00126
00127 if (nIdxKeys > 1) { 00128 optQuantity.nCachePages += 1; 00129 optType = EXEC_RESOURCE_ACCURATE; 00130 } else { 00131 optQuantity.nCachePages += 11; 00132 optType = EXEC_RESOURCE_ESTIMATE; 00133 } 00134 } 00135 00136 void LbmGeneratorExecStream::setResourceAllocation( 00137 ExecStreamResourceQuantity &quantity) 00138 { 00139 BTreeExecStream::setResourceAllocation(quantity); 00140 LcsRowScanBaseExecStream::setResourceAllocation(quantity); 00141 00142 maxNumScratchPages = quantity.nCachePages - numMiscScratchPages; 00143 } 00144 00145 ExecStreamResult LbmGeneratorExecStream::execute( 00146 ExecStreamQuantum const &quantum) 00147 { 00148
00149 00150 if (inAccessors[0]->getState() == EXECBUF_EOS) { 00151 if (doneReading) { 00152 pOutAccessor->markEOS(); 00153 return EXECRC_EOS; 00154 } 00155 00156 } else { 00157 if (inAccessors[0]->demandData()) { 00158 return EXECRC_BUF_UNDERFLOW; 00159 } 00160 00161 inAccessors[0]->unmarshalTuple(inputTuple); 00162 00163
00164
00165 LcsRidRun ridRun; 00166 if (createIndex) { 00167 numRowsToLoad = 0; 00168 ridRun.nRids = RecordNum(MAXU); 00169 startRid = LcsRid(0); 00170 } else { 00171 numRowsToLoad = 00172 *reinterpret_cast<RecordNum const *> (inputTuple[0].pData); 00173 ridRun.nRids = numRowsToLoad; 00174 startRid = 00175 *reinterpret_cast<LcsRid const *> (inputTuple[1].pData); 00176 } 00177 currRid = startRid; 00178 00179
00180
00181 ridRun.startRid = startRid; 00182 ridRuns.push_back(ridRun); 00183 00184
00185
00186 pDynamicParamManager->writeParam(insertRowCountParamId, inputTuple[0]); 00187 00188 inAccessors[0]->consumeTuple(); 00189 00190
00191
00192 if (createIndex && numRowsToLoad == 0) { 00193 doneReading = true; 00194 return EXECRC_BUF_UNDERFLOW; 00195 } 00196 00197
00198 for (uint iClu = 0; iClu < nClusters; iClu++) { 00199 SharedLcsClusterReader &pScan = pClusters[iClu]; 00200 if (!pScan->position(startRid)) { 00201
00202 doneReading = true; 00203 return EXECRC_BUF_UNDERFLOW; 00204 } 00205 syncColumns(pScan); 00206 } 00207 00208
00209
00210 bool rc = initBitmapTable(1); 00211 assert(rc); 00212 } 00213 00214
00215 00216 switch (producePending) { 00217 case LBM_ENTRYFLUSH_PENDING: 00218
00219 if (pOutAccessor->produceTuple(outputTuple)) { 00220 return EXECRC_BUF_OVERFLOW; 00221 } 00222 bitmapTable[flushStart].inuse = false; 00223 break; 00224 case LBM_TABLEFLUSH_PENDING: 00225 if (flushTable(flushStart)) { 00226 return EXECRC_BUF_OVERFLOW; 00227 } 00228 break; 00229 default: 00230 break; 00231 } 00232 producePending = LBM_NOFLUSH_PENDING; 00233 00234 ExecStreamResult rc; 00235 00236 if (nIdxKeys == 1) { 00237 rc = generateSingleKeyBitmaps(quantum); 00238 } else { 00239 rc = generateMultiKeyBitmaps(quantum); 00240 } 00241 00242 switch (rc) { 00243 case EXECRC_BUF_OVERFLOW: 00244 case EXECRC_QUANTUM_EXPIRED: 00245 return rc; 00246 case EXECRC_EOS: 00247
00248 if (createIndex) { 00249
00250
00251 assert(rowCount >= numRowsToLoad); 00252 } 00253 doneReading = true; 00254 return EXECRC_BUF_UNDERFLOW; 00255 default: 00256 permAssert(false); 00257 } 00258 } 00259 00260 ExecStreamResult LbmGeneratorExecStream::generateSingleKeyBitmaps( 00261 ExecStreamQuantum const &quantum) 00262 { 00263
00264
00265 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00266 if (revertToSingletons && 00267 pClusters[0]->clusterCols[0].batchIsCompressed()) 00268 { 00269 if (generateBitmaps()) { 00270 return EXECRC_BUF_OVERFLOW; 00271 } 00272 } else if (batchRead) { 00273 if (generateSingletons()) { 00274 return EXECRC_BUF_OVERFLOW; 00275 } 00276 } 00277 00278
00279 batchRead = false; 00280 revertToSingletons = false; 00281 SharedLcsClusterReader &pScan = pClusters[0]; 00282 if (!pScan->nextRange()) { 00283 return EXECRC_EOS; 00284 } 00285 pScan->clusterCols[0].sync(); 00286 } 00287 return EXECRC_QUANTUM_EXPIRED; 00288 } 00289 00290 ExecStreamResult LbmGeneratorExecStream::generateMultiKeyBitmaps( 00291 ExecStreamQuantum const &quantum) 00292 { 00293
00294
00295 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00296 uint prevClusterEnd = 0; 00297 if (skipRead) { 00298
00299
00300 bitmapTuple.resetBuffer(); 00301 for (uint iClu = 0; iClu < nClusters; iClu++) { 00302 SharedLcsClusterReader &pScan = pClusters[iClu]; 00303 00304 if (currRid >= pScan->getRangeEndRid()) { 00305
00306
00307 if (!pScan->nextRange()) { 00308 assert( 00309 iClu == 0 && 00310 (nClusters == 1 || pClusters[1]->nextRange())); 00311 return EXECRC_EOS; 00312 } 00313 assert( 00314 currRid >= pScan->getRangeStartRid() && 00315 currRid < pScan->getRangeEndRid()); 00316 syncColumns(pScan); 00317 } else { 00318 assert(currRid >= pScan->getRangeStartRid()); 00319 pScan->advanceWithinBatch( 00320 opaqueToInt(currRid - pScan->getCurrentRid())); 00321 } 00322 readColVals( 00323 pScan, 00324 bitmapTuple, 00325 prevClusterEnd); 00326 prevClusterEnd += pScan->nColsToRead; 00327 } 00328 } 00329 00330 createSingletonBitmapEntry(); 00331 if (flushEntry(0)) { 00332 return EXECRC_BUF_OVERFLOW; 00333 } 00334 } 00335 return EXECRC_QUANTUM_EXPIRED; 00336 } 00337 00338 void LbmGeneratorExecStream::createSingletonBitmapEntry() 00339 { 00340
00341
00342
00343 initRidAndBitmap(bitmapTuple, &currRid); 00344 bool rc = addRidToBitmap(0, bitmapTuple, currRid); 00345 assert(rc); 00346 skipRead = false; 00347 ++currRid; 00348 ++rowCount; 00349 } 00350 00351 void LbmGeneratorExecStream::closeImpl() 00352 { 00353 BTreeExecStream::closeImpl(); 00354 LcsRowScanBaseExecStream::closeImpl(); 00355 keyCodes.clear(); 00356 bitmapTable.clear(); 00357 scratchPages.clear(); 00358 00359 if (scratchAccessor.pSegment) { 00360 scratchAccessor.pSegment->deallocatePageRange( 00361 NULL_PAGE_ID, NULL_PAGE_ID); 00362 } 00363 } 00364 00365 bool LbmGeneratorExecStream::generateBitmaps() 00366 { 00367
00368
00369 LcsColumnReader &colReader = pClusters[0]->clusterCols[0]; 00370 uint nDistinctVals = colReader.getBatchValCount(); 00371 00372
00373 uint nRows = pClusters[0]->getRangeRowsLeft(); 00374 00375
00376 if (batchRead) { 00377 uint nRead; 00378 00379
00380
00381 if (initBitmapTable(nDistinctVals)) { 00382 revertToSingletons = true; 00383 return generateSingletons(); 00384 } 00385 00386 keyCodes.resize(nRows); 00387 colReader.readCompressedBatch(nRows, &keyCodes[0], &nRead); 00388 assert(nRows == nRead); 00389 00390 batchRead = true; 00391 currBatch = 0; 00392 } 00393 00394
00395
00396 for (uint i = currBatch; i < nRows; i++) { 00397 if (skipRead) { 00398 PBuffer curValue = colReader.getBatchValue(keyCodes[i]); 00399
00400
00401 bitmapTuple.resetBuffer(); 00402 00403 attrAccessors[0].loadValue(bitmapTuple[0], curValue); 00404 initRidAndBitmap(bitmapTuple, &currRid); 00405 } 00406 if (addRidToBitmap(keyCodes[i], bitmapTuple, currRid)) { 00407 currBatch = i; 00408 skipRead = true; 00409 return false; 00410 } 00411 ++currRid; 00412 ++rowCount; 00413 skipRead = false; 00414 } 00415 00416
00417
00418 if (flushTable(0)) { 00419
00420
00421 currBatch = nRows; 00422 return false; 00423 } 00424 00425 return true; 00426 } 00427 00428 bool LbmGeneratorExecStream::generateSingletons() 00429 { 00430 SharedLcsClusterReader &pScan = pClusters[0]; 00431 00432 do { 00433
00434
00435 if (skipRead) { 00436 uint prevClusterEnd = 0; 00437 00438
00439
00440 bitmapTuple.resetBuffer(); 00441 00442 for (uint iCluCol = 0; iCluCol < pScan->nColsToRead; iCluCol++) { 00443 PBuffer curValue = 00444 pScan->clusterCols[iCluCol].getCurrentValue(); 00445 uint idx = projMap[prevClusterEnd + iCluCol]; 00446 00447 attrAccessors[idx].loadValue(bitmapTuple[idx], curValue); 00448 } 00449 prevClusterEnd += pScan->nColsToRead; 00450 } 00451 00452 createSingletonBitmapEntry(); 00453 if (flushEntry(0)) { 00454
00455
00456 if (advanceReader(pScan)) { 00457
00458
00459 batchRead = true; 00460 } 00461 return false; 00462 } 00463 00464
00465
00466
00467 if (advanceReader(pScan)) { 00468 return true; 00469 } 00470 } while (true); 00471 } 00472 00473 bool LbmGeneratorExecStream::advanceReader(SharedLcsClusterReader &pScan) 00474 { 00475 if (!pScan->advance(1)) { 00476 return false; 00477 } 00478 syncColumns(pScan); 00479 return true; 00480 } 00481 00482 bool LbmGeneratorExecStream::initBitmapTable(uint nEntries) 00483 { 00484
00485
00486
00487 uint nBufsPerPage = (uint) ceil((double) nEntries / maxNumScratchPages); 00488 uint currSize = scratchPageSize / nBufsPerPage; 00489 00490 if (currSize < minBitmapSize) { 00491 currSize = minBitmapSize; 00492 nBufsPerPage = scratchPageSize / currSize; 00493 } else if (currSize > maxBitmapSize) { 00494 currSize = maxBitmapSize; 00495 nBufsPerPage = scratchPageSize / currSize; 00496 } 00497 00498
00499
00500
00501
00502 uint nBuffers = nBufsPerPage * maxNumScratchPages; 00503 if (nBuffers < 8 && nEntries > nBuffers) { 00504 return false; 00505 } 00506 00507 if (nEntries > nBitmapEntries) { 00508
00509
00510 bitmapTable.resize(nEntries); 00511 for (uint i = nBitmapEntries; i < nEntries; i++) { 00512 bitmapTable[i].pBitmap = SharedLbmEntry(new LbmEntry()); 00513 } 00514 } 00515 00516 if (nEntries != nBitmapEntries) { 00517
00518
00519
00520
00521 00522 uint nPages = (uint) ceil((double) nEntries / nBufsPerPage); 00523 if (nPages > maxNumScratchPages) { 00524 nPages = maxNumScratchPages; 00525 } 00526 while (nPages > nScratchPagesAllocated) { 00527 scratchLock.allocatePage(); 00528 PBuffer newPage = scratchLock.getPage().getWritableData(); 00529 scratchPages.push_back(newPage); 00530 ++nScratchPagesAllocated; 00531 } 00532 uint idx = 0; 00533 for (uint i = 0; i < nPages; i++) { 00534 uint offset = 0; 00535 for (uint j = 0; j < nBufsPerPage; j++) { 00536 idx = i * nBufsPerPage + j; 00537 if (idx == nEntries) { 00538 break; 00539 } 00540 bitmapTable[idx].bufferPtr = scratchPages[i] + offset; 00541 offset += currSize; 00542 } 00543 if (idx == nEntries) { 00544 break; 00545 } 00546 } 00547
00548 for (uint i = idx + 1; i < nEntries; i++) { 00549 bitmapTable[i].bufferPtr = NULL; 00550 } 00551 } 00552 00553 for (uint i = 0; i < nEntries; i++) { 00554 bitmapTable[i].inuse = false; 00555 } 00556 flushIdx = 0; 00557 nBitmapEntries = nEntries; 00558 entrySize = currSize; 00559 00560 return true; 00561 } 00562 00563 void LbmGeneratorExecStream::initRidAndBitmap( 00564 TupleData &bitmapTuple, LcsRid* pCurrRid) 00565 { 00566 bitmapTuple[nIdxKeys].pData = (PConstBuffer) pCurrRid; 00567 bitmapTuple[nIdxKeys + 1].pData = NULL; 00568 bitmapTuple[nIdxKeys + 2].pData = NULL; 00569 } 00570 00571 bool LbmGeneratorExecStream::addRidToBitmap( 00572 uint keycode, TupleData &initBitmap, LcsRid rid) 00573 { 00574 assert(keycode <= nBitmapEntries); 00575 00576 if (bitmapTable[keycode].inuse) { 00577 assert(bitmapTable[keycode].bufferPtr); 00578 00579 bool maxedOut = bitmapTable[keycode].pBitmap->setRID(rid); 00580 if (maxedOut) { 00581 if (flushEntry(keycode)) { 00582 return false; 00583 } 00584 assert(bitmapTable[keycode].inuse); 00585 bitmapTable[keycode].inuse = true; 00586
00587
00588 bitmapTable[keycode].pBitmap->setEntryTuple(initBitmap); 00589 } 00590 } else { 00591 if (bitmapTable[keycode].bufferPtr) { 00592
00593
00594 PBuffer bufPtr = flushBuffer(rid); 00595 if (!bufPtr) { 00596 return false; 00597 } 00598 bitmapTable[keycode].bufferPtr = bufPtr; 00599 } 00600
00601 bitmapTable[keycode].pBitmap->init( 00602 bitmapTable[keycode].bufferPtr, NULL, entrySize, bitmapTupleDesc); 00603 bitmapTable[keycode].pBitmap->setEntryTuple(initBitmap); 00604 bitmapTable[keycode].inuse = true; 00605 } 00606 00607 return true; 00608 } 00609 00610 PBuffer LbmGeneratorExecStream::flushBuffer(LcsRid addRid) 00611 { 00612
00613
00614
00615
00616
00617
00618 PBuffer retPtr; 00619 uint nAttempts = 0; 00620 do { 00621 ++nAttempts; 00622 if (nAttempts > nBitmapEntries) { 00623
00624
00625 permAssert(false); 00626 } 00627 if (bitmapTable[flushIdx].bufferPtr) { 00628 retPtr = bitmapTable[flushIdx].bufferPtr; 00629 if (bitmapTable[flushIdx].inuse) { 00630
00631
00632
00633
00634 if (bitmapTable[flushIdx].pBitmap->inRange(addRid)) { 00635 flushIdx = ++flushIdx % nBitmapEntries; 00636 continue; 00637 } 00638 if (flushEntry(flushIdx)) { 00639 return NULL; 00640 } 00641 } 00642 bitmapTable[flushIdx].bufferPtr = NULL; 00643 break; 00644 } 00645 flushIdx = ++flushIdx % nBitmapEntries; 00646 } while (true); 00647 00648 flushIdx = ++flushIdx % nBitmapEntries; 00649 return retPtr; 00650 } 00651 00652 bool LbmGeneratorExecStream::flushTable(uint start) 00653 { 00654 assert(start <= nBitmapEntries); 00655 for (uint i = start; i < nBitmapEntries; i++) { 00656 if (bitmapTable[i].inuse) { 00657 if (flushEntry(i)) { 00658 producePending = LBM_TABLEFLUSH_PENDING; 00659 return false; 00660 } 00661 } 00662 } 00663 00664 return true; 00665 } 00666 00667 bool LbmGeneratorExecStream::flushEntry(uint keycode) 00668 { 00669 assert(bitmapTable[keycode].inuse && bitmapTable[keycode].bufferPtr); 00670 00671
00672
00673 00674 outputTuple = bitmapTable[keycode].pBitmap->produceEntryTuple(); 00675 00676 FENNEL_TRACE(TRACE_FINE, LbmEntry::toString(outputTuple)); 00677 00678 if (pOutAccessor->produceTuple(outputTuple)) { 00679 flushStart = keycode; 00680 producePending = LBM_ENTRYFLUSH_PENDING; 00681 return false; 00682 } 00683 00684
00685 bitmapTable[keycode].inuse = false; 00686 00687 return true; 00688 } 00689 00690 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmGeneratorExecStream.cpp#26 $"); 00691 00692