uvco 0.1
Loading...
Searching...
No Matches
channel.h
Go to the documentation of this file.
1// uvco (c) 2024 Lewin Bormann. See LICENSE for specific terms.
2
3#pragma once
4
5#include <uv.h>
6
8#include "uvco/exception.h"
11#include "uvco/run.h"
12
13#include <boost/assert.hpp>
14#include <coroutine>
15
16namespace uvco {
17
20
43template <typename T> class Channel {
44public:
46 explicit Channel(unsigned capacity, unsigned max_waiters = 16)
47 : queue_{capacity}, read_waiting_{max_waiters},
48 write_waiting_{max_waiters} {}
49
60 template <typename U> Promise<void> put(U value) {
61 if (!queue_.hasSpace()) {
62 // Block until a reader has popped an item.
64 // Return value indicates if queue is filled with >= 1 item (true) or
65 // empty (false).
66 co_await awaiter;
67 BOOST_VERIFY(queue_.hasSpace());
68 }
69 queue_.put(std::forward<U>(value));
70
71 // NOTE: this will switch control to the reader until it suspends; keep this
72 // in mind.
73 //
74 // For a filled queue, this will result in a nice lock-step switching back
75 // and forth.
77 co_return;
78 }
79
85 if (queue_.empty()) {
87 BOOST_VERIFY(co_await awaiter);
88 }
89 T item{queue_.get()};
90 // NOTE: this will switch control to the writer until it suspends; keep this
91 // in mind.
93 co_return item;
94 }
95
103 while (true) {
104 if (queue_.empty()) {
106 BOOST_VERIFY(co_await awaiter);
107 }
108 T item = queue_.get();
109 awake_writer();
110 // Suspends until consumer asks for next item.
111 co_yield item;
112 }
113 }
114
115private:
117 // TODO: a multi-reader/writer queue is easily achieved by converting the
118 // optionals into queues. This may be interesting in future.
121
123 while (!read_waiting_.empty()) {
124 std::coroutine_handle<void> handle = read_waiting_.get();
125 // Skip cancelled coroutines.
126 if (handle != nullptr) {
127 Loop::enqueue(handle);
128 break;
129 }
130 }
131 }
132
134 while (!write_waiting_.empty()) {
135 std::coroutine_handle<void> handle = write_waiting_.get();
136 if (handle != nullptr) {
137 Loop::enqueue(handle);
138 break;
139 }
140 }
141 }
142
145 BoundedQueue<std::coroutine_handle<>> &slot)
146 : queue_{queue}, waiters_{slot} {}
148 // Remove us from waiters.
149 waiters_.forEach([this](std::coroutine_handle<> &h) {
150 // Remove pending coroutine from waiter queue.
151 if (h == thisCoro_) {
152 h = nullptr;
153 }
154 });
155 }
156
157 bool await_ready() { return false; }
158
159 bool await_suspend(std::coroutine_handle<> handle) {
160 if (!waiters_.hasSpace()) {
161 throw UvcoException(
162 UV_EBUSY,
163 "too many coroutines waiting for reading/writing a channel");
164 }
165 thisCoro_ = handle;
166 waiters_.put(handle);
167 return true;
168 }
169
170 bool await_resume() { return !queue_.empty(); }
171
172 // References Channel<T>::queue_
175 std::coroutine_handle<> thisCoro_;
176 };
177};
178
180
181} // namespace uvco
Definition bounded_queue.h:24
void awake_writer()
Definition channel.h:133
Promise< void > put(U value)
Definition channel.h:60
Channel(unsigned capacity, unsigned max_waiters=16)
Create a channel for up to capacity items.
Definition channel.h:46
BoundedQueue< std::coroutine_handle<> > write_waiting_
Definition channel.h:120
BoundedQueue< std::coroutine_handle<> > read_waiting_
Definition channel.h:119
void awake_reader()
Definition channel.h:122
MultiPromise< T > getAll()
Definition channel.h:102
BoundedQueue< T > queue_
Definition channel.h:116
Promise< T > get()
Definition channel.h:84
static void enqueue(std::coroutine_handle<> handle)
Definition loop.cc:94
Definition multipromise.h:127
Definition promise.h:49
Definition async_work.cc:18
Definition channel.h:143
BoundedQueue< T > & queue_
Definition channel.h:173
bool await_resume()
Definition channel.h:170
std::coroutine_handle thisCoro_
Definition channel.h:175
BoundedQueue< std::coroutine_handle<> > & waiters_
Definition channel.h:174
ChannelAwaiter_(BoundedQueue< T > &queue, BoundedQueue< std::coroutine_handle<> > &slot)
Definition channel.h:144
bool await_ready()
Definition channel.h:157
bool await_suspend(std::coroutine_handle<> handle)
Definition channel.h:159
~ChannelAwaiter_()
Definition channel.h:147
Definition exception.h:19