Fennel: /home/pub/open/dev/fennel/test/CorrelationJoinExecStreamTestSuite.cpp Source File (original) (raw)
00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/test/CorrelationJoinExecStreamTestSuite.h" 00026 #include "fennel/exec/CorrelationJoinExecStream.h" 00027 #include "fennel/tuple/StandardTypeDescriptor.h" 00028 #include "fennel/tuple/TupleOverflowExcn.h" 00029 #include "fennel/exec/MockProducerExecStream.h" 00030 #include "fennel/exec/ExecStreamEmbryo.h" 00031 #include "fennel/exec/ExecStreamGraph.h" 00032 00033 using namespace fennel; 00034 00035 CorrelationJoinExecStreamTestSuite::CorrelationJoinExecStreamTestSuite( 00036 bool addAllTests) 00037 { 00038 if (addAllTests) { 00039 FENNEL_UNIT_TEST_CASE( 00040 CorrelationJoinExecStreamTestSuite, testCorrelationJoin); 00041 } 00042 00043 StandardTypeDescriptorFactory stdTypeFactory; 00044 00045 descAttrInt64 = TupleAttributeDescriptor( 00046 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00047 descInt64.push_back(descAttrInt64); 00048 } 00049 00050 void CorrelationJoinExecStreamTestSuite::testCorrelationJoin() 00051 { 00052 MockProducerExecStreamParams paramsMockLeft; 00053 paramsMockLeft.outputTupleDesc.push_back(descAttrInt64); 00054 paramsMockLeft.pGenerator.reset(new RampExecStreamGenerator); 00055 paramsMockLeft.nRows = 5000; 00056 00057 ExecStreamEmbryo leftStreamEmbryo; 00058 leftStreamEmbryo.init(new MockProducerExecStream(),paramsMockLeft); 00059 leftStreamEmbryo.getStream()->setName("LeftProducerExecStream"); 00060 00061 DynamicParamId dynamicParamId(1); 00062 MockProducerExecStreamParams paramsMockRight(paramsMockLeft); 00063 paramsMockRight.pGenerator.reset(new DynamicParamExecStreamGenerator( 00064 dynamicParamId, 00065 pGraph->getDynamicParamManager())); 00066 paramsMockRight.nRows = 10; 00067 00068 ExecStreamEmbryo rightStreamEmbryo; 00069 rightStreamEmbryo.init(new MockProducerExecStream(),paramsMockRight); 00070 rightStreamEmbryo.getStream()->setName("RightProducerExecStream"); 00071 00072 CorrelationJoinExecStreamParams paramsJoin; 00073 00074 Correlation correlation(dynamicParamId, 0); 00075 paramsJoin.correlations.push_back(correlation); 00076 00077 ExecStreamEmbryo joinStreamEmbryo; 00078 joinStreamEmbryo.init(new CorrelationJoinExecStream(),paramsJoin); 00079 joinStreamEmbryo.getStream()->setName("CorrelationJoinExecStream"); 00080 00081 SharedExecStream pOutputStream = prepareConfluenceGraph( 00082 leftStreamEmbryo, 00083 rightStreamEmbryo, 00084 joinStreamEmbryo); 00085 00086 StairCaseExecStreamGenerator rampExpectedGenerator( 00087 1, paramsMockRight.nRows); 00088 verifyOutput( 00089 *pOutputStream, 00090 paramsMockLeft.nRows * paramsMockRight.nRows, 00091 rampExpectedGenerator); 00092 } 00093 00094