Fennel: /home/pub/open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/exec/ExecStreamBufAccessor.h" 00024 #include "fennel/lucidera/bitmap/LbmUnionExecStream.h" 00025 00026 #include <math.h> 00027 00028 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $"); 00029 00030 void LbmUnionExecStream::prepare(LbmUnionExecStreamParams const &params) 00031 { 00032 ConfluenceExecStream::prepare(params); 00033 maxRid = params.maxRid; 00034 00035
00036 ridLimitParamId = params.ridLimitParamId; 00037 assert(opaqueToInt(ridLimitParamId) > 0); 00038 00039
00040 startRidParamId = params.startRidParamId; 00041 segmentLimitParamId = params.segmentLimitParamId; 00042 00043
00044 ridLimitDatum.pData = (PConstBuffer) &ridLimit; 00045 ridLimitDatum.cbData = sizeof(ridLimit); 00046 00047 assert(inAccessors[0]->getTupleDesc() == pOutAccessor->getTupleDesc()); 00048 00049
00050 inputTuple.compute(inAccessors[0]->getTupleDesc()); 00051 00052
00053 scratchAccessor = params.scratchAccessor; 00054 workspacePageLock.accessSegment(scratchAccessor); 00055 writerPageLock.accessSegment(scratchAccessor); 00056 pageSize = scratchAccessor.pSegment->getUsablePageSize(); 00057 } 00058 00059 void LbmUnionExecStream::getResourceRequirements( 00060 ExecStreamResourceQuantity &minQuantity, 00061 ExecStreamResourceQuantity &optQuantity, 00062 ExecStreamResourceSettingType &optType) 00063 { 00064 ConfluenceExecStream::getResourceRequirements(minQuantity, optQuantity); 00065 00066
00067
00068
00069 minQuantity.nCachePages += 2; 00070 optQuantity.nCachePages += 2 + computeOptWorkspacePages(maxRid) + 1; 00071 optType = EXEC_RESOURCE_ESTIMATE; 00072 } 00073 00074 void LbmUnionExecStream::setResourceAllocation( 00075 ExecStreamResourceQuantity &quantity) 00076 { 00077 ConfluenceExecStream::setResourceAllocation(quantity); 00078 00079
00080 nWorkspacePages = quantity.nCachePages - 1; 00081 ridLimit = computeRidLimit(nWorkspacePages); 00082 } 00083 00084 void LbmUnionExecStream::open(bool restart) 00085 { 00086 ConfluenceExecStream::open(restart); 00087 00088 if (!restart) { 00089 uint bitmapColSize = pOutAccessor->getTupleDesc()[1].cbStorage; 00090 uint writerBufSize = LbmEntry::getScratchBufferSize(bitmapColSize); 00091 writerPageLock.allocatePage(); 00092 PBuffer writerBuf = writerPageLock.getPage().getWritableData(); 00093 segmentWriter.init( 00094 writerBuf, writerBufSize, pOutAccessor->getTupleDesc(), false); 00095
00096 reverseArea = writerBuf + writerBufSize; 00097 reverseAreaSize = 00098 scratchAccessor.pSegment->getUsablePageSize() - writerBufSize; 00099 00100
00101 boost::shared_array ppBuffers(new PBuffer[nWorkspacePages]); 00102 assert(ppBuffers != NULL); 00103 for (uint i = 0; i < nWorkspacePages; i++) { 00104 workspacePageLock.allocatePage(); 00105 ppBuffers[i] = workspacePageLock.getPage().getWritableData(); 00106 workspacePageLock.unlock(); 00107 } 00108 ByteBuffer *pBuffer = new ByteBuffer(); 00109 pBuffer->init(ppBuffers, nWorkspacePages, pageSize); 00110 SharedByteBuffer pWorkspaceBuffer(pBuffer); 00111 uint maxSegmentSize = LbmEntry::getMaxBitmapSize(bitmapColSize); 00112 workspace.init(pWorkspaceBuffer, maxSegmentSize); 00113 00114
00115 pDynamicParamManager->createParam( 00116 ridLimitParamId, pOutAccessor->getTupleDesc()[0]); 00117 pDynamicParamManager->writeParam(ridLimitParamId, ridLimitDatum); 00118 } else { 00119 workspace.reset(); 00120 segmentWriter.reset(); 00121 } 00122 00123 writePending = false; 00124 producePending = false; 00125 isDone = false; 00126 segmentReader.init(inAccessors[0], inputTuple); 00127 } 00128 00129 ExecStreamResult LbmUnionExecStream::execute( 00130 ExecStreamQuantum const &quantum) 00131 { 00132 if (isDone) { 00133 pOutAccessor->markEOS(); 00134 return EXECRC_EOS; 00135 } 00136 00137 if (isConsumerSridSet()) { 00138
00139 requestedSrid = (LcsRid) *reinterpret_cast<RecordNum const *>( 00140 pDynamicParamManager->getParam(startRidParamId).getDatum().pData); 00141 workspace.advanceToSrid(requestedSrid); 00142 } 00143 if (isSegmentLimitSet()) { 00144 segmentsRemaining = *reinterpret_cast<uint const *>( 00145 pDynamicParamManager->getParam(segmentLimitParamId) 00146 .getDatum().pData); 00147 } 00148 00149 for (uint i = 0; i < quantum.nTuplesMax; i++) { 00150 while (! producePending) { 00151
00152 if (isSegmentLimitSet() && segmentsRemaining == 0) { 00153 return EXECRC_QUANTUM_EXPIRED; 00154 } 00155 00156 ExecStreamResult status = readSegment(); 00157 if (status == EXECRC_EOS) { 00158
00159 isDone = workspace.isEmpty() && segmentWriter.isEmpty(); 00160 if (! isDone) { 00161 transferLast(); 00162 producePending = true; 00163 break; 00164 } 00165 return EXECRC_EOS; 00166 } 00167 if (status != EXECRC_YIELD) { 00168 return status; 00169 } 00170 if (! writeSegment()) { 00171 producePending = (! segmentWriter.isEmpty()); 00172 } 00173 } 00174 00175 if (! produceTuple()) { 00176 return EXECRC_BUF_OVERFLOW; 00177 } 00178 producePending = false; 00179 } 00180 return EXECRC_QUANTUM_EXPIRED; 00181 } 00182 00183 void LbmUnionExecStream::closeImpl() 00184 { 00185 ConfluenceExecStream::closeImpl(); 00186 00187 if (scratchAccessor.pSegment) { 00188 scratchAccessor.pSegment->deallocatePageRange( 00189 NULL_PAGE_ID, NULL_PAGE_ID); 00190 } 00191 } 00192 00193 uint LbmUnionExecStream::computeOptWorkspacePages(LcsRid maxRid) 00194 { 00195
00196 return 2; 00197 } 00198 00199 uint LbmUnionExecStream::computeRidLimit(uint nWorkspacePages) 00200 { 00201
00202
00203
00204
00205 uint bytes = (uint) ((nWorkspacePages - 0.25) * pageSize); 00206 return bytes * LbmSegment::LbmOneByteSize; 00207 } 00208 00209 bool LbmUnionExecStream::isConsumerSridSet() 00210 { 00211 return (opaqueToInt(startRidParamId) > 0); 00212 } 00213 00214 bool LbmUnionExecStream::isSegmentLimitSet() 00215 { 00216 return (opaqueToInt(segmentLimitParamId) > 0); 00217 } 00218 00219 ExecStreamResult LbmUnionExecStream::readSegment() 00220 { 00221 if (writePending) { 00222 return EXECRC_YIELD; 00223 } 00224 ExecStreamResult status = segmentReader.readSegmentAndAdvance( 00225 inputSegment.byteNum, inputSegment.byteSeg, inputSegment.len); 00226 if (status == EXECRC_YIELD) { 00227 writePending = true; 00228 } 00229 return status; 00230 } 00231 00232 bool LbmUnionExecStream::writeSegment() 00233 { 00234 assert(writePending = true); 00235 00236
00237 LcsRid currentSrid = segmentReader.getSrid(); 00238 workspace.setProductionLimit(currentSrid); 00239 if (transfer()) { 00240 return false; 00241 } 00242 if (workspace.isEmpty()) { 00243 workspace.advanceToSrid(currentSrid); 00244 } 00245 00246
00247 bool success = workspace.addSegment(inputSegment); 00248 assert(success); 00249 writePending = false; 00250 return true; 00251 } 00252 00253 void LbmUnionExecStream::transferLast() 00254 { 00255 workspace.removeLimit(); 00256 transfer(); 00257 } 00258 00259 bool LbmUnionExecStream::transfer() 00260 { 00261 while (workspace.canProduce()) { 00262 if (isSegmentLimitSet() && segmentsRemaining == 0) { 00263 return false; 00264 } 00265 00266 LbmByteSegment seg = workspace.getSegment(); 00267 assert(seg.len < reverseAreaSize); 00268 PBuffer reverseStart = reverseArea + seg.len - 1; 00269 for (int i = 0; i < seg.len; i++) { 00270 reverseStart[-i] = seg.byteSeg[i]; 00271 } 00272 LcsRid startRid = seg.getSrid(); 00273 if (! segmentWriter.addSegment(startRid, reverseArea, seg.len)) { 00274 return false; 00275 } 00276 workspace.advancePastSegment(); 00277 00278 if (isSegmentLimitSet()) { 00279 segmentsRemaining--; 00280 } 00281 } 00282 return true; 00283 } 00284 00285 bool LbmUnionExecStream::produceTuple() 00286 { 00287 assert(producePending); 00288 assert(! segmentWriter.isEmpty()); 00289 00290 outputTuple = segmentWriter.produceSegmentTuple(); 00291 if (pOutAccessor->produceTuple(outputTuple)) { 00292 segmentWriter.reset(); 00293 producePending = false; 00294 return true; 00295 } 00296 return false; 00297 } 00298 00299 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/bitmap/LbmUnionExecStream.cpp#15 $"); 00300 00301