Fennel: /home/pub/open/dev/fennel/farrago/ExecStreamFactory.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/farrago/ExecStreamFactory.h" 00026 #include "fennel/farrago/JavaSinkExecStream.h" 00027 #include "fennel/farrago/JavaTransformExecStream.h" 00028 #include "fennel/farrago/CmdInterpreter.h" 00029 #include "fennel/ftrs/BTreePrefetchSearchExecStream.h" 00030 #include "fennel/ftrs/BTreeScanExecStream.h" 00031 #include "fennel/ftrs/BTreeSearchExecStream.h" 00032 #include "fennel/ftrs/BTreeSearchUniqueExecStream.h" 00033 #include "fennel/ftrs/FtrsTableWriterExecStream.h" 00034 #include "fennel/ftrs/BTreeSortExecStream.h" 00035 #include "fennel/exec/MergeExecStream.h" 00036 #include "fennel/exec/SegBufferExecStream.h" 00037 #include "fennel/exec/SegBufferReaderExecStream.h" 00038 #include "fennel/exec/SegBufferWriterExecStream.h" 00039 #include "fennel/exec/SplitterExecStream.h" 00040 #include "fennel/exec/BarrierExecStream.h" 00041 #include "fennel/exec/ValuesExecStream.h" 00042 #include "fennel/exec/ExecStreamGraphEmbryo.h" 00043 #include "fennel/ftrs/FtrsTableWriterFactory.h" 00044 #include "fennel/exec/CartesianJoinExecStream.h" 00045 #include "fennel/exec/SortedAggExecStream.h" 00046 #include "fennel/exec/MockProducerExecStream.h" 00047 #include "fennel/exec/ReshapeExecStream.h" 00048 #include "fennel/exec/NestedLoopJoinExecStream.h" 00049 #include "fennel/exec/BernoulliSamplingExecStream.h" 00050 #include "fennel/calculator/CalcExecStream.h" 00051 #include "fennel/exec/CollectExecStream.h" 00052 #include "fennel/exec/UncollectExecStream.h" 00053 #include "fennel/exec/CorrelationJoinExecStream.h" 00054 #include "fennel/db/Database.h" 00055 #include "fennel/db/CheckpointThread.h" 00056 #include "fennel/tuple/TupleDescriptor.h" 00057 #include "fennel/tuple/TupleAccessor.h" 00058 #include "fennel/cache/QuotaCacheAccessor.h" 00059 #include "fennel/segment/SegmentFactory.h" 00060 #include "fennel/sorter/ExternalSortExecStream.h" 00061 #include "fennel/flatfile/FlatFileExecStream.h" 00062 #include "fennel/hashexe/LhxJoinExecStream.h" 00063 #include "fennel/hashexe/LhxAggExecStream.h" 00064 00065 FENNEL_BEGIN_CPPFILE( 00066 "$Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $"); 00067 00068 ExecStreamFactory::ExecStreamFactory( 00069 SharedDatabase pDatabaseInit, 00070 SharedFtrsTableWriterFactory pTableWriterFactoryInit, 00071 CmdInterpreter::StreamGraphHandle *pStreamGraphHandleInit) 00072 { 00073 pDatabase = pDatabaseInit; 00074 pTableWriterFactory = pTableWriterFactoryInit; 00075 pStreamGraphHandle = pStreamGraphHandleInit; 00076 pGraphEmbryo = NULL; 00077 } 00078 00079 SharedDatabase ExecStreamFactory::getDatabase() 00080 { 00081 return pDatabase; 00082 } 00083 00084 void ExecStreamFactory::setGraphEmbryo( 00085 ExecStreamGraphEmbryo &graphEmbryo) 00086 { 00087 pGraphEmbryo = &graphEmbryo; 00088 } 00089 00090 void ExecStreamFactory::setScratchAccessor( 00091 SegmentAccessor &scratchAccessorInit) 00092 { 00093 scratchAccessor = scratchAccessorInit; 00094 } 00095 00096 void ExecStreamFactory::addSubFactory( 00097 SharedExecStreamSubFactory pSubFactory) 00098 { 00099 subFactories.push_back(pSubFactory); 00100 } 00101 00102 ExecStreamEmbryo const &ExecStreamFactory::visitStream( 00103 ProxyExecutionStreamDef &streamDef) 00104 { 00105 bool created = false; 00106 00107
00108 std::vector::iterator ppSubFactory; 00109 for (ppSubFactory = subFactories.begin(); 00110 ppSubFactory != subFactories.end(); ++ppSubFactory) 00111 { 00112 ExecStreamSubFactory &subFactory = **ppSubFactory; 00113 created = subFactory.createStream( 00114 *this, 00115 streamDef, 00116 embryo); 00117 if (created) { 00118 break; 00119 } 00120 } 00121 00122 if (!created) { 00123
00124 invokeVisit(streamDef); 00125 } 00126 embryo.getStream()->setName(streamDef.getName()); 00127 return embryo; 00128 } 00129 00130 void ExecStreamFactory::invokeVisit( 00131 ProxyExecutionStreamDef &streamDef) 00132 { 00133 FemVisitor::visitTbl.accept(*this,streamDef); 00134 } 00135 00136 00137 00138 00139 00140 00141 00142 void ExecStreamFactory::visit(ProxyBarrierStreamDef &streamDef) 00143 { 00144 BarrierExecStreamParams params; 00145 readTupleStreamParams(params, streamDef); 00146 params.returnMode = streamDef.getReturnMode(); 00147 readBarrierDynamicParams(params, streamDef); 00148 embryo.init(new BarrierExecStream(), params); 00149 } 00150 00151 void ExecStreamFactory::readBarrierDynamicParams( 00152 BarrierExecStreamParams &params, 00153 ProxyBarrierStreamDef &streamDef) 00154 { 00155 SharedProxyDynamicParameter dynamicParam = streamDef.getDynamicParameter(); 00156 for (; dynamicParam; ++dynamicParam) { 00157 DynamicParamId p = (DynamicParamId) dynamicParam->getParameterId(); 00158 params.parameterIds.push_back(p); 00159 } 00160 } 00161 00162 void ExecStreamFactory::visit(ProxyBufferingTupleStreamDef &streamDef) 00163 { 00164 SegBufferExecStreamParams params; 00165 readTupleStreamParams(params, streamDef); 00166 params.multipass = streamDef.isMultipass(); 00167 if (!streamDef.isInMemory()) { 00168 params.scratchAccessor.pSegment = pDatabase->getTempSegment(); 00169 params.scratchAccessor.pCacheAccessor = params.pCacheAccessor; 00170 } 00171 embryo.init(new SegBufferExecStream(), params); 00172 } 00173 00174 void ExecStreamFactory::visit(ProxyBufferWriterStreamDef &streamDef) 00175 { 00176 SegBufferWriterExecStreamParams params; 00177 readExecStreamParams(params, streamDef); 00178 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc()); 00179 if (!streamDef.isInMemory()) { 00180 params.scratchAccessor.pSegment = pDatabase->getTempSegment(); 00181 params.scratchAccessor.pCacheAccessor = params.pCacheAccessor; 00182 } 00183 assert(streamDef.isMultipass()); 00184 params.readerRefCountParamId = 00185 readDynamicParamId(streamDef.getReaderRefCountParamId()); 00186 embryo.init(new SegBufferWriterExecStream(), params); 00187 } 00188 00189 void ExecStreamFactory::visit(ProxyBufferReaderStreamDef &streamDef) 00190 { 00191 SegBufferReaderExecStreamParams params; 00192 readTupleStreamParams(params, streamDef); 00193 if (!streamDef.isInMemory()) { 00194 params.scratchAccessor.pSegment = pDatabase->getTempSegment(); 00195 params.scratchAccessor.pCacheAccessor = params.pCacheAccessor; 00196 } 00197 assert(streamDef.isMultipass()); 00198 params.readerRefCountParamId = 00199 readDynamicParamId(streamDef.getReaderRefCountParamId()); 00200 embryo.init(new SegBufferReaderExecStream(), params); 00201 } 00202 00203 void ExecStreamFactory::visit(ProxyCartesianProductStreamDef &streamDef) 00204 { 00205 CartesianJoinExecStreamParams params; 00206 readTupleStreamParams(params, streamDef); 00207 params.leftOuter = streamDef.isLeftOuter(); 00208 embryo.init(new CartesianJoinExecStream(), params); 00209 } 00210 00211 void ExecStreamFactory::visit(ProxyIndexLoaderDef &streamDef) 00212 { 00213 BTreeInsertExecStreamParams params; 00214 readTupleStreamParams(params, streamDef); 00215 readBTreeStreamParams(params, streamDef); 00216 params.distinctness = streamDef.getDistinctness(); 00217 params.monotonic = streamDef.isMonotonic(); 00218 embryo.init(new BTreeInsertExecStream(), params); 00219 } 00220 00221 void ExecStreamFactory::visit(ProxyIndexScanDef &streamDef) 00222 { 00223 BTreeScanExecStreamParams params; 00224 readBTreeReadStreamParams(params, streamDef); 00225 embryo.init( 00226 new BTreeScanExecStream(), 00227 params); 00228 } 00229 00230 void ExecStreamFactory::visit(ProxyIndexSearchDef &streamDef) 00231 { 00232 assert(!(streamDef.isUniqueKey() && streamDef.isPrefetch())); 00233 if (streamDef.isPrefetch()) { 00234 BTreePrefetchSearchExecStreamParams params; 00235 initBTreePrefetchSearchParams(params, streamDef); 00236 embryo.init( 00237 new BTreePrefetchSearchExecStream(), 00238 params); 00239 } else { 00240 BTreeSearchExecStreamParams params; 00241 readBTreeSearchStreamParams(params, streamDef); 00242 embryo.init( 00243 streamDef.isUniqueKey() 00244 ? new BTreeSearchUniqueExecStream() : new BTreeSearchExecStream(), 00245 params); 00246 } 00247 } 00248 00249 void ExecStreamFactory::initBTreePrefetchSearchParams( 00250 BTreePrefetchSearchExecStreamParams &params, 00251 ProxyIndexSearchDef &streamDef) 00252 { 00253 readBTreeSearchStreamParams(params, streamDef); 00254
00255
00256 createPrivateScratchSegment(params); 00257 } 00258 00259 void ExecStreamFactory::visit(ProxyJavaSinkStreamDef &streamDef) 00260 { 00261 JavaSinkExecStreamParams params; 00262 readExecStreamParams(params, streamDef); 00263 params.pStreamGraphHandle = pStreamGraphHandle; 00264 params.javaFennelPipeTupleIterId = streamDef.getStreamId(); 00265 embryo.init(new JavaSinkExecStream(), params); 00266 } 00267 00268 void ExecStreamFactory::visit(ProxyJavaTransformStreamDef &streamDef) 00269 { 00270 JavaTransformExecStreamParams params; 00271 00272 readExecStreamParams(params, streamDef); 00273 00274 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc()); 00275 00276 params.pStreamGraphHandle = pStreamGraphHandle; 00277 params.javaClassName = streamDef.getJavaClassName(); 00278 embryo.init(new JavaTransformExecStream(), params); 00279 } 00280 00281 void ExecStreamFactory::visit(ProxyMergeStreamDef &streamDef) 00282 { 00283 MergeExecStreamParams params; 00284 readTupleStreamParams(params, streamDef); 00285 if (!streamDef.isSequential()) { 00286 params.isParallel = true; 00287 } 00288
00289 assert(!streamDef.isPrePullInputs()); 00290 embryo.init(new MergeExecStream(), params); 00291 } 00292 00293 void ExecStreamFactory::visit(ProxyMockTupleStreamDef &streamDef) 00294 { 00295 MockProducerExecStreamParams params; 00296 readTupleStreamParams(params, streamDef); 00297 params.nRows = streamDef.getRowCount(); 00298 embryo.init(new MockProducerExecStream(), params); 00299 } 00300 00301 void ExecStreamFactory::visit(ProxyTableDeleterDef &streamDef) 00302 { 00303 FtrsTableWriterExecStreamParams params; 00304 params.actionType = FtrsTableWriter::ACTION_DELETE; 00305 readTableWriterStreamParams(params, streamDef); 00306 embryo.init(new FtrsTableWriterExecStream(), params); 00307 } 00308 00309 void ExecStreamFactory::visit(ProxyTableInserterDef &streamDef) 00310 { 00311 FtrsTableWriterExecStreamParams params; 00312 params.actionType = FtrsTableWriter::ACTION_INSERT; 00313 readTableWriterStreamParams(params, streamDef); 00314 embryo.init(new FtrsTableWriterExecStream(), params); 00315 } 00316 00317 void ExecStreamFactory::visit(ProxyTableUpdaterDef &streamDef) 00318 { 00319 FtrsTableWriterExecStreamParams params; 00320 params.actionType = FtrsTableWriter::ACTION_UPDATE; 00321 SharedProxyTupleProjection pUpdateProj = streamDef.getUpdateProj(); 00322 CmdInterpreter::readTupleProjection( 00323 params.updateProj, 00324 pUpdateProj); 00325 readTableWriterStreamParams(params, streamDef); 00326 embryo.init(new FtrsTableWriterExecStream(), params); 00327 } 00328 00329 void ExecStreamFactory::visit(ProxySortedAggStreamDef &streamDef) 00330 { 00331 SortedAggExecStreamParams params; 00332 readAggStreamParams(params, streamDef); 00333 embryo.init(new SortedAggExecStream(), params); 00334 } 00335 00336 void ExecStreamFactory::implementSortWithBTree(ProxySortingStreamDef &streamDef) 00337 { 00338 BTreeSortExecStreamParams params; 00339 readTupleStreamParams(params,streamDef); 00340 params.distinctness = streamDef.getDistinctness(); 00341 params.monotonic = false; 00342 params.pSegment = pDatabase->getTempSegment(); 00343 params.rootPageId = NULL_PAGE_ID; 00344 params.segmentId = Database::TEMP_SEGMENT_ID; 00345 params.pageOwnerId = ANON_PAGE_OWNER_ID; 00346 params.pRootMap = NULL; 00347 params.rootPageIdParamId = DynamicParamId(0); 00348 CmdInterpreter::readTupleProjection( 00349 params.keyProj, 00350 streamDef.getKeyProj()); 00351
00352
00353 params.tupleDesc = params.outputTupleDesc; 00354 embryo.init(new BTreeSortExecStream(), params); 00355 } 00356 00357 void ExecStreamFactory::visit(ProxySplitterStreamDef &streamDef) 00358 { 00359 SplitterExecStreamParams params; 00360 readExecStreamParams(params, streamDef); 00361 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc()); 00362 embryo.init(new SplitterExecStream(), params); 00363 } 00364 00365 00366 void ExecStreamFactory::visit(ProxyValuesStreamDef &streamDef) 00367 { 00368 ValuesExecStreamParams params; 00369 readTupleStreamParams(params, streamDef); 00370 00371
00372 jobject tupleBytesBase64 = streamDef.pEnv->CallObjectMethod( 00373 streamDef.jObject, ProxyValuesStreamDef::meth_getTupleBytesBase64); 00374 00375
00376 jbyteArray jbytes = (jbyteArray) streamDef.pEnv->CallStaticObjectMethod( 00377 JniUtil::classRhBase64, 00378 JniUtil::methBase64Decode, 00379 tupleBytesBase64); 00380 00381
00382 params.bufSize = streamDef.pEnv->GetArrayLength(jbytes); 00383 params.pTupleBuffer.reset(new FixedBuffer[params.bufSize]); 00384 streamDef.pEnv->GetByteArrayRegion( 00385 jbytes, 0, params.bufSize, 00386 reinterpret_cast<jbyte *>(params.pTupleBuffer.get())); 00387 00388 embryo.init(new ValuesExecStream(), params); 00389 } 00390 00391 void ExecStreamFactory::visit(ProxyReshapeStreamDef &streamDef) 00392 { 00393 ReshapeExecStreamParams params; 00394 readTupleStreamParams(params, streamDef); 00395 00396 params.compOp = streamDef.getCompareOp(); 00397 if (params.compOp != COMP_NOOP) { 00398
00399 jobject tupleBytesBase64 = streamDef.pEnv->CallObjectMethod( 00400 streamDef.jObject, 00401 ProxyReshapeStreamDef::meth_getTupleCompareBytesBase64); 00402 00403
00404 jbyteArray jbytes = (jbyteArray) streamDef.pEnv->CallStaticObjectMethod( 00405 JniUtil::classRhBase64, 00406 JniUtil::methBase64Decode, 00407 tupleBytesBase64); 00408 00409
00410 int bufSize = streamDef.pEnv->GetArrayLength(jbytes); 00411 params.pCompTupleBuffer.reset(new FixedBuffer[bufSize]); 00412 streamDef.pEnv->GetByteArrayRegion( 00413 jbytes, 0, bufSize, 00414 reinterpret_cast<jbyte *>(params.pCompTupleBuffer.get())); 00415 00416 CmdInterpreter::readTupleProjection( 00417 params.inputCompareProj, streamDef.getInputCompareProjection()); 00418 } 00419 00420 CmdInterpreter::readTupleProjection( 00421 params.outputProj, streamDef.getOutputProjection()); 00422 00423 SharedProxyReshapeParameter dynamicParam = streamDef.getReshapeParameter(); 00424 for (; dynamicParam; ++dynamicParam) { 00425 int offset = dynamicParam->getCompareOffset(); 00426 ReshapeParameter reshapeParam( 00427 DynamicParamId(dynamicParam->getDynamicParamId()), 00428 (offset < 0) ? MAXU : uint(offset), 00429 dynamicParam->isOutputParam()); 00430 params.dynamicParameters.push_back(reshapeParam); 00431 } 00432 00433 embryo.init(new ReshapeExecStream(), params); 00434 } 00435 00436 void ExecStreamFactory::visit(ProxyNestedLoopJoinStreamDef &streamDef) 00437 { 00438 NestedLoopJoinExecStreamParams params; 00439 readTupleStreamParams(params, streamDef); 00440 params.leftOuter = streamDef.isLeftOuter(); 00441 00442 SharedProxyCorrelation dynamicParam = streamDef.getLeftJoinKey(); 00443 for (; dynamicParam; ++dynamicParam) { 00444 NestedLoopJoinKey joinKey( 00445 DynamicParamId(dynamicParam->getId()), 00446 dynamicParam->getOffset()); 00447 params.leftJoinKeys.push_back(joinKey); 00448 } 00449 00450 embryo.init(new NestedLoopJoinExecStream(), params); 00451 } 00452 00453 void ExecStreamFactory::visit(ProxyBernoulliSamplingStreamDef &streamDef) 00454 { 00455 BernoulliSamplingExecStreamParams params; 00456 readTupleStreamParams(params, streamDef); 00457 00458 params.samplingRate = streamDef.getSamplingRate(); 00459 params.isRepeatable = streamDef.isRepeatable(); 00460 params.repeatableSeed = streamDef.getRepeatableSeed(); 00461 00462 embryo.init(new BernoulliSamplingExecStream(), params); 00463 } 00464 00465 void ExecStreamFactory::visit(ProxyCalcTupleStreamDef &streamDef) 00466 { 00467 CalcExecStreamParams params; 00468 readTupleStreamParams(params, streamDef); 00469 params.program = streamDef.getProgram(); 00470 params.isFilter = streamDef.isFilter(); 00471 embryo.init( 00472 new CalcExecStream(), 00473 params); 00474 } 00475 00476 void ExecStreamFactory::visit(ProxyCorrelationJoinStreamDef &streamDef) 00477 { 00478 CorrelationJoinExecStreamParams params; 00479 readTupleStreamParams(params, streamDef); 00480 SharedProxyCorrelation pCorrelation = streamDef.getCorrelations(); 00481 for (; pCorrelation; ++pCorrelation) { 00482 Correlation correlation( 00483 DynamicParamId(pCorrelation->getId()), 00484 pCorrelation->getOffset()); 00485 params.correlations.push_back(correlation); 00486 } 00487 embryo.init(new CorrelationJoinExecStream(), params); 00488 } 00489 00490 void ExecStreamFactory::visit(ProxyCollectTupleStreamDef &streamDef) 00491 { 00492 CollectExecStreamParams params; 00493 readTupleStreamParams(params, streamDef); 00494 embryo.init(new CollectExecStream(), params); 00495 } 00496 00497 void ExecStreamFactory::visit(ProxyUncollectTupleStreamDef &streamDef) 00498 { 00499 UncollectExecStreamParams params; 00500 readTupleStreamParams(params, streamDef); 00501 embryo.init(new UncollectExecStream(), params); 00502 } 00503 00504 void ExecStreamFactory::visit(ProxySortingStreamDef &streamDef) 00505 { 00506 if (streamDef.getDistinctness() != DUP_ALLOW) { 00507
00508 implementSortWithBTree(streamDef); 00509 return; 00510 } 00511 00512 SharedDatabase pDatabase = getDatabase(); 00513 00514 ExternalSortExecStreamParams params; 00515 00516 readTupleStreamParams(params, streamDef); 00517 00518
00519 createPrivateScratchSegment(params); 00520 00521 params.distinctness = streamDef.getDistinctness(); 00522 params.pTempSegment = pDatabase->getTempSegment(); 00523 params.storeFinalRun = false; 00524 params.estimatedNumRows = streamDef.getEstimatedNumRows(); 00525 params.earlyClose = streamDef.isEarlyClose(); 00526 CmdInterpreter::readTupleProjection( 00527 params.keyProj, 00528 streamDef.getKeyProj()); 00529 params.descendingKeyColumns.resize(params.keyProj.size(), false); 00530 if (streamDef.getDescendingProj()) { 00531 TupleProjection descendingProj; 00532 CmdInterpreter::readTupleProjection( 00533 descendingProj, 00534 streamDef.getDescendingProj()); 00535 for (uint i = 0; i < descendingProj.size(); ++i) { 00536 params.descendingKeyColumns[descendingProj[i]] = true; 00537 } 00538 } 00539 embryo.init( 00540 ExternalSortExecStream::newExternalSortExecStream(), 00541 params); 00542 } 00543 00544 char ExecStreamFactory::readCharParam(const std::string &val) 00545 { 00546 assert(val.size() <= 1); 00547 if (val.size() == 0) { 00548 return 0; 00549 } 00550 return val.at(0); 00551 } 00552 00553 void ExecStreamFactory::visit(ProxyFlatFileTupleStreamDef &streamDef) 00554 { 00555 FlatFileExecStreamParams params; 00556 readTupleStreamParams(params, streamDef); 00557 00558 assert(streamDef.getDataFilePath().size() > 0); 00559 params.dataFilePath = streamDef.getDataFilePath(); 00560 params.errorFilePath = streamDef.getErrorFilePath(); 00561 params.fieldDelim = readCharParam(streamDef.getFieldDelimiter()); 00562 params.rowDelim = readCharParam(streamDef.getRowDelimiter()); 00563 params.quoteChar = readCharParam(streamDef.getQuoteCharacter()); 00564 params.escapeChar = readCharParam(streamDef.getEscapeCharacter()); 00565 params.header = streamDef.isHasHeader(); 00566 params.lenient = streamDef.isLenient(); 00567 params.trim = streamDef.isTrim(); 00568 params.mapped = streamDef.isMapped(); 00569 readColumnList(streamDef, params.columnNames); 00570 00571 params.numRowsScan = streamDef.getNumRowsScan(); 00572 params.calcProgram = streamDef.getCalcProgram(); 00573 if (params.numRowsScan > 0 && params.calcProgram.size() > 0) { 00574 params.mode = FLATFILE_MODE_SAMPLE; 00575 } else if (params.numRowsScan > 0) { 00576 params.mode = FLATFILE_MODE_DESCRIBE; 00577 } else if (params.numRowsScan == 0 && params.calcProgram.size() == 0) { 00578 params.mode = FLATFILE_MODE_QUERY_TEXT; 00579 } 00580 embryo.init(FlatFileExecStream::newFlatFileExecStream(), params); 00581 } 00582 00583 void ExecStreamFactory::visit(ProxyLhxJoinStreamDef &streamDef) 00584 { 00585 TupleProjection tmpProj; 00586 00587 LhxJoinExecStreamParams params; 00588 readTupleStreamParams(params, streamDef); 00589 00590
00591 00592 00593 createPrivateScratchSegment(params); 00594 00595
00596 00597 00598 SharedDatabase pDatabase = getDatabase(); 00599 params.pTempSegment = pDatabase->getTempSegment(); 00600 00601
00602 00603 00604 00605 params.leftInner = streamDef.isLeftInner(); 00606 params.leftOuter = streamDef.isLeftOuter(); 00607 params.rightInner = streamDef.isRightInner(); 00608 params.rightOuter = streamDef.isRightOuter(); 00609 params.setopDistinct = streamDef.isSetopDistinct(); 00610 params.setopAll = streamDef.isSetopAll(); 00611 00612
00613 00614 00615 params.forcePartitionLevel = 0; 00616 params.enableJoinFilter = true; 00617 params.enableSubPartStat = true; 00618 params.enableSwing = true; 00619 00620 CmdInterpreter::readTupleProjection( 00621 params.leftKeyProj, streamDef.getLeftKeyProj()); 00622 00623 CmdInterpreter::readTupleProjection( 00624 params.rightKeyProj, streamDef.getRightKeyProj()); 00625 00626 CmdInterpreter::readTupleProjection( 00627 params.filterNullKeyProj, streamDef.getFilterNullProj()); 00628 00629
00630 00631 00632 params.cndKeys = streamDef.getCndBuildKeys(); 00633 params.numRows = streamDef.getNumBuildRows(); 00634 00635 embryo.init(new LhxJoinExecStream(), params); 00636 } 00637 00638 void ExecStreamFactory::visit(ProxyLhxAggStreamDef &streamDef) 00639 { 00640 LhxAggExecStreamParams params; 00641 readAggStreamParams(params, streamDef); 00642 00643
00644 00645 00646 createPrivateScratchSegment(params); 00647 00648
00649 00650 00651 SharedDatabase pDatabase = getDatabase(); 00652 params.pTempSegment = pDatabase->getTempSegment(); 00653 00654
00655 00656 00657 params.cndGroupByKeys = streamDef.getCndGroupByKeys(); 00658 params.numRows = streamDef.getNumRows(); 00659 00660
00661 00662 00663 params.forcePartitionLevel = 0; 00664 00665
00666 00667 00668 00669 00670 00671 00672 params.enableSubPartStat = false; 00673 00674 embryo.init(new LhxAggExecStream(), params); 00675 } 00676 00677 void ExecStreamFactory::readColumnList( 00678 ProxyFlatFileTupleStreamDef &streamDef, 00679 std::vectorstd::string &names) 00680 { 00681 SharedProxyColumnName pColumnName = streamDef.getColumn(); 00682 00683 for (; pColumnName; ++pColumnName) { 00684 names.push_back(pColumnName->getName()); 00685 } 00686 } 00687 00688 void ExecStreamFactory::readExecStreamParams( 00689 ExecStreamParams &params, 00690 ProxyExecutionStreamDef &streamDef) 00691 { 00692 createQuotaAccessors(params); 00693 } 00694 00695 void ExecStreamFactory::readTupleDescriptor( 00696 TupleDescriptor& desc, 00697 SharedProxyTupleDescriptor def) 00698 { 00699 assert(def); 00700 CmdInterpreter::readTupleDescriptor( 00701 desc, *def, pDatabase->getTypeFactory()); 00702 } 00703 00704 void ExecStreamFactory::readTupleStreamParams( 00705 SingleOutputExecStreamParams &params, 00706 ProxyTupleStreamDef &streamDef) 00707 { 00708 readExecStreamParams(params,streamDef); 00709 readTupleDescriptor(params.outputTupleDesc, streamDef.getOutputDesc()); 00710 } 00711 00712 void ExecStreamFactory::createPrivateScratchSegment(ExecStreamParams &params) 00713 { 00714
00715 assert(params.pCacheAccessor); 00716 00717 params.scratchAccessor = 00718 pDatabase->getSegmentFactory()->newScratchSegment( 00719 pDatabase->getCache()); 00720 SharedQuotaCacheAccessor pSuperQuotaAccessor = 00721 boost::dynamic_pointer_cast( 00722 params.pCacheAccessor); 00723 params.scratchAccessor.pCacheAccessor.reset( 00724 new QuotaCacheAccessor( 00725 pSuperQuotaAccessor, 00726 params.scratchAccessor.pCacheAccessor, 00727 UINT_MAX)); 00728 } 00729 00730 void ExecStreamFactory::createQuotaAccessors( 00731 ExecStreamParams &params) 00732 { 00733 assert(pGraphEmbryo); 00734 pGraphEmbryo->initStreamParams(params); 00735 } 00736 00737 void ExecStreamFactory::readTableWriterStreamParams( 00738 FtrsTableWriterExecStreamParams &params, 00739 ProxyTableWriterDef &streamDef) 00740 { 00741 readTupleStreamParams(params, streamDef); 00742 params.pTableWriterFactory = pTableWriterFactory; 00743 params.tableId = ANON_PAGE_OWNER_ID; 00744 params.pActionMutex = &(pDatabase->getCheckpointThread()->getActionMutex()); 00745 00746 SharedProxyIndexWriterDef pIndexWriterDef = streamDef.getIndexWriter(); 00747 for (; pIndexWriterDef; ++pIndexWriterDef) { 00748 FtrsTableIndexWriterParams indexParams; 00749
00750 indexParams.pCacheAccessor = params.pCacheAccessor; 00751 indexParams.scratchAccessor = params.scratchAccessor; 00752 readIndexWriterParams(indexParams, *pIndexWriterDef); 00753 SharedProxyTupleProjection pInputProj = 00754 pIndexWriterDef->getInputProj(); 00755 if (pInputProj) { 00756 CmdInterpreter::readTupleProjection( 00757 indexParams.inputProj, 00758 pInputProj); 00759 } else { 00760
00761 params.tableId = indexParams.pageOwnerId; 00762 } 00763 params.indexParams.push_back(indexParams); 00764 } 00765 assert(params.tableId != ANON_PAGE_OWNER_ID); 00766 } 00767 00768 void ExecStreamFactory::readBTreeStreamParams( 00769 BTreeExecStreamParams &params, 00770 ProxyIndexAccessorDef &streamDef) 00771 { 00772 assert(params.pCacheAccessor); 00773 readBTreeParams(params, streamDef); 00774 } 00775 00776 void ExecStreamFactory::readBTreeParams( 00777 BTreeParams &params, 00778 ProxyIndexAccessorDef &streamDef) 00779 { 00780 params.rootPageIdParamId = 00781 readDynamicParamId(streamDef.getRootPageIdParamId()); 00782 if (params.rootPageIdParamId > DynamicParamId(0) && 00783 streamDef.getRootPageId() == -1) 00784 { 00785
00786
00787 params.segmentId = Database::TEMP_SEGMENT_ID; 00788 params.pageOwnerId = ANON_PAGE_OWNER_ID; 00789 params.pSegment = pDatabase->getTempSegment(); 00790 params.rootPageId = NULL_PAGE_ID; 00791 params.pRootMap = NULL; 00792 } else { 00793 params.segmentId = SegmentId(streamDef.getSegmentId()); 00794 params.pageOwnerId = PageOwnerId(streamDef.getIndexId()); 00795 assert(VALID_PAGE_OWNER_ID(params.pageOwnerId)); 00796
00797
00798
00799 if (streamDef.isReadOnlyCommittedData()) { 00800 params.pSegment = 00801 pDatabase->getSegmentById( 00802 params.segmentId, 00803 pStreamGraphHandle->pReadCommittedSegment); 00804 } else { 00805 params.pSegment = 00806 pDatabase->getSegmentById( 00807 params.segmentId, 00808 pStreamGraphHandle->pSegment); 00809 } 00810 if (streamDef.getRootPageId() != -1) { 00811 params.rootPageId = PageId(streamDef.getRootPageId()); 00812 params.pRootMap = NULL; 00813 } else { 00814 params.rootPageId = NULL_PAGE_ID; 00815 if (params.rootPageIdParamId == DynamicParamId(0)) { 00816 params.pRootMap = pStreamGraphHandle; 00817 } 00818 } 00819 } 00820 readTupleDescriptor(params.tupleDesc, streamDef.getTupleDesc()); 00821 CmdInterpreter::readTupleProjection( 00822 params.keyProj, 00823 streamDef.getKeyProj()); 00824 00825 } 00826 00827 DynamicParamId ExecStreamFactory::readDynamicParamId(const int val) 00828 { 00829
00830 uint (id) = (val < 0) ? 0 : (uint) val; 00831 return (DynamicParamId) id; 00832 } 00833 00834 void ExecStreamFactory::readBTreeReadStreamParams( 00835 BTreeReadExecStreamParams &params, 00836 ProxyIndexScanDef &streamDef) 00837 { 00838 readTupleStreamParams(params, streamDef); 00839 readBTreeStreamParams(params, streamDef); 00840 CmdInterpreter::readTupleProjection( 00841 params.outputProj, 00842 streamDef.getOutputProj()); 00843 } 00844 00845 void ExecStreamFactory::readIndexWriterParams( 00846 FtrsTableIndexWriterParams &params, 00847 ProxyIndexWriterDef &indexWriterDef) 00848 { 00849 readBTreeStreamParams(params, indexWriterDef); 00850 params.distinctness = indexWriterDef.getDistinctness(); 00851 params.updateInPlace = indexWriterDef.isUpdateInPlace(); 00852 } 00853 00854 void ExecStreamFactory::readBTreeSearchStreamParams( 00855 BTreeSearchExecStreamParams &params, 00856 ProxyIndexSearchDef &streamDef) 00857 { 00858 readBTreeReadStreamParams(params, streamDef); 00859 params.outerJoin = streamDef.isOuterJoin(); 00860 if (streamDef.getInputKeyProj()) { 00861 CmdInterpreter::readTupleProjection( 00862 params.inputKeyProj, 00863 streamDef.getInputKeyProj()); 00864 } 00865 if (streamDef.getInputJoinProj()) { 00866 CmdInterpreter::readTupleProjection( 00867 params.inputJoinProj, 00868 streamDef.getInputJoinProj()); 00869 } 00870 if (streamDef.getInputDirectiveProj()) { 00871 CmdInterpreter::readTupleProjection( 00872 params.inputDirectiveProj, 00873 streamDef.getInputDirectiveProj()); 00874 } 00875 00876 SharedProxyCorrelation dynamicParam = streamDef.getSearchKeyParameter(); 00877 for (; dynamicParam; ++dynamicParam) { 00878 BTreeSearchKeyParameter searchKeyParam( 00879 DynamicParamId(dynamicParam->getId()), 00880 dynamicParam->getOffset()); 00881 params.searchKeyParams.push_back(searchKeyParam); 00882 } 00883 } 00884 00885 void ExecStreamFactory::readAggStreamParams( 00886 SortedAggExecStreamParams &params, 00887 ProxyAggStreamDef &streamDef) 00888 { 00889 readTupleStreamParams(params,streamDef); 00890 SharedProxyAggInvocation pAggInvocation = streamDef.getAggInvocation(); 00891 for (; pAggInvocation; ++pAggInvocation) { 00892 AggInvocation aggInvocation; 00893 aggInvocation.aggFunction = pAggInvocation->getFunction(); 00894 aggInvocation.iInputAttr = 00895 pAggInvocation->getInputAttributeIndex(); 00896 params.aggInvocations.push_back(aggInvocation); 00897 } 00898 params.groupByKeyCount = streamDef.getGroupingPrefixSize(); 00899 } 00900 00901 ExecStreamSubFactory::~ExecStreamSubFactory() 00902 { 00903 } 00904 00905 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/farrago/ExecStreamFactory.cpp#45 $"); 00906 00907