libcoro  1.0
Coroutine support library for C++20
collector.h
1 #pragma once
2 
3 #include "future.h"
4 #include "async.h"
5 
6 
7 namespace coro {
8 
namespace _details {

    /// Abstract factory which produces one Collectible per call.
    /**
     * The collector's promise holds a pointer to this interface, so the
     * coroutine can obtain the next item without knowing how it is built.
     *
     * @tparam Collectible type of the produced item
     */
    template<typename Collectible>
    class collectible_factory {
    public:
        virtual ~collectible_factory() = default;
        /// Creates and returns the next collectible item.
        virtual Collectible create() = 0;
    };

    /// Factory implementation which forwards create() to a stored callable.
    /**
     * @tparam Collectible type of the produced item
     * @tparam Fn type of the callable as deduced by the caller (may be a
     *         reference type); the callable is stored by value as
     *         std::decay_t<Fn>
     */
    template<typename Collectible, typename Fn>
    class collectible_factory_fn: public collectible_factory<Collectible> {
    public:
        /// Stores a decayed copy (or moved instance) of the callable.
        collectible_factory_fn(Fn &&fn):_fn(std::forward<Fn>(fn)) {}
        /// Invokes the stored callable and returns its result.
        // `override` makes the compiler verify that this really overrides the base.
        Collectible create() override {return _fn();}
    protected:
        std::decay_t<Fn> _fn;
    };

    /// Builds a collectible_factory_fn for the given callable.
    /**
     * @tparam Collectible type of the produced item (must be given explicitly)
     * @param fn callable invoked with no arguments by create()
     * @return factory object (by value) wrapping a copy of the callable
     */
    template<typename Collectible, typename Fn>
    auto create_collectible_factory(Fn &&fn) {
        return collectible_factory_fn<Collectible, Fn>(std::forward<Fn>(fn));
    }

}
33 
35 
43 template<typename Collectible, typename Result, coro_allocator Alloc = std_allocator>
44 class collector: public future<Result> {
45 public:
46 
47  class promise_type: public _details::coro_promise<Result> {
48  public:
49  std::suspend_never initial_suspend() const noexcept {return {};}
50 
51  promise_type() {
52  trace::set_class(std::coroutine_handle<promise_type>::from_promise(*this), typeid(collector).name());
53  }
54 
55  struct final_awaiter: std::suspend_always {
56  std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> me) noexcept {
57  promise_type &self = me.promise();
58  self.set_resolved();
59  return trace::on_switch(me,self._waiting(true).symmetric_transfer(),{});
60  }
61  };
62 
63  final_awaiter final_suspend() const noexcept {return {};}
64  collector get_return_object() {return {this};}
65 
66  struct yield_awaiter {
67  promise_type *self;
68  static constexpr bool await_ready() noexcept {return false;}
69  std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> me, std::source_location loc = std::source_location::current()) {
70  self = &me.promise();
71  return trace::on_switch(me, self->_waiting(false).symmetric_transfer(),&loc);
72  }
73  Collectible await_resume() {
74  return self->_factory->create();
75  }
76  };
77 
78  template<typename X>
79  yield_awaiter yield_value([[maybe_unused]] X &&x) {
80  trace::on_yield(std::coroutine_handle<const promise_type>::from_promise(*this),x);
81  return {};
82  }
83 
84  prepared_coro connect(collector *col) {
85  prepared_coro out;
86  if (this->fut != col) {
87  if (this->fut) out = this->set_resolved();
88  this->fut = col->get_promise().release();
89  }
90  return out;
91  }
92 
93  void resume() {
94  auto h =std::coroutine_handle<promise_type>::from_promise(*this);
95  trace::awaiting_ref(h, _waiting.get_future());
96  trace::resume(h);
97  }
98  void destroy() {
99  std::coroutine_handle<promise_type>::from_promise(*this).destroy();
100  }
101  bool done() const {
102  return std::coroutine_handle<const promise_type>::from_promise(*this).done();
103  }
104 
105  _details::collectible_factory<Collectible> *_factory;
106  promise<bool> _waiting;
107 
108  };
109 
111  collector() = default;
113  collector(collector &&other):_prom(std::move(other._prom)) {
114  _prom->connect(this);
115  }
117  template<typename A>
118  collector(collector<Collectible, Result, A> &&other): _prom(cast_promise(other._prom.release())) {
119 
120  }
121 
122 
125  if (this != &other) {
126  std::destroy_at(this);
127  std::construct_at(this, std::move(other));
128  }
129  return *this;
130  }
131 
133 
140  template<typename ... Args>
141  future<bool> operator()(Args &&... args) {
142  static_assert(future_constructible<Collectible, Args ...>);
143  return [&](auto promise) {
144  if (_prom->done()) {
145  promise(true);
146  return;
147  }
148  _prom->_waiting = std::move(promise);
149 
150  if constexpr(sizeof...(Args) == 1 && (invocable_r_exact<Args, Collectible> && ...)) {
151  [this](auto &&arg, auto &&...) {
152  this->set_factory_resume([&]()->Collectible{
153  return arg();
154  });
155  }();
156  } else {
157  this->set_factory_resume([&]{
158  return Collectible(std::forward<Args>(args)...);
159  });
160  }
161  };
162  }
163 
164  operator ident_t() const {return std::coroutine_handle<promise_type>::from_promise(*_prom);}
165 
166 protected:
167 
168  collector(promise_type *p):_prom(p) {
169  _prom->connect(this);
170  }
171 
172  template<typename X>
173  static promise_type *cast_promise(X *other) {
174  return static_cast<promise_type *>(static_cast<_details::coro_promise_base<Result> *>(other));
175  }
176 
177 
178  struct Deleter {
179  void operator()(promise_type *x) {
180  x->destroy();
181  }
182  };
183 
184  template<typename A, typename B, coro_allocator C> friend class collector;
185 
186  std::unique_ptr<promise_type,Deleter> _prom;
187 
188  template<typename Fn>
189  void set_factory_resume(Fn &&fn) {
190  _details::collectible_factory_fn<Collectible, Fn> f(std::forward<Fn>(fn));
191  _prom->_factory = &f;
192  _prom->resume();
193  }
194 
195 
196 };
197 
198 
199 
200 }
collector & operator=(collector &&other)
assign by move
Definition: collector.h:124
future< bool > operator()(Args &&... args)
call the collector and push the next collectible item
Definition: collector.h:141
collector(collector &&other)
move
Definition: collector.h:113
collector()=default
construct an uninitialized collector - you can initialize it later by assignment
collector(collector< Collectible, Result, A > &&other)
convert from different allocator
Definition: collector.h:118
The collector is a reversed generator. The coroutine consumes values and then returns a result.
Definition: collector.h:44
promise_t get_promise()
Retrieve promise and begin evaluation.
Definition: future.h:567
std::coroutine_handle await_suspend(std::coroutine_handle<> h) noexcept
co_await support, called with suspended coroutine
Definition: future.h:753
bool await_ready() const noexcept
co_await support, returns true, if value is ready (resolved)
Definition: future.h:738
Contains future value of T, can be co_awaited in coroutine.
Definition: future.h:417
contains prepared coroutine (prepared to run)
Definition: prepared_coro.h:15
FutureType * release()
Release the future pointer from the promise object.
Definition: future.h:273
Carries reference to future<T>, callable, sets value of an associated future<T>
Definition: future.h:73
void resume(std::coroutine_handle<> h) noexcept
Record resumption of a coroutine.
Definition: trace.h:382
std::coroutine_handle on_switch(std::coroutine_handle<>, std::coroutine_handle<> to, const void *)
Record a switch (symmetric transfer) from one coroutine to another.
Definition: trace.h:393
main namespace
Definition: aggregator.h:8