src/corosio/src/detail/epoll/scheduler.hpp

0.0% Lines (0/2) 0.0% Functions (0/2)
src/corosio/src/detail/epoll/scheduler.hpp
Line Hits Source Code
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
12
#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include "src/detail/scheduler_impl.hpp"
#include "src/detail/scheduler_op.hpp"

#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <mutex>
28
namespace boost::corosio::detail {

// Forward declarations; defined in the epoll implementation files.
struct epoll_op;
struct descriptor_state;
struct scheduler_context;

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads
    wait on a condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: Behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class epoll_scheduler final
    : public scheduler_impl
    , public capy::execution_context::service
{
public:
    // Service lookup key used by the execution_context service registry.
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    // Non-copyable: owns kernel file descriptors and synchronization state.
    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    // scheduler_impl interface; semantics are documented on the base class.
    void shutdown() override;
    void post(std::coroutine_handle<> h) const override;
    void post(scheduler_op* h) const override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    // Work counting (scheduler_impl interface); tracked in outstanding_work_.
    void work_started() noexcept override;
    void work_finished() noexcept override;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Called by thread_context_guard destructor when a thread exits run().
        Transfers pending work to the global queue under mutex protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    // Cleanup helpers (defined in the .cpp) need access to internals.
    friend struct work_cleanup;
    friend struct task_cleanup;

    // Run one unit of work (a queued handler or a reactor pass) with the
    // mutex held by `lock`; implementation details live in the .cpp.
    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        scheduler_context* ctx);
    // Become the reactor: run epoll_wait and dispatch resulting events.
    void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
    // Wake a single waiting thread (or interrupt the reactor), releasing the lock.
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    // Nudge the reactor out of epoll_wait via the eventfd.
    void interrupt_reactor() const;
    // Re-arm the timerfd to the earliest pending timer expiry.
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if lock still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if a waiter was signaled, `false` otherwise.
    */
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast-path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;

    int epoll_fd_;                       // epoll instance; exposed via epoll_fd()
    int event_fd_;                       // for interrupting reactor
    int timer_fd_;                       // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;           // guards queue, flags, and signal state
    mutable std::condition_variable cond_; // wakes threads waiting for handler work
    mutable op_queue completed_ops_;     // global queue of ready-to-run operations
    mutable std::atomic<long> outstanding_work_; // work count driving run() exit
    bool stopped_;                       // set by stop(); cleared by restart()
    bool shutdown_;                      // set by shutdown(); scheduler unusable after

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable std::atomic<bool> task_running_{false};

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    // blocks. Avoids timerfd_settime syscalls for timers that are
    // scheduled then cancelled without being waited on.
    mutable std::atomic<bool> timerfd_stale_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

} // namespace boost::corosio::detail
290
291 #endif // BOOST_COROSIO_HAS_EPOLL
292
293 #endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
294