src/corosio/src/detail/epoll/acceptors.cpp

Line coverage: 81.0% (187 of 231 lines) · Function coverage: 100.0% (18 of 18 functions)
src/corosio/src/detail/epoll/acceptors.cpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18 #include "src/detail/make_err.hpp"
19
20 #include <utility>
21
22 #include <errno.h>
23 #include <netinet/in.h>
24 #include <sys/epoll.h>
25 #include <sys/socket.h>
26 #include <unistd.h>
27
28 namespace boost::corosio::detail {
29
30 void
31 6 epoll_accept_op::cancel() noexcept
32 {
33 6 if (acceptor_impl_)
34 6 acceptor_impl_->cancel_single_op(*this);
35 else
36 request_cancel();
37 6 }
38
39 void
40 4823 epoll_accept_op::operator()()
41 {
42 4823 stop_cb.reset();
43
44 4823 static_cast<epoll_acceptor_impl*>(acceptor_impl_)
45 4823 ->service()
46 4823 .scheduler()
47 4823 .reset_inline_budget();
48
49 4823 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
50
51 4823 if (cancelled.load(std::memory_order_acquire))
52 9 *ec_out = capy::error::canceled;
53 4814 else if (errn != 0)
54 *ec_out = make_err(errn);
55 else
56 4814 *ec_out = {};
57
58 // Set up the peer socket on success
59 4823 if (success && accepted_fd >= 0 && acceptor_impl_)
60 {
61 4814 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 4814 ->service()
63 4814 .socket_service();
64 4814 if (socket_svc)
65 {
66 auto& impl =
67 4814 static_cast<epoll_socket_impl&>(*socket_svc->construct());
68 4814 impl.set_socket(accepted_fd);
69
70 4814 impl.desc_state_.fd = accepted_fd;
71 {
72 4814 std::lock_guard lock(impl.desc_state_.mutex);
73 4814 impl.desc_state_.read_op = nullptr;
74 4814 impl.desc_state_.write_op = nullptr;
75 4814 impl.desc_state_.connect_op = nullptr;
76 4814 }
77 4814 socket_svc->scheduler().register_descriptor(
78 accepted_fd, &impl.desc_state_);
79
80 4814 impl.set_endpoints(
81 4814 static_cast<epoll_acceptor_impl*>(acceptor_impl_)
82 ->local_endpoint(),
83 4814 from_sockaddr_in(peer_addr));
84
85 4814 if (impl_out)
86 4814 *impl_out = &impl;
87 4814 accepted_fd = -1;
88 }
89 else
90 {
91 // No socket service — treat as error
92 *ec_out = make_err(ENOENT);
93 success = false;
94 }
95 }
96
97 4823 if (!success || !acceptor_impl_)
98 {
99 9 if (accepted_fd >= 0)
100 {
101 ::close(accepted_fd);
102 accepted_fd = -1;
103 }
104 9 if (impl_out)
105 9 *impl_out = nullptr;
106 }
107
108 // Move to stack before resuming. See epoll_op::operator()() for rationale.
109 4823 capy::executor_ref saved_ex(ex);
110 4823 std::coroutine_handle<> saved_h(h);
111 4823 auto prevent_premature_destruction = std::move(impl_ptr);
112 4823 dispatch_coro(saved_ex, saved_h).resume();
113 4823 }
114
115 65 epoll_acceptor_impl::epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
116 65 : svc_(svc)
117 {
118 65 }
119
120 std::coroutine_handle<>
121 4823 epoll_acceptor_impl::accept(
122 std::coroutine_handle<> h,
123 capy::executor_ref ex,
124 std::stop_token token,
125 std::error_code* ec,
126 io_object::implementation** impl_out)
127 {
128 4823 auto& op = acc_;
129 4823 op.reset();
130 4823 op.h = h;
131 4823 op.ex = ex;
132 4823 op.ec_out = ec;
133 4823 op.impl_out = impl_out;
134 4823 op.fd = fd_;
135 4823 op.start(token, this);
136
137 4823 sockaddr_in addr{};
138 4823 socklen_t addrlen = sizeof(addr);
139 int accepted;
140 do
141 {
142 4823 accepted = ::accept4(
143 fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen,
144 SOCK_NONBLOCK | SOCK_CLOEXEC);
145 }
146 4823 while (accepted < 0 && errno == EINTR);
147
148 4823 if (accepted >= 0)
149 {
150 {
151 2 std::lock_guard lock(desc_state_.mutex);
152 2 desc_state_.read_ready = false;
153 2 }
154
155 2 if (svc_.scheduler().try_consume_inline_budget())
156 {
157 auto* socket_svc = svc_.socket_service();
158 if (socket_svc)
159 {
160 auto& impl =
161 static_cast<epoll_socket_impl&>(*socket_svc->construct());
162 impl.set_socket(accepted);
163
164 impl.desc_state_.fd = accepted;
165 {
166 std::lock_guard lock(impl.desc_state_.mutex);
167 impl.desc_state_.read_op = nullptr;
168 impl.desc_state_.write_op = nullptr;
169 impl.desc_state_.connect_op = nullptr;
170 }
171 socket_svc->scheduler().register_descriptor(
172 accepted, &impl.desc_state_);
173
174 impl.set_endpoints(local_endpoint_, from_sockaddr_in(addr));
175
176 *ec = {};
177 if (impl_out)
178 *impl_out = &impl;
179 }
180 else
181 {
182 ::close(accepted);
183 *ec = make_err(ENOENT);
184 if (impl_out)
185 *impl_out = nullptr;
186 }
187 return dispatch_coro(ex, h);
188 }
189
190 2 op.accepted_fd = accepted;
191 2 op.peer_addr = addr;
192 2 op.complete(0, 0);
193 2 op.impl_ptr = shared_from_this();
194 2 svc_.post(&op);
195 2 return std::noop_coroutine();
196 }
197
198 4821 if (errno == EAGAIN || errno == EWOULDBLOCK)
199 {
200 4821 op.impl_ptr = shared_from_this();
201 4821 svc_.work_started();
202
203 4821 std::lock_guard lock(desc_state_.mutex);
204 4821 bool io_done = false;
205 4821 if (desc_state_.read_ready)
206 {
207 desc_state_.read_ready = false;
208 op.perform_io();
209 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
210 if (!io_done)
211 op.errn = 0;
212 }
213
214 4821 if (io_done || op.cancelled.load(std::memory_order_acquire))
215 {
216 svc_.post(&op);
217 svc_.work_finished();
218 }
219 else
220 {
221 4821 desc_state_.read_op = &op;
222 }
223 4821 return std::noop_coroutine();
224 4821 }
225
226 op.complete(errno, 0);
227 op.impl_ptr = shared_from_this();
228 svc_.post(&op);
229 // completion is always posted to scheduler queue, never inline.
230 return std::noop_coroutine();
231 }
232
233 void
234 1 epoll_acceptor_impl::cancel() noexcept
235 {
236 1 cancel_single_op(acc_);
237 1 }
238
239 void
240 7 epoll_acceptor_impl::cancel_single_op(epoll_op& op) noexcept
241 {
242 7 auto self = weak_from_this().lock();
243 7 if (!self)
244 return;
245
246 7 op.request_cancel();
247
248 7 epoll_op* claimed = nullptr;
249 {
250 7 std::lock_guard lock(desc_state_.mutex);
251 7 if (desc_state_.read_op == &op)
252 7 claimed = std::exchange(desc_state_.read_op, nullptr);
253 7 }
254 7 if (claimed)
255 {
256 7 op.impl_ptr = self;
257 7 svc_.post(&op);
258 7 svc_.work_finished();
259 }
260 7 }
261
262 void
263 256 epoll_acceptor_impl::close_socket() noexcept
264 {
265 256 auto self = weak_from_this().lock();
266 256 if (self)
267 {
268 256 acc_.request_cancel();
269
270 256 epoll_op* claimed = nullptr;
271 {
272 256 std::lock_guard lock(desc_state_.mutex);
273 256 claimed = std::exchange(desc_state_.read_op, nullptr);
274 256 desc_state_.read_ready = false;
275 256 desc_state_.write_ready = false;
276 256 }
277
278 256 if (claimed)
279 {
280 2 acc_.impl_ptr = self;
281 2 svc_.post(&acc_);
282 2 svc_.work_finished();
283 }
284
285 256 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
286 desc_state_.impl_ref_ = self;
287 }
288
289 256 if (fd_ >= 0)
290 {
291 62 if (desc_state_.registered_events != 0)
292 62 svc_.scheduler().deregister_descriptor(fd_);
293 62 ::close(fd_);
294 62 fd_ = -1;
295 }
296
297 256 desc_state_.fd = -1;
298 256 desc_state_.registered_events = 0;
299
300 256 local_endpoint_ = endpoint{};
301 256 }
302
303 203 epoll_acceptor_service::epoll_acceptor_service(capy::execution_context& ctx)
304 203 : ctx_(ctx)
305 203 , state_(
306 std::make_unique<epoll_acceptor_state>(
307 203 ctx.use_service<epoll_scheduler>()))
308 {
309 203 }
310
311 406 epoll_acceptor_service::~epoll_acceptor_service() {}
312
313 void
314 203 epoll_acceptor_service::shutdown()
315 {
316 203 std::lock_guard lock(state_->mutex_);
317
318 203 while (auto* impl = state_->acceptor_list_.pop_front())
319 impl->close_socket();
320
321 // Don't clear acceptor_ptrs_ here — same rationale as
322 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
323 // after scheduler shutdown has drained all queued ops.
324 203 }
325
326 io_object::implementation*
327 65 epoll_acceptor_service::construct()
328 {
329 65 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
330 65 auto* raw = impl.get();
331
332 65 std::lock_guard lock(state_->mutex_);
333 65 state_->acceptor_list_.push_back(raw);
334 65 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
335
336 65 return raw;
337 65 }
338
339 void
340 65 epoll_acceptor_service::destroy(io_object::implementation* impl)
341 {
342 65 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(impl);
343 65 epoll_impl->close_socket();
344 65 std::lock_guard lock(state_->mutex_);
345 65 state_->acceptor_list_.remove(epoll_impl);
346 65 state_->acceptor_ptrs_.erase(epoll_impl);
347 65 }
348
349 void
350 127 epoll_acceptor_service::close(io_object::handle& h)
351 {
352 127 static_cast<epoll_acceptor_impl*>(h.get())->close_socket();
353 127 }
354
355 std::error_code
356 64 epoll_acceptor_service::open_acceptor(
357 tcp_acceptor::implementation& impl, endpoint ep, int backlog)
358 {
359 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
360 64 epoll_impl->close_socket();
361
362 64 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
363 64 if (fd < 0)
364 return make_err(errno);
365
366 64 int reuse = 1;
367 64 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
368
369 64 sockaddr_in addr = detail::to_sockaddr_in(ep);
370 64 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
371 {
372 2 int errn = errno;
373 2 ::close(fd);
374 2 return make_err(errn);
375 }
376
377 62 if (::listen(fd, backlog) < 0)
378 {
379 int errn = errno;
380 ::close(fd);
381 return make_err(errn);
382 }
383
384 62 epoll_impl->fd_ = fd;
385
386 // Register fd with epoll (edge-triggered mode)
387 62 epoll_impl->desc_state_.fd = fd;
388 {
389 62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
390 62 epoll_impl->desc_state_.read_op = nullptr;
391 62 }
392 62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
393
394 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
395 62 sockaddr_in local_addr{};
396 62 socklen_t local_len = sizeof(local_addr);
397 62 if (::getsockname(
398 62 fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
399 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
400
401 62 return {};
402 }
403
404 void
405 11 epoll_acceptor_service::post(epoll_op* op)
406 {
407 11 state_->sched_.post(op);
408 11 }
409
410 void
411 4821 epoll_acceptor_service::work_started() noexcept
412 {
413 4821 state_->sched_.work_started();
414 4821 }
415
416 void
417 9 epoll_acceptor_service::work_finished() noexcept
418 {
419 9 state_->sched_.work_finished();
420 9 }
421
422 epoll_socket_service*
423 4814 epoll_acceptor_service::socket_service() const noexcept
424 {
425 4814 auto* svc = ctx_.find_service<detail::socket_service>();
426 4814 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
427 }
428
429 } // namespace boost::corosio::detail
430
431 #endif // BOOST_COROSIO_HAS_EPOLL
432