Fennel: /home/pub/open/dev/fennel/hashexe/LhxPartition.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/hashexe/LhxPartition.h" 00025 #include "fennel/hashexe/LhxHashGenerator.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $"); 00029 00030 void LhxPartitionWriter::open( 00031 SharedLhxPartition destPartitionInit, 00032 LhxHashInfo const &hashInfo) 00033 { 00034 destPartition = destPartitionInit; 00035 00036 tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00037 00038 pSegOutputStream = SegOutputStream::newSegOutputStream( 00039 hashInfo.externalSegmentAccessor); 00040 destPartition->segStream = 00041 SegStreamAllocation::newSegStreamAllocation(); 00042 destPartition->segStream->beginWrite(pSegOutputStream); 00043 00044 isAggregate = false; 00045 } 00046 00047 void LhxPartitionWriter::open( 00048 SharedLhxPartition destPartitionInit, 00049 LhxHashInfo &hashInfo, 00050 AggComputerList *aggList, 00051 uint numWriterCachePages) 00052 { 00053 destPartition = destPartitionInit; 00054 tupleAccessor.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00055 00056 pSegOutputStream = SegOutputStream::newSegOutputStream( 00057 hashInfo.externalSegmentAccessor); 00058 destPartition->segStream = 00059 SegStreamAllocation::newSegStreamAllocation(); 00060 destPartition->segStream->beginWrite(pSegOutputStream); 00061 00062 isAggregate = true; 00063
00064 00065 00066 uint partitionLevel = 0; 00067 uint savedNumCachePages = hashInfo.numCachePages; 00068 hashInfo.numCachePages = numWriterCachePages; 00069 00070 hashTable.init( 00071 partitionLevel, 00072 hashInfo, 00073 aggList, 00074 destPartition->inputIndex); 00075 hashTableReader.init(&hashTable, hashInfo, destPartition->inputIndex); 00076 00077 hashInfo.numCachePages = savedNumCachePages; 00078 00079 uint cndKeys = hashInfo.cndKeys.back(); 00080 uint usablePageSize = 00081 (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize(); 00082 00083 hashTable.calculateNumSlots(cndKeys, usablePageSize, numWriterCachePages); 00084 00085 partialAggTuple.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00086 } 00087 00088 void LhxPartitionWriter::close() 00089 { 00090 if (isAggregate) { 00091
00092 00093 00094 while (hashTableReader.getNext(partialAggTuple)) { 00095 uint tupleStorageLength = 00096 tupleAccessor.getByteCount(partialAggTuple); 00097 PBuffer pDestBuf = 00098 pSegOutputStream->getWritePointer(tupleStorageLength); 00099 tupleAccessor.marshal(partialAggTuple, pDestBuf); 00100 pSegOutputStream->consumeWritePointer(tupleStorageLength); 00101 } 00102 } 00103 destPartition->segStream->endWrite(); 00104 pSegOutputStream->close(); 00105 } 00106 00107 void LhxPartitionWriter::marshalTuple(TupleData const &inputTuple) 00108 { 00109 uint tupleStorageLength = tupleAccessor.getByteCount(inputTuple); 00110 PBuffer pDestBuf = pSegOutputStream->getWritePointer(tupleStorageLength); 00111 tupleAccessor.marshal(inputTuple, pDestBuf); 00112 pSegOutputStream->consumeWritePointer(tupleStorageLength); 00113 } 00114 00115 void LhxPartitionWriter::aggAndMarshalTuple(TupleData const &inputTuple) 00116 { 00117 while (hashTable.addTuple(inputTuple)) { 00118
00119 00120 00121 while (hashTableReader.getNext(partialAggTuple)) { 00122 uint tupleStorageLength = 00123 tupleAccessor.getByteCount(partialAggTuple); 00124 PBuffer pDestBuf = 00125 pSegOutputStream->getWritePointer(tupleStorageLength); 00126 tupleAccessor.marshal(partialAggTuple, pDestBuf); 00127 pSegOutputStream->consumeWritePointer(tupleStorageLength); 00128 } 00129 bool reuse = true; 00130
00131 00132 00133 bool status = hashTable.allocateResources(reuse); 00134 assert(status); 00135
00136 00137 00138 00139 hashTableReader.bindKey(NULL); 00140 } 00141 } 00142 00143 void LhxPartitionReader::open( 00144 SharedLhxPartition srcPartitionInit, 00145 LhxHashInfo const &hashInfo) 00146 { 00147 bufState = EXECBUF_NONEMPTY; 00148 srcPartition = srcPartitionInit; 00149 00150 if (srcPartition->segStream) { 00151
00152 00153 00154 00155 srcIsInputStream = true; 00156 } else { 00157 srcIsInputStream = false; 00158 } 00159 00160 if (srcIsInputStream) { 00161 streamBufAccessor = 00162 hashInfo.streamBufAccessor[srcPartition->inputIndex]; 00163 outputTupleDesc = streamBufAccessor->getTupleDesc(); 00164 } else { 00165 outputTupleDesc = hashInfo.inputDesc[srcPartition->inputIndex]; 00166 tupleAccessor.compute(outputTupleDesc); 00167 tupleAccessor.resetCurrentTupleBuf(); 00168 00169
00170 00171 00172 00173 00174 pSegInputStream = srcPartition->segStream->getInputStream(); 00175 pSegInputStream->startPrefetch(); 00176 } 00177 } 00178 00179 void LhxPartitionReader::close() 00180 { 00181 if (srcIsInputStream) { 00182
00183 00184 00185 } else { 00186 pSegInputStream->close(); 00187 } 00188 } 00189 00190 void LhxPartitionReader::unmarshalTuple(TupleData &outputTuple) 00191 { 00192 if (srcIsInputStream) { 00193
00194 00195 00196 streamBufAccessor->unmarshalTuple(outputTuple); 00197 } else { 00198 tupleAccessor.unmarshal(outputTuple); 00199 } 00200 } 00201 00202 void LhxPartitionReader::consumeTuple() 00203 { 00204 if (srcIsInputStream) { 00205 streamBufAccessor->consumeTuple(); 00206 } else { 00207 tupleAccessor.resetCurrentTupleBuf(); 00208 pSegInputStream->consumeReadPointer(tupleStorageLength); 00209 } 00210 } 00211 00212 bool LhxPartitionReader::isTupleConsumptionPending() 00213 { 00214 if (srcIsInputStream) { 00215 return streamBufAccessor->isTupleConsumptionPending(); 00216 } else { 00217 if (tupleAccessor.getCurrentTupleBuf()) { 00218 return true; 00219 } else { 00220 return false; 00221 } 00222 } 00223 } 00224 00225 bool LhxPartitionReader::demandData() 00226 { 00227 if (srcIsInputStream) { 00228 return streamBufAccessor->demandData(); 00229 } else { 00230
00231 00232 00233 uint bytesReadable = 0; 00234 PConstBuffer pSrcBuf = 00235 pSegInputStream->getReadPointer(1, &bytesReadable); 00236 00237
00238 00239 00240 00241 if (!pSrcBuf) { 00242 bufState = EXECBUF_EOS; 00243 return false; 00244 } else { 00245 tupleStorageLength = tupleAccessor.getBufferByteCount(pSrcBuf); 00246 assert(bytesReadable >= tupleStorageLength); 00247 if (bytesReadable == tupleStorageLength) { 00248
00249
00250 if (srcPartition->pExecStream) { 00251 srcPartition->pExecStream->checkAbort(); 00252 } 00253 } 00254 tupleAccessor.setCurrentTupleBuf(pSrcBuf); 00255 return true; 00256 } 00257 } 00258 } 00259 00260 void LhxPartitionInfo::init(LhxHashInfo *hashInfoInit) 00261 { 00262 hashInfo = hashInfoInit; 00263 numInputs = (hashInfo->inputDesc).size(); 00264 00265 writerList.clear(); 00266
00267 00268 00269 00270 for (uint i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) { 00271 writerList.push_back( 00272 SharedLhxPartitionWriter(new LhxPartitionWriter())); 00273 } 00274 00275 filteredRowCountList.reset( 00276 new uint[numInputs * LhxPlan::LhxChildPartCount]); 00277 } 00278 00279 void LhxPartitionInfo::open( 00280 LhxHashTableReader *hashTableReaderInit, 00281 LhxPartitionReader *buildReader, 00282 TupleData &buildTupleInit, 00283 SharedLhxPartition probePartition, 00284 uint buildInputIndex) 00285 { 00286 uint i, j; 00287 00288 probeReader.open(probePartition, *hashInfo); 00289 00290
00291 00292 00293 curInputIndex = buildInputIndex; 00294 00295 hashTableReader = hashTableReaderInit; 00296
00297 00298 00299 00300 hashTableReader->bindKey(NULL); 00301 00302
00303 00304 00305 reader = buildReader; 00306 00307
00308 00309 00310 buildTuple = buildTupleInit; 00311 00312 destPartitionList.clear(); 00313 subPartStatList.clear(); 00314 joinFilterList.clear(); 00315 shared_array curSubPartStat; 00316 00317 for (i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) { 00318 destPartitionList.push_back( 00319 SharedLhxPartition(new LhxPartition(probePartition->pExecStream))); 00320 destPartitionList[i]->inputIndex = (i / LhxPlan::LhxChildPartCount); 00321 subPartStatList.push_back( 00322 shared_array(new uint[LhxPlan::LhxSubPartCount])); 00323 00324 curSubPartStat = subPartStatList[i]; 00325 00326 for (j = 0; j < LhxPlan::LhxSubPartCount; j ++) { 00327 curSubPartStat[j] = 0; 00328 } 00329 00330
00331 00332 00333 00334 joinFilterList.push_back(shared_ptr<dynamic_bitset<> >()); 00335 00336 writerList[i]->open(destPartitionList[i], *hashInfo); 00337 filteredRowCountList[i] = 0; 00338 } 00339 00340
00341 00342 00343 partitionMemory = true; 00344 } 00345 00346 void LhxPartitionInfo::open( 00347 LhxHashTableReader *hashTableReaderInit, 00348 LhxPartitionReader *buildReader, 00349 TupleData &buildTupleInit, 00350 AggComputerList *aggList) 00351 { 00352 uint i, j; 00353 assert (numInputs == 1); 00354 uint buildIndex = numInputs - 1; 00355 00356 curInputIndex = buildIndex; 00357 00358 hashTableReader = hashTableReaderInit; 00359
00360 00361 00362 00363 hashTableReader->bindKey(NULL); 00364 00365
00366
00367 00368 00369 reader = buildReader; 00370 00371
00372 00373 00374 buildTuple = buildTupleInit; 00375 00376
00377 00378 00379 00380 uint numWriterCachePages = 00381 hashInfo->numCachePages / LhxPlan::LhxChildPartCount; 00382 00383 destPartitionList.clear(); 00384 subPartStatList.clear(); 00385 joinFilterList.clear(); 00386 shared_array curSubPartStat; 00387 00388 for (i = 0; i < numInputs * LhxPlan::LhxChildPartCount; i ++) { 00389 destPartitionList.push_back( 00390 SharedLhxPartition( 00391 new LhxPartition(reader->getSourcePartition()->pExecStream))); 00392 destPartitionList[i]->inputIndex = (i / LhxPlan::LhxChildPartCount); 00393 subPartStatList.push_back( 00394 shared_array(new uint[LhxPlan::LhxSubPartCount])); 00395 00396 curSubPartStat = subPartStatList[i]; 00397 00398 for (j = 0; j < LhxPlan::LhxSubPartCount; j ++) { 00399 curSubPartStat[j] = 0; 00400 } 00401 00402
00403
00404 00405 00406 00407 joinFilterList.push_back(shared_ptr<dynamic_bitset<> >()); 00408 00409 writerList[i]->open( 00410 destPartitionList[i], 00411 *hashInfo, 00412 aggList, 00413 numWriterCachePages); 00414 filteredRowCountList[i] = 0; 00415 } 00416 00417
00418 00419 00420 partitionMemory = true; 00421 } 00422 00423 void LhxPartitionInfo::close() 00424 { 00425 reader->close(); 00426 00427 uint numWriters = writerList.size(); 00428 00429 for (uint i = 0; i < numWriters; i ++) { 00430 writerList[i]->close(); 00431 } 00432 00433
00434 00435 00436 00437 00438 00439 00440 for (uint i = 0; i < numWriters; i ++) { 00441 writerList[i]->releaseResources(); 00442 } 00443 } 00444 00445 void LhxPlan::init( 00446 WeakLhxPlan parentPlanInit, 00447 uint partitionLevelInit, 00448 vector &partitionsInit, 00449 bool enableSubPartStat) 00450 { 00451
00452 00453 00454 shared_ptr<dynamic_bitset<> > joinFilterInit = 00455 shared_ptr<dynamic_bitset<> >(); 00456 vector<shared_array > subPartStatsInit; 00457 VectorOfUint filteredRows; 00458 00459 for (uint i = 0; i < partitionsInit.size(); i ++) { 00460 subPartStatsInit.push_back(shared_array()); 00461 filteredRows.push_back(0); 00462 } 00463 00464 init( 00465 parentPlanInit, partitionLevelInit, partitionsInit, 00466 subPartStatsInit, joinFilterInit, filteredRows, 00467 enableSubPartStat, false); 00468 } 00469 00470 void LhxPlan::init( 00471 WeakLhxPlan parentPlanInit, 00472 uint partitionLevelInit, 00473 vector &partitionsInit, 00474 vector<shared_array > &subPartStats, 00475 shared_ptr<dynamic_bitset<> > joinFilterInit, 00476 VectorOfUint &filteredRowsInit, 00477 bool enableSubPartStat, 00478 bool enableSwing) 00479 { 00480 uint numInputs = partitionsInit.size(); 00481 00482 partitionLevel = partitionLevelInit; 00483 parentPlan = parentPlanInit; 00484 00485
00486 00487 00488 00489 00490 00491 joinFilter = joinFilterInit; 00492 00493 filteredRowCount.reset(new uint[numInputs]); 00494 inputSize.reset(new uint[numInputs]); 00495 joinSideToInputMap.reset(new uint[numInputs]); 00496 subPartToChildMap.reset(); 00497 00498
00499
00500
00501 00502 00503 00504 00505 00506 for (int i = 0; i < numInputs; i ++) { 00507 partitions.push_back(partitionsInit[i]); 00508 filteredRowCount[i] = filteredRowsInit[i]; 00509 joinSideToInputMap[i] = i; 00510 00511 inputSize[i] = 0; 00512 shared_array inputSubPartStat = subPartStats[i]; 00513 00514 if (inputSubPartStat) { 00515 for (int k = 0; k < LhxSubPartCount; k ++) { 00516 inputSize[i] += inputSubPartStat[k]; 00517 } 00518 } 00519 } 00520 00521
00522 00523 00524 00525 00526 if (enableSwing && 00527 (numInputs == 2) && (inputSize[0] < inputSize[1])) { 00528 joinSideToInputMap[0] = 1; 00529 joinSideToInputMap[1] = 0; 00530 } 00531 00532 subPartToChildMap.reset(); 00533 00534 if (enableSubPartStat) { 00535
00536 00537 00538 00539 00540 mapSubPartToChild(subPartStats); 00541 } 00542 } 00543 00544 void LhxPlan::mapSubPartToChild( 00545 vector<shared_array > &subPartStats) 00546 { 00547 uint numInputs = partitions.size(); 00548 uint buildIndex = getBuildInput(); 00549 shared_array buildSubPartStat = subPartStats[buildIndex]; 00550 00551 if (!buildSubPartStat) { 00552 return; 00553 } 00554 00555 uint i, j, k; 00556 00557 subPartToChildMap.reset(new uint[LhxSubPartCount]); 00558 00559 for (i = 0; i < numInputs; i ++) { 00560 childPartSize.push_back( 00561 shared_array(new uint[LhxChildPartCount])); 00562 } 00563 00564 shared_array buildChildPartSize = childPartSize[buildIndex]; 00565 00566 for (i = 0; i < LhxChildPartCount; i ++) { 00567 buildChildPartSize[i] = 0; 00568 } 00569 00570 j = 0; 00571 for (i = 0; i < LhxSubPartCount; i ++) { 00572 buildChildPartSize[j] += buildSubPartStat[i]; 00573 subPartToChildMap[i] = j; 00574 00575 k = 1; 00576 while ( 00577 (buildChildPartSize[j] 00578 > buildChildPartSize[(j + k) % LhxChildPartCount]) 00579 && k < LhxChildPartCount) 00580 { 00581 k ++; 00582 } 00583 00584 if (k == LhxChildPartCount) { 00585
00586
00587 j = (j + 1) % LhxChildPartCount; 00588 } 00589 } 00590 00591
00592 00593 00594 if (numInputs == 2) { 00595 uint probeIndex = getProbeInput(); 00596 shared_array probeChildPartSize = childPartSize[probeIndex]; 00597 shared_array probeSubPartStat = subPartStats[probeIndex]; 00598 00599 for (i = 0; i < LhxChildPartCount; i ++) { 00600 probeChildPartSize[i] = 0; 00601 } 00602 00603 for (i = 0; i < LhxSubPartCount; i ++) { 00604 probeChildPartSize[subPartToChildMap[i]] += probeSubPartStat[i]; 00605 } 00606 } 00607 } 00608 00609 uint LhxPlan::calculateChildIndex(uint hashKey, uint curInputIndex) 00610 { 00611 if (subPartToChildMap) { 00612 return (subPartToChildMap[hashKey % LhxSubPartCount] + 00613 curInputIndex * LhxChildPartCount); 00614 } else { 00615 return (hashKey % LhxChildPartCount + 00616 curInputIndex * LhxChildPartCount); 00617 } 00618 } 00619 00620 LhxPartitionState LhxPlan::generatePartitions( 00621 LhxHashInfo const &hashInfo, 00622 LhxPartitionInfo &partInfo) 00623 { 00624
00625
00626 uint filterSize = 4096; 00627 bool isAggregate = (partInfo.numInputs == 1); 00628 00629 LhxHashGenerator hashGenPrev; 00630 LhxHashGenerator hashGen; 00631 LhxHashGenerator hashGenNext; 00632 00633 hashGenPrev.init(partitionLevel); 00634 hashGen.init(partitionLevel + 1); 00635 hashGenNext.init(partitionLevel + 2); 00636 00637 LhxPartitionReader *&reader = partInfo.reader; 00638 vector &writerList = partInfo.writerList; 00639 vector<shared_ptr<dynamic_bitset<> > > &joinFilterList = 00640 partInfo.joinFilterList; 00641 vector<shared_array > &subPartStatList = partInfo.subPartStatList; 00642 shared_array &filteredRowCountList = partInfo.filteredRowCountList; 00643 00644 uint &curInputIndex = partInfo.curInputIndex; 00645 uint otherInputIndex = partInfo.numInputs - curInputIndex - 1; 00646 00647 TupleData inputTuple; 00648 TupleDescriptor inputTupleDesc = reader->getTupleDesc(); 00649 inputTuple.compute(inputTupleDesc); 00650 00651 uint prevHashKey; 00652 uint hashKey; 00653 uint nextHashKey; 00654 00655 uint childPartIndex; 00656 bool writeToPartition; 00657 00658 uint statIndex; 00659 shared_array curSubPartStat; 00660 00661
00662 00663 00664 00665 if (partInfo.partitionMemory) { 00666 TupleData hashTableTuple; 00667 TupleDescriptor hashTableTupleDesc = hashInfo.inputDesc[curInputIndex]; 00668 00669 hashTableTuple.compute(hashTableTupleDesc); 00670 00671 while ((partInfo.hashTableReader)->getNext(hashTableTuple)) { 00672 writeToPartition = false; 00673 00674 hashKey = hashGen.hash( 00675 hashTableTuple, 00676 hashInfo.keyProj[curInputIndex], 00677 hashInfo.isKeyColVarChar[curInputIndex]); 00678 00679 childPartIndex = calculateChildIndex(hashKey, curInputIndex); 00680 00681 if (hashInfo.useJoinFilter[curInputIndex]) { 00682
00683 00684 00685 00686 if (partitionLevel == 0) { 00687 writeToPartition = true; 00688 } else { 00689 prevHashKey = 00690 hashGenPrev.hash( 00691 hashTableTuple, 00692 hashInfo.keyProj[curInputIndex], 00693 hashInfo.isKeyColVarChar[curInputIndex]); 00694 if (joinFilter && 00695 joinFilter->test(prevHashKey % filterSize)) { 00696 writeToPartition = true; 00697 } else { 00698 filteredRowCountList[childPartIndex]++; 00699 } 00700 } 00701 } else { 00702
00703 00704 00705 writeToPartition = true; 00706 } 00707 00708 if (writeToPartition) { 00709 writerList[childPartIndex]->marshalTuple(hashTableTuple); 00710 00711 nextHashKey = hashGenNext.hash( 00712 hashTableTuple, 00713 hashInfo.keyProj[curInputIndex], 00714 hashInfo.isKeyColVarChar[curInputIndex]); 00715 00716 statIndex = nextHashKey % LhxSubPartCount; 00717 curSubPartStat = subPartStatList[childPartIndex]; 00718 curSubPartStat[statIndex]++; 00719 00720
00721 00722 00723 00724 00725 if (!joinFilterList[childPartIndex]) { 00726
00727 00728 00729 joinFilterList[childPartIndex].reset( 00730 new dynamic_bitset<>(filterSize)); 00731 } 00732 joinFilterList[childPartIndex]->set(hashKey % filterSize); 00733 } 00734 } 00735 00736 if (isAggregate) { 00737
00738 00739 00740 00741 ((partInfo.hashTableReader)->getHashTable())->releaseResources(); 00742 for (int i = 0; i < writerList.size();i ++) { 00743 writerList[i]->allocateResources(); 00744 } 00745 } 00746 00747
00748 00749 00750 00751 partInfo.partitionMemory = false; 00752 00753
00754 00755 00756 00757 inputTuple = partInfo.buildTuple; 00758 } 00759 00760 for (;;) { 00761
00762 00763 00764 00765 00766 if (!reader->isTupleConsumptionPending()) { 00767 if (reader->getState() == EXECBUF_EOS) { 00768 if (curInputIndex == getProbeInput()) { 00769
00770 00771 00772 00773 return PartitionEndOfData; 00774 } else { 00775 curInputIndex = getProbeInput(); 00776 otherInputIndex = partInfo.numInputs - curInputIndex - 1; 00777 reader->close(); 00778 reader = &partInfo.probeReader; 00779 inputTupleDesc = reader->getTupleDesc(); 00780 inputTuple.compute(inputTupleDesc); 00781 continue; 00782 } 00783 } 00784 00785 if (!reader->demandData()) { 00786 if (partitionLevel == 0) { 00787
00788 00789 00790 return PartitionUnderflow; 00791 } else { 00792 if (curInputIndex == getProbeInput()) { 00793
00794 00795 00796 00797 return PartitionEndOfData; 00798 } else { 00799 curInputIndex = getProbeInput(); 00800 reader->close(); 00801 reader = &partInfo.probeReader; 00802 inputTupleDesc = reader->getTupleDesc(); 00803 inputTuple.compute(inputTupleDesc); 00804 continue; 00805 } 00806 } 00807 } 00808 00809 reader->unmarshalTuple(inputTuple); 00810 } 00811 00812 writeToPartition = false; 00813 00814 hashKey = hashGen.hash( 00815 inputTuple, 00816 hashInfo.keyProj[curInputIndex], 00817 hashInfo.isKeyColVarChar[curInputIndex]); 00818 00819 childPartIndex = calculateChildIndex(hashKey, curInputIndex); 00820 00821 nextHashKey = hashGenNext.hash( 00822 inputTuple, 00823 hashInfo.keyProj[curInputIndex], 00824 hashInfo.isKeyColVarChar[curInputIndex]); 00825 00826 statIndex = nextHashKey % LhxSubPartCount; 00827 00828 if (!isAggregate) { 00829 if (hashInfo.useJoinFilter[curInputIndex]) { 00830
00831 00832 00833 if (isBuildChildPart(childPartIndex)) { 00834
00835 00836 00837 00838 00839 if (partitionLevel == 0) { 00840 writeToPartition = true; 00841 } else { 00842 prevHashKey = 00843 hashGenPrev.hash( 00844 inputTuple, 00845 hashInfo.keyProj[curInputIndex], 00846 hashInfo.isKeyColVarChar[curInputIndex]); 00847 if (joinFilter && 00848 joinFilter->test(prevHashKey % filterSize)) { 00849 writeToPartition = true; 00850 } else { 00851 filteredRowCountList[childPartIndex]++; 00852 } 00853 } 00854 } else { 00855
00856 00857 00858 00859 00860 if (joinFilterList[getBuildChildPart(childPartIndex)] && 00861 joinFilterList[getBuildChildPart(childPartIndex)]-> 00862 test(hashKey % filterSize)) { 00863 writeToPartition = true; 00864 } else { 00865 filteredRowCountList[childPartIndex]++; 00866 } 00867 } 00868 } else { 00869
00870 00871 00872 writeToPartition = true; 00873 } 00874 00875 if (writeToPartition) { 00876 writerList[childPartIndex]->marshalTuple(inputTuple); 00877 curSubPartStat = subPartStatList[childPartIndex]; 00878 curSubPartStat[statIndex]++; 00879 00880
00881 00882 00883 00884 00885 if (!joinFilterList[childPartIndex]) { 00886
00887 00888 00889 joinFilterList[childPartIndex].reset( 00890 new dynamic_bitset<>(filterSize)); 00891 } 00892 joinFilterList[childPartIndex]->set(hashKey % filterSize); 00893 } 00894 } else { 00895 writerList[childPartIndex]->aggAndMarshalTuple(inputTuple); 00896 (subPartStatList[childPartIndex])[statIndex]++; 00897 } 00898 00899 reader->consumeTuple(); 00900 } 00901 } 00902 00903 void LhxPlan::createChildren( 00904 LhxHashInfo const &hashInfo, 00905 bool enableSubPartStat) 00906 { 00907 LhxHashGenerator hashGen; 00908 hashGen.init(partitionLevel + 1); 00909 00910 uint numInputs = hashInfo.inputDesc.size(); 00911 00912 vector destPartitionList(LhxChildPartCount * numInputs); 00913 00914 LhxPartitionReader reader; 00915 LhxPartitionWriter writerList[LhxChildPartCount]; 00916 uint childNum, i, j; 00917 TupleData outputTuple; 00918 00919
00920 00921 00922 for (j = 0; j < numInputs; j ++) { 00923 reader.open(partitions[j], hashInfo); 00924 outputTuple.compute(hashInfo.inputDesc[j]); 00925 00926 for (i = 0; i < LhxChildPartCount; i ++) { 00927 uint index = j * LhxChildPartCount + i; 00928 destPartitionList[index].reset( 00929 new LhxPartition(partitions[j]->pExecStream)); 00930 destPartitionList[index]->inputIndex = j; 00931 writerList[i].open(destPartitionList[index], hashInfo); 00932 } 00933 00934 for (;;) { 00935 if (!reader.isTupleConsumptionPending()) { 00936 if (reader.getState() == EXECBUF_EOS) { 00937
00938 00939 00940 break; 00941 } 00942 if (!reader.demandData()) { 00943 break; 00944 } 00945 reader.unmarshalTuple(outputTuple); 00946 } 00947 00948 childNum = 00949 hashGen.hash( 00950 outputTuple, 00951 hashInfo.keyProj[j], 00952 hashInfo.isKeyColVarChar[j]) % LhxChildPartCount; 00953 00954 writerList[childNum].marshalTuple(outputTuple); 00955 reader.consumeTuple(); 00956 } 00957 00958 for (i = 0; i < LhxChildPartCount; i ++) { 00959 writerList[i].close(); 00960 } 00961 00962
00963 00964 00965 00966 00967 00968 00969 for (i = 0; i < LhxChildPartCount; i ++) { 00970 writerList[i].releaseResources(); 00971 } 00972 00973 reader.close(); 00974 } 00975 00976
00977 00978 00979 for (i = 0; i < LhxChildPartCount; i ++) { 00980 SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan()); 00981 vector partitionList; 00982 partitionList.push_back(destPartitionList[i]); 00983 partitionList.push_back(destPartitionList[i + LhxChildPartCount]); 00984 00985 newChildPlan->init( 00986 WeakLhxPlan(shared_from_this()), 00987 partitionLevel + 1, 00988 partitionList, 00989 enableSubPartStat); 00990 00991 newChildPlan->addSibling(firstChildPlan); 00992 firstChildPlan = newChildPlan; 00993 } 00994 } 00995 00996 void LhxPlan::createChildren( 00997 LhxPartitionInfo &partInfo, 00998 bool enableSubPartStat, 00999 bool enableSwing) 01000 { 01001 uint i, j; 01002 01003 for (i = 0; i < LhxChildPartCount; i ++) { 01004 SharedLhxPlan newChildPlan = SharedLhxPlan(new LhxPlan()); 01005 vector partitionList; 01006 vector<shared_array > subPartStats; 01007 VectorOfUint filteredRows; 01008 for (j = 0; j < partInfo.numInputs; j ++) { 01009 partitionList.push_back( 01010 partInfo.destPartitionList[i + LhxChildPartCount * j]); 01011 subPartStats.push_back( 01012 partInfo.subPartStatList[i + LhxChildPartCount * j]); 01013 filteredRows.push_back( 01014 partInfo.filteredRowCountList[i + LhxChildPartCount * j]); 01015 } 01016 newChildPlan->init( 01017 WeakLhxPlan(shared_from_this()), 01018 partitionLevel + 1, 01019 partitionList, 01020 subPartStats, 01021 partInfo.joinFilterList[getProbeChildPart(i)], 01022 filteredRows, 01023 enableSubPartStat, 01024 enableSwing); 01025 01026 newChildPlan->addSibling(firstChildPlan); 01027 firstChildPlan = newChildPlan; 01028 } 01029 partInfo.destPartitionList.clear(); 01030 partInfo.joinFilterList.clear(); 01031 } 01032 01033 LhxPlan *LhxPlan::getFirstLeaf() 01034 { 01035 if (firstChildPlan) { 01036 return this; 01037 } else { 01038 return firstChildPlan->getFirstLeaf(); 01039 } 01040 } 01041 01042 LhxPlan *LhxPlan::getNextLeaf() 01043 { 01044 if (siblingPlan) { 01045 return siblingPlan->getFirstLeaf(); 01046 } else { 01047 WeakLhxPlan parent = this->parentPlan; 01048 SharedLhxPlan shared_parent = parent.lock(); 01049 01050 if (shared_parent) { 01051 return shared_parent->getNextLeaf(); 01052 } else { 01053 return NULL; 01054 } 01055 } 01056 } 01057 01058 void LhxPlan::close() 01059 { 01060 if (firstChildPlan) { 01061 firstChildPlan->close(); 01062 } 01063 01064 if (siblingPlan) { 01065 siblingPlan->close(); 01066 } 01067 01068 for (uint i = 0; i < partitions.size(); i ++) { 01069 if (partitions[i] && partitions[i]->segStream) { 01070 partitions[i]->segStream->close(); 01071 } 01072 } 01073 } 01074 01075 string LhxPlan::toString() 01076 { 01077 ostringstream planTrace; 01078 01079 planTrace << "\n" 01080 << "[Plan : addr = " << this << "]\n" 01081 << "[ level = " << partitionLevel << "]\n" 01082 << "[ parent = " << parentPlan.lock().get() << "]\n" 01083 << "[ firstChild = " << firstChildPlan.get() << "]\n" 01084 << "[ sibling = " << siblingPlan.get() << "]\n"; 01085 01086
01087 01088 01089 planTrace << "[joinFilter = "; 01090 if (joinFilter) { 01091 planTrace << joinFilter.get(); 01092 } 01093 planTrace << "]\n"; 01094 01095 for (uint i = 0; i < partitions.size(); i ++) { 01096 planTrace << "[Partition(" << i << ")]\n" 01097 << "[ inputIndex = " << partitions[i]->inputIndex << "]\n" 01098 << "[ join side = " << getJoinSide(partitions[i]->inputIndex) << "]\n" 01099 << "[ filteredRows = " << filteredRowCount[i] << "]\n" 01100 << "[ inputSize = " << inputSize[i] << "]\n"; 01101 planTrace << "[ childPartSize = "; 01102 if (childPartSize.size() > i) { 01103 shared_array oneChildPartSize = childPartSize[i]; 01104 if (oneChildPartSize) { 01105 for (uint j = 0; j < LhxChildPartCount; j ++) { 01106 planTrace << oneChildPartSize[j] << " "; 01107 } 01108 } 01109 } 01110 planTrace << "]\n"; 01111 } 01112 01113 planTrace << "[subPartToChildMap = "; 01114 if (subPartToChildMap) { 01115 for (uint i = 0; i < LhxSubPartCount; i ++) { 01116 planTrace << subPartToChildMap[i] << " "; 01117 } 01118 } 01119 planTrace << "]\n"; 01120 01121 SharedLhxPlan childPlan = firstChildPlan; 01122 01123 while (childPlan) { 01124 planTrace << childPlan->toString(); 01125 childPlan = childPlan->siblingPlan; 01126 } 01127 01128 return planTrace.str(); 01129 } 01130 01131 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxPartition.cpp#4 $"); 01132 01133