TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/detail/continuation_op.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 :
19 : #include <atomic>
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <memory>
23 : #include <optional>
24 : #include <stop_token>
25 : #include <system_error>
26 :
27 : #include <errno.h>
28 :
29 : #include <netinet/in.h>
30 : #include <sys/socket.h>
31 : #include <sys/uio.h>
32 :
33 : namespace boost::corosio::detail {
34 :
35 : /** Base operation for reactor-based backends.
36 :
37 : Holds per-operation state that depends on the concrete backend
38 : socket/acceptor types: coroutine handle, executor, output
39 : pointers, file descriptor, stop_callback, and type-specific
40 : impl pointers.
41 :
42 : Fields shared across all backends (errn, bytes_transferred,
43 : cancelled, impl_ptr, perform_io, complete) live in
44 : reactor_op_base so the scheduler and descriptor_state can
45 : access them without template instantiation.
46 :
47 : @tparam Socket The backend socket impl type (forward-declared).
48 : @tparam Acceptor The backend acceptor impl type (forward-declared).
49 : */
50 : template<class Socket, class Acceptor>
51 : struct reactor_op : reactor_op_base
52 : {
53 : // The op envelope — coroutine handle h, cont_op, executor ex, ec_out,
54 : // bytes_out, cancelled, stop_cb (+ its canceller), impl_ptr — lives in
55 : // coro_op (via reactor_op_base) and is shared with io_uring/IOCP.
56 : // reactor_op adds only the reactor-specific routing state below.
57 :
58 : /// File descriptor this operation targets.
59 : int fd = -1;
60 :
61 : /// Owning socket impl (for stop_token cancellation routing).
62 : Socket* socket_impl_ = nullptr;
63 :
64 : /// Owning acceptor impl (for stop_token cancellation routing).
65 : Acceptor* acceptor_impl_ = nullptr;
66 :
67 HIT 153134 : reactor_op() = default;
68 :
69 : /// Reset operation state for reuse.
70 459920 : void reset() noexcept
71 : {
72 459920 : fd = -1;
73 459920 : errn = 0;
74 459920 : bytes_transferred = 0;
75 459920 : cancelled.store(false, std::memory_order_relaxed);
76 459920 : impl_ptr.reset();
77 459920 : socket_impl_ = nullptr;
78 459920 : acceptor_impl_ = nullptr;
79 459920 : }
80 :
81 : /// Return true if this is a read-direction operation.
82 44482 : virtual bool is_read_operation() const noexcept
83 : {
84 44482 : return false;
85 : }
86 :
87 : /// Cancel this operation via the owning impl.
88 : virtual void cancel() noexcept = 0;
89 :
90 : /// coro_op cancellation hook (fired by the shared canceller when the
91 : /// stop_token requests cancellation): route to the impl-specific cancel().
92 288 : void on_cancel() noexcept override
93 : {
94 288 : cancel();
95 288 : }
96 :
97 : /// Destroy without invoking.
98 12 : void destroy() override
99 : {
100 12 : stop_cb.reset();
101 12 : reactor_op_base::destroy();
102 12 : }
103 :
104 : /// Arm the stop-token callback for a socket operation.
105 97744 : void start(std::stop_token const& token, Socket* impl)
106 : {
107 97744 : socket_impl_ = impl;
108 97744 : acceptor_impl_ = nullptr;
109 97744 : coro_op::start(token);
110 97744 : }
111 :
112 : /// Arm the stop-token callback for an acceptor operation.
113 8205 : void start(std::stop_token const& token, Acceptor* impl)
114 : {
115 8205 : socket_impl_ = nullptr;
116 8205 : acceptor_impl_ = impl;
117 8205 : coro_op::start(token);
118 8205 : }
119 : };
120 :
121 : /** Shared connect operation.
122 :
123 : Checks SO_ERROR for connect completion status. The operator()()
124 : and cancel() are provided by the concrete backend type.
125 :
126 : @tparam Base The backend's base op type.
127 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
128 : */
129 : template<class Base, class Endpoint = endpoint>
130 : struct reactor_connect_op : Base
131 : {
132 : /// Endpoint to connect to.
133 : Endpoint target_endpoint;
134 :
135 : /// Reset operation state for reuse.
136 8205 : void reset() noexcept
137 : {
138 8205 : Base::reset();
139 8205 : target_endpoint = Endpoint{};
140 8205 : }
141 :
142 8141 : void perform_io() noexcept override
143 : {
144 8141 : int err = 0;
145 8141 : socklen_t len = sizeof(err);
146 8141 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
147 MIS 0 : err = errno;
148 HIT 8141 : this->complete(err, 0);
149 8141 : }
150 : };
151 :
/** Wait operation that completes on readiness alone.

    No I/O syscall is issued. The reactor delivering the requested
    edge event is the completion; reactor_descriptor_state calls
    complete() directly and never invokes perform_io().

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_wait_op : Base
{
    /* Local copy of reactor_event_read from
       reactor_descriptor_state.hpp. The value is mirrored here rather
       than pulling in that header, to avoid an include cycle. The two
       definitions must be kept in sync. */
    static constexpr std::uint32_t read_event = 0x001;

    /// Event bit being waited for (reactor_event_read/write/error).
    std::uint32_t wait_event = 0;

    /// Reset the base state and clear the awaited event.
    void reset() noexcept
    {
        Base::reset();
        wait_event = 0;
    }

    /// True only when the awaited event is the read event.
    bool is_read_operation() const noexcept override
    {
        return read_event == wait_event;
    }

    /* Not expected to run for a wait op, since readiness IS the
       completion. The override exists to satisfy the virtual and
       yields a harmless success result if it is called anyway. */
    void perform_io() noexcept override
    {
        this->complete(0, 0);
    }
};
191 :
/** Scatter-read operation shared by the reactor backends.

    Performs the read with readv(), retrying while the call is
    interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Buffers handed to readv().
    iovec iovecs[max_buffers];

    /// How many entries of iovecs are in use.
    int iovec_count = 0;

    /// Set for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Read-direction unless this is a zero-length read.
    bool is_read_operation() const noexcept override
    {
        return !empty_buffer_read;
    }

    /// Reset the base state and clear the buffer bookkeeping.
    void reset() noexcept
    {
        Base::reset();
        empty_buffer_read = false;
        iovec_count = 0;
    }

    /** Issue readv() and complete with its outcome.

        On success completes with (0, bytes read); otherwise with
        (errno, 0).
    */
    void perform_io() noexcept override
    {
        ssize_t got;
        for (;;)
        {
            got = ::readv(this->fd, iovecs, iovec_count);
            // Only an interrupted call is worth repeating.
            if (got >= 0 || errno != EINTR)
                break;
        }

        if (got < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(got));
    }
};
241 :
/** Gather-write operation shared by the reactor backends.

    The syscall itself is delegated to
    WritePolicy::write(fd, iovecs, count), which returns the number of
    bytes written as ssize_t, or -1 with errno set.

    @tparam Base The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// The write syscall policy type.
    using write_policy = WritePolicy;

    /// Capacity of the scatter-gather array.
    static constexpr std::size_t max_buffers = 16;

    /// Buffers handed to the write policy.
    iovec iovecs[max_buffers];

    /// How many entries of iovecs are in use.
    int iovec_count = 0;

    /// Reset the base state and clear the buffer count.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Run the policy's write and complete with its outcome.

        On success completes with (0, bytes written); otherwise with
        (errno, 0).
    */
    void perform_io() noexcept override
    {
        ssize_t const sent =
            write_policy::write(this->fd, iovecs, iovec_count);
        if (sent < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(sent));
    }
};
280 :
281 : /** Shared accept operation.
282 :
283 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
284 : which returns the accepted fd or -1 with errno set.
285 :
286 : @tparam Base The backend's base op type.
287 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
288 : */
289 : template<class Base, class AcceptPolicy>
290 : struct reactor_accept_op : Base
291 : {
292 : /// File descriptor of the accepted connection.
293 : int accepted_fd = -1;
294 :
295 : /// Pointer to the peer socket implementation.
296 : io_object::implementation* peer_impl = nullptr;
297 :
298 : /// Output pointer for the accepted implementation.
299 : io_object::implementation** impl_out = nullptr;
300 :
301 : /// Peer address storage filled by accept.
302 : sockaddr_storage peer_storage{};
303 :
304 : /// Peer address length returned by accept.
305 : socklen_t peer_addrlen = 0;
306 :
307 8193 : void reset() noexcept
308 : {
309 8193 : Base::reset();
310 8193 : accepted_fd = -1;
311 8193 : peer_impl = nullptr;
312 8193 : impl_out = nullptr;
313 8193 : peer_storage = {};
314 8193 : peer_addrlen = 0;
315 8193 : }
316 :
317 8132 : void perform_io() noexcept override
318 : {
319 8132 : int new_fd = AcceptPolicy::do_accept(
320 8132 : this->fd, peer_storage, peer_addrlen);
321 8132 : if (new_fd >= 0)
322 : {
323 8132 : accepted_fd = new_fd;
324 8132 : this->complete(0, 0);
325 : }
326 : else
327 : {
328 MIS 0 : this->complete(errno, 0);
329 : }
330 HIT 8132 : }
331 : };
332 :
/** Connected send operation for datagram sockets, shared by the
    reactor backends.

    Uses sendmsg() with msg_name left null (connected mode) and
    retries while the call is interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// User-supplied message flags.
    int msg_flags = 0;

    /// Reset the base state and clear the buffer count and flags.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        msg_flags = 0;
    }

    /** Issue sendmsg() and complete with its outcome.

        On success completes with (0, bytes sent); otherwise with
        (errno, 0).
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // msg_iovlen is size_t on Linux but int in POSIX and on the
        // BSDs; cast to the field's own type so no platform narrows
        // implicitly.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        // Add MSG_NOSIGNAL wherever the platform defines it.
#ifdef MSG_NOSIGNAL
        int send_flags = msg_flags | MSG_NOSIGNAL;
#else
        int send_flags = msg_flags;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
386 :
/** Connected recv operation for datagram sockets, shared by the
    reactor backends.

    Uses recvmsg() with msg_name left null (connected mode) and
    retries while the call is interrupted (EINTR). A return of 0 is
    reported as a successful zero-byte transfer, since zero-length
    datagrams are valid; no EOF mapping is applied here.

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_recv_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// User-supplied message flags.
    int msg_flags = 0;

    /// Return true (this is a read-direction operation).
    bool is_read_operation() const noexcept override
    {
        return true;
    }

    /// Reset the base state and clear the buffer count and flags.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        msg_flags = 0;
    }

    /** Issue recvmsg() and complete with its outcome.

        On success completes with (0, bytes received); otherwise with
        (errno, 0).
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_iov = iovecs;
        // msg_iovlen is size_t on Linux but int in POSIX and on the
        // BSDs; cast to the field's own type so no platform narrows
        // implicitly.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        ssize_t n;
        do
        {
            n = ::recvmsg(this->fd, &msg, msg_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
442 :
/** send_to operation for datagram sockets, shared by the reactor
    backends.

    Uses sendmsg() with the destination address supplied through
    msg_name, retrying while the call is interrupted (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_send_to_op : Base
{
    /// Maximum scatter-gather buffer count.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors.
    iovec iovecs[max_buffers];

    /// Number of active I/O vectors.
    int iovec_count = 0;

    /// Destination address storage.
    sockaddr_storage dest_storage{};

    /// Destination address length.
    socklen_t dest_len = 0;

    /// User-supplied message flags.
    int msg_flags = 0;

    /// Reset the base state and clear buffers, destination and flags.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        dest_storage = {};
        dest_len = 0;
        msg_flags = 0;
    }

    /** Issue sendmsg() to the stored destination and complete with
        its outcome.

        On success completes with (0, bytes sent); otherwise with
        (errno, 0).
    */
    void perform_io() noexcept override
    {
        msghdr msg{};
        msg.msg_name = &dest_storage;
        msg.msg_namelen = dest_len;
        msg.msg_iov = iovecs;
        // msg_iovlen is size_t on Linux but int in POSIX and on the
        // BSDs; cast to the field's own type so no platform narrows
        // implicitly.
        msg.msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(iovec_count);

        // Add MSG_NOSIGNAL wherever the platform defines it.
#ifdef MSG_NOSIGNAL
        int send_flags = msg_flags | MSG_NOSIGNAL;
#else
        int send_flags = msg_flags;
#endif

        ssize_t n;
        do
        {
            n = ::sendmsg(this->fd, &msg, send_flags);
        }
        while (n < 0 && errno == EINTR);

        if (n >= 0)
            this->complete(0, static_cast<std::size_t>(n));
        else
            this->complete(errno, 0);
    }
};
506 :
507 : /** Shared recv_from operation for datagram sockets.
508 :
509 : Uses recvmsg() with msg_name to capture the source endpoint.
510 :
511 : @tparam Base The backend's base op type.
512 : @tparam Endpoint The endpoint type (endpoint or local_endpoint).
513 : */
514 : template<class Base, class Endpoint = endpoint>
515 : struct reactor_recv_from_op : Base
516 : {
517 : /// Maximum scatter-gather buffer count.
518 : static constexpr std::size_t max_buffers = 16;
519 :
520 : /// Scatter-gather I/O vectors.
521 : iovec iovecs[max_buffers];
522 :
523 : /// Number of active I/O vectors.
524 : int iovec_count = 0;
525 :
526 : /// Source address storage filled by recvmsg.
527 : sockaddr_storage source_storage{};
528 :
529 : /// Actual source address length returned by recvmsg.
530 : socklen_t source_addrlen = 0;
531 :
532 : /// Output pointer for the source endpoint (set by do_recv_from).
533 : Endpoint* source_out = nullptr;
534 :
535 : /// User-supplied message flags.
536 : int msg_flags = 0;
537 :
538 : /// Return true (this is a read-direction operation).
539 MIS 0 : bool is_read_operation() const noexcept override
540 : {
541 0 : return true;
542 : }
543 :
544 HIT 134 : void reset() noexcept
545 : {
546 134 : Base::reset();
547 134 : iovec_count = 0;
548 134 : source_storage = {};
549 134 : source_addrlen = 0;
550 134 : source_out = nullptr;
551 134 : msg_flags = 0;
552 134 : }
553 :
554 39 : void perform_io() noexcept override
555 : {
556 39 : msghdr msg{};
557 39 : msg.msg_name = &source_storage;
558 39 : msg.msg_namelen = sizeof(source_storage);
559 39 : msg.msg_iov = iovecs;
560 39 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
561 :
562 : ssize_t n;
563 : do
564 : {
565 39 : n = ::recvmsg(this->fd, &msg, msg_flags);
566 : }
567 39 : while (n < 0 && errno == EINTR);
568 :
569 39 : if (n >= 0)
570 : {
571 36 : source_addrlen = msg.msg_namelen;
572 36 : this->complete(0, static_cast<std::size_t>(n));
573 : }
574 : else
575 3 : this->complete(errno, 0);
576 39 : }
577 : };
578 :
579 : } // namespace boost::corosio::detail
580 :
581 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|