Dispatch Queue
Dispatch Queue / Thread Pool implementation for C++11 with built-in C++20 coroutine support
 
Loading...
Searching...
No Matches
dispatch_queue.hpp
1#pragma once
2
3#include <functional>
4#include <utility>
5
6#include "function_result.hpp"
7#include "task.hpp"
8#include "promise.hpp"
9#include "worker_pool.hpp"
10
11namespace dispatch_queue {
12
14public:
20
26
38 template<typename Fn>
39 dispatch_queue(int thread_count, Fn&& worker_init) {
40 if (thread_count < 0) {
41 thread_count = std::thread::hardware_concurrency();
42 }
43 if (thread_count > 0) {
44 worker_pool = std::make_unique<detail::worker_pool>(task_queue, thread_count, worker_init);
45 }
46 }
47
48 dispatch_queue(const dispatch_queue&) = delete;
49 dispatch_queue& operator=(const dispatch_queue&) = delete;
50
55
63 template<typename F, typename... Args, typename Ret = detail::function_result<F, Args...>>
64 task<Ret> dispatch(F&& f, Args&&... args) {
65 return dispatch_internal(false, std::forward<F>(f), std::forward<Args>(args)...);
66 }
67
76 template<typename F, typename... Args, typename Ret = detail::function_result<F, Args...>>
77 task<Ret> dispatch_main(F&& f, Args&&... args) {
78 return dispatch_internal(true, std::forward<F>(f), std::forward<Args>(args)...);
79 }
80
84 bool is_threaded() const;
85
90 int thread_count() const;
91
95 size_t size() const;
96
100 bool empty() const;
101
106 void clear();
107
112 void main_loop();
113
117 void wait();
118
124 template<class Rep, class Period>
125 bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) {
126 if (worker_pool) {
127 return worker_pool->wait_for(timeout_duration);
128 }
129 else {
130 return true;
131 }
132 }
133
139 template<class Clock, class Duration>
140 bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
141 if (worker_pool) {
142 return worker_pool->wait_until(timeout_time);
143 }
144 else {
145 return true;
146 }
147 }
148
154 void shutdown();
155
156#ifdef __cpp_lib_coroutine
157private:
158 struct dispatch_awaiter {
160
161 bool await_ready() const noexcept { return false; }
162 void await_suspend(std::coroutine_handle<> cont) const {
163 dispatch_queue.dispatch([cont]{
164 cont();
165 if (cont.done()) {
166 cont.destroy();
167 }
168 });
169 }
170 void await_resume() {}
171 };
172
173 struct dispatch_main_awaiter {
175
176 bool await_ready() const noexcept { return false; }
177 void await_suspend(std::coroutine_handle<> cont) const {
179 cont();
180 if (cont.done()) {
181 cont.destroy();
182 }
183 });
184 }
185 void await_resume() {}
186 };
187public:
198 dispatch_awaiter dispatch() {
199 return dispatch_awaiter(*this);
200 }
201
211 dispatch_main_awaiter dispatch_main() {
212 return dispatch_main_awaiter(*this);
213 }
214#endif
215
216private:
217 std::unique_ptr<detail::worker_pool> worker_pool;
219
220 template<typename F, typename... Args, typename Ret = detail::function_result<F, Args...>>
221 task<Ret> dispatch_internal(bool run_on_main_loop, F&& f, Args&&... args) {
222 auto work = std::bind(std::move(f), std::forward<Args>(args)...);
223 if (worker_pool) {
224 auto future = detail::task_future<Ret>::create_pending();
225 worker_pool->enqueue_task({ future->wrap(work) }, run_on_main_loop);
226 return task<Ret>(future);
227 }
228 else if (run_on_main_loop) {
229 auto future = detail::task_future<Ret>::create_pending();
230 task_queue.push({ future->wrap(work) }, run_on_main_loop);
231 return task<Ret>(future);
232 }
233 else {
234 auto future = detail::task_future<Ret>::create(work);
235 return task<Ret>(future);
236 }
237 }
238};
239
240} // end namespace dispatch_queue
Definition pending_task_queue.hpp:11
Definition dispatch_queue.hpp:13
task< Ret > dispatch_main(F &&f, Args &&... args)
Definition dispatch_queue.hpp:77
dispatch_main_awaiter dispatch_main()
Definition dispatch_queue.hpp:211
bool wait_for(const std::chrono::duration< Rep, Period > &timeout_duration)
Definition dispatch_queue.hpp:125
dispatch_awaiter dispatch()
Definition dispatch_queue.hpp:198
int thread_count() const
Definition dispatch_queue.cpp:22
task< Ret > dispatch(F &&f, Args &&... args)
Definition dispatch_queue.hpp:64
bool wait_until(const std::chrono::time_point< Clock, Duration > &timeout_time)
Definition dispatch_queue.hpp:140
dispatch_queue(int thread_count, Fn &&worker_init)
Definition dispatch_queue.hpp:39
dispatch_queue()
Definition dispatch_queue.cpp:5
Definition task.hpp:22