Fennel: /home/pub/open/dev/fennel/hashexe/LhxPartition.h Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #ifndef Fennel_LhxPartition_Included 00024 #define Fennel_LhxPartition_Included 00025 00026 #include "fennel/tuple/TupleData.h" 00027 #include "fennel/tuple/TupleDescriptor.h" 00028 #include "fennel/tuple/TupleAccessor.h" 00029 #include "fennel/tuple/TuplePrinter.h" 00030 #include "fennel/segment/SegInputStream.h" 00031 #include "fennel/segment/SegOutputStream.h" 00032 #include "fennel/segment/SegStreamAllocation.h" 00033 #include "fennel/hashexe/LhxHashBase.h" 00034 #include "fennel/hashexe/LhxHashTable.h" 00035 #include "fennel/exec/ExecStreamBufAccessor.h" 00036 #include "fennel/exec/ExecStream.h" 00037 #include "fennel/exec/AggComputer.h" 00038 #include <boost/dynamic_bitset.hpp> 00039 #include <boost/scoped_array.hpp> 00040 #include <boost/shared_array.hpp> 00041 #include <boost/enable_shared_from_this.hpp> 00042 00043 using namespace std; 00044 using namespace boost; 00045 00046 FENNEL_BEGIN_NAMESPACE 00047 00054 struct LhxPartition 00055 { 00056
00057 00058 00059 SharedSegStreamAllocation segStream; 00060 00061
00062 00063 00064 00065 00066 uint inputIndex; 00067 00072 ExecStream *pExecStream; 00073 00074 explicit LhxPartition(ExecStream *pExecStreamInit); 00075 }; 00076 00077 class FENNEL_HASHEXE_EXPORT LhxPartitionWriter 00078 { 00082 SharedLhxPartition destPartition; 00083 00087 SharedSegOutputStream pSegOutputStream; 00088 00092 TupleAccessor tupleAccessor; 00093 00094 bool isAggregate; 00095 00101 LhxHashTable hashTable; 00102 LhxHashTableReader hashTableReader; 00103 TupleData partialAggTuple; 00104 00105 public: 00106 void open( 00107 SharedLhxPartition destPartitionInit, 00108 LhxHashInfo const &hashInfo); 00109 00110 void open( 00111 SharedLhxPartition destPartitionInit, 00112 LhxHashInfo &hashInfo, 00113 AggComputerList *aggList, 00114 uint numWriterCachePages); 00115 00116 inline void allocateResources(); 00117 inline void releaseResources(); 00118 void marshalTuple(TupleData const &inputTuple); 00119 void aggAndMarshalTuple(TupleData const &inputTuple); 00120 void close(); 00121 }; 00122 00123 class FENNEL_HASHEXE_EXPORT LhxPartitionReader 00124 { 00128 SharedLhxPartition srcPartition; 00129 00133 SharedSegInputStream pSegInputStream; 00134 00138 TupleAccessor tupleAccessor; 00139 00143 uint tupleStorageLength; 00144 00145 bool srcIsInputStream; 00146 ExecStreamBufState bufState; 00147 TupleDescriptor outputTupleDesc; 00148 00154 SharedExecStreamBufAccessor streamBufAccessor; 00155 00156 public: 00157 void open( 00158 SharedLhxPartition srcPartition, 00159 LhxHashInfo const &hashInfo); 00160 00161 bool isTupleConsumptionPending(); 00162 bool demandData(); 00163 void unmarshalTuple(TupleData &outputTuple); 00164 void consumeTuple(); 00165 void close(); 00166 inline ExecStreamBufState getState() const; 00167 inline TupleDescriptor const &getTupleDesc() const; 00168 inline SharedLhxPartition getSourcePartition() const; 00169 }; 00170 00171 00172 00173 00174 00175 00176 enum LhxPartitionState { 00177 PartitionUnderflow, PartitionEndOfData 00178 }; 00179 00180 struct LhxPartitionInfo 00181 { 00185 LhxHashTableReader *hashTableReader; 00186 00197 LhxPartitionReader probeReader; 00198 LhxPartitionReader *reader; 00199 00203 TupleData buildTuple; 00204 00205
00206
00207
00208 00220 vector writerList; 00221 vector destPartitionList; 00222
00223
00224 vector<shared_ptr<dynamic_bitset<> > > joinFilterList; 00225 shared_array filteredRowCountList; 00226 00227
00228 00229 00230 00231 00232 00233 vector<shared_array > subPartStatList; 00234 00235 uint numInputs; 00236 uint curInputIndex; 00237 00238 LhxHashInfo *hashInfo; 00239 00240
00241 00242 00243 00244 bool partitionMemory; 00245 00246 LhxPartitionInfo() 00247 { 00248 reader = NULL; 00249 hashTableReader = NULL; 00250 } 00251 00252
00253
00257 void init(LhxHashInfo *hashInfoInit); 00258 00266 void open( 00267 LhxHashTableReader *hashTableReaderInit, 00268 LhxPartitionReader *buildReader, 00269 TupleData &buildTuple, 00270 SharedLhxPartition probePartition, 00271 uint buildInputIndex); 00272 00279 void open( 00280 LhxHashTableReader *hashTableReaderInit, 00281 LhxPartitionReader *buildReader, 00282 TupleData &buildTuple, 00283 AggComputerList *aggList); 00284 00288 void close(); 00289 }; 00290 00291 class FENNEL_HASHEXE_EXPORT LhxPlan 00292 : public enable_shared_from_this 00293 { 00294 uint partitionLevel; 00295 vector partitions; 00296 shared_array joinSideToInputMap; 00297 00298 shared_ptr<dynamic_bitset<> > joinFilter; 00299 shared_array filteredRowCount; 00300 00301
00302 00303 00304 00305 00306 shared_array subPartToChildMap; 00307 vector<shared_array > childPartSize; 00308 00309 shared_array inputSize; 00310 00311
00312 00313 00314 00315 00316 00317 00318 00319 00320 00321 00322 WeakLhxPlan parentPlan; 00323 SharedLhxPlan firstChildPlan; 00324 SharedLhxPlan siblingPlan; 00325 00329 inline void addSibling(SharedLhxPlan siblingPlan); 00330 00336 void mapSubPartToChild(vector<shared_array > &subPartStats); 00337 00342 uint calculateChildIndex(uint hashKey, uint curInputIndex); 00343 00344 inline bool isBuildChildPart(uint childPartIndex); 00345 00346 inline bool isProbeChildPart(uint childPartIndex); 00347 00348 inline uint getBuildChildPart(uint childPartIndex); 00349 00350 inline uint getProbeChildPart(uint childPartIndex); 00351 00352 public: 00353
00354 00355 00356 00357 00358 static const uint LhxSubPartCount = 16; 00359 static const uint LhxChildPartCount = 3; 00360 00364 void init( 00365 WeakLhxPlan parentPlanInit, 00366 uint partitionLevelInit, 00367 vector &partitionsInit, 00368 bool enableSubPartStat); 00369 00373 void init( 00374 WeakLhxPlan parentPlanInit, 00375 uint partitionLevelInit, 00376 vector &partitionsInit, 00377 vector<shared_array > &subPartStats, 00378 shared_ptr<dynamic_bitset<> > filterInit, 00379 VectorOfUint &filteredRowsInit, 00380 bool enableSubPartStat, 00381 bool enableSwing); 00382 00383 00387 LhxPartitionState generatePartitions( 00388 LhxHashInfo const &hashInfo, 00389 LhxPartitionInfo &partInfo); 00390 00395 void createChildren(LhxHashInfo const &hashInfo, bool enableSubPartStat); 00396 00400 void createChildren( 00401 LhxPartitionInfo &partInfo, 00402 bool enableSubPartStat, 00403 bool enableSwing); 00404 00408 inline uint getPartitionLevel(); 00409 00413 inline SharedLhxPartition getBuildPartition(); 00414 inline SharedLhxPartition getProbePartition(); 00415 inline SharedLhxPartition getPartition(uint inputIndex); 00416 00417
00418 00419 00420 00421 inline uint getBuildInput(); 00422 inline uint getProbeInput(); 00423 00424
00425 00426 00427 00428 00429 inline uint getJoinSide(uint inputIndex); 00430 00434 inline SharedLhxPlan getFirstChild(); 00435 00439 LhxPlan *getFirstLeaf(); 00440 00444 LhxPlan *getNextLeaf(); 00445 00450 void close(); 00451 00457 string toString(); 00458 }; 00459 00460 inline LhxPartition::LhxPartition(ExecStream *pExecStreamInit) 00461 { 00462 pExecStream = pExecStreamInit; 00463 } 00464 00465 inline ExecStreamBufState LhxPartitionReader::getState() const 00466 { 00467 if (srcIsInputStream) { 00468 return streamBufAccessor->getState(); 00469 } else { 00470 return bufState; 00471 } 00472 } 00473 00474 inline SharedLhxPartition LhxPartitionReader::getSourcePartition() const 00475 { 00476 return srcPartition; 00477 } 00478 00479 inline TupleDescriptor const &LhxPartitionReader::getTupleDesc() const 00480 { 00481 return outputTupleDesc; 00482 } 00483 00484 inline void LhxPartitionWriter::allocateResources() 00485 { 00486 bool status = hashTable.allocateResources(); 00487 assert(status); 00488 } 00489 00490 inline void LhxPartitionWriter::releaseResources() 00491 { 00492 hashTable.releaseResources(); 00493 } 00494 00495 inline void LhxPlan::addSibling(SharedLhxPlan siblingPlanInit) 00496 { 00497 siblingPlan = siblingPlanInit; 00498 } 00499 00500 inline SharedLhxPlan LhxPlan::getFirstChild() 00501 { 00502 return firstChildPlan; 00503 } 00504 00505 inline uint LhxPlan::getPartitionLevel() 00506 { 00507 return partitionLevel; 00508 } 00509 00510 inline uint LhxPlan::getProbeInput() 00511 { 00512 return joinSideToInputMap[0]; 00513 } 00514 00515 inline uint LhxPlan::getBuildInput() 00516 { 00517 return joinSideToInputMap[partitions.size() - 1]; 00518 } 00519 00520 inline SharedLhxPartition LhxPlan::getProbePartition() 00521 { 00522 return partitions[getProbeInput()]; 00523 } 00524 00525 inline SharedLhxPartition LhxPlan::getBuildPartition() 00526 { 00527 return partitions[getBuildInput()]; 00528 } 00529 00530 inline SharedLhxPartition LhxPlan::getPartition(uint inputIndex) 00531 { 00532 return partitions[inputIndex]; 00533 } 00534 00535 inline uint LhxPlan::getJoinSide(uint inputIndex) 00536 { 00537 uint i = 0; 00538 while ((joinSideToInputMap[i] != inputIndex) 00539 && (i < partitions.size())) 00540 { 00541 i ++; 00542 } 00543 00544 return i; 00545 } 00546 00547 inline bool LhxPlan::isBuildChildPart(uint childPartIndex) 00548 { 00549 return ((childPartIndex / LhxChildPartCount) == getBuildInput()); 00550 } 00551 00552 inline bool LhxPlan::isProbeChildPart(uint childPartIndex) 00553 { 00554 return ((childPartIndex / LhxChildPartCount) == getProbeInput()); 00555 } 00556 00557 inline uint LhxPlan::getBuildChildPart(uint childPartIndex) 00558 { 00559 return ((childPartIndex % LhxChildPartCount) + 00560 getBuildInput() * LhxChildPartCount); 00561 } 00562 00563 inline uint LhxPlan::getProbeChildPart(uint childPartIndex) 00564 { 00565 return ((childPartIndex % LhxChildPartCount) + 00566 getProbeInput() * LhxChildPartCount); 00567 } 00568 00569 FENNEL_END_NAMESPACE 00570 00571 #endif 00572 00573