libcoro  1.0
Coroutine support library for C++20
aggregator.h
1 #pragma once
2 
3 #include "coroutine.h"
4 #include "generator.h"
5 #include "queue.h"
6 #include "on_leave.h"
7 #include <vector>
8 namespace coro {
9 
10 namespace _details {
11 template<typename T>
12 coroutine aggregator_cleanup(std::vector<generator<T> > , std::vector<deferred_future<T> > futures) {
13  //cycle over all futures and @b co_await for just has_value - we don't need the value
14  for (auto &f: futures) {
15  trace::awaiting_ref(f, &futures);
16  co_await f.wait();
17  }
18 }
19 }
20 
22 
38 template<typename T, coro_allocator Alloc = std_allocator>
39 generator<T, Alloc> aggregator(Alloc &, std::vector<generator<T> > gens) {
40 
41 
42  //list of futures waiting for results from generators
43  std::vector<deferred_future<T> > futures;
44  //queue - stores index to resolved future
46  std::size_t cnt = gens.size();
47 
48  //activate function - activate generator and awaits it passing index to queue
49  auto activate = [&](std::size_t idx) {
50  futures[idx] = gens[idx]();
51  futures[idx] >> [&q, idx]{
52  return q.push(idx).symmetric_transfer();
53 // q.push(idx); return prepared_coro();
54  };
55  trace::awaiting_ref(gens[idx],&q);
56  };
57 
58  //prepare array of futures
59  futures.resize(cnt);
60 
61  //activate all generators
62  for (std::size_t i = 0; i < cnt; ++i) activate(i);
63 
64  //define function, which is called on exit (including destroy)
65  on_leave lv=[&]{
66 
68  bool pending = false;
69  for (auto &f: futures) {
70  //this should return false, as the future is already resolved
71  //but when returns true, we need handle this as special case
72  //disarming all futures allows to detach it from the queue and detect such state
73  auto r = f.set_callback([]{});
74  pending = pending || r;
75  }
76  //if such future exists, install a coroutine, which awaits to resolution
77  if (pending) {
78  //perform cleanup inside of coroutine
79  _details::aggregator_cleanup(std::move(gens), std::move(futures));
80  }
81  };
82 
83  std::exception_ptr e = {};
84  //any running generator?
85  while (cnt) {
86 
87  //wait on queue
88  std::size_t idx = co_await q.pop();
89 
90  try {
91  //retrieve index, if it has value
92  if (futures[idx].has_value()) {
93  //yield value of future
94  co_yield std::move(futures[idx]).get();
95  //activate the generator
96  activate(idx);
97  } else {
98  //if generator has no value, decrease count of running generators
99  --cnt;
100  }
101  continue;
102  } catch (...) {
103  e = std::current_exception();
104  }
105  co_yield std::move(e);
106 
107  activate(idx);
108  }
109 }
110 
112 template<typename T>
114  return aggregator(standard_allocator, std::move(gens));
115 }
116 
117 
118 template<typename T, typename Alloc, std::convertible_to<generator<T> > ... Args>
119 void aggregator_create_list(std::vector<generator<T> > &out, generator<T, Alloc> &&gen1, Args &&... gens) {
120  out.push_back(generator<T>(std::move(gen1)));
121  aggregator_create_list(out, std::forward<Args>(gens)...);
122 }
123 
124 template<typename T>
125 void aggregator_create_list(std::vector<generator<T> > &) {}
126 
127 
128 
130 template<typename T, typename Alloc, std::convertible_to<generator<T> > ... Args>
131 auto aggregator(generator<T, Alloc> &&gen1, Args &&... gens) {
132  std::vector<generator<T> > out;
133  out.reserve(1+sizeof...(gens));
134  aggregator_create_list(out, std::move(gen1), std::forward<Args>(gens)...);
135  return aggregator(std::move(out));
136 
137 }
138 
140 template<typename T, typename Alloc, coro_allocator GenAlloc, std::convertible_to<generator<T> > ... Args>
141 auto aggregator(GenAlloc &genalloc, generator<T, Alloc> &&gen1, Args &&... gens) {
142  std::vector<generator<T> > out;
143  out.reserve(1+sizeof...(gens));
144  aggregator_create_list(out, std::move(gen1), std::forward<Args>(gens)...);
145  return aggregator(genalloc, std::move(out));
146 
147 }
148 
149 /*
150  * @}
151  */
152 }
153 
154 
155 
COROUTINE: Basic coroutine.
Definition: coroutine.h:28
Contains a future value of T whose evaluation is deferred until the value is needed.
Definition: future.h:1245
COROUTINE: Generator.
Definition: generator.h:94
Defines a function which is called when the function is exited.
Definition: on_leave.h:23
std::size_t size() const
Retrieve the current size of the queue.
Definition: queue.h:149
future< T > pop()
Pop the items.
Definition: queue.h:95
Implements an asynchronous queue with support for coroutines.
Definition: queue.h:22
constexpr std_allocator standard_allocator
Global instance of std_allocator which can be used anywhere an allocator is requested.
Definition: allocator.h:22
generator< T, Alloc > aggregator(Alloc &, std::vector< generator< T > > gens)
Constructs a generator which aggregates the results of multiple generators.
Definition: aggregator.h:39
main namespace
Definition: aggregator.h:8