TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12 :
13 : #include <boost/corosio/detail/dispatch_coro.hpp>
14 : #include <boost/corosio/native/detail/coro_op_complete.hpp>
15 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
16 : #include <boost/corosio/native/detail/make_err.hpp>
17 : #include <boost/corosio/io/io_object.hpp>
18 :
19 : #include <coroutine>
20 : #include <mutex>
21 : #include <utility>
22 :
23 : #include <netinet/in.h>
24 : #include <sys/socket.h>
25 : #include <unistd.h>
26 :
27 : namespace boost::corosio::detail {
28 :
/** Finish a stream read or write operation.

    Maps the recorded errno and cancellation flag to an error_code
    through decode_io_result, publishes the transferred byte count,
    and then resumes the awaiting coroutine through coro_resume.

    @tparam Op The concrete operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_io_op(Op& op)
{
    // Drop the stop callback and refresh the scheduler's inline budget
    // before any result is published.
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    bool const was_cancelled = op.cancelled.load(std::memory_order_acquire);
    std::error_code const sys_error =
        op.errn != 0 ? make_err(op.errn) : std::error_code{};

    // The empty-buffer case is already accounted for by
    // is_read_operation() (false for a zero-length read), so the
    // empty_buffer argument is always false and the EOF test inside
    // decode_io_result reduces to `is_read && bytes == 0`.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        sys_error,
        op.is_read_operation(),
        op.bytes_transferred,
        /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    coro_resume(&op);
}
59 :
/** Finish a connected-mode datagram receive.

    Behaves like complete_io_op except that a zero byte count is never
    turned into EOF: an empty datagram is a legitimate message and is
    reported as success with 0 bytes transferred.

    @tparam Op The concrete operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_dgram_recv_op(Op& op)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    bool const was_cancelled = op.cancelled.load(std::memory_order_acquire);
    std::error_code const sys_error =
        op.errn != 0 ? make_err(op.errn) : std::error_code{};

    // is_read/bytes are pinned to false/0 so the decoder never takes
    // its EOF path; the real byte count is published separately below.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        sys_error,
        /*is_read=*/false,
        /*bytes=*/0,
        /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    coro_resume(&op);
}
86 :
/** Finish a wait operation.

    A wait yields nothing but an error_code: there is no byte count
    and no EOF translation. It backs the wait() awaitables of both
    sockets and acceptors, so the scheduler is reached through
    whichever impl pointer the operation carries.

    @tparam Op The concrete wait operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_wait_op(Op& op)
{
    op.stop_cb.reset();

    if (!op.socket_impl_)
    {
        // Acceptor-side wait: the scheduler pointer may be null here,
        // in which case there is no inline budget to reset.
        auto* acceptor_sched = op.acceptor_impl_->desc_state_.scheduler_;
        if (acceptor_sched)
            acceptor_sched->reset_inline_budget();
    }
    else
    {
        op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
    }

    bool const was_cancelled = op.cancelled.load(std::memory_order_acquire);
    std::error_code const sys_error =
        op.errn != 0 ? make_err(op.errn) : std::error_code{};

    // Only success / cancel / error is reported.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        sys_error,
        /*is_read=*/false,
        /*bytes=*/0,
        /*empty_buffer=*/false);

    coro_resume(&op);
}
115 :
/** Complete a connect operation with endpoint caching.

    On success, queries the local endpoint via getsockname and caches
    both endpoints in the socket impl. The outcome is then decoded into
    the caller's error_code and the caller is resumed via coro_resume.

    The cancellation flag is read exactly once, so the decision to
    cache endpoints and the reported error_code are always derived
    from the same observation.

    @tparam Op The concrete connect operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_connect_op(Op& op)
{
    op.stop_cb.reset();

    // The success path below tolerates a null socket_impl_, so the
    // budget reset must not dereference it unconditionally either.
    if (op.socket_impl_)
        op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    // Single snapshot: loading the flag twice would let a cancel that
    // lands in between cache endpoints yet still report "canceled".
    bool const cancelled = op.cancelled.load(std::memory_order_acquire);
    bool const success = (op.errn == 0 && !cancelled);

    if (success && op.socket_impl_)
    {
        using ep_type = decltype(op.target_endpoint);

        // If getsockname fails, a value-initialized local endpoint is
        // cached alongside the target endpoint.
        ep_type local_ep{};
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                op.fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
        {
            local_ep =
                from_sockaddr_as(local_storage, local_len, ep_type{});
        }
        op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
    }

    decode_io_result(
        op.ec_out,
        cancelled,
        op.errn != 0 ? make_err(op.errn) : std::error_code{},
        /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);

    coro_resume(&op);
}
157 :
158 : /** Construct and register a peer socket from an accepted fd.
159 :
160 : Creates a new socket impl via the acceptor's associated
161 : socket service, registers it with the scheduler, and caches
162 : the local and remote endpoints.
163 :
164 : @tparam SocketImpl The concrete socket implementation type.
165 : @tparam AcceptorImpl The concrete acceptor implementation type.
166 : @param acceptor_impl The acceptor that accepted the connection.
167 : @param accepted_fd The accepted file descriptor (set to -1 on success).
168 : @param peer_storage The peer address from accept().
169 : @param impl_out Output pointer for the new socket impl.
170 : @param ec_out Output pointer for any error.
171 : @return True on success, false on failure.
172 : */
173 : template<typename SocketImpl, typename AcceptorImpl>
174 : bool
175 8142 : setup_accepted_socket(
176 : AcceptorImpl* acceptor_impl,
177 : int& accepted_fd,
178 : sockaddr_storage const& peer_storage,
179 : socklen_t peer_addrlen,
180 : io_object::implementation** impl_out,
181 : std::error_code* ec_out)
182 : {
183 8142 : auto* socket_svc = acceptor_impl->service().stream_service();
184 8142 : if (!socket_svc)
185 : {
186 MIS 0 : *ec_out = make_err(ENOENT);
187 0 : return false;
188 : }
189 :
190 HIT 8142 : auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
191 8142 : impl.set_socket(accepted_fd);
192 :
193 8142 : impl.desc_state_.fd = accepted_fd;
194 : {
195 8142 : std::lock_guard lock(impl.desc_state_.mutex);
196 8142 : impl.desc_state_.read_op = nullptr;
197 8142 : impl.desc_state_.write_op = nullptr;
198 8142 : impl.desc_state_.connect_op = nullptr;
199 8142 : }
200 8142 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
201 :
202 : using ep_type = decltype(acceptor_impl->local_endpoint());
203 8142 : impl.set_endpoints(
204 : acceptor_impl->local_endpoint(),
205 8142 : from_sockaddr_as(
206 : peer_storage,
207 : peer_addrlen,
208 : ep_type{}));
209 :
210 8142 : if (impl_out)
211 8142 : *impl_out = &impl;
212 8142 : accepted_fd = -1;
213 8142 : return true;
214 : }
215 :
216 : /** Complete an accept operation.
217 :
218 : Sets up the peer socket on success, or closes the accepted
219 : fd on failure. Then resumes the caller via symmetric transfer.
220 :
221 : @tparam SocketImpl The concrete socket implementation type.
222 : @tparam Op The concrete accept operation type.
223 : @param op The operation to complete.
224 : */
225 : template<typename SocketImpl, typename Op>
226 : void
227 8184 : complete_accept_op(Op& op)
228 : {
229 8184 : op.stop_cb.reset();
230 8184 : if (auto* sched = op.acceptor_impl_->desc_state_.scheduler_)
231 8180 : sched->reset_inline_budget();
232 :
233 8184 : bool success =
234 8184 : (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
235 :
236 16364 : decode_io_result(
237 : op.ec_out,
238 8184 : op.cancelled.load(std::memory_order_acquire),
239 8184 : op.errn != 0 ? make_err(op.errn) : std::error_code{},
240 : /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);
241 :
242 8184 : if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
243 : {
244 8142 : if (!setup_accepted_socket<SocketImpl>(
245 8142 : op.acceptor_impl_, op.accepted_fd, op.peer_storage,
246 : op.peer_addrlen, op.impl_out, op.ec_out))
247 MIS 0 : success = false;
248 : }
249 :
250 HIT 8184 : if (!success || !op.acceptor_impl_)
251 : {
252 42 : if (op.accepted_fd >= 0)
253 : {
254 2 : ::close(op.accepted_fd);
255 2 : op.accepted_fd = -1;
256 : }
257 42 : if (op.impl_out)
258 42 : *op.impl_out = nullptr;
259 : }
260 :
261 8184 : coro_resume(&op);
262 8184 : }
263 :
/** Finish a datagram operation that has no source endpoint to report.

    Decodes the recorded errno and cancellation flag into the caller's
    error_code, publishes the transferred byte count, and resumes the
    awaiting coroutine through coro_resume. A zero byte count is not
    translated into EOF. This overload does not touch any endpoint;
    the two-argument overload handles source endpoint capture.

    @tparam Op The concrete datagram operation type.
    @param op The operation to complete.
*/
template<typename Op>
void
complete_datagram_op(Op& op)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    bool const was_cancelled = op.cancelled.load(std::memory_order_acquire);
    std::error_code const sys_error =
        op.errn != 0 ? make_err(op.errn) : std::error_code{};

    // A zero-length datagram is valid, so is_read/bytes are pinned to
    // false/0 and the decoder never reports EOF.
    decode_io_result(
        op.ec_out,
        was_cancelled,
        sys_error,
        /*is_read=*/false,
        /*bytes=*/0,
        /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    coro_resume(&op);
}
291 :
/** Complete a datagram operation with source endpoint capture.

    For recv_from operations, writes the source endpoint from the
    recorded sockaddr_storage into the caller's endpoint pointer.
    Then resumes the caller via coro_resume.

    The cancellation flag is read exactly once, so a success
    error_code always comes with a written source endpoint (when one
    was requested).

    @tparam Op The concrete datagram operation type.
    @tparam Endpoint The endpoint type written through source_out.
    @param op The operation to complete.
    @param source_out Optional pointer to store source endpoint
        (non-null for recv_from, null for send_to).
*/
template<typename Op, typename Endpoint>
void
complete_datagram_op(Op& op, Endpoint* source_out)
{
    op.stop_cb.reset();
    op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();

    // Single snapshot: loading the flag twice would let a cancel that
    // lands in between report success while leaving *source_out unset.
    bool const cancelled = op.cancelled.load(std::memory_order_acquire);

    // No EOF: a zero-length datagram is valid (success with 0 bytes).
    decode_io_result(
        op.ec_out,
        cancelled,
        op.errn != 0 ? make_err(op.errn) : std::error_code{},
        /*is_read=*/false, /*bytes=*/0, /*empty_buffer=*/false);

    *op.bytes_out = op.bytes_transferred;

    // The source address is only converted for a successful,
    // non-cancelled operation.
    if (source_out && !cancelled && op.errn == 0)
    {
        *source_out = from_sockaddr_as(
            op.source_storage,
            op.source_addrlen,
            Endpoint{});
    }

    coro_resume(&op);
}
328 :
329 : } // namespace boost::corosio::detail
330 :
331 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
|