uvco 0.1
uvco::Channel< T > Class Template Reference

#include <channel.h>


Classes

struct  ChannelAwaiter_
 

Public Member Functions

 Channel (unsigned capacity, unsigned max_waiters=16)
 Create a channel for up to capacity items.
 
template<typename U >
Promise< void > put (U &&value)
 
Promise< T > get ()
 
MultiPromise< T > getAll ()
 

Private Member Functions

void awake_reader ()
 
void awake_writer ()
 

Private Attributes

BoundedQueue< T > queue_
 
BoundedQueue< std::coroutine_handle<> > read_waiting_
 
BoundedQueue< std::coroutine_handle<> > write_waiting_
 

Detailed Description

template<typename T>
class uvco::Channel< T >

A Channel is similar to a Go channel: it is buffered, reads block while it is empty, and writes block while it is full.

A bounded-capacity channel for items of type T. A channel can be waited on by at most max_waiters coroutines. If more coroutines want to wait, an exception is thrown.
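
The waiter limit can be pictured with a small stand-alone sketch. ToyWaiterList, the integer waiter ids, the FIFO wake-up order, and the use of std::runtime_error are all assumptions made for illustration; the real channel keeps std::coroutine_handle<> values in a BoundedQueue, and the exception type it throws is not shown on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <stdexcept>

// Hypothetical stand-in for a waiter list limited to max_waiters entries.
// Plain ints act as placeholder waiter ids, so no coroutine support is needed.
class ToyWaiterList {
public:
  explicit ToyWaiterList(std::size_t max_waiters) : max_waiters_{max_waiters} {}

  // Registers a waiter; throws once max_waiters entries are already waiting.
  // std::runtime_error is an assumption, not the library's exception type.
  void add(int waiter_id) {
    if (waiters_.size() >= max_waiters_) {
      throw std::runtime_error("too many waiters");
    }
    waiters_.push_back(waiter_id);
  }

  bool empty() const { return waiters_.empty(); }

  // Removes and returns the longest-waiting entry (FIFO order is assumed).
  int pop() {
    assert(!waiters_.empty());
    int id = waiters_.front();
    waiters_.pop_front();
    return id;
  }

private:
  std::size_t max_waiters_;
  std::deque<int> waiters_;
};
```

With max_waiters set to 2 in this sketch, a third add() call throws while the first two waiters stay registered.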

A reader waits while the channel is empty, and is awoken by the first writer. A writer waits while the channel is full, and is awoken by the first reader.
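
The blocking rules above can be modelled without coroutines. The following is only a sketch under stated assumptions: ToyChannel, try_put, try_get and toy_channel_demo are hypothetical names, and where the real Channel would suspend the calling coroutine this model simply reports that the caller would have to wait.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>

// Hypothetical, non-coroutine model of a bounded channel.
template <typename T>
class ToyChannel {
public:
  explicit ToyChannel(std::size_t capacity) : capacity_{capacity} {}

  // Returns false where a writer would have to wait (channel full).
  bool try_put(T value) {
    if (items_.size() >= capacity_) {
      return false;
    }
    items_.push_back(std::move(value));
    return true;
  }

  // Returns std::nullopt where a reader would have to wait (channel empty).
  std::optional<T> try_get() {
    if (items_.empty()) {
      return std::nullopt;
    }
    T front = std::move(items_.front());
    items_.pop_front();
    return front;
  }

private:
  std::size_t capacity_;
  std::deque<T> items_;
};

// Walks through the empty/full transitions of a channel with capacity 2.
inline void toy_channel_demo() {
  ToyChannel<int> channel{2};
  assert(!channel.try_get().has_value());  // empty: a reader would wait
  assert(channel.try_put(1));
  assert(channel.try_put(2));
  assert(!channel.try_put(3));             // full: a writer would wait
  assert(channel.try_get().value() == 1);  // a read frees one slot
  assert(channel.try_put(3));              // so the writer can proceed
}
```

In the real class the "would wait" cases correspond to a suspended coroutine that is woken by the next operation from the other side.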

When a channel is used only to pass small objects between coroutines, each send/receive operation takes about 1 µs on a slightly older i5-7300U CPU @ 2.60GHz (clang 17) using the RunMode::Deferred event loop mode. This includes the entire coroutine dance of suspending and resuming between the reader and writer. (RunMode::Immediate is about 25% faster.)

Caveat 1: the channel is not thread safe. Only use it within one loop.

Caveat 2: the Channel is independent of a Loop. This means that runMain() may return even though channels are still in use and being awaited. Ensure that at least one uv operation (socket read/write/listen, timer, etc.) is running to keep the loop alive.

Constructor & Destructor Documentation

◆ Channel()

template<typename T >
uvco::Channel< T >::Channel ( unsigned  capacity,
unsigned  max_waiters = 16 
)
inline explicit

Create a channel for up to capacity items.

: queue_{capacity}, read_waiting_{max_waiters},
  write_waiting_{max_waiters} {}

Member Function Documentation

◆ awake_reader()

template<typename T >
void uvco::Channel< T >::awake_reader ( )
inline private
{
  if (!read_waiting_.empty()) {
    auto handle = read_waiting_.get();
    // Slower than direct resume but interacts more nicely with other
    // coroutines.
    Loop::enqueue(handle);
  }
}
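
The comment in awake_reader() contrasts enqueueing a handle with resuming it directly. A rough way to picture the difference is sketched below, assuming a deferred mode in which enqueued work runs on a later loop turn. ToyRunQueue and deferred_wakeup_trace are hypothetical names, and std::function callbacks stand in for coroutine handles; this is not how Loop::enqueue is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical run queue: callbacks stand in for coroutine handles.
class ToyRunQueue {
public:
  // Defers the callback instead of running it inside the caller.
  void enqueue(std::function<void()> task) {
    assert(task != nullptr);
    pending_.push_back(std::move(task));
  }

  // Runs queued callbacks in FIFO order until none are left.
  std::size_t run_pending() {
    std::size_t ran = 0;
    while (!pending_.empty()) {
      std::function<void()> task = std::move(pending_.front());
      pending_.pop_front();
      task();
      ++ran;
    }
    return ran;
  }

private:
  std::deque<std::function<void()>> pending_;
};

// Records the order of events when a writer wakes a reader via the queue.
inline std::vector<std::string> deferred_wakeup_trace() {
  std::vector<std::string> trace;
  ToyRunQueue queue;
  trace.push_back("writer: put item");
  queue.enqueue([&trace] { trace.push_back("reader: resumed"); });
  trace.push_back("writer: continues");
  queue.run_pending();
  return trace;
}
```

In this model the writer finishes its own step before the reader runs, whereas a direct resume would run the reader inside the writer's call.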

◆ awake_writer()

template<typename T >
void uvco::Channel< T >::awake_writer ( )
inline private
{
  if (!write_waiting_.empty()) {
    auto handle = write_waiting_.get();
    Loop::enqueue(handle);
  }
}

◆ get()

template<typename T >
Promise< T > uvco::Channel< T >::get ( )
inline

Take an item from the channel.

Suspends if no items are available. The suspended coroutine is resumed by the next writer adding an item.

{
  if (queue_.empty()) {
    ChannelAwaiter_ awaiter{queue_, read_waiting_};
    BOOST_VERIFY(co_await awaiter);
  }
  T item{queue_.get()};
  // NOTE: this will switch control to the writer until it suspends; keep this
  // in mind.
  awake_writer();
  co_return item;
}

◆ getAll()

template<typename T >
MultiPromise< T > uvco::Channel< T >::getAll ( )
inline

Continuously read items from the channel by repeatedly co_awaiting the returned MultiPromise. (getAll() is a generator.)

Remember to call MultiPromise<T>::cancel() when you are done with the channel, although this should happen automatically when the last MultiPromise object is dropped.

{
  while (true) {
    if (queue_.empty()) {
      ChannelAwaiter_ awaiter{queue_, read_waiting_};
      BOOST_VERIFY(co_await awaiter);
    }
    T item = queue_.get();
    awake_writer();
    // Suspends until consumer asks for next item.
    co_yield item;
  }
}

◆ put()

template<typename T >
template<typename U >
Promise< void > uvco::Channel< T >::put ( U &&  value)
inline

Put an item into the channel.

Suspends if no space is available in the channel. The suspended coroutine is resumed by the next reader taking out an item.

Template method: implements perfect forwarding for both copy and move insertion.
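
The forwarding behaviour can be checked in isolation. The sketch below is a hypothetical stand-in rather than the channel itself: ToySink and Tracked are illustrative names, and a std::deque is used as storage so that earlier elements are never relocated and the copy/move counters stay exact.

```cpp
#include <cassert>
#include <deque>
#include <utility>

// Counts how often a value has been copied or moved on its way into storage.
struct Tracked {
  int copies = 0;
  int moves = 0;
  Tracked() = default;
  Tracked(const Tracked& other)
      : copies{other.copies + 1}, moves{other.moves} {}
  Tracked(Tracked&& other) noexcept
      : copies{other.copies}, moves{other.moves + 1} {}
};

// Hypothetical sink with a single forwarding put() for lvalues and rvalues.
template <typename T>
class ToySink {
public:
  template <typename U>
  void put(U&& value) {
    // std::forward preserves the value category of the argument:
    // lvalues are copied into storage, rvalues are moved.
    items_.push_back(std::forward<U>(value));
  }

  // Returns the most recently stored item.
  const T& newest() const {
    assert(!items_.empty());
    return items_.back();
  }

private:
  std::deque<T> items_;
};
```

Passing a named object to put() in this sketch records one copy, while passing std::move(object) records one move and no copy.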

NOTE: template argument restriction may not be entirely correct?

{
  if (!queue_.hasSpace()) {
    // Block until a reader has popped an item.
    ChannelAwaiter_ awaiter{queue_, write_waiting_};
    // Return value indicates if queue is filled with >= 1 item (true) or
    // empty (false).
    co_await awaiter;
    BOOST_VERIFY(queue_.hasSpace());
  }
  queue_.put(std::forward<U>(value));

  // NOTE: this will switch control to the reader until it suspends; keep this
  // in mind.
  //
  // For a filled queue, this will result in a nice lock-step switching back
  // and forth.
  awake_reader();
  co_return;
}

Member Data Documentation

◆ queue_

template<typename T >
BoundedQueue<T> uvco::Channel< T >::queue_
private

◆ read_waiting_

template<typename T >
BoundedQueue<std::coroutine_handle<> > uvco::Channel< T >::read_waiting_
private

◆ write_waiting_

template<typename T >
BoundedQueue<std::coroutine_handle<> > uvco::Channel< T >::write_waiting_
private

The documentation for this class was generated from the following file: