// Fennel: /home/pub/open/dev/fennel/test/ExecStreamGovernorTest.cpp (source file)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/test/ExecStreamUnitTestBase.h" 00026 #include "fennel/segment/ScratchMemExcn.h" 00027 #include "fennel/exec/ExecStreamScheduler.h" 00028 #include "fennel/exec/ExecStream.h" 00029 #include "fennel/exec/ExecStreamGraph.h" 00030 #include "fennel/exec/ExecStreamBufAccessor.h" 00031 #include "fennel/exec/ExecStreamGovernor.h" 00032 #include "fennel/exec/MockResourceExecStream.h" 00033 #include "fennel/exec/BarrierExecStream.h" 00034 #include "fennel/exec/ExecStreamEmbryo.h" 00035 #include "fennel/tuple/StandardTypeDescriptor.h" 00036 00037 #include <boost/test/test_tools.hpp> 00038 00039 using namespace fennel; 00040 00044 class ExecStreamGovernorTest : public ExecStreamUnitTestBase 00045 { 00061 void testGovernor( 00062 uint nProducers, 00063 std::vector const &minReqts, 00064 std::vector const &optReqts, 00065 std::vector optTypes, 00066 std::vector expected, 00067 bool exception = false); 00068 00069 public: 00070 explicit ExecStreamGovernorTest() 00071 { 00072 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptLessAccurate); 00073 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptLessEstimate); 00074 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testOptEqualEstimate); 00075 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testInBetween); 00076 FENNEL_UNIT_TEST_CASE( 00077 ExecStreamGovernorTest, testMinEqualAllocation); 00078 FENNEL_UNIT_TEST_CASE( 00079 ExecStreamGovernorTest, testMinGreaterAllocation); 00080 FENNEL_UNIT_TEST_CASE( 00081 ExecStreamGovernorTest, testMinGreaterAvailable); 00082 FENNEL_UNIT_TEST_CASE(ExecStreamGovernorTest, testReturnResources); 00083 } 00084 00085 void testOptLessAccurate(); 00086 void testOptLessEstimate(); 00087 void testOptEqualEstimate(); 00088 void testInBetween(); 00089 void testMinGreaterAllocation(); 00090 void 
testMinEqualAllocation(); 00091 void testMinGreaterAvailable(); 00092 void testReturnResources(); 00093 00094 virtual void testCaseSetUp(); 00095 }; 00096 00097 void ExecStreamGovernorTest::testCaseSetUp() 00098 { 00099 ExecStreamUnitTestBase::testCaseSetUp(); 00100 00101
00102 ExecStreamResourceQuantity quantity; 00103 quantity.nCachePages = 100; 00104 pResourceGovernor->setResourceAvailability( 00105 quantity, EXEC_RESOURCE_CACHE_PAGES); 00106 00107 ExecStreamResourceKnobs knob; 00108 knob.expectedConcurrentStatements = 1; 00109 pResourceGovernor->setResourceKnob( 00110 knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS); 00111 } 00112 00118 void ExecStreamGovernorTest::testOptLessAccurate() 00119 { 00120 uint nProducers = 2; 00121 std::vector minReqts; 00122 std::vector optReqts; 00123 std::vector expected; 00124 std::vector optTypes; 00125 00126 ExecStreamResourceQuantity quantity; 00127 ExecStreamResourceSettingType optType; 00128 00129
00130 quantity.nCachePages = 10; 00131 minReqts.push_back(quantity); 00132 quantity.nCachePages = 15; 00133 optReqts.push_back(quantity); 00134 expected.push_back(quantity); 00135 optType = EXEC_RESOURCE_ACCURATE; 00136 optTypes.push_back(optType); 00137 00138
00139 quantity.nCachePages = 20; 00140 minReqts.push_back(quantity); 00141 quantity.nCachePages = 40; 00142 optReqts.push_back(quantity); 00143 expected.push_back(quantity); 00144 optType = EXEC_RESOURCE_ACCURATE; 00145 optTypes.push_back(optType); 00146 00147 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00148 } 00149 00154 void ExecStreamGovernorTest::testOptLessEstimate() 00155 { 00156 uint nProducers = 4; 00157 std::vector minReqts; 00158 std::vector optReqts; 00159 std::vector expected; 00160 std::vector optTypes; 00161 00162 ExecStreamResourceQuantity quantity; 00163 ExecStreamResourceSettingType optType; 00164 00165
00166
00167 00168
00169 quantity.nCachePages = 10; 00170 minReqts.push_back(quantity); 00171 quantity.nCachePages = 11; 00172 optReqts.push_back(quantity); 00173 expected.push_back(quantity); 00174 optType = EXEC_RESOURCE_ACCURATE; 00175 optTypes.push_back(optType); 00176 00177
00178 quantity.nCachePages = 15; 00179 minReqts.push_back(quantity); 00180 quantity.nCachePages = 17; 00181 optReqts.push_back(quantity); 00182 expected.push_back(quantity); 00183 optType = EXEC_RESOURCE_ACCURATE; 00184 optTypes.push_back(optType); 00185 00186
00187 quantity.nCachePages = 20; 00188 minReqts.push_back(quantity); 00189 quantity.nCachePages = 23; 00190 optReqts.push_back(quantity); 00191 quantity.nCachePages = 38; 00192 expected.push_back(quantity); 00193 optType = EXEC_RESOURCE_ESTIMATE; 00194 optTypes.push_back(optType); 00195 00196
00197 quantity.nCachePages = 25; 00198 minReqts.push_back(quantity); 00199 quantity.nCachePages = 29; 00200 optReqts.push_back(quantity); 00201 expected.push_back(quantity); 00202 optType = EXEC_RESOURCE_ACCURATE; 00203 optTypes.push_back(optType); 00204 00205 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00206 } 00207 00212 void ExecStreamGovernorTest::testOptEqualEstimate() 00213 { 00214 uint nProducers = 4; 00215 std::vector minReqts; 00216 std::vector optReqts; 00217 std::vector expected; 00218 std::vector optTypes; 00219 00220 ExecStreamResourceQuantity quantity; 00221 ExecStreamResourceSettingType optType; 00222 00223
00224
00225 00226
00227 quantity.nCachePages = 10; 00228 minReqts.push_back(quantity); 00229 quantity.nCachePages = 20; 00230 optReqts.push_back(quantity); 00231 expected.push_back(quantity); 00232 optType = EXEC_RESOURCE_ESTIMATE; 00233 optTypes.push_back(optType); 00234 00235
00236 quantity.nCachePages = 15; 00237 minReqts.push_back(quantity); 00238 quantity.nCachePages = 17; 00239 optReqts.push_back(quantity); 00240 expected.push_back(quantity); 00241 optType = EXEC_RESOURCE_ACCURATE; 00242 optTypes.push_back(optType); 00243 00244
00245 quantity.nCachePages = 20; 00246 minReqts.push_back(quantity); 00247 quantity.nCachePages = 23; 00248 optReqts.push_back(quantity); 00249 expected.push_back(quantity); 00250 optType = EXEC_RESOURCE_ACCURATE; 00251 optTypes.push_back(optType); 00252 00253
00254 quantity.nCachePages = 25; 00255 minReqts.push_back(quantity); 00256 quantity.nCachePages = 35; 00257 optReqts.push_back(quantity); 00258 expected.push_back(quantity); 00259 optType = EXEC_RESOURCE_ESTIMATE; 00260 optTypes.push_back(optType); 00261 00262 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00263 } 00264 00269 void ExecStreamGovernorTest::testInBetween() 00270 { 00271 uint nProducers = 4; 00272 std::vector minReqts; 00273 std::vector optReqts; 00274 std::vector expected; 00275 std::vector optTypes; 00276 00277 ExecStreamResourceQuantity quantity; 00278 ExecStreamResourceSettingType optType; 00279 00280
00281
00282 00283
00284 quantity.nCachePages = 10; 00285 minReqts.push_back(quantity); 00286 quantity.nCachePages = 25; 00287 optReqts.push_back(quantity); 00288 quantity.nCachePages = 14; 00289 expected.push_back(quantity); 00290 optType = EXEC_RESOURCE_ACCURATE; 00291 optTypes.push_back(optType); 00292 00293
00294 quantity.nCachePages = 15; 00295 minReqts.push_back(quantity); 00296 quantity.nCachePages = 31; 00297 optReqts.push_back(quantity); 00298 quantity.nCachePages = 19; 00299 expected.push_back(quantity); 00300 optType = EXEC_RESOURCE_ESTIMATE; 00301 optTypes.push_back(optType); 00302 00303
00304 quantity.nCachePages = 20; 00305 minReqts.push_back(quantity); 00306 quantity.nCachePages = 0; 00307 optReqts.push_back(quantity); 00308 quantity.nCachePages = 31; 00309 expected.push_back(quantity); 00310 optType = EXEC_RESOURCE_UNBOUNDED; 00311 optTypes.push_back(optType); 00312 00313
00314 quantity.nCachePages = 25; 00315 minReqts.push_back(quantity); 00316 quantity.nCachePages = 42; 00317 optReqts.push_back(quantity); 00318 quantity.nCachePages = 29; 00319 expected.push_back(quantity); 00320 optType = EXEC_RESOURCE_ESTIMATE; 00321 optTypes.push_back(optType); 00322 00323 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00324 } 00325 00330 void ExecStreamGovernorTest::testMinGreaterAllocation() 00331 { 00332
00333
00334 ExecStreamResourceQuantity quantity; 00335 quantity.nCachePages = 200; 00336 pResourceGovernor->setResourceAvailability( 00337 quantity, EXEC_RESOURCE_CACHE_PAGES); 00338 ExecStreamResourceKnobs knob; 00339 knob.expectedConcurrentStatements = 2; 00340 pResourceGovernor->setResourceKnob( 00341 knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS); 00342 00343 uint nProducers = 2; 00344 std::vector minReqts; 00345 std::vector optReqts; 00346 std::vector expected; 00347 std::vector optTypes; 00348 00349 ExecStreamResourceSettingType optType; 00350 00351
00352 quantity.nCachePages = 50; 00353 minReqts.push_back(quantity); 00354 optReqts.push_back(quantity); 00355 expected.push_back(quantity); 00356 optType = EXEC_RESOURCE_ACCURATE; 00357 optTypes.push_back(optType); 00358 00359
00360 quantity.nCachePages = 55; 00361 minReqts.push_back(quantity); 00362 optReqts.push_back(quantity); 00363 expected.push_back(quantity); 00364 optType = EXEC_RESOURCE_ACCURATE; 00365 optTypes.push_back(optType); 00366 00367 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00368 } 00369 00374 void ExecStreamGovernorTest::testMinEqualAllocation() 00375 { 00376 uint nProducers = 2; 00377 std::vector minReqts; 00378 std::vector optReqts; 00379 std::vector expected; 00380 std::vector optTypes; 00381 00382 ExecStreamResourceQuantity quantity; 00383 ExecStreamResourceSettingType optType; 00384 00385
00386
00387 quantity.nCachePages = 50; 00388 minReqts.push_back(quantity); 00389 quantity.nCachePages = 60; 00390 optReqts.push_back(quantity); 00391 quantity.nCachePages = 50; 00392 expected.push_back(quantity); 00393 optType = EXEC_RESOURCE_ACCURATE; 00394 optTypes.push_back(optType); 00395 00396
00397 quantity.nCachePages = 45; 00398 minReqts.push_back(quantity); 00399 quantity.nCachePages = 50; 00400 optReqts.push_back(quantity); 00401 quantity.nCachePages = 45; 00402 expected.push_back(quantity); 00403 optType = EXEC_RESOURCE_ACCURATE; 00404 optTypes.push_back(optType); 00405 00406 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00407 } 00408 00413 void ExecStreamGovernorTest::testMinGreaterAvailable() 00414 { 00415 uint nProducers = 2; 00416 std::vector minReqts; 00417 std::vector optReqts; 00418 std::vector expected; 00419 std::vector optTypes; 00420 00421 ExecStreamResourceQuantity quantity; 00422 ExecStreamResourceSettingType optType; 00423 00424
00425
00426 quantity.nCachePages = 50; 00427 minReqts.push_back(quantity); 00428 optReqts.push_back(quantity); 00429 expected.push_back(quantity); 00430 optType = EXEC_RESOURCE_ACCURATE; 00431 optTypes.push_back(optType); 00432 00433
00434 quantity.nCachePages = 46; 00435 minReqts.push_back(quantity); 00436 optReqts.push_back(quantity); 00437 expected.push_back(quantity); 00438 optType = EXEC_RESOURCE_ACCURATE; 00439 optTypes.push_back(optType); 00440 00441 testGovernor(nProducers, minReqts, optReqts, optTypes, expected, true); 00442 } 00443 00449 void ExecStreamGovernorTest::testReturnResources() 00450 { 00451 uint nProducers = 2; 00452 std::vector minReqts; 00453 std::vector optReqts; 00454 std::vector expected; 00455 std::vector optTypes; 00456 00457 ExecStreamResourceQuantity quantity; 00458 ExecStreamResourceSettingType optType; 00459 00460
00461 quantity.nCachePages = 45; 00462 minReqts.push_back(quantity); 00463 optReqts.push_back(quantity); 00464 expected.push_back(quantity); 00465 optType = EXEC_RESOURCE_ACCURATE; 00466 optTypes.push_back(optType); 00467 00468
00469 quantity.nCachePages = 45; 00470 minReqts.push_back(quantity); 00471 optReqts.push_back(quantity); 00472 expected.push_back(quantity); 00473 optType = EXEC_RESOURCE_ACCURATE; 00474 optTypes.push_back(optType); 00475 00476 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00477 00478 resetExecStreamTest(); 00479 minReqts.clear(); 00480 optReqts.clear(); 00481 expected.clear(); 00482 optTypes.clear(); 00483 00484
00485 quantity.nCachePages = 45; 00486 minReqts.push_back(quantity); 00487 optReqts.push_back(quantity); 00488 expected.push_back(quantity); 00489 optType = EXEC_RESOURCE_ACCURATE; 00490 optTypes.push_back(optType); 00491 00492
00493 quantity.nCachePages = 50; 00494 minReqts.push_back(quantity); 00495 optReqts.push_back(quantity); 00496 expected.push_back(quantity); 00497 optType = EXEC_RESOURCE_ACCURATE; 00498 optTypes.push_back(optType); 00499 00500 testGovernor(nProducers, minReqts, optReqts, optTypes, expected); 00501 } 00502 00503 void ExecStreamGovernorTest::testGovernor( 00504 uint nProducers, 00505 std::vector const &minReqts, 00506 std::vector const &optReqts, 00507 std::vector optTypes, 00508 std::vector expected, 00509 bool exception) 00510 { 00511 StandardTypeDescriptorFactory stdTypeFactory; 00512 TupleAttributeDescriptor int8AttrDesc( 00513 stdTypeFactory.newDataType(STANDARD_TYPE_INT_8)); 00514 00515 std::vector producerStreamEmbryos; 00516 for (uint i = 0; i < nProducers; i++) { 00517 MockResourceExecStreamParams producerParams; 00518 producerParams.minReqt = minReqts[i]; 00519 producerParams.optReqt = optReqts[i]; 00520 producerParams.optTypeInput = optTypes[i]; 00521 producerParams.expected = expected[i]; 00522 00523
00524
00525 producerParams.scratchAccessor = 00526 pSegmentFactory->newScratchSegment(pCache, expected[i].nCachePages); 00527 producerParams.pCacheAccessor = pCache; 00528 producerParams.outputTupleDesc.push_back(int8AttrDesc); 00529 00530 ExecStreamEmbryo producerStreamEmbryo; 00531 producerStreamEmbryo.init( 00532 new MockResourceExecStream(), producerParams); 00533 std::ostringstream oss; 00534 oss << "MockResourceExecStream" << "#" << i; 00535 producerStreamEmbryo.getStream()->setName(oss.str()); 00536 producerStreamEmbryos.push_back(producerStreamEmbryo); 00537 } 00538 00539 BarrierExecStreamParams barrierParams; 00540 barrierParams.outputTupleDesc.push_back(int8AttrDesc); 00541 barrierParams.returnMode = BARRIER_RET_ANY_INPUT; 00542 00543 ExecStreamEmbryo barrierStreamEmbryo; 00544 barrierStreamEmbryo.init(new BarrierExecStream(), barrierParams); 00545 barrierStreamEmbryo.getStream()->setName("BarrierExecStream"); 00546 00547 SharedExecStream pOutputStream = prepareConfluenceGraph( 00548 producerStreamEmbryos, barrierStreamEmbryo); 00549 00550 int8_t expectedOutput = 1; 00551 TupleData expectedTuple; 00552 expectedTuple.compute(barrierParams.outputTupleDesc); 00553 expectedTuple[0].pData = (PConstBuffer) &expectedOutput; 00554 00555
00556 try { 00557 verifyConstantOutput(*pOutputStream, expectedTuple, 1); 00558 if (exception) { 00559 BOOST_FAIL("Cache memory not exhausted"); 00560 } 00561 } catch (FennelExcn &ex) { 00562 std::string errMsg = ex.getMessage(); 00563 if (errMsg.compare(ScratchMemExcn().getMessage()) != 0) { 00564 BOOST_FAIL("Wrong exception returned"); 00565 } 00566 } 00567 } 00568 00569 FENNEL_UNIT_TEST_SUITE(ExecStreamGovernorTest); 00570 00571