Fennel: /home/pub/open/dev/fennel/test/CollectExecStreamTestSuite.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/test/CollectExecStreamTestSuite.h" 00026 #include "fennel/exec/CollectExecStream.h" 00027 #include "fennel/exec/UncollectExecStream.h" 00028 #include "fennel/tuple/StandardTypeDescriptor.h" 00029 #include "fennel/tuple/TupleOverflowExcn.h" 00030 #include "fennel/exec/MockProducerExecStream.h" 00031 #include "fennel/exec/ExecStreamEmbryo.h" 00032 00033 using namespace fennel; 00034 00035 CollectExecStreamTestSuite::CollectExecStreamTestSuite(bool addAllTests) 00036 { 00037 if (addAllTests) { 00038 FENNEL_UNIT_TEST_CASE(CollectExecStreamTestSuite,testCollectInts); 00039 FENNEL_UNIT_TEST_CASE(CollectExecStreamTestSuite,testCollectUncollect); 00040 FENNEL_UNIT_TEST_CASE( 00041 CollectExecStreamTestSuite,testCollectCollectUncollectUncollect); 00042 } 00043 00044 StandardTypeDescriptorFactory stdTypeFactory; 00045 00046 descAttrInt64 = 00047 TupleAttributeDescriptor( 00048 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00049 descInt64.push_back(descAttrInt64); 00050 00051 descAttrVarbinary32 = 00052 TupleAttributeDescriptor( 00053 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY), true, 32); 00054 descVarbinary32.push_back(descAttrVarbinary32); 00055 } 00056 00057 void CollectExecStreamTestSuite::testCollectInts() 00058 { 00059 uint rows = 2; 00060 MockProducerExecStreamParams mockParams; 00061 mockParams.outputTupleDesc.push_back(descAttrInt64); 00062 mockParams.nRows = rows; 00063 mockParams.pGenerator.reset(new RampExecStreamGenerator(1)); 00064 00065 CollectExecStreamParams collectParams; 00066 collectParams.outputTupleDesc = descVarbinary32; 00067 00068 ExecStreamEmbryo mockStreamEmbryo; 00069 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00070 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00071 00072 ExecStreamEmbryo collectStreamEmbryo; 00073 collectStreamEmbryo.init(new CollectExecStream(), collectParams); 00074 collectStreamEmbryo.getStream()->setName("CollectExecStream"); 00075 00076 00077
00078 uint8_t intArrayBuff[32]; 00079 uint64_t one = 1; 00080 TupleData oneData(descInt64); 00081 oneData[0].pData = (PConstBuffer) &one; 00082 TupleAccessor oneAccessor; 00083 oneAccessor.compute(descInt64); 00084 assert(oneAccessor.getMaxByteCount() <= sizeof(intArrayBuff)); 00085 oneAccessor.marshal(oneData, (PBuffer) intArrayBuff); 00086 00087 uint64_t two = 2; 00088 TupleData twoData(descInt64); 00089 twoData[0].pData = (PConstBuffer) &two; 00090 TupleAccessor twoAccessor; 00091 twoAccessor.compute(descInt64); 00092 assert((oneAccessor.getMaxByteCount() + twoAccessor.getMaxByteCount()) <= 00093 sizeof(intArrayBuff)); 00094 twoAccessor.marshal( 00095 twoData, 00096 ((PBuffer)intArrayBuff) + oneAccessor.getMaxByteCount()); 00097 00098 uint8_t varbinaryBuff[1000]; 00099 TupleData binData(descVarbinary32); 00100 binData[0].pData = (PConstBuffer) intArrayBuff; 00101 binData[0].cbData = 00102 oneAccessor.getMaxByteCount() + twoAccessor.getMaxByteCount(); 00103 TupleAccessor binAccessor; 00104 binAccessor.compute(descVarbinary32); 00105 binAccessor.marshal(binData, (PBuffer) varbinaryBuff); 00106 00107 00108 SharedExecStream pOutputStream = prepareTransformGraph( 00109 mockStreamEmbryo, collectStreamEmbryo); 00110 00111 verifyConstantOutput(*pOutputStream, binData, 1); 00112 } 00113 00114 void CollectExecStreamTestSuite::testCollectUncollect() 00115 { 00116 StandardTypeDescriptorFactory stdTypeFactory; 00117 uint rows = 127; 00118 00119 TupleAttributeDescriptor tupleDescAttr( 00120 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY), 00121 true, 00122 2 * rows * sizeof(uint64_t)); 00123 TupleDescriptor tupleDesc; 00124 tupleDesc.push_back(tupleDescAttr); 00125 00126 MockProducerExecStreamParams mockParams; 00127 mockParams.outputTupleDesc.push_back(descAttrInt64); 00128 mockParams.nRows = rows; 00129 mockParams.pGenerator.reset(new RampExecStreamGenerator()); 00130 00131 CollectExecStreamParams collectParams; 00132 collectParams.outputTupleDesc = tupleDesc; 00133 00134 UncollectExecStreamParams uncollectParams; 00135 uncollectParams.outputTupleDesc = descInt64; 00136 00137 ExecStreamEmbryo mockStreamEmbryo; 00138 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00139 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00140 00141 ExecStreamEmbryo collectStreamEmbryo; 00142 collectStreamEmbryo.init(new CollectExecStream(), collectParams); 00143 collectStreamEmbryo.getStream()->setName("CollectExecStream"); 00144 00145 ExecStreamEmbryo uncollectStreamEmbryo; 00146 uncollectStreamEmbryo.init(new UncollectExecStream(), uncollectParams); 00147 uncollectStreamEmbryo.getStream()->setName("UncollectExecStream"); 00148 00149 00150 std::vector transforms; 00151 transforms.push_back(collectStreamEmbryo); 00152 transforms.push_back(uncollectStreamEmbryo); 00153 SharedExecStream pOutputStream = prepareTransformGraph( 00154 mockStreamEmbryo, transforms); 00155 00156 RampExecStreamGenerator rampExpectedGenerator; 00157 00158 verifyOutput(*pOutputStream, rows, rampExpectedGenerator); 00159 } 00160 00161 void CollectExecStreamTestSuite::testCollectCollectUncollectUncollect() { 00162 StandardTypeDescriptorFactory stdTypeFactory; 00163 uint rows = 3; 00164 00165 TupleAttributeDescriptor tupleDescAttr1( 00166 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY), 00167 true, 00168 2 * rows * sizeof(uint64_t)); 00169 TupleDescriptor vbDesc1; 00170 vbDesc1.push_back(tupleDescAttr1); 00171 00172 TupleAttributeDescriptor tupleDescAttr2( 00173 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY), 00174 true, 00175 2 * rows * rows * sizeof(uint64_t)); 00176 TupleDescriptor vbDesc2; 00177 vbDesc2.push_back(tupleDescAttr2); 00178 00179 MockProducerExecStreamParams mockParams; 00180 mockParams.outputTupleDesc.push_back(descAttrInt64); 00181 mockParams.nRows = rows; 00182 mockParams.pGenerator.reset(new RampExecStreamGenerator()); 00183 00184 CollectExecStreamParams collectParams1; 00185 collectParams1.outputTupleDesc = vbDesc1; 00186 00187 CollectExecStreamParams collectParams2; 00188 collectParams2.outputTupleDesc = vbDesc2; 00189 00190 UncollectExecStreamParams uncollectParams1; 00191 uncollectParams1.outputTupleDesc = vbDesc1; 00192 00193 UncollectExecStreamParams uncollectParams2; 00194 uncollectParams2.outputTupleDesc = descInt64; 00195 00196 ExecStreamEmbryo mockStreamEmbryo; 00197 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00198 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00199 00200 ExecStreamEmbryo collectStreamEmbryo1; 00201 collectStreamEmbryo1.init(new CollectExecStream(), collectParams1); 00202 collectStreamEmbryo1.getStream()->setName("CollectExecStream1"); 00203 00204 ExecStreamEmbryo collectStreamEmbryo2; 00205 collectStreamEmbryo2.init(new CollectExecStream(), collectParams2); 00206 collectStreamEmbryo2.getStream()->setName("CollectExecStream2"); 00207 00208 ExecStreamEmbryo uncollectStreamEmbryo1; 00209 uncollectStreamEmbryo1.init(new UncollectExecStream(), uncollectParams1); 00210 uncollectStreamEmbryo1.getStream()->setName("UncollectExecStream1"); 00211 00212 ExecStreamEmbryo uncollectStreamEmbryo2; 00213 uncollectStreamEmbryo2.init(new UncollectExecStream(), uncollectParams2); 00214 uncollectStreamEmbryo2.getStream()->setName("UncollectExecStream2"); 00215 00216 std::vector transforms; 00217 transforms.push_back(collectStreamEmbryo1); 00218 transforms.push_back(collectStreamEmbryo2); 00219 transforms.push_back(uncollectStreamEmbryo1); 00220 transforms.push_back(uncollectStreamEmbryo2); 00221 SharedExecStream pOutputStream = prepareTransformGraph( 00222 mockStreamEmbryo, transforms); 00223 00224 RampExecStreamGenerator rampExpectedGenerator; 00225 00226 verifyOutput(*pOutputStream, rows, rampExpectedGenerator); 00227 } 00228 00229