Fennel: /home/pub/open/dev/fennel/test/LhxJoinExecStreamTest.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/test/ExecStreamUnitTestBase.h" 00025 #include "fennel/hashexe/LhxJoinExecStream.h" 00026 #include "fennel/sorter/ExternalSortExecStream.h" 00027 #include "fennel/tuple/StandardTypeDescriptor.h" 00028 #include "fennel/exec/MockProducerExecStream.h" 00029 #include "fennel/exec/ExecStreamEmbryo.h" 00030 #include "fennel/exec/ExecStreamScheduler.h" 00031 #include "fennel/exec/ExecStreamGraph.h" 00032 #include "fennel/cache/Cache.h" 00033 00034 #include <boost/test/test_tools.hpp> 00035 00036 using namespace fennel; 00037 00038 class LhxJoinExecStreamTest : public ExecStreamUnitTestBase 00039 { 00040 void testSequentialImpl( 00041 uint numRows, 00042 uint forcePartitionLevel, 00043 bool enableJoinFilter, 00044 bool enableSubPartStat); 00045 00046 void testDupImpl( 00047 uint numRows, 00048 uint cndKeyLeft, 00049 uint cndKeyRight, 00050 uint forcePartitionLevel, 00051 bool enableJoinFilter, 00052 bool enableSubPartStat, 00053 bool needSort, 00054 bool fakeInterrupt); 00055 00056 void testImpl( 00057 uint numInputRows, 00058 uint keyCount, 00059 uint cndKeys, 00060 uint numResultRows, 00061 TupleDescriptor &inputDesc, 00062 TupleDescriptor &outputDesc, 00063 TupleProjection &outputProj, 00064 SharedMockProducerExecStreamGenerator pLeftGenerator, 00065 SharedMockProducerExecStreamGenerator pRightGenerator, 00066 CompositeExecStreamGenerator &verifier, 00067 uint forcePartitionLevel, 00068 bool enableJoinFilter, 00069 bool enableSubPartStat, 00070 bool needSort, 00071 bool fakeInterrupt); 00072 00073 public: 00074 explicit LhxJoinExecStreamTest() 00075 { 00076 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testSequential); 00077 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testDup1); 00078 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testDup2); 00079 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testConst); 00080 00081 00082 00083 00084 00085 00086 00087 00088 00089 00090 00091 00092 00093 00094 00095 00096 00097 00098 00099 00100 00101 00102 00103 00104 00105 00106 00107 00108 00109 00110 00111 00112 00113 00114 00115 00116 00117 00118 00119 FENNEL_UNIT_TEST_CASE( 00120 LhxJoinExecStreamTest, 00121 testSequentialPartitionFilterStat); 00122 FENNEL_UNIT_TEST_CASE( 00123 LhxJoinExecStreamTest, 00124 testDup1PartitionFilterStat); 00125 FENNEL_UNIT_TEST_CASE( 00126 LhxJoinExecStreamTest, 00127 testDup2PartitionFilterStat); 00128 FENNEL_UNIT_TEST_CASE( 00129 LhxJoinExecStreamTest, 00130 testConstPartitionFilterStat); 00131 00132 FENNEL_UNIT_TEST_CASE(LhxJoinExecStreamTest,testConstCleanup); 00133 } 00134 00135
00136 00137 00138 void testSequential(); 00139 00140 void testSequentialPartition(); 00141 void testSequentialPartitionFilter(); 00142 void testSequentialPartitionStat(); 00143 void testSequentialPartitionFilterStat(); 00144 00145
00146 00147 00148 00149 00150 00151 00152 void testDup1(); 00153 00154 void testDup1Partition(); 00155 void testDup1PartitionFilter(); 00156 void testDup1PartitionStat(); 00157 void testDup1PartitionFilterStat(); 00158 00159
00160 00161 00162 00163 00164 00165 00166 void testDup2(); 00167 00168 void testDup2Partition(); 00169 void testDup2PartitionFilter(); 00170 void testDup2PartitionStat(); 00171 void testDup2PartitionFilterStat(); 00172 00173
00174 00175 00176 00177 00178 00179 00180 void testConst(); 00181 void testConstPartition(); 00182 void testConstPartitionStat(); 00183 void testConstPartitionFilterStat(); 00184 void testConstCleanup(); 00185 }; 00186 00187 void LhxJoinExecStreamTest::testSequential() 00188 { 00189 testSequentialImpl(1000, 0, true, true); 00190 } 00191 00192 void LhxJoinExecStreamTest::testSequentialPartition() 00193 { 00194 testSequentialImpl(1000, 2, false, false); 00195 } 00196 00197 void LhxJoinExecStreamTest::testSequentialPartitionFilter() 00198 { 00199 testSequentialImpl(1000, 2, true, false); 00200 } 00201 00202 void LhxJoinExecStreamTest::testSequentialPartitionStat() 00203 { 00204 testSequentialImpl(1000, 2, false, true); 00205 } 00206 00207 void LhxJoinExecStreamTest::testSequentialPartitionFilterStat() 00208 { 00209 testSequentialImpl(1000, 2, true, true); 00210 } 00211 00212 void LhxJoinExecStreamTest::testDup1() 00213 { 00214 testDupImpl(960, 16, 60, 0, false, false, false, false); 00215 } 00216 00217 void LhxJoinExecStreamTest::testDup1Partition() 00218 { 00219 testDupImpl(960, 16, 60, 2, false, false, true, false); 00220 } 00221 00222 void LhxJoinExecStreamTest::testDup1PartitionFilter() 00223 { 00224 testDupImpl(960, 16, 60, 2, true, false, true, false); 00225 } 00226 00227 void LhxJoinExecStreamTest::testDup1PartitionStat() 00228 { 00229 testDupImpl(960, 16, 60, 2, false, true, true, false); 00230 } 00231 00232 void LhxJoinExecStreamTest::testDup1PartitionFilterStat() 00233 { 00234 testDupImpl(960, 16, 60, 2, true, true, true, false); 00235 } 00236 00237 void LhxJoinExecStreamTest::testDup2() 00238 { 00239 testDupImpl(960, 60, 16, 0, false, false, false, false); 00240 } 00241 00242 void LhxJoinExecStreamTest::testDup2Partition() 00243 { 00244 testDupImpl(960, 60, 16, 2, false, false, true, false); 00245 } 00246 00247 void LhxJoinExecStreamTest::testDup2PartitionFilter() 00248 { 00249 testDupImpl(960, 60, 16, 2, true, false, true, false); 00250 } 00251 00252 void LhxJoinExecStreamTest::testDup2PartitionStat() 00253 { 00254 testDupImpl(960, 60, 16, 2, false, true, true, false); 00255 } 00256 00257 void LhxJoinExecStreamTest::testDup2PartitionFilterStat() 00258 { 00259 testDupImpl(960, 60, 16, 2, true, true, true, false); 00260 } 00261 00262 void LhxJoinExecStreamTest::testConst() 00263 { 00264 testDupImpl(960, 1, 60, 0, false, false, false, false); 00265 } 00266 00267 void LhxJoinExecStreamTest::testConstPartition() 00268 { 00269 testDupImpl(960, 1, 60, 2, false, false, false, false); 00270 } 00271 00272 void LhxJoinExecStreamTest::testConstPartitionStat() 00273 { 00274 testDupImpl(960, 1, 60, 2, false, true, false, false); 00275 } 00276 00277 void LhxJoinExecStreamTest::testConstPartitionFilterStat() 00278 { 00279 testDupImpl(960, 1, 60, 2, true, true, false, false); 00280 } 00281 00282 void LhxJoinExecStreamTest::testConstCleanup() 00283 { 00284
00285 00286 00287 testDupImpl(960, 1, 60, 2, false, false, false, true); 00288 } 00289 00290 void LhxJoinExecStreamTest::testSequentialImpl( 00291 uint numRows, 00292 uint forcePartitionLevel, 00293 bool enableJoinFilter, 00294 bool enableSubPartStat) 00295 { 00296 uint numColsLeft; 00297 uint numColsRight; 00298 numColsRight = numColsLeft = 1; 00299 uint keyCount = 1; 00300 uint cndKeys = numRows; 00301 00302 assert (keyCount <= numColsRight && keyCount <= numColsLeft); 00303 00304 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > 00305 leftColumnGenerators; 00306 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > 00307 rightColumnGenerators; 00308 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > 00309 outColumnGenerators; 00310 00311 StandardTypeDescriptorFactory stdTypeFactory; 00312 TupleAttributeDescriptor attrDesc( 00313 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00314 00315 TupleDescriptor inputDesc; 00316 TupleDescriptor outputDesc; 00317 TupleProjection outputProj; 00318 00319 uint i; 00320 00321 for (i = 0; i < numColsLeft; i++) { 00322 leftColumnGenerators.push_back( 00323 SharedInt64ColumnGenerator(new SeqColumnGenerator())); 00324
00325 00326 00327 inputDesc.push_back(attrDesc); 00328 00329
00330 00331 00332 outColumnGenerators.push_back( 00333 SharedInt64ColumnGenerator(new SeqColumnGenerator())); 00334 outputDesc.push_back(attrDesc); 00335 outputProj.push_back(i); 00336 } 00337 00338 for (; i < numColsLeft + numColsRight; i++) { 00339 rightColumnGenerators.push_back( 00340 SharedInt64ColumnGenerator(new SeqColumnGenerator())); 00341 00342
00343 00344 00345 outColumnGenerators.push_back( 00346 SharedInt64ColumnGenerator(new SeqColumnGenerator())); 00347 outputDesc.push_back(attrDesc); 00348 outputProj.push_back(i); 00349 } 00350 00351 SharedMockProducerExecStreamGenerator pLeftGenerator( 00352 new CompositeExecStreamGenerator(leftColumnGenerators)); 00353 00354 SharedMockProducerExecStreamGenerator pRightGenerator( 00355 new CompositeExecStreamGenerator(rightColumnGenerators)); 00356 00357 CompositeExecStreamGenerator verifier(outColumnGenerators); 00358 00359 bool needSort = (forcePartitionLevel > 0) ? true : false; 00360 bool fakeInterrupt = false; 00361 00362 testImpl( 00363 numRows, keyCount, cndKeys, numRows, inputDesc, outputDesc, 00364 outputProj, pLeftGenerator, pRightGenerator, verifier, 00365 forcePartitionLevel, enableJoinFilter, enableSubPartStat, 00366 needSort, fakeInterrupt); 00367 } 00368 00369 void LhxJoinExecStreamTest::testDupImpl( 00370 uint numRows, 00371 uint cndKeyLeft, 00372 uint cndKeyRight, 00373 uint forcePartitionLevel, 00374 bool enableJoinFilter, 00375 bool enableSubPartStat, 00376 bool needSort, 00377 bool fakeInterrupt) 00378 { 00379 assert (!fakeInterrupt || !needSort); 00380 00381 uint numColsLeft; 00382 uint numColsRight; 00383 numColsRight = numColsLeft = 2; 00384 uint keyCount = 1; 00385 uint cndKeys; 00386 00387 assert (keyCount <= numColsRight && keyCount <= numColsLeft); 00388 00389 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > 00390 leftColumnGenerators; 00391 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > 00392 rightColumnGenerators; 00393 vector<boost::shared_ptr<ColumnGenerator< ::int64_t > > > 00394 outColumnGenerators; 00395 00396 StandardTypeDescriptorFactory stdTypeFactory; 00397 TupleAttributeDescriptor attrDesc( 00398 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00399 00400 TupleDescriptor inputDesc; 00401 TupleDescriptor outputDesc; 00402 TupleProjection outputProj; 00403 00404 uint i; 00405 00406 for (i = 0; i < numColsLeft; i++) { 00407 leftColumnGenerators.push_back( 00408 SharedInt64ColumnGenerator(new 00409 DupColumnGenerator(numRows / cndKeyLeft))); 00410 outColumnGenerators.push_back( 00411 SharedInt64ColumnGenerator(new 00412 DupColumnGenerator(numRows*numRows/cndKeyLeft/cndKeyRight))); 00413 00414 inputDesc.push_back(attrDesc); 00415 outputDesc.push_back(attrDesc); 00416 outputProj.push_back(i); 00417 } 00418 00419 for (; i < numColsLeft + numColsRight; i++) { 00420 rightColumnGenerators.push_back( 00421 SharedInt64ColumnGenerator(new 00422 DupColumnGenerator(numRows / cndKeyRight))); 00423 outColumnGenerators.push_back( 00424 SharedInt64ColumnGenerator(new 00425 DupColumnGenerator(numRows*numRows/cndKeyLeft/cndKeyRight))); 00426 00427 outputDesc.push_back(attrDesc); 00428 outputProj.push_back(i); 00429 } 00430 00431 cndKeys = cndKeyRight; 00432 00433 SharedMockProducerExecStreamGenerator pLeftGenerator( 00434 new CompositeExecStreamGenerator(leftColumnGenerators)); 00435 00436 SharedMockProducerExecStreamGenerator pRightGenerator( 00437 new CompositeExecStreamGenerator(rightColumnGenerators)); 00438 00439 CompositeExecStreamGenerator verifier(outColumnGenerators); 00440 00441 uint numResRows = (cndKeyLeft > cndKeyRight) ? 00442 (numRows * numRows / cndKeyLeft) : 00443 (numRows * numRows / cndKeyRight); 00444 00445 testImpl( 00446 numRows, keyCount, cndKeys, numResRows, inputDesc, outputDesc, 00447 outputProj, pLeftGenerator, pRightGenerator, verifier, 00448 forcePartitionLevel, enableJoinFilter, enableSubPartStat, needSort, 00449 fakeInterrupt); 00450 } 00451 00452 void LhxJoinExecStreamTest::testImpl( 00453 uint numInputRows, uint keyCount, uint cndKeys, uint numResultRows, 00454 TupleDescriptor &inputDesc, TupleDescriptor &outputDesc, 00455 TupleProjection &outputProj, 00456 SharedMockProducerExecStreamGenerator pLeftGenerator, 00457 SharedMockProducerExecStreamGenerator pRightGenerator, 00458 CompositeExecStreamGenerator &verifier, 00459 uint forcePartitionLevel, bool enableJoinFilter, bool enableSubPartStat, 00460 bool needSort, bool fakeInterrupt) 00461 { 00462 TupleProjection leftKeyProj; 00463 TupleProjection rightKeyProj; 00464 00465
00466 00467 00468 MockProducerExecStreamParams mockParams; 00469 mockParams.outputTupleDesc = inputDesc; 00470 mockParams.nRows = numInputRows; 00471 00472 mockParams.pGenerator = pLeftGenerator; 00473 ExecStreamEmbryo leftInputStreamEmbryo; 00474 leftInputStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00475 leftInputStreamEmbryo.getStream()->setName("LeftInputExecStream"); 00476 00477
00478 00479 00480 mockParams.pGenerator = pRightGenerator; 00481 ExecStreamEmbryo rightInputStreamEmbryo; 00482 rightInputStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00483 rightInputStreamEmbryo.getStream()->setName("RightInputExecStream"); 00484 00485
00486 00487 00488 LhxJoinExecStreamParams joinParams; 00489
00490 00491 00492 joinParams.leftInner = true; 00493 joinParams.leftOuter = false; 00494 joinParams.rightInner = true; 00495 joinParams.rightOuter = false; 00496 00497 joinParams.setopAll = false; 00498 joinParams.setopDistinct = false; 00499 00500 joinParams.forcePartitionLevel = forcePartitionLevel; 00501 joinParams.enableJoinFilter = enableJoinFilter; 00502 joinParams.enableSubPartStat = enableSubPartStat; 00503 joinParams.enableSwing = true; 00504 00505 joinParams.outputProj = outputProj; 00506 joinParams.cndKeys = cndKeys; 00507 joinParams.numRows = numInputRows; 00508 00509 for (int i = 0; i < keyCount; i ++) { 00510 joinParams.leftKeyProj.push_back(i); 00511 joinParams.rightKeyProj.push_back(i); 00512 } 00513 00514
00515 00516 00517 joinParams.outputTupleDesc = outputDesc; 00518
00519 00520 00521 joinParams.pCacheAccessor = pCache; 00522 int cacheSize = 100; 00523 joinParams.scratchAccessor = 00524 pSegmentFactory->newScratchSegment(pCache, cacheSize); 00525 joinParams.pTempSegment = pRandomSegment; 00526 00527 ExecStreamEmbryo joinStreamEmbryo; 00528 joinStreamEmbryo.init(new LhxJoinExecStream(),joinParams); 00529 joinStreamEmbryo.getStream()->setName("LhxJoinExecStream"); 00530 00531 SharedExecStream pOutputStream; 00532 00533 if (needSort) { 00534 ExternalSortExecStreamParams sortParams; 00535 sortParams.outputTupleDesc = outputDesc; 00536 sortParams.distinctness = DUP_ALLOW; 00537 sortParams.pTempSegment = pRandomSegment; 00538 sortParams.pCacheAccessor = pCache; 00539 sortParams.scratchAccessor = 00540 pSegmentFactory->newScratchSegment(pCache, 10); 00541 sortParams.keyProj.push_back(0); 00542 sortParams.storeFinalRun = false; 00543 sortParams.estimatedNumRows = MAXU; 00544 sortParams.earlyClose = false; 00545 ExecStreamEmbryo sortStreamEmbryo; 00546 sortStreamEmbryo.init( 00547 ExternalSortExecStream::newExternalSortExecStream(),sortParams); 00548 sortStreamEmbryo.getStream()->setName("ExternalSortExecStream"); 00549 00550 pOutputStream = prepareConfluenceTransformGraph( 00551 leftInputStreamEmbryo, rightInputStreamEmbryo, joinStreamEmbryo, 00552 sortStreamEmbryo); 00553 } else { 00554 pOutputStream = prepareConfluenceGraph( 00555 leftInputStreamEmbryo, rightInputStreamEmbryo, joinStreamEmbryo); 00556 } 00557 00558
00559
00560 00561 verifyOutput( 00562 *pOutputStream, 00563 fakeInterrupt ? 1 : numResultRows, 00564 verifier, 00565 fakeInterrupt); 00566 00567 if (fakeInterrupt) { 00568
00569 pScheduler->stop(); 00570 pGraph->close(); 00571 } 00572 00573 BOOST_CHECK_EQUAL(0, pRandomSegment->getAllocatedSizeInPages()); 00574 } 00575 00576 FENNEL_UNIT_TEST_SUITE(LhxJoinExecStreamTest); 00577 00578