Fennel: /home/pub/open/dev/fennel/test/ExecStreamUnitTestBase.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/test/ExecStreamUnitTestBase.h" 00026 #include "fennel/exec/ExecStreamGraph.h" 00027 #include "fennel/exec/ExecStreamGraphEmbryo.h" 00028 #include "fennel/exec/ExecStreamScheduler.h" 00029 #include "fennel/exec/ExecStream.h" 00030 #include "fennel/exec/ScratchBufferExecStream.h" 00031 #include "fennel/exec/ExecStreamEmbryo.h" 00032 #include "fennel/exec/ExecStreamBufAccessor.h" 00033 #include "fennel/exec/MockProducerExecStream.h" 00034 #include "fennel/tuple/TuplePrinter.h" 00035 #include "fennel/cache/QuotaCacheAccessor.h" 00036 00037 #include <boost/test/test_tools.hpp> 00038 00039 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/test/ExecStreamUnitTestBase.cpp#23 $"); 00040 00041 SharedExecStream ExecStreamUnitTestBase::prepareSourceGraph( 00042 ExecStreamEmbryo &sourceStreamEmbryo) 00043 { 00044 std::vector transforms; 00045 return prepareTransformGraph(sourceStreamEmbryo, transforms); 00046 } 00047 00048 SharedExecStream ExecStreamUnitTestBase::prepareTransformGraph( 00049 ExecStreamEmbryo &sourceStreamEmbryo, 00050 ExecStreamEmbryo &transformStreamEmbryo) 00051 { 00052 std::vector transforms; 00053 transforms.push_back(transformStreamEmbryo); 00054 return prepareTransformGraph(sourceStreamEmbryo, transforms); 00055 } 00056 00057 SharedExecStream ExecStreamUnitTestBase::prepareTransformGraph( 00058 ExecStreamEmbryo &sourceStreamEmbryo, 00059 std::vector &transforms) 00060 { 00061 pGraphEmbryo->saveStreamEmbryo(sourceStreamEmbryo); 00062 std::vector::iterator it; 00063 00064
00065 for (it = transforms.begin(); it != transforms.end(); ++it) { 00066 pGraphEmbryo->saveStreamEmbryo(*it); 00067 } 00068 00069
00070 ExecStreamEmbryo& previousStream = sourceStreamEmbryo; 00071 for (it = transforms.begin(); it != transforms.end(); ++it) { 00072 pGraphEmbryo->addDataflow( 00073 previousStream.getStream()->getName(), 00074 (*it).getStream()->getName()); 00075 previousStream = *it; 00076 } 00077 00078 SharedExecStream pAdaptedStream = 00079 pGraphEmbryo->addAdapterFor( 00080 previousStream.getStream()->getName(), 00081 0, 00082 BUFPROV_PRODUCER); 00083 pGraph->addOutputDataflow(pAdaptedStream->getStreamId()); 00084 00085 pGraphEmbryo->prepareGraph(shared_from_this(), ""); 00086 return pAdaptedStream; 00087 } 00088 00089 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceGraph( 00090 ExecStreamEmbryo &sourceStreamEmbryo1, 00091 ExecStreamEmbryo &sourceStreamEmbryo2, 00092 ExecStreamEmbryo &confluenceStreamEmbryo) 00093 { 00094 std::vector sourceStreamEmbryos; 00095 sourceStreamEmbryos.push_back(sourceStreamEmbryo1); 00096 sourceStreamEmbryos.push_back(sourceStreamEmbryo2); 00097 return prepareConfluenceGraph(sourceStreamEmbryos, confluenceStreamEmbryo); 00098 } 00099 00100 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceTransformGraph( 00101 ExecStreamEmbryo &sourceStreamEmbryo1, 00102 ExecStreamEmbryo &sourceStreamEmbryo2, 00103 ExecStreamEmbryo &confluenceStreamEmbryo, 00104 ExecStreamEmbryo &transformStreamEmbryo) 00105 { 00106 std::vector sourceStreamEmbryos; 00107 sourceStreamEmbryos.push_back(sourceStreamEmbryo1); 00108 sourceStreamEmbryos.push_back(sourceStreamEmbryo2); 00109 00110 std::vector::iterator it; 00111 00112 for (it = sourceStreamEmbryos.begin(); it != sourceStreamEmbryos.end(); 00113 ++it) 00114 { 00115 pGraphEmbryo->saveStreamEmbryo(*it); 00116 } 00117 pGraphEmbryo->saveStreamEmbryo(confluenceStreamEmbryo); 00118 00119 for (it = sourceStreamEmbryos.begin(); it != sourceStreamEmbryos.end(); 00120 ++it) 00121 { 00122 pGraphEmbryo->addDataflow( 00123 (*it).getStream()->getName(), 00124 confluenceStreamEmbryo.getStream()->getName()); 00125 } 00126 00127 std::vector transforms; 00128 transforms.push_back(transformStreamEmbryo); 00129 ExecStreamEmbryo& previousStream = confluenceStreamEmbryo; 00130 00131
00132 for (it = transforms.begin(); it != transforms.end(); ++it) { 00133 pGraphEmbryo->saveStreamEmbryo(*it); 00134 } 00135 00136 for (it = transforms.begin(); it != transforms.end(); ++it) { 00137 pGraphEmbryo->addDataflow( 00138 previousStream.getStream()->getName(), 00139 (it).getStream()->getName()); 00140 previousStream = it; 00141 } 00142 00143 00144 SharedExecStream pAdaptedStream = 00145 pGraphEmbryo->addAdapterFor( 00146 previousStream.getStream()->getName(), 00147 0, 00148 BUFPROV_PRODUCER); 00149 pGraph->addOutputDataflow(pAdaptedStream->getStreamId()); 00150 00151 pGraphEmbryo->prepareGraph(shared_from_this(), ""); 00152 return pAdaptedStream; 00153 } 00154 00155 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceGraph( 00156 std::vector &sourceStreamEmbryos, 00157 ExecStreamEmbryo &confluenceStreamEmbryo) 00158 { 00159 std::vector<std::vector > sourceStreamEmbryosList; 00160 std::vector::iterator it; 00161 std::vector sourceStreamList; 00162 for (it = sourceStreamEmbryos.begin(); it != sourceStreamEmbryos.end(); 00163 it++) 00164 { 00165 sourceStreamList.clear(); 00166 sourceStreamList.push_back(it); 00167 sourceStreamEmbryosList.push_back(sourceStreamList); 00168 } 00169 00170 return 00171 prepareConfluenceGraph(sourceStreamEmbryosList, confluenceStreamEmbryo); 00172 } 00173 00174 SharedExecStream ExecStreamUnitTestBase::prepareConfluenceGraph( 00175 std::vector<std::vector > &sourceStreamEmbryosList, 00176 ExecStreamEmbryo &confluenceStreamEmbryo) 00177 { 00178 pGraphEmbryo->saveStreamEmbryo(confluenceStreamEmbryo); 00179 00180 for (int i = 0; i < sourceStreamEmbryosList.size(); i++) { 00181 for (int j = 0; j < sourceStreamEmbryosList[i].size(); j++) { 00182 pGraphEmbryo->saveStreamEmbryo(sourceStreamEmbryosList[i][j]); 00183 } 00184 00185
00186 for (int j = 1; j < sourceStreamEmbryosList[i].size(); j++) { 00187 pGraphEmbryo->addDataflow( 00188 sourceStreamEmbryosList[i][j - 1].getStream()->getName(), 00189 sourceStreamEmbryosList[i][j].getStream()->getName()); 00190 } 00191 pGraphEmbryo->addDataflow( 00192 sourceStreamEmbryosList[i].back().getStream()->getName(), 00193 confluenceStreamEmbryo.getStream()->getName()); 00194 } 00195 00196 SharedExecStream pAdaptedStream = 00197 pGraphEmbryo->addAdapterFor( 00198 confluenceStreamEmbryo.getStream()->getName(), 0, 00199 BUFPROV_PRODUCER); 00200 pGraph->addOutputDataflow( 00201 pAdaptedStream->getStreamId()); 00202 00203 pGraphEmbryo->prepareGraph(shared_from_this(), ""); 00204 00205 return pAdaptedStream; 00206 } 00207 00208 SharedExecStream ExecStreamUnitTestBase::prepareDAG( 00209 ExecStreamEmbryo &srcStreamEmbryo, 00210 ExecStreamEmbryo &splitterStreamEmbryo, 00211 std::vector &interStreamEmbryos, 00212 ExecStreamEmbryo &destStreamEmbryo, 00213 bool createSink, 00214 bool saveSrc) 00215 { 00216 std::vector<std::vector > listOfList; 00217 00218
00219
00220
00221 for (uint i = 0; i < interStreamEmbryos.size(); i++) { 00222 std::vector interStreamEmbryoList; 00223 00224 interStreamEmbryoList.push_back(interStreamEmbryos[i]); 00225 listOfList.push_back(interStreamEmbryoList); 00226 } 00227 return prepareDAG( 00228 srcStreamEmbryo, splitterStreamEmbryo, listOfList, destStreamEmbryo, 00229 createSink, saveSrc); 00230 } 00231 00232 SharedExecStream ExecStreamUnitTestBase::prepareDAG( 00233 ExecStreamEmbryo &srcStreamEmbryo, 00234 ExecStreamEmbryo &splitterStreamEmbryo, 00235 std::vector<std::vector > &interStreamEmbryos, 00236 ExecStreamEmbryo &destStreamEmbryo, 00237 bool createSink, 00238 bool saveSrc) 00239 { 00240 if (saveSrc) { 00241 pGraphEmbryo->saveStreamEmbryo(srcStreamEmbryo); 00242 } 00243 pGraphEmbryo->saveStreamEmbryo(splitterStreamEmbryo); 00244 00245
00246 for (int i = 0; i < interStreamEmbryos.size(); i++) { 00247 for (int j = 0; j < interStreamEmbryos[i].size(); j++) { 00248 pGraphEmbryo->saveStreamEmbryo(interStreamEmbryos[i][j]); 00249 } 00250 00251
00252 for (int j = 1; j < interStreamEmbryos[i].size(); j++) { 00253 pGraphEmbryo->addDataflow( 00254 interStreamEmbryos[i][j - 1].getStream()->getName(), 00255 interStreamEmbryos[i][j].getStream()->getName()); 00256 } 00257 } 00258 00259 pGraphEmbryo->saveStreamEmbryo(destStreamEmbryo); 00260 00261 pGraphEmbryo->addDataflow( 00262 srcStreamEmbryo.getStream()->getName(), 00263 splitterStreamEmbryo.getStream()->getName()); 00264 00265
00266 for (int i = 0; i < interStreamEmbryos.size(); i++) { 00267 pGraphEmbryo->addDataflow( 00268 splitterStreamEmbryo.getStream()->getName(), 00269 interStreamEmbryos[i][0].getStream()->getName()); 00270 pGraphEmbryo->addDataflow( 00271 interStreamEmbryos[i].back().getStream()->getName(), 00272 destStreamEmbryo.getStream()->getName()); 00273 } 00274 00275 SharedExecStream pAdaptedStream; 00276 00277 if (createSink) { 00278 pAdaptedStream = pGraphEmbryo->addAdapterFor( 00279 destStreamEmbryo.getStream()->getName(), 0, 00280 BUFPROV_PRODUCER); 00281 pGraph->addOutputDataflow(pAdaptedStream->getStreamId()); 00282 00283 pGraphEmbryo->prepareGraph(shared_from_this(), ""); 00284 } 00285 00286 return pAdaptedStream; 00287 } 00288 00289 void ExecStreamUnitTestBase::testCaseSetUp() 00290 { 00291 ExecStreamTestBase::testCaseSetUp(); 00292 openRandomSegment(); 00293 pGraph = newStreamGraph(); 00294 pGraphEmbryo = newStreamGraphEmbryo(pGraph); 00295 pGraph->setResourceGovernor(pResourceGovernor); 00296 00297
00298
00299
00300
00301 pCacheAccessor.reset( 00302 new TransactionalCacheAccessor(pCache)); 00303 } 00304 00305 void ExecStreamUnitTestBase::resetExecStreamTest() 00306 { 00307 if (pScheduler) { 00308 pScheduler->stop(); 00309 } 00310 tearDownExecStreamTest(); 00311 00312 pScheduler.reset(newScheduler()); 00313 pGraph = newStreamGraph(); 00314 pGraphEmbryo = newStreamGraphEmbryo(pGraph); 00315 pGraph->setResourceGovernor(pResourceGovernor); 00316 } 00317 00318 00319 void ExecStreamUnitTestBase::tearDownExecStreamTest() 00320 { 00321 pGraph.reset(); 00322 pGraphEmbryo.reset(); 00323 } 00324 00325 void ExecStreamUnitTestBase::verifyOutput( 00326 ExecStream &stream, 00327 uint nRowsExpected, 00328 MockProducerExecStreamGenerator &generator, 00329 bool stopEarly) 00330 { 00331
00332 00333 pResourceGovernor->requestResources(
pGraph); 00334 pGraph->open(); 00335 pScheduler->start(); 00336 uint nRows = 0; 00337 for (;;) { 00338 ExecStreamBufAccessor &bufAccessor = 00339 pScheduler->readStream(stream); 00340 if (bufAccessor.getState() == EXECBUF_EOS) { 00341 break; 00342 } 00343 BOOST_REQUIRE(bufAccessor.isConsumptionPossible()); 00344 const uint nCol = 00345 bufAccessor.getConsumptionTupleAccessor().size(); 00346 BOOST_REQUIRE(nCol == bufAccessor.getTupleDesc().size()); 00347 BOOST_REQUIRE(nCol >= 1); 00348 TupleData inputTuple; 00349 inputTuple.compute(bufAccessor.getTupleDesc()); 00350 for (;;) { 00351 if (!bufAccessor.demandData()) { 00352 break; 00353 } 00354 BOOST_REQUIRE(nRows < nRowsExpected); 00355 bufAccessor.unmarshalTuple(inputTuple); 00356 for (int col = 0; col < nCol; ++col) { 00357 int64_t actualValue = 00358 *reinterpret_cast<int64_t const *>(inputTuple[col].pData); 00359 int64_t expectedValue = generator.generateValue(nRows, col); 00360 if (actualValue != expectedValue) { 00361 std::cout << "(Row, Col) = (" << nRows << ", " << col <<")" 00362 << std::endl; 00363 BOOST_CHECK_EQUAL(expectedValue,actualValue); 00364 return; 00365 } 00366 } 00367 bufAccessor.consumeTuple(); 00368 ++nRows; 00369 if (stopEarly && nRows == nRowsExpected) { 00370 return; 00371 } 00372 } 00373 } 00374 BOOST_CHECK_EQUAL(nRowsExpected,nRows); 00375 } 00376 00377 void ExecStreamUnitTestBase::verifyConstantOutput( 00378 ExecStream &stream, 00379 const TupleData &expectedTuple, 00380 uint nRowsExpected) 00381 { 00382
00383 00384 pResourceGovernor->requestResources(
pGraph); 00385 pGraph->open(); 00386 pScheduler->start(); 00387 uint nRows = 0; 00388 for (;;) { 00389 ExecStreamBufAccessor &bufAccessor = 00390 pScheduler->readStream(stream); 00391 if (bufAccessor.getState() == EXECBUF_EOS) { 00392 break; 00393 } 00394 BOOST_REQUIRE(bufAccessor.isConsumptionPossible()); 00395 00396 if (!bufAccessor.demandData()) { 00397 break; 00398 } 00399 BOOST_REQUIRE(nRows < nRowsExpected); 00400 00401 TupleData actualTuple; 00402 actualTuple.compute(bufAccessor.getTupleDesc()); 00403 bufAccessor.unmarshalTuple(actualTuple); 00404 00405 int c = bufAccessor.getTupleDesc().compareTuples( 00406 expectedTuple, actualTuple); 00407 bufAccessor.consumeTuple(); 00408 ++nRows; 00409 if (c) { 00410 #if 1 00411 TupleDescriptor statusDesc = bufAccessor.getTupleDesc(); 00412 TuplePrinter tuplePrinter; 00413 tuplePrinter.print(std::cout, statusDesc, actualTuple); 00414 tuplePrinter.print(std::cout, statusDesc, expectedTuple); 00415 std::cout << std::endl; 00416 #endif 00417 BOOST_CHECK_EQUAL(0,c); 00418 break; 00419 } 00420 } 00421 BOOST_CHECK_EQUAL(nRowsExpected, nRows); 00422 } 00423 00424 void ExecStreamUnitTestBase::verifyBufferedOutput( 00425 ExecStream &stream, 00426 TupleDescriptor outputTupleDesc, 00427 uint nRowsExpected, 00428 PBuffer expectedBuffer) 00429 { 00430
00431 00432 TupleAccessor expectedOutputAccessor; 00433 expectedOutputAccessor.compute(outputTupleDesc); 00434 TupleData expectedTuple(outputTupleDesc); 00435 uint bufOffset = 0; 00436 pResourceGovernor->requestResources(
pGraph); 00437 pGraph->open(); 00438 pScheduler->start(); 00439 uint nRows = 0; 00440 for (;;) { 00441 ExecStreamBufAccessor &bufAccessor = 00442 pScheduler->readStream(stream); 00443 if (bufAccessor.getState() == EXECBUF_EOS) { 00444 break; 00445 } 00446 BOOST_REQUIRE(bufAccessor.getTupleDesc() == outputTupleDesc); 00447 BOOST_REQUIRE(bufAccessor.isConsumptionPossible()); 00448 const uint nCol = 00449 bufAccessor.getConsumptionTupleAccessor().size(); 00450 BOOST_REQUIRE(nCol == bufAccessor.getTupleDesc().size()); 00451 BOOST_REQUIRE(nCol >= 1); 00452 TupleData inputTuple; 00453 inputTuple.compute(bufAccessor.getTupleDesc()); 00454 for (;;) { 00455 if (!bufAccessor.demandData()) { 00456 break; 00457 } 00458 BOOST_REQUIRE(nRows < nRowsExpected); 00459 bufAccessor.unmarshalTuple(inputTuple); 00460 expectedOutputAccessor.setCurrentTupleBuf( 00461 expectedBuffer + bufOffset); 00462 expectedOutputAccessor.unmarshal(expectedTuple); 00463 int c = outputTupleDesc.compareTuples(inputTuple, expectedTuple); 00464 if (c) { 00465 std::cout << "(Row) = (" << nRows << ")" 00466 << " -- Tuples don't match"<< std::endl; 00467 BOOST_CHECK_EQUAL(0,c); 00468 return; 00469 } 00470 bufAccessor.consumeTuple(); 00471 bufOffset += expectedOutputAccessor.getCurrentByteCount(); 00472 ++nRows; 00473 } 00474 } 00475 BOOST_CHECK_EQUAL(nRowsExpected,nRows); 00476 } 00477 00478 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/test/ExecStreamUnitTestBase.cpp#23 $"); 00479 00480