Fennel: /home/pub/open/dev/fennel/lucidera/test/LcsMultiClusterAppendTest.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/common/FemEnums.h" 00024 #include "fennel/test/ExecStreamUnitTestBase.h" 00025 #include "fennel/lucidera/colstore/LcsClusterAppendExecStream.h" 00026 #include "fennel/lucidera/colstore/LcsRowScanExecStream.h" 00027 #include "fennel/btree/BTreeBuilder.h" 00028 #include "fennel/ftrs/BTreeInsertExecStream.h" 00029 #include "fennel/ftrs/BTreeSearchExecStream.h" 00030 #include "fennel/ftrs/BTreeExecStream.h" 00031 #include "fennel/tuple/StandardTypeDescriptor.h" 00032 #include "fennel/tuple/TupleDescriptor.h" 00033 #include "fennel/exec/MockProducerExecStream.h" 00034 #include "fennel/exec/ValuesExecStream.h" 00035 #include "fennel/exec/ExecStreamEmbryo.h" 00036 #include "fennel/exec/SplitterExecStream.h" 00037 #include "fennel/exec/BarrierExecStream.h" 00038 #include "fennel/exec/DynamicParam.h" 00039 #include "fennel/cache/Cache.h" 00040 #include <stdarg.h> 00041 00042 #include <boost/test/test_tools.hpp> 00043 00044 using namespace fennel; 00045 00049 class LcsMultiClusterAppendTest : public ExecStreamUnitTestBase 00050 { 00051 protected: 00052 StandardTypeDescriptorFactory stdTypeFactory; 00053 TupleAttributeDescriptor attrDesc_int64; 00054 TupleAttributeDescriptor attrDesc_bitmap; 00055 00056 vector<boost::shared_ptr > bTreeClusters; 00057 00069 void loadClusters(uint nRows, uint nCols, uint nClusters); 00070 00076 void scanCols( 00077 uint nRows, uint nCols, uint nClusters, TupleProjection proj); 00078 00079 public: 00080 explicit LcsMultiClusterAppendTest() 00081 { 00082 FENNEL_UNIT_TEST_CASE(LcsMultiClusterAppendTest, testLoad); 00083 } 00084 00085 void testCaseSetUp(); 00086 void testCaseTearDown(); 00087 00088 void testLoad(); 00089 }; 00090 00091 void LcsMultiClusterAppendTest::testLoad() 00092 { 00093
00094
00095 00096 uint nRows = 100000; 00097 uint nCols = 5; 00098 uint nClusters = 7; 00099 TupleProjection proj; 00100 00101 loadClusters(nRows, nCols, nClusters); 00102 resetExecStreamTest(); 00103 00104
00105 for (uint i = 0; i < nClusters * nCols; i++) { 00106 proj.push_back(i); 00107 } 00108 scanCols(nRows, nCols, nClusters, proj); 00109 } 00110 00111 void LcsMultiClusterAppendTest::loadClusters( 00112 uint nRows, 00113 uint nCols, 00114 uint nClusters) 00115 { 00116
00117 00118 MockProducerExecStreamParams mockParams; 00119 for (uint i = 0; i < nCols * nClusters; i++) { 00120 mockParams.outputTupleDesc.push_back(attrDesc_int64); 00121 } 00122 mockParams.nRows = nRows; 00123 00124 vector<boost::shared_ptr<ColumnGenerator > > columnGenerators; 00125 for (uint i = 0; i < nCols * nClusters; i++) { 00126 SharedInt64ColumnGenerator col = 00127 SharedInt64ColumnGenerator(new DupColumnGenerator(i + 1)); 00128 columnGenerators.push_back(col); 00129 } 00130 mockParams.pGenerator.reset( 00131 new CompositeExecStreamGenerator(columnGenerators)); 00132 00133 ExecStreamEmbryo mockStreamEmbryo; 00134 mockStreamEmbryo.init(new MockProducerExecStream(), mockParams); 00135 mockStreamEmbryo.getStream()->setName("MockProducerExecStream"); 00136 00137
00138 00139 SplitterExecStreamParams splitterParams; 00140 ExecStreamEmbryo splitterStreamEmbryo; 00141 00142 splitterStreamEmbryo.init(new SplitterExecStream(), splitterParams); 00143 splitterStreamEmbryo.getStream()->setName("SplitterExecStream"); 00144 00145
00146 00147 vector lcsAppendEmbryos; 00148 for (uint i = 0; i < nClusters; i++) { 00149 LcsClusterAppendExecStreamParams lcsAppendParams; 00150 lcsAppendParams.scratchAccessor = 00151 pSegmentFactory->newScratchSegment(pCache, 10); 00152 lcsAppendParams.pCacheAccessor = pCache; 00153 lcsAppendParams.pSegment = pRandomSegment; 00154 00155
00156
00157 (lcsAppendParams.tupleDesc).push_back(attrDesc_int64); 00158 (lcsAppendParams.tupleDesc).push_back(attrDesc_int64); 00159 00160
00161 (lcsAppendParams.keyProj).push_back(0); 00162 00163
00164 lcsAppendParams.outputTupleDesc.push_back(attrDesc_int64); 00165 00166 for (uint j = 0; j < nCols; j++) { 00167 lcsAppendParams.inputProj.push_back(i * nCols + j); 00168 } 00169 lcsAppendParams.pRootMap = 0; 00170 lcsAppendParams.rootPageIdParamId = DynamicParamId(0); 00171 00172
00173
00174 00175 boost::shared_ptr pBTreeDesc = 00176 boost::shared_ptr (new BTreeDescriptor()); 00177 bTreeClusters.push_back(pBTreeDesc); 00178 pBTreeDesc->segmentAccessor.pSegment = lcsAppendParams.pSegment; 00179 pBTreeDesc->segmentAccessor.pCacheAccessor = pCache; 00180 pBTreeDesc->tupleDescriptor = lcsAppendParams.tupleDesc; 00181 pBTreeDesc->keyProjection = lcsAppendParams.keyProj; 00182 pBTreeDesc->rootPageId = NULL_PAGE_ID; 00183 lcsAppendParams.pageOwnerId = pBTreeDesc->pageOwnerId; 00184 lcsAppendParams.segmentId = pBTreeDesc->segmentId; 00185 00186 BTreeBuilder builder(*pBTreeDesc, pRandomSegment); 00187 builder.createEmptyRoot(); 00188 lcsAppendParams.rootPageId = pBTreeDesc->rootPageId = 00189 builder.getRootPageId(); 00190 00191
00192 00193 LcsClusterAppendExecStream *lcsStream = 00194 new LcsClusterAppendExecStream(); 00195 00196 ExecStreamEmbryo lcsAppendStreamEmbryo; 00197 lcsAppendStreamEmbryo.init(lcsStream, lcsAppendParams); 00198 std::ostringstream oss; 00199 oss << "LcsClusterAppendExecStream" << "#" << i; 00200 lcsAppendStreamEmbryo.getStream()-> setName(oss.str()); 00201 lcsAppendEmbryos.push_back(lcsAppendStreamEmbryo); 00202 } 00203 00204
00205 00206 BarrierExecStreamParams barrierParams; 00207 barrierParams.outputTupleDesc.push_back(attrDesc_int64); 00208 barrierParams.returnMode = BARRIER_RET_ANY_INPUT; 00209 00210 ExecStreamEmbryo barrierStreamEmbryo; 00211 barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams); 00212 barrierStreamEmbryo.getStream()->setName("BarrierExecStream"); 00213 00214
00215 SharedExecStream pOutputStream = prepareDAG( 00216 mockStreamEmbryo, splitterStreamEmbryo, lcsAppendEmbryos, 00217 barrierStreamEmbryo); 00218 00219
00220 RampExecStreamGenerator expectedResultGenerator(mockParams.nRows); 00221 00222 verifyOutput(*pOutputStream, 1, expectedResultGenerator); 00223 } 00224 00225 void LcsMultiClusterAppendTest::scanCols( 00226 uint nRows, 00227 uint nCols, 00228 uint nClusters, 00229 TupleProjection proj) 00230 { 00231
00232
00233 00234 LcsRowScanExecStreamParams scanParams; 00235 scanParams.hasExtraFilter = false; 00236 scanParams.isFullScan = true; 00237 scanParams.samplingMode = SAMPLING_OFF; 00238 for (uint i = 0; i < nClusters; i++) { 00239 struct LcsClusterScanDef clusterScanDef; 00240 00241 for (uint j = 0; j < nCols; j++) { 00242 clusterScanDef.clusterTupleDesc.push_back(attrDesc_int64); 00243 } 00244 00245 clusterScanDef.pSegment = bTreeClusters[i]->segmentAccessor.pSegment; 00246 clusterScanDef.pCacheAccessor = 00247 bTreeClusters[i]->segmentAccessor.pCacheAccessor; 00248 clusterScanDef.tupleDesc = bTreeClusters[i]->tupleDescriptor; 00249 clusterScanDef.keyProj = bTreeClusters[i]->keyProjection; 00250 clusterScanDef.rootPageId = bTreeClusters[i]->rootPageId; 00251 clusterScanDef.pageOwnerId = bTreeClusters[i]->pageOwnerId; 00252 clusterScanDef.segmentId = bTreeClusters[i]->segmentId; 00253 00254 scanParams.lcsClusterScanDefs.push_back(clusterScanDef); 00255 } 00256 00257
00258 00259 scanParams.outputProj = proj; 00260 for (uint i = 0; i < proj.size(); i++) { 00261 scanParams.outputTupleDesc.push_back(attrDesc_int64); 00262 } 00263 00264
00265
00266 00267 ValuesExecStreamParams valuesParams; 00268 ExecStreamEmbryo valuesStreamEmbryo; 00269 boost::shared_array pBuffer; 00270 00271 valuesParams.outputTupleDesc.push_back(attrDesc_int64); 00272 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap); 00273 valuesParams.outputTupleDesc.push_back(attrDesc_bitmap); 00274 00275 uint bufferSize = 16; 00276 pBuffer.reset(new FixedBuffer[bufferSize]); 00277 valuesParams.pTupleBuffer = pBuffer; 00278 valuesParams.bufSize = 0; 00279 valuesStreamEmbryo.init(new ValuesExecStream(), valuesParams); 00280 valuesStreamEmbryo.getStream()->setName("ValuesExecStream"); 00281 00282 ExecStreamEmbryo scanStreamEmbryo; 00283 00284 scanStreamEmbryo.init(new LcsRowScanExecStream(), scanParams); 00285 scanStreamEmbryo.getStream()->setName("RowScanExecStream"); 00286 00287 SharedExecStream pOutputStream = 00288 prepareTransformGraph(valuesStreamEmbryo, scanStreamEmbryo); 00289 00290
00291 00292 vector<boost::shared_ptr<ColumnGenerator > > columnGenerators; 00293 for (uint i = 0; i < proj.size(); i++) { 00294 SharedInt64ColumnGenerator col = 00295 SharedInt64ColumnGenerator(new DupColumnGenerator(proj[i] + 1)); 00296 columnGenerators.push_back(col); 00297 } 00298 00299 CompositeExecStreamGenerator resultGenerator(columnGenerators); 00300 verifyOutput(*pOutputStream, nRows, resultGenerator); 00301 } 00302 00303 void LcsMultiClusterAppendTest::testCaseSetUp() 00304 { 00305 ExecStreamUnitTestBase::testCaseSetUp(); 00306 00307 attrDesc_int64 = TupleAttributeDescriptor( 00308 stdTypeFactory.newDataType(STANDARD_TYPE_INT_64)); 00309 attrDesc_bitmap = TupleAttributeDescriptor( 00310 stdTypeFactory.newDataType(STANDARD_TYPE_VARBINARY), 00311 true, pRandomSegment->getUsablePageSize() / 8); 00312 } 00313 00314 void LcsMultiClusterAppendTest::testCaseTearDown() 00315 { 00316 for (uint i = 0; i < bTreeClusters.size(); i++) { 00317 bTreeClusters[i]->segmentAccessor.reset(); 00318 } 00319 ExecStreamUnitTestBase::testCaseTearDown(); 00320 } 00321 00322 FENNEL_UNIT_TEST_SUITE(LcsMultiClusterAppendTest); 00323 00324 00325