3 #include "prepared_coro.h"
4 #include "exceptions.h"
6 #include <condition_variable>
26 template<
typename CondVar>
38 _thr.reserve(threads);
41 template<std::convertible_to<CondVar> CondVarInit>
43 :_to_start(threads),_cond(std::forward<CondVarInit>(cinit)) {
53 template<std::invocable<> Fn>
55 std::unique_lock lk(_mx);
56 if (_stop)
return false;
57 _que.emplace(std::forward<Fn>(fn));
65 template<std::invocable<> Fn>
67 return enqueue(std::forward<Fn>(fn));
79 template<
typename Fn,
typename ... Args>
80 requires std::invocable<Fn, Args...>
81 auto run(Fn &&fn, Args && ... args) ->
future<std::invoke_result_t<Fn, Args...> > {
85 args = std::make_tuple(std::forward<Args>(args)...)]()
mutable{
87 if constexpr(std::is_void_v<std::invoke_result_t<Fn, Args...> >) {
88 std::apply(fn, std::move(args));
91 promise([&]{
return std::apply(fn, std::move(args));});
106 std::vector<std::thread> tmp;
108 std::lock_guard lk(_mx);
112 std::swap(_thr, tmp);
115 auto this_thr = std::this_thread::get_id();
117 if (this_thr == t.get_id()) {
150 if (!
enqueue(std::move(prep))) {
164 std::lock_guard lk(_mx);
165 if (_enqueued == _finished) {
168 _joins.push_back({_enqueued, std::move(
promise)});
169 std::push_heap(_joins.begin(), _joins.end());
176 std::lock_guard lk(_mx);
185 std::vector<std::thread> _thr;
186 mutable std::mutex _mx;
188 std::queue<
function<void()> > _que;
191 unsigned int _to_start = 0;
197 int operator<=>(
const join_info &other)
const {
198 return other._target - _target;
202 std::vector<join_info> _joins;
206 void check_join(std::unique_lock<std::mutex> &lk) {
207 promise<void> joined;
210 if (_joins.empty() || _joins.front()._target > _finished)
return;
212 joined += _joins.front()._prom;
213 std::pop_heap(_joins.begin(), _joins.end());
215 }
while (!_joins.empty() && _joins.front()._target <= _finished);
221 void notify(std::unique_lock<std::mutex> &mx) {
223 _thr.emplace_back([](
thread_pool_t *me){me->worker();},
this);
234 std::unique_lock lk(_mx);
240 auto fn = std::move(_que.front());
244 if (_current !=
this)
return;
254 template<
typename CondVar>
255 inline thread_local thread_pool_t<CondVar> *thread_pool_t<CondVar>::_current =
nullptr;
Exception is thrown on attempt to retrieve value after promise has been broken.
Contains future value of T, can be co_awaited in coroutine.
std::coroutine_handle release()
release handle
contains prepared coroutine (prepared to run)
notify reject(std::exception_ptr e)
reject the future with exception
Carries reference to future<T>, callable, sets value of an associated future<T>
bool enqueue(Fn &&fn)
enqueue function
static constexpr bool await_ready() noexcept
co_await support (never ready)
future< void > join()
wait until all tasks enqueued so far have been processed
void await_suspend(std::coroutine_handle<> h)
co_await support - resumes the coroutine inside of thread_pool
static thread_pool_t * current()
returns the current thread pool (in the context of a managed thread)
bool operator>>(Fn &&fn)
alias to enqueue
thread_pool_t(unsigned int threads)
construct thread pool
void await_resume()
co_await support (nothing returned)
void stop()
stop thread pool
requires std::invocable< Fn, Args... > auto run(Fn &&fn, Args &&... args) -> future< std::invoke_result_t< Fn, Args... > >
Run a function in the thread_pool.
bool is_stopped() const
test whether the thread pool is stopped
thread pool implementation