Line data Source code
1 : /* The net tile translates between AF_XDP and fd_tango
2 : traffic. It is responsible for setting up the XDP and
3 : XSK socket configuration. */
4 :
5 : #include <errno.h>
6 : #include <fcntl.h>
7 : #include <net/if.h>
8 : #include <sys/socket.h> /* MSG_DONTWAIT needed before importing the net seccomp filter */
9 : #include <linux/if_xdp.h>
10 :
11 : #include "../../../../disco/metrics/fd_metrics.h"
12 : #include "../../../../disco/netlink/fd_netlink_tile.h" /* neigh4_solicit */
13 : #include "../../../../disco/topo/fd_topo.h"
14 :
15 : #include "../../../../waltz/ip/fd_fib4.h"
16 : #include "../../../../waltz/neigh/fd_neigh4_map.h"
17 : #include "../../../../waltz/xdp/fd_xdp_redirect_user.h" /* fd_xsk_activate */
18 : #include "../../../../waltz/xdp/fd_xsk_private.h"
19 : #include "../../../../util/log/fd_dtrace.h"
20 :
21 : #include <unistd.h>
22 : #include <linux/unistd.h>
23 :
24 : #include "generated/net_seccomp.h"
25 :
26 : /* MAX_NET_INS controls the max number of TX links that a net tile can
27 : serve. */
28 :
29 : #define MAX_NET_INS (32UL)
30 :
31 : /* FD_XDP_STATS_INTERVAL_NS controls the XDP stats refresh interval.
32 : This should be lower than the interval at which the metrics tile
33 : collects metrics. */
34 :
35 0 : #define FD_XDP_STATS_INTERVAL_NS (11e6) /* 11ms */
36 :
37 : /* fd_net_in_ctx_t contains consumer information for an incoming tango
38 : link. It is used as part of the TX path. */
39 :
40 : typedef struct {
41 : fd_wksp_t * mem;
42 : ulong chunk0;
43 : ulong wmark;
44 : } fd_net_in_ctx_t;
45 :
46 : /* fd_net_out_ctx_t contains publisher information for a link to a
47 : downstream app tile. It is used as part of the RX path. */
48 :
49 : typedef struct {
50 : fd_frag_meta_t * mcache;
51 : ulong * sync;
52 : ulong depth;
53 : ulong seq;
54 :
55 : fd_wksp_t * mem;
56 : ulong chunk0;
57 : ulong wmark;
58 : ulong chunk;
59 : } fd_net_out_ctx_t;
60 :
61 : /* fd_net_flusher_t controls the pacing of XDP sendto calls for flushing
62 : TX batches. In the 'wakeup' XDP mode, no TX occurs unless the net
63 : tile wakes up the kernel periodically using the sendto() syscall.
64 : If sendto() is called too frequently, time is wasted on context
65 :    switches. If sendto() is not called often enough, packets are
66 :    delayed or dropped. sendto() calls make almost no guarantees about
67 :    how many packets are sent out, nor do they indicate when the kernel
68 :    finishes a wakeup call (it is dispatched asynchronously). The net
69 :    tile thus uses a myriad of flush triggers that were tested for best
70 : performance. */
71 :
72 : struct fd_net_flusher {
73 :
74 : /* Packets that were enqueued after the last sendto() wakeup are
75 : considered "pending". If there are more than pending_wmark packets
76 : pending, a wakeup is dispatched. Thus, this dispatch trigger is
77 :    proportional to packet rate, but does not trigger if I/O is infrequent. */
78 : ulong pending_cnt;
79 : ulong pending_wmark;
80 :
81 : /* Sometimes, packets are not flushed out even after a sendto()
82 : wakeup. This can result in the tail of a burst getting delayed or
83 : overrun. If more than tail_flush_backoff ticks pass since the last
84 : sendto() wakeup and there are still unacknowledged packets in the
85 :    TX ring, another wakeup is issued. */
86 : long next_tail_flush_ticks;
87 : long tail_flush_backoff;
88 :
89 : };
90 :
91 : typedef struct fd_net_flusher fd_net_flusher_t;
92 :
93 : FD_PROTOTYPES_BEGIN
94 :
95 : /* fd_net_flusher_inc marks a new packet as enqueued. */
96 :
97 : static inline void
98 : fd_net_flusher_inc( fd_net_flusher_t * flusher,
99 0 : long now ) {
100 0 : flusher->pending_cnt++;
101 0 : long next_flush = now + flusher->tail_flush_backoff;
102 0 : flusher->next_tail_flush_ticks = fd_long_min( flusher->next_tail_flush_ticks, next_flush );
103 0 : }
104 :
105 : /* fd_net_flusher_check returns 1 if a sendto() wakeup should be issued
106 : immediately. now is a recent fd_tickcount() value.
107 :    If tx_ring_empty!=0 then the kernel has caught up with the net tile
108 :    on the XDP TX ring. (Otherwise, the kernel is behind the net tile.) */
109 :
110 : static inline int
111 : fd_net_flusher_check( fd_net_flusher_t * flusher,
112 : long now,
113 0 : int tx_ring_empty ) {
114 0 : int flush_level = flusher->pending_cnt >= flusher->pending_wmark;
115 0 : int flush_timeout = now >= flusher->next_tail_flush_ticks;
116 0 : int flush = flush_level || flush_timeout;
117 0 : if( !flush ) return 0;
118 0 : if( FD_UNLIKELY( tx_ring_empty ) ) {
119 : /* Flush requested but caught up */
120 0 : flusher->pending_cnt = 0UL;
121 0 : flusher->next_tail_flush_ticks = LONG_MAX;
122 0 : return 0;
123 0 : }
124 0 : return 1;
125 0 : }
126 :
127 : /* fd_net_flusher_wakeup signals a sendto() wakeup was done. now is a
128 : recent fd_tickcount() value. */
129 :
130 : static inline void
131 : fd_net_flusher_wakeup( fd_net_flusher_t * flusher,
132 0 : long now ) {
133 0 : flusher->pending_cnt = 0UL;
134 0 : flusher->next_tail_flush_ticks = now + flusher->tail_flush_backoff;
135 0 : }
136 :
137 : FD_PROTOTYPES_END
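/* For illustration, a minimal sketch of how the three calls above fit
   together in a TX service loop (the real call sites are after_frag and
   net_tx_periodic_wakeup further below; `flusher`, `now`, `tx_prod`,
   `tx_cons`, and `xsk_fd` are hypothetical locals):

     // after enqueuing a frame onto the XDP TX ring
     fd_net_flusher_inc( flusher, now );

     // once per service iteration
     int tx_ring_empty = ( tx_prod==tx_cons );
     if( fd_net_flusher_check( flusher, now, tx_ring_empty ) ) {
       sendto( xsk_fd, NULL, 0, MSG_DONTWAIT, NULL, 0 ); // wake the kernel
       fd_net_flusher_wakeup( flusher, now );
     }
*/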
138 :
139 : /* fd_net_free_ring is a FIFO queue that stores pointers to free XDP TX
140 : frames. */
141 :
142 : struct fd_net_free_ring {
143 : ulong prod;
144 : ulong cons;
145 : ulong depth;
146 : ulong * queue;
147 : };
148 : typedef struct fd_net_free_ring fd_net_free_ring_t;
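/* A minimal sketch of the queue convention used for this ring (the real
   push is in net_comp_event and the pop in before_frag below); prod and
   cons are free-running sequence numbers and depth is a power of two in
   practice (net_comp_event masks with depth-1):

     // pop a free frame (TX submit path)
     if( free->prod==free->cons ) return;                  // empty
     ulong frame = free->queue[ free->cons % free->depth ];
     free->cons = fd_seq_inc( free->cons, 1UL );

     // push a frame back (TX completion path)
     if( fd_seq_diff( free->prod, free->cons )>=(long)free->depth ) return; // full
     free->queue[ free->prod % free->depth ] = frame;
     free->prod = fd_seq_inc( free->prod, 1UL );
*/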
149 :
150 : typedef struct {
151 : /* An "XSK" is an AF_XDP socket */
152 : uint xsk_cnt;
153 : fd_xsk_t * xsk[ 2 ];
154 : int prog_link_fds[ 2 ];
155 :
156 : /* All net tiles are subscribed to the same TX links. (These are
157 : incoming links from app tiles asking the net tile to send out packets)
158 : The net tiles "take turns" doing TX jobs based on the L3+L4 dst hash.
159 :      net_tile_id is the index of this net tile, net_tile_cnt is the
160 :      total number of net tiles. */
161 : uint net_tile_id;
162 : uint net_tile_cnt;
163 :
164 : /* Details pertaining to an inflight send op */
165 : struct {
166 : uint if_idx; /* 0: main interface, 1: loopback */
167 : void * frame;
168 : uchar mac_addrs[12]; /* First 12 bytes of Ethernet header */
169 : } tx_op;
170 :
171 :   /* Round-robin index for cycling service operations across the XSKs */
172 : uint rr_idx;
173 :
174 : /* Rings tracking free packet buffers */
175 : fd_net_free_ring_t free_tx[ 2 ];
176 :
177 : uint src_ip_addr;
178 : uchar src_mac_addr[6];
179 :
180 : ushort shred_listen_port;
181 : ushort quic_transaction_listen_port;
182 : ushort legacy_transaction_listen_port;
183 : ushort gossip_listen_port;
184 : ushort repair_intake_listen_port;
185 : ushort repair_serve_listen_port;
186 :
187 : ulong in_cnt;
188 : fd_net_in_ctx_t in[ MAX_NET_INS ];
189 :
190 : fd_net_out_ctx_t quic_out[1];
191 : fd_net_out_ctx_t shred_out[1];
192 : fd_net_out_ctx_t gossip_out[1];
193 : fd_net_out_ctx_t repair_out[1];
194 :
195 : /* XDP stats refresh timer */
196 : long xdp_stats_interval_ticks;
197 : long next_xdp_stats_refresh;
198 :
199 : /* TX flush timers */
200 : fd_net_flusher_t tx_flusher[2]; /* one per XSK */
201 :
202 : /* Route and neighbor tables */
203 : fd_fib4_t const * fib_local;
204 : fd_fib4_t const * fib_main;
205 : fd_neigh4_hmap_t neigh4[1];
206 : fd_netlink_neigh4_solicit_link_t neigh4_solicit[1];
207 :
208 : struct {
209 : ulong rx_pkt_cnt;
210 : ulong rx_bytes_total;
211 : ulong rx_undersz_cnt;
212 : ulong rx_fill_blocked_cnt;
213 : ulong rx_backp_cnt;
214 :
215 : ulong tx_submit_cnt;
216 : ulong tx_complete_cnt;
217 : ulong tx_bytes_total;
218 : ulong tx_route_fail_cnt;
219 : ulong tx_no_xdp_cnt;
220 : ulong tx_neigh_fail_cnt;
221 : ulong tx_full_fail_cnt;
222 :
223 : ulong xsk_tx_wakeup_cnt;
224 : ulong xsk_rx_wakeup_cnt;
225 : } metrics;
226 : } fd_net_ctx_t;
227 :
228 : FD_FN_CONST static inline ulong
229 3 : scratch_align( void ) {
230 3 : return 4096UL;
231 3 : }
232 :
233 : FD_FN_PURE static inline ulong
234 3 : scratch_footprint( fd_topo_tile_t const * tile ) {
235 : /* TODO reproducing this conditional memory layout twice is susceptible to bugs. Use more robust object discovery */
236 3 : (void)tile;
237 3 : ulong l = FD_LAYOUT_INIT;
238 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_net_ctx_t), sizeof(fd_net_ctx_t) );
239 3 : l = FD_LAYOUT_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) );
240 3 : l = FD_LAYOUT_APPEND( l, alignof(ulong), tile->net.xdp_tx_queue_size * sizeof(ulong) );
241 3 : if( FD_UNLIKELY( strcmp( tile->net.interface, "lo" ) && tile->kind_id == 0 ) ) {
242 3 : l = FD_LAYOUT_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) );
243 3 : l = FD_LAYOUT_APPEND( l, alignof(ulong), tile->net.xdp_tx_queue_size * sizeof(ulong) );
244 3 : }
245 3 : return FD_LAYOUT_FINI( l, scratch_align() );
246 3 : }
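/* Note that the FD_LAYOUT_APPEND sequence above must stay in sync, in
   both order and size, with the FD_SCRATCH_ALLOC_APPEND calls in
   privileged_init below (ctx, xsk[0] storage, free_tx[0] queue, then
   optionally xsk[1] storage and free_tx[1] queue); this duplication is
   what the TODO above refers to. */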
247 :
248 : static void
249 0 : metrics_write( fd_net_ctx_t * ctx ) {
250 0 : FD_MCNT_SET( NET, RX_PKT_CNT, ctx->metrics.rx_pkt_cnt );
251 0 : FD_MCNT_SET( NET, RX_BYTES_TOTAL, ctx->metrics.rx_bytes_total );
252 0 : FD_MCNT_SET( NET, RX_UNDERSZ_CNT, ctx->metrics.rx_undersz_cnt );
253 0 : FD_MCNT_SET( NET, RX_FILL_BLOCKED_CNT, ctx->metrics.rx_fill_blocked_cnt );
254 0 : FD_MCNT_SET( NET, RX_BACKPRESSURE_CNT, ctx->metrics.rx_backp_cnt );
255 :
256 0 : FD_MCNT_SET( NET, TX_SUBMIT_CNT, ctx->metrics.tx_submit_cnt );
257 0 : FD_MCNT_SET( NET, TX_COMPLETE_CNT, ctx->metrics.tx_complete_cnt );
258 0 : FD_MCNT_SET( NET, TX_BYTES_TOTAL, ctx->metrics.tx_bytes_total );
259 0 : FD_MCNT_SET( NET, TX_ROUTE_FAIL_CNT, ctx->metrics.tx_route_fail_cnt );
260 0 : FD_MCNT_SET( NET, TX_NEIGHBOR_FAIL_CNT, ctx->metrics.tx_neigh_fail_cnt );
261 0 : FD_MCNT_SET( NET, TX_FULL_FAIL_CNT, ctx->metrics.tx_full_fail_cnt );
262 :
263 0 : FD_MCNT_SET( NET, XSK_TX_WAKEUP_CNT, ctx->metrics.xsk_tx_wakeup_cnt );
264 0 : FD_MCNT_SET( NET, XSK_RX_WAKEUP_CNT, ctx->metrics.xsk_rx_wakeup_cnt );
265 0 : }
266 :
267 : struct xdp_statistics_v0 {
268 : __u64 rx_dropped; /* Dropped for other reasons */
269 : __u64 rx_invalid_descs; /* Dropped due to invalid descriptor */
270 : __u64 tx_invalid_descs; /* Dropped due to invalid descriptor */
271 : };
272 :
273 : struct xdp_statistics_v1 {
274 : __u64 rx_dropped; /* Dropped for other reasons */
275 : __u64 rx_invalid_descs; /* Dropped due to invalid descriptor */
276 : __u64 tx_invalid_descs; /* Dropped due to invalid descriptor */
277 : __u64 rx_ring_full; /* Dropped due to rx ring being full */
278 : __u64 rx_fill_ring_empty_descs; /* Failed to retrieve item from fill ring */
279 : __u64 tx_ring_empty_descs; /* Failed to retrieve item from tx ring */
280 : };
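/* Kernels that predate the extended XDP stats fill in only the v0-sized
   struct from getsockopt(SOL_XDP, XDP_STATISTICS); poll_xdp_statistics
   below therefore accepts either size, and the v1-only fields simply
   stay zero in that case. */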
281 :
282 : static void
283 0 : poll_xdp_statistics( fd_net_ctx_t * ctx ) {
284 0 : struct xdp_statistics_v1 stats = {0};
285 0 : ulong xsk_cnt = ctx->xsk_cnt;
286 0 : for( ulong j=0UL; j<xsk_cnt; j++ ) {
287 0 :     struct xdp_statistics_v1 sub_stats = {0}; /* zeroed so a v0-sized reply leaves the v1-only fields at 0 */
288 0 : uint optlen = (uint)sizeof(struct xdp_statistics_v1);
289 0 : if( FD_UNLIKELY( -1==getsockopt( ctx->xsk[ j ]->xsk_fd, SOL_XDP, XDP_STATISTICS, &sub_stats, &optlen ) ) )
290 0 : FD_LOG_ERR(( "getsockopt(SOL_XDP, XDP_STATISTICS) failed: %s", strerror( errno ) ));
291 0 : if( FD_UNLIKELY( optlen!=sizeof(struct xdp_statistics_v0) &&
292 0 : optlen!=sizeof(struct xdp_statistics_v1) ) ) {
293 0 : FD_LOG_ERR(( "getsockopt(SOL_XDP, XDP_STATISTICS) returned unexpected size %u", optlen ));
294 0 : }
295 0 : stats.rx_dropped += sub_stats.rx_dropped;
296 0 : stats.rx_invalid_descs += sub_stats.rx_invalid_descs;
297 0 : stats.tx_invalid_descs += sub_stats.tx_invalid_descs;
298 0 : stats.rx_ring_full += sub_stats.rx_ring_full;
299 0 : stats.rx_fill_ring_empty_descs += sub_stats.rx_fill_ring_empty_descs;
300 0 : stats.tx_ring_empty_descs += sub_stats.tx_ring_empty_descs;
301 0 : }
302 :
303 0 : FD_MCNT_SET( NET, XDP_RX_DROPPED_OTHER, stats.rx_dropped );
304 0 : FD_MCNT_SET( NET, XDP_RX_INVALID_DESCS, stats.rx_invalid_descs );
305 0 : FD_MCNT_SET( NET, XDP_TX_INVALID_DESCS, stats.tx_invalid_descs );
306 0 : FD_MCNT_SET( NET, XDP_RX_RING_FULL, stats.rx_ring_full );
307 0 : FD_MCNT_SET( NET, XDP_RX_FILL_RING_EMPTY_DESCS, stats.rx_fill_ring_empty_descs );
308 0 : FD_MCNT_SET( NET, XDP_TX_RING_EMPTY_DESCS, stats.tx_ring_empty_descs );
309 0 : }
310 :
311 : /* net_is_fatal_xdp_error returns 1 if the given errno returned by an
312 : XDP API indicates a non-recoverable error code. The net tile should
313 : crash if it sees such an error so the problem does not go undetected.
314 : Otherwise, returns 0. */
315 :
316 : static int
317 0 : net_is_fatal_xdp_error( int err ) {
318 0 : return err==ESOCKTNOSUPPORT || err==EOPNOTSUPP || err==EINVAL ||
319 0 : err==EPERM;
320 0 : }
321 :
322 : /* net_tx_ready returns 1 if the current XSK is ready to submit a TX send
323 : job. If the XSK is blocked for sends, returns 0. Reasons for block
324 : include:
325 : - No XSK TX buffer is available
326 : - XSK TX ring is full */
327 :
328 : static int
329 : net_tx_ready( fd_net_ctx_t * ctx,
330 0 : uint if_idx ) {
331 0 : fd_xsk_t * xsk = ctx->xsk[ if_idx ];
332 0 : fd_ring_desc_t * tx_ring = &xsk->ring_tx;
333 0 : fd_net_free_ring_t * free = ctx->free_tx + if_idx;
334 0 : if( free->prod == free->cons ) return 0; /* drop */
335 0 :   if( FD_VOLATILE_CONST( *tx_ring->prod ) - FD_VOLATILE_CONST( *tx_ring->cons ) >= tx_ring->depth ) return 0; /* drop */
336 0 : return 1;
337 0 : }
338 :
339 : /* net_rx_wakeup triggers xsk_recvmsg to run in the kernel. Needs to be
340 : called periodically in order to receive packets. */
341 :
342 : static void
343 : net_rx_wakeup( fd_net_ctx_t * ctx,
344 0 : fd_xsk_t * xsk ) {
345 0 : if( !fd_xsk_rx_need_wakeup( xsk ) ) return;
346 0 : struct msghdr _ignored[ 1 ] = { 0 };
347 0 : if( FD_UNLIKELY( -1==recvmsg( xsk->xsk_fd, _ignored, MSG_DONTWAIT ) ) ) {
348 0 : if( FD_UNLIKELY( net_is_fatal_xdp_error( errno ) ) ) {
349 0 : FD_LOG_ERR(( "xsk recvmsg failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
350 0 : }
351 0 : if( FD_UNLIKELY( errno!=EAGAIN ) ) {
352 0 : long ts = fd_log_wallclock();
353 0 : if( ts > xsk->log_suppress_until_ns ) {
354 0 : FD_LOG_WARNING(( "xsk recvmsg failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
355 0 : xsk->log_suppress_until_ns = ts + (long)1e9;
356 0 : }
357 0 : }
358 0 : }
359 0 : ctx->metrics.xsk_rx_wakeup_cnt++;
360 0 : }
361 :
362 : /* net_tx_wakeup triggers xsk_sendmsg to run in the kernel. Needs to be
363 : called periodically in order to transmit packets. */
364 :
365 : static void
366 : net_tx_wakeup( fd_net_ctx_t * ctx,
367 0 : fd_xsk_t * xsk ) {
368 0 : if( !fd_xsk_tx_need_wakeup( xsk ) ) return;
369 0 : if( FD_VOLATILE_CONST( *xsk->ring_tx.prod )==FD_VOLATILE_CONST( *xsk->ring_tx.cons ) ) return;
370 0 : if( FD_UNLIKELY( -1==sendto( xsk->xsk_fd, NULL, 0, MSG_DONTWAIT, NULL, 0 ) ) ) {
371 0 : if( FD_UNLIKELY( net_is_fatal_xdp_error( errno ) ) ) {
372 0 : FD_LOG_ERR(( "xsk sendto failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
373 0 : }
374 0 : if( FD_UNLIKELY( errno!=EAGAIN ) ) {
375 0 : long ts = fd_log_wallclock();
376 0 : if( ts > xsk->log_suppress_until_ns ) {
377 0 : FD_LOG_WARNING(( "xsk sendto failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
378 0 : xsk->log_suppress_until_ns = ts + (long)1e9;
379 0 : }
380 0 : }
381 0 : }
382 0 : ctx->metrics.xsk_tx_wakeup_cnt++;
383 0 : }
384 :
385 : /* net_tx_periodic_wakeup does a timer based xsk_sendmsg wakeup. */
386 :
387 : static inline void
388 : net_tx_periodic_wakeup( fd_net_ctx_t * ctx,
389 : uint if_idx,
390 0 : long now ) {
391 0 : uint tx_prod = FD_VOLATILE_CONST( *ctx->xsk[ if_idx ]->ring_tx.prod );
392 0 : uint tx_cons = FD_VOLATILE_CONST( *ctx->xsk[ if_idx ]->ring_tx.cons );
393 0 : int tx_ring_empty = tx_prod==tx_cons;
394 0 : if( fd_net_flusher_check( ctx->tx_flusher+if_idx, now, tx_ring_empty ) ) {
395 0 : net_tx_wakeup( ctx, ctx->xsk[ if_idx ] );
396 0 : fd_net_flusher_wakeup( ctx->tx_flusher+if_idx, now );
397 0 : }
398 0 : }
399 :
400 : static void
401 0 : during_housekeeping( fd_net_ctx_t * ctx ) {
402 0 : long now = fd_tickcount();
403 :
404 0 : if( now > ctx->next_xdp_stats_refresh ) {
405 0 : ctx->next_xdp_stats_refresh = now + ctx->xdp_stats_interval_ticks;
406 0 : poll_xdp_statistics( ctx );
407 0 : }
408 :
409 0 : for( uint j=0U; j<ctx->xsk_cnt; j++ ) {
410 0 : net_rx_wakeup( ctx, ctx->xsk[ j ] );
411 0 : }
412 0 : }
413 :
414 : /* net_tx_route resolves the destination interface index, src MAC address,
415 : and dst MAC address. Returns 1 on success, 0 on failure. On success,
416 : tx_op->{if_idx,mac_addrs} is set. */
417 :
418 : static int
419 : net_tx_route( fd_net_ctx_t * ctx,
420 0 : uint dst_ip ) {
421 :
422 : /* Route lookup */
423 :
424 0 : fd_fib4_hop_t hop[2] = {0};
425 0 : fd_fib4_lookup( ctx->fib_local, hop+0, dst_ip, 0UL );
426 0 : fd_fib4_lookup( ctx->fib_main, hop+1, dst_ip, 0UL );
427 0 : fd_fib4_hop_t const * next_hop = fd_fib4_hop_or( hop+0, hop+1 );
428 :
429 0 : uint rtype = next_hop->rtype;
430 0 : uint if_idx = next_hop->if_idx;
431 :
432 0 : if( FD_UNLIKELY( rtype==FD_FIB4_RTYPE_LOCAL ) ) {
433 0 : rtype = FD_FIB4_RTYPE_UNICAST;
434 0 : if_idx = 1;
435 0 : }
436 :
437 0 : if( FD_UNLIKELY( rtype!=FD_FIB4_RTYPE_UNICAST ) ) {
438 0 : ctx->metrics.tx_route_fail_cnt++;
439 0 : return 0;
440 0 : }
441 :
442 0 : if( if_idx==1 ) {
443 : /* Set Ethernet src and dst address to 00:00:00:00:00:00 */
444 0 : memset( ctx->tx_op.mac_addrs, 0, 12UL );
445 0 : ctx->tx_op.if_idx = 1;
446 0 : return 1;
447 0 : }
448 :
449 0 : if( FD_UNLIKELY( if_idx!=ctx->xsk[ 0 ]->if_idx ) ) {
450 0 : ctx->metrics.tx_no_xdp_cnt++;
451 0 : return 0;
452 0 : }
453 0 : ctx->tx_op.if_idx = 0;
454 :
455 : /* Neighbor resolve */
456 :
457 0 : uint neigh_ip = next_hop->ip4_gw;
458 0 : if( !neigh_ip ) neigh_ip = dst_ip;
459 :
460 0 : fd_neigh4_hmap_query_t neigh_query[1];
461 0 : int neigh_res = fd_neigh4_hmap_query_try( ctx->neigh4, &neigh_ip, NULL, neigh_query, 0 );
462 0 : if( FD_UNLIKELY( neigh_res!=FD_MAP_SUCCESS ) ) {
463 : /* Neighbor not found */
464 0 : fd_netlink_neigh4_solicit( ctx->neigh4_solicit, neigh_ip, if_idx, fd_frag_meta_ts_comp( fd_tickcount() ) );
465 0 : ctx->metrics.tx_neigh_fail_cnt++;
466 0 : return 0;
467 0 : }
468 0 : fd_neigh4_entry_t const * neigh = fd_neigh4_hmap_query_ele_const( neigh_query );
469 0 : if( FD_UNLIKELY( neigh->state != FD_NEIGH4_STATE_ACTIVE ) ) {
470 0 : ctx->metrics.tx_neigh_fail_cnt++;
471 0 : return 0;
472 0 : }
473 :
474 0 : memcpy( ctx->tx_op.mac_addrs+0, neigh->mac_addr, 6 );
475 0 : memcpy( ctx->tx_op.mac_addrs+6, ctx->src_mac_addr, 6 );
476 :
477 0 : if( FD_UNLIKELY( fd_neigh4_hmap_query_test( neigh_query ) ) ) {
478 0 : ctx->metrics.tx_neigh_fail_cnt++;
479 0 : return 0;
480 0 : }
481 :
482 0 : return 1;
483 0 : }
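/* For example (illustrative): a dst_ip covered by a local/loopback
   route resolves to if_idx 1 with all-zero MAC addresses, while a
   dst_ip behind the default route resolves to if_idx 0 with the dst MAC
   taken from the gateway's (ip4_gw) neighbor entry and the src MAC
   taken from src_mac_addr. Any other outcome (no unicast route,
   unknown device, unresolved neighbor) bumps a fail counter and the
   frag is skipped, effectively dropping the packet. */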
484 :
485 : /* before_frag is called when a new metadata descriptor for a TX job is
486 : found. This callback determines whether this net tile is responsible
487 : for the TX job. If so, it prepares the TX op for the during_frag and
488 : after_frag callbacks. */
489 :
490 : static inline int
491 : before_frag( fd_net_ctx_t * ctx,
492 : ulong in_idx,
493 : ulong seq,
494 0 : ulong sig ) {
495 0 : (void)in_idx; (void)seq;
496 :
497 : /* Find interface index of next packet */
498 :
499 0 : ulong proto = fd_disco_netmux_sig_proto( sig );
500 0 : if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) return 1;
501 :
502 0 : uint dst_ip = fd_disco_netmux_sig_dst_ip( sig );
503 0 : if( FD_UNLIKELY( !net_tx_route( ctx, dst_ip ) ) ) return 1;
504 :
505 0 : uint net_tile_id = ctx->net_tile_id;
506 0 : uint net_tile_cnt = ctx->net_tile_cnt;
507 0 : uint if_idx = ctx->tx_op.if_idx;
508 0 : if( FD_UNLIKELY( if_idx>=ctx->xsk_cnt ) ) return 1; /* ignore */
509 :
510 : /* Load balance TX */
511 :
512 0 : uint hash = (uint)fd_disco_netmux_sig_hash( sig );
513 0 : uint target_idx = hash % net_tile_cnt;
514 0 : if( if_idx==1 ) target_idx = 0; /* loopback always targets tile 0 */
515 :
516 : /* Skip if another net tile is responsible for this packet */
517 :
518 0 : if( net_tile_id!=target_idx ) return 1; /* ignore */
519 :
520 : /* Skip if TX is blocked */
521 :
522 0 : if( FD_UNLIKELY( !net_tx_ready( ctx, if_idx ) ) ) {
523 0 : ctx->metrics.tx_full_fail_cnt++;
524 0 : return 1;
525 0 : }
526 :
527 :   /* Allocate a free TX frame to copy the packet into */
528 :
529 0 : fd_net_free_ring_t * free = ctx->free_tx + if_idx;
530 0 : ulong alloc_seq = free->cons;
531 0 : void * frame = (void *)free->queue[ alloc_seq % free->depth ];
532 0 : free->cons = fd_seq_inc( alloc_seq, 1UL );
533 :
534 0 : ctx->tx_op.if_idx = if_idx;
535 0 : ctx->tx_op.frame = frame;
536 :
537 0 : return 0; /* continue */
538 0 : }
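/* For example, with net_tile_cnt==2, a frag whose sig hash is 7 maps to
   target_idx 7%2==1, so only net tile 1 proceeds past before_frag for
   it; frags routed to loopback are always handled by net tile 0. */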
539 :
540 : /* during_frag is called when before_frag has committed to transmit an
541 : outgoing packet. */
542 :
543 : static inline void
544 : during_frag( fd_net_ctx_t * ctx,
545 : ulong in_idx,
546 : ulong seq,
547 : ulong sig,
548 : ulong chunk,
549 0 : ulong sz ) {
550 0 : (void)in_idx; (void)seq; (void)sig;
551 :
552 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_NET_MTU ) )
553 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
554 :
555 0 : if( FD_UNLIKELY( sz<14UL ) )
556 0 : FD_LOG_ERR(( "packet too small %lu (in_idx=%lu)", sz, in_idx ));
557 :
558 0 : fd_xsk_t * xsk = ctx->xsk[ ctx->tx_op.if_idx ];
559 :
560 0 : void * frame = ctx->tx_op.frame;
561 0 : if( FD_UNLIKELY( (ulong)frame+sz > (ulong)xsk->umem.addr + xsk->umem.len ) )
562 0 : FD_LOG_ERR(( "frame %p out of bounds (beyond %p)", frame, (void *)( (ulong)xsk->umem.addr + xsk->umem.len ) ));
563 0 : if( FD_UNLIKELY( (ulong)frame < (ulong)xsk->umem.addr ) )
564 0 : FD_LOG_ERR(( "frame %p out of bounds (below %p)", frame, (void *)xsk->umem.addr ));
565 :
566 : /* Speculatively copy frame into XDP buffer */
567 0 : uchar const * src = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
568 0 : fd_memcpy( ctx->tx_op.frame, src, sz );
569 0 : }
570 :
571 : /* after_frag is called when the during_frag memcpy was _not_ overrun. */
572 :
573 : static void
574 : after_frag( fd_net_ctx_t * ctx,
575 : ulong in_idx,
576 : ulong seq,
577 : ulong sig,
578 : ulong sz,
579 : ulong tsorig,
580 0 : fd_stem_context_t * stem ) {
581 0 : (void)in_idx; (void)seq; (void)sig; (void)tsorig; (void)stem;
582 :
583 : /* Current send operation */
584 :
585 0 : uint if_idx = ctx->tx_op.if_idx;
586 0 : uchar * frame = ctx->tx_op.frame;
587 0 : fd_xsk_t * xsk = ctx->xsk[ if_idx ];
588 :
589 0 : memcpy( frame, ctx->tx_op.mac_addrs, 12 );
590 :
591 : /* Submit packet TX job
592 :
593 :      Invariant for ring_tx: prod-cons<depth
594 : (This invariant breaks if any other packet is sent over this ring
595 : between before_frag and this point, e.g. send_arp_probe.) */
596 :
597 0 : fd_ring_desc_t * tx_ring = &xsk->ring_tx;
598 0 : uint tx_seq = FD_VOLATILE_CONST( *tx_ring->prod );
599 0 : uint tx_mask = tx_ring->depth - 1U;
600 0 : xsk->ring_tx.packet_ring[ tx_seq&tx_mask ] = (struct xdp_desc) {
601 0 : .addr = (ulong)frame - (ulong)xsk->umem.addr,
602 0 : .len = (uint)sz,
603 0 : .options = 0
604 0 : };
605 :
606 : /* Frame is now owned by kernel. Clear tx_op. */
607 0 : ctx->tx_op.frame = NULL;
608 :
609 : /* Register newly enqueued packet */
610 0 : FD_VOLATILE( *xsk->ring_tx.prod ) = tx_ring->cached_prod = tx_seq+1U;
611 0 : ctx->metrics.tx_submit_cnt++;
612 0 : ctx->metrics.tx_bytes_total += sz;
613 0 : fd_net_flusher_inc( ctx->tx_flusher+if_idx, fd_tickcount() );
614 :
615 0 : }
616 :
617 : /* net_rx_packet is called when a new Ethernet frame is available.
618 : Attempts to copy out the frame to a downstream tile. */
619 :
620 : static void
621 : net_rx_packet( fd_net_ctx_t * ctx,
622 : fd_stem_context_t * stem,
623 : uchar const * packet,
624 0 : ulong sz ) {
625 :
626 0 : uchar const * packet_end = packet + sz;
627 0 : uchar const * iphdr = packet + 14U;
628 :
629 : /* Filter for UDP/IPv4 packets. Test for ethtype and ipproto in 1
630 : branch */
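  /* (packet[12..13] is the big-endian Ethernet type, 0x0800 for IPv4,
     and packet[23] is the IPv4 protocol byte, 0x11 for UDP, hence the
     expected packed value 0x080011.) */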
631 0 : uint test_ethip = ( (uint)packet[12] << 16u ) | ( (uint)packet[13] << 8u ) | (uint)packet[23];
632 0 : if( FD_UNLIKELY( test_ethip!=0x080011 ) )
633 0 : FD_LOG_ERR(( "Firedancer received a packet from the XDP program that was either "
634 0 : "not an IPv4 packet, or not a UDP packet. It is likely your XDP program "
635 0 : "is not configured correctly." ));
636 :
637 : /* IPv4 is variable-length, so lookup IHL to find start of UDP */
638 0 : uint iplen = ( ( (uint)iphdr[0] ) & 0x0FU ) * 4U;
639 0 : uchar const * udp = iphdr + iplen;
640 :
641 : /* Ignore if UDP header is too short */
642 0 : if( FD_UNLIKELY( udp+8U > packet_end ) ) {
643 0 : FD_DTRACE_PROBE( net_tile_err_rx_undersz );
644 0 : ctx->metrics.rx_undersz_cnt++;
645 0 : return;
646 0 : }
647 :
648 : /* Extract IP dest addr and UDP src/dest port */
649 0 : uint ip_srcaddr = *(uint *)( iphdr+12UL );
650 0 : ushort udp_srcport = fd_ushort_bswap( *(ushort *)( udp+0UL ) );
651 0 : ushort udp_dstport = fd_ushort_bswap( *(ushort *)( udp+2UL ) );
652 :
653 0 : FD_DTRACE_PROBE_4( net_tile_pkt_rx, ip_srcaddr, udp_srcport, udp_dstport, sz );
654 :
655 0 : ushort proto;
656 0 : fd_net_out_ctx_t * out;
657 0 : if( FD_UNLIKELY( udp_dstport==ctx->shred_listen_port ) ) {
658 0 : proto = DST_PROTO_SHRED;
659 0 : out = ctx->shred_out;
660 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->quic_transaction_listen_port ) ) {
661 0 : proto = DST_PROTO_TPU_QUIC;
662 0 : out = ctx->quic_out;
663 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->legacy_transaction_listen_port ) ) {
664 0 : proto = DST_PROTO_TPU_UDP;
665 0 : out = ctx->quic_out;
666 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->gossip_listen_port ) ) {
667 0 : proto = DST_PROTO_GOSSIP;
668 0 : out = ctx->gossip_out;
669 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->repair_intake_listen_port ) ) {
670 0 : proto = DST_PROTO_REPAIR;
671 0 : out = ctx->repair_out;
672 0 : } else if( FD_UNLIKELY( udp_dstport==ctx->repair_serve_listen_port ) ) {
673 0 : proto = DST_PROTO_REPAIR;
674 0 : out = ctx->repair_out;
675 0 : } else {
676 :
677 0 : FD_LOG_ERR(( "Firedancer received a UDP packet on port %hu which was not expected. "
678 0 : "Only the following ports should be configured to forward packets: "
679 0 :                  "%hu, %hu, %hu, %hu, %hu, %hu (excluding any 0 ports, which can be ignored). "
680 0 : "Please report this error to Firedancer maintainers.",
681 0 : udp_dstport,
682 0 : ctx->shred_listen_port,
683 0 : ctx->quic_transaction_listen_port,
684 0 : ctx->legacy_transaction_listen_port,
685 0 : ctx->gossip_listen_port,
686 0 : ctx->repair_intake_listen_port,
687 0 : ctx->repair_serve_listen_port ));
688 0 : }
689 :
690 0 : fd_memcpy( fd_chunk_to_laddr( out->mem, out->chunk ), packet, sz );
691 :
692 : /* tile can decide how to partition based on src ip addr and src port */
693 0 : ulong sig = fd_disco_netmux_sig( ip_srcaddr, udp_srcport, 0U, proto, 14UL+8UL+iplen );
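  /* (14UL+8UL+iplen is the total length of the Ethernet, IPv4, and UDP
     headers preceding the payload.) */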
694 :
695 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
696 0 : fd_mcache_publish( out->mcache, out->depth, out->seq, sig, out->chunk, sz, 0, 0, tspub );
697 :
698 0 : *stem->cr_avail -= stem->cr_decrement_amount;
699 :
700 0 : out->seq = fd_seq_inc( out->seq, 1UL );
701 0 : out->chunk = fd_dcache_compact_next( out->chunk, FD_NET_MTU, out->chunk0, out->wmark );
702 :
703 0 : ctx->metrics.rx_pkt_cnt++;
704 0 : ctx->metrics.rx_bytes_total += sz;
705 :
706 0 : }
707 :
708 : /* net_comp_event is called when an XDP TX frame is free again. */
709 :
710 : static void
711 : net_comp_event( fd_net_ctx_t * ctx,
712 : fd_xsk_t * xsk,
713 : uint xsk_idx,
714 0 : uint comp_seq ) {
715 :
716 :   /* Locate the completed TX frame */
717 :
718 0 : fd_ring_desc_t * comp_ring = &xsk->ring_cr;
719 0 : uint comp_mask = comp_ring->depth - 1U;
720 0 : ulong frame = FD_VOLATILE_CONST( comp_ring->frame_ring[ comp_seq&comp_mask ] );
721 0 : ulong const frame_mask = FD_NET_MTU - 1UL;
722 0 : if( FD_UNLIKELY( frame+FD_NET_MTU > xsk->umem.len ) ) {
723 0 : FD_LOG_ERR(( "Bounds check failed: frame=0x%lx umem.len=0x%lx",
724 0 : frame, (ulong)xsk->umem.len ));
725 0 : }
726 :
727 : /* Check if we have space to return the freed frame */
728 :
729 0 : fd_net_free_ring_t * free = ctx->free_tx + xsk_idx;
730 0 : ulong free_prod = free->prod;
731 0 : ulong free_mask = free->depth - 1UL;
732 0 : long free_cnt = fd_seq_diff( free_prod, free->cons );
733 0 : if( FD_UNLIKELY( free_cnt>=(long)free->depth ) ) return; /* blocked */
734 :
735 0 : free->queue[ free_prod&free_mask ] = xsk->umem.addr + (frame & (~frame_mask));
736 0 : free->prod = fd_seq_inc( free_prod, 1UL );
737 :
738 : /* Wind up for next iteration */
739 :
740 0 : FD_VOLATILE( *comp_ring->cons ) = comp_ring->cached_cons = comp_seq+1U;
741 :
742 0 : ctx->metrics.tx_complete_cnt++;
743 :
744 0 : }
745 :
746 : /* net_rx_event is called when a new XDP RX frame is available. Calls
747 : net_rx_packet, then returns the packet back to the kernel via the fill
748 : ring. */
749 :
750 : static void
751 : net_rx_event( fd_net_ctx_t * ctx,
752 : fd_stem_context_t * stem,
753 : fd_xsk_t * xsk,
754 0 : uint rx_seq ) {
755 :
756 0 : if( FD_UNLIKELY( *stem->cr_avail < stem->cr_decrement_amount ) ) {
757 0 : ctx->metrics.rx_backp_cnt++;
758 0 : return;
759 0 : }
760 :
761 : /* Locate the incoming frame */
762 :
763 0 : fd_ring_desc_t * rx_ring = &xsk->ring_rx;
764 0 : uint rx_mask = rx_ring->depth - 1U;
765 0 : struct xdp_desc frame = FD_VOLATILE_CONST( rx_ring->packet_ring[ rx_seq&rx_mask ] );
766 :
767 0 : if( FD_UNLIKELY( frame.len>FD_NET_MTU ) )
768 0 : FD_LOG_ERR(( "received a UDP packet with a too large payload (%u)", frame.len ));
769 :
770 : /* Check if we have space in the fill ring to free the frame */
771 :
772 0 : fd_ring_desc_t * fill_ring = &xsk->ring_fr;
773 0 : uint fill_depth = fill_ring->depth;
774 0 : uint fill_mask = fill_depth-1U;
775 0 : ulong frame_mask = FD_NET_MTU - 1UL;
776 0 : uint fill_prod = FD_VOLATILE_CONST( *fill_ring->prod );
777 0 : uint fill_cons = FD_VOLATILE_CONST( *fill_ring->cons );
778 :
779 0 : if( FD_UNLIKELY( (int)(fill_prod-fill_cons) >= (int)fill_depth ) ) {
780 0 : ctx->metrics.rx_fill_blocked_cnt++;
781 0 : return; /* blocked */
782 0 : }
783 :
784 : /* Pass it to the receive handler */
785 :
786 0 : uchar const * packet = (uchar const *)xsk->umem.addr + frame.addr;
787 0 : net_rx_packet( ctx, stem, packet, frame.len );
788 :
789 0 : FD_COMPILER_MFENCE();
790 0 : FD_VOLATILE( *rx_ring->cons ) = rx_ring->cached_cons = rx_seq+1U;
791 :
792 : /* Free the frame by returning it back to the fill ring */
793 :
794 0 : fill_ring->frame_ring[ fill_prod&fill_mask ] = frame.addr & (~frame_mask);
795 0 : FD_VOLATILE( *fill_ring->prod ) = fill_ring->cached_prod = fill_prod+1U;
796 :
797 0 : }
798 :
799 : /* before_credit is called every loop iteration. */
800 :
801 : static void
802 : before_credit( fd_net_ctx_t * ctx,
803 : fd_stem_context_t * stem,
804 0 : int * charge_busy ) {
805 0 : (void)stem;
806 :
807 : /* A previous send attempt was overrun. A corrupt copy of the packet was
808 : placed into an XDP frame, but the frame was not yet submitted to the
809 : TX ring. Return the tx buffer to the free list. */
810 :
811 0 : if( ctx->tx_op.frame ) {
812 0 : *charge_busy = 1;
813 0 : fd_net_free_ring_t * free = ctx->free_tx + ctx->tx_op.if_idx;
814 0 : ulong alloc_seq = free->prod;
815 0 : free->queue[ alloc_seq % free->depth ] = (ulong)ctx->tx_op.frame;
816 0 : free->prod = fd_seq_inc( alloc_seq, 1UL );
817 0 : ctx->tx_op.frame = NULL;
818 0 : }
819 :
820 : /* Check if new packets are available or if TX frames are free again
821 : (Round-robin through sockets) */
822 :
823 0 : uint rr_idx = ctx->rr_idx;
824 0 : fd_xsk_t * rr_xsk = ctx->xsk[ rr_idx ];
825 0 : ctx->rr_idx++;
826 0 : ctx->rr_idx = fd_uint_if( ctx->rr_idx>=ctx->xsk_cnt, 0, ctx->rr_idx );
827 :
828 0 : uint rx_cons = FD_VOLATILE_CONST( *rr_xsk->ring_rx.cons );
829 0 : uint rx_prod = FD_VOLATILE_CONST( *rr_xsk->ring_rx.prod );
830 0 : if( rx_cons!=rx_prod ) {
831 0 : *charge_busy = 1;
832 0 : rr_xsk->ring_rx.cached_prod = rx_prod;
833 0 : net_rx_event( ctx, stem, rr_xsk, rx_cons );
834 0 : }
835 :
836 0 : uint comp_cons = FD_VOLATILE_CONST( *rr_xsk->ring_cr.cons );
837 0 : uint comp_prod = FD_VOLATILE_CONST( *rr_xsk->ring_cr.prod );
838 0 : if( comp_cons!=comp_prod ) {
839 0 : *charge_busy = 1;
840 0 : rr_xsk->ring_cr.cached_prod = comp_prod;
841 0 : net_comp_event( ctx, rr_xsk, rr_idx, comp_cons );
842 0 : }
843 :
844 0 : net_tx_periodic_wakeup( ctx, rr_idx, fd_tickcount() );
845 :
846 0 : }
847 :
848 : /* net_xsk_bootstrap does the initial assignment of UMEM frames to the
849 :    RX/TX rings. It first assigns UMEM frames to the XDP FILL ring, then
850 :    assigns the remaining frames to the net tile's free_tx queue. */
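/* Resulting UMEM layout (each frame is FD_NET_MTU bytes; the fill ring
   stores frame offsets, free_tx stores absolute addresses):

     frames [0, rx_depth)                  -> XDP fill ring (RX buffers)
     frames [rx_depth, rx_depth+tx_depth)  -> free_tx queue (TX buffers) */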
851 :
852 : static void
853 : net_xsk_bootstrap( fd_net_ctx_t * ctx,
854 0 : uint xsk_idx ) {
855 0 : fd_xsk_t * xsk = ctx->xsk[ xsk_idx ];
856 :
857 0 : ulong frame_off = 0UL;
858 0 : ulong const frame_sz = FD_NET_MTU;
859 0 : ulong const rx_depth = ctx->xsk[ xsk_idx ]->ring_rx.depth;
860 0 : ulong const tx_depth = ctx->free_tx[ xsk_idx ].depth;
861 :
862 0 : fd_ring_desc_t * fill = &xsk->ring_fr;
863 0 : uint fill_prod = fill->cached_prod;
864 0 : for( ulong j=0UL; j<rx_depth; j++ ) {
865 0 : fill->frame_ring[ j ] = frame_off;
866 0 : frame_off += frame_sz;
867 0 : }
868 0 : FD_VOLATILE( *fill->prod ) = fill->cached_prod = fill_prod + (uint)rx_depth;
869 :
870 0 : ulong const umem_base = xsk->umem.addr;
871 0 : for( ulong j=0; j<tx_depth; j++ ) {
872 0 : ctx->free_tx[ xsk_idx ].queue[ j ] = umem_base + frame_off;
873 0 : frame_off += frame_sz;
874 0 : }
875 0 : ctx->free_tx[ xsk_idx ].prod = tx_depth;
876 0 : ctx->free_tx[ xsk_idx ].depth = tx_depth;
877 0 : }
878 :
879 : /* privileged_init creates and activates this net tile's XSK(s) and
880 :    registers them into the XDP programs' XSKMAPs. The XSKMAP and BPF
881 :    link file descriptors for the main interface are inherited at fixed
882 :    descriptor numbers. Net tile 0 additionally installs an XDP program
883 :    on the loopback interface and binds a second XSK to it.
890 :
891 : Note that if the main interface is loopback, then the loopback-
892 : related structures are uninitialized.
893 :
894 : Kernel object references:
895 :
896 : BPF_LINK file descriptor
897 : |
898 : +-> XDP program installation on NIC
899 : | |
900 : | +-> XDP program <-- BPF_PROG file descriptor (prog_fd)
901 : |
902 : +-> XSKMAP object <-- BPF_MAP file descriptor (xsk_map)
903 : |
904 : +-> BPF_MAP object <-- BPF_MAP file descriptor (udp_dsts) */
905 :
906 : static void
907 : privileged_init( fd_topo_t * topo,
908 0 : fd_topo_tile_t * tile ) {
909 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
910 :
911 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
912 :
913 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_net_ctx_t), sizeof(fd_net_ctx_t) );
914 0 : fd_memset( ctx, 0, sizeof(fd_net_ctx_t) );
915 :
916 0 : uint if_idx = if_nametoindex( tile->net.interface );
917 0 : if( FD_UNLIKELY( !if_idx ) ) FD_LOG_ERR(( "if_nametoindex(%s) failed", tile->net.interface ));
918 :
919 : /* Create and install XSKs */
920 :
921 0 : int xsk_map_fd = 123462;
922 0 :   ctx->prog_link_fds[ 0 ] = 123463;
          ctx->xsk_cnt = 1; /* main interface XSK; bumped to 2 below if the loopback XSK is also created */
923 0 : ctx->xsk[ 0 ] =
924 0 : fd_xsk_join(
925 0 : fd_xsk_new( FD_SCRATCH_ALLOC_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) ),
926 0 : FD_NET_MTU,
927 0 : tile->net.xdp_rx_queue_size,
928 0 : tile->net.xdp_rx_queue_size,
929 0 : tile->net.xdp_tx_queue_size,
930 0 : tile->net.xdp_tx_queue_size ) );
931 0 : if( FD_UNLIKELY( !ctx->xsk[ 0 ] ) ) FD_LOG_ERR(( "fd_xsk_new failed" ));
932 0 : uint flags = tile->net.zero_copy ? XDP_ZEROCOPY : XDP_COPY;
933 0 : if( FD_UNLIKELY( !fd_xsk_init( ctx->xsk[ 0 ], if_idx, (uint)tile->kind_id, flags ) ) ) FD_LOG_ERR(( "failed to bind xsk for net tile %lu", tile->kind_id ));
934 0 : if( FD_UNLIKELY( !fd_xsk_activate( ctx->xsk[ 0 ], xsk_map_fd ) ) ) FD_LOG_ERR(( "failed to activate xsk for net tile %lu", tile->kind_id ));
935 :
936 0 : if( FD_UNLIKELY( fd_sandbox_gettid()==fd_sandbox_getpid() ) ) {
937 : /* Kind of gross.. in single threaded mode we don't want to close the xsk_map_fd
938 : since it's shared with other net tiles. Just check for that by seeing if we
939 : are the only thread in the process. */
940 0 : if( FD_UNLIKELY( -1==close( xsk_map_fd ) ) ) FD_LOG_ERR(( "close(%d) failed (%d-%s)", xsk_map_fd, errno, fd_io_strerror( errno ) ));
941 0 : }
942 :
943 0 : ctx->free_tx[ 0 ].queue = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), tile->net.xdp_tx_queue_size * sizeof(ulong) );
944 0 : ctx->free_tx[ 0 ].depth = tile->net.xdp_tx_queue_size;
945 :
946 : /* Networking tile at index 0 also binds to loopback (only queue 0 available on lo) */
947 :
948 0 : if( FD_UNLIKELY( strcmp( tile->net.interface, "lo" ) && !tile->kind_id ) ) {
949 0 : ctx->xsk_cnt = 2;
950 :
951 0 : ushort udp_port_candidates[] = {
952 0 : (ushort)tile->net.legacy_transaction_listen_port,
953 0 : (ushort)tile->net.quic_transaction_listen_port,
954 0 : (ushort)tile->net.shred_listen_port,
955 0 : (ushort)tile->net.gossip_listen_port,
956 0 : (ushort)tile->net.repair_intake_listen_port,
957 0 : (ushort)tile->net.repair_serve_listen_port,
958 0 : };
959 :
960 0 : uint lo_idx = if_nametoindex( "lo" );
961 0 : if( FD_UNLIKELY( !lo_idx ) ) FD_LOG_ERR(( "if_nametoindex(lo) failed" ));
962 :
963 0 : fd_xdp_fds_t lo_fds = fd_xdp_install( lo_idx,
964 0 : tile->net.src_ip_addr,
965 0 : sizeof(udp_port_candidates)/sizeof(udp_port_candidates[0]),
966 0 : udp_port_candidates,
967 0 : "skb" );
968 :
969 0 : ctx->prog_link_fds[ 1 ] = lo_fds.prog_link_fd;
970 0 : ctx->xsk[ 1 ] =
971 0 : fd_xsk_join(
972 0 : fd_xsk_new( FD_SCRATCH_ALLOC_APPEND( l, fd_xsk_align(), fd_xsk_footprint( FD_NET_MTU, tile->net.xdp_rx_queue_size, tile->net.xdp_rx_queue_size, tile->net.xdp_tx_queue_size, tile->net.xdp_tx_queue_size ) ),
973 0 : FD_NET_MTU,
974 0 : tile->net.xdp_rx_queue_size,
975 0 : tile->net.xdp_rx_queue_size,
976 0 : tile->net.xdp_tx_queue_size,
977 0 : tile->net.xdp_tx_queue_size ) );
978 0 : if( FD_UNLIKELY( !ctx->xsk[ 1 ] ) ) FD_LOG_ERR(( "fd_xsk_join failed" ));
979 0 : if( FD_UNLIKELY( !fd_xsk_init( ctx->xsk[ 1 ], lo_idx, (uint)tile->kind_id, 0 /* flags */ ) ) ) FD_LOG_ERR(( "failed to bind lo_xsk" ));
980 0 : if( FD_UNLIKELY( !fd_xsk_activate( ctx->xsk[ 1 ], lo_fds.xsk_map_fd ) ) ) FD_LOG_ERR(( "failed to activate lo_xsk" ));
981 0 :     if( FD_UNLIKELY( -1==close( lo_fds.xsk_map_fd ) ) ) FD_LOG_ERR(( "close(%d) failed (%d-%s)", lo_fds.xsk_map_fd, errno, fd_io_strerror( errno ) ));
982 :
983 0 : ctx->free_tx[ 1 ].queue = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), tile->net.xdp_tx_queue_size * sizeof(ulong) );
984 0 : ctx->free_tx[ 1 ].depth = tile->net.xdp_tx_queue_size;
985 0 : }
986 :
987 0 : double tick_per_ns = fd_tempo_tick_per_ns( NULL );
988 0 : ctx->xdp_stats_interval_ticks = (long)( FD_XDP_STATS_INTERVAL_NS * tick_per_ns );
989 :
990 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
991 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
992 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
993 0 : }
994 :
995 : static void
996 : unprivileged_init( fd_topo_t * topo,
997 0 : fd_topo_tile_t * tile ) {
998 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
999 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1000 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_net_ctx_t), sizeof(fd_net_ctx_t) );
1001 :
1002 0 : ctx->net_tile_id = (uint)tile->kind_id;
1003 0 : ctx->net_tile_cnt = (uint)fd_topo_tile_name_cnt( topo, tile->name );
1004 :
1005 0 : ctx->src_ip_addr = tile->net.src_ip_addr;
1006 0 : memcpy( ctx->src_mac_addr, tile->net.src_mac_addr, 6UL );
1007 :
1008 0 : ctx->shred_listen_port = tile->net.shred_listen_port;
1009 0 : ctx->quic_transaction_listen_port = tile->net.quic_transaction_listen_port;
1010 0 : ctx->legacy_transaction_listen_port = tile->net.legacy_transaction_listen_port;
1011 0 : ctx->gossip_listen_port = tile->net.gossip_listen_port;
1012 0 : ctx->repair_intake_listen_port = tile->net.repair_intake_listen_port;
1013 0 : ctx->repair_serve_listen_port = tile->net.repair_serve_listen_port;
1014 :
1015 : /* Put a bound on chunks we read from the input, to make sure they
1016 :      are within the data region of the workspace. */
1017 :
1018 0 : if( FD_UNLIKELY( !tile->in_cnt ) ) FD_LOG_ERR(( "net tile in link cnt is zero" ));
1019 0 : if( FD_UNLIKELY( tile->in_cnt>MAX_NET_INS ) ) FD_LOG_ERR(( "net tile in link cnt %lu exceeds MAX_NET_INS %lu", tile->in_cnt, MAX_NET_INS ));
1020 0 : FD_TEST( tile->in_cnt<=32 );
1021 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
1022 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
1023 0 : if( FD_UNLIKELY( link->mtu!=FD_NET_MTU ) ) FD_LOG_ERR(( "net tile in link does not have a normal MTU" ));
1024 :
1025 0 : ctx->in[ i ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1026 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
1027 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark( ctx->in[ i ].mem, link->dcache, link->mtu );
1028 0 : }
1029 :
1030 0 : for( ulong i = 0; i < tile->out_cnt; i++ ) {
1031 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ i ] ];
1032 0 : if( strcmp( out_link->name, "net_quic" ) == 0 ) {
1033 0 : fd_topo_link_t * quic_out = out_link;
1034 0 : ctx->quic_out->mcache = quic_out->mcache;
1035 0 : ctx->quic_out->sync = fd_mcache_seq_laddr( ctx->quic_out->mcache );
1036 0 : ctx->quic_out->depth = fd_mcache_depth( ctx->quic_out->mcache );
1037 0 : ctx->quic_out->seq = fd_mcache_seq_query( ctx->quic_out->sync );
1038 0 : ctx->quic_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( quic_out->dcache ), quic_out->dcache );
1039 0 : ctx->quic_out->mem = topo->workspaces[ topo->objs[ quic_out->dcache_obj_id ].wksp_id ].wksp;
1040 0 : ctx->quic_out->wmark = fd_dcache_compact_wmark ( ctx->quic_out->mem, quic_out->dcache, quic_out->mtu );
1041 0 : ctx->quic_out->chunk = ctx->quic_out->chunk0;
1042 0 : } else if( strcmp( out_link->name, "net_shred" ) == 0 ) {
1043 0 : fd_topo_link_t * shred_out = out_link;
1044 0 : ctx->shred_out->mcache = shred_out->mcache;
1045 0 : ctx->shred_out->sync = fd_mcache_seq_laddr( ctx->shred_out->mcache );
1046 0 : ctx->shred_out->depth = fd_mcache_depth( ctx->shred_out->mcache );
1047 0 : ctx->shred_out->seq = fd_mcache_seq_query( ctx->shred_out->sync );
1048 0 : ctx->shred_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( shred_out->dcache ), shred_out->dcache );
1049 0 : ctx->shred_out->mem = topo->workspaces[ topo->objs[ shred_out->dcache_obj_id ].wksp_id ].wksp;
1050 0 : ctx->shred_out->wmark = fd_dcache_compact_wmark ( ctx->shred_out->mem, shred_out->dcache, shred_out->mtu );
1051 0 : ctx->shred_out->chunk = ctx->shred_out->chunk0;
1052 0 : } else if( strcmp( out_link->name, "net_gossip" ) == 0 ) {
1053 0 : fd_topo_link_t * gossip_out = out_link;
1054 0 : ctx->gossip_out->mcache = gossip_out->mcache;
1055 0 : ctx->gossip_out->sync = fd_mcache_seq_laddr( ctx->gossip_out->mcache );
1056 0 : ctx->gossip_out->depth = fd_mcache_depth( ctx->gossip_out->mcache );
1057 0 : ctx->gossip_out->seq = fd_mcache_seq_query( ctx->gossip_out->sync );
1058 0 : ctx->gossip_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( gossip_out->dcache ), gossip_out->dcache );
1059 0 : ctx->gossip_out->mem = topo->workspaces[ topo->objs[ gossip_out->dcache_obj_id ].wksp_id ].wksp;
1060 0 : ctx->gossip_out->wmark = fd_dcache_compact_wmark ( ctx->gossip_out->mem, gossip_out->dcache, gossip_out->mtu );
1061 0 : ctx->gossip_out->chunk = ctx->gossip_out->chunk0;
1062 0 : } else if( strcmp( out_link->name, "net_repair" ) == 0 ) {
1063 0 : fd_topo_link_t * repair_out = out_link;
1064 0 : ctx->repair_out->mcache = repair_out->mcache;
1065 0 : ctx->repair_out->sync = fd_mcache_seq_laddr( ctx->repair_out->mcache );
1066 0 : ctx->repair_out->depth = fd_mcache_depth( ctx->repair_out->mcache );
1067 0 : ctx->repair_out->seq = fd_mcache_seq_query( ctx->repair_out->sync );
1068 0 : ctx->repair_out->chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( repair_out->dcache ), repair_out->dcache );
1069 0 : ctx->repair_out->mem = topo->workspaces[ topo->objs[ repair_out->dcache_obj_id ].wksp_id ].wksp;
1070 0 : ctx->repair_out->wmark = fd_dcache_compact_wmark ( ctx->repair_out->mem, repair_out->dcache, repair_out->mtu );
1071 0 : ctx->repair_out->chunk = ctx->repair_out->chunk0;
1072 0 : } else if( strcmp( out_link->name, "net_netlink" ) == 0 ) {
1073 0 : fd_topo_link_t * netlink_out = out_link;
1074 0 : ctx->neigh4_solicit->mcache = netlink_out->mcache;
1075 0 : ctx->neigh4_solicit->depth = fd_mcache_depth( ctx->neigh4_solicit->mcache );
1076 0 : ctx->neigh4_solicit->seq = fd_mcache_seq_query( fd_mcache_seq_laddr( ctx->neigh4_solicit->mcache ) );
1077 0 : } else {
1078 0 : FD_LOG_ERR(( "unrecognized out link `%s`", out_link->name ));
1079 0 : }
1080 0 : }
1081 :
1082 : /* Check if any of the tiles we set a listen port for do not have an outlink. */
1083 0 : if( FD_UNLIKELY( ctx->shred_listen_port!=0 && ctx->shred_out->mcache==NULL ) ) {
1084 0 : FD_LOG_ERR(( "shred listen port set but no out link was found" ));
1085 0 : } else if( FD_UNLIKELY( ctx->quic_transaction_listen_port!=0 && ctx->quic_out->mcache==NULL ) ) {
1086 0 : FD_LOG_ERR(( "quic transaction listen port set but no out link was found" ));
1087 0 : } else if( FD_UNLIKELY( ctx->legacy_transaction_listen_port!=0 && ctx->quic_out->mcache==NULL ) ) {
1088 0 : FD_LOG_ERR(( "legacy transaction listen port set but no out link was found" ));
1089 0 : } else if( FD_UNLIKELY( ctx->gossip_listen_port!=0 && ctx->gossip_out->mcache==NULL ) ) {
1090 0 : FD_LOG_ERR(( "gossip listen port set but no out link was found" ));
1091 0 : } else if( FD_UNLIKELY( ctx->repair_intake_listen_port!=0 && ctx->repair_out->mcache==NULL ) ) {
1092 0 : FD_LOG_ERR(( "repair intake port set but no out link was found" ));
1093 0 : } else if( FD_UNLIKELY( ctx->repair_serve_listen_port!=0 && ctx->repair_out->mcache==NULL ) ) {
1094 0 : FD_LOG_ERR(( "repair serve listen port set but no out link was found" ));
1095 0 : } else if( FD_UNLIKELY( ctx->neigh4_solicit->mcache==NULL ) ) {
1096 0 : FD_LOG_ERR(( "netlink request link not found" ));
1097 0 : }
1098 :
1099 0 : for( uint j=0U; j<2U; j++ ) {
1100 0 : ctx->tx_flusher[ j ].pending_wmark = (ulong)( (double)tile->net.xdp_tx_queue_size * 0.7 );
1101 0 : ctx->tx_flusher[ j ].tail_flush_backoff = (long)( (double)tile->net.tx_flush_timeout_ns * fd_tempo_tick_per_ns( NULL ) );
1102 0 : ctx->tx_flusher[ j ].next_tail_flush_ticks = LONG_MAX;
1103 0 : }
1104 :
1105 : /* Join netbase objects */
1106 0 : ctx->fib_local = fd_fib4_join( fd_topo_obj_laddr( topo, tile->net.fib4_local_obj_id ) );
1107 0 : ctx->fib_main = fd_fib4_join( fd_topo_obj_laddr( topo, tile->net.fib4_main_obj_id ) );
1108 0 : if( FD_UNLIKELY( !ctx->fib_local || !ctx->fib_main ) ) FD_LOG_ERR(( "fd_fib4_join failed" ));
1109 0 : if( FD_UNLIKELY( !fd_neigh4_hmap_join(
1110 0 : ctx->neigh4,
1111 0 : fd_topo_obj_laddr( topo, tile->net.neigh4_obj_id ),
1112 0 : fd_topo_obj_laddr( topo, tile->net.neigh4_ele_obj_id ) ) ) ) {
1113 0 : FD_LOG_ERR(( "fd_neigh4_hmap_join failed" ));
1114 0 : }
1115 :
1116 0 : for( uint j=0U; j<ctx->xsk_cnt; j++ ) {
1117 0 : net_xsk_bootstrap( ctx, j );
1118 0 : net_rx_wakeup( ctx, ctx->xsk[ j ] );
1119 0 : net_tx_wakeup( ctx, ctx->xsk[ j ] );
1120 0 : }
1121 0 : }
1122 :
1123 : static ulong
1124 : populate_allowed_seccomp( fd_topo_t const * topo,
1125 : fd_topo_tile_t const * tile,
1126 : ulong out_cnt,
1127 0 : struct sock_filter * out ) {
1128 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1129 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1130 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_net_ctx_t ), sizeof( fd_net_ctx_t ) );
1131 :
1132 : /* A bit of a hack, if there is no loopback XSK for this tile, we still need to pass
1133 : two "allow" FD arguments to the net policy, so we just make them both the same. */
1134 0 : int allow_fd2 = ctx->xsk_cnt>1UL ? ctx->xsk[ 1 ]->xsk_fd : ctx->xsk[ 0 ]->xsk_fd;
1135 0 : FD_TEST( ctx->xsk[ 0 ]->xsk_fd >= 0 && allow_fd2 >= 0 );
1136 0 : populate_sock_filter_policy_net( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->xsk[ 0 ]->xsk_fd, (uint)allow_fd2 );
1137 0 : return sock_filter_policy_net_instr_cnt;
1138 0 : }
1139 :
1140 : static ulong
1141 : populate_allowed_fds( fd_topo_t const * topo,
1142 : fd_topo_tile_t const * tile,
1143 : ulong out_fds_cnt,
1144 0 : int * out_fds ) {
1145 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1146 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1147 0 : fd_net_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_net_ctx_t ), sizeof( fd_net_ctx_t ) );
1148 :
1149 0 : if( FD_UNLIKELY( out_fds_cnt<6UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1150 :
1151 0 : ulong out_cnt = 0UL;
1152 :
1153 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1154 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1155 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1156 :
1157 0 : out_fds[ out_cnt++ ] = ctx->xsk[ 0 ]->xsk_fd;
1158 0 : out_fds[ out_cnt++ ] = ctx->prog_link_fds[ 0 ];
1159 0 : if( FD_LIKELY( ctx->xsk_cnt>1UL ) ) out_fds[ out_cnt++ ] = ctx->xsk[ 1 ]->xsk_fd;
1160 0 : if( FD_LIKELY( ctx->xsk_cnt>1UL ) ) out_fds[ out_cnt++ ] = ctx->prog_link_fds[ 1 ];
1161 0 : return out_cnt;
1162 0 : }
1163 :
1164 0 : #define STEM_BURST (1UL)
1165 0 : #define STEM_LAZY ((ulong)30e3) /* 30 us */
1166 :
1167 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_net_ctx_t
1168 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_net_ctx_t)
1169 :
1170 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1171 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
1172 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
1173 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
1174 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
1175 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
1176 :
1177 : #include "../../../../disco/stem/fd_stem.c"
1178 :
1179 : fd_topo_run_tile_t fd_tile_net = {
1180 : .name = "net",
1181 : .populate_allowed_seccomp = populate_allowed_seccomp,
1182 : .populate_allowed_fds = populate_allowed_fds,
1183 : .scratch_align = scratch_align,
1184 : .scratch_footprint = scratch_footprint,
1185 : .privileged_init = privileged_init,
1186 : .unprivileged_init = unprivileged_init,
1187 : .run = stem_run,
1188 : };