libcoro  1.0
Coroutine support library for C++20
scheduler.h
1 #pragma once
2 
3 #include "function.h"
4 #include "future.h"
5 #include "thread_pool.h"
6 
7 #include <mutex>
8 #include <chrono>
9 namespace coro {
10 
11 
13 
20 template<typename CondVar>
21 class scheduler_t {
22 public:
23 
24  using notify_t = promise<void>::notify;
25 
27  using resume_cb = function<void(notify_t)>;
28 
29  using promise_t = promise<void>;
30  using ident_t = const void *;
31 
33 
39  scheduler_t() = default;
40 
42 
50  template<std::convertible_to<CondVar> CondVarCfg>
51  explicit scheduler_t(CondVarCfg &&cfg):_cond(std::forward<CondVarCfg>(cfg)) {}
52 
55  stop();
56  }
57 
58 
60 
66  future<void> sleep_until(std::chrono::system_clock::time_point tp, ident_t ident = nullptr) {
67  return [&](auto promise) {
68  std::unique_lock lk(_mx);
69  if (std::find(_blk.begin(), _blk.end(), ident) != _blk.end()) return ;
70  _items.push_back({std::move(promise), tp, ident});
71  std::push_heap(_items.begin(), _items.end(), compare_items);
72  notify();
73  };
74  }
75 
77 
83  template<typename A, typename B>
84  future<void> sleep_for(std::chrono::duration<A,B> dur, ident_t ident = nullptr) {
85  return sleep_until(std::chrono::system_clock::now()+dur, ident);
86  }
87 
88 
89 
90 
92 
102  template<typename T, std::invocable<notify_t> ResumeCB>
103  decltype(auto) run(future<T> &fut, ResumeCB &&cb) {
104  bool stopflag = false;
105  if (fut.await_ready() || !fut.set_callback([this,&stopflag]{
106  std::lock_guard _(_mx);
107  stopflag = true;
108  notify();
109  })) {
110  return fut.await_resume();
111  }
112  auto cur = _current;
113  trace::on_block([&] {
114  worker(std::forward<ResumeCB>(cb), stopflag);
115  }, fut);;
116  _current = cur;
117  return fut.await_resume();
118  }
119 
121 
131  template<typename T, std::invocable<notify_t> ResumeCB>
132  decltype(auto) run(future<T> &&fut, ResumeCB &&cb) {
133  return run(fut, std::forward<ResumeCB>(cb));
134  }
135 
137 
142  template<typename T>
143  decltype(auto) run(future<T> &fut) {
144  return run(fut,[](notify_t){});
145  }
146 
148 
153  template<typename T>
154  decltype(auto) run(future<T> &&fut) {
155  return run(fut);
156  }
157 
159 
161 
166  static auto thread_pool(unsigned int threads) {
167  return [thr = std::make_shared<thread_pool_t>(threads)](notify_t ntf){
168  thr->enqueue(std::move(ntf));
169  };
170  }
172 
177  static auto thread_pool(std::shared_ptr<coro::thread_pool> pool) {
178  return [pool](notify_t ntf){
179  pool->enqueue(std::move(ntf));
180  };
181  }
182 
184 
195  template<std::invocable<notify_t> ResumeCB>
196  bool start(ResumeCB &&resumecb) {
197  if (_thr.joinable()) return false;
198  _stop = false;
199  _thr = std::thread([this, rcb = std::move(resumecb)]() mutable {
200  worker(std::move(rcb), _stop);
201  });
202  return true;
203  }
204 
206 
212  bool start() {
213  return start([](notify_t){});
214  }
215 
217  void stop() {
218  std::unique_lock lk(_mx);
219  _stop = true;
220  lk.unlock();
221  if (scheduler_t::_current != this) {
222  notify();
223  if (_thr.joinable()) _thr.join();
224  } else {
225  scheduler_t::_current = nullptr;
226  if (_thr.joinable()) _thr.detach();
227  }
228  }
229 
230 
232 
237  static scheduler_t * current() {return _current;}
238 
239 
242  private:
243  struct deleter {
244  ident_t _ident;
245  void operator()(scheduler_t *sch) {
246  sch->_blk.erase(std::remove(sch->_blk.begin(),sch->_blk.end(), _ident), sch->_blk.end());
247  }
248  };
249 
250  std::unique_ptr<scheduler_t, deleter> _ptr;
251 
252  pending_cancel(scheduler_t &sch, ident_t ident)
253  :_ptr(&sch, {ident}) {};
254 
255  friend class scheduler_t;
256  };
257 
259 
266  pending_cancel cancel(ident_t ident) {
268  std::lock_guard lk(_mx);
269  d = cancel_lk(ident);
270  _blk.push_back(ident);
271  return pending_cancel(*this, ident);
272  }
273 
274 
275 
276 protected:
277 
278  struct item_t {
279  promise_t _prom;
280  std::chrono::system_clock::time_point _tp;
281  ident_t _ident;
282  };
283 
284  static bool compare_items(const item_t &a, const item_t &b) {
285  return a._tp > b._tp;
286  }
287 
288  std::mutex _mx;
289  CondVar _cond;
290  std::vector<item_t> _items;
291  std::vector<ident_t> _blk;
292  bool _stop = false;
293  static thread_local scheduler_t *_current;
294  std::thread _thr;
295 
296  void notify() {
297  _cond.notify_all();
298  }
299 
300  template<typename ResumeCB>
301  void worker(ResumeCB &&rcb, bool &stop_flag) noexcept {
302  std::unique_lock lk(_mx);
303 
304  _current = this;
305 
306  while (!stop_flag) {
307  auto now = std::chrono::system_clock::now();
308  if (!_items.empty() && _items.front()._tp < now) {
309  auto d = _items.front()._prom();
310  std::pop_heap(_items.begin(),_items.end(), compare_items);
311  _items.pop_back();
312  lk.unlock();
313  rcb(std::move(d));
314  if (_current != this) return;
315  lk.lock();
316  } else {
317  wait_next(lk);
318  }
319  }
320 
321  _current = nullptr;
322 
323 
324  }
325 
326  void wait_next(std::unique_lock<std::mutex> &lk) {
327  if (!_items.empty()) {
328  _cond.wait_until(lk, _items.front()._tp);
329  } else {
330  _cond.wait(lk);
331  }
332  }
333 
334  promise<void>::notify cancel_lk(ident_t ident) {
335  auto iter = std::find_if(_items.begin(), _items.end(), [&](const item_t &item){
336  return item._ident == ident;
337  });
338  if (iter == _items.end()) return {};
339  if (iter == _items.begin()) {
340  std::pop_heap(_items.begin(), _items.end(), compare_items);
341  auto d = _items.back()._prom.cancel();
342  _items.pop_back();
343  return d;
344  } else {
345  auto d = iter->_prom.cancel();
346  item_t &x = *iter;
347  if (&x != &_items.back()) {
348  std::swap(x, _items.back());
349  std::make_heap(_items.begin(), _items.end(), compare_items);
350  }
351  _items.pop_back();
352  return d;
353  }
354  }
355 
356 
357 };
359 
365 
366 template<typename CondVar>
367 inline thread_local scheduler_t<CondVar> *scheduler_t<CondVar>::_current = nullptr;
368 
369 }
370 
Move only function wrapper with small object optimization.
Definition: function.h:23
Carries reference to future<T>, callable, sets value of an associated future<T>
Definition: future.h:73
While this object is held, cancel is in effect.
Definition: scheduler.h:241
scheduler_t(CondVarCfg &&cfg)
initialize the scheduler. It is not started, you must start it manually
Definition: scheduler.h:51
bool start()
Start scheduler at background.
Definition: scheduler.h:212
pending_cancel cancel(ident_t ident)
Cancel scheduled operation.
Definition: scheduler.h:266
~scheduler_t()
stop and destroy scheduler
Definition: scheduler.h:54
static auto thread_pool(unsigned int threads)
Create a thread pool for the scheduler.
Definition: scheduler.h:166
bool start(ResumeCB &&resumecb)
Start scheduler at background.
Definition: scheduler.h:196
void stop()
stop the running scheduler
Definition: scheduler.h:217
static auto thread_pool(std::shared_ptr< coro::thread_pool > pool)
Attach a thread pool to the scheduler.
Definition: scheduler.h:177
decltype(auto) run(future< T > &fut, ResumeCB &&cb)
run the scheduler in signle thread mode, stop when future is ready
Definition: scheduler.h:103
future< void > sleep_until(std::chrono::system_clock::time_point tp, ident_t ident=nullptr)
sleep until specified time point
Definition: scheduler.h:66
scheduler_t()=default
initialize the scheduler. It is not started, you must start it manually
static scheduler_t * current()
returns current scheduler (for the current thread)
Definition: scheduler.h:237
future< void > sleep_for(std::chrono::duration< A, B > dur, ident_t ident=nullptr)
sleep for specified duration
Definition: scheduler.h:84
scheduler for coroutines
Definition: scheduler.h:21
thread pool implementation
Definition: thread_pool.h:27
thread_pool_t< std::condition_variable > thread_pool
Thread pool implementation.
Definition: thread_pool.h:266
main namespace
Definition: aggregator.h:8