src/corosio/src/detail/epoll/sockets.cpp

80.1% Lines (371/463) 94.4% Functions (34/36)
src/corosio/src/detail/epoll/sockets.cpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/dispatch_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
//
// With edge-triggered epoll, a readiness edge may have been delivered and
// cached (in `ready_flag`) before this op was parked; the I/O is retried
// once under the lock so that edge is not lost.
//
// @param op          Operation to park or complete.
// @param desc_slot   Per-direction slot in desc_state_ the reactor checks.
// @param ready_flag  Cached "edge already seen" flag for this direction.
// @param cancel_flag Deferred-cancel flag set while no op was parked.
void
epoll_socket_impl::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Account for the pending op; balanced by work_finished() on the
    // direct-post path below or by the reactor completion path.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // Consume the cached edge: retry the I/O now. EAGAIN/EWOULDBLOCK
        // means the edge was stale — clear errn and fall through to park.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0;
    }

    if (cancel_flag)
    {
        // A cancel arrived before the op was parked; apply it now.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or cancelled) without the reactor: post directly and
        // balance the work_started() above.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor completes it on the next event.
        desc_slot = &op;
    }
}
71
72 void
73 105 epoll_op::canceller::operator()() const noexcept
74 {
75 105 op->cancel();
76 105 }
77
78 void
79 epoll_connect_op::cancel() noexcept
80 {
81 if (socket_impl_)
82 socket_impl_->cancel_single_op(*this);
83 else
84 request_cancel();
85 }
86
87 void
88 99 epoll_read_op::cancel() noexcept
89 {
90 99 if (socket_impl_)
91 99 socket_impl_->cancel_single_op(*this);
92 else
93 request_cancel();
94 99 }
95
96 void
97 epoll_write_op::cancel() noexcept
98 {
99 if (socket_impl_)
100 socket_impl_->cancel_single_op(*this);
101 else
102 request_cancel();
103 }
104
// Completion handler invoked by the scheduler once the op has finished.
// Translates the recorded state (cancelled / errn / bytes_transferred)
// into the caller-visible outputs, then resumes the awaiting coroutine
// on its executor.
void
epoll_op::operator()()
{
    // Detach the stop callback first so a late stop request cannot race
    // with this completion.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    // Result priority: cancellation > OS error > EOF (read of 0 bytes)
    // > success.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
133
// Completion handler for connect: caches the endpoint pair on success,
// maps (cancelled, errn) into the error_code output, and resumes the
// awaiting coroutine on its executor.
void
epoll_connect_op::operator()()
{
    // Detach the stop callback first so a late stop request cannot race
    // with this completion.
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<epoll_socket_impl*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over any recorded errno.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
171
// Bind the impl to its owning service. The descriptor itself is acquired
// later by epoll_socket_service::open_socket().
epoll_socket_impl::epoll_socket_impl(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}
176
177 14499 epoll_socket_impl::~epoll_socket_impl() = default;
178
// Begin an asynchronous connect to `ep`.
//
// Issues ::connect() immediately; only EINPROGRESS takes the reactor
// path. When the result is known synchronously, the coroutine is resumed
// inline if the scheduler's inline budget allows, otherwise the op is
// posted through the completion queue.
//
// @param h     Awaiting coroutine.
// @param ex    Executor the coroutine must resume on.
// @param ep    IPv4 destination endpoint.
// @param token Stop token wired to op cancellation.
// @param ec    Out: connect result.
// @return Coroutine to resume now, or noop_coroutine() if deferred.
std::coroutine_handle<>
epoll_socket_impl::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Immediate success (e.g. loopback): cache endpoints now. Local
        // endpoint stays default-constructed if getsockname fails.
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Completed synchronously (success or hard failure).
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Budget available: resume the coroutine inline.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue instead.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect readiness is signalled as writability, hence write_ready.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
239
// Read into the caller's buffers, speculatively first.
//
// Issues readv() immediately; only EAGAIN/EWOULDBLOCK parks the op with
// the reactor. An empty buffer list completes immediately with zero bytes
// (empty_buffer_read suppresses the 0-bytes => EOF mapping in
// epoll_op::operator()).
//
// @param h         Awaiting coroutine.
// @param ex        Executor the coroutine must resume on.
// @param param     Caller's buffer sequence.
// @param token     Stop token wired to op cancellation.
// @param ec        Out: read result.
// @param bytes_out Out: bytes transferred.
// @return Coroutine to resume now, or noop_coroutine() if deferred.
std::coroutine_handle<>
epoll_socket_impl::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Empty buffer: complete with 0 bytes through the scheduler.
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Mirror the buffer list into the op's iovec array for readv.
    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read; EINTR is retried.
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed synchronously (data, EOF, or hard failure).
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Budget available: resume the coroutine inline.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue instead.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
325
// Write from the caller's buffers, speculatively first.
//
// Issues sendmsg() immediately; only EAGAIN/EWOULDBLOCK parks the op with
// the reactor. An empty buffer list completes immediately with zero bytes.
//
// @param h         Awaiting coroutine.
// @param ex        Executor the coroutine must resume on.
// @param param     Caller's buffer sequence.
// @param token     Stop token wired to op cancellation.
// @param ec        Out: write result.
// @param bytes_out Out: bytes transferred.
// @return Coroutine to resume now, or noop_coroutine() if deferred.
std::coroutine_handle<>
epoll_socket_impl::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Empty buffer: complete with 0 bytes through the scheduler.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Mirror the buffer list into the op's iovec array for sendmsg.
    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write. MSG_NOSIGNAL suppresses SIGPIPE on a closed peer;
    // EINTR is retried.
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed synchronously (success or hard failure).
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Budget available: resume the coroutine inline.
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: complete through the scheduler queue instead.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
409
410 std::error_code
411 3 epoll_socket_impl::shutdown(tcp_socket::shutdown_type what) noexcept
412 {
413 int how;
414 3 switch (what)
415 {
416 1 case tcp_socket::shutdown_receive:
417 1 how = SHUT_RD;
418 1 break;
419 1 case tcp_socket::shutdown_send:
420 1 how = SHUT_WR;
421 1 break;
422 1 case tcp_socket::shutdown_both:
423 1 how = SHUT_RDWR;
424 1 break;
425 default:
426 return make_err(EINVAL);
427 }
428 3 if (::shutdown(fd_, how) != 0)
429 return make_err(errno);
430 3 return {};
431 }
432
433 std::error_code
434 5 epoll_socket_impl::set_no_delay(bool value) noexcept
435 {
436 5 int flag = value ? 1 : 0;
437 5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
438 return make_err(errno);
439 5 return {};
440 }
441
442 bool
443 5 epoll_socket_impl::no_delay(std::error_code& ec) const noexcept
444 {
445 5 int flag = 0;
446 5 socklen_t len = sizeof(flag);
447 5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
448 {
449 ec = make_err(errno);
450 return false;
451 }
452 5 ec = {};
453 5 return flag != 0;
454 }
455
456 std::error_code
457 4 epoll_socket_impl::set_keep_alive(bool value) noexcept
458 {
459 4 int flag = value ? 1 : 0;
460 4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
461 return make_err(errno);
462 4 return {};
463 }
464
465 bool
466 4 epoll_socket_impl::keep_alive(std::error_code& ec) const noexcept
467 {
468 4 int flag = 0;
469 4 socklen_t len = sizeof(flag);
470 4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
471 {
472 ec = make_err(errno);
473 return false;
474 }
475 4 ec = {};
476 4 return flag != 0;
477 }
478
479 std::error_code
480 1 epoll_socket_impl::set_receive_buffer_size(int size) noexcept
481 {
482 1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
483 return make_err(errno);
484 1 return {};
485 }
486
487 int
488 3 epoll_socket_impl::receive_buffer_size(std::error_code& ec) const noexcept
489 {
490 3 int size = 0;
491 3 socklen_t len = sizeof(size);
492 3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
493 {
494 ec = make_err(errno);
495 return 0;
496 }
497 3 ec = {};
498 3 return size;
499 }
500
501 std::error_code
502 1 epoll_socket_impl::set_send_buffer_size(int size) noexcept
503 {
504 1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
505 return make_err(errno);
506 1 return {};
507 }
508
509 int
510 3 epoll_socket_impl::send_buffer_size(std::error_code& ec) const noexcept
511 {
512 3 int size = 0;
513 3 socklen_t len = sizeof(size);
514 3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
515 {
516 ec = make_err(errno);
517 return 0;
518 }
519 3 ec = {};
520 3 return size;
521 }
522
523 std::error_code
524 8 epoll_socket_impl::set_linger(bool enabled, int timeout) noexcept
525 {
526 8 if (timeout < 0)
527 1 return make_err(EINVAL);
528 struct ::linger lg;
529 7 lg.l_onoff = enabled ? 1 : 0;
530 7 lg.l_linger = timeout;
531 7 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
532 return make_err(errno);
533 7 return {};
534 }
535
536 tcp_socket::linger_options
537 3 epoll_socket_impl::linger(std::error_code& ec) const noexcept
538 {
539 3 struct ::linger lg{};
540 3 socklen_t len = sizeof(lg);
541 3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
542 {
543 ec = make_err(errno);
544 return {};
545 }
546 3 ec = {};
547 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
548 }
549
// Cancel every outstanding operation on this socket.
//
// Any op currently parked in its descriptor slot is claimed under the
// lock and posted for cancelled completion; for a direction with no
// parked op, a pending-cancel flag is recorded so a later register_op()
// observes the cancel.
void
epoll_socket_impl::cancel() noexcept
{
    // If no shared owner remains, the impl is being torn down elsewhere.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Mark every op cancel-requested first so a concurrently completing
    // op still observes the request.
    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        // Claim each parked op, or defer the cancel for later registration.
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock. impl_ptr keeps this impl alive
    // until each completion runs; work_finished() balances the
    // work_started() performed when the op was registered.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
599
// Cancel one specific op (connect, read, or write) on this socket.
//
// Mirrors cancel(): claim the op's descriptor slot if the op is parked
// there, otherwise record the matching pending-cancel flag for a later
// register_op().
void
epoll_socket_impl::cancel_single_op(epoll_op& op) noexcept
{
    // If no shared owner remains, the impl is being torn down elsewhere.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Map the op to its slot in the descriptor state.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            // Claim the parked op, or defer the cancel for later
            // registration of the matching direction.
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // Post for cancelled completion outside the lock; balance the
            // work_started() performed at registration.
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
639
// Release the descriptor and abandon all outstanding ops.
//
// Claims every parked op for cancelled completion, clears cached
// readiness/cancel state, deregisters from epoll, closes the fd, and
// resets the cached endpoints. Safe to call when already closed.
void
epoll_socket_impl::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        // Unconditionally claim all parked ops and wipe per-descriptor
        // flags under the lock.
        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops outside the lock. impl_ptr keeps this impl
        // alive until each completion runs; work_finished() balances the
        // registration's work_started().
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued inside the scheduler,
        // pin the impl until that queue entry drains.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
702
// Construct the service. state_ bundles the scheduler reference and the
// socket registry (list + shared_ptr map) used by the impl lifecycle.
epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
709
710 406 epoll_socket_service::~epoll_socket_service() {}
711
// Service shutdown: close every socket still in the registry. Runs before
// the scheduler's own shutdown drains queued completions.
void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
728
729 io_object::implementation*
730 14499 epoll_socket_service::construct()
731 {
732 14499 auto impl = std::make_shared<epoll_socket_impl>(*this);
733 14499 auto* raw = impl.get();
734
735 {
736 14499 std::lock_guard lock(state_->mutex_);
737 14499 state_->socket_list_.push_back(raw);
738 14499 state_->socket_ptrs_.emplace(raw, std::move(impl));
739 14499 }
740
741 14499 return raw;
742 14499 }
743
744 void
745 14499 epoll_socket_service::destroy(io_object::implementation* impl)
746 {
747 14499 auto* epoll_impl = static_cast<epoll_socket_impl*>(impl);
748 14499 epoll_impl->close_socket();
749 14499 std::lock_guard lock(state_->mutex_);
750 14499 state_->socket_list_.remove(epoll_impl);
751 14499 state_->socket_ptrs_.erase(epoll_impl);
752 14499 }
753
// Open a fresh non-blocking TCP socket for `impl`, closing any previous
// descriptor first, and register it with the epoll reactor.
std::error_code
epoll_socket_service::open_socket(tcp_socket::implementation& impl)
{
    auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
    epoll_impl->close_socket();

    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (fd < 0)
        return make_err(errno);

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear stale op slots from any prior registration before the
        // reactor can observe this descriptor.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
778
779 void
780 24140 epoll_socket_service::close(io_object::handle& h)
781 {
782 24140 static_cast<epoll_socket_impl*>(h.get())->close_socket();
783 24140 }
784
// Queue a completed (or cancelled) op on the scheduler for invocation.
void
epoll_socket_service::post(epoll_op* op)
{
    state_->sched_.post(op);
}
790
// Forward to the scheduler's outstanding-work counter (incremented when an
// op is registered with the reactor).
void
epoll_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
796
// Forward to the scheduler's outstanding-work counter (decremented when a
// registered op is posted directly instead of via the reactor).
void
epoll_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
802
803 } // namespace boost::corosio::detail
804
805 #endif // BOOST_COROSIO_HAS_EPOLL
806