Fennel: /home/pub/open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/common/FennelResource.h" 00026 #include "fennel/common/FileSystem.h" 00027 #include "fennel/segment/SegPageBackupRestoreDevice.h" 00028 #include "fennel/segment/SegmentFactory.h" 00029 00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $"); 00031 00032 void BackupRestorePage::setParent(WeakSegPageBackupRestoreDevice pParentInit) 00033 { 00034 pParent = pParentInit; 00035 } 00036 00037 PBuffer BackupRestorePage::getBuffer() const 00038 { 00039 return pBuffer; 00040 } 00041 00042 void BackupRestorePage::setBufferSize(uint bufferSizeInit) 00043 { 00044 bufferSize = bufferSizeInit; 00045 } 00046 00047 uint BackupRestorePage::getBufferSize() const 00048 { 00049 return bufferSize; 00050 } 00051 00052 void BackupRestorePage::setBuffer(PBuffer pBufferInit) 00053 { 00054 pBuffer = pBufferInit; 00055 } 00056 00057 BlockNum BackupRestorePage::getPageCounter() 00058 { 00059 return pageCounter; 00060 } 00061 00062 void BackupRestorePage::setPageCounter(BlockNum counter) 00063 { 00064 pageCounter = counter; 00065 } 00066 00067 void BackupRestorePage::setReadRequest(bool isReadInit) 00068 { 00069 isRead = isReadInit; 00070 } 00071 00072 void BackupRestorePage::notifyTransferCompletion(bool bSuccess) 00073 { 00074 SharedSegPageBackupRestoreDevice sharedPtr = pParent.lock(); 00075 StrictMutexGuard mutexGuard(sharedPtr->getMutex()); 00076 00077 if (isRead) { 00078 sharedPtr->notifyReadTransferCompletion(*this, bSuccess); 00079 } else { 00080 sharedPtr->notifyWriteTransferCompletion(*this, bSuccess); 00081 } 00082 00083
00084
00085 sharedPtr.reset(); 00086 } 00087 00088 SharedSegPageBackupRestoreDevice 00089 SegPageBackupRestoreDevice::newSegPageBackupRestoreDevice( 00090 const std::string &backupFilePathname, 00091 const char *mode, 00092 const std::string &compressionProgram, 00093 uint nScratchPages, 00094 uint nReservedPages, 00095 SegmentAccessor &scratchAccessor, 00096 DeviceAccessScheduler &scheduler, 00097 SharedRandomAccessDevice pDataDevice) 00098 { 00099 SharedSegPageBackupRestoreDevice pDevice = 00100 SharedSegPageBackupRestoreDevice( 00101 new SegPageBackupRestoreDevice( 00102 backupFilePathname, 00103 mode, 00104 compressionProgram, 00105 nScratchPages, 00106 nReservedPages, 00107 scratchAccessor, 00108 scheduler, 00109 pDataDevice), 00110 ClosableObjectDestructor()); 00111 pDevice->init(); 00112 return pDevice; 00113 } 00114 00115 SegPageBackupRestoreDevice::SegPageBackupRestoreDevice( 00116 const std::string &backupFilePathnameInit, 00117 const char *modeInit, 00118 const std::string &compressionProgramInit, 00119 uint nScratchPagesInit, 00120 uint nReservedPagesInit, 00121 SegmentAccessor &scratchAccessorInit, 00122 DeviceAccessScheduler &schedulerInit, 00123 SharedRandomAccessDevice pDataDeviceInit) : 00124 backupFilePathname(backupFilePathnameInit), 00125 nReservedPages(nReservedPagesInit), 00126 scratchAccessor(scratchAccessorInit), 00127 scheduler(schedulerInit), 00128 pDataDevice(pDataDeviceInit) 00129 { 00130 mode = (char *) modeInit; 00131 setCompressionProgramPathname(compressionProgramInit); 00132 nScratchPages = nScratchPagesInit; 00133 currPageReadCount = 0; 00134 currPageWriteCount = 0; 00135 backupFile = NULL; 00136 } 00137 00138 void SegPageBackupRestoreDevice::init() 00139 { 00140 if (compressionProgram.length() == 0) { 00141 backupFile = fopen(backupFilePathname.c_str(), mode); 00142 isCompressed = false; 00143 } else { 00144 #ifdef MSVC 00145 throw FennelExcn( 00146 FennelResource::instance().unsupportedOperation("popen")); 00147 #else 00148 std::ostringstream cmd; 00149 if (mode[0] == 'r') { 00150 cmd << compressionProgram.c_str() << " -dc " 00151 << backupFilePathname.c_str(); 00152 } else { 00153 cmd << compressionProgram.c_str() << " > " 00154 << backupFilePathname.c_str(); 00155 } 00156 backupFile = popen(cmd.str().c_str(), mode); 00157 isCompressed = true; 00158 #endif 00159 } 00160 00161 if (backupFile == NULL) { 00162 throw SysCallExcn( 00163 FennelResource::instance().openBackupFileFailed( 00164 backupFilePathname)); 00165 } 00166 00167 scratchLock.accessSegment(scratchAccessor); 00168 pageSize = scratchAccessor.pSegment->getUsablePageSize(); 00169 initScratchPages(scratchLock, nScratchPages, nReservedPages, pageSize); 00170 } 00171 00172 void SegPageBackupRestoreDevice::setCompressionProgramPathname( 00173 const std::string &programName) 00174 { 00175 if (programName.length() == 0) { 00176 compressionProgram = ""; 00177 } else { 00178 std::string path = "/bin/"; 00179 compressionProgram = path + programName; 00180 if (FileSystem::doesFileExist(compressionProgram.c_str())) { 00181 path = "/usr/bin/"; 00182 compressionProgram = path + programName; 00183 if (FileSystem::doesFileExist(compressionProgram.c_str())) { 00184 compressionProgram = programName; 00185 } 00186 } 00187 } 00188 } 00189 00190 void SegPageBackupRestoreDevice::initScratchPages( 00191 SegPageLock &scratchLock, 00192 uint nScratchPages, 00193 uint nReservedPages, 00194 uint bufferSize) 00195 { 00196 StrictMutexGuard mutexGuard(mutex); 00197 00198
00199
00200 backupRestorePages.reset(new BackupRestorePage[nScratchPages]); 00201 for (uint i = 0; i < nScratchPages; i++) { 00202 scratchLock.allocatePage(); 00203 backupRestorePages[i].setParent( 00204 WeakSegPageBackupRestoreDevice(shared_from_this())); 00205 backupRestorePages[i].setBufferSize(pageSize); 00206 backupRestorePages[i].setBuffer( 00207 scratchLock.getPage().getWritableData()); 00208 freeScratchPageQueue.push_back(&backupRestorePages[i]); 00209 } 00210 00211 reservedPages.reset(new PBuffer[nReservedPages]); 00212 for (uint i = 0; i < nReservedPages; i++) { 00213 scratchLock.allocatePage(); 00214 reservedPages[i] = scratchLock.getPage().getWritableData(); 00215 } 00216 } 00217 00218 PBuffer SegPageBackupRestoreDevice::getReservedBufferPage() 00219 { 00220 if (nReservedPages == 0) { 00221 assert(false); 00222 } 00223 return reservedPages[--nReservedPages]; 00224 } 00225 00226 void SegPageBackupRestoreDevice::writeBackupPage(PConstBuffer pageBuffer) 00227 { 00228 writeBackupPage(pageBuffer, false); 00229 } 00230 00231 void SegPageBackupRestoreDevice::writeBackupPage( 00232 PConstBuffer pageBuffer, 00233 bool scheduledWrite) 00234 { 00235 size_t pagesWritten = fwrite(pageBuffer, pageSize, 1, backupFile); 00236 if (pagesWritten < 1) { 00237 if (!scheduledWrite) { 00238 throw SysCallExcn( 00239 FennelResource::instance().writeBackupFileFailed( 00240 backupFilePathname)); 00241 } else if (pPendingExcn) { 00242
00243
00244
00245 pPendingExcn.reset( 00246 new SysCallExcn( 00247 FennelResource::instance().writeBackupFileFailed( 00248 backupFilePathname))); 00249 } 00250 } 00251 } 00252 00253 void SegPageBackupRestoreDevice::backupPage(BlockId blockId) 00254 { 00255
00256
00257 00258
00259 BackupRestorePage *pScratchPage = getFreeScratchPage(); 00260 00261 pScratchPage->setPageCounter(currPageReadCount++); 00262 RandomAccessRequest request; 00263 request.pDevice = pDataDevice.get(); 00264 request.cbOffset = (FileSize) pageSize * CompoundId::getBlockNum(blockId); 00265 request.cbTransfer = pageSize; 00266 request.type = RandomAccessRequest::READ; 00267 pScratchPage->setReadRequest(true); 00268 request.bindingList.push_back(*pScratchPage); 00269 scheduler.schedule(request); 00270 } 00271 00272 BackupRestorePage *SegPageBackupRestoreDevice::getFreeScratchPage() 00273 { 00274 while (true) { 00275 StrictMutexGuard mutexGuard(mutex); 00276 if (freeScratchPageQueue.empty()) { 00277 BackupRestorePage *freePage = freeScratchPageQueue.back(); 00278 freeScratchPageQueue.pop_back(); 00279 return freePage; 00280 } 00281 00282
00283
00284 boost::xtime atv; 00285 convertTimeout(100, atv); 00286 freeScratchPageCondition.timed_wait(mutexGuard, atv); 00287 checkPendingException(); 00288 } 00289 } 00290 00291 void SegPageBackupRestoreDevice::notifyReadTransferCompletion( 00292 BackupRestorePage &scratchPage, 00293 bool bSuccess) 00294 { 00295 if (!bSuccess) { 00296 if (pPendingExcn) { 00297 pPendingExcn.reset( 00298 new SysCallExcn( 00299 FennelResource::instance().readDataPageFailed())); 00300 } 00301 return; 00302 } 00303 00304 00305
00306
00307
00308 if (scratchPage.getPageCounter() == currPageWriteCount) { 00309 writeBackupPage(scratchPage.getBuffer(), true); 00310 freeScratchPage(scratchPage); 00311 } else { 00312 pendingWriteMap[scratchPage.getPageCounter()] = &scratchPage; 00313 } 00314 00315
00316
00317 while (true) { 00318 PendingWriteMapIter iter = pendingWriteMap.find(currPageWriteCount); 00319 if (iter == pendingWriteMap.end()) { 00320 break; 00321 } 00322 BackupRestorePage *nextPage = iter->second; 00323 pendingWriteMap.erase(currPageWriteCount); 00324 writeBackupPage(nextPage->getBuffer(), true); 00325 freeScratchPage(*nextPage); 00326
00327
00328 } 00329 } 00330 00331 void SegPageBackupRestoreDevice::freeScratchPage(BackupRestorePage &scratchPage) 00332 { 00333 ++currPageWriteCount; 00334 freeScratchPageQueue.push_back(&scratchPage); 00335 freeScratchPageCondition.notify_all(); 00336 } 00337 00338 void SegPageBackupRestoreDevice::waitForPendingWrites() 00339 { 00340 StrictMutexGuard mutexGuard(mutex); 00341 while (currPageWriteCount < currPageReadCount) { 00342 freeScratchPageCondition.wait(mutexGuard); 00343 checkPendingException(); 00344 } 00345 00346 checkPendingException(); 00347 } 00348 00349 void SegPageBackupRestoreDevice::restorePage(BlockId blockId) 00350 { 00351
00352 BackupRestorePage *pScratchPage = getFreeScratchPage(); 00353 00354 size_t pagesRead = 00355 fread(pScratchPage->getBuffer(), pageSize, 1, backupFile); 00356 if (pagesRead < 1) { 00357 throw SysCallExcn( 00358 FennelResource::instance().readBackupFileFailed( 00359 backupFilePathname)); 00360 } 00361 00362
00363 pScratchPage->setPageCounter(currPageReadCount++); 00364 RandomAccessRequest request; 00365 request.pDevice = pDataDevice.get(); 00366 request.cbOffset = (FileSize) pageSize * CompoundId::getBlockNum(blockId); 00367 request.cbTransfer = pageSize; 00368 request.type = RandomAccessRequest::WRITE; 00369 pScratchPage->setReadRequest(false); 00370 request.bindingList.push_back(*pScratchPage); 00371 scheduler.schedule(request); 00372 } 00373 00374 void SegPageBackupRestoreDevice::notifyWriteTransferCompletion( 00375 BackupRestorePage &scratchPage, 00376 bool bSuccess) 00377 { 00378 if (!bSuccess) { 00379 if (pPendingExcn) { 00380 pPendingExcn.reset( 00381 new SysCallExcn( 00382 FennelResource::instance().writeDataPageFailed())); 00383 } 00384 return; 00385 } 00386 00387 freeScratchPage(scratchPage); 00388 } 00389 00390 void SegPageBackupRestoreDevice::checkPendingException() 00391 { 00392 if (pPendingExcn) { 00393 pPendingExcn->throwSelf(); 00394 } 00395 } 00396 00397 void SegPageBackupRestoreDevice::closeImpl() 00398 { 00399
00400
00401
00402 waitForPendingWrites(); 00403 00404 StrictMutexGuard mutexGuard(mutex); 00405 if (scratchAccessor.pSegment) { 00406 scratchAccessor.pSegment->deallocatePageRange( 00407 NULL_PAGE_ID, NULL_PAGE_ID); 00408 } 00409 backupRestorePages.reset(); 00410 reservedPages.reset(); 00411 freeScratchPageQueue.clear(); 00412 if (backupFile != NULL) { 00413 if (isCompressed) { 00414 #ifdef MSVC 00415 throw FennelExcn( 00416 FennelResource::instance().unsupportedOperation("pclose")); 00417 #else 00418 pclose(backupFile); 00419 #endif 00420 } else { 00421 fclose(backupFile); 00422 } 00423 backupFile = NULL; 00424 } 00425 } 00426 00427 StrictMutex &SegPageBackupRestoreDevice::getMutex() 00428 { 00429 return mutex; 00430 } 00431 00432 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SegPageBackupRestoreDevice.cpp#6 $"); 00433 00434