20#ifndef HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
21#define HIGHWAY_HWY_CONTRIB_THREAD_POOL_THREAD_POOL_H_
41#define HWY_POOL_INLINE HWY_NOINLINE
43#ifndef HWY_POOL_SETRANGE_INLINE
46#define HWY_POOL_SETRANGE_INLINE HWY_NOINLINE
48#define HWY_POOL_SETRANGE_INLINE
61 uint32_t
Next(uint32_t current,
const Divisor& divisor)
const {
73 if (
HWY_MIN(trailing_a, trailing_b) != 0)
return false;
81 const uint32_t tmp_a = a;
86 if (b == 1)
return true;
90 if (a == 0)
return false;
103 const uint32_t inc = (size & 1) ? 1 : 2;
105 for (uint32_t x = start | 1; x < start + size * 16; x += inc) {
131 const Divisor div_workers(
static_cast<uint32_t
>(num_workers));
135 static_cast<uint32_t
>(num_workers),
136 static_cast<uint32_t
>((thread + 1) * 257 + thread * 13));
140 victims_[0] =
static_cast<uint32_t
>(thread);
151 wait_mode_.store(wait_mode, std::memory_order_release);
154 return wait_mode_.load(std::memory_order_acquire);
164 const auto rel = std::memory_order_release;
166 end_.store(end, rel);
174 return begin_.fetch_add(1, std::memory_order_relaxed);
194 typedef void (*
RunFunc)(
const void* opaque, uint64_t task,
size_t thread_id);
197 template <
class Closure>
198 static void CallClosure(
const void* opaque, uint64_t task,
size_t thread) {
199 (*
reinterpret_cast<const Closure*
>(opaque))(task, thread);
204 template <
class Closure>
205 void Store(
const Closure& closure, uint64_t begin, uint64_t end) {
206 const auto rel = std::memory_order_release;
207 func_.store(
static_cast<RunFunc>(&CallClosure<Closure>), rel);
208 opaque_.store(
reinterpret_cast<const void*
>(&closure), rel);
210 end_.store(end, rel);
214 const auto acq = std::memory_order_acquire;
216 end =
end_.load(acq);
218 return func_.load(acq);
231 static constexpr uint32_t
kMask = 0xF;
236 static constexpr uint32_t
kWork = 2;
237 static constexpr uint32_t
kNop = 3;
246 const uint32_t epoch = ++
epoch_;
247 const uint32_t seq_cmd = (epoch <<
kShift) | cmd;
248 seq_cmd_.store(seq_cmd, std::memory_order_release);
259 uint32_t& prev_seq_cmd) {
266 prev_seq_cmd = seq_cmd;
267 return seq_cmd &
kMask;
272 const uint32_t prev_seq_cmd, std::atomic<uint32_t>& current) {
275 const uint32_t seq_cmd = current.load(std::memory_order_acquire);
276 if (seq_cmd != prev_seq_cmd)
return seq_cmd;
290 static constexpr size_t kU64PerCacheLine =
HWY_ALIGNMENT /
sizeof(uint64_t);
294 for (
size_t i = 0; i < 4; ++i) {
295 num_finished_[i * kU64PerCacheLine].store(0, std::memory_order_release);
300 const size_t i = (thread & 3);
301 num_finished_[i * kU64PerCacheLine].fetch_add(1, std::memory_order_release);
307 const auto acq = std::memory_order_acquire;
310 const uint64_t sum = num_finished_[0 * kU64PerCacheLine].load(acq) +
311 num_finished_[1 * kU64PerCacheLine].load(acq) +
312 num_finished_[2 * kU64PerCacheLine].load(acq) +
313 num_finished_[3 * kU64PerCacheLine].load(acq);
314 if (sum == num_workers)
break;
320 std::atomic<uint64_t> num_finished_[4 * kU64PerCacheLine];
326 return *
reinterpret_cast<PoolWorker*
>(
reinterpret_cast<uint8_t*
>(&barrier) +
347 : num_workers_(
HWY_MAX(num_threads, size_t{1})) {
349 bytes_ = hwy::AllocateAligned<uint8_t>(size);
351 mem_ =
new (bytes_.get())
PoolMem();
353 for (
size_t thread = 0; thread < num_workers_; ++thread) {
354 new (&mem_->Worker(thread))
PoolWorker(thread, num_workers_);
359 std::atomic_thread_fence(std::memory_order_release);
363 for (
size_t thread = 0; thread < num_workers_; ++thread) {
364 mem_->Worker(thread).~PoolWorker();
401 template <
class Closure>
402 static bool Plan(uint64_t begin, uint64_t end,
size_t num_workers,
403 const Closure& closure,
PoolMem& mem) {
406 const size_t num_tasks =
static_cast<size_t>(end - begin);
412 for (uint64_t task = begin; task < end; ++task) {
424 const size_t remainder = num_tasks % num_workers;
425 const size_t min_tasks = num_tasks / num_workers;
427 uint64_t task = begin;
428 for (
size_t thread = 0; thread < num_workers; ++thread) {
429 const uint64_t my_end = task + min_tasks + (thread < remainder);
449 const auto func = tasks.
WorkerGet(begin, end, opaque);
453 const uint64_t task = begin + thread;
455 func(opaque, task, thread);
484 func(opaque, task, thread);
498 const int chars_written = snprintf(buf,
sizeof(buf),
format, thread);
500 chars_written <=
static_cast<int>(
sizeof(buf) - 1));
501 HWY_ASSERT(0 == pthread_setname_np(pthread_self(), buf));
531 std::atomic_thread_fence(std::memory_order_acquire);
535 uint32_t prev_seq_cmd = PoolCommands::WorkerInitialSeqCmd();
539 const uint32_t command =
541 if (
HWY_UNLIKELY(command == PoolCommands::kTerminate)) {
543 }
else if (
HWY_LIKELY(command == PoolCommands::kWork)) {
544 ParallelFor::WorkerRun(thread, num_workers, *mem);
546 }
else if (command == PoolCommands::kNop) {
558 return static_cast<size_t>(std::thread::hardware_concurrency() - 1);
564 explicit ThreadPool(
size_t num_threads) : owner_(num_threads) {
566 const size_t num_workers = owner_.NumWorkers();
570 threads_.reserve(num_workers - 1);
571 for (
size_t thread = 0; thread < num_workers - 1; ++thread) {
572 threads_.emplace_back(ThreadFunc, thread, num_workers, owner_.Mem());
581 for (std::thread& thread : threads_) {
607 for (
size_t thread = 0; thread < owner_.NumWorkers(); ++thread) {
626 template <
class Closure>
627 void Run(uint64_t begin, uint64_t end,
const Closure& closure) {
628 const size_t num_workers = NumWorkers();
631 if (
HWY_LIKELY(ParallelFor::Plan(begin, end, num_workers, closure, mem))) {
639 const size_t thread = num_workers - 1;
640 ParallelFor::WorkerRun(thread, num_workers, mem);
// No-op init callback: ignores its unnamed size_t argument and always reports
// success by returning true.
// NOTE(review): presumably intended as an `init_closure` for the two-closure
// Run() overload, which calls it with NumThreads() and returns false early if
// the callback returns false - confirm against callers.
651 static bool NoInit(
size_t ) {
return true; }
661 template <
class InitClosure,
class RunClosure>
662 bool Run(uint64_t begin, uint64_t end,
const InitClosure& init_closure,
663 const RunClosure& run_closure) {
664 if (!init_closure(NumThreads()))
return false;
665 Run(begin, end, run_closure);
680 std::atomic<int> busy_{0};
#define HWY_ALIGNMENT
Definition aligned_allocator.h:41
#define HWY_MAX(a, b)
Definition base.h:177
#define HWY_MIN(a, b)
Definition base.h:176
#define HWY_ABORT(format,...)
Definition base.h:233
#define HWY_INLINE
Definition base.h:101
#define HWY_DASSERT(condition)
Definition base.h:290
#define HWY_ASSERT(condition)
Definition base.h:237
#define HWY_LIKELY(expr)
Definition base.h:106
#define HWY_UNLIKELY(expr)
Definition base.h:107
uint32_t GetDivisor() const
Definition base.h:2760
uint32_t Remainder(uint32_t n) const
Definition base.h:2770
Definition thread_pool.h:382
static HWY_POOL_INLINE void WorkerRun(const size_t thread, size_t num_workers, PoolMem &mem)
Definition thread_pool.h:439
static bool Plan(uint64_t begin, uint64_t end, size_t num_workers, const Closure &closure, PoolMem &mem)
Definition thread_pool.h:402
Definition thread_pool.h:289
void WorkerArrive(size_t thread)
Definition thread_pool.h:299
void Reset()
Definition thread_pool.h:293
HWY_POOL_INLINE void WaitAll(size_t num_workers)
Definition thread_pool.h:306
Definition thread_pool.h:229
static constexpr size_t kShift
Definition thread_pool.h:232
void Broadcast(uint32_t cmd)
Definition thread_pool.h:244
static constexpr uint32_t kMask
Definition thread_pool.h:231
uint32_t epoch_
Definition thread_pool.h:283
static constexpr uint32_t kNop
Definition thread_pool.h:237
static constexpr uint32_t kTerminate
Definition thread_pool.h:235
std::atomic< uint32_t > seq_cmd_
Definition thread_pool.h:284
static HWY_INLINE uint32_t SpinUntilDifferent(const uint32_t prev_seq_cmd, std::atomic< uint32_t > &current)
Definition thread_pool.h:271
static constexpr uint32_t kInitial
Definition thread_pool.h:230
static uint32_t WorkerInitialSeqCmd()
Definition thread_pool.h:241
uint32_t WorkerWaitForNewCommand(PoolWaitMode wait_mode, uint32_t &prev_seq_cmd)
Definition thread_pool.h:258
static constexpr uint32_t kWork
Definition thread_pool.h:236
Definition thread_pool.h:343
~PoolMemOwner()
Definition thread_pool.h:362
size_t NumWorkers() const
Definition thread_pool.h:369
PoolMem * Mem() const
Definition thread_pool.h:371
PoolMem * mem_
Definition thread_pool.h:377
hwy::AlignedFreeUniquePtr< uint8_t[]> bytes_
Definition thread_pool.h:376
const size_t num_workers_
Definition thread_pool.h:374
PoolMemOwner(size_t num_threads)
Definition thread_pool.h:345
Definition thread_pool.h:190
std::atomic< const void * > opaque_
Definition thread_pool.h:223
std::atomic< RunFunc > func_
Definition thread_pool.h:222
void(* RunFunc)(const void *opaque, uint64_t task, size_t thread_id)
Definition thread_pool.h:194
static void CallClosure(const void *opaque, uint64_t task, size_t thread)
Definition thread_pool.h:198
void Store(const Closure &closure, uint64_t begin, uint64_t end)
Definition thread_pool.h:205
std::atomic< uint64_t > begin_
Definition thread_pool.h:224
std::atomic< uint64_t > end_
Definition thread_pool.h:225
RunFunc WorkerGet(uint64_t &begin, uint64_t &end, const void *&opaque) const
Definition thread_pool.h:213
Definition thread_pool.h:123
HWY_POOL_SETRANGE_INLINE void SetRange(uint64_t begin, uint64_t end)
Definition thread_pool.h:163
std::array< uint32_t, kMaxVictims > victims_
Definition thread_pool.h:183
uint32_t num_victims_
Definition thread_pool.h:182
static constexpr size_t kMaxVictims
Definition thread_pool.h:124
void SetWaitMode(PoolWaitMode wait_mode)
Definition thread_pool.h:150
std::atomic< PoolWaitMode > wait_mode_
Definition thread_pool.h:181
PoolWaitMode WorkerGetWaitMode() const
Definition thread_pool.h:153
std::atomic< uint64_t > end_
Definition thread_pool.h:179
PoolWorker(size_t thread, size_t num_workers)
Definition thread_pool.h:127
uint64_t WorkerGetEnd() const
Definition thread_pool.h:170
hwy::Span< const uint32_t > Victims() const
Definition thread_pool.h:157
std::atomic< uint64_t > begin_
Definition thread_pool.h:178
uint8_t padding_[HWY_ALIGNMENT - 16 - 8 - sizeof(victims_)]
Definition thread_pool.h:185
uint64_t WorkerReserveTask()
Definition thread_pool.h:173
Definition thread_pool.h:55
static bool CoprimeNonzero(uint32_t a, uint32_t b)
Definition thread_pool.h:69
ShuffledIota(uint32_t coprime)
Definition thread_pool.h:58
uint32_t coprime_
Definition thread_pool.h:114
static uint32_t FindAnotherCoprime(uint32_t size, uint32_t start)
Definition thread_pool.h:97
uint32_t Next(uint32_t current, const Divisor &divisor) const
Definition thread_pool.h:61
ShuffledIota()
Definition thread_pool.h:57
Definition aligned_allocator.h:267
Definition thread_pool.h:525
ThreadPool(const ThreadPool &)=delete
ThreadPool(size_t num_threads)
Definition thread_pool.h:564
PoolMem & InternalMem() const
Definition thread_pool.h:670
bool Run(uint64_t begin, uint64_t end, const InitClosure &init_closure, const RunClosure &run_closure)
Definition thread_pool.h:662
ThreadPool & operator&(const ThreadPool &)=delete
void SetWaitMode(PoolWaitMode mode)
Definition thread_pool.h:598
static size_t MaxThreads()
Definition thread_pool.h:557
static void ThreadFunc(size_t thread, size_t num_workers, PoolMem *mem)
Definition thread_pool.h:526
PoolMemOwner owner_
Definition thread_pool.h:676
~ThreadPool()
Definition thread_pool.h:577
size_t NumWorkers() const
Definition thread_pool.h:592
std::vector< std::thread > threads_
Definition thread_pool.h:674
size_t NumThreads() const
Definition thread_pool.h:655
void Run(uint64_t begin, uint64_t end, const Closure &closure)
Definition thread_pool.h:627
static bool NoInit(size_t)
Definition thread_pool.h:651
std::unique_ptr< T, AlignedFreer > AlignedFreeUniquePtr
Definition aligned_allocator.h:247
static void SetThreadName(const char *format, int thread)
Definition thread_pool.h:495
HWY_INLINE HWY_ATTR_CACHE void Pause()
Definition cache_control.h:108
static void WakeAll(std::atomic< uint32_t > &current)
Definition futex.h:158
HWY_API size_t Num0BitsBelowLS1Bit_Nonzero32(const uint32_t x)
Definition base.h:2540
PoolWaitMode
Definition thread_pool.h:120
constexpr size_t CeilLog2(TI x)
Definition base.h:2669
static uint32_t BlockUntilDifferent(const uint32_t prev, const std::atomic< uint32_t > &current)
Definition futex.h:80
HWY_DLLEXPORT HWY_NORETURN void int const char * format
Definition base.h:231
Definition thread_pool.h:324
PoolTasks tasks
Definition thread_pool.h:331
PoolBarrier barrier
Definition thread_pool.h:336
PoolCommands commands
Definition thread_pool.h:332
PoolWorker & Worker(size_t thread)
Definition thread_pool.h:325
#define HWY_POOL_INLINE
Definition thread_pool.h:41
#define HWY_POOL_SETRANGE_INLINE
Definition thread_pool.h:48