LLVM: lib/Support/Parallel.cpp Source File (original) (raw)
1
2
3
4
5
6
7
8
11#include "llvm/Config/llvm-config.h"
16
17#include
18#include
19#include
20#include
21#include
22#include
23
25
26namespace llvm {
28#if LLVM_ENABLE_THREADS
29
30#ifdef _WIN32
31static thread_local unsigned threadIndex = UINT_MAX;
32
34#else
35thread_local unsigned threadIndex = UINT_MAX;
36#endif
37
39
40namespace {
41
42
43class Executor {
44public:
45 virtual ~Executor() = default;
46 virtual void add(std::function<void()> func) = 0;
48
49 static Executor *getDefaultExecutor();
50};
51
52
53
54class ThreadPoolExecutor : public Executor {
55public:
59
61
62
64 Threads.resize(1);
65 std::lock_guardstd::mutex Lock(Mutex);
66
67
68 auto &Thread0 = Threads[0];
69 Thread0 = std::thread([this, S] {
71 Threads.emplace_back([this, S, I] { work(S, I); });
72 if (Stop)
73 break;
74 }
75 ThreadsCreated.set_value();
76 work(S, 0);
77 });
78 }
79
80
81
82 ThreadPoolExecutor() = delete;
83
84 void stop() {
85 {
86 std::lock_guardstd::mutex Lock(Mutex);
87 if (Stop)
88 return;
89 Stop = true;
90 }
91 Cond.notify_all();
92 ThreadsCreated.get_future().wait();
93 }
94
95 ~ThreadPoolExecutor() override {
96 stop();
97 std::thread::id CurrentThreadId = std::this_thread::get_id();
98 for (std::thread &T : Threads)
99 if (T.get_id() == CurrentThreadId)
100 T.detach();
101 else
102 T.join();
103 }
104
105 struct Creator {
106 static void *call() { return new ThreadPoolExecutor(strategy); }
107 };
108 struct Deleter {
109 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
110 };
111
112 void add(std::function<void()> F) override {
113 {
114 std::lock_guardstd::mutex Lock(Mutex);
115 WorkStack.push_back(std::move(F));
116 }
117 Cond.notify_one();
118 }
119
121
122private:
124 threadIndex = ThreadID;
126
127
128
129
130
131
132 static thread_local std::unique_ptr Backoff;
133
134 while (true) {
135 if (TheJobserver) {
136
137
138
139
140
141
142
143
144
147 do {
148 if (Stop)
149 return;
150 Slot = TheJobserver->tryAcquire();
151 if (Slot.isValid())
152 break;
154
156 [&] { TheJobserver->release(std::move(Slot)); });
157
158 while (true) {
159 std::function<void()> Task;
160 {
161 std::unique_lockstd::mutex Lock(Mutex);
162 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
163 if (Stop && WorkStack.empty())
164 return;
165 if (WorkStack.empty())
166 break;
167 Task = std::move(WorkStack.back());
168 WorkStack.pop_back();
169 }
170 Task();
171 }
172 } else {
173 std::unique_lockstd::mutex Lock(Mutex);
174 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
175 if (Stop)
176 break;
177 auto Task = std::move(WorkStack.back());
178 WorkStack.pop_back();
179 Lock.unlock();
180 Task();
181 }
182 }
183 }
184
185 std::atomic Stop{false};
186 std::vector<std::function<void()>> WorkStack;
187 std::mutex Mutex;
188 std::condition_variable Cond;
189 std::promise ThreadsCreated;
190 std::vectorstd::thread Threads;
192
194};
195
196Executor *Executor::getDefaultExecutor() {
197#ifdef _WIN32
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
217 ThreadPoolExecutor::Deleter>
218 ManagedExec;
219 static std::unique_ptr Exec(&(*ManagedExec));
220 return Exec.get();
221#else
222
223
224
225
226 static ThreadPoolExecutor Exec(strategy);
227 return &Exec;
228#endif
229}
230}
231}
232
234 return detail::Executor::getDefaultExecutor()->getThreadCount();
235}
236#endif
237
238
239
240
241
243#if LLVM_ENABLE_THREADS
245 (threadIndex == UINT_MAX)) {}
246#else
247 : Parallel(false) {}
248#endif
250
251
252 L.sync();
253}
254
256#if LLVM_ENABLE_THREADS
257 if (Parallel) {
258 L.inc();
259 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
260 F();
261 L.dec();
262 });
263 return;
264 }
265#endif
266 F();
267}
268
269}
270}
271
274#if LLVM_ENABLE_THREADS
276 auto NumItems = End - Begin;
277
278
279 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
280 if (TaskSize == 0)
281 TaskSize = 1;
282
284 for (; Begin + TaskSize < End; Begin += TaskSize) {
285 TG.spawn([=, &Fn] {
286 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
287 Fn(I);
288 });
289 }
290 if (Begin != End) {
291 TG.spawn([=, &Fn] {
292 for (size_t I = Begin; I != End; ++I)
293 Fn(I);
294 });
295 }
296 return;
297 }
298#endif
299
300 for (; Begin != End; ++Begin)
301 Fn(Begin);
302}
static GCRegistry::Add< CoreCLRGC > E("coreclr", "CoreCLR-compatible GC")
const SmallVectorImpl< MachineOperand > & Cond
This file defines the make_scope_exit function, which executes user-defined cleanup logic at scope ex...
static cl::opt< int > ThreadCount("threads", cl::init(0))
A class to help implement exponential backoff.
LLVM_ABI bool waitForNextAttempt()
Blocks while waiting for the next attempt.
A JobSlot represents a single job slot that can be acquired from or released to a jobserver pool.
The public interface for a jobserver client.
static LLVM_ABI_FOR_TEST JobserverClient * getInstance()
Returns the singleton instance of the JobserverClient.
ManagedStatic - This transparently changes the behavior of global statics to be lazily constructed on...
This tells how a thread pool will be used.
LLVM_ABI void apply_thread_strategy(unsigned ThreadPoolNum) const
Assign the current thread to an ideal hardware CPU or NUMA node.
LLVM_ABI unsigned compute_thread_count() const
Retrieves the max available threads for the current strategy.
bool UseJobserver
If true, the thread pool will attempt to coordinate with a GNU Make jobserver, acquiring a job slot b...
An efficient, type-erasing, non-owning reference to a callable.
LLVM_ABI ~TaskGroup()
Definition Parallel.cpp:249
LLVM_ABI TaskGroup()
Definition Parallel.cpp:242
LLVM_ABI void spawn(std::function< void()> f)
Definition Parallel.cpp:255
LLVM_ABI ThreadPoolStrategy strategy
Definition Parallel.cpp:24
unsigned getThreadIndex()
SmartMutex< false > Mutex
Mutex - A standard, always enforced mutex.
This is an optimization pass for GlobalISel generic memory operations.
detail::scope_exit< std::decay_t< Callable > > make_scope_exit(Callable &&F)
LLVM_ABI void parallelFor(size_t Begin, size_t End, function_ref< void(size_t)> Fn)
Definition Parallel.cpp:272