Fennel: /home/pub/open/dev/fennel/exec/SimpleExecStreamGovernor.cpp Source File (original) (raw)

00001 00002 00003 00004 00005 00006 00007 00008 00009 00010 00011 00012 00013 00014 00015 00016 00017 00018 00019 00020 00021 00022 00023 00024 #include "fennel/common/CommonPreamble.h" 00025 #include "fennel/segment/ScratchMemExcn.h" 00026 #include "fennel/exec/SimpleExecStreamGovernor.h" 00027 #include "fennel/exec/ExecStream.h" 00028 #include "fennel/exec/ExecStreamGraphImpl.h" 00029 00030 #include <math.h> 00031 00032 FENNEL_BEGIN_CPPFILE("$Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $"); 00033 00034 SimpleExecStreamGovernor::SimpleExecStreamGovernor( 00035 ExecStreamResourceKnobs const &knobSettings, 00036 ExecStreamResourceQuantity const &resourcesAvailable, 00037 SharedTraceTarget pTraceTargetInit, 00038 std::string nameInit) 00039 : TraceSource(pTraceTargetInit, nameInit), 00040 ExecStreamGovernor( 00041 knobSettings, resourcesAvailable, pTraceTargetInit, nameInit) 00042 { 00043 perGraphAllocation = computePerGraphAllocation(); 00044 } 00045 00046 SimpleExecStreamGovernor::~SimpleExecStreamGovernor() 00047 { 00048 } 00049 00050 bool SimpleExecStreamGovernor::setResourceKnob( 00051 ExecStreamResourceKnobs const &knob, ExecStreamResourceKnobType knobType) 00052 { 00053 StrictMutexGuard mutexGuard(mutex); 00054 00055 switch (knobType) { 00056 case EXEC_KNOB_EXPECTED_CONCURRENT_STATEMENTS: 00057 knobSettings.expectedConcurrentStatements = 00058 knob.expectedConcurrentStatements; 00059 perGraphAllocation = computePerGraphAllocation(); 00060 FENNEL_TRACE( 00061 TRACE_FINE, 00062 "Expected concurrent statements set to " << 00063 knobSettings.expectedConcurrentStatements << 00064 ". Per graph allocation is now " << perGraphAllocation << 00065 " cache pages."); 00066 break; 00067 00068 case EXEC_KNOB_CACHE_RESERVE_PERCENTAGE: 00069
00070
00071 double percent = (100 - knobSettings.cacheReservePercentage) / 100.0; 00072 uint totalPagesAvailable = (uint) 00073 ((resourcesAvailable.nCachePages + resourcesAssigned.nCachePages) / 00074 percent); 00075 uint numReserve = 00076 totalPagesAvailable * knob.cacheReservePercentage / 100; 00077 if (totalPagesAvailable - numReserve < resourcesAssigned.nCachePages) { 00078 return false; 00079 } 00080 knobSettings.cacheReservePercentage = knob.cacheReservePercentage; 00081 resourcesAvailable.nCachePages = 00082 totalPagesAvailable - numReserve - resourcesAssigned.nCachePages; 00083 perGraphAllocation = computePerGraphAllocation(); 00084 FENNEL_TRACE( 00085 TRACE_FINE, 00086 "Cache reserve percentage set to " << 00087 knobSettings.cacheReservePercentage << 00088 ". Per graph allocation is now " << perGraphAllocation << 00089 " cache pages."); 00090 break; 00091 } 00092 00093 return true; 00094 } 00095 00096 bool SimpleExecStreamGovernor::setResourceAvailability( 00097 ExecStreamResourceQuantity const &available, 00098 ExecStreamResourceType resourceType) 00099 { 00100 StrictMutexGuard mutexGuard(mutex); 00101 00102 switch (resourceType) { 00103 case EXEC_RESOURCE_CACHE_PAGES: 00104 { 00105 uint pagesAvailable = 00106 available.nCachePages * 00107 (100 - knobSettings.cacheReservePercentage) / 100; 00108 if (pagesAvailable < resourcesAssigned.nCachePages) { 00109 return false; 00110 } 00111 resourcesAvailable.nCachePages = 00112 (pagesAvailable - resourcesAssigned.nCachePages); 00113 perGraphAllocation = computePerGraphAllocation(); 00114 FENNEL_TRACE( 00115 TRACE_FINE, 00116 resourcesAvailable.nCachePages << 00117 " cache pages now available for assignment. " << 00118 "Per graph allocation is now " << perGraphAllocation << 00119 " cache pages."); 00120 break; 00121 } 00122 00123 case EXEC_RESOURCE_THREADS: 00124 resourcesAvailable.nThreads = available.nThreads; 00125 break; 00126 } 00127 00128 return true; 00129 } 00130 void SimpleExecStreamGovernor::requestResources(ExecStreamGraph &graph) 00131 { 00132 FENNEL_TRACE(TRACE_FINE, "requestResources"); 00133 00134 StrictMutexGuard mutexGuard(mutex); 00135 00136 std::vector sortedStreams = graph.getSortedStreams(); 00137 boost::scoped_array resourceReqts; 00138 boost::scoped_array sqrtDiffOptMin; 00139 uint nStreams = sortedStreams.size(); 00140 00141 resourceReqts.reset(new ExecStreamResourceRequirements[nStreams]); 00142 sqrtDiffOptMin.reset(new double[nStreams]); 00143 00144
00145
00146 uint allocationAmount = 00147 std::min(resourcesAvailable.nCachePages, perGraphAllocation); 00148 FENNEL_TRACE( 00149 TRACE_FINE, 00150 allocationAmount << " cache pages available for stream graph"); 00151 00152
00153
00154 uint totalMin = 0; 00155 uint totalOpt = 0; 00156 double totalSqrtDiffs = 0; 00157 bool allAccurate = true; 00158 for (uint i = 0; i < nStreams; i++) { 00159 ExecStreamResourceQuantity minQuantity, optQuantity; 00160 ExecStreamResourceSettingType optType; 00161 sortedStreams[i]->getResourceRequirements( 00162 minQuantity, optQuantity, optType); 00163 assert( 00164 optType == EXEC_RESOURCE_UNBOUNDED || 00165 minQuantity.nCachePages <= optQuantity.nCachePages); 00166 assert(minQuantity.nThreads <= optQuantity.nThreads); 00167 00168 ExecStreamResourceRequirements &reqt = resourceReqts[i]; 00169 reqt.minReqt = minQuantity.nCachePages; 00170 totalMin += reqt.minReqt; 00171 reqt.optType = optType; 00172 00173 switch (optType) { 00174 case EXEC_RESOURCE_ACCURATE: 00175 reqt.optReqt = optQuantity.nCachePages; 00176 sqrtDiffOptMin[i] = sqrt(double(reqt.optReqt - reqt.minReqt)); 00177 break; 00178 case EXEC_RESOURCE_ESTIMATE: 00179 reqt.optReqt = optQuantity.nCachePages; 00180 sqrtDiffOptMin[i] = sqrt(double(reqt.optReqt - reqt.minReqt)); 00181 allAccurate = false; 00182 break; 00183 case EXEC_RESOURCE_UNBOUNDED: 00184
00185
00186
00187
00188
00189
00190 sqrtDiffOptMin[i] = sqrt(double(allocationAmount)); 00191 allAccurate = false; 00192
00193
00194 reqt.optReqt = reqt.minReqt + allocationAmount; 00195 break; 00196 } 00197 totalOpt += reqt.optReqt; 00198 totalSqrtDiffs += sqrtDiffOptMin[i]; 00199 } 00200 00201
00202 if (totalMin > allocationAmount && 00203 totalMin > resourcesAvailable.nCachePages) 00204 { 00205 FENNEL_TRACE( 00206 TRACE_FINE, 00207 "Minimum request of " << totalMin << " cache pages not met"); 00208 throw ScratchMemExcn(); 00209 } 00210 00211 uint totalAssigned; 00212 00213
00214 if (totalMin >= allocationAmount) { 00215 assignCachePages(sortedStreams, resourceReqts, true); 00216 totalAssigned = totalMin; 00217 FENNEL_TRACE( 00218 TRACE_FINE, 00219 "Mininum request of " << totalMin << " cache pages assigned"); 00220 00221 } else if (totalOpt <= allocationAmount) { 00222
00223
00224 if (allAccurate) { 00225 assignCachePages(sortedStreams, resourceReqts, false); 00226 totalAssigned = totalOpt; 00227 FENNEL_TRACE( 00228 TRACE_FINE, 00229 "Optimum request of " << totalOpt << " cache pages assigned"); 00230 00231 } else { 00232
00233
00234
00235
00236
00237
00238 uint assigned = 00239 distributeCachePages( 00240 sortedStreams, resourceReqts, sqrtDiffOptMin, 00241 totalSqrtDiffs, allocationAmount - totalOpt, true); 00242 totalAssigned = assigned; 00243 FENNEL_TRACE( 00244 TRACE_FINE, 00245 assigned << 00246 " cache pages assigned, based on an optimum request for " << 00247 totalOpt << " cache pages"); 00248 } 00249 00250 } else { 00251
00252
00253 uint assigned = 00254 distributeCachePages( 00255 sortedStreams, resourceReqts, sqrtDiffOptMin, totalSqrtDiffs, 00256 allocationAmount - totalMin, false); 00257 totalAssigned = assigned; 00258 FENNEL_TRACE( 00259 TRACE_FINE, 00260 assigned << 00261 " cache pages assigned based on an optimum request for " << 00262 totalOpt << " cache pages"); 00263 } 00264 00265
00266 resourcesAssigned.nCachePages += totalAssigned; 00267 resourcesAvailable.nCachePages -= totalAssigned; 00268 SharedExecStreamResourceQuantity 00269 pQuantity(new ExecStreamResourceQuantity()); 00270 pQuantity->nCachePages = totalAssigned; 00271 resourceMap.insert( 00272 ExecStreamGraphResourceMap::value_type(&graph, pQuantity)); 00273 00274 FENNEL_TRACE( 00275 TRACE_FINE, 00276 resourcesAvailable.nCachePages << 00277 " cache pages remaining for assignment"); 00278 } 00279 00280 void SimpleExecStreamGovernor::assignCachePages( 00281 std::vector &streams, 00282 boost::scoped_array const &reqts, 00283 bool assignMin) 00284 { 00285 for (uint i = 0; i < streams.size(); i++) { 00286 ExecStreamResourceQuantity quantity; 00287 quantity.nCachePages = 00288 (assignMin) ? reqts[i].minReqt : reqts[i].optReqt; 00289 streams[i]->setResourceAllocation(quantity); 00290 if (isTracingLevel(TRACE_FINER)) { 00291 traceCachePageRequest( 00292 quantity.nCachePages, reqts[i], streams[i]->getName()); 00293 } 00294 } 00295 } 00296 00297 uint SimpleExecStreamGovernor::distributeCachePages( 00298 std::vector &streams, 00299 boost::scoped_array const &reqts, 00300 boost::scoped_array const &sqrtDiffOptMin, 00301 double totalSqrtDiffs, 00302 uint excessAvailable, bool assignOpt) 00303 { 00304
00305
00306
00307 if (assignOpt) { 00308 totalSqrtDiffs = 0; 00309 for (uint i = 0; i < streams.size(); i++) { 00310 if (reqts[i].optType != EXEC_RESOURCE_ACCURATE) { 00311 totalSqrtDiffs += sqrtDiffOptMin[i]; 00312 } 00313 } 00314 } 00315 00316 uint excessAssigned = 0; 00317 uint totalAssigned = 0; 00318 for (uint i = 0; i < streams.size(); i++) { 00319 uint amount; 00320 ExecStreamResourceRequirements &reqt = reqts[i]; 00321 if (assignOpt && reqt.optType == EXEC_RESOURCE_ACCURATE) { 00322 amount = 0; 00323 } else { 00324 amount = (uint) floor(excessAvailable * sqrtDiffOptMin[i] / 00325 totalSqrtDiffs); 00326 } 00327 assert(amount <= (excessAvailable - excessAssigned)); 00328 excessAssigned += amount; 00329 00330 ExecStreamResourceQuantity quantity; 00331 if (assignOpt) { 00332 assert(reqt.optType != EXEC_RESOURCE_UNBOUNDED); 00333 quantity.nCachePages = reqt.optReqt; 00334 } else { 00335 quantity.nCachePages = reqt.minReqt; 00336 } 00337 quantity.nCachePages += amount; 00338 totalAssigned += quantity.nCachePages; 00339 streams[i]->setResourceAllocation(quantity); 00340 if (isTracingLevel(TRACE_FINER)) { 00341 traceCachePageRequest( 00342 quantity.nCachePages, reqt, streams[i]->getName()); 00343 } 00344 } 00345 00346 return totalAssigned; 00347 } 00348 00349 void SimpleExecStreamGovernor::returnResources(ExecStreamGraph &graph) 00350 { 00351 StrictMutexGuard mutexGuard(mutex); 00352 00353 ExecStreamGraphResourceMap::const_iterator iter = resourceMap.find(&graph); 00354 if (iter == resourceMap.end()) { 00355
00356 return; 00357 } 00358 SharedExecStreamResourceQuantity pQuantity = iter->second; 00359 resourcesAssigned.nCachePages -= pQuantity->nCachePages; 00360 resourcesAvailable.nCachePages += pQuantity->nCachePages; 00361 FENNEL_TRACE( 00362 TRACE_FINE, 00363 "Returned " << pQuantity->nCachePages << " cache pages. " << 00364 resourcesAvailable.nCachePages << 00365 " cache pages now available for assignment"); 00366 00367 resourceMap.erase(&graph); 00368 } 00369 00370 FENNEL_END_CPPFILE("$Id: //open/dev/fennel/exec/SimpleExecStreamGovernor.cpp#9 $"); 00371 00372