Fennel: /home/pub/open/dev/fennel/test/ExecStreamTestSuite.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/common/FemEnums.h" 00026 #include "fennel/btree/BTreeDescriptor.h" 00027 #include "fennel/btree/BTreeBuilder.h" 00028 #include "fennel/btree/BTreeReader.h" 00029 #include "fennel/test/ExecStreamTestSuite.h" 00030 #include "fennel/exec/ExecStreamScheduler.h" 00031 #include "fennel/exec/ExecStream.h" 00032 #include "fennel/exec/ExecStreamGraph.h" 00033 #include "fennel/exec/ExecStreamBufAccessor.h" 00034 #include "fennel/exec/MockProducerExecStream.h" 00035 #include "fennel/exec/ScratchBufferExecStream.h" 00036 #include "fennel/exec/DoubleBufferExecStream.h" 00037 #include "fennel/exec/CopyExecStream.h" 00038 #include "fennel/exec/MergeExecStream.h" 00039 #include "fennel/exec/SegBufferExecStream.h" 00040 #include "fennel/exec/SegBufferReaderExecStream.h" 00041 #include "fennel/exec/SegBufferWriterExecStream.h" 00042 #include "fennel/exec/CartesianJoinExecStream.h" 00043 #include "fennel/exec/SortedAggExecStream.h" 00044 #include "fennel/exec/ReshapeExecStream.h" 00045 #include "fennel/exec/SplitterExecStream.h" 00046 #include "fennel/exec/BarrierExecStream.h" 00047 #include "fennel/exec/ValuesExecStream.h" 00048 #include "fennel/exec/NestedLoopJoinExecStream.h" 00049 #include "fennel/exec/ExecStreamEmbryo.h" 00050 #include "fennel/tuple/StandardTypeDescriptor.h" 00051 #include "fennel/ftrs/BTreeInsertExecStream.h" 00052 00053 using namespace fennel; 00054 00055 uint ExecStreamTestSuite::getDegreeOfParallelism() 00056 { 00057 return 1; 00058 } 00059 00060 void ExecStreamTestSuite::testScratchBufferExecStream() 00061 { 00062 StandardTypeDescriptorFactory stdTypeFactory; 00063 TupleAttributeDescriptor attrDesc( 00064 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00065 00066 MockProducerExecStreamParams mockParams; 00067 mockParams.outputTupleDesc.push_back(attrDesc); 00068 mockParams.nRows = 5000;
00069 mockParams.pGenerator.reset(new RampExecStreamGenerator()); 00070 00071 ExecStreamEmbryo mockStreamEmbryo; 00072 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00073 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00074 00075 ScratchBufferExecStreamParams bufParams; 00076 bufParams.scratchAccessor = 00077 pSegmentFactory->newScratchSegment(pCache,1); 00078 00079 ExecStreamEmbryo bufStreamEmbryo; 00080 bufStreamEmbryo.init(new ScratchBufferExecStream(),bufParams); 00081 bufStreamEmbryo.getStream()->setName("ScratchBufferExecStream"); 00082 00083 SharedExecStream pOutputStream = prepareTransformGraph( 00084 mockStreamEmbryo, bufStreamEmbryo); 00085 00086 verifyOutput( 00087 *pOutputStream, 00088 mockParams.nRows, 00089 *(mockParams.pGenerator)); 00090 } 00091 00092 00093 void ExecStreamTestSuite::testDoubleBufferExecStream() 00094 { 00095 StandardTypeDescriptorFactory stdTypeFactory; 00096 TupleAttributeDescriptor attrDesc( 00097 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00098 00099 MockProducerExecStreamParams mockParams; 00100 mockParams.outputTupleDesc.push_back(attrDesc); 00101 mockParams.nRows = 25000;
00102 mockParams.pGenerator.reset(new RampExecStreamGenerator()); 00103 00104 ExecStreamEmbryo mockStreamEmbryo; 00105 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00106 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00107 00108 DoubleBufferExecStreamParams bufParams; 00109 bufParams.scratchAccessor = 00110 pSegmentFactory->newScratchSegment(pCache,1); 00111 00112 ExecStreamEmbryo bufStreamEmbryo; 00113 bufStreamEmbryo.init(new DoubleBufferExecStream(),bufParams); 00114 bufStreamEmbryo.getStream()->setName("DoubleBufferExecStream"); 00115 00116 SharedExecStream pOutputStream = prepareTransformGraph( 00117 mockStreamEmbryo, bufStreamEmbryo); 00118 00119 verifyOutput( 00120 *pOutputStream, 00121 mockParams.nRows, 00122 *(mockParams.pGenerator)); 00123 } 00124 00125 void ExecStreamTestSuite::testCopyExecStream() 00126 { 00127 StandardTypeDescriptorFactory stdTypeFactory; 00128 TupleAttributeDescriptor attrDesc( 00129 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32)); 00130 00131 MockProducerExecStreamParams mockParams; 00132 mockParams.outputTupleDesc.push_back(attrDesc); 00133 mockParams.nRows = 10000;
00134 00135 ExecStreamEmbryo mockStreamEmbryo; 00136 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00137 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00138 00139 CopyExecStreamParams copyParams; 00140 copyParams.outputTupleDesc.push_back(attrDesc); 00141 00142 ExecStreamEmbryo copyStreamEmbryo; 00143 copyStreamEmbryo.init(new CopyExecStream(),copyParams); 00144 copyStreamEmbryo.getStream()->setName("CopyExecStream"); 00145 00146 SharedExecStream pOutputStream = prepareTransformGraph( 00147 mockStreamEmbryo,copyStreamEmbryo); 00148 00149 int32_t zero = 0; 00150 TupleDescriptor expectedDesc; 00151 expectedDesc.push_back(attrDesc); 00152 TupleData expectedTuple; 00153 expectedTuple.compute(expectedDesc); 00154 expectedTuple[0].pData = reinterpret_cast(&zero); 00155 verifyConstantOutput( 00156 *pOutputStream, 00157 expectedTuple, 00158 mockParams.nRows); 00159 } 00160 00161 void ExecStreamTestSuite::testMergeExecStream() 00162 { 00163
00164 00165 StandardTypeDescriptorFactory stdTypeFactory; 00166 TupleAttributeDescriptor attrDesc( 00167 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32)); 00168 00169 MockProducerExecStreamParams paramsMock; 00170 paramsMock.outputTupleDesc.push_back(attrDesc); 00171 paramsMock.nRows = 10000; 00172 00173 ExecStreamEmbryo mockStreamEmbryo1; 00174 mockStreamEmbryo1.init(new MockProducerExecStream(),paramsMock); 00175 mockStreamEmbryo1.getStream()->setName("MockProducerExecStream1"); 00176 00177 ExecStreamEmbryo mockStreamEmbryo2; 00178 mockStreamEmbryo2.init(new MockProducerExecStream(),paramsMock); 00179 mockStreamEmbryo2.getStream()->setName("MockProducerExecStream2"); 00180 00181 MergeExecStreamParams paramsMerge; 00182 paramsMerge.outputTupleDesc.push_back(attrDesc); 00183 if (getDegreeOfParallelism() != 1) { 00184 paramsMerge.isParallel = true; 00185 } 00186 00187 ExecStreamEmbryo mergeStreamEmbryo; 00188 mergeStreamEmbryo.init(new MergeExecStream(),paramsMerge); 00189 mergeStreamEmbryo.getStream()->setName("MergeExecStream"); 00190 00191 SharedExecStream pOutputStream = prepareConfluenceGraph( 00192 mockStreamEmbryo1, 00193 mockStreamEmbryo2, 00194 mergeStreamEmbryo); 00195 00196 int32_t zero = 0; 00197 TupleDescriptor expectedDesc; 00198 expectedDesc.push_back(attrDesc); 00199 TupleData expectedTuple; 00200 expectedTuple.compute(expectedDesc); 00201 expectedTuple[0].pData = reinterpret_cast(&zero); 00202 verifyConstantOutput( 00203 pOutputStream, 00204 expectedTuple, 00205 2paramsMock.nRows); 00206 } 00207 00208 void ExecStreamTestSuite::testSegBufferExecStream() 00209 { 00210 StandardTypeDescriptorFactory stdTypeFactory; 00211 TupleAttributeDescriptor attrDesc( 00212 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32)); 00213 00214 MockProducerExecStreamParams mockParams; 00215 mockParams.outputTupleDesc.push_back(attrDesc); 00216 mockParams.nRows = 10000;
00217 00218 ExecStreamEmbryo mockStreamEmbryo; 00219 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00220 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00221 00222 SegBufferExecStreamParams bufParams; 00223 bufParams.scratchAccessor.pSegment = pRandomSegment; 00224 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor; 00225 bufParams.multipass = false; 00226 00227 ExecStreamEmbryo bufStreamEmbryo; 00228 bufStreamEmbryo.init(new SegBufferExecStream(),bufParams); 00229 bufStreamEmbryo.getStream()->setName("SegBufferExecStream"); 00230 00231 SharedExecStream pOutputStream = prepareTransformGraph( 00232 mockStreamEmbryo, bufStreamEmbryo); 00233 00234 int32_t zero = 0; 00235 TupleDescriptor expectedDesc; 00236 expectedDesc.push_back(attrDesc); 00237 TupleData expectedTuple; 00238 expectedTuple.compute(expectedDesc); 00239 expectedTuple[0].pData = reinterpret_cast(&zero); 00240 verifyConstantOutput( 00241 *pOutputStream, 00242 expectedTuple, 00243 mockParams.nRows); 00244 } 00245 00246 void ExecStreamTestSuite::testCartesianJoinExecStream( 00247 uint nRowsOuter,uint nRowsInner) 00248 { 00249
00250 00251 StandardTypeDescriptorFactory stdTypeFactory; 00252 TupleAttributeDescriptor attrDesc( 00253 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32)); 00254 00255 MockProducerExecStreamParams paramsMockOuter; 00256 paramsMockOuter.outputTupleDesc.push_back(attrDesc); 00257 paramsMockOuter.nRows = nRowsOuter; 00258 00259 ExecStreamEmbryo outerStreamEmbryo; 00260 outerStreamEmbryo.init(new MockProducerExecStream(),paramsMockOuter); 00261 outerStreamEmbryo.getStream()->setName("OuterProducerExecStream"); 00262 00263 MockProducerExecStreamParams paramsMockInner(paramsMockOuter); 00264 paramsMockInner.nRows = nRowsInner; 00265 00266 ExecStreamEmbryo innerStreamEmbryo; 00267 innerStreamEmbryo.init(new MockProducerExecStream(),paramsMockInner); 00268 innerStreamEmbryo.getStream()->setName("InnerProducerExecStream"); 00269 00270 CartesianJoinExecStreamParams paramsJoin; 00271 paramsJoin.leftOuter = false; 00272 00273 ExecStreamEmbryo joinStreamEmbryo; 00274 joinStreamEmbryo.init(new CartesianJoinExecStream(),paramsJoin); 00275 joinStreamEmbryo.getStream()->setName("CartesianJoinExecStream"); 00276 00277 SharedExecStream pOutputStream = prepareConfluenceGraph( 00278 outerStreamEmbryo, 00279 innerStreamEmbryo, 00280 joinStreamEmbryo); 00281 00282 int32_t zero = 0; 00283 TupleDescriptor expectedDesc; 00284 expectedDesc.push_back(attrDesc); 00285 expectedDesc.push_back(attrDesc); 00286 TupleData expectedTuple; 00287 expectedTuple.compute(expectedDesc); 00288 expectedTuple[0].pData = reinterpret_cast(&zero); 00289 expectedTuple[1].pData = reinterpret_cast(&zero); 00290 verifyConstantOutput( 00291 pOutputStream, 00292 expectedTuple, 00293 nRowsOuternRowsInner); 00294 } 00295 00296 void ExecStreamTestSuite::testCountAggExecStream() 00297 { 00298 StandardTypeDescriptorFactory stdTypeFactory; 00299 TupleAttributeDescriptor attrDesc( 00300 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00301 00302 MockProducerExecStreamParams mockParams; 00303 mockParams.outputTupleDesc.push_back(attrDesc); 00304 mockParams.nRows = 10000;
00305 00306 ExecStreamEmbryo mockStreamEmbryo; 00307 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00308 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00309 00310
00311 SortedAggExecStreamParams aggParams; 00312 aggParams.groupByKeyCount = 0; 00313 aggParams.outputTupleDesc.push_back(attrDesc); 00314 AggInvocation countInvocation; 00315 countInvocation.aggFunction = AGG_FUNC_COUNT; 00316 countInvocation.iInputAttr = -1; 00317 aggParams.aggInvocations.push_back(countInvocation); 00318 00319 ExecStreamEmbryo aggStreamEmbryo; 00320 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams); 00321 aggStreamEmbryo.getStream()->setName("SortedAggExecStream"); 00322 00323 SharedExecStream pOutputStream = prepareTransformGraph( 00324 mockStreamEmbryo,aggStreamEmbryo); 00325 00326
00327
00328 RampExecStreamGenerator expectedResultGenerator(mockParams.nRows); 00329 00330 verifyOutput(*pOutputStream, 1, expectedResultGenerator); 00331 } 00332 00333 void ExecStreamTestSuite::testSumAggExecStream() 00334 { 00335 StandardTypeDescriptorFactory stdTypeFactory; 00336 TupleAttributeDescriptor attrDesc( 00337 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00338 00339 MockProducerExecStreamParams mockParams; 00340 mockParams.outputTupleDesc.push_back(attrDesc); 00341 mockParams.nRows = 10000;
00342 mockParams.pGenerator.reset(new RampExecStreamGenerator()); 00343 00344 ExecStreamEmbryo mockStreamEmbryo; 00345 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00346 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00347 00348
00349 SortedAggExecStreamParams aggParams; 00350 aggParams.groupByKeyCount = 0; 00351 attrDesc.isNullable = true; 00352 aggParams.outputTupleDesc.push_back(attrDesc); 00353 AggInvocation sumInvocation; 00354 sumInvocation.aggFunction = AGG_FUNC_SUM; 00355 sumInvocation.iInputAttr = 0; 00356 aggParams.aggInvocations.push_back(sumInvocation); 00357 00358 ExecStreamEmbryo aggStreamEmbryo; 00359 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams); 00360 aggStreamEmbryo.getStream()->setName("SortedAggExecStream"); 00361 00362 SharedExecStream pOutputStream = prepareTransformGraph( 00363 mockStreamEmbryo,aggStreamEmbryo); 00364 00365
00366
00367 RampExecStreamGenerator expectedResultGenerator( 00368 (mockParams.nRows-1)*mockParams.nRows/2); 00369 00370 verifyOutput(*pOutputStream, 1, expectedResultGenerator); 00371 } 00372 00373 void ExecStreamTestSuite::testGroupAggExecStreamNrows(uint nrows) 00374 { 00375 StandardTypeDescriptorFactory stdTypeFactory; 00376 TupleAttributeDescriptor attrDesc( 00377 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00378 00379
00380 MockProducerExecStreamParams mockParams; 00381 mockParams.outputTupleDesc.push_back(attrDesc); 00382 mockParams.outputTupleDesc.push_back(attrDesc); 00383 mockParams.nRows = nrows;
00384 mockParams.pGenerator.reset(new RampDuplicateExecStreamGenerator()); 00385 00386 ExecStreamEmbryo mockStreamEmbryo; 00387 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00388 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00389 00390
00391 SortedAggExecStreamParams aggParams; 00392 aggParams.groupByKeyCount = 1; 00393 aggParams.outputTupleDesc.push_back(attrDesc); 00394 aggParams.outputTupleDesc.push_back(attrDesc); 00395 AggInvocation countInvocation; 00396 countInvocation.aggFunction = AGG_FUNC_COUNT; 00397 countInvocation.iInputAttr = -1; 00398 aggParams.aggInvocations.push_back(countInvocation); 00399 00400 ExecStreamEmbryo aggStreamEmbryo; 00401 00402 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams); 00403 aggStreamEmbryo.getStream()->setName("SortedAggExecStream"); 00404 00405 SharedExecStream pOutputStream = prepareTransformGraph( 00406 mockStreamEmbryo,aggStreamEmbryo); 00407 00408
00409
00410 vector<boost::shared_ptr<ColumnGenerator > > columnGenerators; 00411 00412 SharedInt64ColumnGenerator col = 00413 SharedInt64ColumnGenerator(new SeqColumnGenerator()); 00414 columnGenerators.push_back(col); 00415 00416 col = SharedInt64ColumnGenerator(new ConstColumnGenerator(2)); 00417 columnGenerators.push_back(col); 00418 00419 CompositeExecStreamGenerator expectedResultGenerator(columnGenerators); 00420 00421 verifyOutput(*pOutputStream, mockParams.nRows/2, expectedResultGenerator); 00422 } 00423 00424 void ExecStreamTestSuite::testReshapeExecStream( 00425 bool filter, bool cast, uint expectedNRows, int expectedStart, 00426 bool compareParam, 00427 std::hash_set const &outputParams) 00428 { 00429 assert(!compareParam || filter == compareParam); 00430 StandardTypeDescriptorFactory stdTypeFactory; 00431 TupleAttributeDescriptor nullAttrDesc( 00432 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), 00433 true, sizeof(int64_t)); 00434 TupleAttributeDescriptor notNullAttrDesc( 00435 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00436 00437
00438
00439
00440
00441
00442
00443
00444 MockProducerExecStreamParams mockParams; 00445 for (int i = 0; i < 6; i++) { 00446 mockParams.outputTupleDesc.push_back(notNullAttrDesc); 00447 } 00448 vector<boost::shared_ptr<ColumnGenerator > > columnGenerators; 00449 SharedInt64ColumnGenerator colGen; 00450 for (int i = 0; i < 4; i++) { 00451 colGen = SharedInt64ColumnGenerator(new SeqColumnGenerator(i)); 00452 columnGenerators.push_back(colGen); 00453 } 00454 colGen = SharedInt64ColumnGenerator(new DupColumnGenerator(25, 0)); 00455 columnGenerators.push_back(colGen); 00456 colGen = SharedInt64ColumnGenerator(new DupColumnGenerator(10, 0)); 00457 columnGenerators.push_back(colGen); 00458 mockParams.nRows = 1000; 00459 mockParams.pGenerator.reset( 00460 new CompositeExecStreamGenerator(columnGenerators)); 00461 00462 ExecStreamEmbryo mockStreamEmbryo; 00463 mockStreamEmbryo.init(new MockProducerExecStream(),mockParams); 00464 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00465 00466
00467
00468
00469
00470
00471
00472
00473
00474 ReshapeExecStreamParams rsParams; 00475 boost::shared_array pBuffer; 00476 std::vector paramVals; 00477 paramVals.push_back(10); 00478 paramVals.push_back(20); 00479 paramVals.push_back(50); 00480 if (!filter) { 00481 rsParams.compOp = COMP_NOOP; 00482 } else { 00483 rsParams.compOp = COMP_EQ; 00484 TupleDescriptor compareDesc; 00485
00486 compareDesc.push_back(nullAttrDesc); 00487 if (!compareParam) { 00488 compareDesc.push_back(nullAttrDesc); 00489 } 00490 TupleData compareData; 00491 compareData.compute(compareDesc); 00492 if (compareParam) { 00493 compareData[0].pData = (PConstBuffer) &paramVals[1]; 00494 } else { 00495 compareData[0].pData = (PConstBuffer) &paramVals[1]; 00496 compareData[1].pData = (PConstBuffer) &paramVals[2]; 00497 } 00498 TupleAccessor tupleAccessor; 00499 tupleAccessor.compute(compareDesc); 00500 pBuffer.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]); 00501 tupleAccessor.marshal(compareData, pBuffer.get()); 00502 } 00503 rsParams.pCompTupleBuffer = pBuffer; 00504 00505 TupleProjection tupleProj; 00506 tupleProj.push_back(4); 00507 tupleProj.push_back(5); 00508 rsParams.inputCompareProj = tupleProj; 00509 00510 tupleProj.clear(); 00511 tupleProj.push_back(3); 00512 tupleProj.push_back(0); 00513 tupleProj.push_back(2); 00514 rsParams.outputProj = tupleProj; 00515 00516 for (int i = 0; i < 3; i++) { 00517 if (cast) { 00518 rsParams.outputTupleDesc.push_back(nullAttrDesc); 00519 } else { 00520 rsParams.outputTupleDesc.push_back(notNullAttrDesc); 00521 } 00522 } 00523 00524
00525
00526
00527 std::vector dynamicParams; 00528 if (compareParam || outputParams.size() > 0) { 00529 for (uint i = 1; i < paramVals.size() + 1; i++) { 00530 SharedDynamicParamManager pDynamicParamManager = 00531 pGraph->getDynamicParamManager(); 00532 pDynamicParamManager->createParam( 00533 DynamicParamId(i), 00534 notNullAttrDesc); 00535 TupleDatum paramValDatum; 00536 paramValDatum.pData = (PConstBuffer) &(paramVals[i - 1]); 00537 paramValDatum.cbData = sizeof(int64_t); 00538 pDynamicParamManager->writeParam( 00539 DynamicParamId(i), 00540 paramValDatum); 00541 dynamicParams.push_back( 00542 ReshapeParameter( 00543 DynamicParamId(i), 00544 ((i == 3) && compareParam) ? uint(5) : MAXU, 00545 (outputParams.find(i - 1) != outputParams.end()))); 00546 } 00547 } 00548 rsParams.dynamicParameters = dynamicParams; 00549 00550
00551 columnGenerators.clear(); 00552 colGen = SharedInt64ColumnGenerator( 00553 new SeqColumnGenerator(expectedStart + 3)); 00554 columnGenerators.push_back(colGen); 00555 colGen = SharedInt64ColumnGenerator( 00556 new SeqColumnGenerator(expectedStart)); 00557 columnGenerators.push_back(colGen); 00558 colGen = SharedInt64ColumnGenerator( 00559 new SeqColumnGenerator(expectedStart + 2)); 00560 columnGenerators.push_back(colGen); 00561 for (uint i = 0; i < dynamicParams.size(); i++) { 00562 if (dynamicParams[i].outputParam) { 00563 colGen = 00564 SharedInt64ColumnGenerator( 00565 new ConstColumnGenerator( 00566 paramVals[opaqueToInt( 00567 dynamicParams[i].dynamicParamId) - 1])); 00568 columnGenerators.push_back(colGen); 00569 rsParams.outputTupleDesc.push_back(notNullAttrDesc); 00570 } 00571 } 00572 00573 ExecStreamEmbryo rsStreamEmbryo; 00574 rsStreamEmbryo.init(new ReshapeExecStream(),rsParams); 00575 rsStreamEmbryo.getStream()->setName("ReshapeExecStream"); 00576 SharedExecStream pOutputStream = prepareTransformGraph( 00577 mockStreamEmbryo, rsStreamEmbryo); 00578 00579 CompositeExecStreamGenerator resultGenerator(columnGenerators); 00580 verifyOutput(*pOutputStream, expectedNRows, resultGenerator); 00581 } 00582 00583 void ExecStreamTestSuite::testSingleValueAggExecStream() 00584 { 00585 StandardTypeDescriptorFactory stdTypeFactory; 00586 TupleAttributeDescriptor attrDesc( 00587 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00588 TupleAttributeDescriptor attrDescNullable( 00589 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), true, 00590 sizeof(int64_t)); 00591 00592
00593
00594 vector<boost::shared_ptr<ColumnGenerator > > columnGeneratorsIn; 00595 00596 SharedInt64ColumnGenerator col = 00597 SharedInt64ColumnGenerator(new DupColumnGenerator(1)); 00598 columnGeneratorsIn.push_back(col); 00599 00600
00601 MockProducerExecStreamParams mockParams; 00602 mockParams.outputTupleDesc.push_back(attrDesc); 00603 mockParams.nRows = 10; 00604 mockParams.pGenerator.reset( 00605 new CompositeExecStreamGenerator(columnGeneratorsIn)); 00606 00607 ExecStreamEmbryo mockStreamEmbryo; 00608 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00609 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00610 00611
00612 SortedAggExecStreamParams aggParams; 00613 aggParams.groupByKeyCount = 1; 00614 aggParams.outputTupleDesc.push_back(attrDesc); 00615 aggParams.outputTupleDesc.push_back(attrDescNullable); 00616 AggInvocation singleValueInvocation; 00617 singleValueInvocation.aggFunction = AGG_FUNC_SINGLE_VALUE; 00618 singleValueInvocation.iInputAttr = 0; 00619 aggParams.aggInvocations.push_back(singleValueInvocation); 00620 00621 ExecStreamEmbryo aggStreamEmbryo; 00622 00623 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams); 00624 aggStreamEmbryo.getStream()->setName("SortedAggExecStream"); 00625 00626 SharedExecStream pOutputStream = prepareTransformGraph( 00627 mockStreamEmbryo,aggStreamEmbryo); 00628 00629
00630 vector<boost::shared_ptr<ColumnGenerator > > columnGeneratorsOut; 00631 00632 col = 00633 SharedInt64ColumnGenerator(new DupColumnGenerator(1)); 00634 columnGeneratorsOut.push_back(col); 00635 00636 col = 00637 SharedInt64ColumnGenerator(new DupColumnGenerator(1)); 00638 columnGeneratorsOut.push_back(col); 00639 00640 CompositeExecStreamGenerator expectedResultGenerator(columnGeneratorsOut); 00641 00642 verifyOutput(*pOutputStream, mockParams.nRows, expectedResultGenerator); 00643 } 00644 00645 void ExecStreamTestSuite::testMergeImplicitPullInputs() 00646 { 00647
00648
00649
00650
00651
00652 00653 StandardTypeDescriptorFactory stdTypeFactory; 00654 TupleAttributeDescriptor attrDesc( 00655 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00656 TupleAttributeDescriptor nullAttrDesc( 00657 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64), 00658 true, sizeof(int64_t)); 00659 00660
00661
00662
00663
00664 00665 MockProducerExecStreamParams mockParams; 00666 mockParams.outputTupleDesc.push_back(attrDesc); 00667
00668 uint nInputs = 5; 00669 uint nRows = nInputs * 4000; 00670 mockParams.nRows = nRows; 00671 vector<boost::shared_ptr<ColumnGenerator > > columnGenerator; 00672 columnGenerator.push_back( 00673 SharedInt64ColumnGenerator( 00674 new DupRepeatingSeqColumnGenerator(nInputs, 1))); 00675 mockParams.pGenerator.reset( 00676 new CompositeExecStreamGenerator(columnGenerator)); 00677 00678 ExecStreamEmbryo mockStreamEmbryo; 00679 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00680 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00681 00682
00683
00684
00685 00686 SplitterExecStreamParams splitterParams; 00687 ExecStreamEmbryo splitterStreamEmbryo; 00688 splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams); 00689 splitterStreamEmbryo.getStream()->setName("SplitterExecStream"); 00690 00691 vector<vector > reshapeEmbryoStreamList; 00692 for (int i = 0; i < nInputs; i++) { 00693 ReshapeExecStreamParams rsParams; 00694 boost::shared_array pBuffer; 00695 rsParams.compOp = COMP_EQ; 00696 int64_t key = i; 00697 TupleDescriptor compareDesc; 00698
00699 compareDesc.push_back(nullAttrDesc); 00700 TupleData compareData; 00701 compareData.compute(compareDesc); 00702 compareData[0].pData = (PConstBuffer) &key; 00703 TupleAccessor tupleAccessor; 00704 tupleAccessor.compute(compareDesc); 00705 pBuffer.reset(new FixedBuffer[tupleAccessor.getMaxByteCount()]); 00706 tupleAccessor.marshal(compareData, pBuffer.get()); 00707 rsParams.pCompTupleBuffer = pBuffer; 00708 TupleProjection tupleProj; 00709 tupleProj.push_back(0); 00710 rsParams.inputCompareProj = tupleProj; 00711 rsParams.outputProj = tupleProj; 00712 00713 ExecStreamEmbryo rsStreamEmbryo; 00714 rsStreamEmbryo.init(new ReshapeExecStream(), rsParams); 00715 std::ostringstream oss; 00716 oss << "ReshapeExecStream" << "#" << i; 00717 rsStreamEmbryo.getStream()->setName(oss.str()); 00718 00719 vector reshapeStreamEmbryo; 00720 reshapeStreamEmbryo.push_back(rsStreamEmbryo); 00721 00722
00723
00724
00725 if (i != 0) { 00726 SegBufferExecStreamParams bufParams; 00727 bufParams.scratchAccessor.pSegment = pRandomSegment; 00728 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor; 00729 bufParams.multipass = false; 00730 00731 ExecStreamEmbryo bufStreamEmbryo; 00732 bufStreamEmbryo.init(new SegBufferExecStream(), bufParams); 00733 std::ostringstream oss; 00734 oss << "SegBufferExecStream" << "#" << i; 00735 bufStreamEmbryo.getStream()->setName(oss.str()); 00736 00737 reshapeStreamEmbryo.push_back(bufStreamEmbryo); 00738 } 00739 reshapeEmbryoStreamList.push_back(reshapeStreamEmbryo); 00740 } 00741 00742
00743 MergeExecStreamParams mergeParams; 00744 mergeParams.outputTupleDesc.push_back(attrDesc); 00745 00746 ExecStreamEmbryo mergeStreamEmbryo; 00747 mergeStreamEmbryo.init(new MergeExecStream(), mergeParams); 00748 mergeStreamEmbryo.getStream()->setName("MergeExecStream"); 00749 00750 SharedExecStream pOutputStream = 00751 prepareDAG( 00752 mockStreamEmbryo, 00753 splitterStreamEmbryo, 00754 reshapeEmbryoStreamList, 00755 mergeStreamEmbryo); 00756 00757
00758
00759 StairCaseExecStreamGenerator expectedResultGenerator(1, nRows / nInputs); 00760 00761 verifyOutput(*pOutputStream, nRows, expectedResultGenerator); 00762 } 00763 00764 void ExecStreamTestSuite::testBTreeInsertExecStream( 00765 bool useDynamicBTree, 00766 uint nRows) 00767 { 00768 StandardTypeDescriptorFactory stdTypeFactory; 00769 TupleAttributeDescriptor attrDesc( 00770 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00771 00772
00773
00774
00775 MockProducerExecStreamParams mockParams; 00776 mockParams.nRows = nRows; 00777 mockParams.outputTupleDesc.push_back(attrDesc); 00778 mockParams.outputTupleDesc.push_back(attrDesc); 00779 vector<boost::shared_ptr<ColumnGenerator > > columnGenerators; 00780 columnGenerators.push_back( 00781 SharedInt64ColumnGenerator(new SeqColumnGenerator(0))); 00782 columnGenerators.push_back( 00783 SharedInt64ColumnGenerator(new SeqColumnGenerator(nRows))); 00784 mockParams.pGenerator.reset( 00785 new CompositeExecStreamGenerator(columnGenerators)); 00786 00787 ExecStreamEmbryo mockStreamEmbryo; 00788 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00789 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00790 00791
00792 BTreeDescriptor descriptor; 00793 BTreeInsertExecStreamParams bTreeInsertParams; 00794 descriptor.tupleDescriptor.push_back(attrDesc); 00795 descriptor.tupleDescriptor.push_back(attrDesc); 00796 descriptor.keyProjection.push_back(0); 00797 descriptor.segmentAccessor.pSegment = pRandomSegment; 00798 descriptor.segmentAccessor.pCacheAccessor = pCacheAccessor; 00799 BTreeBuilder builder(descriptor, pRandomSegment); 00800 if (!useDynamicBTree) { 00801 builder.createEmptyRoot(); 00802 descriptor.rootPageId = builder.getRootPageId(); 00803 bTreeInsertParams.rootPageIdParamId = DynamicParamId(0); 00804 } else { 00805 descriptor.rootPageId = NULL_PAGE_ID; 00806 bTreeInsertParams.rootPageIdParamId = DynamicParamId(1); 00807 } 00808 00809 bTreeInsertParams.scratchAccessor = 00810 pSegmentFactory->newScratchSegment(pCache, 10); 00811 bTreeInsertParams.pCacheAccessor = pCacheAccessor; 00812 bTreeInsertParams.distinctness = DUP_FAIL; 00813 bTreeInsertParams.monotonic = true; 00814 bTreeInsertParams.pSegment = pRandomSegment; 00815 bTreeInsertParams.pCacheAccessor = pCacheAccessor; 00816 bTreeInsertParams.rootPageId = descriptor.rootPageId; 00817 bTreeInsertParams.segmentId = descriptor.segmentId; 00818 bTreeInsertParams.pageOwnerId = descriptor.pageOwnerId; 00819 bTreeInsertParams.tupleDesc = descriptor.tupleDescriptor; 00820 bTreeInsertParams.keyProj = descriptor.keyProjection; 00821 bTreeInsertParams.pRootMap = 0; 00822 bTreeInsertParams.outputTupleDesc.push_back(attrDesc); 00823 00824 ExecStreamEmbryo bTreeInsertEmbryo; 00825 bTreeInsertEmbryo.init(new BTreeInsertExecStream(), bTreeInsertParams); 00826 bTreeInsertEmbryo.getStream()->setName("BTreeInsertExecStream"); 00827 00828 SharedExecStream pOutputStream = 00829 prepareTransformGraph(mockStreamEmbryo, bTreeInsertEmbryo); 00830 00831 ConstExecStreamGenerator expectedResultGenerator(0); 00832 verifyOutput(*pOutputStream, 0, expectedResultGenerator); 00833 00834
00835 if (useDynamicBTree) { 00836 descriptor.rootPageId = 00837 *reinterpret_cast<PageId const *>( 00838 pGraph->getDynamicParamManager()->getParam( 00839 DynamicParamId(1)).getDatum().pData); 00840 } 00841 00842
00843
00844 BTreeReader reader(descriptor); 00845 bool found = reader.searchFirst(); 00846 if (!found) { 00847 BOOST_FAIL("searchFirst found nothing"); 00848 } 00849 TupleData tupleData; 00850 tupleData.compute(descriptor.tupleDescriptor); 00851 for (uint i = 0; i < nRows; i++) { 00852 if (!found) { 00853 BOOST_FAIL("Could not searchNext for key #" << i); 00854 } 00855 reader.getTupleAccessorForRead().unmarshal(tupleData); 00856 uint64_t key = *reinterpret_cast<uint64_t const *>(tupleData[0].pData); 00857 uint64_t val = *reinterpret_cast<uint64_t const *>(tupleData[1].pData); 00858 BOOST_CHECK_EQUAL(key, i); 00859 BOOST_CHECK_EQUAL(val, i + nRows); 00860 found = reader.searchNext(); 00861 } 00862 if (!reader.isSingular()) { 00863 BOOST_FAIL("Should have reached end of tree"); 00864 } 00865 reader.endSearch(); 00866 } 00867 00868 void ExecStreamTestSuite::testNestedLoopJoinExecStream( 00869 uint nRowsLeft, 00870 uint nRowsRight) 00871 { 00872
00873 00874 StandardTypeDescriptorFactory stdTypeFactory; 00875 TupleAttributeDescriptor attrDesc( 00876 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00877 00878
00879 MockProducerExecStreamParams paramsMockOuter; 00880 paramsMockOuter.outputTupleDesc.push_back(attrDesc); 00881 paramsMockOuter.nRows = nRowsLeft; 00882 paramsMockOuter.pGenerator.reset(new RampExecStreamGenerator(0, 1)); 00883 00884 ExecStreamEmbryo outerStreamEmbryo; 00885 outerStreamEmbryo.init(new MockProducerExecStream(), paramsMockOuter); 00886 outerStreamEmbryo.getStream()->setName("OuterProducerExecStream"); 00887 00888
00889
00890
00891
00892
00893 MockProducerExecStreamParams paramsMockInner; 00894 paramsMockInner.outputTupleDesc.push_back(attrDesc); 00895 paramsMockInner.nRows = nRowsRight; 00896 paramsMockInner.pGenerator.reset(new RampExecStreamGenerator(0, 1)); 00897 00898 ExecStreamEmbryo innerStreamEmbryo; 00899 innerStreamEmbryo.init(new MockProducerExecStream(), paramsMockInner); 00900 innerStreamEmbryo.getStream()->setName("InnerProducerExecStream"); 00901 00902 ReshapeExecStreamParams paramsReshape; 00903 paramsReshape.compOp = COMP_EQ; 00904 paramsReshape.outputProj.push_back(0); 00905 paramsReshape.inputCompareProj.push_back(0); 00906 paramsReshape.dynamicParameters.push_back( 00907 ReshapeParameter(DynamicParamId(1), 0, false)); 00908 paramsReshape.outputTupleDesc.push_back(attrDesc); 00909 00910 ExecStreamEmbryo reshapeStreamEmbryo; 00911 reshapeStreamEmbryo.init(new ReshapeExecStream(), paramsReshape); 00912 reshapeStreamEmbryo.getStream()->setName("ReshapeExecStream"); 00913 00914
00915
00916
00917 ValuesExecStreamParams paramsValues; 00918 paramsValues.bufSize = 0; 00919 paramsValues.outputTupleDesc.push_back(attrDesc); 00920 ExecStreamEmbryo valuesStreamEmbryo; 00921 valuesStreamEmbryo.init(new ValuesExecStream(), paramsValues); 00922 valuesStreamEmbryo.getStream()->setName("ValuesExecStream"); 00923 00924
00925 std::vector<std::vector > sourceStreamEmbryosList; 00926 std::vector sourceStreamEmbryos; 00927 sourceStreamEmbryos.push_back(outerStreamEmbryo); 00928 sourceStreamEmbryosList.push_back(sourceStreamEmbryos); 00929 00930 sourceStreamEmbryos.clear(); 00931 sourceStreamEmbryos.push_back(innerStreamEmbryo); 00932 sourceStreamEmbryos.push_back(reshapeStreamEmbryo); 00933 sourceStreamEmbryosList.push_back(sourceStreamEmbryos); 00934 00935 sourceStreamEmbryos.clear(); 00936 sourceStreamEmbryos.push_back(valuesStreamEmbryo); 00937 sourceStreamEmbryosList.push_back(sourceStreamEmbryos); 00938 00939 NestedLoopJoinExecStreamParams paramsJoin; 00940 paramsJoin.leftOuter = false; 00941 paramsJoin.leftJoinKeys.push_back( 00942 NestedLoopJoinKey(DynamicParamId(1), 0)); 00943 00944 ExecStreamEmbryo joinStreamEmbryo; 00945 joinStreamEmbryo.init(new NestedLoopJoinExecStream(), paramsJoin); 00946 joinStreamEmbryo.getStream()->setName("NestedLoopJoinExecStream"); 00947 00948 SharedExecStream pOutputStream = 00949 prepareConfluenceGraph(sourceStreamEmbryosList, joinStreamEmbryo); 00950 00951 vector<boost::shared_ptr<ColumnGenerator > > columnGenerators; 00952 SharedInt64ColumnGenerator colGen = 00953 SharedInt64ColumnGenerator(new SeqColumnGenerator(0)); 00954 columnGenerators.push_back(colGen); 00955 colGen = SharedInt64ColumnGenerator(new SeqColumnGenerator(0)); 00956 columnGenerators.push_back(colGen); 00957 00958 CompositeExecStreamGenerator resultGenerator(columnGenerators); 00959 verifyOutput( 00960 *pOutputStream, std::min(nRowsLeft, nRowsRight), resultGenerator); 00961 } 00962 00963 void ExecStreamTestSuite::testSplitterPlusBarrier() 00964 { 00965 StandardTypeDescriptorFactory stdTypeFactory; 00966 TupleAttributeDescriptor attrDesc( 00967 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00968 MockProducerExecStreamParams mockParams; 00969 mockParams.outputTupleDesc.push_back(attrDesc); 00970 uint nRows = 10000; 00971 mockParams.nRows = nRows; 00972 ExecStreamEmbryo mockStreamEmbryo; 00973 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00974 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00975 00976 SplitterExecStreamParams splitterParams; 00977 ExecStreamEmbryo splitterStreamEmbryo; 00978 splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams); 00979 splitterStreamEmbryo.getStream()->setName("SplitterExecStream"); 00980 00981 vector<vector > aggEmbryoStreamList; 00982 for (int i = 0; i < 10; i++) { 00983 SortedAggExecStreamParams aggParams; 00984 aggParams.groupByKeyCount = 0; 00985 aggParams.outputTupleDesc.push_back(attrDesc); 00986 AggInvocation countInvocation; 00987 countInvocation.aggFunction = AGG_FUNC_COUNT; 00988 countInvocation.iInputAttr = -1; 00989 aggParams.aggInvocations.push_back(countInvocation); 00990 00991 ExecStreamEmbryo aggStreamEmbryo; 00992 aggStreamEmbryo.init(new SortedAggExecStream(),aggParams); 00993 std::ostringstream oss; 00994 oss << "AggExecStream" << "#" << i; 00995 aggStreamEmbryo.getStream()->setName(oss.str()); 00996 vector v; 00997 v.push_back(aggStreamEmbryo); 00998 aggEmbryoStreamList.push_back(v); 00999 } 01000 01001 BarrierExecStreamParams barrierParams; 01002 barrierParams.outputTupleDesc.push_back(attrDesc); 01003 barrierParams.returnMode = BARRIER_RET_ANY_INPUT; 01004 ExecStreamEmbryo barrierStreamEmbryo; 01005 barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams); 01006 barrierStreamEmbryo.getStream()->setName("BarrierExecStream"); 01007 01008 SharedExecStream pOutputStream = 01009 prepareDAG( 01010 mockStreamEmbryo, 01011 splitterStreamEmbryo, 01012 aggEmbryoStreamList, 01013 barrierStreamEmbryo); 01014 01015 ConstExecStreamGenerator expectedResultGenerator(nRows); 01016 verifyOutput(*pOutputStream, 1, expectedResultGenerator); 01017 } 01018 01019 void ExecStreamTestSuite::testSegBufferReaderWriterExecStream( 01020 bool restartable, 01021 bool earlyClose) 01022 { 01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036 01037 StandardTypeDescriptorFactory stdTypeFactory; 01038 TupleAttributeDescriptor attrDesc( 01039 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 01040 01041 MockProducerExecStreamParams mockParams; 01042 mockParams.outputTupleDesc.push_back(attrDesc); 01043 uint nRows; 01044 if (!restartable) { 01045 nRows = 10000; 01046 } else { 01047 nRows = 200; 01048 } 01049 mockParams.nRows = nRows; 01050 01051 ExecStreamEmbryo mockStreamEmbryo; 01052 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 01053 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 01054 01055 SegBufferWriterExecStreamParams writerParams; 01056 writerParams.scratchAccessor.pSegment = pRandomSegment; 01057 writerParams.scratchAccessor.pCacheAccessor = pCacheAccessor; 01058 writerParams.readerRefCountParamId = DynamicParamId(1); 01059 writerParams.outputTupleDesc.push_back(attrDesc); 01060 01061 ExecStreamEmbryo writerStreamEmbryo; 01062 writerStreamEmbryo.init( 01063 new SegBufferWriterExecStream(), 01064 writerParams); 01065 writerStreamEmbryo.getStream()->setName("SegBufferWriterExecStream"); 01066 01067 SegBufferReaderExecStreamParams readerParams; 01068 readerParams.scratchAccessor.pSegment = pRandomSegment; 01069 readerParams.scratchAccessor.pCacheAccessor = pCacheAccessor; 01070 readerParams.readerRefCountParamId = DynamicParamId(1); 01071 readerParams.outputTupleDesc.push_back(attrDesc); 01072 01073 ExecStreamEmbryo readerStreamEmbryo1; 01074 readerStreamEmbryo1.init( 01075 new SegBufferReaderExecStream(), 01076 readerParams); 01077 readerStreamEmbryo1.getStream()->setName("SegBufferReaderExecStream1"); 01078 01079 ExecStreamEmbryo readerStreamEmbryo2; 01080 readerStreamEmbryo2.init( 01081 new SegBufferReaderExecStream(), 01082 readerParams); 01083 readerStreamEmbryo2.getStream()->setName("SegBufferReaderExecStream2"); 01084 01085 std::vector<std::vector > interStreamEmbryosList; 01086 std::vector readerInput; 01087 readerInput.push_back(readerStreamEmbryo1); 01088 if (earlyClose) { 01089 SegBufferExecStreamParams bufParams; 01090 bufParams.scratchAccessor.pSegment = pRandomSegment; 01091 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor; 01092 bufParams.multipass = false; 01093 01094 ExecStreamEmbryo bufStreamEmbryo; 01095 bufStreamEmbryo.init(new SegBufferExecStream(),bufParams); 01096 bufStreamEmbryo.getStream()->setName("SegBufferExecStream1"); 01097 readerInput.push_back(bufStreamEmbryo); 01098 } 01099 interStreamEmbryosList.push_back(readerInput); 01100 01101 readerInput.clear(); 01102 readerInput.push_back(readerStreamEmbryo2); 01103 if (earlyClose) { 01104 SegBufferExecStreamParams bufParams; 01105 bufParams.scratchAccessor.pSegment = pRandomSegment; 01106 bufParams.scratchAccessor.pCacheAccessor = pCacheAccessor; 01107 if (restartable) { 01108 bufParams.multipass = true; 01109 } 01110 01111 ExecStreamEmbryo bufStreamEmbryo; 01112 bufStreamEmbryo.init(new SegBufferExecStream(),bufParams); 01113 bufStreamEmbryo.getStream()->setName("SegBufferExecStream2"); 01114 readerInput.push_back(bufStreamEmbryo); 01115 } 01116 interStreamEmbryosList.push_back(readerInput); 01117 01118 MergeExecStreamParams mergeParams; 01119 CartesianJoinExecStreamParams joinParams; 01120 if (restartable) { 01121 joinParams.leftOuter = false; 01122 } else { 01123 mergeParams.outputTupleDesc.push_back(attrDesc); 01124 if (getDegreeOfParallelism() != 1) { 01125 mergeParams.isParallel = true; 01126 } 01127 } 01128 01129 ExecStreamEmbryo execStreamEmbryo; 01130 if (restartable) { 01131 execStreamEmbryo.init(new CartesianJoinExecStream(), joinParams); 01132 execStreamEmbryo.getStream()->setName("CartesianJoinExecStream"); 01133 } else { 01134 execStreamEmbryo.init(new MergeExecStream(), mergeParams); 01135 execStreamEmbryo.getStream()->setName("MergeExecStream"); 01136 } 01137 01138 SharedExecStream pOutputStream = 01139 prepareDAG( 01140 mockStreamEmbryo, 01141 writerStreamEmbryo, 01142 interStreamEmbryosList, 01143 execStreamEmbryo); 01144 01145 int64_t zero = 0; 01146 TupleDescriptor expectedDesc; 01147 expectedDesc.push_back(attrDesc); 01148 if (restartable) { 01149 expectedDesc.push_back(attrDesc); 01150 } 01151 TupleData expectedTuple; 01152 expectedTuple.compute(expectedDesc); 01153 expectedTuple[0].pData = reinterpret_cast(&zero); 01154 if (restartable) { 01155 expectedTuple[1].pData = reinterpret_cast(&zero); 01156 } 01157 01158 verifyConstantOutput( 01159 *pOutputStream, 01160 expectedTuple, 01161 restartable ? nRows * nRows : 2 * nRows); 01162 } 01163 01164