Fennel: /home/pub/open/dev/fennel/sorter/ExternalSortRunLoader.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/sorter/ExternalSortRunLoader.h" 00026 #include "fennel/sorter/ExternalSortInfo.h" 00027 #include "fennel/exec/ExecStreamBufAccessor.h" 00028 00029 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $"); 00030 00031 ExternalSortRunLoader::ExternalSortRunLoader(ExternalSortInfo &sortInfoIn) 00032 : sortInfo(sortInfoIn) 00033 { 00034 nMemPagesMax = 0; 00035 pDataBuffer = NULL; 00036 pIndexBuffer = NULL; 00037 00038 nTuplesLoaded = nTuplesFetched = 0; 00039 00040 runningParallelTask = false; 00041 00042 bufferLock.accessSegment(sortInfo.memSegmentAccessor); 00043 00044 tupleAccessor.compute(sortInfo.tupleDesc); 00045 tupleAccessor2.compute(sortInfo.tupleDesc); 00046 00047 keyData.compute(sortInfo.keyDesc); 00048 keyData2 = keyData; 00049 00050 keyAccessor.bind(tupleAccessor,sortInfo.keyProj); 00051 keyAccessor2.bind(tupleAccessor2,sortInfo.keyProj); 00052 00053
00054
00055 uint nKeysPerPage = sortInfo.cbPage / sizeof(PBuffer); 00056 assert(nKeysPerPage > 1); 00057 nKeysPerPage >>= 1; 00058 indexToPageShift = 1; 00059 indexPageMask = 1; 00060 while ((nKeysPerPage & indexPageMask) == 0) { 00061 ++indexToPageShift; 00062 indexPageMask <<= 1; 00063 ++indexPageMask; 00064 } 00065 } 00066 00067 ExternalSortRunLoader::~ExternalSortRunLoader() 00068 { 00069 releaseResources(); 00070 } 00071 00072 inline PBuffer &ExternalSortRunLoader::getPointerArrayEntry(uint iTuple) 00073 { 00074
00075
00076 uint iPage = iTuple >> indexToPageShift; 00077 uint iSubKey = iTuple & indexPageMask; 00078 PBuffer *pPage = reinterpret_cast<PBuffer *>(indexBuffers[iPage]); 00079 return pPage[iSubKey]; 00080 } 00081 00082 void ExternalSortRunLoader::startRun() 00083 { 00084 if (nMemPagesMax) { 00085 nMemPagesMax = sortInfo.nSortMemPagesPerRun; 00086 } 00087 00088 freeBuffers.insert( 00089 freeBuffers.end(),indexBuffers.begin(),indexBuffers.end()); 00090 freeBuffers.insert( 00091 freeBuffers.end(),dataBuffers.begin(),dataBuffers.end()); 00092 indexBuffers.clear(); 00093 dataBuffers.clear(); 00094 pDataBuffer = NULL; 00095 pIndexBuffer = NULL; 00096 00097 if (allocateDataBuffer()) { 00098 permAssert(false); 00099 } 00100 00101 if (allocateIndexBuffer()) { 00102 permAssert(false); 00103 } 00104 00105 nTuplesLoaded = nTuplesFetched = 0; 00106 00107 fetchArray.nTuples = 0; 00108 } 00109 00110 PBuffer ExternalSortRunLoader::allocateBuffer() 00111 { 00112 PBuffer pBuffer; 00113 00114 if (freeBuffers.empty()) { 00115 pBuffer = freeBuffers.back(); 00116 freeBuffers.pop_back(); 00117 return pBuffer; 00118 } 00119 00120 if ((indexBuffers.size() + dataBuffers.size()) >= nMemPagesMax) { 00121 return NULL; 00122 } 00123 00124 bufferLock.allocatePage(); 00125 pBuffer = bufferLock.getPage().getWritableData(); 00126 00127
00128
00129
00130 bufferLock.unlock(); 00131 00132 return pBuffer; 00133 } 00134 00135 bool ExternalSortRunLoader::allocateDataBuffer() 00136 { 00137 pDataBuffer = allocateBuffer(); 00138 if (pDataBuffer) { 00139 dataBuffers.push_back(pDataBuffer); 00140 pDataBufferEnd = pDataBuffer + sortInfo.cbPage; 00141 return true; 00142 } else { 00143 return false; 00144 } 00145 } 00146 00147 bool ExternalSortRunLoader::allocateIndexBuffer() 00148 { 00149 pIndexBuffer = allocateBuffer(); 00150 if (pIndexBuffer) { 00151 indexBuffers.push_back(pIndexBuffer); 00152 pIndexBufferEnd = pIndexBuffer + sortInfo.cbPage; 00153 return true; 00154 } else { 00155 return false; 00156 } 00157 } 00158 00159 void ExternalSortRunLoader::releaseResources() 00160 { 00161 freeBuffers.clear(); 00162 indexBuffers.clear(); 00163 dataBuffers.clear(); 00164 nMemPagesMax = 0; 00165 00166
00167
00168 00169 sortInfo.memSegmentAccessor.pSegment->deallocatePageRange( 00170 NULL_PAGE_ID,NULL_PAGE_ID); 00171 } 00172 00173 ExternalSortRC ExternalSortRunLoader::loadRun( 00174 ExecStreamBufAccessor &bufAccessor) 00175 { 00176 for (;;) { 00177 if (!bufAccessor.demandData()) { 00178 break; 00179 } 00180 uint cbAvailable = bufAccessor.getConsumptionAvailable(); 00181 assert(cbAvailable); 00182 PConstBuffer pSrc = bufAccessor.getConsumptionStart(); 00183 bool overflow = false; 00184 uint cbCopy = 0; 00185 while (cbCopy < cbAvailable) { 00186 PConstBuffer pSrcTuple = pSrc + cbCopy; 00187 uint cbTuple = tupleAccessor.getBufferByteCount(pSrcTuple); 00188 assert(cbTuple); 00189 assert(cbTuple <= tupleAccessor.getMaxByteCount()); 00190 00191
00192 if (pIndexBuffer >= pIndexBufferEnd) { 00193 if (allocateIndexBuffer()) { 00194 overflow = true; 00195 break; 00196 } 00197 } 00198 00199
00200 if (pDataBuffer + cbCopy + cbTuple > pDataBufferEnd) { 00201 if (!cbCopy) { 00202
00203 if (allocateDataBuffer()) { 00204
00205 return EXTSORT_OVERFLOW; 00206 } 00207 } 00208
00209
00210 break; 00211 } 00212 00213 *((PBuffer *) pIndexBuffer) = pDataBuffer + cbCopy; 00214 pIndexBuffer += sizeof(PBuffer); 00215 nTuplesLoaded++; 00216 cbCopy += cbTuple; 00217 } 00218 if (cbCopy) { 00219 memcpy(pDataBuffer,pSrc,cbCopy); 00220 pDataBuffer += cbCopy; 00221 bufAccessor.consumeData(pSrc + cbCopy); 00222 } 00223 if (overflow) { 00224 return EXTSORT_OVERFLOW; 00225 } 00226 } 00227 00228 assert(nTuplesLoaded); 00229 return EXTSORT_SUCCESS; 00230 } 00231 00232 void ExternalSortRunLoader::sort() 00233 { 00234 assert (nTuplesLoaded); 00235 00236 quickSort(0,nTuplesLoaded - 1); 00237 } 00238 00239 ExternalSortFetchArray &ExternalSortRunLoader::bindFetchArray() 00240 { 00241 return fetchArray; 00242 } 00243 00244 ExternalSortRC ExternalSortRunLoader::fetch(uint nTuplesRequested) 00245 { 00246 if (nTuplesFetched >= nTuplesLoaded) { 00247 return EXTSORT_ENDOFDATA; 00248 } 00249 00250 fetchArray.ppTupleBuffers = (PBuffer *) 00251 &(getPointerArrayEntry(nTuplesFetched)); 00252 uint pageEnd = (nTuplesFetched | indexPageMask) + 1; 00253 pageEnd = std::min(pageEnd,nTuplesLoaded); 00254 fetchArray.nTuples = std::min(nTuplesRequested,pageEnd - nTuplesFetched); 00255 nTuplesFetched += fetchArray.nTuples; 00256 00257 return EXTSORT_SUCCESS; 00258 } 00259 00260 inline void ExternalSortRunLoader::quickSortSwap(uint l,uint r) 00261 { 00262 std::swap(getPointerArrayEntry(l),getPointerArrayEntry(r)); 00263 } 00264 00265 00266 const uint step_factor = 7; 00267 00268 PBuffer ExternalSortRunLoader::quickSortFindPivot(uint l, uint r) 00269 { 00270 uint i, j, cnt, step; 00271 PBuffer vals[step_factor]; 00272 00273 if (r <= l) { 00274 return NULL; 00275 } 00276 00277 cnt = 0; 00278 step = ((r - l) / step_factor) + 1; 00279 for (i = l; i <= r; i += step) { 00280 vals[cnt] = getPointerArrayEntry(i); 00281 j = cnt++; 00282 while (j > 0) { 00283 tupleAccessor.setCurrentTupleBuf(vals[j]); 00284 tupleAccessor2.setCurrentTupleBuf(vals[j - 1]); 00285 keyAccessor.unmarshal(keyData); 00286 keyAccessor2.unmarshal(keyData2); 00287 if (sortInfo.compareKeys(keyData,keyData2) >= 0) { 00288 break; 00289 } 00290 std::swap(vals[j],vals[j - 1]); 00291 j--; 00292 } 00293 } 00294 if (step == 1) { 00295 for (i = 0; i < cnt; ++i) { 00296 getPointerArrayEntry(l + i) = vals[i]; 00297 } 00298 return NULL; 00299 } 00300 return vals[(cnt >> 1)]; 00301 } 00302 00303 uint ExternalSortRunLoader::quickSortPartition(uint l,uint r,PBuffer pivot) 00304 { 00305 l--; 00306 r++; 00307 00308 tupleAccessor.setCurrentTupleBuf(pivot); 00309 keyAccessor.unmarshal(keyData); 00310 for (;;) { 00311 for (;;) { 00312 ++l; 00313 tupleAccessor2.setCurrentTupleBuf(getPointerArrayEntry(l)); 00314 keyAccessor2.unmarshal(keyData2); 00315 if (sortInfo.compareKeys(keyData2,keyData) >= 0) { 00316 break; 00317 } 00318 } 00319 for (;;) { 00320 --r; 00321 tupleAccessor2.setCurrentTupleBuf(getPointerArrayEntry(r)); 00322 keyAccessor2.unmarshal(keyData2); 00323 if (sortInfo.compareKeys(keyData2,keyData) <= 0) { 00324 break; 00325 } 00326 } 00327 if (l < r) { 00328 quickSortSwap(l, r); 00329 } else { 00330 return l; 00331 } 00332 } 00333 } 00334 00335 void ExternalSortRunLoader::quickSort(uint l,uint r) 00336 { 00337 PBuffer pPivotTuple; 00338 uint x; 00339 00340 pPivotTuple = quickSortFindPivot(l,r); 00341 if (pPivotTuple) { 00342 x = quickSortPartition(l,r,pPivotTuple); 00343 if (x == l) { 00344
00345
00346 x++; 00347 } 00348 quickSort(l,x - 1); 00349 quickSort(x,r); 00350 } 00351 } 00352 00353 uint ExternalSortRunLoader::getLoadedTupleCount() 00354 { 00355 return nTuplesLoaded; 00356 } 00357 00358 bool ExternalSortRunLoader::isStarted() 00359 { 00360 return nTuplesLoaded > nTuplesFetched; 00361 } 00362 00363 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortRunLoader.cpp#1 $"); 00364 00365