libcoro  1.0
Coroutine support library for C++20
distributor.h
1 #pragma once
2 
3 #include "../coro/subscription.h"
4 #include "../coro/queue.h"
5 
6 #include <vector>
7 #include <map>
8 
9 namespace coro {
10 
/// Forward declaration of the queue adapter defined later in this file.
/// It is needed early because distributor exposes it through its `queue` alias.
template<class T, class Lock, class QueueImpl>
class distributor_queue;
13 
/// Lock type which performs no locking at all.
/**
 * Used as the default `Lock` of distributor, i.e. for the case where no
 * synchronization is requested. All operations are no-ops.
 *
 * Besides lock()/unlock() it also provides try_lock(), so it can be used
 * with std::unique_lock (including std::try_to_lock) and std::scoped_lock
 * in addition to std::lock_guard.
 */
struct nolock {
    /// Does nothing.
    constexpr void lock() noexcept {}
    /// Always succeeds, because there is nothing to acquire.
    constexpr bool try_lock() noexcept { return true; }
    /// Does nothing.
    constexpr void unlock() noexcept {}
};
18 
19 template<typename From, typename To>
20 concept is_static_cast_convertible = requires(From from){
21  {static_cast<To>(from)};
22 };
23 
25 
45 template<typename T, typename Lock = nolock>
46 class distributor {
47 public:
48 
49  using value_type = T;
50  using reference_type = std::add_lvalue_reference_t<T>;
53  using pending_notify = typename promise<reference_type>::notify;
54  using ID = const void *;
55 
57  distributor() = default;
59  distributor(Lock &&lk) :
60  _mx(std::forward<Lock>(lk)) {
61  }
62 
64 
70  template<typename ... Args>
71  void publish(Args &&... args) {
72  promise_t plist = collect();
73  if constexpr(sizeof...(Args) == 1 && (is_static_cast_convertible<Args &&, reference_type> && ...)) {
74  plist(args...);
75  } else {
76  value_type tmp(std::forward<Args>(args)...);
77  plist(tmp);
78  }
79  }
80 
82 
86  void cancel_all() {
87  collect().cancel();
88  }
89 
91  void reject_all(std::exception_ptr e) {
92  collect().reject(std::move(e));
93  }
94 
96  void reject_all() {
97  auto e = std::current_exception();
98  if (e) reject_all(std::move(e));
99  else cancel_all();
100  }
101 
103 
107  subscription subscribe(ID id = { }) {
108  return typename subscription::subscribe_fn([this, id](promise_t promise) ->prepared_coro {
109  subscribe(std::move(promise), id);
110  return {};
111  });
112  }
113 
114  void subscribe(promise_t &&prom, ID id = { }) {
115  trace::awaiting_ref(*this, prom.get_future());
116  std::lock_guard _(_mx);
117  _subscribers.push_back(std::pair<promise_t, ID>(std::move(prom), id));
118  }
119 
121 
126  pending_notify drop(ID id) {
127  pending_notify ntf;
128  {
129  std::lock_guard _(_mx);
130  auto iter = std::find_if(_subscribers.begin(), _subscribers.end(),
131  [&](const auto &iter) {
132  return iter.second == id;
133  });
134  if (iter == _subscribers.end())
135  return {};
136  ntf = iter->first.cancel();
137  if (&(iter->first) != &_subscribers.back().first) {
138  std::swap(*iter, _subscribers.back());
139  }
140  _subscribers.pop_back();
141  }
142  return ntf;
143  }
144 
146 
158  template<typename QueueImpl = typename queue<T>::behavior>
159  using queue = distributor_queue<T, Lock, QueueImpl>;
160 
161 protected:
162  [[no_unique_address]] Lock _mx;
163  std::vector<std::pair<promise_t, ID> > _subscribers;
164 
165  promise_t collect() {
166  std::lock_guard _(_mx);
167  promise_t p;
168  while (!_subscribers.empty()) {
169  p += _subscribers.back().first;
170  _subscribers.pop_back();
171  }
172  return p;
173  }
174 };
175 
176 template<typename T, typename Lock, typename QueueImpl>
177 class distributor_queue: public queue<T, QueueImpl> {
178 public:
179 
180  distributor_queue() = default;
181  distributor_queue(QueueImpl qimpl) :
182  queue<T, QueueImpl>(std::move(qimpl)) {
183  }
184  distributor_queue(distributor<T, Lock> &dist) {
185  subscribe(dist);
186  }
187  distributor_queue(const distributor_queue&) = delete;
188  distributor_queue& operator=(const distributor_queue&) = delete;
189 
191 
200  void subscribe(distributor<T, Lock> &dist) {
201  _connection = &dist;
202  charge();
203  }
204 
206 
211  void unsubscribe() {
212  if (_connection) {
213  _connection->drop(this);
214  _connection = nullptr;
215  }
216 
217  }
218 
219  ~distributor_queue() {
220  unsubscribe();
221  }
222 
223 protected:
224  typename distributor<T, Lock>::subscription _dist_value;
225  distributor<T, Lock> *_connection = nullptr;
226  void charge() {
227  _dist_value = _connection->subscribe(this);
228  _dist_value >> [&]{value_ready();};
229  }
230 
231  void value_ready() {
232  try {
233  if (_dist_value.has_value()) {
234  this->push(T(_dist_value.get()));
235  charge();
236  } else {
237  this->close();
238  }
239  } catch (...) {
240  this->close(std::current_exception());
241  }
242  }
243 };
244 
246 
266 template<typename T, std::invocable<T> FilterFn, std::invoke_result_t<FilterFn, T> important_message = 0>
268 public:
269  using FilterResult = std::invoke_result_t<FilterFn, T>;
270 
271  template<std::convertible_to<FilterFn> Fn>
272  filtered_update_queue(Fn &&fn):_filter(std::forward<Fn>(fn)) {}
273 
274  filtered_update_queue() = default;
275 
276  bool empty() const {return _main_queue.empty() && _updates.empty();}
277 
279 
284  T &front() {
285  if (_main_queue.empty()) return _updates.begin()->second;
286  return _main_queue.front();
287  }
289 
294  const T &front() const {
295  if (_main_queue.empty()) return _updates.begin()->second;
296  return _main_queue.front();
297  }
298 
300 
303  void pop() {
304  if (_main_queue.empty()) _updates.erase(_updates.begin());
305  else _main_queue.pop_front();
306  }
307 
308  void push(const T &x) {
309  emplace(x);
310  }
311 
312  void push(T &&x) {
313  emplace(std::move(x));
314  }
315 
316  template<typename ... Args>
317  void emplace(Args && ... args) {
318  //assume that message is important (we can't determine ID from args)
319  //construct it in queue
320  _main_queue.emplace_back(std::forward<Args>(args)...);
321  //retrieve it constructed
322  T &itm = _main_queue.back();
323  //retrieve is ID
324  FilterResult id = _filter(itm);
325 
326  //if it is important message, we are good
327  if (id == important_message) return;
328 
329  //find whether there is message with same ID
330  auto iter = _updates.find(id);
331  //if not
332  if (iter == _updates.end()) {
333  //create it
334  _updates.emplace(id, std::move(itm));
335  } else {
336  //otherwise replace it
337  iter->second = std::move(itm);
338  }
339  //remove it from the queue )
340  _main_queue.pop_back();
341  }
342 
343 protected:
344  using MainQueue = std::deque<T>;
345  using Updates = std::map<FilterResult, T>;
346 
347  FilterFn _filter;
348  MainQueue _main_queue;
349  Updates _updates;
350 
351 };
352 
353 }
void reject_all()
reject all with current exception
Definition: distributor.h:96
subscription subscribe(ID id={ })
subscribe
Definition: distributor.h:107
void reject_all(std::exception_ptr e)
reject all with exception
Definition: distributor.h:91
distributor()=default
default constructor
pending_notify drop(ID id)
drop single subscriber
Definition: distributor.h:126
void cancel_all()
drops all subscribers
Definition: distributor.h:86
void publish(Args &&... args)
publish the value
Definition: distributor.h:71
distributor_queue< T, Lock, QueueImpl > queue
Implements a queue on top of a distributor.
Definition: distributor.h:159
distributor(Lock &&lk)
initialize lock instance
Definition: distributor.h:59
Distributes single event to multiple coroutines (subscribers)
Definition: distributor.h:46
T & front()
Retrieves first item in queue.
Definition: distributor.h:284
void pop()
Removes first item from the queue.
Definition: distributor.h:303
const T & front() const
Retrieves first item in queue.
Definition: distributor.h:294
A special purpose queue which is intended to filter events sent to slow subscribers.
Definition: distributor.h:267
contains prepared coroutine (prepared to run)
Definition: prepared_coro.h:15
contains a notification to be delivered to the associated future
Definition: future.h:91
notify reject(std::exception_ptr e)
reject the future with exception
Definition: future.h:245
notify cancel()
cancel the future (resolve without value)
Definition: future.h:222
Carries reference to future<T>, callable, sets value of an associated future<T>
Definition: future.h:73
void close(std::exception_ptr e=nullptr)
close the queue
Definition: queue.h:169
auto push(const T &x)
Push item to the queue.
Definition: queue.h:81
Awaiter for subscriptions.
Definition: subscription.h:29
main namespace
Definition: aggregator.h:8