Fennel: /home/pub/open/dev/fennel/segment/SpillOutputStream.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/segment/SpillOutputStream.h" 00026 #include "fennel/segment/SegOutputStream.h" 00027 #include "fennel/segment/SegInputStream.h" 00028 #include "fennel/segment/SegmentFactory.h" 00029 #include "fennel/common/ByteArrayInputStream.h" 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $"); 00032 00033 00034 00035 SharedSpillOutputStream SpillOutputStream::newSpillOutputStream( 00036 SharedSegmentFactory pSegmentFactory, 00037 SharedCacheAccessor pCacheAccessor, 00038 std::string spillFileName) 00039 { 00040 return SharedSpillOutputStream( 00041 new SpillOutputStream(pSegmentFactory,pCacheAccessor,spillFileName), 00042 ClosableObjectDestructor()); 00043 } 00044 00045 SpillOutputStream::SpillOutputStream( 00046 SharedSegmentFactory pSegmentFactoryInit, 00047 SharedCacheAccessor pCacheAccessorInit, 00048 std::string spillFileNameInit) 00049 : pSegmentFactory(pSegmentFactoryInit), 00050 pCacheAccessor(pCacheAccessorInit), 00051 spillFileName(spillFileNameInit) 00052 { 00053
00054
00055 scratchAccessor = pSegmentFactory->newScratchSegment( 00056 pCacheAccessor->getCache(), 00057 1); 00058 scratchPageLock.accessSegment(scratchAccessor); 00059 scratchPageLock.allocatePage(); 00060 cbBuffer = scratchAccessor.pSegment->getUsablePageSize(); 00061 setBuffer( 00062 scratchPageLock.getPage().getWritableData(), 00063 cbBuffer); 00064 } 00065 00066 SpillOutputStream::~SpillOutputStream() 00067 { 00068 } 00069 00070 void SpillOutputStream::flushBuffer(uint cbRequested) 00071 { 00072 if (scratchPageLock.isLocked()) { 00073 assert(pSegOutputStream); 00074
00075 spill(); 00076 } else { 00077 assert(pSegOutputStream); 00078 assert(scratchPageLock.isLocked()); 00079
00080 assert(cbBuffer >= getBytesAvailable()); 00081 pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable()); 00082 } 00083 assert(pSegOutputStream); 00084 if (cbRequested) { 00085 PBuffer pBuffer = 00086 pSegOutputStream->getWritePointer(cbRequested,&cbBuffer); 00087 setBuffer(pBuffer,cbBuffer); 00088 } else { 00089 pSegOutputStream->hardPageBreak(); 00090 cbBuffer = 0; 00091 } 00092 } 00093 00094 void SpillOutputStream::closeImpl() 00095 { 00096 if (scratchPageLock.isLocked()) { 00097
00098 scratchPageLock.unlock(); 00099 } else { 00100 assert(pSegOutputStream); 00101 assert(scratchPageLock.isLocked()); 00102
00103 pSegOutputStream->consumeWritePointer(cbBuffer - getBytesAvailable()); 00104 cbBuffer = 0; 00105 pSegOutputStream.reset(); 00106 } 00107 } 00108 00109 void SpillOutputStream::setWriteLatency(WriteLatency writeLatency) 00110 { 00111 ByteOutputStream::setWriteLatency(writeLatency); 00112 if (pSegOutputStream) { 00113 pSegOutputStream->setWriteLatency(writeLatency); 00114 } 00115 } 00116 00117 00118 00119 00120 00121 00122 00123 void SpillOutputStream::spill() 00124 { 00125 DeviceMode devMode = DeviceMode::createNew; 00126
00127
00128 devMode.direct = true; 00129 SharedSegment pLongLogSegment = 00130 pSegmentFactory->newTempDeviceSegment( 00131 scratchPageLock.getCacheAccessor()->getCache(), 00132 devMode, 00133 spillFileName); 00134 SegmentAccessor segmentAccessor(pLongLogSegment,pCacheAccessor); 00135 pSegOutputStream = SegOutputStream::newSegOutputStream(segmentAccessor); 00136 pSegOutputStream->setWriteLatency(writeLatency); 00137 pSegOutputStream->writeBytes( 00138 scratchPageLock.getPage().getReadableData(), 00139 cbBuffer - getBytesAvailable()); 00140 scratchPageLock.unlock(); 00141 } 00142 00143 SharedByteInputStream SpillOutputStream::getInputStream( 00144 SeekPosition seekPosition) 00145 { 00146 if (scratchPageLock.isLocked()) { 00147 SharedByteInputStream pInputStream = 00148 ByteArrayInputStream::newByteArrayInputStream( 00149 scratchPageLock.getPage().getReadableData(), 00150 getOffset()); 00151 if (seekPosition == SEEK_STREAM_END) { 00152 pInputStream->seekForward(getOffset()); 00153 } 00154 return pInputStream; 00155 } else { 00156 assert(pSegOutputStream); 00157 updatePage(); 00158 SharedSegment pSegment = pSegOutputStream->getSegment(); 00159 SegStreamPosition endPos; 00160 if (seekPosition == SEEK_STREAM_END) { 00161 pSegOutputStream->getSegPos(endPos); 00162 } 00163 SegmentAccessor segmentAccessor(pSegment,pCacheAccessor); 00164 SharedSegInputStream pInputStream = 00165 SegInputStream::newSegInputStream(segmentAccessor); 00166 if (seekPosition == SEEK_STREAM_END) { 00167 pInputStream->seekSegPos(endPos); 00168 } 00169 return pInputStream; 00170 } 00171 } 00172 00173 void SpillOutputStream::updatePage() 00174 { 00175 if (cbBuffer) { 00176 return; 00177 } 00178 assert(cbBuffer > getBytesAvailable()); 00179 uint cbConsumed = cbBuffer - getBytesAvailable(); 00180 pSegOutputStream->consumeWritePointer(cbConsumed); 00181 cbBuffer -= cbConsumed; 00182 pSegOutputStream->updatePage(); 00183 } 00184 00185 SharedSegment SpillOutputStream::getSegment() 00186 { 00187 if (pSegOutputStream) { 00188 return pSegOutputStream->getSegment(); 00189 } else { 00190 return SharedSegment(); 00191 } 00192 } 00193 00194 SharedSegOutputStream SpillOutputStream::getSegOutputStream() 00195 { 00196 return pSegOutputStream; 00197 } 00198 00199 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/segment/SpillOutputStream.cpp#7 $"); 00200 00201