Fennel: /home/pub/open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h" 00024 #include "fennel/lucidera/colstore/LcsClusterNode.h" 00025 #include "fennel/exec/ExecStreamBufAccessor.h" 00026 #include "fennel/btree/BTreeWriter.h" 00027 #include <boost/scoped_array.hpp> 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $"); 00030 00031 void LcsClusterAppendExecStream::prepare( 00032 LcsClusterAppendExecStreamParams const &params) 00033 { 00034 BTreeExecStream::prepare(params); 00035 ConduitExecStream::prepare(params); 00036 00037 tableColsTupleDesc = pInAccessor->getTupleDesc(); 00038 initTupleLoadParams(params.inputProj); 00039 00040
00041
00042 00043 pInAccessor->bindProjection(params.inputProj); 00044 00045
00046 00047 scratchAccessor = params.scratchAccessor; 00048 bufferLock.accessSegment(scratchAccessor); 00049 00050
00051
00052
00053
00054 00055 TupleDescriptor outputTupleDesc; 00056 00057 outputTupleDesc = pOutAccessor->getTupleDesc(); 00058 outputTuple.compute(outputTupleDesc); 00059 outputTuple[0].pData = (PConstBuffer) &numRowCompressed; 00060 if (outputTupleDesc.size() > 1) { 00061 outputTuple[1].pData = (PConstBuffer) &startRow; 00062 } 00063 00064 outputTupleAccessor = & pOutAccessor->getScratchTupleAccessor(); 00065 00066 blockSize = treeDescriptor.segmentAccessor.pSegment->getUsablePageSize(); 00067 00068 } 00069 00070 void LcsClusterAppendExecStream::initTupleLoadParams( 00071 const TupleProjection &inputProj) 00072 { 00073 numColumns = inputProj.size(); 00074 clusterColsTupleDesc.projectFrom(tableColsTupleDesc, inputProj); 00075 clusterColsTupleData.compute(clusterColsTupleDesc); 00076 00077
00078 colTupleDesc.reset(new TupleDescriptor[numColumns]); 00079 for (int i = 0; i < numColumns; i++) { 00080 colTupleDesc[i].push_back(tableColsTupleDesc[inputProj[i]]); 00081 } 00082 } 00083 00084 void LcsClusterAppendExecStream::getResourceRequirements( 00085 ExecStreamResourceQuantity &minQuantity, 00086 ExecStreamResourceQuantity &optQuantity) 00087 { 00088 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00089 00090
00091
00092
00093
00094
00095
00096
00097 minQuantity.nCachePages += (numColumns * 4) + 6; 00098 00099
00100 optQuantity = minQuantity; 00101 } 00102 00103 void LcsClusterAppendExecStream::open(bool restart) 00104 { 00105 BTreeExecStream::open(restart); 00106 ConduitExecStream::open(restart); 00107 00108 if (!restart) { 00109 outputTupleBuffer.reset( 00110 new FixedBuffer[outputTupleAccessor->getMaxByteCount()]); 00111 } 00112 00113 init(); 00114 isDone = false; 00115 } 00116 00117 ExecStreamResult LcsClusterAppendExecStream::execute( 00118 ExecStreamQuantum const &quantum) 00119 { 00120 return compress(quantum); 00121 } 00122 00123 void LcsClusterAppendExecStream::closeImpl() 00124 { 00125 BTreeExecStream::closeImpl(); 00126 ConduitExecStream::closeImpl(); 00127 outputTupleBuffer.reset(); 00128 close(); 00129 } 00130 00131 void LcsClusterAppendExecStream::init() 00132 { 00133 pIndexBlock = 0; 00134 firstRow = LcsRid(0); 00135 lastRow = LcsRid(0); 00136 startRow = LcsRid(0); 00137 rowCnt = 0; 00138 indexBlockDirty = false; 00139 arraysAlloced = false; 00140 compressCalled = false; 00141 numRowCompressed = 0; 00142 } 00143 00144 ExecStreamResult LcsClusterAppendExecStream::compress( 00145 ExecStreamQuantum const &quantum) 00146 { 00147 uint i, j, k; 00148 bool canFit = false; 00149 bool undoInsert = false; 00150 00151 if (isDone) { 00152
00153 pOutAccessor->markEOS(); 00154 return EXECRC_EOS; 00155 } 00156 00157 for (i = 0; i < quantum.nTuplesMax; i++) { 00158
00159
00160
00161 ExecStreamResult rc = getTupleForLoad(); 00162 00163
00164 if (rc == EXECRC_EOS) { 00165
00166
00167 if (rowCnt) { 00168
00169
00170 if (rowCnt < 8 || (rowCnt % 8) == 0) { 00171 writeBatch(true); 00172 } else { 00173 writeBatch(false); 00174 } 00175 } 00176 00177
00178
00179
00180
00181 writeBlock(); 00182 if (lcsBlockBuilder) { 00183 lcsBlockBuilder->close(); 00184 } 00185 close(); 00186 00187
00188
00189
00190
00191 00192 outputTupleAccessor->marshal(outputTuple, outputTupleBuffer.get()); 00193 pOutAccessor->provideBufferForConsumption( 00194 outputTupleBuffer.get(), 00195 outputTupleBuffer.get() + 00196 outputTupleAccessor->getCurrentByteCount()); 00197 00198 isDone = true; 00199 return EXECRC_BUF_OVERFLOW; 00200 } else if (rc != EXECRC_YIELD) { 00201 return rc; 00202 } 00203 00204
00205
00206
00207 undoInsert = false; 00208 00209 for (j = 0; j < numColumns; j++) { 00210 hash[j].insert( 00211 clusterColsTupleData[j], &hashValOrd[j], &undoInsert); 00212 00213 if (undoInsert) { 00214
00215
00216
00217 for (k = 0; k <= j; k++) { 00218 hash[k].undoInsert(clusterColsTupleData[k]); 00219 } 00220 break; 00221 } 00222 } 00223 00224
00225
00226
00227
00228 if (!undoInsert) { 00229 canFit = true; 00230 } else { 00231 canFit = false; 00232 } 00233 00234 if (canFit) { 00235
00236 for (j = 0; j < numColumns; j++) { 00237 addValueOrdinal(j, hashValOrd[j].getValOrd()); 00238 } 00239 00240 rowCnt++; 00241 00242
00243 if (isRowArrayFull()) { 00244 writeBatch(false); 00245 } 00246 } else { 00247
00248 writeBatch(false); 00249 00250
00251
00252
00253
00254 continue; 00255 } 00256 00257
00258
00259 postProcessTuple(); 00260 } 00261 00262 return EXECRC_QUANTUM_EXPIRED; 00263 } 00264 00265 ExecStreamResult LcsClusterAppendExecStream::getTupleForLoad() 00266 { 00267 if (pInAccessor->getState() == EXECBUF_EOS) { 00268 return EXECRC_EOS; 00269 } 00270 00271 if (pInAccessor->demandData()) { 00272 return EXECRC_BUF_UNDERFLOW; 00273 } 00274 00275
00276 initLoad(); 00277 00278 if (pInAccessor->isTupleConsumptionPending()) { 00279 pInAccessor->unmarshalProjectedTuple(clusterColsTupleData); 00280 } 00281 00282 return EXECRC_YIELD; 00283 } 00284 00285 void LcsClusterAppendExecStream::initLoad() 00286 { 00287
00288
00289
00290
00291
00292
00293
00294
00295
00296 00297 if (compressCalled) { 00298 compressCalled = true; 00299 00300
00301
00302
00303
00304 lcsBlockBuilder = SharedLcsClusterNodeWriter( 00305 new LcsClusterNodeWriter( 00306 treeDescriptor, 00307 scratchAccessor, 00308 clusterColsTupleDesc, 00309 getSharedTraceTarget(), 00310 getTraceSourceName())); 00311 00312 allocArrays(); 00313 00314
00315 for (uint i = 0; i < numColumns; i++) { 00316 bufferLock.allocatePage(); 00317 rowBlock[i] = bufferLock.getPage().getWritableData(); 00318 bufferLock.unlock(); 00319 00320 bufferLock.allocatePage(); 00321 hashBlock[i] = bufferLock.getPage().getWritableData(); 00322 bufferLock.unlock(); 00323 00324 bufferLock.allocatePage(); 00325 builderBlock[i] = bufferLock.getPage().getWritableData(); 00326 bufferLock.unlock(); 00327 00328 hash[i].init( 00329 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize); 00330 } 00331 00332 nRowsMax = blockSize / sizeof(uint16_t); 00333 00334
00335 00336 PLcsClusterNode pExistingIndexBlock; 00337 00338 bool found = getLastBlock(pExistingIndexBlock); 00339 if (found) { 00340
00341 pIndexBlock = pExistingIndexBlock; 00342 00343
00344
00345 loadExistingBlock(); 00346 } else { 00347
00348 startNewBlock(); 00349 startRow = LcsRid(0); 00350 } 00351 } 00352 } 00353 00354 void LcsClusterAppendExecStream::postProcessTuple() 00355 { 00356 pInAccessor->consumeTuple(); 00357 numRowCompressed++; 00358 } 00359 00360 void LcsClusterAppendExecStream::close() 00361 { 00362 if (scratchAccessor.pSegment) { 00363 scratchAccessor.pSegment->deallocatePageRange( 00364 NULL_PAGE_ID, NULL_PAGE_ID); 00365 } 00366 rowBlock.reset(); 00367 hashBlock.reset(); 00368 builderBlock.reset(); 00369 00370 hash.reset(); 00371 hashValOrd.reset(); 00372 tempBuf.reset(); 00373 maxValueSize.reset(); 00374 00375 lcsBlockBuilder.reset(); 00376 } 00377 00378 void LcsClusterAppendExecStream::startNewBlock() 00379 { 00380 firstRow = lastRow; 00381 00382
00383 pIndexBlock = lcsBlockBuilder->allocateClusterPage(firstRow); 00384 00385
00386 lcsBlockBuilder->init( 00387 numColumns, reinterpret_cast<uint8_t *> (pIndexBlock), 00388 builderBlock.get(), blockSize); 00389 00390
00391 for (uint i = 0; i < numColumns; i++) { 00392 hash[i].init( 00393 hashBlock[i], lcsBlockBuilder, colTupleDesc[i], i, blockSize); 00394 } 00395 00396
00397
00398
00399
00400 if (rowCnt >= 8) { 00401 rowCnt = 0; 00402 } 00403 indexBlockDirty = false; 00404 00405
00406 lcsBlockBuilder->openNew(firstRow); 00407 } 00408 00409 bool LcsClusterAppendExecStream::getLastBlock(PLcsClusterNode &pBlock) 00410 { 00411 if (lcsBlockBuilder->getLastClusterPageForWrite(pBlock, firstRow)) { 00412 return false; 00413 } else { 00414 return true; 00415 } 00416 } 00417 00418 void LcsClusterAppendExecStream::loadExistingBlock() 00419 { 00420 boost::scoped_array numVals;
00421 boost::scoped_array lastValOff; 00422 boost::scoped_array<boost::scoped_array > aLeftOverBufs; 00423
00424
00425
00426 uint anLeftOvers;
00427
00428
00429
00430 boost::scoped_array aiFixedSize;
00431
00432
00433 LcsHashValOrd vOrd; 00434 00435 uint i, j; 00436 RecordNum startRowCnt; 00437 RecordNum nrows; 00438 00439 lcsBlockBuilder->init( 00440 numColumns, reinterpret_cast<uint8_t *> (pIndexBlock), 00441 builderBlock.get(), blockSize); 00442 00443 lastValOff.reset(new uint16_t[numColumns]); 00444 numVals.reset(new uint[numColumns]); 00445 00446
00447
00448
00449
00450
00451 00452
00453
00454
00455 00456 bool bStartNewBlock = 00457 lcsBlockBuilder->openAppend(numVals.get(), lastValOff.get(), nrows); 00458 lastRow = firstRow + nrows; 00459 startRow = lastRow; 00460 00461
00462 aiFixedSize.reset(new uint[numColumns]); 00463 aLeftOverBufs.reset(new boost::scoped_array[numColumns]); 00464 00465 startRowCnt = rowCnt; 00466 00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478 for (i = 0; i < numColumns; i++) { 00479
00480 rowCnt = startRowCnt; 00481 lcsBlockBuilder->describeLastBatch(i, anLeftOvers, aiFixedSize[i]); 00482 00483
00484
00485
00486 if (anLeftOvers > 0) { 00487 aLeftOverBufs[i].reset( 00488 new FixedBuffer[anLeftOvers * aiFixedSize[i]]); 00489 lcsBlockBuilder->rollBackLastBatch(i, aLeftOverBufs[i].get()); 00490 indexBlockDirty = true; 00491 } 00492 } 00493 00494
00495 lastRow -= anLeftOvers; 00496 00497
00498 if (bStartNewBlock) { 00499 writeBlock(); 00500 startNewBlock(); 00501 } 00502 00503
00504 for (i = 0; i < numColumns; i++) { 00505
00506 rowCnt = startRowCnt; 00507 00508 if (!bStartNewBlock) { 00509
00510
00511
00512
00513
00514 hash[i].restore(numVals[i], lastValOff[i]); 00515 } 00516 00517
00518
00519
00520 if (anLeftOvers > 0) { 00521 uint8_t *val; 00522 bool undoInsert = false; 00523 00524
00525
00526
00527
00528
00529 if (hash[i].isHashFull(anLeftOvers)) { 00530 hash[i].startNewBatch(anLeftOvers); 00531 } 00532 00533 for (j = 0, val = aLeftOverBufs[i].get(); 00534 j < anLeftOvers; 00535 j++, val += aiFixedSize[i]) 00536 { 00537 hash[i].insert(val, &vOrd, &undoInsert); 00538 00539
00540 assert(!undoInsert); 00541 addValueOrdinal(i, vOrd.getValOrd()); 00542 rowCnt++; 00543 } 00544 } 00545 } 00546 } 00547 00548 void LcsClusterAppendExecStream::addValueOrdinal(uint column, uint16_t vOrd) 00549 { 00550 uint16_t *rowWordArray = (uint16_t *) rowBlock[column]; 00551 rowWordArray[rowCnt] = vOrd; 00552 00553
00554 indexBlockDirty = true; 00555 } 00556 00557 bool LcsClusterAppendExecStream::isRowArrayFull() 00558 { 00559 if (rowCnt >= nRowsMax) { 00560 return true; 00561 } else { 00562 return false; 00563 } 00564 } 00565 00566 void LcsClusterAppendExecStream::writeBatch(bool lastBatch) 00567 { 00568 uint16_t *oVals; 00569 uint leftOvers; 00570 PBuffer val; 00571 LcsBatchMode mode; 00572 uint i, j; 00573 uint origRowCnt, count = 0; 00574 00575 lastRow += rowCnt; 00576 00577 for (origRowCnt = rowCnt, i = 0; i < numColumns; i++) { 00578 rowCnt = origRowCnt; 00579 00580
00581 maxValueSize[i] = hash[i].getMaxValueSize(); 00582 00583
00584 lcsBlockBuilder->pickCompressionMode( 00585 i, maxValueSize[i], rowCnt, &oVals, mode); 00586 leftOvers = rowCnt > 8 ? rowCnt % 8 : 0; 00587 00588
00589
00590
00591
00592
00593 if (leftOvers) { 00594 tempBuf[i].reset(new FixedBuffer[leftOvers * maxValueSize[i]]); 00595 count = leftOvers; 00596 00597 } else if (origRowCnt < 8) { 00598 tempBuf[i].reset(new FixedBuffer[origRowCnt * maxValueSize[i]]); 00599 count = origRowCnt; 00600 } else { 00601
00602 tempBuf[i].reset(); 00603 } 00604 00605
00606 if (LCS_FIXED == mode || LCS_VARIABLE == mode) { 00607 hash[i].prepareFixedOrVariableBatch( 00608 (PBuffer) rowBlock[i], rowCnt); 00609 lcsBlockBuilder->putFixedVarBatch( 00610 i, (uint16_t *) rowBlock[i], tempBuf[i].get()); 00611 if (mode == LCS_FIXED) { 00612 hash[i].clearFixedEntries(); 00613 } 00614 00615 } else { 00616 uint16_t numVals; 00617 00618
00619 hash[i].prepareCompressedBatch( 00620 (PBuffer) rowBlock[i], rowCnt, (uint16_t *) &numVals, oVals); 00621 lcsBlockBuilder->putCompressedBatch( 00622 i, (PBuffer) rowBlock[i], tempBuf[i].get()); 00623 } 00624 00625
00626 rowCnt = 0; 00627 hash[i].startNewBatch(!lastBatch ? count : 0); 00628 } 00629 00630
00631 if (!lastBatch) { 00632 lastRow -= count; 00633 } 00634 bool bStartNewBlock; 00635 bStartNewBlock = false; 00636 00637
00638
00639
00640
00641
00642 if (!lastBatch && origRowCnt < 8) { 00643
00644 for (i = 0; i < numColumns; i++) { 00645 lcsBlockBuilder->rollBackLastBatch(i, tempBuf[i].get()); 00646 } 00647 bStartNewBlock = true; 00648 } 00649 00650
00651
00652
00653
00654 if (bStartNewBlock || (!lastBatch && lcsBlockBuilder->isEndOfBlock())) { 00655 writeBlock(); 00656 startNewBlock(); 00657 } 00658 00659
00660 if (!lastBatch) { 00661 for (i = 0; i < numColumns; i++) { 00662 rowCnt = 0; 00663 for (j = 0, val = tempBuf[i].get(); j < count; j++) { 00664 LcsHashValOrd vOrd; 00665 bool undoInsert = false; 00666 00667 hash[i].insert(val, &vOrd, &undoInsert); 00668 00669
00670
00671 assert(!undoInsert); 00672 addValueOrdinal(i, vOrd.getValOrd()); 00673 rowCnt++; 00674 val += maxValueSize[i]; 00675 } 00676 } 00677 } 00678 00679 for (i = 0; i < numColumns; i++) { 00680 if (tempBuf[i].get()) { 00681 tempBuf[i].reset(); 00682 } 00683 } 00684 } 00685 00686 void LcsClusterAppendExecStream::writeBlock() 00687 { 00688 if (indexBlockDirty) { 00689
00690
00691 if (rowCnt) { 00692 writeBatch(true); 00693 00694
00695
00696 00697
00698
00699 if (indexBlockDirty) { 00700 return; 00701 } 00702 } 00703 00704
00705
00706 lcsBlockBuilder->endBlock(); 00707 00708
00709 00710 indexBlockDirty = false; 00711 } 00712 } 00713 00714 void LcsClusterAppendExecStream::allocArrays() 00715 { 00716
00717 if (arraysAlloced) { 00718 return; 00719 } 00720 arraysAlloced = true; 00721 00722
00723 hash.reset(new LcsHash[numColumns]); 00724 00725
00726 rowBlock.reset(new PBuffer[numColumns]); 00727 hashBlock.reset(new PBuffer[numColumns]); 00728 00729 builderBlock.reset(new PBuffer[numColumns]); 00730 00731 hashValOrd.reset(new LcsHashValOrd[numColumns]); 00732 tempBuf.reset(new boost::scoped_array[numColumns]); 00733 maxValueSize.reset(new uint[numColumns]); 00734 } 00735 00736 ExecStreamBufProvision 00737 LcsClusterAppendExecStream::getOutputBufProvision() const 00738 { 00739 return BUFPROV_PRODUCER; 00740 } 00741 00742 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterAppendExecStream.cpp#23 $"); 00743 00744