libcoro  1.0
Coroutine support library for C++20
future_list.h
1 #pragma once
2 
3 #include "future.h"
4 #include "async.h"
5 #include <deque>
6 
7 #include <condition_variable>
8 namespace coro {
9 
10 
12 
25 template<typename T>
27 public:
28 
29  constexpr pointer_wrapper(T &item):_ptr(&item) {}
30 
31  constexpr T &operator* () const {return *_ptr;}
32  constexpr T *operator->() const {return _ptr;}
33  constexpr operator T &() const {return *_ptr;}
34  constexpr T *get() const {return *_ptr;}
35 
36  explicit constexpr operator bool() const {return _ptr != nullptr;}
37 
38  friend constexpr bool operator==(const pointer_wrapper<T> &a, T *b) {
39  return a._ptr == b;
40  }
41  friend constexpr bool operator==(T *a, const pointer_wrapper<T> &b) {
42  return a == b._ptr;
43  }
44  friend constexpr bool operator==(const pointer_wrapper<T> &a, const pointer_wrapper<T> &b) {
45  return a._ptr == b._ptr;
46  }
47 
48 protected:
49  T *_ptr;
50 };
51 
52 template<typename T>
53 concept is_pointer_like = (requires(T v) {
54  {*v};
55 } && std::is_move_constructible_v<T>);
56 
58 template<typename T>
59 using pointer_value = std::decay_t<decltype(*std::declval<T>())>;
60 
61 
62 template<typename T>
63 struct unwrap_pointer_like_def {using type = T;};
64 template<is_pointer_like T>
65 struct unwrap_pointer_like_def<T> {using type = pointer_value<T>;};
66 template<typename T>
67 using unwrap_pointer_like=typename unwrap_pointer_like_def<T>::type;
68 
69 
70 
71 
72 
74 template<typename T>
75 concept iterator_type = requires(T v) {
76  {*v};
77  {++v};
78  {v == v};
79 };
80 
82 template<typename T>
83 concept container_type = requires(T v) {
84  {v.begin() == v.end()};
85  {v.begin()}->iterator_type;
86  {v.end()}->iterator_type;
87  typename std::decay_t<T>::value_type;
88 };
89 
90 
92 
105 template<typename T>
106 class all_of : public future<void> {
107 public:
108 
110 
113  all_of(std::initializer_list<pointer_wrapper<T> > lst)
114  :all_of(lst.begin(), lst.end()) {}
115 
117 
120  template<container_type Container>
121  all_of(Container &&cont)
122  :all_of(cont.begin(), cont.end()) {}
123 
125 
130  template<iterator_type Iter>
131  all_of(Iter from, Iter to) {
132  _prom = this->get_promise();
133  while (from != to) {
134  ++_count;
135  if constexpr(is_pointer_like<decltype(*from)>) {
136  (*from)->then([this]{finish();});
137  } else {
138  (*from).then([this]{finish();});
139  }
140  ++from;
141  }
142  finish();
143  }
144 
145 
146 protected:
147  promise<void> _prom = {};
148  std::atomic<unsigned int> _count = {1};
149  void finish() {
150  if (--_count == 0) {
151  _prom();
152  }
153  }
154 };
155 
156 template<typename X>
157 all_of(std::initializer_list<X>) -> all_of<X>;
158 template<container_type T>
159 all_of(T) -> all_of<unwrap_pointer_like<typename T::value_type> >;
160 
161 
162 
163 
165 
175 template<typename T>
176 class when_each {
177 public:
178 
180 
181  using future_ptr = std::add_pointer_t<T>;
182 
183  using result_promise_type = typename result_future_type::promise_t;
184  using result_notify_type = typename result_promise_type::notify;
185 
186 
188 
193  class iterator {
194  public:
195 
196  using iterator_category = std::random_access_iterator_tag;
197  using value_type = typename result_future_type::value_type;
198  using reference = std::add_lvalue_reference<result_future_type>;
199  using difference_type = std::ptrdiff_t;
200 
201 
203  iterator() = default;
206  _tmp._prom = _tmp._fut.get_promise();
207  _owner->charge(this);
208  return _tmp._fut;
209  }
210 
211  void operator()(result_promise_type prom) const {
212  _tmp._prom = std::move(prom);
213  _owner->charge(this);
214  }
215 
217  iterator &operator++() {_index++;return *this;}
219  iterator operator++(int) {return iterator(_owner, _index++);}
221  iterator &operator--() {_index--;return *this;}
223  iterator operator--(int) {return iterator(_owner, _index--);}
225  iterator &operator+=(std::ptrdiff_t n) {_index += n;return *this;}
227  iterator &operator-=(std::ptrdiff_t n) {_index -= n;return *this;}
229  std::ptrdiff_t operator-(const iterator &other) const {
230  return static_cast<std::ptrdiff_t>(_index)-static_cast<std::ptrdiff_t>(other._index);
231  }
233  bool operator==(const iterator &other) const {return operator-(other) == 0;}
234 
235 
236  protected:
237  iterator(when_each *owner, std::size_t index):_owner(owner),_index(index) {}
238 
239  struct tmp {
240  result_promise_type _prom;
241  result_future_type _fut;
242  tmp() = default;
243  tmp(tmp &&other):_prom(std::move(other._prom)) {}
244  tmp &operator=(tmp &&other) {
245  _prom = std::move(other._prom);
246  return *this;
247  }
248  };
249 
250  when_each *_owner = nullptr;
251  std::size_t _index = 0;
252  mutable tmp _tmp;
253  friend class when_each;
254 
255  };
256 
258 
261  iterator begin() {return {this,0};}
263  iterator end() {return {this,_sep};}
264 
266 
269  when_each(std::initializer_list<pointer_wrapper<T> > lst)
270  :when_each(lst.begin(), lst.end()) {}
271 
273 
276  template<container_type Container>
277  when_each(Container &cont)
278  :when_each(cont.begin(), cont.end()) {}
279 
281 
285  template<iterator_type Iter>
286  when_each(Iter from, Iter to) {
287  std::lock_guard _(_mx);
288  while (from != to) {
289  if constexpr(is_pointer_like<decltype(*from)>) {
290  future_ptr ptr = &(*(*from));
291  _awaiting.push_back(ptr);
292  trace::awaiting_ref(*ptr, this);
293  } else {
294  future_ptr ptr = &(*from);
295  _awaiting.push_back(ptr);
296  trace::awaiting_ref(*ptr, this);
297  }
298  ++from;
299  }
300  _sep = _awaiting.size();
301  for (std::size_t i = 0; i < _sep; ++i) {
302  future_ptr ptr = _awaiting[i];
303  if (!ptr->set_callback([this, ptr]{
304  return finish(ptr).symmetric_transfer();
305  })) {
306  _awaiting.push_back(ptr);
307 
308  }
309  }
310 
311  }
312 
313  ~when_each() {
314  cleanup();
315  }
316 
317 
318 protected:
319  std::vector<future_ptr> _awaiting;
320  std::basic_string<const iterator *> _iters;
321  std::size_t _sep;
322  mutable std::mutex _mx;
323  std::condition_variable *_cond = nullptr;
324 
325  auto remain_pending() {
326  return 2*_sep - _awaiting.size();
327  }
328 
329  result_notify_type finish(future_ptr ptr) {
330  std::lock_guard _(_mx);
331  auto idx = _awaiting.size() - _sep;
332  _awaiting.push_back(ptr);
333  result_promise_type proms;
334  _iters.erase( std::remove_if(_iters.begin(), _iters.end(), [&](const iterator *it){
335  if (it->_index == idx) {
336  proms += it->_tmp._prom;
337  return true;
338  }
339  return false;
340  }),_iters.end());
341  if (_cond) _cond->notify_all();;
342  if (proms) return ptr->forward_to(proms);
343  return {};
344  }
345 
346 
347  bool charge(const iterator *it) {
348  result_notify_type ntf;
349  std::lock_guard _(_mx);
350  auto idx = it->_index+_sep;
351  if (idx < _awaiting.size()) {
352  ntf = _awaiting[idx]->forward_to(it->_tmp._prom);
353  return false;
354  }
355  _iters.push_back(it);
356  return true;
357  }
358 
359  void cleanup() {
360  if (remain_pending()) {
361  std::unique_lock lk(_mx);
362  std::size_t found_remain = 0;
363  for (std::size_t i = 0; i < _sep; ++i) {
364  if (_awaiting[i]->set_callback([]{})) ++found_remain;
365  }
366  if (remain_pending() != found_remain) {
367  std::condition_variable cond;
368  _cond = &cond;
369  cond.wait(lk, [&]{
370  return remain_pending() == found_remain;
371  });
372  }
373  }
374  }
375 
376 };
377 
378 template<typename X>
379 when_each(std::initializer_list<X>) -> when_each<X>;
380 template<container_type T>
381 when_each(T) -> when_each<unwrap_pointer_like<typename T::value_type> >;
382 
383 
385 
395 template<typename T>
396 class any_of : public when_each<T>::result_future_type {
397 public:
398 
399  any_of(std::initializer_list<pointer_wrapper<T> > lst)
400  :any_of(lst.begin(), lst.end()) {}
401 
402  template<container_type Container>
403  any_of(Container &&cont)
404  :any_of(cont.begin(), cont.end()) {}
405 
406  template<iterator_type Iter>
407  any_of(Iter from, Iter to)
408  :_each_of(std::move(from), std::move(to)) {
409 
410  _iter = _each_of.begin();
411  _iter(this->get_promise());
412 
413  }
414 
415 
416 protected:
417  when_each<T> _each_of;
418  typename when_each<T>::iterator _iter;
419 
420 };
421 
422 template<typename X>
423 any_of(std::initializer_list<X>) -> any_of<X>;
424 template<container_type T>
426 
427 
428 
429 
431 
437 template<typename T>
438 class task_list: public std::deque<T> {
439 public:
440 
441  using std::deque<T>::deque;
442 
444 
451  template<std::invocable Fn>
452  void operator<<(Fn &&fn) {
453  static_assert(std::is_same_v<std::invoke_result_t<Fn>, T>, "Return value type mismatch");
454  this->emplace_back(construct_using<Fn>(fn));
455  }
456 
457 
459 
466  template<std::invocable Fn>
467  friend void operator>>(Fn &&fn, task_list &lst) {
468  static_assert(std::is_same_v<std::invoke_result_t<Fn>, T>, "Return value type mismatch");
469  lst.emplace_front(construct_using<Fn &>(fn));
470  }
471 
472  using std::deque<T>::push_back;
473  using std::deque<T>::push_front;
474 
476  template<typename X, typename Y> requires std::constructible_from<T, async<X,Y> >
478  (*this) << [&]()->T{return x;};
479  }
481  template<typename X, typename Y> requires std::constructible_from<T, async<X,Y> >
483  [&]()->T{return x;} >> (*this);
484  }
485 
486 };
487 
488 
489 }
490 
491 
492 
493 
all_of(std::initializer_list< pointer_wrapper< T > > lst)
construct using initializer list
Definition: future_list.h:113
all_of(Iter from, Iter to)
construct using iterator pair
Definition: future_list.h:131
all_of(Container &&cont)
construct using a container
Definition: future_list.h:121
this awaitable is resolved, when all objects specified by constructor are resolved
Definition: future_list.h:106
Awaitable returns result of a first complete object.
Definition: future_list.h:396
COROUTINE: Coroutine for asynchronous operation.
Definition: async.h:55
Constructor for emplace.
Definition: construct.h:20
promise_t get_promise()
Retrieve promise and begin evaluation.
Definition: future.h:567
Contains future value of T, can be co_awaited in coroutine.
Definition: future.h:417
Wraps object into pointer.
Definition: future_list.h:26
friend void operator>>(Fn &&fn, task_list &lst)
Redirect return value to the task list.
Definition: future_list.h:467
requires std::constructible_from< T, async< X, Y > > void push_back(async< X, Y > x)
Start async function and store result to the list (as last item)
Definition: future_list.h:477
requires std::constructible_from< T, async< X, Y > > void push_front(async< X, Y > x)
Start async function and store result to the list (as first item)
Definition: future_list.h:482
void operator<<(Fn &&fn)
Redirect return value to the task list.
Definition: future_list.h:452
helps to create task list: list of started task as list of futures
Definition: future_list.h:438
bool operator==(const iterator &other) const
comparison
Definition: future_list.h:233
std::ptrdiff_t operator-(const iterator &other) const
difference
Definition: future_list.h:229
iterator operator++(int)
go to next item
Definition: future_list.h:219
iterator()=default
default construct - note such iterator cannot be used for iterating
iterator & operator+=(std::ptrdiff_t n)
skip n items
Definition: future_list.h:225
result_future_type & operator*() const
retrieve item. Note that result is awaitable reference. You need co_await the result;
Definition: future_list.h:205
iterator & operator-=(std::ptrdiff_t n)
skip n items
Definition: future_list.h:227
iterator & operator--()
go to previous item
Definition: future_list.h:221
iterator operator--(int)
go to previous item
Definition: future_list.h:223
iterator & operator++()
go to next item
Definition: future_list.h:217
iterator for when_each - it always return awaitable reference (future)
Definition: future_list.h:193
when_each(Iter from, Iter to)
construct using iterator pair
Definition: future_list.h:286
iterator end()
retrieve end iterator
Definition: future_list.h:263
iterator begin()
retrieve iterator
Definition: future_list.h:261
when_each(Container &cont)
construct using a container
Definition: future_list.h:277
when_each(std::initializer_list< pointer_wrapper< T > > lst)
construct using initializer list
Definition: future_list.h:269
Process all futures in order of completion.
Definition: future_list.h:176
concept iterator_type
any iterator
Definition: future_list.h:75
std::decay_t< decltype(*std::declval< T >())> pointer_value
determines type of pointer value
Definition: future_list.h:59
concept container_type
any container (must have begin and end)
Definition: future_list.h:83
main namespace
Definition: aggregator.h:8