uvco 0.1
Loading...
Searching...
No Matches
Classes | Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
uvco::Udp Class Reference

#include <udp.h>

Collaboration diagram for uvco::Udp:
Collaboration graph
[legend]

Classes

struct  RecvAwaiter_
 
struct  SendAwaiter_
 

Public Member Functions

 Udp (const Loop &loop)
 Set up a UDP object.
 
 Udp (Udp &&other)=default
 
Udpoperator= (Udp &&other)=default
 
 Udp (const Udp &)=delete
 
Udpoperator= (const Udp &)=delete
 
 ~Udp ()
 
Promise< void > bind (std::string_view address, uint16_t port, unsigned int flag=0)
 Bind UDP socket to address.
 
Promise< void > bind (const AddressHandle &address, unsigned int flag=0)
 
Promise< void > connect (std::string_view address, uint16_t port, bool ipv6only=false)
 Connect UDP socket to address.
 
Promise< void > connect (const AddressHandle &address)
 
Promise< void > send (std::span< char > buffer, std::optional< AddressHandle > address={})
 
Promise< std::string > receiveOne ()
 
Promise< std::pair< std::string, AddressHandle > > receiveOneFrom ()
 
MultiPromise< std::pair< std::string, AddressHandle > > receiveMany ()
 
void stopReceiveMany (MultiPromise< std::pair< std::string, AddressHandle > > &packets)
 
void setBroadcast (bool enabled)
 Enable sending to broadcast addresses.
 
void setTtl (uint8_t ttl)
 Set TTL on sent packets. TTL must be between 1 and 255.
 
void setMulticastInterface (const std::string &interfaceAddress)
 Set multicast interface for sending or receiving data.
 
void setMulticastLoop (bool enabled)
 
void joinMulticast (const std::string &address, const std::string &interface)
 Join a multicast group.
 
void leaveMulticast (const std::string &address, const std::string &interface)
 Leave multicast group.
 
AddressHandle getSockname () const
 Obtain locally bound name of socket.
 
std::optional< AddressHandlegetPeername () const
 Obtain peer name if connected.
 
Promise< void > close ()
 
uv_udp_t * underlying () const
 

Private Member Functions

int udpStartReceive ()
 
void udpStopReceive ()
 

Static Private Member Functions

static void onReceiveOne (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned int flags)
 
static void onSendDone (uv_udp_send_t *req, uv_status status)
 

Private Attributes

const Looploop_
 
std::unique_ptr< uv_udp_t > udp_
 
bool connected_ = false
 
bool is_receiving_ = false
 

Detailed Description

Interface to UDP functionality: can be connected or disconnected datagram client/server.

Constructor & Destructor Documentation

◆ Udp() [1/3]

uvco::Udp::Udp ( const Loop loop)
explicit

Set up a UDP object.

34 : loop_{&loop}, udp_{std::make_unique<uv_udp_t>()} {
35 uv_udp_init(loop.uvloop(), udp_.get());
36}
const Loop * loop_
Definition udp.h:114
std::unique_ptr< uv_udp_t > udp_
Definition udp.h:115

◆ Udp() [2/3]

uvco::Udp::Udp ( Udp &&  other)
default

◆ Udp() [3/3]

uvco::Udp::Udp ( const Udp )
delete

◆ ~Udp()

uvco::Udp::~Udp ( )
38 {
39 if (is_receiving_) {
40 fmt::print(stderr, "Udp::~Udp(): please co_await udp.stopReceiveMany() "
41 "before dropping Udp instance.\n");
42 }
43 if (udp_) {
44 fmt::print(stderr, "Udp::~Udp(): closing UDP socket in dtor; "
45 "this will leak memory. "
46 "Please co_await udp.close() if possible.\n");
48 closeHandle(udp_.release());
49 }
50}
bool is_receiving_
Definition udp.h:118
void udpStopReceive()
Definition udp.cc:226
Promise< void > closeHandle(T *handle, C closer)
Definition close.h:28

Member Function Documentation

◆ bind() [1/2]

Promise< void > uvco::Udp::bind ( const AddressHandle address,
unsigned int  flag = 0 
)
65 {
66 const uv_status status = uv_udp_bind(udp_.get(), address.sockaddr(), flag);
67 if (status != 0) {
68 co_await close();
69 throw UvcoException{status, "binding UDP socket"};
70 }
71}
Promise< void > close()
Definition udp.cc:188
int uv_status
Result of a libuv operation, an errno error code.
Definition internal_utils.h:22

◆ bind() [2/2]

Promise< void > uvco::Udp::bind ( std::string_view  address,
uint16_t  port,
unsigned int  flag = 0 
)

Bind UDP socket to address.

◆ close()

Promise< void > uvco::Udp::close ( )

Close UDP socket. Await on the returned promise to ensure that the socket is fully closed.

188 {
189 BOOST_ASSERT(udp_);
190 if (!dataIsNull(udp_.get())) {
191 auto *const awaiter = getData<RecvAwaiter_>(udp_.get());
192 fmt::print(stderr, "Udp::close(): stopping receiving. Please instead use "
193 "Udp::stopReceivingMany() explicitly.\n");
194 // Force return from receiveMany() generator.
195 if (awaiter->handle_) {
196 const auto resumeHandle = awaiter->handle_.value();
197 awaiter->handle_.reset();
198 resumeHandle.resume();
199 }
200 }
201 co_await closeHandle(udp_.get());
202 udp_.reset();
203 connected_ = false;
204}
bool connected_
Definition udp.h:116
bool dataIsNull(Handle *handle)
Definition internal_utils.h:66

◆ connect() [1/2]

Promise< void > uvco::Udp::connect ( const AddressHandle address)
88 {
89 uv_udp_connect(udp_.get(), nullptr);
90 const uv_status status = uv_udp_connect(udp_.get(), address.sockaddr());
91 if (status != 0) {
92 co_await close();
93 throw UvcoException{status, "connecting UDP socket"};
94 }
95 connected_ = true;
96}

◆ connect() [2/2]

Promise< void > uvco::Udp::connect ( std::string_view  address,
uint16_t  port,
bool  ipv6only = false 
)

Connect UDP socket to address.

74 {
75 Resolver resolver{*loop_};
76 const int hint = ipv6only ? AF_INET6 : AF_UNSPEC;
77 AddressHandle addressHandle = co_await resolver.gai(address, port, hint);
78
79 uv_udp_connect(udp_.get(), nullptr);
80 const uv_status status = uv_udp_connect(udp_.get(), addressHandle.sockaddr());
81 if (status != 0) {
82 co_await close();
83 throw UvcoException{status, "connecting UDP socket"};
84 }
85 connected_ = true;
86}

◆ getPeername()

std::optional< AddressHandle > uvco::Udp::getPeername ( ) const

Obtain peer name if connected.

340 {
341 struct sockaddr_storage address {};
342 int ss_size = sizeof(struct sockaddr_storage);
343 const uv_status status =
344 uv_udp_getpeername(udp_.get(), (struct sockaddr *)&address, &ss_size);
345 if (status < 0) {
346 if (status == UV_ENOTCONN) {
347 return std::nullopt;
348 }
349 throw UvcoException(status, "Error in getpeername");
350 }
351 AddressHandle addressHandle{(struct sockaddr *)&address};
352 return addressHandle;
353}

◆ getSockname()

AddressHandle uvco::Udp::getSockname ( ) const

Obtain locally bound name of socket.

328 {
329 struct sockaddr_storage address {};
330 int ss_size = sizeof(struct sockaddr_storage);
331 const uv_status status =
332 uv_udp_getsockname(udp_.get(), (struct sockaddr *)&address, &ss_size);
333 if (status < 0) {
334 throw UvcoException(status, "Error in getsockname");
335 }
336 AddressHandle addressHandle{(struct sockaddr *)&address};
337 return addressHandle;
338}

◆ joinMulticast()

void uvco::Udp::joinMulticast ( const std::string &  address,
const std::string &  interface 
)

Join a multicast group.

311 {
312 const uv_status status = uv_udp_set_membership(
313 udp_.get(), address.c_str(), interface.c_str(), UV_JOIN_GROUP);
314 if (status != 0) {
315 throw UvcoException(status, "join multicast group");
316 }
317}

◆ leaveMulticast()

void uvco::Udp::leaveMulticast ( const std::string &  address,
const std::string &  interface 
)

Leave multicast group.

320 {
321 const uv_status status = uv_udp_set_membership(
322 udp_.get(), address.c_str(), interface.c_str(), UV_LEAVE_GROUP);
323 if (status != 0) {
324 throw UvcoException(status, "join multicast group");
325 }
326}

◆ onReceiveOne()

void uvco::Udp::onReceiveOne ( uv_udp_t *  handle,
ssize_t  nread,
const uv_buf_t *  buf,
const struct sockaddr *  addr,
unsigned int  flags 
)
staticprivate
237 {
238
239 BOOST_ASSERT(!dataIsNull(handle));
240 auto *awaiter = getData<RecvAwaiter_>(handle);
241
242 if (addr == nullptr) {
243 // Error or asking to free buffers.
244 if (0 == (flags & UV_UDP_MMSG_CHUNK)) {
245 freeUvBuf(buf);
246 }
247 return;
248 }
249
250 if (awaiter->stop_receiving_) {
251 uv_udp_recv_stop(handle);
252 }
253 if (awaiter->buffer_.hasSpace()) {
254 if (nread >= 0) {
255 awaiter->buffer_.put(RecvAwaiter_::QueueItem_{
256 std::make_pair(std::string{buf->base, static_cast<size_t>(nread)},
257 AddressHandle{addr})});
258 } else {
259 awaiter->buffer_.put(
260 RecvAwaiter_::QueueItem_{static_cast<uv_status>(nread)});
261 }
262 } else {
263 fmt::print(stderr, "Udp::onReceiveOne: dropping packet, buffer full\n");
264 }
265
266 if (0 == (flags & UV_UDP_MMSG_CHUNK)) {
267 freeUvBuf(buf);
268 }
269
270 // Only enqueues once; if this callback is called again, the receiver will
271 // already have been resumed.
272 if (awaiter->handle_) {
273 auto resumeHandle = *awaiter->handle_;
274 awaiter->handle_.reset();
275 Loop::enqueue(resumeHandle);
276 }
277}
static void enqueue(std::coroutine_handle<> handle)
Definition loop.cc:73
void freeUvBuf(const uv_buf_t *buf)
Definition internal_utils.cc:26
std::variant< std::pair< std::string, AddressHandle >, uv_status > QueueItem_
Definition udp.h:128

◆ onSendDone()

void uvco::Udp::onSendDone ( uv_udp_send_t *  req,
uv_status  status 
)
staticprivate
381 {
382 auto *const awaiter = getRequestData<SendAwaiter_>(req);
383 awaiter->status_ = status;
384 if (awaiter->handle_) {
385 auto resumeHandle = *awaiter->handle_;
386 awaiter->handle_.reset();
387 Loop::enqueue(resumeHandle);
388 }
389}

◆ operator=() [1/2]

Udp & uvco::Udp::operator= ( const Udp )
delete

◆ operator=() [2/2]

Udp & uvco::Udp::operator= ( Udp &&  other)
default

◆ receiveMany()

MultiPromise< std::pair< std::string, AddressHandle > > uvco::Udp::receiveMany ( )

Generate packets received on socket. Call stopReceiveMany() when no more packets are desired; otherwise this will continue indefinitely.

Only one coroutine can be receiving at a time. This is currently enforced by assertions.

157 {
158 RecvAwaiter_ awaiter{};
159 awaiter.stop_receiving_ = false;
160 BOOST_ASSERT(dataIsNull(udp_.get()));
161 setData(udp_.get(), &awaiter);
162
163 const uv_status status = udpStartReceive();
164 if (status != 0) {
165 setData(udp_.get(), (void *)nullptr);
166 throw UvcoException(status, "receiveMany(): uv_udp_recv_start()");
167 }
168
169 while (uv_is_active((uv_handle_t *)udp_.get()) != 0) {
170 // Awaiter returns empty optional on requested stop (stopReceiveMany()).
171 std::optional<std::pair<std::string, AddressHandle>> buffer =
172 co_await awaiter;
173 if (!buffer) {
174 break;
175 }
176 // It's possible that co_yield doesn't resume anymore, therefore clear
177 // reference to local awaiter.
178 setData(udp_.get(), (void *)nullptr);
179 co_yield std::move(buffer.value());
180 BOOST_ASSERT(dataIsNull(udp_.get()));
181 setData(udp_.get(), &awaiter);
182 }
183 setData(udp_.get(), (void *)nullptr);
185 co_return;
186}
int udpStartReceive()
Definition udp.cc:231
void setData(Handle *handle, Data *data)
Definition internal_utils.h:55

◆ receiveOne()

Promise< std::string > uvco::Udp::receiveOne ( )

Receive a single UDP packet.

TODO: use a better-suited buffer type.

Only one coroutine can be receiving at a time. This is currently enforced by assertions.

133 {
134 auto packet = co_await receiveOneFrom();
135 co_return std::move(packet.first);
136}
Promise< std::pair< std::string, AddressHandle > > receiveOneFrom()
Definition udp.cc:138

◆ receiveOneFrom()

Promise< std::pair< std::string, AddressHandle > > uvco::Udp::receiveOneFrom ( )

Receive a single UDP packet and also return the sender's address.

Only one coroutine can be receiving at a time. This is currently enforced by assertions.

138 {
139 RecvAwaiter_ awaiter{};
140 BOOST_ASSERT(dataIsNull(udp_.get()));
141 setData(udp_.get(), &awaiter);
142 const uv_status status = udpStartReceive();
143 if (status != 0) {
144 setData(udp_.get(), (void *)nullptr);
145 throw UvcoException(status, "uv_udp_recv_start()");
146 }
147
148 // Exception thrown here if occurred.
149 std::optional<std::pair<std::string, AddressHandle>> packet =
150 co_await awaiter;
151
152 // Any exceptions are thrown in RecvAwaiter_::await_resume
153 setData(udp_.get(), (void *)nullptr);
154 co_return std::move(packet.value());
155}

◆ send()

Promise< void > uvco::Udp::send ( std::span< char >  buffer,
std::optional< AddressHandle address = {} 
)

Send to address, or send to connected peer. Must be a mutable buffer because libuv requires it - the buffer will not be modified.

99 {
100 SendAwaiter_ sendAwaiter{};
101 uv_udp_send_t req{};
102 setRequestData(&req, &sendAwaiter);
103
104 std::array<uv_buf_t, 1> bufs{};
105 // The buffer is never written to, so this is necessary to interface
106 // with the legacy C code.
107 bufs[0].base = &(*buffer.begin());
108 bufs[0].len = buffer.size_bytes();
109
110 const struct sockaddr *addr = nullptr;
111 if (address) {
112 addr = address->sockaddr();
113 }
114
115 const uv_status status =
116 uv_udp_send(&req, udp_.get(), bufs.begin(), 1, addr, onSendDone);
117 if (status != 0) {
118 setRequestData(&req, (void *)nullptr);
119 throw UvcoException{status, "uv_udp_send() failed immediately"};
120 }
121
122 const uv_status status_done = co_await sendAwaiter;
123 if (status_done != 0) {
124 setRequestData(&req, (void *)nullptr);
125 throw UvcoException{status_done, "uv_udp_send() failed while sending"};
126 }
127
128 setRequestData(&req, (void *)nullptr);
129
130 co_return;
131}
static void onSendDone(uv_udp_send_t *req, uv_status status)
Definition udp.cc:381
void setRequestData(Request *req, Data *data)
Definition internal_utils.h:61

◆ setBroadcast()

void uvco::Udp::setBroadcast ( bool  enabled)

Enable sending to broadcast addresses.

279 {
280 const uv_status status =
281 uv_udp_set_broadcast(udp_.get(), static_cast<int>(enabled));
282 if (status != 0) {
283 throw UvcoException(status, "join multicast group");
284 }
285}

◆ setMulticastInterface()

void uvco::Udp::setMulticastInterface ( const std::string &  interfaceAddress)

Set multicast interface for sending or receiving data.

294 {
295 const uv_status status =
296 uv_udp_set_multicast_interface(udp_.get(), interfaceAddress.c_str());
297 if (status != 0) {
298 throw UvcoException(status, "join multicast group");
299 }
300}

◆ setMulticastLoop()

void uvco::Udp::setMulticastLoop ( bool  enabled)

If the loop flag is enabled, sent multicast packets will arrive back on the sending socket.

302 {
303 const uv_status status =
304 uv_udp_set_multicast_loop(udp_.get(), static_cast<int>(enabled));
305 if (status != 0) {
306 throw UvcoException(status, "join multicast group");
307 }
308}

◆ setTtl()

void uvco::Udp::setTtl ( uint8_t  ttl)

Set TTL on sent packets. TTL must be between 1 and 255.

287 {
288 const uv_status status = uv_udp_set_ttl(udp_.get(), static_cast<int>(ttl));
289 if (status != 0) {
290 throw UvcoException(status, "join multicast group");
291 }
292}

◆ stopReceiveMany()

void uvco::Udp::stopReceiveMany ( MultiPromise< std::pair< std::string, AddressHandle > > &  packets)

Stop receiving with receiveMany() by cancelling the receiving generator coroutine. Supply the MultiPromise obtained from receiveMany() in order to guarantee a complete clean-up.

207 {
209 // Cancel receiving generator if currently suspended by co_yield.
210 packets.cancel();
211 if (dataIsNull(udp_.get())) {
212 return;
213 }
214 auto *const currentAwaiter = getData<RecvAwaiter_>(udp_.get());
215 // If generator is suspended on co_await, resume it synchronously so it can
216 // exit before the Udp instance is possibly destroyed.
217 if (currentAwaiter->handle_) {
218 // Don't schedule this on the event loop: we must synchronously terminate
219 // the onReceiveMany() loop, otherwise it will exist after destruction of
220 // the Udp instance and read invalid memory.
221 currentAwaiter->handle_->resume();
222 // Don't touch currentAwaiter after this!!
223 }
224}

◆ udpStartReceive()

int uvco::Udp::udpStartReceive ( )
private
231 {
232 BOOST_ASSERT(udp_);
233 return uv_udp_recv_start(udp_.get(), allocator, onReceiveOne);
234}
static void onReceiveOne(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned int flags)
Definition udp.cc:236
void allocator(uv_handle_t *, size_t sugg, uv_buf_t *buf)
libuv allocator.
Definition internal_utils.cc:18

◆ udpStopReceive()

void uvco::Udp::udpStopReceive ( )
private
226 {
227 BOOST_ASSERT(udp_);
228 uv_udp_recv_stop(udp_.get());
229}

◆ underlying()

uv_udp_t * uvco::Udp::underlying ( ) const
355{ return udp_.get(); }

Member Data Documentation

◆ connected_

bool uvco::Udp::connected_ = false
private

◆ is_receiving_

bool uvco::Udp::is_receiving_ = false
private

◆ loop_

const Loop* uvco::Udp::loop_
private

◆ udp_

std::unique_ptr<uv_udp_t> uvco::Udp::udp_
private

The documentation for this class was generated from the following files: