// Fennel: /home/pub/open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp (source listing)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/lucidera/colstore/LcsClusterNodeWriter.h" 00024 #include "fennel/tuple/TupleAccessor.h" 00025 #include <boost/scoped_array.hpp> 00026 00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $"); 00028 00029 LcsClusterNodeWriter::LcsClusterNodeWriter( 00030 BTreeDescriptor const &treeDescriptorInit, 00031 SegmentAccessor const &accessorInit, 00032 TupleDescriptor const &colTupleDescInit, 00033 SharedTraceTarget pTraceTargetInit, 00034 std::string nameInit) : 00035 LcsClusterAccessBase(treeDescriptorInit), 00036 TraceSource(pTraceTargetInit, nameInit) 00037 { 00038 scratchAccessor = accessorInit; 00039 bufferLock.accessSegment(scratchAccessor); 00040 bTreeWriter = SharedBTreeWriter( 00041 new BTreeWriter(treeDescriptorInit, scratchAccessor, true)); 00042 colTupleDesc = colTupleDescInit; 00043 clusterDump = 00044 SharedLcsClusterDump( 00045 new LcsClusterDump( 00046 treeDescriptorInit, 00047 colTupleDesc, 00048 TRACE_FINE, 00049 pTraceTargetInit, 00050 nameInit)); 00051 nClusterCols = 0; 00052 pHdr = 0; 00053 hdrSize = 0; 00054 pIndexBlock = 0; 00055 pBlock = 0; 00056 szBlock = 0; 00057 minSzLeft = 0; 00058 batchDirs.reset(); 00059 pValBank.reset(); 00060 oValBank.reset(); 00061 batchOffset.reset(); 00062 batchCount.reset(); 00063 szLeft = 0; 00064 nBits.reset(); 00065 nextWidthChange.reset(); 00066 arraysAllocated = false; 00067 valBankStart.reset(); 00068 bForceMode.reset(); 00069 forceModeCount.reset(); 00070 maxValueSize.reset(); 00071 } 00072 00073 LcsClusterNodeWriter::~LcsClusterNodeWriter() 00074 { 00075 close(); 00076 } 00077 00078 void LcsClusterNodeWriter::close() 00079 { 00080
00081 if (clusterLock.isLocked()) { 00082 clusterLock.flushPage(true); 00083 } 00084 clusterLock.unlock(); 00085 00086 bTreeWriter.reset(); 00087 batchDirs.reset(); 00088 pValBank.reset(); 00089 valBankStart.reset(); 00090 forceModeCount.reset(); 00091 bForceMode.reset(); 00092 oValBank.reset(); 00093 batchOffset.reset(); 00094 batchCount.reset(); 00095 nBits.reset(); 00096 nextWidthChange.reset(); 00097 maxValueSize.reset(); 00098 attrAccessors.reset(); 00099 } 00100 00101 bool LcsClusterNodeWriter::getLastClusterPageForWrite( 00102 PLcsClusterNode &pBlock, LcsRid &firstRid) 00103 { 00104
00105
00106 00107 if (bTreeWriter->searchLast() == false) { 00108 bTreeWriter->endSearch(); 00109 return false; 00110 } 00111 00112 bTreeWriter->getTupleAccessorForRead().unmarshal(bTreeTupleData); 00113 clusterPageId = readClusterPageId(); 00114 clusterLock.lockExclusive(clusterPageId); 00115 pBlock = &(clusterLock.getNodeForWrite()); 00116 firstRid = pBlock->firstRID; 00117 00118
00119
00120 bTreeWriter->endSearch(); 00121 00122 if (isTracingLevel(TRACE_FINE)) { 00123 FENNEL_TRACE( 00124 TRACE_FINE, 00125 "Calling ClusterDump from getLastClusterPageForWrite"); 00126 clusterDump->dump(opaqueToInt(clusterPageId), pBlock, szBlock); 00127 } 00128 00129 return true; 00130 } 00131 00132 PLcsClusterNode LcsClusterNodeWriter::allocateClusterPage(LcsRid firstRid) 00133 { 00134
00135
00136 00137 PageId prevPageId = NULL_PAGE_ID; 00138 00139 if (clusterLock.isLocked()) { 00140
00141 prevPageId = clusterLock.getPageId(); 00142 00143
00144
00145
00146 clusterLock.flushPage(true); 00147 } 00148 00149 clusterPageId = clusterLock.allocatePage(); 00150 if (prevPageId != NULL_PAGE_ID) { 00151 segmentAccessor.pSegment->setPageSuccessor(prevPageId, clusterPageId); 00152 } 00153 bTreeRid = firstRid; 00154 bTreeTupleData[0].pData = reinterpret_cast<uint8_t *> (&firstRid); 00155 bTreeTupleData[1].pData = reinterpret_cast<uint8_t *> (&clusterPageId); 00156 bTreeWriter->insertTupleData(bTreeTupleData, DUP_FAIL); 00157 return &(clusterLock.getNodeForWrite()); 00158 } 00159 00160 void LcsClusterNodeWriter::init( 00161 uint nColumn, PBuffer iBlock, PBuffer *pB, uint szB) 00162 { 00163 nClusterCols = nColumn; 00164 pIndexBlock = iBlock; 00165 pBlock = pB; 00166 szBlock = szB; 00167 pHdr = (PLcsClusterNode) pIndexBlock; 00168 00169 hdrSize = getClusterSubHeaderSize(nClusterCols); 00170 00171
00172
00173 00174 setHdrOffsets(pHdr); 00175 00176 minSzLeft = nClusterCols * (LcsMaxLeftOver * sizeof(uint16_t) + 00177 sizeof(LcsBatchDir)); 00178 00179 allocArrays(); 00180 } 00181 00182 void LcsClusterNodeWriter::openNew(LcsRid startRID) 00183 { 00184 int i; 00185 00186
00187 pHdr->firstRID = startRID; 00188 pHdr->nColumn = nClusterCols; 00189 pHdr->nBatch = 0; 00190 pHdr->oBatch = hdrSize; 00191 00192 for (i = 0; i < nClusterCols; i++) { 00193 lastVal[i] = szBlock; 00194 firstVal[i] = (uint16_t) szBlock; 00195 nVal[i] = 0; 00196 delta[i] = 0; 00197 batchDirs[i].mode = LCS_COMPRESSED; 00198 batchDirs[i].nVal = 0; 00199 batchDirs[i].nRow = 0; 00200 batchDirs[i].oVal = 0; 00201 batchDirs[i].oLastValHighMark = lastVal[i]; 00202 batchDirs[i].nValHighMark = nVal[i]; 00203 batchOffset[i] = hdrSize; 00204
00205 nBits[i] = 0; 00206 nextWidthChange[i] = 1; 00207 batchCount[i] = 0; 00208 } 00209 00210
00211
00212 00213 szLeft = szBlock - hdrSize - 00214 (2 * sizeof(LcsBatchDir)) * nClusterCols; 00215 szLeft = std::max(szLeft, 0); 00216 assert(szLeft >= 0); 00217 } 00218 00219 bool LcsClusterNodeWriter::openAppend( 00220 uint *nValOffsets, uint16_t lastValOffsets, RecordNum &nrows) 00221 { 00222 int i; 00223 00224
00225 szLeft = lastVal[nClusterCols - 1] - pHdr->oBatch - 00226 (pHdr->nBatch + 2
nClusterCols) * sizeof(LcsBatchDir); 00227 szLeft = std::max(szLeft, 0); 00228 assert(szLeft >= 0); 00229 00230
00231
00232 nrows = moveFromIndexToTemp(); 00233 00234 for (i = 0; i < nClusterCols; i++) { 00235 nValOffsets[i] = nVal[i]; 00236 lastValOffsets[i] = lastVal[i]; 00237 memset(&batchDirs[i], 0, sizeof(LcsBatchDir)); 00238 00239 batchDirs[i].oLastValHighMark = lastVal[i]; 00240 batchDirs[i].nValHighMark = nVal[i]; 00241 batchDirs[i].mode = LCS_COMPRESSED; 00242 00243
00244 nBits[i] = 0; 00245 nextWidthChange[i] = 1; 00246 00247 oValBank[i] = 0; 00248 batchCount[i] = pHdr->nBatch / nClusterCols; 00249 } 00250 00251 return (szLeft == 0); 00252 } 00253 00254 void LcsClusterNodeWriter::describeLastBatch( 00255 uint column, uint &dRow, uint &recSize) 00256 { 00257 PLcsBatchDir pBatch; 00258 00259 pBatch = (PLcsBatchDir) (pBlock[column] + batchOffset[column]); 00260 dRow = pBatch[batchCount[column] -1].nRow % 8; 00261 recSize = pBatch[batchCount[column] -1].recSize; 00262 } 00263 00264 uint16_t LcsClusterNodeWriter::getNextVal(uint column, uint16_t thisVal) 00265 { 00266 if (thisVal && thisVal != szBlock) { 00267 return 00268 (uint16_t) (thisVal + 00269 attrAccessors[column].getStoredByteCount( 00270 pBlock[column] + thisVal)); 00271 } else { 00272 return 0; 00273 } 00274 } 00275 00276 void LcsClusterNodeWriter::rollBackLastBatch(uint column, PBuffer pBuf) 00277 { 00278 uint i; 00279 PLcsBatchDir pBatch; 00280 uint16_t *pValOffsets; 00281 00282 uint8_t *pBit;
00283 WidthVec w;
00284 PtrVec p;
00285 uint iV;
00286 00287 uint16_t rows[LcsMaxRollBack];
00288 int origSzLeft; 00289 uint len; 00290 00291
00292 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]); 00293 batchDirs[column] = pBatch[batchCount[column] -1]; 00294 00295
00296 origSzLeft = lastVal[column] - batchOffset[column] - 00297 (batchCount[column]+2)*sizeof(LcsBatchDir); 00298 00299 if ((batchDirs[column].nRow > 8) || (batchDirs[column].nRow % 8) == 0) { 00300 return; 00301 } 00302 00303 if (batchDirs[column].mode == LCS_COMPRESSED) { 00304
00305 iV = bitVecWidth(calcWidth(batchDirs[column].nVal), w); 00306 00307
00308 pBit = pBlock[column] + batchDirs[column].oVal + 00309 batchDirs[column].nVal * sizeof(uint16_t); 00310 00311
00312 bitVecPtr(batchDirs[column].nRow, iV, w, p, pBit); 00313 00314
00315 readBitVecs(rows, iV, w, p, 0, batchDirs[column].nRow); 00316 00317
00318 pValOffsets = (uint16_t *)(pBlock[column] + batchDirs[column].oVal); 00319 00320
00321 for (i = 0; i < batchDirs[column].nRow; 00322 i++, pBuf += batchDirs[column].recSize) 00323 { 00324 len = 00325 attrAccessors[column].getStoredByteCount( 00326 pBlock[column] + pValOffsets[rows[i]]); 00327 memcpy(pBuf, pBlock[column] + pValOffsets[rows[i]], len); 00328 } 00329 00330 } else if (batchDirs[column].mode == LCS_FIXED) { 00331
00332
00333 memcpy( 00334 pBuf, 00335 pBlock[column] + batchDirs[column].oVal, 00336 batchDirs[column].nRow * batchDirs[column].recSize); 00337 } else { 00338
00339
00340 pValOffsets = (uint16_t *)(pBlock[column] + batchDirs[column].oVal); 00341 00342
00343 for (i = 0; i < batchDirs[column].nRow; 00344 i++, pBuf += batchDirs[column].recSize) 00345 { 00346 len = 00347 attrAccessors[column].getStoredByteCount( 00348 pBlock[column] + pValOffsets[i]); 00349 memcpy(pBuf, pBlock[column] + pValOffsets[i], len); 00350 } 00351 } 00352 00353
00354 batchCount[column]--; 00355
00356 batchOffset[column] = batchDirs[column].oVal; 00357 00358
00359 memmove( 00360 pBlock[column] + batchOffset[column], 00361 pBatch, 00362 batchCount[column] * sizeof(LcsBatchDir)); 00363 00364
00365
00366
00367
00368 int newSz; 00369 newSz = lastVal[column] - batchOffset[column] - 00370 (batchCount[column] + 2) * sizeof(LcsBatchDir); 00371 szLeft += (newSz - origSzLeft); 00372 szLeft = std::max(szLeft, 0); 00373 assert(szLeft >= 0); 00374 00375
00376 nBits[column] = 0; 00377 nextWidthChange[column] = 1; 00378 00379
00380 batchDirs[column].mode = LCS_COMPRESSED; 00381 batchDirs[column].nVal = 0; 00382 batchDirs[column].nRow = 0; 00383 batchDirs[column].oVal = 0; 00384 batchDirs[column].recSize = 0; 00385 } 00386 00387 00388 00389 bool LcsClusterNodeWriter::addValue(uint column, bool bFirstTimeInBatch) 00390 { 00391
00392 szLeft -= sizeof(uint16_t); 00393 00394
00395 if (szLeft < ((int) nClusterCols * LcsMaxSzLeftError)) { 00396
00397 szLeft += sizeof(uint16_t); 00398 assert(szLeft >= 0); 00399 return false; 00400 } 00401 00402 if (bFirstTimeInBatch) { 00403
00404
00405 batchDirs[column].nVal++; 00406 00407
00408
00409 if (batchDirs[column].nVal == nextWidthChange[column]) { 00410
00411
00412 nBits[column] = calcWidth(batchDirs[column].nVal); 00413 nextWidthChange[column] = (1 << nBits[column]) + 1; 00414 } 00415 } 00416 00417 return true; 00418 } 00419 00420 00421 00422 bool LcsClusterNodeWriter::addValue(uint column, PBuffer pVal, uint16_t *oVal) 00423 { 00424 uint16_t lastValOffset; 00425 int oldSzLeft = szLeft; 00426 uint szVal = attrAccessors[column].getStoredByteCount(pVal); 00427 00428
00429
00430
00431
00432 if (bForceMode[column] == fixed) { 00433 if (szVal > maxValueSize[column]) { 00434 szLeft -= batchDirs[column].nVal * 00435 (szVal - maxValueSize[column]); 00436 maxValueSize[column] = szVal; 00437 } 00438 } 00439 00440
00441
00442
00443
00444
00445 if (bForceMode[column] == fixed) { 00446 szLeft -= maxValueSize[column]; 00447 } else { 00448
00449
00450
00451
00452 szLeft -= (sizeof(uint16_t) + szVal) ; 00453 } 00454 00455
00456 if (szLeft < ((int) nClusterCols * LcsMaxSzLeftError)) { 00457
00458 szLeft = oldSzLeft; 00459 assert(szLeft >= 0); 00460 return false; 00461 } 00462 00463
00464 00465 lastValOffset = lastVal[column] - szVal; 00466 00467
00468 batchDirs[column].nVal++; 00469 00470
00471
00472 if (batchDirs[column].nVal == nextWidthChange[column]) { 00473
00474
00475 nBits[column] = calcWidth(batchDirs[column].nVal); 00476 nextWidthChange[column] = (1 << nBits[column]) + 1; 00477 } 00478 00479 lastVal[column] = lastValOffset; 00480 00481
00482
00483
00484 if (fixed == bForceMode[column]) { 00485 memcpy(pValBank[column] + lastValOffset, pVal, szVal); 00486 } else { 00487 memcpy(pBlock[column] + lastValOffset, pVal, szVal); 00488 } 00489 00490
00491 *oVal = lastValOffset; 00492 00493 nVal[column]++; 00494 00495 return true; 00496 } 00497 00498 void LcsClusterNodeWriter::undoValue( 00499 uint column, PBuffer pVal, bool bFirstInBatch) 00500 { 00501
00502
00503
00504
00505 uint szVal = 00506 (pVal) ? attrAccessors[column].getStoredByteCount(pVal) : 0; 00507 00508
00509 szLeft += (sizeof(uint16_t) + szVal) ; 00510 assert(szLeft >= 0); 00511 00512
00513 if (bFirstInBatch) { 00514
00515 batchDirs[column].nVal--; 00516 00517
00518 if (batchDirs[column].nVal == 0) { 00519 nextWidthChange[column] = 1; 00520 } else { 00521
00522
00523 nBits[column] = calcWidth(batchDirs[column].nVal); 00524 nextWidthChange[column] = (1 << nBits[column]) + 1; 00525 } 00526 } 00527 00528 if (pVal) { 00529
00530 lastVal[column] += szVal; 00531 nVal[column]--; 00532 } 00533 } 00534 00535 void LcsClusterNodeWriter::putCompressedBatch( 00536 uint column, PBuffer pRows, PBuffer pBuf) 00537 { 00538 uint i, j, b; 00539 uint iRow; 00540 uint nByte; 00541 uint8_t *pBit; 00542 uint16_t *pOffs; 00543 PLcsBatchDir pBatch; 00544 00545 WidthVec w;
00546 PtrVec p;
00547 uint iV;
00548 00549
00550
00551
00552
00553
00554
00555
00556 00557
00558
00559 00560 if (batchDirs[column].nRow > 8) { 00561 uint len; 00562 pOffs = (uint16_t *)(pBlock[column] + batchDirs[column].oVal); 00563 for (i = round8Boundary((uint32_t) batchDirs[column].nRow); 00564 i < batchDirs[column].nRow; i++, pBuf += batchDirs[column].recSize) 00565 { 00566 iRow = ((uint16_t *) pRows)[i]; 00567 len = 00568 attrAccessors[column].getStoredByteCount( 00569 pBlock[column] + pOffs[iRow]); 00570 memcpy(pBuf, pBlock[column] + pOffs[iRow], len); 00571 } 00572 batchDirs[column].nRow = 00573 round8Boundary((uint32_t) batchDirs[column].nRow); 00574 } 00575 00576
00577 iV = bitVecWidth(nBits[column], w); 00578 00579
00580 pBit = pBlock[column] + batchDirs[column].oVal + 00581 batchDirs[column].nVal*sizeof(uint16_t); 00582 00583
00584 nByte = bitVecPtr(batchDirs[column].nRow, iV, w, p, pBit); 00585 memset(pBit, 0, nByte); 00586 00587 for (j = 0, b = 0; j < iV ; j++) { 00588 switch (w[j]) { 00589 case 16: 00590 memcpy(p[j], pRows, batchDirs[column].nRow * sizeof(uint16_t)); 00591 break; 00592 00593 case 8: 00594 for (i = 0; i < batchDirs[column].nRow ; i++) { 00595 (p[j])[i] = (uint8_t)((uint16_t *) pRows)[i]; 00596 } 00597 break; 00598 00599 case 4: 00600 for (i = 0; i < batchDirs[column].nRow ; i++) { 00601 setBits( 00602 p[j] + i / 2 , 00603 4, 00604 (i % 2) * 4, 00605 (uint16_t)(((uint16_t *) pRows)[i] >> b)); 00606 } 00607 break; 00608 00609 case 2: 00610 for (i = 0; i < batchDirs[column].nRow ; i++) { 00611 setBits( 00612 p[j] + i / 4 , 00613 2, 00614 (i % 4) * 2, 00615 (uint16_t)(((uint16_t *) pRows)[i] >> b)); 00616 } 00617 break; 00618 00619 case 1: 00620 for (i = 0; i < batchDirs[column].nRow ; i++) { 00621 setBits( 00622 p[j] + i / 8 , 00623 1, 00624 (i % 8), 00625 (uint16_t)(((uint16_t *)pRows)[i] >> b)); 00626 } 00627 break; 00628 00629 default: 00630 ; 00631 } 00632 b += w[j]; 00633 } 00634 00635
00636 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]); 00637 pBatch[batchCount[column]] = batchDirs[column]; 00638 batchCount[column]++; 00639 00640
00641 batchDirs[column].mode = LCS_COMPRESSED; 00642 batchDirs[column].oLastValHighMark = lastVal[column]; 00643 batchDirs[column].nValHighMark = nVal[column]; 00644 batchDirs[column].nVal = 0; 00645 batchDirs[column].oVal = batchOffset[column]; 00646 batchDirs[column].nRow = 0; 00647 00648
00649 nBits[column] = 0; 00650 nextWidthChange[column] = 1 ; 00651 } 00652 00653 void LcsClusterNodeWriter::putFixedVarBatch( 00654 uint column, uint16_t pRows, PBuffer pBuf) 00655 { 00656 uint i; 00657 uint batchRows; 00658 PBuffer pVal; 00659 PLcsBatchDir pBatch; 00660 PBuffer src; 00661 uint batchRecSize; 00662 uint16_t localLastVal; 00663 uint16_t localoValBank; 00664 PBuffer localpValBank, localpBlock; 00665 00666 00667
00668
00669
00670
00671
00672
00673 batchRows = (batchDirs[column].nRow > 8) 00674 ? batchDirs[column].nRow & 0xfffffff8 : batchDirs[column].nRow; 00675 00676
00677 pVal = pBlock[column] + batchDirs[column].oVal; 00678 if (batchDirs[column].mode == LCS_VARIABLE) { 00679
00680
00681
00682 memcpy(pVal, pRows, batchRows * sizeof(uint16_t)); 00683 } else { 00684
00685 assert(batchDirs[column].mode == LCS_FIXED); 00686 00687 batchRecSize = batchDirs[column].recSize; 00688 localLastVal = lastVal[column]; 00689 localpValBank = pValBank[column] + valBankStart[column]; 00690 localoValBank = oValBank[column]; 00691 localpBlock = pBlock[column]; 00692 00693
00694
00695 for (i = 0; i < batchRows; i++) { 00696
00697
00698 src = valueSource( 00699 localLastVal, localpValBank, localoValBank, 00700 localpBlock, pRows[i]); 00701 uint len = attrAccessors[column].getStoredByteCount(src); 00702 memcpy(pVal, src, len); 00703 pVal += batchRecSize; 00704 } 00705 } 00706 00707
00708
00709 if (bForceMode[column] != none) { 00710 if (forceModeCount[column] > 20) { 00711 bForceMode[column] = none; 00712 forceModeCount[column] = 0; 00713 } 00714 } 00715 00716 batchRecSize = batchDirs[column].recSize; 00717 localLastVal = lastVal[column]; 00718 localpValBank = pValBank[column] + valBankStart[column]; 00719 localoValBank = oValBank[column]; 00720 localpBlock = pBlock[column]; 00721 00722
00723 pVal = pBuf; 00724 for (i = batchRows; i < batchDirs[column].nRow; i++) { 00725
00726
00727
00728 src = valueSource( 00729 localLastVal, localpValBank, localoValBank, 00730 localpBlock, pRows[i]); 00731 uint len = attrAccessors[column].getStoredByteCount(src); 00732 memcpy(pVal, src, len); 00733 pVal += batchRecSize; 00734 } 00735 00736 if (pValBank[column]) { 00737 oValBank[column] = 0; 00738 } 00739 00740
00741 batchDirs[column].nRow = batchRows; 00742 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]); 00743 pBatch[batchCount[column]] = batchDirs[column]; 00744 00745
00746 batchCount[column]++; 00747 00748
00749
00750 switch (bForceMode[column]) { 00751 case none: 00752 batchDirs[column].mode = LCS_COMPRESSED; 00753 break; 00754 case fixed: 00755 batchDirs[column].mode = LCS_FIXED; 00756 break; 00757 case variable: 00758 batchDirs[column].mode = LCS_VARIABLE; 00759 break; 00760 default: 00761 assert(false); 00762 } 00763 batchDirs[column].oLastValHighMark = lastVal[column]; 00764 batchDirs[column].nValHighMark = nVal[column]; 00765 batchDirs[column].nVal = 0; 00766 batchDirs[column].oVal = batchOffset[column]; 00767 batchDirs[column].nRow = 0; 00768 00769
00770 nBits[column] = 0; 00771 nextWidthChange[column] = 1 ; 00772 00773 maxValueSize[column] = 0; 00774 } 00775 00776 void LcsClusterNodeWriter::pickCompressionMode( 00777 uint column, uint recSize, uint nRow, uint16_t **pValOffset, 00778 LcsBatchMode &compressionMode) 00779 { 00780 uint nByte; 00781 PLcsBatchDir pBatch; 00782 WidthVec w;
00783 uint iV;
00784 00785 00786 uint szCompressed;
00787 uint szVariable;
00788 uint szFixed;
00789 uint szNonCompressed; 00790 uint deltaVal; 00791 uint batchRows;
00792
00793 00794
00795 batchDirs[column].nRow = nRow; 00796 batchDirs[column].recSize = recSize; 00797 00798
00799
00800
00801 00802
00803
00804 batchRows = (nRow > 8) ? nRow & 0xfffffff8 : nRow; 00805 00806 szCompressed = batchDirs[column].nVal
sizeof(uint16_t) + 00807 (nBits[column]*nRow + LcsMaxSzLeftError * 8) / 8 00808 + (batchDirs[column].oLastValHighMark - lastVal[column]); 00809 00810
00811 szVariable = batchDirs[column].nRow * sizeof(uint16_t) 00812 + (batchDirs[column].oLastValHighMark - lastVal[column]); 00813 00814
00815
00816 uint leftOverSize; 00817 leftOverSize = LcsMaxLeftOver * sizeof(uint16_t) + 00818 (3 * LcsMaxLeftOver + LcsMaxSzLeftError * 8) / 8 00819 + LcsMaxLeftOver * recSize; 00820 szFixed = nRow * recSize + leftOverSize; 00821 00822 szNonCompressed = std::min(szFixed, szVariable); 00823 00824
00825
00826
00827
00828
00829
00830 00831 if ((fixed == bForceMode[column] || variable == bForceMode[column]) 00832 || szCompressed > szNonCompressed) { 00833
00834 pValOffset = NULL; 00835 batchDirs[column].nVal = 0; 00836 00837 forceModeCount[column]++; 00838 00839
00840 if (fixed == bForceMode[column] || szNonCompressed == szFixed) { 00841
00842
00843 batchDirs[column].mode = LCS_FIXED; 00844 00845
00846 if (bForceMode[column] != fixed) { 00847
00848
00849
00850 00851
00852 deltaVal = batchDirs[column].oLastValHighMark - lastVal[column]; 00853 00854
00855
00856 if (deltaVal) { 00857 memcpy( 00858 pValBank[column], 00859 pBlock[column] + lastVal[column], 00860 deltaVal); 00861 } 00862 00863 valBankStart[column] = 0; 00864 00865
00866
00867 bForceMode[column] = fixed; 00868 00869
00870
00871 assert(szVariable >= szFixed); 00872 szLeft += (szVariable - szFixed); 00873 assert(szLeft >= 0); 00874 } else { 00875 valBankStart[column] = lastVal[column]; 00876 } 00877 00878
00879
00880 00881
00882 oValBank[column] = lastVal[column]; 00883 lastVal[column] = batchDirs[column].oLastValHighMark; 00884 nVal[column] = batchDirs[column].nValHighMark; 00885 00886
00887 nByte = batchRows * batchDirs[column].recSize; 00888 00889 } else { 00890
00891 00892 batchDirs[column].mode = LCS_VARIABLE; 00893 00894
00895 nByte = batchRows
sizeof(uint16_t); 00896 00897
00898
00899 bForceMode[column] = variable; 00900 } 00901 } else { 00902
00903
00904 00905 pValOffset = (uint16_t )(pBlock[column] + batchOffset[column]); 00906 00907
00908 iV = bitVecWidth(nBits[column], w); 00909 00910
00911
00912 nByte = sizeofBitVec(batchRows, iV, w) + 00913 batchDirs[column].nVal * sizeof(uint16_t); 00914 00915
00916 assert(szVariable >= szCompressed); 00917 szLeft += (szVariable - szCompressed); 00918 assert(szLeft >= 0); 00919 } 00920 00921 compressionMode = batchDirs[column].mode; 00922 00923
00924
00925 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column] + nByte); 00926 memmove( 00927 pBatch, 00928 pBlock[column] + batchOffset[column], 00929 batchCount[column] * sizeof(LcsBatchDir)); 00930 00931
00932
00933 szLeft -= sizeof(LcsBatchDir); 00934 szLeft = std::max(szLeft, 0); 00935 assert(szLeft >= 0); 00936 00937
00938
00939 batchDirs[column].oVal = batchOffset[column]; 00940 00941
00942
00943
00944 batchOffset[column] = (batchOffset[column] + nByte); 00945 } 00946 00947 00948 00949 00950 00951 00952 void myCopy(void
pDest, void
pSrc, uint sz) 00953 { 00954 if (pDest == pSrc) { 00955 return; 00956 } else { 00957 memcpy(pDest, pSrc, sz); 00958 } 00959 } 00960 00961 RecordNum LcsClusterNodeWriter::moveFromIndexToTemp() 00962 { 00963 PLcsBatchDir pBatch; 00964 boost::scoped_array batchDirOffset; 00965 uint16_t loc; 00966 uint column; 00967 uint batchCount = pHdr->nBatch / nClusterCols; 00968 uint b; 00969 00970 batchDirOffset.reset(new uint16_t[pHdr->nBatch]); 00971 00972
00973
00974
00975
00976 for (column = 0; column < nClusterCols; column++) { 00977 uint sz = firstVal[column] - lastVal[column]; 00978 loc = (uint16_t) (szBlock - sz); 00979 myCopy(pBlock[column] + loc, pIndexBlock + lastVal[column], sz); 00980 00981
00982 lastVal[column] = loc; 00983 firstVal[column] = (uint16_t) szBlock; 00984 } 00985 00986
00987 00988 pBatch = (PLcsBatchDir)(pIndexBlock + pHdr->oBatch); 00989 for (column = 0; column < nClusterCols; column++) { 00990 uint i; 00991 loc = hdrSize; 00992 00993
00994 for (b = column, i = 0; i < batchCount; i++, b = b + nClusterCols) { 00995 uint16_t batchStart = loc; 00996 00997 if (pBatch[b].mode == LCS_COMPRESSED) { 00998 uint8_t *pBit; 00999 WidthVec w;
01000 PtrVec p;
01001 uint iV;
01002 uint sizeOffsets, nBytes; 01003 01004
01005 sizeOffsets = pBatch[b].nVal * sizeof(uint16_t); 01006 myCopy( 01007 pBlock[column] + loc, pIndexBlock + pBatch[b].oVal, 01008 sizeOffsets); 01009 01010
01011 loc = (uint16_t) (loc + sizeOffsets); 01012 01013
01014 iV = bitVecWidth(calcWidth(pBatch[b].nVal), w); 01015 01016
01017 pBit = pIndexBlock + pBatch[b].oVal + sizeOffsets; 01018 01019
01020 nBytes = bitVecPtr(pBatch[b].nRow, iV, w, p, pBit); 01021 01022 myCopy(pBlock[column] + loc, pBit, nBytes); 01023 01024
01025 loc = (uint16_t) (loc + nBytes); 01026 } else if (pBatch[b].mode == LCS_VARIABLE) { 01027 uint sizeOffsets; 01028 01029 sizeOffsets = pBatch[b].nRow * sizeof(uint16_t); 01030 01031
01032 myCopy( 01033 pBlock[column] + loc, pIndexBlock + pBatch[b].oVal, 01034 sizeOffsets); 01035 01036
01037 loc = (uint16_t) (loc + sizeOffsets); 01038 } else { 01039
01040 uint sizeFixed; 01041 01042 sizeFixed = pBatch[b].nRow * pBatch[b].recSize; 01043
01044 myCopy( 01045 pBlock[column] + loc, pIndexBlock + pBatch[b].oVal, 01046 sizeFixed); 01047 01048
01049 loc = (uint16_t) (loc + sizeFixed); 01050 } 01051 01052
01053 batchDirOffset[b] = batchStart; 01054 } 01055 01056
01057 01058 uint16_t dirLoc; 01059 b = column; 01060 dirLoc = loc; 01061 batchOffset[column] = dirLoc; 01062 01063
01064 for (i = 0; i < batchCount; i++) { 01065 PLcsBatchDir pTempBatch = (PLcsBatchDir)(pBlock[column] + dirLoc); 01066 myCopy(pTempBatch, &pBatch[b], sizeof(LcsBatchDir)); 01067 01068 pTempBatch->oVal = batchDirOffset[b]; 01069
01070 b = b + nClusterCols; 01071 dirLoc += sizeof(LcsBatchDir); 01072 } 01073 } 01074 01075
01076 pBatch = (PLcsBatchDir)(pIndexBlock + pHdr->oBatch); 01077 RecordNum nrows = 0; 01078 for (b = 0; b < pHdr->nBatch; b = b + nClusterCols) { 01079 nrows += pBatch[b].nRow; 01080 } 01081 01082 batchDirOffset.reset(); 01083 return nrows; 01084 } 01085 01086 void LcsClusterNodeWriter::moveFromTempToIndex() 01087 { 01088 PLcsBatchDir pBatch; 01089 uint sz, numBatches = batchCount[0]; 01090 uint16_t offset, loc; 01091 uint column, b; 01092 01093
01094
01095 01096 for (offset = (uint16_t) szBlock, column = 0; column < nClusterCols; 01097 column++) 01098 { 01099 sz = szBlock - lastVal[column]; 01100 myCopy( 01101 pIndexBlock + (offset - sz), pBlock[column] + lastVal[column], sz); 01102 01103
01104 delta[column] = (uint16_t)(szBlock - offset); 01105 01106
01107
01108 firstVal[column] = offset; 01109 offset = (uint16_t) (offset - sz); 01110 lastVal[column] = offset; 01111 } 01112 01113
01114 01115 for (loc = hdrSize, b = 0; b < numBatches; b++) { 01116 for (column = 0; column < nClusterCols; column++) { 01117 uint16_t batchStart = loc; 01118 01119 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]); 01120 01121 if (pBatch[b].mode == LCS_COMPRESSED) { 01122 uint8_t *pBit; 01123 WidthVec w;
01124 PtrVec p;
01125 uint iV;
01126 uint sizeOffsets, nBytes; 01127 01128 sizeOffsets = pBatch[b].nVal * sizeof(uint16_t); 01129 01130
01131 myCopy( 01132 pIndexBlock + loc, pBlock[column] + pBatch[b].oVal, 01133 sizeOffsets); 01134 01135
01136 loc = (uint16_t) (loc + sizeOffsets); 01137 01138
01139 iV = bitVecWidth(calcWidth(pBatch[b].nVal), w); 01140 01141
01142 pBit = pBlock[column] + pBatch[b].oVal + sizeOffsets; 01143 01144
01145 nBytes = bitVecPtr(pBatch[b].nRow, iV, w, p, pBit); 01146 01147 myCopy(pIndexBlock + loc, pBit, nBytes); 01148 01149
01150 loc = (uint16_t)(loc + nBytes); 01151 01152 } else if (pBatch[b].mode == LCS_VARIABLE) { 01153 uint sizeOffsets; 01154 01155 sizeOffsets = pBatch[b].nRow * sizeof(uint16_t); 01156 01157
01158 myCopy( 01159 pIndexBlock + loc, pBlock[column] + pBatch[b].oVal, 01160 sizeOffsets); 01161 01162
01163 loc = (uint16_t) (loc + sizeOffsets); 01164 } else { 01165
01166 uint sizeFixed; 01167 01168 sizeFixed = pBatch[b].nRow * pBatch[b].recSize; 01169
01170 myCopy( 01171 pIndexBlock + loc, pBlock[column] + pBatch[b].oVal, 01172 sizeFixed); 01173 01174
01175 loc = (uint16_t) (loc + sizeFixed); 01176 } 01177 01178
01179 pBatch[b].oVal = batchStart; 01180 } 01181 } 01182 01183
01184 pHdr->nBatch = nClusterCols * numBatches; 01185 01186
01187 pHdr->oBatch = loc; 01188 01189
01190 for (b = 0; b < numBatches; b++) { 01191 for (column = 0; column < nClusterCols; column++) { 01192 pBatch = (PLcsBatchDir)(pBlock[column] + batchOffset[column]); 01193 myCopy(pIndexBlock + loc, &pBatch[b], sizeof(LcsBatchDir)); 01194 loc += sizeof(LcsBatchDir); 01195 } 01196 } 01197 01198 if (isTracingLevel(TRACE_FINE)) { 01199 FENNEL_TRACE( 01200 TRACE_FINE, "Calling ClusterDump from moveFromTempToIndex"); 01201 clusterDump->dump(opaqueToInt(clusterPageId), pHdr, szBlock); 01202 } 01203 } 01204 01205 void LcsClusterNodeWriter::allocArrays() 01206 { 01207
01208 if (arraysAllocated) { 01209 arraysAllocated = true; 01210 01211 batchDirs.reset(new LcsBatchDir[nClusterCols]); 01212 01213 pValBank.reset(new PBuffer[nClusterCols]); 01214 01215
01216 01217 attrAccessors.reset(new UnalignedAttributeAccessor[nClusterCols]); 01218 01219 for (uint col = 0; col < nClusterCols; col++) { 01220 bufferLock.allocatePage(); 01221 pValBank[col] = bufferLock.getPage().getWritableData(); 01222
01223
01224
01225
01226 bufferLock.unlock(); 01227 01228 attrAccessors[col].compute(colTupleDesc[col]); 01229 } 01230 01231 valBankStart.reset(new uint16_t[nClusterCols]); 01232 01233 forceModeCount.reset(new uint[nClusterCols]); 01234 01235 bForceMode.reset(new ForceMode[nClusterCols]); 01236 01237 oValBank.reset(new uint16_t[nClusterCols]); 01238 01239 batchOffset.reset(new uint16_t[nClusterCols]); 01240 01241 batchCount.reset(new uint[nClusterCols]); 01242 01243 nBits.reset(new uint[nClusterCols]); 01244 01245 nextWidthChange.reset(new uint[nClusterCols]); 01246 01247 maxValueSize.reset(new uint[nClusterCols]); 01248 } 01249 01250 memset(valBankStart.get(), 0, nClusterCols * sizeof(uint16_t)); 01251 memset(forceModeCount.get(), 0, nClusterCols * sizeof(uint)); 01252 memset(bForceMode.get(), 0, nClusterCols * sizeof(ForceMode)); 01253 memset(oValBank.get(), 0, nClusterCols * sizeof(uint16_t)); 01254 memset(batchOffset.get(), 0, nClusterCols * sizeof(uint16_t)); 01255 memset(batchCount.get(), 0, nClusterCols * sizeof(uint)); 01256 memset(nBits.get(), 0, nClusterCols * sizeof(uint)); 01257 memset(nextWidthChange.get(), 0, nClusterCols * sizeof(uint)); 01258 memset(maxValueSize.get(), 0, nClusterCols * sizeof(uint)); 01259 } 01260 01261 01262 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterNodeWriter.cpp#20 $"); 01263 01264