Fennel: /home/pub/open/dev/fennel/ftrs/BTreeSearchExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/ftrs/BTreeSearchExecStream.h" 00026 #include "fennel/btree/BTreeReader.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 #include "fennel/tuple/StandardTypeDescriptor.h" 00029 00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreeSearchExecStream.cpp#15 $"); 00031 00032 void BTreeSearchExecStream::prepare(BTreeSearchExecStreamParams const &params) 00033 { 00034 BTreeReadExecStream::prepare(params); 00035 ConduitExecStream::prepare(params); 00036 00037 leastUpper = true; 00038 outerJoin = params.outerJoin; 00039 searchKeyParams = params.searchKeyParams; 00040 00041
00042 00043 TupleDescriptor const &inputDesc = pInAccessor->getTupleDesc(); 00044 00045 TupleAccessor &inputAccessor = pInAccessor->getConsumptionTupleAccessor(); 00046 00047 if (params.inputDirectiveProj.size()) { 00048 assert(params.inputDirectiveProj.size() == 2); 00049
00050
00051
00052
00053 assert(params.inputKeyProj.size() > 0); 00054 assert((params.inputKeyProj.size() % 2) == 0); 00055 directiveAccessor.bind(inputAccessor, params.inputDirectiveProj); 00056 TupleDescriptor inputDirectiveDesc; 00057 inputDirectiveDesc.projectFrom(inputDesc, params.inputDirectiveProj); 00058 00059
00060 StandardTypeDescriptorFactory stdTypeFactory; 00061 TupleAttributeDescriptor expectedDirectiveDesc( 00062 stdTypeFactory.newDataType(STANDARD_TYPE_CHAR)); 00063 expectedDirectiveDesc.cbStorage = 1; 00064 assert( 00065 inputDirectiveDesc[LOWER_BOUND_DIRECTIVE] == expectedDirectiveDesc); 00066 assert( 00067 inputDirectiveDesc[UPPER_BOUND_DIRECTIVE] == expectedDirectiveDesc); 00068 00069 directiveData.compute(inputDirectiveDesc); 00070 } 00071 00072 if (params.inputKeyProj.size()) { 00073 TupleProjection inputKeyProj = params.inputKeyProj; 00074 if (params.inputDirectiveProj.size()) { 00075
00076
00077 TupleProjection upperBoundProj; 00078 int n = inputKeyProj.size() / 2; 00079
00080 upperBoundProj.resize(n); 00081
00082 std::copy( 00083 inputKeyProj.begin() + n, 00084 inputKeyProj.end(), 00085 upperBoundProj.begin()); 00086
00087 inputKeyProj.resize(n); 00088 00089 upperBoundAccessor.bind(inputAccessor, upperBoundProj); 00090 upperBoundDesc.projectFrom(inputDesc, upperBoundProj); 00091 upperBoundData.compute(upperBoundDesc); 00092 00093 assert( 00094 searchKeyParams.size() == 0 || 00095 (searchKeyParams.size() >= (n-1)2+1 && 00096 searchKeyParams.size() <= n*2)); 00097 } else { 00098 assert(searchKeyParams.size() == 0); 00099 } 00100 inputKeyAccessor.bind(inputAccessor,inputKeyProj); 00101 inputKeyDesc.projectFrom(inputDesc,inputKeyProj); 00102 } else { 00103 inputKeyDesc = inputDesc; 00104 assert(searchKeyParams.size() == 0); 00105 } 00106 inputKeyData.compute(inputKeyDesc); 00107 00108 if (upperBoundDesc.size()) { 00109
00110
00111 assert(upperBoundDesc == inputKeyDesc); 00112 } 00113 00114 preFilterNulls = false; 00115 if ((outerJoin && inputKeyDesc.containsNullable()) || 00116 searchKeyParams.size() > 0) 00117 { 00118
00119
00120
00121 preFilterNulls = true; 00122 00123
00124
00125
00126 if (searchKeyParams.size() == 0) { 00127 for (uint i = 0; i < inputKeyData.size(); i++) { 00128 searchKeyProj.push_back(i); 00129 upperBoundKeyProj.push_back(i); 00130 } 00131 } 00132 } 00133 00134 inputJoinAccessor.bind(inputAccessor,params.inputJoinProj); 00135 00136 TupleDescriptor joinDescriptor; 00137 joinDescriptor.projectFrom(inputDesc,params.inputJoinProj); 00138 00139 TupleProjection readerKeyProj = treeDescriptor.keyProjection; 00140 readerKeyProj.resize(inputKeyDesc.size()); 00141 readerKeyData.compute(inputKeyDesc); 00142 00143 nJoinAttributes = params.outputTupleDesc.size() - params.outputProj.size(); 00144 } 00145 00146 void BTreeSearchExecStream::open(bool restart) 00147 { 00148
00149
00150 if (!restart && opaqueToInt(rootPageIdParamId) > 0) { 00151 treeDescriptor.rootPageId = 00152 reinterpret_cast<PageId const *>( 00153 pDynamicParamManager->getParam(rootPageIdParamId).getDatum().pData); 00154 } 00155 BTreeReadExecStream::open(restart); 00156 ConduitExecStream::open(restart); 00157 dynamicKeysRead = false; 00158 00159 if (restart) { 00160 return; 00161 } 00162 00163
00164 TupleProjection readerKeyProj = treeDescriptor.keyProjection; 00165 readerKeyProj.resize(inputKeyDesc.size()); 00166 readerKeyAccessor.bind( 00167 pReader->getTupleAccessorForRead(), 00168 readerKeyProj); 00169 } 00170 00171 ExecStreamResult BTreeSearchExecStream::execute( 00172 ExecStreamQuantum const &quantum) 00173 { 00174 ExecStreamResult rc = precheckConduitBuffers(); 00175 if (rc != EXECRC_YIELD) { 00176 return rc; 00177 } 00178 00179 uint nTuples = 0; 00180 assert(quantum.nTuplesMax > 0); 00181 00182
00183 for (;;) { 00184 if (innerSearchLoop()) { 00185 return EXECRC_BUF_UNDERFLOW; 00186 } 00187 00188
00189 rc = innerFetchLoop(quantum, nTuples); 00190 if (rc == EXECRC_YIELD) { 00191 pInAccessor->consumeTuple(); 00192 } else { 00193 return rc; 00194 } 00195 } 00196 } 00197 00198 bool BTreeSearchExecStream::innerSearchLoop() 00199 { 00200 while (pReader->isPositioned()) { 00201 if (pInAccessor->demandData()) { 00202 return false; 00203 } 00204 00205 readSearchKey(); 00206 readDirectives(); 00207 pSearchKey = &inputKeyData; 00208 readUpperBoundKey(); 00209 if (searchForKey()) { 00210 pInAccessor->consumeTuple(); 00211 } 00212 } 00213 return true; 00214 } 00215 00216 void BTreeSearchExecStream::readSearchKey() 00217 { 00218
00219
00220
00221 TupleAccessor &inputAccessor = 00222 pInAccessor->accessConsumptionTuple(); 00223 00224 if (searchKeyParams.size() == 0) { 00225 if (inputKeyAccessor.size()) { 00226
00227 inputKeyAccessor.unmarshal(inputKeyData); 00228 } else { 00229
00230 inputAccessor.unmarshal(inputKeyData); 00231 } 00232 } else { 00233
00234
00235 assert(dynamicKeysRead); 00236 00237
00238
00239
00240
00241 uint nParams = searchKeyParams.size(); 00242 searchKeyProj.clear(); 00243 for (uint i = 0; i < nParams / 2; i++) { 00244 inputKeyData[searchKeyParams[i].keyOffset] = 00245 pDynamicParamManager->getParam( 00246 searchKeyParams[i].dynamicParamId).getDatum(); 00247 searchKeyProj.push_back(i); 00248 } 00249
00250
00251 if ((nParams%2) && searchKeyParams[nParams/2].keyOffset == nParams/2) { 00252 inputKeyData[nParams / 2] = 00253 pDynamicParamManager->getParam( 00254 searchKeyParams[nParams / 2].dynamicParamId).getDatum(); 00255
00256
00257
00258 searchKeyProj.push_back(nParams / 2); 00259 } 00260 00261 dynamicKeysRead = true; 00262 } 00263 } 00264 00265 void BTreeSearchExecStream::readDirectives() 00266 { 00267 if (directiveAccessor.size()) { 00268
00269 lowerBoundDirective = SEARCH_CLOSED_LOWER; 00270 upperBoundDirective = SEARCH_CLOSED_UPPER; 00271 return; 00272 } 00273 00274 directiveAccessor.unmarshal(directiveData); 00275 00276
00277 assert(directiveData[LOWER_BOUND_DIRECTIVE].pData); 00278 assert(directiveData[UPPER_BOUND_DIRECTIVE].pData); 00279 00280 lowerBoundDirective = 00281 SearchEndpoint(
(directiveData[LOWER_BOUND_DIRECTIVE].pData)); 00282 upperBoundDirective = 00283 SearchEndpoint(
(directiveData[UPPER_BOUND_DIRECTIVE].pData)); 00284 } 00285 00286 bool BTreeSearchExecStream::searchForKey() 00287 { 00288 switch (lowerBoundDirective) { 00289 case SEARCH_UNBOUNDED_LOWER: 00290 if (pSearchKey->size() <= 1) { 00291 pReader->searchFirst(); 00292 break; 00293 } 00294
00295
00296
00297 case SEARCH_CLOSED_LOWER: 00298 pReader->searchForKey(pSearchKey, DUP_SEEK_BEGIN, leastUpper); 00299 break; 00300 case SEARCH_OPEN_LOWER: 00301 pReader->searchForKey(pSearchKey, DUP_SEEK_END, leastUpper); 00302 break; 00303 default: 00304 permFail( 00305 "unexpected lower bound directive: " 00306 << (char) lowerBoundDirective); 00307 } 00308 00309 bool match = true; 00310 if (preFilterNulls && pSearchKey->containsNull(searchKeyProj)) { 00311
00312
00313
00314 match = false; 00315 } else { 00316 if (pReader->isSingular()) { 00317
00318 match = false; 00319 } else { 00320 if (preFilterNulls && 00321 upperBoundData.containsNull(upperBoundKeyProj)) 00322 { 00323 match = false; 00324 } else { 00325 match = testInterval(); 00326 } 00327 } 00328 } 00329 00330 if (!match) { 00331 if (outerJoin) { 00332 pReader->endSearch(); 00333 return false; 00334 } 00335
00336 for (uint i = nJoinAttributes; i < tupleData.size(); ++i) { 00337 tupleData[i].pData = NULL; 00338 } 00339 } else { 00340 projAccessor.unmarshal( 00341 tupleData.begin() + nJoinAttributes); 00342 } 00343 00344
00345 if (inputJoinAccessor.size()) { 00346 inputJoinAccessor.unmarshal(tupleData); 00347 } 00348 00349 return true; 00350 } 00351 00352 void BTreeSearchExecStream::readUpperBoundKey() 00353 { 00354 if (searchKeyParams.size() == 0) { 00355 if (upperBoundDesc.size()) { 00356 upperBoundAccessor.unmarshal(upperBoundData); 00357 } else { 00358 upperBoundData = *pSearchKey; 00359 } 00360 } else { 00361
00362
00363
00364
00365 uint nParams = searchKeyParams.size(); 00366
00367
00368
00369
00370
00371 upperBoundKeyProj.clear(); 00372 if (!(nParams % 2) 00373 || searchKeyParams[nParams / 2].keyOffset == nParams / 2 + 1) 00374 { 00375 upperBoundData[0] = 00376 pDynamicParamManager->getParam( 00377 searchKeyParams[nParams / 2].dynamicParamId).getDatum(); 00378 upperBoundKeyProj.push_back(0); 00379 } 00380 uint keySize = upperBoundData.size(); 00381 for (uint i = nParams / 2 + 1; i < nParams; i++) { 00382 upperBoundData[searchKeyParams[i].keyOffset - keySize] = 00383 pDynamicParamManager->getParam( 00384 searchKeyParams[i].dynamicParamId).getDatum(); 00385 upperBoundKeyProj.push_back(i - keySize); 00386 } 00387 } 00388 } 00389 00390 bool BTreeSearchExecStream::testInterval() 00391 { 00392 if (upperBoundDirective == SEARCH_UNBOUNDED_UPPER) { 00393
00394
00395
00396 if (pSearchKey->size() > 1) { 00397 readerKeyAccessor.unmarshal(readerKeyData); 00398 int c = 00399 inputKeyDesc.compareTuplesKey( 00400 readerKeyData, 00401 *pSearchKey, 00402 pSearchKey->size() - 1); 00403 if (c != 0) { 00404 return false; 00405 } 00406 } 00407 return true; 00408 } else { 00409 readerKeyAccessor.unmarshal(readerKeyData); 00410 int c = inputKeyDesc.compareTuples(upperBoundData, readerKeyData); 00411 if (upperBoundDirective == SEARCH_CLOSED_UPPER) { 00412
00413
00414
00415
00416 if (leastUpper && lowerBoundDirective == SEARCH_CLOSED_LOWER && 00417 pSearchKey->size() > 1 && c > 0) 00418 { 00419 return checkNextKey(); 00420 } 00421 if (c >= 0) { 00422 return true; 00423 } 00424 } else { 00425 if (c > 0) { 00426 return true; 00427 } 00428 } 00429 } 00430 return false; 00431 } 00432 00433 bool BTreeSearchExecStream::checkNextKey() 00434 { 00435
00436 if (pReader->searchNext()) { 00437 return false; 00438 } 00439 readerKeyAccessor.unmarshal(readerKeyData); 00440 int c = inputKeyDesc.compareTuples(upperBoundData, readerKeyData); 00441
00442 assert(c <= 0); 00443 return (c == 0); 00444 } 00445 00446 ExecStreamResult BTreeSearchExecStream::innerFetchLoop( 00447 ExecStreamQuantum const &quantum, 00448 uint &nTuples) 00449 { 00450 for (;;) { 00451 if (nTuples >= quantum.nTuplesMax) { 00452 return EXECRC_QUANTUM_EXPIRED; 00453 } 00454 if (reachedTupleLimit(nTuples)) { 00455 return EXECRC_BUF_OVERFLOW; 00456 } 00457 if (pOutAccessor->produceTuple(tupleData)) { 00458 ++nTuples; 00459 } else { 00460 return EXECRC_BUF_OVERFLOW; 00461 } 00462 if (pReader->searchNext()) { 00463 if (testInterval()) { 00464 projAccessor.unmarshal( 00465 tupleData.begin() + nJoinAttributes); 00466
00467 continue; 00468 } 00469 } 00470 pReader->endSearch(); 00471
00472 return EXECRC_YIELD; 00473 } 00474 } 00475 00476 void BTreeSearchExecStream::closeImpl() 00477 { 00478 ConduitExecStream::closeImpl(); 00479 BTreeReadExecStream::closeImpl(); 00480 } 00481 00482 bool BTreeSearchExecStream::reachedTupleLimit(uint nTuples) 00483 { 00484 return false; 00485 } 00486 00487 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/ftrs/BTreeSearchExecStream.cpp#15 $"); 00488 00489