Fennel: /home/pub/open/dev/fennel/test/LhxHashTableTest.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 #include "fennel/common/CommonPreamble.h" 00024 #include "fennel/test/SegStorageTestBase.h" 00025 #include "fennel/hashexe/LhxHashTable.h" 00026 #include "fennel/hashexe/LhxHashTableDump.h" 00027 #include "fennel/hashexe/LhxPartition.h" 00028 #include "fennel/tuple/StandardTypeDescriptor.h" 00029 #include "fennel/tuple/TupleDescriptor.h" 00030 #include "fennel/tuple/TuplePrinter.h" 00031 #include "fennel/cache/Cache.h" 00032 00033 #include <boost/scoped_array.hpp> 00034 #include <boost/test/test_tools.hpp> 00035 00036 using namespace fennel; 00037 00041 class LhxHashTableTest : virtual public SegStorageTestBase 00042 { 00043 StandardTypeDescriptorFactory stdTypeFactory; 00044 LhxHashInfo hashInfo; 00045 uint buildInputIndex; 00046 00047 uint writeHashTable( 00048 LhxHashInfo const &hashInfo, 00049 LhxHashTable &hashTable, 00050 SharedLhxPartition destPartition); 00051 00052 uint readPartition( 00053 LhxHashInfo &hashInfo, 00054 SharedLhxPartition srcPartition, 00055 ostringstream &dataTrace); 00056 00057 void testInsert( 00058 uint numRows, 00059 uint maxBlockCount, 00060 uint partitionLevel, 00061 VectorOfUint &repeatSeqValues, 00062 uint numKeyCols, 00063 uint numAggs, 00064 uint numDataCols, 00065 bool dumpHashTable, 00066 bool writeToPartition, 00067 uint recursivePartitioning, 00068 string testName); 00069 00070 public: 00071 explicit LhxHashTableTest() 00072 { 00073 FENNEL_UNIT_TEST_CASE(LhxHashTableTest, testInsert1Ka); 00074 FENNEL_UNIT_TEST_CASE(LhxHashTableTest, testInsert1Kb); 00075 } 00076 00077 virtual void testCaseSetUp(); 00078 virtual void testCaseTearDown(); 00079 00080 void testInsert1Ka(); 00081 void testInsert1Kb(); 00082 }; 00083 00084 void LhxHashTableTest::testCaseSetUp() 00085 { 00086 openStorage(DeviceMode::createNew); 00087 openRandomSegment(); 00088 hashInfo.externalSegmentAccessor.pSegment = pRandomSegment; 00089 hashInfo.externalSegmentAccessor.pCacheAccessor = pCache; 00090 hashInfo.memSegmentAccessor = 00091 pSegmentFactory->newScratchSegment(pCache, 100); 00092 } 00093 00094 void LhxHashTableTest::testCaseTearDown() 00095 { 00096 hashInfo.inputDesc.clear(); 00097 hashInfo.keyProj.clear(); 00098 hashInfo.isKeyColVarChar.clear(); 00099 00100 hashInfo.aggsProj.clear(); 00101 hashInfo.dataProj.clear(); 00102 00103 hashInfo.memSegmentAccessor.pSegment->deallocatePageRange( 00104 NULL_PAGE_ID, 00105 NULL_PAGE_ID); 00106 hashInfo.externalSegmentAccessor.reset(); 00107 hashInfo.memSegmentAccessor.reset(); 00108 SegStorageTestBase::testCaseTearDown(); 00109 } 00110 00111 uint LhxHashTableTest::writeHashTable( 00112 LhxHashInfo const &hashInfo, 00113 LhxHashTable &hashTable, 00114 SharedLhxPartition destPartition) 00115 { 00116 uint tuplesWritten = 0; 00117 LhxPartitionWriter writer; 00118 00119 LhxHashTableReader hashTableReader; 00120 hashTableReader.init(&hashTable, hashInfo, buildInputIndex); 00121 hashTableReader.bindKey(NULL); 00122 TupleData outputTuple; 00123 00124 outputTuple.compute(hashInfo.inputDesc[destPartition->inputIndex]); 00125 00126
00127 writer.open(destPartition, (LhxHashInfo const &)hashInfo); 00128 while (hashTableReader.getNext(outputTuple)) { 00129 writer.marshalTuple(outputTuple); 00130 tuplesWritten ++; 00131 } 00132 writer.close(); 00133 00134 return tuplesWritten; 00135 } 00136 00137 uint LhxHashTableTest::readPartition( 00138 LhxHashInfo &hashInfo, 00139 SharedLhxPartition srcPartition, 00140 ostringstream &dataTrace) 00141 { 00142 LhxPartitionReader reader; 00143 uint tuplesRead = 0; 00144 TupleData outputTuple; 00145 TuplePrinter tuplePrinter; 00146 TupleDescriptor &inputTupleDesc = hashInfo.inputDesc[1]; 00147 00148 outputTuple.compute(hashInfo.inputDesc[srcPartition->inputIndex]); 00149 00150 reader.open(srcPartition, (LhxHashInfo const &)hashInfo); 00151 00152 for (;;) { 00153 if (!reader.isTupleConsumptionPending()) { 00154 if (reader.getState() == EXECBUF_EOS) { 00155 break; 00156 } 00157 if (!reader.demandData()) { 00158 break; 00159 } 00160 reader.unmarshalTuple(outputTuple); 00161 00162 tuplePrinter.print(dataTrace, inputTupleDesc, outputTuple); 00163 dataTrace << "\n"; 00164 00165 tuplesRead ++; 00166 } 00167 00168 reader.consumeTuple(); 00169 } 00170 reader.close(); 00171 return tuplesRead; 00172 } 00173 00174 void LhxHashTableTest::testInsert( 00175 uint numRows, 00176 uint maxBlockCount, 00177 uint partitionLevel, 00178 VectorOfUint &repeatSeqValues, 00179 uint numKeyCols, 00180 uint numAggs, 00181 uint numDataCols, 00182 bool dumpHashTable, 00183 bool writeToPartition, 00184 uint recursivePartitioning, 00185 string testName) 00186 { 00187 LhxHashTable hashTable; 00188 00189 hashInfo.numCachePages = maxBlockCount; 00190 00191 TupleAttributeDescriptor attrDesc_int32 = 00192 TupleAttributeDescriptor( 00193 stdTypeFactory.newDataType(STANDARD_TYPE_INT_32)); 00194 TupleData inputTuple; 00195 00196 uint i, j; 00197 00198
00199 00200 00201 uint numCols = numKeyCols + numAggs + numDataCols; 00202 00203 boost::scoped_array colValues(new uint[numCols]); 00204 00205 TupleDescriptor inputDesc; 00206 TupleProjection keyProj; 00207 TupleProjection dataProj; 00208 std::vector isKeyVarChar; 00209 00210 for (i = 0; i < numCols; i ++) { 00211 inputDesc.push_back(attrDesc_int32); 00212 00213 if (i < numKeyCols) { 00214 keyProj.push_back(i); 00215 isKeyVarChar.push_back(HASH_TRIM_NONE); 00216 } else if (i < numKeyCols + numAggs) { 00217 hashInfo.aggsProj.push_back(i); 00218 } else { 00219 dataProj.push_back(i); 00220 } 00221 } 00222 00223
00224 00225 00226 00227 uint cndKeys = 1; 00228 for (i = 0; i < numKeyCols; i ++) { 00229 cndKeys *= repeatSeqValues[i]; 00230 } 00231 00232 uint numInputs = 2; 00233 buildInputIndex = numInputs - 1; 00234 for (i = 0; i < numInputs; i ++) { 00235 hashInfo.inputDesc.push_back(inputDesc); 00236 hashInfo.keyProj.push_back(keyProj); 00237 hashInfo.isKeyColVarChar.push_back(isKeyVarChar); 00238 hashInfo.dataProj.push_back(dataProj); 00239 hashInfo.useJoinFilter.push_back(false); 00240 hashInfo.filterNull.push_back(false); 00241
00242 TupleProjection filterNullKeyProj; 00243 hashInfo.filterNullKeyProj.push_back(filterNullKeyProj); 00244 hashInfo.removeDuplicate.push_back(false); 00245 hashInfo.numRows.push_back(numRows); 00246 hashInfo.cndKeys.push_back(cndKeys); 00247 } 00248 00249 TupleDescriptor &inputTupleDesc = hashInfo.inputDesc.back(); 00250 TupleProjection &keyColsProj = hashInfo.keyProj.back(); 00251 00252 hashTable.init(partitionLevel, hashInfo, buildInputIndex); 00253 00254 uint usablePageSize = 00255 (hashInfo.memSegmentAccessor.pSegment)->getUsablePageSize(); 00256 00257 hashTable.calculateNumSlots( 00258 cndKeys, usablePageSize, hashInfo.numCachePages); 00259 00260 bool status = hashTable.allocateResources(); 00261 00262 assert(status); 00263 00264 inputTuple.compute(inputTupleDesc); 00265 00266
00267 00268 00269 for (i = 0; i < numRows; i ++) { 00270 for (j = 0; j < numCols; j++) { 00271 colValues[j] = i % repeatSeqValues[j]; 00272 inputTuple[j].pData = (PBuffer)&(colValues[j]); 00273 } 00274 00275 status = 00276 hashTable.addTuple(inputTuple); 00277 00278 assert(status); 00279 } 00280 00281 LhxHashTableReader hashTableReader; 00282 hashTableReader.init(&hashTable, hashInfo, buildInputIndex); 00283 TupleData outputTuple; 00284 00285 outputTuple.compute(inputTupleDesc); 00286 00287 TuplePrinter tuplePrinter; 00288 ostringstream dataTrace; 00289 dataTrace << "All Inserted Tuples:\n"; 00290 uint numTuples = 0; 00291 00292
00293 00294 00295 while (hashTableReader.getNext(outputTuple)) { 00296 tuplePrinter.print(dataTrace, inputTupleDesc, outputTuple); 00297 dataTrace << "\n"; 00298 numTuples ++; 00299 } 00300 assert (numTuples == numRows); 00301 00302
00303 00304 00305 dataTrace << "All Matched Tuples:\n"; 00306 numTuples = 0; 00307
00308 00309 00310 00311 00312 00313 for (i = 0; i < numRows; i ++) { 00314 for (j = 0; j < numCols; j++) { 00315 colValues[j] = i % repeatSeqValues[j]; 00316 inputTuple[j].pData = (PBuffer)&(colValues[j]); 00317 } 00318 00319 PBuffer matchingKey = 00320 hashTable.findKey(inputTuple, keyColsProj, true); 00321 00322 if (matchingKey) { 00323 hashTableReader.bindKey(matchingKey); 00324 while (hashTableReader.getNext(outputTuple)) { 00325 tuplePrinter.print(dataTrace, inputTupleDesc, outputTuple); 00326 dataTrace << "\n"; 00327 numTuples ++; 00328 } 00329 } 00330 } 00331 00332 assert (numTuples == numRows); 00333 00334 if (dumpHashTable) { 00335 LhxHashTableDump hashTableDump( 00336 TRACE_INFO, 00337 shared_from_this(), 00338 "LhxHashTableTest"); 00339 hashTableDump.dump(hashTable); 00340 hashTableDump.dump(dataTrace.str()); 00341 } 00342 00343 if (writeToPartition) { 00344 SharedLhxPartition partition = 00345 SharedLhxPartition(new LhxPartition(NULL)); 00346 partition->inputIndex = 1; 00347 00348
00349 uint tuplesWritten = 00350 writeHashTable( 00351 (LhxHashInfo const &)hashInfo, 00352 hashTable, 00353 partition); 00354 00355
00356 ostringstream dataTrace; 00357 00358 dataTrace << "[Tuples read from partitions-1]\n"; 00359 00360 uint tuplesRead = 00361 readPartition(hashInfo, partition, dataTrace); 00362 00363 if (dumpHashTable) { 00364 LhxHashTableDump hashTableDump( 00365 TRACE_INFO, 00366 shared_from_this(), 00367 "LhxHashTableTest"); 00368 hashTableDump.dump(dataTrace.str()); 00369 } 00370
00371 assert (tuplesWritten == numRows && tuplesRead == tuplesWritten); 00372 } 00373 00374 if (recursivePartitioning > 0) { 00375
00376
00377
00378
00379
00380
00381 00382
00383
00384 std::vector partitions; 00385 uint tuplesWritten[2]; 00386 00387
00388 for (int j = 0; j < 2; j ++) { 00389 partitions.push_back( 00390 SharedLhxPartition(new LhxPartition(NULL))); 00391 partitions[j]->inputIndex = 1; 00392 tuplesWritten[j] = 00393 writeHashTable( 00394 (LhxHashInfo const &)hashInfo, 00395 hashTable, 00396 partitions[j]); 00397 } 00398 00399 assert (tuplesWritten[0] == numRows && 00400 tuplesWritten[0] == tuplesWritten[1]); 00401 00402 uint tuplesRead[2]; 00403 SharedLhxPlan plan = SharedLhxPlan(new LhxPlan()); 00404 00405 plan->init(WeakLhxPlan(), 0, partitions, false); 00406 00407 LhxPlan *leafPlan; 00408 uint numLeafPlanCreated = 1; 00409 uint numLeafPlanRead = 0; 00410 00411 for (int i = 0; i < recursivePartitioning; i ++) { 00412 numLeafPlanCreated *= LhxPlan::LhxChildPartCount; 00413 numLeafPlanRead = 0; 00414 leafPlan = plan->getFirstLeaf(); 00415 00416
00417 while (leafPlan) { 00418 leafPlan->createChildren(hashInfo, false); 00419
00420
00421 leafPlan = leafPlan->getFirstLeaf(); 00422 for (int k = 0; k < LhxPlan::LhxChildPartCount; k ++) { 00423 leafPlan = leafPlan->getNextLeaf(); 00424 } 00425 } 00426 } 00427 00428
00429
00430
00431 00432 tuplesRead[0] = 0; 00433 tuplesRead[1] = 0; 00434 00435
00436 leafPlan = plan->getFirstLeaf(); 00437 00438 while (leafPlan) { 00439 numLeafPlanRead ++; 00440 for (int j = 0; j < 2; j ++) { 00441 ostringstream dataTrace; 00442 dataTrace << "[Tuples read from partitions-2]" 00443 << "recursion depth" << recursivePartitioning 00444 << "inputindex " << j << "\n"; 00445 tuplesRead[j] += 00446 readPartition( 00447 hashInfo, 00448 leafPlan->getPartition(j), 00449 dataTrace); 00450 if (dumpHashTable) { 00451 LhxHashTableDump hashTableDump( 00452 TRACE_INFO, 00453 shared_from_this(), 00454 "LhxHashTableTest"); 00455 hashTableDump.dump(dataTrace.str()); 00456 } 00457 } 00458 leafPlan = leafPlan->getNextLeaf(); 00459 } 00460 00461 assert (numLeafPlanRead == numLeafPlanCreated); 00462 assert ((tuplesRead[0] == tuplesRead[1]) && 00463 (tuplesRead[0] == numRows)); 00464 } 00465 00466 hashTable.releaseResources(); 00467 colValues.reset(); 00468 } 00469 00470 void LhxHashTableTest::testInsert1Ka() 00471 { 00472 uint numRows = 100; 00473 uint maxBlockCount = 10; 00474 uint partitionLevel = 0; 00475 VectorOfUint values; 00476 uint numKeyCols = 1; 00477 uint numAggs = 0; 00478 uint numDataCols = 0; 00479 bool dumpHashTable = true; 00480 bool writeToPartition = true; 00481 uint recursivePartitioning = 3; 00482 string testName = "testInsert1K"; 00483 uint i; 00484 00485 for (i = 0; i < numKeyCols; i ++) { 00486
00487 00488 00489 values.push_back(i + 10); 00490 } 00491 00492 for (i = 0; i < numAggs; i ++) { 00493 values.push_back(10); 00494 } 00495 00496 for (i = 0; i < numDataCols; i ++) { 00497 values.push_back(i + 1); 00498 } 00499 00500 testInsert( 00501 numRows, maxBlockCount, partitionLevel, 00502 values, numKeyCols, numAggs, numDataCols, 00503 dumpHashTable, writeToPartition, recursivePartitioning, 00504 testName); 00505 } 00506 00507 void LhxHashTableTest::testInsert1Kb() 00508 { 00509 uint numRows = 1000; 00510 uint maxBlockCount = 10; 00511 uint partitionLevel = 0; 00512 VectorOfUint values; 00513 uint numKeyCols = 2; 00514 uint numAggs = 0; 00515 uint numDataCols = 4; 00516 bool dumpHashTable = true; 00517 bool writeToPartition = true; 00518 uint recursivePartitioning = 3; 00519 string testName = "testInsert1K"; 00520 uint i; 00521 00522 for (i = 0; i < numKeyCols; i ++) { 00523
00524 00525 00526 values.push_back(i + 10); 00527 } 00528 00529 for (i = 0; i < numAggs; i ++) { 00530 values.push_back(10); 00531 } 00532 00533 for (i = 0; i < numDataCols; i ++) { 00534 values.push_back(i + 1); 00535 } 00536 00537 testInsert( 00538 numRows, maxBlockCount, partitionLevel, 00539 values, numKeyCols, numAggs, numDataCols, 00540 dumpHashTable, writeToPartition, recursivePartitioning, 00541 testName); 00542 } 00543 00544 FENNEL_UNIT_TEST_SUITE(LhxHashTableTest); 00545 00546 00547