Fennel: /home/pub/open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/exec/ExecStreamBufAccessor.h" 00024 #include "fennel/lucidera/bitmap/LbmBitOpExecStream.h" 00025 00026 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $"); 00027 00028 void LbmBitOpExecStream::prepare(LbmBitOpExecStreamParams const &params) 00029 { 00030 ConfluenceExecStream::prepare(params); 00031 00032
00033 rowLimitParamId = params.rowLimitParamId; 00034 startRidParamId = params.startRidParamId; 00035 00036
00037 rowLimitDatum.pData = (PConstBuffer) &rowLimit; 00038 rowLimitDatum.cbData = sizeof(rowLimit); 00039 startRidDatum.pData = (PConstBuffer) &startRid; 00040 startRidDatum.cbData = sizeof(startRid); 00041 00042
00043 nInputs = inAccessors.size(); 00044 segmentReaders.reset(new LbmSegmentReader[nInputs]); 00045 bitmapSegTuples.reset(new TupleData[nInputs]); 00046 for (uint i = 0; i < nInputs; i++) { 00047 bitmapSegTuples[i].compute(inAccessors[i]->getTupleDesc()); 00048 } 00049 00050 nFields = inAccessors[0]->getTupleDesc().size() - 3; 00051 } 00052 00053 void LbmBitOpExecStream::open(bool restart) 00054 { 00055 ConfluenceExecStream::open(restart); 00056 00057 uint nKeys = pOutAccessor->getTupleDesc().size() - 3; 00058 00059 if (!restart) { 00060
00061
00062 if (opaqueToInt(rowLimitParamId) > 0) { 00063 pDynamicParamManager->createParam( 00064 rowLimitParamId, pOutAccessor->getTupleDesc()[nKeys]); 00065 } 00066 if (opaqueToInt(startRidParamId) > 0) { 00067 pDynamicParamManager->createParam( 00068 startRidParamId, pOutAccessor->getTupleDesc()[nKeys]); 00069 } 00070 } 00071 00072
00073
00074
00075 if (outputBuf) { 00076
00077
00078 uint bitmapColSize = pOutAccessor->getTupleDesc()[nKeys + 1].cbStorage; 00079 uint outputBufSize = LbmEntry::getScratchBufferSize(bitmapColSize); 00080 outputBuf.reset(new FixedBuffer[outputBufSize]); 00081 00082
00083
00084 segmentWriter.init( 00085 outputBuf.get(), outputBufSize, 00086 inAccessors[1]->getTupleDesc(), true); 00087 00088
00089
00090 bitmapBufSize = LbmEntry::getMaxBitmapSize(bitmapColSize); 00091 byteSegBuf.reset(new FixedBuffer[bitmapBufSize]); 00092 pByteSegBuf = byteSegBuf.get(); 00093 00094 } else { 00095 segmentWriter.reset(); 00096 } 00097 00098 startRid = LcsRid(0); 00099 rowLimit = 1; 00100 producePending = false; 00101 writeStartRidParamValue(); 00102 if (opaqueToInt(rowLimitParamId) > 0) { 00103 pDynamicParamManager->writeParam(rowLimitParamId, rowLimitDatum); 00104 } 00105 for (uint i = 0; i < nInputs; i++) { 00106 segmentReaders[i].init(inAccessors[i], bitmapSegTuples[i]); 00107 } 00108 } 00109 00110 ExecStreamResult LbmBitOpExecStream::producePendingOutput(uint iInput) 00111 { 00112 if (produceTuple(outputTuple)) { 00113 return EXECRC_BUF_OVERFLOW; 00114 } 00115
00116
00117 if (segmentWriter.isEmpty()) { 00118 segmentWriter.reset(); 00119 if (addSegments()) { 00120 return EXECRC_BUF_OVERFLOW; 00121 } 00122 } 00123 producePending = false; 00124 if (inAccessors[iInput]->getState() == EXECBUF_EOS) { 00125 pOutAccessor->markEOS(); 00126 return EXECRC_EOS; 00127 } 00128 00129 return EXECRC_YIELD; 00130 } 00131 00132 ExecStreamResult LbmBitOpExecStream::readInput( 00133 uint iInput, LcsRid &currRid, PBuffer &currByteSeg, uint &currLen) 00134 { 00135 ExecStreamResult rc = segmentReaders[iInput].advanceToRid(startRid); 00136 00137 if (rc == EXECRC_EOS) { 00138
00139 if (! flush()) { 00140 return EXECRC_BUF_OVERFLOW; 00141 } 00142 pOutAccessor->markEOS(); 00143 return EXECRC_EOS; 00144 } else if (rc != EXECRC_YIELD) { 00145 return rc; 00146 } 00147 00148 segmentReaders[iInput].readCurrentByteSegment( 00149 currRid, currByteSeg, currLen); 00150
00151
00152 assert(currLen <= bitmapBufSize); 00153 00154 return EXECRC_YIELD; 00155 } 00156 00157 bool LbmBitOpExecStream::flush() 00158 { 00159 assert (producePending); 00160 00161 if (segmentWriter.isEmpty()) { 00162 outputTuple = segmentWriter.produceSegmentTuple(); 00163 segmentWriter.reset(); 00164 if (produceTuple(outputTuple)) { 00165 producePending = true; 00166 } 00167 } 00168 return producePending; 00169 } 00170 00171 bool LbmBitOpExecStream::addSegments() 00172 { 00173 while (addLen > 0) { 00174 if (segmentWriter.addSegment(addRid, addByteSeg, addLen)) { 00175 break; 00176 } 00177 00178 outputTuple = segmentWriter.produceSegmentTuple(); 00179 if (produceTuple(outputTuple)) { 00180 producePending = true; 00181 return false; 00182 } 00183 00184
00185
00186 segmentWriter.reset(); 00187 } 00188 00189 return true; 00190 } 00191 00192 bool LbmBitOpExecStream::produceTuple(TupleData bitmapTuple) 00193 { 00194 assert(pOutAccessor->getTupleDesc().size() == bitmapTuple.size()); 00195 return pOutAccessor->produceTuple(bitmapTuple); 00196 } 00197 00198 void LbmBitOpExecStream::closeImpl() 00199 { 00200 ConfluenceExecStream::closeImpl(); 00201 outputBuf.reset(); 00202 byteSegBuf.reset(); 00203 } 00204 00205 void LbmBitOpExecStream::writeStartRidParamValue() 00206 { 00207 if (opaqueToInt(startRidParamId) > 0) { 00208 pDynamicParamManager->writeParam(startRidParamId, startRidDatum); 00209 } 00210 } 00211 00212 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmBitOpExecStream.cpp#11 $"); 00213 00214