uvco 0.1
Loading...
Searching...
No Matches
uvco::Udp Class Reference

A UDP socket. More...

#include <udp.h>

Collaboration diagram for uvco::Udp:

Classes

struct  RecvAwaiter_
struct  SendAwaiter_

Public Member Functions

 Udp (const Loop &loop)
 Set up a UDP object.
 Udp (Udp &&other)=default
Udp & operator= (Udp &&other)=default
 Udp (const Udp &)=delete
Udp & operator= (const Udp &)=delete
 ~Udp ()
Promise< void > bind (std::string_view address, uint16_t port, unsigned int flag=0)
 Bind UDP socket to address.
Promise< void > bind (const AddressHandle &address, unsigned int flag=0)
Promise< void > connect (std::string_view address, uint16_t port, bool ipv6only=false)
 Connect UDP socket to address.
Promise< void > connect (const AddressHandle &address)
Promise< void > send (std::span< const char > buffer, std::optional< AddressHandle > address={})
Promise< std::string > receiveOne ()
Promise< std::pair< std::string, AddressHandle > > receiveOneFrom ()
MultiPromise< std::pair< std::string, AddressHandle > > receiveMany ()
void stopReceiveMany (MultiPromise< std::pair< std::string, AddressHandle > > &packets)
void setBroadcast (bool enabled)
 Enable sending to broadcast addresses.
void setTtl (uint8_t ttl)
 Set TTL on sent packets. TTL must be between 1 and 255.
void setMulticastInterface (const std::string &interfaceAddress)
 Set multicast interface for sending or receiving data.
void setMulticastLoop (bool enabled)
void joinMulticast (const std::string &address, const std::string &interface)
 Join a multicast group.
void leaveMulticast (const std::string &address, const std::string &interface)
 Leave multicast group.
AddressHandle getSockname () const
 Obtain locally bound name of socket.
std::optional< AddressHandle > getPeername () const
 Obtain peer name if connected.
void close ()

Private Member Functions

int udpStartReceive ()
void udpStopReceive ()

Static Private Member Functions

static void onSendDone (uv_udp_send_t *req, uv_status status)
static void onReceiveOne (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned int flags)

Private Attributes

const Loop * loop_
std::unique_ptr< uv_udp_t > udp_
bool connected_ = false

Detailed Description

A UDP socket.

Constructor & Destructor Documentation

◆ Udp() [1/3]

uvco::Udp::Udp ( const Loop & loop)
explicit

Set up a UDP object.

74 : loop_{&loop}, udp_{std::make_unique<uv_udp_t>()} {
75 uv_udp_init(loop.uvloop(), udp_.get());
76}
const Loop * loop_
Definition udp.h:109
std::unique_ptr< uv_udp_t > udp_
Definition udp.h:110

◆ Udp() [2/3]

uvco::Udp::Udp ( Udp && other)
default

◆ Udp() [3/3]

uvco::Udp::Udp ( const Udp & )
delete

◆ ~Udp()

uvco::Udp::~Udp ( )
78 {
79 if (udp_) {
81 closeHandle(udp_.release());
82 }
83}
void udpStopReceive()
Definition udp.cc:246
void closeHandle(Handle *handle, void(*closer)(CloserArg *, void(*)(uv_handle_t *)))
Definition close.h:37

Member Function Documentation

◆ bind() [1/2]

Promise< void > uvco::Udp::bind ( const AddressHandle & address,
unsigned int flag = 0 )
98 {
99 const uv_status status = uv_udp_bind(udp_.get(), address.sockaddr(), flag);
100 if (status != 0) {
101 close();
102 throw UvcoException{status, "binding UDP socket"};
103 }
104 co_return;
105}
void close()
Definition udp.cc:218
int uv_status
Result of a libuv operation, an errno error code.
Definition internal_utils.h:22

◆ bind() [2/2]

Promise< void > uvco::Udp::bind ( std::string_view address,
uint16_t port,
unsigned int flag = 0 )

Bind UDP socket to address.

◆ close()

void uvco::Udp::close ( )

Close UDP socket. The socket is not closed immediately; it typically takes another turn of the libuv event loop before the closing operation is finished. In most cases you don't need to care about that.

218 {
219 if (udp_ == nullptr) {
220 return;
221 }
222 if (!dataIsNull(udp_.get())) {
223 auto *const awaiter = getData<RecvAwaiter_>(udp_.get());
224 awaiter->buffer_.put(uv_status{UV_ECANCELED});
225 if (awaiter->handle_) {
226 // Loop::enqueue(awaiter->handle_.value());
227 const std::coroutine_handle<> h = awaiter->handle_.value();
228 awaiter->handle_.reset();
229 // This immediately resumes the receiveMany generator which will get the
230 // ECANCELED exception, store it, and make it available to anyone waiting
231 // on it.
232 h.resume();
233 }
234 }
235 closeHandle(udp_.release());
236 connected_ = false;
237}
bool connected_
Definition udp.h:111
Into * getData(const Handle *handle)
Obtain data pointer set on handle with nullptr check and type cast.
Definition internal_utils.h:42
bool dataIsNull(Handle *handle)
Check if handle data is null.
Definition internal_utils.h:101

◆ connect() [1/2]

Promise< void > uvco::Udp::connect ( const AddressHandle & address)
123 {
124 uv_udp_connect(udp_.get(), nullptr);
125 const uv_status status = uv_udp_connect(udp_.get(), address.sockaddr());
126 if (status != 0) {
127 close();
128 throw UvcoException{status, "connecting UDP socket"};
129 }
130 connected_ = true;
131 co_return;
132}

◆ connect() [2/2]

Promise< void > uvco::Udp::connect ( std::string_view address,
uint16_t port,
bool ipv6only = false )

Connect UDP socket to address.

108 {
109 Resolver resolver{*loop_};
110 const int hint = ipv6only ? AF_INET6 : AF_UNSPEC;
111 AddressHandle addressHandle = co_await resolver.gai(address, port, hint);
112
113 uv_udp_connect(udp_.get(), nullptr);
114 const uv_status status = uv_udp_connect(udp_.get(), addressHandle.sockaddr());
115 if (status != 0) {
116 close();
117 throw UvcoException{status, "connecting UDP socket"};
118 }
119 connected_ = true;
120 co_return;
121}

◆ getPeername()

std::optional< AddressHandle > uvco::Udp::getPeername ( ) const
nodiscard

Obtain peer name if connected.

364 {
365 struct sockaddr_storage address{};
366 int ss_size = sizeof(struct sockaddr_storage);
367 const uv_status status =
368 uv_udp_getpeername(udp_.get(), (struct sockaddr *)&address, &ss_size);
369 if (status < 0) {
370 if (status == UV_ENOTCONN) {
371 return std::nullopt;
372 }
373 throw UvcoException(status, "Error in getpeername");
374 }
375 AddressHandle addressHandle{(struct sockaddr *)&address};
376 return addressHandle;
377}

◆ getSockname()

AddressHandle uvco::Udp::getSockname ( ) const
nodiscard

Obtain locally bound name of socket.

352 {
353 struct sockaddr_storage address{};
354 int ss_size = sizeof(struct sockaddr_storage);
355 const uv_status status =
356 uv_udp_getsockname(udp_.get(), (struct sockaddr *)&address, &ss_size);
357 if (status < 0) {
358 throw UvcoException(status, "Error in getsockname");
359 }
360 AddressHandle addressHandle{(struct sockaddr *)&address};
361 return addressHandle;
362}

◆ joinMulticast()

void uvco::Udp::joinMulticast ( const std::string & address,
const std::string & interface )

Join a multicast group.

335 {
336 const uv_status status = uv_udp_set_membership(
337 udp_.get(), address.c_str(), interface.c_str(), UV_JOIN_GROUP);
338 if (status != 0) {
339 throw UvcoException(status, "join multicast group");
340 }
341}

◆ leaveMulticast()

void uvco::Udp::leaveMulticast ( const std::string & address,
const std::string & interface )

Leave multicast group.

344 {
345 const uv_status status = uv_udp_set_membership(
346 udp_.get(), address.c_str(), interface.c_str(), UV_LEAVE_GROUP);
347 if (status != 0) {
348 throw UvcoException(status, "join multicast group");
349 }
350}

◆ onReceiveOne()

void uvco::Udp::onReceiveOne ( uv_udp_t * handle,
ssize_t nread,
const uv_buf_t * buf,
const struct sockaddr * addr,
unsigned int flags )
static private
257 {
258 auto *awaiter = getDataOrNull<RecvAwaiter_>(handle);
259 // cancelled?
260 if (awaiter == nullptr) {
261 uv_udp_recv_stop(handle);
262 freeUvBuf(buf);
263 return;
264 }
265
266 if (addr == nullptr) {
267 // Error or asking to free buffers.
268 if (0 == (flags & UV_UDP_MMSG_CHUNK)) {
269 freeUvBuf(buf);
270 }
271 return;
272 }
273
274 if (awaiter->stop_receiving_) {
275 uv_udp_recv_stop(handle);
276 }
277 if (awaiter->buffer_.hasSpace()) {
278 if (nread >= 0) {
279 awaiter->buffer_.put(RecvAwaiter_::QueueItem_{
280 std::make_pair(std::string{buf->base, static_cast<size_t>(nread)},
281 AddressHandle{addr})});
282 } else {
283 awaiter->buffer_.put(
284 RecvAwaiter_::QueueItem_{static_cast<uv_status>(nread)});
285 }
286 } else {
287 fmt::print(stderr, "Udp::onReceiveOne: dropping packet, buffer full\n");
288 }
289
290 if (0 == (flags & UV_UDP_MMSG_CHUNK)) {
291 freeUvBuf(buf);
292 }
293
294 // Only enqueues once; if this callback is called again, the receiver will
295 // already have been resumed.
296 if (awaiter->handle_) {
297 std::coroutine_handle<void> resumeHandle = *awaiter->handle_;
298 awaiter->handle_.reset();
299 Loop::enqueue(resumeHandle);
300 }
301}
static void enqueue(std::coroutine_handle<> handle)
Definition loop.cc:94
Into * getDataOrNull(const Handle *handle)
Definition internal_utils.h:51
void freeUvBuf(const uv_buf_t *buf)
Definition internal_utils.cc:26
std::variant< std::pair< std::string, AddressHandle >, uv_status > QueueItem_
Definition udp.cc:37

◆ onSendDone()

void uvco::Udp::onSendDone ( uv_udp_send_t * req,
uv_status status )
static private
407 {
408 auto *const awaiter = getRequestDataOrNull<SendAwaiter_>(req);
409 if (awaiter == nullptr) {
410 // Send request cancelled/dropped.
411 return;
412 }
413 awaiter->status_ = status;
414 if (awaiter->handle_) {
415 std::coroutine_handle<void> resumeHandle = *awaiter->handle_;
416 awaiter->handle_.reset();
417 Loop::enqueue(resumeHandle);
418 }
419}
Into * getRequestDataOrNull(const Request *req)
Definition internal_utils.h:69

◆ operator=() [1/2]

Udp & uvco::Udp::operator= ( const Udp & )
delete

◆ operator=() [2/2]

Udp & uvco::Udp::operator= ( Udp && other)
default

◆ receiveMany()

MultiPromise< std::pair< std::string, AddressHandle > > uvco::Udp::receiveMany ( )

Generate packets received on socket. Call stopReceiveMany() when no more packets are desired; otherwise this will continue indefinitely.

Only one coroutine can be receiving at a time. This is currently enforced by assertions.

189 {
190 BOOST_ASSERT_MSG(dataIsNull(udp_.get()),
191 "only one coroutine can receive from UDP socket at a time");
192 RecvAwaiter_ awaiter{*udp_};
193 awaiter.stop_receiving_ = false;
194 const OnExit onExit{[this] {
195 if (udp_) {
197 resetData(udp_.get());
198 }
199 }};
200
201 const uv_status status = udpStartReceive();
202 if (status != 0) {
203 throw UvcoException(status, "receiveMany(): uv_udp_recv_start()");
204 }
205
206 while (uv_is_active((uv_handle_t *)udp_.get()) != 0) {
207 // Awaiter returns empty optional on requested stop (stopReceiveMany()).
208 std::optional<std::pair<std::string, AddressHandle>> buffer =
209 co_await awaiter;
210 if (!buffer) {
211 break;
212 }
213 co_yield std::move(buffer.value());
214 }
215 co_return;
216}
int udpStartReceive()
Definition udp.cc:251
void resetData(Handle *handle)
Reset data pointer on handle to nullptr.
Definition internal_utils.h:83
Definition udp.cc:35
bool stop_receiving_
Definition udp.cc:54

◆ receiveOne()

Promise< std::string > uvco::Udp::receiveOne ( )

Receive a single UDP packet.

TODO: use a better-suited buffer type.

Only one coroutine can be receiving at a time. This is currently enforced by assertions.

163 {
164 std::pair<std::basic_string<char>, AddressHandle> packet =
165 co_await receiveOneFrom();
166 co_return std::move(packet.first);
167}
Promise< std::pair< std::string, AddressHandle > > receiveOneFrom()
Definition udp.cc:169

◆ receiveOneFrom()

Promise< std::pair< std::string, AddressHandle > > uvco::Udp::receiveOneFrom ( )

Receive a single UDP packet and also return the sender's address.

Only one coroutine can be receiving at a time. This is currently enforced by assertions.

169 {
170 BOOST_ASSERT_MSG(dataIsNull(udp_.get()),
171 "only one coroutine can receive from UDP socket at a time");
172 RecvAwaiter_ awaiter{*udp_, 1};
173 awaiter.stop_receiving_ = true;
174 const OnExit onExit{[this] { resetData(udp_.get()); }};
175
176 const uv_status status = udpStartReceive();
177 if (status != 0) {
178 throw UvcoException(status, "uv_udp_recv_start()");
179 }
180
181 // Exception thrown here if occurred.
182 std::optional<std::pair<std::string, AddressHandle>> packet =
183 co_await awaiter;
184
185 // Any exceptions are thrown in RecvAwaiter_::await_resume
186 co_return std::move(packet.value());
187}

◆ send()

Promise< void > uvco::Udp::send ( std::span< const char > buffer,
std::optional< AddressHandle > address = {} )

Send to address, or send to the connected peer if no address is given. The buffer is handed to libuv through a non-const pointer because the C API requires it, but its contents are never modified.

135 {
136 uv_udp_send_t req{};
137 SendAwaiter_ sendAwaiter{req};
138
139 std::array<uv_buf_t, 1> bufs{};
140 // The buffer is never written to, so this is necessary to interface
141 // with the legacy C code.
142 bufs[0] = uv_buf_init(const_cast<char *>(buffer.data()), buffer.size_bytes());
143
144 const struct sockaddr *addr = nullptr;
145 if (address) {
146 addr = address->sockaddr();
147 }
148
149 const uv_status status =
150 uv_udp_send(&req, udp_.get(), bufs.begin(), 1, addr, onSendDone);
151 if (status != 0) {
152 throw UvcoException{status, "uv_udp_send() failed immediately"};
153 }
154
155 const uv_status status_done = co_await sendAwaiter;
156 if (status_done != 0) {
157 throw UvcoException{status_done, "uv_udp_send() failed while sending"};
158 }
159
160 co_return;
161}
static void onSendDone(uv_udp_send_t *req, uv_status status)
Definition udp.cc:407
Definition udp.cc:57

◆ setBroadcast()

void uvco::Udp::setBroadcast ( bool enabled)

Enable sending to broadcast addresses.

303 {
304 const uv_status status =
305 uv_udp_set_broadcast(udp_.get(), static_cast<int>(enabled));
306 if (status != 0) {
307 throw UvcoException(status, "join multicast group");
308 }
309}

◆ setMulticastInterface()

void uvco::Udp::setMulticastInterface ( const std::string & interfaceAddress)

Set multicast interface for sending or receiving data.

318 {
319 const uv_status status =
320 uv_udp_set_multicast_interface(udp_.get(), interfaceAddress.c_str());
321 if (status != 0) {
322 throw UvcoException(status, "join multicast group");
323 }
324}

◆ setMulticastLoop()

void uvco::Udp::setMulticastLoop ( bool enabled)

If the loop flag is enabled, sent multicast packets will arrive back on the sending socket.

326 {
327 const uv_status status =
328 uv_udp_set_multicast_loop(udp_.get(), static_cast<int>(enabled));
329 if (status != 0) {
330 throw UvcoException(status, "join multicast group");
331 }
332}

◆ setTtl()

void uvco::Udp::setTtl ( uint8_t ttl)

Set TTL on sent packets. TTL must be between 1 and 255.

311 {
312 const uv_status status = uv_udp_set_ttl(udp_.get(), static_cast<int>(ttl));
313 if (status != 0) {
314 throw UvcoException(status, "join multicast group");
315 }
316}

◆ stopReceiveMany()

void uvco::Udp::stopReceiveMany ( MultiPromise< std::pair< std::string, AddressHandle > > & packets)

Stop receiving with receiveMany() by cancelling the receiving generator coroutine. Supply the MultiPromise obtained from receiveMany() in order to guarantee a complete clean-up.

240 {
242 // Cancel receiving generator if currently suspended by co_yield.
243 packets.cancel();
244}

◆ udpStartReceive()

int uvco::Udp::udpStartReceive ( )
private
251 {
252 BOOST_ASSERT(udp_);
253 return uv_udp_recv_start(udp_.get(), allocator, onReceiveOne);
254}
static void onReceiveOne(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned int flags)
Definition udp.cc:256
void allocator(uv_handle_t *, size_t sugg, uv_buf_t *buf)
libuv allocator.
Definition internal_utils.cc:18

◆ udpStopReceive()

void uvco::Udp::udpStopReceive ( )
private
246 {
247 BOOST_ASSERT(udp_);
248 uv_udp_recv_stop(udp_.get());
249}

Member Data Documentation

◆ connected_

bool uvco::Udp::connected_ = false
private

◆ loop_

const Loop* uvco::Udp::loop_
private

◆ udp_

std::unique_ptr<uv_udp_t> uvco::Udp::udp_
private

The documentation for this class was generated from the following files: