LLVM: lib/Support/ThreadPool.cpp Source File (original) (raw)

1

2

3

4

5

6

7

8

9

10

11

12

14

15#include "llvm/Config/llvm-config.h"

16

20

21using namespace llvm;

22

24

25

26

27

28

29

30

31

32

33#if LLVM_ENABLE_THREADS

34

36 : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}

37

38void StdThreadPool::grow(int requested) {

40 if (Threads.size() >= MaxThreadCount)

41 return;

42 int newThreadCount = std::min(requested, MaxThreadCount);

43 while (static_cast<int>(Threads.size()) < newThreadCount) {

44 int ThreadID = Threads.size();

45 Threads.emplace_back([this, ThreadID] {

47 Strategy.apply_thread_strategy(ThreadID);

48 processTasks(nullptr);

49 });

50 }

51}

52

53#ifndef NDEBUG

54

56 *CurrentThreadTaskGroups = nullptr;

57#endif

58

59

61 while (true) {

62 std::function<void()> Task;

64 {

65 std::unique_lock<std::mutex> LockGuard(QueueLock);

66 bool workCompletedForGroup = false;

67

68 QueueCondition.wait(LockGuard, [&] {

69 return !EnableFlag || !Tasks.empty() ||

70 (WaitingForGroup != nullptr &&

71 (workCompletedForGroup =

72 workCompletedUnlocked(WaitingForGroup)));

73 });

74

75 if (!EnableFlag && Tasks.empty())

76 return;

77 if (WaitingForGroup != nullptr && workCompletedForGroup)

78 return;

79

80

81

82

83

84 ++ActiveThreads;

85 Task = std::move(Tasks.front().first);

86 GroupOfTask = Tasks.front().second;

87

88

89 if (GroupOfTask != nullptr)

90 ++ActiveGroups[GroupOfTask];

91 Tasks.pop_front();

92 }

93#ifndef NDEBUG

94 if (CurrentThreadTaskGroups == nullptr)

95 CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>;

96 CurrentThreadTaskGroups->push_back(GroupOfTask);

97#endif

98

99

100 Task();

101

102#ifndef NDEBUG

103 CurrentThreadTaskGroups->pop_back();

104 if (CurrentThreadTaskGroups->empty()) {

105 delete CurrentThreadTaskGroups;

106 CurrentThreadTaskGroups = nullptr;

107 }

108#endif

109

110 bool Notify;

111 bool NotifyGroup;

112 {

113

114 std::lock_guard<std::mutex> LockGuard(QueueLock);

115 --ActiveThreads;

116 if (GroupOfTask != nullptr) {

117 auto A = ActiveGroups.find(GroupOfTask);

118 if (--(A->second) == 0)

119 ActiveGroups.erase(A);

120 }

121 Notify = workCompletedUnlocked(GroupOfTask);

122 NotifyGroup = GroupOfTask != nullptr && Notify;

123 }

124

125

126 if (Notify)

127 CompletionCondition.notify_all();

128

129

130

131 if (NotifyGroup)

132 QueueCondition.notify_all();

133 }

134}

135

136bool StdThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const {

137 if (Group == nullptr)

138 return !ActiveThreads && Tasks.empty();

139 return ActiveGroups.count(Group) == 0 &&

141 [Group](const auto &T) { return T.second == Group; });

142}

143

144void StdThreadPool::wait() {

145 assert(!isWorkerThread());

146

147 std::unique_lock<std::mutex> LockGuard(QueueLock);

148 CompletionCondition.wait(LockGuard,

149 [&] { return workCompletedUnlocked(nullptr); });

150}

151

153

154 if (!isWorkerThread()) {

155 std::unique_lock<std::mutex> LockGuard(QueueLock);

156 CompletionCondition.wait(LockGuard,

157 [&] { return workCompletedUnlocked(&Group); });

158 return;

159 }

160

161 assert(CurrentThreadTaskGroups == nullptr ||

163

164

165

166 processTasks(&Group);

167}

168

169bool StdThreadPool::isWorkerThread() const {

171 llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();

173 if (CurrentThreadId == Thread.get_id())

174 return true;

175 return false;

176}

177

178

179StdThreadPool::~StdThreadPool() {

180 {

181 std::unique_lock<std::mutex> LockGuard(QueueLock);

182 EnableFlag = false;

183 }

184 QueueCondition.notify_all();

186 for (auto &Worker : Threads)

187 Worker.join();

188}

189

190#endif

191

192

196 errs() << "Warning: request a ThreadPool with " << ThreadCount

197 << " threads, but LLVM_ENABLE_THREADS has been turned off\n";

198 }

199}

200

202

203 while (!Tasks.empty()) {

204 auto Task = std::move(Tasks.front().first);

205 Tasks.pop_front();

206 Task();

207 }

208}

209

211

212

214}

215

218}

219

static GCRegistry::Add< ErlangGC > A("erlang", "erlang-compatible garbage collector")

#define LLVM_THREAD_LOCAL

LLVM_THREAD_LOCAL: a thread-local storage specifier which can be used with globals,...

assert(ImpDefSCC.getReg()==AMDGPU::SCC &&ImpDefSCC.isDef())

static cl::opt< int > ThreadCount("threads", cl::init(0))

SingleThreadExecutor(ThreadPoolStrategy ignored={})

Construct a non-threaded pool, ignoring the hardware strategy.

void wait() override

Blocking wait for all the tasks to execute first.

~SingleThreadExecutor() override

Blocking destructor: the pool will first execute the pending tasks.

bool isWorkerThread() const

Returns true if the current thread is a worker thread of this thread pool.

virtual ~ThreadPoolInterface()

Destroying the pool will drain the pending tasks and wait.

This tells how a thread pool will be used.

unsigned compute_thread_count() const

Retrieves the max available threads for the current strategy.

A group of tasks to be run on a thread pool.

SmartScopedReader< false > ScopedReader

SmartScopedWriter< false > ScopedWriter

This is an optimization pass for GlobalISel generic memory operations.

bool any_of(R &&range, UnaryPredicate P)

Provide wrappers to std::any_of which take ranges instead of having to pass begin/end explicitly.

auto formatv(bool Validate, const char *Fmt, Ts &&...Vals)

void report_fatal_error(Error Err, bool gen_crash_diag=true)

Report a serious error, calling any installed error handler.

void set_thread_name(const Twine &Name)

Set the name of the current thread.

raw_fd_ostream & errs()

This returns a reference to a raw_ostream for standard error.

bool is_contained(R &&Range, const E &Element)

Returns true if Element is found in Range.