Fennel: /home/pub/open/dev/fennel/farrago/CmdInterpreter.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/farrago/CmdInterpreter.h" 00026 #include "fennel/farrago/JavaErrorTarget.h" 00027 #include "fennel/farrago/JavaTraceTarget.h" 00028 #include "fennel/exec/ExecStreamGraphEmbryo.h" 00029 #include "fennel/exec/SimpleExecStreamGovernor.h" 00030 #include "fennel/farrago/ExecStreamBuilder.h" 00031 #include "fennel/cache/CacheParams.h" 00032 #include "fennel/common/ConfigMap.h" 00033 #include "fennel/common/FennelExcn.h" 00034 #include "fennel/common/FennelResource.h" 00035 #include "fennel/common/InvalidParamExcn.h" 00036 #include "fennel/common/Backtrace.h" 00037 #include "fennel/btree/BTreeBuilder.h" 00038 #include "fennel/db/Database.h" 00039 #include "fennel/db/CheckpointThread.h" 00040 #include "fennel/txn/LogicalTxn.h" 00041 #include "fennel/txn/LogicalTxnLog.h" 00042 #include "fennel/tuple/StoredTypeDescriptorFactory.h" 00043 #include "fennel/segment/SegmentFactory.h" 00044 #include "fennel/segment/SnapshotRandomAllocationSegment.h" 00045 #include "fennel/exec/ParallelExecStreamScheduler.h" 00046 #include "fennel/exec/DfsTreeExecStreamScheduler.h" 00047 #include "fennel/exec/ExecStreamGraph.h" 00048 #include "fennel/farrago/ExecStreamFactory.h" 00049 #include "fennel/ftrs/FtrsTableWriterFactory.h" 00050 #include "fennel/btree/BTreeVerifier.h" 00051 00052 #include <boost/lexical_cast.hpp> 00053 00054 #include <malloc.h> 00055 00056 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $"); 00057 00058 int64_t CmdInterpreter::executeCommand( 00059 ProxyCmd &cmd) 00060 { 00061 resultHandle = 0; 00062
00063 FemVisitor::visitTbl.accept(*this,cmd); 00064 return resultHandle; 00065 } 00066 00067 CmdInterpreter::DbHandle *CmdInterpreter::getDbHandle( 00068 SharedProxyDbHandle pHandle) 00069 { 00070 return reinterpret_cast<DbHandle *>(pHandle->getLongHandle()); 00071 } 00072 00073 CmdInterpreter::TxnHandle *CmdInterpreter::getTxnHandle( 00074 SharedProxyTxnHandle pHandle) 00075 { 00076 return reinterpret_cast<TxnHandle *>(pHandle->getLongHandle()); 00077 } 00078 00079 CmdInterpreter::StreamGraphHandle *CmdInterpreter::getStreamGraphHandle( 00080 SharedProxyStreamGraphHandle pHandle) 00081 { 00082 return reinterpret_cast<StreamGraphHandle *>(pHandle->getLongHandle()); 00083 } 00084 00085 SavepointId CmdInterpreter::getSavepointId(SharedProxySvptHandle pHandle) 00086 { 00087 return SavepointId(pHandle->getLongHandle()); 00088 } 00089 00090 TxnId CmdInterpreter::getCsn(SharedProxyCsnHandle pHandle) 00091 { 00092 return TxnId(pHandle->getLongHandle()); 00093 } 00094 00095 void CmdInterpreter::setDbHandle( 00096 SharedProxyDbHandle,DbHandle *pHandle) 00097 { 00098 resultHandle = reinterpret_cast(pHandle); 00099 } 00100 00101 void CmdInterpreter::setTxnHandle( 00102 SharedProxyTxnHandle,TxnHandle *pHandle) 00103 { 00104 resultHandle = reinterpret_cast(pHandle); 00105 } 00106 00107 void CmdInterpreter::setStreamGraphHandle( 00108 SharedProxyStreamGraphHandle,StreamGraphHandle pHandle) 00109 { 00110 resultHandle = reinterpret_cast(pHandle); 00111 } 00112 00113 void CmdInterpreter::setExecStreamHandle( 00114 SharedProxyStreamHandle,ExecStream pStream) 00115 { 00116 resultHandle = reinterpret_cast(pStream); 00117 } 00118 00119 void CmdInterpreter::setSvptHandle( 00120 SharedProxySvptHandle,SavepointId svptId) 00121 { 00122 resultHandle = opaqueToInt(svptId); 00123 } 00124 00125 void CmdInterpreter::setCsnHandle( 00126 SharedProxyCsnHandle, TxnId csnId) 00127 { 00128 resultHandle = opaqueToInt(csnId); 00129 } 00130 00131 CmdInterpreter::DbHandle CmdInterpreter::newDbHandle() 00132 { 00133 return new DbHandle(); 00134 } 00135 00136 CmdInterpreter::TxnHandle CmdInterpreter::newTxnHandle() 00137 { 00138 return new TxnHandle(); 00139 } 00140 00141 CmdInterpreter::DbHandle::~DbHandle() 00142 { 00143 statsTimer.stop(); 00144 00145
00146 if (pDb) { 00147 pDb->close(); 00148 } 00149 JniUtil::decrementHandleCount(DBHANDLE_TRACE_TYPE_STR, this); 00150 00151 JniUtil::shutdown(); 00152 } 00153 00154 CmdInterpreter::TxnHandle::~TxnHandle() 00155 { 00156 JniUtil::decrementHandleCount(TXNHANDLE_TRACE_TYPE_STR, this); 00157 } 00158 00159 CmdInterpreter::StreamGraphHandle::~StreamGraphHandle() 00160 { 00161 if (javaRuntimeContext) { 00162 JniEnvAutoRef pEnv; 00163 pEnv->DeleteGlobalRef(javaRuntimeContext); 00164 } 00165 JniUtil::decrementHandleCount(STREAMGRAPHHANDLE_TRACE_TYPE_STR, this); 00166 } 00167 00168 JavaTraceTarget *CmdInterpreter::newTraceTarget() 00169 { 00170 return new JavaTraceTarget(); 00171 } 00172 00173 SharedErrorTarget CmdInterpreter::newErrorTarget( 00174 jobject fennelJavaErrorTarget) 00175 { 00176 SharedErrorTarget errorTarget; 00177 errorTarget.reset(new JavaErrorTarget(fennelJavaErrorTarget)); 00178 return errorTarget; 00179 } 00180 00181 void CmdInterpreter::visit(ProxyCmdOpenDatabase &cmd) 00182 { 00183 ConfigMap configMap; 00184 00185 SharedProxyDatabaseParam pParam = cmd.getParams(); 00186 for (; pParam; ++pParam) { 00187 configMap.setStringParam(pParam->getName(),pParam->getValue()); 00188 } 00189 00190 CacheParams cacheParams; 00191 cacheParams.readConfig(configMap); 00192 SharedCache pCache = Cache::newCache(cacheParams); 00193 00194 JniUtilParams jniUtilParams; 00195 jniUtilParams.readConfig(configMap); 00196 JniUtil::configure(jniUtilParams); 00197 00198 DeviceMode openMode = cmd.isCreateDatabase() 00199 ? DeviceMode::createNew 00200 : DeviceMode::load; 00201 00202 std::auto_ptr pDbHandle(newDbHandle()); 00203 JniUtil::incrementHandleCount(DBHANDLE_TRACE_TYPE_STR, pDbHandle.get()); 00204 00205 JavaTraceTarget *pJavaTraceTarget = newTraceTarget(); 00206 pDbHandle->pTraceTarget.reset(pJavaTraceTarget); 00207
00208 AutoBacktrace::setTraceTarget(pDbHandle->pTraceTarget); 00209 00210 SharedDatabase pDb; 00211 try { 00212 pDb = Database::newDatabase( 00213 pCache, 00214 configMap, 00215 openMode, 00216 pDbHandle->pTraceTarget, 00217 SharedPseudoUuidGenerator(new JniPseudoUuidGenerator())); 00218 } catch (...) { 00219 AutoBacktrace::setTraceTarget(); 00220 throw; 00221 } 00222 00223 pDbHandle->pDb = pDb; 00224 00225 ExecStreamResourceKnobs knobSettings; 00226 knobSettings.cacheReservePercentage = 00227 configMap.getIntParam("cacheReservePercentage"); 00228 knobSettings.expectedConcurrentStatements = 00229 configMap.getIntParam("expectedConcurrentStatements"); 00230 00231 ExecStreamResourceQuantity resourcesAvailable; 00232 resourcesAvailable.nCachePages = pCache->getMaxLockedPages(); 00233 00234 pDbHandle->pResourceGovernor = 00235 SharedExecStreamGovernor( 00236 new SimpleExecStreamGovernor( 00237 knobSettings, resourcesAvailable, 00238 pDbHandle->pTraceTarget, 00239 "xo.resourceGovernor")); 00240 00241 if (pDb->isRecoveryRequired()) { 00242 SegmentAccessor scratchAccessor = 00243 pDb->getSegmentFactory()->newScratchSegment(pDb->getCache()); 00244 FtrsTableWriterFactory recoveryFactory( 00245 pDb, 00246 pDb->getCache(), 00247 pDb->getTypeFactory(), 00248 scratchAccessor); 00249 pDb->recover(recoveryFactory); 00250 cmd.setResultRecoveryRequired(true); 00251 } else { 00252 cmd.setResultRecoveryRequired(false); 00253 } 00254 pDbHandle->statsTimer.setTarget(*pJavaTraceTarget); 00255 pDbHandle->statsTimer.addSource(pDb); 00256 pDbHandle->statsTimer.addSource(pDbHandle->pResourceGovernor); 00257 pDbHandle->statsTimer.start(); 00258 00259
00260
00261 if (pCache->getMaxAllocatedPageCount() != cacheParams.nMemPagesMax || 00262 pCache->getAllocatedPageCount() != cacheParams.nMemPagesInit) 00263 { 00264 FENNEL_DELEGATE_TRACE( 00265 TRACE_WARNING, 00266 pDb, 00267 "Unable to allocate " 00268 << cacheParams.nMemPagesInit 00269 << " (of " 00270 << cacheParams.nMemPagesMax 00271 << " max) cache pages; allocated " 00272 << pCache->getAllocatedPageCount() 00273 << " cache pages."); 00274 } 00275 00276 setDbHandle(cmd.getResultHandle(),pDbHandle.release()); 00277 } 00278 00279 void CmdInterpreter::visit(ProxyCmdCloseDatabase &cmd) 00280 { 00281 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00282 pDbHandle->pResourceGovernor.reset(); 00283 AutoBacktrace::setTraceTarget(); 00284 deleteAndNullify(pDbHandle); 00285 } 00286 00287 void CmdInterpreter::visit(ProxyCmdCheckpoint &cmd) 00288 { 00289 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00290 00291 pDbHandle->pDb->requestCheckpoint( 00292 cmd.isFuzzy() ? CHECKPOINT_FLUSH_FUZZY : CHECKPOINT_FLUSH_ALL, 00293 cmd.isAsync()); 00294 } 00295 00296 void CmdInterpreter::visit(ProxyCmdSetParam &cmd) 00297 { 00298 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00299 SharedProxyDatabaseParam pParam = cmd.getParam(); 00300 00301 std::string paramName = pParam->getName(); 00302 00303 if (paramName.compare("cachePagesInit") == 0) { 00304 int pageCount = boost::lexical_cast(pParam->getValue()); 00305 SharedCache pCache = pDbHandle->pDb->getCache(); 00306 if (pageCount <= 0 || pageCount > pCache->getMaxAllocatedPageCount()) { 00307 throw InvalidParamExcn("1", "'cachePagesMax'"); 00308 } 00309 00310 bool decreasingPageCount = pageCount < pCache->getAllocatedPageCount(); 00311 if (decreasingPageCount) { 00312
00313 ExecStreamResourceQuantity available; 00314 available.nCachePages = pageCount; 00315 if (!pDbHandle->pResourceGovernor->setResourceAvailability( 00316 available, EXEC_RESOURCE_CACHE_PAGES)) 00317 { 00318 throw InvalidParamExcn( 00319 "the number of pages currently assigned (plus reserve)", 00320 "'cachePagesMax'"); 00321 } 00322 } 00323 00324 pCache->setAllocatedPageCount(pageCount); 00325 00326 if (!decreasingPageCount) { 00327
00328 ExecStreamResourceQuantity available; 00329 available.nCachePages = pageCount; 00330 bool result = 00331 pDbHandle->pResourceGovernor->setResourceAvailability( 00332 available, EXEC_RESOURCE_CACHE_PAGES); 00333 assert(result); 00334 } 00335 } else if (paramName.compare("expectedConcurrentStatements") == 0) { 00336 int nStatements = boost::lexical_cast(pParam->getValue()); 00337 SharedCache pCache = pDbHandle->pDb->getCache(); 00338
00339 if (nStatements <= 0 || 00340 nStatements > pCache->getMaxLockedPages() / 5) 00341 { 00342 throw InvalidParamExcn("1", "'cachePagesInit/5'"); 00343 } 00344 ExecStreamResourceKnobs knob; 00345 knob.expectedConcurrentStatements = nStatements; 00346 pDbHandle->pResourceGovernor->setResourceKnob( 00347 knob, EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS); 00348 00349 } else if (paramName.compare("cacheReservePercentage") == 0) { 00350 int percent = boost::lexical_cast(pParam->getValue()); 00351 if (percent <= 0 || percent > 99) { 00352 throw InvalidParamExcn("1", "99"); 00353 } 00354 ExecStreamResourceKnobs knob; 00355 knob.cacheReservePercentage = percent; 00356 if (!pDbHandle->pResourceGovernor->setResourceKnob( 00357 knob, EXEC_KNOB_CACHE_RESERVE_PERCENTAGE)) 00358 { 00359 throw InvalidParamExcn( 00360 "1", 00361 "a percentage that sets aside fewer pages, to allow for pages already assigned"); 00362 } 00363 } 00364 } 00365 00366 void CmdInterpreter::getBTreeForIndexCmd( 00367 ProxyIndexCmd &cmd,PageId rootPageId,BTreeDescriptor &treeDescriptor) 00368 { 00369 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00370 00371 readTupleDescriptor( 00372 treeDescriptor.tupleDescriptor, 00373 *(cmd.getTupleDesc()),pTxnHandle->pDb->getTypeFactory()); 00374 00375 CmdInterpreter::readTupleProjection( 00376 treeDescriptor.keyProjection,cmd.getKeyProj()); 00377 00378 treeDescriptor.pageOwnerId = PageOwnerId(cmd.getIndexId()); 00379 treeDescriptor.segmentId = SegmentId(cmd.getSegmentId()); 00380 treeDescriptor.segmentAccessor.pSegment = 00381 pTxnHandle->pDb->getSegmentById( 00382 treeDescriptor.segmentId, 00383 pTxnHandle->pSnapshotSegment); 00384 treeDescriptor.segmentAccessor.pCacheAccessor = pTxnHandle->pDb->getCache(); 00385 treeDescriptor.rootPageId = rootPageId; 00386 } 00387 00388 void CmdInterpreter::visit(ProxyCmdCreateIndex &cmd) 00389 { 00390
00391 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00392 SXMutexSharedGuard actionMutexGuard( 00393 pTxnHandle->pDb->getCheckpointThread()->getActionMutex()); 00394 00395 BTreeDescriptor treeDescriptor; 00396 getBTreeForIndexCmd(cmd,NULL_PAGE_ID,treeDescriptor); 00397 BTreeBuilder builder(treeDescriptor); 00398 builder.createEmptyRoot(); 00399 resultHandle = opaqueToInt(builder.getRootPageId()); 00400 } 00401 00402 void CmdInterpreter::visit(ProxyCmdTruncateIndex &cmd) 00403 { 00404 dropOrTruncateIndex(cmd, false); 00405 } 00406 00407 void CmdInterpreter::visit(ProxyCmdDropIndex &cmd) 00408 { 00409 dropOrTruncateIndex(cmd, true); 00410 } 00411 00412 void CmdInterpreter::visit(ProxyCmdVerifyIndex &cmd) 00413 { 00414
00415 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00416 SXMutexSharedGuard actionMutexGuard( 00417 pTxnHandle->pDb->getCheckpointThread()->getActionMutex()); 00418 00419 BTreeDescriptor treeDescriptor; 00420 getBTreeForIndexCmd(cmd,PageId(cmd.getRootPageId()),treeDescriptor); 00421 TupleProjection leafPageIdProj; 00422 if (cmd.getLeafPageIdProj()) { 00423 CmdInterpreter::readTupleProjection( 00424 leafPageIdProj, cmd.getLeafPageIdProj()); 00425 } 00426 bool estimate = cmd.isEstimate(); 00427 bool includeTuples = cmd.isIncludeTuples(); 00428 bool keys = (!estimate); 00429 bool leaf = ((!estimate) || includeTuples); 00430 BTreeVerifier verifier(treeDescriptor); 00431 verifier.verify(true, keys, leaf); 00432 BTreeStatistics statistics = verifier.getStatistics(); 00433 long pageCount = statistics.nNonLeafNodes + statistics.nLeafNodes; 00434 if (includeTuples) { 00435 pageCount += statistics.nTuples; 00436 } 00437 cmd.setResultPageCount(pageCount); 00438 00439 if (keys) { 00440 cmd.setResultUniqueKeyCount(statistics.nUniqueKeys); 00441 } else { 00442 cmd.clearResultUniqueKeyCount(); 00443 } 00444 } 00445 00446 void CmdInterpreter::dropOrTruncateIndex( 00447 ProxyCmdDropIndex &cmd, bool drop) 00448 { 00449
00450 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00451 SXMutexSharedGuard actionMutexGuard( 00452 pTxnHandle->pDb->getCheckpointThread()->getActionMutex()); 00453 00454 BTreeDescriptor treeDescriptor; 00455 getBTreeForIndexCmd(cmd,PageId(cmd.getRootPageId()),treeDescriptor); 00456 TupleProjection leafPageIdProj; 00457 if (cmd.getLeafPageIdProj()) { 00458 CmdInterpreter::readTupleProjection( 00459 leafPageIdProj, cmd.getLeafPageIdProj()); 00460 } 00461 BTreeBuilder builder(treeDescriptor); 00462 builder.truncate(drop, leafPageIdProj.size() ? &leafPageIdProj : NULL); 00463 } 00464 00465 void CmdInterpreter::visit(ProxyCmdBeginTxn &cmd) 00466 { 00467 beginTxn(cmd, cmd.isReadOnly(), NULL_TXN_ID); 00468 } 00469 00470 void CmdInterpreter::beginTxn(ProxyBeginTxnCmd &cmd, bool readOnly, TxnId csn) 00471 { 00472 assert(readOnly || csn == NULL_TXN_ID); 00473 00474
00475 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00476 SharedDatabase pDb = pDbHandle->pDb; 00477 00478 SXMutexSharedGuard actionMutexGuard( 00479 pDb->getCheckpointThread()->getActionMutex()); 00480 00481 std::auto_ptr pTxnHandle(newTxnHandle()); 00482 JniUtil::incrementHandleCount(TXNHANDLE_TRACE_TYPE_STR, pTxnHandle.get()); 00483 pTxnHandle->pDb = pDb; 00484 pTxnHandle->readOnly = readOnly; 00485
00486 pTxnHandle->pTxn = pDb->getTxnLog()->newLogicalTxn(pDb->getCache()); 00487 pTxnHandle->pResourceGovernor = pDbHandle->pResourceGovernor; 00488 00489
00490
00491 SegmentAccessor scratchAccessor; 00492 00493 pTxnHandle->pFtrsTableWriterFactory = SharedFtrsTableWriterFactory( 00494 new FtrsTableWriterFactory( 00495 pDb, 00496 pDb->getCache(), 00497 pDb->getTypeFactory(), 00498 scratchAccessor)); 00499 00500
00501
00502
00503
00504 if (pDb->areSnapshotsEnabled()) { 00505 if (csn == NULL_TXN_ID) { 00506 csn = pTxnHandle->pTxn->getTxnId(); 00507 } 00508 pTxnHandle->pSnapshotSegment = 00509 pDb->getSegmentFactory()->newSnapshotRandomAllocationSegment( 00510 pDb->getDataSegment(), 00511 pDb->getDataSegment(), 00512 csn, 00513 false); 00514 pTxnHandle->pReadCommittedSnapshotSegment = 00515 pDb->getSegmentFactory()->newSnapshotRandomAllocationSegment( 00516 pDb->getDataSegment(), 00517 pDb->getDataSegment(), 00518 csn, 00519 true); 00520 } else { 00521 assert(csn == NULL_TXN_ID); 00522 } 00523 00524 setTxnHandle(cmd.getResultHandle(),pTxnHandle.release()); 00525 } 00526 00527 void CmdInterpreter::visit(ProxyCmdBeginTxnWithCsn &cmd) 00528 { 00529 beginTxn(cmd, true, getCsn(cmd.getCsnHandle())); 00530 } 00531 00532 void CmdInterpreter::visit(ProxyCmdSavepoint &cmd) 00533 { 00534 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00535 00536
00537 SXMutexSharedGuard actionMutexGuard( 00538 pTxnHandle->pDb->getCheckpointThread()->getActionMutex()); 00539 00540 setSvptHandle( 00541 cmd.getResultHandle(), 00542 pTxnHandle->pTxn->createSavepoint()); 00543 } 00544 00545 void CmdInterpreter::visit(ProxyCmdCommit &cmd) 00546 { 00547 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00548 SharedDatabase pDb = pTxnHandle->pDb; 00549 00550
00551 bool txnBlocksCheckpoint = !pTxnHandle->readOnly && pDb->shouldForceTxns(); 00552 SXMutexSharedGuard actionMutexGuard( 00553 pDb->getCheckpointThread()->getActionMutex()); 00554 00555 if (pDb->areSnapshotsEnabled()) { 00556
00557
00558
00559
00560 pTxnHandle->pTxn->commit(); 00561 pTxnHandle->pTxn = pDb->getTxnLog()->newLogicalTxn(pDb->getCache()); 00562 SnapshotRandomAllocationSegment *pSnapshotSegment = 00563 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>( 00564 pTxnHandle->pSnapshotSegment); 00565 TxnId commitTxnId = pTxnHandle->pTxn->getTxnId(); 00566 pSnapshotSegment->commitChanges(commitTxnId); 00567 00568
00569
00570
00571
00572
00573
00574
00575 if (txnBlocksCheckpoint) { 00576 pTxnHandle->pSnapshotSegment->checkpoint(CHECKPOINT_FLUSH_ALL); 00577 } 00578 } 00579 00580 if (cmd.getSvptHandle()) { 00581 SavepointId svptId = getSavepointId(cmd.getSvptHandle()); 00582 pTxnHandle->pTxn->commitSavepoint(svptId); 00583 } else { 00584 pTxnHandle->pTxn->commit(); 00585 deleteAndNullify(pTxnHandle); 00586 if (txnBlocksCheckpoint) { 00587
00588 actionMutexGuard.unlock(); 00589
00590
00591
00592 pDb->requestCheckpoint(CHECKPOINT_FLUSH_ALL, false); 00593 } 00594 } 00595 } 00596 00597 void CmdInterpreter::visit(ProxyCmdRollback &cmd) 00598 { 00599 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00600 SharedDatabase pDb = pTxnHandle->pDb; 00601 00602
00603 bool txnBlocksCheckpoint = !pTxnHandle->readOnly && pDb->shouldForceTxns(); 00604 SXMutexSharedGuard actionMutexGuard( 00605 pDb->getCheckpointThread()->getActionMutex()); 00606 00607 if (pDb->areSnapshotsEnabled()) { 00608 SnapshotRandomAllocationSegment *pSegment = 00609 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>( 00610 pTxnHandle->pSnapshotSegment); 00611 pSegment->rollbackChanges(); 00612 } 00613 00614 if (cmd.getSvptHandle()) { 00615 SavepointId svptId = getSavepointId(cmd.getSvptHandle()); 00616 pTxnHandle->pTxn->rollback(&svptId); 00617 } else { 00618 pTxnHandle->pTxn->rollback(); 00619 deleteAndNullify(pTxnHandle); 00620 if (txnBlocksCheckpoint && !pDb->areSnapshotsEnabled()) { 00621
00622
00623
00624
00625 pDb->recoverOnline(); 00626 } 00627 } 00628 } 00629 00630 void CmdInterpreter::visit(ProxyCmdGetTxnCsn &cmd) 00631 { 00632 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00633 SharedDatabase pDb = pTxnHandle->pDb; 00634 assert(pDb->areSnapshotsEnabled()); 00635 SnapshotRandomAllocationSegment *pSegment = 00636 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>( 00637 pTxnHandle->pSnapshotSegment); 00638 setCsnHandle(cmd.getResultHandle(), pSegment->getSnapshotCsn()); 00639 } 00640 00641 void CmdInterpreter::visit(ProxyCmdGetLastCommittedTxnId &cmd) 00642 { 00643 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00644 SharedDatabase pDb = pDbHandle->pDb; 00645 setCsnHandle(cmd.getResultHandle(), pDb->getLastCommittedTxnId()); 00646 } 00647 00648 void CmdInterpreter::visit(ProxyCmdCreateExecutionStreamGraph &cmd) 00649 { 00650 #if 0 00651 struct mallinfo minfo = mallinfo(); 00652 std::cout << "Number of allocated bytes before stream graph construction = " 00653 << minfo.uordblks << " bytes" << std::endl; 00654 #endif 00655 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00656 SharedDatabase pDb = pTxnHandle->pDb; 00657 SharedExecStreamGraph pGraph = 00658 ExecStreamGraph::newExecStreamGraph(); 00659 pGraph->setTxn(pTxnHandle->pTxn); 00660 pGraph->setResourceGovernor(pTxnHandle->pResourceGovernor); 00661 std::auto_ptr pStreamGraphHandle( 00662 new StreamGraphHandle()); 00663 JniUtil::incrementHandleCount( 00664 STREAMGRAPHHANDLE_TRACE_TYPE_STR, pStreamGraphHandle.get()); 00665 pStreamGraphHandle->javaRuntimeContext = NULL; 00666 pStreamGraphHandle->pTxnHandle = pTxnHandle; 00667 pStreamGraphHandle->pExecStreamGraph = pGraph; 00668 pStreamGraphHandle->pExecStreamFactory.reset( 00669 new ExecStreamFactory( 00670 pDb, 00671 pTxnHandle->pFtrsTableWriterFactory, 00672 pStreamGraphHandle.get())); 00673
00674
00675
00676
00677 if (pDb->areSnapshotsEnabled()) { 00678 pStreamGraphHandle->pSegment = 00679 pDb->getSegmentFactory()->newDynamicDelegatingSegment( 00680 pTxnHandle->pSnapshotSegment); 00681 pStreamGraphHandle->pReadCommittedSegment = 00682 pDb->getSegmentFactory()->newDynamicDelegatingSegment( 00683 pTxnHandle->pReadCommittedSnapshotSegment); 00684 } 00685 setStreamGraphHandle( 00686 cmd.getResultHandle(), 00687 pStreamGraphHandle.release()); 00688 } 00689 00690 void CmdInterpreter::visit(ProxyCmdPrepareExecutionStreamGraph &cmd) 00691 { 00692 StreamGraphHandle *pStreamGraphHandle = getStreamGraphHandle( 00693 cmd.getStreamGraphHandle()); 00694 TxnHandle *pTxnHandle = pStreamGraphHandle->pTxnHandle; 00695
00696 SharedExecStreamScheduler pScheduler; 00697 std::string schedulerName = "xo.scheduler"; 00698 if (cmd.getDegreeOfParallelism() == 1) { 00699 pScheduler.reset( 00700 new DfsTreeExecStreamScheduler( 00701 pTxnHandle->pDb->getSharedTraceTarget(), 00702 schedulerName)); 00703 } else { 00704 pScheduler.reset( 00705 new ParallelExecStreamScheduler( 00706 pTxnHandle->pDb->getSharedTraceTarget(), 00707 schedulerName, 00708 JniUtil::getThreadTracker(), 00709 cmd.getDegreeOfParallelism())); 00710 } 00711 ExecStreamGraphEmbryo graphEmbryo( 00712 pStreamGraphHandle->pExecStreamGraph, 00713 pScheduler, 00714 pTxnHandle->pDb->getCache(), 00715 pTxnHandle->pDb->getSegmentFactory()); 00716 pStreamGraphHandle->pExecStreamFactory->setGraphEmbryo(graphEmbryo); 00717 ExecStreamBuilder streamBuilder( 00718 graphEmbryo, 00719 *(pStreamGraphHandle->pExecStreamFactory)); 00720 streamBuilder.buildStreamGraph(cmd, true); 00721 pStreamGraphHandle->pExecStreamFactory.reset(); 00722 pStreamGraphHandle->pScheduler = pScheduler; 00723 #if 0 00724 struct mallinfo minfo = mallinfo(); 00725 std::cout << "Number of allocated bytes after stream graph construction = " 00726 << minfo.uordblks << " bytes" << std::endl; 00727 #endif 00728 } 00729 00730 void CmdInterpreter::visit(ProxyCmdCreateStreamHandle &cmd) 00731 { 00732 StreamGraphHandle *pStreamGraphHandle = getStreamGraphHandle( 00733 cmd.getStreamGraphHandle()); 00734 SharedExecStream pStream; 00735 if (cmd.isInput()) { 00736 pStream = 00737 pStreamGraphHandle->pExecStreamGraph->findLastStream( 00738 cmd.getStreamName(), 0); 00739 } else { 00740 pStream = 00741 pStreamGraphHandle->pExecStreamGraph->findStream( 00742 cmd.getStreamName()); 00743 } 00744 00745 setExecStreamHandle( 00746 cmd.getResultHandle(), 00747 pStream.get()); 00748 } 00749 00750 PageId CmdInterpreter::StreamGraphHandle::getRoot(PageOwnerId pageOwnerId) 00751 { 00752 JniEnvAutoRef pEnv; 00753 jlong x = opaqueToInt(pageOwnerId); 00754 x = pEnv->CallLongMethod( 00755 javaRuntimeContext,JniUtil::methGetIndexRoot,x); 00756 return PageId(x); 00757 } 00758 00759 void CmdInterpreter::readTupleDescriptor( 00760 TupleDescriptor &tupleDesc, 00761 ProxyTupleDescriptor &javaTupleDesc, 00762 StoredTypeDescriptorFactory const &typeFactory) 00763 { 00764 tupleDesc.clear(); 00765 SharedProxyTupleAttrDescriptor pAttr = javaTupleDesc.getAttrDescriptor(); 00766 for (; pAttr; ++pAttr) { 00767 StoredTypeDescriptor const &typeDescriptor = 00768 typeFactory.newDataType(pAttr->getTypeOrdinal()); 00769 tupleDesc.push_back( 00770 TupleAttributeDescriptor( 00771 typeDescriptor,pAttr->isNullable(),pAttr->getByteLength())); 00772 } 00773 } 00774 00775 void CmdInterpreter::readTupleProjection( 00776 TupleProjection &tupleProj, 00777 SharedProxyTupleProjection pJavaTupleProj) 00778 { 00779 tupleProj.clear(); 00780 SharedProxyTupleAttrProjection pAttr = pJavaTupleProj->getAttrProjection(); 00781 for (; pAttr; ++pAttr) { 00782 tupleProj.push_back(pAttr->getAttributeIndex()); 00783 } 00784 } 00785 00786 void CmdInterpreter::visit(ProxyCmdAlterSystemDeallocate &cmd) 00787 { 00788 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00789 SharedDatabase pDb = pDbHandle->pDb; 00790 if (!pDb->areSnapshotsEnabled()) { 00791
00792 return; 00793 } else { 00794 uint64_t paramVal = cmd.getOldestLabelCsn(); 00795 TxnId labelCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal); 00796 pDb->deallocateOldPages(labelCsn); 00797 } 00798 } 00799 00800 void CmdInterpreter::visit(ProxyCmdVersionIndexRoot &cmd) 00801 { 00802 TxnHandle *pTxnHandle = getTxnHandle(cmd.getTxnHandle()); 00803 SharedDatabase pDb = pTxnHandle->pDb; 00804 assert(pDb->areSnapshotsEnabled()); 00805 00806 SnapshotRandomAllocationSegment *pSnapshotSegment = 00807 SegmentFactory::dynamicCast<SnapshotRandomAllocationSegment *>( 00808 pTxnHandle->pSnapshotSegment); 00809 pSnapshotSegment->versionPage( 00810 PageId(cmd.getOldRootPageId()), 00811 PageId(cmd.getNewRootPageId())); 00812 } 00813 00814 void CmdInterpreter::visit(ProxyCmdInitiateBackup &cmd) 00815 { 00816 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00817 SharedDatabase pDb = pDbHandle->pDb; 00818 uint64_t paramVal = cmd.getLowerBoundCsn(); 00819 TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal); 00820 FileSize dataDeviceSize; 00821 00822 volatile bool abortFlag = false; 00823 TxnId csn = 00824 pDb->initiateBackup( 00825 cmd.getBackupPathname(), 00826 cmd.isCheckSpaceRequirements(), 00827 FileSize(cmd.getSpacePadding()), 00828 lowerBoundCsn, 00829 cmd.getCompressionProgram(), 00830 dataDeviceSize, 00831 (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted); 00832 cmd.setResultDataDeviceSize(dataDeviceSize); 00833 setCsnHandle(cmd.getResultHandle(), csn); 00834 } 00835 00836 void CmdInterpreter::visit(ProxyCmdCompleteBackup &cmd) 00837 { 00838 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00839 SharedDatabase pDb = pDbHandle->pDb; 00840 uint64_t paramVal = cmd.getLowerBoundCsn(); 00841 TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal); 00842 volatile bool abortFlag = false; 00843 pDb->completeBackup( 00844 lowerBoundCsn, 00845 TxnId(cmd.getUpperBoundCsn()), 00846 (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted); 00847 } 00848 00849 void CmdInterpreter::visit(ProxyCmdAbandonBackup &cmd) 00850 { 00851 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00852 SharedDatabase pDb = pDbHandle->pDb; 00853 pDb->abortBackup(); 00854 } 00855 00856 void CmdInterpreter::visit(ProxyCmdRestoreFromBackup &cmd) 00857 { 00858 DbHandle *pDbHandle = getDbHandle(cmd.getDbHandle()); 00859 SharedDatabase pDb = pDbHandle->pDb; 00860 uint64_t paramVal = cmd.getLowerBoundCsn(); 00861 TxnId lowerBoundCsn = isMAXU(paramVal) ? NULL_TXN_ID : TxnId(paramVal); 00862 volatile bool abortFlag = false; 00863 pDb->restoreFromBackup( 00864 cmd.getBackupPathname(), 00865 cmd.getFileSize(), 00866 cmd.getCompressionProgram(), 00867 lowerBoundCsn, 00868 TxnId(cmd.getUpperBoundCsn()), 00869 (pExecHandle == NULL) ? abortFlag : pExecHandle->aborted); 00870 } 00871 00872 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/CmdInterpreter.cpp#62 $"); 00873 00874