uvco 0.1
Loading...
Searching...
No Matches
stream.h
Go to the documentation of this file.
1// uvco (c) 2023 Lewin Bormann. See LICENSE for specific terms.
2
3#pragma once
4
5#include <boost/assert.hpp>
6#include <fmt/core.h>
7#include <span>
8#include <string_view>
9#include <uv.h>
10#include <uv/unix.h>
11
12#include "uvco/close.h"
14#include "uvco/loop/loop.h"
16#include "uvco/run.h"
17
18#include <array>
19#include <coroutine>
20#include <cstdio>
21#include <memory>
22#include <optional>
23#include <string>
24#include <utility>
25
26namespace uvco {
27
30
33public:
34 template <typename Stream>
35 explicit StreamBase(std::unique_ptr<Stream> stream)
36 : stream_{(uv_stream_t *)stream.release()} {}
37 StreamBase(const StreamBase &) = delete;
38 StreamBase(StreamBase &&) = default;
39 StreamBase &operator=(const StreamBase &) = delete;
41 virtual ~StreamBase();
42
43 static constexpr size_t defaultMaxReadSize = 4096;
44
56 read(size_t maxSize = defaultMaxReadSize);
57
64 Promise<size_t> read(std::span<char> buffer);
65
74 [[nodiscard]] Promise<size_t> write(std::string buf);
75
78 [[nodiscard]] Promise<void> shutdown();
79
85 [[nodiscard]] Promise<void> close();
86
88 [[nodiscard]] const uv_stream_t *underlying() const { return stream_.get(); }
89
90protected:
91 uv_stream_t &stream() {
92 BOOST_ASSERT(stream_);
93 return *stream_;
94 }
96 BOOST_ASSERT(stream_);
97 stream_.reset();
98 }
99
100private:
101 std::unique_ptr<uv_stream_t, UvHandleDeleter> stream_;
102 // Currently suspended readers/writers to be notified on close().
103 std::optional<std::coroutine_handle<>> reader_;
104 std::optional<std::coroutine_handle<>> writer_;
105
107 ShutdownAwaiter_() = default;
108 static void onShutdown(uv_shutdown_t *req, uv_status status);
109
110 bool await_ready();
111 bool await_suspend(std::coroutine_handle<> handle);
112 void await_resume();
113
114 std::optional<std::coroutine_handle<>> handle_;
115 std::optional<uv_status> status_;
116 };
117
119 explicit InStreamAwaiter_(StreamBase &stream, std::span<char> buffer)
120 : stream_{stream}, buffer_{buffer} {}
121
122 bool await_ready();
123 bool await_suspend(std::coroutine_handle<> handle);
124 size_t await_resume();
125
126 void start_read();
127 void stop_read();
128
129 static void allocate(uv_handle_t *handle, size_t suggested_size,
130 uv_buf_t *buf);
131 static void onInStreamRead(uv_stream_t *stream, ssize_t nread,
132 const uv_buf_t *buf);
133
135 std::span<char> buffer_;
136 std::optional<ssize_t> status_;
137 std::optional<std::coroutine_handle<>> handle_;
138 };
139
141 OutStreamAwaiter_(StreamBase &stream, std::string_view buffer);
142
143 [[nodiscard]] std::array<uv_buf_t, 1> prepare_buffers() const;
144
145 bool await_ready();
146 bool await_suspend(std::coroutine_handle<> handle);
148
149 static void onOutStreamWrite(uv_write_t *write, uv_status status);
150
151 std::optional<std::coroutine_handle<>> handle_;
152 std::optional<uv_status> status_;
153
154 // State necessary for both immediate and delayed writing.
155 std::string_view buffer_;
156 uv_write_t write_{};
158 };
159};
160
167class TtyStream : public StreamBase {
168public:
169 // Takes ownership of stream.
170 TtyStream(TtyStream &&other) = default;
171 TtyStream(const TtyStream &other) = delete;
173 TtyStream &operator=(const TtyStream &) = delete;
174 ~TtyStream() override = default;
175
176 static TtyStream tty(const Loop &loop, int fd);
177 static TtyStream stdin(const Loop &loop) { return tty(loop, 0); }
178 static TtyStream stdout(const Loop &loop) { return tty(loop, 1); }
179 static TtyStream stderr(const Loop &loop) { return tty(loop, 2); }
180
181private:
182 explicit TtyStream(std::unique_ptr<uv_tty_t> stream)
183 : StreamBase{std::move(stream)} {}
184};
185
187
188} // namespace uvco
Definition loop.h:26
Definition promise.h:76
A plain stream, permitting reading, writing, and closing.
Definition stream.h:32
Promise< void > shutdown()
Definition stream.cc:78
Promise< std::optional< std::string > > read(size_t maxSize=defaultMaxReadSize)
Definition stream.cc:50
static constexpr size_t defaultMaxReadSize
Definition stream.h:43
StreamBase & operator=(const StreamBase &)=delete
std::optional< std::coroutine_handle<> > reader_
Definition stream.h:103
StreamBase(StreamBase &&)=default
uv_stream_t & stream()
Definition stream.h:91
Promise< void > close()
Definition stream.cc:89
StreamBase & operator=(StreamBase &&)=default
const uv_stream_t * underlying() const
Return the underlying UV stream object.
Definition stream.h:88
virtual ~StreamBase()
Definition stream.cc:28
Promise< size_t > write(std::string buf)
Definition stream.cc:69
StreamBase(const StreamBase &)=delete
std::optional< std::coroutine_handle<> > writer_
Definition stream.h:104
void destroyStream()
Definition stream.h:95
StreamBase(std::unique_ptr< Stream > stream)
Definition stream.h:35
std::unique_ptr< uv_stream_t, UvHandleDeleter > stream_
Definition stream.h:101
Definition stream.h:167
TtyStream & operator=(TtyStream &&)=default
static TtyStream tty(const Loop &loop, int fd)
Definition stream.cc:40
TtyStream(TtyStream &&other)=default
TtyStream & operator=(const TtyStream &)=delete
~TtyStream() override=default
TtyStream(const TtyStream &other)=delete
TtyStream(std::unique_ptr< uv_tty_t > stream)
Definition stream.h:182
static TtyStream stdin(const Loop &loop)
Definition stream.h:177
static TtyStream stdout(const Loop &loop)
Definition stream.h:178
static TtyStream stderr(const Loop &loop)
Definition stream.h:179
int uv_status
Result of a libuv operation, an errno error code.
Definition internal_utils.h:22
Definition async_work.cc:17
Definition stream.h:118
std::optional< ssize_t > status_
Definition stream.h:136
bool await_ready()
Definition stream.cc:104
InStreamAwaiter_(StreamBase &stream, std::span< char > buffer)
Definition stream.h:119
static void allocate(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
Definition stream.cc:143
std::span< char > buffer_
Definition stream.h:135
void start_read()
Definition stream.cc:152
std::optional< std::coroutine_handle<> > handle_
Definition stream.h:137
size_t await_resume()
Definition stream.cc:125
void stop_read()
Definition stream.cc:157
StreamBase & stream_
Definition stream.h:134
bool await_suspend(std::coroutine_handle<> handle)
Definition stream.cc:115
static void onInStreamRead(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
Definition stream.cc:162
std::optional< uv_status > status_
Definition stream.h:152
std::array< uv_buf_t, 1 > prepare_buffers() const
Definition stream.cc:182
OutStreamAwaiter_(StreamBase &stream, std::string_view buffer)
Definition stream.cc:178
StreamBase & stream_
Definition stream.h:157
uv_status await_resume()
Definition stream.cc:214
uv_write_t write_
Definition stream.h:156
std::string_view buffer_
Definition stream.h:155
bool await_suspend(std::coroutine_handle<> handle)
Definition stream.cc:199
std::optional< std::coroutine_handle<> > handle_
Definition stream.h:151
bool await_ready()
Definition stream.cc:189
static void onOutStreamWrite(uv_write_t *write, uv_status status)
Definition stream.cc:224
Definition stream.h:106
static void onShutdown(uv_shutdown_t *req, uv_status status)
Definition stream.cc:252
bool await_ready()
Definition stream.cc:236
bool await_suspend(std::coroutine_handle<> handle)
Definition stream.cc:238
std::optional< uv_status > status_
Definition stream.h:115
std::optional< std::coroutine_handle<> > handle_
Definition stream.h:114
void await_resume()
Definition stream.cc:245