Fennel: /home/pub/open/dev/fennel/sorter/ExternalSortMerger.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/sorter/ExternalSortMerger.h" 00026 #include "fennel/sorter/ExternalSortInfo.h" 00027 #include "fennel/sorter/ExternalSortRunAccessor.h" 00028 #include "fennel/exec/ExecStream.h" 00029 00030 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $"); 00031 00032 ExternalSortMerger::ExternalSortMerger(ExternalSortInfo &sortInfoIn) 00033 : sortInfo(sortInfoIn) 00034 { 00035 nRuns = 0; 00036 00037 tupleAccessor.compute(sortInfo.tupleDesc); 00038 tupleAccessor2.compute(sortInfo.tupleDesc); 00039 00040 keyData.compute(sortInfo.keyDesc); 00041 keyData2 = keyData; 00042 00043 keyAccessor.bind(tupleAccessor,sortInfo.keyProj); 00044 keyAccessor2.bind(tupleAccessor2,sortInfo.keyProj); 00045 00046 nMergeMemPages = sortInfo.nSortMemPages + 1; 00047 00048 ppRunAccessors.reset(new SharedExternalSortRunAccessor[nMergeMemPages]); 00049 ppFetchArrays.reset(new ExternalSortFetchArray *[nMergeMemPages]); 00050 00051 pOrds.reset(new uint[nMergeMemPages]); 00052 memset(pOrds.get(),0,sizeof(uint) * nMergeMemPages); 00053 00054 mergeInfo.reset(new ExternalSortMergeInfo[nMergeMemPages]); 00055 00056 fetchArray.ppTupleBuffers = &(ppTupleBuffers[0]); 00057 } 00058 00059 ExternalSortMerger::~ExternalSortMerger() 00060 { 00061 releaseResources(); 00062 } 00063 00064 void ExternalSortMerger::initRunAccess() 00065 { 00066 for (uint i = 1; i < nMergeMemPages; i++) { 00067 ppRunAccessors[i].reset(new ExternalSortRunAccessor(sortInfo)); 00068 ppRunAccessors[i]->initRead(); 00069 ppFetchArrays[i] = &(ppRunAccessors[i]->bindFetchArray()); 00070 } 00071 } 00072 00073 void ExternalSortMerger::releaseResources() 00074 { 00075 ppRunAccessors.reset(); 00076 ppFetchArrays.reset(); 00077 pOrds.reset(); 00078 mergeInfo.reset(); 00079 } 00080 00081 void ExternalSortMerger::startMerge( 00082 std::vector::iterator pStoredRun, 00083 uint nRunsToMerge) 00084 { 00085 assert (nRunsToMerge < nMergeMemPages); 00086 00087 nRuns = nRunsToMerge; 00088 00089 for (uint i = 1; i < nMergeMemPages; i++) { 00090 ppRunAccessors[i]->resetRead(); 00091 pOrds[i] = 0; 00092 } 00093 00094 for (uint i = 1; i <= nRuns; i++) { 00095 ppRunAccessors[i]->startRead(pStoredRun[i - 1]); 00096 ppRunAccessors[i]->fetch(EXTSORT_FETCH_ARRAY_SIZE); 00097 mergeInfo[i].val = ppFetchArrays[i]->ppTupleBuffers[0]; 00098 mergeInfo[i].runOrd = i; 00099 } 00100 fetchArray.nTuples = 0; 00101 00102 heapBuild(); 00103 } 00104 00105 ExternalSortFetchArray &ExternalSortMerger::bindFetchArray() 00106 { 00107 return fetchArray; 00108 } 00109 00110 inline uint ExternalSortMerger::heapParent(uint i) 00111 { 00112 return (i >> 1); 00113 } 00114 00115 inline uint ExternalSortMerger::heapLeft(uint i) 00116 { 00117 return (i << 1); 00118 } 00119 00120 inline uint ExternalSortMerger::heapRight(uint i) 00121 { 00122 return (i << 1) + 1; 00123 } 00124 00125 inline void ExternalSortMerger::heapExchange(uint i,uint j) 00126 { 00127 std::swap(mergeInfo[i],mergeInfo[j]); 00128 } 00129 00130 inline ExternalSortMergeInfo &ExternalSortMerger::getMergeHigh() 00131 { 00132 return mergeInfo[1]; 00133 } 00134 00135 void ExternalSortMerger::heapify(uint i) 00136 { 00137 uint l, r, highest = i; 00138 00139 l = heapLeft(i); 00140 r = heapRight(i); 00141 00142 if (l <= nRuns) { 00143 tupleAccessor.setCurrentTupleBuf(mergeInfo[l].val); 00144 tupleAccessor2.setCurrentTupleBuf(mergeInfo[i].val); 00145 keyAccessor.unmarshal(keyData); 00146 keyAccessor2.unmarshal(keyData2); 00147 if (sortInfo.compareKeys(keyData,keyData2) < 0) { 00148 highest = l; 00149 } 00150 } 00151 00152 if (r <= nRuns) { 00153 tupleAccessor.setCurrentTupleBuf(mergeInfo[r].val); 00154 tupleAccessor2.setCurrentTupleBuf(mergeInfo[highest].val); 00155 keyAccessor.unmarshal(keyData); 00156 keyAccessor2.unmarshal(keyData2); 00157 if (sortInfo.compareKeys(keyData,keyData2) < 0) { 00158 highest = r; 00159 } 00160 } 00161 00162 if (highest != i) { 00163 heapExchange(highest,i); 00164 heapify(highest); 00165 } 00166 } 00167 00168 void ExternalSortMerger::heapBuild() 00169 { 00170 for (uint i = (nRuns / 2); i > 0; i--) { 00171 heapify(i); 00172 } 00173 } 00174 00175 ExternalSortRC ExternalSortMerger::checkFetch() 00176 { 00177 if (nRuns == 0) { 00178 return EXTSORT_ENDOFDATA; 00179 } 00180 if (pOrds[getMergeHigh().runOrd] 00181 >= ppFetchArrays[getMergeHigh().runOrd]->nTuples) 00182 { 00183 ExternalSortRC rc = ppRunAccessors[getMergeHigh().runOrd]->fetch( 00184 EXTSORT_FETCH_ARRAY_SIZE); 00185 if (rc != EXTSORT_SUCCESS) { 00186 assert(rc == EXTSORT_ENDOFDATA); 00187 heapExchange(1, nRuns); 00188 if (--nRuns == 0) { 00189 return EXTSORT_ENDOFDATA; 00190 } 00191 } else { 00192 pOrds[getMergeHigh().runOrd] = 0; 00193 } 00194 } 00195 00196 return EXTSORT_SUCCESS; 00197 } 00198 00199 ExternalSortRC ExternalSortMerger::fetch(uint nTuplesRequested) 00200 { 00201 sortInfo.stream.checkAbort(); 00202 00203 if (nTuplesRequested > EXTSORT_FETCH_ARRAY_SIZE) { 00204 nTuplesRequested = EXTSORT_FETCH_ARRAY_SIZE; 00205 } 00206 00207 ExternalSortRC rc = checkFetch(); 00208 if (rc != EXTSORT_SUCCESS) { 00209
00210 return rc; 00211 } 00212 00213 fetchArray.nTuples = 0; 00214 do { 00215 getMergeHigh().val = 00216 ppFetchArrays[getMergeHigh().runOrd]->ppTupleBuffers[ 00217 pOrds[getMergeHigh().runOrd]]; 00218 00219 heapify(1); 00220 00221 fetchArray.ppTupleBuffers[fetchArray.nTuples] = getMergeHigh().val; 00222 fetchArray.nTuples++; 00223 nTuplesRequested--; 00224 00225 pOrds[getMergeHigh().runOrd]++; 00226 } while (nTuplesRequested 00227 && (pOrds[getMergeHigh().runOrd] 00228 < ppFetchArrays[getMergeHigh().runOrd]->nTuples)); 00229 00230 return EXTSORT_SUCCESS; 00231 } 00232 00233 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/sorter/ExternalSortMerger.cpp#1 $"); 00234 00235