include/boost/corosio/tcp_server.hpp

93.5% Lines (130/139) 97.1% List of functions (33/34)
tcp_server.hpp
f(x) Functions (34)
Function Calls Lines Blocks
boost::corosio::tcp_server::idle_push(boost::corosio::tcp_server::worker_base*) :182 154x 100.0% 100.0% boost::corosio::tcp_server::idle_pop() :188 36x 100.0% 100.0% boost::corosio::tcp_server::idle_empty() const :196 42x 100.0% 100.0% boost::corosio::tcp_server::active_push(boost::corosio::tcp_server::worker_base*) :202 18x 100.0% 100.0% boost::corosio::tcp_server::active_remove(boost::corosio::tcp_server::worker_base*) :213 42x 100.0% 91.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::promise_type<boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>&, boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*&, boost::capy::task<void>&, boost::corosio::tcp_server::worker_base*&>(boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>&&, boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*&, boost::capy::task<void>&, boost::corosio::tcp_server::worker_base*&) :252 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::get_return_object() :260 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::initial_suspend() :265 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::final_suspend() :269 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::return_void() :273 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::unhandled_exception() :274 0 0.0% 0.0% auto boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::await_transform<boost::capy::task<void> >(boost::capy::task<void>&&) :281 18x 100.0% 100.0% auto boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type::await_transform<boost::corosio::tcp_server::push_awaitable>(boost::corosio::tcp_server::push_awaitable&&) :281 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::launch_wrapper(std::__n4861::coroutine_handle<boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::promise_type>) :309 18x 100.0% 100.0% boost::corosio::tcp_server::launch_wrapper<boost::corosio::io_context::executor_type>::~launch_wrapper() :314 18x 75.0% 75.0% boost::corosio::tcp_server::launch_coro<boost::corosio::io_context::executor_type>::operator()(boost::corosio::io_context::executor_type, std::stop_token, boost::corosio::tcp_server*, boost::capy::task<void>, boost::corosio::tcp_server::worker_base*) :334 18x 100.0% 46.0% boost::corosio::tcp_server::push_awaitable::push_awaitable(boost::corosio::tcp_server&, boost::corosio::tcp_server::worker_base&) :354 38x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_ready() const :360 38x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :366 38x 100.0% 100.0% boost::corosio::tcp_server::push_awaitable::await_resume() :373 38x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::pop_awaitable(boost::corosio::tcp_server&) :399 42x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_ready() const :401 42x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_suspend(std::__n4861::coroutine_handle<void>, boost::capy::io_env const*) :407 6x 100.0% 100.0% boost::corosio::tcp_server::pop_awaitable::await_resume() :417 42x 100.0% 100.0% boost::corosio::tcp_server::push(boost::corosio::tcp_server::worker_base&) :426 38x 100.0% 100.0% boost::corosio::tcp_server::push_sync(boost::corosio::tcp_server::worker_base&) :433 4x 50.0% 80.0% boost::corosio::tcp_server::pop() :450 42x 100.0% 100.0% boost::corosio::tcp_server::launcher::launcher(boost::corosio::tcp_server&, boost::corosio::tcp_server::worker_base&) :515 22x 100.0% 100.0% boost::corosio::tcp_server::launcher::~launcher() :521 22x 100.0% 100.0% void boost::corosio::tcp_server::launcher::operator()<boost::corosio::io_context::executor_type>(boost::corosio::io_context::executor_type const&, boost::capy::task<void>) :548 20x 100.0% 58.0% boost::corosio::tcp_server::launcher::operator()<boost::corosio::io_context::executor_type>(boost::corosio::io_context::executor_type const&, boost::capy::task<void>)::guard_t::~guard_t() :563 18x 91.7% 67.0% boost::corosio::tcp_server::tcp_server<boost::corosio::io_context, boost::corosio::io_context::executor_type>(boost::corosio::io_context&, boost::corosio::io_context::executor_type) :600 34x 100.0% 100.0% void boost::corosio::tcp_server::set_workers<std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > > >(std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > >&&) :664 34x 100.0% 100.0% boost::corosio::tcp_server::set_workers<std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > > >(std::vector<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> >, std::allocator<std::unique_ptr<boost::corosio::tcp_server::worker_base, std::default_delete<boost::corosio::tcp_server::worker_base> > > >&&)::{lambda(void*)#1}::operator()(void*) const :676 34x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/tcp_acceptor.hpp>
16 #include <boost/corosio/tcp_socket.hpp>
17 #include <boost/corosio/io_context.hpp>
18 #include <boost/corosio/endpoint.hpp>
19 #include <boost/capy/task.hpp>
20 #include <boost/capy/concept/execution_context.hpp>
21 #include <boost/capy/concept/io_awaitable.hpp>
22 #include <boost/capy/concept/executor.hpp>
23 #include <boost/capy/ex/any_executor.hpp>
24 #include <boost/capy/ex/frame_allocator.hpp>
25 #include <boost/capy/ex/io_env.hpp>
26 #include <boost/capy/ex/run_async.hpp>
27
28 #include <coroutine>
29 #include <memory>
30 #include <ranges>
31 #include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
158 struct waiter
159 {
160 waiter* next;
161 std::coroutine_handle<> h;
162 detail::continuation_op cont_op;
163 worker_base* w;
164 };
165
166 struct impl;
167
168 static impl* make_impl(capy::execution_context& ctx);
169
170 impl* impl_;
171 capy::any_executor ex_;
172 waiter* waiters_ = nullptr;
173 worker_base* idle_head_ = nullptr; // Forward list: available workers
174 worker_base* active_head_ =
175 nullptr; // Doubly linked: workers handling connections
176 worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
177 std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
178 std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
179 bool running_ = false;
180
181 // Idle list (forward/singly linked) - push front, pop front
182 154x void idle_push(worker_base* w) noexcept
183 {
184 154x w->next_ = idle_head_;
185 154x idle_head_ = w;
186 154x }
187
188 36x worker_base* idle_pop() noexcept
189 {
190 36x auto* w = idle_head_;
191 36x if (w)
192 36x idle_head_ = w->next_;
193 36x return w;
194 }
195
196 42x bool idle_empty() const noexcept
197 {
198 42x return idle_head_ == nullptr;
199 }
200
201 // Active list (doubly linked) - push back, remove anywhere
202 18x void active_push(worker_base* w) noexcept
203 {
204 18x w->next_ = nullptr;
205 18x w->prev_ = active_tail_;
206 18x if (active_tail_)
207 4x active_tail_->next_ = w;
208 else
209 14x active_head_ = w;
210 18x active_tail_ = w;
211 18x }
212
213 42x void active_remove(worker_base* w) noexcept
214 {
215 // Skip if not in active list (e.g., after failed accept)
216 42x if (w != active_head_ && w->prev_ == nullptr)
217 24x return;
218 18x if (w->prev_)
219 4x w->prev_->next_ = w->next_;
220 else
221 14x active_head_ = w->next_;
222 18x if (w->next_)
223 2x w->next_->prev_ = w->prev_;
224 else
225 16x active_tail_ = w->prev_;
226 18x w->prev_ = nullptr; // Mark as not in active list
227 }
228
229 template<capy::Executor Ex>
230 struct launch_wrapper
231 {
232 struct promise_type
233 {
234 Ex ex; // Executor stored directly in frame (outlives child tasks)
235 capy::io_env env_;
236
237 // For regular coroutines: first arg is executor, second is stop token
238 template<class E, class S, class... Args>
239 requires capy::Executor<std::decay_t<E>>
240 promise_type(E e, S s, Args&&...)
241 : ex(std::move(e))
242 , env_{
243 capy::executor_ref(ex), std::move(s),
244 capy::get_current_frame_allocator()}
245 {
246 }
247
248 // For lambda coroutines: first arg is closure, second is executor, third is stop token
249 template<class Closure, class E, class S, class... Args>
250 requires(!capy::Executor<std::decay_t<Closure>> &&
251 capy::Executor<std::decay_t<E>>)
252 18x promise_type(Closure&&, E e, S s, Args&&...)
253 18x : ex(std::move(e))
254 18x , env_{
255 18x capy::executor_ref(ex), std::move(s),
256 18x capy::get_current_frame_allocator()}
257 {
258 18x }
259
260 18x launch_wrapper get_return_object() noexcept
261 {
262 return {
263 18x std::coroutine_handle<promise_type>::from_promise(*this)};
264 }
265 18x std::suspend_always initial_suspend() noexcept
266 {
267 18x return {};
268 }
269 18x std::suspend_never final_suspend() noexcept
270 {
271 18x return {};
272 }
273 18x void return_void() noexcept {}
274 void unhandled_exception()
275 {
276 std::terminate();
277 }
278
279 // Inject io_env for IoAwaitable
280 template<capy::IoAwaitable Awaitable>
281 36x auto await_transform(Awaitable&& a)
282 {
283 using AwaitableT = std::decay_t<Awaitable>;
284 struct adapter
285 {
286 AwaitableT aw;
287 capy::io_env const* env;
288
289 bool await_ready()
290 {
291 return aw.await_ready();
292 }
293 decltype(auto) await_resume()
294 {
295 return aw.await_resume();
296 }
297
298 auto await_suspend(std::coroutine_handle<promise_type> h)
299 {
300 return aw.await_suspend(h, env);
301 }
302 };
303 72x return adapter{std::forward<Awaitable>(a), &env_};
304 36x }
305 };
306
307 std::coroutine_handle<promise_type> h;
308
309 18x launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
310 18x : h(handle)
311 {
312 18x }
313
314 18x ~launch_wrapper()
315 {
316 18x if (h)
317 h.destroy();
318 18x }
319
320 launch_wrapper(launch_wrapper&& o) noexcept
321 : h(std::exchange(o.h, nullptr))
322 {
323 }
324
325 launch_wrapper(launch_wrapper const&) = delete;
326 launch_wrapper& operator=(launch_wrapper const&) = delete;
327 launch_wrapper& operator=(launch_wrapper&&) = delete;
328 };
329
330 // Named functor to avoid incomplete lambda type in coroutine promise
331 template<class Executor>
332 struct launch_coro
333 {
334 18x launch_wrapper<Executor> operator()(
335 Executor,
336 std::stop_token,
337 tcp_server* self,
338 capy::task<void> t,
339 worker_base* wp)
340 {
341 // Executor and stop token stored in promise via constructor
342 co_await std::move(t);
343 co_await self->push(*wp); // worker goes back to idle list
344 36x }
345 };
346
347 class push_awaitable
348 {
349 tcp_server& self_;
350 worker_base& w_;
351 detail::continuation_op cont_op_;
352
353 public:
354 38x push_awaitable(tcp_server& self, worker_base& w) noexcept
355 38x : self_(self)
356 38x , w_(w)
357 {
358 38x }
359
360 38x bool await_ready() const noexcept
361 {
362 38x return false;
363 }
364
365 std::coroutine_handle<>
366 38x await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
367 {
368 // Symmetric transfer to server's executor
369 38x cont_op_.cont.h = h;
370 38x return self_.ex_.dispatch(cont_op_.cont);
371 }
372
373 38x void await_resume() noexcept
374 {
375 // Running on server executor - safe to modify lists
376 // Remove from active (if present), then wake waiter or add to idle
377 38x self_.active_remove(&w_);
378 38x if (self_.waiters_)
379 {
380 6x auto* wait = self_.waiters_;
381 6x self_.waiters_ = wait->next;
382 6x wait->w = &w_;
383 6x wait->cont_op.cont.h = wait->h;
384 6x self_.ex_.post(wait->cont_op.cont);
385 }
386 else
387 {
388 32x self_.idle_push(&w_);
389 }
390 38x }
391 };
392
393 class pop_awaitable
394 {
395 tcp_server& self_;
396 waiter wait_;
397
398 public:
399 42x pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
400
401 42x bool await_ready() const noexcept
402 {
403 42x return !self_.idle_empty();
404 }
405
406 bool
407 6x await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
408 {
409 // Running on server executor (do_accept runs there)
410 6x wait_.h = h;
411 6x wait_.w = nullptr;
412 6x wait_.next = self_.waiters_;
413 6x self_.waiters_ = &wait_;
414 6x return true;
415 }
416
417 42x worker_base& await_resume() noexcept
418 {
419 // Running on server executor
420 42x if (wait_.w)
421 6x return *wait_.w; // Woken by push_awaitable
422 36x return *self_.idle_pop();
423 }
424 };
425
426 38x push_awaitable push(worker_base& w)
427 {
428 38x return push_awaitable{*this, w};
429 }
430
431 // Synchronous version for destructor/guard paths
432 // Must be called from server executor context
433 4x void push_sync(worker_base& w) noexcept
434 {
435 4x active_remove(&w);
436 4x if (waiters_)
437 {
438 auto* wait = waiters_;
439 waiters_ = wait->next;
440 wait->w = &w;
441 wait->cont_op.cont.h = wait->h;
442 ex_.post(wait->cont_op.cont);
443 }
444 else
445 {
446 4x idle_push(&w);
447 }
448 4x }
449
450 42x pop_awaitable pop()
451 {
452 42x return pop_awaitable{*this};
453 }
454
455 capy::task<void> do_accept(tcp_acceptor& acc);
456
457 public:
458 /** Abstract base class for connection handlers.
459
460 Derive from this class to implement custom connection handling.
461 Each worker owns a socket and is reused across multiple
462 connections to avoid per-connection allocation.
463
464 @see tcp_server, launcher
465 */
466 class BOOST_COROSIO_DECL worker_base
467 {
468 // Ordered largest to smallest for optimal packing
469 std::stop_source stop_; // ~16 bytes
470 worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
471 worker_base* prev_ = nullptr; // 8 bytes - used only by active list
472
473 friend class tcp_server;
474
475 public:
476 /// Construct a worker.
477 worker_base();
478
479 /// Destroy the worker.
480 virtual ~worker_base();
481
482 /** Handle an accepted connection.
483
484 Called when this worker is dispatched to handle a new
485 connection. The implementation must invoke the launcher
486 exactly once to start the handling coroutine.
487
488 @param launch Handle to launch the connection coroutine.
489 */
490 virtual void run(launcher launch) = 0;
491
492 /// Return the socket used for connections.
493 virtual corosio::tcp_socket& socket() = 0;
494 };
495
496 /** Move-only handle to launch a worker coroutine.
497
498 Passed to @ref worker_base::run to start the connection-handling
499 coroutine. The launcher ensures the worker returns to the idle
500 pool when the coroutine completes or if launching fails.
501
502 The launcher must be invoked exactly once via `operator()`.
503 If destroyed without invoking, the worker is returned to the
504 idle pool automatically.
505
506 @see worker_base::run
507 */
508 class BOOST_COROSIO_DECL launcher
509 {
510 tcp_server* srv_;
511 worker_base* w_;
512
513 friend class tcp_server;
514
515 22x launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
516 {
517 22x }
518
519 public:
520 /// Return the worker to the pool if not launched.
521 22x ~launcher()
522 {
523 22x if (w_)
524 4x srv_->push_sync(*w_);
525 22x }
526
527 launcher(launcher&& o) noexcept
528 : srv_(o.srv_)
529 , w_(std::exchange(o.w_, nullptr))
530 {
531 }
532 launcher(launcher const&) = delete;
533 launcher& operator=(launcher const&) = delete;
534 launcher& operator=(launcher&&) = delete;
535
536 /** Launch the connection-handling coroutine.
537
538 Starts the given coroutine on the specified executor. When
539 the coroutine completes, the worker is automatically returned
540 to the idle pool.
541
542 @param ex The executor to run the coroutine on.
543 @param task The coroutine to execute.
544
545 @throws std::logic_error If this launcher was already invoked.
546 */
547 template<class Executor>
548 20x void operator()(Executor const& ex, capy::task<void> task)
549 {
550 20x if (!w_)
551 2x detail::throw_logic_error(); // launcher already invoked
552
553 18x auto* w = std::exchange(w_, nullptr);
554
555 // Worker is being dispatched - add to active list
556 18x srv_->active_push(w);
557
558 // Return worker to pool if coroutine setup throws
559 struct guard_t
560 {
561 tcp_server* srv;
562 worker_base* w;
563 18x ~guard_t()
564 {
565 18x if (w)
566 srv->push_sync(*w);
567 18x }
568 18x } guard{srv_, w};
569
570 // Reset worker's stop source for this connection
571 18x w->stop_ = {};
572 18x auto st = w->stop_.get_token();
573
574 18x auto wrapper =
575 18x launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
576
577 // Executor and stop token stored in promise via constructor
578 18x ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
579 18x guard.w = nullptr; // Success - dismiss guard
580 18x }
581 };
582
583 /** Construct a TCP server.
584
585 @tparam Ctx Execution context type satisfying ExecutionContext.
586 @tparam Ex Executor type satisfying Executor.
587
588 @param ctx The execution context for socket operations.
589 @param ex The executor for dispatching coroutines.
590
591 @par Example
592 @code
593 tcp_server srv(ctx, ctx.get_executor());
594 srv.set_workers(make_workers(ctx, 100));
595 srv.bind(endpoint{...});
596 srv.start();
597 @endcode
598 */
599 template<capy::ExecutionContext Ctx, capy::Executor Ex>
600 34x tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
601 34x , ex_(std::move(ex))
602 {
603 34x }
604
605 public:
606 /// Destroy the server, stopping all accept loops.
607 ~tcp_server();
608
609 tcp_server(tcp_server const&) = delete;
610 tcp_server& operator=(tcp_server const&) = delete;
611
612 /** Move construct from another server.
613
614 @param o The source server. After the move, @p o is
615 in a valid but unspecified state.
616 */
617 tcp_server(tcp_server&& o) noexcept;
618
619 /** Move assign from another server.
620
621 @param o The source server. After the move, @p o is
622 in a valid but unspecified state.
623
624 @return `*this`.
625 */
626 tcp_server& operator=(tcp_server&& o) noexcept;
627
628 /** Bind to a local endpoint.
629
630 Creates an acceptor listening on the specified endpoint.
631 Multiple endpoints can be bound by calling this method
632 multiple times before @ref start.
633
634 @param ep The local endpoint to bind to.
635
636 @return The error code if binding fails.
637 */
638 std::error_code bind(endpoint ep);
639
640 /** Set the worker pool.
641
642 Replaces any existing workers with the given range. Any
643 previous workers are released and the idle/active lists
644 are cleared before populating with new workers.
645
646 @tparam Range Forward range of pointer-like objects to worker_base.
647
648 @param workers Range of workers to manage. Each element must
649 support `std::to_address()` yielding `worker_base*`.
650
651 @par Example
652 @code
653 std::vector<std::unique_ptr<my_worker>> workers;
654 for(int i = 0; i < 100; ++i)
655 workers.push_back(std::make_unique<my_worker>(ctx));
656 srv.set_workers(std::move(workers));
657 @endcode
658 */
659 template<std::ranges::forward_range Range>
660 requires std::convertible_to<
661 decltype(std::to_address(
662 std::declval<std::ranges::range_value_t<Range>&>())),
663 worker_base*>
664 34x void set_workers(Range&& workers)
665 {
666 // Clear existing state
667 34x storage_.reset();
668 34x idle_head_ = nullptr;
669 34x active_head_ = nullptr;
670 34x active_tail_ = nullptr;
671
672 // Take ownership and populate idle list
673 using StorageType = std::decay_t<Range>;
674 34x auto* p = new StorageType(std::forward<Range>(workers));
675 34x storage_ = std::shared_ptr<void>(
676 34x p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
677 152x for (auto&& elem : *static_cast<StorageType*>(p))
678 118x idle_push(std::to_address(elem));
679 34x }
680
681 /** Start accepting connections.
682
683 Launches accept loops for all bound endpoints. Incoming
684 connections are dispatched to idle workers from the pool.
685
686 Calling `start()` on an already-running server has no effect.
687
688 @par Preconditions
689 - At least one endpoint bound via @ref bind.
690 - Workers provided to the constructor.
691 - If restarting, @ref join must have completed first.
692
693 @par Effects
694 Creates one accept coroutine per bound endpoint. Each coroutine
695 runs on the server's executor, waiting for connections and
696 dispatching them to idle workers.
697
698 @par Restart Sequence
699 To restart after stopping, complete the full shutdown cycle:
700 @code
701 srv.start();
702 ioc.run_for( 1s );
703 srv.stop(); // 1. Signal shutdown
704 ioc.run(); // 2. Drain remaining completions
705 srv.join(); // 3. Wait for accept loops
706
707 // Now safe to restart
708 srv.start();
709 ioc.run();
710 @endcode
711
712 @par Thread Safety
713 Not thread safe.
714
715 @throws std::logic_error If a previous session has not been
716 joined (accept loops still active).
717 */
718 void start();
719
720 /** Return the local endpoint for the i-th bound port.
721
722 @param index Zero-based index into the list of bound ports.
723
724 @return The local endpoint, or a default-constructed endpoint
725 if @p index is out of range or the acceptor is not open.
726 */
727 endpoint local_endpoint(std::size_t index = 0) const noexcept;
728
729 /** Stop accepting connections.
730
731 Signals all listening ports to stop accepting new connections
732 and requests cancellation of active workers via their stop tokens.
733
734 This function returns immediately; it does not wait for workers
735 to finish. Pending I/O operations complete asynchronously.
736
737 Calling `stop()` on a non-running server has no effect.
738
739 @par Effects
740 - Closes all acceptors (pending accepts complete with error).
741 - Requests stop on each active worker's stop token.
742 - Workers observing their stop token should exit promptly.
743
744 @par Postconditions
745 No new connections will be accepted. Active workers continue
746 until they observe their stop token or complete naturally.
747
748 @par What Happens Next
749 After calling `stop()`:
750 1. Let `ioc.run()` return (drains pending completions).
751 2. Call @ref join to wait for accept loops to finish.
752 3. Only then is it safe to restart or destroy the server.
753
754 @par Thread Safety
755 Not thread safe.
756
757 @see join, start
758 */
759 void stop();
760
761 /** Block until all accept loops complete.
762
763 Blocks the calling thread until all accept coroutines launched
764 by @ref start have finished executing. This synchronizes the
765 shutdown sequence, ensuring the server is fully stopped before
766 restarting or destroying it.
767
768 @par Preconditions
769 @ref stop has been called and `ioc.run()` has returned.
770
771 @par Postconditions
772 All accept loops have completed. The server is in the stopped
773 state and may be restarted via @ref start.
774
775 @par Example (Correct Usage)
776 @code
777 // main thread
778 srv.start();
779 ioc.run(); // Blocks until work completes
780 srv.join(); // Safe: called after ioc.run() returns
781 @endcode
782
783 @par WARNING: Deadlock Scenarios
784 Calling `join()` from the wrong context causes deadlock:
785
786 @code
787 // WRONG: calling join() from inside a worker coroutine
788 void run( launcher launch ) override
789 {
790 launch( ex, [this]() -> capy::task<>
791 {
792 srv_.join(); // DEADLOCK: blocks the executor
793 co_return;
794 }());
795 }
796
797 // WRONG: calling join() while ioc.run() is still active
798 std::thread t( [&]{ ioc.run(); } );
799 srv.stop();
800 srv.join(); // DEADLOCK: ioc.run() still running in thread t
801 @endcode
802
803 @par Thread Safety
804 May be called from any thread, but will deadlock if called
805 from within the io_context event loop or from a worker coroutine.
806
807 @see stop, start
808 */
809 void join();
810
811 private:
812 capy::task<> do_stop();
813 };
814
815 #ifdef _MSC_VER
816 #pragma warning(pop)
817 #endif
818
819 } // namespace boost::corosio
820
821 #endif
822