21 template<
typename T,
typename QueueImpl = std::queue<T> >
26 using behavior = QueueImpl;
/// Construct the queue from an existing underlying container.
///
/// @tparam Q any type convertible to QueueImpl (enforced by the
///           std::convertible_to constraint).
/// @param qimpl container object perfectly forwarded into the internal
///              item queue (_item_queue).
///
/// The constructor is explicit, so a container is never converted into a
/// queue implicitly.
31 template<std::convertible_to<QueueImpl> Q>
32 explicit queue(Q &&qimpl):_item_queue(std::forward<Q>(qimpl)) {}
44 std::lock_guard lk(other._mx);
45 _awaiters = std::move(other._awaiters);
46 _item_queue = std::move(other._item_queue);
/// Copy assignment is deleted: a queue cannot be copy-assigned.
49 queue &operator= (
const queue &other) =
delete;
60 template<
typename ... Args>
62 std::unique_lock lk(_mx);
63 if (_awaiters.empty()) {
64 _item_queue.emplace(std::forward<Args>(args)...);
68 auto prom = std::move(_awaiters.front());
72 return prom(std::forward<Args>(args)...);
96 return [&](
auto prom) {
103 std::unique_lock lk(_mx);
104 if (_item_queue.empty()) {
106 if (_exception)
return prom.
reject(_exception);
107 else return prom.
cancel();
109 trace::awaiting_ref(*
this, prom.
get_future());
110 _awaiters.push(std::move(prom));
113 auto ntf = prom(std::move(_item_queue.front()));
124 std::unique_lock lk(_mx);
125 if (_item_queue.empty()) {
126 if (_exception) std::rethrow_exception(_exception);
129 std::optional<T> out (std::move(_item_queue.front()));
142 std::lock_guard _(_mx);
143 return _item_queue.empty();
150 std::lock_guard _(_mx);
151 return _item_queue.size();
155 std::lock_guard _(_mx);
169 void close(std::exception_ptr e =
nullptr) {
170 std::unique_lock lk(_mx);
173 auto z = std::move(_awaiters);
182 while (!z.empty()) z.pop();
188 std::lock_guard _(_mx);
190 _exception =
nullptr;
/// Mutex taken (std::lock_guard / std::unique_lock) by the member functions
/// before they touch the state below. Declared mutable, presumably so that
/// const accessors such as empty()/size() can lock it - TODO confirm.
193 mutable std::mutex _mx;
/// Underlying container holding the items that were pushed but not yet popped.
194 QueueImpl _item_queue;
/// Promises of consumers that found the item queue empty; a newly emplaced
/// item is handed to the front promise instead of being stored.
195 std::queue<promise<T> > _awaiters;
/// Exception used to reject a pending pop (or rethrown by the non-blocking
/// pop) when the item queue is empty; reset to nullptr elsewhere in the class.
196 std::exception_ptr _exception;
/// Starts as false. The code that modifies it is not visible in this listing;
/// presumably set by close() and cleared by reopen() - TODO confirm.
197 bool _closed =
false;
Contains future value of T, can be co_awaited in coroutine.
Contains a notification to be delivered to the associated future.
notify reject(std::exception_ptr e)
reject the future with exception
const FutureType * get_future() const
Retrieve pointer to an associated future.
notify cancel()
cancel the future (resolve without value)
Carries reference to future<T>, callable, sets value of an associated future<T>
promise< T >::notify pop(promise< T > &prom)
Pop item into a promise.
auto push(T &&x)
Push item to the queue.
std::size_t size() const
retrieve current size of the queue
void close(std::exception_ptr e=nullptr)
close the queue
void reopen()
Reactivates the queue.
promise< T >::notify emplace(Args &&... args)
Push the item to the queue (emplace)
queue(queue &&other)
The queue is movable.
queue()=default
Construct default queue.
bool empty() const
determines whether queue is empty
void clear()
remove all items from the queue
auto push(const T &x)
Push item to the queue.
queue(const queue &other)=delete
The queue is not copyable.
future< T > pop()
Pop the items.
std::optional< T > try_pop()
Pops an item in a non-blocking way.
queue(Q &&qimpl)
Construct queue - initialize internal queue object.
Implements an asynchronous queue with support for coroutines.