uvco 0.1
Loading...
Searching...
No Matches
Classes | Public Member Functions | Protected Member Functions | Protected Attributes | Static Private Member Functions | List of all members
uvco::StreamServerBase< UvStreamType, StreamType > Class Template Reference

#include <stream_server_base.h>

Classes

struct  ConnectionAwaiter_
 

Public Member Functions

 StreamServerBase (const StreamServerBase &)=delete
 
 StreamServerBase (StreamServerBase &&)=default
 
StreamServerBaseoperator= (const StreamServerBase &)=delete
 
StreamServerBaseoperator= (StreamServerBase &&)=default
 
 ~StreamServerBase ()
 
MultiPromise< StreamType > listen (int backlog=128)
 
Promise< void > close ()
 Close server and stop accepting client connections; must be awaited.
 

Protected Member Functions

 StreamServerBase (std::unique_ptr< UvStreamType > socket)
 

Protected Attributes

std::unique_ptr< UvStreamType > socket_
 

Static Private Member Functions

static void onNewConnection (uv_stream_t *stream, uv_status status)
 

Detailed Description

template<typename UvStreamType, typename StreamType>
class uvco::StreamServerBase< UvStreamType, StreamType >

Not for use in user code; base class for e.g. UnixServer and TcpServer.

Because accepting connections looks the same for Unix and TCP servers, the behavior is defined here and shared by both. However, the implementation must be generic over the stream type, so the actual stream type is a template parameter.

Constructor & Destructor Documentation

◆ StreamServerBase() [1/3]

template<typename UvStreamType , typename StreamType >
uvco::StreamServerBase< UvStreamType, StreamType >::StreamServerBase ( const StreamServerBase< UvStreamType, StreamType > &  )
delete

◆ StreamServerBase() [2/3]

template<typename UvStreamType , typename StreamType >
uvco::StreamServerBase< UvStreamType, StreamType >::StreamServerBase ( StreamServerBase< UvStreamType, StreamType > &&  )
default

◆ ~StreamServerBase()

template<typename UvStreamType , typename StreamType >
uvco::StreamServerBase< UvStreamType, StreamType >::~StreamServerBase ( )
31 {
32 if (socket_) {
33 fmt::print(stderr, "StreamServerBase::~StreamServerBase(): closing server "
34 "in dtor; this will leak memory. "
35 "Please co_await server.close() if possible.\n");
36 // Asynchronously close handle. It's better to leak memory than file
37 // descriptors.
38 closeHandle(socket_.release());
39 }
40}
std::unique_ptr< UvStreamType > socket_
Definition stream_server_base.h:59
Promise< void > closeHandle(T *handle, C closer)
Definition close.h:28

◆ StreamServerBase() [3/3]

template<typename UvStreamType , typename StreamType >
uvco::StreamServerBase< UvStreamType, StreamType >::StreamServerBase ( std::unique_ptr< UvStreamType >  socket)
inlineexplicitprotected
58 : socket_{std::move(socket)} {}

Member Function Documentation

◆ close()

template<typename UvStreamType , typename StreamType >
Promise< void > uvco::StreamServerBase< UvStreamType, StreamType >::close ( )

Close server and stop accepting client connections; must be awaited.

43 {
44 if (!dataIsNull(socket_.get())) {
45 auto *awaiter = getData<ConnectionAwaiter_>(socket_.get());
46 // Resume listener coroutine and tell it to exit.
47 // If awaiter == nullptr, one of two things is true:
48 // 1. listener is currently not running
49 // 2. listener has yielded and is suspended there: the listener generator
50 // will be cancelled when its MultiPromise is dropped.
51 if (awaiter->handle_) {
52 awaiter->stop();
53 }
54 }
55 co_await closeHandle(socket_.get());
56 socket_.reset();
57}
bool dataIsNull(Handle *handle)
Definition internal_utils.h:66

◆ listen()

template<typename UvStreamType , typename StreamType >
MultiPromise< StreamType > uvco::StreamServerBase< UvStreamType, StreamType >::listen ( int  backlog = 128)

Return client connections as clients connect.

Raises exceptions if errors occur during accepting or listening.

This generator may not be co_awaited on after having called close().

61 {
62 BOOST_ASSERT(socket_);
63 ConnectionAwaiter_ awaiter{*socket_};
64 setData(socket_.get(), &awaiter);
65
66 const uv_status listenStatus =
67 uv_listen((uv_stream_t *)socket_.get(), backlog,
69 if (listenStatus != 0) {
70 setData(socket_.get(), (void *)nullptr);
71 throw UvcoException{listenStatus,
72 "StreamServerBase::listen(): failed to listen"};
73 }
74
75 while (true) {
76 const bool acceptOk = co_await awaiter;
77 if (!acceptOk) {
78 // At this point, do not touch socket_->data anymore!
79 // This is the result of ConnectionAwaiter_::stop(), and
80 // data points to a CloseAwaiter_ object.
81 break;
82 }
83
84 for (auto it = awaiter.accepted_.begin(); it != awaiter.accepted_.end();
85 it++) {
86 auto &streamSlot = *it;
87
88 if (streamSlot.index() == 0) {
89 const uv_status status = std::get<0>(streamSlot);
90 BOOST_ASSERT(status != 0);
91 // When the error is handled, the user code can again call listen()
92 // and will process the remaining connections. Therefore, first remove
93 // the already processed connections.
94 awaiter.accepted_.erase(awaiter.accepted_.begin(), it);
95 setData(socket_.get(), (void *)nullptr);
96 throw UvcoException{status,
97 "UnixStreamServer failed to accept a connection!"};
98 } else {
99 // Awkward handlign: if the returned MultiPromise is dropped, we will
100 // never return from co_yield. However, dropping may happen after
101 // calling `close()`, so we cannot rely on socket_ still existing.
102 //
103 // `close()` also relies on whether `socket_->data` is `nullptr` or not
104 // to decide if the socket has been closed already.
105 setData(socket_.get(), (void *)nullptr);
106 co_yield std::move(std::get<1>(streamSlot));
107 setData(socket_.get(), &awaiter);
108 }
109 }
110 awaiter.accepted_.clear();
111 }
112 setData(socket_.get(), (void *)nullptr);
113}
static void onNewConnection(uv_stream_t *stream, uv_status status)
Definition stream_server_base_impl.cc:145
void setData(Handle *handle, Data *data)
Definition internal_utils.h:55
int uv_status
Result of a libuv operation, an errno error code.
Definition internal_utils.h:22

◆ onNewConnection()

template<typename UvStreamType , typename StreamType >
void uvco::StreamServerBase< UvStreamType, StreamType >::onNewConnection ( uv_stream_t *  stream,
uv_status  status 
)
staticprivate
146 {
147 const auto *server = (UvStreamType *)stream;
148 auto *connectionAwaiter = getData<ConnectionAwaiter_>(server);
149 uv_loop_t *const loop = connectionAwaiter->socket_.loop;
150
151 if (status == 0) {
152 auto clientStream = std::make_unique<UvStreamType>();
153 UvStreamInitHelper<UvStreamType>::init(loop, clientStream.get());
154 const uv_status acceptStatus =
155 uv_accept((uv_stream_t *)server, (uv_stream_t *)clientStream.get());
156 if (acceptStatus == 0) {
157 connectionAwaiter->accepted_.emplace_back(
158 StreamType{std::move(clientStream)});
159 } else {
160 connectionAwaiter->accepted_.emplace_back(acceptStatus);
161 }
162 } else {
163 connectionAwaiter->accepted_.emplace_back(status);
164 }
165
166 if (connectionAwaiter->handle_) {
167 Loop::enqueue(*connectionAwaiter->handle_);
168 connectionAwaiter->handle_.reset();
169 }
170}
static void enqueue(std::coroutine_handle<> handle)
Definition loop.cc:73
static void init(uv_loop_t *, UvStreamType *)
Definition stream_server_base.h:27

◆ operator=() [1/2]

template<typename UvStreamType , typename StreamType >
StreamServerBase & uvco::StreamServerBase< UvStreamType, StreamType >::operator= ( const StreamServerBase< UvStreamType, StreamType > &  )
delete

◆ operator=() [2/2]

template<typename UvStreamType , typename StreamType >
StreamServerBase & uvco::StreamServerBase< UvStreamType, StreamType >::operator= ( StreamServerBase< UvStreamType, StreamType > &&  )
default

Member Data Documentation

◆ socket_

template<typename UvStreamType , typename StreamType >
std::unique_ptr<UvStreamType> uvco::StreamServerBase< UvStreamType, StreamType >::socket_
protected

The documentation for this class was generated from the following files: