src/corosio/src/detail/select/sockets.hpp

100.0% Lines (18/18) 100.0% Functions (7/7)
src/corosio/src/detail/select/sockets.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/tcp_socket.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include "src/detail/intrusive.hpp"
22 #include "src/detail/socket_service.hpp"
23
24 #include "src/detail/select/op.hpp"
25 #include "src/detail/select/scheduler.hpp"
26
27 #include <memory>
28 #include <mutex>
29 #include <unordered_map>
30
31 /*
32 select Socket Implementation
33 ============================
34
35 This mirrors the epoll_sockets design for behavioral consistency.
36 Each I/O operation follows the same pattern:
37 1. Try the syscall immediately (non-blocking socket)
38 2. If it succeeds or fails with a real error, post to completion queue
39 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
40
41 Cancellation
42 ------------
43 See op.hpp for the completion/cancellation race handling via the
44 `registered` atomic. cancel() must complete pending operations (post
45 them with cancelled flag) so coroutines waiting on them can resume.
46 close_socket() calls cancel() first to ensure this.
47
48 Impl Lifetime with shared_ptr
49 -----------------------------
50 Socket impls use enable_shared_from_this. The service owns impls via
51 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
52 removal. When a user calls close(), we call cancel() which posts pending
53 ops to the scheduler.
54
55 CRITICAL: The posted ops must keep the impl alive until they complete.
56 Otherwise the scheduler would process a freed op (use-after-free). The
57 cancel() method captures shared_from_this() into op.impl_ptr before
58 posting. When the op completes, impl_ptr is cleared, allowing the impl
59 to be destroyed if no other references exist.
60
61 Service Ownership
62 -----------------
63 select_socket_service owns all socket impls. destroy() removes the
64 shared_ptr from the map, but the impl may survive if ops still hold
65 impl_ptr refs. shutdown() closes all sockets and clears the map; any
66 in-flight ops will complete and release their refs.
67 */
68
69 namespace boost::corosio::detail {
70
71 class select_socket_service;
72 class select_socket_impl;
73
74 /// Socket implementation for select backend.
75 class select_socket_impl final
76 : public tcp_socket::implementation
77 , public std::enable_shared_from_this<select_socket_impl>
78 , public intrusive_list<select_socket_impl>::node
79 {
80 friend class select_socket_service;
81
82 public:
83 explicit select_socket_impl(select_socket_service& svc) noexcept;
84
85 std::coroutine_handle<> connect(
86 std::coroutine_handle<>,
87 capy::executor_ref,
88 endpoint,
89 std::stop_token,
90 std::error_code*) override;
91
92 std::coroutine_handle<> read_some(
93 std::coroutine_handle<>,
94 capy::executor_ref,
95 io_buffer_param,
96 std::stop_token,
97 std::error_code*,
98 std::size_t*) override;
99
100 std::coroutine_handle<> write_some(
101 std::coroutine_handle<>,
102 capy::executor_ref,
103 io_buffer_param,
104 std::stop_token,
105 std::error_code*,
106 std::size_t*) override;
107
108 std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
109
110 21677 native_handle_type native_handle() const noexcept override
111 {
112 21677 return fd_;
113 }
114
115 // Socket options
116 std::error_code set_no_delay(bool value) noexcept override;
117 bool no_delay(std::error_code& ec) const noexcept override;
118
119 std::error_code set_keep_alive(bool value) noexcept override;
120 bool keep_alive(std::error_code& ec) const noexcept override;
121
122 std::error_code set_receive_buffer_size(int size) noexcept override;
123 int receive_buffer_size(std::error_code& ec) const noexcept override;
124
125 std::error_code set_send_buffer_size(int size) noexcept override;
126 int send_buffer_size(std::error_code& ec) const noexcept override;
127
128 std::error_code set_linger(bool enabled, int timeout) noexcept override;
129 tcp_socket::linger_options
130 linger(std::error_code& ec) const noexcept override;
131
132 16 endpoint local_endpoint() const noexcept override
133 {
134 16 return local_endpoint_;
135 }
136 16 endpoint remote_endpoint() const noexcept override
137 {
138 16 return remote_endpoint_;
139 }
140 bool is_open() const noexcept
141 {
142 return fd_ >= 0;
143 }
144 void cancel() noexcept override;
145 void cancel_single_op(select_op& op) noexcept;
146 void close_socket() noexcept;
147 3561 void set_socket(int fd) noexcept
148 {
149 3561 fd_ = fd;
150 3561 }
151 7122 void set_endpoints(endpoint local, endpoint remote) noexcept
152 {
153 7122 local_endpoint_ = local;
154 7122 remote_endpoint_ = remote;
155 7122 }
156
157 select_connect_op conn_;
158 select_read_op rd_;
159 select_write_op wr_;
160
161 private:
162 select_socket_service& svc_;
163 int fd_ = -1;
164 endpoint local_endpoint_;
165 endpoint remote_endpoint_;
166 };
167
168 /** State for select socket service. */
169 class select_socket_state
170 {
171 public:
172 133 explicit select_socket_state(select_scheduler& sched) noexcept
173 133 : sched_(sched)
174 {
175 133 }
176
177 select_scheduler& sched_;
178 std::mutex mutex_;
179 intrusive_list<select_socket_impl> socket_list_;
180 std::unordered_map<select_socket_impl*, std::shared_ptr<select_socket_impl>>
181 socket_ptrs_;
182 };
183
184 /** select socket service implementation.
185
186 Inherits from socket_service to enable runtime polymorphism.
187 Uses key_type = socket_service for service lookup.
188 */
189 class select_socket_service final : public socket_service
190 {
191 public:
192 explicit select_socket_service(capy::execution_context& ctx);
193 ~select_socket_service() override;
194
195 select_socket_service(select_socket_service const&) = delete;
196 select_socket_service& operator=(select_socket_service const&) = delete;
197
198 void shutdown() override;
199
200 io_object::implementation* construct() override;
201 void destroy(io_object::implementation*) override;
202 void close(io_object::handle&) override;
203 std::error_code open_socket(tcp_socket::implementation& impl) override;
204
205 11139 select_scheduler& scheduler() const noexcept
206 {
207 11139 return state_->sched_;
208 }
209 void post(select_op* op);
210 void work_started() noexcept;
211 void work_finished() noexcept;
212
213 private:
214 std::unique_ptr<select_socket_state> state_;
215 };
216
217 // Backward compatibility alias
218 using select_sockets = select_socket_service;
219
220 } // namespace boost::corosio::detail
221
222 #endif // BOOST_COROSIO_HAS_SELECT
223
224 #endif // BOOST_COROSIO_DETAIL_SELECT_SOCKETS_HPP
225