Dispatch Queue
Dispatch Queue / Thread Pool implementation for C++11 with built-in C++20 coroutine support
 
Loading...
Searching...
No Matches
worker_pool.cpp
1#include "../include/worker_pool.hpp"
2
3namespace dispatch_queue {
4
5namespace detail {
6
7worker_pool::~worker_pool() {
8 shutdown();
9}
10
11int worker_pool::thread_count() const {
12 return worker_threads.size();
13}
14
15size_t worker_pool::size() {
16 std::lock_guard<std::mutex> lock(mutex);
17 return task_queue.size();
18}
19
20void worker_pool::enqueue_task(pending_task&& task, bool run_on_main_loop) {
21 {
22 std::lock_guard<std::mutex> lock(mutex);
23 task_queue.push(std::move(task), run_on_main_loop);
24 }
25 task_condition_variable.notify_one();
26}
27
28std::deque<pending_task> worker_pool::pop_main_loop_tasks() {
29 std::lock_guard<std::mutex> lock(mutex);
30 return task_queue.pop_main_loop_tasks();
31}
32
33void worker_pool::clear() {
34 std::lock_guard<std::mutex> lock(mutex);
35 task_queue.clear();
36}
37
38void worker_pool::shutdown() {
39 if (worker_threads.empty()) {
40 return;
41 }
42
43 {
44 std::lock_guard<std::mutex> lock(mutex);
45 is_shutting_down = true;
46 }
47 for (int i = 0; i < thread_count(); i++) {
48 task_condition_variable.notify_one();
49 }
50 for (auto& thread : worker_threads) {
51 if (thread.joinable()) {
52 thread.join();
53 }
54 }
55 worker_threads.clear();
56 is_shutting_down = false;
57}
58
59void worker_pool::wait() {
60 std::unique_lock<std::mutex> lock(mutex);
61 all_done_condition_variable.wait(lock, wait_predicate());
62}
63
64void worker_pool::run_task_loop() {
65 while (true) {
66 // 1. Get a valid task
67 pending_task task;
68 {
69 std::unique_lock<std::mutex> lock(mutex);
70 task_condition_variable.wait(lock, [this, &task]() { return is_shutting_down || task_queue.try_pop(task); });
71 if (is_shutting_down) {
72 return;
73 }
74 }
75
76 // 2. Do some work
77 task();
78
79 // 3. If all is done, notify waiters
80 bool all_done;
81 {
82 std::lock_guard<std::mutex> lock(mutex);
83 all_done = task_queue.empty();
84 }
85 if (all_done) {
86 all_done_condition_variable.notify_all();
87 }
88 }
89}
90
91} // end namespace detail
92
93} // end namespace dispatch_queue