Fennel: /home/pub/open/dev/fennel/btree/BTreeWriter.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/btree/BTreeWriter.h" 00026 #include "fennel/btree/BTreeRecoveryFactory.h" 00027 #include "fennel/btree/BTreeAccessBaseImpl.h" 00028 #include "fennel/btree/BTreeReaderImpl.h" 00029 #include "fennel/btree/BTreeDuplicateKeyExcn.h" 00030 #include "fennel/txn/LogicalTxn.h" 00031 #include "fennel/common/ByteOutputStream.h" 00032 #include "fennel/common/ByteInputStream.h" 00033 #include "fennel/tuple/TuplePrinter.h" 00034 00035 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $"); 00036 00037 00038 BTreeWriter::BTreeWriter( 00039 BTreeDescriptor const &descriptor, 00040 SegmentAccessor const &scratchAccessorInit, 00041 bool monotonicInit) 00042 : BTreeReader(descriptor), 00043 scratchAccessor(scratchAccessorInit) 00044 { 00045
00046 leafLockMode = LOCKMODE_X; 00047 scratchPageLock.accessSegment(scratchAccessor); 00048 splitTupleBuffer.reset( 00049 new FixedBuffer[pNonLeafNodeAccessor->tupleAccessor.getMaxByteCount()]); 00050 leafTupleBuffer.reset( 00051 new FixedBuffer[pLeafNodeAccessor->tupleAccessor.getMaxByteCount()]); 00052 monotonic = monotonicInit; 00053 } 00054 00055 BTreeWriter::~BTreeWriter() 00056 { 00057 } 00058 00059 void BTreeWriter::insertTupleData( 00060 TupleData const &tupleData,Distinctness distinctness) 00061 { 00062
00063 pLeafNodeAccessor->tupleAccessor.marshal(tupleData,leafTupleBuffer.get()); 00064 insertTupleFromBuffer(leafTupleBuffer.get(),distinctness); 00065 } 00066 00067 uint BTreeWriter::insertTupleFromBuffer( 00068 PConstBuffer pTupleBuffer,Distinctness distinctness) 00069 { 00070 BTreeNodeAccessor &nodeAccessor = *pLeafNodeAccessor; 00071 nodeAccessor.tupleAccessor.setCurrentTupleBuf(pTupleBuffer); 00072 00073 validateTupleSize(nodeAccessor.tupleAccessor); 00074 00075 uint cbTuple = nodeAccessor.tupleAccessor.getCurrentByteCount(); 00076 00077 #if 0 00078 TuplePrinter tuplePrinter; 00079 tuplePrinter.print(std::cout, keyDescriptor, searchKeyData); 00080 std::cout << std::endl; 00081 #endif 00082 00083
00084
00085
00086
00087 if (monotonic) { 00088 if (isPositioned()) { 00089 ++iTupleOnLowestLevel; 00090 assert(checkMonotonicity(nodeAccessor, pTupleBuffer)); 00091 } else { 00092 positionSearchKey(nodeAccessor); 00093 } 00094 } else { 00095 bool duplicate = positionSearchKey(nodeAccessor); 00096 00097
00098
00099
00100 if (duplicate) { 00101 switch (distinctness) { 00102 case DUP_ALLOW: 00103 break; 00104 case DUP_DISCARD: 00105 endSearch(); 00106 return cbTuple; 00107 default: 00108 if (searchKeyData.containsNull()) { 00109 endSearch(); 00110 throw BTreeDuplicateKeyExcn(keyDescriptor,searchKeyData); 00111 } 00112 } 00113 } 00114 } 00115 00116 if (isLoggingEnabled()) { 00117 ByteOutputStream &logStream = getLogicalTxn()->beginLogicalAction( 00118 *this, 00119 ACTION_INSERT); 00120 memcpy( 00121 logStream.getWritePointer(cbTuple), 00122 pTupleBuffer, 00123 cbTuple); 00124 logStream.consumeWritePointer(cbTuple); 00125 getLogicalTxn()->endLogicalAction(); 00126 } 00127 00128 bool split = attemptInsertWithoutSplit( 00129 pageLock,pTupleBuffer,cbTuple,iTupleOnLowestLevel); 00130 if (split) { 00131 splitCurrentNode(pTupleBuffer,cbTuple,iTupleOnLowestLevel); 00132 } 00133 00134
00135
00136 if (monotonic || split) { 00137 endSearch(); 00138 } 00139 00140 return cbTuple; 00141 } 00142 00143 bool BTreeWriter::attemptInsertWithoutSplit( 00144 BTreePageLock &targetPageLock, 00145 PConstBuffer pTupleBuffer,uint cbTuple,uint iPosition) 00146 { 00147 BTreeNode *pNode = &(targetPageLock.getNodeForWrite()); 00148 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(*pNode); 00149 switch (nodeAccessor.calculateCapacity(*pNode,cbTuple)) { 00150 case BTreeNodeAccessor::CAN_FIT: 00151 break; 00152 case BTreeNodeAccessor::CAN_FIT_WITH_COMPACTION: 00153 compactNode(pageLock); 00154
00155
00156 pNode = &(targetPageLock.getNodeForWrite()); 00157 assert(nodeAccessor.calculateCapacity(*pNode,cbTuple) 00158 == BTreeNodeAccessor::CAN_FIT); 00159 00160 #if 0 00161 std::cerr << "AFTER COMPACTION:" << std::endl; 00162 nodeAccessor.dumpNode(std::cerr,*pNode,pageId); 00163 #endif 00164 00165 break; 00166 case BTreeNodeAccessor::CAN_NOT_FIT: 00167 return false; 00168 } 00169 PBuffer pBuffer = nodeAccessor.allocateEntry(*pNode,iPosition,cbTuple); 00170 memcpy(pBuffer,pTupleBuffer,cbTuple); 00171 return true; 00172 } 00173 00174 void BTreeWriter::compactNode(BTreePageLock &targetPageLock) 00175 { 00176 BTreeNode &node = targetPageLock.getNodeForWrite(); 00177 if (scratchPageLock.isLocked()) { 00178 scratchPageLock.allocatePage(); 00179 } 00180 BTreeNode &scratchNode = scratchPageLock.getNodeForWrite(); 00181 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node); 00182 nodeAccessor.clearNode(scratchNode,getSegment()->getUsablePageSize()); 00183 nodeAccessor.compactNode(node,scratchNode); 00184 pageLock.swapBuffers(scratchPageLock); 00185 } 00186 00187 void BTreeWriter::splitCurrentNode( 00188 PConstBuffer pTupleBuffer,uint cbTuple,uint iNewTuple) 00189 { 00190
00191 PageId leftPageId = pageId; 00192 BTreeNode &node = pageLock.getNodeForWrite(); 00193 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node); 00194 00195
00196 BTreePageLock newPageLock(treeDescriptor.segmentAccessor); 00197 PageId newPageId = newPageLock.allocatePage(getPageOwnerId()); 00198 BTreeNode &newNode = newPageLock.getNodeForWrite(); 00199 nodeAccessor.clearNode(newNode,getSegment()->getUsablePageSize()); 00200 00201 #if 0 00202 std::cerr << "BEFORE SPLIT:" << std::endl; 00203 nodeAccessor.dumpNode(std::cerr,node,pageId); 00204 #endif 00205 00206 setRightSibling(newNode,newPageId,node.rightSibling); 00207 setRightSibling(node,pageId,newPageId); 00208 00209 nodeAccessor.splitNode(node,newNode,cbTuple,monotonic); 00210 00211 #if 0 00212 std::cerr << "LEFT AFTER SPLIT:" << std::endl; 00213 nodeAccessor.dumpNode(std::cerr,node,pageId); 00214 std::cerr << "RIGHT AFTER SPLIT:" << std::endl; 00215 nodeAccessor.dumpNode(std::cerr,newNode,newPageId); 00216 #endif 00217 00218
00219 BTreePageLock *pLockForNewTuple = NULL; 00220 if (iNewTuple < node.nEntries) { 00221
00222 pLockForNewTuple = &pageLock; 00223 } else if (iNewTuple > node.nEntries) { 00224
00225 pLockForNewTuple = &newPageLock; 00226 } else { 00227
00228 if (nodeAccessor.calculateCapacity(newNode,cbTuple) != 00229 BTreeNodeAccessor::CAN_NOT_FIT) 00230 { 00231 pLockForNewTuple = &newPageLock; 00232 } else { 00233 pLockForNewTuple = &pageLock; 00234 } 00235 } 00236 if (pLockForNewTuple == &newPageLock) { 00237 iNewTuple -= node.nEntries; 00238 } 00239 00240
00241
00242 bool inserted = attemptInsertWithoutSplit( 00243 *pLockForNewTuple,pTupleBuffer,cbTuple,iNewTuple); 00244 permAssert(inserted); 00245 00246
00247
00248
00249
00250 00251 BTreeNode const &leftNode = pageLock.getNodeForRead(); 00252 BTreeNode const &rightNode = newPageLock.getNodeForRead(); 00253 00254 if (pageStack.empty()) { 00255
00256 grow(rightNode, newPageId); 00257 00258
00259
00260
00261
00262 return; 00263 } 00264 00265
00266
00267 assert(leftNode.nEntries); 00268 if (monotonic) { 00269 pageLock.flushPage(true); 00270 } 00271 00272
00273 BTreeNodeAccessor &upperNodeAccessor = *pNonLeafNodeAccessor; 00274 00275
00276
00277 nodeAccessor.accessTuple(leftNode,leftNode.nEntries - 1); 00278 nodeAccessor.unmarshalKey(upperNodeAccessor.tupleData); 00279 upperNodeAccessor.tupleData.back().pData = 00280 reinterpret_cast(&leftPageId); 00281 upperNodeAccessor.tupleAccessor.marshal( 00282 upperNodeAccessor.tupleData, 00283 splitTupleBuffer.get()); 00284 00285
00286
00287
00288
00289 uint nRightKeys = nodeAccessor.getKeyCount(rightNode); 00290 nodeAccessor.accessTuple(rightNode,nRightKeys - 1); 00291 nodeAccessor.unmarshalKey(searchKeyData); 00292 00293
00294
00295
00296
00297 bool rightMostNode = (rightNode.rightSibling == NULL_PAGE_ID); 00298 assert(monotonic || (monotonic && rightMostNode)); 00299 uint iPosition = lockParentPage(leftNode.height, rightMostNode); 00300 00301
00302
00303
00304 00305
00306
00307 BTreeNode &upperNode = pageLock.getNodeForWrite(); 00308 upperNodeAccessor.tupleAccessor.setCurrentTupleBuf( 00309 const_cast( 00310 upperNodeAccessor.getEntryForRead(upperNode,iPosition))); 00311 TupleDatum &childDatum = upperNodeAccessor.tupleData.back(); 00312 pChildAccessor->unmarshalValue( 00313 upperNodeAccessor.tupleAccessor, 00314 childDatum); 00315 PageId &childPageId = 00316 *(reinterpret_cast<PageId *>(const_cast(childDatum.pData))); 00317 assert(childPageId == leftPageId); 00318 childPageId = newPageId; 00319 00320
00321
00322 upperNodeAccessor.tupleAccessor.setCurrentTupleBuf(splitTupleBuffer.get()); 00323 cbTuple = upperNodeAccessor.tupleAccessor.getCurrentByteCount(); 00324 inserted = attemptInsertWithoutSplit( 00325 pageLock,splitTupleBuffer.get(),cbTuple,iPosition); 00326 if (inserted) { 00327
00328 return; 00329 } 00330 00331
00332 00333
00334 newPageLock.unlock(); 00335 00336 splitCurrentNode(splitTupleBuffer.get(),cbTuple,iPosition); 00337 } 00338 00339 uint BTreeWriter::lockParentPage(uint height, bool rightMostNode) 00340 { 00341 assert(pageStack.empty()); 00342 pageId = pageStack.back(); 00343 pageStack.pop_back(); 00344 00345 uint iPosition; 00346 for (;;) { 00347 pageLock.lockPageWithCoupling(pageId,LOCKMODE_X); 00348 00349 bool found; 00350 BTreeNode const &parentNode = pageLock.getNodeForRead(); 00351 if (pageId == getRootPageId() && parentNode.height != height+1) { 00352
00353
00354
00355
00356 00357
00358
00359
00360 pageLock.unlock(); 00361 BTreePageLock stackPageLock(treeDescriptor.segmentAccessor); 00362 uint iPosition; 00363 for (;;) { 00364 stackPageLock.lockPageWithCoupling(pageId,LOCKMODE_S); 00365 bool found; 00366 BTreeNode const &node = stackPageLock.getNodeForRead(); 00367 if (node.height == height) { 00368
00369 stackPageLock.unlock(); 00370 break; 00371 } 00372 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node); 00373 00374 iPosition = nodeAccessor.binarySearch( 00375 node, 00376 keyDescriptor, 00377 searchKeyData, 00378 DUP_SEEK_ANY, 00379 true, 00380 nodeAccessor.tupleData, 00381 found); 00382 nodeAccessor.accessTuple(node, iPosition); 00383 nodeAccessor.unmarshalKey(nodeAccessor.tupleData); 00384 if (iPosition == nodeAccessor.getKeyCount(node)) { 00385 assert(!found); 00386
00387
00388 if (node.rightSibling != NULL_PAGE_ID) { 00389 pageId = node.rightSibling; 00390 continue; 00391 } 00392 } 00393 pageStack.push_back(pageId); 00394 pageId = *(PageId *)(nodeAccessor.tupleData.back().pData); 00395 stackPageLock.unlock(); 00396 } 00397 pageId = pageStack.back(); 00398 pageStack.pop_back(); 00399
00400 pageLock.lockPageWithCoupling(pageId,LOCKMODE_X); 00401 } 00402 00403
00404 BTreeNode const &node = pageLock.getNodeForRead(); 00405 00406 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node); 00407 if (rightMostNode) { 00408
00409
00410 iPosition = nodeAccessor.getKeyCount(node); 00411 break; 00412 } 00413 00414
00415 00416 iPosition = nodeAccessor.binarySearch( 00417 node, 00418 keyDescriptor, 00419 searchKeyData, 00420 DUP_SEEK_ANY, 00421 true, 00422 nodeAccessor.tupleData, 00423 found); 00424 00425 if (iPosition == nodeAccessor.getKeyCount(node)) { 00426 assert(!found); 00427
00428
00429 if (node.rightSibling != NULL_PAGE_ID) { 00430 pageId = node.rightSibling; 00431 continue; 00432 } else { 00433 break; 00434 } 00435 } 00436
00437 break; 00438 } 00439 return iPosition; 00440 } 00441 00442 void BTreeWriter::grow( 00443 BTreeNode const &rightNode, PageId rightPageId) 00444 { 00445 BTreeNode &node = pageLock.getNodeForWrite(); 00446 BTreeNodeAccessor &nodeAccessor = getNodeAccessor(node); 00447
00448
00449
00450
00451 #if 0 00452 std::cerr << "before grow root:" << std::endl; 00453 nodeAccessor.dumpNode(std::cerr,node,pageId); 00454 std::cerr << "before grow right:" << std::endl; 00455 nodeAccessor.dumpNode(std::cerr,rightNode,rightPageId); 00456 #endif 00457 00458 BTreePageLock newLeftPageLock(treeDescriptor.segmentAccessor); 00459 PageId newLeftPageId = newLeftPageLock.allocatePage(getPageOwnerId()); 00460 BTreeNode &newLeftNode = newLeftPageLock.getNodeForWrite(); 00461 nodeAccessor.clearNode(newLeftNode,getSegment()->getUsablePageSize()); 00462
00463 newLeftNode = node; 00464 memcpy( 00465 newLeftNode.getDataForWrite(), 00466 node.getDataForRead(), 00467 getSegment()->getUsablePageSize() - sizeof(BTreeNode)); 00468 00469
00470
00471 getSegment()->setPageSuccessor(newLeftPageId, rightPageId); 00472 getSegment()->setPageSuccessor(pageId, NULL_PAGE_ID); 00473 00474
00475
00476 00477 nodeAccessor.clearNode(node,getSegment()->getUsablePageSize()); 00478 node.height = newLeftNode.height + 1; 00479 BTreeNodeAccessor &rootNodeAccessor = getNonLeafNodeAccessor(node); 00480 00481
00482 00483 nodeAccessor.accessTuple(newLeftNode, newLeftNode.nEntries - 1); 00484 nodeAccessor.unmarshalKey(rootNodeAccessor.tupleData); 00485 rootNodeAccessor.tupleData.back().pData = 00486 reinterpret_cast(&newLeftPageId); 00487 uint cb = rootNodeAccessor.tupleAccessor.getByteCount( 00488 rootNodeAccessor.tupleData); 00489 PBuffer pBuffer1 = rootNodeAccessor.allocateEntry(node,0,cb); 00490 rootNodeAccessor.tupleAccessor.marshal( 00491 rootNodeAccessor.tupleData, pBuffer1); 00492 00493 nodeAccessor.accessTuple(rightNode, rightNode.nEntries - 1); 00494 nodeAccessor.unmarshalKey(rootNodeAccessor.tupleData); 00495 rootNodeAccessor.tupleData.back().pData = 00496 reinterpret_cast(&rightPageId); 00497 cb = rootNodeAccessor.tupleAccessor.getByteCount( 00498 rootNodeAccessor.tupleData); 00499 00500 PBuffer pBuffer2 = rootNodeAccessor.allocateEntry(node,1,cb); 00501 rootNodeAccessor.tupleAccessor.marshal( 00502 rootNodeAccessor.tupleData, pBuffer2); 00503 #if 0 00504 std::cerr << "after grow root:" << std::endl; 00505 rootNodeAccessor.dumpNode(std::cerr,node,pageId); 00506 std::cerr << "after grow left:" << std::endl; 00507 nodeAccessor.dumpNode(std::cerr,newLeftNode,newLeftPageId); 00508 std::cerr << "after grow right:" << std::endl; 00509 nodeAccessor.dumpNode(std::cerr,rightNode,rightPageId); 00510 #endif 00511 if (monotonic) { 00512 newLeftPageLock.flushPage(true); 00513 } 00514 newLeftPageLock.unlock(); 00515 return; 00516 } 00517 00518 inline void BTreeWriter::optimizeRootLockMode() 00519 { 00520 if (pageId != getRootPageId()) { 00521
00522
00523 rootLockMode = LOCKMODE_S; 00524 } 00525 } 00526 00527 void BTreeWriter::deleteCurrent() 00528 { 00529
00530 00531 assert(pageLock.isLocked()); 00532 00533 optimizeRootLockMode(); 00534 00535 BTreeNode &node = pageLock.getNodeForWrite(); 00536 BTreeNodeAccessor &nodeAccessor = getLeafNodeAccessor(node); 00537 00538 if (isLoggingEnabled()) { 00539 uint cbTuple = nodeAccessor.tupleAccessor.getCurrentByteCount(); 00540 ByteOutputStream &logStream = getLogicalTxn()->beginLogicalAction( 00541 *this, 00542 ACTION_DELETE); 00543 memcpy( 00544 logStream.getWritePointer(cbTuple), 00545 nodeAccessor.tupleAccessor.getCurrentTupleBuf(), 00546 cbTuple); 00547 logStream.consumeWritePointer(cbTuple); 00548 getLogicalTxn()->endLogicalAction(); 00549 } 00550 00551 nodeAccessor.deallocateEntry(node,iTupleOnLowestLevel); 00552 00553
00554 --iTupleOnLowestLevel; 00555 } 00556 00557 bool BTreeWriter::updateCurrent(TupleData const &tupleData) 00558 { 00559
00560 00561 PBuffer pTupleBuf; 00562 00563 assert(pageLock.isLocked()); 00564 00565 optimizeRootLockMode(); 00566 00567 BTreeNode *pNode = &(pageLock.getNodeForWrite()); 00568 BTreeNodeAccessor &nodeAccessor = getLeafNodeAccessor(*pNode); 00569 00570 assert(isLoggingEnabled()); 00571 00572 if (nodeAccessor.hasFixedWidthEntries()) { 00573
00574 pTupleBuf = const_cast( 00575 nodeAccessor.getEntryForRead(*pNode,iTupleOnLowestLevel)); 00576 nodeAccessor.tupleAccessor.marshal(tupleData,pTupleBuf); 00577 return true; 00578 } 00579 00580
00581 uint cbTuple = 00582 nodeAccessor.tupleAccessor.getByteCount(tupleData); 00583 int cbDelta = 00584 cbTuple - nodeAccessor.tupleAccessor.getCurrentByteCount(); 00585 if (cbDelta < 0) { 00586 cbDelta = 0; 00587 } 00588 00589
00590
00591 switch (nodeAccessor.calculateCapacity(*pNode,cbDelta)) { 00592 case BTreeNodeAccessor::CAN_FIT: 00593 nodeAccessor.deallocateEntry(*pNode,iTupleOnLowestLevel); 00594 break; 00595 case BTreeNodeAccessor::CAN_FIT_WITH_COMPACTION: 00596 nodeAccessor.deallocateEntry(*pNode,iTupleOnLowestLevel); 00597 compactNode(pageLock); 00598
00599
00600 pNode = &(pageLock.getNodeForWrite()); 00601 assert(nodeAccessor.calculateCapacity(*pNode,cbTuple) 00602 == BTreeNodeAccessor::CAN_FIT); 00603 break; 00604 case BTreeNodeAccessor::CAN_NOT_FIT: 00605 return false; 00606 default: 00607 permAssert(false); 00608 } 00609 00610 pTupleBuf = nodeAccessor.allocateEntry(*pNode,iTupleOnLowestLevel,cbTuple); 00611 nodeAccessor.tupleAccessor.marshal(tupleData,pTupleBuf); 00612 return true; 00613 } 00614 00615 LogicalTxnClassId BTreeWriter::getParticipantClassId() const 00616 { 00617 return BTreeRecoveryFactory::getParticipantClassId(); 00618 } 00619 00620 void BTreeWriter::describeParticipant( 00621 ByteOutputStream &logStream) 00622 { 00623 PageId rootPageId = getRootPageId(); 00624 logStream.writeValue(rootPageId); 00625 getTupleDescriptor().writePersistent(logStream); 00626 getKeyProjection().writePersistent(logStream); 00627 } 00628 00629 void BTreeWriter::undoLogicalAction( 00630 LogicalActionType actionType, 00631 ByteInputStream &logStream) 00632 { 00633 switch (actionType) { 00634 case ACTION_INSERT: 00635 deleteLogged(logStream); 00636 break; 00637 case ACTION_DELETE: 00638 insertLogged(logStream); 00639 break; 00640 default: 00641 permAssert(false); 00642 } 00643 } 00644 00645 void BTreeWriter::redoLogicalAction( 00646 LogicalActionType actionType, 00647 ByteInputStream &logStream) 00648 { 00649 switch (actionType) { 00650 case ACTION_INSERT: 00651 insertLogged(logStream); 00652 break; 00653 case ACTION_DELETE: 00654 deleteLogged(logStream); 00655 break; 00656 default: 00657 permAssert(false); 00658 } 00659 } 00660 00661 void BTreeWriter::insertLogged(ByteInputStream &logStream) 00662 { 00663
00664 uint cbTuple = insertTupleFromBuffer( 00665 logStream.getReadPointer(1),DUP_ALLOW); 00666 logStream.consumeReadPointer(cbTuple); 00667 } 00668 00669 void BTreeWriter::deleteLogged(ByteInputStream &logStream) 00670 { 00671 pLeafNodeAccessor->tupleAccessor.setCurrentTupleBuf( 00672 logStream.getReadPointer(1)); 00673 uint cbTuple = pLeafNodeAccessor->tupleAccessor.getCurrentByteCount(); 00674 pLeafNodeAccessor->unmarshalKey(searchKeyData); 00675 if (searchForKey(searchKeyData,DUP_SEEK_ANY)) { 00676 deleteCurrent(); 00677 } else { 00678
00679 } 00680 endSearch(); 00681 logStream.consumeReadPointer(cbTuple); 00682 } 00683 00684 void BTreeWriter::releaseScratchBuffers() 00685 { 00686 scratchPageLock.unlock(); 00687 } 00688 00689 bool BTreeWriter::positionSearchKey(BTreeNodeAccessor &nodeAccessor) 00690 { 00691 nodeAccessor.unmarshalKey(searchKeyData); 00692 pageStack.clear(); 00693 DuplicateSeek seekMode = (monotonic) ? DUP_SEEK_END : DUP_SEEK_ANY; 00694 bool duplicate = searchForKeyTemplate< true, std::vector >( 00695 searchKeyData,seekMode,true,pageStack,getRootPageId(),rootLockMode, 00696 READ_ALL); 00697 return duplicate; 00698 } 00699 00700 bool BTreeWriter::checkMonotonicity( 00701 BTreeNodeAccessor &nodeAccessor, PConstBuffer pTupleBuffer) 00702 { 00703
00704 if (iTupleOnLowestLevel == 0) { 00705 return true; 00706 } 00707 BTreeNode const &node = pageLock.getNodeForRead(); 00708 accessTupleInline(node, iTupleOnLowestLevel - 1); 00709 nodeAccessor.unmarshalKey(comparisonKeyData); 00710 00711 nodeAccessor.tupleAccessor.setCurrentTupleBuf(pTupleBuffer); 00712 nodeAccessor.unmarshalKey(searchKeyData); 00713 int keyComp = keyDescriptor.compareTuples(comparisonKeyData, searchKeyData); 00714 00715 return (keyComp <= 0 && node.nEntries == iTupleOnLowestLevel); 00716 } 00717 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/btree/BTreeWriter.cpp#23 $"); 00718 00719