Fennel: /home/pub/open/dev/fennel/segment/VersionedSegment.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/segment/VersionedSegment.h" 00026 #include "fennel/segment/WALSegment.h" 00027 #include "fennel/segment/SegPageLock.h" 00028 #include "fennel/segment/SegmentFactory.h" 00029 #include "fennel/cache/CachePage.h" 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $"); 00032 00033 00034 00035 00041 struct VersionedPageFooter 00042 { 00048 PageId dataPageId; 00049 00053 SegVersionNum versionNumber; 00054 00058 PseudoUuid onlineUuid; 00059 00065 uint64_t checksum; 00066 }; 00067 00068 VersionedSegment::VersionedSegment( 00069 SharedSegment dataSegmentInit, 00070 SharedSegment logSegmentInit, 00071 PseudoUuid const &onlineUuidInit, 00072 SegVersionNum versionNumberInit) 00073 : DelegatingSegment(dataSegmentInit) 00074 { 00075 logSegment = logSegmentInit; 00076 pWALSegment = SegmentFactory::dynamicCast<WALSegment *>(logSegment); 00077 assert(pWALSegment); 00078 00079 setUsablePageSize( 00080 DelegatingSegment::getUsablePageSize() 00081 - sizeof(VersionedPageFooter)); 00082 00083 onlineUuid = onlineUuidInit; 00084 versionNumber = versionNumberInit; 00085 oldestLogPageId = NULL_PAGE_ID; 00086 newestLogPageId = NULL_PAGE_ID; 00087 lastCheckpointLogPageId = NULL_PAGE_ID; 00088 inRecovery = false; 00089 } 00090 00091 VersionedSegment::~VersionedSegment() 00092 { 00093 pWALSegment = NULL; 00094 assert(dataToLogMap.empty()); 00095 } 00096 00097 00098 00099 void VersionedSegment::delegatedCheckpoint( 00100 Segment &delegatingSegment,CheckpointType checkpointType) 00101 { 00102 if (checkpointType != CHECKPOINT_DISCARD) { 00103
00104
00105 logSegment->checkpoint(checkpointType); 00106 assert(pWALSegment->getMinDirtyPageId() == NULL_PAGE_ID); 00107 } 00108 if (checkpointType == CHECKPOINT_FLUSH_FUZZY) { 00109 MappedPageListenerPredicate pagePredicate(delegatingSegment); 00110 fuzzyCheckpointSet.setDelegatePagePredicate(pagePredicate); 00111 pCache->checkpointPages(fuzzyCheckpointSet,checkpointType); 00112 fuzzyCheckpointSet.finishCheckpoint(); 00113 if (lastCheckpointLogPageId != NULL_PAGE_ID) { 00114 oldestLogPageId = logSegment->getPageSuccessor( 00115 lastCheckpointLogPageId); 00116 } else { 00117 oldestLogPageId = NULL_PAGE_ID; 00118 } 00119 } else { 00120 DelegatingSegment::delegatedCheckpoint( 00121 delegatingSegment,checkpointType); 00122 fuzzyCheckpointSet.clear(); 00123 oldestLogPageId = NULL_PAGE_ID; 00124 } 00125 00126 if (checkpointType == CHECKPOINT_DISCARD) { 00127 logSegment->checkpoint(checkpointType); 00128 } 00129 00130 StrictMutexGuard mutexGuard(mutex); 00131 ++versionNumber; 00132 dataToLogMap.clear(); 00133 } 00134 00135 void VersionedSegment::deallocateCheckpointedLog(CheckpointType checkpointType) 00136 { 00137 if (checkpointType == CHECKPOINT_FLUSH_FUZZY) { 00138 if (lastCheckpointLogPageId != NULL_PAGE_ID) { 00139 logSegment->deallocatePageRange( 00140 NULL_PAGE_ID,lastCheckpointLogPageId); 00141 if (lastCheckpointLogPageId == newestLogPageId) { 00142 newestLogPageId = NULL_PAGE_ID; 00143 } 00144 } 00145 } else { 00146 logSegment->deallocatePageRange(NULL_PAGE_ID,NULL_PAGE_ID); 00147 newestLogPageId = NULL_PAGE_ID; 00148 } 00149 lastCheckpointLogPageId = newestLogPageId; 00150 } 00151 00152 void VersionedSegment::deallocatePageRange( 00153 PageId startPageId,PageId endPageId) 00154 { 00155
00156 assert(startPageId == endPageId); 00157 assert(startPageId != NULL_PAGE_ID); 00158 00159
00160 DelegatingSegment::deallocatePageRange(startPageId,endPageId); 00161 } 00162 00163 void VersionedSegment::notifyPageDirty(CachePage &page,bool bDataValid) 00164 { 00165 DelegatingSegment::notifyPageDirty(page,bDataValid); 00166 00167 if (inRecovery) { 00168
00169
00170
00171 return; 00172 } 00173 00174 VersionedPageFooter *pDataFooter = reinterpret_cast<VersionedPageFooter *>( 00175 getWritableFooter(page)); 00176 00177 if (!bDataValid) { 00178
00179 pDataFooter->dataPageId = NULL_PAGE_ID; 00180 pDataFooter->onlineUuid.generateInvalid(); 00181 pDataFooter->versionNumber = versionNumber; 00182 pDataFooter->checksum = 0; 00183 return; 00184 } 00185 00186 assert(pDataFooter->versionNumber <= versionNumber); 00187 if (pDataFooter->versionNumber == versionNumber) { 00188
00189 return; 00190 } 00191 00192
00193 SegmentAccessor logSegmentAccessor(logSegment,pCache); 00194 SegPageLock logPageLock(logSegmentAccessor); 00195 PageId logPageId = logPageLock.allocatePage(); 00196 00197
00198 00199
00200 PBuffer pLogPageBuffer = logPageLock.getPage().getWritableData(); 00201 memcpy( 00202 pLogPageBuffer, 00203 page.getReadableData(), 00204 getUsablePageSize()); 00205 VersionedPageFooter *pLogFooter = reinterpret_cast<VersionedPageFooter *>( 00206 getWritableFooter(logPageLock.getPage())); 00207 pLogFooter->versionNumber = versionNumber - 1; 00208 pLogFooter->onlineUuid = onlineUuid; 00209 PageId dataPageId = DelegatingSegment::translateBlockId( 00210 page.getBlockId()); 00211 pLogFooter->dataPageId = dataPageId; 00212 00213 pLogFooter->checksum = computeChecksum(pLogPageBuffer); 00214 00215
00216 pDataFooter->versionNumber = versionNumber; 00217 00218
00219 pCache->nicePage(logPageLock.getPage()); 00220 00221 StrictMutexGuard mutexGuard(mutex); 00222 dataToLogMap[dataPageId] = logPageId; 00223 if ((newestLogPageId == NULL_PAGE_ID) || (logPageId > newestLogPageId)) { 00224 newestLogPageId = logPageId; 00225 } 00226 if ((oldestLogPageId == NULL_PAGE_ID) || (logPageId < oldestLogPageId)) { 00227 oldestLogPageId = logPageId; 00228 } 00229 } 00230 00231 SegVersionNum VersionedSegment::computeChecksum(void const *pPageData) 00232 { 00233 crcComputer.reset(); 00234 crcComputer.process_bytes(pPageData,getUsablePageSize()); 00235 return crcComputer.checksum(); 00236 } 00237 00238 bool VersionedSegment::canFlushPage(CachePage &page) 00239 { 00240
00241 00242 PageId minLogPageId = pWALSegment->getMinDirtyPageId(); 00243 if (minLogPageId == NULL_PAGE_ID) { 00244 return DelegatingSegment::canFlushPage(page); 00245 } 00246 00247 StrictMutexGuard mutexGuard(mutex); 00248 PageId dataPageId = DelegatingSegment::translateBlockId( 00249 page.getBlockId()); 00250 PageMapConstIter pLogPageId = dataToLogMap.find(dataPageId); 00251 if (pLogPageId == dataToLogMap.end()) { 00252
00253 return DelegatingSegment::canFlushPage(page); 00254 } 00255 PageId logPageId = pLogPageId->second; 00256 if (logPageId >= minLogPageId) { 00257 return false; 00258 } 00259 return DelegatingSegment::canFlushPage(page); 00260 } 00261 00262 void VersionedSegment::prepareOnlineRecovery() 00263 { 00264
00265
00266 logSegment->checkpoint(CHECKPOINT_FLUSH_ALL); 00267 00268 StrictMutexGuard mutexGuard(mutex); 00269 00270 dataToLogMap.clear(); 00271 oldestLogPageId = NULL_PAGE_ID; 00272 } 00273 00274 void VersionedSegment::recover( 00275 SharedSegment pDelegatingSegment, 00276 PageId firstLogPageId, 00277 SegVersionNum versionNumberInit, 00278 PseudoUuid const &onlineUuidInit) 00279 { 00280 onlineUuid = onlineUuidInit; 00281 recover(pDelegatingSegment, firstLogPageId, versionNumberInit); 00282 } 00283 00284 void VersionedSegment::recover( 00285 SharedSegment pDelegatingSegment, 00286 PageId firstLogPageId, 00287 SegVersionNum versionNumberInit) 00288 { 00289 assert(dataToLogMap.empty()); 00290 assert(pWALSegment->getMinDirtyPageId() == NULL_PAGE_ID); 00291 00292 inRecovery = true; 00293 00294 if (isMAXU(versionNumberInit)) { 00295 versionNumber = versionNumberInit; 00296 } 00297 00298
00299
00300
00301
00302
00303 std::hash_set recoveredPageSet; 00304 00305
00306 00307
00308
00309 SegmentAccessor logSegmentAccessor(logSegment,pCache); 00310 SegmentAccessor dataSegmentAccessor(pDelegatingSegment,pCache); 00311 for (; firstLogPageId != NULL_PAGE_ID; 00312 firstLogPageId = logSegment->getPageSuccessor(firstLogPageId)) 00313 { 00314 SegPageLock logPageLock(logSegmentAccessor); 00315 logPageLock.lockShared(firstLogPageId); 00316 if (!logPageLock.getPage().isDataValid()) { 00317 break; 00318 } 00319 PConstBuffer pLogPageBuffer = logPageLock.getPage().getReadableData(); 00320 VersionedPageFooter const *pLogFooter = 00321 reinterpret_cast<VersionedPageFooter const *>( 00322 getReadableFooter(logPageLock.getPage())); 00323 if (pLogFooter->checksum != computeChecksum(pLogPageBuffer)) { 00324 break; 00325 } 00326 if (pLogFooter->onlineUuid != onlineUuid) { 00327 break; 00328 } 00329 assert(pLogFooter->versionNumber < (versionNumber + 2)); 00330 if (pLogFooter->versionNumber < versionNumber) { 00331 continue; 00332 } 00333 if (recoveredPageSet.find(pLogFooter->dataPageId) 00334 != recoveredPageSet.end()) 00335 { 00336 assert(pLogFooter->versionNumber > versionNumber); 00337 continue; 00338 } 00339 00340 SegPageLock dataPageLock(dataSegmentAccessor); 00341 dataPageLock.lockExclusive(pLogFooter->dataPageId); 00342 memcpy( 00343 dataPageLock.getPage().getWritableData(), 00344 pLogPageBuffer, 00345 getFullPageSize()); 00346 recoveredPageSet.insert(pLogFooter->dataPageId); 00347 } 00348 00349 inRecovery = false; 00350 } 00351 00352 SegVersionNum VersionedSegment::getPageVersion(CachePage &page) 00353 { 00354 VersionedPageFooter const *pFooter = 00355 reinterpret_cast<VersionedPageFooter const *>( 00356 getReadableFooter(page)); 00357 return pFooter->versionNumber; 00358 } 00359 00360 SegVersionNum VersionedSegment::getVersionNumber() const 00361 { 00362 return versionNumber; 00363 } 00364 00365 SharedSegment VersionedSegment::getLogSegment() const 00366 { 00367 return logSegment; 00368 } 00369 00370 PageId VersionedSegment::getOnlineRecoveryPageId() const 00371 { 00372 return oldestLogPageId; 00373 } 00374 00375 PageId VersionedSegment::getRecoveryPageId() const 00376 { 00377 if (oldestLogPageId == NULL_PAGE_ID) { 00378
00379
00380 return FIRST_LINEAR_PAGE_ID; 00381 } else { 00382 return oldestLogPageId; 00383 } 00384 } 00385 00386 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/VersionedSegment.cpp#19 $"); 00387 00388