#define _GNU_SOURCE /* dup3 */
#include "fd_sock_tile_private.h"
#include "../fd_net_common.h"
#include "../../topo/fd_topo.h"
#include "../../../util/net/fd_eth.h"
#include "../../../util/net/fd_ip4.h"
#include "../../../util/net/fd_udp.h"

#include <assert.h>     /* assert */
#include <stdalign.h>   /* alignof */
#include <errno.h>
#include <fcntl.h>      /* fcntl */
#include <unistd.h>     /* dup3, close */
#include <netinet/in.h> /* sockaddr_in */
#include <sys/socket.h> /* socket */
#include "../../metrics/fd_metrics.h"

#include "generated/fd_sock_tile_seccomp.h"

/* STEM_BURST is the max packet count of a recvmmsg/sendmmsg batch and
   the tango burst depth.
   FIXME make configurable in the future?
   FIXME keep in sync with fd_net_tile_topo.c */
#define STEM_BURST (64UL)

/* Place RX socket file descriptors in a contiguous integer range. */
#define RX_SOCK_FD_MIN (128)
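
/* A contiguous range keeps the generated seccomp policy small: the BPF
   filter can admit receives on any RX socket with two bounds checks
   rather than one rule per descriptor. Hedged sketch of the equivalent
   userspace predicate (rx_fd_allowed is hypothetical, not in the build):

     static inline int
     rx_fd_allowed( int fd, uint sock_cnt ) {
       return (fd>=RX_SOCK_FD_MIN) & (fd<RX_SOCK_FD_MIN+(int)sock_cnt);
     }
*/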

/* Controls max ancillary data size.
   Must be aligned by alignof(struct cmsghdr) */
#define FD_SOCK_CMSG_MAX (64UL)
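
/* Sanity sketch (illustrative, not compiled): one IP_PKTINFO control
   message needs CMSG_SPACE( sizeof(struct in_pktinfo) ) bytes, which
   is 32 on 64-bit Linux (16 byte cmsghdr plus 12 byte payload, each
   rounded up), so the 64 byte per-message control buffers carved out
   of batch_cmsg below never overflow. */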

/* Socket index of the Firedancer repair intake socket. Used to decide
   whether repair packets go to the shred tile or the repair tile.
   This value is validated at startup. */
#define REPAIR_SHRED_SOCKET_ID (4U)

static ulong
populate_allowed_seccomp( fd_topo_t const *      topo,
                          fd_topo_tile_t const * tile,
                          ulong                  out_cnt,
                          struct sock_filter *   out ) {
  FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );

  populate_sock_filter_policy_fd_sock_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->tx_sock, RX_SOCK_FD_MIN, RX_SOCK_FD_MIN+(uint)ctx->sock_cnt );
  return sock_filter_policy_fd_sock_tile_instr_cnt;
}

static ulong
populate_allowed_fds( fd_topo_t const *      topo,
                      fd_topo_tile_t const * tile,
                      ulong                  out_fds_cnt,
                      int *                  out_fds ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );

  ulong sock_cnt = ctx->sock_cnt;
  if( FD_UNLIKELY( out_fds_cnt<sock_cnt+3UL ) ) {
    FD_LOG_ERR(( "out_fds_cnt %lu too small, need at least %lu", out_fds_cnt, sock_cnt+3UL ));
  }

  ulong out_cnt = 0UL;

  out_fds[ out_cnt++ ] = 2; /* stderr */
  if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
    out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
  }
  out_fds[ out_cnt++ ] = ctx->tx_sock;
  for( ulong j=0UL; j<sock_cnt; j++ ) {
    out_fds[ out_cnt++ ] = ctx->pollfd[ j ].fd;
  }
  return out_cnt;
}

FD_FN_CONST static inline ulong
tx_scratch_footprint( void ) {
  return STEM_BURST * fd_ulong_align_up( FD_NET_MTU, FD_CHUNK_ALIGN );
}

FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 4096UL;
}

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_sock_tile_t),     sizeof(fd_sock_tile_t)                );
  l = FD_LAYOUT_APPEND( l, alignof(struct iovec),       STEM_BURST*sizeof(struct iovec)       );
  l = FD_LAYOUT_APPEND( l, alignof(struct cmsghdr),     STEM_BURST*FD_SOCK_CMSG_MAX           );
  l = FD_LAYOUT_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  l = FD_LAYOUT_APPEND( l, alignof(struct mmsghdr),     STEM_BURST*sizeof(struct mmsghdr)     );
  l = FD_LAYOUT_APPEND( l, FD_CHUNK_ALIGN,              tx_scratch_footprint()                );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
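
/* Note that the FD_LAYOUT_APPEND sequence above must stay in lockstep
   with the FD_SCRATCH_ALLOC_APPEND sequence in privileged_init below:
   both walk the same region. A minimal sketch of the idiom
   (illustrative only; my_obj_t is a hypothetical type):

     ulong l = FD_LAYOUT_INIT;                      // footprint side
     l = FD_LAYOUT_APPEND( l, alignof(my_obj_t), sizeof(my_obj_t) );
     ulong footprint = FD_LAYOUT_FINI( l, align );

     FD_SCRATCH_ALLOC_INIT( l2, scratch );          // carve side
     my_obj_t * obj = FD_SCRATCH_ALLOC_APPEND( l2, alignof(my_obj_t), sizeof(my_obj_t) );
*/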

/* create_udp_socket creates and configures a new UDP socket for the
   sock tile and installs it at the given file descriptor number. */

static void
create_udp_socket( int    sock_fd,
                   uint   bind_addr,
                   ushort udp_port,
                   int    so_rcvbuf ) {

  if( fcntl( sock_fd, F_GETFD, 0 )!=-1 ) {
    FD_LOG_ERR(( "file descriptor %d already exists", sock_fd ));
  } else if( errno!=EBADF ) {
    FD_LOG_ERR(( "fcntl(F_GETFD) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int orig_fd = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
  if( FD_UNLIKELY( orig_fd<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int reuseport = 1;
  if( FD_UNLIKELY( setsockopt( orig_fd, SOL_SOCKET, SO_REUSEPORT, &reuseport, sizeof(int) )<0 ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_REUSEPORT,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int ip_pktinfo = 1;
  if( FD_UNLIKELY( setsockopt( orig_fd, IPPROTO_IP, IP_PKTINFO, &ip_pktinfo, sizeof(int) )<0 ) ) {
    FD_LOG_ERR(( "setsockopt(IPPROTO_IP,IP_PKTINFO,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=setsockopt( orig_fd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, sizeof(int) ) ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", so_rcvbuf, errno, fd_io_strerror( errno ) ));
  }

  struct sockaddr_in saddr = {
    .sin_family      = AF_INET,
    .sin_addr.s_addr = bind_addr,
    .sin_port        = fd_ushort_bswap( udp_port ),
  };
  if( FD_UNLIKELY( 0!=bind( orig_fd, fd_type_pun_const( &saddr ), sizeof(struct sockaddr_in) ) ) ) {
    FD_LOG_ERR(( "bind(" FD_IP4_ADDR_FMT ":%u) failed (%i-%s)", FD_IP4_ADDR_FMT_ARGS( bind_addr ), udp_port, errno, fd_io_strerror( errno ) ));
  }

# if defined(__linux__)
  int dup_res = dup3( orig_fd, sock_fd, O_CLOEXEC );
# else
  int dup_res = dup2( orig_fd, sock_fd );
# endif
  if( FD_UNLIKELY( dup_res!=sock_fd ) ) {
    FD_LOG_ERR(( "dup returned %i, expected %i (%i-%s)", dup_res, sock_fd, errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=close( orig_fd ) ) ) {
    FD_LOG_ERR(( "close(%d) failed (%i-%s)", orig_fd, errno, fd_io_strerror( errno ) ));
  }

}
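
/* Example (illustrative only; values hypothetical): install a gossip
   listen socket at file descriptor 130, bound to all interfaces on
   UDP port 8001, with an 8 MiB kernel receive buffer:

     create_udp_socket( 130, FD_IP4_ADDR( 0,0,0,0 ), (ushort)8001, 8<<20 );

   After this call, fd 130 is the only handle to the socket; the
   temporary descriptor returned by socket(2) has been closed. */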

static void
privileged_init( fd_topo_t *      topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_sock_tile_t *     ctx        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t),     sizeof(fd_sock_tile_t)                );
  struct iovec *       batch_iov  = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct iovec),       STEM_BURST*sizeof(struct iovec)       );
  void *               batch_cmsg = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct cmsghdr),     STEM_BURST*FD_SOCK_CMSG_MAX           );
  struct sockaddr_in * batch_sa   = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  struct mmsghdr *     batch_msg  = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct mmsghdr),     STEM_BURST*sizeof(struct mmsghdr)     );
  uchar *              tx_scratch = FD_SCRATCH_ALLOC_APPEND( l, FD_CHUNK_ALIGN,              tx_scratch_footprint()                );

  assert( scratch==ctx );

  fd_memset( ctx,       0, sizeof(fd_sock_tile_t)                );
  fd_memset( batch_iov, 0, STEM_BURST*sizeof(struct iovec)       );
  fd_memset( batch_sa,  0, STEM_BURST*sizeof(struct sockaddr_in) );
  fd_memset( batch_msg, 0, STEM_BURST*sizeof(struct mmsghdr)     );

  ctx->batch_cnt   = 0UL;
  ctx->batch_iov   = batch_iov;
  ctx->batch_cmsg  = batch_cmsg;
  ctx->batch_sa    = batch_sa;
  ctx->batch_msg   = batch_msg;
  ctx->tx_scratch0 = tx_scratch;
  ctx->tx_scratch1 = tx_scratch + tx_scratch_footprint();
  ctx->tx_ptr      = tx_scratch;

  /* Create receive sockets. Incrementally assign them to file
     descriptors starting at sock_fd_min. */

  int sock_fd_min = RX_SOCK_FD_MIN;
  ushort udp_port_candidates[] = {
    (ushort)tile->sock.net.legacy_transaction_listen_port,
    (ushort)tile->sock.net.quic_transaction_listen_port,
    (ushort)tile->sock.net.shred_listen_port,
    (ushort)tile->sock.net.gossip_listen_port,
    (ushort)tile->sock.net.repair_intake_listen_port,
    (ushort)tile->sock.net.repair_serve_listen_port,
    (ushort)tile->sock.net.send_src_port
  };
  static char const * udp_port_links[] = {
    "net_quic",   /* legacy_transaction_listen_port */
    "net_quic",   /* quic_transaction_listen_port */
    "net_shred",  /* shred_listen_port (turbine) */
    "net_gossip", /* gossip_listen_port */
    "net_shred",  /* shred_listen_port (repair) */
    "net_repair", /* repair_serve_listen_port */
    "net_send"    /* send_src_port */
  };
  static uchar const udp_port_protos[] = {
    DST_PROTO_TPU_UDP,  /* legacy_transaction_listen_port */
    DST_PROTO_TPU_QUIC, /* quic_transaction_listen_port */
    DST_PROTO_SHRED,    /* shred_listen_port (turbine) */
    DST_PROTO_GOSSIP,   /* gossip_listen_port */
    DST_PROTO_REPAIR,   /* shred_listen_port (repair) */
    DST_PROTO_REPAIR,   /* repair_serve_listen_port */
    DST_PROTO_SEND      /* send_src_port */
  };
  for( uint candidate_idx=0U; candidate_idx<7U; candidate_idx++ ) {
    if( !udp_port_candidates[ candidate_idx ] ) continue;
    uint sock_idx = ctx->sock_cnt;
    if( sock_idx>=FD_SOCK_TILE_MAX_SOCKETS ) FD_LOG_ERR(( "too many sockets" ));
    ushort port = (ushort)udp_port_candidates[ candidate_idx ];

    /* Validate value of REPAIR_SHRED_SOCKET_ID */
    if( udp_port_candidates[ candidate_idx ]==tile->sock.net.repair_intake_listen_port )
      FD_TEST( sock_idx==REPAIR_SHRED_SOCKET_ID );
    if( udp_port_candidates[ candidate_idx ]==tile->sock.net.repair_serve_listen_port )
      FD_TEST( sock_idx==REPAIR_SHRED_SOCKET_ID+1 );

    char const * target_link = udp_port_links[ candidate_idx ];
    ctx->link_rx_map[ sock_idx ] = 0xFF;
    for( ulong j=0UL; j<(tile->out_cnt); j++ ) {
      if( 0==strcmp( topo->links[ tile->out_link_id[ j ] ].name, target_link ) ) {
        ctx->proto_id    [ sock_idx ] = (uchar)udp_port_protos[ candidate_idx ];
        ctx->link_rx_map [ sock_idx ] = (uchar)j;
        ctx->rx_sock_port[ sock_idx ] = (ushort)port;
        break;
      }
    }
    if( ctx->link_rx_map[ sock_idx ]==0xFF ) {
      continue; /* listen port number has no associated links */
    }

    int sock_fd = sock_fd_min + (int)sock_idx;
    create_udp_socket( sock_fd, tile->sock.net.bind_address, port, tile->sock.so_rcvbuf );
    ctx->pollfd[ sock_idx ].fd     = sock_fd;
    ctx->pollfd[ sock_idx ].events = POLLIN;
    ctx->sock_cnt++;
  }

  /* Create transmit socket */

  int tx_sock = socket( AF_INET, SOCK_RAW|SOCK_CLOEXEC, FD_IP4_HDR_PROTOCOL_UDP );
  if( FD_UNLIKELY( tx_sock<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_RAW|SOCK_CLOEXEC,17) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=setsockopt( tx_sock, SOL_SOCKET, SO_SNDBUF, &tile->sock.so_sndbuf, sizeof(int) ) ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_SNDBUF,%i) failed (%i-%s)", tile->sock.so_sndbuf, errno, fd_io_strerror( errno ) ));
  }

  ctx->tx_sock      = tx_sock;
  ctx->bind_address = tile->sock.net.bind_address;

}
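
/* The TX socket is SOCK_RAW with IPPROTO_UDP but without IP_HDRINCL,
   so the kernel constructs the IPv4 header (source address, total
   length, header checksum, routing) and the tile supplies only the
   UDP header plus payload per packet. A hedged sketch of the same
   technique in isolation (error handling omitted):

     int fd = socket( AF_INET, SOCK_RAW|SOCK_CLOEXEC, IPPROTO_UDP );
     struct sockaddr_in dst = { .sin_family = AF_INET };
     // dst.sin_addr set per packet; buf points at a UDP header+payload
     // sendto( fd, buf, sz, 0, (struct sockaddr *)&dst, sizeof(dst) );
*/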

static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  if( FD_UNLIKELY( tile->out_cnt > MAX_NET_OUTS ) ) {
    FD_LOG_ERR(( "sock tile has %lu out links which exceeds the max (%lu)", tile->out_cnt, MAX_NET_OUTS ));
  }

  for( ulong i=0UL; i<(tile->out_cnt); i++ ) {
    if( 0!=strncmp( topo->links[ tile->out_link_id[ i ] ].name, "net_", 4 ) ) {
      FD_LOG_ERR(( "out link %lu is not a net RX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
    ctx->link_rx[ i ].base   = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_rx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_rx[ i ].base, link->dcache );
    ctx->link_rx[ i ].wmark  = fd_dcache_compact_wmark ( ctx->link_rx[ i ].base, link->dcache, link->mtu );
    ctx->link_rx[ i ].chunk  = ctx->link_rx[ i ].chunk0;
    if( FD_UNLIKELY( link->burst < STEM_BURST ) ) {
      FD_LOG_ERR(( "link %lu dcache burst is too low (%lu<%lu)",
                   tile->out_link_id[ i ], link->burst, STEM_BURST ));
    }
  }

  for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    if( !strstr( topo->links[ tile->in_link_id[ i ] ].name, "_net" ) ) {
      FD_LOG_ERR(( "in link %lu is not a net TX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
    ctx->link_tx[ i ].base   = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_tx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_tx[ i ].base, link->dcache );
    ctx->link_tx[ i ].wmark  = fd_dcache_compact_wmark ( ctx->link_tx[ i ].base, link->dcache, link->mtu );
  }

}
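
/* Chunk addressing refresher (hedged sketch): compact dcache frags are
   addressed by chunk index relative to the workspace base, where one
   chunk is FD_CHUNK_SZ (64) bytes, so indexes and local addresses
   interconvert via:

     uchar * laddr = fd_chunk_to_laddr( base, chunk );
     ulong   chunk = fd_laddr_to_chunk( base, laddr );

   chunk0 and wmark bound the indexes a well-formed producer may use;
   during_frag below rejects anything outside [chunk0,wmark]. */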

/* RX PATH (socket->tango) ********************************************/

/* FIXME Pace RX polling and interleave it with TX jobs to reduce TX
   tail latency */

/* poll_rx_socket does one recvmmsg batch receive on the given socket
   index. Returns the number of packets returned by recvmmsg. */

static ulong
poll_rx_socket( fd_sock_tile_t *    ctx,
                fd_stem_context_t * stem,
                uint                sock_idx,
                int                 sock_fd,
                ushort              proto ) {
  ulong  hdr_sz      = sizeof(fd_eth_hdr_t) + sizeof(fd_ip4_hdr_t) + sizeof(fd_udp_hdr_t);
  ulong  payload_max = FD_NET_MTU-hdr_sz;
  uchar  rx_link     = ctx->link_rx_map[ sock_idx ];
  ushort dport       = ctx->rx_sock_port[ sock_idx ];

  fd_sock_link_rx_t * link = ctx->link_rx + rx_link;
  void * const base       = link->base;
  ulong  const chunk0     = link->chunk0;
  ulong  const wmark      = link->wmark;
  ulong        chunk_next = link->chunk;
  uchar *      cmsg_next  = ctx->batch_cmsg;

  for( ulong j=0UL; j<STEM_BURST; j++ ) {
    ctx->batch_iov[ j ].iov_base = (uchar *)fd_chunk_to_laddr( base, chunk_next ) + hdr_sz;
    ctx->batch_iov[ j ].iov_len  = payload_max;
    ctx->batch_msg[ j ].msg_hdr = (struct msghdr) {
      .msg_iov        = ctx->batch_iov+j,
      .msg_iovlen     = 1,
      .msg_name       = ctx->batch_sa+j,
      .msg_namelen    = sizeof(struct sockaddr_in),
      .msg_control    = cmsg_next,
      .msg_controllen = FD_SOCK_CMSG_MAX,
    };
    cmsg_next += FD_SOCK_CMSG_MAX;
    /* Speculatively prepare all chunk indexes for a receive.
       At function exit, chunks into which a packet was received are
       committed, all others are freed. */
    chunk_next = fd_dcache_compact_next( chunk_next, FD_NET_MTU, chunk0, wmark );
  }

  int msg_cnt = recvmmsg( sock_fd, ctx->batch_msg, STEM_BURST, MSG_DONTWAIT, NULL );
  if( FD_UNLIKELY( msg_cnt<0 ) ) {
    if( FD_LIKELY( errno==EAGAIN ) ) return 0UL;
    /* unreachable if socket is in a valid state */
    FD_LOG_ERR(( "recvmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  long ts = fd_tickcount();
  ctx->metrics.sys_recvmmsg_cnt++;

  if( FD_UNLIKELY( msg_cnt==0 ) ) return 0UL;

  /* Track the chunk index of the last frag populated, so we can derive
     the chunk indexes for the next poll_rx_socket call.
     Guaranteed to be set since msg_cnt>0. */
  ulong last_chunk;

  for( ulong j=0; j<(ulong)msg_cnt; j++ ) {
    uchar *              payload    = ctx->batch_iov[ j ].iov_base;
    ulong                payload_sz = ctx->batch_msg[ j ].msg_len;
    struct sockaddr_in * sa         = ctx->batch_msg[ j ].msg_hdr.msg_name;
    ulong                frame_sz   = payload_sz + hdr_sz;
    ctx->metrics.rx_bytes_total += frame_sz;
    if( FD_UNLIKELY( sa->sin_family!=AF_INET ) ) {
      /* unreachable */
      FD_LOG_ERR(( "Received packet with unexpected sin_family %i", sa->sin_family ));
    }

    long daddr = -1;
    struct cmsghdr * cmsg = CMSG_FIRSTHDR( &ctx->batch_msg[ j ].msg_hdr );
    if( FD_LIKELY( cmsg ) ) {
      do {
        if( FD_LIKELY( (cmsg->cmsg_level==IPPROTO_IP) &
                       (cmsg->cmsg_type ==IP_PKTINFO) ) ) {
          struct in_pktinfo const * pi = (struct in_pktinfo const *)CMSG_DATA( cmsg );
          daddr = pi->ipi_addr.s_addr;
        }
        cmsg = CMSG_NXTHDR( &ctx->batch_msg[ j ].msg_hdr, cmsg );
      } while( FD_UNLIKELY( cmsg ) ); /* optimize for 1 cmsg */
    }
    if( FD_UNLIKELY( daddr<0L ) ) {
      /* unreachable because IP_PKTINFO was set */
      FD_LOG_ERR(( "Missing IP_PKTINFO on incoming packet" ));
    }

    fd_eth_hdr_t * eth_hdr = (fd_eth_hdr_t *)( payload-42UL );
    fd_ip4_hdr_t * ip_hdr  = (fd_ip4_hdr_t *)( payload-28UL );
    fd_udp_hdr_t * udp_hdr = (fd_udp_hdr_t *)( payload- 8UL );
    memset( eth_hdr->dst, 0, 6 );
    memset( eth_hdr->src, 0, 6 );
    eth_hdr->net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP );
    *ip_hdr = (fd_ip4_hdr_t) {
      .verihl      = FD_IP4_VERIHL( 4, 5 ),
      .net_tot_len = fd_ushort_bswap( (ushort)( payload_sz+28UL ) ),
      .ttl         = 1,
      .protocol    = FD_IP4_HDR_PROTOCOL_UDP,
    };
    uint daddr_ = (uint)(ulong)daddr;
    memcpy( ip_hdr->saddr_c, &sa->sin_addr.s_addr, 4 );
    memcpy( ip_hdr->daddr_c, &daddr_,              4 );
    *udp_hdr = (fd_udp_hdr_t) {
      .net_sport = sa->sin_port,
      .net_dport = (ushort)fd_ushort_bswap( (ushort)dport ),
      .net_len   = (ushort)fd_ushort_bswap( (ushort)( payload_sz+8UL ) ),
      .check     = 0
    };

    ctx->metrics.rx_pkt_cnt++;
    ulong chunk = fd_laddr_to_chunk( base, eth_hdr );
    ulong sig   = fd_disco_netmux_sig( sa->sin_addr.s_addr, fd_ushort_bswap( sa->sin_port ), 0U, proto, hdr_sz );
    ulong tspub = fd_frag_meta_ts_comp( ts );

    /* By default, repair intake traffic (shreds) goes to the shred
       tile; ping messages are routed to the repair tile instead. */
    if( FD_UNLIKELY( sock_idx==REPAIR_SHRED_SOCKET_ID && frame_sz==REPAIR_PING_SZ ) ) {
      uchar               repair_rx_link = ctx->link_rx_map[ REPAIR_SHRED_SOCKET_ID+1 ];
      fd_sock_link_rx_t * repair_link    = ctx->link_rx + repair_rx_link;
      uchar *             repair_buf     = fd_chunk_to_laddr( repair_link->base, repair_link->chunk );
      memcpy( repair_buf, eth_hdr, frame_sz );
      fd_stem_publish( stem, repair_rx_link, sig, repair_link->chunk, frame_sz, 0UL, 0UL, tspub );
      repair_link->chunk = fd_dcache_compact_next( repair_link->chunk, FD_NET_MTU, repair_link->chunk0, repair_link->wmark );
    } else {
      fd_stem_publish( stem, rx_link, sig, chunk, frame_sz, 0UL, 0UL, tspub );
    }

    last_chunk = chunk;
  }

  /* Rewind the chunk index to the first free index. */
  link->chunk = fd_dcache_compact_next( last_chunk, FD_NET_MTU, chunk0, wmark );
  return (ulong)msg_cnt;
}
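
/* Worked example of the synthetic header placement above (a sketch of
   the arithmetic, not additional code): the payload is received at
   chunk_laddr+42, so the headers are written immediately before it:

     hdr_sz = sizeof(fd_eth_hdr_t) + sizeof(fd_ip4_hdr_t) + sizeof(fd_udp_hdr_t)
            = 14 + 20 + 8 = 42
     eth_hdr = payload-42  (fills frame bytes [ 0,14))
     ip_hdr  = payload-28  (fills frame bytes [14,34))
     udp_hdr = payload- 8  (fills frame bytes [34,42))

   so downstream consumers see a frame laid out as if captured off the
   wire (MAC addresses zeroed, checksums unset). */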

static ulong
poll_rx( fd_sock_tile_t *    ctx,
         fd_stem_context_t * stem ) {
  ulong pkt_cnt = 0UL;
  if( FD_UNLIKELY( ctx->batch_cnt ) ) {
    FD_LOG_ERR(( "Batch is not clean" ));
  }
  ctx->tx_idle_cnt = 0; /* restart TX polling */
  if( FD_UNLIKELY( fd_syscall_poll( ctx->pollfd, ctx->sock_cnt, 0 )<0 ) ) {
    FD_LOG_ERR(( "fd_syscall_poll failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  for( uint j=0U; j<ctx->sock_cnt; j++ ) {
    if( ctx->pollfd[ j ].revents & (POLLIN|POLLERR) ) {
      pkt_cnt += poll_rx_socket(
          ctx,
          stem,
          j,
          ctx->pollfd[ j ].fd,
          ctx->proto_id[ j ]
      );
    }
    ctx->pollfd[ j ].revents = 0;
  }
  return pkt_cnt;
}

/* TX PATH (tango->socket) ********************************************/

static void
flush_tx_batch( fd_sock_tile_t * ctx ) {
  ulong batch_cnt = ctx->batch_cnt;
  for( int j=0; j<(int)batch_cnt; /* incremented in loop */ ) {
    int remain   = (int)batch_cnt - j;
    int send_cnt = sendmmsg( ctx->tx_sock, ctx->batch_msg + j, (uint)remain, MSG_DONTWAIT );
    if( send_cnt>=0 ) {
      ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_NO_ERROR_IDX ]++;
    }
    if( FD_UNLIKELY( send_cnt<remain ) ) {
      ctx->metrics.tx_drop_cnt++;
      if( FD_UNLIKELY( send_cnt<0 ) ) {
        switch( errno ) {
        case EAGAIN:
        case ENOBUFS:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_SLOW_IDX ]++;
          break;
        case EPERM:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_PERM_IDX ]++;
          break;
        case ENETUNREACH:
        case EHOSTUNREACH:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_UNREACH_IDX ]++;
          break;
        case ENONET:
        case ENETDOWN:
        case EHOSTDOWN:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_DOWN_IDX ]++;
          break;
        default:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_OTHER_IDX ]++;
          /* log at NOTICE: logging at higher severities flushes the log,
             which has a significant negative performance impact */
          FD_LOG_NOTICE(( "sendmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
        }

        /* first message failed, so skip the failing message and continue */
        j++;
      } else {
        /* send_cnt messages succeeded, so skip those and also the failing message */
        j += send_cnt + 1;

        /* add the successful count */
        ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
      }

      continue;
    }

    /* send_cnt == remain, so we sent everything */
    ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
    break;
  }

  ctx->tx_ptr    = ctx->tx_scratch0;
  ctx->batch_cnt = 0;
}
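
/* Partial-send walkthrough (illustrative): sendmmsg returns the number
   of messages sent, or -1 if the first message failed. E.g. with a
   batch of 5 where message 2 is unroutable:

     sendmmsg( batch+0, 5 ) -> 2   messages 0,1 sent; 2 failed
     sendmmsg( batch+3, 2 ) -> 2   skip message 2, resume with 3,4

   tx_drop_cnt counts each short return, and a -1 return advances past
   exactly one failing message, so the loop always makes progress. */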

/* before_frag is called when a new frag has been detected. The sock
   tile can do early filtering here in the future. For example, it may
   want to install routing logic here to take turns with an XDP tile.
   (Fast path with slow fallback) */

static inline int
before_frag( fd_sock_tile_t * ctx    FD_PARAM_UNUSED,
             ulong            in_idx FD_PARAM_UNUSED,
             ulong            seq    FD_PARAM_UNUSED,
             ulong            sig ) {
  ulong proto = fd_disco_netmux_sig_proto( sig );
  if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1;
  return 0; /* continue */
}

/* during_frag is called when a new frag passed early filtering.
   Speculatively copies data into a sendmmsg buffer. (If all tiles
   respected backpressure, this copy could be eliminated.) */

static inline void
during_frag( fd_sock_tile_t * ctx,
             ulong            in_idx,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig,
             ulong            chunk,
             ulong            sz,
             ulong            ctl FD_PARAM_UNUSED ) {
  if( FD_UNLIKELY( chunk<ctx->link_tx[ in_idx ].chunk0 || chunk>ctx->link_tx[ in_idx ].wmark || sz>FD_NET_MTU ) ) {
    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->link_tx[ in_idx ].chunk0, ctx->link_tx[ in_idx ].wmark ));
  }

  ulong const hdr_min = sizeof(fd_eth_hdr_t)+sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t);
  if( FD_UNLIKELY( sz<hdr_min ) ) {
    /* FIXME support ICMP messages in the future? */
    FD_LOG_ERR(( "packet too small %lu (in_idx=%lu)", sz, in_idx ));
  }

  uchar const * frame   = fd_chunk_to_laddr_const( ctx->link_tx[ in_idx ].base, chunk );
  ulong         hdr_sz  = fd_disco_netmux_sig_hdr_sz( sig );
  uchar const * payload = frame+hdr_sz;
  if( FD_UNLIKELY( hdr_sz>sz || hdr_sz<hdr_min ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu corrupt: hdr_sz=%lu total_sz=%lu",
                 in_idx, hdr_sz, sz ));
  }
  ulong payload_sz = sz-hdr_sz;

  fd_ip4_hdr_t const * ip_hdr  = (fd_ip4_hdr_t const *)( frame  +sizeof(fd_eth_hdr_t) );
  fd_udp_hdr_t const * udp_hdr = (fd_udp_hdr_t const *)( payload-sizeof(fd_udp_hdr_t) );
  if( FD_UNLIKELY( ( FD_IP4_GET_VERSION( *ip_hdr )!=4 ) |
                   ( ip_hdr->protocol != FD_IP4_HDR_PROTOCOL_UDP ) ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu: sock tile only supports IPv4 UDP for now", in_idx ));
  }

  ulong msg_sz = sizeof(fd_udp_hdr_t) + payload_sz;

  ulong batch_idx = ctx->batch_cnt;
  assert( batch_idx<STEM_BURST );
  struct mmsghdr *     msg  = ctx->batch_msg + batch_idx;
  struct sockaddr_in * sa   = ctx->batch_sa  + batch_idx;
  struct iovec *       iov  = ctx->batch_iov + batch_idx;
  struct cmsghdr *     cmsg = (void *)( (ulong)ctx->batch_cmsg + batch_idx*FD_SOCK_CMSG_MAX );
  uchar *              buf  = ctx->tx_ptr;

  *iov = (struct iovec) {
    .iov_base = buf,
    .iov_len  = msg_sz,
  };
  sa->sin_family      = AF_INET;
  sa->sin_addr.s_addr = FD_LOAD( uint, ip_hdr->daddr_c );
  sa->sin_port        = 0; /* ignored */

  cmsg->cmsg_level = IPPROTO_IP;
  cmsg->cmsg_type  = IP_PKTINFO;
  cmsg->cmsg_len   = CMSG_LEN( sizeof(struct in_pktinfo) );
  struct in_pktinfo * pi = (struct in_pktinfo *)CMSG_DATA( cmsg );
  pi->ipi_ifindex         = 0;
  pi->ipi_addr.s_addr     = 0;
  pi->ipi_spec_dst.s_addr = fd_uint_if( !!ip_hdr->saddr, ip_hdr->saddr, ctx->bind_address );

  *msg = (struct mmsghdr) {
    .msg_hdr = {
      .msg_name       = sa,
      .msg_namelen    = sizeof(struct sockaddr_in),
      .msg_iov        = iov,
      .msg_iovlen     = 1,
      .msg_control    = cmsg,
      .msg_controllen = CMSG_LEN( sizeof(struct in_pktinfo) )
    }
  };

  memcpy( buf, udp_hdr, sizeof(fd_udp_hdr_t) );
  fd_memcpy( buf+sizeof(fd_udp_hdr_t), payload, payload_sz );
  ctx->metrics.tx_bytes_total += sz;
}
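
/* TX frame transformation sketch (illustrative): during_frag receives
   a full Ethernet frame from the producer tile, but the raw TX socket
   wants only the UDP datagram, with addressing carried out of band:

     in  (dcache): [eth 14][ip4 20][udp 8][payload]
     out (iovec) :                 [udp 8][payload]
     dst address : sockaddr_in  <- ip4 daddr
     src address : IP_PKTINFO   <- ip4 saddr (or bind_address if 0)

   The kernel then rebuilds an IPv4 header around the datagram. */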

/* after_frag is called when a frag was copied into a sendmmsg buffer. */

static void
after_frag( fd_sock_tile_t *    ctx,
            ulong               in_idx FD_PARAM_UNUSED,
            ulong               seq    FD_PARAM_UNUSED,
            ulong               sig    FD_PARAM_UNUSED,
            ulong               sz,
            ulong               tsorig FD_PARAM_UNUSED,
            ulong               tspub  FD_PARAM_UNUSED,
            fd_stem_context_t * stem   FD_PARAM_UNUSED ) {
  /* Commit the packet added in during_frag */

  ctx->tx_idle_cnt = 0;
  ctx->batch_cnt++;
  /* Technically leaves a gap. sz is always larger than the payload
     written to tx_ptr because Ethernet & IPv4 headers were stripped. */
  ctx->tx_ptr += fd_ulong_align_up( sz, FD_CHUNK_ALIGN );

  if( ctx->batch_cnt >= STEM_BURST ) {
    flush_tx_batch( ctx );
  }
}

/* End TX path ********************************************************/

/* after_credit is called every stem iteration when there are enough
   flow control credits to publish a burst of fragments. */

static inline void
after_credit( fd_sock_tile_t *    ctx,
              fd_stem_context_t * stem,
              int *               poll_in FD_PARAM_UNUSED,
              int *               charge_busy ) {
  if( ctx->tx_idle_cnt > 512 ) {
    if( ctx->batch_cnt ) {
      flush_tx_batch( ctx );
    }
    ulong pkt_cnt = poll_rx( ctx, stem );
    *charge_busy = pkt_cnt!=0;
  }
  ctx->tx_idle_cnt++;
}
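
/* Pacing note (hedged): tx_idle_cnt counts stem iterations without TX
   work; after_frag and poll_rx reset it to zero. Only once it exceeds
   512 does the tile flush any pending TX batch and poll the RX
   sockets, e.g.:

     iter 0..512 : TX frags arriving, tx_idle_cnt reset each time
     iter 513    : no TX for >512 iterations -> flush_tx_batch + poll_rx

   This batches syscalls aggressively while keeping TX latency bounded. */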

static void
metrics_write( fd_sock_tile_t * ctx ) {
  FD_MCNT_SET      ( SOCK, SYSCALLS_RECVMMSG, ctx->metrics.sys_recvmmsg_cnt );
  FD_MCNT_ENUM_COPY( SOCK, SYSCALLS_SENDMMSG, ctx->metrics.sys_sendmmsg_cnt );
  FD_MCNT_SET      ( SOCK, RX_PKT_CNT,        ctx->metrics.rx_pkt_cnt       );
  FD_MCNT_SET      ( SOCK, TX_PKT_CNT,        ctx->metrics.tx_pkt_cnt       );
  FD_MCNT_SET      ( SOCK, TX_DROP_CNT,       ctx->metrics.tx_drop_cnt      );
  FD_MCNT_SET      ( SOCK, TX_BYTES_TOTAL,    ctx->metrics.tx_bytes_total   );
  FD_MCNT_SET      ( SOCK, RX_BYTES_TOTAL,    ctx->metrics.rx_bytes_total   );
}

static ulong
rlimit_file_cnt( fd_topo_t const *      topo,
                 fd_topo_tile_t const * tile ) {
  fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  return RX_SOCK_FD_MIN + ctx->sock_cnt;
}
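
/* Worked example (file descriptor counts; socket count hypothetical):
   with RX_SOCK_FD_MIN==128 and six RX sockets, descriptors 128..133
   are in use, so the file limit must admit descriptor numbers up to
   133, i.e. RLIMIT_NOFILE >= 128+6 == 134. */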

#define STEM_CALLBACK_CONTEXT_TYPE  fd_sock_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_sock_tile_t)

#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_AFTER_CREDIT  after_credit
#define STEM_CALLBACK_BEFORE_FRAG   before_frag
#define STEM_CALLBACK_DURING_FRAG   during_frag
#define STEM_CALLBACK_AFTER_FRAG    after_frag

#include "../../stem/fd_stem.c"

fd_topo_run_tile_t fd_tile_sock = {
  .name                     = "sock",
  .rlimit_file_cnt_fn       = rlimit_file_cnt,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};