// Fennel: /home/pub/open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp (source file)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/sorter/ExternalSortExecStreamImpl.h" 00026 #include "fennel/segment/Segment.h" 00027 #include "fennel/segment/SegStreamAllocation.h" 00028 #include "fennel/exec/ExecStreamGraphImpl.h" 00029 #include "fennel/exec/ExecStreamBufAccessor.h" 00030 00031 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $"); 00032 00033 ExternalSortExecStream ExternalSortExecStream::newExternalSortExecStream() 00034 { 00035 return new ExternalSortExecStreamImpl(); 00036 } 00037 00038 ExternalSortInfo::ExternalSortInfo(ExecStream &streamInit) 00039 : stream(streamInit) 00040 { 00041 nSortMemPages = 0; 00042 nSortMemPagesPerRun = 0; 00043 cbPage = 0; 00044 } 00045 00046 int ExternalSortInfo::compareKeys(TupleData const &key1, TupleData const &key2) 00047 { 00048 int c = keyDesc.compareTuples(key1, key2); 00049 if (!c) { 00050 return 0; 00051 } 00052
00053 int i = (c > 0) ? c : -c; 00054
00055 --i; 00056 if (descendingKeyColumns[i]) { 00057
00058 return -c; 00059 } else { 00060 return c; 00061 } 00062 } 00063 00064 ExternalSortExecStreamImpl::ExternalSortExecStreamImpl() 00065 : sortInfo(this) 00066 { 00067 } 00068 00069 void ExternalSortExecStreamImpl::prepare( 00070 ExternalSortExecStreamParams const &params) 00071 { 00072 ConduitExecStream::prepare(params); 00073 00074 pTempSegment = params.pTempSegment; 00075 resultsReady = false; 00076 nParallel = 1; 00077 storeFinalRun = params.storeFinalRun; 00078 estimatedNumRows = params.estimatedNumRows; 00079 earlyClose = params.earlyClose; 00080 00081 switch (params.distinctness) { 00082 case DUP_ALLOW: 00083 break; 00084 case DUP_DISCARD: 00085
00086 permAssert(false); 00087 case DUP_FAIL: 00088
00089 permAssert(false); 00090 } 00091 00092 TupleDescriptor const &srcRecDef = pInAccessor->getTupleDesc(); 00093 sortInfo.keyProj = params.keyProj; 00094 assert(params.outputTupleDesc == srcRecDef); 00095 sortInfo.tupleDesc = srcRecDef; 00096 sortInfo.keyDesc.projectFrom(sortInfo.tupleDesc,params.keyProj); 00097 sortInfo.descendingKeyColumns = params.descendingKeyColumns; 00098 if (sortInfo.descendingKeyColumns.empty()) { 00099
00100 sortInfo.descendingKeyColumns.resize(sortInfo.keyProj.size(), false); 00101 } 00102 sortInfo.cbPage = params.pTempSegment->getFullPageSize(); 00103 sortInfo.memSegmentAccessor = params.scratchAccessor; 00104 sortInfo.externalSegmentAccessor.pCacheAccessor = params.pCacheAccessor; 00105 sortInfo.externalSegmentAccessor.pSegment = params.pTempSegment; 00106 sortInfo.nSortMemPages = 0; 00107 } 00108 00109 void ExternalSortExecStreamImpl::getResourceRequirements( 00110 ExecStreamResourceQuantity &minQuantity, 00111 ExecStreamResourceQuantity &optQuantity, 00112 ExecStreamResourceSettingType &optType) 00113 { 00114 ConduitExecStream::getResourceRequirements(minQuantity,optQuantity); 00115 00116
00117 uint minPages = 3; 00118 minQuantity.nCachePages += minPages; 00119 00120
00121
00122
00123 if (isMAXU(estimatedNumRows)) { 00124 optType = EXEC_RESOURCE_UNBOUNDED; 00125 } else { 00126
00127
00128 RecordNum nPages = 00129 estimatedNumRows * 00130 ((pOutAccessor->getScratchTupleAccessor().getMaxByteCount() + 00131 pOutAccessor->getScratchTupleAccessor().getMinByteCount()) / 2) / 00132 sortInfo.memSegmentAccessor.pSegment->getUsablePageSize(); 00133 uint numPages; 00134 if (nPages >= uint(MAXU)) { 00135 numPages = uint(MAXU) - 1; 00136 } else { 00137 numPages = uint(nPages); 00138 } 00139
00140
00141 optQuantity.nCachePages += std::max(minPages + 1, numPages); 00142 optType = EXEC_RESOURCE_ESTIMATE; 00143 } 00144 } 00145 00146 void ExternalSortExecStreamImpl::setResourceAllocation( 00147 ExecStreamResourceQuantity &quantity) 00148 { 00149
00150 ConduitExecStream::setResourceAllocation(quantity); 00151 sortInfo.nSortMemPages = quantity.nCachePages; 00152 nParallel = quantity.nThreads + 1; 00153 00154
00155
00156
00157 assert(nParallel == 1); 00158 } 00159 00160 void ExternalSortExecStreamImpl::open(bool restart) 00161 { 00162 if (restart) { 00163 releaseResources(); 00164 } 00165 00166 ConduitExecStream::open(restart); 00167 00168
00169 sortInfo.nSortMemPagesPerRun = (sortInfo.nSortMemPages / nParallel); 00170 00171
00172 assert(sortInfo.nSortMemPagesPerRun > 0); 00173 sortInfo.nSortMemPagesPerRun--; 00174 00175
00176 assert(sortInfo.nSortMemPagesPerRun > 1); 00177 00178 runLoaders.reset(new SharedExternalSortRunLoader[nParallel]); 00179 for (uint i = 0; i < nParallel; ++i) { 00180 runLoaders[i].reset(new ExternalSortRunLoader(sortInfo)); 00181 } 00182 00183 pOutputWriter.reset(new ExternalSortOutput(sortInfo)); 00184 00185 for (uint i = 0; i < nParallel; ++i) { 00186 runLoaders[i]->startRun(); 00187 } 00188 00189
00190 pOutputWriter->setSubStream(
(runLoaders[0])); 00191 00192 resultsReady = false; 00193 } 00194 00195 ExecStreamResult ExternalSortExecStreamImpl::execute( 00196 ExecStreamQuantum const &quantum) 00197 { 00198 if (resultsReady) { 00199 if (pInAccessor->getState() != EXECBUF_EOS) { 00200 ExecStreamResult rc = precheckConduitBuffers(); 00201 if (rc != EXECRC_YIELD) { 00202 return rc; 00203 } 00204 if (nParallel > 1) { 00205
00206 computeFirstResultParallel(); 00207 } else { 00208 computeFirstResult(); 00209 return EXECRC_BUF_UNDERFLOW; 00210 } 00211 } else { 00212 ExternalSortRunLoader &runLoader = (runLoaders[0]); 00213 if (runLoader.isStarted()) { 00214 sortRun(runLoader); 00215 if (storedRuns.size() || storeFinalRun) { 00216
00217 storeRun(runLoader); 00218 } 00219 } 00220 mergeFirstResult(); 00221 00222
00223 if (earlyClose) { 00224 ExecStreamGraphImpl &graphImpl = 00225 dynamic_cast<ExecStreamGraphImpl&>(getGraph()); 00226 graphImpl.closeProducers(getStreamId()); 00227 } 00228 00229 resultsReady = true; 00230 } 00231 } 00232 00233 return pOutputWriter->fetch(
pOutAccessor); 00234 } 00235 00236 void ExternalSortExecStreamImpl::closeImpl() 00237 { 00238 releaseResources(); 00239 ConduitExecStream::closeImpl(); 00240 } 00241 00242 void ExternalSortExecStreamImpl::releaseResources() 00243 { 00244 if (pFinalRunAccessor) { 00245 pFinalRunAccessor->releaseResources(); 00246 } 00247 00248 runLoaders.reset(); 00249 pMerger.reset(); 00250 pOutputWriter.reset(); 00251 pFinalRunAccessor.reset(); 00252 storedRuns.clear(); 00253 } 00254 00255 void ExternalSortExecStreamImpl::computeFirstResult() 00256 { 00257 ExternalSortRunLoader &runLoader = (runLoaders[0]); 00258 for (;;) { 00259 if (!runLoader.isStarted()) { 00260 runLoader.startRun(); 00261 } 00262 ExternalSortRC rc = runLoader.loadRun(pInAccessor); 00263 if (rc == EXTSORT_OVERFLOW) { 00264 sortRun(runLoader); 00265 storeRun(runLoader); 00266 } else { 00267 return; 00268 } 00269 } 00270 } 00271 00272 void ExternalSortExecStreamImpl::storeRun(ExternalSortSubStream &subStream) 00273 { 00274 FENNEL_TRACE( 00275 TRACE_FINE, 00276 "storing run " << storedRuns.size()); 00277 00278 boost::scoped_ptr pRunAccessor; 00279 pRunAccessor.reset(new ExternalSortRunAccessor(sortInfo)); 00280 pRunAccessor->storeRun(subStream); 00281 00282 StrictMutexGuard mutexGuard(storedRunMutex); 00283 storedRuns.push_back(pRunAccessor->getStoredRun()); 00284 } 00285 00286 void ExternalSortExecStreamImpl::mergeFirstResult() 00287 { 00288 if (storedRuns.size()) { 00289 for (uint i = 0; i < nParallel; i++) { 00290 runLoaders[i]->releaseResources(); 00291 } 00292 00293 if (pMerger) { 00294 pMerger.reset(new ExternalSortMerger(sortInfo)); 00295 pMerger->initRunAccess(); 00296 } 00297 00298 uint iFirstRun = storedRuns.size() - 1; 00299 while (iFirstRun > 0) { 00300 uint nRunsToMerge; 00301 00302
00303
00304
00305 uint nMergePages = sortInfo.nSortMemPages - 1; 00306 if (storedRuns.size() <= nMergePages) { 00307 nRunsToMerge = storedRuns.size(); 00308 } else { 00309 nRunsToMerge = std::min( 00310 storedRuns.size() - nMergePages + 1, 00311 nMergePages); 00312 } 00313 00314 optimizeRunOrder(); 00315 iFirstRun = storedRuns.size() - nRunsToMerge; 00316 00317 FENNEL_TRACE( 00318 TRACE_FINE, 00319 "merging from run " << iFirstRun 00320 << " with run count = " << nRunsToMerge); 00321 00322 pMerger->startMerge( 00323 storedRuns.begin() + iFirstRun, nRunsToMerge); 00324 if ((iFirstRun > 0) || storeFinalRun) { 00325 storeRun(
pMerger); 00326 deleteStoredRunInfo(iFirstRun,nRunsToMerge); 00327 } 00328 } 00329 00330 if (storedRuns.size() == 1) { 00331 if (pFinalRunAccessor) { 00332 pFinalRunAccessor.reset(new ExternalSortRunAccessor(sortInfo)); 00333 } 00334 00335 FENNEL_TRACE( 00336 TRACE_FINE, 00337 "fetching from final run"); 00338 00339 pFinalRunAccessor->initRead(); 00340 pFinalRunAccessor->startRead(storedRuns[0]); 00341 pMerger->releaseResources(); 00342 pOutputWriter->setSubStream(pFinalRunAccessor); 00343 } else { 00344 FENNEL_TRACE( 00345 TRACE_FINE, 00346 "fetching from final merge with run count = " 00347 << storedRuns.size()); 00348 00349 pOutputWriter->setSubStream(pMerger); 00350 } 00351 } 00352 } 00353 00354 void ExternalSortExecStreamImpl::optimizeRunOrder() 00355 { 00356 uint i = storedRuns.size() - 1; 00357 while ((i > 0) 00358 && (storedRuns[i]->getWrittenPageCount() 00359 > storedRuns[i - 1]->getWrittenPageCount())) 00360 { 00361 std::swap(storedRuns[i],storedRuns[i - 1]); 00362 i--; 00363 } 00364 } 00365 00366 void ExternalSortExecStreamImpl::deleteStoredRunInfo(uint iFirstRun,uint nRuns) 00367 { 00368 StrictMutexGuard mutexGuard(storedRunMutex); 00369 storedRuns.erase( 00370 storedRuns.begin() + iFirstRun, 00371 storedRuns.begin() + iFirstRun + nRuns); 00372 } 00373 00374 void ExternalSortExecStreamImpl::computeFirstResultParallel() 00375 { 00376
00377 00378 assert(nParallel > 1); 00379 00380
00381
00382 threadPool.start(nParallel - 1); 00383 try { 00384 for (;;) { 00385 ExternalSortRunLoader &runLoader = reserveRunLoader(); 00386 runLoader.startRun(); 00387
00388 #if 0 00389 ExternalSortRC rc = runLoader.loadRun(*pInputStream); 00390 #else 00391 ExternalSortRC rc = EXTSORT_ENDOFDATA; 00392 #endif 00393 if (rc == EXTSORT_ENDOFDATA) { 00394
00395
00396 unreserveRunLoader(runLoader); 00397 break; 00398 } 00399
00400
00401 #if 0 00402 ExternalSortTask task(*this,runLoader); 00403 threadPool.submitTask(task); 00404 #endif 00405 } 00406 } catch (...) { 00407
00408 00409
00410 threadPool.stop(); 00411 throw; 00412 } 00413 00414
00415 threadPool.stop(); 00416 00417 mergeFirstResult(); 00418 resultsReady = true; 00419 } 00420 00421 void ExternalSortTask::execute() 00422 { 00423 sortStream.sortRun(runLoader); 00424 sortStream.storeRun(runLoader); 00425 sortStream.unreserveRunLoader(runLoader); 00426 } 00427 00428 void ExternalSortExecStreamImpl::sortRun(ExternalSortRunLoader &runLoader) 00429 { 00430 FENNEL_TRACE( 00431 TRACE_FINE, 00432 "sorting run with tuple count = " 00433 << runLoader.getLoadedTupleCount()); 00434 runLoader.sort(); 00435 } 00436 00437 ExternalSortRunLoader &ExternalSortExecStreamImpl::reserveRunLoader() 00438 { 00439 StrictMutexGuard mutexGuard(runLoaderMutex); 00440 for (;;) { 00441 for (uint i = 0; i < nParallel; ++i) { 00442 ExternalSortRunLoader &runLoader = *(runLoaders[i]); 00443 if (!runLoader.runningParallelTask) { 00444 runLoader.runningParallelTask = true; 00445 return runLoader; 00446 } 00447 } 00448 runLoaderAvailable.wait(mutexGuard); 00449 } 00450 } 00451 00452 void ExternalSortExecStreamImpl::unreserveRunLoader( 00453 ExternalSortRunLoader &runLoader) 00454 { 00455 StrictMutexGuard mutexGuard(runLoaderMutex); 00456 runLoader.runningParallelTask = false; 00457 runLoaderAvailable.notify_all(); 00458 } 00459 00460 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortExecStreamImpl.cpp#1 $"); 00461 00462