Fennel: /home/pub/open/dev/fennel/segment/SegInputStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/segment/SegInputStream.h" 00026 00027 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $"); 00028 00029 SharedSegInputStream SegInputStream::newSegInputStream( 00030 SegmentAccessor const &segmentAccessor, 00031 PageId beginPageId) 00032 { 00033 return SharedSegInputStream( 00034 new SegInputStream(segmentAccessor,beginPageId), 00035 ClosableObjectDestructor()); 00036 } 00037 00038 SegInputStream::SegInputStream( 00039 SegmentAccessor const &segmentAccessor, 00040 PageId beginPageId, 00041 uint cbExtraHeader) 00042 : SegStream(segmentAccessor,cbExtraHeader) 00043 { 00044 if (beginPageId == FIRST_LINEAR_PAGE_ID) { 00045 assert( 00046 getSegment()->getAllocationOrder() == Segment::LINEAR_ALLOCATION); 00047 } 00048 currPageId = beginPageId; 00049 shouldDeallocate = false; 00050 } 00051 00052 void SegInputStream::startPrefetch() 00053 { 00054 pageIter.mapRange(segmentAccessor,currPageId); 00055 } 00056 00057 void SegInputStream::endPrefetch() 00058 { 00059 pageIter.makeSingular(); 00060 } 00061 00062 void SegInputStream::lockBuffer() 00063 { 00064 pageLock.lockShared(currPageId); 00065 SegStreamNode const &node = pageLock.getNodeForRead(); 00066 PConstBuffer pFirstByte = 00067 reinterpret_cast(&node) + cbPageHeader; 00068 setBuffer(pFirstByte,node.cbData); 00069 } 00070 00071 void SegInputStream::readNextBuffer() 00072 { 00073 nullifyBuffer(); 00074 if (pageLock.isLocked()) { 00075 if (currPageId != NULL_PAGE_ID) { 00076 if (pageIter.isSingular()) { 00077 currPageId = getSegment()->getPageSuccessor(currPageId); 00078 } else { 00079 ++pageIter; 00080 assert(*pageIter == getSegment()->getPageSuccessor(currPageId)); 00081 currPageId = *pageIter; 00082 } 00083 } 00084 if (shouldDeallocate) { 00085 pageLock.deallocateLockedPage(); 00086 } else { 00087 pageLock.unlock(); 00088 } 00089 } 00090 if (currPageId == NULL_PAGE_ID) { 00091 return; 00092 } 00093 lockBuffer(); 00094 } 00095 00096 void SegInputStream::readPrevBuffer() 00097 { 00098 assert(pageIter.isSingular()); 00099 assert( 00100 getSegment()->getAllocationOrder() >= Segment::CONSECUTIVE_ALLOCATION); 00101 nullifyBuffer(); 00102 if (Segment::getLinearBlockNum(currPageId) == 0) { 00103 return; 00104 } 00105 --currPageId; 00106 lockBuffer(); 00107 } 00108 00109 void SegInputStream::closeImpl() 00110 { 00111 pageIter.makeSingular(); 00112 if (shouldDeallocate) { 00113 pageLock.unlock(); 00114 while (currPageId != NULL_PAGE_ID) { 00115 PageId nextPageId = getSegment()->getPageSuccessor(currPageId); 00116 pageLock.deallocateUnlockedPage(currPageId); 00117 currPageId = nextPageId; 00118 } 00119 } 00120 SegStream::closeImpl(); 00121 } 00122 00123 void SegInputStream::getSegPos(SegStreamPosition &pos) 00124 { 00125 CompoundId::setPageId(pos.segByteId,currPageId); 00126 CompoundId::setByteOffset(pos.segByteId,getBytesConsumed()); 00127 pos.cbOffset = cbOffset; 00128 } 00129 00130 void SegInputStream::seekSegPos(SegStreamPosition const &pos) 00131 { 00132 assert(pageIter.isSingular()); 00133 currPageId = CompoundId::getPageId(pos.segByteId); 00134 lockBuffer(); 00135 uint cb = CompoundId::getByteOffset(pos.segByteId); 00136 if (cb == CompoundId::MAX_BYTE_OFFSET) { 00137 consumeReadPointer(getBytesAvailable()); 00138 } else { 00139 consumeReadPointer(cb); 00140 } 00141 00142 cbOffset = pos.cbOffset; 00143 } 00144 00145 void SegInputStream::setDeallocate( 00146 bool shouldDeallocateInit) 00147 { 00148 shouldDeallocate = shouldDeallocateInit; 00149 } 00150 00151 bool SegInputStream::isDeallocating() 00152 { 00153 return shouldDeallocate; 00154 } 00155 00156 SharedByteStreamMarker SegInputStream::newMarker() 00157 { 00158 return SharedByteStreamMarker(new SegStreamMarker(*this)); 00159 } 00160 00161 void SegInputStream::mark(ByteStreamMarker &marker) 00162 { 00163 assert(&(marker.getStream()) == this); 00164 00165
00166 SegStreamMarker &segMarker = 00167 dynamic_cast<SegStreamMarker &>(marker); 00168 getSegPos(segMarker.segPos); 00169 } 00170 00171 void SegInputStream::reset(ByteStreamMarker const &marker) 00172 { 00173 assert(&(marker.getStream()) == this); 00174 00175
00176 SegStreamMarker const &segMarker = 00177 dynamic_cast<SegStreamMarker const &>(marker); 00178 00179
00180 bool prefetch = pageIter.isSingular(); 00181 endPrefetch(); 00182 00183 seekSegPos(segMarker.segPos); 00184 00185
00186 if (prefetch) { 00187 startPrefetch(); 00188 } 00189 } 00190 00191 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SegInputStream.cpp#11 $"); 00192 00193