src/corosio/src/detail/select/scheduler.hpp

0.0% Lines (0/2) 0.0% Functions (0/2)
src/corosio/src/detail/select/scheduler.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
11 #define BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include "src/detail/scheduler_impl.hpp"
21 #include "src/detail/scheduler_op.hpp"
22
23 #include <sys/select.h>
24
25 #include <atomic>
26 #include <condition_variable>
27 #include <cstddef>
28 #include <mutex>
29 #include <unordered_map>
30
31 namespace boost::corosio::detail {
32
33 struct select_op; // Forward declaration; this header refers to select_op only through pointers.
34
35 /** POSIX scheduler that multiplexes I/O readiness with select().
36
37 Implements the scheduler interface on top of the POSIX select() call
38 for I/O event notification. The model is single-reactor: one thread
39 runs select() while the other threads wait on a condition variable
40 until handler work is available. This design provides:
41
42 - Handler parallelism: N posted handlers can execute on N threads
43 - No thundering herd: a single waiting thread is woken via the condition variable
44 - Portability: works on all POSIX systems
45
46 The design mirrors epoll_scheduler for behavioral consistency:
47 - Same single-reactor thread coordination model
48 - Same work counting semantics
49 - Same timer integration pattern
50
51 Known limitations:
52 - Descriptors must be below FD_SETSIZE (commonly 1024; see max_fd()), which caps concurrent connections
53 - O(n) scanning: the fd_sets are rebuilt on every iteration
54 - Level-triggered only (no edge-triggered mode)
55
56 @par Thread Safety
57 All public member functions are thread-safe.
58 */
59 class select_scheduler final
60 : public scheduler_impl
61 , public capy::execution_context::service
62 {
63 public:
64 using key_type = scheduler; // NOTE(review): presumably the key under which execution_context looks up this service - confirm
65
66 /** Construct the scheduler.
67
68 Creates the self-pipe used to interrupt the reactor.
69
70 @param ctx Reference to the owning execution_context.
71 @param concurrency_hint Hint for expected thread count (unused); defaults to -1.
72 */
73 select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
74
75 ~select_scheduler() override;
76 // Copying is disabled; because the copy operations are user-declared, no move operations are generated either.
77 select_scheduler(select_scheduler const&) = delete;
78 select_scheduler& operator=(select_scheduler const&) = delete;
79 // Overrides of base-class virtual functions; only declared here, defined out of line.
80 void shutdown() override;
81 void post(std::coroutine_handle<> h) const override;
82 void post(scheduler_op* h) const override;
83 bool running_in_this_thread() const noexcept override;
84 void stop() override;
85 bool stopped() const noexcept override;
86 void restart() override;
87 std::size_t run() override;
88 std::size_t run_one() override;
89 std::size_t wait_one(long usec) override; // usec: presumably a timeout in microseconds - confirm
90 std::size_t poll() override;
91 std::size_t poll_one() override;
92 // NOTE(review): post() names std::coroutine_handle, yet <coroutine> is not included directly by this header; presumably it arrives through scheduler_impl.hpp - confirm.
93 /** Return the maximum file descriptor value supported.
94
95 Returns FD_SETSIZE - 1, the largest fd value that select() can
96 monitor. Operations with fd >= FD_SETSIZE
97 will fail with EINVAL.
98
99 @return The maximum supported file descriptor value (a compile-time constant).
100 */
101 static constexpr int max_fd() noexcept
102 {
103 return FD_SETSIZE - 1;
104 }
105
106 /** Register a file descriptor for monitoring.
107
108 @param fd The file descriptor to register.
109 @param op The operation associated with this fd.
110 @param events Bitmask of event_read (1) and event_write (2); 3 selects both.
111 */
112 void register_fd(int fd, select_op* op, int events) const;
113
114 /** Unregister a file descriptor from monitoring.
115
116 @param fd The file descriptor to unregister.
117 @param events Bitmask of events to remove: event_read (1), event_write (2), or 3 for both.
118 */
119 void deregister_fd(int fd, int events) const;
120 // Overrides for work tracking; NOTE(review): presumably they adjust outstanding_work_ - confirm in the implementation file.
121 void work_started() noexcept override;
122 void work_finished() noexcept override;
123
124 // Event flags for register_fd/deregister_fd; distinct bits, so event_read | event_write (3) selects both.
125 static constexpr int event_read = 1;
126 static constexpr int event_write = 2;
127
128 private:
129 std::size_t do_one(long timeout_us); // timeout_us: presumably microseconds - confirm
130 void run_reactor(std::unique_lock<std::mutex>& lock); // receives the caller's lock object by reference
131 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const; // name suggests the lock is released before returning - confirm
132 void interrupt_reactor() const;
133 long calculate_timeout(long requested_timeout_us) const;
134
135 // Self-pipe for interrupting select()
136 int pipe_fds_[2]; // [0]=read, [1]=write
137 // The members declared mutable can be modified by the const member functions above (post, register_fd, deregister_fd, ...).
138 mutable std::mutex mutex_;
139 mutable std::condition_variable wakeup_event_;
140 mutable op_queue completed_ops_;
141 mutable std::atomic<long> outstanding_work_;
142 std::atomic<bool> stopped_;
143 bool shutdown_;
144 // NOTE(review): pipe_fds_, outstanding_work_, stopped_ and shutdown_ have no default member initializers here; presumably the constructor sets them - confirm.
145 // Per-fd state for tracking registered operations
146 struct fd_state
147 {
148 select_op* read_op = nullptr; // nullptr by default; presumably the operation registered for read events - confirm
149 select_op* write_op = nullptr; // nullptr by default; presumably the operation registered for write events - confirm
150 };
151 mutable std::unordered_map<int, fd_state> registered_fds_; // keyed by int; presumably the file descriptor - confirm
152 mutable int max_fd_ = -1; // starts at -1; presumably the highest registered fd - confirm
153
154 // Single reactor thread coordination
155 mutable bool reactor_running_ = false;
156 mutable bool reactor_interrupted_ = false;
157 mutable int idle_thread_count_ = 0;
158
159 // Sentinel operation for interleaving reactor runs with handler execution.
160 // Ensures the reactor runs periodically even when handlers are continuously
161 // posted, preventing timer starvation.
162 struct task_op final : scheduler_op
163 {
164 void operator()() override {} // intentionally empty: invoking the sentinel does nothing
165 void destroy() override {} // intentionally empty: the sentinel is a member object (task_op_ below)
166 };
167 task_op task_op_;
168 };
169
170 } // namespace boost::corosio::detail
171
172 #endif // BOOST_COROSIO_HAS_SELECT
173
174 #endif // BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
175