libcoro  1.0
Coroutine support library for C++20
subscription.h
1 #pragma once
2 
3 #include "future.h"
4 
5 namespace coro {
6 
8 
28 template<typename T, typename SubscribeFn = function<prepared_coro(promise<T>)> >
29 class subscription {
30 public:
31 
32  static_assert(std::is_constructible_v<function<prepared_coro(promise<T>)>, SubscribeFn>);
33 
34  using subscribe_fn = SubscribeFn;
35 
36  using canceled_awaiter = _details::has_value_awaiter<subscription,false>;
37  using ret_value = typename future<T>::ret_value;
38 
40  subscription() = default;
42  subscription(SubscribeFn &&fn): _fn(std::move(fn)) {}
44  subscription(const SubscribeFn &fn): _fn(fn) {}
46  subscription(subscription &&other):_fn(std::move(other._fn)) {}
49  if (this != &other) {
50  _fn = std::move(other._fn);
51  }
52  return *this;
53  }
54 
56 
62  decltype(auto) get() {return _fut.await_resume();}
63 
65 
69  static constexpr bool await_ready() noexcept {return false;}
71 
75  std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) {
76  return then([h]() ->prepared_coro {return h;}).symmetric_transfer();
77  }
79 
80  ret_value await_resume() {
81  return _fut.await_resume();
82  }
83 
85 
92  canceled_awaiter operator!() {return this;}
93 
95  bool has_value() const {return _fut.has_value();}
96 
97 
99 
103  template<std::invocable Fn>
104  prepared_coro then(Fn &&fn) {
105  auto prom = _fut.get_promise();
106  _fut.then(std::forward<Fn>(fn));
107  return std::forward<SubscribeFn>(_fn)(std::move(prom));
108  }
109 
111 
115  template<std::invocable Fn>
116  void operator>>(Fn &&fn) {
117  then(std::forward<Fn>(fn));
118  }
119 
120  struct lock_guard {
121  std::atomic<bool> *unlock = nullptr;
122  void operator()(auto) {
123  unlock->store(true);
124  unlock->notify_all();
125  }
126  };
127 
128 
129  using lock_ptr = std::unique_ptr<std::remove_reference_t<ret_value>, lock_guard>;
130 
132  static constexpr lock_ptr init_lock() {return {};}
133 
135 
144  static constexpr lock_ptr init_lock(std::atomic<bool> &unlock) {return lock_ptr{
145  //hack: first argument is an pointer, which is ignored, but must not be nullptr
146  reinterpret_cast<std::remove_reference_t<ret_value> *>(&unlock), {&unlock}
147  };};
148 
150 
157  void lock(lock_ptr &ptr) {
158  std::atomic<bool> ready = {false};
159  std::atomic<bool> *unlock = nullptr;
160  then([&]{
161  std::atomic<bool> lk;
162  unlock = &lk;
163  ready.store(true);
164  ready.notify_all();
165  lk.wait(false);
166  });
167  ptr.reset();
168  ready.wait(false);
169  if (has_value()) {
170  ret_value ref = _fut.get();
171  ptr = lock_ptr(&ref, {unlock});
172  } else {
173  unlock->store(true);
174  unlock->notify_all();
175  ptr = lock_ptr(nullptr, {});
176  }
177  }
178 
179 protected:
180  future<T> _fut;
181  SubscribeFn _fn;
182 
183  future<T> &start() {
184  if (!_fut.is_pending()) _fn(_fut.get_promise());
185  return _fut;
186  }
187 
188  void wait_internal() {
190  }
191  friend class _details::wait_awaiter<subscription>;
192 
193 };
194 
195 
196 }
promise_t get_promise()
Retrieve promise and begin evaluation.
Definition: future.h:567
std::conditional_t< std::is_void_v< T >, void, std::add_rvalue_reference_t< T > > ret_value
type which is used as return value of get() and await_resume()
Definition: future.h:464
bool is_pending() const
Determines pending status.
Definition: future.h:706
Contains future value of T, can be co_awaited in coroutine.
Definition: future.h:417
contains prepared coroutine (prepared to run)
Definition: prepared_coro.h:15
Carries reference to future<T>, callable, sets value of an associated future<T>
Definition: future.h:73
subscription(const SubscribeFn &fn)
construct and pass subscribe function
Definition: subscription.h:44
subscription()=default
default constructor
subscription(SubscribeFn &&fn)
construct and pass subscribe function
Definition: subscription.h:42
static constexpr lock_ptr init_lock(std::atomic< bool > &unlock)
initialize lock_ptr, set atomic variable, which receives signal once the thread starts waiting on a s...
Definition: subscription.h:144
prepared_coro then(Fn &&fn)
define callback, which is called when value is ready
Definition: subscription.h:104
ret_value await_resume()
returns current value
Definition: subscription.h:80
void lock(lock_ptr &ptr)
subscribe and lock value during processing
Definition: subscription.h:157
void operator>>(Fn &&fn)
define callback, which is called when value is ready
Definition: subscription.h:116
decltype(auto) get()
retrieve last value
Definition: subscription.h:62
bool has_value() const
Determines whether a value is available.
Definition: subscription.h:95
static constexpr bool await_ready() noexcept
co_await support
Definition: subscription.h:69
subscription(subscription &&other)
movable
Definition: subscription.h:46
canceled_awaiter operator!()
wait and determine whether operation is canceled
Definition: subscription.h:92
static constexpr lock_ptr init_lock()
initialize lock_ptr object, which can be used to call lock() to wait on subscription and process resu...
Definition: subscription.h:132
std::coroutine_handle await_suspend(std::coroutine_handle<> h)
co_await support: subscribes and waits for a value.
Definition: subscription.h:75
subscription & operator=(subscription &&other)
movable
Definition: subscription.h:48
Awaiter for subscriptions.
Definition: subscription.h:29
main namespace
Definition: aggregator.h:8