uvco 0.1
Loading...
Searching...
No Matches
uvco::StreamBase Class Reference

A plain stream, permitting reading, writing, and closing. More...

#include <stream.h>

Inheritance diagram for uvco::StreamBase:

Classes

struct  InStreamAwaiter_
struct  OutStreamAwaiter_
struct  ShutdownAwaiter_

Public Member Functions

template<typename Stream>
 StreamBase (std::unique_ptr< Stream > stream)
 StreamBase (const StreamBase &)=delete
 StreamBase (StreamBase &&)=default
StreamBase & operator= (const StreamBase &)=delete
StreamBase & operator= (StreamBase &&)=default
virtual ~StreamBase ()
Promise< std::optional< std::string > > read (size_t maxSize=defaultMaxReadSize)
Promise< size_t > read (std::span< char > buffer)
Promise< void > write (std::string buf)
Promise< void > shutdown ()
void close ()
const uv_stream_t * underlying () const
 Return the underlying UV stream object.
uv_os_fd_t fd () const

Static Public Attributes

static constexpr size_t defaultMaxReadSize = 4080

Protected Member Functions

uv_stream_t & stream ()
void destroyStream ()

Private Attributes

std::unique_ptr< uv_stream_t, UvHandleDeleter > stream_
std::coroutine_handle reader_
std::coroutine_handle writer_

Detailed Description

A plain stream, permitting reading, writing, and closing.

Constructor & Destructor Documentation

◆ StreamBase() [1/3]

template<typename Stream>
uvco::StreamBase::StreamBase ( std::unique_ptr< Stream > stream)
inline explicit
38 : stream_{(uv_stream_t *)stream.release()} {}
uv_stream_t & stream()
Definition stream.h:104
std::unique_ptr< uv_stream_t, UvHandleDeleter > stream_
Definition stream.h:114

◆ StreamBase() [2/3]

uvco::StreamBase::StreamBase ( const StreamBase & )
delete

◆ StreamBase() [3/3]

uvco::StreamBase::StreamBase ( StreamBase && )
default

◆ ~StreamBase()

uvco::StreamBase::~StreamBase ( )
virtual

Closes the stream if not already closed. It's always best to explicitly call stream.close() (a plain, non-awaitable call, as close() returns void), but as a backup, the destructor will close the stream if it is still open. To do so, it will schedule a callback on the libuv loop.

93{ close(); }
void close()
Definition stream.cc:170

Member Function Documentation

◆ close()

void uvco::StreamBase::close ( )

Informs pending readers and writers of the close and causes them to return an empty optional.

170 {
171 if (stream_ != nullptr) {
172 closeHandle(stream_.release());
173 }
174 if (reader_ != nullptr) {
175 std::coroutine_handle<> reader = reader_;
176 reader_ = nullptr;
177 reader.resume();
178 }
179 if (writer_ != nullptr) {
180 std::coroutine_handle<> writer = writer_;
181 writer_ = nullptr;
182 writer.resume();
183 }
184}
std::coroutine_handle reader_
Definition stream.h:117
std::coroutine_handle writer_
Definition stream.h:118
void closeHandle(Handle *handle, void(*closer)(CloserArg *, void(*)(uv_handle_t *)))
Definition close.h:37

◆ destroyStream()

void uvco::StreamBase::destroyStream ( )
inline protected
108 {
109 BOOST_ASSERT(stream_);
110 stream_.reset();
111 }

◆ fd()

uv_os_fd_t uvco::StreamBase::fd ( ) const
nodiscard
361 {
362 uv_os_fd_t fd{};
363 const uv_status status = uv_fileno((uv_handle_t *)stream_.get(), &fd);
364 if (status != 0) {
365 throw UvcoException(status, "StreamBase::fd(): uv_fileno() failed: " +
366 std::string{uv_strerror(status)});
367 }
368 return fd;
369}
uv_os_fd_t fd() const
Definition stream.cc:361
int uv_status
Result of a libuv operation, an errno error code.
Definition internal_utils.h:22

◆ operator=() [1/2]

StreamBase & uvco::StreamBase::operator= ( const StreamBase & )
delete

◆ operator=() [2/2]

StreamBase & uvco::StreamBase::operator= ( StreamBase && )
default

◆ read() [1/2]

Promise< std::optional< std::string > > uvco::StreamBase::read ( size_t maxSize = defaultMaxReadSize)
nodiscard

Read available data (up to maxSize bytes) from stream. Returns std::nullopt on EOF or closed handle (close()).

Throws UvcoException on error.

NOTE: Consider using read(std::span<char>) for better performance.

NOTE: only one reader is allowed to be active at a time. If a read is started while another is still active, uvco will abort the process (in Debug mode), or ignore the first read (in Release mode).

105 {
106 // This is a promise root function, i.e. origin of a promise.
107 std::string buf(maxSize, '\0');
108 InStreamAwaiter_ awaiter{*this, buf};
109 const size_t nRead = co_await awaiter;
110 if (nRead == 0) {
111 // EOF.
112 co_return std::nullopt;
113 }
114 buf.resize(nRead);
115 co_return buf;
116}
Definition stream.cc:42

◆ read() [2/2]

Promise< size_t > uvco::StreamBase::read ( std::span< char > buffer)

Read available data (up to buffer.size() bytes) from stream. Returns the number of bytes read, or 0 on EOF or closed handle (close()).

Only one read() coroutine may be active at a time. The stream must outlive the coroutine, i.e. live until co_await stream.read(...) returns a result.

Throws UvcoException on error.

118 {
119 InStreamAwaiter_ awaiter{*this, buffer};
120 co_return (co_await awaiter);
121}

◆ shutdown()

Promise< void > uvco::StreamBase::shutdown ( )
nodiscard

Shut down stream for writing. This is a half-close; the other side can still write. The result of shutdown() must be co_awaited.

154 {
155 auto shutdownReq = std::make_unique<uv_shutdown_t>();
156 ShutdownAwaiter_ awaiter;
157 const OnExit _onExit{[req = shutdownReq.get(), &awaiter] {
158 if (awaiter.handle_) {
159 resetRequestData(req);
160 }
161 }};
162
163 setRequestData(shutdownReq.get(), &awaiter);
164 uv_shutdown(shutdownReq.release(), &stream(),
165 StreamBase::ShutdownAwaiter_::onShutdown);
166 co_await awaiter;
167 co_return;
168}
void resetRequestData(Request *req)
Definition internal_utils.h:95
void setRequestData(Request *req, Data *data)
Set data pointer on request.
Definition internal_utils.h:90
Definition stream.cc:30
static void onShutdown(uv_shutdown_t *req, uv_status status)
Definition stream.cc:346

◆ stream()

uv_stream_t & uvco::StreamBase::stream ( )
inline protected
104 {
105 BOOST_ASSERT(stream_);
106 return *stream_;
107 }

◆ underlying()

const uv_stream_t * uvco::StreamBase::underlying ( ) const
inline nodiscard

Return the underlying UV stream object.

99{ return stream_.get(); }

◆ write()

Promise< void > uvco::StreamBase::write ( std::string buf)
nodiscard

Write a buffer to the stream. A copy of buf is taken because it is undetermined when the actual write will occur. Await the result if the status is important; the write will be executed even without awaiting (as long as the process keeps running).

NOTE: only one writer is allowed to be active at a time. If two writes are started simultaneously, the process will be aborted in Debug mode, or the first write() coroutine will not return in Release mode.

WARNING: due to the interactions with libuv, writes cannot be cancelled. That's why write() always takes ownership of a buffer to avoid use-after-free issues. The safety comes at the cost of increased allocations, obviously.

123 {
124 OutStreamAwaiter_ awaiter{*this, std::move(buf)};
125 std::array<uv_buf_t, 1> bufs{};
126 bufs[0] = uv_buf_init(const_cast<char *>(awaiter.write_->buffer.data()),
127 awaiter.write_->buffer.size());
128
129 setRequestData(awaiter.write_.get(), &awaiter);
130 const OnExit _onExit{[write = awaiter.write_.get(), &awaiter] {
131 // handle_ is reset by onOutStreamWrite callback. If it's non-null, it's
132 // because write() was cancelled.
133 if (awaiter.handle_) {
134 resetRequestData(write);
135 }
136 }};
137
138 const uv_status status =
139 uv_write((uv_write_t *)(awaiter.write_.release()), &stream(), bufs.data(),
140 bufs.size(), StreamBase::OutStreamAwaiter_::onOutStreamWrite);
141 if (status < 0) {
142 throw UvcoException{status,
143 "StreamBase::write() encountered error in uv_write"};
144 }
145 const uv_status completionStatus = co_await awaiter;
146 // may be UV_ECANCELED if the stream has been dropped.
147 if (completionStatus < 0) {
148 throw UvcoException{
149 status, "StreamBase::write() encountered error while awaiting write"};
150 }
151 co_return;
152}
Promise< void > write(std::string buf)
Definition stream.cc:123
static void onOutStreamWrite(uv_write_t *write, uv_status status)
Definition stream.cc:315

Member Data Documentation

◆ defaultMaxReadSize

size_t uvco::StreamBase::defaultMaxReadSize = 4080
static constexpr

◆ reader_

std::coroutine_handle uvco::StreamBase::reader_
private

◆ stream_

std::unique_ptr<uv_stream_t, UvHandleDeleter> uvco::StreamBase::stream_
private

◆ writer_

std::coroutine_handle uvco::StreamBase::writer_
private

The documentation for this class was generated from the following files: