LLVM: lib/Support/Parallel.cpp Source File (original) (raw)

1

2

3

4

5

6

7

8

11#include "llvm/Config/llvm-config.h"

16

17#include

18#include

19#include

20#include

21#include

22#include

23

25

26namespace llvm {

28#if LLVM_ENABLE_THREADS

29

30#ifdef _WIN32

31static thread_local unsigned threadIndex = UINT_MAX;

32

34#else

35thread_local unsigned threadIndex = UINT_MAX;

36#endif

37

39

40namespace {

41

42

43class Executor {

44public:

45 virtual ~Executor() = default;

46 virtual void add(std::function<void()> func) = 0;

48

49 static Executor *getDefaultExecutor();

50};

51

52

53

54class ThreadPoolExecutor : public Executor {

55public:

59

61

62

64 Threads.resize(1);

65 std::lock_guardstd::mutex Lock(Mutex);

66

67

68 auto &Thread0 = Threads[0];

69 Thread0 = std::thread([this, S] {

71 Threads.emplace_back([this, S, I] { work(S, I); });

72 if (Stop)

73 break;

74 }

75 ThreadsCreated.set_value();

76 work(S, 0);

77 });

78 }

79

80

81

82 ThreadPoolExecutor() = delete;

83

84 void stop() {

85 {

86 std::lock_guardstd::mutex Lock(Mutex);

87 if (Stop)

88 return;

89 Stop = true;

90 }

91 Cond.notify_all();

92 ThreadsCreated.get_future().wait();

93 }

94

95 ~ThreadPoolExecutor() override {

96 stop();

97 std::thread::id CurrentThreadId = std::this_thread::get_id();

98 for (std::thread &T : Threads)

99 if (T.get_id() == CurrentThreadId)

100 T.detach();

101 else

102 T.join();

103 }

104

105 struct Creator {

106 static void *call() { return new ThreadPoolExecutor(strategy); }

107 };

108 struct Deleter {

109 static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }

110 };

111

112 void add(std::function<void()> F) override {

113 {

114 std::lock_guardstd::mutex Lock(Mutex);

115 WorkStack.push_back(std::move(F));

116 }

117 Cond.notify_one();

118 }

119

121

122private:

124 threadIndex = ThreadID;

126

127

128

129

130

131

132 static thread_local std::unique_ptr Backoff;

133

134 while (true) {

135 if (TheJobserver) {

136

137

138

139

140

141

142

143

144

147 do {

148 if (Stop)

149 return;

150 Slot = TheJobserver->tryAcquire();

151 if (Slot.isValid())

152 break;

154

156 [&] { TheJobserver->release(std::move(Slot)); });

157

158 while (true) {

159 std::function<void()> Task;

160 {

161 std::unique_lockstd::mutex Lock(Mutex);

162 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });

163 if (Stop && WorkStack.empty())

164 return;

165 if (WorkStack.empty())

166 break;

167 Task = std::move(WorkStack.back());

168 WorkStack.pop_back();

169 }

170 Task();

171 }

172 } else {

173 std::unique_lockstd::mutex Lock(Mutex);

174 Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });

175 if (Stop)

176 break;

177 auto Task = std::move(WorkStack.back());

178 WorkStack.pop_back();

179 Lock.unlock();

180 Task();

181 }

182 }

183 }

184

185 std::atomic Stop{false};

186 std::vector<std::function<void()>> WorkStack;

187 std::mutex Mutex;

188 std::condition_variable Cond;

189 std::promise ThreadsCreated;

190 std::vectorstd::thread Threads;

192

194};

195

196Executor *Executor::getDefaultExecutor() {

197#ifdef _WIN32

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216 static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,

217 ThreadPoolExecutor::Deleter>

218 ManagedExec;

219 static std::unique_ptr Exec(&(*ManagedExec));

220 return Exec.get();

221#else

222

223

224

225

226 static ThreadPoolExecutor Exec(strategy);

227 return &Exec;

228#endif

229}

230}

231}

232

234 return detail::Executor::getDefaultExecutor()->getThreadCount();

235}

236#endif

237

238

239

240

241

243#if LLVM_ENABLE_THREADS

245 (threadIndex == UINT_MAX)) {}

246#else

247 : Parallel(false) {}

248#endif

250

251

252 L.sync();

253}

254

256#if LLVM_ENABLE_THREADS

257 if (Parallel) {

258 L.inc();

259 detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {

260 F();

261 L.dec();

262 });

263 return;

264 }

265#endif

266 F();

267}

268

269}

270}

271

274#if LLVM_ENABLE_THREADS

276 auto NumItems = End - Begin;

277

278

279 auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;

280 if (TaskSize == 0)

281 TaskSize = 1;

282

284 for (; Begin + TaskSize < End; Begin += TaskSize) {

285 TG.spawn([=, &Fn] {

286 for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)

287 Fn(I);

288 });

289 }

290 if (Begin != End) {

291 TG.spawn([=, &Fn] {

292 for (size_t I = Begin; I != End; ++I)

293 Fn(I);

294 });

295 }

296 return;

297 }

298#endif

299

300 for (; Begin != End; ++Begin)

301 Fn(Begin);

302}

static GCRegistry::Add< CoreCLRGC > E("coreclr", "CoreCLR-compatible GC")

const SmallVectorImpl< MachineOperand > & Cond

This file defines the make_scope_exit function, which executes user-defined cleanup logic at scope ex...

static cl::opt< int > ThreadCount("threads", cl::init(0))

A class to help implement exponential backoff.

LLVM_ABI bool waitForNextAttempt()

Blocks while waiting for the next attempt.

A JobSlot represents a single job slot that can be acquired from or released to a jobserver pool.

The public interface for a jobserver client.

static LLVM_ABI_FOR_TEST JobserverClient * getInstance()

Returns the singleton instance of the JobserverClient.

ManagedStatic - This transparently changes the behavior of global statics to be lazily constructed on...

This tells how a thread pool will be used.

LLVM_ABI void apply_thread_strategy(unsigned ThreadPoolNum) const

Assign the current thread to an ideal hardware CPU or NUMA node.

LLVM_ABI unsigned compute_thread_count() const

Retrieves the max available threads for the current strategy.

bool UseJobserver

If true, the thread pool will attempt to coordinate with a GNU Make jobserver, acquiring a job slot b...

An efficient, type-erasing, non-owning reference to a callable.

LLVM_ABI ~TaskGroup()

Definition Parallel.cpp:249

LLVM_ABI TaskGroup()

Definition Parallel.cpp:242

LLVM_ABI void spawn(std::function< void()> f)

Definition Parallel.cpp:255

LLVM_ABI ThreadPoolStrategy strategy

Definition Parallel.cpp:24

unsigned getThreadIndex()

SmartMutex< false > Mutex

Mutex - A standard, always enforced mutex.

This is an optimization pass for GlobalISel generic memory operations.

detail::scope_exit< std::decay_t< Callable > > make_scope_exit(Callable &&F)

LLVM_ABI void parallelFor(size_t Begin, size_t End, function_ref< void(size_t)> Fn)

Definition Parallel.cpp:272