libcoro  1.0
Coroutine support library for C++20
future.h
1 #pragma once
2 #include "common.h"
3 #include "function.h"
4 #include "exceptions.h"
5 
6 #include "prepared_coro.h"
7 #include "allocator.h"
8 #include "construct.h"
9 
10 #include <atomic>
11 #include <memory>
12 #include <optional>
13 #include <utility>
14 #include <bit>
15 
16 namespace coro {
17 
19 
23 template<typename T>
24 class future;
26 
35 template<typename T, bool atomic = false>
36 class promise;
38 
47 template<typename T>
48 class deferred_future;
49 
50 namespace _details {
51 template<typename T> class coro_promise_base;
52 };
53 
54 template<typename T, typename ... Args>
55 concept avoid_same_kind = sizeof...(Args) != 1 || !std::is_same_v<T, std::decay_t<Args>...>;
56 
57 template<typename Alloc>
58 concept is_allocator = requires(Alloc alloc, std::size_t n) {
59  {alloc.allocate(n)} -> std::same_as<typename Alloc::value_type *>;
60 };
61 
62 template<typename T, typename Ret, typename ... Args>
63 concept invocable_r_exact = std::is_same_v<std::invoke_result_t<T, Args...>, Ret>;
64 
65 template<typename T, typename ... U>
66 concept future_constructible = ((sizeof...(U) == 1 && (invocable_r_exact<U, T> && ...)) || std::is_constructible_v<T, U...>);
67 
68 template<typename T>
69 using atomic_promise = promise<T, true>;
70 
71 
72 template<typename T, bool atomic>
73 class promise {
74 public:
75 
78 
79 
81 
91  class notify {
92  public:
93  using FutureType = future<T>;
94 
95  notify() = default;
96 
98  void cancel() {_ptr->clearStorage();}
100  void deliver() {_ptr.reset();}
101 
103 
111  template<std::invocable<function<prepared_coro()> > Fn>
112  void deliver(Fn &&fn) {
113  auto ptr = _ptr.release();
114  if (ptr) ptr->set_resolved(std::forward<Fn>(fn));
115  }
116 
117 
119 
127  std::coroutine_handle<> symmetric_transfer() {
128  auto ptr = _ptr.release();
129  if (ptr) {
130  prepared_coro retval;
131  ptr->set_resolved([&](auto &&awt){
132  retval = awt();
133  });
134  return retval.symmetric_transfer();
135  } else {
136  return std::noop_coroutine();
137  }
138 
139  }
141  explicit operator bool() const {return static_cast<bool>(_ptr);}
142 
143  void operator()() {
144  _ptr.reset();
145  }
146 
149  if (_ptr == nullptr) return std::move(other);
150  _ptr->attach(other._ptr.release());
151  return std::move(*this);
152  }
153 
156  if (_ptr == nullptr) {
157  _ptr = std::move(other._ptr);
158  } else {
159  _ptr->attach(other._ptr.release());
160  }
161  return *this;
162  }
163 
166  return operator+=(other);
167  }
168 
169  protected:
170 
172  notify(FutureType *fut):_ptr(fut) {}
173 
174  struct NotifyAction {
175  void operator()(FutureType *fut) {fut->set_resolved([](auto &c){c();});}
176  };
177 
178  std::unique_ptr<FutureType, NotifyAction> _ptr;
179 
180  friend class promise<T>;
181  };
182 
183 
184 
185 
187  promise() = default;
188 
190 
195  explicit promise(FutureType *ptr):_ptr(ptr) {}
196 
198  promise(promise &&other):_ptr(other.claim()) {}
199 
201  template<bool x>
202  promise(promise<T, x> &&other): _ptr(other.claim()) {}
203 
206  if (this != &other) {
207  cancel();
208  _ptr.store(other.claim(), std::memory_order_relaxed);
209  }
210  return *this;
211  }
212 
215  cancel();
216  }
217 
219 
223  return claim();
224  }
225 
227 
231  template<typename ... Args>
232  notify operator()(Args && ... args) {
233  static_assert((std::is_void_v<T> && sizeof...(Args) == 0)
234  || future_constructible<T, Args ...>, "Value is not constructible from arguments");
235  auto fut = claim();
236  if (fut) fut->set_value(std::forward<Args>(args)...);
237  return fut;
238  }
239 
241 
245  notify reject(std::exception_ptr e) {
246  auto fut = claim();
247  if (fut) fut->set_exception(std::move(e));
248  return fut;
249  }
250 
252 
258  auto e = std::current_exception();
259  return e?reject(std::move(e)):cancel();
260  }
261 
263  template<typename E>
264  notify reject(E && exception) {
265  return reject(std::make_exception_ptr<std::decay_t<E> >(std::forward<E>(exception)));
266  }
267 
269 
274  return claim();
275  }
276 
278 
285  const FutureType *get_future() const {
286  return _ptr.load(std::memory_order_relaxed);
287  }
288 
289 
291 
303  template<bool f>
305  promise z = other + (*this);
306  _ptr = z.claim();
307  return *this;
308  }
309 
310  template<bool f>
311  promise &operator+=(promise<T, f> &&other) {
312  return this->operator+=(other);
313  }
314 
316 
328  template<bool f>
330  static_assert(std::is_void_v<T> || std::is_copy_constructible_v<T>, "Requires T copy constructible");
331  auto a = claim();
332  auto b = other.claim();
333  if (!a) return promise(b);
334  a->attach(b);
335  return promise(a);
336  }
337 
338  operator bool () const {
339  return _ptr.load(std::memory_order_relaxed) != nullptr;
340  }
341 
342 protected:
343 
344  struct non_atomic {
345  FutureType *_ptr = {};
346  non_atomic() = default;
347  non_atomic(FutureType *ptr):_ptr(ptr) {}
348  FutureType *exchange(FutureType *x, std::memory_order) {
349  return std::exchange(_ptr, x);
350  }
351  void store(FutureType *x, std::memory_order) {
352  _ptr = x;
353  }
354  FutureType *load(std::memory_order) const {
355  return _ptr;
356  }
357  bool compare_exchange_strong(FutureType *& old, FutureType *nw) {
358  if (_ptr == old) {
359  _ptr = nw;
360  return true;
361  } else {
362  old = _ptr;
363  return false;
364  }
365  }
366  };
367 
368  using Holder = std::conditional_t<atomic, std::atomic<FutureType *>, non_atomic>;
369 
370  Holder _ptr = {};
371 
372  FutureType *claim() {return _ptr.exchange(nullptr, std::memory_order_relaxed);}
373 };
374 
375 struct deferred_tag {};
376 
377 inline constexpr deferred_tag deferred= {};
378 
379 namespace _details {
380 
381  template<typename T>
382  class wait_awaiter {
383  public:
384  wait_awaiter(T *fut):self(fut) {}
385  wait_awaiter(const wait_awaiter &other) = delete;
386  wait_awaiter &operator=(const wait_awaiter &other) = delete;
387  ~wait_awaiter() {wait_now();}
388  bool await_ready() const noexcept {return self->await_ready();}
389  auto await_suspend(std::coroutine_handle<> h) noexcept {return self->await_suspend(h);}
390  void await_resume() const noexcept {}
391  protected:
392  T *self;
393  void wait_now() {
394  self->wait_internal();
395  }
396 
397  };
398 
399  template<typename T, bool flag>
400  class [[nodiscard]] has_value_awaiter: public wait_awaiter<T> {
401  public:
402  using wait_awaiter<T>::wait_awaiter;
403  bool await_resume() const noexcept {
404  return this->self->has_value() == flag;
405  }
406  has_value_awaiter<T, !flag> operator !() const {return this->self;}
407  operator bool() {
408  this->wait_now();
409  return await_resume();
410  }
411  };
412 
413 }
414 
415 
416 template<typename T>
417 class [[nodiscard]] future {
418 public:
419 
420  /*
421  * +-------------------+
422  * | chain | 8
423  * +-------------------+
424  * | atomic(_state) | 1
425  * +-------------------+
426  * | enum(_result) | 1
427  * +-------------------+
428  * | await_cleanup | 1
429  * +-------------------+ (padding 5)
430  * +-------------------+
431  * | |
432  * | * awaiter |
433  * | * deferred | 32
434  * | |
435  * +-------------------+
436  * | |
437  * | * exception |
438  * | * value | 8+
439  * | |
440  * +-------------------+
441  *
442  * 56+
443  */
444 
445 
446  using value_type = T;
447  using promise_t = promise<T>;
448  using value_store_type = std::conditional_t<std::is_reference_v<T>,
449  std::add_pointer_t<std::remove_reference_t<T> >,
450  std::conditional_t<std::is_void_v<T>, bool, T> >;
452 
457  using cast_ret_value = std::conditional_t<std::is_void_v<T>, bool, std::add_rvalue_reference_t<T> >;
459 
464  using ret_value = std::conditional_t<std::is_void_v<T>, void, std::add_rvalue_reference_t<T> >;
465  using awaiter_type = function<prepared_coro()>;
466  using deferred_eval_fn_type = function<prepared_coro(promise_t)>;
467  using wait_awaiter = _details::wait_awaiter<future>;
468  using canceled_awaiter = _details::has_value_awaiter<future,false>;
469 
470  future() {/*can't be default*/}
471 
472  future(const future &) = delete; //<can't copy nor move
473  future &operator=(const future &) = delete; //<can't copy nor move
474 
476  template<typename ... Args>
477  requires (std::constructible_from<value_store_type, Args ...> && avoid_same_kind<future, Args...>)
478  future(Args && ... args):_result(Result::value), _value(std::forward<Args>(args)...) {}
479 
480 
482 
485  template<typename ... Args>
486  future(std::in_place_t, Args &&... args): _result(Result::value), _value(std::forward<Args>(args)...) {}
487 
489  future(std::exception_ptr e):_result(Result::exception), _exception(std::move(e)) {}
490 
492 
495  template<std::invocable<promise_t> Fn>
496  future(Fn &&fn):_state(State::pending) {
497  std::forward<Fn>(fn)(promise_t(this));
498  }
499 
501 
508  template<std::invocable<promise_t> Fn>
509  future(deferred_tag, Fn &&fn) {
510  setDeferredEvaluation(std::forward<Fn>(fn));
511  }
512 
// Constructs this future from the result of calling fn, where fn returns
// exactly future<T>.
// The copy constructor is deleted, so the returned prvalue is used to
// initialize the object directly via placement new on `this`.
// NOTE(review): the default member initializers have already run when the
// body executes; the placement new re-initializes the same storage.
513  template<invocable_r_exact<future<T> > Fn>
514  future(Fn &&fn) {
515  new(this) future(std::forward<Fn>(fn)());
516  }
517 
519 
523  checkInProgress();
524  if (_state.load(std::memory_order_relaxed) == State::deferred) {
525  std::destroy_at(&_deferred);
526  }
527  clearStorage();
528  }
529 
531 
547  template<invocable_r_exact<future> Fn>
548  future &operator << (Fn &&fn) {
549  std::destroy_at(this);
550  try {
551  std::construct_at(this, std::forward<Fn>(fn));
552  } catch (...) {
553  std::construct_at(this);
554  }
555  return *this;
556  }
557 
559 
568  auto old = State::resolved;
569  if (!_state.compare_exchange_strong(old, State::pending)) {
570  if (old == State::deferred) {
571  std::destroy_at(&_deferred);
572  _state.store(State::pending);
573  } else {
574  throw still_pending_exception();
575  }
576  }
577  clearStorage();
578  _chain = nullptr;
579  return promise_t{this};
580  }
581 
583 
592  prepared_coro out;
593  if (is_deferred()) {
594  startDeferredEvaluation([&](auto &&h){out = std::move(h);});
595  }
596  return out;
597  }
598 
600 
616  template<std::invocable<> Fn>
617  [[nodiscard]] bool set_callback(Fn &&fn) {
618  return RegAwtRes::constructed_ready == register_awaiter(std::forward<Fn>(fn), [](auto &&){});
619  }
620 
622 
628  bool unset_callback() {
629  return RegAwtRes::constructed_ready == register_awaiter([]{return prepared_coro{};}, [](auto &&){});
630  }
631 
632 
634 
646  template<std::invocable<> Fn>
647  bool then(Fn &&fn) {
648  auto res = register_awaiter(std::forward<Fn>(fn), [](auto &&){});
649  switch (res) {
650  case RegAwtRes::resolved: {
651  fn();
652  } return false;
653  case RegAwtRes::constructed_resolved: {
654  _awaiter();
655  } return false;
656  default:
657  return true;
658  }
659  }
660 
662 
671  template<std::invocable<> Fn>
672  bool operator>>(Fn &&fn) {
673  return then(std::forward<Fn>(fn));
674  }
675 
676 
677 
679 
686  wait_awaiter wait() noexcept {return this;}
687 
688 
691  wait();
692  return await_resume();
693  }
694 
696  operator cast_ret_value() {
697  wait();
698  return getInternal();
699  }
700 
702 
706  bool is_pending() const {
707  return _state.load(std::memory_order_relaxed) != State::resolved;
708  }
709 
711 
715  bool is_in_progress() const {
716  auto st = _state.load(std::memory_order_relaxed);
717  return st != State::resolved && st != State::deferred;
718  }
719 
721 
725  bool is_deferred() const {
726  return _state.load(std::memory_order_relaxed) == State::deferred;
727  }
728 
730 
733  bool is_awaited() const {
734  return _state.load(std::memory_order_relaxed) == State::awaited;
735  }
736 
738  bool await_ready() const noexcept {
739  return _state.load(std::memory_order_relaxed) == State::resolved;
740  }
741 
743 
753  std::coroutine_handle<> await_suspend(std::coroutine_handle<> h) noexcept {
754  prepared_coro retval = {};
755  auto cb = [h]{return h;};
756  auto resume_fn = [&retval](prepared_coro h){retval = std::move(h);};
757  if (RegAwtRes::constructed_ready == register_awaiter(cb, resume_fn)) {
758  return retval.symmetric_transfer();
759  }
760  return h;
761  }
762 
764  ret_value await_resume() {
765  if constexpr(std::is_void_v<T>) {
766  getInternal();
767  } else {
768  return getInternal();
769  }
770  }
771 
773 
779  bool has_value() const {
780  return _result != Result::not_set;
781  }
782 
784 
790  bool has_exception() const {
791  return _result == Result::exception;
792  }
793 
794 
796 
807  canceled_awaiter operator!() {return this;}
808 
810 
821  template<typename X>
822  typename promise<X>::notify forward_to(promise<X> &prom) & noexcept {
823  if (this->is_pending()) return {};
824  switch (_result) {
825  case Result::exception: return prom.reject(_exception);
826  case Result::value:
827  if constexpr(std::is_void_v<X>) {
828  return prom();
829  } else {
830  if constexpr(std::is_reference_v<T>) {
831  return prom(*_value);
832  } else {
833  return prom(_value);
834  }
835  }
836  default:
837  case Result::not_set:
838  return prom.cancel();
839 
840  }
841 
842  }
843  template<typename X>
844  typename promise<X>::notify forward_to(promise<X> &&prom) & noexcept {
845  return forward_to(prom);
846  }
847 
849 
860  template<typename X>
861  typename promise<X>::notify forward_to(promise<X> &prom) && noexcept {
862  if (this->is_pending()) return {};
863  switch (_result) {
864  case Result::exception: return prom.reject(_exception);
865  case Result::value:
866  if constexpr(std::is_void_v<X>) {
867  return prom();
868  } else {
869  if constexpr(std::is_reference_v<T>) {
870  return prom(std::move(*_value));
871  } else {
872  return prom(std::move(_value));
873  }
874  }
875  default:
876  case Result::not_set:
877  return prom.cancel();
878 
879  }
880 
881  }
882 
883  template<typename X>
884  typename promise<X>::notify forward_to(promise<X> &&prom) && noexcept {
885  return std::move(*this).forward_to(prom);
886  }
887 
888 
890 
903  template<typename X, std::invocable<cast_ret_value> Fn>
904  typename promise<X>::notify convert_to(promise<X> &prom, Fn &&convert) noexcept {
905  if (this->is_pending()) return {};
906 
907  switch (_result) {
908  case Result::exception: return prom.reject(_exception);
909  case Result::value:
910  if constexpr(std::is_void_v<std::invoke_result_t<Fn,cast_ret_value> >) {
911  if constexpr(std::is_reference_v<T>) {
912  std::forward<Fn>(convert)(_value);
913  return prom();
914  } else {
915  std::forward<Fn>(convert)(_value);
916  return prom();
917  }
918  } else {
919  if constexpr(std::is_reference_v<T>) {
920  return prom(std::forward<Fn>(convert)(std::forward<cast_ret_value>(*_value)));
921  } else {
922  return prom(std::forward<Fn>(convert)(std::forward<cast_ret_value>(_value)));
923  }
924  }
925  default:
926  case Result::not_set:
927  return prom.cancel();
928 
929  }
930  }
931  template<typename X, std::invocable<cast_ret_value> Fn>
932  typename promise<X>::notify convert_to(promise<X> &&prom, Fn &&convert) noexcept {
933  return convert_to(prom, std::forward<Fn>(convert));
934  }
935 
936 
937 
938 protected:
939 
940  enum class State: char {
941  //future is resolved
942  resolved,
943  //evaluation is deferred
944  deferred,
945  //future is pending - no awaiter
946  pending,
947  //future is pending - awaiter is set
948  awaited,
949  //future is pending - deferred evaluation is current in progress
950  evaluating,
951 
952  };
953 
954  enum class Result : char{
955  //result is not set (yet?)
956  not_set,
957  //result contains value
958  value,
959  //result contains exception
960  exception,
961  };
962 
963  enum class RegAwtRes {
964  //awaiter constructed, but future is resolved
965  constructed_resolved,
966  //awaiter constructed and ready
967  constructed_ready,
968  //future resolved, nothing happened
969  resolved
970 
971  };
972 
973  using ChainPtr = future *;
974 
975  ChainPtr _chain = {};
976  std::atomic<State> _state = {State::resolved}; //< by default, future is resolved in canceled state
977  Result _result = Result::not_set; //< by default, future has no value
978  bool _awaiter_cleanup = false; //is set to true, if awaiter must be destroyed (in resolved state)
979 
980 
981  //because deferred and awaited are mutually exclusive,
982  //reuse a single space for both functions
983  union {
984  awaiter_type _awaiter;
985  deferred_eval_fn_type _deferred;
986  };
987 
988 
989  union {
990  value_store_type _value;
991  std::exception_ptr _exception;
992  coro::function<T(), std::max(sizeof(value_store_type),sizeof(void *)*2)> _lambda;
993  };
994 
995 
996 
997  void clearStorage() {
998  switch (_result) {
999  default: break;
1000  case Result::value: std::destroy_at(&_value);break;
1001  case Result::exception: std::destroy_at(&_exception);break;
1002  }
1003  _result = Result::not_set;
1004  check_awaiter_cleanup();
1005  }
1006 
1007  void wait_internal() {
1008  auto st = _state.load(std::memory_order_acquire);
1009  while (st != State::resolved) {
1010  if (st == State::deferred) {
1011  startDeferredEvaluation([](auto c){c();});
1012  } else {
1013  trace::on_block([&]{
1014  _state.wait(st);
1015  },*this);
1016  }
1017  st = _state.load(std::memory_order_acquire);
1018  }
1019  }
1020 
1021  cast_ret_value getInternal() {
1022  switch (_result) {
1023  case Result::value:
1024  if constexpr(std::is_reference_v<T>) {
1025  return std::forward<cast_ret_value>(*_value);
1026  } else {
1027  return std::forward<cast_ret_value>(_value);
1028  }
1029 
1030  case Result::exception: std::rethrow_exception(_exception); break;
1031  default: break;
1032  }
1033  throw await_canceled_exception();
1034  }
1035 
1036 
1038 
1045  template<typename ResumeFn>
1046  bool startDeferredEvaluation(ResumeFn &&resume_fn) noexcept {
1047  //lock awaiter state (should be deferred previously)
1048  State new_state = State::evaluating;
1049  _state.exchange(new_state, std::memory_order_acquire);
1050  //call the deferred function and pass promise, pass coroutine to resume_fn
1051  resume_fn(_deferred(promise_t(this)));
1052  //destroy deferred function to leave place for awaiter
1053  std::destroy_at(&_deferred);
1054  //try to set pending - however, it can be already resolved now
1055  //return false, if resolved
1056  return _state.compare_exchange_strong(new_state, State::pending, std::memory_order_release);
1057 
1058  }
1059 
1061 
1066  template<std::invocable<promise_t> Fn>
1067  void setDeferredEvaluation(Fn &&fn) {
1068  //check return value - should return coroutine handle
1069  if constexpr(std::is_invocable_r_v<prepared_coro, Fn, promise_t> ) {
1070  //we expecting resolved state - other state is error
1071  State old = State::resolved;
1072  if (!_state.compare_exchange_strong(old,State::deferred, std::memory_order_relaxed)) {
1073  throw still_pending_exception();
1074  }
1075  //construct the function
1076  try {
1077  std::construct_at(&_deferred, std::forward<Fn>(fn));
1078  return;
1079  } catch (...) {
1080  //in case of exception, restore state
1081  _state.store(old, std::memory_order_relaxed);
1082  //and rethrow
1083  throw;
1084  }
1085  } else {
1086  //otherwise, wrap the function
1087  setDeferredEvaluation([xfn = std::move(fn)](promise_t prom) mutable->prepared_coro{
1088  xfn(std::move(prom));
1089  return {};
1090  });
1091  }
1092  }
1093 
// Stores the value of the future, constructed from args, and marks the
// result as a value. Previously stored content is destroyed first.
//  - reference T: the address of the argument is stored
//  - a single callable returning exactly value_store_type: the value is
//    initialized from the result of the call
//  - otherwise: the value is constructed from the forwarded arguments
// The value is then propagated to the chained future, if any (by copy, or
// as a reference for reference T).
// Any exception thrown along the way is stored via set_exception().
1094  template<typename ... Args>
1095  void set_value(Args && ... args) {
1096  try {
1097  clearStorage();
1098  if constexpr(std::is_reference_v<T>) {
1099  std::construct_at(&_value, &args...);
1100  } else if constexpr(sizeof...(Args) == 1 && (invocable_r_exact<Args, value_store_type> && ...)) {
1101  new (&_value) value_store_type(args()...);
1102  } else {
1103  std::construct_at(&_value, std::forward<Args>(args)...);
1104  }
1105  _result = Result::value;
1106 
1107  if (_chain) {
1108  if constexpr(std::is_void_v<T>) {
1109  _chain->set_value();
1110  } else if constexpr(std::is_reference_v<T>) {
1111  _chain->set_value(*_value);
1112  } else if constexpr(std::is_copy_constructible_v<T>) {
1113  _chain->set_value(_value);
1114  } /* else {
1115  will cancel await (exception) as we cannot copy the valUE
1116  }*/
1117  }
1118  } catch (...) {
1119  set_exception(std::current_exception());
1120  }
1121  }
1122 
1123  void set_exception(std::exception_ptr e) {
1124  clearStorage();
1125  std::construct_at(&_exception, std::move(e));
1126  _result = Result::exception;
1127  if (_chain) _chain->set_exception(_exception);
1128  }
1129 
1130  template<typename SchedulerFn>
1131  void set_resolved(SchedulerFn &&schfn) {
1132  State st = _state.exchange(State::resolved);
1133  auto chain = _chain;
1134  _state.notify_all();
1135  if (st == State::awaited) {
1136  _awaiter_cleanup = true;
1137  schfn(_awaiter);
1138  }
1139  if (chain) chain->set_resolved(std::forward<SchedulerFn>(schfn));
1140  }
1141 
1142 
1143  void check_awaiter_cleanup() {
1144  if (_awaiter_cleanup) {
1145  std::destroy_at(&_awaiter);
1146  _awaiter_cleanup = false;
1147  }
1148  }
1149 
1150 
// Registers an awaiter (callback) to be called when the future resolves.
// The state is first exchanged for `evaluating`, which acts as a lock on
// the awaiter slot, then handled by the previous state:
//  - resolved: state restored, nothing registered
//  - pending: awaiter constructed
//  - awaited: previous awaiter destroyed, new one constructed
//  - deferred: deferred evaluation is started (the resulting coroutine is
//    passed to resumeFn), then registration is retried
//  - evaluating (or unknown): throws still_pending_exception
// awt: callable to register; it is moved from by to_awaiter() when stored
// resumeFn: receives the prepared_coro of a started deferred evaluation
// Returns:
//  - RegAwtRes::constructed_ready: awaiter stored, future still pending
//  - RegAwtRes::constructed_resolved: awaiter stored, but the future was
//    resolved meanwhile; the awaiter is flagged for cleanup and the caller
//    decides whether to invoke it
//  - RegAwtRes::resolved: already resolved, awt untouched
1151  template<std::invocable<> Awt, typename ResumeFn >
1152  RegAwtRes register_awaiter(Awt &&awt, ResumeFn &&resumeFn) {
1153  //lock awaiter slot
1154  State st = _state.exchange(State::evaluating);
1155  switch (st) {
1156  //already resolved?
1157  case State::resolved: _state.store(st); //restore state
1158  return RegAwtRes::resolved;
1159 
1160  case State::pending: //<still pending? move awaiter
1161  new(&_awaiter) auto(to_awaiter(awt));
1162  break;
1163  default:
1164  case State::evaluating: //<locked - this is error
1165  throw still_pending_exception();
1166  case State::awaited: //<already awaited? destroy previous awaiter and construct new one
1167  std::destroy_at(&_awaiter);
1168  new(&_awaiter) auto(to_awaiter(awt));
1169  break;
1170  case State::deferred: //<deferred ? perform evaluation and try again
1171  if (!startDeferredEvaluation(resumeFn)) return RegAwtRes::resolved;
1172  return register_awaiter(std::forward<Awt>(awt),std::forward<ResumeFn>(resumeFn));
1173  }
1174  st = State::evaluating; //replace evaluating with awaited
1175  if (!_state.compare_exchange_strong(st, State::awaited)) {
1176  //if failed, it should be resolved
1177  //mark that awaited must be clean up (always)
1178  _awaiter_cleanup = true;
1179  return RegAwtRes::constructed_resolved;
1180  }
1181  //awaiter has been set
1182  return RegAwtRes::constructed_ready;
1183  }
1184 
1185  template<std::invocable<> Fn>
1186  static awaiter_type to_awaiter(Fn &fn) {
1187  if constexpr (std::is_constructible_v<prepared_coro, std::invoke_result_t<Fn> >) {
1188  return std::move(fn);
1189  } else {
1190  return [fn = std::move(fn)]() mutable -> prepared_coro {
1191  fn();return {};
1192  };
1193  }
1194  }
1195 
1196  void checkInProgress() {
1197  if (is_in_progress()) throw still_pending_exception();
1198  }
1199 
1200 
1202 
1218  void attach(future *x) {
1219  future **ch = &_chain;
1220  while (*ch) ch = &((*ch)->_chain);
1221  *ch = x;
1222  }
1223 
1225 
1233  future *r = _chain;
1234  if (r) _chain = r->_chain;
1235  return r;
1236  }
1237 
1238  friend class _details::coro_promise_base<T>;
1239  friend class _details::wait_awaiter<future>;
1240  friend class promise<T>;
1241 };
1242 
1243 
1244 template<typename T>
1245 class deferred_future: public future<T> {
1246 public:
1247 
1248  using promise_t = typename future<T>::promise_t;
1249  using State = typename future<T>::State;
1250  using Result = typename future<T>::Result;
1251  using future<T>::future;
1252 
1253  template<std::invocable<promise_t> Fn>
1254  deferred_future(Fn &&fn) {
1255  this->setDeferredEvaluation(std::forward<Fn>(fn));
1256  }
1257  deferred_future(deferred_future &&other) {
1258  other.checkInProgress();
1259  if (other._state == State::deferred) {
1260  std::construct_at(&this->_deferred, std::move(other._deferred));
1261  this->_state = State::deferred;
1262  }
1263  switch (other._result) {
1264  case Result::value: std::construct_at(&this->_value, std::move(other._value)); break;
1265  case Result::exception: std::construct_at(&this->_exception, std::move(other._exception)); break;
1266  default: break;
1267  }
1268  this->_result = other._result;
1269  }
1270 
1271  deferred_future &operator=(deferred_future &&other) {
1272  if (this != &other){
1273  this->checkInProgress();
1274  other.checkInProgress();
1275  std::destroy_at(this);
1276  std::construct_at(this, std::move(other));
1277  }
1278  return *this;
1279  }
1280 
1282 
1285  void operator()(promise_t &&prom) & {
1286  State st = this->_state;
1287  if (st == State::deferred) {
1288  this->_deferred(std::move(prom));
1289  std::destroy_at(&this->_deferred);
1290  this->_state = State::resolved;
1291  } else if (st == State::resolved) {
1292  this->forward_to(prom);
1293  } else {
1294  throw still_pending_exception();
1295  }
1296  }
1297 
1299 
1302  void operator()(promise_t &&prom) && {
1303  State st = this->_state;
1304  if (st == State::deferred) {
1305  this->_deferred(std::move(prom));
1306  std::destroy_at(&this->_deferred);
1307  this->_state = State::resolved;
1308  } else if (st == State::resolved) {
1309  std::move(*this).forward_to(prom);
1310  } else {
1311  throw still_pending_exception();
1312  }
1313  }
1314 
1315 
1316 };
1317 
1318 namespace _details {
1319 
1320 template<typename T>
1321 class coro_promise_base: public trace::base_promise_type {
1322 protected:
1323 
1324  future<T> *fut = nullptr;
1325 
1326  prepared_coro set_resolved() {
1327  prepared_coro ret;
1328  auto tmp = std::exchange(fut, nullptr);
1329  tmp->set_resolved([&](auto &&fn){ret = fn();});
1330  return ret;
1331  }
1332  template<typename ... Args>
1333  void set_value(Args && ... args) const {
1334  if (fut) fut->set_value(std::forward<Args>(args)...);
1335  else {
1336  if constexpr ((std::invocable<Args> && ...)) {
1337  (args(), ...);
1338  }
1339  }
1340  }
1341 
1342  coro_promise_base() = default;
1343  coro_promise_base(const coro_promise_base &x) = delete;
1344  coro_promise_base &operator=(const coro_promise_base &) = delete;
1345  ~coro_promise_base() {
1346  if (fut) fut->set_resolved([](auto &&){});
1347  }
1348 
1349 
1350  void set_exception(std::exception_ptr e) {
1351  if (fut) fut->set_exception(std::move(e));
1352  }
1353 
1354 
1355 public:
1356  void unhandled_exception() {
1357  if (fut) fut->set_exception(std::current_exception());
1358  }
1359 
1360 };
1361 
1362 
1363 template<typename T>
1364 class coro_promise: public coro_promise_base<T> {
1365 public:
1366 
1367  template<typename Arg>
1368  requires future_constructible<T, Arg>
1369  void return_value(Arg &&arg) {
1370  this->set_value(std::forward<Arg>(arg));
1371  }
1372 
1373 
1374 };
1375 
1376 template<>
1377 class coro_promise<void>: public coro_promise_base<void> {
1378 public:
1379  void return_void() {
1380  this->set_value();
1381  }
1382 
1383 };
1384 
1385 }
1386 
1388 
1405 template<typename T>
1407 public:
1408 
1409  /*
1410  * +----------------+
1411  * | shared_ptr |---------------------> +----------------+
1412  * +----------------+ | |
1413  * | _next - l.list | | |
1414  * +----------------+ | future<T> |
1415  * | _state | | |
1416  * +----------------+ | |
1417  * | | +----------------+
1418  * | _awaiter/cb | | chain - l.list |
1419  * | | +----------------+
1420  * +----------------+
1421  *
1422  *
1423  *
1424  */
1425 
1426 
1427  using value_type = typename future<T>::value_type;
1428  using value_store_type = typename future<T>::value_store_type;
1429  using cast_ret_value = typename future<T>::cast_ret_value;
1430  using ret_value = typename future<T>::ret_value;
1431  using awaiter_cb = function<prepared_coro()>;
1432  using wait_awaiter = _details::wait_awaiter<shared_future>;
1433  using canceled_awaiter = _details::has_value_awaiter<shared_future,false>;
1434 
1436 
1440 
1441 
1443 
1457  template<typename Arg0, typename ... Args>
1458  requires (!std::is_same_v<std::decay_t<Arg0>, shared_future>)
1459  shared_future(Arg0 &&arg0, Args && ...args) {
1460  if constexpr(is_allocator<std::decay_t<Arg0> >) {
1461  static_assert(std::is_constructible_v<Shared, Args ...>, "Invalid arguments");
1462  _shared_future = std::allocate_shared<Shared>(arg0, std::forward<Args>(args)...);
1463  } else {
1464  static_assert(std::is_constructible_v<Shared, Arg0, Args ...>, "Invalid arguments");
1465  _shared_future = std::make_shared<Shared>(arg0, std::forward<Args>(args)...);
1466  }
1467  _shared_future->init_callback(_shared_future);
1468  }
1469 
1471  shared_future(shared_future &other):_shared_future(other._shared_future) {}
1473  shared_future(const shared_future &other):_shared_future(other._shared_future) {}
1476  this->_shared_future = other._shared_future;
1477  _state.store(State::unused, std::memory_order_relaxed);
1478  return *this;
1479  }
1480 
1483  check_in_progress();
1484  }
1485 
1488  _shared_future = std::make_shared<Shared>();
1489  promise<T> p = _shared_future->get_promise();
1490  init();
1491  return p;
1492  }
1493 
1495 
1501  template<typename stl_allocator>
1502  promise<T> get_promise(stl_allocator &&allocator) {
1503  _shared_future = std::allocate_shared<Shared>(std::forward<allocator>(allocator));
1504  promise<T> p = _shared_future->get_promise();
1505  init();
1506  return p;
1507  }
1508 
1509 
1511  template<std::invocable<> Fn>
1512  void operator<<(Fn &&fn) {
1513  static_assert(std::is_invocable_r_v<coro::future<T>, Fn>);
1514  _shared_future = std::make_shared<Shared>(std::forward<Fn>(fn));
1515  init();
1516  }
1517 
1519 
1522  template<std::invocable<> Fn, typename std_allocator>
1523  void operator<<(std::tuple<Fn, std_allocator> && tpl) {
1524  static_assert(std::is_invocable_r_v<coro::future<T>, Fn>);
1525  _shared_future = std::allocate_shared<Shared>(
1526  std::forward<std_allocator>(std::get<std_allocator>(tpl)),
1527  std::forward<Fn>(std::get<Fn>(tpl)));
1528  init();
1529  }
1530 
1531 
1533 
1538  template<std::invocable<> Fn>
1539  bool set_callback(Fn &&fn) {
1540  auto st = _state.exchange(State::unused);
1541  if constexpr(std::is_constructible_v<prepared_coro, std::invoke_result_t<Fn> >) {
1542  _awaiter.emplace(std::forward<Fn>(fn));
1543  } else {
1544  _awaiter.emplace([fn = std::move(fn)]() mutable ->prepared_coro{
1545  fn(); return {};
1546  });
1547  }
1548  if (st == State::notified) return false;
1549  auto st2 = State::unused;
1550  if (_state.compare_exchange_strong(st2, State::awaited)) {
1551  if (st == State::unused) return _shared_future->register_target(this);
1552  return true;
1553  }
1554  return false;
1555  }
1556 
1558 
1564  template<std::invocable<> Fn>
1565  bool then(Fn &&fn) {
1566  if (!set_callback(std::forward<Fn>(fn))) {
1567  (*_awaiter)();
1568  return false;
1569  }
1570  return true;
1571  }
1572 
1573 
1575  template<std::invocable<> Fn>
1576  bool operator>>(Fn &&fn) {
1577  return when_resolved(std::forward<Fn>(fn));
1578  }
1579 
1580 
1582 
1585  wait_awaiter wait() {return this;}
1586 
1587 
1589  ret_value get() {
1590  wait();
1591  return _shared_future->get();
1592  }
1593 
1594 
1596  operator cast_ret_value (){
1597  wait();
1598  return *_shared_future;
1599  }
1600 
1602 
1606  bool is_pending() const {
1607  return _shared_future && _shared_future->is_pending();
1608  }
1609 
1611 
1615  bool is_in_progress() const {
1616  return _shared_future && _shared_future->is_in_progress();
1617  }
1618 
1619 
1621 
1624  bool is_awaited() const {
1625  return _state.load(std::memory_order_relaxed) == State::awaited;
1626  }
1627 
1629  bool await_ready() const {
1630  return !_shared_future || _shared_future->await_ready();
1631  }
1632 
1634 
1644  bool await_suspend(std::coroutine_handle<> h) {
1645  return set_callback([h]{return h;});
1646  }
1647 
1649  ret_value await_resume() {
1650  return _shared_future->await_resume();
1651  }
1652 
1653 
1655 
1658  void reset() {
1659  _shared_future.reset();
1660  }
1661 
1663 
1669  bool has_value() const {
1670  return _shared_future->has_value();
1671  }
1672 
1674 
1680  bool has_exception() const {
1681  return _shared_future->has_exception();
1682  }
1683 
1685 
1696  canceled_awaiter operator!() {return this;}
1697 
1698 
1699 
1700 
1701 protected:
1702 
1703  class Shared: public future<T> {
1704  public:
1705 
1706  using future<T>::future;
1707 
1708  static void init_callback(std::shared_ptr<Shared> self) {
1709  self->then([self = std::move(self)]() mutable -> prepared_coro{
1710  auto r = self->notify_targets();
1711  self.reset();
1712  return r;
1713  });
1714  }
1715 
1716  prepared_coro notify_targets() {
1717  auto n = _await_chain.exchange(disabled_slot());
1718  return n?notify_targets(n):prepared_coro{};
1719  }
1720 
1721  static prepared_coro notify_targets(shared_future *list) {
1722  if (list->_next) {
1723  auto c1 = notify_targets(list->_next);
1724  auto c2 = list->activate();
1725  if (c1) {
1726  if (c2) {
1727  c1();
1728  return c2;
1729  }
1730  return c1;
1731  }
1732  return c2;
1733  } else {
1734  return list->activate();
1735  }
1736  }
1737 
1738  bool register_target(shared_future *item) {
1739  while (!_await_chain.compare_exchange_strong(item->_next, item)) {
1740  if (item->_next == disabled_slot()) return false;
1741  }
1742  return true;
1743  }
1744 
1745 
1746  protected:
1747  std::atomic<shared_future *> _await_chain = {};
1748  };
1749 
1750 
1751 
///State of a single shared_future instance
enum class State: char {
    ///no callback is waiting (initial state)
    unused = 0,
    ///a callback is set and waits for notification
    awaited = 1,
    ///notification has been delivered (set by activate())
    notified = 2
};
1760 
1761 
1762  std::shared_ptr<Shared> _shared_future;
1763  shared_future *_next = nullptr;
1764  std::atomic<State> _state = {State::unused};
1765  std::optional<awaiter_cb> _awaiter;
1766 
1767  prepared_coro activate() {
1768  auto st = _state.exchange(State::notified);
1769  if (st == State::awaited) {
1770  return (*_awaiter)();
1771  }
1772  return {};
1773  }
1774 
1775  void wait_internal() {
1776  _shared_future->wait();
1777  }
1778 
1779 
1780  void check_in_progress() {
1781  if (_state.load(std::memory_order_relaxed) == State::awaited) {
1782  throw still_pending_exception();
1783  }
1784  }
1785 
1786 
1787  void init() {
1788  _shared_future->init_callback(_shared_future);
1789  _state.store(State::unused, std::memory_order_relaxed);
1790  }
1791 
1792 
1793  friend class _details::wait_awaiter<shared_future>;
1794 
1795 
1796  static shared_future<T> * disabled_slot() {return reinterpret_cast<shared_future<T> *>(1);}
1797 
1798 
1799 };
1800 
1801 
1802 #if 0
1803 
///Type-erased reference to a future which accepts a callback
/**
 * NOTE: this class is inside of an #if 0 block, it is not compiled.
 * Stores an untyped pointer to the future and a function which knows how
 * to call then() on the original type.
 */
1804 class future_with_callback {
1805 public:
1806 
    ///Capture a reference to a future of any type
    /**
     * @param fut the future; only its address is stored, so it must
     *  outlive this object
     */
1807  template<typename Fut>
1808  future_with_callback(Fut &&fut) {
1809  this->fut = &fut;
    // captureless lambda, restores the type and forwards the callback to then()
1810  set_cb = [](void *fut, function<prepared_coro()>fun){
1811  using FutT = std::decay_t<Fut>;
1812  FutT *f = reinterpret_cast<FutT *>(fut);
1813  f->then(std::move(fun));
1814  };
1815  }
1816 
    ///Attach a callback to the referenced future
    /**
     * @param fn callable without arguments. When it doesn't return
     *  prepared_coro, it is wrapped into a function which returns an
     *  empty prepared_coro.
     */
1817  template<std::invocable<> Fn>
1818  void operator>>(Fn &&fn) {
1819  if constexpr(std::is_same_v<std::invoke_result_t<Fn>, prepared_coro>) {
1820  set_cb(fut, std::forward<Fn>(fn));
1821  } else{
    // NOTE(review): std::move on a forwarding reference - an lvalue callable is moved-from
1822  set_cb(fut, [fn = std::move(fn)]()mutable -> prepared_coro {
1823  fn();
1824  return {};
1825  });
1826  }
1827  }
1828 
1829 protected:
    ///untyped pointer to the referenced future
1830  void *fut;
    ///function which attaches the callback to the future behind the pointer
1831  void (*set_cb)(void *fut, function<prepared_coro()>);
1832 };
1833 
1834 
1835 
1837 
///Future which is resolved once all given futures are resolved
/**
 * NOTE: this class is inside of an #if 0 block, it is not compiled.
 * Uses a counter which starts at 1, is increased for each source future
 * and decreased by each callback; the future is resolved when the counter
 * reaches zero.
 */
1840 class all_of: public future<void> {
1841 public:
1842 
1843 
    ///Construct from a range of futures
    /**
     * @param from begin of the range
     * @param to end of the range
     */
1844  template<typename Iter>
1845  all_of(Iter from, Iter to) {
1846  result = get_promise();
    // the callback captures this, the object must stay at its address
1847  auto cb = [this]{if (--remain == 0) result();};
1848  while (from != to) {
1849  ++remain;
1850  static_cast<future_with_callback>(*from) >> cb;
1851  ++from;
1852  }
    // release the initial count (remain starts at 1)
1853  cb();
1854  }
1855 
    ///Construct from a list of futures
1856  all_of(std::initializer_list<future_with_callback> iter)
1857  :all_of(iter.begin(), iter.end()) {}
1858 
1859 protected:
    ///promise of this future, called when the counter reaches zero
1860  promise<void> result = {};
    ///count of unresolved sources plus one held by the constructor
1861  std::atomic<int> remain = {1};
1862 
1863 };
1864 
1865 #endif
1866 #if 0
///Future which is resolved with the index of the first resolved future
/**
 * NOTE: this class is inside of an #if 0 block, it is not compiled.
 * The first callback which succeeds to store its index into `selected`
 * resolves the future. The counter `inited` coordinates the cleanup between
 * the constructor and the winning callback.
 */
1867 class any_of: public future<unsigned int> {
1868 public:
1869 
    ///Construct from a range of futures
    /**
     * @param from begin of the range
     * @param to end of the range
     */
1870  template<typename Iter>
1871  any_of(Iter from, Iter to) {
    // the iterators are copied into the closure for later cleanup
1872  cleanup = [=]{do_cleanup(from, to);};
1873  result = get_promise();
1874  unsigned int idx = 0;
1875  while (from != to) {
1876  static_cast<future_with_callback>(*from) >> [idx, this] {
1877  unsigned int r = notset;
    // only the first callback replaces notset by its index
1878  if (selected.compare_exchange_strong(r, idx)) {
1879  finish();
1880  }
1881  };
1882  ++from;
1883  ++idx;
1884  }
    // a non-zero previous value means that finish() has been already called
1885  if (inited.fetch_add(2) > 0) {
1886  cleanup();
1887  }
1888  }
1889 
1890 
    ///Construct from a list of futures
1891  any_of(std::initializer_list<future_with_callback> iter)
1892  :any_of(iter.begin(), iter.end()) {}
1893 
1894 
1895 protected:
1896 
    ///Called by the winning callback, resolves the future with the selected index
1897  void finish() {
    // previous value above 1 means that the constructor finished the registration
1898  if (inited.fetch_add(1) > 1) {
1899  cleanup();
1900  }
1901  result(selected);
1902  }
1903 
1904 
    ///value of `selected` when no future has been resolved yet
1905  static constexpr unsigned int notset = std::bit_cast<unsigned int>(-1);
    ///promise of this future
1906  promise<unsigned int> result = {};
    ///index of the first resolved future, or notset
1907  std::atomic<unsigned int> selected = {notset};
1908  std::atomic<int> inited = {0}; //0 - nothing, 1 - resolved before init, 2 - init waiting, 3 - resolved after init
    ///stored call of do_cleanup() with copies of the constructor's iterators
1909  function<void(), 8*sizeof(void *)> cleanup;
1910 
    ///Attach a callback returning an empty prepared_coro to every future in the range
1911  static void do_cleanup(auto from, auto to) {
1912  while (from != to) {
1913  static_cast<future_with_callback>(*from) >> []{return prepared_coro();};
1914  ++from;
1915  }
1916  }
1917 };
1918 #endif
1919 
1920 
1921 
1922 }
1923 
void operator()(promise_t &&prom) &
By calling deferred future with a promise, the future is started and result is transferred to the prom...
Definition: future.h:1285
void operator()(promise_t &&prom) &&
By calling deferred future with a promise, the future is started and result is transferred to the prom...
Definition: future.h:1302
Contains future value of T, where evaluation is deferred until the value is needed.
Definition: future.h:1245
future(deferred_tag, Fn &&fn)
Construct future with deferred evaluation.
Definition: future.h:509
requires(std::constructible_from< value_store_type, Args ... > &&avoid_same_kind< future, Args... >) future(Args &&... args)
Construct future already resolved with a value.
Definition: future.h:477
bool startDeferredEvaluation(ResumeFn &&resume_fn) noexcept
Starts evaluation of deferred future.
Definition: future.h:1046
promise_t get_promise()
Retrieve promise and begin evaluation.
Definition: future.h:567
bool has_exception() const
Determines, whether future has exception.
Definition: future.h:790
promise< X >::notify forward_to(promise< X > &prom) &noexcept
Forward value, possibly convert it, to different promise.
Definition: future.h:822
bool is_deferred() const
Determine deferred status.
Definition: future.h:725
std::coroutine_handle await_suspend(std::coroutine_handle<> h) noexcept
co_await support, called with suspended coroutine
Definition: future.h:753
void setDeferredEvaluation(Fn &&fn)
Initializes deferred evaluation.
Definition: future.h:1067
std::conditional_t< std::is_void_v< T >, void, std::add_rvalue_reference_t< T > > ret_value
type which is used as return value of get() and await_resume()
Definition: future.h:464
bool has_value() const
Determines, whether future has a value.
Definition: future.h:779
future(Fn &&fn)
Construct future which is evaluated inside of lambda function.
Definition: future.h:496
bool await_ready() const noexcept
co_await support, returns true, if value is ready (resolved)
Definition: future.h:738
future(std::in_place_t, Args &&... args)
Construct future already resolved with a value.
Definition: future.h:486
std::conditional_t< std::is_void_v< T >, bool, std::add_rvalue_reference_t< T > > cast_ret_value
type which is used for cast operator ()
Definition: future.h:457
bool unset_callback()
unset callback
Definition: future.h:628
promise< X >::notify convert_to(promise< X > &prom, Fn &&convert) noexcept
Forward value, possibly convert it, to different promise.
Definition: future.h:904
bool is_awaited() const
Determine whether an awaiter is set.
Definition: future.h:733
bool is_in_progress() const
Determine in progress status.
Definition: future.h:715
ret_value get()
Retrieves value, performs synchronous wait.
Definition: future.h:690
void attach(future *x)
Attach future to internal linked list.
Definition: future.h:1218
future * detach()
Detach linked list from the future.
Definition: future.h:1232
bool operator>>(Fn &&fn)
Sets function which is called once future is resolved (always)
Definition: future.h:672
wait_awaiter wait() noexcept
Perform synchronous wait on resolution.
Definition: future.h:686
bool then(Fn &&fn)
Sets function which is called once future is resolved (always)
Definition: future.h:647
canceled_awaiter operator!()
awaitable for canceled operation
Definition: future.h:807
bool is_pending() const
Determines pending status.
Definition: future.h:706
promise< X >::notify forward_to(promise< X > &prom) &&noexcept
Forward value, possibly convert it, to different promise.
Definition: future.h:861
bool set_callback(Fn &&fn)
Sets callback which is called once future is resolved (in future)
Definition: future.h:617
prepared_coro start()
Start deferred execution.
Definition: future.h:591
~future()
dtor
Definition: future.h:522
Contains future value of T, can be co_awaited in coroutine.
Definition: future.h:417
std::coroutine_handle symmetric_transfer()
release handle to be used in function await_suspend()
Definition: prepared_coro.h:53
contains prepared coroutine (prepared to run)
Definition: prepared_coro.h:15
std::coroutine_handle symmetric_transfer()
deliver the notification with ability to switch to the coroutine
Definition: future.h:127
notify(FutureType *fut)
Construct deferred notify from future.
Definition: future.h:172
void deliver()
deliver the notification now
Definition: future.h:100
notify operator+(notify &other)
combine notification into one object
Definition: future.h:148
notify & operator+=(notify &other)
append a notification
Definition: future.h:155
void deliver(Fn &&fn)
deliver notification through the function
Definition: future.h:112
void cancel()
cancel the future resolution (notify still needs to be delivered)
Definition: future.h:98
notify & operator+=(notify &&other)
append a notification
Definition: future.h:165
contains a notification to be delivered to the associated future
Definition: future.h:91
FutureType * release()
Release the future pointer from the promise object.
Definition: future.h:273
promise(FutureType *ptr)
Bound promise to future.
Definition: future.h:195
notify reject(std::exception_ptr e)
reject the future with exception
Definition: future.h:245
promise & operator+=(promise< T, f > &other)
Combine two promises into one.
Definition: future.h:304
~promise()
Dtor - if future is pending, cancels it.
Definition: future.h:214
const FutureType * get_future() const
Retrieve pointer to an associated future.
Definition: future.h:285
notify cancel()
cancel the future (resolve without value)
Definition: future.h:222
notify reject(E &&exception)
Reject with exception.
Definition: future.h:264
notify reject()
reject or cancel the future with exception
Definition: future.h:257
promise()=default
construct unbound promise
notify operator()(Args &&... args)
set value
Definition: future.h:232
promise & operator=(promise &&other)
Assign by move.
Definition: future.h:205
promise(promise &&other)
Move.
Definition: future.h:198
promise(promise< T, x > &&other)
Move change MT Safety.
Definition: future.h:202
promise operator+(promise< T, f > &other)
Combine two promises into one.
Definition: future.h:329
future< T > FutureType
Associated future.
Definition: future.h:77
Carries reference to future<T>, callable, sets value of an associated future<T>
Definition: future.h:73
bool has_exception() const
Determines, whether future has exception.
Definition: future.h:1680
bool then(Fn &&fn)
execute callback when future is resolved
Definition: future.h:1565
bool has_value() const
Determines, whether future has a value.
Definition: future.h:1669
shared_future(shared_future &other)
shared future is copyable
Definition: future.h:1471
void operator<<(std::tuple< Fn, std_allocator > &&tpl)
Redirect return value of function returning coro::future to instance of shared_future.
Definition: future.h:1523
bool await_suspend(std::coroutine_handle<> h)
co_await support, called with suspended coroutine
Definition: future.h:1644
bool set_callback(Fn &&fn)
set callback which is invoked when future is resolved
Definition: future.h:1539
void operator<<(Fn &&fn)
Redirect return value of function returning coro::future to instance of shared_future.
Definition: future.h:1512
shared_future(const shared_future &other)
shared future is copyable
Definition: future.h:1473
canceled_awaiter operator!()
awaitable for canceled operation
Definition: future.h:1696
ret_value get()
Retrieves value, performs synchronous wait.
Definition: future.h:1589
requires(!std::is_same_v< std::decay_t< Arg0 >, shared_future >) shared_future(Arg0 &&arg0
Construct a shared future and initialize it the same way as coro::future.
bool is_awaited() const
Determine whether an awaiter is set.
Definition: future.h:1624
bool await_ready() const
co_await support, returns true, if value is ready (resolved)
Definition: future.h:1629
promise< T > get_promise(stl_allocator &&allocator)
Initialize future and get promise.
Definition: future.h:1502
shared_future()
Construct shared future uninitialized.
Definition: future.h:1439
wait_awaiter wait()
Perform synchronous wait on resolution.
Definition: future.h:1585
bool is_in_progress() const
Determine in progress status.
Definition: future.h:1615
bool operator>>(Fn &&fn)
alias to then()
Definition: future.h:1576
void reset()
reset state
Definition: future.h:1658
~shared_future()
destructor
Definition: future.h:1482
promise< T > get_promise()
Initialize future and get promise.
Definition: future.h:1487
shared_future & operator=(const shared_future &other)
you can assign
Definition: future.h:1475
bool is_pending() const
Determines pending status.
Definition: future.h:1606
Future which can be shared (by copying - like shared_ptr)
Definition: future.h:1406
Exception is thrown on attempt to retrieve promise when the future is already pending.
Definition: exceptions.h:19
main namespace
Definition: aggregator.h:8