include/boost/corosio/io_stream.hpp

97.1% Lines (34/35) 100.0% Functions (27/27)
include/boost/corosio/io_stream.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 #define BOOST_COROSIO_IO_STREAM_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/io_object.hpp>
15 #include <boost/capy/io_result.hpp>
16 #include <boost/corosio/io_buffer_param.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/io_env.hpp>
19 #include <system_error>
20
21 #include <coroutine>
22 #include <cstddef>
23 #include <stop_token>
24
25 namespace boost::corosio {
26
27 /** Platform stream with read/write operations.
28
29 This base class provides the fundamental async read and write
30 operations for kernel-level stream I/O. Derived classes wrap
31 OS-specific stream implementations (sockets, pipes, etc.) and
32 satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
33
34 @par Semantics
35 Concrete classes wrap direct platform I/O completed by the kernel.
36 Functions taking `io_stream&` signal "platform implementation
37 required" - use this when you need actual kernel I/O rather than
38 a mock or test double.
39
40 For generic stream algorithms that work with test mocks,
41 use `template<capy::Stream S>` instead of `io_stream&`.
42
43 @par Thread Safety
44 Distinct objects: Safe.
45 Shared objects: Unsafe. All calls to a single stream must be made
46 from the same implicit or explicit serialization context.
47
48 @par Example
49 @code
50 // Read until buffer full or EOF
51 capy::task<> read_all( io_stream& stream, std::span<char> buf )
52 {
53 std::size_t total = 0;
54 while( total < buf.size() )
55 {
56 auto [ec, n] = co_await stream.read_some(
57 capy::buffer( buf.data() + total, buf.size() - total ) );
58 if( ec == capy::cond::eof )
59 break;
60 if( ec.failed() )
61 capy::detail::throw_system_error( ec );
62 total += n;
63 }
64 }
65 @endcode
66
67 @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
68 */
69 class BOOST_COROSIO_DECL io_stream : public io_object
70 {
71 public:
72 /** Asynchronously read data from the stream.
73
74 This operation suspends the calling coroutine and initiates a
75 kernel-level read. The coroutine resumes when the operation
76 completes.
77
78 The operation completes when any of the following occurs:
79 @li At least one byte has been read into the buffer sequence
80 @li The peer closes the connection (EOF)
81 @li An error occurs
82 @li The operation is cancelled via stop token or `cancel()`
83
84 @par Concurrency
85 At most one write operation may be in flight concurrently with
86 this read. No other read operations may be in flight until this
87 operation completes. Note that permitting concurrent in-flight
88 operations does not imply that the initiating calls may be made
89 concurrently; all calls must be serialized.
90
91 @par Cancellation
92 Supports cancellation via `std::stop_token` propagated through
93 the IoAwaitable protocol, or via the I/O object's `cancel()`
94 member. When cancelled, the operation completes with an error
95 that compares equal to `capy::cond::canceled`.
96
97 @par Preconditions
98 The stream must be open and connected.
99
100 @param buffers The buffer sequence to read data into. The caller
101 retains ownership and must ensure validity until the
102 operation completes.
103
104 @return An awaitable yielding `(error_code, std::size_t)`.
105 On success, `bytes_transferred` contains the number of bytes
106 read. Compare error codes to conditions, not specific values:
107 @li `capy::cond::eof` - Peer closed connection (TCP FIN)
108 @li `capy::cond::canceled` - Operation was cancelled
109
110 @par Example
111 @code
112 // Simple read with error handling
113 auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
114 if( ec == capy::cond::eof )
115 co_return; // Connection closed gracefully
116 if( ec.failed() )
117 capy::detail::throw_system_error( ec );
118 process( buf, n );
119 @endcode
120
121 @note This operation may read fewer bytes than the buffer
122 capacity. Use a loop or `capy::async_read` to read an
123 exact amount.
124
125 @see write_some, capy::async_read
126 */
127 template<capy::MutableBufferSequence MB>
128 327949 auto read_some(MB const& buffers)
129 {
130 327949 return read_some_awaitable<MB>(*this, buffers); // copies the `buffers` object into the awaitable; the implementation's read_some is only called from await_suspend()
131 }
132
133 /** Asynchronously write data to the stream.
134
135 This operation suspends the calling coroutine and initiates a
136 kernel-level write. The coroutine resumes when the operation
137 completes.
138
139 The operation completes when any of the following occurs:
140 @li At least one byte has been written from the buffer sequence
141 @li An error occurs (including connection reset by peer)
142 @li The operation is cancelled via stop token or `cancel()`
143
144 @par Concurrency
145 At most one read operation may be in flight concurrently with
146 this write. No other write operations may be in flight until
147 this operation completes. Note that permitting concurrent in-flight
148 operations does not imply that the initiating calls may be made
149 concurrently; all calls must be serialized.
150
151 @par Cancellation
152 Supports cancellation via `std::stop_token` propagated through
153 the IoAwaitable protocol, or via the I/O object's `cancel()`
154 member. When cancelled, the operation completes with an error
155 that compares equal to `capy::cond::canceled`.
156
157 @par Preconditions
158 The stream must be open and connected.
159
160 @param buffers The buffer sequence containing data to write.
161 The caller retains ownership and must ensure validity
162 until the operation completes.
163
164 @return An awaitable yielding `(error_code, std::size_t)`.
165 On success, `bytes_transferred` contains the number of bytes
166 written. Compare error codes to conditions, not specific
167 values:
168 @li `capy::cond::canceled` - Operation was cancelled
169 @li `std::errc::broken_pipe` - Peer closed connection
170
171 @par Example
172 @code
173 // Write all data
174 std::string_view data = "Hello, World!";
175 std::size_t written = 0;
176 while( written < data.size() )
177 {
178 auto [ec, n] = co_await stream.write_some(
179 capy::buffer( data.data() + written,
180 data.size() - written ) );
181 if( ec.failed() )
182 capy::detail::throw_system_error( ec );
183 written += n;
184 }
185 @endcode
186
187 @note This operation may write fewer bytes than the buffer
188 contains. Use a loop or `capy::async_write` to write
189 all data.
190
191 @see read_some, capy::async_write
192 */
193 template<capy::ConstBufferSequence CB>
194 327585 auto write_some(CB const& buffers)
195 {
196 327585 return write_some_awaitable<CB>(*this, buffers); // copies the `buffers` object into the awaitable; the implementation's write_some is only called from await_suspend()
197 }
198
199 protected:
200 /// Awaitable returned by read_some(): holds the state of one read and forwards it to the stream implementation when the coroutine suspends.
201 template<class MutableBufferSequence>
202 struct read_some_awaitable
203 {
204 io_stream& ios_; // stream whose implementation performs the read
205 MutableBufferSequence buffers_; // the awaitable's own copy of the buffer sequence object
206 std::stop_token token_; // default-constructed until await_suspend() assigns env->stop_token
207 mutable std::error_code ec_; // passed by address to the implementation; reported by await_resume()
208 mutable std::size_t bytes_transferred_ = 0; // passed by address to the implementation; reported by await_resume()
209
210 327949 read_some_awaitable( // only stores its arguments; no I/O is started here
211 io_stream& ios, MutableBufferSequence buffers) noexcept
212 327949 : ios_(ios)
213 327949 , buffers_(std::move(buffers))
214 {
215 327949 }
216
217 327949 bool await_ready() const noexcept // NOTE(review): token_ is only assigned in await_suspend(), so when co_await calls this first it tests a default-constructed token
218 {
219 327949 return token_.stop_requested(); // a std::stop_token without stop-state reports false, so the coroutine proceeds to await_suspend()
220 }
221
222 327949 capy::io_result<std::size_t> await_resume() const noexcept
223 {
224 327949 if (token_.stop_requested()) // checked at resume time, independently of what the implementation stored in ec_
225 198 return {make_error_code(std::errc::operation_canceled), 0}; // NOTE(review): ec_ and bytes_transferred_ are ignored on this path - confirm no completed read can be dropped here
226 327751 return {ec_, bytes_transferred_};
227 }
228
229 327949 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
230 -> std::coroutine_handle<>
231 {
232 327949 token_ = env->stop_token; // keep the token so await_resume() can test it later
233 983847 return ios_.get().read_some( // the handle returned by the implementation is returned to the caller of await_suspend()
234 983847 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); // buffers_ is passed where the interface declares an io_buffer_param
235 }
236 };
237
238 /// Awaitable returned by write_some(): holds the state of one write and forwards it to the stream implementation when the coroutine suspends.
239 template<class ConstBufferSequence>
240 struct write_some_awaitable
241 {
242 io_stream& ios_; // stream whose implementation performs the write
243 ConstBufferSequence buffers_; // the awaitable's own copy of the buffer sequence object
244 std::stop_token token_; // default-constructed until await_suspend() assigns env->stop_token
245 mutable std::error_code ec_; // passed by address to the implementation; reported by await_resume()
246 mutable std::size_t bytes_transferred_ = 0; // passed by address to the implementation; reported by await_resume()
247
248 327585 write_some_awaitable( // only stores its arguments; no I/O is started here
249 io_stream& ios, ConstBufferSequence buffers) noexcept
250 327585 : ios_(ios)
251 327585 , buffers_(std::move(buffers))
252 {
253 327585 }
254
255 327585 bool await_ready() const noexcept // NOTE(review): token_ is only assigned in await_suspend(), so when co_await calls this first it tests a default-constructed token
256 {
257 327585 return token_.stop_requested(); // a std::stop_token without stop-state reports false, so the coroutine proceeds to await_suspend()
258 }
259
260 327585 capy::io_result<std::size_t> await_resume() const noexcept
261 {
262 327585 if (token_.stop_requested()) // checked at resume time, independently of what the implementation stored in ec_
263 return {make_error_code(std::errc::operation_canceled), 0}; // NOTE(review): the only line without a hit count in this listing - the cancelled-write path is not exercised; ec_ and bytes_transferred_ are ignored here
264 327585 return {ec_, bytes_transferred_};
265 }
266
267 327585 auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
268 -> std::coroutine_handle<>
269 {
270 327585 token_ = env->stop_token; // keep the token so await_resume() can test it later
271 982755 return ios_.get().write_some( // the handle returned by the implementation is returned to the caller of await_suspend()
272 982755 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_); // buffers_ is passed where the interface declares an io_buffer_param
273 }
274 };
275
276 public:
277 /** Platform-specific stream implementation interface.
278
279 Derived classes implement this interface to provide kernel-level
280 read and write operations for each supported platform (IOCP,
281 epoll, kqueue, io_uring).
282 */
283 struct implementation : io_object::implementation
284 {
285 /// Initiate platform read operation. Called from read_some_awaitable::await_suspend(), which returns the handle this function returns.
286 virtual std::coroutine_handle<> read_some(
287 std::coroutine_handle<>, // handle received by await_suspend()
288 capy::executor_ref, // env->executor of the io_env passed to await_suspend()
289 io_buffer_param, // the awaitable's buffer sequence to read into
290 std::stop_token, // env->stop_token of the same io_env
291 std::error_code*, // address of the awaitable's ec_, whose value await_resume() reports
292 std::size_t*) = 0; // address of the awaitable's bytes_transferred_, whose value await_resume() reports
293
294 /// Initiate platform write operation. Called from write_some_awaitable::await_suspend(), which returns the handle this function returns.
295 virtual std::coroutine_handle<> write_some(
296 std::coroutine_handle<>, // handle received by await_suspend()
297 capy::executor_ref, // env->executor of the io_env passed to await_suspend()
298 io_buffer_param, // the awaitable's buffer sequence to write from
299 std::stop_token, // env->stop_token of the same io_env
300 std::error_code*, // address of the awaitable's ec_, whose value await_resume() reports
301 std::size_t*) = 0; // address of the awaitable's bytes_transferred_, whose value await_resume() reports
302 };
303
304 protected:
305 /// Construct stream from a handle; the handle is moved into the io_object base. Protected, so only derived classes can call it.
306 16826 explicit io_stream(handle h) noexcept : io_object(std::move(h)) {}
307
308 private:
309 /// Return implementation downcasted to stream interface. The cast is an unchecked static_cast of h_.get(); assumes the handle always holds an io_stream::implementation - TODO confirm against io_object.
310 655534 implementation& get() const noexcept
311 {
312 655534 return *static_cast<implementation*>(h_.get()); // dereferenced without a null check
313 }
314 };
315
316 } // namespace boost::corosio
317
318 #endif
319