uvco 0.1
Loading...
Searching...
No Matches
channel.h
Go to the documentation of this file.
1// uvco (c) 2024 Lewin Bormann. See LICENSE for specific terms.
2
3#pragma once
4
5#include <uv.h>
6
8#include "uvco/exception.h"
11#include "uvco/run.h"
12
13#include <boost/assert.hpp>
14#include <coroutine>
15
16namespace uvco {
17
20
43template <typename T> class Channel {
44public:
46 explicit Channel(unsigned capacity, unsigned max_waiters = 16)
47 : queue_{capacity}, read_waiting_{max_waiters},
49
59 template <typename U> Promise<void> put(U &&value) {
60 if (!queue_.hasSpace()) {
61 // Block until a reader has popped an item.
63 // Return value indicates if queue is filled with >= 1 item (true) or
64 // empty (false).
65 co_await awaiter;
66 BOOST_VERIFY(queue_.hasSpace());
67 }
68 queue_.put(std::forward<U>(value));
69
70 // NOTE: this will switch control to the reader until it suspends; keep this
71 // in mind.
72 //
73 // For a filled queue, this will result in a nice lock-step switching back
74 // and forth.
76 co_return;
77 }
78
84 if (queue_.empty()) {
86 BOOST_VERIFY(co_await awaiter);
87 }
88 T item{queue_.get()};
89 // NOTE: this will switch control to the writer until it suspends; keep this
90 // in mind.
92 co_return item;
93 }
94
102 while (true) {
103 if (queue_.empty()) {
105 BOOST_VERIFY(co_await awaiter);
106 }
107 T item = queue_.get();
108 awake_writer();
109 // Suspends until consumer asks for next item.
110 co_yield item;
111 }
112 }
113
114private:
116 // TODO: a multi-reader/writer queue is easily achieved by converting the
117 // optionals into queues. This may be interesting in future.
120
122 if (!read_waiting_.empty()) {
123 auto handle = read_waiting_.get();
124 // Slower than direct resume but interacts more nicely with other
125 // coroutines.
126 Loop::enqueue(handle);
127 }
128 }
130 if (!write_waiting_.empty()) {
131 auto handle = write_waiting_.get();
132 Loop::enqueue(handle);
133 }
134 }
135
138 BoundedQueue<std::coroutine_handle<>> &slot)
139 : queue_{queue}, waiters_{slot} {}
140
141 bool await_ready() { return false; }
142
143 bool await_suspend(std::coroutine_handle<> handle) {
144 if (!waiters_.hasSpace()) {
145 throw UvcoException(
146 UV_EBUSY,
147 "too many coroutines waiting for reading/writing a channel");
148 }
149 waiters_.put(handle);
150 return true;
151 }
152
153 bool await_resume() { return !queue_.empty(); }
154
155 // References Channel<T>::queue_
158 };
159};
160
162
163} // namespace uvco
Definition bounded_queue.h:23
T get()
Pop an item from the queue.
Definition bounded_queue.h:45
void put(U &&elem)
Push an item to the queue.
Definition bounded_queue.h:29
bool empty() const
size() == 0
Definition bounded_queue.h:59
bool hasSpace() const
size() < capacity()
Definition bounded_queue.h:61
Definition channel.h:43
void awake_writer()
Definition channel.h:129
Promise< void > put(U &&value)
Definition channel.h:59
Channel(unsigned capacity, unsigned max_waiters=16)
Create a channel for up to capacity items.
Definition channel.h:46
BoundedQueue< std::coroutine_handle<> > write_waiting_
Definition channel.h:119
BoundedQueue< std::coroutine_handle<> > read_waiting_
Definition channel.h:118
void awake_reader()
Definition channel.h:121
MultiPromise< T > getAll()
Definition channel.h:101
BoundedQueue< T > queue_
Definition channel.h:115
Promise< T > get()
Definition channel.h:83
static void enqueue(std::coroutine_handle<> handle)
Definition loop.cc:73
Definition multipromise.h:133
Definition promise.h:76
Definition async_work.cc:17
Definition channel.h:136
BoundedQueue< T > & queue_
Definition channel.h:156
bool await_resume()
Definition channel.h:153
BoundedQueue< std::coroutine_handle<> > & waiters_
Definition channel.h:157
ChannelAwaiter_(BoundedQueue< T > &queue, BoundedQueue< std::coroutine_handle<> > &slot)
Definition channel.h:137
bool await_ready()
Definition channel.h:141
bool await_suspend(std::coroutine_handle<> handle)
Definition channel.h:143
Definition exception.h:19