uvco 0.1
Loading...
Searching...
No Matches
Classes | Public Member Functions | Static Public Attributes | Protected Member Functions | Private Attributes | List of all members
uvco::StreamBase Class Reference

A plain stream, permitting reading, writing, and closing. More...

#include <stream.h>

Inheritance diagram for uvco::StreamBase:
Inheritance graph
[legend]

Classes

struct  InStreamAwaiter_
 
struct  OutStreamAwaiter_
 
struct  ShutdownAwaiter_
 

Public Member Functions

template<typename Stream >
 StreamBase (std::unique_ptr< Stream > stream)
 
 StreamBase (const StreamBase &)=delete
 
 StreamBase (StreamBase &&)=default
 
StreamBaseoperator= (const StreamBase &)=delete
 
StreamBaseoperator= (StreamBase &&)=default
 
virtual ~StreamBase ()
 
Promise< std::optional< std::string > > read (size_t maxSize=defaultMaxReadSize)
 
Promise< size_t > read (std::span< char > buffer)
 
Promise< size_t > write (std::string buf)
 
Promise< void > shutdown ()
 
Promise< void > close ()
 
const uv_stream_t * underlying () const
 Return the underlying UV stream object.
 

Static Public Attributes

static constexpr size_t defaultMaxReadSize = 4096
 

Protected Member Functions

uv_stream_t & stream ()
 
void destroyStream ()
 

Private Attributes

std::unique_ptr< uv_stream_t, UvHandleDeleterstream_
 
std::optional< std::coroutine_handle<> > reader_
 
std::optional< std::coroutine_handle<> > writer_
 

Detailed Description

A plain stream, permitting reading, writing, and closing.

Constructor & Destructor Documentation

◆ StreamBase() [1/3]

template<typename Stream >
uvco::StreamBase::StreamBase ( std::unique_ptr< Stream >  stream)
inlineexplicit
36 : stream_{(uv_stream_t *)stream.release()} {}
uv_stream_t & stream()
Definition stream.h:91
std::unique_ptr< uv_stream_t, UvHandleDeleter > stream_
Definition stream.h:101

◆ StreamBase() [2/3]

uvco::StreamBase::StreamBase ( const StreamBase )
delete

◆ StreamBase() [3/3]

uvco::StreamBase::StreamBase ( StreamBase &&  )
default

◆ ~StreamBase()

uvco::StreamBase::~StreamBase ( )
virtual
28 {
29 // close() MUST be called and awaited before dtor.
30 if (stream_) {
31 fmt::print(stderr, "StreamBase::~StreamBase(): closing stream in dtor; "
32 "this will leak memory. "
33 "Please co_await stream.close() if possible.\n");
34 // Asynchronously close handle. It's better to leak memory than file
35 // descriptors.
36 closeHandle(stream_.release());
37 }
38}
Promise< void > closeHandle(T *handle, C closer)
Definition close.h:28

Member Function Documentation

◆ close()

Promise< void > uvco::StreamBase::close ( )

The result of close() must be co_awaited; otherwise memory may be leaked. (this is not an issue just before termination of a process)

Informs pending readers and writers of the close and causes them to return an empty optional.

89 {
90 auto stream = std::move(stream_);
91 co_await closeHandle(stream.get());
92 if (reader_) {
93 const auto reader = *reader_;
94 reader_.reset();
95 Loop::enqueue(reader);
96 }
97 if (writer_) {
98 const auto writer = *writer_;
99 writer_.reset();
100 Loop::enqueue(writer);
101 }
102}
static void enqueue(std::coroutine_handle<> handle)
Definition loop.cc:73
std::optional< std::coroutine_handle<> > reader_
Definition stream.h:103
std::optional< std::coroutine_handle<> > writer_
Definition stream.h:104

◆ destroyStream()

void uvco::StreamBase::destroyStream ( )
inlineprotected
95 {
96 BOOST_ASSERT(stream_);
97 stream_.reset();
98 }

◆ operator=() [1/2]

StreamBase & uvco::StreamBase::operator= ( const StreamBase )
delete

◆ operator=() [2/2]

StreamBase & uvco::StreamBase::operator= ( StreamBase &&  )
default

◆ read() [1/2]

Promise< std::optional< std::string > > uvco::StreamBase::read ( size_t  maxSize = defaultMaxReadSize)

Read available data (up to maxSize bytes) from stream. Returns std::nullopt on EOF or closed handle (close()).

Throws UvcoException on error.

NOTE: Consider using read(std::span<char>) for better performance.

NOTE: only one reader is allowed to be active at a time. If a read is started while another is still active, uvco will abort the process (in Debug mode), or ignore the first read (in Release mode).

50 {
51 // This is a promise root function, i.e. origin of a promise.
52 std::string buf(maxSize, '\0');
53 InStreamAwaiter_ awaiter{*this, buf};
54 const size_t nRead = co_await awaiter;
55 if (nRead == 0) {
56 // EOF.
57 co_return std::nullopt;
58 }
59 buf.resize(nRead);
60 co_return buf;
61}

◆ read() [2/2]

Promise< size_t > uvco::StreamBase::read ( std::span< char >  buffer)

Read available data (up to buffer.size() bytes) from stream. Returns the number of bytes read, or 0 on EOF or closed handle (close()).

Only one read() coroutine may be active at a time.

Throws UvcoException on error.

63 {
64 InStreamAwaiter_ awaiter{*this, buffer};
65 size_t n = co_await awaiter;
66 co_return n;
67}

◆ shutdown()

Promise< void > uvco::StreamBase::shutdown ( )

Shut down stream for writing. This is a half-close; the other side can still write. The result of shutdown() must be co_awaited.

78 {
79 uv_shutdown_t shutdownReq;
80 ShutdownAwaiter_ awaiter;
81
82 shutdownReq.data = &awaiter;
83 uv_shutdown(&shutdownReq, &stream(),
85 co_await awaiter;
86 co_return;
87}
static void onShutdown(uv_shutdown_t *req, uv_status status)
Definition stream.cc:252

◆ stream()

uv_stream_t & uvco::StreamBase::stream ( )
inlineprotected
91 {
92 BOOST_ASSERT(stream_);
93 return *stream_;
94 }

◆ underlying()

const uv_stream_t * uvco::StreamBase::underlying ( ) const
inline

Return the underlying UV stream object.

88{ return stream_.get(); }

◆ write()

Promise< size_t > uvco::StreamBase::write ( std::string  buf)

Write a buffer to the stream. A copy of buf is taken because it is undetermined when the actual write will occur. Await the result if the status is important; the write will be executed even without awaiting (as long as the process keeps running).

NOTE: only one writer is allowed to be active at a time. If two writes are started simultaneously, the process will be aborted in Debug mode, or the first write() coroutine will not return in Release mode.

69 {
70 OutStreamAwaiter_ awaiter{*this, std::move(buf)};
71 uv_status status = co_await awaiter;
72 if (status < 0) {
73 throw UvcoException{status, "StreamBase::write() encountered error"};
74 }
75 co_return static_cast<size_t>(status);
76}
int uv_status
Result of a libuv operation, an errno error code.
Definition internal_utils.h:22

Member Data Documentation

◆ defaultMaxReadSize

constexpr size_t uvco::StreamBase::defaultMaxReadSize = 4096
staticconstexpr

◆ reader_

std::optional<std::coroutine_handle<> > uvco::StreamBase::reader_
private

◆ stream_

std::unique_ptr<uv_stream_t, UvHandleDeleter> uvco::StreamBase::stream_
private

◆ writer_

std::optional<std::coroutine_handle<> > uvco::StreamBase::writer_
private

The documentation for this class was generated from the following files: