Line data Source code
1 : #include "../../../../disco/tiles.h"
2 :
3 : /* The net tile translates between AF_XDP and fd_tango
4 : traffic. It is responsible for setting up the XDP and
5 : XSK socket configuration.
6 :
7 : ### Why does this tile bind to loopback?
8 :
9 : The Linux kernel does some short circuiting optimizations
10 : when sending packets to an IP address that's owned by the
11 : same host. The optimization is basically to route them over
12 : to the loopback interface directly, bypassing the network
13 : hardware.
14 :
15 : This redirection to the loopback interface happens before
16 : XDP programs are executed, so local traffic destined for
17 : our listen addresses will not get ingested correctly.
18 :
     There are two reasons we send traffic locally:
20 :
21 : * For testing and development.
     * The Agave code sends local traffic to itself as part
       of routine operation (eg, when it's the leader it sends
       votes to its own TPU socket).
25 :
26 : So for now we need to also bind to loopback. This is a
27 : small performance hit for other traffic, but we only
28 : redirect packets destined for our target IP and port so
29 : it will not otherwise interfere. Loopback only supports
30 : XDP in SKB mode. */
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <net/if.h>
35 : #include <sys/socket.h> /* MSG_DONTWAIT needed before importing the net seccomp filter */
36 : #include <linux/if_xdp.h>
37 :
38 : #include "generated/net_seccomp.h"
39 :
40 : #include "../../../../disco/metrics/fd_metrics.h"
41 :
42 : #include "../../../../waltz/quic/fd_quic.h"
43 : #include "../../../../waltz/xdp/fd_xdp.h"
44 : #include "../../../../waltz/xdp/fd_xdp1.h"
45 : #include "../../../../waltz/xdp/fd_xsk_aio_private.h"
46 : #include "../../../../waltz/xdp/fd_xsk_private.h"
47 : #include "../../../../util/net/fd_ip4.h"
48 : #include "../../../../waltz/ip/fd_ip.h"
49 :
50 : #include <unistd.h>
51 : #include <linux/unistd.h>
52 :
/* MAX_NET_INS bounds the number of input links a net tile accepts.  It
   sizes fd_net_ctx_t.in, and unprivileged_init rejects topologies that
   give a net tile more inputs than this. */
#define MAX_NET_INS ( 32UL )
54 :
/* fd_net_in_ctx_t caches, for one input link, the workspace that holds
   the link's dcache and the range of chunk indices a frag may
   legitimately reference.  during_frag uses it to bounds-check an
   incoming frag and translate its chunk to a local address. */
typedef struct {
  fd_wksp_t * mem;    /* workspace containing the input link's dcache */
  ulong       chunk0; /* lowest valid chunk index */
  ulong       wmark;  /* highest valid chunk index (inclusive, see during_frag) */
} fd_net_in_ctx_t;
60 :
/* fd_net_out_ctx_t is the publish state for one output link (net ->
   quic / shred / gossip / repair).  net_rx_aio_send copies a received
   packet into dcache chunk `chunk`, publishes its metadata to `mcache`
   at sequence `seq`, then advances both. */
typedef struct {
  fd_frag_meta_t * mcache; /* output mcache; unprivileged_init tests it against NULL to detect a missing link */
  ulong *          sync;   /* mcache sequence sync location */
  ulong            depth;  /* mcache depth */
  ulong            seq;    /* next sequence number to publish */

  fd_wksp_t *      mem;    /* workspace containing the output dcache */
  ulong            chunk0; /* first dcache chunk index */
  ulong            wmark;  /* dcache watermark chunk index */
  ulong            chunk;  /* chunk the next packet will be written to */
} fd_net_out_ctx_t;
72 :
/* fd_net_ctx_t is the per-tile state of a net tile.  It lives at the
   start of the tile's scratch region (see scratch_footprint).

   Index 0 of the xsk / xsk_aio / prog_link_fds arrays refers to the
   configured network interface.  Index 1 refers to the loopback
   interface and is only populated when xsk_cnt is 2 (net tile 0 on a
   non-"lo" interface, see privileged_init). */
typedef struct {
  ulong          xsk_cnt;          /* number of XSKs this tile services (1 or 2) */
  fd_xsk_t *     xsk[ 2 ];         /* AF_XDP sockets */
  fd_xsk_aio_t * xsk_aio[ 2 ];     /* aio wrappers around xsk[] */
  int            prog_link_fds[ 2 ]; /* XDP program link fds, kept open and allowed in the sandbox */

  ulong round_robin_cnt;           /* number of net tiles in the topology */
  ulong round_robin_id;            /* this tile's kind_id; outgoing frags with seq%cnt==id are ours */

  const fd_aio_t * tx;             /* transmit aio of xsk_aio[0] */
  const fd_aio_t * lo_tx;          /* transmit aio of xsk_aio[1] (only set when xsk_cnt>1) */

  uchar frame[ FD_NET_MTU ];       /* staging buffer: during_frag copies the outgoing frag here,
                                      after_frag patches Ethernet addresses and transmits it */

  uint  src_ip_addr;               /* this host's IPv4 address (as configured) */
  uchar src_mac_addr[6];           /* MAC used as the Ethernet source of outgoing frames */

  /* UDP destination ports used by net_rx_aio_send to pick the out link
     for a received packet.  A value of 0 means the port is unused. */
  ushort shred_listen_port;
  ushort quic_transaction_listen_port;
  ushort legacy_transaction_listen_port;
  ushort gossip_listen_port;
  ushort repair_intake_listen_port;
  ushort repair_serve_listen_port;

  ulong           in_cnt;          /* intended count of valid in[] entries */
  fd_net_in_ctx_t in[ MAX_NET_INS ]; /* per-input-link dcache bounds, indexed by in_idx */

  fd_net_out_ctx_t quic_out[1];    /* receives QUIC and legacy UDP TPU traffic */
  fd_net_out_ctx_t shred_out[1];
  fd_net_out_ctx_t gossip_out[1];
  fd_net_out_ctx_t repair_out[1];  /* receives both repair intake and repair serve traffic */

  fd_ip_t * ip;                    /* ARP/route table helper */
  long      ip_next_upd;           /* fd_log_wallclock() time after which housekeeping refetches ARP/routes */

  struct {
    ulong tx_dropped_cnt;          /* transmits whose aio send did not return FD_AIO_SUCCESS */
  } metrics;
} fd_net_ctx_t;
112 :
113 : FD_FN_CONST static inline ulong
114 3 : scratch_align( void ) {
115 3 : return 4096UL;
116 3 : }
117 :
118 : FD_FN_PURE static inline ulong
119 3 : scratch_footprint( fd_topo_tile_t const * tile ) {
120 : /* TODO reproducing this conditional memory layout twice is susceptible to bugs. Use more robust object discovery */
121 3 : (void)tile;
122 3 : ulong l = FD_LAYOUT_INIT;
123 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_net_ctx_t), sizeof(fd_net_ctx_t) );
124 3 : l = FD_LAYOUT_APPEND( l, fd_aio_align(), fd_aio_footprint() );
125 3 : l = FD_LAYOUT_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) );
126 3 : l = FD_LAYOUT_APPEND( l, fd_xsk_aio_align(), fd_xsk_aio_footprint( tile->net.xdp_tx_queue_size, tile->net.xdp_aio_depth ) );
127 3 : if( FD_UNLIKELY( strcmp( tile->net.interface, "lo" ) && tile->kind_id == 0 ) ) {
128 3 : l = FD_LAYOUT_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) );
129 3 : l = FD_LAYOUT_APPEND( l, fd_xsk_aio_align(), fd_xsk_aio_footprint( tile->net.xdp_tx_queue_size, tile->net.xdp_aio_depth ) );
130 3 : }
131 3 : l = FD_LAYOUT_APPEND( l, fd_ip_align(), fd_ip_footprint( 0UL, 0UL ) );
132 3 : return FD_LAYOUT_FINI( l, scratch_align() );
133 3 : }
134 :
135 : /* net_rx_aio_send is a callback invoked by aio when new data is
136 : received on an incoming xsk. The xsk might be bound to any interface
137 : or ports, so the purpose of this callback is to determine if the
138 : packet might be a valid transaction, and whether it is QUIC or
139 : non-QUIC (raw UDP) before forwarding to the appropriate handler.
140 :
141 : This callback is supposed to return the number of packets in the
142 : batch which were successfully processed, but we always return
143 : batch_cnt since there is no logic in place to backpressure this far
144 : up the stack, and there is no sane way to "not handle" an incoming
145 : packet. */
146 : static int
147 : net_rx_aio_send( void * _ctx,
148 : fd_aio_pkt_info_t const * batch,
149 : ulong batch_cnt,
150 : ulong * opt_batch_idx,
151 0 : int flush ) {
152 0 : (void)flush;
153 :
154 0 : fd_net_ctx_t * ctx = (fd_net_ctx_t *)_ctx;
155 :
156 0 : for( ulong i=0UL; i<batch_cnt; i++ ) {
157 0 : uchar const * packet = batch[i].buf;
158 0 : uchar const * packet_end = packet + batch[i].buf_sz;
159 :
160 0 : if( FD_UNLIKELY( batch[i].buf_sz > FD_NET_MTU ) )
161 0 : FD_LOG_ERR(( "received a UDP packet with a too large payload (%u)", batch[i].buf_sz ));
162 :
163 0 : uchar const * iphdr = packet + 14U;
164 :
165 : /* Filter for UDP/IPv4 packets. Test for ethtype and ipproto in 1
166 : branch */
167 0 : uint test_ethip = ( (uint)packet[12] << 16u ) | ( (uint)packet[13] << 8u ) | (uint)packet[23];
168 0 : if( FD_UNLIKELY( test_ethip!=0x080011 ) )
169 0 : FD_LOG_ERR(( "Firedancer received a packet from the XDP program that was either "
170 0 : "not an IPv4 packet, or not a UDP packet. It is likely your XDP program "
171 0 : "is not configured correctly." ));
172 :
173 : /* IPv4 is variable-length, so lookup IHL to find start of UDP */
174 0 : uint iplen = ( ( (uint)iphdr[0] ) & 0x0FU ) * 4U;
175 0 : uchar const * udp = iphdr + iplen;
176 :
177 : /* Ignore if UDP header is too short */
178 0 : if( FD_UNLIKELY( udp+8U > packet_end ) ) continue;
179 :
180 : /* Extract IP dest addr and UDP src/dest port */
181 0 : uint ip_srcaddr = *(uint *)( iphdr+12UL );
182 0 : ushort udp_srcport = fd_ushort_bswap( *(ushort *)( udp+0UL ) );
183 0 : ushort udp_dstport = fd_ushort_bswap( *(ushort *)( udp+2UL ) );
184 :
185 0 : ushort proto;
186 0 : fd_net_out_ctx_t * out;
187 0 : if( FD_UNLIKELY( udp_dstport==ctx->shred_listen_port ) ) {
188 0 : proto = DST_PROTO_SHRED;
189 0 : out = ctx->shred_out;
190 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->quic_transaction_listen_port ) ) {
191 0 : proto = DST_PROTO_TPU_QUIC;
192 0 : out = ctx->quic_out;
193 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->legacy_transaction_listen_port ) ) {
194 0 : proto = DST_PROTO_TPU_UDP;
195 0 : out = ctx->quic_out;
196 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->gossip_listen_port ) ) {
197 0 : proto = DST_PROTO_GOSSIP;
198 0 : out = ctx->gossip_out;
199 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->repair_intake_listen_port ) ) {
200 0 : proto = DST_PROTO_REPAIR;
201 0 : out = ctx->repair_out;
202 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->repair_serve_listen_port ) ) {
203 0 : proto = DST_PROTO_REPAIR;
204 0 : out = ctx->repair_out;
205 0 : } else {
206 :
207 0 : FD_LOG_ERR(( "Firedancer received a UDP packet on port %hu which was not expected. "
208 0 : "Only the following ports should be configured to forward packets: "
209 0 : "%hu, %hu, %hu, %hu, %hu, %hu (excluding any 0 ports, which can be ignored)."
210 0 : "It is likely you changed the port configuration in your TOML file and "
211 0 : "did not reload the XDP program. You can reload the program by running "
212 0 : "`fdctl configure fini xdp && fdctl configure init xdp`.",
213 0 : udp_dstport,
214 0 : ctx->shred_listen_port,
215 0 : ctx->quic_transaction_listen_port,
216 0 : ctx->legacy_transaction_listen_port,
217 0 : ctx->gossip_listen_port,
218 0 : ctx->repair_intake_listen_port,
219 0 : ctx->repair_serve_listen_port ));
220 0 : }
221 :
222 0 : fd_memcpy( fd_chunk_to_laddr( out->mem, out->chunk ), packet, batch[ i ].buf_sz );
223 :
224 : /* tile can decide how to partition based on src ip addr and src port */
225 0 : ulong sig = fd_disco_netmux_sig( ip_srcaddr, udp_srcport, 0U, proto, 14UL+8UL+iplen );
226 :
227 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
228 0 : fd_mcache_publish( out->mcache, out->depth, out->seq, sig, out->chunk, batch[ i ].buf_sz, 0, 0, tspub );
229 :
230 0 : out->seq = fd_seq_inc( out->seq, 1UL );
231 0 : out->chunk = fd_dcache_compact_next( out->chunk, FD_NET_MTU, out->chunk0, out->wmark );
232 0 : }
233 :
234 0 : if( FD_LIKELY( opt_batch_idx ) ) {
235 0 : *opt_batch_idx = batch_cnt;
236 0 : }
237 :
238 0 : return FD_AIO_SUCCESS;
239 0 : }
240 :
241 : static void
242 0 : metrics_write( fd_net_ctx_t * ctx ) {
243 0 : ulong rx_cnt = ctx->xsk_aio[ 0 ]->metrics.rx_cnt;
244 0 : ulong rx_sz = ctx->xsk_aio[ 0 ]->metrics.rx_sz;
245 0 : ulong tx_cnt = ctx->xsk_aio[ 0 ]->metrics.tx_cnt;
246 0 : ulong tx_sz = ctx->xsk_aio[ 0 ]->metrics.tx_sz;
247 0 : if( FD_LIKELY( ctx->xsk_aio[ 1 ] ) ) {
248 0 : rx_cnt += ctx->xsk_aio[ 1 ]->metrics.rx_cnt;
249 0 : rx_sz += ctx->xsk_aio[ 1 ]->metrics.rx_sz;
250 0 : tx_cnt += ctx->xsk_aio[ 1 ]->metrics.tx_cnt;
251 0 : tx_sz += ctx->xsk_aio[ 1 ]->metrics.tx_sz;
252 0 : }
253 :
254 0 : FD_MCNT_SET( NET, RECEIVED_PACKETS, rx_cnt );
255 0 : FD_MCNT_SET( NET, RECEIVED_BYTES, rx_sz );
256 0 : FD_MCNT_SET( NET, SENT_PACKETS, tx_cnt );
257 0 : FD_MCNT_SET( NET, SENT_BYTES, tx_sz );
258 :
259 0 : FD_MCNT_SET( NET, TX_DROPPED, ctx->metrics.tx_dropped_cnt );
260 0 : }
261 :
262 : static void
263 : before_credit( fd_net_ctx_t * ctx,
264 : fd_stem_context_t * stem,
265 0 : int * charge_busy ) {
266 0 : (void)stem;
267 :
268 0 : for( ulong i=0UL; i<ctx->xsk_cnt; i++ ) {
269 0 : if( FD_LIKELY( fd_xsk_aio_service( ctx->xsk_aio[i] ) ) ) {
270 0 : *charge_busy = 1;
271 0 : }
272 0 : }
273 0 : }
274 :
/* xdp_statistics_v0 is the shorter XDP_STATISTICS layout (three
   counters).  poll_xdp_statistics recognizes it when getsockopt reports
   an option length equal to sizeof(struct xdp_statistics_v0).  Field
   order must match the kernel's layout exactly. */
struct xdp_statistics_v0 {
  __u64 rx_dropped; /* Dropped for other reasons */
  __u64 rx_invalid_descs; /* Dropped due to invalid descriptor */
  __u64 tx_invalid_descs; /* Dropped due to invalid descriptor */
};
280 :
/* xdp_statistics_v1 is the extended XDP_STATISTICS layout.  It begins
   with the same three counters as xdp_statistics_v0 and adds ring
   counters.  poll_xdp_statistics passes this larger struct to
   getsockopt and uses the returned option length to tell which layout
   was filled in.  Field order must match the kernel's layout exactly. */
struct xdp_statistics_v1 {
  __u64 rx_dropped; /* Dropped for other reasons */
  __u64 rx_invalid_descs; /* Dropped due to invalid descriptor */
  __u64 tx_invalid_descs; /* Dropped due to invalid descriptor */
  __u64 rx_ring_full; /* Dropped due to rx ring being full */
  __u64 rx_fill_ring_empty_descs; /* Failed to retrieve item from fill ring */
  __u64 tx_ring_empty_descs; /* Failed to retrieve item from tx ring */
};
289 :
290 : static inline void
291 0 : poll_xdp_statistics( fd_net_ctx_t * ctx ) {
292 0 : struct xdp_statistics_v1 stats;
293 0 : uint optlen = (uint)sizeof(stats);
294 0 : if( FD_UNLIKELY( -1==getsockopt( ctx->xsk[ 0 ]->xsk_fd, SOL_XDP, XDP_STATISTICS, &stats, &optlen ) ) )
295 0 : FD_LOG_ERR(( "getsockopt(SOL_XDP, XDP_STATISTICS) failed: %s", strerror( errno ) ));
296 :
297 0 : if( FD_LIKELY( optlen==sizeof(struct xdp_statistics_v1) ) ) {
298 0 : FD_MCNT_SET( NET, XDP_RX_DROPPED_OTHER, stats.rx_dropped );
299 0 : FD_MCNT_SET( NET, XDP_RX_DROPPED_RING_FULL, stats.rx_ring_full );
300 :
301 0 : FD_TEST( !stats.rx_invalid_descs );
302 0 : FD_TEST( !stats.tx_invalid_descs );
303 : /* TODO: We shouldn't ever try to tx or rx with empty descs but we
304 : seem to sometimes. */
305 : // FD_TEST( !stats.rx_fill_ring_empty_descs );
306 : // FD_TEST( !stats.tx_ring_empty_descs );
307 0 : } else if( FD_LIKELY( optlen==sizeof(struct xdp_statistics_v0) ) ) {
308 0 : FD_MCNT_SET( NET, XDP_RX_DROPPED_OTHER, stats.rx_dropped );
309 :
310 0 : FD_TEST( !stats.rx_invalid_descs );
311 0 : FD_TEST( !stats.tx_invalid_descs );
312 0 : } else {
313 0 : FD_LOG_ERR(( "getsockopt(SOL_XDP, XDP_STATISTICS) returned unexpected size %u", optlen ));
314 0 : }
315 0 : }
316 :
317 : static void
318 0 : during_housekeeping( fd_net_ctx_t * ctx ) {
319 0 : long now = fd_log_wallclock();
320 0 : if( FD_UNLIKELY( now > ctx->ip_next_upd ) ) {
321 0 : ctx->ip_next_upd = now + (long)60e9;
322 0 : fd_ip_arp_fetch( ctx->ip );
323 0 : fd_ip_route_fetch( ctx->ip );
324 0 : }
325 :
326 : /* Only net tile 0 polls the statistics, as they are retrieved for the
327 : XDP socket which is shared across all net tiles. */
328 :
329 0 : if( FD_LIKELY( !ctx->round_robin_id ) ) poll_xdp_statistics( ctx );
330 0 : }
331 :
332 : FD_FN_PURE static int
333 : route_loopback( uint tile_ip_addr,
334 0 : ulong sig ) {
335 0 : return fd_disco_netmux_sig_dst_ip( sig )==FD_IP4_ADDR(127,0,0,1) ||
336 0 : fd_disco_netmux_sig_dst_ip( sig )==tile_ip_addr;
337 0 : }
338 :
339 : static inline int
340 : before_frag( fd_net_ctx_t * ctx,
341 : ulong in_idx,
342 : ulong seq,
343 0 : ulong sig ) {
344 0 : (void)in_idx;
345 :
346 0 : ulong proto = fd_disco_netmux_sig_proto( sig );
347 0 : if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1;
348 :
349 : /* Round robin by sequence number for now, QUIC should be modified to
350 : echo the net tile index back so we can transmit on the same queue.
351 :
352 : 127.0.0.1 packets for localhost must go out on net tile 0 which
353 : owns the loopback interface XSK, which only has 1 queue. */
354 :
355 0 : if( FD_UNLIKELY( route_loopback( ctx->src_ip_addr, sig ) ) ) return ctx->round_robin_id != 0UL;
356 0 : else return (seq % ctx->round_robin_cnt) != ctx->round_robin_id;
357 0 : }
358 :
359 : static inline void
360 : during_frag( fd_net_ctx_t * ctx,
361 : ulong in_idx,
362 : ulong seq,
363 : ulong sig,
364 : ulong chunk,
365 0 : ulong sz ) {
366 0 : (void)in_idx;
367 0 : (void)seq;
368 0 : (void)sig;
369 :
370 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_NET_MTU ) )
371 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
372 :
373 0 : uchar * src = (uchar *)fd_chunk_to_laddr( ctx->in[ in_idx ].mem, chunk );
374 0 : fd_memcpy( ctx->frame, src, sz ); // TODO: Change xsk_aio interface to eliminate this copy
375 0 : }
376 :
377 : static void
378 : send_arp_probe( fd_net_ctx_t * ctx,
379 : uint dst_ip_addr,
380 0 : uint ifindex ) {
381 0 : uchar arp_buf[FD_IP_ARP_SZ];
382 0 : ulong arp_len = 0UL;
383 :
384 0 : uint src_ip_addr = ctx->src_ip_addr;
385 0 : uchar * src_mac_addr = ctx->src_mac_addr;
386 :
387 : /* prepare arp table */
388 0 : int arp_table_rtn = fd_ip_update_arp_table( ctx->ip, dst_ip_addr, ifindex );
389 :
390 0 : if( FD_UNLIKELY( arp_table_rtn == FD_IP_SUCCESS ) ) {
391 : /* generate a probe */
392 0 : fd_ip_arp_gen_arp_probe( arp_buf, FD_IP_ARP_SZ, &arp_len, dst_ip_addr, fd_uint_bswap( src_ip_addr ), src_mac_addr );
393 :
394 : /* send the probe */
395 0 : fd_aio_pkt_info_t aio_buf = { .buf = arp_buf, .buf_sz = (ushort)arp_len };
396 0 : ulong sent_cnt;
397 0 : int aio_err = ctx->tx->send_func( ctx->xsk_aio[ 0 ], &aio_buf, 1, &sent_cnt, 1 );
398 0 : ctx->metrics.tx_dropped_cnt += aio_err!=FD_AIO_SUCCESS;
399 0 : }
400 0 : }
401 :
402 : static void
403 : after_frag( fd_net_ctx_t * ctx,
404 : ulong in_idx,
405 : ulong seq,
406 : ulong sig,
407 : ulong sz,
408 : ulong tsorig,
409 0 : fd_stem_context_t * stem ) {
410 0 : (void)in_idx;
411 0 : (void)seq;
412 0 : (void)sig;
413 0 : (void)tsorig;
414 0 : (void)stem;
415 :
416 0 : fd_aio_pkt_info_t aio_buf = { .buf = ctx->frame, .buf_sz = (ushort)sz };
417 0 : if( FD_UNLIKELY( route_loopback( ctx->src_ip_addr, sig ) ) ) {
418 : /* Set Ethernet src and dst address to 00:00:00:00:00:00 */
419 0 : memset( ctx->frame, 0, 12UL );
420 :
421 0 : ulong sent_cnt;
422 0 : int aio_err = ctx->lo_tx->send_func( ctx->xsk_aio[ 1 ], &aio_buf, 1, &sent_cnt, 1 );
423 0 : ctx->metrics.tx_dropped_cnt += aio_err!=FD_AIO_SUCCESS;
424 0 : } else {
425 : /* extract dst ip */
426 0 : uint dst_ip = fd_uint_bswap( fd_disco_netmux_sig_dst_ip( sig ) );
427 :
428 0 : uint next_hop = 0U;
429 0 : uchar dst_mac[6] = {0};
430 0 : uint if_idx = 0;
431 :
432 : /* route the packet */
433 : /*
434 : * determine the destination:
435 : * same host
436 : * same subnet
437 : * other
438 : * determine the next hop
439 : * localhost
440 : * gateway
441 : * subnet local host
442 : * determine the mac address of the next hop address
443 : * and the local ipv4 and eth addresses */
444 0 : int rtn = fd_ip_route_ip_addr( dst_mac, &next_hop, &if_idx, ctx->ip, dst_ip );
445 0 : if( FD_UNLIKELY( rtn == FD_IP_PROBE_RQD ) ) {
446 : /* another fd_net instance might have already resolved this address
447 : so simply try another fetch */
448 0 : fd_ip_arp_fetch( ctx->ip );
449 0 : rtn = fd_ip_route_ip_addr( dst_mac, &next_hop, &if_idx, ctx->ip, dst_ip );
450 0 : }
451 :
452 0 : long now;
453 0 : switch( rtn ) {
454 0 : case FD_IP_PROBE_RQD:
455 : /* TODO possibly buffer some data while waiting for ARPs to complete */
456 : /* TODO rate limit ARPs */
457 : /* TODO add caching of ip_dst -> routing info */
458 0 : send_arp_probe( ctx, next_hop, if_idx );
459 :
460 : /* refresh tables */
461 0 : now = fd_log_wallclock();
462 0 : ctx->ip_next_upd = now + (long)200e3;
463 0 : break;
464 0 : case FD_IP_NO_ROUTE:
465 : /* cannot make progress here */
466 0 : break;
467 0 : case FD_IP_SUCCESS:
468 : /* set destination mac address */
469 0 : memcpy( ctx->frame, dst_mac, 6UL );
470 :
471 : /* set source mac address */
472 0 : memcpy( ctx->frame + 6UL, ctx->src_mac_addr, 6UL );
473 :
474 0 : ulong sent_cnt;
475 0 : int aio_err = ctx->tx->send_func( ctx->xsk_aio[ 0 ], &aio_buf, 1, &sent_cnt, 1 );
476 0 : ctx->metrics.tx_dropped_cnt += aio_err!=FD_AIO_SUCCESS;
477 0 : break;
478 0 : case FD_IP_RETRY:
479 : /* refresh tables */
480 0 : now = fd_log_wallclock();
481 0 : ctx->ip_next_upd = now + (long)200e3;
482 : /* TODO consider buffering */
483 0 : break;
484 0 : case FD_IP_MULTICAST:
485 0 : case FD_IP_BROADCAST:
486 0 : default:
487 : /* should not occur in current use cases */
488 0 : break;
489 0 : }
490 0 : }
491 0 : }
492 :
493 : /* init_link_session is part of privileged_init. It only runs on net
494 : tile 0. This function does shared pre-configuration used by all
495 : other net tiles. This includes installing the XDP program and
496 : setting up the XSKMAP into which the other net tiles can register
497 : themselves into.
498 :
499 : session, link_session, lo_session get initialized with session
500 : objects. tile points to the net tile's config. if_idx, lo_idx
501 : locate the device IDs of the main and loopback interface.
502 : *xsk_map_fd, *lo_xsk_map_fd are set to the newly created XSKMAP file
503 : descriptors.
504 :
505 : Note that if the main interface is loopback, then the loopback-
506 : related structures are uninitialized.
507 :
508 : Kernel object references:
509 :
510 : BPF_LINK file descriptor
511 : |
512 : +-> XDP program installation on NIC
513 : | |
514 : | +-> XDP program <-- BPF_PROG file descriptor (prog_fd)
515 : |
516 : +-> XSKMAP object <-- BPF_MAP file descriptor (xsk_map)
517 : |
518 : +-> BPF_MAP object <-- BPF_MAP file descriptor (udp_dsts) */
519 :
520 : static void
521 : privileged_init( fd_topo_t * topo,
522 0 : fd_topo_tile_t * tile ) {
523 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
524 :
525 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
526 :
527 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_net_ctx_t), sizeof(fd_net_ctx_t) );
528 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_aio_align(), fd_aio_footprint() );
529 :
530 0 : uint if_idx = if_nametoindex( tile->net.interface );
531 0 : if( FD_UNLIKELY( !if_idx ) ) FD_LOG_ERR(( "if_nametoindex(%s) failed", tile->net.interface ));
532 :
533 : /* Create and install XSKs */
534 :
535 0 : int xsk_map_fd = 123462;
536 0 : ctx->prog_link_fds[ 0 ] = 123463;
537 0 : ctx->xsk[ 0 ] =
538 0 : fd_xsk_join(
539 0 : fd_xsk_new( FD_SCRATCH_ALLOC_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) ),
540 0 : FD_NET_MTU,
541 0 : tile->net.xdp_rx_queue_size,
542 0 : tile->net.xdp_rx_queue_size,
543 0 : tile->net.xdp_tx_queue_size,
544 0 : tile->net.xdp_tx_queue_size ) );
545 0 : if( FD_UNLIKELY( !ctx->xsk[ 0 ] ) ) FD_LOG_ERR(( "fd_xsk_new failed" ));
546 0 : uint flags = tile->net.zero_copy ? XDP_ZEROCOPY : XDP_COPY;
547 0 : if( FD_UNLIKELY( !fd_xsk_init( ctx->xsk[ 0 ], if_idx, (uint)tile->kind_id, flags ) ) ) FD_LOG_ERR(( "failed to bind xsk for net tile %lu", tile->kind_id ));
548 0 : if( FD_UNLIKELY( !fd_xsk_activate( ctx->xsk[ 0 ], xsk_map_fd ) ) ) FD_LOG_ERR(( "failed to activate xsk for net tile %lu", tile->kind_id ));
549 :
550 0 : if( FD_UNLIKELY( fd_sandbox_gettid()==fd_sandbox_getpid() ) ) {
551 : /* Kind of gross.. in single threaded mode we don't want to close the xsk_map_fd
552 : since it's shared with other net tiles. Just check for that by seeing if we
553 : are the only thread in the process. */
554 0 : if( FD_UNLIKELY( -1==close( xsk_map_fd ) ) ) FD_LOG_ERR(( "close(%d) failed (%d-%s)", xsk_map_fd, errno, fd_io_strerror( errno ) ));
555 0 : }
556 :
557 0 : ctx->xsk_aio[ 0 ] = fd_xsk_aio_join( fd_xsk_aio_new( FD_SCRATCH_ALLOC_APPEND( l, fd_xsk_aio_align(), fd_xsk_aio_footprint( tile->net.xdp_tx_queue_size, tile->net.xdp_aio_depth ) ),
558 0 : tile->net.xdp_tx_queue_size,
559 0 : tile->net.xdp_aio_depth ), ctx->xsk[ 0 ] );
560 0 : if( FD_UNLIKELY( !ctx->xsk_aio[ 0 ] ) ) FD_LOG_ERR(( "fd_xsk_aio_join failed" ));
561 :
562 : /* Networking tile at index 0 also binds to loopback (only queue 0 available on lo) */
563 :
564 0 : if( FD_UNLIKELY( strcmp( tile->net.interface, "lo" ) && !tile->kind_id ) ) {
565 0 : ctx->xsk_cnt = 2;
566 :
567 0 : ushort udp_port_candidates[] = {
568 0 : (ushort)tile->net.legacy_transaction_listen_port,
569 0 : (ushort)tile->net.quic_transaction_listen_port,
570 0 : (ushort)tile->net.shred_listen_port,
571 0 : (ushort)tile->net.gossip_listen_port,
572 0 : (ushort)tile->net.repair_intake_listen_port,
573 0 : (ushort)tile->net.repair_serve_listen_port,
574 0 : };
575 :
576 0 : uint lo_idx = if_nametoindex( "lo" );
577 0 : if( FD_UNLIKELY( !lo_idx ) ) FD_LOG_ERR(( "if_nametoindex(lo) failed" ));
578 :
579 0 : fd_xdp_fds_t lo_fds = fd_xdp_install( lo_idx,
580 0 : tile->net.src_ip_addr,
581 0 : sizeof(udp_port_candidates)/sizeof(udp_port_candidates[0]),
582 0 : udp_port_candidates,
583 0 : "skb" );
584 :
585 0 : ctx->prog_link_fds[ 1 ] = lo_fds.prog_link_fd;
586 0 : ctx->xsk[ 1 ] =
587 0 : fd_xsk_join(
588 0 : fd_xsk_new( FD_SCRATCH_ALLOC_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) ),
589 0 : FD_NET_MTU,
590 0 : tile->net.xdp_rx_queue_size,
591 0 : tile->net.xdp_rx_queue_size,
592 0 : tile->net.xdp_tx_queue_size,
593 0 : tile->net.xdp_tx_queue_size ) );
594 0 : if( FD_UNLIKELY( !ctx->xsk[ 1 ] ) ) FD_LOG_ERR(( "fd_xsk_join failed" ));
595 0 : if( FD_UNLIKELY( !fd_xsk_init( ctx->xsk[ 1 ], lo_idx, (uint)tile->kind_id, 0 /* flags */ ) ) ) FD_LOG_ERR(( "failed to bind lo_xsk" ));
596 0 : if( FD_UNLIKELY( !fd_xsk_activate( ctx->xsk[ 1 ], lo_fds.xsk_map_fd ) ) ) FD_LOG_ERR(( "failed to activate lo_xsk" ));
597 0 : if( FD_UNLIKELY( -1==close( lo_fds.xsk_map_fd ) ) ) FD_LOG_ERR(( "close(%d) failed (%d-%s)", xsk_map_fd, errno, fd_io_strerror( errno ) ));
598 :
599 0 : ctx->xsk_aio[ 1 ] = fd_xsk_aio_join( fd_xsk_aio_new( FD_SCRATCH_ALLOC_APPEND( l, fd_xsk_aio_align(), fd_xsk_aio_footprint( tile->net.xdp_tx_queue_size, tile->net.xdp_aio_depth ) ),
600 0 : tile->net.xdp_tx_queue_size,
601 0 : tile->net.xdp_aio_depth ), ctx->xsk[ 1 ] );
602 0 : if( FD_UNLIKELY( !ctx->xsk_aio[ 1 ] ) ) FD_LOG_ERR(( "fd_xsk_aio_new failed" ));
603 0 : }
604 :
605 0 : ctx->ip = fd_ip_join( fd_ip_new( FD_SCRATCH_ALLOC_APPEND( l, fd_ip_align(), fd_ip_footprint( 0UL, 0UL ) ), 0UL, 0UL ) );
606 0 : }
607 :
608 : static void
609 : unprivileged_init( fd_topo_t * topo,
610 0 : fd_topo_tile_t * tile ) {
611 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
612 :
613 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
614 :
615 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_net_ctx_t), sizeof(fd_net_ctx_t) );
616 0 : fd_aio_t * net_rx_aio = fd_aio_join( fd_aio_new( FD_SCRATCH_ALLOC_APPEND( l, fd_aio_align(), fd_aio_footprint() ), ctx, net_rx_aio_send ) );
617 0 : if( FD_UNLIKELY( !net_rx_aio ) ) FD_LOG_ERR(( "fd_aio_join failed" ));
618 :
619 0 : ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
620 0 : ctx->round_robin_id = tile->kind_id;
621 :
622 0 : fd_xsk_aio_set_rx( ctx->xsk_aio[ 0 ], net_rx_aio );
623 0 : ctx->tx = fd_xsk_aio_get_tx( ctx->xsk_aio[ 0 ] );
624 :
625 0 : if( FD_UNLIKELY( ctx->xsk_cnt>1UL ) ) {
626 0 : fd_xsk_aio_set_rx( ctx->xsk_aio[ 1 ], net_rx_aio );
627 0 : ctx->lo_tx = fd_xsk_aio_get_tx( ctx->xsk_aio[ 1 ] );
628 0 : }
629 :
630 0 : ctx->src_ip_addr = tile->net.src_ip_addr;
631 0 : memcpy( ctx->src_mac_addr, tile->net.src_mac_addr, 6UL );
632 :
633 0 : ctx->metrics.tx_dropped_cnt = 0UL;
634 :
635 0 : ctx->shred_listen_port = tile->net.shred_listen_port;
636 0 : ctx->quic_transaction_listen_port = tile->net.quic_transaction_listen_port;
637 0 : ctx->legacy_transaction_listen_port = tile->net.legacy_transaction_listen_port;
638 0 : ctx->gossip_listen_port = tile->net.gossip_listen_port;
639 0 : ctx->repair_intake_listen_port = tile->net.repair_intake_listen_port;
640 0 : ctx->repair_serve_listen_port = tile->net.repair_serve_listen_port;
641 :
642 : /* Put a bound on chunks we read from the input, to make sure they
643 : are within in the data region of the workspace. */
644 :
645 0 : if( FD_UNLIKELY( !tile->in_cnt ) ) FD_LOG_ERR(( "net tile in link cnt is zero" ));
646 0 : if( FD_UNLIKELY( tile->in_cnt>MAX_NET_INS ) ) FD_LOG_ERR(( "net tile in link cnt %lu exceeds MAX_NET_INS %lu", tile->in_cnt, MAX_NET_INS ));
647 0 : FD_TEST( tile->in_cnt<=32 );
648 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
649 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
650 0 : if( FD_UNLIKELY( link->mtu!=FD_NET_MTU ) ) FD_LOG_ERR(( "net tile in link does not have a normal MTU" ));
651 :
652 0 : ctx->in[ i ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
653 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
654 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark( ctx->in[ i ].mem, link->dcache, link->mtu );
655 0 : }
656 :
657 0 : for( ulong i = 0; i < tile->out_cnt; i++ ) {
658 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ i ] ];
659 0 : if( strcmp( out_link->name, "net_quic" ) == 0 ) {
660 0 : fd_topo_link_t * quic_out = out_link;
661 0 : ctx->quic_out->mcache = quic_out->mcache;
662 0 : ctx->quic_out->sync = fd_mcache_seq_laddr( ctx->quic_out->mcache );
663 0 : ctx->quic_out->depth = fd_mcache_depth( ctx->quic_out->mcache );
664 0 : ctx->quic_out->seq = fd_mcache_seq_query( ctx->quic_out->sync );
665 0 : ctx->quic_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( quic_out->dcache ), quic_out->dcache );
666 0 : ctx->quic_out->mem = topo->workspaces[ topo->objs[ quic_out->dcache_obj_id ].wksp_id ].wksp;
667 0 : ctx->quic_out->wmark = fd_dcache_compact_wmark ( ctx->quic_out->mem, quic_out->dcache, quic_out->mtu );
668 0 : ctx->quic_out->chunk = ctx->quic_out->chunk0;
669 0 : } else if( strcmp( out_link->name, "net_shred" ) == 0 ) {
670 0 : fd_topo_link_t * shred_out = out_link;
671 0 : ctx->shred_out->mcache = shred_out->mcache;
672 0 : ctx->shred_out->sync = fd_mcache_seq_laddr( ctx->shred_out->mcache );
673 0 : ctx->shred_out->depth = fd_mcache_depth( ctx->shred_out->mcache );
674 0 : ctx->shred_out->seq = fd_mcache_seq_query( ctx->shred_out->sync );
675 0 : ctx->shred_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( shred_out->dcache ), shred_out->dcache );
676 0 : ctx->shred_out->mem = topo->workspaces[ topo->objs[ shred_out->dcache_obj_id ].wksp_id ].wksp;
677 0 : ctx->shred_out->wmark = fd_dcache_compact_wmark ( ctx->shred_out->mem, shred_out->dcache, shred_out->mtu );
678 0 : ctx->shred_out->chunk = ctx->shred_out->chunk0;
679 0 : } else if( strcmp( out_link->name, "net_gossip" ) == 0 ) {
680 0 : fd_topo_link_t * gossip_out = out_link;
681 0 : ctx->gossip_out->mcache = gossip_out->mcache;
682 0 : ctx->gossip_out->sync = fd_mcache_seq_laddr( ctx->gossip_out->mcache );
683 0 : ctx->gossip_out->depth = fd_mcache_depth( ctx->gossip_out->mcache );
684 0 : ctx->gossip_out->seq = fd_mcache_seq_query( ctx->gossip_out->sync );
685 0 : ctx->gossip_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( gossip_out->dcache ), gossip_out->dcache );
686 0 : ctx->gossip_out->mem = topo->workspaces[ topo->objs[ gossip_out->dcache_obj_id ].wksp_id ].wksp;
687 0 : ctx->gossip_out->wmark = fd_dcache_compact_wmark ( ctx->gossip_out->mem, gossip_out->dcache, gossip_out->mtu );
688 0 : ctx->gossip_out->chunk = ctx->gossip_out->chunk0;
689 0 : } else if( strcmp( out_link->name, "net_repair" ) == 0 ) {
690 0 : fd_topo_link_t * repair_out = out_link;
691 0 : ctx->repair_out->mcache = repair_out->mcache;
692 0 : ctx->repair_out->sync = fd_mcache_seq_laddr( ctx->repair_out->mcache );
693 0 : ctx->repair_out->depth = fd_mcache_depth( ctx->repair_out->mcache );
694 0 : ctx->repair_out->seq = fd_mcache_seq_query( ctx->repair_out->sync );
695 0 : ctx->repair_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( repair_out->dcache ), repair_out->dcache );
696 0 : ctx->repair_out->mem = topo->workspaces[ topo->objs[ repair_out->dcache_obj_id ].wksp_id ].wksp;
697 0 : ctx->repair_out->wmark = fd_dcache_compact_wmark ( ctx->repair_out->mem, repair_out->dcache, repair_out->mtu );
698 0 : ctx->repair_out->chunk = ctx->repair_out->chunk0;
699 0 : } else {
700 0 : FD_LOG_ERR(( "unrecognized out link `%s`", out_link->name ));
701 0 : }
702 0 : }
703 :
704 : /* Check if any of the tiles we set a listen port for do not have an outlink. */
705 0 : if( FD_UNLIKELY( ctx->shred_listen_port!=0 && ctx->shred_out->mcache==NULL ) ) {
706 0 : FD_LOG_ERR(( "shred listen port set but no out link was found" ));
707 0 : } else if( FD_UNLIKELY( ctx->quic_transaction_listen_port!=0 && ctx->quic_out->mcache==NULL ) ) {
708 0 : FD_LOG_ERR(( "quic transaction listen port set but no out link was found" ));
709 0 : } else if( FD_UNLIKELY( ctx->legacy_transaction_listen_port!=0 && ctx->quic_out->mcache==NULL ) ) {
710 0 : FD_LOG_ERR(( "legacy transaction listen port set but no out link was found" ));
711 0 : } else if( FD_UNLIKELY( ctx->gossip_listen_port!=0 && ctx->gossip_out->mcache==NULL ) ) {
712 0 : FD_LOG_ERR(( "gossip listen port set but no out link was found" ));
713 0 : } else if( FD_UNLIKELY( ctx->repair_intake_listen_port!=0 && ctx->repair_out->mcache==NULL ) ) {
714 0 : FD_LOG_ERR(( "repair intake port set but no out link was found" ));
715 0 : } else if( FD_UNLIKELY( ctx->repair_serve_listen_port!=0 && ctx->repair_out->mcache==NULL ) ) {
716 0 : FD_LOG_ERR(( "repair serve listen port set but no out link was found" ));
717 0 : }
718 :
719 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
720 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
721 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
722 0 : }
723 :
724 : static ulong
725 : populate_allowed_seccomp( fd_topo_t const * topo,
726 : fd_topo_tile_t const * tile,
727 : ulong out_cnt,
728 0 : struct sock_filter * out ) {
729 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
730 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
731 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_net_ctx_t ), sizeof( fd_net_ctx_t ) );
732 :
733 : /* A bit of a hack, if there is no loopback XSK for this tile, we still need to pass
734 : two "allow" FD arguments to the net policy, so we just make them both the same. */
735 0 : int allow_fd2 = ctx->xsk_cnt>1UL ? ctx->xsk[ 1 ]->xsk_fd : ctx->xsk[ 0 ]->xsk_fd;
736 0 : FD_TEST( ctx->xsk[ 0 ]->xsk_fd >= 0 && allow_fd2 >= 0 );
737 0 : int netlink_fd = fd_ip_netlink_get( ctx->ip )->fd;
738 0 : populate_sock_filter_policy_net( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->xsk[ 0 ]->xsk_fd, (uint)allow_fd2, (uint)netlink_fd );
739 0 : return sock_filter_policy_net_instr_cnt;
740 0 : }
741 :
742 : static ulong
743 : populate_allowed_fds( fd_topo_t const * topo,
744 : fd_topo_tile_t const * tile,
745 : ulong out_fds_cnt,
746 0 : int * out_fds ) {
747 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
748 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
749 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_net_ctx_t ), sizeof( fd_net_ctx_t ) );
750 :
751 0 : if( FD_UNLIKELY( out_fds_cnt<7UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
752 :
753 0 : ulong out_cnt = 0UL;
754 :
755 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
756 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
757 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
758 0 : out_fds[ out_cnt++ ] = fd_ip_netlink_get( ctx->ip )->fd;
759 :
760 0 : out_fds[ out_cnt++ ] = ctx->xsk[ 0 ]->xsk_fd;
761 0 : out_fds[ out_cnt++ ] = ctx->prog_link_fds[ 0 ];
762 0 : if( FD_LIKELY( ctx->xsk_cnt>1UL ) ) out_fds[ out_cnt++ ] = ctx->xsk[ 1 ]->xsk_fd;
763 0 : if( FD_LIKELY( ctx->xsk_cnt>1UL ) ) out_fds[ out_cnt++ ] = ctx->prog_link_fds[ 1 ];
764 0 : return out_cnt;
765 0 : }
766 :
767 0 : #define STEM_BURST (1UL)
768 :
769 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_net_ctx_t
770 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_net_ctx_t)
771 :
772 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
773 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
774 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
775 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
776 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
777 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
778 :
779 : #include "../../../../disco/stem/fd_stem.c"
780 :
781 : fd_topo_run_tile_t fd_tile_net = {
782 : .name = "net",
783 : .populate_allowed_seccomp = populate_allowed_seccomp,
784 : .populate_allowed_fds = populate_allowed_fds,
785 : .scratch_align = scratch_align,
786 : .scratch_footprint = scratch_footprint,
787 : .privileged_init = privileged_init,
788 : .unprivileged_init = unprivileged_init,
789 : .run = stem_run,
790 : };
|