5 #include "thread_pool.h"
20 template<
typename CondVar>
30 using ident_t =
const void *;
50 template<std::convertible_to<CondVar> CondVarCfg>
51 explicit scheduler_t(CondVarCfg &&cfg):_cond(std::forward<CondVarCfg>(cfg)) {}
68 std::unique_lock lk(_mx);
69 if (std::find(_blk.begin(), _blk.end(), ident) != _blk.end()) return ;
70 _items.push_back({std::move(
promise), tp, ident});
71 std::push_heap(_items.begin(), _items.end(), compare_items);
83 template<
typename A,
typename B>
85 return sleep_until(std::chrono::system_clock::now()+dur, ident);
102 template<
typename T, std::invocable<notify_t> ResumeCB>
104 bool stopflag =
false;
105 if (fut.await_ready() || !fut.set_callback([
this,&stopflag]{
106 std::lock_guard _(_mx);
110 return fut.await_resume();
113 trace::on_block([&] {
114 worker(std::forward<ResumeCB>(cb), stopflag);
117 return fut.await_resume();
131 template<
typename T, std::invocable<notify_t> ResumeCB>
133 return run(fut, std::forward<ResumeCB>(cb));
144 return run(fut,[](notify_t){});
167 return [thr = std::make_shared<thread_pool_t>(threads)](notify_t ntf){
168 thr->enqueue(std::move(ntf));
177 static auto thread_pool(std::shared_ptr<coro::thread_pool> pool) {
178 return [pool](notify_t ntf){
179 pool->enqueue(std::move(ntf));
195 template<std::invocable<notify_t> ResumeCB>
197 if (_thr.joinable())
return false;
199 _thr = std::thread([
this, rcb = std::move(resumecb)]()
mutable {
200 worker(std::move(rcb), _stop);
213 return start([](notify_t){});
218 std::unique_lock lk(_mx);
221 if (scheduler_t::_current !=
this) {
223 if (_thr.joinable()) _thr.join();
225 scheduler_t::_current =
nullptr;
226 if (_thr.joinable()) _thr.detach();
246 sch->_blk.erase(std::remove(sch->_blk.begin(),sch->_blk.end(), _ident), sch->_blk.end());
250 std::unique_ptr<scheduler_t, deleter> _ptr;
253 :_ptr(&sch, {ident}) {};
268 std::lock_guard lk(_mx);
269 d = cancel_lk(ident);
270 _blk.push_back(ident);
280 std::chrono::system_clock::time_point _tp;
284 static bool compare_items(
const item_t &a,
const item_t &b) {
285 return a._tp > b._tp;
290 std::vector<item_t> _items;
291 std::vector<ident_t> _blk;
300 template<
typename ResumeCB>
301 void worker(ResumeCB &&rcb,
bool &stop_flag) noexcept {
302 std::unique_lock lk(_mx);
307 auto now = std::chrono::system_clock::now();
308 if (!_items.empty() && _items.front()._tp < now) {
309 auto d = _items.front()._prom();
310 std::pop_heap(_items.begin(),_items.end(), compare_items);
314 if (_current !=
this)
return;
326 void wait_next(std::unique_lock<std::mutex> &lk) {
327 if (!_items.empty()) {
328 _cond.wait_until(lk, _items.front()._tp);
334 promise<void>::notify cancel_lk(ident_t ident) {
335 auto iter = std::find_if(_items.begin(), _items.end(), [&](
const item_t &item){
336 return item._ident == ident;
338 if (iter == _items.end())
return {};
339 if (iter == _items.begin()) {
340 std::pop_heap(_items.begin(), _items.end(), compare_items);
341 auto d = _items.back()._prom.cancel();
345 auto d = iter->_prom.cancel();
347 if (&x != &_items.back()) {
348 std::swap(x, _items.back());
349 std::make_heap(_items.begin(), _items.end(), compare_items);
366 template<
typename CondVar>
Move only function wrapper with small object optimization.
Carries reference to future<T>, callable, sets value of an associated future<T>
While this object is held, cancel is in effect.
scheduler_t(CondVarCfg &&cfg)
Initialize the scheduler. It is not started; you must start it manually.
bool start()
Start the scheduler in the background.
pending_cancel cancel(ident_t ident)
Cancel scheduled operation.
~scheduler_t()
stop and destroy scheduler
static auto thread_pool(unsigned int threads)
Create a thread pool for the scheduler.
bool start(ResumeCB &&resumecb)
Start the scheduler in the background.
void stop()
stop the running scheduler
static auto thread_pool(std::shared_ptr< coro::thread_pool > pool)
Attach a thread pool to the scheduler.
decltype(auto) run(future< T > &fut, ResumeCB &&cb)
Run the scheduler in single-threaded mode; stop when the future is ready.
future< void > sleep_until(std::chrono::system_clock::time_point tp, ident_t ident=nullptr)
sleep until specified time point
scheduler_t()=default
Initialize the scheduler. It is not started; you must start it manually.
static scheduler_t * current()
returns current scheduler (for the current thread)
future< void > sleep_for(std::chrono::duration< A, B > dur, ident_t ident=nullptr)
sleep for specified duration
thread pool implementation
thread_pool_t< std::condition_variable > thread_pool
Thread pool implementation.