Fennel: /home/pub/open/dev/fennel/hashexe/LhxAggExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/hashexe/LhxAggExecStream.h" 00025 #include "fennel/segment/Segment.h" 00026 #include "fennel/exec/ExecStreamBufAccessor.h" 00027 #include "fennel/tuple/StandardTypeDescriptor.h" 00028 00029 using namespace std; 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $"); 00032 00033 void LhxAggExecStream::prepare( 00034 LhxAggExecStreamParams const &params) 00035 { 00036 ConduitExecStream::prepare(params); 00037 00038 setHashInfo(params); 00039 setAggComputers(hashInfo, params.aggInvocations); 00040 00041
00042 00043 00044 forcePartitionLevel = params.forcePartitionLevel; 00045 enableSubPartStat = params.enableSubPartStat; 00046 00047 buildInputIndex = hashInfo.inputDesc.size() - 1; 00048 00049
00050 00051 00052 00053 hashTable.calculateSize(hashInfo, buildInputIndex, numBlocksHashTable); 00054 00055 TupleDescriptor outputDesc; 00056 00057 outputDesc = hashInfo.inputDesc[buildInputIndex]; 00058 00059 if (!params.outputTupleDesc.empty()) { 00060 assert (outputDesc == params.outputTupleDesc); 00061 } 00062 00063 outputTuple.compute(outputDesc); 00064 pOutAccessor->setTupleShape(outputDesc); 00065 00066
00067 00068 00069 uint numInputs = 1; 00070 numMiscCacheBlocks = LhxPlan::LhxChildPartCount * numInputs; 00071 } 00072 00073 void LhxAggExecStream::getResourceRequirements( 00074 ExecStreamResourceQuantity &minQuantity, 00075 ExecStreamResourceQuantity &optQuantity, 00076 ExecStreamResourceSettingType &optType) 00077 { 00078 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00079 00080 uint minPages = 00081 LhxHashTable::LhxHashTableMinPages * LhxPlan::LhxChildPartCount 00082 + numMiscCacheBlocks; 00083 minQuantity.nCachePages += minPages; 00084
00085 if (isMAXU(numBlocksHashTable)) { 00086 optType = EXEC_RESOURCE_UNBOUNDED; 00087 } else { 00088
00089
00090 optQuantity.nCachePages += std::max(minPages + 1, numBlocksHashTable); 00091 optType = EXEC_RESOURCE_ESTIMATE; 00092 } 00093 } 00094 00095 void LhxAggExecStream::setResourceAllocation( 00096 ExecStreamResourceQuantity &quantity) 00097 { 00098 ConduitExecStream::setResourceAllocation(quantity); 00099 hashInfo.numCachePages = quantity.nCachePages - numMiscCacheBlocks; 00100 } 00101 00102 00103 void LhxAggExecStream::open(bool restart) 00104 { 00105 ConduitExecStream::open(restart); 00106 00107 if (restart) { 00108 hashTable.releaseResources(); 00109 } 00110 00111 uint partitionLevel = 0; 00112 hashTable.init(partitionLevel, hashInfo, &aggComputers, buildInputIndex); 00113 hashTableReader.init(&hashTable, hashInfo, buildInputIndex); 00114 00115 bool status = hashTable.allocateResources(); 00116 00117 assert(status); 00118 00119
00120
00121
00122
00123
00124 00125
00126 00127 00128 00129 00130 vector partitionList; 00131 00132 buildPart = SharedLhxPartition(new LhxPartition(this)); 00133
00134
00135 buildPart->segStream.reset(); 00136 buildPart->inputIndex = 0; 00137 partitionList.push_back(buildPart); 00138 00139 rootPlan = SharedLhxPlan(new LhxPlan()); 00140 rootPlan->init( 00141 WeakLhxPlan(), 00142 partitionLevel, 00143 partitionList, 00144 enableSubPartStat); 00145 00146
00147 00148 00149 partInfo.init(&hashInfo); 00150 00151
00152 00153 00154 curPlan = rootPlan.get(); 00155 isTopPlan = true; 00156 00157 buildReader.open(curPlan->getPartition(buildInputIndex), hashInfo); 00158 00159 aggState = (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00160 } 00161 00162 ExecStreamResult LhxAggExecStream::execute(ExecStreamQuantum const &quantum) 00163 { 00164 while (true) { 00165
00166
00167
00168
00169 switch (aggState) { 00170
00171
00172
00173
00174 case ForcePartitionBuild: 00175 { 00176
00177 00178 00179
00180
00181 inputTuple.compute(buildReader.getTupleDesc()); 00182 for (;;) { 00183 if (buildReader.isTupleConsumptionPending()) { 00184 if (buildReader.getState() == EXECBUF_EOS) { 00185 numTuplesProduced = 0; 00186
00187 00188 00189 aggState = Produce; 00190 break; 00191 } 00192 00193 if (buildReader.demandData()) { 00194 if (isTopPlan) { 00195
00196 00197 00198 00199 return EXECRC_BUF_UNDERFLOW; 00200 } else { 00201
00202 00203 00204 00205 00206 break; 00207 } 00208 } 00209 buildReader.unmarshalTuple(inputTuple); 00210 } 00211 00212
00213 00214 00215 00216 00217 00218 if (curPlan->getPartitionLevel() < forcePartitionLevel || 00219 hashTable.addTuple(inputTuple)) { 00220 if (isTopPlan) { 00221 partInfo.open( 00222 &hashTableReader, 00223 &buildReader, 00224 inputTuple, 00225 &aggComputers); 00226 } else { 00227 partInfo.open( 00228 &hashTableReader, 00229 &buildReader, 00230 inputTuple, 00231 &partialAggComputers); 00232 } 00233 aggState = Partition; 00234 break; 00235 } 00236 buildReader.consumeTuple(); 00237 } 00238 break; 00239 } 00240 case Build: 00241 { 00242
00243 00244 00245 inputTuple.compute(buildReader.getTupleDesc()); 00246 for (;;) { 00247 if (buildReader.isTupleConsumptionPending()) { 00248 if (buildReader.getState() == EXECBUF_EOS) { 00249 buildReader.close(); 00250 numTuplesProduced = 0; 00251
00252 00253 00254 aggState = Produce; 00255 break; 00256 } 00257 00258 if (buildReader.demandData()) { 00259 if (isTopPlan) { 00260
00261 00262 00263 00264 return EXECRC_BUF_UNDERFLOW; 00265 } else { 00266
00267 00268 00269 00270 00271 break; 00272 } 00273 } 00274 buildReader.unmarshalTuple(inputTuple); 00275 } 00276 00277
00278 00279 00280 if (hashTable.addTuple(inputTuple)) { 00281 if (isTopPlan) { 00282 partInfo.open( 00283 &hashTableReader, 00284 &buildReader, 00285 inputTuple, 00286 &aggComputers); 00287 } else { 00288 partInfo.open( 00289 &hashTableReader, 00290 &buildReader, 00291 inputTuple, 00292 &partialAggComputers); 00293 } 00294 aggState = Partition; 00295 break; 00296 } 00297 buildReader.consumeTuple(); 00298 } 00299 break; 00300 } 00301 case Partition: 00302 { 00303 for (;;) { 00304 if (curPlan->generatePartitions(hashInfo, partInfo) 00305 == PartitionUnderflow) { 00306
00307 00308 00309 return EXECRC_BUF_UNDERFLOW; 00310 } else { 00311
00312
00313 00314 00315 00316 break; 00317 } 00318 } 00319 partInfo.close(); 00320 aggState = CreateChildPlan; 00321 break; 00322 } 00323 case CreateChildPlan: 00324 { 00325
00326 00327 00328 curPlan->createChildren(partInfo, false, false); 00329 00330 FENNEL_TRACE(TRACE_FINE, curPlan->toString()); 00331 00332
00333
00334
00335
00336
00337
00338 00339 00340 curPlan = curPlan->getFirstChild().get(); 00341 isTopPlan = false; 00342 00343 hashTable.releaseResources(); 00344 00345
00346 00347 00348 00349 00350 00351 hashTable.init( 00352 curPlan->getPartitionLevel(), 00353 hashInfo, 00354 &partialAggComputers, 00355 buildInputIndex); 00356 hashTableReader.init( 00357 &hashTable, 00358 hashInfo, 00359 buildInputIndex); 00360 00361 bool status = hashTable.allocateResources(); 00362 assert(status); 00363 00364 buildReader.open( 00365 curPlan->getPartition(buildInputIndex), 00366 hashInfo); 00367 00368 aggState = 00369 (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00370 break; 00371 } 00372 case GetNextPlan: 00373 { 00374 hashTable.releaseResources(); 00375 00376 checkAbort(); 00377 00378 curPlan = curPlan->getNextLeaf(); 00379 00380 if (curPlan) { 00381
00382 00383 00384 00385 00386 00387 hashTable.init( 00388 curPlan->getPartitionLevel(), 00389 hashInfo, 00390 &partialAggComputers, 00391 buildInputIndex); 00392 hashTableReader.init(&hashTable, hashInfo, buildInputIndex); 00393 bool status = hashTable.allocateResources(); 00394 assert(status); 00395 00396 buildReader.open( 00397 curPlan->getPartition(buildInputIndex), 00398 hashInfo); 00399 00400 aggState = 00401 (forcePartitionLevel > 0) ? ForcePartitionBuild : Build; 00402 } else { 00403 aggState = Done; 00404 } 00405 break; 00406 } 00407 case Produce: 00408 { 00409
00410
00411
00412
00413
00414
00415
00416
00417 00418 00419 00420 if (hashTableReader.getNext(outputTuple)) { 00421 aggState = ProducePending; 00422
00423 00424 00425 00426 nextState = Produce; 00427 } else { 00428 aggState = GetNextPlan; 00429 } 00430 break; 00431 } 00432 case ProducePending: 00433 { 00434 if (pOutAccessor->produceTuple(outputTuple)) { 00435 numTuplesProduced++; 00436 aggState = nextState; 00437 } else { 00438 numTuplesProduced = 0; 00439 return EXECRC_BUF_OVERFLOW; 00440 } 00441 00442
00443 00444 00445 00446 if (numTuplesProduced >= quantum.nTuplesMax) { 00447
00448 00449 00450 numTuplesProduced = 0; 00451 return EXECRC_QUANTUM_EXPIRED; 00452 } 00453 break; 00454 } 00455 case Done: 00456 { 00457 pOutAccessor->markEOS(); 00458 return EXECRC_EOS; 00459 } 00460 } 00461 } 00462 00463
00464 00465 00466 assert(false); 00467 } 00468 00469 void LhxAggExecStream::closeImpl() 00470 { 00471 hashTable.releaseResources(); 00472 if (rootPlan) { 00473 rootPlan->close(); 00474 rootPlan.reset(); 00475 } 00476
00477
00478
00479 ConduitExecStream::closeImpl(); 00480 } 00481 00482 void LhxAggExecStream::setAggComputers( 00483 LhxHashInfo &hashInfo, 00484 AggInvocationList const &aggInvocations) 00485 { 00486
00487 00488 00489 TupleDescriptor inputDesc = pInAccessor->getTupleDesc(); 00490 00491
00492 00493 00494 00495 TupleDescriptor &hashDesc = hashInfo.inputDesc.back(); 00496 00497
00498 00499 00500 TupleProjection &aggsProj = hashInfo.aggsProj; 00501 00506 AggFunction partialAggFunction; 00507 00508 uint i = 0; 00509 00510 assert (aggInvocations.size() == aggsProj.size()); 00511 00512 for (AggInvocationConstIter pInvocation(aggInvocations.begin()); 00513 pInvocation != aggInvocations.end(); 00514 ++pInvocation) 00515 { 00516 switch (pInvocation->aggFunction) { 00517 case AGG_FUNC_COUNT: 00518 partialAggFunction = AGG_FUNC_SUM; 00519 break; 00520 case AGG_FUNC_SUM: 00521 case AGG_FUNC_MIN: 00522 case AGG_FUNC_MAX: 00523 case AGG_FUNC_SINGLE_VALUE: 00524 partialAggFunction = pInvocation->aggFunction; 00525 break; 00526 default: 00527 permFail("unknown aggregation function: " 00528 << pInvocation->aggFunction); 00529 break; 00530 } 00531 00532
00533 00534 00535 TupleAttributeDescriptor const *pInputAttr = NULL; 00536 if (pInvocation->iInputAttr != -1) { 00537 pInputAttr = &(inputDesc[pInvocation->iInputAttr]); 00538 } 00539 aggComputers.push_back( 00540 AggComputer::newAggComputer( 00541 pInvocation->aggFunction, pInputAttr)); 00542 aggComputers.back().setInputAttrIndex(pInvocation->iInputAttr); 00543 00544
00545 00546 00547 TupleAttributeDescriptor const *pInputAttrPartialAgg = 00548 &(hashDesc[aggsProj[i]]); 00549 partialAggComputers.push_back( 00550 AggComputer::newAggComputer( 00551 partialAggFunction, pInputAttrPartialAgg)); 00552 partialAggComputers.back().setInputAttrIndex(aggsProj[i]); 00553 i ++; 00554 } 00555 } 00556 00557 void LhxAggExecStream::setHashInfo( 00558 LhxAggExecStreamParams const &params) 00559 { 00560 TupleDescriptor inputDesc = pInAccessor->getTupleDesc(); 00561 00562 hashInfo.streamBufAccessor.push_back(pInAccessor); 00563 00564 hashInfo.cndKeys.push_back(params.cndGroupByKeys); 00565 00566 hashInfo.numRows.push_back(params.numRows); 00567 00568 hashInfo.filterNull.push_back(false); 00569 00570
00571 TupleProjection filterNullKeyProj; 00572 hashInfo.filterNullKeyProj.push_back(filterNullKeyProj); 00573 00574 hashInfo.removeDuplicate.push_back(false); 00575 hashInfo.useJoinFilter.push_back(false); 00576 00577 hashInfo.memSegmentAccessor = params.scratchAccessor; 00578 hashInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor; 00579 hashInfo.externalSegmentAccessor.pSegment = params.pTempSegment; 00580 00581 TupleProjection keyProj; 00582 vector isKeyColVarChar; 00583 00584 for (int i = 0; i < params.groupByKeyCount; i ++) { 00585 keyProj.push_back(i); 00586
00587 00588 00589 00590 StoredTypeDescriptor::Ordinal ordinal = 00591 inputDesc[i].pTypeDescriptor->getOrdinal(); 00592 if (ordinal == STANDARD_TYPE_VARCHAR) { 00593 isKeyColVarChar.push_back(HASH_TRIM_VARCHAR); 00594 } else if (ordinal == STANDARD_TYPE_UNICODE_VARCHAR) { 00595 isKeyColVarChar.push_back(HASH_TRIM_UNICODE_VARCHAR); 00596 } else { 00597 isKeyColVarChar.push_back(HASH_TRIM_NONE); 00598 } 00599 } 00600 hashInfo.keyProj.push_back(keyProj); 00601 hashInfo.isKeyColVarChar.push_back(isKeyColVarChar); 00602 00603
00604 00605 00606 TupleProjection dataProj; 00607 hashInfo.dataProj.push_back(dataProj); 00608 00609
00610 00611 00612 TupleDescriptor keyDesc; 00613 keyDesc.projectFrom(inputDesc, keyProj); 00614 00615
00616 00617 00618 StandardTypeDescriptorFactory stdTypeFactory; 00619 TupleAttributeDescriptor countDesc( 00620 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00621 00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633 00634
00635
00636 00637 00638 00639 00640 int i = params.groupByKeyCount; 00641 for (AggInvocationConstIter pInvocation(params.aggInvocations.begin()); 00642 pInvocation != params.aggInvocations.end(); 00643 ++pInvocation) 00644 { 00645 switch (pInvocation->aggFunction) { 00646 case AGG_FUNC_COUNT: 00647 keyDesc.push_back(countDesc); 00648 break; 00649 case AGG_FUNC_SUM: 00650 case AGG_FUNC_MIN: 00651 case AGG_FUNC_MAX: 00652 case AGG_FUNC_SINGLE_VALUE: 00653
00654 keyDesc.push_back(inputDesc[pInvocation->iInputAttr]); 00655 keyDesc.back().isNullable = true; 00656 break; 00657 } 00658 hashInfo.aggsProj.push_back(i++); 00659 } 00660 00661 hashInfo.inputDesc.push_back(keyDesc); 00662 } 00663 00664 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/hashexe/LhxAggExecStream.cpp#2 $"); 00665 00666