LCOV - code coverage report
Current view: top level - include/boost/corosio - io_stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 97.1 % 35 34 1
Test Date: 2026-02-16 16:21:08 Functions: 100.0 % 27 27

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_IO_STREAM_HPP
      11                 : #define BOOST_COROSIO_IO_STREAM_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/config.hpp>
      14                 : #include <boost/corosio/io_object.hpp>
      15                 : #include <boost/capy/io_result.hpp>
      16                 : #include <boost/corosio/io_buffer_param.hpp>
      17                 : #include <boost/capy/ex/executor_ref.hpp>
      18                 : #include <boost/capy/ex/io_env.hpp>
      19                 : #include <system_error>
      20                 : 
      21                 : #include <coroutine>
      22                 : #include <cstddef>
      23                 : #include <stop_token>
      24                 : 
      25                 : namespace boost::corosio {
      26                 : 
      27                 : /** Platform stream with read/write operations.
      28                 : 
      29                 :     This base class provides the fundamental async read and write
      30                 :     operations for kernel-level stream I/O. Derived classes wrap
      31                 :     OS-specific stream implementations (sockets, pipes, etc.) and
      32                 :     satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
      33                 : 
      34                 :     @par Semantics
      35                 :     Concrete classes wrap direct platform I/O completed by the kernel.
      36                 :     Functions taking `io_stream&` signal "platform implementation
      37                 :     required" - use this when you need actual kernel I/O rather than
      38                 :     a mock or test double.
      39                 : 
      40                 :     For generic stream algorithms that work with test mocks,
      41                 :     use `template<capy::Stream S>` instead of `io_stream&`.
      42                 : 
      43                 :     @par Thread Safety
      44                 :     Distinct objects: Safe.
      45                 :     Shared objects: Unsafe. All calls to a single stream must be made
      46                 :     from the same implicit or explicit serialization context.
      47                 : 
      48                 :     @par Example
      49                 :     @code
      50                 :     // Read until buffer full or EOF
      51                 :     capy::task<> read_all( io_stream& stream, std::span<char> buf )
      52                 :     {
      53                 :         std::size_t total = 0;
      54                 :         while( total < buf.size() )
      55                 :         {
      56                 :             auto [ec, n] = co_await stream.read_some(
      57                 :                 capy::buffer( buf.data() + total, buf.size() - total ) );
      58                 :             if( ec == capy::cond::eof )
      59                 :                 break;
      60                 :             if( ec.failed() )
      61                 :                 capy::detail::throw_system_error( ec );
      62                 :             total += n;
      63                 :         }
      64                 :     }
      65                 :     @endcode
      66                 : 
      67                 :     @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
      68                 : */
      69                 : class BOOST_COROSIO_DECL io_stream : public io_object
      70                 : {
      71                 : public:
      72                 :     /** Asynchronously read data from the stream.
      73                 : 
      74                 :         This operation suspends the calling coroutine and initiates a
      75                 :         kernel-level read. The coroutine resumes when the operation
      76                 :         completes.
      77                 : 
      78                 :         The operation completes when:
      79                 :         @li At least one byte has been read into the buffer sequence
      80                 :         @li The peer closes the connection (EOF)
      81                 :         @li An error occurs
      82                 :         @li The operation is cancelled via stop token or `cancel()`
      83                 : 
      84                 :         @par Concurrency
      85                 :         At most one write operation may be in flight concurrently with
      86                 :         this read. No other read operations may be in flight until this
      87                 :         operation completes. Note that concurrent in-flight operations
      88                 :         do not imply the initiating calls may be made concurrently;
      89                 :         all calls must be serialized.
      90                 : 
      91                 :         @par Cancellation
      92                 :         Supports cancellation via `std::stop_token` propagated through
      93                 :         the IoAwaitable protocol, or via the I/O object's `cancel()`
      94                 :         member. When cancelled, the operation completes with an error
      95                 :         that compares equal to `capy::cond::canceled`.
      96                 : 
      97                 :         @par Preconditions
      98                 :         The stream must be open and connected.
      99                 : 
     100                 :         @param buffers The buffer sequence to read data into. The caller
     101                 :             retains ownership and must ensure validity until the
     102                 :             operation completes.
     103                 : 
     104                 :         @return An awaitable yielding `(error_code, std::size_t)`.
     105                 :             On success, `bytes_transferred` contains the number of bytes
     106                 :             read. Compare error codes to conditions, not specific values:
     107                 :             @li `capy::cond::eof` - Peer closed connection (TCP FIN)
     108                 :             @li `capy::cond::canceled` - Operation was cancelled
     109                 : 
     110                 :         @par Example
     111                 :         @code
     112                 :         // Simple read with error handling
     113                 :         auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
     114                 :         if( ec == capy::cond::eof )
     115                 :             co_return;  // Connection closed gracefully
     116                 :         if( ec.failed() )
     117                 :             capy::detail::throw_system_error( ec );
     118                 :         process( buf, n );
     119                 :         @endcode
     120                 : 
     121                 :         @note This operation may read fewer bytes than the buffer
     122                 :             capacity. Use a loop or `capy::async_read` to read an
     123                 :             exact amount.
     124                 : 
     125                 :         @see write_some, capy::async_read
     126                 :     */
     127                 :     template<capy::MutableBufferSequence MB>
     128 HIT      327949 :     auto read_some(MB const& buffers)
     129                 :     {
     130          327949 :         return read_some_awaitable<MB>(*this, buffers);
     131                 :     }
     132                 : 
     133                 :     /** Asynchronously write data to the stream.
     134                 : 
     135                 :         This operation suspends the calling coroutine and initiates a
     136                 :         kernel-level write. The coroutine resumes when the operation
     137                 :         completes.
     138                 : 
     139                 :         The operation completes when:
     140                 :         @li At least one byte has been written from the buffer sequence
     141                 :         @li An error occurs (including connection reset by peer)
     142                 :         @li The operation is cancelled via stop token or `cancel()`
     143                 : 
     144                 :         @par Concurrency
     145                 :         At most one read operation may be in flight concurrently with
     146                 :         this write. No other write operations may be in flight until
     147                 :         this operation completes. Note that concurrent in-flight
     148                 :         operations do not imply the initiating calls may be made
     149                 :         concurrently; all calls must be serialized.
     150                 : 
     151                 :         @par Cancellation
     152                 :         Supports cancellation via `std::stop_token` propagated through
     153                 :         the IoAwaitable protocol, or via the I/O object's `cancel()`
     154                 :         member. When cancelled, the operation completes with an error
     155                 :         that compares equal to `capy::cond::canceled`.
     156                 : 
     157                 :         @par Preconditions
     158                 :         The stream must be open and connected.
     159                 : 
     160                 :         @param buffers The buffer sequence containing data to write.
     161                 :             The caller retains ownership and must ensure validity
     162                 :             until the operation completes.
     163                 : 
     164                 :         @return An awaitable yielding `(error_code, std::size_t)`.
     165                 :             On success, `bytes_transferred` contains the number of bytes
     166                 :             written. Compare error codes to conditions, not specific
     167                 :             values:
     168                 :             @li `capy::cond::canceled` - Operation was cancelled
     169                 :             @li `std::errc::broken_pipe` - Peer closed connection
     170                 : 
     171                 :         @par Example
     172                 :         @code
     173                 :         // Write all data
     174                 :         std::string_view data = "Hello, World!";
     175                 :         std::size_t written = 0;
     176                 :         while( written < data.size() )
     177                 :         {
     178                 :             auto [ec, n] = co_await stream.write_some(
     179                 :                 capy::buffer( data.data() + written,
     180                 :                               data.size() - written ) );
     181                 :             if( ec.failed() )
     182                 :                 capy::detail::throw_system_error( ec );
     183                 :             written += n;
     184                 :         }
     185                 :         @endcode
     186                 : 
     187                 :         @note This operation may write fewer bytes than the buffer
     188                 :             contains. Use a loop or `capy::async_write` to write
     189                 :             all data.
     190                 : 
     191                 :         @see read_some, capy::async_write
     192                 :     */
     193                 :     template<capy::ConstBufferSequence CB>
     194          327585 :     auto write_some(CB const& buffers)
     195                 :     {
     196          327585 :         return write_some_awaitable<CB>(*this, buffers);
     197                 :     }
     198                 : 
     199                 : protected:
     200                 :     /// Awaitable for async read operations.
     201                 :     template<class MutableBufferSequence>
     202                 :     struct read_some_awaitable
     203                 :     {
     204                 :         io_stream& ios_;
     205                 :         MutableBufferSequence buffers_;
     206                 :         std::stop_token token_;
     207                 :         mutable std::error_code ec_;
     208                 :         mutable std::size_t bytes_transferred_ = 0;
     209                 : 
     210          327949 :         read_some_awaitable(
     211                 :             io_stream& ios, MutableBufferSequence buffers) noexcept
     212          327949 :             : ios_(ios)
     213          327949 :             , buffers_(std::move(buffers))
     214                 :         {
     215          327949 :         }
     216                 : 
     217          327949 :         bool await_ready() const noexcept
     218                 :         {
     219          327949 :             return token_.stop_requested();
     220                 :         }
     221                 : 
     222          327949 :         capy::io_result<std::size_t> await_resume() const noexcept
     223                 :         {
     224          327949 :             if (token_.stop_requested())
     225             198 :                 return {make_error_code(std::errc::operation_canceled), 0};
     226          327751 :             return {ec_, bytes_transferred_};
     227                 :         }
     228                 : 
     229          327949 :         auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
     230                 :             -> std::coroutine_handle<>
     231                 :         {
     232          327949 :             token_ = env->stop_token;
     233          983847 :             return ios_.get().read_some(
     234          983847 :                 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
     235                 :         }
     236                 :     };
     237                 : 
     238                 :     /// Awaitable for async write operations.
     239                 :     template<class ConstBufferSequence>
     240                 :     struct write_some_awaitable
     241                 :     {
     242                 :         io_stream& ios_;
     243                 :         ConstBufferSequence buffers_;
     244                 :         std::stop_token token_;
     245                 :         mutable std::error_code ec_;
     246                 :         mutable std::size_t bytes_transferred_ = 0;
     247                 : 
     248          327585 :         write_some_awaitable(
     249                 :             io_stream& ios, ConstBufferSequence buffers) noexcept
     250          327585 :             : ios_(ios)
     251          327585 :             , buffers_(std::move(buffers))
     252                 :         {
     253          327585 :         }
     254                 : 
     255          327585 :         bool await_ready() const noexcept
     256                 :         {
     257          327585 :             return token_.stop_requested();
     258                 :         }
     259                 : 
     260          327585 :         capy::io_result<std::size_t> await_resume() const noexcept
     261                 :         {
     262          327585 :             if (token_.stop_requested())
     263 MIS           0 :                 return {make_error_code(std::errc::operation_canceled), 0};
     264 HIT      327585 :             return {ec_, bytes_transferred_};
     265                 :         }
     266                 : 
     267          327585 :         auto await_suspend(std::coroutine_handle<> h, capy::io_env const* env)
     268                 :             -> std::coroutine_handle<>
     269                 :         {
     270          327585 :             token_ = env->stop_token;
     271          982755 :             return ios_.get().write_some(
     272          982755 :                 h, env->executor, buffers_, token_, &ec_, &bytes_transferred_);
     273                 :         }
     274                 :     };
     275                 : 
     276                 : public:
     277                 :     /** Platform-specific stream implementation interface.
     278                 : 
     279                 :         Derived classes implement this interface to provide kernel-level
     280                 :         read and write operations for each supported platform (IOCP,
     281                 :         epoll, kqueue, io_uring).
     282                 :     */
     283                 :     struct implementation : io_object::implementation
     284                 :     {
     285                 :         /// Initiate platform read operation.
     286                 :         virtual std::coroutine_handle<> read_some(
     287                 :             std::coroutine_handle<>,
     288                 :             capy::executor_ref,
     289                 :             io_buffer_param,
     290                 :             std::stop_token,
     291                 :             std::error_code*,
     292                 :             std::size_t*) = 0;
     293                 : 
     294                 :         /// Initiate platform write operation.
     295                 :         virtual std::coroutine_handle<> write_some(
     296                 :             std::coroutine_handle<>,
     297                 :             capy::executor_ref,
     298                 :             io_buffer_param,
     299                 :             std::stop_token,
     300                 :             std::error_code*,
     301                 :             std::size_t*) = 0;
     302                 :     };
     303                 : 
     304                 : protected:
     305                 :     /// Construct stream from a handle.
     306           16826 :     explicit io_stream(handle h) noexcept : io_object(std::move(h)) {}
     307                 : 
     308                 : private:
     309                 :     /// Return implementation downcasted to stream interface.
     310          655534 :     implementation& get() const noexcept
     311                 :     {
     312          655534 :         return *static_cast<implementation*>(h_.get());
     313                 :     }
     314                 : };
     315                 : 
     316                 : } // namespace boost::corosio
     317                 : 
     318                 : #endif
        

Generated by: LCOV version 2.3