3 #include "../coro/subscription.h"
4 #include "../coro/queue.h"
11 template<
typename T,
typename Lock,
typename QueueImpl>
12 class distributor_queue;
19 template<
typename From,
typename To>
20 concept is_static_cast_convertible = requires(From from){
21 {
static_cast<To
>(from)};
45 template<
typename T,
typename Lock = nolock>
50 using reference_type = std::add_lvalue_reference_t<T>;
54 using ID =
const void *;
60 _mx(std::forward<Lock>(lk)) {
70 template<
typename ... Args>
73 if constexpr(
sizeof...(Args) == 1 && (is_static_cast_convertible<Args &&, reference_type> && ...)) {
76 value_type tmp(std::forward<Args>(args)...);
92 collect().
reject(std::move(e));
97 auto e = std::current_exception();
114 void subscribe(promise_t &&prom, ID
id = { }) {
115 trace::awaiting_ref(*
this, prom.get_future());
116 std::lock_guard _(_mx);
117 _subscribers.push_back(std::pair<promise_t, ID>(std::move(prom),
id));
129 std::lock_guard _(_mx);
130 auto iter = std::find_if(_subscribers.begin(), _subscribers.end(),
131 [&](
const auto &iter) {
132 return iter.second == id;
134 if (iter == _subscribers.end())
136 ntf = iter->first.cancel();
137 if (&(iter->first) != &_subscribers.back().first) {
138 std::swap(*iter, _subscribers.back());
140 _subscribers.pop_back();
158 template<typename QueueImpl = typename queue<T>::behavior>
159 using queue = distributor_queue<T, Lock, QueueImpl>;
162 [[no_unique_address]] Lock _mx;
163 std::vector<std::pair<promise_t, ID> > _subscribers;
166 std::lock_guard _(_mx);
168 while (!_subscribers.empty()) {
169 p += _subscribers.back().first;
170 _subscribers.pop_back();
176 template<
typename T,
typename Lock,
typename QueueImpl>
177 class distributor_queue:
public queue<T, QueueImpl> {
180 distributor_queue() =
default;
181 distributor_queue(QueueImpl qimpl) :
182 queue<T, QueueImpl>(std::move(qimpl)) {
184 distributor_queue(distributor<T, Lock> &dist) {
187 distributor_queue(
const distributor_queue&) =
delete;
188 distributor_queue& operator=(
const distributor_queue&) =
delete;
200 void subscribe(distributor<T, Lock> &dist) {
213 _connection->drop(
this);
214 _connection =
nullptr;
219 ~distributor_queue() {
224 typename distributor<T, Lock>::subscription _dist_value;
225 distributor<T, Lock> *_connection =
nullptr;
227 _dist_value = _connection->subscribe(
this);
228 _dist_value >> [&]{value_ready();};
233 if (_dist_value.has_value()) {
234 this->
push(T(_dist_value.get()));
240 this->
close(std::current_exception());
266 template<
typename T, std::invocable<T> FilterFn, std::invoke_result_t<FilterFn, T> important_message = 0>
269 using FilterResult = std::invoke_result_t<FilterFn, T>;
271 template<std::convertible_to<FilterFn> Fn>
276 bool empty()
const {
return _main_queue.empty() && _updates.empty();}
285 if (_main_queue.empty())
return _updates.begin()->second;
286 return _main_queue.front();
295 if (_main_queue.empty())
return _updates.begin()->second;
296 return _main_queue.front();
304 if (_main_queue.empty()) _updates.erase(_updates.begin());
305 else _main_queue.pop_front();
308 void push(
const T &x) {
313 emplace(std::move(x));
316 template<
typename ... Args>
317 void emplace(Args && ... args) {
320 _main_queue.emplace_back(std::forward<Args>(args)...);
322 T &itm = _main_queue.back();
324 FilterResult
id = _filter(itm);
327 if (
id == important_message)
return;
330 auto iter = _updates.find(
id);
332 if (iter == _updates.end()) {
334 _updates.emplace(
id, std::move(itm));
337 iter->second = std::move(itm);
340 _main_queue.pop_back();
344 using MainQueue = std::deque<T>;
345 using Updates = std::map<FilterResult, T>;
348 MainQueue _main_queue;
void reject_all()
reject all with current exception
subscription subscribe(ID id={ })
subscribe
void reject_all(std::exception_ptr e)
reject all with exception
distributor()=default
default constructor
pending_notify drop(ID id)
drop single subscriber
void cancel_all()
drops all subscribers
void publish(Args &&... args)
publish the value
distributor_queue< T, Lock, QueueImpl > queue
Implements queue above distributor.
distributor(Lock &&lk)
initialize lock instance
Distributes single event to multiple coroutines (subscribbers)
T & front()
Retrieves first item in queue.
void pop()
Removes first item from the queue.
const T & front() const
Retrieves first item in queue.
A special purpose queue which is intended to filter events sent to slow subscribers.
contains prepared coroutine (prepared to run)
contain notification to be delivered to the asociated future
notify reject(std::exception_ptr e)
reject the future with exception
notify cancel()
cancel the future (resolve without value)
Carries reference to future<T>, callable, sets value of an associated future<T>
void close(std::exception_ptr e=nullptr)
close the queue
auto push(const T &x)
Push item to the queue.
Awaiter for subscriptions.