src/corosio/src/detail/select/acceptors.cpp

63.6% Lines (166/261) 88.9% Functions (16/18)
src/corosio/src/detail/select/acceptors.cpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/acceptors.hpp"
15 #include "src/detail/select/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25
26 namespace boost::corosio::detail {
27
28 void
29 select_accept_op::cancel() noexcept
30 {
31 if (acceptor_impl_)
32 acceptor_impl_->cancel_single_op(*this);
33 else
34 request_cancel();
35 }
36
37 void
38 3564 select_accept_op::operator()()
39 {
40 3564 stop_cb.reset();
41
42 3564 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
43
44 3564 if (ec_out)
45 {
46 3564 if (cancelled.load(std::memory_order_acquire))
47 3 *ec_out = capy::error::canceled;
48 3561 else if (errn != 0)
49 *ec_out = make_err(errn);
50 else
51 3561 *ec_out = {};
52 }
53
54 3564 if (success && accepted_fd >= 0)
55 {
56 3561 if (acceptor_impl_)
57 {
58 auto* socket_svc =
59 3561 static_cast<select_acceptor_impl*>(acceptor_impl_)
60 3561 ->service()
61 3561 .socket_service();
62 3561 if (socket_svc)
63 {
64 auto& impl =
65 3561 static_cast<select_socket_impl&>(*socket_svc->construct());
66 3561 impl.set_socket(accepted_fd);
67
68 3561 sockaddr_in local_addr{};
69 3561 socklen_t local_len = sizeof(local_addr);
70 3561 sockaddr_in remote_addr{};
71 3561 socklen_t remote_len = sizeof(remote_addr);
72
73 3561 endpoint local_ep, remote_ep;
74 3561 if (::getsockname(
75 accepted_fd, reinterpret_cast<sockaddr*>(&local_addr),
76 3561 &local_len) == 0)
77 3561 local_ep = from_sockaddr_in(local_addr);
78 3561 if (::getpeername(
79 accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr),
80 3561 &remote_len) == 0)
81 3561 remote_ep = from_sockaddr_in(remote_addr);
82
83 3561 impl.set_endpoints(local_ep, remote_ep);
84
85 3561 if (impl_out)
86 3561 *impl_out = &impl;
87
88 3561 accepted_fd = -1;
89 }
90 else
91 {
92 if (ec_out && !*ec_out)
93 *ec_out = make_err(ENOENT);
94 ::close(accepted_fd);
95 accepted_fd = -1;
96 if (impl_out)
97 *impl_out = nullptr;
98 }
99 }
100 else
101 {
102 ::close(accepted_fd);
103 accepted_fd = -1;
104 if (impl_out)
105 *impl_out = nullptr;
106 }
107 3561 }
108 else
109 {
110 3 if (accepted_fd >= 0)
111 {
112 ::close(accepted_fd);
113 accepted_fd = -1;
114 }
115
116 3 if (peer_impl)
117 {
118 auto* socket_svc_cleanup =
119 static_cast<select_acceptor_impl*>(acceptor_impl_)
120 ->service()
121 .socket_service();
122 if (socket_svc_cleanup)
123 socket_svc_cleanup->destroy(peer_impl);
124 peer_impl = nullptr;
125 }
126
127 3 if (impl_out)
128 3 *impl_out = nullptr;
129 }
130
131 // Move to stack before destroying the frame
132 3564 capy::executor_ref saved_ex(ex);
133 3564 std::coroutine_handle<> saved_h(h);
134 3564 impl_ptr.reset();
135 3564 dispatch_coro(saved_ex, saved_h).resume();
136 3564 }
137
// Construct an acceptor implementation bound to its owning service.
select_acceptor_impl::select_acceptor_impl(
    select_acceptor_service& svc) noexcept
    : svc_(svc)
{
}
143
144 std::coroutine_handle<>
145 3564 select_acceptor_impl::accept(
146 std::coroutine_handle<> h,
147 capy::executor_ref ex,
148 std::stop_token token,
149 std::error_code* ec,
150 io_object::implementation** impl_out)
151 {
152 3564 auto& op = acc_;
153 3564 op.reset();
154 3564 op.h = h;
155 3564 op.ex = ex;
156 3564 op.ec_out = ec;
157 3564 op.impl_out = impl_out;
158 3564 op.fd = fd_;
159 3564 op.start(token, this);
160
161 3564 sockaddr_in addr{};
162 3564 socklen_t addrlen = sizeof(addr);
163 3564 int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
164
165 3564 if (accepted >= 0)
166 {
167 // Reject fds that exceed select()'s FD_SETSIZE limit.
168 // Better to fail now than during later async operations.
169 2 if (accepted >= FD_SETSIZE)
170 {
171 ::close(accepted);
172 op.accepted_fd = -1;
173 op.complete(EINVAL, 0);
174 op.impl_ptr = shared_from_this();
175 svc_.post(&op);
176 // completion is always posted to scheduler queue, never inline.
177 return std::noop_coroutine();
178 }
179
180 // Set non-blocking and close-on-exec flags.
181 // A non-blocking socket is essential for the async reactor;
182 // if we can't configure it, fail rather than risk blocking.
183 2 int flags = ::fcntl(accepted, F_GETFL, 0);
184 2 if (flags == -1)
185 {
186 int err = errno;
187 ::close(accepted);
188 op.accepted_fd = -1;
189 op.complete(err, 0);
190 op.impl_ptr = shared_from_this();
191 svc_.post(&op);
192 // completion is always posted to scheduler queue, never inline.
193 return std::noop_coroutine();
194 }
195
196 2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
197 {
198 int err = errno;
199 ::close(accepted);
200 op.accepted_fd = -1;
201 op.complete(err, 0);
202 op.impl_ptr = shared_from_this();
203 svc_.post(&op);
204 // completion is always posted to scheduler queue, never inline.
205 return std::noop_coroutine();
206 }
207
208 2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
209 {
210 int err = errno;
211 ::close(accepted);
212 op.accepted_fd = -1;
213 op.complete(err, 0);
214 op.impl_ptr = shared_from_this();
215 svc_.post(&op);
216 // completion is always posted to scheduler queue, never inline.
217 return std::noop_coroutine();
218 }
219
220 2 op.accepted_fd = accepted;
221 2 op.complete(0, 0);
222 2 op.impl_ptr = shared_from_this();
223 2 svc_.post(&op);
224 // completion is always posted to scheduler queue, never inline.
225 2 return std::noop_coroutine();
226 }
227
228 3562 if (errno == EAGAIN || errno == EWOULDBLOCK)
229 {
230 3562 svc_.work_started();
231 3562 op.impl_ptr = shared_from_this();
232
233 // Set registering BEFORE register_fd to close the race window where
234 // reactor sees an event before we set registered.
235 3562 op.registered.store(
236 select_registration_state::registering, std::memory_order_release);
237 3562 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
238
239 // Transition to registered. If this fails, reactor or cancel already
240 // claimed the op (state is now unregistered), so we're done. However,
241 // we must still deregister the fd because cancel's deregister_fd may
242 // have run before our register_fd, leaving the fd orphaned.
243 3562 auto expected = select_registration_state::registering;
244 3562 if (!op.registered.compare_exchange_strong(
245 expected, select_registration_state::registered,
246 std::memory_order_acq_rel))
247 {
248 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
249 // completion is always posted to scheduler queue, never inline.
250 return std::noop_coroutine();
251 }
252
253 // If cancelled was set before we registered, handle it now.
254 3562 if (op.cancelled.load(std::memory_order_acquire))
255 {
256 auto prev = op.registered.exchange(
257 select_registration_state::unregistered,
258 std::memory_order_acq_rel);
259 if (prev != select_registration_state::unregistered)
260 {
261 svc_.scheduler().deregister_fd(
262 fd_, select_scheduler::event_read);
263 op.impl_ptr = shared_from_this();
264 svc_.post(&op);
265 svc_.work_finished();
266 }
267 }
268 // completion is always posted to scheduler queue, never inline.
269 3562 return std::noop_coroutine();
270 }
271
272 op.complete(errno, 0);
273 op.impl_ptr = shared_from_this();
274 svc_.post(&op);
275 // completion is always posted to scheduler queue, never inline.
276 return std::noop_coroutine();
277 }
278
279 void
280 1 select_acceptor_impl::cancel() noexcept
281 {
282 1 auto self = weak_from_this().lock();
283 1 if (!self)
284 return;
285
286 1 auto prev = acc_.registered.exchange(
287 select_registration_state::unregistered, std::memory_order_acq_rel);
288 1 acc_.request_cancel();
289
290 1 if (prev != select_registration_state::unregistered)
291 {
292 1 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
293 1 acc_.impl_ptr = self;
294 1 svc_.post(&acc_);
295 1 svc_.work_finished();
296 }
297 1 }
298
// Cancel one specific pending operation on this acceptor. May run
// concurrently with reactor completion; the atomic exchange on
// op.registered decides which side posts the completion.
void
select_acceptor_impl::cancel_single_op(select_op& op) noexcept
{
    // Impl already being destroyed — nothing left to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    // Claim the registration: only the claimant that observes a state
    // other than unregistered owns deregistration and posting.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);

        // Pin the impl so it survives until the posted completion runs.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
320
321 void
322 172 select_acceptor_impl::close_socket() noexcept
323 {
324 172 auto self = weak_from_this().lock();
325 172 if (self)
326 {
327 172 auto prev = acc_.registered.exchange(
328 select_registration_state::unregistered,
329 std::memory_order_acq_rel);
330 172 acc_.request_cancel();
331
332 172 if (prev != select_registration_state::unregistered)
333 {
334 2 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
335 2 acc_.impl_ptr = self;
336 2 svc_.post(&acc_);
337 2 svc_.work_finished();
338 }
339 }
340
341 172 if (fd_ >= 0)
342 {
343 42 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
344 42 ::close(fd_);
345 42 fd_ = -1;
346 }
347
348 172 local_endpoint_ = endpoint{};
349 172 }
350
// Construct the acceptor service. state_ holds the select scheduler
// obtained from the execution context plus the acceptor registry.
select_acceptor_service::select_acceptor_service(capy::execution_context& ctx)
    : ctx_(ctx)
    , state_(
          std::make_unique<select_acceptor_state>(
              ctx.use_service<select_scheduler>()))
{
}
358
359 266 select_acceptor_service::~select_acceptor_service() {}
360
361 void
362 133 select_acceptor_service::shutdown()
363 {
364 133 std::lock_guard lock(state_->mutex_);
365
366 133 while (auto* impl = state_->acceptor_list_.pop_front())
367 impl->close_socket();
368
369 // Don't clear acceptor_ptrs_ here — same rationale as
370 // select_socket_service::shutdown(). Let ~state_ release ptrs
371 // after scheduler shutdown has drained all queued ops.
372 133 }
373
374 io_object::implementation*
375 44 select_acceptor_service::construct()
376 {
377 44 auto impl = std::make_shared<select_acceptor_impl>(*this);
378 44 auto* raw = impl.get();
379
380 44 std::lock_guard lock(state_->mutex_);
381 44 state_->acceptor_list_.push_back(raw);
382 44 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
383
384 44 return raw;
385 44 }
386
387 void
388 44 select_acceptor_service::destroy(io_object::implementation* impl)
389 {
390 44 auto* select_impl = static_cast<select_acceptor_impl*>(impl);
391 44 select_impl->close_socket();
392 44 std::lock_guard lock(state_->mutex_);
393 44 state_->acceptor_list_.remove(select_impl);
394 44 state_->acceptor_ptrs_.erase(select_impl);
395 44 }
396
397 void
398 86 select_acceptor_service::close(io_object::handle& h)
399 {
400 86 static_cast<select_acceptor_impl*>(h.get())->close_socket();
401 86 }
402
403 std::error_code
404 42 select_acceptor_service::open_acceptor(
405 tcp_acceptor::implementation& impl, endpoint ep, int backlog)
406 {
407 42 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
408 42 select_impl->close_socket();
409
410 42 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
411 42 if (fd < 0)
412 return make_err(errno);
413
414 // Set non-blocking and close-on-exec
415 42 int flags = ::fcntl(fd, F_GETFL, 0);
416 42 if (flags == -1)
417 {
418 int errn = errno;
419 ::close(fd);
420 return make_err(errn);
421 }
422 42 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
423 {
424 int errn = errno;
425 ::close(fd);
426 return make_err(errn);
427 }
428 42 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
429 {
430 int errn = errno;
431 ::close(fd);
432 return make_err(errn);
433 }
434
435 // Check fd is within select() limits
436 42 if (fd >= FD_SETSIZE)
437 {
438 ::close(fd);
439 return make_err(EMFILE);
440 }
441
442 42 int reuse = 1;
443 42 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
444
445 42 sockaddr_in addr = detail::to_sockaddr_in(ep);
446 42 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
447 {
448 int errn = errno;
449 ::close(fd);
450 return make_err(errn);
451 }
452
453 42 if (::listen(fd, backlog) < 0)
454 {
455 int errn = errno;
456 ::close(fd);
457 return make_err(errn);
458 }
459
460 42 select_impl->fd_ = fd;
461
462 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
463 42 sockaddr_in local_addr{};
464 42 socklen_t local_len = sizeof(local_addr);
465 42 if (::getsockname(
466 42 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
467 42 select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
468
469 42 return {};
470 }
471
// Forward a completed operation to the scheduler's run queue;
// completions are executed there, never inline.
void
select_acceptor_service::post(select_op* op)
{
    state_->sched_.post(op);
}
477
// Tell the scheduler an outstanding async operation exists, keeping
// its run loop alive until the matching work_finished().
void
select_acceptor_service::work_started() noexcept
{
    state_->sched_.work_started();
}
483
// Balance a prior work_started() once an operation has been resolved.
void
select_acceptor_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
489
490 select_socket_service*
491 3561 select_acceptor_service::socket_service() const noexcept
492 {
493 3561 auto* svc = ctx_.find_service<detail::socket_service>();
494 3561 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
495 }
496
497 } // namespace boost::corosio::detail
498
499 #endif // BOOST_COROSIO_HAS_SELECT
500