Fennel: /home/pub/open/dev/fennel/ftrs/BTreePrefetchSearchExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/btree/BTreeNonLeafReader.h" 00026 #include "fennel/btree/BTreeLeafReader.h" 00027 #include "fennel/btree/BTreeNodeAccessor.h" 00028 #include "fennel/segment/SegPageEntryIterImpl.h" 00029 #include "fennel/exec/ExecStreamBufAccessor.h" 00030 #include "fennel/ftrs/BTreePrefetchSearchExecStream.h" 00031 00032 #include "math.h" 00033 00034 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreePrefetchSearchExecStream.cpp#7 $"); 00035 00036 BTreePrefetchSearchExecStream::BTreePrefetchSearchExecStream() : leafPageQueue() 00037 { 00038 } 00039 00040 void BTreePrefetchSearchExecStream::prepare( 00041 BTreePrefetchSearchExecStreamParams const &params) 00042 { 00043 BTreeSearchExecStream::prepare(params); 00044 00045 savedLowerBoundAccessor.compute(inputKeyDesc); 00046 pfLowerBoundData.compute(inputKeyDesc); 00047 if (upperBoundDesc.size() == 0) { 00048 savedUpperBoundAccessor.compute(inputKeyDesc); 00049 pfUpperBoundData.compute(inputKeyDesc); 00050 } else { 00051 savedUpperBoundAccessor.compute(upperBoundDesc); 00052 pfUpperBoundData.compute(upperBoundDesc); 00053 } 00054 00055 scratchLock.accessSegment(scratchAccessor); 00056 scratchPageSize = scratchAccessor.pSegment->getUsablePageSize(); 00057 } 00058 00059 void BTreePrefetchSearchExecStream::getResourceRequirements( 00060 ExecStreamResourceQuantity &minQuantity, 00061 ExecStreamResourceQuantity &optQuantity, 00062 ExecStreamResourceSettingType &optType) 00063 { 00064 BTreeSearchExecStream::getResourceRequirements(minQuantity, optQuantity); 00065 00066
00067 minQuantity.nCachePages += 1; 00068 optQuantity.nCachePages += 1; 00069 nMiscCachePages = minQuantity.nCachePages; 00070 00071
00072
00073
00074
00075 minQuantity.nCachePages += 1; 00076 keyValuesSize = savedLowerBoundAccessor.getMaxByteCount() * 2; 00077 00078
00079
00080
00081
00082 if (keyValuesSize > scratchPageSize) { 00083 bigMaxKey = true; 00084 keyValuesSize = scratchPageSize; 00085 minQuantity.nCachePages += 1; 00086 } else { 00087 bigMaxKey = false; 00088 } 00089 00090 nEntriesPerScratchPage = scratchPageSize / keyValuesSize; 00091 uint optNumPages = (uint) ceil(8192.0 / nEntriesPerScratchPage); 00092 assert(optNumPages >= 1); 00093 optQuantity.nCachePages += optNumPages; 00094 optType = EXEC_RESOURCE_ACCURATE; 00095 } 00096 00097 void BTreePrefetchSearchExecStream::setResourceAllocation( 00098 ExecStreamResourceQuantity &quantity) 00099 { 00100 BTreeSearchExecStream::setResourceAllocation(quantity); 00101 nPrefetchScratchPages = quantity.nCachePages - nMiscCachePages; 00102 } 00103 00104 void BTreePrefetchSearchExecStream::open(bool restart) 00105 { 00106 BTreeSearchExecStream::open(restart); 00107 pNonLeafReader = 00108 SharedBTreeNonLeafReader(new BTreeNonLeafReader(treeDescriptor)); 00109 initialPrefetchDone = false; 00110 processingDone = false; 00111 prevLeafSearchRc = true; 00112 returnedRoot = false; 00113 rootOnly = (pNonLeafReader->isRootOnly()); 00114 00115 if (!restart) { 00116 uint nPrefetchEntries = 00117 (bigMaxKey) ? 00118 nPrefetchScratchPages / 2 : 00119 nPrefetchScratchPages * nEntriesPerScratchPage; 00120 leafPageQueue.resize(nPrefetchEntries); 00121 allocateScratchPages(); 00122 leafPageQueue.setPrefetchSource(*this); 00123 } 00124 } 00125 00126 SharedBTreeReader BTreePrefetchSearchExecStream::newReader() 00127 { 00128 pLeafReader = SharedBTreeLeafReader(new BTreeLeafReader(treeDescriptor)); 00129 pBTreeAccessBase = pBTreeReader = pLeafReader; 00130 return pBTreeReader; 00131 } 00132 00133 void BTreePrefetchSearchExecStream::allocateScratchPages() 00134 { 00135 for (uint i = 0; i < nPrefetchScratchPages; i++) { 00136 scratchLock.allocatePage(); 00137 PBuffer page = scratchLock.getPage().getWritableData(); 00138 scratchPages.push_back(page); 00139 } 00140 currPage = 0; 00141 currPageEntry = 0; 00142 } 00143 00144 void BTreePrefetchSearchExecStream::initPrefetchEntry( 00145 BTreePrefetchSearchKey &searchKey) 00146 { 00147 if (bigMaxKey) { 00148 searchKey.lowerKeyBuffer = scratchPages[currPage++]; 00149 searchKey.upperKeyBuffer = scratchPages[currPage++]; 00150 } else { 00151 searchKey.lowerKeyBuffer = 00152 scratchPages[currPage] + currPageEntry * keyValuesSize; 00153 searchKey.upperKeyBuffer = searchKey.lowerKeyBuffer + keyValuesSize / 2; 00154 if (++currPageEntry == nEntriesPerScratchPage) { 00155 currPage++; 00156 currPageEntry = 0; 00157 } 00158 } 00159 } 00160 00161 ExecStreamResult BTreePrefetchSearchExecStream::execute( 00162 ExecStreamQuantum const &quantum) 00163 { 00164 uint nTuples = 0; 00165 00166
00167
00168 for (;;) { 00169
00170 if (innerSearchLoop()) { 00171 return EXECRC_BUF_UNDERFLOW; 00172 } 00173 00174
00175
00176 if (pInAccessor->getState() == EXECBUF_EOS && processingDone) { 00177 pOutAccessor->markEOS(); 00178 return EXECRC_EOS; 00179 } 00180 00181
00182
00183 ExecStreamResult rc = innerFetchLoop(quantum, nTuples); 00184 if (rc != EXECRC_YIELD) { 00185 return rc; 00186 } 00187 } 00188 } 00189 00190 bool BTreePrefetchSearchExecStream::innerSearchLoop() 00191 { 00192
00193
00194 while (pReader->isPositioned()) { 00195
00196
00197 if (pInAccessor->getState() != EXECBUF_EOS && 00198 pInAccessor->demandData()) 00199 { 00200 return false; 00201 } 00202 00203 if (initialPrefetchDone) { 00204 if (pInAccessor->getState() == EXECBUF_EOS) { 00205 processingDone = true; 00206 break; 00207 } 00208 getPrefetchSearchKey(); 00209 leafPageQueue.mapRange( 00210 treeDescriptor.segmentAccessor, 00211 NULL_PAGE_ID); 00212 initialPrefetchDone = true; 00213 } else { 00214 ++leafPageQueue; 00215 } 00216 00217 std::pair<PageId, BTreePrefetchSearchKey> &prefetchEntry = 00218 *leafPageQueue; 00219
00220
00221 if (prefetchEntry.first == NULL_PAGE_ID) { 00222 processingDone = true; 00223 break; 00224 } 00225 00226
00227
00228
00229
00230
00231 setUpSearchKey(prefetchEntry.second); 00232 pLeafReader->setCurrentPageId(prefetchEntry.first); 00233 00234
00235
00236
00237
00238 if (prefetchEntry.second.newSearch || prevLeafSearchRc) { 00239 prevLeafSearchRc = searchForKey(); 00240 if (prevLeafSearchRc) { 00241 break; 00242 } 00243 } else { 00244 if (pReader->searchFirst() && testInterval()) { 00245 projAccessor.unmarshal(tupleData.begin()); 00246 break; 00247 } 00248 } 00249
00250
00251 pReader->endSearch(); 00252 } 00253 return true; 00254 } 00255 00256 void BTreePrefetchSearchExecStream::getPrefetchSearchKey() 00257 { 00258 readSearchKey(); 00259 readDirectives(); 00260 setAdditionalKeys(); 00261 readUpperBoundKey(); 00262 pfLowerBoundDirective = lowerBoundDirective; 00263 pfUpperBoundDirective = upperBoundDirective; 00264 pfLowerBoundData = *pSearchKey; 00265 pfUpperBoundData = upperBoundData; 00266 } 00267 00268 void BTreePrefetchSearchExecStream::setUpSearchKey( 00269 BTreePrefetchSearchKey const &searchKey) 00270 { 00271 lowerBoundDirective = searchKey.lowerBoundDirective; 00272 upperBoundDirective = searchKey.upperBoundDirective; 00273 setLowerBoundKey(searchKey.lowerKeyBuffer); 00274 savedUpperBoundAccessor.setCurrentTupleBuf(searchKey.upperKeyBuffer); 00275 savedUpperBoundAccessor.unmarshal(upperBoundData); 00276 } 00277 00278 void BTreePrefetchSearchExecStream::setLowerBoundKey(PConstBuffer buf) 00279 { 00280 savedLowerBoundAccessor.setCurrentTupleBuf(buf); 00281 savedLowerBoundAccessor.unmarshal(inputKeyData); 00282 pSearchKey = &inputKeyData; 00283 } 00284 00285 void BTreePrefetchSearchExecStream::setAdditionalKeys() 00286 { 00287 pSearchKey = &inputKeyData; 00288 } 00289 00290 PageId BTreePrefetchSearchExecStream::getNextPageForPrefetch( 00291 BTreePrefetchSearchKey &searchKey, 00292 bool &found) 00293 { 00294 found = true; 00295 00296
00297
00298
00299 if (rootOnly) { 00300 while (true) { 00301 if (returnedRoot) { 00302 if (pInAccessor->getState() == EXECBUF_EOS) { 00303 return NULL_PAGE_ID; 00304 } else if (pInAccessor->demandData()) { 00305 found = false; 00306 return NULL_PAGE_ID; 00307 } 00308 returnedRoot = false; 00309 getPrefetchSearchKey(); 00310 } 00311 returnedRoot = true; 00312 setSearchKeyData(true, searchKey); 00313 pInAccessor->consumeTuple(); 00314 return pNonLeafReader->getRootPageId(); 00315 } 00316 } 00317 00318 bool first; 00319 00320 while (true) { 00321
00322
00323
00324
00325 if (pNonLeafReader->isPositioned()) { 00326 first = false; 00327 if (endOnNextKey) { 00328 pInAccessor->consumeTuple(); 00329 pNonLeafReader->endSearch(); 00330 continue; 00331 } else { 00332 pNonLeafReader->searchNext(); 00333 } 00334 00335
00336
00337
00338 } else { 00339 if (pInAccessor->isTupleConsumptionPending()) { 00340
00341
00342
00343 if (pInAccessor->getState() == EXECBUF_EOS) { 00344 return NULL_PAGE_ID; 00345 } else if (pInAccessor->demandData()) { 00346 found = false; 00347 return NULL_PAGE_ID; 00348 } 00349 getPrefetchSearchKey(); 00350 } 00351 first = true; 00352 switch (pfLowerBoundDirective) { 00353 case SEARCH_UNBOUNDED_LOWER: 00354 if (pfLowerBoundData.size() <= 1) { 00355 pNonLeafReader->searchFirst(); 00356 break; 00357 } 00358
00359
00360
00361 case SEARCH_CLOSED_LOWER: 00362 pNonLeafReader->searchForKey( 00363 pfLowerBoundData, DUP_SEEK_BEGIN, leastUpper); 00364 break; 00365 case SEARCH_OPEN_LOWER: 00366 pNonLeafReader->searchForKey( 00367 pfLowerBoundData, DUP_SEEK_END, leastUpper); 00368 break; 00369 default: 00370 permFail( 00371 "unexpected lower bound directive: " 00372 << (char) lowerBoundDirective); 00373 } 00374 } 00375 00376
00377
00378
00379
00380 assert(pNonLeafReader->isSingular()); 00381 endOnNextKey = testNonLeafInterval(); 00382 setSearchKeyData(first, searchKey); 00383 return pNonLeafReader->getChildForCurrent(); 00384 } 00385 } 00386 00387 void BTreePrefetchSearchExecStream::setSearchKeyData( 00388 bool newSearch, 00389 BTreePrefetchSearchKey &searchKey) 00390 { 00391 searchKey.newSearch = newSearch; 00392 searchKey.lowerBoundDirective = 00393 pfLowerBoundDirective; 00394 searchKey.upperBoundDirective = 00395 pfUpperBoundDirective; 00396 if (bigMaxKey) { 00397 assert( 00398 savedLowerBoundAccessor.getByteCount(pfLowerBoundData) 00399 <= keyValuesSize); 00400 assert( 00401 savedUpperBoundAccessor.getByteCount(pfUpperBoundData) 00402 <= keyValuesSize); 00403 } 00404 savedLowerBoundAccessor.marshal(pfLowerBoundData, searchKey.lowerKeyBuffer); 00405 savedUpperBoundAccessor.marshal(pfUpperBoundData, searchKey.upperKeyBuffer); 00406 } 00407 00408 bool BTreePrefetchSearchExecStream::testNonLeafInterval() 00409 { 00410
00411
00412 if (pNonLeafReader->isPositionedOnInfinityKey()) { 00413 return true; 00414 } 00415 00416 BTreeNodeAccessor &nodeAccessor = 00417 pNonLeafReader->getNonLeafNodeAccessor(); 00418 if (pfUpperBoundDirective == SEARCH_UNBOUNDED_UPPER) { 00419
00420
00421
00422
00423 if (pfLowerBoundData.size() > 1) { 00424 nodeAccessor.unmarshalKey(readerKeyData); 00425 int c = 00426 inputKeyDesc.compareTuplesKey( 00427 pfLowerBoundData, 00428 readerKeyData, 00429 pfLowerBoundData.size() - 1); 00430
00431
00432 permAssert(c <= 0); 00433 return (c < 0); 00434 } else { 00435 return false; 00436 } 00437 } else { 00438 nodeAccessor.unmarshalKey(readerKeyData); 00439 int c = inputKeyDesc.compareTuples(pfUpperBoundData, readerKeyData); 00440
00441
00442 if (c == 0) { 00443 if (pfUpperBoundDirective == SEARCH_OPEN_UPPER) { 00444 c = -1; 00445 } 00446 } 00447 return (c < 0); 00448 } 00449 } 00450 00451 void BTreePrefetchSearchExecStream::closeImpl() 00452 { 00453 BTreeSearchExecStream::closeImpl(); 00454 scratchPages.clear(); 00455 if (scratchAccessor.pSegment) { 00456 scratchAccessor.pSegment->deallocatePageRange( 00457 NULL_PAGE_ID, NULL_PAGE_ID); 00458 } 00459 } 00460 00461 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreePrefetchSearchExecStream.cpp#7 $"); 00462 00463