src/corosio/src/tcp_server.cpp

64.6% Lines (42/65) 84.6% Functions (11/13)
src/corosio/src/tcp_server.cpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/tcp_server.hpp>
12 #include <boost/corosio/detail/except.hpp>
13 #include <condition_variable>
14 #include <mutex>
15 #include <utility>
16
17 namespace boost::corosio {
18
19 struct tcp_server::impl
20 {
21 std::mutex join_mutex;
22 std::condition_variable join_cv;
23 capy::execution_context& ctx;
24 std::vector<tcp_acceptor> ports;
25 std::stop_source stop;
26
27 9 explicit impl(capy::execution_context& c) noexcept : ctx(c) {}
28 };
29
30 tcp_server::impl*
31 9 tcp_server::make_impl(capy::execution_context& ctx)
32 {
33 9 return new impl(ctx);
34 }
35
36 9 tcp_server::~tcp_server()
37 {
38 9 delete impl_;
39 9 }
40
41 tcp_server::tcp_server(tcp_server&& o) noexcept
42 : impl_(std::exchange(o.impl_, nullptr))
43 , ex_(o.ex_)
44 , waiters_(std::exchange(o.waiters_, nullptr))
45 , idle_head_(std::exchange(o.idle_head_, nullptr))
46 , active_head_(std::exchange(o.active_head_, nullptr))
47 , active_tail_(std::exchange(o.active_tail_, nullptr))
48 , active_accepts_(std::exchange(o.active_accepts_, 0))
49 , storage_(std::move(o.storage_))
50 , running_(std::exchange(o.running_, false))
51 {
52 }
53
54 tcp_server&
55 tcp_server::operator=(tcp_server&& o) noexcept
56 {
57 delete impl_;
58 impl_ = std::exchange(o.impl_, nullptr);
59 ex_ = o.ex_;
60 waiters_ = std::exchange(o.waiters_, nullptr);
61 idle_head_ = std::exchange(o.idle_head_, nullptr);
62 active_head_ = std::exchange(o.active_head_, nullptr);
63 active_tail_ = std::exchange(o.active_tail_, nullptr);
64 active_accepts_ = std::exchange(o.active_accepts_, 0);
65 storage_ = std::move(o.storage_);
66 running_ = std::exchange(o.running_, false);
67 return *this;
68 }
69
70 // Accept loop: wait for idle worker, accept connection, dispatch
71 capy::task<void>
72 8 tcp_server::do_accept(tcp_acceptor& acc)
73 {
74 // Analyzer can't trace value through coroutine await_transform
75 // NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
76 auto env = co_await capy::this_coro::environment;
77 while (!env->stop_token.stop_requested())
78 {
79 // Wait for an idle worker before blocking on accept
80 auto& w = co_await pop();
81 auto [ec] = co_await acc.accept(w.socket());
82 if (ec)
83 {
84 co_await push(w);
85 continue;
86 }
87 w.run(launcher{*this, w});
88 }
89 16 }
90
91 std::error_code
92 9 tcp_server::bind(endpoint ep)
93 {
94 9 impl_->ports.emplace_back(impl_->ctx);
95 9 auto ec = impl_->ports.back().listen(ep);
96 9 if (ec)
97 1 impl_->ports.pop_back();
98 9 return ec;
99 }
100
101 void
102 10 tcp_server::start()
103 {
104 // Idempotent - only start if not already running
105 10 if (running_)
106 1 return;
107
108 // Previous session must be fully stopped before restart
109 9 if (active_accepts_ != 0)
110 1 detail::throw_logic_error(
111 "tcp_server::start: previous session not joined");
112
113 8 running_ = true;
114
115 8 impl_->stop = {}; // Fresh stop source
116 8 auto st = impl_->stop.get_token();
117
118 8 active_accepts_ = impl_->ports.size();
119
120 // Launch with completion handler that decrements counter
121 16 for (auto& t : impl_->ports)
122 16 capy::run_async(ex_, st, [this]() {
123 8 std::lock_guard lock(impl_->join_mutex);
124 8 if (--active_accepts_ == 0)
125 8 impl_->join_cv.notify_all();
126 16 })(do_accept(t));
127 8 }
128
129 void
130 10 tcp_server::stop()
131 {
132 // Idempotent - only stop if running
133 10 if (!running_)
134 2 return;
135 8 running_ = false;
136
137 // Stop accept loops
138 8 impl_->stop.request_stop();
139
140 // Launch cancellation coroutine on server executor
141 8 capy::run_async(ex_, std::stop_token{})(do_stop());
142 }
143
144 void
145 4 tcp_server::join()
146 {
147 4 std::unique_lock lock(impl_->join_mutex);
148 8 impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
149 4 }
150
151 capy::task<>
152 8 tcp_server::do_stop()
153 {
154 // Running on server executor - safe to iterate active list
155 // Just cancel, don't modify list - workers return themselves when done
156 for (auto* w = active_head_; w; w = w->next_)
157 w->stop_.request_stop();
158 co_return;
159 16 }
160
161 } // namespace boost::corosio
162