libcoro  1.0
Coroutine support library for C++20
condition.h
1 #pragma once
2 
3 #include "prepared_coro.h"
4 
5 #include <atomic>
6 #include <mutex>
7 #include <unordered_map>
8 #include <vector>
9 #include <bit>
10 #include <optional>
11 
12 namespace coro {
13 
14 
20 namespace _details {
21 
22 
23 class abstract_condition_awaiter {
24 public:
25  prepared_coro notify() noexcept {
26  n.store(true, std::memory_order_relaxed);
27  n.notify_all();
28  return prepared_coro(_h);
29  }
30  virtual bool test(const void *addr) noexcept = 0;
31  virtual const void *get_addr() noexcept = 0;
32  virtual ~abstract_condition_awaiter() = default;
33  abstract_condition_awaiter *_next = nullptr;
34 protected:
35  std::coroutine_handle<> _h;
36  std::atomic<bool> n = {false};
37 };
38 
39 
40 
41 
/// Trial-division primality test.
///
/// @param n value to test
/// @return true if n is prime, false otherwise (0 and 1 are not prime)
constexpr bool isPrime(std::size_t n)
{
    // Corner cases: 0 and 1 are not prime, 2 and 3 are.
    if (n <= 1) return false;
    if (n <= 3) return true;

    // Multiples of 2 and 3 are rejected here, so the loop below only has to
    // probe candidates of the form 6k-1 (i) and 6k+1 (i+2).
    if (n % 2 == 0 || n % 3 == 0) return false;

    // `i <= n / i` is equivalent to `i*i <= n` but cannot wrap around in
    // std::size_t when n is close to the maximum representable value.
    for (std::size_t i = 5; i <= n / i; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0) {
            return false;
        }
    }

    return true;
}
60 
61  constexpr std::size_t next_prime_twice_than(std::size_t x) {
62  x = x * 2+1;
63  if (!(x & 1)) ++x;
64  while (!isPrime(x)) {
65  x+=2;
66  }
67  return x;
68  }
69 
70 
71 
72 
73 
74 class awaiter_map {
75 public:
76 
77  bool reg_awaiter(const void *addr, abstract_condition_awaiter *awt) {
78  trace::awaiting_ref(*reinterpret_cast<const std::uintptr_t *>(addr), awt);
79  std::lock_guard _(_mx);
80  if (awt->test(addr)) return false;
81  insert_item(addr, awt);
82  return true;
83  }
84 
85  template<std::invocable<prepared_coro> Fn>
86  void notify_addr(const void *addr, Fn &&fn) {
87  auto done_lst = get_list_to_notify(addr);
88  while (done_lst) {
89  auto tmp = done_lst;
90  done_lst = done_lst->_next;
91  fn(tmp->notify());
92  }
93  }
94 
95  static awaiter_map instance;
96 
97 protected:
98 
99  std::mutex _mx;
100  std::vector<abstract_condition_awaiter *> _hashtable;
101  std::size_t _count_keys;
102 
103 
104  abstract_condition_awaiter * &map_address(const void *address) {
105  std::uintptr_t n = std::bit_cast<std::uintptr_t>(address);
106  auto pos = n % _hashtable.size();
107  return _hashtable[pos];
108  }
109 
110  void insert_item(const void *address, abstract_condition_awaiter *awt) {
111  test_resize();
112  auto &b = map_address(address);
113  awt->_next = b;
114  b = awt;
115  if (awt->_next == nullptr) ++_count_keys;
116  }
117 
118  abstract_condition_awaiter *get_list_to_notify(const void *addr) {
119  abstract_condition_awaiter *new_lst = nullptr;
120  abstract_condition_awaiter *done_lst = nullptr;
121 
122  std::lock_guard _(_mx);
123  if (_hashtable.size() == 0) return nullptr;
124  auto &b = map_address(addr);
125  abstract_condition_awaiter *lst = b;
126  if (lst == nullptr) return nullptr;
127  while (lst != nullptr) {
128  auto tmp = lst;
129  lst = lst->_next;
130  if (tmp->test(addr)) {
131  tmp->_next = done_lst;
132  done_lst = tmp;
133  } else {
134  tmp->_next = new_lst;
135  new_lst = tmp;
136  }
137  }
138  b = new_lst;
139  if (!b) {
140  --_count_keys;
141  }
142 
143  return done_lst;
144  }
145 
146 
147  void test_resize() {
148  auto sz =_hashtable.size();
149  if (_count_keys * 2 >= sz) {
150  std::size_t newsz = _details::next_prime_twice_than(std::max<std::size_t>(sz, 16));
151  std::vector<abstract_condition_awaiter *> tmp(newsz, nullptr);
152  std::swap(tmp, _hashtable);
153  _count_keys = 0;
154  for (auto &b :tmp) {
155  while (b) {
156  auto x = b;
157  b = b->_next;
158  insert_item(x->get_addr(), x);
159  }
160  }
161  }
162  }
163 
164 
165 
166 };
167 
168 inline awaiter_map awaiter_map::instance;
169 
170 
171 }
172 
174 
194 template<typename T, typename Pred>
195 class condition : public _details::abstract_condition_awaiter {
196 public:
197 
198  static_assert(std::is_invocable_r_v<bool, Pred, T &> || std::is_invocable_r_v<bool, Pred>);
199 
201 
205  condition(T &var, Pred &&pred):_variable(var),_predicate(std::forward<Pred>(pred)) {}
206 
207 
209 
214  bool await_ready() const {
215  if constexpr (std::is_invocable_r_v<bool, Pred>) {
216  return _predicate();
217  } else {
218  return _predicate(_variable);
219  }
220  }
221 
223 
229  T &await_resume() const {
230  if (_exp) std::rethrow_exception(_exp);
231  return _variable;
232  }
233 
235 
236  bool await_suspend(std::coroutine_handle<> h) {
237  this->_h = h;
238  return _details::awaiter_map::instance.reg_awaiter(&_variable, this);
239  }
240 
242 
249  T &wait() {
250  if (!await_ready() &&
251  _details::awaiter_map::instance.reg_awaiter(&_variable, this)) {
252  this->n.wait(false);
253  }
254  return await_resume();
255  }
256 
257 
258 
259 protected:
260  T &_variable;
261  Pred _predicate;
262  std::exception_ptr _exp;
263 
264  virtual bool test(const void *addr) noexcept override{
265  try {
266  return addr == &_variable && await_ready();
267  } catch (...) {
268  _exp = std::current_exception();
269  return true;
270  }
271  }
272  virtual const void *get_addr() noexcept override {
273  return &_variable;
274  }
275 
276 };
277 
278 
279 
281 
292 template<typename T>
293 void notify_condition(const T &var) noexcept {
294  _details::awaiter_map::instance.notify_addr(&var,[](auto){});
295 }
296 
298 
312 template<typename T, std::invocable<prepared_coro> Fn>
313 void notify_condition(const T &var, Fn &&scheduler) noexcept {
314  _details::awaiter_map::instance.notify_addr(&var,scheduler);
315 }
316 
317 
319 
330 template<typename T, typename Pred>
331 void condition_sync_wait(T &var, Pred &&pred) {
332  coro::condition<T, Pred> c(var, std::forward<Pred>(pred));
333  return c.wait();
334 
335 }
336 
337 }
338 
339 
340 
341 
342 
bool await_suspend(std::coroutine_handle<> h)
co_await support
Definition: condition.h:236
condition(T &var, Pred &&pred)
constructor
Definition: condition.h:205
T & await_resume() const
co_await support
Definition: condition.h:229
T & wait()
Synchronous waiting; allows the condition to be used in a normal (non-coroutine) function.
Definition: condition.h:249
bool await_ready() const
co_await support
Definition: condition.h:214
await on a condition
Definition: condition.h:195
scheduler for coroutines
Definition: scheduler.h:21
void condition_sync_wait(T &var, Pred &&pred)
Perform synchronous waiting with condition.
Definition: condition.h:331
void notify_condition(const T &var) noexcept
Notifies awaiters of the variable that its condition may have changed.
Definition: condition.h:293
main namespace
Definition: aggregator.h:8