// Fennel: /home/pub/open/dev/fennel/hashexe/LhxHashTable.cpp (source listing)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/hashexe/LhxHashTable.h" 00025 #include "fennel/hashexe/LhxHashTableDump.h" 00026 #include "fennel/tuple/TuplePrinter.h" 00027 #include 00028 00029 using namespace std; 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxHashTable.cpp#3 $"); 00032 00033 void LhxHashDataAccessor::init(TupleDescriptor const &inputDataDesc) 00034 { 00035 dataDescriptor = inputDataDesc; 00036 dataTuple.compute(dataDescriptor); 00037 dataAccessor.compute(dataDescriptor); 00038 } 00039 00040 void LhxHashDataAccessor::unpack( 00041 TupleData &outputTuple, 00042 TupleProjection &destProj) 00043 { 00044 PBuffer buf = getBuffer(); 00045 00046 assert (buf != NULL); 00047 00048 if (destProj.size() > 0) { 00049
00050
00051
00052
00053 00054
00055 00056 00057 00058 uint tupleSize = min(destProj.size(), dataTuple.size()); 00059 00060
00061 00062 00063 00064 dataAccessor.unmarshal(dataTuple); 00065 00066 for (int i = 0; i < tupleSize; i ++) { 00067 outputTuple[destProj[i]].copyFrom(dataTuple[i]); 00068 } 00069 } else { 00070
00071 00072 00073 dataAccessor.unmarshal(outputTuple); 00074 } 00075 } 00076 00077 string LhxHashDataAccessor::toString() 00078 { 00079 TuplePrinter tuplePrinter; 00080 ostringstream dataTrace; 00081 TupleProjection allFields; 00082 allFields.clear(); 00083 00084 unpack(dataTuple, allFields); 00085 dataTrace << "[Data Node] "; 00086 tuplePrinter.print(dataTrace, dataDescriptor, dataTuple); 00087 return dataTrace.str(); 00088 } 00089 00090 LhxHashKeyAccessor::LhxHashKeyAccessor() 00091 : LhxHashNodeAccessor( 00092 sizeof(PBuffer) + sizeof(uint8_t) + sizeof(PBuffer *)) 00093 { 00094 firstDataOffset = 0; 00095
00096 00097 00098 isMatchedOffset = firstDataOffset + sizeof(PBuffer); 00099
00100 00101 00102 nextSlotOffset = isMatchedOffset + sizeof(uint8_t); 00103 } 00104 00105 void LhxHashKeyAccessor::init( 00106 TupleDescriptor const &keyDescInit, 00107 TupleProjection const &keyColsProjInit, 00108 TupleProjection const &aggsProjInit) 00109 { 00110 keyDescriptor = keyDescInit; 00111 keyTuple.compute(keyDescriptor); 00112 keyAccessor.compute(keyDescriptor); 00113 00114 keyColsProj = keyColsProjInit; 00115 aggsProj = aggsProjInit; 00116 00117 keyColsDesc.projectFrom(keyDescriptor, keyColsProj); 00118 } 00119 00120 void LhxHashKeyAccessor::addData(PBuffer inputData) 00121 { 00122 PBuffer firstDataNode = getFirstData(); 00123
00124 00125 00126 firstData.setNext(inputData, firstDataNode); 00127 setFirstData(inputData); 00128 } 00129 00130 void LhxHashKeyAccessor::unpack( 00131 TupleData &outputTuple, 00132 TupleProjection &destProj) 00133 { 00134 PBuffer buf = getBuffer(); 00135 00136 assert (buf != NULL); 00137 00138 if (destProj.size() > 0) { 00139
00140 00141 00142 00143 uint tupleSize = min(destProj.size(), keyTuple.size()); 00144 00145
00146 00147 00148 00149 keyAccessor.unmarshal(keyTuple); 00150 00151 for (int i = 0; i < tupleSize; i ++) { 00152 outputTuple[destProj[i]].copyFrom(keyTuple[i]); 00153 } 00154 } else { 00155
00156 00157 00158 keyAccessor.unmarshal(outputTuple); 00159 } 00160 } 00161 00162 bool LhxHashKeyAccessor::matches( 00163 TupleData const &inputTuple, 00164 TupleProjection const &inputKeyProj) 00165 { 00166 assert(inputKeyProj.size() == keyColsProj.size()); 00167 00168 inputKey.projectFrom(inputTuple, inputKeyProj); 00169 00170 keyAccessor.unmarshal(keyTuple); 00171 00172 currentKey.projectFrom(keyTuple, keyColsProj); 00173 00174 return keyColsDesc.compareTuples( 00175 keyTuple, keyColsProj, 00176 inputTuple, inputKeyProj) == 0; 00177 } 00178 00179 string LhxHashKeyAccessor::toString() 00180 { 00181 TuplePrinter tuplePrinter; 00182 ostringstream keyTrace; 00183 TupleProjection allFields; 00184 allFields.clear(); 00185 00186 keyTuple.compute(keyDescriptor); 00187 unpack(keyTuple, allFields); 00188 keyTrace << "[Key Node] [" 00189 << (isMatched() ? "matched" : "unmatched") 00190 << " next " << getNextSlot() << "] "; 00191 tuplePrinter.print(keyTrace, keyDescriptor, keyTuple); 00192 return keyTrace.str(); 00193 } 00194 00195 void LhxHashBlockAccessor::init(uint usablePageSize) 00196 { 00197 blockUsableSize = usablePageSize - getBufferOffset(); 00198 numSlotsPerBlock = blockUsableSize / sizeof(PBuffer); 00199 } 00200 00201 void LhxHashBlockAccessor::setCurrent( 00202 PBuffer blockPtrInit, 00203 bool valid, 00204 bool clearContent) 00205 { 00206 LhxHashNodeAccessor::setCurrent(blockPtrInit); 00207 freePtr = getBuffer(); 00208 assert(freePtr); 00209 endPtr = freePtr + blockUsableSize; 00210 00211 if (valid) { 00212 freePtr = endPtr; 00213 } else if (clearContent) { 00214
00215 00216 00217 00218 00219 memset(freePtr, 0, blockUsableSize); 00220 } 00221 00222 } 00223 00224 PBuffer LhxHashBlockAccessor::allocBuffer(uint bufSize) 00225 { 00226 PBuffer resultPtr = freePtr; 00227 00228 if (freePtr + bufSize > endPtr) { 00229 resultPtr = NULL; 00230 } else { 00231 freePtr += bufSize; 00232 } 00233 return resultPtr; 00234 } 00235 00236 PBuffer *LhxHashBlockAccessor::getSlot(uint slotNum) 00237 { 00238 assert (getCurrent() != NULL); 00239 if (slotNum >= numSlotsPerBlock) { 00240
00241 00242 00243 return NULL; 00244 } else { 00245 return (PBuffer *)(getBuffer() + slotNum * sizeof(PBuffer)); 00246 } 00247 } 00248 00249 void LhxHashTable::init( 00250 uint partitionLevelInit, 00251 LhxHashInfo const &hashInfo, 00252 uint buildInputIndex) 00253 { 00254 maxBlockCount = hashInfo.numCachePages; 00255 assert (maxBlockCount > 1); 00256 scratchAccessor = hashInfo.memSegmentAccessor; 00257 partitionLevel = partitionLevelInit; 00258 bufferLock.accessSegment(scratchAccessor); 00259 currentBlockCount = 0; 00260 00261
00262 00263 00264 RecordNum cndKeys = hashInfo.cndKeys[buildInputIndex]; 00265 uint usablePageSize = scratchAccessor.pSegment->getUsablePageSize(); 00266 00267 calculateNumSlots(cndKeys, usablePageSize, maxBlockCount); 00268 00269
00270 00271 00272 filterNull = hashInfo.filterNull[buildInputIndex]; 00273 00274 filterNullKeyProj = hashInfo.filterNullKeyProj[buildInputIndex]; 00275 removeDuplicate = hashInfo.removeDuplicate[buildInputIndex]; 00276 00277 blockAccessor.init(usablePageSize); 00278 nodeBlockAccessor.init(usablePageSize); 00279 maxBufferSize = nodeBlockAccessor.getUsableSize(); 00280 00281 hashGen.init(partitionLevel); 00282 hashGenSub.init(partitionLevel + 1); 00283 00284 uint i; 00285 00286
00287 00288 00289 TupleDescriptor const &buildTupleDesc = hashInfo.inputDesc[buildInputIndex]; 00290 keyColsProj = hashInfo.keyProj[buildInputIndex]; 00291 00292
00293 00294 00295 00296 isKeyColVarChar = hashInfo.isKeyColVarChar[buildInputIndex]; 00297 aggsProj = hashInfo.aggsProj; 00298 dataProj = hashInfo.dataProj[buildInputIndex]; 00299 00300 isGroupBy = false; 00301 00302
00303 00304 00305 00306 TupleDescriptor keyDesc; 00307 TupleDescriptor dataDesc; 00308 TupleProjection keyColsProjInKey; 00309 TupleProjection aggsProjInKey; 00310 00311 uint keyCount = keyColsProj.size(); 00312 for (i = 0; i < keyCount; i++) { 00313 keyDesc.push_back(buildTupleDesc[keyColsProj[i]]); 00314 keyColsProjInKey.push_back(i); 00315 } 00316 00317 keyColsAndAggsProj = keyColsProj; 00318 for (i = 0; i < aggsProj.size(); i++) { 00319 keyColsAndAggsProj.push_back(aggsProj[i]); 00320 keyDesc.push_back(buildTupleDesc[aggsProj[i]]); 00321 aggsProjInKey.push_back(i + keyCount); 00322 } 00323 00324 hashKeyAccessor.init(keyDesc, keyColsProjInKey, aggsProjInKey); 00325 00326 for (i = 0; i < dataProj.size(); i++) { 00327 dataDesc.push_back(buildTupleDesc[dataProj[i]]); 00328 } 00329 00330 hashDataAccessor.init(dataDesc); 00331 } 00332 00333 void LhxHashTable::init( 00334 uint partitionLevelInit, 00335 LhxHashInfo const &hashInfo, 00336 AggComputerList *aggList, 00337 uint buildInputIndex) 00338 { 00339 init(partitionLevelInit, hashInfo, buildInputIndex); 00340 00341 aggComputers = aggList; 00342
00343 00344 00345 00346 aggWorkingTuple.compute(hashInfo.inputDesc[buildInputIndex]); 00347 aggResultTuple.computeAndAllocate(hashInfo.inputDesc[buildInputIndex]); 00348 00349 isGroupBy = true; 00350 00351 if (aggList->size() > 0) { 00352 hasAggregates = true; 00353 } else { 00354 hasAggregates = false; 00355 } 00356 } 00357 00358 PBuffer LhxHashTable::allocBlock() 00359 { 00360 PBuffer resultBlock; 00361 00362 if (currentBlockCount < maxBlockCount) { 00363 currentBlockCount ++; 00364
00365 00366 00367 bufferLock.allocatePage(); 00368 resultBlock = bufferLock.getPage().getWritableData(); 00369 bufferLock.unlock(); 00370 00371
00372 00373 00374 blockAccessor.setCurrent(resultBlock, false, false); 00375 blockAccessor.setNext(NULL); 00376 } else { 00377
00378 00379 00380 resultBlock = NULL; 00381 } 00382 return resultBlock; 00383 } 00384 00385 PBuffer LhxHashTable::allocBuffer(uint bufSize) 00386 { 00387 PBuffer resultBuf = nodeBlockAccessor.allocBuffer(bufSize); 00388 00389 if (!resultBuf) { 00390
00391 00392 00393 PBuffer nextBlock = nodeBlockAccessor.getNext(); 00394 if (nextBlock) { 00395 currentBlock = nextBlock; 00396 } else { 00397 PBuffer newBlock = allocBlock(); 00398 nodeBlockAccessor.setNext(newBlock); 00399 currentBlock = newBlock; 00400 } 00401 00402 if (currentBlock) { 00403 nodeBlockAccessor.setCurrent(currentBlock, false, false); 00404 resultBuf = nodeBlockAccessor.allocBuffer(bufSize); 00405 00406 assert (resultBuf); 00407 } 00408 } 00409 00410 return resultBuf; 00411 } 00412 00413 bool LhxHashTable::allocateResources(bool reuse) 00414 { 00415 assert (numSlots != 0); 00416 00417 PBuffer newBlock; 00418 00419 slotBlocks.clear(); 00420 firstSlot = NULL; 00421 lastSlot = NULL; 00422 00423 if (!reuse) { 00424 firstBlock = allocBlock(); 00425 } 00426 00427 currentBlock = firstBlock; 00428 00429
00430 00431 00432 assert (currentBlock != NULL); 00433 00434 uint numSlotsPerBlock = blockAccessor.getSlotsPerBlock(); 00435 00436
00437 00438 00439 nodeBlockAccessor.setCurrent(currentBlock, false, true); 00440 slotBlocks.push_back(currentBlock); 00441 00442 if (numSlots <= numSlotsPerBlock) { 00443
00444 00445 00446 00447 00448 00449 nodeBlockAccessor.allocSlots(numSlots); 00450 return true; 00451 } 00452 00453
00454 00455 00456 int numSlotsToAlloc = numSlots - numSlotsPerBlock; 00457 00458 while (numSlotsToAlloc > 0) { 00459 newBlock = NULL; 00460 if (reuse) { 00461 newBlock = nodeBlockAccessor.getNext(); 00462 } 00463 00464 if (!newBlock) { 00465 newBlock = allocBlock(); 00466 if (!newBlock) { 00467 return false; 00468 } 00469 } 00470 00471
00472 00473 00474 nodeBlockAccessor.setNext(newBlock); 00475 currentBlock = newBlock; 00476 nodeBlockAccessor.setCurrent(currentBlock, false, true); 00477 slotBlocks.push_back(currentBlock); 00478 00479 if (numSlotsToAlloc <= numSlotsPerBlock) { 00480
00481 00482 00483 00484 00485 00486 nodeBlockAccessor.allocSlots(numSlotsToAlloc); 00487 } 00488 00489 numSlotsToAlloc -= numSlotsPerBlock; 00490 } 00491 return true; 00492 } 00493 00494 void LhxHashTable::releaseResources(bool reuse) 00495 { 00496
00497 00498 00499 00500 00501 if (!reuse && scratchAccessor.pSegment) { 00502 scratchAccessor.pSegment->deallocatePageRange( 00503 NULL_PAGE_ID, 00504 NULL_PAGE_ID); 00505 firstBlock = NULL; 00506 currentBlockCount = 0; 00507 } 00508 00509 hashKeyAccessor.reset(); 00510 hashDataAccessor.reset(); 00511 blockAccessor.reset(); 00512 nodeBlockAccessor.reset(); 00513 currentBlock = NULL; 00514 } 00515 00516 void LhxHashTable::calculateNumSlots( 00517 RecordNum cndKeys, 00518 uint usablePageSize, 00519 BlockNum numBlocks) 00520 { 00521
00522
00523 if (isMAXU(cndKeys)) { 00524 cndKeys = RecordNum(10000); 00525 } 00526 00527
00528 00529 00530 00531 uint slotsLow = numBlocks * usablePageSize / sizeof(PBuffer) / 100; 00532 uint slotsHigh = numBlocks * usablePageSize / sizeof(PBuffer) / 10; 00533 00534 numSlots = 00535 max(slotsNeeded(cndKeys), slotsLow); 00536 00537 numSlots = min(numSlots, slotsHigh); 00538 } 00539 00540 void LhxHashTable::calculateSize( 00541 LhxHashInfo const &hashInfo, 00542 uint inputIndex, 00543 BlockNum &numBlocks) 00544 { 00545 uint usablePageSize = 00546 (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize() 00547 - sizeof(PBuffer); 00548 00549 TupleDescriptor const &inputDesc = hashInfo.inputDesc[inputIndex]; 00550 00551 TupleProjection const &keyProj = hashInfo.keyProj[inputIndex]; 00552 00553 TupleProjection const &dataProj = hashInfo.dataProj[inputIndex]; 00554 00555 RecordNum cndKeys = hashInfo.cndKeys[inputIndex]; 00556 RecordNum numRows = hashInfo.numRows[inputIndex]; 00557
00558
00559 if (isMAXU(cndKeys) || isMAXU(numRows)) { 00560 numBlocks = MAXU; 00561 return; 00562 } 00563 00564 TupleDescriptor keyDesc; 00565 keyDesc.projectFrom(inputDesc, keyProj); 00566 00567 TupleDescriptor dataDesc; 00568 dataDesc.projectFrom(inputDesc, dataProj); 00569 00570 LhxHashKeyAccessor tmpKey; 00571 LhxHashDataAccessor tmpData; 00572 00573 TupleProjection tmpKeyProj; 00574 TupleProjection tmpAggsProj; 00575 00576
00577 00578 00579 for (int i = 0; i < keyDesc.size(); i ++) { 00580 tmpKeyProj.push_back(i); 00581 } 00582 00583 tmpKey.init(keyDesc, tmpKeyProj, tmpAggsProj); 00584 tmpData.init(dataDesc); 00585 00586 double totalBytes = 00587 slotsNeeded(cndKeys) * sizeof(PBuffer) 00588 + cndKeys * tmpKey.getAvgStorageSize() 00589 + numRows * tmpData.getAvgStorageSize(); 00590 double nBlocks = ceil(totalBytes / usablePageSize); 00591 if (nBlocks >= BlockNum(MAXU)) { 00592 numBlocks = BlockNum(MAXU) - 1; 00593 } else { 00594 numBlocks = BlockNum(nBlocks); 00595 } 00596 } 00597 00598 00599 PBuffer *LhxHashTable::getSlot(uint slotNum) 00600 { 00601 PBuffer *slot; 00602 uint slotsPerBlock = blockAccessor.getSlotsPerBlock(); 00603 00604 blockAccessor.setCurrent(slotBlocks[slotNum / slotsPerBlock], true, false); 00605 00606 slot = blockAccessor.getSlot(slotNum % slotsPerBlock); 00607 00608 assert (slot); 00609 00610 return slot; 00611 } 00612 00613 PBuffer LhxHashTable::findKeyLocation( 00614 TupleData const &inputTuple, 00615 TupleProjection const &inputKeyProj, 00616 bool isProbing, 00617 bool removeDuplicateProbe) 00618 { 00619 uint slotNum = 00620 (hashGen.hash(inputTuple, inputKeyProj, isKeyColVarChar)) % numSlots; 00621 00622 PBuffer *slot = getSlot(slotNum); 00623 PBuffer keyLocation = (PBuffer)slot; 00624 PBuffer firstKey = *slot; 00625 PBuffer nextKey; 00626 00627 if (firstKey) { 00628
00629 00630 00631 00632 hashKeyAccessor.setCurrent(firstKey, true); 00633 while (hashKeyAccessor.matches(inputTuple, inputKeyProj)) { 00634 nextKey = hashKeyAccessor.getNext(); 00635 if (!nextKey) { 00636 return NULL; 00637 } 00638 00639 keyLocation = hashKeyAccessor.getNextLocation(); 00640 hashKeyAccessor.setCurrent(nextKey, true); 00641 } 00642 } else { 00643 return NULL; 00644 } 00645 00646
00647 00648 00649 if (removeDuplicateProbe && hashKeyAccessor.isMatched()) { 00650 return NULL; 00651 } 00652 00653 if (isProbing) { 00654 hashKeyAccessor.setMatched(true); 00655 } 00656 00657 return keyLocation; 00658 } 00659 00660 bool LhxHashTable::addKeyData(TupleData const &inputTuple) 00661 { 00662
00663
00664
00665
00666
00667 uint slotNum = 00668 (hashGen.hash(inputTuple, keyColsProj, isKeyColVarChar)) % numSlots; 00669 00670 PBuffer *slot = getSlot(slotNum); 00671 PBuffer *newLastSlot = NULL; 00672 00673 if (firstSlot) { 00674 firstSlot = slot; 00675 lastSlot = slot; 00676 } else { 00677 if (!(*slot)) { 00678
00679
00680 newLastSlot = slot; 00681 } 00682 } 00683 00684 PBuffer newNextKey = slot; 00685 00686 PBuffer newKey = NULL; 00687 00688 if (isGroupBy) { 00689 tmpKeyTuple.projectFrom(inputTuple, keyColsProj); 00690 hashKeyAccessor.checkStorageSize(tmpKeyTuple, maxBufferSize); 00691 uint newKeyLen = 00692 hashKeyAccessor.getStorageSize(tmpKeyTuple); 00693 newKey = allocBuffer(newKeyLen); 00694 } else { 00695 aggResultTuple.resetBuffer(); 00696 for (int i = 0; i < keyColsProj.size() ; i ++) { 00697 aggResultTuple[i].copyFrom(inputTuple[keyColsProj[i]]); 00698 } 00699 00700 for (int i = 0; i < aggComputers->size(); i ++) { 00701 (aggComputers)[i].initAccumulator( 00702 aggResultTuple[aggsProj[i]], inputTuple); 00703 } 00704 hashKeyAccessor.checkStorageSize(aggResultTuple, maxBufferSize); 00705 newKey = 00706 allocBuffer(hashKeyAccessor.getStorageSize(aggResultTuple)); 00707 } 00708 00709 PBuffer newData = NULL; 00710 00711 if (isGroupBy) { 00712
00713 00714 00715 tmpDataTuple.projectFrom(inputTuple, dataProj); 00716 hashDataAccessor.checkStorageSize(tmpDataTuple, maxBufferSize); 00717 uint newDataLen = hashDataAccessor.getStorageSize(tmpDataTuple); 00718 newData = allocBuffer(newDataLen); 00719 } 00720 00721 if (!newKey || (isGroupBy && !newData)) { 00722
00723 00724 00725 return false; 00726 } 00727 00728 PBuffer nextSlot = NULL; 00729 00730 if (newNextKey) { 00731
00732
00733 hashKeyAccessor.setCurrent(newNextKey, true); 00734 nextSlot = hashKeyAccessor.getNextSlot(); 00735 hashKeyAccessor.setNextSlot(NULL); 00736 } 00737 00738 slot = newKey; 00739 hashKeyAccessor.setCurrent(newKey, false); 00740 hashKeyAccessor.setMatched(false); 00741 hashKeyAccessor.setNext(newNextKey); 00742 hashKeyAccessor.setNextSlot(nextSlot); 00743 hashKeyAccessor.setFirstData(NULL); 00744 00745 if (isGroupBy) { 00746
00747 00748 00749 hashKeyAccessor.pack(tmpKeyTuple); 00750 00751
00752 00753 00754 hashKeyAccessor.setCurrent(newKey, true); 00755 hashDataAccessor.setCurrent(newData, false); 00756 hashDataAccessor.pack(tmpDataTuple); 00757 hashKeyAccessor.addData(newData); 00758 } else { 00759
00760 00761 00762 hashKeyAccessor.pack(aggResultTuple); 00763 } 00764 00765 00766
00767 00768 00769 if (newLastSlot) { 00770 hashKeyAccessor.setCurrent((
lastSlot), true); 00771 hashKeyAccessor.setNextSlot(newLastSlot); 00772 lastSlot = newLastSlot; 00773 } 00774 00775 return true; 00776 } 00777 00778 bool LhxHashTable::addData(PBuffer keyNode, TupleData const &inputTuple) 00779 { 00780
00781 00782 00783 00784 00785 00786 00787 hashKeyAccessor.setCurrent(keyNode, true); 00788 00789 tmpDataTuple.projectFrom(inputTuple, dataProj); 00790 00791 hashDataAccessor.checkStorageSize(tmpDataTuple, maxBufferSize); 00792 00793 uint newDataLen = 00794 hashDataAccessor.getStorageSize(tmpDataTuple); 00795 PBuffer newData = allocBuffer(newDataLen); 00796 00797 if (!newData) { 00798
00799 00800 00801 return false; 00802 } 00803 00804 hashDataAccessor.setCurrent(newData, false); 00805 hashDataAccessor.pack(tmpDataTuple); 00806 hashKeyAccessor.addData(newData); 00807 return true; 00808 } 00809 00810 bool LhxHashTable::aggData(PBuffer destKeyLoc, TupleData const &inputTuple) 00811 { 00812 PBuffer destKey; 00813
00814 00815 00816 memcpy((PBuffer)&destKey, destKeyLoc, sizeof(PBuffer)); 00817 00818 hashKeyAccessor.setCurrent(destKey, true); 00819 00820 aggResultTuple.resetBuffer(); 00821 00822 hashKeyAccessor.unpack(aggWorkingTuple, keyColsAndAggsProj); 00823 00824 for (int i = 0; i < keyColsProj.size() ; i ++) { 00825 aggResultTuple[i].copyFrom(inputTuple[keyColsProj[i]]); 00826 } 00827 00828 for (int i = 0; i < aggComputers->size(); i ++) { 00829 (
aggComputers)[i].updateAccumulator( 00830 aggWorkingTuple[aggsProj[i]], 00831 aggResultTuple[aggsProj[i]], 00832 inputTuple); 00833 } 00834 00835 hashKeyAccessor.checkStorageSize(aggResultTuple, maxBufferSize); 00836 00837 uint newResultSize = 00838 hashKeyAccessor.getStorageSize(aggResultTuple); 00839 00840 uint oldResultSize = 00841 hashKeyAccessor.getStorageSize(aggWorkingTuple); 00842 00843 if (newResultSize > oldResultSize) { 00844 PBuffer newKey = NULL; 00845 PBuffer newNextKey = hashKeyAccessor.getNext(); 00846 00847
00848 00849 00850 00851 newKey = allocBuffer(newResultSize); 00852 00853 if (newKey) { 00854
00855 00856 00857 00858 PBuffer nextSlot = hashKeyAccessor.getNextSlot(); 00859 00860
00861 00862 00863 00864 memcpy(destKeyLoc, (PBuffer)&newKey, sizeof(PBuffer)); 00865 00866 hashKeyAccessor.setCurrent(newKey, false); 00867 hashKeyAccessor.setMatched(false); 00868 hashKeyAccessor.setNext(newNextKey); 00869 hashKeyAccessor.pack(aggResultTuple); 00870 hashKeyAccessor.setNextSlot(nextSlot); 00871 return true; 00872 } else { 00873 return false; 00874 } 00875 } else { 00876
00877 00878 00879 hashKeyAccessor.pack(aggResultTuple); 00880 return true; 00881 } 00882 } 00883 00884 bool LhxHashTable::addTuple(TupleData const &inputTuple) 00885 { 00886 if (filterNull && inputTuple.containsNull(filterNullKeyProj)) { 00887
00888 00889 00890 00891 00892 return true; 00893 } 00894 00895
00896 00897 00898 bool isProbing = false; 00899 bool removeDuplicateProbe = false; 00900 PBuffer destKeyLoc = 00901 findKeyLocation( 00902 inputTuple, keyColsProj, isProbing, 00903 removeDuplicateProbe); 00904 00905 if (!destKeyLoc) { 00906
00907 00908 00909 return addKeyData(inputTuple); 00910 } else if (removeDuplicate) { 00911
00912 00913 00914 return true; 00915 } else { 00916
00917 00918 00919 00920 00921 if (isGroupBy) { 00922 PBuffer destKey; 00923
00924 00925 00926 memcpy((PBuffer
)&destKey, destKeyLoc, sizeof(PBuffer)); 00927 00928 assert (destKey); 00929 00930 return addData(destKey, inputTuple); 00931 } else { 00932 if (hasAggregates) { 00933 return true; 00934 } 00935 return aggData(destKeyLoc, inputTuple); 00936 } 00937 } 00938 } 00939 00940 PBuffer LhxHashTable::findKey( 00941 TupleData const &inputTuple, 00942 TupleProjection const &inputKeyProj, 00943 bool removeDuplicateProbe) 00944 { 00945 PBuffer destKey; 00946 PBuffer destKeyLoc; 00947 bool isProbing = true; 00948 destKeyLoc = 00949 findKeyLocation( 00950 inputTuple, inputKeyProj, isProbing, 00951 removeDuplicateProbe); 00952 00953 if (destKeyLoc) { 00954
00955 00956 00957 memcpy((PBuffer)&destKey, destKeyLoc, sizeof(PBuffer)); 00958 return destKey; 00959 } else { 00960 return NULL; 00961 } 00962 } 00963 00964 string LhxHashTable::printSlot(uint slotNum) 00965 { 00966 ostringstream slotTrace; 00967 PBuffer *slot = getSlot(slotNum); 00968 00969 slotTrace << "[Slot] [" << slotNum << "] [" << slot <<"]\n"; 00970 00971
00972 00973 00974 PBuffer currentHashKey = *slot; 00975 while (currentHashKey) { 00976 hashKeyAccessor.setCurrent(currentHashKey, true); 00977 slotTrace << " " << hashKeyAccessor.toString() << "\n"; 00978 00979
00980 00981 00982 PBuffer currentHashData = hashKeyAccessor.getFirstData(); 00983 while (currentHashData) { 00984 hashDataAccessor.setCurrent(currentHashData, true); 00985 slotTrace << " " << hashDataAccessor.toString() << "\n"; 00986
00987 00988 00989 currentHashData = hashDataAccessor.getNext(); 00990 } 00991 00992
00993 00994 00995 currentHashKey = hashKeyAccessor.getNext(); 00996 } 00997 return slotTrace.str(); 00998 } 00999 01000 string LhxHashTable::toString() 01001 { 01002 ostringstream hashTableTrace; 01003 01004 hashTableTrace << "\n" 01005 << "[Hash Table : maximum # blocks = " << maxBlockCount << "]\n" 01006 << "[ current # blocks = " << currentBlockCount << "]\n" 01007 << "[ # slots = " << numSlots << "]\n" 01008 << "[ partition level = " << partitionLevel << "]\n" 01009 << "[ first slot = " << firstSlot << "]\n" 01010 << "[ last slot = " << lastSlot << "]\n"; 01011 01012 for (int i = 0; i < numSlots; i ++) { 01013 hashTableTrace << printSlot(i); 01014 } 01015 01016 return hashTableTrace.str(); 01017 } 01018 01019 bool LhxHashTableReader::advanceSlot() 01020 { 01021 if (boundKey) { 01022 curKey = NULL; 01023 01024 if (isPositioned) { 01025 curSlot = hashTable->getFirstSlot(); 01026 } else { 01027 curSlot = hashTable->getNextSlot(curSlot); 01028 } 01029 01030 if (curSlot && *curSlot) { 01031 curKey = curSlot; 01032 if (returnUnMatched) { 01033
01034 01035 01036 hashKeyAccessor.setCurrent(
curSlot, true); 01037 01038
01039 01040 01041 while (hashKeyAccessor.isMatched()) { 01042 curKey = hashKeyAccessor.getNext(); 01043 if (curKey) { 01044 curSlot = hashTable->getNextSlot(curSlot); 01045 if (curSlot) { 01046 curKey = *curSlot; 01047 } else { 01048 curKey = NULL; 01049 } 01050 } 01051 01052 if (curKey) { 01053 hashKeyAccessor.setCurrent(curKey, true); 01054 } else { 01055
01056 01057 01058 break; 01059 } 01060 } 01061 } 01062 } 01063 01064 if (curKey) { 01065
01066 01067 01068 return false; 01069 } 01070 } 01071 01072 hashKeyAccessor.setCurrent(curKey, true); 01073 01074 if (isGroupBy) { 01075 curData = hashKeyAccessor.getFirstData(); 01076 assert(curData); 01077 hashDataAccessor.setCurrent(curData, true); 01078 } 01079 01080 return true; 01081 } 01082 01083 bool LhxHashTableReader::advanceKey() 01084 { 01085 while ((curKey = hashKeyAccessor.getNext())) { 01086 if (returnUnMatched) { 01087 break; 01088 } else { 01089 hashKeyAccessor.setCurrent(curKey, true); 01090 if (hashKeyAccessor.isMatched()) { 01091 break; 01092 } 01093 } 01094 } 01095 01096 if (curKey) { 01097 hashKeyAccessor.setCurrent(curKey, true); 01098 if (isGroupBy) { 01099 curData = hashKeyAccessor.getFirstData(); 01100 assert(curData); 01101 hashDataAccessor.setCurrent(curData, true); 01102 } 01103 return true; 01104 } else { 01105 return false; 01106 } 01107 } 01108 01109 bool LhxHashTableReader::advanceData() 01110 { 01111 if (isGroupBy) { 01112 return false; 01113 } 01114 01115 curData = hashDataAccessor.getNext(); 01116 if (curData) { 01117 hashDataAccessor.setCurrent(curData, true); 01118 return true; 01119 } else { 01120 return false; 01121 } 01122 } 01123 01124 void LhxHashTableReader::produceTuple(TupleData &outputTuple) 01125 { 01126 hashKeyAccessor.unpack(outputTuple, keyColsAndAggsProj); 01127 if (isGroupBy) { 01128 hashDataAccessor.unpack(outputTuple, dataProj); 01129 } 01130 } 01131 01132 void LhxHashTableReader::init( 01133 LhxHashTable *hashTableInit, 01134 LhxHashInfo const &hashInfo, 01135 uint buildInputIndex) 01136 { 01137
01138 01139 01140 TupleDescriptor const &outputTupleDesc = 01141 hashInfo.inputDesc[buildInputIndex]; 01142 TupleProjection const &keyColsProj = hashInfo.keyProj[buildInputIndex]; 01143 TupleProjection const &aggsProj = hashInfo.aggsProj; 01144 01145 dataProj = hashInfo.dataProj[buildInputIndex]; 01146 01147
01148 01149 01150 01151 TupleDescriptor keyDesc; 01152 TupleDescriptor dataDesc; 01153 TupleProjection keyColsProjInKey; 01154 TupleProjection aggsProjInKey; 01155 uint keyCount = keyColsProj.size(); 01156 uint i; 01157 01158 for (i = 0; i < keyCount; i++) { 01159 keyDesc.push_back(outputTupleDesc[keyColsProj[i]]); 01160 keyColsProjInKey.push_back(i); 01161 } 01162 01163 keyColsAndAggsProj = keyColsProj; 01164 uint aggsProjSize = aggsProj.size(); 01165 01166 for (i = 0; i < aggsProjSize; i ++) { 01167 keyColsAndAggsProj.push_back(aggsProj[i]); 01168 keyDesc.push_back(outputTupleDesc[aggsProj[i]]); 01169 aggsProjInKey.push_back(i + keyCount); 01170 } 01171 01172 hashKeyAccessor.init(keyDesc, keyColsProjInKey, aggsProjInKey); 01173 01174 for (i = 0; i < dataProj.size(); i++) { 01175 dataDesc.push_back(outputTupleDesc[dataProj[i]]); 01176 } 01177 01178 hashDataAccessor.init(dataDesc); 01179 01180 hashTable = hashTableInit; 01181 isGroupBy = hashTable->isHashGroupBy(); 01182 01183
01184 01185 01186 01187 bindKey(NULL); 01188 } 01189 01190 bool LhxHashTableReader::getNext(TupleData &outputTuple) 01191 { 01192 if (isPositioned) { 01193 assert (!(boundKey && returnUnMatched)); 01194 01195
01196 01197 01198 if (advanceSlot()) { 01199
01200 01201 01202 return false; 01203 } 01204 produceTuple(outputTuple); 01205 isPositioned = true; 01206 return true; 01207 } 01208 01209 if (advanceData()) { 01210 produceTuple(outputTuple); 01211 return true; 01212 } else { 01213 if (boundKey) { 01214
01215 01216 01217 return false; 01218 } else { 01219
01220 01221 01222 if (advanceKey()) { 01223 produceTuple(outputTuple); 01224 return true; 01225 } else { 01226
01227 01228 01229 01230 if (advanceSlot()) { 01231 produceTuple(outputTuple); 01232 return true; 01233 } else { 01234 return false; 01235 } 01236 } 01237 } 01238 } 01239 } 01240 01241 01242 #ifdef MSVC 01243 class UnreferencedHashexeStructs 01244 { 01245 LhxHashTableDump dump; 01246 }; 01247 #endif 01248 01249 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxHashTable.cpp#3 $"); 01250 01251