libcoro  1.0
Coroutine support library for C++20
queue.h
1 #pragma once
2 
3 #include "future.h"
4 
5 #include <mutex>
6 #include <optional>
7 #include <queue>
8 
9 namespace coro {
10 
12 
21 template<typename T, typename QueueImpl = std::queue<T> >
22 class queue {
23 public:
24 
25  using value_type = T;
26  using behavior = QueueImpl;
27 
29  queue() = default;
31  template<std::convertible_to<QueueImpl> Q>
32  explicit queue(Q &&qimpl):_item_queue(std::forward<Q>(qimpl)) {}
34  queue(const queue &other) = delete;
36 
43  queue(queue &&other):_closed(other._closed) {
44  std::lock_guard lk(other._mx);
45  _awaiters = std::move(other._awaiters);
46  _item_queue = std::move(other._item_queue);
47  other._closed = true;
48  }
49  queue &operator= (const queue &other) = delete;
50  queue &operator= (queue &other) = delete;
51 
52 
54 
60  template<typename ... Args>
61  typename promise<T>::notify emplace(Args &&... args) {
62  std::unique_lock lk(_mx);
63  if (_awaiters.empty()) { //no awaiters? push to the queue
64  _item_queue.emplace(std::forward<Args>(args)...);
65  return {};
66  }
67  //pick first awaiter
68  auto prom = std::move(_awaiters.front());
69  _awaiters.pop();
70  lk.unlock();
71  //construct the item directly to the awaiter
72  return prom(std::forward<Args>(args)...);
73  }
74 
76 
81  auto push(const T &x) {return emplace(x);}
83 
88  auto push(T &&x) {return emplace(std::move(x));}
89 
91 
96  return [&](auto prom) {
97  pop(prom);
98  };
99  }
100 
103  std::unique_lock lk(_mx);
104  if (_item_queue.empty()) {
105  if (_closed) {
106  if (_exception) return prom.reject(_exception);
107  else return prom.cancel(); //breaks promise
108  }
109  trace::awaiting_ref(*this, prom.get_future());
110  _awaiters.push(std::move(prom)); //remember promise
111  return {};
112  }
113  auto ntf = prom(std::move(_item_queue.front())); //resolve promise
114  _item_queue.pop();
115  lk.unlock();
116  return ntf;
117  }
118 
120 
123  std::optional<T> try_pop() {
124  std::unique_lock lk(_mx);
125  if (_item_queue.empty()) {
126  if (_exception) std::rethrow_exception(_exception);
127  return {};
128  }
129  std::optional<T> out (std::move(_item_queue.front()));
130  _item_queue.pop();
131  return out;
132  }
133 
135 
141  bool empty() const {
142  std::lock_guard _(_mx);
143  return _item_queue.empty();
144  }
146 
149  std::size_t size() const {
150  std::lock_guard _(_mx);
151  return _item_queue.size();
152  }
154  void clear() {
155  std::lock_guard _(_mx);
156  _item_queue = {};
157  }
159 
169  void close(std::exception_ptr e = nullptr) {
170  std::unique_lock lk(_mx);
171  if (_closed) return;
172  _exception = e;
173  auto z = std::move(_awaiters);
174  _closed = true;
175  lk.unlock();
176  if (e) {
177  while (!z.empty()) {
178  z.front().reject(e);
179  z.pop();
180  }
181  } else {
182  while (!z.empty()) z.pop(); //break all promises
183  }
184  }
185 
187  void reopen() {
188  std::lock_guard _(_mx);
189  _closed = false;
190  _exception = nullptr;
191  }
192 protected:
193  mutable std::mutex _mx;
194  QueueImpl _item_queue;
195  std::queue<promise<T> > _awaiters;
196  std::exception_ptr _exception;
197  bool _closed = false;
198 };
199 
200 }
Contains future value of T, can be co_awaited in coroutine.
Definition: future.h:417
contain notification to be delivered to the asociated future
Definition: future.h:91
notify reject(std::exception_ptr e)
reject the future with exception
Definition: future.h:245
const FutureType * get_future() const
Retrieve pointer to an associated future.
Definition: future.h:285
notify cancel()
cancel the future (resolve without value)
Definition: future.h:222
Carries reference to future<T>, callable, sets value of an associated future<T>
Definition: future.h:73
promise< T >::notify pop(promise< T > &prom)
Pop item into a promise.
Definition: queue.h:102
auto push(T &&x)
Push item to the queue.
Definition: queue.h:88
std::size_t size() const
retrieve current size of the queue
Definition: queue.h:149
void close(std::exception_ptr e=nullptr)
close the queue
Definition: queue.h:169
void reopen()
Reactivates the queue.
Definition: queue.h:187
promise< T >::notify emplace(Args &&... args)
Push the item to the queue (emplace)
Definition: queue.h:61
queue(queue &&other)
The queue is movable.
Definition: queue.h:43
queue()=default
Construct default queue.
bool empty() const
determines whether queue is empty
Definition: queue.h:141
void clear()
remove all items from the queue
Definition: queue.h:154
auto push(const T &x)
Push item to the queue.
Definition: queue.h:81
queue(const queue &other)=delete
The queue is not copyable.
future< T > pop()
Pop the items.
Definition: queue.h:95
std::optional< T > try_pop()
Pops item non-blocking way.
Definition: queue.h:123
queue(Q &&qimpl)
Construct queue - initialize internal queue object.
Definition: queue.h:32
Implements asychronous queue with support for coroutines.
Definition: queue.h:22
main namespace
Definition: aggregator.h:8