Fennel: /home/pub/open/dev/fennel/txn/LogicalTxn.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/txn/LogicalTxn.h" 00026 #include "fennel/txn/LogicalTxnParticipant.h" 00027 #include "fennel/segment/SegOutputStream.h" 00028 #include "fennel/segment/SegInputStream.h" 00029 #include "fennel/segment/SpillOutputStream.h" 00030 #include "fennel/txn/LogicalTxnLog.h" 00031 #include "fennel/txn/LogicalRecoveryLog.h" 00032 #include "fennel/txn/LogicalRecoveryTxn.h" 00033 00034 #include <boost/bind.hpp> 00035 00036 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $"); 00037 00038 LogicalTxn::LogicalTxn( 00039 TxnId txnIdInit, 00040 SharedLogicalTxnLog pLogInit, 00041 SharedCacheAccessor pCacheAccessorInit) 00042 : txnId(txnIdInit), 00043 pLog(pLogInit), 00044 pCacheAccessor(pCacheAccessorInit) 00045 { 00046 pOutputStream = SpillOutputStream::newSpillOutputStream( 00047 pLog->pSegmentFactory, 00048 pCacheAccessor, 00049 LogicalRecoveryLog::getLongLogFileName(txnId)); 00050 00051
00052
00053
00054 pOutputStream->setWriteLatency(WRITE_EAGER_ASYNC); 00055 00056 state = STATE_LOGGING_TXN; 00057 svpt.cbActionPrev = 0; 00058 svpt.cbLogged = 0; 00059 checkpointed = false; 00060 } 00061 00062 LogicalTxn::~LogicalTxn() 00063 { 00064 assert(isEnded()); 00065 assert(participants.empty()); 00066 } 00067 00068 void LogicalTxn::addParticipant(SharedLogicalTxnParticipant pParticipant) 00069 { 00070 if (pParticipant->pTxn) { 00071 assert(pParticipant->pTxn == this); 00072 return; 00073 } 00074 participants.push_back(pParticipant); 00075 pParticipant->pTxn = this; 00076 pParticipant->enableLogging(true); 00077 describeParticipant(pParticipant); 00078 } 00079 00080 ByteOutputStream &LogicalTxn::beginLogicalAction( 00081 LogicalTxnParticipant &participant, 00082 LogicalActionType actionType) 00083 { 00084 assert(participant.pTxn == this); 00085 return beginLogicalAction(&participant,actionType); 00086 } 00087 00088 ByteOutputStream &LogicalTxn::beginLogicalAction( 00089 LogicalTxnParticipant *pParticipant, 00090 LogicalActionType actionType) 00091 { 00092 assert(state == STATE_LOGGING_TXN); 00093 LogicalTxnActionHeader actionHeader; 00094 actionHeader.pParticipant = pParticipant; 00095 actionHeader.actionType = actionType; 00096 actionHeader.cbActionPrev = svpt.cbActionPrev; 00097 pOutputStream->writeValue(actionHeader); 00098 state = STATE_LOGGING_ACTION; 00099 return *pOutputStream; 00100 } 00101 00102 void LogicalTxn::endLogicalAction() 00103 { 00104 assert(state == STATE_LOGGING_ACTION); 00105 state = STATE_LOGGING_TXN; 00106 svpt.cbActionPrev = 00107 pOutputStream->getOffset() - svpt.cbLogged; 00108 svpt.cbLogged = pOutputStream->getOffset(); 00109 } 00110 00111 SavepointId LogicalTxn::createSavepoint() 00112 { 00113 assert(state == STATE_LOGGING_TXN); 00114 SavepointId svptId = SavepointId(savepoints.size()); 00115 savepoints.push_back(svpt); 00116 return svptId; 00117 } 00118 00119 void LogicalTxn::commitSavepoint(SavepointId svptId) 00120 { 00121 assert(state == STATE_LOGGING_TXN); 00122 uint iSvpt = opaqueToInt(svptId); 00123 assert(iSvpt < savepoints.size()); 00124 savepoints.resize(iSvpt); 00125 } 00126 00127 void LogicalTxn::rollback(SavepointId const *pSvptId) 00128 { 00129 assert(state == STATE_LOGGING_TXN); 00130 if (pSvptId) { 00131 uint iSvpt = opaqueToInt(*pSvptId); 00132 assert(iSvpt < savepoints.size()); 00133 savepoints.resize(iSvpt + 1); 00134 rollbackToSavepoint(savepoints[iSvpt]); 00135 return; 00136 } 00137 00138
00139 SharedLogicalTxn pThis = shared_from_this(); 00140 00141
00142
00143 forgetAllParticipants(); 00144 00145 SharedSegment pLongLogSegment = pOutputStream->getSegment(); 00146 SharedByteInputStream pInputStream = 00147 pOutputStream->getInputStream(SEEK_STREAM_END); 00148 pOutputStream->close(); 00149 assert(svpt.cbLogged == pInputStream->getOffset()); 00150 00151 { 00152 state = STATE_ROLLING_BACK; 00153 LogicalRecoveryTxn recoveryTxn(pInputStream,NULL); 00154 recoveryTxn.undoActions(svpt); 00155 } 00156 00157 svpt.cbLogged = pInputStream->getOffset(); 00158 state = STATE_ROLLED_BACK; 00159 pInputStream.reset(); 00160 if (pLongLogSegment) { 00161 pLongLogSegment->checkpoint(CHECKPOINT_DISCARD); 00162 pLongLogSegment.reset(); 00163 } 00164 pLog->rollbackTxn(pThis); 00165 pLog.reset(); 00166 pOutputStream.reset(); 00167 } 00168 00169 void LogicalTxn::commit() 00170 { 00171
00172 SharedLogicalTxn pThis = shared_from_this(); 00173 pLog->commitTxn(pThis); 00174 pLog.reset(); 00175 state = STATE_COMMITTED; 00176 forgetAllParticipants(); 00177 } 00178 00179 void LogicalTxn::describeAllParticipants() 00180 { 00181 std::for_each( 00182 participants.begin(), 00183 participants.end(), 00184 boost::bind(&LogicalTxn::describeParticipant,this,_1)); 00185 } 00186 00187 void LogicalTxn::describeParticipant(SharedLogicalTxnParticipant pParticipant) 00188 { 00189 beginLogicalAction(pParticipant,ACTION_TXN_DESCRIBE_PARTICIPANT); 00190 LogicalTxnClassId classId = 00191 pParticipant->getParticipantClassId(); 00192 pOutputStream->writeValue(classId); 00193 pParticipant->describeParticipant(pOutputStream); 00194 endLogicalAction(); 00195 } 00196 00197 void LogicalTxn::forgetAllParticipants() 00198 { 00199 std::for_each( 00200 participants.begin(), 00201 participants.end(), 00202 boost::bind(&LogicalTxnParticipant::clearLogicalTxn,_1)); 00203 participants.clear(); 00204 } 00205 00206 void LogicalTxn::rollbackToSavepoint(LogicalTxnSavepoint &oldSvpt) 00207 { 00208 assert(oldSvpt.cbLogged <= svpt.cbLogged); 00209
00210 std::for_each( 00211 participants.begin(), 00212 participants.end(), 00213 boost::bind(&LogicalTxnParticipant::enableLogging,_1,false)); 00214 00215
00216 00217 SharedByteInputStream pInputStream = 00218 pOutputStream->getInputStream(SEEK_STREAM_END); 00219 assert(svpt.cbLogged == pInputStream->getOffset()); 00220 { 00221 state = STATE_ROLLING_BACK; 00222 LogicalRecoveryTxn recoveryTxn(pInputStream,NULL); 00223 recoveryTxn.undoActions(svpt,MAXU,oldSvpt.cbLogged); 00224 state = STATE_LOGGING_TXN; 00225 } 00226 pInputStream.reset(); 00227 00228
00229 std::for_each( 00230 participants.begin(), 00231 participants.end(), 00232 boost::bind(&LogicalTxnParticipant::enableLogging,_1,true)); 00233 00234
00235 beginLogicalAction(NULL,ACTION_TXN_ROLLBACK_TO_SAVEPOINT); 00236 pOutputStream->writeValue(oldSvpt); 00237 endLogicalAction(); 00238 } 00239 00240 SharedLogicalTxnLog LogicalTxn::getLog() 00241 { 00242 return pLog; 00243 } 00244 00245 bool LogicalTxn::isEnded() const 00246 { 00247 return state == STATE_ROLLED_BACK || state == STATE_COMMITTED; 00248 } 00249 00250 TxnId LogicalTxn::getTxnId() const 00251 { 00252 return txnId; 00253 } 00254 00255 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/txn/LogicalTxn.cpp#11 $"); 00256 00257