include/boost/corosio/tcp_server.hpp

78.8% Lines (108/137) 91.4% Functions (32/35)
include/boost/corosio/tcp_server.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/tcp_acceptor.hpp>
16 #include <boost/corosio/tcp_socket.hpp>
17 #include <boost/corosio/io_context.hpp>
18 #include <boost/corosio/endpoint.hpp>
19 #include <boost/capy/task.hpp>
20 #include <boost/capy/concept/execution_context.hpp>
21 #include <boost/capy/concept/io_awaitable.hpp>
22 #include <boost/capy/concept/executor.hpp>
23 #include <boost/capy/ex/any_executor.hpp>
24 #include <boost/capy/ex/frame_allocator.hpp>
25 #include <boost/capy/ex/io_env.hpp>
26 #include <boost/capy/ex/run_async.hpp>
27
28 #include <coroutine>
29 #include <memory>
30 #include <ranges>
31 #include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
158 struct waiter
159 {
160 waiter* next;
161 std::coroutine_handle<> h;
162 worker_base* w;
163 };
164
165 struct impl;
166
167 static impl* make_impl(capy::execution_context& ctx);
168
169 impl* impl_;
170 capy::any_executor ex_;
171 waiter* waiters_ = nullptr;
172 worker_base* idle_head_ = nullptr; // Forward list: available workers
173 worker_base* active_head_ =
174 nullptr; // Doubly linked: workers handling connections
175 worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
176 std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
177 std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
178 bool running_ = false;
179
180 // Idle list (forward/singly linked) - push front, pop front
181 45 void idle_push(worker_base* w) noexcept
182 {
183 45 w->next_ = idle_head_;
184 45 idle_head_ = w;
185 45 }
186
187 9 worker_base* idle_pop() noexcept
188 {
189 9 auto* w = idle_head_;
190 9 if (w)
191 9 idle_head_ = w->next_;
192 9 return w;
193 }
194
195 9 bool idle_empty() const noexcept
196 {
197 9 return idle_head_ == nullptr;
198 }
199
200 // Active list (doubly linked) - push back, remove anywhere
201 3 void active_push(worker_base* w) noexcept
202 {
203 3 w->next_ = nullptr;
204 3 w->prev_ = active_tail_;
205 3 if (active_tail_)
206 active_tail_->next_ = w;
207 else
208 3 active_head_ = w;
209 3 active_tail_ = w;
210 3 }
211
212 9 void active_remove(worker_base* w) noexcept
213 {
214 // Skip if not in active list (e.g., after failed accept)
215 9 if (w != active_head_ && w->prev_ == nullptr)
216 6 return;
217 3 if (w->prev_)
218 w->prev_->next_ = w->next_;
219 else
220 3 active_head_ = w->next_;
221 3 if (w->next_)
222 w->next_->prev_ = w->prev_;
223 else
224 3 active_tail_ = w->prev_;
225 3 w->prev_ = nullptr; // Mark as not in active list
226 }
227
228 template<capy::Executor Ex>
229 struct launch_wrapper
230 {
231 struct promise_type
232 {
233 Ex ex; // Executor stored directly in frame (outlives child tasks)
234 capy::io_env env_;
235
236 // For regular coroutines: first arg is executor, second is stop token
237 template<class E, class S, class... Args>
238 requires capy::Executor<std::decay_t<E>>
239 promise_type(E e, S s, Args&&...)
240 : ex(std::move(e))
241 , env_{
242 capy::executor_ref(ex), std::move(s),
243 capy::get_current_frame_allocator()}
244 {
245 }
246
247 // For lambda coroutines: first arg is closure, second is executor, third is stop token
248 template<class Closure, class E, class S, class... Args>
249 requires(!capy::Executor<std::decay_t<Closure>> &&
250 capy::Executor<std::decay_t<E>>)
251 3 promise_type(Closure&&, E e, S s, Args&&...)
252 3 : ex(std::move(e))
253 3 , env_{
254 3 capy::executor_ref(ex), std::move(s),
255 3 capy::get_current_frame_allocator()}
256 {
257 3 }
258
259 3 launch_wrapper get_return_object() noexcept
260 {
261 return {
262 3 std::coroutine_handle<promise_type>::from_promise(*this)};
263 }
264 3 std::suspend_always initial_suspend() noexcept
265 {
266 3 return {};
267 }
268 3 std::suspend_never final_suspend() noexcept
269 {
270 3 return {};
271 }
272 3 void return_void() noexcept {}
273 void unhandled_exception()
274 {
275 std::terminate();
276 }
277
278 // Inject io_env for IoAwaitable
279 template<capy::IoAwaitable Awaitable>
280 6 auto await_transform(Awaitable&& a)
281 {
282 using AwaitableT = std::decay_t<Awaitable>;
283 struct adapter
284 {
285 AwaitableT aw;
286 capy::io_env const* env;
287
288 bool await_ready()
289 {
290 return aw.await_ready();
291 }
292 decltype(auto) await_resume()
293 {
294 return aw.await_resume();
295 }
296
297 auto await_suspend(std::coroutine_handle<promise_type> h)
298 {
299 return aw.await_suspend(h, env);
300 }
301 };
302 9 return adapter{std::forward<Awaitable>(a), &env_};
303 3 }
304 };
305
306 std::coroutine_handle<promise_type> h;
307
308 3 launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
309 3 : h(handle)
310 {
311 3 }
312
313 3 ~launch_wrapper()
314 {
315 3 if (h)
316 h.destroy();
317 3 }
318
319 launch_wrapper(launch_wrapper&& o) noexcept
320 : h(std::exchange(o.h, nullptr))
321 {
322 }
323
324 launch_wrapper(launch_wrapper const&) = delete;
325 launch_wrapper& operator=(launch_wrapper const&) = delete;
326 launch_wrapper& operator=(launch_wrapper&&) = delete;
327 };
328
329 // Named functor to avoid incomplete lambda type in coroutine promise
330 template<class Executor>
331 struct launch_coro
332 {
333 3 launch_wrapper<Executor> operator()(
334 Executor,
335 std::stop_token,
336 tcp_server* self,
337 capy::task<void> t,
338 worker_base* wp)
339 {
340 // Executor and stop token stored in promise via constructor
341 co_await std::move(t);
342 co_await self->push(*wp); // worker goes back to idle list
343 6 }
344 };
345
346 class push_awaitable
347 {
348 tcp_server& self_;
349 worker_base& w_;
350
351 public:
352 9 push_awaitable(tcp_server& self, worker_base& w) noexcept
353 9 : self_(self)
354 9 , w_(w)
355 {
356 9 }
357
358 9 bool await_ready() const noexcept
359 {
360 9 return false;
361 }
362
363 std::coroutine_handle<>
364 9 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
365 {
366 // Symmetric transfer to server's executor
367 9 return self_.ex_.dispatch(h);
368 }
369
370 9 void await_resume() noexcept
371 {
372 // Running on server executor - safe to modify lists
373 // Remove from active (if present), then wake waiter or add to idle
374 9 self_.active_remove(&w_);
375 9 if (self_.waiters_)
376 {
377 auto* wait = self_.waiters_;
378 self_.waiters_ = wait->next;
379 wait->w = &w_;
380 self_.ex_.post(wait->h);
381 }
382 else
383 {
384 9 self_.idle_push(&w_);
385 }
386 9 }
387 };
388
389 class pop_awaitable
390 {
391 tcp_server& self_;
392 waiter wait_;
393
394 public:
395 9 pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
396
397 9 bool await_ready() const noexcept
398 {
399 9 return !self_.idle_empty();
400 }
401
402 bool
403 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
404 {
405 // Running on server executor (do_accept runs there)
406 wait_.h = h;
407 wait_.w = nullptr;
408 wait_.next = self_.waiters_;
409 self_.waiters_ = &wait_;
410 return true;
411 }
412
413 9 worker_base& await_resume() noexcept
414 {
415 // Running on server executor
416 9 if (wait_.w)
417 return *wait_.w; // Woken by push_awaitable
418 9 return *self_.idle_pop();
419 }
420 };
421
422 9 push_awaitable push(worker_base& w)
423 {
424 9 return push_awaitable{*this, w};
425 }
426
427 // Synchronous version for destructor/guard paths
428 // Must be called from server executor context
429 void push_sync(worker_base& w) noexcept
430 {
431 active_remove(&w);
432 if (waiters_)
433 {
434 auto* wait = waiters_;
435 waiters_ = wait->next;
436 wait->w = &w;
437 ex_.post(wait->h);
438 }
439 else
440 {
441 idle_push(&w);
442 }
443 }
444
445 9 pop_awaitable pop()
446 {
447 9 return pop_awaitable{*this};
448 }
449
450 capy::task<void> do_accept(tcp_acceptor& acc);
451
452 public:
453 /** Abstract base class for connection handlers.
454
455 Derive from this class to implement custom connection handling.
456 Each worker owns a socket and is reused across multiple
457 connections to avoid per-connection allocation.
458
459 @see tcp_server, launcher
460 */
461 class BOOST_COROSIO_DECL worker_base
462 {
463 // Ordered largest to smallest for optimal packing
464 std::stop_source stop_; // ~16 bytes
465 worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
466 worker_base* prev_ = nullptr; // 8 bytes - used only by active list
467
468 friend class tcp_server;
469
470 public:
471 /// Destroy the worker.
472 36 virtual ~worker_base() = default;
473
474 /** Handle an accepted connection.
475
476 Called when this worker is dispatched to handle a new
477 connection. The implementation must invoke the launcher
478 exactly once to start the handling coroutine.
479
480 @param launch Handle to launch the connection coroutine.
481 */
482 virtual void run(launcher launch) = 0;
483
484 /// Return the socket used for connections.
485 virtual corosio::tcp_socket& socket() = 0;
486 };
487
488 /** Move-only handle to launch a worker coroutine.
489
490 Passed to @ref worker_base::run to start the connection-handling
491 coroutine. The launcher ensures the worker returns to the idle
492 pool when the coroutine completes or if launching fails.
493
494 The launcher must be invoked exactly once via `operator()`.
495 If destroyed without invoking, the worker is returned to the
496 idle pool automatically.
497
498 @see worker_base::run
499 */
500 class BOOST_COROSIO_DECL launcher
501 {
502 tcp_server* srv_;
503 worker_base* w_;
504
505 friend class tcp_server;
506
507 3 launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
508 {
509 3 }
510
511 public:
512 /// Return the worker to the pool if not launched.
513 3 ~launcher()
514 {
515 3 if (w_)
516 srv_->push_sync(*w_);
517 3 }
518
519 launcher(launcher&& o) noexcept
520 : srv_(o.srv_)
521 , w_(std::exchange(o.w_, nullptr))
522 {
523 }
524 launcher(launcher const&) = delete;
525 launcher& operator=(launcher const&) = delete;
526 launcher& operator=(launcher&&) = delete;
527
528 /** Launch the connection-handling coroutine.
529
530 Starts the given coroutine on the specified executor. When
531 the coroutine completes, the worker is automatically returned
532 to the idle pool.
533
534 @param ex The executor to run the coroutine on.
535 @param task The coroutine to execute.
536
537 @throws std::logic_error If this launcher was already invoked.
538 */
539 template<class Executor>
540 3 void operator()(Executor const& ex, capy::task<void> task)
541 {
542 3 if (!w_)
543 detail::throw_logic_error(); // launcher already invoked
544
545 3 auto* w = std::exchange(w_, nullptr);
546
547 // Worker is being dispatched - add to active list
548 3 srv_->active_push(w);
549
550 // Return worker to pool if coroutine setup throws
551 struct guard_t
552 {
553 tcp_server* srv;
554 worker_base* w;
555 3 ~guard_t()
556 {
557 3 if (w)
558 srv->push_sync(*w);
559 3 }
560 3 } guard{srv_, w};
561
562 // Reset worker's stop source for this connection
563 3 w->stop_ = {};
564 3 auto st = w->stop_.get_token();
565
566 3 auto wrapper =
567 3 launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
568
569 // Executor and stop token stored in promise via constructor
570 3 ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
571 3 guard.w = nullptr; // Success - dismiss guard
572 3 }
573 };
574
575 /** Construct a TCP server.
576
577 @tparam Ctx Execution context type satisfying ExecutionContext.
578 @tparam Ex Executor type satisfying Executor.
579
580 @param ctx The execution context for socket operations.
581 @param ex The executor for dispatching coroutines.
582
583 @par Example
584 @code
585 tcp_server srv(ctx, ctx.get_executor());
586 srv.set_workers(make_workers(ctx, 100));
587 srv.bind(endpoint{...});
588 srv.start();
589 @endcode
590 */
591 template<capy::ExecutionContext Ctx, capy::Executor Ex>
592 9 tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
593 9 , ex_(std::move(ex))
594 {
595 9 }
596
597 public:
598 ~tcp_server();
599 tcp_server(tcp_server const&) = delete;
600 tcp_server& operator=(tcp_server const&) = delete;
601 tcp_server(tcp_server&& o) noexcept;
602 tcp_server& operator=(tcp_server&& o) noexcept;
603
604 /** Bind to a local endpoint.
605
606 Creates an acceptor listening on the specified endpoint.
607 Multiple endpoints can be bound by calling this method
608 multiple times before @ref start.
609
610 @param ep The local endpoint to bind to.
611
612 @return The error code if binding fails.
613 */
614 std::error_code bind(endpoint ep);
615
616 /** Set the worker pool.
617
618 Replaces any existing workers with the given range. Any
619 previous workers are released and the idle/active lists
620 are cleared before populating with new workers.
621
622 @tparam Range Forward range of pointer-like objects to worker_base.
623
624 @param workers Range of workers to manage. Each element must
625 support `std::to_address()` yielding `worker_base*`.
626
627 @par Example
628 @code
629 std::vector<std::unique_ptr<my_worker>> workers;
630 for(int i = 0; i < 100; ++i)
631 workers.push_back(std::make_unique<my_worker>(ctx));
632 srv.set_workers(std::move(workers));
633 @endcode
634 */
635 template<std::ranges::forward_range Range>
636 requires std::convertible_to<
637 decltype(std::to_address(
638 std::declval<std::ranges::range_value_t<Range>&>())),
639 worker_base*>
640 9 void set_workers(Range&& workers)
641 {
642 // Clear existing state
643 9 storage_.reset();
644 9 idle_head_ = nullptr;
645 9 active_head_ = nullptr;
646 9 active_tail_ = nullptr;
647
648 // Take ownership and populate idle list
649 using StorageType = std::decay_t<Range>;
650 9 auto* p = new StorageType(std::forward<Range>(workers));
651 9 storage_ = std::shared_ptr<void>(
652 9 p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
653 45 for (auto&& elem : *static_cast<StorageType*>(p))
654 36 idle_push(std::to_address(elem));
655 9 }
656
657 /** Start accepting connections.
658
659 Launches accept loops for all bound endpoints. Incoming
660 connections are dispatched to idle workers from the pool.
661
662 Calling `start()` on an already-running server has no effect.
663
664 @par Preconditions
665 - At least one endpoint bound via @ref bind.
666 - Workers provided to the constructor.
667 - If restarting, @ref join must have completed first.
668
669 @par Effects
670 Creates one accept coroutine per bound endpoint. Each coroutine
671 runs on the server's executor, waiting for connections and
672 dispatching them to idle workers.
673
674 @par Restart Sequence
675 To restart after stopping, complete the full shutdown cycle:
676 @code
677 srv.start();
678 ioc.run_for( 1s );
679 srv.stop(); // 1. Signal shutdown
680 ioc.run(); // 2. Drain remaining completions
681 srv.join(); // 3. Wait for accept loops
682
683 // Now safe to restart
684 srv.start();
685 ioc.run();
686 @endcode
687
688 @par Thread Safety
689 Not thread safe.
690
691 @throws std::logic_error If a previous session has not been
692 joined (accept loops still active).
693 */
694 void start();
695
696 /** Stop accepting connections.
697
698 Signals all listening ports to stop accepting new connections
699 and requests cancellation of active workers via their stop tokens.
700
701 This function returns immediately; it does not wait for workers
702 to finish. Pending I/O operations complete asynchronously.
703
704 Calling `stop()` on a non-running server has no effect.
705
706 @par Effects
707 - Closes all acceptors (pending accepts complete with error).
708 - Requests stop on each active worker's stop token.
709 - Workers observing their stop token should exit promptly.
710
711 @par Postconditions
712 No new connections will be accepted. Active workers continue
713 until they observe their stop token or complete naturally.
714
715 @par What Happens Next
716 After calling `stop()`:
717 1. Let `ioc.run()` return (drains pending completions).
718 2. Call @ref join to wait for accept loops to finish.
719 3. Only then is it safe to restart or destroy the server.
720
721 @par Thread Safety
722 Not thread safe.
723
724 @see join, start
725 */
726 void stop();
727
728 /** Block until all accept loops complete.
729
730 Blocks the calling thread until all accept coroutines launched
731 by @ref start have finished executing. This synchronizes the
732 shutdown sequence, ensuring the server is fully stopped before
733 restarting or destroying it.
734
735 @par Preconditions
736 @ref stop has been called and `ioc.run()` has returned.
737
738 @par Postconditions
739 All accept loops have completed. The server is in the stopped
740 state and may be restarted via @ref start.
741
742 @par Example (Correct Usage)
743 @code
744 // main thread
745 srv.start();
746 ioc.run(); // Blocks until work completes
747 srv.join(); // Safe: called after ioc.run() returns
748 @endcode
749
750 @par WARNING: Deadlock Scenarios
751 Calling `join()` from the wrong context causes deadlock:
752
753 @code
754 // WRONG: calling join() from inside a worker coroutine
755 void run( launcher launch ) override
756 {
757 launch( ex, [this]() -> capy::task<>
758 {
759 srv_.join(); // DEADLOCK: blocks the executor
760 co_return;
761 }());
762 }
763
764 // WRONG: calling join() while ioc.run() is still active
765 std::thread t( [&]{ ioc.run(); } );
766 srv.stop();
767 srv.join(); // DEADLOCK: ioc.run() still running in thread t
768 @endcode
769
770 @par Thread Safety
771 May be called from any thread, but will deadlock if called
772 from within the io_context event loop or from a worker coroutine.
773
774 @see stop, start
775 */
776 void join();
777
778 private:
779 capy::task<> do_stop();
780 };
781
782 #ifdef _MSC_VER
783 #pragma warning(pop)
784 #endif
785
786 } // namespace boost::corosio
787
788 #endif
789