Dispatch Queue
Dispatch Queue / Thread Pool implementation for C++11 with built-in C++20 coroutine support
 
Loading...
Searching...
No Matches
worker_pool.hpp
1#pragma once
2
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

#include "pending_task_queue.hpp"
8
9
10namespace dispatch_queue {
11
12namespace detail {
13
14class worker_pool {
15 auto wait_predicate() const {
16 return [this]{ return is_shutting_down || task_queue.empty(); };
17 }
18public:
19 template<typename Fn>
20 worker_pool(pending_task_queue& task_queue, int thread_count, Fn&& worker_init)
21 : task_queue(task_queue)
22 {
23 worker_threads.reserve(thread_count);
24 for (int i = 0; i < thread_count; i++) {
25 worker_threads.emplace_back([&, this, i]() {
26 worker_init(i);
27 run_task_loop();
28 });
29 }
30 }
31 ~worker_pool();
32
33 worker_pool(const worker_pool&) = delete;
34 worker_pool& operator=(const worker_pool&) = delete;
35
36 int thread_count() const;
37 size_t size();
38
39 void enqueue_task(pending_task&& task, bool run_on_main_loop);
40 std::deque<pending_task> pop_main_loop_tasks();
41 void clear();
42 void shutdown();
43
44 void wait();
45
46 template<class Rep, class Period>
47 bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) {
48 std::unique_lock<std::mutex> lock(mutex);
49 return all_done_condition_variable.wait_for(lock, timeout_duration, wait_predicate());
50 }
51
52 template<class Clock, class Duration>
53 bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
54 std::unique_lock<std::mutex> lock(mutex);
55 return all_done_condition_variable.wait_until(lock, timeout_time, wait_predicate());
56 }
57
58private:
59 std::mutex mutex;
60 std::condition_variable task_condition_variable;
61 std::condition_variable all_done_condition_variable;
62 std::vector<std::thread> worker_threads;
63 pending_task_queue& task_queue;
64 bool is_shutting_down;
65
66 void run_task_loop();
67};
68
69} // end namespace detail
70
71} // end namespace dispatch_queue
Definition pending_task_queue.hpp:11
Definition task.hpp:22