thread_pool.h
1// Copyright 2023 Google LLC
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16// Modified from BSD-licensed code
17// Copyright (c) the JPEG XL Project Authors. All rights reserved.
18// See https://github.com/libjxl/libjxl/blob/main/LICENSE.
19
20#ifndef HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
21#define HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
22
23// IWYU pragma: begin_exports
24#include <stddef.h>
25#include <stdint.h>
26#include <stdio.h> // snprintf
27
28#include <array>
29#include <thread> //NOLINT
30// IWYU pragma: end_exports
31
32#include <atomic>
33#include <vector>
34
35#include "hwy/aligned_allocator.h" // HWY_ALIGNMENT
36#include "hwy/base.h"
37#include "hwy/cache_control.h" // Pause
38#include "hwy/contrib/thread_pool/futex.h" // BlockUntilDifferent, WakeAll
39
40// Temporary NOINLINE for profiling.
41#define HWY_POOL_INLINE HWY_NOINLINE
42
43#ifndef HWY_POOL_SETRANGE_INLINE
44#if HWY_ARCH_ARM
45// Workaround for invalid codegen on Arm (begin_ is larger than expected).
46#define HWY_POOL_SETRANGE_INLINE HWY_NOINLINE
47#else
48#define HWY_POOL_SETRANGE_INLINE
49#endif
50#endif // HWY_POOL_SETRANGE_INLINE
51
52namespace hwy {
53
54// Generates a random permutation of [0, size). O(1) storage.
55class ShuffledIota {
56 public:
57 ShuffledIota() : coprime_(1) {} // for PoolWorker
58 explicit ShuffledIota(uint32_t coprime) : coprime_(coprime) {}
59
60 // Returns the next after `current`, using an LCG-like generator.
61 uint32_t Next(uint32_t current, const Divisor& divisor) const {
62 HWY_DASSERT(current < divisor.GetDivisor());
63 // (coprime * i + current) % size, see https://lemire.me/blog/2017/09/18/.
64 return divisor.Remainder(current + coprime_);
65 }
66
67 // Returns true if a and b have no common divisor except 1. Based on
68 // binary GCD. Assumes a and b are nonzero. Also used in tests.
69 static bool CoprimeNonzero(uint32_t a, uint32_t b) {
70 const size_t trailing_a = Num0BitsBelowLS1Bit_Nonzero32(a);
71 const size_t trailing_b = Num0BitsBelowLS1Bit_Nonzero32(b);
72 // If both have at least one trailing zero, they are both divisible by 2.
73 if (HWY_MIN(trailing_a, trailing_b) != 0) return false;
74
75 // If one of them has a trailing zero, shift it out.
76 a >>= trailing_a;
77 b >>= trailing_b;
78
79 for (;;) {
80 // Swap such that a >= b.
81 const uint32_t tmp_a = a;
82 a = HWY_MAX(tmp_a, b);
83 b = HWY_MIN(tmp_a, b);
84
85 // When the smaller number is 1, they were coprime.
86 if (b == 1) return true;
87
88 a -= b;
89 // a == b means there was a common factor, so not coprime.
90 if (a == 0) return false;
92 }
93 }
94
95 // Returns another coprime >= `start`, or 1 for small `size`.
96 // Used to seed independent ShuffledIota instances.
97 static uint32_t FindAnotherCoprime(uint32_t size, uint32_t start) {
98 if (size <= 2) {
99 return 1;
100 }
101
102 // Avoids even x for even sizes, which are sure to be rejected.
103 const uint32_t inc = (size & 1) ? 1 : 2;
104
105 for (uint32_t x = start | 1; x < start + size * 16; x += inc) {
106 if (CoprimeNonzero(x, static_cast<uint32_t>(size))) {
107 return x;
108 }
109 }
110
111 HWY_ABORT("unreachable");
112 }
113
114 uint32_t coprime_;
115};
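To illustrate the permutation property the class comment claims, here is a minimal standalone sketch (not part of this header) in which plain '%' stands in for hwy::Divisor::Remainder: starting at 0 and repeatedly adding a value coprime to `size` visits every element of [0, size) exactly once before returning to 0.

#include <cassert>
#include <cstdint>
#include <vector>

// Standalone sketch of the ShuffledIota idea: repeatedly adding a value
// coprime to `size` (mod `size`) visits each of [0, size) exactly once.
int main() {
  const uint32_t size = 10;
  const uint32_t coprime = 7;  // gcd(7, 10) == 1
  std::vector<bool> seen(size, false);
  uint32_t current = 0;
  for (uint32_t i = 0; i < size; ++i) {
    assert(!seen[current]);  // no repeats before the cycle closes
    seen[current] = true;
    current = (current + coprime) % size;  // ShuffledIota::Next equivalent
  }
  assert(current == 0);  // full cycle of length `size`
  return 0;
}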
116
117// We want predictable struct/class sizes so we can reason about cache lines.
118#pragma pack(push, 1)
119
120enum class PoolWaitMode : uint32_t { kBlock, kSpin };
121
122// Worker's private working set.
123class PoolWorker { // HWY_ALIGNMENT bytes
124 static constexpr size_t kMaxVictims = 4;
125
126 public:
127 PoolWorker(size_t thread, size_t num_workers) {
129 num_victims_ = static_cast<uint32_t>(HWY_MIN(kMaxVictims, num_workers));
130
131 const Divisor div_workers(static_cast<uint32_t>(num_workers));
132
133 // Increase gap between coprimes to reduce collisions.
134 const uint32_t coprime = ShuffledIota::FindAnotherCoprime(
135 static_cast<uint32_t>(num_workers),
136 static_cast<uint32_t>((thread + 1) * 257 + thread * 13));
137 const ShuffledIota shuffled_iota(coprime);
138
139 // To simplify WorkerRun, our own thread is the first to 'steal' from.
140 victims_[0] = static_cast<uint32_t>(thread);
141 for (uint32_t i = 1; i < num_victims_; ++i) {
142 victims_[i] = shuffled_iota.Next(victims_[i - 1], div_workers);
143 HWY_DASSERT(victims_[i] != thread);
144 }
145
146 (void)padding_;
147 }
148 ~PoolWorker() = default;
149
150 void SetWaitMode(PoolWaitMode wait_mode) {
151 wait_mode_.store(wait_mode, std::memory_order_release);
152 }
153 PoolWaitMode WorkerGetWaitMode() const {
154 return wait_mode_.load(std::memory_order_acquire);
155 }
156
157 hwy::Span<const uint32_t> Victims() const {
158 return hwy::Span<const uint32_t>(victims_.data(),
159 static_cast<size_t>(num_victims_));
160 }
161
162 // Called from main thread in Plan().
163 HWY_POOL_SETRANGE_INLINE void SetRange(uint64_t begin, uint64_t end) {
164 const auto rel = std::memory_order_release;
165 begin_.store(begin, rel);
166 end_.store(end, rel);
167 }
168
169 // Returns the STL-style end of this worker's assigned range.
170 uint64_t WorkerGetEnd() const { return end_.load(std::memory_order_acquire); }
171
172 // Returns the next task to execute. If >= WorkerGetEnd(), it must be skipped.
173 uint64_t WorkerReserveTask() {
174 return begin_.fetch_add(1, std::memory_order_relaxed);
175 }
176
177 private:
178 std::atomic<uint64_t> begin_;
179 std::atomic<uint64_t> end_; // only changes during SetRange
180
181 std::atomic<PoolWaitMode> wait_mode_; // (32-bit)
182 uint32_t num_victims_; // <= kPoolMaxVictims
183 std::array<uint32_t, kMaxVictims> victims_;
184
185 uint8_t padding_[HWY_ALIGNMENT - 16 - 8 - sizeof(victims_)];
186};
187static_assert(sizeof(PoolWorker) == HWY_ALIGNMENT, "");
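The begin_/end_ pair above is the heart of the work-stealing scheme: claiming a task is a single fetch_add, and any claimed index at or past end_ is simply skipped. A minimal standalone sketch of that claim pattern (hypothetical worker lambda, not the PoolWorker API):

#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

// Sketch of the WorkerReserveTask pattern: each claim is one fetch_add, and
// any claimed index >= end is skipped, as in PoolWorker.
int main() {
  std::atomic<uint64_t> begin{0};
  const uint64_t end = 1000;
  std::atomic<uint64_t> done{0};

  auto worker = [&]() {
    for (;;) {
      const uint64_t task = begin.fetch_add(1, std::memory_order_relaxed);
      if (task >= end) break;  // overshoot is harmless, just stop
      done.fetch_add(1, std::memory_order_relaxed);  // "run" the task
    }
  };
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) threads.emplace_back(worker);
  for (auto& t : threads) t.join();
  return done.load() == end ? 0 : 1;  // every task ran exactly once
}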
188
189// Modified by main thread, shared with all workers.
190class PoolTasks { // 32 bytes
191 // Signature of the (internal) function called from worker(s) for each
192 // `task` in the [`begin`, `end`) passed to Run(). Closures (lambdas) do not
193 // receive the first argument, which points to the lambda object.
194 typedef void (*RunFunc)(const void* opaque, uint64_t task, size_t thread_id);
195
196 // Calls closure(task, thread). Signature must match RunFunc.
197 template <class Closure>
198 static void CallClosure(const void* opaque, uint64_t task, size_t thread) {
199 (*reinterpret_cast<const Closure*>(opaque))(task, thread);
200 }
201
202 public:
203 // Called from main thread in Plan().
204 template <class Closure>
205 void Store(const Closure& closure, uint64_t begin, uint64_t end) {
206 const auto rel = std::memory_order_release;
207 func_.store(static_cast<RunFunc>(&CallClosure<Closure>), rel);
208 opaque_.store(reinterpret_cast<const void*>(&closure), rel);
209 begin_.store(begin, rel);
210 end_.store(end, rel);
211 }
212
213 RunFunc WorkerGet(uint64_t& begin, uint64_t& end, const void*& opaque) const {
214 const auto acq = std::memory_order_acquire;
215 begin = begin_.load(acq);
216 end = end_.load(acq);
217 opaque = opaque_.load(acq);
218 return func_.load(acq);
219 }
220
221 private:
222 std::atomic<RunFunc> func_;
223 std::atomic<const void*> opaque_;
224 std::atomic<uint64_t> begin_;
225 std::atomic<uint64_t> end_;
226};
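PoolTasks erases the closure type by storing an opaque pointer plus a pointer to the matching CallClosure instantiation, avoiding std::function and its allocation. A standalone sketch of the same trick outside the pool:

#include <cstddef>
#include <cstdint>
#include <cstdio>

// Sketch of the PoolTasks type-erasure trick: a lambda is stored as an
// opaque pointer plus a pointer to a template instantiation that knows how
// to cast it back and call it.
typedef void (*RunFunc)(const void* opaque, uint64_t task, size_t thread);

template <class Closure>
void CallClosure(const void* opaque, uint64_t task, size_t thread) {
  (*static_cast<const Closure*>(opaque))(task, thread);
}

int main() {
  int sum = 0;
  const auto closure = [&sum](uint64_t task, size_t /*thread*/) {
    sum += static_cast<int>(task);
  };
  // "Store":
  const RunFunc func = &CallClosure<decltype(closure)>;
  const void* opaque = &closure;
  // Later, from a "worker":
  for (uint64_t task = 0; task < 5; ++task) func(opaque, task, /*thread=*/0);
  printf("sum=%d\n", sum);  // 0+1+2+3+4 = 10
  return 0;
}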
227
228// Modified by main thread, shared with all workers.
229class PoolCommands { // 16 bytes
230 static constexpr uint32_t kInitial = 0;
231 static constexpr uint32_t kMask = 0xF; // for command, rest is ABA counter.
232 static constexpr size_t kShift = hwy::CeilLog2(kMask);
233
234 public:
235 static constexpr uint32_t kTerminate = 1;
236 static constexpr uint32_t kWork = 2;
237 static constexpr uint32_t kNop = 3;
238
239 // Workers must initialize their copy to this so that they wait for the first
240 // command as intended.
241 static uint32_t WorkerInitialSeqCmd() { return kInitial; }
242
243 // Sends `cmd` to all workers.
244 void Broadcast(uint32_t cmd) {
245 HWY_DASSERT(cmd <= kMask);
246 const uint32_t epoch = ++epoch_;
247 const uint32_t seq_cmd = (epoch << kShift) | cmd;
248 seq_cmd_.store(seq_cmd, std::memory_order_release);
249
250 // Wake any worker whose wait_mode_ is or was kBlock.
251 WakeAll(seq_cmd_);
252
253 // Workers are either starting up, or waiting for a command. Either way,
254 // they will not miss this command, so no need to wait for them here.
255 }
256
257 // Returns the command, i.e., one of the public constants, e.g., kTerminate.
258 uint32_t WorkerWaitForNewCommand(PoolWaitMode wait_mode,
259 uint32_t& prev_seq_cmd) {
260 uint32_t seq_cmd;
261 if (HWY_LIKELY(wait_mode == PoolWaitMode::kSpin)) {
262 seq_cmd = SpinUntilDifferent(prev_seq_cmd, seq_cmd_);
263 } else {
264 seq_cmd = BlockUntilDifferent(prev_seq_cmd, seq_cmd_);
265 }
266 prev_seq_cmd = seq_cmd;
267 return seq_cmd & kMask;
268 }
269
270 private:
271 static HWY_INLINE uint32_t SpinUntilDifferent(
272 const uint32_t prev_seq_cmd, std::atomic<uint32_t>& current) {
273 for (;;) {
274 hwy::Pause();
275 const uint32_t seq_cmd = current.load(std::memory_order_acquire);
276 if (seq_cmd != prev_seq_cmd) return seq_cmd;
277 }
278 }
279
280 // Counter for ABA-proofing WorkerWaitForNewCommand. Stored next to seq_cmd_
281 // because both are written at the same time by the main thread. Sharding this
282 // 4x (one per cache line) is not helpful.
283 uint32_t epoch_{0};
284 std::atomic<uint32_t> seq_cmd_{kInitial};
285};
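The low kMask bits of seq_cmd_ carry the command and the upper bits carry the epoch, so re-broadcasting the same command still produces a different word for waiters to detect. A small sketch of that encoding, with the constants copied from above and plain uint32_t arithmetic:

#include <cassert>
#include <cstdint>

// Sketch of the PoolCommands encoding: low 4 bits are the command, the rest
// is an ever-increasing epoch (ABA protection for waiters).
int main() {
  const uint32_t kMask = 0xF;
  const uint32_t kShift = 4;  // CeilLog2(kMask)
  uint32_t epoch = 0;

  auto broadcast = [&](uint32_t cmd) {
    ++epoch;
    return (epoch << kShift) | cmd;
  };

  const uint32_t kNop = 3;
  const uint32_t first = broadcast(kNop);
  const uint32_t second = broadcast(kNop);
  assert((first & kMask) == kNop && (second & kMask) == kNop);
  assert(first != second);  // a waiter comparing against `first` still wakes
  return 0;
}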
286
287// Modified by main thread AND workers.
288// TODO(janwas): more scalable tree
289class alignas(HWY_ALIGNMENT) PoolBarrier { // 4 * HWY_ALIGNMENT bytes
290 static constexpr size_t kU64PerCacheLine = HWY_ALIGNMENT / sizeof(uint64_t);
291
292 public:
293 void Reset() {
294 for (size_t i = 0; i < 4; ++i) {
295 num_finished_[i * kU64PerCacheLine].store(0, std::memory_order_release);
296 }
297 }
298
299 void WorkerArrive(size_t thread) {
300 const size_t i = (thread & 3);
301 num_finished_[i * kU64PerCacheLine].fetch_add(1, std::memory_order_release);
302 }
303
304 // Spin until all have called Arrive(). Note that workers spin for a new
305 // command, not the barrier itself.
306 HWY_POOL_INLINE void WaitAll(size_t num_workers) {
307 const auto acq = std::memory_order_acquire;
308 for (;;) {
309 hwy::Pause();
310 const uint64_t sum = num_finished_[0 * kU64PerCacheLine].load(acq) +
311 num_finished_[1 * kU64PerCacheLine].load(acq) +
312 num_finished_[2 * kU64PerCacheLine].load(acq) +
313 num_finished_[3 * kU64PerCacheLine].load(acq);
314 if (sum == num_workers) break;
315 }
316 }
317
318 private:
319 // Sharded to reduce contention. Four counters, each in their own cache line.
320 std::atomic<uint64_t> num_finished_[4 * kU64PerCacheLine];
321};
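A standalone sketch of the sharded-arrival idea, where plain array indices stand in for separate cache lines: workers bump one of several counters and the waiter spins until the counters sum to the number of workers.

#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

int main() {
  constexpr size_t kShards = 4;
  std::atomic<uint64_t> num_finished[kShards];
  for (size_t i = 0; i < kShards; ++i) num_finished[i].store(0);  // Reset
  const size_t num_workers = 8;

  std::vector<std::thread> threads;
  for (size_t thread = 0; thread < num_workers; ++thread) {
    threads.emplace_back([&, thread]() {
      // ... do work ...
      num_finished[thread & (kShards - 1)].fetch_add(
          1, std::memory_order_release);  // WorkerArrive
    });
  }

  for (;;) {  // WaitAll
    uint64_t sum = 0;
    for (size_t i = 0; i < kShards; ++i) {
      sum += num_finished[i].load(std::memory_order_acquire);
    }
    if (sum == num_workers) break;
  }
  for (auto& t : threads) t.join();
  return 0;
}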
322
323// All mutable pool and worker state.
324struct alignas(HWY_ALIGNMENT) PoolMem {
325 PoolWorker& Worker(size_t thread) {
326 return *reinterpret_cast<PoolWorker*>(reinterpret_cast<uint8_t*>(&barrier) +
327 sizeof(barrier) +
328 thread * sizeof(PoolWorker));
329 }
330
331 PoolTasks tasks;
332 PoolCommands commands;
333 // barrier is more write-heavy, hence keep in another cache line.
334 uint8_t padding[HWY_ALIGNMENT - sizeof(tasks) - sizeof(commands)];
335
336 PoolBarrier barrier;
337 static_assert(sizeof(barrier) % HWY_ALIGNMENT == 0, "");
338
339 // Followed by `num_workers` PoolWorker.
340};
341
342// Aligned allocation and initialization of variable-length PoolMem.
343class PoolMemOwner {
344 public:
345 explicit PoolMemOwner(size_t num_threads)
346 // There is at least one worker, the main thread.
347 : num_workers_(HWY_MAX(num_threads, size_t{1})) {
348 const size_t size = sizeof(PoolMem) + num_workers_ * sizeof(PoolWorker);
349 bytes_ = hwy::AllocateAligned<uint8_t>(size);
350 HWY_ASSERT(bytes_);
351 mem_ = new (bytes_.get()) PoolMem();
352
353 for (size_t thread = 0; thread < num_workers_; ++thread) {
354 new (&mem_->Worker(thread)) PoolWorker(thread, num_workers_);
355 }
356
357 // Publish non-atomic stores in mem_ - that is the only shared state workers
358 // access before they call WorkerWaitForNewCommand.
359 std::atomic_thread_fence(std::memory_order_release);
360 }
361
362 ~PoolMemOwner() {
363 for (size_t thread = 0; thread < num_workers_; ++thread) {
364 mem_->Worker(thread).~PoolWorker();
365 }
366 mem_->~PoolMem();
367 }
368
369 size_t NumWorkers() const { return num_workers_; }
370
371 PoolMem* Mem() const { return mem_; }
372
373 private:
374 const size_t num_workers_; // >= 1
375 // Aligned allocation ensures we do not straddle cache lines.
376 hwy::AlignedFreeUniquePtr<uint8_t[]> bytes_;
377 PoolMem* mem_;
378};
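A standalone sketch of the PoolMemOwner layout, using hypothetical Header/Worker types, a kAlign constant standing in for HWY_ALIGNMENT, and operator new(std::align_val_t) in place of hwy::AllocateAligned: one aligned block holds a fixed-size header followed by num_workers trailing elements, constructed and destroyed via placement new, as in the ctor/dtor above.

#include <cstddef>
#include <cstdint>
#include <new>

struct Header { uint64_t state[8]; };
struct Worker { explicit Worker(size_t i) : id(i) {} size_t id; };

int main() {
  constexpr size_t kAlign = 64;  // stands in for HWY_ALIGNMENT
  const size_t num_workers = 4;
  const size_t size = sizeof(Header) + num_workers * sizeof(Worker);
  uint8_t* bytes =
      static_cast<uint8_t*>(::operator new(size, std::align_val_t{kAlign}));

  // Header first, then the variable-length tail of workers.
  Header* header = new (bytes) Header();
  Worker* workers = reinterpret_cast<Worker*>(bytes + sizeof(Header));
  for (size_t i = 0; i < num_workers; ++i) new (&workers[i]) Worker(i);

  // ... hand (header, workers) to the threads ...

  for (size_t i = 0; i < num_workers; ++i) workers[i].~Worker();
  header->~Header();
  ::operator delete(bytes, std::align_val_t{kAlign});
  return 0;
}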
379
380// Plans and executes parallel-for loops with work-stealing. No synchronization
381// because there is no mutable shared state.
382class ParallelFor { // 0 bytes
383 // A prior version of this code attempted to assign only as much work as a
384 // thread will actually use. As with OpenMP's 'guided' strategy, we assigned
385 // remaining/(k*num_threads) in each iteration. Although the worst-case
386 // imbalance is bounded, this required several rounds of work allocation, and
387 // the atomic counter did not scale to > 30 threads.
388 //
389 // We now use work stealing instead, where already-finished threads look for
390 // and perform work from others, as if they were that thread. This deals with
391 // imbalances as they arise, but care is required to reduce contention. We
392 // randomize the order in which threads choose victims to steal from.
393 //
394// Results: across 10K calls to Run(), we observe a mean of 5.1 tasks per
395 // thread, and standard deviation 0.67, indicating good load-balance.
396
397 public:
398 // Make preparations for workers to later run `closure(i)` for all `i` in
399 // `[begin, end)`. Called from the main thread; workers are initializing or
400 // spinning for a command. Returns false if there are no tasks or workers.
401 template <class Closure>
402 static bool Plan(uint64_t begin, uint64_t end, size_t num_workers,
403 const Closure& closure, PoolMem& mem) {
404 // If there are no tasks, we are done.
405 HWY_DASSERT(begin <= end);
406 const size_t num_tasks = static_cast<size_t>(end - begin);
407 if (HWY_UNLIKELY(num_tasks == 0)) return false;
408
409 // If there are no workers, run all tasks directly on the main thread,
410 // without the overhead of planning.
411 if (HWY_UNLIKELY(num_workers <= 1)) {
412 for (uint64_t task = begin; task < end; ++task) {
413 closure(task, /*thread=*/0);
414 }
415 return false;
416 }
417
418 // Store for later retrieval by all workers in WorkerRun. Must happen after
419 // the loop above because it may be re-entered by concurrent threads.
420 mem.tasks.Store(closure, begin, end);
421
422 // Assigning all remainders to the last thread causes imbalance. We instead
423 // give one more to each thread whose index is less.
424 const size_t remainder = num_tasks % num_workers;
425 const size_t min_tasks = num_tasks / num_workers;
426
427 uint64_t task = begin;
428 for (size_t thread = 0; thread < num_workers; ++thread) {
429 const uint64_t my_end = task + min_tasks + (thread < remainder);
430 mem.Worker(thread).SetRange(task, my_end);
431 task = my_end;
432 }
433 HWY_DASSERT(task == end);
434 return true;
435 }
436
437 // Must be called for each `thread` in [0, num_workers), but only if
438 // Plan returned true.
439 static HWY_POOL_INLINE void WorkerRun(const size_t thread, size_t num_workers,
440 PoolMem& mem) {
441 // Nonzero, otherwise Plan returned false and this should not be called.
442 HWY_DASSERT(num_workers != 0);
443 HWY_DASSERT(thread < num_workers);
444
445 const PoolTasks& tasks = mem.tasks;
446
447 uint64_t begin, end;
448 const void* opaque;
449 const auto func = tasks.WorkerGet(begin, end, opaque);
450
451 // Special case for <= 1 task per worker - avoid any shared state.
452 if (HWY_UNLIKELY(end <= begin + num_workers)) {
453 const uint64_t task = begin + thread;
454 if (HWY_LIKELY(task < end)) {
455 func(opaque, task, thread);
456 }
457 return;
458 }
459
460 // For each worker in random order, attempt to do all their work.
461 for (uint32_t victim : mem.Worker(thread).Victims()) {
462 PoolWorker* other_worker = &mem.Worker(victim);
463
464 // Until all of other_worker's work is done:
465 const uint64_t end = other_worker->WorkerGetEnd();
466 for (;;) {
467 // On x86 this generates a LOCK prefix, but that is only expensive if
468 // there is actually contention, which is unlikely because we shard the
469 // counters, threads do not quite proceed in lockstep due to memory
470 // traffic, and stealing happens in semi-random order.
471 uint64_t task = other_worker->WorkerReserveTask();
472
473 // The worker that first sets `task` to `end` exits this loop. After
474 // that, `task` can be incremented up to `num_workers - 1` times, once
475 // per other worker.
476 HWY_DASSERT(task < end + num_workers);
477
478 if (HWY_UNLIKELY(task >= end)) {
479 hwy::Pause(); // Reduce coherency traffic while stealing.
480 break;
481 }
482 // `thread` is the one we are actually running on; this is important
483 // because it is the TLS index for user code.
484 func(opaque, task, thread);
485 }
486 }
487 }
488};
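The static split in Plan gives each worker num_tasks / num_workers tasks and hands the remainder out one per worker, rather than dumping it all on the last one. A small sketch of just that arithmetic:

#include <cassert>
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t begin = 0, end = 103;
  const size_t num_workers = 10;
  const size_t num_tasks = static_cast<size_t>(end - begin);
  const size_t remainder = num_tasks % num_workers;  // 3
  const size_t min_tasks = num_tasks / num_workers;  // 10

  uint64_t task = begin;
  for (size_t thread = 0; thread < num_workers; ++thread) {
    // The first `remainder` workers get one extra task.
    const uint64_t my_end = task + min_tasks + (thread < remainder);
    printf("worker %zu: [%llu, %llu)\n", thread,
           static_cast<unsigned long long>(task),
           static_cast<unsigned long long>(my_end));
    task = my_end;
  }
  assert(task == end);  // every task assigned exactly once
  return 0;
}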
489
490#pragma pack(pop)
491
492// Sets the name of the current thread to the format string `format`, which must
493// include %d for `thread`. Currently only implemented for pthreads (*nix and
494// OSX); Windows involves throwing an exception.
495static inline void SetThreadName(const char* format, int thread) {
496#if HWY_OS_LINUX
497 char buf[16] = {}; // Linux limit, including \0
498 const int chars_written = snprintf(buf, sizeof(buf), format, thread);
499 HWY_ASSERT(0 < chars_written &&
500 chars_written <= static_cast<int>(sizeof(buf) - 1));
501 HWY_ASSERT(0 == pthread_setname_np(pthread_self(), buf));
502#else
503 (void)format;
504 (void)thread;
505#endif
506}
507
508// Highly efficient parallel-for, intended for workloads with thousands of
509// fork-join regions which consist of calling tasks[t](i) for a few hundred i,
510// using dozens of threads.
511//
512// To reduce scheduling overhead, we assume that tasks are statically known and
513// that threads do not schedule new work themselves. This allows us to avoid
514// queues and only store a counter plus the current task. The latter is a
515// pointer to a lambda function, without the allocation/indirection required for
516// std::function.
517//
518// To reduce fork/join latency, we use an efficient barrier, optionally
519// support spin-waits via SetWaitMode, and avoid any mutex/lock.
520//
521// To eliminate false sharing and enable reasoning about cache line traffic, the
522// worker state uses a single aligned allocation.
523//
524// For load-balancing, we use work stealing in random order.
525class ThreadPool {
526 static void ThreadFunc(size_t thread, size_t num_workers, PoolMem* mem) {
527 HWY_DASSERT(thread < num_workers);
528 SetThreadName("worker%03d", static_cast<int>(thread));
529
530 // Ensure mem is ready to use (synchronize with PoolMemOwner's fence).
531 std::atomic_thread_fence(std::memory_order_acquire);
532
533 PoolWorker& worker = mem->Worker(thread);
534 PoolCommands& commands = mem->commands;
535 uint32_t prev_seq_cmd = PoolCommands::WorkerInitialSeqCmd();
536
537 for (;;) {
538 const PoolWaitMode wait_mode = worker.WorkerGetWaitMode();
539 const uint32_t command =
540 commands.WorkerWaitForNewCommand(wait_mode, prev_seq_cmd);
541 if (HWY_UNLIKELY(command == PoolCommands::kTerminate)) {
542 return; // exits thread
543 } else if (HWY_LIKELY(command == PoolCommands::kWork)) {
544 ParallelFor::WorkerRun(thread, num_workers, *mem);
545 mem->barrier.WorkerArrive(thread);
546 } else if (command == PoolCommands::kNop) {
547 // do nothing - used to change wait mode
548 } else {
549 HWY_DASSERT(false); // unknown command
550 }
551 }
552 }
553
554 public:
555 // This typically includes hyperthreads, hence it is a loose upper bound.
556 // -1 because these are in addition to the main thread.
557 static size_t MaxThreads() {
558 return static_cast<size_t>(std::thread::hardware_concurrency() - 1);
559 }
560
561 // `num_threads` should not exceed `MaxThreads()`. If `num_threads` <= 1,
562 // Run() runs only on the main thread. Otherwise, we launch `num_threads - 1`
563 // threads because the main thread also participates.
564 explicit ThreadPool(size_t num_threads) : owner_(num_threads) {
565 (void)busy_; // unused in non-debug builds, avoid warning
566 const size_t num_workers = owner_.NumWorkers();
567
568 // Launch threads without waiting afterwards: they will receive the next
569 // PoolCommands once ready.
570 threads_.reserve(num_workers - 1);
571 for (size_t thread = 0; thread < num_workers - 1; ++thread) {
572 threads_.emplace_back(ThreadFunc, thread, num_workers, owner_.Mem());
573 }
574 }
575
576 // Waits for all threads to exit.
577 ~ThreadPool() {
578 PoolMem& mem = *owner_.Mem();
579 mem.commands.Broadcast(PoolCommands::kTerminate); // requests threads exit
580
581 for (std::thread& thread : threads_) {
582 HWY_ASSERT(thread.joinable());
583 thread.join();
584 }
585 }
586
587 ThreadPool(const ThreadPool&) = delete;
588 ThreadPool& operator=(const ThreadPool&) = delete;
589
590 // Returns number of PoolWorker, i.e., one more than the largest `thread`
591 // argument. Useful for callers that want to allocate thread-local storage.
592 size_t NumWorkers() const { return owner_.NumWorkers(); }
593
594 // `mode` is initially `kBlock`, which means futex. Switching to `kSpin`
595 // reduces fork-join overhead especially when there are many calls to `Run`,
596 // but wastes power when waiting over long intervals. Inexpensive, OK to call
597 // multiple times, but not concurrently with any `Run`.
598 void SetWaitMode(PoolWaitMode mode) {
599 // Run must not be active, otherwise we may overwrite the previous command
600 // before it is seen by all workers.
601 HWY_DASSERT(busy_.fetch_add(1) == 0);
602
603 PoolMem& mem = *owner_.Mem();
604
605 // For completeness/consistency, set on all workers, including the main
606 // thread, even though it will never wait for a command.
607 for (size_t thread = 0; thread < owner_.NumWorkers(); ++thread) {
608 mem.Worker(thread).SetWaitMode(mode);
609 }
610
611 // Send a no-op command so that workers wake as soon as possible. Skip the
612 // expensive barrier - workers may miss this command, but it is fine for
613 // them to wake up later and get the next actual command.
614 mem.commands.Broadcast(PoolCommands::kNop);
615
616 HWY_DASSERT(busy_.fetch_add(-1) == 1);
617 }
618
619 // parallel-for: Runs `closure(task, thread)` on worker thread(s) for every
620 // `task` in `[begin, end)`. Note that the unit of work should be large
621 // enough to amortize the function call overhead, but small enough that each
622 // worker processes a few tasks. Thus each `task` is usually a loop.
623 //
624 // Not thread-safe - concurrent calls to `Run` in the same ThreadPool are
625 // forbidden unless NumWorkers() == 1. We check for that in debug builds.
626 template <class Closure>
627 void Run(uint64_t begin, uint64_t end, const Closure& closure) {
628 const size_t num_workers = NumWorkers();
629 PoolMem& mem = *owner_.Mem();
630
631 if (HWY_LIKELY(ParallelFor::Plan(begin, end, num_workers, closure, mem))) {
632 // Only check if we are going to fork/join.
633 HWY_DASSERT(busy_.fetch_add(1) == 0);
634
635 mem.barrier.Reset();
636 mem.commands.Broadcast(PoolCommands::kWork);
637
638 // Also perform work on main thread instead of busy-waiting.
639 const size_t thread = num_workers - 1;
640 ParallelFor::WorkerRun(thread, num_workers, mem);
641 mem.barrier.WorkerArrive(thread);
642
643 mem.barrier.WaitAll(num_workers);
644
645 HWY_DASSERT(busy_.fetch_add(-1) == 1);
646 }
647 }
648
649 // Can pass this as init_closure when no initialization is needed.
650 // DEPRECATED, better to call the Run() overload without the init_closure arg.
651 static bool NoInit(size_t /*num_threads*/) { return true; } // DEPRECATED
652
653 // DEPRECATED equivalent of NumWorkers. Note that this is not the same as the
654 // ctor argument because num_threads = 0 has the same effect as 1.
655 size_t NumThreads() const { return NumWorkers(); } // DEPRECATED
656
657 // DEPRECATED prior interface with 32-bit tasks and first calling
658 // `init_closure(num_threads)`. Instead, perform any init before this, calling
659 // NumWorkers() for an upper bound on the thread indices, then call the
660 // other overload.
661 template <class InitClosure, class RunClosure>
662 bool Run(uint64_t begin, uint64_t end, const InitClosure& init_closure,
663 const RunClosure& run_closure) {
664 if (!init_closure(NumThreads())) return false;
665 Run(begin, end, run_closure);
666 return true;
667 }
668
669 // Only for use in tests.
670 PoolMem& InternalMem() const { return *owner_.Mem(); }
671
672 private:
673 // Unmodified after ctor, but cannot be const because we call thread::join().
674 std::vector<std::thread> threads_;
675
676 PoolMemOwner owner_;
677
678 // In debug builds, detects if functions are re-entered; always present so
679 // that the memory layout does not change.
680 std::atomic<int> busy_{0};
681};
682
683} // namespace hwy
684
685#endif // HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
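A minimal usage sketch (separate translation unit, assuming the header is on the include path): construct the pool once, optionally switch to spin-waiting, then call Run with a closure taking (task, thread).

#include <cstdint>
#include <cstdio>
#include <vector>

#include "hwy/contrib/thread_pool/thread_pool.h"

int main() {
  hwy::ThreadPool pool(/*num_threads=*/4);     // workers = max(1, num_threads)
  pool.SetWaitMode(hwy::PoolWaitMode::kSpin);  // optional: lower fork-join latency

  std::vector<double> out(1000);
  // Each `task` should itself be a non-trivial unit (often a small loop).
  pool.Run(0, out.size(), [&out](uint64_t task, size_t thread) {
    (void)thread;  // index < pool.NumWorkers(), usable for thread-local storage
    out[task] = static_cast<double>(task) * 2.0;
  });
  printf("out[999] = %f\n", out[999]);
  return 0;
}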