Fennel: /home/pub/open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 #include "fennel/common/CommonPreamble.h" 00023 #include "fennel/lucidera/colstore/LcsClusterReader.h" 00024 #include "fennel/lucidera/colstore/LcsColumnReader.h" 00025 #include "fennel/btree/BTreeWriter.h" 00026 #include "fennel/tuple/TupleAccessor.h" 00027 #include "fennel/segment/SegPageEntryIterImpl.h" 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $"); 00030 00031 LcsClusterReader::LcsClusterReader( 00032 BTreeDescriptor const &treeDescriptor, 00033 CircularBuffer *pRidRuns) 00034 : 00035 LcsClusterAccessBase(treeDescriptor), 00036 SegPageEntryIterSource(), 00037 ridRunIter(pRidRuns), 00038 prefetchQueue(4000) 00039 { 00040 bTreeReader = SharedBTreeReader(new BTreeReader(treeDescriptor)); 00041 if (pRidRuns == NULL) { 00042 noPrefetch = true; 00043 } else { 00044 noPrefetch = false; 00045 prefetchQueue.setPrefetchSource(*this); 00046 } 00047 } 00048 00049 LcsClusterReader::~LcsClusterReader() 00050 { 00051 } 00052 00053 void LcsClusterReader::setRootPageId(PageId rootPageId) 00054 { 00055 bTreeReader->setRootPageId(rootPageId); 00056 } 00057 00058 LcsClusterNode const &LcsClusterReader::readClusterPage() 00059 { 00060 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData); 00061 clusterPageId = readClusterPageId(); 00062
00063
00064 bTreeRid = readRid(); 00065 clusterLock.lockShared(clusterPageId); 00066 LcsClusterNode const &node = clusterLock.getNodeForRead(); 00067 assert(bTreeRid == node.firstRID); 00068 return node; 00069 } 00070 00071 bool LcsClusterReader::getFirstClusterPageForRead( 00072 PConstLcsClusterNode &pBlock) 00073 { 00074 bool found; 00075 00076 found = bTreeReader->searchFirst(); 00077 if (!found) { 00078 return false; 00079 } 00080 00081 LcsClusterNode const &node = readClusterPage(); 00082 pBlock = &node; 00083 return true; 00084 } 00085 00086 bool LcsClusterReader::getNextClusterPageForRead(PConstLcsClusterNode &pBlock) 00087 { 00088 bool found; 00089 00090 found = bTreeReader->searchNext(); 00091 if (!found) { 00092 return false; 00093 } 00094 00095 LcsClusterNode const &node = readClusterPage(); 00096 pBlock = &node; 00097 return true; 00098 } 00099 00100 void LcsClusterReader::initColumnReaders( 00101 uint nClusterColsInit, 00102 TupleProjection const &clusterProj) 00103 { 00104 nClusterCols = nClusterColsInit; 00105 nColsToRead = clusterProj.size(); 00106 clusterCols.reset(new LcsColumnReader[nColsToRead]); 00107 for (uint i = 0; i < nColsToRead; i++) { 00108 clusterCols[i].init(this, clusterProj[i]); 00109 } 00110 } 00111 00112 void LcsClusterReader::open() 00113 { 00114 pLeaf = NULL; 00115 pRangeBatches = NULL; 00116 ridRunIter.reset(); 00117 00118
00119 nextBTreeEntry.first = PageId(0); 00120 nextBTreeEntry.second = LcsRid(MAXU); 00121 00122 nextPrefetchEntry.first = PageId(0); 00123 nextPrefetchEntry.second = LcsRid(MAXU); 00124 00125 dumbPrefetch = false; 00126 nextRid = LcsRid(0); 00127 } 00128 00129 void LcsClusterReader::close() 00130 { 00131 bTreeReader->endSearch(); 00132 unlockClusterPage(); 00133 } 00134 00135 bool LcsClusterReader::position(LcsRid rid) 00136 { 00137 bool found; 00138 00139 currRid = rid; 00140 if (pLeaf) { 00141
00142
00143 00144 found = positionInBlock(rid); 00145 if (found) { 00146 return true; 00147 } 00148 } else { 00149 if (noPrefetch) { 00150 if (bTreeReader->searchFirst()) { 00151 bTreeReader->endSearch(); 00152 } 00153 } else { 00154
00155 prefetchQueue.mapRange(segmentAccessor, NULL_PAGE_ID); 00156 } 00157 } 00158 00159 if (noPrefetch) { 00160 found = searchForRid(rid); 00161 if (!found) { 00162 return false; 00163 } 00164 moveToBlock(readClusterPageId()); 00165 } else { 00166 found = moveToBlockWithRid(rid); 00167 if (!found) { 00168 return false; 00169 } 00170 } 00171 00172 found = positionInBlock(rid); 00173
00174 if (!found) { 00175 return false; 00176 } 00177 00178 return true; 00179 } 00180 00181 bool LcsClusterReader::searchForRid(LcsRid rid) 00182 { 00183 bTreeTupleData[0].pData = (PConstBuffer) &rid; 00184
00185 bTreeReader->searchForKey( 00186 bTreeTupleData, DUP_SEEK_BEGIN, false); 00187 if (bTreeReader->isSingular()) { 00188 return false; 00189 } 00190 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData); 00191 00192 LcsRid key = readRid(); 00193 assert(key <= rid); 00194 return true; 00195 } 00196 00197 void LcsClusterReader::moveToBlock(PageId clusterPageId) 00198 { 00199
00200
00201 00202 clusterLock.lockShared(clusterPageId); 00203 LcsClusterNode const &page = clusterLock.getNodeForRead(); 00204 pLHdr = &page; 00205 setUpBlock(); 00206 } 00207 00208 bool LcsClusterReader::moveToBlockWithRid(LcsRid rid) 00209 { 00210 PageId clusterPageId; 00211 00212
00213
00214
00215 for (;;) { 00216 std::pair<PageId, LcsRid> &prefetchEntry = 00217 (nextPrefetchEntry.second == LcsRid(MAXU)) ? 00218 *prefetchQueue : nextPrefetchEntry; 00219 00220 clusterPageId = prefetchEntry.first; 00221 if (clusterPageId == NULL_PAGE_ID) { 00222 return false; 00223 } 00224 00225 LcsRid prefetchRid = prefetchEntry.second; 00226 assert(prefetchRid <= rid); 00227 00228
00229
00230
00231 ++prefetchQueue; 00232 nextPrefetchEntry = *prefetchQueue; 00233 if (nextPrefetchEntry.first == NULL_PAGE_ID || 00234 rid < nextPrefetchEntry.second) 00235 { 00236 break; 00237 } else { 00238 continue; 00239 } 00240 } 00241 00242
00243 clusterLock.lockShared(clusterPageId); 00244 LcsClusterNode const &page = clusterLock.getNodeForRead(); 00245 pLHdr = &page; 00246 setUpBlock(); 00247 return true; 00248 } 00249 00250 bool LcsClusterReader::positionInBlock(LcsRid rid) 00251 { 00252
00253
00254 while (rid >= getRangeEndRid() 00255 && pRangeBatches + nClusterCols < pBatches + pLHdr->nBatch) { 00256 rangeStartRid += pRangeBatches->nRow; 00257 00258 pRangeBatches += nClusterCols;
00259 00260
00261
00262 rangeEndRid = rangeStartRid + pRangeBatches->nRow; 00263 } 00264 00265
00266 if (rid < getRangeEndRid()) { 00267 assert(rid >= rangeStartRid); 00268 positionInRange(opaqueToInt(rid) - opaqueToInt(rangeStartRid)); 00269 return true; 00270 } else { 00271 return false; 00272 } 00273 } 00274 00275 void LcsClusterReader::setUpBlock() 00276 { 00277
00278 pLeaf = (PBuffer) pLHdr; 00279 setHdrOffsets(pLHdr); 00280 00281 assert(pLHdr->nBatch > 0); 00282 00283 pBatches = (PLcsBatchDir) (pLeaf + pLHdr->oBatch); 00284 rangeStartRid = pLHdr->firstRID; 00285 00286
00287 pRangeBatches = pBatches; 00288
00289 nRangePos = 0; 00290 00291
00292 rangeEndRid = rangeStartRid + pRangeBatches->nRow; 00293 } 00294 00295 bool LcsClusterReader::advance(uint nRids) 00296 { 00297 uint newPos = nRangePos + nRids; 00298 00299 if (newPos < pRangeBatches->nRow) { 00300 nRangePos = newPos; 00301 return true; 00302 } else { 00303 return false; 00304 } 00305 } 00306 00307 PageId LcsClusterReader::getNextPageForPrefetch(LcsRid &rid, bool &found) 00308 { 00309 found = true; 00310 for (;;) { 00311 if (dumbPrefetch) { 00312 rid = currRid; 00313 } else { 00314 rid = getFetchRids(ridRunIter, nextRid, false); 00315 } 00316 00317
00318
00319
00320 if (rid == LcsRid(MAXU)) { 00321 if (ridRunIter.done()) { 00322 rid = LcsRid(0); 00323 return NULL_PAGE_ID; 00324 } else { 00325
00326
00327
00328 dumbPrefetch = true; 00329 continue; 00330 } 00331 } 00332 00333 if (!(nextBTreeEntry.second == LcsRid(MAXU) && 00334 nextBTreeEntry.first == PageId(0))) 00335 { 00336
00337
00338 if (nextBTreeEntry.first == NULL_PAGE_ID) { 00339 rid = LcsRid(0); 00340 return NULL_PAGE_ID; 00341 } 00342 00343
00344
00345
00346
00347 if (rid < nextBTreeEntry.second) { 00348 if (dumbPrefetch) { 00349 rid = nextBTreeEntry.second; 00350 PageId pageId = nextBTreeEntry.first; 00351 getNextBTreeEntry(); 00352 return pageId; 00353 } 00354 nextRid = nextBTreeEntry.second; 00355 continue; 00356 } else { 00357
00358
00359 break; 00360 } 00361 } else { 00362
00363 break; 00364 } 00365 } 00366 00367 bool rc = searchForRid(rid); 00368 if (!rc) { 00369 return NULL_PAGE_ID; 00370 } 00371 rid = readRid(); 00372 PageId pageId = readClusterPageId(); 00373 getNextBTreeEntry(); 00374 return pageId; 00375 } 00376 00377 void LcsClusterReader::getNextBTreeEntry() 00378 { 00379 bool rc = bTreeReader->searchNext(); 00380 if (!rc) { 00381
00382 nextBTreeEntry.first = NULL_PAGE_ID; 00383 } else { 00384 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData); 00385 nextBTreeEntry.first = readClusterPageId(); 00386 nextBTreeEntry.second = readRid(); 00387 } 00388 } 00389 00390 LcsRid LcsClusterReader::getFetchRids( 00391 CircularBufferIter &ridRunIter, 00392 LcsRid &nextRid, 00393 bool remove) 00394 { 00395 for (;;) { 00396 if (ridRunIter.end()) { 00397 return LcsRid(MAXU); 00398 } 00399 00400 LcsRidRun &currRidRun = *ridRunIter; 00401 if (nextRid < currRidRun.startRid) { 00402 nextRid = currRidRun.startRid + 1; 00403 return currRidRun.startRid; 00404 } else if ( 00405 (currRidRun.nRids == RecordNum(MAXU) && 00406 nextRid >= currRidRun.startRid) || 00407 (nextRid < currRidRun.startRid + currRidRun.nRids)) 00408 { 00409 return nextRid++; 00410 } else { 00411 ++ridRunIter; 00412 if (remove) { 00413 ridRunIter.removeFront(); 00414 } 00415 } 00416 } 00417 } 00418 00419 void LcsClusterReader::catchUp(uint parentBufPos, LcsRid parentNextRid) 00420 { 00421 if (parentNextRid - 1 > nextRid) { 00422 nextRid = parentNextRid - 1; 00423 } 00424 if (parentBufPos > ridRunIter.getCurrPos()) { 00425 ridRunIter.setCurrPos(parentBufPos); 00426 } 00427 } 00428 00429 RecordNum LcsClusterReader::getNumRows() 00430 { 00431
00432 if (bTreeReader->searchLast() == false) { 00433 bTreeReader->endSearch(); 00434 return RecordNum(0); 00435 } 00436 bTreeReader->getTupleAccessorForRead().unmarshal(bTreeTupleData); 00437 LcsClusterNode const &node = readClusterPage(); 00438 00439
00440 RecordNum nRows = RecordNum(opaqueToInt(node.firstRID)); 00441 PLcsBatchDir pBatch = (PLcsBatchDir) ((PBuffer) &node + node.oBatch); 00442 for (uint i = 0; i < node.nBatch; i += nClusterCols) { 00443 nRows += pBatch[i].nRow; 00444 } 00445 00446 bTreeReader->endSearch(); 00447 return nRows; 00448 } 00449 00450 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/lucidera/colstore/LcsClusterReader.cpp#14 $"); 00451 00452