#define _GNU_SOURCE /* dup3 */
#include "fd_sock_tile_private.h"
#include "../../topo/fd_topo.h"
#include "../../../util/net/fd_eth.h"
#include "../../../util/net/fd_ip4.h"
#include "../../../util/net/fd_udp.h"

#include <assert.h>     /* assert */
#include <stdalign.h>   /* alignof */
#include <errno.h>
#include <fcntl.h>      /* fcntl */
#include <unistd.h>     /* dup3, close */
#include <netinet/in.h> /* sockaddr_in */
#include <sys/socket.h> /* socket */
#include "generated/sock_seccomp.h"
#include "../../metrics/fd_metrics.h"

/* recv/sendmmsg packet count per batch and tango burst depth.
   FIXME make configurable in the future?
   FIXME keep in sync with fd_net_tile_topo.c */
#define STEM_BURST (64UL)

/* Place RX socket file descriptors in a contiguous integer range. */
#define RX_SOCK_FD_MIN (128)

/* Controls the max ancillary data size.
   Must be aligned by alignof(struct cmsghdr). */
#define FD_SOCK_CMSG_MAX (64UL)

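/* The only control message requested is IP_PKTINFO, so this bound
   just needs to cover CMSG_SPACE( sizeof(struct in_pktinfo) ), which
   is roughly 32 bytes on typical 64-bit Linux; 64 bytes leaves ample
   headroom. */
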
static ulong
populate_allowed_seccomp( fd_topo_t const *      topo,
                          fd_topo_tile_t const * tile,
                          ulong                  out_cnt,
                          struct sock_filter *   out ) {
  FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );
  populate_sock_filter_policy_sock( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->tx_sock, RX_SOCK_FD_MIN, RX_SOCK_FD_MIN+(uint)ctx->sock_cnt );
  return sock_filter_policy_sock_instr_cnt;
}

static ulong
populate_allowed_fds( fd_topo_t const *      topo,
                      fd_topo_tile_t const * tile,
                      ulong                  out_fds_cnt,
                      int *                  out_fds ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );

  ulong sock_cnt = ctx->sock_cnt;
  if( FD_UNLIKELY( out_fds_cnt<sock_cnt+3UL ) ) {
    FD_LOG_ERR(( "out_fds_cnt %lu too small", out_fds_cnt ));
  }

  ulong out_cnt = 0UL;

  out_fds[ out_cnt++ ] = 2; /* stderr */
  if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
    out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
  }
  out_fds[ out_cnt++ ] = ctx->tx_sock;
  for( ulong j=0UL; j<sock_cnt; j++ ) {
    out_fds[ out_cnt++ ] = ctx->pollfd[ j ].fd;
  }
  return out_cnt;
}

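/* Every descriptor the tile still holds once sandboxed must appear in
   the list above: stderr, the log file (if any), the TX socket, and
   each RX socket. */
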
FD_FN_CONST static inline ulong
tx_scratch_footprint( void ) {
  return STEM_BURST * fd_ulong_align_up( FD_NET_MTU, FD_CHUNK_ALIGN );
}

FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 4096UL;
}

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_sock_tile_t),     sizeof(fd_sock_tile_t)                );
  l = FD_LAYOUT_APPEND( l, alignof(struct iovec),       STEM_BURST*sizeof(struct iovec)       );
  l = FD_LAYOUT_APPEND( l, alignof(struct cmsghdr),     STEM_BURST*FD_SOCK_CMSG_MAX           );
  l = FD_LAYOUT_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  l = FD_LAYOUT_APPEND( l, alignof(struct mmsghdr),     STEM_BURST*sizeof(struct mmsghdr)     );
  l = FD_LAYOUT_APPEND( l, FD_CHUNK_ALIGN,              tx_scratch_footprint()                );
  return FD_LAYOUT_FINI( l, scratch_align() );
}

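/* Keep the layout above in sync with the FD_SCRATCH_ALLOC_APPEND
   sequence in privileged_init below: same order, alignments, and
   sizes. */
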
/* create_udp_socket creates and configures a new UDP socket for the
   sock tile at the given file descriptor ID. */

static void
create_udp_socket( int    sock_fd,
                   uint   bind_addr,
                   ushort udp_port,
                   int    so_rcvbuf ) {

  if( fcntl( sock_fd, F_GETFD, 0 )!=-1 ) {
    FD_LOG_ERR(( "file descriptor %d already exists", sock_fd ));
  } else if( errno!=EBADF ) {
    FD_LOG_ERR(( "fcntl(F_GETFD) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int orig_fd = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
  if( FD_UNLIKELY( orig_fd<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int reuseport = 1;
  if( FD_UNLIKELY( setsockopt( orig_fd, SOL_SOCKET, SO_REUSEPORT, &reuseport, sizeof(int) )<0 ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_REUSEPORT,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int ip_pktinfo = 1;
  if( FD_UNLIKELY( setsockopt( orig_fd, IPPROTO_IP, IP_PKTINFO, &ip_pktinfo, sizeof(int) )<0 ) ) {
    FD_LOG_ERR(( "setsockopt(IPPROTO_IP,IP_PKTINFO,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=setsockopt( orig_fd, SOL_SOCKET, SO_RCVBUF, &so_rcvbuf, sizeof(int) ) ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", so_rcvbuf, errno, fd_io_strerror( errno ) ));
  }

  struct sockaddr_in saddr = {
    .sin_family      = AF_INET,
    .sin_addr.s_addr = bind_addr,
    .sin_port        = fd_ushort_bswap( udp_port ),
  };
  if( FD_UNLIKELY( 0!=bind( orig_fd, fd_type_pun_const( &saddr ), sizeof(struct sockaddr_in) ) ) ) {
    FD_LOG_ERR(( "bind(" FD_IP4_ADDR_FMT ":%u) failed (%i-%s)", FD_IP4_ADDR_FMT_ARGS( bind_addr ), udp_port, errno, fd_io_strerror( errno ) ));
  }

# if defined(__linux__)
  int dup_res = dup3( orig_fd, sock_fd, O_CLOEXEC );
# else
  int dup_res = dup2( orig_fd, sock_fd );
# endif
  if( FD_UNLIKELY( dup_res!=sock_fd ) ) {
    FD_LOG_ERR(( "dup returned %i, expected %i (%i-%s)", dup_res, sock_fd, errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=close( orig_fd ) ) ) {
    FD_LOG_ERR(( "close(%d) failed (%i-%s)", orig_fd, errno, fd_io_strerror( errno ) ));
  }

}

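/* RX sockets are dup'd into a fixed, contiguous fd range starting at
   RX_SOCK_FD_MIN so that the seccomp policy (see
   populate_allowed_seccomp above, which is handed the range bounds)
   can allow socket syscalls by fd range rather than enumerating
   individual descriptors. */
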
static void
privileged_init( fd_topo_t *      topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_sock_tile_t *     ctx        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t),     sizeof(fd_sock_tile_t)                );
  struct iovec *       batch_iov  = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct iovec),       STEM_BURST*sizeof(struct iovec)       );
  void *               batch_cmsg = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct cmsghdr),     STEM_BURST*FD_SOCK_CMSG_MAX           );
  struct sockaddr_in * batch_sa   = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  struct mmsghdr *     batch_msg  = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct mmsghdr),     STEM_BURST*sizeof(struct mmsghdr)     );
  uchar *              tx_scratch = FD_SCRATCH_ALLOC_APPEND( l, FD_CHUNK_ALIGN,              tx_scratch_footprint()                );

  assert( scratch==ctx );

  fd_memset( ctx,       0, sizeof(fd_sock_tile_t)                );
  fd_memset( batch_iov, 0, STEM_BURST*sizeof(struct iovec)       );
  fd_memset( batch_sa,  0, STEM_BURST*sizeof(struct sockaddr_in) );
  fd_memset( batch_msg, 0, STEM_BURST*sizeof(struct mmsghdr)     );

  ctx->batch_cnt   = 0UL;
  ctx->batch_iov   = batch_iov;
  ctx->batch_cmsg  = batch_cmsg;
  ctx->batch_sa    = batch_sa;
  ctx->batch_msg   = batch_msg;
  ctx->tx_scratch0 = tx_scratch;
  ctx->tx_scratch1 = tx_scratch + tx_scratch_footprint();
  ctx->tx_ptr      = tx_scratch;

  /* Create receive sockets.  Incrementally assign them to file
     descriptors starting at sock_fd_min. */

  int sock_fd_min = RX_SOCK_FD_MIN;
  ushort udp_port_candidates[] = {
    (ushort)tile->sock.net.legacy_transaction_listen_port,
    (ushort)tile->sock.net.quic_transaction_listen_port,
    (ushort)tile->sock.net.shred_listen_port,
    (ushort)tile->sock.net.gossip_listen_port,
    (ushort)tile->sock.net.repair_intake_listen_port,
    (ushort)tile->sock.net.repair_serve_listen_port,
    (ushort)tile->sock.net.send_src_port
  };
  static char const * udp_port_links[] = {
    "net_quic",   /* legacy_transaction_listen_port */
    "net_quic",   /* quic_transaction_listen_port */
    "net_shred",  /* shred_listen_port (turbine) */
    "net_gossip", /* gossip_listen_port */
    "net_shred",  /* repair_intake_listen_port (shreds via repair) */
    "net_repair", /* repair_serve_listen_port */
    "net_send"    /* send_src_port */
  };
  static uchar const udp_port_protos[] = {
    DST_PROTO_TPU_UDP,  /* legacy_transaction_listen_port */
    DST_PROTO_TPU_QUIC, /* quic_transaction_listen_port */
    DST_PROTO_SHRED,    /* shred_listen_port (turbine) */
    DST_PROTO_GOSSIP,   /* gossip_listen_port */
    DST_PROTO_SHRED,    /* repair_intake_listen_port (shreds via repair) */
    DST_PROTO_REPAIR,   /* repair_serve_listen_port */
    DST_PROTO_SEND      /* send_src_port */
  };
  for( uint candidate_idx=0U; candidate_idx<sizeof(udp_port_candidates)/sizeof(udp_port_candidates[0]); candidate_idx++ ) {
    if( !udp_port_candidates[ candidate_idx ] ) continue;
    uint sock_idx = ctx->sock_cnt;
    if( sock_idx>=FD_SOCK_TILE_MAX_SOCKETS ) FD_LOG_ERR(( "too many sockets" ));
    ushort port = (ushort)udp_port_candidates[ candidate_idx ];

    char const * target_link = udp_port_links[ candidate_idx ];
    ctx->link_rx_map[ sock_idx ] = 0xFF;
    for( ulong j=0UL; j<(tile->out_cnt); j++ ) {
      if( 0==strcmp( topo->links[ tile->out_link_id[ j ] ].name, target_link ) ) {
        ctx->proto_id    [ sock_idx ] = (uchar)udp_port_protos[ candidate_idx ];
        ctx->link_rx_map [ sock_idx ] = (uchar)j;
        ctx->rx_sock_port[ sock_idx ] = (ushort)port;
        break;
      }
    }
    if( ctx->link_rx_map[ sock_idx ]==0xFF ) {
      continue; /* listen port has no associated link */
    }

    int sock_fd = sock_fd_min + (int)sock_idx;
    create_udp_socket( sock_fd, tile->sock.net.bind_address, port, tile->sock.so_rcvbuf );
    ctx->pollfd[ sock_idx ].fd     = sock_fd;
    ctx->pollfd[ sock_idx ].events = POLLIN;
    ctx->sock_cnt++;
  }

  /* Create transmit socket */

  int tx_sock = socket( AF_INET, SOCK_RAW|SOCK_CLOEXEC, FD_IP4_HDR_PROTOCOL_UDP );
  if( FD_UNLIKELY( tx_sock<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_RAW|SOCK_CLOEXEC,IPPROTO_UDP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=setsockopt( tx_sock, SOL_SOCKET, SO_SNDBUF, &tile->sock.so_sndbuf, sizeof(int) ) ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_SNDBUF,%i) failed (%i-%s)", tile->sock.so_sndbuf, errno, fd_io_strerror( errno ) ));
  }

  ctx->tx_sock      = tx_sock;
  ctx->bind_address = tile->sock.net.bind_address;

}

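/* The TX socket is a raw IPv4 socket with protocol IPPROTO_UDP and
   IP_HDRINCL left off: per raw(7), the kernel then constructs the IP
   header (routing, TTL, checksum) while the tile supplies only the
   UDP header and payload.  See during_frag/flush_tx_batch below. */
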
static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  if( FD_UNLIKELY( tile->out_cnt > MAX_NET_OUTS ) ) {
    FD_LOG_ERR(( "sock tile has %lu out links which exceeds the max (%lu)", tile->out_cnt, MAX_NET_OUTS ));
  }

  for( ulong i=0UL; i<(tile->out_cnt); i++ ) {
    if( 0!=strncmp( topo->links[ tile->out_link_id[ i ] ].name, "net_", 4 ) ) {
      FD_LOG_ERR(( "out link %lu is not a net RX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
    ctx->link_rx[ i ].base   = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_rx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_rx[ i ].base, link->dcache );
    ctx->link_rx[ i ].wmark  = fd_dcache_compact_wmark ( ctx->link_rx[ i ].base, link->dcache, link->mtu );
    ctx->link_rx[ i ].chunk  = ctx->link_rx[ i ].chunk0;
    if( FD_UNLIKELY( link->burst < STEM_BURST ) ) {
      FD_LOG_ERR(( "link %lu dcache burst is too low (%lu<%lu)",
                   tile->out_link_id[ i ], link->burst, STEM_BURST ));
    }
  }

  for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    if( !strstr( topo->links[ tile->in_link_id[ i ] ].name, "_net" ) ) {
      FD_LOG_ERR(( "in link %lu is not a net TX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
    ctx->link_tx[ i ].base   = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_tx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_tx[ i ].base, link->dcache );
    ctx->link_tx[ i ].wmark  = fd_dcache_compact_wmark ( ctx->link_tx[ i ].base, link->dcache, link->mtu );
  }

}

/* RX PATH (socket->tango) ********************************************/

/* FIXME Pace RX polling and interleave it with TX jobs to reduce TX
   tail latency */

/* poll_rx_socket does one recvmmsg batch receive on the given socket
   index.  Returns the number of packets returned by recvmmsg. */

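/* Incoming payloads are written at an offset into each dcache chunk,
   leaving room for a synthetic Ethernet+IPv4+UDP header that
   poll_rx_socket fills in after the receive.  Downstream consumers
   thus see the same full-frame layout regardless of whether a packet
   arrived via this tile or the XDP net tile. */
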
static ulong
poll_rx_socket( fd_sock_tile_t *    ctx,
                fd_stem_context_t * stem,
                uint                sock_idx,
                int                 sock_fd,
                ushort              proto ) {
  ulong  hdr_sz      = sizeof(fd_eth_hdr_t) + sizeof(fd_ip4_hdr_t) + sizeof(fd_udp_hdr_t);
  ulong  payload_max = FD_NET_MTU-hdr_sz;
  uchar  rx_link     = ctx->link_rx_map [ sock_idx ];
  ushort dport       = ctx->rx_sock_port[ sock_idx ];

  fd_sock_link_rx_t * link = ctx->link_rx + rx_link;
  void * const base       = link->base;
  ulong  const chunk0     = link->chunk0;
  ulong  const wmark      = link->wmark;
  ulong        chunk_next = link->chunk;
  uchar *      cmsg_next  = ctx->batch_cmsg;

  for( ulong j=0UL; j<STEM_BURST; j++ ) {
    ctx->batch_iov[ j ].iov_base = (uchar *)fd_chunk_to_laddr( base, chunk_next ) + hdr_sz;
    ctx->batch_iov[ j ].iov_len  = payload_max;
    ctx->batch_msg[ j ].msg_hdr = (struct msghdr) {
      .msg_iov        = ctx->batch_iov+j,
      .msg_iovlen     = 1,
      .msg_name       = ctx->batch_sa+j,
      .msg_namelen    = sizeof(struct sockaddr_in),
      .msg_control    = cmsg_next,
      .msg_controllen = FD_SOCK_CMSG_MAX,
    };
    cmsg_next += FD_SOCK_CMSG_MAX;
    /* Speculatively prepare all chunk indexes for a receive.
       At function exit, chunks into which a packet was received are
       committed, all others are freed. */
    chunk_next = fd_dcache_compact_next( chunk_next, FD_NET_MTU, chunk0, wmark );
  }

  int msg_cnt = recvmmsg( sock_fd, ctx->batch_msg, STEM_BURST, MSG_DONTWAIT, NULL );
  if( FD_UNLIKELY( msg_cnt<0 ) ) {
    if( FD_LIKELY( errno==EAGAIN ) ) return 0UL;
    /* unreachable if socket is in a valid state */
    FD_LOG_ERR(( "recvmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  long ts = fd_tickcount();
  ctx->metrics.sys_recvmmsg_cnt++;

  if( FD_UNLIKELY( msg_cnt==0 ) ) return 0UL;

  /* Track the chunk index of the last frag populated, so we can derive
     the chunk indexes for the next poll_rx_socket call.
     Guaranteed to be set since msg_cnt>0. */
  ulong last_chunk;

  for( ulong j=0UL; j<(ulong)msg_cnt; j++ ) {
    uchar *              payload    = ctx->batch_iov[ j ].iov_base;
    ulong                payload_sz = ctx->batch_msg[ j ].msg_len;
    struct sockaddr_in * sa         = ctx->batch_msg[ j ].msg_hdr.msg_name;
    ulong                frame_sz   = payload_sz + hdr_sz;
    ctx->metrics.rx_bytes_total += frame_sz;
    if( FD_UNLIKELY( sa->sin_family!=AF_INET ) ) {
      /* unreachable */
      FD_LOG_ERR(( "Received packet with unexpected sin_family %i", sa->sin_family ));
      continue;
    }

    /* Extract the local (destination) IP from the IP_PKTINFO cmsg */
    long daddr = -1;
    struct cmsghdr * cmsg = CMSG_FIRSTHDR( &ctx->batch_msg[ j ].msg_hdr );
    if( FD_LIKELY( cmsg ) ) {
      do {
        if( FD_LIKELY( (cmsg->cmsg_level==IPPROTO_IP) &
                       (cmsg->cmsg_type ==IP_PKTINFO) ) ) {
          struct in_pktinfo const * pi = (struct in_pktinfo const *)CMSG_DATA( cmsg );
          daddr = pi->ipi_addr.s_addr;
        }
        cmsg = CMSG_NXTHDR( &ctx->batch_msg[ j ].msg_hdr, cmsg );
      } while( FD_UNLIKELY( cmsg ) ); /* optimize for 1 cmsg */
    }
    if( FD_UNLIKELY( daddr<0L ) ) {
      /* unreachable because IP_PKTINFO was set */
      FD_LOG_ERR(( "Missing IP_PKTINFO on incoming packet" ));
    }

    /* Reconstruct Ethernet, IPv4, and UDP headers in the 42 bytes
       (14+20+8) reserved ahead of the payload */
    fd_eth_hdr_t * eth_hdr = (fd_eth_hdr_t *)( payload-42UL );
    fd_ip4_hdr_t * ip_hdr  = (fd_ip4_hdr_t *)( payload-28UL );
    fd_udp_hdr_t * udp_hdr = (fd_udp_hdr_t *)( payload- 8UL );
    memset( eth_hdr->dst, 0, 6 );
    memset( eth_hdr->src, 0, 6 );
    eth_hdr->net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP );
    *ip_hdr = (fd_ip4_hdr_t) {
      .verihl      = FD_IP4_VERIHL( 4, 5 ),
      .net_tot_len = fd_ushort_bswap( (ushort)( payload_sz+28UL ) ),
      .ttl         = 1,
      .protocol    = FD_IP4_HDR_PROTOCOL_UDP,
    };
    uint daddr_ = (uint)(ulong)daddr;
    memcpy( ip_hdr->saddr_c, &sa->sin_addr.s_addr, 4 );
    memcpy( ip_hdr->daddr_c, &daddr_,              4 );
    *udp_hdr = (fd_udp_hdr_t) {
      .net_sport = sa->sin_port,
      .net_dport = (ushort)fd_ushort_bswap( (ushort)dport ),
      .net_len   = (ushort)fd_ushort_bswap( (ushort)( payload_sz+8UL ) ),
      .check     = 0
    };

    ctx->metrics.rx_pkt_cnt++;
    ulong chunk = fd_laddr_to_chunk( base, eth_hdr );
    ulong sig   = fd_disco_netmux_sig( sa->sin_addr.s_addr, fd_ushort_bswap( sa->sin_port ), 0U, proto, hdr_sz );
    ulong tspub = fd_frag_meta_ts_comp( ts );
    fd_stem_publish( stem, rx_link, sig, chunk, frame_sz, 0UL, 0UL, tspub );
    last_chunk = chunk;
  }

  /* Rewind the chunk index to the first free index. */
  link->chunk = fd_dcache_compact_next( last_chunk, FD_NET_MTU, chunk0, wmark );
  return (ulong)msg_cnt;
}

static ulong
poll_rx( fd_sock_tile_t *    ctx,
         fd_stem_context_t * stem ) {
  ulong pkt_cnt = 0UL;
  if( FD_UNLIKELY( ctx->batch_cnt ) ) {
    FD_LOG_ERR(( "Batch is not clean" ));
  }
  ctx->tx_idle_cnt = 0; /* restart TX polling */
  if( FD_UNLIKELY( poll( ctx->pollfd, ctx->sock_cnt, 0 )<0 ) ) {
    FD_LOG_ERR(( "poll failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  for( uint j=0U; j<ctx->sock_cnt; j++ ) {
    if( ctx->pollfd[ j ].revents & (POLLIN|POLLERR) ) {
      pkt_cnt += poll_rx_socket( ctx, stem, j, ctx->pollfd[ j ].fd, ctx->proto_id[ j ] );
    }
    ctx->pollfd[ j ].revents = 0;
  }
  return pkt_cnt;
}

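/* POLLERR is polled alongside POLLIN so that a socket in an error
   state still reaches recvmmsg, which then surfaces the error, rather
   than poll_rx spinning on it silently. */
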
/* TX PATH (tango->socket) ********************************************/

static void
flush_tx_batch( fd_sock_tile_t * ctx ) {
  ulong batch_cnt = ctx->batch_cnt;
  for( int j=0; j<(int)batch_cnt; /* incremented in loop */ ) {
    int remain   = (int)batch_cnt - j;
    int send_cnt = sendmmsg( ctx->tx_sock, ctx->batch_msg + j, (uint)remain, MSG_DONTWAIT );
    if( send_cnt>=0 ) {
      ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_NO_ERROR_IDX ]++;
    }
    if( FD_UNLIKELY( send_cnt<remain ) ) {
      ctx->metrics.tx_drop_cnt++;
      if( FD_UNLIKELY( send_cnt<0 ) ) {
        switch( errno ) {
        case EAGAIN:
        case ENOBUFS:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_SLOW_IDX ]++;
          break;
        case EPERM:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_PERM_IDX ]++;
          break;
        case ENETUNREACH:
        case EHOSTUNREACH:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_UNREACH_IDX ]++;
          break;
        case ENONET:
        case ENETDOWN:
        case EHOSTDOWN:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_DOWN_IDX ]++;
          break;
        default:
          ctx->metrics.sys_sendmmsg_cnt[ FD_METRICS_ENUM_SOCK_ERR_V_OTHER_IDX ]++;
          /* log with NOTICE, since flushing has a significant negative performance impact */
          FD_LOG_NOTICE(( "sendmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
        }

        /* first message failed, so skip the failing message and continue */
        j++;
      } else {
        /* send_cnt messages succeeded, so skip those and also the failing message */
        j += send_cnt + 1;

        /* add the successful count */
        ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
      }

      continue;
    }

    /* send_cnt == remain, so everything left was sent */
    ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
    break;
  }

  ctx->tx_ptr    = ctx->tx_scratch0;
  ctx->batch_cnt = 0;
}

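/* Worked example of the resume logic above: with batch_cnt=8, if the
   first sendmmsg returns 5, messages 0-4 were sent, message 5 is
   treated as dropped, and the loop retries from j=6.  If it instead
   returns -1, only message 0 is dropped and the loop retries from
   j=1. */
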
/* before_frag is called when a new frag has been detected.  The sock
   tile can do early filtering here in the future.  For example, it may
   want to install routing logic here to take turns with an XDP tile.
   (Fast path with slow fallback) */

static inline int
before_frag( fd_sock_tile_t * ctx FD_PARAM_UNUSED,
             ulong            in_idx FD_PARAM_UNUSED,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig ) {
  ulong proto = fd_disco_netmux_sig_proto( sig );
  if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1; /* skip */
  return 0; /* continue */
}

/* during_frag is called when a new frag passed early filtering.
   Speculatively copies data into a sendmmsg buffer.  (If all tiles
   respected backpressure, this copy could be eliminated.) */

static inline void
during_frag( fd_sock_tile_t * ctx,
             ulong            in_idx,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig,
             ulong            chunk,
             ulong            sz,
             ulong            ctl FD_PARAM_UNUSED ) {
  if( FD_UNLIKELY( chunk<ctx->link_tx[ in_idx ].chunk0 || chunk>ctx->link_tx[ in_idx ].wmark || sz>FD_NET_MTU ) ) {
    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->link_tx[ in_idx ].chunk0, ctx->link_tx[ in_idx ].wmark ));
  }

  ulong const hdr_min = sizeof(fd_eth_hdr_t)+sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t);
  if( FD_UNLIKELY( sz<hdr_min ) ) {
    /* FIXME support ICMP messages in the future? */
    FD_LOG_ERR(( "packet too small %lu (in_idx=%lu)", sz, in_idx ));
  }

  uchar const * frame   = fd_chunk_to_laddr_const( ctx->link_tx[ in_idx ].base, chunk );
  ulong         hdr_sz  = fd_disco_netmux_sig_hdr_sz( sig );
  uchar const * payload = frame+hdr_sz;
  if( FD_UNLIKELY( hdr_sz>sz || hdr_sz<hdr_min ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu corrupt: hdr_sz=%lu total_sz=%lu",
                 in_idx, hdr_sz, sz ));
  }
  ulong payload_sz = sz-hdr_sz;

  fd_ip4_hdr_t const * ip_hdr  = (fd_ip4_hdr_t const *)( frame  +sizeof(fd_eth_hdr_t) );
  fd_udp_hdr_t const * udp_hdr = (fd_udp_hdr_t const *)( payload-sizeof(fd_udp_hdr_t) );
  if( FD_UNLIKELY( ( FD_IP4_GET_VERSION( *ip_hdr )!=4 ) |
                   ( ip_hdr->protocol != FD_IP4_HDR_PROTOCOL_UDP ) ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu: sock tile only supports IPv4 UDP for now", in_idx ));
  }

  ulong msg_sz = sizeof(fd_udp_hdr_t) + payload_sz;

  ulong batch_idx = ctx->batch_cnt;
  assert( batch_idx<STEM_BURST );
  struct mmsghdr *     msg  = ctx->batch_msg + batch_idx;
  struct sockaddr_in * sa   = ctx->batch_sa  + batch_idx;
  struct iovec *       iov  = ctx->batch_iov + batch_idx;
  struct cmsghdr *     cmsg = (void *)( (ulong)ctx->batch_cmsg + batch_idx*FD_SOCK_CMSG_MAX );
  uchar *              buf  = ctx->tx_ptr;

  *iov = (struct iovec) {
    .iov_base = buf,
    .iov_len  = msg_sz,
  };
  sa->sin_family      = AF_INET;
  sa->sin_addr.s_addr = FD_LOAD( uint, ip_hdr->daddr_c );
  sa->sin_port        = 0; /* ignored */

  /* Select the source address via IP_PKTINFO, falling back to the
     configured bind address if the frag does not specify one */
  cmsg->cmsg_level = IPPROTO_IP;
  cmsg->cmsg_type  = IP_PKTINFO;
  cmsg->cmsg_len   = CMSG_LEN( sizeof(struct in_pktinfo) );
  struct in_pktinfo * pi = (struct in_pktinfo *)CMSG_DATA( cmsg );
  pi->ipi_ifindex         = 0;
  pi->ipi_addr.s_addr     = 0;
  pi->ipi_spec_dst.s_addr = fd_uint_if( !!ip_hdr->saddr, ip_hdr->saddr, ctx->bind_address );

  *msg = (struct mmsghdr) {
    .msg_hdr = {
      .msg_name       = sa,
      .msg_namelen    = sizeof(struct sockaddr_in),
      .msg_iov        = iov,
      .msg_iovlen     = 1,
      .msg_control    = cmsg,
      .msg_controllen = CMSG_LEN( sizeof(struct in_pktinfo) )
    }
  };

  /* Strip the Ethernet+IPv4 headers: the raw TX socket only needs the
     UDP header and payload */
  memcpy( buf, udp_hdr, sizeof(fd_udp_hdr_t) );
  fd_memcpy( buf+sizeof(fd_udp_hdr_t), payload, payload_sz );
  ctx->metrics.tx_bytes_total += sz;
}

/* after_frag is called when a frag was copied into a sendmmsg buffer. */

static void
after_frag( fd_sock_tile_t *    ctx,
            ulong               in_idx FD_PARAM_UNUSED,
            ulong               seq FD_PARAM_UNUSED,
            ulong               sig FD_PARAM_UNUSED,
            ulong               sz,
            ulong               tsorig FD_PARAM_UNUSED,
            ulong               tspub FD_PARAM_UNUSED,
            fd_stem_context_t * stem FD_PARAM_UNUSED ) {
  /* Commit the packet added in during_frag */

  ctx->tx_idle_cnt = 0;
  ctx->batch_cnt++;
  /* Technically leaves a gap.  sz is always larger than the payload
     written to tx_ptr because Ethernet & IPv4 headers were stripped. */
  ctx->tx_ptr += fd_ulong_align_up( sz, FD_CHUNK_ALIGN );

  if( ctx->batch_cnt >= STEM_BURST ) {
    flush_tx_batch( ctx );
  }
}

/* End TX path ********************************************************/

/* after_credit is called every stem iteration when there are enough
   flow control credits to publish a burst of fragments. */

static inline void
after_credit( fd_sock_tile_t *    ctx,
              fd_stem_context_t * stem,
              int *               poll_in FD_PARAM_UNUSED,
              int *               charge_busy ) {
  if( ctx->tx_idle_cnt > 512 ) {
    if( ctx->batch_cnt ) {
      flush_tx_batch( ctx );
    }
    ulong pkt_cnt = poll_rx( ctx, stem );
    *charge_busy = pkt_cnt!=0;
  }
  ctx->tx_idle_cnt++;
}

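/* TX work is prioritized: pending batches are flushed and RX sockets
   polled only after 512 consecutive iterations without TX traffic
   (tx_idle_cnt is reset by after_frag and poll_rx).  See the FIXME at
   the top of the RX path about pacing this more evenly. */
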
static void
metrics_write( fd_sock_tile_t * ctx ) {
  FD_MCNT_SET      ( SOCK, SYSCALLS_RECVMMSG, ctx->metrics.sys_recvmmsg_cnt );
  FD_MCNT_ENUM_COPY( SOCK, SYSCALLS_SENDMMSG, ctx->metrics.sys_sendmmsg_cnt );
  FD_MCNT_SET      ( SOCK, RX_PKT_CNT,        ctx->metrics.rx_pkt_cnt       );
  FD_MCNT_SET      ( SOCK, TX_PKT_CNT,        ctx->metrics.tx_pkt_cnt       );
  FD_MCNT_SET      ( SOCK, TX_DROP_CNT,       ctx->metrics.tx_drop_cnt      );
  FD_MCNT_SET      ( SOCK, TX_BYTES_TOTAL,    ctx->metrics.tx_bytes_total   );
  FD_MCNT_SET      ( SOCK, RX_BYTES_TOTAL,    ctx->metrics.rx_bytes_total   );
}

static ulong
rlimit_file_cnt( fd_topo_t const *      topo,
                 fd_topo_tile_t const * tile ) {
  fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  return RX_SOCK_FD_MIN + ctx->sock_cnt;
}

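/* The file descriptor limit must cover the top of the RX socket fd
   range, since those sockets are dup'd to fds
   [RX_SOCK_FD_MIN,RX_SOCK_FD_MIN+sock_cnt). */
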
#define STEM_CALLBACK_CONTEXT_TYPE  fd_sock_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_sock_tile_t)

#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_AFTER_CREDIT  after_credit
#define STEM_CALLBACK_BEFORE_FRAG   before_frag
#define STEM_CALLBACK_DURING_FRAG   during_frag
#define STEM_CALLBACK_AFTER_FRAG    after_frag

#include "../../stem/fd_stem.c"

fd_topo_run_tile_t fd_tile_sock = {
  .name                     = "sock",
  .rlimit_file_cnt_fn       = rlimit_file_cnt,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};