Fennel: /home/pub/open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/exec/ExecStreamBufAccessor.h" 00024 #include "fennel/lucidera/bitmap/LbmChopperExecStream.h" 00025 00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $"); 00027 00028 LbmChopperExecStream::LbmChopperExecStream() 00029 { 00030 ridLimitParamId = DynamicParamId(0); 00031 } 00032 00033 void LbmChopperExecStream::prepare(LbmChopperExecStreamParams const &params) 00034 { 00035 ConfluenceExecStream::prepare(params); 00036 00037
00038 ridLimitParamId = params.ridLimitParamId; 00039 assert(opaqueToInt(ridLimitParamId) > 0); 00040 00041
00042 inputTuple.compute(inAccessors[0]->getTupleDesc()); 00043 00044
00045 SegmentAccessor scratchAccessor = params.scratchAccessor; 00046 writerPageLock.accessSegment(scratchAccessor); 00047 pageSize = scratchAccessor.pSegment->getUsablePageSize(); 00048 } 00049 00050 void LbmChopperExecStream::getResourceRequirements( 00051 ExecStreamResourceQuantity &minQuantity, 00052 ExecStreamResourceQuantity &optQuantity) 00053 { 00054 ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity); 00055 00056
00057 minQuantity.nCachePages += 1; 00058 optQuantity.nCachePages += 1; 00059 } 00060 00061 void LbmChopperExecStream::open(bool restart) 00062 { 00063 ConfluenceExecStream::open(restart); 00064 00065 if (!restart) { 00066 uint bitmapColSize = pOutAccessor->getTupleDesc()[1].cbStorage; 00067 uint writerBufSize = LbmEntry::getScratchBufferSize(bitmapColSize); 00068 writerPageLock.allocatePage(); 00069 PBuffer writerBuf = writerPageLock.getPage().getWritableData(); 00070 segmentWriter.init( 00071 writerBuf, writerBufSize, pOutAccessor->getTupleDesc(), false); 00072 } else { 00073 segmentWriter.reset(); 00074 } 00075 00076 state = LBM_STATE_READ; 00077 writePending = false; 00078 producePending = false; 00079 segmentReader.init(inAccessors[0], inputTuple); 00080 } 00081 00082 ExecStreamResult LbmChopperExecStream::execute( 00083 ExecStreamQuantum const &quantum) 00084 { 00085 ridLimit = *reinterpret_cast<RecordNum const *>( 00086 pDynamicParamManager->getParam(ridLimitParamId).getDatum().pData); 00087 00088 uint nTuples = 0; 00089 ExecStreamResult status; 00090 while (nTuples < quantum.nTuplesMax) { 00091 switch (state) { 00092 case LBM_STATE_READ: 00093 status = readSegment(); 00094 if (status == EXECRC_EOS) { 00095
00096 if (! segmentWriter.isEmpty()) { 00097 producePending = true; 00098 state = LBM_STATE_PRODUCE; 00099 continue; 00100 } 00101 state = LBM_STATE_DONE; 00102 continue; 00103 } 00104 if (status != EXECRC_YIELD) { 00105 return status; 00106 } 00107 state = LBM_STATE_WRITE; 00108 continue; 00109 case LBM_STATE_WRITE: 00110 if (! writeSegment()) { 00111 producePending = true; 00112 state = LBM_STATE_PRODUCE; 00113 continue; 00114 } 00115 nTuples++; 00116 state = LBM_STATE_READ; 00117 continue; 00118 case LBM_STATE_PRODUCE: 00119 if (! produceTuple()) { 00120 return EXECRC_BUF_OVERFLOW; 00121 } 00122 state = writePending ? LBM_STATE_WRITE : LBM_STATE_READ; 00123 continue; 00124 case LBM_STATE_DONE: 00125 pOutAccessor->markEOS(); 00126 return EXECRC_EOS; 00127 default: 00128 assert(false); 00129 } 00130 } 00131 return EXECRC_QUANTUM_EXPIRED; 00132 } 00133 00134 void LbmChopperExecStream::closeImpl() 00135 { 00136 ConfluenceExecStream::closeImpl(); 00137 } 00138 00139 ExecStreamResult LbmChopperExecStream::readSegment() 00140 { 00141 if (writePending) { 00142 return EXECRC_YIELD; 00143 } 00144 ExecStreamResult status = segmentReader.readSegmentAndAdvance( 00145 inputSegment.byteNum, inputSegment.byteSeg, inputSegment.len); 00146 if (status == EXECRC_YIELD) { 00147 writePending = true; 00148 } 00149 return status; 00150 } 00151 00152 bool LbmChopperExecStream::writeSegment() 00153 { 00154 assert(writePending = true); 00155 LcsRid startRid = inputSegment.getSrid(); 00156 LcsRid endRid = inputSegment.getEndRid(); 00157 assert(opaqueToInt(endRid - startRid) <= ridLimit); 00158 00159
00160
00161
00162 bool firstWrite = segmentWriter.isEmpty(); 00163 if (! firstWrite) { 00164 if (startRid < currentEndRid) { 00165 return false; 00166 } 00167 if (opaqueToInt(endRid - currentSrid) > ridLimit) { 00168 return false; 00169 } 00170 } 00171 00172
00173 PBuffer byteSeg = inputSegment.byteSeg - (inputSegment.len - 1); 00174 if (segmentWriter.addSegment( 00175 startRid, 00176 byteSeg, 00177 inputSegment.len)) 00178 { 00179 writePending = false; 00180 if (firstWrite) { 00181 currentSrid = startRid; 00182 } 00183 currentEndRid = endRid; 00184 return true; 00185 } 00186 return false; 00187 } 00188 00189 bool LbmChopperExecStream::produceTuple() 00190 { 00191 assert(producePending); 00192 00193 TupleData outputTuple = segmentWriter.produceSegmentTuple(); 00194 if (pOutAccessor->produceTuple(outputTuple)) { 00195 segmentWriter.reset(); 00196 producePending = false; 00197 return true; 00198 } 00199 return false; 00200 } 00201 00202 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmChopperExecStream.cpp#4 $"); 00203 00204