Dispatch Queue
Dispatch Queue / Thread Pool implementation for C++11 with built-in C++20 coroutine support
 
Loading...
Searching...
No Matches
task_future.hpp
1#pragma once
2
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <new>
#include <utility>
#include <vector>

#include "function_result.hpp"
11
12namespace dispatch_queue {
13
/// Lifecycle state of a task's result, as tracked by the futures in this file.
enum class task_state {
    /// No outcome has been recorded yet; waiters block while in this state.
    pending = 0,
    /// The task finished and its result (if any) has been stored.
    ready = 1,
    /// The task ended by recording an exception instead of a result.
    failed = 2,
};
22
23namespace detail {
24
// Exception-handling shims.
//
// When exceptions are enabled, DISPATCH_QUEUE_TRY / DISPATCH_QUEUE_CATCH expand
// to a regular try/catch. When they are disabled, the "try" block is emitted as
// a plain block and the "catch" block becomes dead code behind `if (0)`, so the
// same source compiles either way.
//
// `__cpp_exceptions` is the standard feature-test macro, but toolchains that
// predate it still signal enabled exceptions through `__EXCEPTIONS`
// (GCC/Clang) or `_CPPUNWIND` (MSVC). All three are checked so that handlers
// are not silently compiled away on those compilers.
#if defined(__cpp_exceptions) || defined(__EXCEPTIONS) || defined(_CPPUNWIND)
  #define DISPATCH_QUEUE_TRY try
  #define DISPATCH_QUEUE_CATCH(...) catch(__VA_ARGS__)
#else
  #define DISPATCH_QUEUE_TRY
  #define DISPATCH_QUEUE_CATCH(...) if (0)
#endif
32
33class task_future_base {
34 auto wait_predicate() {
35 return [this]{ return state != task_state::pending; };
36 }
37public:
38 task_state get_state() {
39 std::lock_guard<std::mutex> lock(mutex);
40 return state;
41 }
42
43 std::exception_ptr get_exception() {
44 std::lock_guard<std::mutex> lock(mutex);
45 return exception;
46 }
47
48 void set_exception(std::exception_ptr exception) {
49 {
50 std::lock_guard<std::mutex> lock(mutex);
51 state = task_state::failed;
52 this->exception = exception;
53 }
54 condition_variable.notify_all();
55 }
56
57 void wait() {
58 std::unique_lock<std::mutex> lock(mutex);
59 condition_variable.wait(lock, wait_predicate());
60 }
61
62 template<class Rep, class Period>
63 bool wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) {
64 std::unique_lock<std::mutex> lock(mutex);
65 return condition_variable.wait_for(lock, timeout_duration, wait_predicate());
66 }
67
68 template<class Clock, class Duration>
69 bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
70 std::unique_lock<std::mutex> lock(mutex);
71 return condition_variable.wait_until(lock, timeout_time, wait_predicate());
72 }
73
74protected:
75 std::mutex mutex;
76 std::condition_variable condition_variable;
77 std::exception_ptr exception;
78 task_state state;
79
81
82 task_future_base(private_construct, task_state state)
83 : state(state)
84 {
85 }
86 task_future_base(private_construct, std::exception_ptr exception)
87 : state(task_state::failed)
88 , exception(exception)
89 {
90 }
91
92 task_future_base(const task_future_base&) = delete;
93 task_future_base& operator=(const task_future_base&) = delete;
94};
95
96
97template<typename T>
98class task_future : public task_future_base, public std::enable_shared_from_this<task_future<T>> {
99public:
100 using value_type = T;
101
102 template<typename... Args>
103 task_future(private_construct, Args&&... args)
104 : task_future_base(private_construct{}, std::forward<Args>(args)...)
105 , empty()
106 {
107 }
108 task_future(private_construct, T&& value)
109 : task_future_base(private_construct{}, task_state::ready)
110 , value(std::move(value))
111 {
112 }
113
114 ~task_future() {
115 if (state == task_state::ready) {
116 value.~T();
117 }
118 }
119
120 static std::shared_ptr<task_future> create_pending() {
121 return std::make_shared<task_future>(private_construct{}, task_state::pending);
122 }
123 static std::shared_ptr<task_future> create_ready(T&& value) {
124 return std::make_shared<task_future>(private_construct{}, std::move(value));
125 }
126 static std::shared_ptr<task_future> create_failed(std::exception_ptr exception) {
127 return std::make_shared<task_future>(private_construct{}, exception);
128 }
129 template<typename F>
130 static std::shared_ptr<task_future> create(F&& work) {
131 DISPATCH_QUEUE_TRY {
132 auto value = work();
133 return create_ready(std::move(value));
134 }
135 DISPATCH_QUEUE_CATCH(...) {
136 return create_failed(std::current_exception());
137 }
138 }
139
140 template<typename F>
141 auto then(F&& f) {
142 auto continuation_future = task_future<function_result<F>>::create_pending();
143 std::unique_lock<std::mutex> lock(mutex);
144 if (state == task_state::pending) {
145 continuations.push_back([=]() {
146 continuation_future->do_work(f);
147 });
148 }
149 else {
150 lock.unlock();
151 continuation_future->do_work(f);
152 }
153 return continuation_future;
154 }
155
156 T get() {
157 wait();
158 if (exception) {
159 std::rethrow_exception(exception);
160 }
161 return value;
162 }
163
164 template<typename F, typename... Args>
165 void do_work(F&& work, Args&&... args) {
166 DISPATCH_QUEUE_TRY {
167 auto value = work(std::forward<Args>(args)...);
168 set_value(std::move(value));
169 }
170 DISPATCH_QUEUE_CATCH(...) {
171 set_exception(std::current_exception());
172 }
173
174 auto continuations = std::move(this->continuations);
175 for (auto&& continuation : continuations) {
176 continuation();
177 }
178 }
179
180 template<typename F>
181 auto wrap(F&& work) {
182 auto shared_this = this->shared_from_this();
183 return [shared_this, work]{
184 shared_this->do_work(work);
185 };
186 }
187
188 void set_value(T&& value) {
189 {
190 std::lock_guard<std::mutex> lock(mutex);
191 state = task_state::ready;
192 this->value = std::move(value);
193 }
194 condition_variable.notify_all();
195 }
196
197private:
198 std::vector<std::function<void()>> continuations;
199 union {
200 struct{} empty;
201 T value;
202 };
203};
204
205
206template<>
207class task_future<void> : public task_future_base, public std::enable_shared_from_this<task_future<void>> {
208public:
209 using value_type = void;
210
211 template<typename... Args>
212 task_future(private_construct, Args&&... args)
213 : task_future_base(private_construct{}, std::forward<Args>(args)...)
214 {
215 }
216
217 static std::shared_ptr<task_future> create_pending() {
218 return std::make_shared<task_future>(private_construct{}, task_state::pending);
219 }
220 static std::shared_ptr<task_future> create_ready() {
221 return std::make_shared<task_future>(private_construct{}, task_state::ready);
222 }
223 static std::shared_ptr<task_future> create_failed(std::exception_ptr exception) {
224 return std::make_shared<task_future>(private_construct{}, exception);
225 }
226 template<typename F>
227 static std::shared_ptr<task_future> create(F&& work) {
228 DISPATCH_QUEUE_TRY {
229 work();
230 return create_ready();
231 }
232 DISPATCH_QUEUE_CATCH(...) {
233 return create_failed(std::current_exception());
234 }
235 }
236
237 template<typename F>
238 auto then(F&& f) {
239 auto continuation_future = task_future<function_result<F>>::create_pending();
240 std::unique_lock<std::mutex> lock(mutex);
241 if (state == task_state::pending) {
242 continuations.push_back([=]() {
243 continuation_future->do_work(f);
244 });
245 }
246 else {
247 lock.unlock();
248 continuation_future->do_work(f);
249 }
250 return continuation_future;
251 }
252
253 void get() {
254 wait();
255 if (exception) {
256 std::rethrow_exception(exception);
257 }
258 }
259
260 template<typename F, typename... Args>
261 void do_work(F&& work, Args&&... args) {
262 DISPATCH_QUEUE_TRY {
263 work(std::forward<Args>(args)...);
264 set_value();
265 }
266 DISPATCH_QUEUE_CATCH(...) {
267 set_exception(std::current_exception());
268 }
269
270 auto continuations = std::move(this->continuations);
271 for (auto&& continuation : continuations) {
272 continuation();
273 }
274 }
275
276 template<typename F>
277 auto wrap(F&& work) {
278 auto shared_this = this->shared_from_this();
279 return [shared_this, work]{
280 shared_this->do_work(work);
281 };
282 }
283
284 void set_value() {
285 {
286 std::lock_guard<std::mutex> lock(mutex);
287 state = task_state::ready;
288 }
289 condition_variable.notify_all();
290 }
291
292private:
293 std::vector<std::function<void()>> continuations;
294};
295
296} // end namespace detail
297
298} // end namespace dispatch_queue
Definition task_future.hpp:33