Line data Source code
1 : #if !defined(__linux__)
2 : #error "fd_xsk requires Linux operating system with XDP support"
3 : #endif
4 :
5 : #include <linux/if_xdp.h>
6 : #include <linux/limits.h>
7 :
8 : #include <net/if.h>
9 : #include <sys/socket.h>
10 : #include <sys/types.h>
11 :
12 : #include <unistd.h>
13 :
14 : #include <errno.h>
15 : #include <string.h>
16 : #include <stdlib.h>
17 : #include <stdio.h>
18 : #include <sys/mman.h>
19 :
20 : #include "fd_xsk_private.h"
21 : #include "fd_xdp_redirect_user.h"
22 :
23 : /* TODO move this into a more appropriate header file
24 : and set based on architecture, etc. */
25 : #define FD_ACQUIRE FD_COMPILER_MFENCE
26 138 : #define FD_RELEASE FD_COMPILER_MFENCE
27 :
28 : /* Set to 1 to trace packet events to debug log */
29 :
30 : #if 0
31 : #define TRACE_PACKET(...) FD_LOG_DEBUG(( __VA_ARGS__ ))
32 : #else
33 : #define TRACE_PACKET(...)
34 : #endif
35 :
36 : ulong
37 6 : fd_xsk_align( void ) {
38 6 : return FD_XSK_ALIGN;
39 6 : }
40 :
41 : static ulong
42 : fd_xsk_umem_footprint( ulong frame_sz,
43 : ulong fr_depth,
44 : ulong rx_depth,
45 : ulong tx_depth,
46 15 : ulong cr_depth ) {
47 : /* TODO overflow checks */
48 15 : ulong sz = 0UL;
49 15 : sz+=fd_ulong_align_up( fr_depth*frame_sz, FD_XSK_ALIGN );
50 15 : sz+=fd_ulong_align_up( rx_depth*frame_sz, FD_XSK_ALIGN );
51 15 : sz+=fd_ulong_align_up( tx_depth*frame_sz, FD_XSK_ALIGN );
52 15 : sz+=fd_ulong_align_up( cr_depth*frame_sz, FD_XSK_ALIGN );
53 15 : return sz;
54 15 : }
55 :
56 : ulong
57 : fd_xsk_footprint( ulong frame_sz,
58 : ulong fr_depth,
59 : ulong rx_depth,
60 : ulong tx_depth,
61 57 : ulong cr_depth ) {
62 :
63 : /* Linux 4.18 requires XSK frames to be 2048-byte aligned and no
64 : larger than page size. */
65 57 : if( FD_UNLIKELY( frame_sz!=2048UL && frame_sz!=4096UL ) ) return 0UL;
66 39 : if( FD_UNLIKELY( fr_depth==0UL ) ) return 0UL;
67 33 : if( FD_UNLIKELY( rx_depth==0UL ) ) return 0UL;
68 27 : if( FD_UNLIKELY( tx_depth==0UL ) ) return 0UL;
69 21 : if( FD_UNLIKELY( cr_depth==0UL ) ) return 0UL;
70 :
71 : /* TODO overflow checks */
72 15 : return fd_ulong_align_up( sizeof(fd_xsk_t), FD_XSK_UMEM_ALIGN )
73 15 : + fd_xsk_umem_footprint( frame_sz, fr_depth, rx_depth, tx_depth, cr_depth );
74 21 : }
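
/* A worked example of the footprint arithmetic above, as a sketch with
   hypothetical parameters.  With frame_sz=2048 and all four depths at
   1024, each UMEM region is 1024*2048 = 2 MiB; assuming FD_XSK_ALIGN is
   no larger than a page, the align_up calls add no padding, so the UMEM
   area totals 8 MiB and the overall footprint is the fd_xsk_t header
   rounded up to FD_XSK_UMEM_ALIGN plus 8 MiB. */

static ulong
fd_xsk_footprint_example( void ) {
  ulong frame_sz = 2048UL;  /* hypothetical frame size        */
  ulong depth    = 1024UL;  /* hypothetical fr/rx/tx/cr depth */
  /* equals fd_ulong_align_up( sizeof(fd_xsk_t), FD_XSK_UMEM_ALIGN ) + 8 MiB */
  return fd_xsk_footprint( frame_sz, depth, depth, depth, depth );
}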
75 :
76 : /* New/delete *********************************************************/
77 :
78 : void *
79 : fd_xsk_new( void * shmem,
80 : ulong frame_sz,
81 : ulong fr_depth,
82 : ulong rx_depth,
83 : ulong tx_depth,
84 33 : ulong cr_depth ) {
85 :
86 : /* Validate arguments */
87 :
88 33 : if( FD_UNLIKELY( !shmem ) ) {
89 3 : FD_LOG_WARNING(( "NULL shmem" ));
90 3 : return NULL;
91 3 : }
92 :
93 30 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_xsk_align() ) ) ) {
94 3 : FD_LOG_WARNING(( "misaligned shmem" ));
95 3 : return NULL;
96 3 : }
97 :
98 27 : fd_xsk_t * xsk = (fd_xsk_t *)shmem;
99 :
100 27 : ulong footprint = fd_xsk_footprint( frame_sz, fr_depth, rx_depth, tx_depth, cr_depth );
101 27 : if( FD_UNLIKELY( !footprint ) ) {
102 21 : FD_LOG_WARNING(( "invalid footprint for config" ));
103 21 : return NULL;
104 21 : }
105 :
106 : /* Reset fd_xsk_t state. No need to clear UMEM area */
107 :
108 6 : fd_memset( xsk, 0, sizeof(fd_xsk_t) );
109 :
110 6 : xsk->xsk_fd = -1;
111 6 : xsk->xdp_map_fd = -1;
112 6 : xsk->xdp_udp_map_fd = -1;
113 :
114 : /* Copy config */
115 :
116 6 : xsk->params.frame_sz = frame_sz;
117 6 : xsk->params.fr_depth = fr_depth;
118 6 : xsk->params.rx_depth = rx_depth;
119 6 : xsk->params.tx_depth = tx_depth;
120 6 : xsk->params.cr_depth = cr_depth;
121 :
122 : /* Derive offsets (TODO overflow check) */
123 :
124 6 : ulong xsk_off = 0UL;
125 6 : xsk_off+=fr_depth*frame_sz;
126 6 : xsk_off+=rx_depth*frame_sz;
127 6 : xsk_off+=tx_depth*frame_sz;
128 6 : xsk_off+=cr_depth*frame_sz;
129 6 : xsk->params.umem_sz = xsk_off;
130 :
131 : /* Mark object as valid */
132 :
133 6 : FD_COMPILER_MFENCE();
134 6 : FD_VOLATILE( xsk->magic ) = FD_XSK_MAGIC;
135 6 : FD_COMPILER_MFENCE();
136 :
137 6 : return (void *)xsk;
138 27 : }
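
/* A minimal usage sketch for the constructor above, with hypothetical
   parameters.  Production code would normally place the object in a
   huge-page backed workspace; plain aligned_alloc is used here only to
   keep the sketch self contained (the size is rounded up so it is a
   multiple of the alignment, as C11 requires). */

static fd_xsk_t *
fd_xsk_new_join_example( void ) {
  ulong frame_sz = 2048UL;
  ulong depth    = 1024UL;
  ulong fp       = fd_xsk_footprint( frame_sz, depth, depth, depth, depth );
  void * mem     = aligned_alloc( fd_xsk_align(), fd_ulong_align_up( fp, fd_xsk_align() ) );
  if( FD_UNLIKELY( !mem ) ) return NULL;
  void * shxsk   = fd_xsk_new( mem, frame_sz, depth, depth, depth, depth );
  if( FD_UNLIKELY( !shxsk ) ) { free( mem ); return NULL; }
  return fd_xsk_join( shxsk );
}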
139 :
140 : void *
141 9 : fd_xsk_delete( void * shxsk ) {
142 :
143 9 : if( FD_UNLIKELY( !shxsk ) ) {
144 0 : FD_LOG_WARNING(( "NULL shxsk" ));
145 0 : return NULL;
146 0 : }
147 :
148 9 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shxsk, fd_xsk_align() ) ) ) {
149 0 : FD_LOG_WARNING(( "misaligned shxsk" ));
150 0 : return NULL;
151 0 : }
152 :
153 9 : fd_xsk_t * xsk = (fd_xsk_t *)shxsk;
154 :
155 9 : if( FD_UNLIKELY( xsk->magic!=FD_XSK_MAGIC ) ) {
156 3 : FD_LOG_WARNING(( "bad magic" ));
157 3 : return NULL;
158 3 : }
159 :
160 6 : FD_COMPILER_MFENCE();
161 6 : FD_VOLATILE( xsk->magic ) = 0UL;
162 6 : FD_COMPILER_MFENCE();
163 :
164 6 : return (void *)xsk;
165 9 : }
166 :
167 : /* Join/leave *********************************************************/
168 :
169 : /* fd_xsk_mmap_offset_cstr: Returns a cstr describing the given offset
170 : param (6th argument of mmap(2)) assuming fd (5th param of mmap(2)) is
171 : an XSK file descriptor. Returned cstr is valid until next call. */
172 : static char const *
173 0 : fd_xsk_mmap_offset_cstr( long mmap_off ) {
174 0 : switch( mmap_off ) {
175 0 : case XDP_PGOFF_RX_RING: return "XDP_PGOFF_RX_RING";
176 0 : case XDP_PGOFF_TX_RING: return "XDP_PGOFF_TX_RING";
177 0 : case XDP_UMEM_PGOFF_FILL_RING: return "XDP_UMEM_PGOFF_FILL_RING";
178 0 : case XDP_UMEM_PGOFF_COMPLETION_RING: return "XDP_UMEM_PGOFF_COMPLETION_RING";
179 0 : default: {
180 0 : static char buf[ 19UL ];
181 0 : snprintf( buf, 19UL, "0x%lx", (ulong)mmap_off );
182 0 : return buf;
183 0 : }
184 0 : }
185 0 : }
186 :
187 : /* fd_xsk_mmap_ring maps the given XSK ring into the local address space
188 : and populates fd_ring_desc_t. Every successful call to this function
189 : should eventually be paired with a call to fd_xsk_munmap_ring(). */
190 : static int
191 : fd_xsk_mmap_ring( fd_ring_desc_t * ring,
192 : int xsk_fd,
193 : long map_off,
194 : ulong elem_sz,
195 : ulong depth,
196 0 : struct xdp_ring_offset const * ring_offset ) {
197 : /* TODO what is ring_offset->desc ? */
198 : /* TODO: mmap was originally called with MAP_POPULATE,
199 : but this symbol isn't available with this build */
200 :
201 : /* sanity check */
202 0 : if( depth > (ulong)UINT_MAX ) {
203 0 : return -1;
204 0 : }
205 :
206 0 : ulong map_sz = ring_offset->desc + depth*elem_sz;
207 :
208 0 : void * res = mmap( NULL, map_sz, PROT_READ|PROT_WRITE, MAP_SHARED, xsk_fd, map_off );
209 0 : if( FD_UNLIKELY( res==MAP_FAILED ) ) {
210 0 : FD_LOG_WARNING(( "mmap(NULL, %lu, PROT_READ|PROT_WRITE, MAP_SHARED, xsk_fd, %s) failed (%i-%s)",
211 0 : map_sz, fd_xsk_mmap_offset_cstr( map_off ), errno, fd_io_strerror( errno ) ));
212 0 : return -1;
213 0 : }
214 :
215 : /* TODO add unit test asserting that cached prod/cons seq gets
216 : cleared on join */
217 0 : fd_memset( ring, 0, sizeof(fd_ring_desc_t) );
218 :
219 0 : ring->mem = res;
220 0 : ring->map_sz = map_sz;
221 0 : ring->depth = (uint)depth;
222 0 : ring->ptr = (void *)( (ulong)res + ring_offset->desc );
223 0 : ring->flags = (uint *)( (ulong)res + ring_offset->flags );
224 0 : ring->prod = (uint *)( (ulong)res + ring_offset->producer );
225 0 : ring->cons = (uint *)( (ulong)res + ring_offset->consumer );
226 :
227 0 : return 0;
228 0 : }
229 :
230 : /* fd_xsk_munmap_ring unmaps the given XSK ring from the local address
231 : space and zeroes fd_ring_desc_t. */
232 : static void
233 : fd_xsk_munmap_ring( fd_ring_desc_t * ring,
234 0 : long map_off ) {
235 0 : if( FD_UNLIKELY( !ring->mem ) ) return;
236 :
237 0 : void * mem = ring->mem;
238 0 : ulong sz = ring->map_sz;
239 :
240 0 : fd_memset( ring, 0, sizeof(fd_ring_desc_t) );
241 :
242 0 : if( FD_UNLIKELY( 0!=munmap( mem, sz ) ) )
243 0 : FD_LOG_WARNING(( "munmap(%p, %lu) on %s ring failed (%i-%s)",
244 0 : mem, sz, fd_xsk_mmap_offset_cstr( map_off ), errno, fd_io_strerror( errno ) ));
245 0 : }
246 :
247 : /* fd_xsk_fini undoes a (partial) join by releasing all active kernel
248 : objects, such as mapped memory regions and file descriptors. Assumes
249 : that no join to `xsk` is currently being used. */
250 :
251 : fd_xsk_t *
252 0 : fd_xsk_fini( fd_xsk_t * xsk ) {
253 : /* Undo memory mappings */
254 :
255 0 : fd_xsk_munmap_ring( &xsk->ring_rx, XDP_PGOFF_RX_RING );
256 0 : fd_xsk_munmap_ring( &xsk->ring_tx, XDP_PGOFF_TX_RING );
257 0 : fd_xsk_munmap_ring( &xsk->ring_fr, XDP_UMEM_PGOFF_FILL_RING );
258 0 : fd_xsk_munmap_ring( &xsk->ring_cr, XDP_UMEM_PGOFF_COMPLETION_RING );
259 :
260 : /* Release eBPF map FDs */
261 :
262 0 : if( FD_LIKELY( xsk->xdp_map_fd>=0 ) ) {
263 0 : close( xsk->xdp_map_fd );
264 0 : xsk->xdp_map_fd = -1;
265 0 : }
266 0 : if( FD_LIKELY( xsk->xdp_udp_map_fd>=0 ) ) {
267 0 : close( xsk->xdp_udp_map_fd );
268 0 : xsk->xdp_udp_map_fd = -1;
269 0 : }
270 :
271 : /* Release XSK */
272 :
273 0 : if( FD_LIKELY( xsk->xsk_fd>=0 ) ) {
274 : /* Clear XSK descriptors */
275 0 : fd_memset( &xsk->offsets, 0, sizeof(struct xdp_mmap_offsets) );
276 0 : fd_memset( &xsk->umem, 0, sizeof(struct xdp_umem_reg) );
277 : /* Close XSK */
278 0 : close( xsk->xsk_fd );
279 0 : xsk->xsk_fd = -1;
280 0 : }
281 :
282 0 : return xsk;
283 0 : }
284 :
285 : /* fd_xsk_setup_umem: Initializes xdp_umem_reg and hooks up XSK with
286 : UMEM rings via setsockopt(). Retrieves xdp_mmap_offsets via
287 : getsockopt(). Returns 1 on success, 0 on failure. */
288 : static int
289 0 : fd_xsk_setup_umem( fd_xsk_t * xsk ) {
290 : /* Find byte offset of UMEM area */
291 0 : ulong umem_off = fd_ulong_align_up( sizeof(fd_xsk_t), FD_XSK_UMEM_ALIGN );
292 :
293 : /* Initialize xdp_umem_reg */
294 0 : xsk->umem.headroom = 0; /* TODO no need for headroom for now */
295 0 : xsk->umem.addr = (ulong)xsk + umem_off;
296 0 : xsk->umem.chunk_size = (uint)xsk->params.frame_sz;
297 0 : xsk->umem.len = xsk->params.umem_sz;
298 :
299 : /* Register UMEM region */
300 0 : int res;
301 0 : res = setsockopt( xsk->xsk_fd, SOL_XDP, XDP_UMEM_REG,
302 0 : &xsk->umem, sizeof(struct xdp_umem_reg) );
303 0 : if( FD_UNLIKELY( res!=0 ) ) {
304 0 : FD_LOG_WARNING(( "setsockopt(SOL_XDP, XDP_UMEM_REG) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
305 0 : return -1;
306 0 : }
307 :
308 : /* Set ring frame counts */
309 0 : # define FD_SET_XSK_RING_DEPTH(name, var) \
310 0 : do { \
311 0 : res = setsockopt( xsk->xsk_fd, SOL_XDP, name, &(var), 8UL ); \
312 0 : if( FD_UNLIKELY( res!=0 ) ) { \
313 0 : FD_LOG_WARNING(( "setsockopt(SOL_XDP, " #name ") failed (%i-%s)", \
314 0 : errno, fd_io_strerror( errno ) )); \
315 0 : return -1; \
316 0 : } \
317 0 : } while(0)
318 0 : FD_SET_XSK_RING_DEPTH( XDP_UMEM_FILL_RING, xsk->params.fr_depth );
319 0 : FD_SET_XSK_RING_DEPTH( XDP_RX_RING, xsk->params.rx_depth );
320 0 : FD_SET_XSK_RING_DEPTH( XDP_TX_RING, xsk->params.tx_depth );
321 0 : FD_SET_XSK_RING_DEPTH( XDP_UMEM_COMPLETION_RING, xsk->params.cr_depth );
322 0 : # undef FD_SET_XSK_RING_DEPTH
323 :
324 : /* Request ring offsets */
325 0 : socklen_t offsets_sz = sizeof(struct xdp_mmap_offsets);
326 0 : res = getsockopt( xsk->xsk_fd, SOL_XDP, XDP_MMAP_OFFSETS,
327 0 : &xsk->offsets, &offsets_sz );
328 0 : if( FD_UNLIKELY( res!=0 ) ) {
329 0 : FD_LOG_WARNING(( "getsockopt(SOL_XDP, XDP_MMAP_OFFSETS) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
330 0 : return -1;
331 0 : }
332 :
333 : /* OK */
334 0 : return 0;
335 0 : }
336 :
337 : /* fd_xsk_init: Creates and configures an XSK socket object, and
338 : attaches to a preinstalled XDP program. The various steps are
339 : implemented in fd_xsk_setup_{...}. */
340 :
341 : fd_xsk_t *
342 : fd_xsk_init( fd_xsk_t * xsk,
343 : uint if_idx,
344 : uint if_queue,
345 3 : uint bind_flags ) {
346 :
347 3 : if( FD_UNLIKELY( !xsk ) ) { FD_LOG_WARNING(( "NULL xsk" )); return NULL; }
348 :
349 : /* Create XDP socket (XSK) */
350 :
351 0 : xsk->xsk_fd = socket( AF_XDP, SOCK_RAW, 0 );
352 0 : if( FD_UNLIKELY( xsk->xsk_fd<0 ) ) {
353 0 : FD_LOG_WARNING(( "Failed to create XSK (%i-%s)", errno, fd_io_strerror( errno ) ));
354 0 : return NULL;
355 0 : }
356 :
357 : /* Associate UMEM region of fd_xsk_t with XSK via setsockopt() */
358 :
359 0 : if( FD_UNLIKELY( 0!=fd_xsk_setup_umem( xsk ) ) ) goto fail;
360 :
361 : /* Map XSK rings into local address space */
362 :
363 0 : if( FD_UNLIKELY( 0!=fd_xsk_mmap_ring( &xsk->ring_rx, xsk->xsk_fd, XDP_PGOFF_RX_RING, sizeof(struct xdp_desc), xsk->params.rx_depth, &xsk->offsets.rx ) ) ) goto fail;
364 0 : if( FD_UNLIKELY( 0!=fd_xsk_mmap_ring( &xsk->ring_tx, xsk->xsk_fd, XDP_PGOFF_TX_RING, sizeof(struct xdp_desc), xsk->params.tx_depth, &xsk->offsets.tx ) ) ) goto fail;
365 0 : if( FD_UNLIKELY( 0!=fd_xsk_mmap_ring( &xsk->ring_fr, xsk->xsk_fd, XDP_UMEM_PGOFF_FILL_RING, sizeof(ulong), xsk->params.fr_depth, &xsk->offsets.fr ) ) ) goto fail;
366 0 : if( FD_UNLIKELY( 0!=fd_xsk_mmap_ring( &xsk->ring_cr, xsk->xsk_fd, XDP_UMEM_PGOFF_COMPLETION_RING, sizeof(ulong), xsk->params.cr_depth, &xsk->offsets.cr ) ) ) goto fail;
367 :
368 : /* Bind XSK to queue on network interface */
369 :
370 0 : uint flags = XDP_USE_NEED_WAKEUP | bind_flags;
371 0 : struct sockaddr_xdp sa = {
372 0 : .sxdp_family = PF_XDP,
373 0 : .sxdp_ifindex = if_idx,
374 0 : .sxdp_queue_id = if_queue,
375 : /* See extended commentary below for details on the
376 : XDP_USE_NEED_WAKEUP flag. */
377 0 : .sxdp_flags = (ushort)flags
378 0 : };
379 :
380 0 : char if_name[ IF_NAMESIZE ] = {0};
381 :
382 0 : if( FD_UNLIKELY( 0!=bind( xsk->xsk_fd, (void *)&sa, sizeof(struct sockaddr_xdp) ) ) ) {
383 0 : FD_LOG_WARNING(( "bind( PF_XDP, ifindex=%u (%s), queue_id=%u, flags=%x ) failed (%i-%s)",
384 0 : if_idx, if_indextoname( if_idx, if_name ), if_queue, flags, errno, fd_io_strerror( errno ) ));
385 0 : goto fail;
386 0 : }
387 :
388 : /* We've seen that some popular Intel NICs seem to have a bug that
389 : prevents them from working in SKB mode with certain kernel
390 : versions. We can identify them by sendto returning ENXIO or EINVAL
391 : in newer versions. The core of the problem is that the kernel
392 : calls the generic ndo_bpf pointer instead of the driver-specific
393 : version. This means that the driver's pointer to the BPF program
394 : never gets set, yet the driver's wakeup function gets called. */
395 0 : if( FD_UNLIKELY( -1==sendto( xsk->xsk_fd, NULL, 0, MSG_DONTWAIT, NULL, 0 ) ) ) {
396 0 : if( FD_LIKELY( errno==ENXIO || errno==EINVAL ) ) {
397 0 : FD_LOG_ERR(( "xsk sendto failed xsk_fd=%d (%i-%s). This likely indicates "
398 0 : "a bug with your NIC driver. Try switching XDP mode using "
399 0 : "tiles.net.xdp_mode in the configuration TOML, and then running\n"
400 0 : "fdctl configure fini xdp --config path_to_configuration_toml.\n"
401 0 : "Certain Intel NICs with certain driver/kernel combinations "
402 0 : "are known to exhibit this issue in skb mode but work in drv "
403 0 : "mode.", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
404 0 : } else {
405 0 : FD_LOG_WARNING(( "xsk sendto failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
406 0 : }
407 0 : }
408 :
409 : /* XSK successfully configured. Traffic will arrive in XSK after
410 : configuring an XDP program to forward packets via XDP_REDIRECT.
411 : This requires providing the XSK file descriptor to the program via
412 : XSKMAP and is done in a separate step. */
413 :
414 0 : xsk->if_idx = if_idx;
415 0 : xsk->if_queue_id = if_queue;
416 :
417 0 : FD_LOG_INFO(( "AF_XDP socket initialized: bind( PF_XDP, ifindex=%u (%s), queue_id=%u, flags=%x ) success",
418 0 : if_idx, if_indextoname( if_idx, if_name ), if_queue, flags ));
419 :
420 0 : return xsk;
421 :
422 0 : fail:
423 0 : fd_xsk_fini( xsk );
424 0 : return NULL;
425 0 : }
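
/* A minimal bring-up sketch for the function above.  The interface
   name, queue number, and xskmap_fd are hypothetical; the XSKMAP
   insertion is shown with libbpf's bpf_map_update_elem() (declared in
   <bpf/bpf.h>, not included by this file) purely for illustration.  In
   this code base that registration is handled separately (see
   fd_xdp_redirect_user.h). */

static fd_xsk_t *
fd_xsk_init_example( fd_xsk_t * xsk,
                     int        xskmap_fd ) {     /* hypothetical XSKMAP fd */
  uint if_idx = if_nametoindex( "eth0" );         /* hypothetical interface */
  if( FD_UNLIKELY( !if_idx ) ) return NULL;

  uint if_queue = 0U;                             /* queue to bind to */
  if( FD_UNLIKELY( !fd_xsk_init( xsk, if_idx, if_queue, 0U /* bind_flags */ ) ) ) return NULL;

  /* Redirect traffic for this queue to the new socket via the XSKMAP */
  int xsk_fd = fd_xsk_fd( xsk );
  if( FD_UNLIKELY( 0!=bpf_map_update_elem( xskmap_fd, &if_queue, &xsk_fd, 0UL /* BPF_ANY */ ) ) ) {
    fd_xsk_fini( xsk );
    return NULL;
  }
  return xsk;
}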
426 :
427 : fd_xsk_t *
428 6 : fd_xsk_join( void * shxsk ) {
429 : /* TODO: Joining the same fd_xsk_t from two threads is invalid.
430 : Document that and add a lock. */
431 :
432 : /* Argument checks */
433 :
434 6 : if( FD_UNLIKELY( !shxsk ) ) {
435 0 : FD_LOG_WARNING(( "NULL shxsk" ));
436 0 : return NULL;
437 0 : }
438 :
439 6 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shxsk, fd_xsk_align() ) ) ) {
440 0 : FD_LOG_WARNING(( "misaligned shxsk" ));
441 0 : return NULL;
442 0 : }
443 :
444 : /* fd_xsk_t state coherence check. A successful call to fd_xsk_new()
445 : should not allow for any of these fail conditions. */
446 :
447 6 : fd_xsk_t * xsk = (fd_xsk_t *)shxsk;
448 :
449 6 : if( FD_UNLIKELY( xsk->magic!=FD_XSK_MAGIC ) ) {
450 3 : FD_LOG_WARNING(( "bad magic (not an fd_xsk_t?)" ));
451 3 : return NULL;
452 3 : }
453 :
454 3 : return xsk;
455 6 : }
456 :
457 : void *
458 0 : fd_xsk_leave( fd_xsk_t * xsk ) {
459 :
460 0 : if( FD_UNLIKELY( !xsk ) ) {
461 0 : FD_LOG_WARNING(( "NULL xsk" ));
462 0 : return NULL;
463 0 : }
464 :
465 0 : return (void *)xsk;
466 0 : }
467 :
468 : /* Public helper methods **********************************************/
469 :
470 : void *
471 18 : fd_xsk_umem_laddr( fd_xsk_t * xsk ) {
472 18 : return (void *)xsk->umem.addr;
473 18 : }
474 :
475 : FD_FN_PURE int
476 3 : fd_xsk_fd( fd_xsk_t * const xsk ) {
477 3 : return xsk->xsk_fd;
478 3 : }
479 :
480 : FD_FN_PURE uint
481 6 : fd_xsk_ifidx( fd_xsk_t * const xsk ) {
482 6 : return xsk->if_idx;
483 6 : }
484 :
485 : FD_FN_PURE uint
486 6 : fd_xsk_ifqueue( fd_xsk_t * const xsk ) {
487 6 : return xsk->if_queue_id;
488 6 : }
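
/* The accessors above are sufficient to drive the XSK from a
   conventional event loop.  A minimal sketch, assuming <poll.h> is
   available (it is not included by this file): block until the kernel
   signals that the socket is readable. */

static int
fd_xsk_wait_example( fd_xsk_t * xsk,
                     int        timeout_ms ) {
  struct pollfd pfd = { .fd = fd_xsk_fd( xsk ), .events = POLLIN };
  return poll( &pfd, 1, timeout_ms );   /* >0 ready, 0 timeout, <0 error */
}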
489 :
490 : /* RX/TX implementation ***********************************************/
491 :
492 : ulong
493 : fd_xsk_rx_enqueue( fd_xsk_t * xsk,
494 : ulong * offset,
495 36 : ulong count ) {
496 : /* to make frames available for receive, we enqueue onto the fill ring */
497 :
498 : /* fill ring */
499 36 : fd_ring_desc_t * fill = &xsk->ring_fr;
500 :
501 : /* fetch cached consumer, producer */
502 36 : uint prod = fill->cached_prod;
503 36 : uint cons = fill->cached_cons;
504 :
505 : /* assuming frame sizes are powers of 2 */
506 36 : ulong frame_mask = xsk->params.frame_sz - 1UL;
507 :
508 : /* ring capacity */
509 36 : uint cap = fill->depth;
510 :
511 : /* if not enough for batch, update cache */
512 36 : if( cap - ( prod - cons ) < count ) {
513 6 : cons = fill->cached_cons = FD_VOLATILE_CONST( *fill->cons );
514 6 : }
515 :
516 : /* sz is min( available, count ) */
517 36 : ulong sz = cap - ( prod - cons );
518 36 : if( sz > count ) sz = count;
519 :
520 : /* set ring[j] to the specified indices */
521 36 : ulong * ring = fill->frame_ring;
522 36 : uint mask = fill->depth - 1U;
523 87 : for( ulong j = 0; j < sz; ++j ) {
524 51 : uint k = prod & mask;
525 51 : ring[k] = offset[j] & ~frame_mask;
526 :
527 51 : prod++;
528 51 : }
529 :
530 : /* ensure data is visible before producer index */
531 36 : FD_RELEASE();
532 :
533 : /* update producer */
534 36 : fill->cached_prod = prod;
535 36 : FD_VOLATILE( *fill->prod ) = prod;
536 :
537 : /* Be sure to see additional comments below about the TX path.
538 :
539 : XDP by default operates in a mode where if it runs out of buffers
540 : to stick arriving packets into (a/k/a the fill ring is empty) then
541 : the driver will busy spin waiting for the fill ring to be
542 : replenished, so it can pick that up and start writing incoming
543 : packets again.
544 :
545 : Some applications don't like this, because if the driver is pinning
546 : a core waiting for the fill ring, the application might be trying
547 : to use that core to replenish it and never get a chance, leading to
548 : a kind of CPU pinned deadlock.
549 :
550 : So the kernel introduced a new flag to fix this,
551 : XDP_USE_NEED_WAKEUP. The way this flag works is that if it's set,
552 : then the driver won't busy loop when it runs out of fill ring
553 : entries, it'll just park itself and wait for a notification from
554 : the kernel that there are new entries available to use.
555 :
556 : So the application needs to tell the kernel to wake the driver,
557 : when there are new fill ring entries, which it can do by calling
558 : recvmsg on the XSK file descriptor. This is, according to the
559 : kernel docs, a performance win for applications where the driver
560 : would busy loop on its own core as well, since it allows you to
561 : avoid spurious syscalls in the TX path (see the comments on that
562 : below), and we should only rarely need to invoke the syscall here,
563 : since it requires running out of frames in the fill ring.
564 :
565 : That situation describes us (we pin all cores specially), so this
566 : is really just a super minor performance optimization for the TX
567 : path, to sometimes avoid a `sendto` syscall. But anyway...
568 :
569 : This flag requires special driver support to actually be faster.
570 : If the driver does not support it, the kernel will default to
571 : rx_need_wakeup always returning false, tx_need_wakeup always
572 : returning true, and the driver busy spinning the same as before;
573 : the application doesn't need to know whether the driver supports it.
574 :
575 : Finally, note that none of this is what we actually want. What we
576 : want is to never call any of this stuff, and just have the driver
577 : spin two cores for us permanently, one for the TX path and one for
578 : the RX path. Then we never need to notify, never need to make
579 : syscalls, and the performance would be even better. Sadly, this
580 : is not possible. */
581 36 : if( FD_UNLIKELY( fd_xsk_rx_need_wakeup( xsk ) ) ) {
582 0 : struct msghdr _ignored[ 1 ] = { 0 };
583 0 : if( FD_UNLIKELY( -1==recvmsg( xsk->xsk_fd, _ignored, MSG_DONTWAIT ) ) ) {
584 0 : if( FD_UNLIKELY( errno!=EAGAIN ) ) {
585 0 : FD_LOG_WARNING(( "xsk recvmsg failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
586 0 : }
587 0 : }
588 0 : }
589 :
590 36 : return sz;
591 36 : }
592 :
593 : ulong
594 : fd_xsk_rx_enqueue2( fd_xsk_t * xsk,
595 : fd_xsk_frame_meta_t * meta,
596 21 : ulong count ) {
597 : /* to make frames available for receive, we enqueue onto the fill ring */
598 :
599 : /* fill ring */
600 21 : fd_ring_desc_t * fill = &xsk->ring_fr;
601 :
602 : /* fetch cached consumer, producer */
603 21 : uint prod = fill->cached_prod;
604 21 : uint cons = fill->cached_cons;
605 :
606 : /* assuming frame sizes are powers of 2 */
607 21 : ulong frame_mask = xsk->params.frame_sz - 1UL;
608 :
609 : /* ring capacity */
610 21 : ulong cap = fill->depth;
611 :
612 : /* if not enough for batch, update cache */
613 21 : if( cap - ( prod - cons ) < count ) {
614 15 : cons = fill->cached_cons = FD_VOLATILE_CONST( *fill->cons );
615 15 : }
616 :
617 : /* sz is min( available, count ) */
618 21 : ulong sz = cap - ( prod - cons );
619 21 : if( sz > count ) sz = count;
620 :
621 : /* set ring[j] to the specified indices */
622 21 : ulong * ring = fill->frame_ring;
623 21 : uint mask = fill->depth - 1;
624 78 : for( ulong j = 0; j < sz; ++j ) {
625 57 : uint k = prod & mask;
626 57 : ring[k] = meta[j].off & ~frame_mask;
627 :
628 57 : prod++;
629 57 : }
630 :
631 : /* ensure data is visible before producer index */
632 21 : FD_RELEASE();
633 :
634 : /* update producer */
635 21 : fill->cached_prod = prod;
636 21 : FD_VOLATILE( *fill->prod ) = prod;
637 :
638 : /* See the corresponding comments in fd_xsk_rx_enqueue */
639 21 : if( FD_UNLIKELY( fd_xsk_rx_need_wakeup( xsk ) ) ) {
640 0 : struct msghdr _ignored[ 1 ] = { 0 };
641 0 : if( FD_UNLIKELY( -1==recvmsg( xsk->xsk_fd, _ignored, MSG_DONTWAIT ) ) ) {
642 0 : if( FD_UNLIKELY( errno!=EAGAIN ) ) {
643 0 : FD_LOG_WARNING(( "xsk recvmsg failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
644 0 : }
645 0 : }
646 0 : }
647 :
648 21 : return sz;
649 21 : }
650 :
651 : ulong
652 : fd_xsk_tx_enqueue( fd_xsk_t * xsk,
653 : fd_xsk_frame_meta_t * meta,
654 : ulong count,
655 21 : int flush ) {
656 : /* to submit frames for tx, we enqueue onto the tx ring */
657 :
658 : /* tx ring */
659 21 : fd_ring_desc_t * tx = &xsk->ring_tx;
660 :
661 : /* fetch cached consumer, producer */
662 21 : uint prod = tx->cached_prod;
663 21 : uint cons = tx->cached_cons;
664 :
665 : /* ring capacity */
666 21 : uint cap = tx->depth;
667 :
668 : /* if not enough for batch, update cache */
669 21 : if( cap - ( prod - cons ) < (uint)count ) {
670 6 : cons = tx->cached_cons = FD_VOLATILE_CONST( *tx->cons );
671 6 : }
672 :
673 : /* sz is min( available, count ) */
674 21 : uint sz = cap - ( prod - cons );
675 21 : if( sz > (uint)count ) sz = (uint)count;
676 :
677 : /* set ring[j] to the specified indices */
678 21 : struct xdp_desc * ring = tx->packet_ring;
679 21 : uint mask = tx->depth - 1;
680 :
681 21 : TRACE_PACKET( "tx packets ring=%p seq=%u cnt=%u", (void *)ring, prod, sz );
682 69 : for( ulong j = 0; j < sz; ++j ) {
683 48 : ulong k = prod & mask;
684 48 : ring[k].addr = meta[j].off;
685 48 : ring[k].len = meta[j].sz;
686 48 : ring[k].options = 0;
687 :
688 48 : prod++;
689 48 : }
690 :
691 : /* ensure data is visible before producer index */
692 21 : FD_RELEASE();
693 :
694 21 : tx->cached_prod = prod;
695 :
696 21 : if( flush ) {
697 : /* update producer */
698 21 : FD_VOLATILE( *tx->prod ) = prod;
699 :
700 : /* In the TX path of XDP, we always need to call sendto to inform
701 : the kernel there are new messages in the TX ring and it should
702 : wake the driver (how else would they know? there is no kthread
703 : polling for it).
704 :
705 : There is a small optimization: if the XDP_USE_NEED_WAKEUP flag is
706 : provided, then we can ask the kernel if a wakeup is needed. Why
707 : wouldn't it be? Just for a very special case: if the driver is
708 : already about to be woken up, because it has a completion IRQ
709 : already scheduled. The only effect of this is to save a syscall
710 : in certain cases so it's a somewhat minor optimization.
711 :
712 : Nonetheless, we enable XDP_USE_NEED_WAKEUP, so we might as well
713 : check this and save a syscall rather than calling sendto always.
714 :
715 : Notice that XDP_USE_NEED_WAKEUP is an optimization, and it
716 : requires special driver support. In the case that the driver
717 : does not support this, the kernel will default to always
718 : returning true from the need wakeup, so it reverts to the
719 : non-optimized behavior.
720 :
721 : The flush argument here allows us to coalesce transactions
722 : together, and isn't really related to the `sendto` syscall, but
723 : we only call `sendto` if flush is true, because otherwise there
724 : are no new TX messages in the ring and waking up the driver will
725 : have no effect. */
726 21 : if( fd_xsk_tx_need_wakeup( xsk ) ) {
727 0 : if( FD_UNLIKELY( -1==sendto( xsk->xsk_fd, NULL, 0, MSG_DONTWAIT, NULL, 0 ) ) ) {
728 0 : if( FD_UNLIKELY( errno!=EAGAIN ) ) {
729 0 : FD_LOG_WARNING(( "xsk sendto failed xsk_fd=%d (%i-%s)", xsk->xsk_fd, errno, fd_io_strerror( errno ) ));
730 0 : }
731 0 : }
732 0 : }
733 21 : }
734 :
735 21 : return sz;
736 21 : }
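
/* A minimal transmit sketch for the function above: copy a payload into
   a free UMEM frame and submit it with flush=1 so the kernel is woken
   if needed.  frame_off is a hypothetical free frame offset previously
   reclaimed via fd_xsk_tx_complete(); payload_sz must not exceed the
   configured frame_sz. */

static int
fd_xsk_tx_one_example( fd_xsk_t *    xsk,
                       ulong         frame_off,
                       uchar const * payload,
                       ulong         payload_sz ) {
  uchar * umem = (uchar *)fd_xsk_umem_laddr( xsk );
  fd_memcpy( umem + frame_off, payload, payload_sz );

  fd_xsk_frame_meta_t meta = { .off = frame_off, .sz = (uint)payload_sz, .flags = 0U };
  return fd_xsk_tx_enqueue( xsk, &meta, 1UL, 1 /* flush */ )==1UL ? 0 : -1;
}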
737 :
738 : ulong
739 : fd_xsk_rx_complete( fd_xsk_t * xsk,
740 : fd_xsk_frame_meta_t * batch,
741 21 : ulong capacity ) {
742 : /* rx ring */
743 21 : fd_ring_desc_t * rx = &xsk->ring_rx;
744 :
745 21 : uint prod = rx->cached_prod;
746 21 : uint cons = rx->cached_cons;
747 :
748 : /* how many frames are available? */
749 21 : uint avail = prod - cons;
750 :
751 : /* should we update the cache */
752 21 : if( (ulong)avail < capacity ) {
753 : /* we update cons (and keep cache up to date)
754 : they update prod
755 : so only need to fetch actual prod */
756 21 : prod = rx->cached_prod = FD_VOLATILE_CONST( *rx->prod );
757 21 : avail = prod - cons;
758 21 : }
759 :
760 21 : ulong sz = avail;
761 21 : if( sz > capacity ) sz = capacity;
762 :
763 21 : uint mask = rx->depth - 1;
764 21 : struct xdp_desc * ring = rx->packet_ring;
765 :
766 21 : TRACE_PACKET( "rx packets ring=%p seq=%u cnt=%lu", (void *)ring, cons, sz );
767 102 : for( ulong j = 0; j < sz; ++j ) {
768 81 : ulong k = cons & mask;
769 81 : batch[j].off = ring[k].addr;
770 81 : batch[j].sz = ring[k].len;
771 81 : batch[j].flags = 0;
772 :
773 81 : cons++;
774 81 : }
775 :
776 21 : FD_RELEASE();
777 :
778 21 : rx->cached_cons = cons;
779 21 : FD_VOLATILE( *rx->cons ) = cons;
780 :
781 21 : return sz;
782 21 : }
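
/* A minimal receive-loop sketch: fd_xsk_rx_complete() hands out filled
   frames, which are returned to the fill ring with fd_xsk_rx_enqueue2()
   once processed.  handle_packet is a hypothetical callback; a
   production loop would also retry any frames the fill ring did not
   accept. */

static void
fd_xsk_rx_loop_example( fd_xsk_t * xsk,
                        void    (* handle_packet)( uchar const * payload, ulong sz ) ) {
  fd_xsk_frame_meta_t meta[ 64 ];
  uchar const * umem = (uchar const *)fd_xsk_umem_laddr( xsk );
  for(;;) {
    ulong cnt = fd_xsk_rx_complete( xsk, meta, 64UL );
    for( ulong j=0UL; j<cnt; j++ ) handle_packet( umem + meta[j].off, meta[j].sz );
    if( cnt ) fd_xsk_rx_enqueue2( xsk, meta, cnt );   /* recycle frames */
  }
}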
783 :
784 : ulong
785 27 : fd_xsk_tx_complete( fd_xsk_t * xsk, ulong * batch, ulong capacity ) {
786 : /* cr ring */
787 27 : fd_ring_desc_t * cr = &xsk->ring_cr;
788 :
789 27 : uint prod = cr->cached_prod;
790 27 : uint cons = cr->cached_cons;
791 :
792 : /* how many frames are available? */
793 27 : uint avail = prod - cons;
794 :
795 : /* should we update the cache */
796 27 : if( (ulong)avail < capacity ) {
797 : /* we update cons (and keep cache up to date)
798 : they update prod
799 : so only need to fetch actual prod */
800 18 : prod = cr->cached_prod = FD_VOLATILE_CONST( *cr->prod );
801 18 : avail = prod - cons;
802 18 : }
803 :
804 27 : ulong sz = avail;
805 27 : if( sz > capacity ) sz = capacity;
806 :
807 27 : uint mask = cr->depth - 1;
808 27 : ulong * ring = cr->frame_ring;
809 102 : for( ulong j = 0; j < sz; ++j ) {
810 75 : ulong k = cons & mask;
811 75 : batch[j] = ring[k];
812 :
813 75 : cons++;
814 75 : }
815 :
816 27 : FD_RELEASE();
817 :
818 27 : cr->cached_cons = cons;
819 27 : FD_VOLATILE( *cr->cons ) = cons;
820 :
821 27 : return sz;
822 27 : }
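
/* A minimal reclaim sketch for the function above: completed frame
   offsets drained here can be appended to a caller-maintained free list
   and reused for future transmits.  free_list and its spare capacity
   are hypothetical. */

static ulong
fd_xsk_tx_reclaim_example( fd_xsk_t * xsk,
                           ulong *    free_list,   /* >=64 free slots past free_cnt */
                           ulong      free_cnt ) {
  ulong reclaimed = fd_xsk_tx_complete( xsk, free_list + free_cnt, 64UL );
  return free_cnt + reclaimed;   /* new number of free frames */
}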
823 :
824 : ulong
825 : fd_xsk_tx_complete2( fd_xsk_t * xsk,
826 : fd_xsk_frame_meta_t * batch,
827 12 : ulong capacity ) {
828 : /* cr ring */
829 12 : fd_ring_desc_t * cr = &xsk->ring_cr;
830 :
831 12 : uint prod = cr->cached_prod;
832 12 : uint cons = cr->cached_cons;
833 :
834 : /* how many frames are available? */
835 12 : uint avail = prod - cons;
836 :
837 : /* should we update the cache */
838 12 : if( (ulong)avail < capacity ) {
839 : /* we update cons (and keep cache up to date)
840 : they update prod
841 : so only need to fetch actual prod */
842 12 : prod = cr->cached_prod = FD_VOLATILE_CONST( *cr->prod );
843 12 : avail = prod - cons;
844 12 : }
845 :
846 12 : ulong sz = avail;
847 12 : if( sz > capacity ) sz = capacity;
848 :
849 12 : uint mask = cr->depth - 1;
850 12 : ulong * ring = cr->frame_ring;
851 63 : for( ulong j = 0; j < sz; ++j ) {
852 51 : ulong k = cons & mask;
853 51 : batch[j].off = ring[k];
854 :
855 51 : cons++;
856 51 : }
857 :
858 12 : FD_RELEASE();
859 :
860 12 : cr->cached_cons = cons;
861 12 : FD_VOLATILE( *cr->cons ) = cons;
862 :
863 12 : return sz;
864 12 : }
865 :
866 : FD_FN_CONST fd_xsk_params_t const *
867 6 : fd_xsk_get_params( fd_xsk_t const * xsk ) {
868 6 : return &xsk->params;
869 6 : }