TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 : #define BOOST_COROSIO_IO_STREAM_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/io_object.hpp>
15 : #include <boost/capy/io_result.hpp>
16 : #include <boost/corosio/io_buffer_param.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <system_error>
20 :
21 : #include <coroutine>
22 : #include <cstddef>
23 : #include <stop_token>
24 :
25 : namespace boost::corosio {
26 :
27 : /** Platform stream with read/write operations.
28 :
29 : This base class provides the fundamental async read and write
30 : operations for kernel-level stream I/O. Derived classes wrap
31 : OS-specific stream implementations (sockets, pipes, etc.) and
32 : satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
33 :
34 : @par Semantics
35 : Concrete classes wrap direct platform I/O completed by the kernel.
36 : Functions taking `io_stream&` signal "platform implementation
37 : required" - use this when you need actual kernel I/O rather than
38 : a mock or test double.
39 :
40 : For generic stream algorithms that work with test mocks,
41 : use `template<capy::Stream S>` instead of `io_stream&`.
42 :
43 : @par Thread Safety
44 : Distinct objects: Safe.
45 : Shared objects: Unsafe. All calls to a single stream must be made
46 : from the same implicit or explicit serialization context.
47 :
48 : @par Example
49 : @code
50 : // Read until buffer full or EOF
51 : capy::task<> read_all( io_stream& stream, std::span<char> buf )
52 : {
53 : std::size_t total = 0;
54 : while( total < buf.size() )
55 : {
56 : auto [ec, n] = co_await stream.read_some(
57 : capy::buffer( buf.data() + total, buf.size() - total ) );
58 : if( ec == capy::cond::eof )
59 : break;
60 : if( ec.failed() )
61 : capy::detail::throw_system_error( ec );
62 : total += n;
63 : }
64 : }
65 : @endcode
66 :
67 : @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
68 : */
69 : class BOOST_COROSIO_DECL io_stream : public io_object
70 : {
71 : public:
72 : /** Asynchronously read data from the stream.
73 :
74 : This operation suspends the calling coroutine and initiates a
75 : kernel-level read. The coroutine resumes when the operation
76 : completes.
77 :
78 : @li The operation completes when:
79 : @li At least one byte has been read into the buffer sequence
80 : @li The peer closes the connection (EOF)
81 : @li An error occurs
82 : @li The operation is cancelled via stop token or `cancel()`
83 :
84 : @par Concurrency
85 : At most one write operation may be in flight concurrently with
86 : this read. No other read operations may be in flight until this
87 : operation completes. Note that concurrent in-flight operations
88 : does not imply the initiating calls may be made concurrently;
89 : all calls must be serialized.
90 :
91 : @par Cancellation
92 : Supports cancellation via `std::stop_token` propagated through
93 : the IoAwaitable protocol, or via the I/O object's `cancel()`
94 : member. When cancelled, the operation completes with an error
95 : that compares equal to `capy::cond::canceled`.
96 :
97 : @par Preconditions
98 : The stream must be open and connected.
99 :
100 : @param buffers The buffer sequence to read data into. The caller
101 : retains ownership and must ensure validity until the
102 : operation completes.
103 :
104 : @return An awaitable yielding `(error_code, std::size_t)`.
105 : On success, `bytes_transferred` contains the number of bytes
106 : read. Compare error codes to conditions, not specific values:
107 : @li `capy::cond::eof` - Peer closed connection (TCP FIN)
108 : @li `capy::cond::canceled` - Operation was cancelled
109 :
110 : @par Example
111 : @code
112 : // Simple read with error handling
113 : auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
114 : if( ec == capy::cond::eof )
115 : co_return; // Connection closed gracefully
116 : if( ec.failed() )
117 : capy::detail::throw_system_error( ec );
118 : process( buf, n );
119 : @endcode
120 :
121 : @note This operation may read fewer bytes than the buffer
122 : capacity. Use a loop or `capy::async_read` to read an
123 : exact amount.
124 :
125 : @see write_some, capy::async_read
126 : */
127 : template<capy::MutableBufferSequence MB>
128 HIT 327949 : auto read_some(MB const& buffers)
129 : {
130 327949 : return read_some_awaitable<MB>(*this, buffers);
131 : }
132 :
133 : /** Asynchronously write data to the stream.
134 :
135 : This operation suspends the calling coroutine and initiates a
136 : kernel-level write. The coroutine resumes when the operation
137 : completes.
138 :
139 : @li The operation completes when:
140 : @li At least one byte has been written from the buffer sequence
141 : @li An error occurs (including connection reset by peer)
142 : @li The operation is cancelled via stop token or `cancel()`
143 :
144 : @par Concurrency
145 : At most one read operation may be in flight concurrently with
146 : this write. No other write operations may be in flight until
147 : this operation completes. Note that concurrent in-flight
148 : operations does not imply the initiating calls may be made
149 : concurrently; all calls must be serialized.
150 :
151 : @par Cancellation
152 : Supports cancellation via `std::stop_token` propagated through
153 : the IoAwaitable protocol, or via the I/O object's `cancel()`
154 : member. When cancelled, the operation completes with an error
155 : that compares equal to `capy::cond::canceled`.
156 :
157 : @par Preconditions
158 : The stream must be open and connected.
159 :
160 : @param buffers The buffer sequence containing data to write.
161 : The caller retains ownership and must ensure validity
162 : until the operation completes.
163 :
164 : @return An awaitable yielding `(error_code, std::size_t)`.
165 : On success, `bytes_transferred` contains the number of bytes
166 : written. Compare error codes to conditions, not specific
167 : values:
168 : @li `capy::cond::canceled` - Operation was cancelled
169 : @li `std::errc::broken_pipe` - Peer closed connection
170 :
171 : @par Example
172 : @code
173 : // Write all data
174 : std::string_view data = "Hello, World!";
175 : std::size_t written = 0;
176 : while( written < data.size() )
177 : {
178 : auto [ec, n] = co_await stream.write_some(
179 : capy::buffer( data.data() + written,
180 : data.size() - written ) );
181 : if( ec.failed() )
182 : capy::detail::throw_system_error( ec );
183 : written += n;
184 : }
185 : @endcode
186 :
187 : @note This operation may write fewer bytes than the buffer
188 : contains. Use a loop or `capy::async_write` to write
189 : all data.
190 :
191 : @see read_some, capy::async_write
192 : */
193 : template<capy::ConstBufferSequence CB>
194 327585 : auto write_some(CB const& buffers)
195 : {
196 327585 : return write_some_awaitable<CB>(*this, buffers);
197 : }
198 :
199 : protected:
200 : /// Awaitable for async read operations.
201 : template<class MutableBufferSequence>
202 : struct read_some_awaitable
203 : {
204 : io_stream& ios_;
205 : MutableBufferSequence buffers_;
206 : std::stop_token token_;
207 : mutable std::error_code ec_;
208 : mutable std::size_t bytes_transferred_ = 0;
209 :
210 327949 : read_some_awaitable(
211 : io_stream& ios, MutableBufferSequence buffers) noexcept
212 327949 : : ios_(ios)
213 327949 : , buffers_(std::move(buffers))
214 : {
215 327949 : }
216 :
217 327949 : bool await_ready() const noexcept
218 : {
219 327949 : return token_.stop_requested();
220 : }
221 :
222 327949 : capy::io_result<std::size_t> await_resume() const noexcept
223 : {
224 327949 : if (token_.stop_requested())
225 198 : return {make_error_code(std::errc::operation_canceled), 0};
226 327751 : return {ec_, bytes_transferred_};
227 : }
228 :
229 327949 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
230 : -> std::coroutine_handle<>
231 : {
232 327949 : token_ = env->stop_token;
233 983847 : return ios_.get().read_some(
234 983847 : h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
235 : }
236 : };
237 :
238 : /// Awaitable for async write operations.
239 : template<class ConstBufferSequence>
240 : struct write_some_awaitable
241 : {
242 : io_stream& ios_;
243 : ConstBufferSequence buffers_;
244 : std::stop_token token_;
245 : mutable std::error_code ec_;
246 : mutable std::size_t bytes_transferred_ = 0;
247 :
248 327585 : write_some_awaitable(
249 : io_stream& ios, ConstBufferSequence buffers) noexcept
250 327585 : : ios_(ios)
251 327585 : , buffers_(std::move(buffers))
252 : {
253 327585 : }
254 :
255 327585 : bool await_ready() const noexcept
256 : {
257 327585 : return token_.stop_requested();
258 : }
259 :
260 327585 : capy::io_result<std::size_t> await_resume() const noexcept
261 : {
262 327585 : if (token_.stop_requested())
263 MIS 0 : return {make_error_code(std::errc::operation_canceled), 0};
264 HIT 327585 : return {ec_, bytes_transferred_};
265 : }
266 :
267 327585 : auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
268 : -> std::coroutine_handle<>
269 : {
270 327585 : token_ = env->stop_token;
271 982755 : return ios_.get().write_some(
272 982755 : h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
273 : }
274 : };
275 :
276 : public:
277 : /** Platform-specific stream implementation interface.
278 :
279 : Derived classes implement this interface to provide kernel-level
280 : read and write operations for each supported platform (IOCP,
281 : epoll, kqueue, io_uring).
282 : */
283 : struct implementation : io_object::implementation
284 : {
285 : /// Initiate platform read operation.
286 : virtual std::coroutine_handle<> read_some(
287 : std::coroutine_handle<>,
288 : capy::executor_ref,
289 : io_buffer_param,
290 : std::stop_token,
291 : std::error_code*,
292 : std::size_t*) = 0;
293 :
294 : /// Initiate platform write operation.
295 : virtual std::coroutine_handle<> write_some(
296 : std::coroutine_handle<>,
297 : capy::executor_ref,
298 : io_buffer_param,
299 : std::stop_token,
300 : std::error_code*,
301 : std::size_t*) = 0;
302 : };
303 :
304 : protected:
305 : /// Construct stream from a handle.
306 16826 : explicit io_stream(handle h) noexcept : io_object(std::move(h)) {}
307 :
308 : private:
309 : /// Return implementation downcasted to stream interface.
310 655534 : implementation& get() const noexcept
311 : {
312 655534 : return *static_cast<implementation*>(h_.get());
313 : }
314 : };
315 :
316 : } // namespace boost::corosio
317 :
318 : #endif
|