src/corosio/src/detail/epoll/scheduler.cpp

Lines: 82.4% (406/493)    Functions: 93.5% (43/46)
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler coordinates threads so that handlers run in parallel
41 without a thundering herd problem: instead of every thread blocking in
42 epoll_wait(), one thread becomes the "reactor" while the others wait on
43 a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
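
For illustration (a possible timeline, not a guaranteed schedule): four
threads call run(). Thread A pops the task sentinel in do_one() and becomes
the reactor in run_task(); B, C and D find the queue empty and block in
wait_for_signal(). When A splices two I/O completions into the queue, it
takes one handler itself and unlock_and_signal_one() wakes exactly one
waiter for the other; the remaining threads keep sleeping.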
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
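
A condensed sketch of the loop (stop checks, locking, and error paths
elided; do_one() below is the authoritative version):

    lock mutex_
    loop:
        op = completed_ops_.pop()
        if op == &task_op_:                // reactor sentinel
            task_running_ = true
            run_task()                     // epoll_wait, splice completions
            task_running_ = false
            completed_ops_.push(&task_op_)
            continue
        if op != nullptr:
            unlock, run (*op)(), return 1
        wait_for_signal()                  // block until work is posted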
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
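
Example readings of state_ under this encoding:
    state_ == 0   idle: not signaled, no waiters
    state_ == 1   signaled, no waiters; the next waiter sees bit 0 set and
                  skips blocking (the fast path)
    state_ == 4   two waiters blocked, not signaled
    state_ == 5   signaled while two waiters are blocked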
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
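
Worked example of the cascade: three handlers are spliced into the queue
while two threads wait. The poster notifies one waiter; that thread pops a
handler, sees more work, and unlock_and_signal_one() in do_one() notifies
the second waiter on the way out of the lock. The burst fans out with at
most one notify per handler instead of a broadcast.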
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it hits zero, run()
88 returns. Each operation increments on start, decrements on completion.
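
For example, a single pending operation holds outstanding_work_ at 1; when
its handler finishes without starting new work, work_cleanup calls
work_finished(), the count drops to 0, stop() is invoked, and run() returns.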
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service through a timerfd registered with
93 epoll; update_timerfd() arms it for the nearest expiry. When a new timer
94 becomes the earliest, timer_service's callback marks the timerfd stale
95 and calls interrupt_reactor() so the reactor re-arms it before blocking.
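
Illustrative sequence for a timer due in 5 ms: the earliest-changed callback
sets timerfd_stale_ and, if the reactor is running, writes to event_fd_ via
interrupt_reactor(). The reactor wakes, sees the stale flag in run_task(),
arms timer_fd_ for ~5 ms via update_timerfd(), and re-enters epoll_wait().
When timer_fd_ becomes readable, process_expired() posts the expired
handlers and update_timerfd() re-arms or disarms the fd.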
96 */
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106 int inline_budget;
107 int inline_budget_max;
108 bool unassisted;
109
110 197 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
111 197 : key(k)
112 197 , next(n)
113 197 , private_outstanding_work(0)
114 197 , inline_budget(0)
115 197 , inline_budget_max(2)
116 197 , unassisted(false)
117 {
118 197 }
119 };
120
121 namespace {
122
123 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
124
125 struct thread_context_guard
126 {
127 scheduler_context frame_;
128
129 197 explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
130 197 : frame_(ctx, context_stack.get())
131 {
132 197 context_stack.set(&frame_);
133 197 }
134
135 197 ~thread_context_guard() noexcept
136 {
137 197 if (!frame_.private_queue.empty())
138 frame_.key->drain_thread_queue(
139 frame_.private_queue, frame_.private_outstanding_work);
140 197 context_stack.set(frame_.next);
141 197 }
142 };
143
144 scheduler_context*
145 649350 find_context(epoll_scheduler const* self) noexcept
146 {
147 649350 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
148 647671 if (c->key == self)
149 647671 return c;
150 1679 return nullptr;
151 }
152
153 } // namespace
154
155 void
156 91207 epoll_scheduler::reset_inline_budget() const noexcept
157 {
158 91207 if (auto* ctx = find_context(this))
159 {
160 // Cap when no other thread absorbed queued work. A moderate
161 // cap (4) amortizes scheduling for small buffers while avoiding
162 // bursty I/O that fills socket buffers and stalls large transfers.
163 91207 if (ctx->unassisted)
164 {
165 91207 ctx->inline_budget_max = 4;
166 91207 ctx->inline_budget = 4;
167 91207 return;
168 }
169 // Ramp up when previous cycle fully consumed budget.
170 // Reset on partial consumption (EAGAIN hit or peer got scheduled).
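// Illustrative progression from the initial cap of 2: a fully consumed
// budget doubles the cap (2 -> 4 -> 8 -> 16, clamped at 16), a partially
// consumed one resets it to 2, and an untouched budget leaves it unchanged.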
171 if (ctx->inline_budget == 0)
172 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
173 else if (ctx->inline_budget < ctx->inline_budget_max)
174 ctx->inline_budget_max = 2;
175 ctx->inline_budget = ctx->inline_budget_max;
176 }
177 }
178
179 bool
180 407206 epoll_scheduler::try_consume_inline_budget() const noexcept
181 {
182 407206 if (auto* ctx = find_context(this))
183 {
184 407206 if (ctx->inline_budget > 0)
185 {
186 325840 --ctx->inline_budget;
187 325840 return true;
188 }
189 }
190 81366 return false;
191 }
192
193 void
194 67051 descriptor_state::operator()()
195 {
196 67051 is_enqueued_.store(false, std::memory_order_relaxed);
197
198 // Take ownership of impl ref set by close_socket() to prevent
199 // the owning impl from being freed while we're executing
200 67051 auto prevent_impl_destruction = std::move(impl_ref_);
201
202 67051 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
203 67051 if (ev == 0)
204 {
205 scheduler_->compensating_work_started();
206 return;
207 }
208
209 67051 op_queue local_ops;
210
211 67051 int err = 0;
212 67051 if (ev & EPOLLERR)
213 {
214 1 socklen_t len = sizeof(err);
215 1 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
216 err = errno;
217 1 if (err == 0)
218 err = EIO;
219 }
220
221 {
222 67051 std::lock_guard lock(mutex);
223 67051 if (ev & EPOLLIN)
224 {
225 21411 if (read_op)
226 {
227 4816 auto* rd = read_op;
228 4816 if (err)
229 rd->complete(err, 0);
230 else
231 4816 rd->perform_io();
232
233 4816 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
234 {
235 rd->errn = 0;
236 }
237 else
238 {
239 4816 read_op = nullptr;
240 4816 local_ops.push(rd);
241 }
242 }
243 else
244 {
245 16595 read_ready = true;
246 }
247 }
248 67051 if (ev & EPOLLOUT)
249 {
250 62239 bool had_write_op = (connect_op || write_op);
251 62239 if (connect_op)
252 {
253 4816 auto* cn = connect_op;
254 4816 if (err)
255 1 cn->complete(err, 0);
256 else
257 4815 cn->perform_io();
258 4816 connect_op = nullptr;
259 4816 local_ops.push(cn);
260 }
261 62239 if (write_op)
262 {
263 auto* wr = write_op;
264 if (err)
265 wr->complete(err, 0);
266 else
267 wr->perform_io();
268
269 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
270 {
271 wr->errn = 0;
272 }
273 else
274 {
275 write_op = nullptr;
276 local_ops.push(wr);
277 }
278 }
279 62239 if (!had_write_op)
280 57423 write_ready = true;
281 }
282 67051 if (err)
283 {
284 1 if (read_op)
285 {
286 read_op->complete(err, 0);
287 local_ops.push(std::exchange(read_op, nullptr));
288 }
289 1 if (write_op)
290 {
291 write_op->complete(err, 0);
292 local_ops.push(std::exchange(write_op, nullptr));
293 }
294 1 if (connect_op)
295 {
296 connect_op->complete(err, 0);
297 local_ops.push(std::exchange(connect_op, nullptr));
298 }
299 }
300 67051 }
301
302 // Execute first handler inline — the scheduler's work_cleanup
303 // accounts for this as the "consumed" work item
304 67051 scheduler_op* first = local_ops.pop();
305 67051 if (first)
306 {
307 9632 scheduler_->post_deferred_completions(local_ops);
308 9632 (*first)();
309 }
310 else
311 {
312 57419 scheduler_->compensating_work_started();
313 }
314 67051 }
315
316 203 epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
317 203 : epoll_fd_(-1)
318 203 , event_fd_(-1)
319 203 , timer_fd_(-1)
320 203 , outstanding_work_(0)
321 203 , stopped_(false)
322 203 , shutdown_(false)
323 203 , task_running_{false}
324 203 , task_interrupted_(false)
325 406 , state_(0)
326 {
327 203 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
328 203 if (epoll_fd_ < 0)
329 detail::throw_system_error(make_err(errno), "epoll_create1");
330
331 203 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
332 203 if (event_fd_ < 0)
333 {
334 int errn = errno;
335 ::close(epoll_fd_);
336 detail::throw_system_error(make_err(errn), "eventfd");
337 }
338
339 203 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
340 203 if (timer_fd_ < 0)
341 {
342 int errn = errno;
343 ::close(event_fd_);
344 ::close(epoll_fd_);
345 detail::throw_system_error(make_err(errn), "timerfd_create");
346 }
347
348 203 epoll_event ev{};
349 203 ev.events = EPOLLIN | EPOLLET;
350 203 ev.data.ptr = nullptr;
351 203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
352 {
353 int errn = errno;
354 ::close(timer_fd_);
355 ::close(event_fd_);
356 ::close(epoll_fd_);
357 detail::throw_system_error(make_err(errn), "epoll_ctl");
358 }
359
360 203 epoll_event timer_ev{};
361 203 timer_ev.events = EPOLLIN | EPOLLERR;
362 203 timer_ev.data.ptr = &timer_fd_;
363 203 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
364 {
365 int errn = errno;
366 ::close(timer_fd_);
367 ::close(event_fd_);
368 ::close(epoll_fd_);
369 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
370 }
371
372 203 timer_svc_ = &get_timer_service(ctx, *this);
373 203 timer_svc_->set_on_earliest_changed(
374 timer_service::callback(this, [](void* p) {
375 5029 auto* self = static_cast<epoll_scheduler*>(p);
376 5029 self->timerfd_stale_.store(true, std::memory_order_release);
377 5029 if (self->task_running_.load(std::memory_order_acquire))
378 self->interrupt_reactor();
379 5029 }));
380
381 // Initialize resolver service
382 203 get_resolver_service(ctx, *this);
383
384 // Initialize signal service
385 203 get_signal_service(ctx, *this);
386
387 // Push task sentinel to interleave reactor runs with handler execution
388 203 completed_ops_.push(&task_op_);
389 203 }
390
391 406 epoll_scheduler::~epoll_scheduler()
392 {
393 203 if (timer_fd_ >= 0)
394 203 ::close(timer_fd_);
395 203 if (event_fd_ >= 0)
396 203 ::close(event_fd_);
397 203 if (epoll_fd_ >= 0)
398 203 ::close(epoll_fd_);
399 406 }
400
401 void
402 203 epoll_scheduler::shutdown()
403 {
404 {
405 203 std::unique_lock lock(mutex_);
406 203 shutdown_ = true;
407
408 433 while (auto* h = completed_ops_.pop())
409 {
410 230 if (h == &task_op_)
411 203 continue;
412 27 lock.unlock();
413 27 h->destroy();
414 27 lock.lock();
415 230 }
416
417 203 signal_all(lock);
418 203 }
419
420 203 outstanding_work_.store(0, std::memory_order_release);
421
422 203 if (event_fd_ >= 0)
423 203 interrupt_reactor();
424 203 }
425
426 void
427 6863 epoll_scheduler::post(std::coroutine_handle<> h) const
428 {
429 struct post_handler final : scheduler_op
430 {
431 std::coroutine_handle<> h_;
432
433 6863 explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
434
435 13726 ~post_handler() override = default;
436
437 6863 void operator()() override
438 {
439 6863 auto h = h_;
440 6863 delete this;
441 6863 h.resume();
442 6863 }
443
444 void destroy() override
445 {
446 delete this;
447 }
448 };
449
450 6863 auto ph = std::make_unique<post_handler>(h);
451
452 // Fast path: same thread posts to private queue
453 // Only count locally; work_cleanup batches to global counter
454 6863 if (auto* ctx = find_context(this))
455 {
456 5210 ++ctx->private_outstanding_work;
457 5210 ctx->private_queue.push(ph.release());
458 5210 return;
459 }
460
461 // Slow path: cross-thread post requires mutex
462 1653 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
463
464 1653 std::unique_lock lock(mutex_);
465 1653 completed_ops_.push(ph.release());
466 1653 wake_one_thread_and_unlock(lock);
467 6863 }
468
469 void
470 86655 epoll_scheduler::post(scheduler_op* h) const
471 {
472 // Fast path: same thread posts to private queue
473 // Only count locally; work_cleanup batches to global counter
474 86655 if (auto* ctx = find_context(this))
475 {
476 86629 ++ctx->private_outstanding_work;
477 86629 ctx->private_queue.push(h);
478 86629 return;
479 }
480
481 // Slow path: cross-thread post requires mutex
482 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
483
484 26 std::unique_lock lock(mutex_);
485 26 completed_ops_.push(h);
486 26 wake_one_thread_and_unlock(lock);
487 26 }
488
489 bool
490 697 epoll_scheduler::running_in_this_thread() const noexcept
491 {
492 697 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
493 457 if (c->key == this)
494 457 return true;
495 240 return false;
496 }
497
498 void
499 187 epoll_scheduler::stop()
500 {
501 187 std::unique_lock lock(mutex_);
502 187 if (!stopped_)
503 {
504 164 stopped_ = true;
505 164 signal_all(lock);
506 164 interrupt_reactor();
507 }
508 187 }
509
510 bool
511 18 epoll_scheduler::stopped() const noexcept
512 {
513 18 std::unique_lock lock(mutex_);
514 36 return stopped_;
515 18 }
516
517 void
518 49 epoll_scheduler::restart()
519 {
520 49 std::unique_lock lock(mutex_);
521 49 stopped_ = false;
522 49 }
523
524 std::size_t
525 183 epoll_scheduler::run()
526 {
527 366 if (outstanding_work_.load(std::memory_order_acquire) == 0)
528 {
529 18 stop();
530 18 return 0;
531 }
532
533 165 thread_context_guard ctx(this);
534 165 std::unique_lock lock(mutex_);
535
536 165 std::size_t n = 0;
537 for (;;)
538 {
539 160701 if (!do_one(lock, -1, &ctx.frame_))
540 165 break;
541 160536 if (n != (std::numeric_limits<std::size_t>::max)())
542 160536 ++n;
543 160536 if (!lock.owns_lock())
544 73742 lock.lock();
545 }
546 165 return n;
547 165 }
548
549 std::size_t
550 2 epoll_scheduler::run_one()
551 {
552 4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
553 {
554 stop();
555 return 0;
556 }
557
558 2 thread_context_guard ctx(this);
559 2 std::unique_lock lock(mutex_);
560 2 return do_one(lock, -1, &ctx.frame_);
561 2 }
562
563 std::size_t
564 34 epoll_scheduler::wait_one(long usec)
565 {
566 68 if (outstanding_work_.load(std::memory_order_acquire) == 0)
567 {
568 7 stop();
569 7 return 0;
570 }
571
572 27 thread_context_guard ctx(this);
573 27 std::unique_lock lock(mutex_);
574 27 return do_one(lock, usec, &ctx.frame_);
575 27 }
576
577 std::size_t
578 2 epoll_scheduler::poll()
579 {
580 4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
581 {
582 1 stop();
583 1 return 0;
584 }
585
586 1 thread_context_guard ctx(this);
587 1 std::unique_lock lock(mutex_);
588
589 1 std::size_t n = 0;
590 for (;;)
591 {
592 3 if (!do_one(lock, 0, &ctx.frame_))
593 1 break;
594 2 if (n != (std::numeric_limits<std::size_t>::max)())
595 2 ++n;
596 2 if (!lock.owns_lock())
597 2 lock.lock();
598 }
599 1 return n;
600 1 }
601
602 std::size_t
603 4 epoll_scheduler::poll_one()
604 {
605 8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
606 {
607 2 stop();
608 2 return 0;
609 }
610
611 2 thread_context_guard ctx(this);
612 2 std::unique_lock lock(mutex_);
613 2 return do_one(lock, 0, &ctx.frame_);
614 2 }
615
616 void
617 9703 epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
618 {
619 9703 epoll_event ev{};
620 9703 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
621 9703 ev.data.ptr = desc;
622
623 9703 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
624 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
625
626 9703 desc->registered_events = ev.events;
627 9703 desc->fd = fd;
628 9703 desc->scheduler_ = this;
629
630 9703 std::lock_guard lock(desc->mutex);
631 9703 desc->read_ready = false;
632 9703 desc->write_ready = false;
633 9703 }
634
635 void
636 9703 epoll_scheduler::deregister_descriptor(int fd) const
637 {
638 9703 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
639 9703 }
640
641 void
642 15592 epoll_scheduler::work_started() noexcept
643 {
644 15592 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
645 15592 }
646
647 void
648 22305 epoll_scheduler::work_finished() noexcept
649 {
650 44610 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
651 158 stop();
652 22305 }
653
654 void
655 57419 epoll_scheduler::compensating_work_started() const noexcept
656 {
657 57419 auto* ctx = find_context(this);
658 57419 if (ctx)
659 57419 ++ctx->private_outstanding_work;
660 57419 }
661
662 void
663 epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
664 {
665 // Note: outstanding_work_ was already incremented when posting
666 std::unique_lock lock(mutex_);
667 completed_ops_.splice(queue);
668 if (count > 0)
669 maybe_unlock_and_signal_one(lock);
670 }
671
672 void
673 9632 epoll_scheduler::post_deferred_completions(op_queue& ops) const
674 {
675 9632 if (ops.empty())
676 9632 return;
677
678 // Fast path: if on scheduler thread, use private queue
679 if (auto* ctx = find_context(this))
680 {
681 ctx->private_queue.splice(ops);
682 return;
683 }
684
685 // Slow path: add to global queue and wake a thread
686 std::unique_lock lock(mutex_);
687 completed_ops_.splice(ops);
688 wake_one_thread_and_unlock(lock);
689 }
690
691 void
692 393 epoll_scheduler::interrupt_reactor() const
693 {
694 // Only write if not already armed to avoid redundant writes
695 393 bool expected = false;
696 393 if (eventfd_armed_.compare_exchange_strong(
697 expected, true, std::memory_order_release,
698 std::memory_order_relaxed))
699 {
700 275 std::uint64_t val = 1;
701 275 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
702 }
703 393 }
704
705 void
706 367 epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
707 {
708 367 state_ |= 1;
709 367 cond_.notify_all();
710 367 }
711
712 bool
713 1679 epoll_scheduler::maybe_unlock_and_signal_one(
714 std::unique_lock<std::mutex>& lock) const
715 {
716 1679 state_ |= 1;
717 1679 if (state_ > 1)
718 {
719 lock.unlock();
720 cond_.notify_one();
721 return true;
722 }
723 1679 return false;
724 }
725
726 bool
727 205118 epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
728 {
729 205118 state_ |= 1;
730 205118 bool have_waiters = state_ > 1;
731 205118 lock.unlock();
732 205118 if (have_waiters)
733 cond_.notify_one();
734 205118 return have_waiters;
735 }
736
737 void
738 1 epoll_scheduler::clear_signal() const
739 {
740 1 state_ &= ~std::size_t(1);
741 1 }
742
743 void
744 1 epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
745 {
746 2 while ((state_ & 1) == 0)
747 {
748 1 state_ += 2;
749 1 cond_.wait(lock);
750 1 state_ -= 2;
751 }
752 1 }
753
754 void
755 epoll_scheduler::wait_for_signal_for(
756 std::unique_lock<std::mutex>& lock, long timeout_us) const
757 {
758 if ((state_ & 1) == 0)
759 {
760 state_ += 2;
761 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
762 state_ -= 2;
763 }
764 }
765
766 void
767 1679 epoll_scheduler::wake_one_thread_and_unlock(
768 std::unique_lock<std::mutex>& lock) const
769 {
770 1679 if (maybe_unlock_and_signal_one(lock))
771 return;
772
773 1679 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
774 {
775 26 task_interrupted_ = true;
776 26 lock.unlock();
777 26 interrupt_reactor();
778 }
779 else
780 {
781 1653 lock.unlock();
782 }
783 }
784
785 /** RAII guard for handler execution work accounting.
786
787 Handler consumes 1 work item, may produce N new items via fast-path posts.
788 Net change = N - 1:
789 - If N > 1: add (N-1) to global (more work produced than consumed)
790 - If N == 1: net zero, do nothing
791 - If N < 1: call work_finished() (work consumed, may trigger stop)
792
793 Also drains private queue to global for other threads to process.
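
Worked example: a handler that fast-path posts three continuations (N = 3)
adds 2 to outstanding_work_ and splices the three ops into completed_ops_;
a handler that posts nothing (N = 0) calls work_finished(), which may drop
the global count to zero and stop the scheduler.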
794 */
795 struct work_cleanup
796 {
797 epoll_scheduler* scheduler;
798 std::unique_lock<std::mutex>* lock;
799 scheduler_context* ctx;
800
801 160569 ~work_cleanup()
802 {
803 160569 if (ctx)
804 {
805 160569 long produced = ctx->private_outstanding_work;
806 160569 if (produced > 1)
807 7 scheduler->outstanding_work_.fetch_add(
808 produced - 1, std::memory_order_relaxed);
809 160562 else if (produced < 1)
810 16345 scheduler->work_finished();
811 // produced == 1: net zero, handler consumed what it produced
812 160569 ctx->private_outstanding_work = 0;
813
814 160569 if (!ctx->private_queue.empty())
815 {
816 86805 lock->lock();
817 86805 scheduler->completed_ops_.splice(ctx->private_queue);
818 }
819 }
820 else
821 {
822 // No thread context - slow-path op was already counted globally
823 scheduler->work_finished();
824 }
825 160569 }
826 };
827
828 /** RAII guard for reactor work accounting.
829
830 Reactor only produces work via timer/signal callbacks posting handlers.
831 Unlike handler execution which consumes 1, the reactor consumes nothing.
832 All produced work must be flushed to global counter.
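
For example, if process_expired() posts five timer handlers during this
reactor pass, they land in the thread's private queue; on exit the guard
adds 5 to outstanding_work_ and splices all five into completed_ops_ for
any thread to pick up.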
833 */
834 struct task_cleanup
835 {
836 epoll_scheduler const* scheduler;
837 std::unique_lock<std::mutex>* lock;
838 scheduler_context* ctx;
839
840 54417 ~task_cleanup()
841 54417 {
842 54417 if (!ctx)
843 return;
844
845 54417 if (ctx->private_outstanding_work > 0)
846 {
847 5021 scheduler->outstanding_work_.fetch_add(
848 5021 ctx->private_outstanding_work, std::memory_order_relaxed);
849 5021 ctx->private_outstanding_work = 0;
850 }
851
852 54417 if (!ctx->private_queue.empty())
853 {
854 5021 if (!lock->owns_lock())
855 lock->lock();
856 5021 scheduler->completed_ops_.splice(ctx->private_queue);
857 }
858 54417 }
859 };
860
861 void
862 10038 epoll_scheduler::update_timerfd() const
863 {
864 10038 auto nearest = timer_svc_->nearest_expiry();
865
866 10038 itimerspec ts{};
867 10038 int flags = 0;
868
869 10038 if (nearest == timer_service::time_point::max())
870 {
871 // No timers - disarm by setting to 0 (relative)
872 }
873 else
874 {
875 9993 auto now = std::chrono::steady_clock::now();
876 9993 if (nearest <= now)
877 {
878 // Use 1ns instead of 0 - zero disarms the timerfd
879 162 ts.it_value.tv_nsec = 1;
880 }
881 else
882 {
883 9831 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
884 9831 nearest - now)
885 9831 .count();
886 9831 ts.it_value.tv_sec = nsec / 1000000000;
887 9831 ts.it_value.tv_nsec = nsec % 1000000000;
888 // Ensure non-zero to avoid disarming if duration rounds to 0
889 9831 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
890 ts.it_value.tv_nsec = 1;
891 }
892 }
893
894 10038 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
895 detail::throw_system_error(make_err(errno), "timerfd_settime");
896 10038 }
897
898 void
899 54417 epoll_scheduler::run_task(
900 std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
901 {
902 54417 int timeout_ms = task_interrupted_ ? 0 : -1;
903
904 54417 if (lock.owns_lock())
905 9868 lock.unlock();
906
907 54417 task_cleanup on_exit{this, &lock, ctx};
908
909 // Flush deferred timerfd programming before blocking
910 54417 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
911 5017 update_timerfd();
912
913 // Event loop runs without mutex held
914 epoll_event events[128];
915 54417 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
916
917 54417 if (nfds < 0 && errno != EINTR)
918 detail::throw_system_error(make_err(errno), "epoll_wait");
919
920 54417 bool check_timers = false;
921 54417 op_queue local_ops;
922
923 // Process events without holding the mutex
924 126588 for (int i = 0; i < nfds; ++i)
925 {
926 72171 if (events[i].data.ptr == nullptr)
927 {
928 std::uint64_t val;
929 // Mutex released above; analyzer can't track unlock via ref
930 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
931 72 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
932 72 eventfd_armed_.store(false, std::memory_order_relaxed);
933 72 continue;
934 72 }
935
936 72099 if (events[i].data.ptr == &timer_fd_)
937 {
938 std::uint64_t expirations;
939 // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
940 [[maybe_unused]] auto r =
941 5021 ::read(timer_fd_, &expirations, sizeof(expirations));
942 5021 check_timers = true;
943 5021 continue;
944 5021 }
945
946 // Deferred I/O: just set ready events and enqueue descriptor
947 // No per-descriptor mutex locking in reactor hot path!
948 67078 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
949 67078 desc->add_ready_events(events[i].events);
950
951 // Only enqueue if not already enqueued
952 67078 bool expected = false;
953 67078 if (desc->is_enqueued_.compare_exchange_strong(
954 expected, true, std::memory_order_release,
955 std::memory_order_relaxed))
956 {
957 67078 local_ops.push(desc);
958 }
959 }
960
961 // Process timers only when timerfd fires
962 54417 if (check_timers)
963 {
964 5021 timer_svc_->process_expired();
965 5021 update_timerfd();
966 }
967
968 54417 lock.lock();
969
970 54417 if (!local_ops.empty())
971 44093 completed_ops_.splice(local_ops);
972 54417 }
973
974 std::size_t
975 160735 epoll_scheduler::do_one(
976 std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
977 {
978 for (;;)
979 {
980 215153 if (stopped_)
981 165 return 0;
982
983 214988 scheduler_op* op = completed_ops_.pop();
984
985 // Handle reactor sentinel - time to poll for I/O
986 214988 if (op == &task_op_)
987 {
988 54418 bool more_handlers = !completed_ops_.empty();
989
990 // Nothing to run the reactor for: no pending work to wait on,
991 // or caller requested a non-blocking poll
992 64287 if (!more_handlers &&
993 19738 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
994 timeout_us == 0))
995 {
996 1 completed_ops_.push(&task_op_);
997 1 return 0;
998 }
999
1000 54417 task_interrupted_ = more_handlers || timeout_us == 0;
1001 54417 task_running_.store(true, std::memory_order_release);
1002
1003 54417 if (more_handlers)
1004 44549 unlock_and_signal_one(lock);
1005
1006 54417 run_task(lock, ctx);
1007
1008 54417 task_running_.store(false, std::memory_order_relaxed);
1009 54417 completed_ops_.push(&task_op_);
1010 54417 continue;
1011 54417 }
1012
1013 // Handle operation
1014 160570 if (op != nullptr)
1015 {
1016 160569 bool more = !completed_ops_.empty();
1017
1018 160569 if (more)
1019 160569 ctx->unassisted = !unlock_and_signal_one(lock);
1020 else
1021 {
1022 ctx->unassisted = false;
1023 lock.unlock();
1024 }
1025
1026 160569 work_cleanup on_exit{this, &lock, ctx};
1027
1028 160569 (*op)();
1029 160569 return 1;
1030 160569 }
1031
1032 // No pending work to wait on, or caller requested non-blocking poll
1033 2 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1034 timeout_us == 0)
1035 return 0;
1036
1037 1 clear_signal();
1038 1 if (timeout_us < 0)
1039 1 wait_for_signal(lock);
1040 else
1041 wait_for_signal_for(lock, timeout_us);
1042 54418 }
1043 }
1044
1045 } // namespace boost::corosio::detail
1046
1047 #endif
1048