LCOV - code coverage report
Current view: top level - waltz/xdp - fd_xsk_aio.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 198 238 83.2 %
Date: 2025-01-08 12:08:44 Functions: 11 11 100.0 %

          Line data    Source code
       1             : #if !defined(__linux__)
       2             : #error "fd_xsk_aio requires Linux operating system with XDP support"
       3             : #endif
       4             : 
       5             : #include "../../util/fd_util.h"
       6             : #include "fd_xsk_aio_private.h"
       7             : 
       8             : /* Forward declarations */
       9             : static int
      10             : fd_xsk_aio_send( void *                    ctx,
      11             :                  fd_aio_pkt_info_t const * batch,
      12             :                  ulong                     batch_cnt,
      13             :                  ulong *                   opt_batch_idx,
      14             :                  int                       flush );
      15             : 
      16             : ulong
      17           6 : fd_xsk_aio_align( void ) {
      18           6 :   return FD_XSK_AIO_ALIGN;
      19           6 : }
      20             : 
      21             : FD_FN_CONST ulong
      22             : fd_xsk_aio_footprint( ulong tx_depth,
      23          21 :                       ulong pkt_cnt ) {
      24          21 :   if( FD_UNLIKELY( tx_depth==0UL ) ) return 0UL;
      25          15 :   if( FD_UNLIKELY( pkt_cnt ==0UL ) ) return 0UL;
      26             : 
      27           9 :   ulong sz =      1UL*sizeof( fd_xsk_aio_t        )
      28           9 :            +  pkt_cnt*sizeof( fd_xsk_frame_meta_t )
      29           9 :            +  pkt_cnt*sizeof( fd_aio_pkt_info_t   )
      30           9 :            + tx_depth*sizeof( ulong               );
      31             : 
      32           9 :   sz = fd_ulong_align_up( sz, FD_XSK_AIO_ALIGN );
      33             :   /* assert( sz%FD_XSK_AIO_ALIGN==0UL ) */
      34           9 :   return sz;
      35          15 : }
      36             : 
      37             : void *
      38             : fd_xsk_aio_new( void * mem,
      39             :                 ulong  tx_depth,
      40          15 :                 ulong  pkt_cnt ) {
      41             : 
      42          15 :   if( FD_UNLIKELY( !mem ) ) {
      43           3 :     FD_LOG_WARNING(( "NULL mem" ));
      44           3 :     return NULL;
      45           3 :   }
      46             : 
      47          12 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_xsk_aio_align() ) ) ) {
      48           3 :     FD_LOG_WARNING(( "misaligned mem" ));
      49           3 :     return NULL;
      50           3 :   }
      51             : 
      52           9 :   ulong footprint = fd_xsk_aio_footprint( tx_depth, pkt_cnt );
      53           9 :   if( FD_UNLIKELY( !footprint ) ) {
      54           6 :     FD_LOG_WARNING(( "invalid footprint for tx_depth (%lu), pkt_cnt (%lu)",
      55           6 :                       tx_depth, pkt_cnt ));
      56           6 :     return NULL;
      57           6 :   }
      58             : 
      59           3 :   fd_memset( mem, 0, footprint );
      60             : 
      61             :   /* Allocate objects in fd_xsk_aio_t */
      62             : 
      63           3 :   fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)mem;
      64             : 
      65             :   /* Assumes alignment of `fd_xsk_aio_t` matches alignment of
      66             :      `fd_xsk_frame_meta_t` and `fd_aio_pkt_info_t`. */
      67             : 
      68           3 :   ulong meta_off     =                    sizeof(fd_xsk_aio_t       );
      69           3 :   ulong pkt_off      = meta_off + pkt_cnt*sizeof(fd_xsk_frame_meta_t);
      70           3 :   ulong tx_stack_off = pkt_off  + pkt_cnt*sizeof(fd_aio_pkt_info_t  );
      71             : 
      72           3 :   xsk_aio->pkt_depth    = pkt_cnt;
      73           3 :   xsk_aio->tx_depth     = tx_depth;
      74           3 :   xsk_aio->meta_off     = meta_off;
      75           3 :   xsk_aio->pkt_off      = pkt_off;
      76           3 :   xsk_aio->tx_stack_off = tx_stack_off;
      77             : 
      78           3 :   xsk_aio->metrics.tx_cnt = 0UL;
      79           3 :   xsk_aio->metrics.tx_sz  = 0UL;
      80           3 :   xsk_aio->metrics.rx_cnt = 0UL;
      81           3 :   xsk_aio->metrics.rx_sz  = 0UL;
      82             : 
      83             :   /* Mark object as valid */
      84             : 
      85           3 :   FD_COMPILER_MFENCE();
      86           3 :   FD_VOLATILE( xsk_aio->magic ) = FD_XSK_AIO_MAGIC;
      87           3 :   FD_COMPILER_MFENCE();
      88             : 
      89           3 :   return xsk_aio;
      90           9 : }
      91             : 
      92             : 
      93             : fd_xsk_aio_t *
      94             : fd_xsk_aio_join( void *     shxsk_aio,
      95           9 :                  fd_xsk_t * xsk ) {
      96             : 
      97           9 :   if( FD_UNLIKELY( !shxsk_aio ) ) {
      98           0 :     FD_LOG_WARNING(( "NULL shxsk_aio" ));
      99           0 :     return NULL;
     100           0 :   }
     101             : 
     102           9 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shxsk_aio, fd_xsk_aio_align() ) ) ) {
     103           0 :     FD_LOG_WARNING(( "misaligned shxsk_aio" ));
     104           0 :     return NULL;
     105           0 :   }
     106             : 
     107           9 :   if( FD_UNLIKELY( !xsk ) ) {
     108           3 :     FD_LOG_WARNING(( "NULL xsk" ));
     109           3 :     return NULL;
     110           3 :   }
     111             : 
     112             :   /* Validate memory layout */
     113             : 
     114           6 :   fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)shxsk_aio;
     115             : 
     116           6 :   if( FD_UNLIKELY( xsk_aio->magic!=FD_XSK_AIO_MAGIC ) ) {
     117           3 :     FD_LOG_WARNING(( "bad magic (not an fd_xsk_aio_t?)" ));
     118           3 :     return NULL;
     119           3 :   }
     120             : 
     121           3 :   if( FD_UNLIKELY( xsk_aio->xsk ) ) {
     122           0 :     FD_LOG_WARNING(( "xsk_aio in an unclean state, resetting" ));
     123           0 :     xsk_aio->xsk = NULL;
     124             :     /* continue */
     125           0 :   }
     126             : 
     127           3 :   fd_xsk_params_t const * params = fd_xsk_get_params( xsk );
     128             : 
     129           3 :   if( FD_UNLIKELY( params->tx_depth != xsk_aio->tx_depth ) ) {
     130           0 :     FD_LOG_WARNING(( "incompatible xsk (tx_depth=%lu) and xsk_aio (tx_depth=%lu)",
     131           0 :                      params->tx_depth, xsk_aio->tx_depth ));
     132           0 :     return NULL;
     133           0 :   }
     134             : 
     135             :   /* Reset state */
     136             : 
     137           3 :   xsk_aio->xsk = xsk;
     138           3 :   fd_aio_delete( &xsk_aio->rx );
     139           3 :   fd_aio_delete( &xsk_aio->tx );
     140             : 
     141           3 :   xsk_aio->frame_mem      = fd_xsk_umem_laddr( xsk );
     142           3 :   xsk_aio->frame_sz       = params->frame_sz;
     143           3 :   xsk_aio->rx_off         = 0;
     144           3 :   xsk_aio->tx_off         = params->rx_depth;
     145           3 :   xsk_aio->tx_stack       = fd_xsk_aio_tx_stack( xsk_aio );
     146           3 :   xsk_aio->tx_stack_depth = params->tx_depth;
     147           3 :   xsk_aio->tx_top         = 0;
     148             : 
     149             :   /* Setup local TX */
     150             : 
     151           3 :   fd_aio_t * tx = fd_aio_join( fd_aio_new( &xsk_aio->tx, xsk_aio, fd_xsk_aio_send ) );
     152           3 :   if( FD_UNLIKELY( !tx ) ) {
     153           0 :     FD_LOG_WARNING(( "Failed to join local tx aio" ));
     154           0 :     return NULL;
     155           0 :   }
     156             : 
     157             :   /* Reset RX callback (laddr pointers to external object) */
     158             : 
     159           3 :   memset( &xsk_aio->rx, 0, sizeof(fd_aio_t) );
     160             : 
     161             :   /* Enqueue frames to RX ring for receive (via fill ring) */
     162             : 
     163           3 :   ulong frame_off = xsk_aio->rx_off;
     164           3 :   ulong frame_sz  = params->frame_sz;
     165           3 :   ulong rx_depth  = params->rx_depth;
     166           3 :   ulong tx_depth  = params->tx_depth;
     167             : 
     168          27 :   for( ulong j=0; j<rx_depth; j++ ) {
     169          24 :     ulong enq_cnt =  fd_xsk_rx_enqueue( xsk, &frame_off, 1U );
     170          24 :     frame_off     += frame_sz;
     171             : 
     172          24 :     if( FD_UNLIKELY( !enq_cnt ) ) {
     173           0 :       FD_LOG_WARNING(( "fd_xsk_rx_enqueue() failed, was fd_xsk_t properly flushed?" ));
     174           0 :       return NULL;
     175           0 :     }
     176          24 :   }
     177             : 
     178             :   /* Add all TX frames to the free stack */
     179             : 
     180           3 :   frame_off = xsk_aio->tx_off*frame_sz;
     181          27 :   for( ulong j=0; j<tx_depth; j++ ) {
     182          24 :     xsk_aio->tx_stack[xsk_aio->tx_top] =  frame_off;
     183          24 :                       xsk_aio->tx_top++;
     184          24 :     frame_off                          += frame_sz;
     185          24 :   }
     186             : 
     187           3 :   return (fd_xsk_aio_t *)xsk_aio;
     188           3 : }
     189             : 
     190             : 
     191             : void *
     192           3 : fd_xsk_aio_leave( fd_xsk_aio_t * xsk_aio ) {
     193             : 
     194           3 :   if( FD_UNLIKELY( !xsk_aio ) ) {
     195           0 :     FD_LOG_WARNING(( "NULL xsk_aio" ));
     196           0 :     return NULL;
     197           0 :   }
     198             : 
     199           3 :   xsk_aio->xsk = NULL;
     200             : 
     201           3 :   fd_aio_delete( fd_aio_leave( &xsk_aio->rx ) );
     202           3 :   fd_aio_delete( fd_aio_leave( &xsk_aio->tx ) );
     203             : 
     204           3 :   return (void *)xsk_aio;
     205           3 : }
     206             : 
     207             : void *
     208           6 : fd_xsk_aio_delete( void * shxsk_aio ) {
     209             : 
     210           6 :   if( FD_UNLIKELY( !shxsk_aio ) ) {
     211           0 :     FD_LOG_WARNING(( "NULL shxsk_aio" ));
     212           0 :     return NULL;
     213           0 :   }
     214             : 
     215           6 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shxsk_aio, fd_xsk_aio_align() ) ) ) {
     216           0 :     FD_LOG_WARNING(( "misaligned xsk_aio" ));
     217           0 :     return NULL;
     218           0 :   }
     219             : 
     220           6 :   fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t *)shxsk_aio;
     221             : 
     222           6 :   if( FD_UNLIKELY( xsk_aio->magic!=FD_XSK_AIO_MAGIC ) ) {
     223           3 :     FD_LOG_WARNING(( "bad magic" ));
     224           3 :     return NULL;
     225           3 :   }
     226             : 
     227           3 :   FD_COMPILER_MFENCE();
     228           3 :   FD_VOLATILE( xsk_aio->magic ) = 0UL;
     229           3 :   FD_COMPILER_MFENCE();
     230             : 
     231           3 :   return (void *)xsk_aio;
     232           6 : }
     233             : 
     234             : 
     235             : fd_aio_t const *
     236           3 : fd_xsk_aio_get_tx( fd_xsk_aio_t const * xsk_aio ) {
     237           3 :   return &xsk_aio->tx;
     238           3 : }
     239             : 
     240             : void
     241             : fd_xsk_aio_set_rx( fd_xsk_aio_t *   xsk_aio,
     242           3 :                    fd_aio_t const * aio ) {
     243           3 :   fd_memcpy( &xsk_aio->rx, aio, sizeof(fd_aio_t) );
     244           3 : }
     245             : 
     246             : 
     247             : int
     248           9 : fd_xsk_aio_service( fd_xsk_aio_t * xsk_aio ) {
     249           9 :   fd_xsk_t *            xsk         = xsk_aio->xsk;
     250           9 :   fd_aio_t *            ingress     = &xsk_aio->rx;
     251           9 :   fd_xsk_frame_meta_t * meta        = fd_xsk_aio_meta( xsk_aio );
     252           9 :   fd_aio_pkt_info_t *   pkt         = fd_xsk_aio_pkts( xsk_aio );
     253           9 :   ulong                 pkt_depth   = xsk_aio->pkt_depth;
     254           9 :   ulong                 frame_laddr = (ulong)fd_xsk_umem_laddr( xsk_aio->xsk );
     255             : 
     256             :   /* try completing receives */
     257           9 :   ulong rx_avail = fd_xsk_rx_complete( xsk, meta, pkt_depth );
     258             : 
     259             :   /* forward to aio */
     260           9 :   if( rx_avail ) {
     261          36 :     for( ulong j=0; j<rx_avail; j++ ) {
     262          30 :       pkt[j] = (fd_aio_pkt_info_t) {
     263          30 :         .buf    = (void *)(frame_laddr + meta[j].off),
     264          30 :         .buf_sz = (ushort)meta[j].sz
     265          30 :       };
     266          30 :     }
     267             : 
     268           6 :     fd_aio_send( ingress, pkt, rx_avail, NULL, 1 );
     269             :     /* TODO frames may not all be processed at this point
     270             :        we should count them, and possibly buffer them */
     271             : 
     272           6 :     xsk_aio->metrics.rx_cnt += rx_avail;
     273          36 :     for( ulong j=0; j<rx_avail; j++ ) xsk_aio->metrics.rx_sz += meta[j].sz;
     274             : 
     275             :     /* return frames to rx ring */
     276           6 :     ulong enq_rc = fd_xsk_rx_enqueue2( xsk, meta, rx_avail );
     277           6 :     if( FD_UNLIKELY( enq_rc < rx_avail ) ) {
     278             :       /* keep trying indefinitely */
     279             :       /* TODO consider adding a timeout */
     280           0 :       ulong j = enq_rc;
     281           0 :       while( rx_avail > j ) {
     282           0 :         ulong enq_rc = fd_xsk_rx_enqueue2( xsk, meta + j, rx_avail - j );
     283           0 :         j += enq_rc;
     284           0 :       }
     285           0 :     }
     286           6 :   }
     287             : 
     288             :   /* any tx to complete? */
     289           9 :   ulong tx_completed = fd_xsk_tx_complete( xsk,
     290           9 :                                            xsk_aio->tx_stack       + xsk_aio->tx_top,
     291           9 :                                            xsk_aio->tx_stack_depth - xsk_aio->tx_top );
     292           9 :   xsk_aio->tx_top += tx_completed;
     293             : 
     294           9 :   return rx_avail || tx_completed;
     295           9 : }
     296             : 
     297             : 
     298             : static void
     299           6 : fd_xsk_aio_tx_complete( fd_xsk_aio_t * xsk_aio ) {
     300           6 :   ulong tx_completed = fd_xsk_tx_complete( xsk_aio->xsk,
     301           6 :                                            xsk_aio->tx_stack       + xsk_aio->tx_top,
     302           6 :                                            xsk_aio->tx_stack_depth - xsk_aio->tx_top );
     303           6 :   xsk_aio->tx_top += tx_completed;
     304           6 : }
     305             : 
     306             : 
     307             : /* fd_xsk_aio_send is an aio callback that transmits the given batch of
     308             :    packets through the XSK. */
     309             : static int
     310             : fd_xsk_aio_send( void *                    ctx,
     311             :                  fd_aio_pkt_info_t const * pkt,
     312             :                  ulong                     pkt_cnt,
     313             :                  ulong *                   opt_batch_idx,
     314           9 :                  int                       flush ) {
     315             : 
     316           9 :   fd_xsk_aio_t * xsk_aio = (fd_xsk_aio_t*)ctx;
     317           9 :   fd_xsk_t *     xsk     = xsk_aio->xsk;
     318             : 
     319           9 :   if( FD_UNLIKELY( pkt_cnt==0UL ) ) {
     320           3 :     if( flush ) {
     321           3 :       fd_xsk_frame_meta_t meta[1] = {{0}};
     322           3 :       ulong sent_cnt = fd_xsk_tx_enqueue( xsk, meta, 0, 1 );
     323           3 :       (void)sent_cnt;
     324           3 :     }
     325           3 :     return FD_AIO_SUCCESS;
     326           3 :   }
     327             : 
     328             :   /* Check if any previous send operations completed
     329             :      to reclaim transmit frames. */
     330           6 :   fd_xsk_aio_tx_complete( xsk_aio );
     331             : 
     332             :   /* Refuse to send more packets than we have metadata frames */
     333           6 :   ulong       batch_cnt = pkt_cnt; /* Number of frames to attempt to send */
     334           6 :   ulong const pkt_depth = xsk_aio->pkt_depth;
     335           6 :   if( FD_UNLIKELY( batch_cnt>pkt_depth ) )
     336           0 :     batch_cnt = pkt_depth;
     337             : 
     338             :   /* Find UMEM and meta params */
     339           6 :   uchar *               frame_mem  = xsk_aio->frame_mem;          /* UMEM region     */
     340           6 :   ulong                 frame_sz   = xsk_aio->frame_sz;           /* UMEM frame sz   */
     341           6 :   fd_xsk_frame_meta_t * meta       = fd_xsk_aio_meta( xsk_aio );  /* frame meta heap */
     342             : 
     343             :   /* Number of packets pending fd_xsk_tx_enqueue */
     344           6 :   ulong pending_cnt=0;
     345             : 
     346             :   /* XSK send prepare loop.  Terminates when the largest possible tx
     347             :      batch has been formed.  meta[0..pkt_idx] is populated with frames
     348             :      to be handed off to fd_xsk_tx_enqueue. */
     349           6 :   ulong pkt_idx;
     350          30 :   for( pkt_idx=0; pkt_idx<batch_cnt; ++pkt_idx ) {
     351             :     /* Pop a TX frame from our stack */
     352          27 :     if( FD_UNLIKELY( !xsk_aio->tx_top ) )
     353           3 :       break;
     354          24 :     --xsk_aio->tx_top;
     355          24 :     ulong offset = xsk_aio->tx_stack[xsk_aio->tx_top];
     356             : 
     357          24 :     uchar const * data    = pkt[ pkt_idx ].buf;
     358          24 :     ulong         data_sz = pkt[ pkt_idx ].buf_sz;
     359             : 
     360             :     /* MTU check */
     361          24 :     if( FD_UNLIKELY( data_sz>frame_sz ) ) {
     362           0 :       FD_LOG_WARNING(( "frame too large for xsk ring (%lu > %lu), aborting send",
     363           0 :                        data_sz, frame_sz ));
     364           0 :       if( opt_batch_idx ) *opt_batch_idx = 0UL;
     365           0 :       return FD_AIO_ERR_INVAL;
     366           0 :     }
     367             : 
     368             :     /* Copy aio packet payload into TX frame */
     369          24 :     fd_memcpy( frame_mem + offset, data, data_sz );
     370             : 
     371             :     /* Write XSK meta */
     372          24 :     meta[pending_cnt] = (fd_xsk_frame_meta_t){
     373          24 :       .off   = offset,
     374          24 :       .sz    = (uint)data_sz,
     375          24 :       .flags = 0U
     376          24 :     };
     377          24 :     pending_cnt++;
     378          24 :   }
     379             : 
     380             :   /* Enqueue send */
     381           6 :   ulong sent_cnt=0UL;
     382           6 :   if( FD_LIKELY( pending_cnt>0UL || flush ) )
     383           6 :     sent_cnt = fd_xsk_tx_enqueue( xsk, meta, pending_cnt, flush );
     384             : 
     385           6 :   xsk_aio->metrics.tx_cnt += sent_cnt;
     386          30 :   for( ulong j=0; j<sent_cnt; j++ ) xsk_aio->metrics.tx_sz += meta[j].sz;
     387             : 
     388             :   /* Sent less than user requested? */
     389           6 :   if( FD_UNLIKELY( sent_cnt<pkt_cnt ) ) {
     390           3 :     if( FD_LIKELY( opt_batch_idx ) ) *opt_batch_idx = sent_cnt;
     391           3 :     return FD_AIO_ERR_AGAIN;
     392           3 :   }
     393             : 
     394           3 :   return FD_AIO_SUCCESS;
     395           6 : }

Generated by: LCOV version 1.14