src/corosio/src/detail/select/scheduler.cpp

Line coverage: 72.7% (261/359) | Function coverage: 87.1% (27/31)
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/scheduler.hpp"
15 #include "src/detail/select/op.hpp"
16 #include "src/detail/timer_service.hpp"
17 #include "src/detail/make_err.hpp"
18 #include "src/detail/posix/resolver_service.hpp"
19 #include "src/detail/posix/signals.hpp"
20
21 #include <boost/corosio/detail/except.hpp>
22 #include <boost/corosio/detail/thread_local_ptr.hpp>
23
24 #include <algorithm>
25 #include <chrono>
26 #include <limits>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/select.h>
31 #include <sys/socket.h>
32 #include <unistd.h>
33
34 /*
35 select Scheduler - Single Reactor Model
36 =======================================
37
38 This scheduler mirrors the epoll_scheduler design but uses select() instead
39 of epoll for I/O multiplexing. The thread coordination strategy is identical:
40 one thread becomes the "reactor" while others wait on a condition variable.
41
42 Thread Model
43 ------------
44 - ONE thread runs select() at a time (the reactor thread)
45 - OTHER threads wait on wakeup_event_ (condition variable) for handlers
46 - When work is posted, exactly one waiting thread wakes via notify_one()
47
48 Key Differences from epoll
49 --------------------------
50 - Uses self-pipe instead of eventfd for interruption (more portable)
51 - fd_sets are rebuilt before every wait (O(n) per iteration vs O(1) for epoll)
52 - FD_SETSIZE caps the usable descriptors (typically 1024)
53 - Level-triggered only (no edge-triggered mode)
54
55 Self-Pipe Pattern
56 -----------------
57 To interrupt a blocking select() call (e.g., when work is posted or a timer
58 expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
59 always in the read_fds set, so select() returns immediately. We drain the
60 pipe to clear the readable state.
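
A minimal sketch of the pattern in isolation (illustrative only; the real
constructor below also sets FD_CLOEXEC and does full error handling):

    int fds[2];
    ::pipe(fds);                          // fds[0] = read end, fds[1] = write end
    ::fcntl(fds[0], F_SETFL, O_NONBLOCK); // so the drain loop cannot block

    fd_set rd;
    FD_ZERO(&rd);
    FD_SET(fds[0], &rd);                  // the read end is always watched

    // From any thread: ::write(fds[1], "x", 1) makes the call below return.
    ::select(fds[0] + 1, &rd, nullptr, nullptr, nullptr);

    char buf[256];
    while (::read(fds[0], buf, sizeof(buf)) > 0) {} // drain readable state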
61
62 fd-to-op Mapping
63 ----------------
64 We use an unordered_map<int, fd_state> to track which operations are
65 registered for each fd. This allows O(1) lookup when select() returns
66 ready fds. Each fd can have at most one read op and one write op registered.
67 */
68
69 namespace boost::corosio::detail {
70
71 namespace {
72
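// Each thread keeps an intrusive, thread-local stack of frames naming the
// schedulers currently running on it; running_in_this_thread() walks this
// stack without taking any locks.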
73 struct scheduler_context
74 {
75 select_scheduler const* key;
76 scheduler_context* next;
77 };
78
79 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
80
81 struct thread_context_guard
82 {
83 scheduler_context frame_;
84
85 120 explicit thread_context_guard(select_scheduler const* ctx) noexcept
86 120 : frame_{ctx, context_stack.get()}
87 {
88 120 context_stack.set(&frame_);
89 120 }
90
91 120 ~thread_context_guard() noexcept
92 {
93 120 context_stack.set(frame_.next);
94 120 }
95 };
96
97 } // namespace
98
99 133 select_scheduler::select_scheduler(capy::execution_context& ctx, int)
100 133 : pipe_fds_{-1, -1}
101 133 , outstanding_work_(0)
102 133 , stopped_(false)
103 133 , shutdown_(false)
104 133 , max_fd_(-1)
105 133 , reactor_running_(false)
106 133 , reactor_interrupted_(false)
107 266 , idle_thread_count_(0)
108 {
109 // Create self-pipe for interrupting select()
110 133 if (::pipe(pipe_fds_) < 0)
111 detail::throw_system_error(make_err(errno), "pipe");
112
113 // Set both ends to non-blocking and close-on-exec
114 399 for (int i = 0; i < 2; ++i)
115 {
116 266 int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
117 266 if (flags == -1)
118 {
119 int errn = errno;
120 ::close(pipe_fds_[0]);
121 ::close(pipe_fds_[1]);
122 detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
123 }
124 266 if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
125 {
126 int errn = errno;
127 ::close(pipe_fds_[0]);
128 ::close(pipe_fds_[1]);
129 detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
130 }
131 266 if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
132 {
133 int errn = errno;
134 ::close(pipe_fds_[0]);
135 ::close(pipe_fds_[1]);
136 detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
137 }
138 }
139
140 133 timer_svc_ = &get_timer_service(ctx, *this);
141 133 timer_svc_->set_on_earliest_changed(
142 timer_service::callback(this, [](void* p) {
143 3779 static_cast<select_scheduler*>(p)->interrupt_reactor();
144 3779 }));
145
146 // Initialize resolver service
147 133 get_resolver_service(ctx, *this);
148
149 // Initialize signal service
150 133 get_signal_service(ctx, *this);
151
152 // Push task sentinel to interleave reactor runs with handler execution
153 133 completed_ops_.push(&task_op_);
154 133 }
155
156 266 select_scheduler::~select_scheduler()
157 {
158 133 if (pipe_fds_[0] >= 0)
159 133 ::close(pipe_fds_[0]);
160 133 if (pipe_fds_[1] >= 0)
161 133 ::close(pipe_fds_[1]);
162 266 }
163
164 void
165 133 select_scheduler::shutdown()
166 {
167 {
168 133 std::unique_lock lock(mutex_);
169 133 shutdown_ = true;
170
171 266 while (auto* h = completed_ops_.pop())
172 {
173 133 if (h == &task_op_)
174 133 continue;
175 lock.unlock();
176 h->destroy();
177 lock.lock();
178 133 }
179 133 }
180
181 133 outstanding_work_.store(0, std::memory_order_release);
182
183 133 if (pipe_fds_[1] >= 0)
184 133 interrupt_reactor();
185
186 133 wakeup_event_.notify_all();
187 133 }
188
189 void
190 4117 select_scheduler::post(std::coroutine_handle<> h) const
191 {
192 struct post_handler final : scheduler_op
193 {
194 std::coroutine_handle<> h_;
195
196 4117 explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
197
198 8234 ~post_handler() override = default;
199
200 4117 void operator()() override
201 {
202 4117 auto h = h_;
203 4117 delete this;
204 4117 h.resume();
205 4117 }
206
207 void destroy() override
208 {
209 delete this;
210 }
211 };
212
213 4117 auto ph = std::make_unique<post_handler>(h);
214 4117 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
215
216 4117 std::unique_lock lock(mutex_);
217 4117 completed_ops_.push(ph.release());
218 4117 wake_one_thread_and_unlock(lock);
219 4117 }
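
// Illustrative only (not part of this file): a coroutine can hop onto the
// scheduler by posting its own handle from await_suspend(). The awaitable
// name below is hypothetical.
//
//     struct schedule_awaitable
//     {
//         select_scheduler const& sched;
//         bool await_ready() const noexcept { return false; }
//         void await_suspend(std::coroutine_handle<> h) const { sched.post(h); }
//         void await_resume() const noexcept {}
//     };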
220
221 void
222 251812 select_scheduler::post(scheduler_op* h) const
223 {
224 251812 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
225
226 251812 std::unique_lock lock(mutex_);
227 251812 completed_ops_.push(h);
228 251812 wake_one_thread_and_unlock(lock);
229 251812 }
230
231 bool
232 553 select_scheduler::running_in_this_thread() const noexcept
233 {
234 553 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
235 369 if (c->key == this)
236 369 return true;
237 184 return false;
238 }
239
240 void
241 99 select_scheduler::stop()
242 {
243 99 bool expected = false;
244 99 if (stopped_.compare_exchange_strong(
245 expected, true, std::memory_order_release,
246 std::memory_order_relaxed))
247 {
248 // Wake all threads so they notice stopped_ and exit
249 {
250 99 std::lock_guard lock(mutex_);
251 99 wakeup_event_.notify_all();
252 99 }
253 99 interrupt_reactor();
254 }
255 99 }
256
257 bool
258 3 select_scheduler::stopped() const noexcept
259 {
260 3 return stopped_.load(std::memory_order_acquire);
261 }
262
263 void
264 34 select_scheduler::restart()
265 {
266 34 stopped_.store(false, std::memory_order_release);
267 34 }
268
269 std::size_t
270 96 select_scheduler::run()
271 {
272 96 if (stopped_.load(std::memory_order_acquire))
273 return 0;
274
275 192 if (outstanding_work_.load(std::memory_order_acquire) == 0)
276 {
277 stop();
278 return 0;
279 }
280
281 96 thread_context_guard ctx(this);
282
283 96 std::size_t n = 0;
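// Saturating count of completed handlers: stop at SIZE_MAX rather than wrap.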
284 263245 while (do_one(-1))
285 263149 if (n != (std::numeric_limits<std::size_t>::max)())
286 263149 ++n;
287 96 return n;
288 96 }
289
290 std::size_t
291 select_scheduler::run_one()
292 {
293 if (stopped_.load(std::memory_order_acquire))
294 return 0;
295
296 if (outstanding_work_.load(std::memory_order_acquire) == 0)
297 {
298 stop();
299 return 0;
300 }
301
302 thread_context_guard ctx(this);
303 return do_one(-1);
304 }
305
306 std::size_t
307 27 select_scheduler::wait_one(long usec)
308 {
309 27 if (stopped_.load(std::memory_order_acquire))
310 3 return 0;
311
312 48 if (outstanding_work_.load(std::memory_order_acquire) == 0)
313 {
314 stop();
315 return 0;
316 }
317
318 24 thread_context_guard ctx(this);
319 24 return do_one(usec);
320 24 }
321
322 std::size_t
323 select_scheduler::poll()
324 {
325 if (stopped_.load(std::memory_order_acquire))
326 return 0;
327
328 if (outstanding_work_.load(std::memory_order_acquire) == 0)
329 {
330 stop();
331 return 0;
332 }
333
334 thread_context_guard ctx(this);
335
336 std::size_t n = 0;
337 while (do_one(0))
338 if (n != (std::numeric_limits<std::size_t>::max)())
339 ++n;
340 return n;
341 }
342
343 std::size_t
344 select_scheduler::poll_one()
345 {
346 if (stopped_.load(std::memory_order_acquire))
347 return 0;
348
349 if (outstanding_work_.load(std::memory_order_acquire) == 0)
350 {
351 stop();
352 return 0;
353 }
354
355 thread_context_guard ctx(this);
356 return do_one(0);
357 }
358
359 void
360 7407 select_scheduler::register_fd(int fd, select_op* op, int events) const
361 {
362 // Validate fd is within select() limits
363 7407 if (fd < 0 || fd >= FD_SETSIZE)
364 detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
365
366 {
367 7407 std::lock_guard lock(mutex_);
368
369 7407 auto& state = registered_fds_[fd];
370 7407 if (events & event_read)
371 3845 state.read_op = op;
372 7407 if (events & event_write)
373 3562 state.write_op = op;
374
375 7407 if (fd > max_fd_)
376 228 max_fd_ = fd;
377 7407 }
378
379 // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
380 // with the newly registered fd.
381 7407 interrupt_reactor();
382 7407 }
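
// Illustrative only: a read-oriented op registers with
//     register_fd(sock_fd, op, event_read);
// while a connect op registers for event_write; connect completion or
// failure is then reported through writability and the exception set.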
383
384 void
385 7339 select_scheduler::deregister_fd(int fd, int events) const
386 {
387 7339 std::lock_guard lock(mutex_);
388
389 7339 auto it = registered_fds_.find(fd);
390 7339 if (it == registered_fds_.end())
391 7176 return;
392
393 163 if (events & event_read)
394 163 it->second.read_op = nullptr;
395 163 if (events & event_write)
396 it->second.write_op = nullptr;
397
398 // Remove entry if both are null
399 163 if (!it->second.read_op && !it->second.write_op)
400 {
401 163 registered_fds_.erase(it);
402
403 // Recalculate max_fd_ if needed
404 163 if (fd == max_fd_)
405 {
406 162 max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
407 162 for (auto& [registered_fd, state] : registered_fds_)
408 {
409 if (registered_fd > max_fd_)
410 max_fd_ = registered_fd;
411 }
412 }
413 }
414 7339 }
415
416 void
417 11761 select_scheduler::work_started() noexcept
418 {
419 11761 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
420 11761 }
421
422 void
423 267690 select_scheduler::work_finished() noexcept
424 {
425 535380 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
426 99 stop();
427 267690 }
428
429 void
430 15188 select_scheduler::interrupt_reactor() const
431 {
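// One short write to the non-blocking pipe. If the pipe is full, write()
// fails with EAGAIN, which is harmless: a full pipe is already readable,
// so select() wakes regardless. Hence the result is deliberately ignored.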
432 15188 char byte = 1;
433 15188 [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
434 15188 }
435
436 void
437 255929 select_scheduler::wake_one_thread_and_unlock(
438 std::unique_lock<std::mutex>& lock) const
439 {
440 255929 if (idle_thread_count_ > 0)
441 {
442 // Idle worker exists - wake it via condvar
443 wakeup_event_.notify_one();
444 lock.unlock();
445 }
446 255929 else if (reactor_running_ && !reactor_interrupted_)
447 {
448 // No idle workers but reactor is running - interrupt it
449 3770 reactor_interrupted_ = true;
450 3770 lock.unlock();
451 3770 interrupt_reactor();
452 }
453 else
454 {
455 // No one to wake
456 252159 lock.unlock();
457 }
458 255929 }
459
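// RAII helper: balances the work count incremented when the op was posted,
// decrementing even if the handler throws.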
460 struct work_guard
461 {
462 select_scheduler* self;
463 263173 ~work_guard()
464 {
465 263173 self->work_finished();
466 263173 }
467 };
468
469 long
470 10942 select_scheduler::calculate_timeout(long requested_timeout_us) const
471 {
472 10942 if (requested_timeout_us == 0)
473 return 0;
474
475 10942 auto nearest = timer_svc_->nearest_expiry();
476 10942 if (nearest == timer_service::time_point::max())
477 35 return requested_timeout_us;
478
479 10907 auto now = std::chrono::steady_clock::now();
480 10907 if (nearest <= now)
481 76 return 0;
482
483 auto timer_timeout_us =
484 10831 std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
485 10831 .count();
486
487 // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
488 10831 constexpr auto long_max =
489 static_cast<long long>((std::numeric_limits<long>::max)());
490 10831 auto capped_timer_us = (std::min)(
491 21662 (std::max)(static_cast<long long>(timer_timeout_us),
492 10831 static_cast<long long>(0)),
493 10831 long_max);
494
495 10831 if (requested_timeout_us < 0)
496 10831 return static_cast<long>(capped_timer_us);
497
498 // requested_timeout_us is already long, so min() result fits in long
499 return static_cast<long>((std::min)(
500 static_cast<long long>(requested_timeout_us),
501 capped_timer_us));
502 }
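
// Worked example: with the nearest timer due in 2500us, a requested timeout
// of -1 (block indefinitely) yields 2500, so select() wakes in time to fire
// the timer; a requested timeout of 1000 yields 1000, the smaller bound.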
503
504 void
505 139367 select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
506 {
507 // Compute the select() timeout from pending timers; use 0 if already interrupted
508 long effective_timeout_us =
509 139367 reactor_interrupted_ ? 0 : calculate_timeout(-1);
510
511 // Build fd_sets from registered_fds_
512 fd_set read_fds, write_fds, except_fds;
513 2369239 FD_ZERO(&read_fds);
514 2369239 FD_ZERO(&write_fds);
515 2369239 FD_ZERO(&except_fds);
516
517 // Always include the interrupt pipe
518 139367 FD_SET(pipe_fds_[0], &read_fds);
519 139367 int nfds = pipe_fds_[0];
520
521 // Add registered fds
522 157165 for (auto& [fd, state] : registered_fds_)
523 {
524 17798 if (state.read_op)
525 14236 FD_SET(fd, &read_fds);
526 17798 if (state.write_op)
527 {
528 3562 FD_SET(fd, &write_fds);
529 // Also monitor for errors on connect operations
530 3562 FD_SET(fd, &except_fds);
531 }
532 17798 if (fd > nfds)
533 14239 nfds = fd;
534 }
535
536 // Convert timeout to timeval
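// A negative timeout leaves tv_ptr null, so select() blocks indefinitely;
// e.g. 2'500'000 us becomes { tv_sec = 2, tv_usec = 500'000 }.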
537 struct timeval tv;
538 139367 struct timeval* tv_ptr = nullptr;
539 139367 if (effective_timeout_us >= 0)
540 {
541 139332 tv.tv_sec = effective_timeout_us / 1000000;
542 139332 tv.tv_usec = effective_timeout_us % 1000000;
543 139332 tv_ptr = &tv;
544 }
545
546 139367 lock.unlock();
547
548 139367 int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
549 139367 int saved_errno = errno;
550
551 // Process timers outside the lock
552 139367 timer_svc_->process_expired();
553
554 139367 if (ready < 0 && saved_errno != EINTR)
555 detail::throw_system_error(make_err(saved_errno), "select");
556
557 // Re-acquire lock before modifying completed_ops_
558 139367 lock.lock();
559
560 // Drain the interrupt pipe if readable
561 139367 if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
562 {
563 char buf[256];
564 22606 while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
565 {
566 }
567 }
568
569 // Process I/O completions
570 139367 int completions_queued = 0;
571 139367 if (ready > 0)
572 {
573 // Iterate over registered fds (copy keys to avoid iterator invalidation)
574 11303 std::vector<int> fds_to_check;
575 11303 fds_to_check.reserve(registered_fds_.size());
576 25573 for (auto& [fd, state] : registered_fds_)
577 14270 fds_to_check.push_back(fd);
578
579 25573 for (int fd : fds_to_check)
580 {
581 14270 auto it = registered_fds_.find(fd);
582 14270 if (it == registered_fds_.end())
583 continue;
584
585 14270 auto& state = it->second;
586
587 // Check for errors (especially for connect operations)
588 14270 bool has_error = FD_ISSET(fd, &except_fds);
589
590 // Process read readiness
591 14270 if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
592 {
593 3682 auto* op = state.read_op;
594 // Claim the op by exchanging to unregistered. Both registering and
595 // registered states mean the op is ours to complete.
596 3682 auto prev = op->registered.exchange(
597 select_registration_state::unregistered,
598 std::memory_order_acq_rel);
599 3682 if (prev != select_registration_state::unregistered)
600 {
601 3682 state.read_op = nullptr;
602
603 3682 if (has_error)
604 {
605 int errn = 0;
606 socklen_t len = sizeof(errn);
607 if (::getsockopt(
608 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
609 errn = errno;
610 if (errn == 0)
611 errn = EIO;
612 op->complete(errn, 0);
613 }
614 else
615 {
616 3682 op->perform_io();
617 }
618
619 3682 completed_ops_.push(op);
620 3682 ++completions_queued;
621 }
622 }
623
624 // Process write readiness
625 14270 if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
626 {
627 3562 auto* op = state.write_op;
628 // Claim the op by exchanging to unregistered. Both registering and
629 // registered states mean the op is ours to complete.
630 3562 auto prev = op->registered.exchange(
631 select_registration_state::unregistered,
632 std::memory_order_acq_rel);
633 3562 if (prev != select_registration_state::unregistered)
634 {
635 3562 state.write_op = nullptr;
636
637 3562 if (has_error)
638 {
639 int errn = 0;
640 socklen_t len = sizeof(errn);
641 if (::getsockopt(
642 fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
643 errn = errno;
644 if (errn == 0)
645 errn = EIO;
646 op->complete(errn, 0);
647 }
648 else
649 {
650 3562 op->perform_io();
651 }
652
653 3562 completed_ops_.push(op);
654 3562 ++completions_queued;
655 }
656 }
657
658 // Clean up empty entries
659 14270 if (!state.read_op && !state.write_op)
660 7244 registered_fds_.erase(it);
661 }
662 11303 }
663
664 139367 if (completions_queued > 0)
665 {
666 3685 if (completions_queued == 1)
667 126 wakeup_event_.notify_one();
668 else
669 3559 wakeup_event_.notify_all();
670 }
671 139367 }
672
673 std::size_t
674 263269 select_scheduler::do_one(long timeout_us)
675 {
676 263269 std::unique_lock lock(mutex_);
677
678 for (;;)
679 {
680 402636 if (stopped_.load(std::memory_order_acquire))
681 96 return 0;
682
683 402540 scheduler_op* op = completed_ops_.pop();
684
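// Popping the sentinel makes this thread the reactor for one select() pass;
// the sentinel is re-queued afterwards, behind any ops the pass produced,
// so reactor runs interleave with handler execution.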
685 402540 if (op == &task_op_)
686 {
687 139367 bool more_handlers = !completed_ops_.empty();
688
689 139367 if (!more_handlers)
690 {
691 21884 if (outstanding_work_.load(std::memory_order_acquire) == 0)
692 {
693 completed_ops_.push(&task_op_);
694 return 0;
695 }
696 10942 if (timeout_us == 0)
697 {
698 completed_ops_.push(&task_op_);
699 return 0;
700 }
701 }
702
703 139367 reactor_interrupted_ = more_handlers || timeout_us == 0;
704 139367 reactor_running_ = true;
705
706 139367 if (more_handlers && idle_thread_count_ > 0)
707 wakeup_event_.notify_one();
708
709 139367 run_reactor(lock);
710
711 139367 reactor_running_ = false;
712 139367 completed_ops_.push(&task_op_);
713 139367 continue;
714 139367 }
715
716 263173 if (op != nullptr)
717 {
718 263173 lock.unlock();
719 263173 work_guard g{this};
720 263173 (*op)();
721 263173 return 1;
722 263173 }
723
724 if (outstanding_work_.load(std::memory_order_acquire) == 0)
725 return 0;
726
727 if (timeout_us == 0)
728 return 0;
729
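// Nothing runnable and work is still outstanding: park this thread until
// wake_one_thread_and_unlock() signals, or the caller's timeout elapses.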
730 ++idle_thread_count_;
731 if (timeout_us < 0)
732 wakeup_event_.wait(lock);
733 else
734 wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
735 --idle_thread_count_;
736 139367 }
737 263269 }
738
739 } // namespace boost::corosio::detail
740
741 #endif // BOOST_COROSIO_HAS_SELECT
742