#define _GNU_SOURCE /* dup3 */
#include "fd_sock_tile_private.h"
#include "../../topo/fd_topo.h"
#include "../../../util/net/fd_eth.h"
#include "../../../util/net/fd_ip4.h"
#include "../../../util/net/fd_udp.h"

#include <assert.h>     /* assert */
#include <stdalign.h>   /* alignof */
#include <errno.h>
#include <fcntl.h>      /* fcntl */
#include <unistd.h>     /* dup3, close */
#include <netinet/in.h> /* sockaddr_in */
#include <sys/socket.h> /* socket */
#include "generated/sock_seccomp.h"
#include "../../metrics/fd_metrics.h"
/* Number of packets per recvmmsg/sendmmsg batch, and the tango burst
   depth.
   FIXME make configurable in the future?
   FIXME keep in sync with fd_net_tile_topo.c */
#define STEM_BURST (64UL)

/* Place RX socket file descriptors in a contiguous integer range. */
#define RX_SOCK_FD_MIN (128)

/* FD_SOCK_CMSG_MAX controls the max ancillary data size per message.
   Must be a multiple of alignof(struct cmsghdr). */
#define FD_SOCK_CMSG_MAX (64UL)
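
/* The only control message used on RX is IP_PKTINFO.  On typical
   64-bit Linux, CMSG_SPACE( sizeof(struct in_pktinfo) ) works out to
   32 bytes, so 64 bytes per message leaves comfortable headroom.
   (Informal size estimate; not enforced by a static assert here.) */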

static ulong
populate_allowed_seccomp( fd_topo_t const *      topo,
                          fd_topo_tile_t const * tile,
                          ulong                  out_cnt,
                          struct sock_filter *   out ) {
  FD_SCRATCH_ALLOC_INIT( l, fd_topo_obj_laddr( topo, tile->tile_obj_id ) );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );
  populate_sock_filter_policy_sock( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->tx_sock, RX_SOCK_FD_MIN, RX_SOCK_FD_MIN+(uint)ctx->sock_cnt );
  return sock_filter_policy_sock_instr_cnt;
}

static ulong
populate_allowed_fds( fd_topo_t const *      topo,
                      fd_topo_tile_t const * tile,
                      ulong                  out_fds_cnt,
                      int *                  out_fds ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_sock_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t), sizeof(fd_sock_tile_t) );

  ulong sock_cnt = ctx->sock_cnt;
  if( FD_UNLIKELY( out_fds_cnt<sock_cnt+3UL ) ) {
    FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
  }

  ulong out_cnt = 0UL;

  out_fds[ out_cnt++ ] = 2; /* stderr */
  if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
    out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
  }
  out_fds[ out_cnt++ ] = ctx->tx_sock;
  for( ulong j=0UL; j<sock_cnt; j++ ) {
    out_fds[ out_cnt++ ] = ctx->pollfd[ j ].fd;
  }
  return out_cnt;
}

FD_FN_CONST static inline ulong
tx_scratch_footprint( void ) {
  return STEM_BURST * fd_ulong_align_up( FD_NET_MTU, FD_CHUNK_ALIGN );
}

FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 4096UL;
}

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_sock_tile_t),     sizeof(fd_sock_tile_t)                );
  l = FD_LAYOUT_APPEND( l, alignof(struct iovec),       STEM_BURST*sizeof(struct iovec)       );
  l = FD_LAYOUT_APPEND( l, alignof(struct cmsghdr),     STEM_BURST*FD_SOCK_CMSG_MAX           );
  l = FD_LAYOUT_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  l = FD_LAYOUT_APPEND( l, alignof(struct mmsghdr),     STEM_BURST*sizeof(struct mmsghdr)     );
  l = FD_LAYOUT_APPEND( l, FD_CHUNK_ALIGN,              tx_scratch_footprint()                );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
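
/* Note: the append order above must match the FD_SCRATCH_ALLOC_APPEND
   sequence in privileged_init below, so both derive the same layout
   from the tile's scratch region. */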

/* create_udp_socket creates and configures a new UDP socket for the
   sock tile at the given file descriptor ID. */
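
/* Why a fixed fd number: RX sockets are moved into a contiguous range
   starting at RX_SOCK_FD_MIN (via dup3/dup2 below) so that the
   generated seccomp policy can allow receive syscalls by fd range; see
   populate_allowed_seccomp above. */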

static void
create_udp_socket( int    sock_fd,
                   ushort udp_port ) {

  if( fcntl( sock_fd, F_GETFD, 0 )!=-1 ) {
    FD_LOG_ERR(( "file descriptor %d already exists", sock_fd ));
  } else if( errno!=EBADF ) {
    FD_LOG_ERR(( "fcntl(F_GETFD) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int orig_fd = socket( AF_INET, SOCK_DGRAM, IPPROTO_UDP );
  if( FD_UNLIKELY( orig_fd<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int reuseport = 1;
  if( FD_UNLIKELY( setsockopt( orig_fd, SOL_SOCKET, SO_REUSEPORT, &reuseport, sizeof(int) )<0 ) ) {
    FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_REUSEPORT,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  int ip_pktinfo = 1;
  if( FD_UNLIKELY( setsockopt( orig_fd, IPPROTO_IP, IP_PKTINFO, &ip_pktinfo, sizeof(int) )<0 ) ) {
    FD_LOG_ERR(( "setsockopt(IPPROTO_IP,IP_PKTINFO,1) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  /* TODO SO_RCVBUF */

  struct sockaddr_in saddr = {
    .sin_family      = AF_INET,
    .sin_addr.s_addr = 0,
    .sin_port        = fd_ushort_bswap( udp_port ),
  };
  if( FD_UNLIKELY( 0!=bind( orig_fd, fd_type_pun_const( &saddr ), sizeof(struct sockaddr_in) ) ) ) {
    FD_LOG_ERR(( "bind(0.0.0.0:%i) failed (%i-%s)", udp_port, errno, fd_io_strerror( errno ) ));
  }

# if defined(__linux__)
  int dup_res = dup3( orig_fd, sock_fd, O_CLOEXEC );
# else
  int dup_res = dup2( orig_fd, sock_fd );
# endif
  if( FD_UNLIKELY( dup_res!=sock_fd ) ) {
    FD_LOG_ERR(( "dup2 returned %i (%i-%s)", dup_res, errno, fd_io_strerror( errno ) ));
  }

  if( FD_UNLIKELY( 0!=close( orig_fd ) ) ) {
    FD_LOG_ERR(( "close(%d) failed (%i-%s)", orig_fd, errno, fd_io_strerror( errno ) ));
  }

}

static void
privileged_init( fd_topo_t *      topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_sock_tile_t *     ctx        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sock_tile_t),     sizeof(fd_sock_tile_t)                );
  struct iovec *       batch_iov  = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct iovec),       STEM_BURST*sizeof(struct iovec)       );
  void *               batch_cmsg = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct cmsghdr),     STEM_BURST*FD_SOCK_CMSG_MAX           );
  struct sockaddr_in * batch_sa   = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct sockaddr_in), STEM_BURST*sizeof(struct sockaddr_in) );
  struct mmsghdr *     batch_msg  = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct mmsghdr),     STEM_BURST*sizeof(struct mmsghdr)     );
  uchar *              tx_scratch = FD_SCRATCH_ALLOC_APPEND( l, FD_CHUNK_ALIGN,              tx_scratch_footprint()                );

  assert( scratch==ctx );

  fd_memset( ctx,       0, sizeof(fd_sock_tile_t)                );
  fd_memset( batch_iov, 0, STEM_BURST*sizeof(struct iovec)       );
  fd_memset( batch_sa,  0, STEM_BURST*sizeof(struct sockaddr_in) );
  fd_memset( batch_msg, 0, STEM_BURST*sizeof(struct mmsghdr)     );

  ctx->batch_cnt   = 0UL;
  ctx->batch_iov   = batch_iov;
  ctx->batch_cmsg  = batch_cmsg;
  ctx->batch_sa    = batch_sa;
  ctx->batch_msg   = batch_msg;
  ctx->tx_scratch0 = tx_scratch;
  ctx->tx_scratch1 = tx_scratch + tx_scratch_footprint();
  ctx->tx_ptr      = tx_scratch;

  /* Create receive sockets.  Incrementally assign them to file
     descriptors starting at sock_fd_min. */

  int sock_fd_min = RX_SOCK_FD_MIN;
  ushort udp_port_candidates[] = {
    (ushort)tile->net.legacy_transaction_listen_port,
    (ushort)tile->net.quic_transaction_listen_port,
    (ushort)tile->net.shred_listen_port,
    (ushort)tile->net.gossip_listen_port,
    (ushort)tile->net.repair_intake_listen_port,
    (ushort)tile->net.repair_serve_listen_port,
  };
  static char const * udp_port_links[] = {
    "net_quic",   /* legacy_transaction_listen_port */
    "net_quic",   /* quic_transaction_listen_port */
    "net_shred",  /* shred_listen_port */
    "net_gossip", /* gossip_listen_port */
    "net_repair", /* repair_intake_listen_port */
    "net_repair"  /* repair_serve_listen_port */
  };
  static uchar const udp_port_protos[] = {
    DST_PROTO_TPU_UDP,  /* legacy_transaction_listen_port */
    DST_PROTO_TPU_QUIC, /* quic_transaction_listen_port */
    DST_PROTO_SHRED,    /* shred_listen_port */
    DST_PROTO_GOSSIP,   /* gossip_listen_port */
    DST_PROTO_REPAIR,   /* repair_intake_listen_port */
    DST_PROTO_REPAIR    /* repair_serve_listen_port */
  };
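  /* The three tables above are parallel arrays indexed by
     candidate_idx; their entries must stay in sync. */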
  for( uint candidate_idx=0U; candidate_idx<6U; candidate_idx++ ) {
    if( !udp_port_candidates[ candidate_idx ] ) continue;
    uint sock_idx = ctx->sock_cnt;
    if( sock_idx>=FD_SOCK_TILE_MAX_SOCKETS ) FD_LOG_ERR(( "too many sockets" ));
    ushort port = (ushort)udp_port_candidates[ candidate_idx ];

    char const * target_link = udp_port_links[ candidate_idx ];
    ctx->link_rx_map[ sock_idx ] = 0xFF;
    for( ulong j=0UL; j<(tile->out_cnt); j++ ) {
      if( 0==strcmp( topo->links[ tile->out_link_id[ j ] ].name, target_link ) ) {
        ctx->proto_id    [ sock_idx ] = (uchar)udp_port_protos[ candidate_idx ];
        ctx->link_rx_map [ sock_idx ] = (uchar)j;
        ctx->rx_sock_port[ sock_idx ] = (ushort)port;
        break;
      }
    }
    if( ctx->link_rx_map[ sock_idx ]==0xFF ) {
      continue; /* listen port number has no associated links */
    }

    int sock_fd = sock_fd_min + (int)sock_idx;
    create_udp_socket( sock_fd, port );
    ctx->pollfd[ sock_idx ].fd     = sock_fd;
    ctx->pollfd[ sock_idx ].events = POLLIN;
    ctx->sock_cnt++;
  }

  /* Create transmit socket */

  int tx_sock = socket( AF_INET, SOCK_RAW|SOCK_CLOEXEC, FD_IP4_HDR_PROTOCOL_UDP );
  if( FD_UNLIKELY( tx_sock<0 ) ) {
    FD_LOG_ERR(( "socket(AF_INET,SOCK_RAW|SOCK_CLOEXEC,17) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }

  ctx->tx_sock = tx_sock;

}

static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  fd_sock_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  if( FD_UNLIKELY( tile->out_cnt > MAX_NET_OUTS ) ) {
    FD_LOG_ERR(( "sock tile has %lu out links which exceeds the max (%lu)", tile->out_cnt, MAX_NET_OUTS ));
  }

  for( ulong i=0UL; i<(tile->out_cnt); i++ ) {
    if( 0!=strncmp( topo->links[ tile->out_link_id[ i ] ].name, "net_", 4 ) ) {
      FD_LOG_ERR(( "out link %lu is not a net RX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
    ctx->link_rx[ i ].base   = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_rx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_rx[ i ].base, link->dcache );
    ctx->link_rx[ i ].wmark  = fd_dcache_compact_wmark( ctx->link_rx[ i ].base, link->dcache, link->mtu );
    ctx->link_rx[ i ].chunk  = ctx->link_rx[ i ].chunk0;
    if( FD_UNLIKELY( link->burst < STEM_BURST ) ) {
      FD_LOG_ERR(( "link %lu dcache burst is too low (%lu<%lu)",
                   tile->out_link_id[ i ], link->burst, STEM_BURST ));
    }
  }

  for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    if( !strstr( topo->links[ tile->in_link_id[ i ] ].name, "_net" ) ) {
      FD_LOG_ERR(( "in link %lu is not a net TX link", i ));
    }
    fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
    ctx->link_tx[ i ].base   = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->link_tx[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->link_tx[ i ].base, link->dcache );
    ctx->link_tx[ i ].wmark  = fd_dcache_compact_wmark( ctx->link_tx[ i ].base, link->dcache, link->mtu );
  }

}

/* RX PATH (socket->tango) ********************************************/

/* FIXME Pace RX polling and interleave it with TX jobs to reduce TX
   tail latency */

/* poll_rx_socket does one recvmmsg batch receive on the given socket
   index.  Returns the number of packets returned by recvmmsg. */

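/* Each frag published to consumers is a synthetic Ethernet frame: the
   UDP payload lands at offset hdr_sz into the dcache chunk, and
   Ethernet/IPv4/UDP headers are reconstructed in front of it from
   recvmmsg metadata (source from sockaddr_in, dest address from
   IP_PKTINFO).  Rough chunk layout (hdr_sz = 14+20+8 = 42 bytes):

     | fd_eth_hdr_t (14) | fd_ip4_hdr_t (20) | fd_udp_hdr_t (8) | payload ... | */
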
static ulong
poll_rx_socket( fd_sock_tile_t *    ctx,
                fd_stem_context_t * stem,
                uint                sock_idx,
                int                 sock_fd,
                ushort              proto ) {
  ulong  hdr_sz      = sizeof(fd_eth_hdr_t) + sizeof(fd_ip4_hdr_t) + sizeof(fd_udp_hdr_t);
  ulong  payload_max = FD_NET_MTU-hdr_sz;
  uchar  rx_link     = ctx->link_rx_map[ sock_idx ];
  ushort dport       = ctx->rx_sock_port[ sock_idx ];

  fd_sock_link_rx_t * link = ctx->link_rx + rx_link;
  void *  const base       = link->base;
  ulong   const chunk0     = link->chunk0;
  ulong   const wmark      = link->wmark;
  ulong         chunk_next = link->chunk;
  uchar *       cmsg_next  = ctx->batch_cmsg;

  for( ulong j=0UL; j<STEM_BURST; j++ ) {
    ctx->batch_iov[ j ].iov_base = (uchar *)fd_chunk_to_laddr( base, chunk_next ) + hdr_sz;
    ctx->batch_iov[ j ].iov_len  = payload_max;
    ctx->batch_msg[ j ].msg_hdr = (struct msghdr) {
      .msg_iov        = ctx->batch_iov+j,
      .msg_iovlen     = 1,
      .msg_name       = ctx->batch_sa+j,
      .msg_namelen    = sizeof(struct sockaddr_in),
      .msg_control    = cmsg_next,
      .msg_controllen = FD_SOCK_CMSG_MAX,
    };
    cmsg_next += FD_SOCK_CMSG_MAX;
    /* Speculatively prepare all chunk indexes for a receive.
       At function exit, chunks into which a packet was received are
       committed, all others are freed. */
    chunk_next = fd_dcache_compact_next( chunk_next, FD_NET_MTU, chunk0, wmark );
  }

  int msg_cnt = recvmmsg( sock_fd, ctx->batch_msg, STEM_BURST, MSG_DONTWAIT, NULL );
  if( FD_UNLIKELY( msg_cnt<0 ) ) {
    if( FD_LIKELY( errno==EAGAIN ) ) return 0UL;
    /* unreachable if socket is in a valid state */
    FD_LOG_ERR(( "recvmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  long ts = fd_tickcount();
  ctx->metrics.sys_recvmmsg_cnt++;

  if( FD_UNLIKELY( msg_cnt==0 ) ) return 0UL;

  /* Track the chunk index of the last frag populated, so we can derive
     the chunk indexes for the next poll_rx_socket call.
     Guaranteed to be set since msg_cnt>0. */
  ulong last_chunk;

  for( ulong j=0UL; j<(ulong)msg_cnt; j++ ) {
    uchar * payload    = ctx->batch_iov[ j ].iov_base;
    ulong   payload_sz = ctx->batch_msg[ j ].msg_len;
    struct sockaddr_in * sa = ctx->batch_msg[ j ].msg_hdr.msg_name;
    if( FD_UNLIKELY( sa->sin_family!=AF_INET ) ) {
      /* unreachable */
      FD_LOG_ERR(( "Received packet with unexpected sin_family %i", sa->sin_family ));
      continue;
    }

    long daddr = -1;
    struct cmsghdr * cmsg = CMSG_FIRSTHDR( &ctx->batch_msg[ j ].msg_hdr );
    if( FD_LIKELY( cmsg ) ) {
      do {
        if( FD_LIKELY( (cmsg->cmsg_level==IPPROTO_IP) &
                       (cmsg->cmsg_type ==IP_PKTINFO) ) ) {
          struct in_pktinfo const * pi = (struct in_pktinfo const *)CMSG_DATA( cmsg );
          daddr = pi->ipi_addr.s_addr;
        }
        cmsg = CMSG_NXTHDR( &ctx->batch_msg[ j ].msg_hdr, cmsg );
      } while( FD_UNLIKELY( cmsg ) ); /* optimize for 1 cmsg */
    }
    if( FD_UNLIKELY( daddr<0L ) ) {
      /* unreachable because IP_PKTINFO was set */
      FD_LOG_ERR(( "Missing IP_PKTINFO on incoming packet" ));
    }

    ulong frame_sz = payload_sz + hdr_sz;
    fd_eth_hdr_t * eth_hdr = (fd_eth_hdr_t *)( payload-42UL );
    fd_ip4_hdr_t * ip_hdr  = (fd_ip4_hdr_t *)( payload-28UL );
    fd_udp_hdr_t * udp_hdr = (fd_udp_hdr_t *)( payload- 8UL );
    memset( eth_hdr->dst, 0, 6 );
    memset( eth_hdr->src, 0, 6 );
    eth_hdr->net_type = fd_ushort_bswap( FD_ETH_HDR_TYPE_IP );
    *ip_hdr = (fd_ip4_hdr_t) {
      .verihl      = FD_IP4_VERIHL( 4, 5 ),
      .net_tot_len = fd_ushort_bswap( (ushort)( payload_sz+28UL ) ),
      .ttl         = 1,
      .protocol    = FD_IP4_HDR_PROTOCOL_UDP,
    };
    uint daddr_ = (uint)(ulong)daddr;
    memcpy( ip_hdr->saddr_c, &sa->sin_addr.s_addr, 4 );
    memcpy( ip_hdr->daddr_c, &daddr_,              4 );
    *udp_hdr = (fd_udp_hdr_t) {
      .net_sport = sa->sin_port,
      .net_dport = (ushort)fd_ushort_bswap( (ushort)dport ),
      .net_len   = (ushort)fd_ushort_bswap( (ushort)( payload_sz+8UL ) ),
      .check     = 0
    };

    ctx->metrics.rx_pkt_cnt++;
    ulong chunk = fd_laddr_to_chunk( base, eth_hdr );
    ulong sig   = fd_disco_netmux_sig( sa->sin_addr.s_addr, fd_ushort_bswap( sa->sin_port ), 0U, proto, hdr_sz );
    ulong tspub = fd_frag_meta_ts_comp( ts );
    fd_stem_publish( stem, rx_link, sig, chunk, frame_sz, 0UL, 0UL, tspub );
    last_chunk = chunk;
  }

  /* Rewind the chunk index to the first free index. */
  link->chunk = fd_dcache_compact_next( last_chunk, FD_NET_MTU, chunk0, wmark );
  return (ulong)msg_cnt;
}

static ulong
poll_rx( fd_sock_tile_t *    ctx,
         fd_stem_context_t * stem ) {
  ulong pkt_cnt = 0UL;
  if( FD_UNLIKELY( ctx->batch_cnt ) ) {
    FD_LOG_ERR(( "Batch is not clean" ));
  }
  ctx->tx_idle_cnt = 0; /* restart TX polling */
  if( FD_UNLIKELY( poll( ctx->pollfd, ctx->sock_cnt, 0 )<0 ) ) {
    FD_LOG_ERR(( "poll failed (%i-%s)", errno, fd_io_strerror( errno ) ));
  }
  for( uint j=0U; j<ctx->sock_cnt; j++ ) {
    if( ctx->pollfd[ j ].revents & (POLLIN|POLLERR) ) {
      pkt_cnt += poll_rx_socket(
        ctx,
        stem,
        j,
        ctx->pollfd[ j ].fd,
        ctx->proto_id[ j ]
      );
    }
    ctx->pollfd[ j ].revents = 0;
  }
  return pkt_cnt;
}

/* TX PATH (tango->socket) ********************************************/

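/* Outgoing frags arrive as full Ethernet frames.  The Ethernet and
   IPv4 headers are stripped and only UDP header + payload is handed
   to sendmmsg: tx_sock is a SOCK_RAW/IPPROTO_UDP socket without
   IP_HDRINCL, so the kernel builds the IPv4 header itself (see
   raw(7)).  The destination address is passed via msg_name, and the
   source address is pinned with an IP_PKTINFO control message. */
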
static void
flush_tx_batch( fd_sock_tile_t * ctx ) {
  ulong batch_cnt = ctx->batch_cnt;
  int send_cnt = sendmmsg( ctx->tx_sock, ctx->batch_msg, (uint)batch_cnt, MSG_DONTWAIT );
  if( FD_UNLIKELY( send_cnt<(long)batch_cnt ) ) {
    if( FD_UNLIKELY( send_cnt<0 ) ) {
      if( FD_LIKELY( (errno==EAGAIN) | (errno==ENOBUFS) ) ) {
        ctx->metrics.tx_drop_cnt += batch_cnt;
        goto done;
      }
      FD_LOG_ERR(( "sendmmsg failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    }
    ctx->metrics.tx_drop_cnt += batch_cnt-(ulong)send_cnt;
  }
  ctx->metrics.tx_pkt_cnt += (ulong)send_cnt;
done:
  ctx->metrics.sys_sendmmsg_cnt++;
  ctx->batch_cnt = 0;
  ctx->tx_ptr    = ctx->tx_scratch0; /* reset the TX scratch cursor so the next batch starts at the base */
}

/* before_frag is called when a new frag has been detected.  The sock
   tile can do early filtering here in the future.  For example, it may
   want to install routing logic here to take turns with an XDP tile.
   (Fast path with slow fallback) */

static inline int
before_frag( fd_sock_tile_t * ctx FD_PARAM_UNUSED,
             ulong            in_idx FD_PARAM_UNUSED,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig ) {
  ulong proto = fd_disco_netmux_sig_proto( sig );
  if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1;
  return 0; /* continue */
}

/* during_frag is called when a new frag passed early filtering.
   Speculatively copies data into a sendmmsg buffer.  (If all tiles
   respected backpressure, this copy could be eliminated.) */
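
/* Note on speculation: stem may detect that the producer overran the
   frag while during_frag was reading it, in which case the copy is
   abandoned and after_frag is not called.  during_frag therefore only
   writes into private scratch (tx_ptr, batch slots); batch_cnt is not
   incremented until after_frag commits the packet. */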

static inline void
during_frag( fd_sock_tile_t * ctx,
             ulong            in_idx,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig,
             ulong            chunk,
             ulong            sz,
             ulong            ctl FD_PARAM_UNUSED ) {
  if( FD_UNLIKELY( chunk<ctx->link_tx[ in_idx ].chunk0 || chunk>ctx->link_tx[ in_idx ].wmark || sz>FD_NET_MTU ) ) {
    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->link_tx[ in_idx ].chunk0, ctx->link_tx[ in_idx ].wmark ));
  }

  ulong const hdr_min = sizeof(fd_eth_hdr_t)+sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t);
  if( FD_UNLIKELY( sz<hdr_min ) ) {
    /* FIXME support ICMP messages in the future? */
    FD_LOG_ERR(( "packet too small %lu (in_idx=%lu)", sz, in_idx ));
  }

  uchar const * frame   = fd_chunk_to_laddr_const( ctx->link_tx[ in_idx ].base, chunk );
  ulong         hdr_sz  = fd_disco_netmux_sig_hdr_sz( sig );
  uchar const * payload = frame+hdr_sz;
  if( FD_UNLIKELY( hdr_sz>sz || hdr_sz<hdr_min ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu corrupt: hdr_sz=%lu total_sz=%lu",
                 in_idx, hdr_sz, sz ));
  }
  ulong payload_sz = sz-hdr_sz;

  fd_ip4_hdr_t const * ip_hdr  = (fd_ip4_hdr_t const *)( frame  +sizeof(fd_eth_hdr_t) );
  fd_udp_hdr_t const * udp_hdr = (fd_udp_hdr_t const *)( payload-sizeof(fd_udp_hdr_t) );
  if( FD_UNLIKELY( ( FD_IP4_GET_VERSION( *ip_hdr )!=4 ) |
                   ( ip_hdr->protocol != FD_IP4_HDR_PROTOCOL_UDP ) ) ) {
    FD_LOG_ERR(( "packet from in_idx=%lu: sock tile only supports IPv4 UDP for now", in_idx ));
  }

  ulong msg_sz = sizeof(fd_udp_hdr_t) + payload_sz;

  ulong batch_idx = ctx->batch_cnt;
  assert( batch_idx<STEM_BURST );
  struct mmsghdr *     msg  = ctx->batch_msg + batch_idx;
  struct sockaddr_in * sa   = ctx->batch_sa  + batch_idx;
  struct iovec *       iov  = ctx->batch_iov + batch_idx;
  struct cmsghdr *     cmsg = (void *)( (ulong)ctx->batch_cmsg + batch_idx*FD_SOCK_CMSG_MAX );
  uchar *              buf  = ctx->tx_ptr;

  *iov = (struct iovec) {
    .iov_base = buf,
    .iov_len  = msg_sz,
  };
  sa->sin_family      = AF_INET;
  sa->sin_addr.s_addr = FD_LOAD( uint, ip_hdr->daddr_c );
  sa->sin_port        = 0; /* ignored */

  cmsg->cmsg_level = IPPROTO_IP;
  cmsg->cmsg_type  = IP_PKTINFO;
  cmsg->cmsg_len   = CMSG_LEN( sizeof(struct in_pktinfo) );
  struct in_pktinfo * pi = (struct in_pktinfo *)CMSG_DATA( cmsg );
  pi->ipi_ifindex         = 0;
  pi->ipi_addr.s_addr     = 0;
  pi->ipi_spec_dst.s_addr = ip_hdr->saddr;

  *msg = (struct mmsghdr) {
    .msg_hdr = {
      .msg_name       = sa,
      .msg_namelen    = sizeof(struct sockaddr_in),
      .msg_iov        = iov,
      .msg_iovlen     = 1,
      .msg_control    = cmsg,
      .msg_controllen = CMSG_LEN( sizeof(struct in_pktinfo) )
    }
  };

  memcpy( buf, udp_hdr, sizeof(fd_udp_hdr_t) );
  fd_memcpy( buf+sizeof(fd_udp_hdr_t), payload, payload_sz );
}

/* after_frag is called when a frag was copied into a sendmmsg buffer. */

static void
after_frag( fd_sock_tile_t *    ctx,
            ulong               in_idx FD_PARAM_UNUSED,
            ulong               seq FD_PARAM_UNUSED,
            ulong               sig FD_PARAM_UNUSED,
            ulong               sz,
            ulong               tsorig FD_PARAM_UNUSED,
            ulong               tspub FD_PARAM_UNUSED,
            fd_stem_context_t * stem FD_PARAM_UNUSED ) {
  /* Commit the packet added in during_frag */

  ctx->tx_idle_cnt = 0;
  ctx->batch_cnt++;
  /* Technically leaves a gap.  sz is always larger than the payload
     written to tx_ptr because Ethernet & IPv4 headers were stripped. */
  ctx->tx_ptr += fd_ulong_align_up( sz, FD_CHUNK_ALIGN );

  if( ctx->batch_cnt >= STEM_BURST ) {
    flush_tx_batch( ctx );
  }
}

/* End TX path ********************************************************/

/* after_credit is called every stem iteration when there are enough
   flow control credits to publish a burst of fragments. */
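
/* Scheduling note: the tile prioritizes TX.  tx_idle_cnt is reset
   whenever a TX frag is handled (after_frag) or an RX poll runs, so
   sockets are only polled for RX once no TX work has been seen for 512
   consecutive iterations, and any pending TX batch is flushed first. */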

static inline void
after_credit( fd_sock_tile_t *    ctx,
              fd_stem_context_t * stem,
              int *               poll_in FD_PARAM_UNUSED,
              int *               charge_busy ) {
  if( ctx->tx_idle_cnt > 512 ) {
    if( ctx->batch_cnt ) {
      flush_tx_batch( ctx );
    }
    ulong pkt_cnt = poll_rx( ctx, stem );
    *charge_busy = pkt_cnt!=0;
  }
  ctx->tx_idle_cnt++;
}

static void
metrics_write( fd_sock_tile_t * ctx ) {
  FD_MCNT_SET( SOCK, SYSCALLS_RECVMMSG, ctx->metrics.sys_recvmmsg_cnt );
  FD_MCNT_SET( SOCK, SYSCALLS_SENDMMSG, ctx->metrics.sys_sendmmsg_cnt );
  FD_MCNT_SET( SOCK, RX_PKT_CNT,        ctx->metrics.rx_pkt_cnt       );
  FD_MCNT_SET( SOCK, TX_PKT_CNT,        ctx->metrics.tx_pkt_cnt       );
  FD_MCNT_SET( SOCK, TX_DROP_CNT,       ctx->metrics.tx_drop_cnt      );
}

#define STEM_CALLBACK_CONTEXT_TYPE  fd_sock_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_sock_tile_t)

#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_AFTER_CREDIT  after_credit
#define STEM_CALLBACK_BEFORE_FRAG   before_frag
#define STEM_CALLBACK_DURING_FRAG   during_frag
#define STEM_CALLBACK_AFTER_FRAG    after_frag

#include "../../stem/fd_stem.c"

fd_topo_run_tile_t fd_tile_sock = {
  .name                     = "sock",
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};