Fennel: /home/pub/open/dev/fennel/hashexe/LhxJoinExecStream.h Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #ifndef Fennel_LhxJoinExecStream_Included 00024 #define Fennel_LhxJoinExecStream_Included 00025 00026 #include "fennel/exec/ConfluenceExecStream.h" 00027 #include "fennel/hashexe/LhxHashBase.h" 00028 #include "fennel/hashexe/LhxHashTable.h" 00029 #include "fennel/hashexe/LhxPartition.h" 00030 00031 using namespace boost; 00032 00033 FENNEL_BEGIN_NAMESPACE 00034 00042 struct LhxJoinExecStreamParams : public ConfluenceExecStreamParams 00043 { 00047 SharedSegment pTempSegment; 00048 00052 bool enableSubPartStat; 00053 00058 uint forcePartitionLevel; 00059 00060
00061
00062 00069 RecordNum cndKeys; 00070 00074 RecordNum numRows; 00075 00079 bool leftInner; 00080 00084 bool leftOuter; 00085 00089 bool rightInner; 00090 00094 bool rightOuter; 00095 00099 TupleProjection leftKeyProj; 00100 00104 TupleProjection rightKeyProj; 00105 00109 TupleProjection filterNullKeyProj; 00110 00115 TupleProjection outputProj; 00116 00125 bool setopDistinct; 00126 bool setopAll; 00127 00131 bool enableJoinFilter; 00132 00136 bool enableSwing; 00137 }; 00138 00139 class FENNEL_HASHEXE_EXPORT LhxJoinExecStream 00140 : public ConfluenceExecStream 00141 { 00142
00143
00144 00145 enum LhxDefaultJoinInputIndex { 00146 DefaultProbeInputIndex = 0, DefaultBuildInputIndex = 1 00147 }; 00148 00149 enum LhxJoinState { 00150 ForcePartitionBuild, Build, Probe, 00151 ProduceBuild, ProducePending, 00152 Partition, CreateChildPlan, GetNextPlan, Done 00153 }; 00154 00158 shared_array inputTuple; 00159 shared_array inputTupleSize; 00160 00164 TupleData outputTuple; 00165 00169 uint numTuplesProduced; 00170 00174 LhxHashInfo hashInfo; 00175 00179 LhxHashTable hashTable; 00180 LhxHashTableReader hashTableReader; 00181 00186 BlockNum numBlocksHashTable; 00187 00191 uint numMiscCacheBlocks; 00192 00193
00194 00195 00196 bool isTopPlan; 00197 SharedLhxPlan rootPlan; 00198 LhxPlan *curPlan; 00199 00204 LhxPartitionInfo partInfo; 00205 00209 SharedLhxPartition buildPart; 00210 SharedLhxPartition probePart; 00211 00215 LhxPartitionReader buildReader; 00216 LhxPartitionReader probeReader; 00217 00221 bool enableSubPartStat; 00222 00226 bool enableSwing; 00227 00232 uint forcePartitionLevel; 00233 00234
00235 00236 00237 LhxJoinState joinState; 00238 00242 vector nextState; 00243 00244
00245 00246 00247 shared_ptr<dynamic_bitset<> > joinType; 00248 00256 bool regularJoin; 00257 bool setopDistinct; 00258 bool setopAll; 00259 00263 virtual void closeImpl(); 00264 00265
00266 00267 00268 void setJoinType(LhxJoinExecStreamParams const &params); 00269 00270
00271 00272 00273 void setHashInfo(LhxJoinExecStreamParams const &params); 00274 00275
00276 00277 00278 00279 inline bool returnProbeInner(LhxPlan *curPlan = NULL); 00280 00281
00282 00283 00284 inline bool returnBuildInner(LhxPlan *curPlan = NULL); 00285 00286
00287 00288 00289 inline bool returnProbeOuter(LhxPlan *curPlan = NULL); 00290 00291
00292 00293 00294 inline bool returnBuildOuter(LhxPlan *curPlan = NULL); 00295 00296
00297 00298 00299 inline bool returnInner(LhxPlan *curPlan = NULL); 00300 00301
00302 00303 00304 inline bool returnProbe(LhxPlan *curPlan = NULL); 00305 00306
00307 00308 00309 inline bool returnBuild(LhxPlan *curPlan = NULL); 00310 00311 public: 00312
00313 00314 00315 virtual void prepare(LhxJoinExecStreamParams const &params); 00316 00317 virtual void open(bool restart); 00318 00319 virtual ExecStreamResult execute(ExecStreamQuantum const &quantum); 00320 00321 virtual void getResourceRequirements( 00322 ExecStreamResourceQuantity &minQuantity, 00323 ExecStreamResourceQuantity &optQuantity, 00324 ExecStreamResourceSettingType &optType); 00325 00326 virtual void setResourceAllocation( 00327 ExecStreamResourceQuantity &quantity); 00328 }; 00329 00330 inline bool LhxJoinExecStream::returnProbeInner(LhxPlan *curPlan) 00331 { 00332 uint probeInput = (curPlan == NULL) ? 0 : curPlan->getProbeInput(); 00333 return joinType->test(probeInput * 2 + 0); 00334 } 00335 00336 inline bool LhxJoinExecStream::returnBuildInner(LhxPlan *curPlan) 00337 { 00338 uint buildInput = (curPlan == NULL) ? 1 : curPlan->getBuildInput(); 00339 return joinType->test(buildInput * 2 + 0); 00340 } 00341 00342 inline bool LhxJoinExecStream::returnProbeOuter(LhxPlan *curPlan) 00343 { 00344 uint probeInput = (curPlan == NULL) ? 0 : curPlan->getProbeInput(); 00345 return joinType->test(probeInput * 2 + 1); 00346 } 00347 00348 inline bool LhxJoinExecStream::returnBuildOuter(LhxPlan *curPlan) 00349 { 00350 uint buildInput = (curPlan == NULL) ? 1 : curPlan->getBuildInput(); 00351 return joinType->test(buildInput * 2 + 1); 00352 } 00353 00354 inline bool LhxJoinExecStream::returnInner(LhxPlan *curPlan) 00355 { 00356 return (returnProbeInner(curPlan) && returnBuildInner(curPlan)); 00357 } 00358 00359 inline bool LhxJoinExecStream::returnProbe(LhxPlan *curPlan) 00360 { 00361 return (returnProbeInner(curPlan) || returnProbeOuter(curPlan)); 00362 } 00363 00364 inline bool LhxJoinExecStream::returnBuild(LhxPlan *curPlan) 00365 { 00366 return (returnBuildInner(curPlan) || returnBuildOuter(curPlan)); 00367 } 00368 00369 FENNEL_END_NAMESPACE 00370 00371 #endif 00372 00373