Fennel: /home/pub/open/dev/fennel/hashexe/LhxJoinExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/hashexe/LhxJoinExecStream.h" 00025 #include "fennel/segment/Segment.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 #include "fennel/tuple/StandardTypeDescriptor.h" 00028 00029 using namespace std; 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxJoinExecStream.cpp#4 $"); 00032 00033 void LhxJoinExecStream::prepare( 00034 LhxJoinExecStreamParams const &params) 00035 { 00036 assert (params.leftKeyProj.size() == params.rightKeyProj.size()); 00037 00038 ConfluenceExecStream::prepare(params); 00039 00040 setJoinType(params); 00041 setHashInfo(params); 00042 00043 uint numInputs = inAccessors.size(); 00044 00045 inputTuple.reset(new TupleData[2]); 00046 inputTupleSize.reset(new uint[2]); 00047 00048 for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) { 00049 inputTuple[inputIndex].compute( 00050 inAccessors[inputIndex]->getTupleDesc()); 00051 inputTupleSize[inputIndex] = inputTuple[inputIndex].size(); 00052 } 00053 00054
00055 00056 00057 forcePartitionLevel = params.forcePartitionLevel; 00058 enableSubPartStat = params.enableSubPartStat; 00059 00060
00061 00062 00063 00064 00065 00066 00067 00068 00069 00070 00071 00072 00073 00074 00075 00076 00077 00078 00079 00080 00081 00082 00083 00084 00085 00086 00087 00088 00089 00090 00091 bool leftAntiJoin = 00092 (returnProbeOuter() && returnProbeInner() && returnBuild()); 00093 00094 bool rightAntiJoin = 00095 (returnBuildOuter() && returnBuildInner() && returnProbe()); 00096 00097 bool antiJoin = leftAntiJoin || rightAntiJoin; 00098 00099 enableSwing = params.enableSwing && (!(antiJoin && setopDistinct)); 00100 00101
00102 00103 00104 00105 hashTable.calculateSize( 00106 hashInfo, 00107 DefaultBuildInputIndex, 00108 numBlocksHashTable); 00109 00110 TupleDescriptor outputDesc; 00111 00112 if (params.outputProj.size() != 0) { 00113 outputDesc.projectFrom(params.outputTupleDesc, params.outputProj); 00114 } else { 00115 outputDesc = params.outputTupleDesc; 00116 } 00117 00118 outputTuple.compute(outputDesc); 00119 00120 assert (outputTuple.size() == (inputTupleSize[0] + inputTupleSize[1]) || 00121 outputTuple.size() == inputTupleSize[0]|| 00122 outputTuple.size() == inputTupleSize[1]); 00123 00124 pOutAccessor->setTupleShape(outputDesc); 00125 00126
00127 00128 00129 numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs; 00130 } 00131 00132 void LhxJoinExecStream::getResourceRequirements( 00133 ExecStreamResourceQuantity &minQuantity, 00134 ExecStreamResourceQuantity &optQuantity, 00135 ExecStreamResourceSettingType &optType) 00136 { 00137 ConfluenceExecStream::getResourceRequirements(minQuantity,optQuantity); 00138 00139 uint minPages = LhxHashTable::LhxHashTableMinPages + numMiscCacheBlocks; 00140 minQuantity.nCachePages += minPages; 00141
00142 if (isMAXU(numBlocksHashTable)) { 00143 optType = EXEC_RESOURCE_UNBOUNDED; 00144 } else { 00145
00146
00147 optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable); 00148 optType = EXEC_RESOURCE_ESTIMATE; 00149 } 00150 } 00151 00152 void LhxJoinExecStream::setResourceAllocation( 00153 ExecStreamResourceQuantity &quantity) 00154 { 00155 ConfluenceExecStream::setResourceAllocation(quantity); 00156 hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks; 00157 } 00158 00159 void LhxJoinExecStream::open(bool restart) 00160 { 00161 ConfluenceExecStream::open(restart); 00162 00163 if (restart) { 00164 hashTable.releaseResources(); 00165 }; 00166 00167 uint partitionLevel = 0; 00168 00169
00170 00171 00172 00173 00174 probePart = SharedLhxPartition(new LhxPartition(this)); 00175 buildPart = SharedLhxPartition(new LhxPartition(this)); 00176 00177 (probePart->segStream).reset(); 00178 probePart->inputIndex = DefaultProbeInputIndex; 00179 00180 (buildPart->segStream).reset(); 00181 buildPart->inputIndex = DefaultBuildInputIndex; 00182 00183 vector partitionList; 00184 partitionList.push_back(probePart); 00185 partitionList.push_back(buildPart); 00186 00187 vector<shared_array > subPartStats; 00188 subPartStats.push_back(shared_array()); 00189 subPartStats.push_back(shared_array()); 00190 00191 shared_ptr<dynamic_bitset<> > joinFilterInit = 00192 shared_ptr<dynamic_bitset<> >(); 00193 00194 VectorOfUint filteredRows; 00195 filteredRows.push_back(0); 00196 filteredRows.push_back(0); 00197 00198
00199 00200 00201 rootPlan = SharedLhxPlan(new LhxPlan()); 00202 rootPlan->init( 00203 WeakLhxPlan(), 00204 partitionLevel, 00205 partitionList, 00206 subPartStats, 00207 joinFilterInit, 00208 filteredRows, 00209 enableSubPartStat, 00210 enableSwing); 00211 00212
00213 00214 00215 partInfo.init(&hashInfo); 00216 00217 curPlan = rootPlan.get(); 00218 isTopPlan = true; 00219 00220 hashTable.init( 00221 curPlan->getPartitionLevel(), 00222 hashInfo, 00223 curPlan->getBuildInput()); 00224 hashTableReader.init(&hashTable, hashInfo, curPlan->getBuildInput()); 00225 00226 bool status = hashTable.allocateResources(); 00227 assert (status); 00228 00229 buildReader.open(curPlan->getBuildPartition(), hashInfo); 00230 00231 joinState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00232 nextState.clear(); 00233 } 00234 00235 ExecStreamResult LhxJoinExecStream::execute(ExecStreamQuantum const &quantum) 00236 { 00237 while (true) { 00238 switch (joinState) { 00239 case ForcePartitionBuild: 00240 { 00241 TupleData &buildTuple = inputTuple[curPlan->getBuildInput()]; 00242 00243
00244 00245 00246 for (;;) { 00247 if (buildReader.isTupleConsumptionPending()) { 00248 if (buildReader.getState() == EXECBUF_EOS) { 00249
00250 00251 00252 buildReader.close(); 00253 probeReader.open( 00254 curPlan->getProbePartition(), 00255 hashInfo); 00256 joinState = Probe; 00257 numTuplesProduced = 0; 00258 break; 00259 } 00260 00261 if (buildReader.demandData()) { 00262 if (isTopPlan) { 00263
00264 00265 00266 return EXECRC_BUF_UNDERFLOW; 00267 } else { 00268
00269 00270 00271 00272 00273 break; 00274 } 00275 } 00276 buildReader.unmarshalTuple(buildTuple); 00277 } 00278 00279
00280 00281 00282 00283 00284 00285 if (curPlan->getPartitionLevel() < forcePartitionLevel || 00286 hashTable.addTuple(buildTuple)) { 00287
00288 00289 00290 00291 00292 partInfo.open( 00293 &hashTableReader, &buildReader, buildTuple, 00294 curPlan->getProbePartition(), 00295 curPlan->getBuildInput()); 00296 joinState = Partition; 00297 break; 00298 } 00299 buildReader.consumeTuple(); 00300 } 00301 break; 00302 } 00303 case Build: 00304 { 00305 TupleData &buildTuple = inputTuple[curPlan->getBuildInput()]; 00306 00307
00308 00309 00310 for (;;) { 00311 if (buildReader.isTupleConsumptionPending()) { 00312 if (buildReader.getState() == EXECBUF_EOS) { 00313
00314 00315 00316 buildReader.close(); 00317 probeReader.open( 00318 curPlan->getProbePartition(), 00319 hashInfo); 00320 joinState = Probe; 00321 numTuplesProduced = 0; 00322 break; 00323 } 00324 00325 if (buildReader.demandData()) { 00326 if (isTopPlan) { 00327
00328 00329 00330 return EXECRC_BUF_UNDERFLOW; 00331 } else { 00332
00333 00334 00335 00336 00337 break; 00338 } 00339 } 00340 buildReader.unmarshalTuple(buildTuple); 00341 } 00342 00343
00344 00345 00346 if (hashTable.addTuple(buildTuple)) { 00347
00348 00349 00350 00351 00352 partInfo.open( 00353 &hashTableReader, &buildReader, buildTuple, 00354 curPlan->getProbePartition(), 00355 curPlan->getBuildInput()); 00356 joinState = Partition; 00357 break; 00358 } 00359 buildReader.consumeTuple(); 00360 } 00361 break; 00362 } 00363 case Partition: 00364 { 00365 for (;;) { 00366 if (curPlan->generatePartitions(hashInfo, partInfo) 00367 == PartitionUnderflow) { 00368
00369 00370 00371 return EXECRC_BUF_UNDERFLOW; 00372 } else { 00373
00374 00375 00376 00377 break; 00378 } 00379 } 00380 partInfo.close(); 00381 joinState = CreateChildPlan; 00382 break; 00383 } 00384 case CreateChildPlan: 00385 { 00386
00387 00388 00389 curPlan->createChildren( 00390 partInfo, 00391 enableSubPartStat, 00392 enableSwing); 00393 00394 FENNEL_TRACE(TRACE_FINE, curPlan->toString()); 00395 00396
00397 00398 00399 curPlan = curPlan->getFirstChild().get(); 00400 isTopPlan = false; 00401 00402 hashTable.releaseResources(); 00403 00404 hashTable.init( 00405 curPlan->getPartitionLevel(), 00406 hashInfo, 00407 curPlan->getBuildInput()); 00408 hashTableReader.init( 00409 &hashTable, 00410 hashInfo, 00411 curPlan->getBuildInput()); 00412 00413 bool status = hashTable.allocateResources(); 00414 assert (status); 00415 buildReader.open(curPlan->getBuildPartition(), hashInfo); 00416 00417 joinState = 00418 (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00419 nextState.clear(); 00420 break; 00421 } 00422 case GetNextPlan: 00423 { 00424 hashTable.releaseResources(); 00425 00426 checkAbort(); 00427 00428 curPlan = curPlan->getNextLeaf(); 00429 00430 if (curPlan) { 00431 hashTable.init( 00432 curPlan->getPartitionLevel(), 00433 hashInfo, 00434 curPlan->getBuildInput()); 00435 hashTableReader.init( 00436 &hashTable, 00437 hashInfo, 00438 curPlan->getBuildInput()); 00439 00440 bool status = hashTable.allocateResources(); 00441 assert (status); 00442 buildReader.open(curPlan->getBuildPartition(), hashInfo); 00443 joinState = 00444 (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00445 nextState.clear(); 00446 } else { 00447 joinState = Done; 00448 } 00449 break; 00450 } 00451 case Probe: 00452 { 00453 TupleData &probeTuple = inputTuple[curPlan->getProbeInput()]; 00454 uint probeTupleSize = inputTupleSize[curPlan->getProbeInput()]; 00455 TupleProjection &probeKeyProj = 00456 hashInfo.keyProj[curPlan->getProbeInput()]; 00457 uint buildTupleSize = inputTupleSize[curPlan->getBuildInput()]; 00458 bool removeDuplicateProbe = 00459 hashInfo.removeDuplicate[curPlan->getProbeInput()]; 00460 TupleProjection &filterNullProbeKeyProj = 00461 hashInfo.filterNullKeyProj[curPlan->getProbeInput()]; 00462 bool filterNullProbe = regularJoin; 00463 00464 uint probeFieldOffset = 00465 returnBuild(curPlan) ? 00466 buildTupleSize * curPlan->getProbeInput() : 0; 00467 uint buildFieldOffset = 00468 returnProbe(curPlan) ? 00469 probeTupleSize * curPlan->getBuildInput() : 0; 00470 uint probeFieldLength = 00471 returnProbe(curPlan) ? probeTupleSize : 0; 00472 uint buildFieldLength = 00473 returnBuild(curPlan) ? buildTupleSize : 0; 00474 00475
00476 00477 00478 for (;;) { 00479 if (probeReader.isTupleConsumptionPending()) { 00480 if (probeReader.getState() == EXECBUF_EOS) { 00481 probeReader.close(); 00482 if (returnBuildOuter(curPlan)) { 00483
00484 00485 00486 00487 00488 00489 00490 00491 00492 00493 hashTableReader.bindUnMatched(); 00494 00495
00496 00497 00498 00499 for (uint i = 0; i < probeFieldLength; i ++) { 00500 outputTuple[i + probeFieldOffset].pData = 00501 NULL; 00502 } 00503 joinState = ProduceBuild; 00504 nextState.push_back(GetNextPlan); 00505 } else { 00506
00507 00508 00509 joinState = GetNextPlan; 00510 } 00511 break; 00512 } 00513 if (probeReader.demandData()) { 00514 if (isTopPlan) { 00515
00516 00517 00518 return EXECRC_BUF_UNDERFLOW; 00519 } else { 00520
00521 00522 00523 00524 00525 break; 00526 } 00527 } 00528 probeReader.unmarshalTuple(probeTuple); 00529 } 00530 00531 PBuffer keyBuf = NULL; 00532 00533
00534 00535 00536 00537 00538 if (!filterNullProbe || 00539 !probeTuple.containsNull(filterNullProbeKeyProj)) { 00540 keyBuf = 00541 hashTable.findKey( 00542 probeTuple, 00543 probeKeyProj, 00544 removeDuplicateProbe); 00545 } 00546 00547 if (keyBuf) { 00548 if (returnBuildInner(curPlan)) { 00549
00550 00551 00552 00553 00554 00555 00556 00557 00558 00559 00560 00561 00562 00563 for (uint i = 0; i < probeFieldLength; i ++) { 00564 outputTuple[i + probeFieldOffset].copyFrom( 00565 probeTuple[i]); 00566 } 00567 00571 hashTableReader.bindKey(keyBuf); 00572 joinState = ProduceBuild; 00573 nextState.push_back(Probe); 00574 break; 00575 } else if (returnProbeInner(curPlan) && 00576 returnProbeOuter() && returnBuild(curPlan)) { 00577
00578 00579 00580 00581 00582 00583 00584 00585 00586 00587 for (uint i = 0; i < probeFieldLength; i ++) { 00588 outputTuple[i + probeFieldOffset].copyFrom( 00589 probeTuple[i]); 00590 } 00591 joinState = ProducePending; 00592 nextState.push_back(Probe); 00593 break; 00594 } else { 00595
00596 00597 00598 00599 00600 00601 00602 probeReader.consumeTuple(); 00603 } 00604 } else { 00605
00606 00607 00608 00609 if (returnProbeOuter(curPlan)) { 00610
00611 00612 00613 00614 00615 00616 00617 for (uint i = 0; i < probeFieldLength; i ++) { 00618 outputTuple[i + probeFieldOffset].copyFrom( 00619 probeTuple[i]); 00620 } 00621 00622 for (uint i = 0; i < buildFieldLength; i ++) { 00623 outputTuple[i + buildFieldOffset].pData = NULL; 00624 } 00625 joinState = ProducePending; 00626 nextState.push_back(Probe); 00627 break; 00628 } else { 00629 probeReader.consumeTuple(); 00630 } 00631 } 00632 } 00633 break; 00634 } 00635 case ProduceBuild: 00636 { 00637 TupleData &buildTuple = inputTuple[curPlan->getBuildInput()]; 00638 uint probeTupleSize = inputTupleSize[curPlan->getProbeInput()]; 00639 uint buildTupleSize = inputTupleSize[curPlan->getBuildInput()]; 00640 uint buildFieldOffset = 00641 returnProbe(curPlan) ? 00642 probeTupleSize * curPlan->getBuildInput() : 0; 00643 uint buildFieldLength = 00644 returnBuild(curPlan) ? buildTupleSize : 0; 00645 00646
00647 00648 00649 00650 00651 if (hashTableReader.getNext(buildTuple)) { 00652 for (uint i = 0; i < buildFieldLength; i ++) { 00653 outputTuple[i + buildFieldOffset].copyFrom( 00654 buildTuple[i]); 00655 } 00656 00657 joinState = ProducePending; 00658
00659 00660 00661 00662 nextState.push_back(ProduceBuild); 00663 } else { 00664 joinState = nextState.back(); 00665 nextState.pop_back(); 00666 if (joinState == Probe) { 00667 probeReader.consumeTuple(); 00668 } 00669 } 00670 break; 00671 } 00672 case ProducePending: 00673 { 00674 if (pOutAccessor->produceTuple(outputTuple)) { 00675 numTuplesProduced++; 00676 joinState = nextState.back(); 00677 nextState.pop_back(); 00678 if (joinState == Probe) { 00679 probeReader.consumeTuple(); 00680 } 00681 } else { 00682 numTuplesProduced = 0; 00683 return EXECRC_BUF_OVERFLOW; 00684 } 00685 00686
00687 00688 00689 00690 if (numTuplesProduced >= quantum.nTuplesMax) { 00691
00692 00693 00694 numTuplesProduced = 0; 00695 return EXECRC_QUANTUM_EXPIRED; 00696 } 00697 break; 00698 } 00699 case Done: 00700 { 00701 pOutAccessor->markEOS(); 00702 return EXECRC_EOS; 00703 } 00704 } 00705 } 00706 00707
00708 00709 00710 assert (false); 00711 } 00712 00713 void LhxJoinExecStream::closeImpl() 00714 { 00715 hashTable.releaseResources(); 00716 if (rootPlan) { 00717 rootPlan->close(); 00718 rootPlan.reset(); 00719 } 00720 ConfluenceExecStream::closeImpl(); 00721 } 00722 00723 void LhxJoinExecStream::setJoinType( 00724 LhxJoinExecStreamParams const &params) 00725 { 00726
00727 00728 00729 00730 00731 00732 00733 00734 00735 00736 00737 00738 00739 00740 00741 00742 00743 00744 00745 00746 00747 00748 00749 joinType.reset(new dynamic_bitset<>(4)); 00750 00751 joinType->set(0, params.leftInner); 00752 joinType->set(1, params.leftOuter); 00753 joinType->set(2, params.rightInner); 00754 joinType->set(3, params.rightOuter); 00755 00756
00757 00758 00759 00760 00761 00762 00763 assert (joinType->count() != 0); 00764 00765 regularJoin = !params.setopDistinct && !params.setopAll; 00766 setopDistinct = params.setopDistinct && !params.setopAll; 00767 setopAll = !params.setopDistinct && params.setopAll; 00768 00769 assert (setopAll && (regularJoin || setopDistinct)); 00770 00771
00772 00773 00774 00775 00776 bool leftAnti = 00777 (returnProbeOuter() && returnProbeInner() && returnBuild()); 00778 00779 assert (!(leftAnti && setopDistinct)); 00780 } 00781 00782 void LhxJoinExecStream::setHashInfo( 00783 LhxJoinExecStreamParams const &params) 00784 { 00785 uint numInputs = inAccessors.size(); 00786 for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) { 00787 hashInfo.streamBufAccessor.push_back(inAccessors[inputIndex]); 00788 hashInfo.inputDesc.push_back( 00789 inAccessors[inputIndex]->getTupleDesc()); 00790
00791 00792 00793 hashInfo.removeDuplicate.push_back(setopDistinct); 00794 hashInfo.numRows.push_back(params.numRows); 00795 hashInfo.cndKeys.push_back(params.cndKeys); 00796 } 00797 00798 bool leftSemi = 00799 (returnProbeInner() && returnProbeOuter() && returnBuild()); 00800 00801 bool rightSemi = 00802 (returnBuildInner() && returnBuildOuter() && returnProbe()); 00803 00804
00805 00806 00807 00808 00809 00810 00811 00812 00813 00814 00815 00816 00817 00818 00819 00820 00821 if (leftSemi) { 00822 hashInfo.removeDuplicate[DefaultBuildInputIndex] = true; 00823 } 00824 00825 if (rightSemi) { 00826 hashInfo.removeDuplicate[DefaultProbeInputIndex] = true; 00827 } 00828 00829
00830 00831 00832 00833 hashInfo.filterNull.push_back(regularJoin && returnProbeOuter()); 00834 hashInfo.filterNull.push_back(regularJoin && returnBuildOuter()); 00835 00836 hashInfo.keyProj.push_back(params.leftKeyProj); 00837 hashInfo.keyProj.push_back(params.rightKeyProj); 00838 00839 TupleProjection filterNullLeftKeyProj; 00840 TupleProjection filterNullRightKeyProj; 00841 00842
00843
00844 filterNullLeftKeyProj.projectFrom( 00845 params.leftKeyProj, params.filterNullKeyProj); 00846 00847 filterNullRightKeyProj.projectFrom( 00848 params.rightKeyProj, params.filterNullKeyProj); 00849 00850 hashInfo.filterNullKeyProj.push_back(filterNullLeftKeyProj); 00851 hashInfo.filterNullKeyProj.push_back(filterNullRightKeyProj); 00852 00853 hashInfo.useJoinFilter.push_back( 00854 params.enableJoinFilter && returnProbeOuter()); 00855 hashInfo.useJoinFilter.push_back( 00856 params.enableJoinFilter && returnBuildOuter()); 00857 00858 hashInfo.memSegmentAccessor = params.scratchAccessor; 00859 hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor; 00860 hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment; 00861 00862 for (int inputIndex = 0; inputIndex < numInputs; inputIndex++) { 00863 TupleProjection &keyProj = hashInfo.keyProj[inputIndex]; 00864 TupleDescriptor &inputDesc = hashInfo.inputDesc[inputIndex]; 00865 00866 vector isKeyVarChar; 00867 TupleProjection dataProj; 00868 00869
00870 00871 00872 00873 for (int j = 0; j < keyProj.size(); j ++) { 00874 StoredTypeDescriptor::Ordinal ordinal = 00875 inputDesc[keyProj[j]].pTypeDescriptor->getOrdinal(); 00876 if (ordinal == STANDARD_TYPE_VARCHAR) { 00877 isKeyVarChar.push_back(HASH_TRIM_VARCHAR); 00878 } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) { 00879 isKeyVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR); 00880 } else { 00881 isKeyVarChar.push_back(HASH_TRIM_NONE); 00882 } 00883 } 00884 00885 hashInfo.isKeyColVarChar.push_back(isKeyVarChar); 00886 00887
00888 00889 00890 00891 for (int i = 0; i < inputDesc.size(); i ++) { 00892
00893 00894 00895 bool colIsKey = false; 00896 for (int j = 0; j < keyProj.size(); j ++) { 00897 if (i == keyProj[j]) { 00898 colIsKey = true; 00899 break; 00900 } 00901 } 00902 if (!colIsKey) { 00903 dataProj.push_back(i); 00904 } 00905 } 00906 hashInfo.dataProj.push_back(dataProj); 00907 } 00908 } 00909 00910 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxJoinExecStream.cpp#4 $"); 00911 00912