LCOV - code coverage report
Current view: top level - disco/quic - fd_tpu_reasm.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 165 247 66.8 %
Date: 2025-01-08 12:08:44 Functions: 12 13 92.3 %

          Line data    Source code
       1             : #include "fd_tpu.h"
       2             : #include "fd_tpu_reasm_private.h"
       3             : 
       4             : FD_FN_CONST ulong
       5           6 : fd_tpu_reasm_align( void ) {
       6           6 :   return alignof(fd_tpu_reasm_t);
       7           6 : }
       8             : 
       9             : FD_FN_CONST ulong
      10             : fd_tpu_reasm_footprint( ulong depth,
      11          24 :                         ulong burst ) {
      12             : 
      13          24 :   if( FD_UNLIKELY(
      14          24 :       ( fd_ulong_popcnt( depth )!=1 ) |
      15          24 :       ( depth>0x7fffffffUL          ) |
      16          24 :       ( burst<2                     ) |
      17          24 :       ( burst>0x7fffffffUL          ) ) )
      18          15 :     return 0UL;
      19             : 
      20           9 :   ulong slot_cnt  = depth+burst;
      21           9 :   ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
      22           9 :   return FD_LAYOUT_FINI( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_APPEND( FD_LAYOUT_INIT,
      23          24 :       fd_tpu_reasm_align(),         sizeof(fd_tpu_reasm_t)                  ), /* hdr       */
      24          24 :       alignof(uint),                depth   *sizeof(uint)                   ), /* pub_slots */
      25          24 :       alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t)    ), /* slots     */
      26          24 :       fd_tpu_reasm_map_align(),     fd_tpu_reasm_map_footprint( chain_cnt ) ), /* map       */
      27          24 :       fd_tpu_reasm_align() );
      28             : 
      29          24 : }
      30             : 
      31             : void *
      32             : fd_tpu_reasm_new( void * shmem,
      33             :                   ulong  depth,
      34             :                   ulong  burst,
      35             :                   ulong  orig,
      36           3 :                   void * dcache ) {
      37             : 
      38           3 :   if( FD_UNLIKELY( !shmem ) ) return NULL;
      39           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, FD_TPU_REASM_ALIGN ) ) ) return NULL;
      40           3 :   if( FD_UNLIKELY( !fd_tpu_reasm_footprint( depth, burst ) ) ) return NULL;
      41           3 :   if( FD_UNLIKELY( orig > FD_FRAG_META_ORIG_MAX ) ) return NULL;
      42             : 
      43           3 :   ulong req_data_sz = fd_tpu_reasm_req_data_sz( depth, burst );
      44           3 :   if( FD_UNLIKELY( fd_dcache_data_sz( dcache )<req_data_sz ) ) {
      45           0 :     FD_LOG_WARNING(( "dcache data_sz is too small (need %lu, have %lu)", req_data_sz, fd_dcache_data_sz( dcache ) ));
      46           0 :     return NULL;
      47           0 :   }
      48             : 
      49             :   /* Memory layout */
      50             : 
      51           3 :   ulong slot_cnt = depth+burst;
      52           3 :   if( FD_UNLIKELY( !slot_cnt ) ) return NULL;
      53           3 :   ulong chain_cnt = fd_tpu_reasm_map_chain_cnt_est( slot_cnt );
      54             : 
      55           3 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
      56           3 :   fd_tpu_reasm_t *      reasm     = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_align(),         sizeof(fd_tpu_reasm_t)                  );
      57           3 :   ulong *               pub_slots = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint),                depth*sizeof(uint)                      );
      58           3 :   fd_tpu_reasm_slot_t * slots     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_tpu_reasm_slot_t), slot_cnt*sizeof(fd_tpu_reasm_slot_t)    );
      59           3 :   void *                map_mem   = FD_SCRATCH_ALLOC_APPEND( l, fd_tpu_reasm_map_align(),     fd_tpu_reasm_map_footprint( chain_cnt ) );
      60           3 :   FD_SCRATCH_ALLOC_FINI( l, fd_tpu_reasm_align() );
      61             : 
      62           3 :   fd_memset( reasm, 0, sizeof(fd_tpu_reasm_t) );
      63           3 :   fd_memset( slots, 0, burst*sizeof(fd_tpu_reasm_slot_t) );
      64             : 
      65           3 :   fd_tpu_reasm_map_t * map = fd_tpu_reasm_map_join( fd_tpu_reasm_map_new( map_mem, chain_cnt, 0UL ) );
      66           3 :   if( FD_UNLIKELY( !map ) ) {
      67           0 :     FD_LOG_WARNING(( "fd_tpu_reasm_map_new failed" ));
      68           0 :     return NULL;
      69           0 :   }
      70             : 
      71             :   /* Initialize reasm object */
      72             : 
      73           3 :   reasm->slots_off     = (ulong)( (uchar *)slots     - (uchar *)reasm );
      74           3 :   reasm->pub_slots_off = (ulong)( (uchar *)pub_slots - (uchar *)reasm );
      75           3 :   reasm->map_off       = (ulong)( (uchar *)map       - (uchar *)reasm );
      76           3 :   reasm->dcache        = dcache;
      77             : 
      78           3 :   reasm->depth    = (uint)depth;
      79           3 :   reasm->burst    = (uint)burst;
      80           3 :   reasm->head     = (uint)slot_cnt-1U;
      81           3 :   reasm->tail     = (uint)depth;
      82           3 :   reasm->slot_cnt = (uint)slot_cnt;
      83           3 :   reasm->orig     = (ushort)orig;
      84             : 
      85             :   /* Initial slot distribution */
      86             : 
      87           3 :   fd_tpu_reasm_reset( reasm );
      88             : 
      89           3 :   FD_COMPILER_MFENCE();
      90           3 :   reasm->magic = FD_TPU_REASM_MAGIC;
      91           3 :   FD_COMPILER_MFENCE();
      92             : 
      93           3 :   return reasm;
      94           3 : }
      95             : 
      96             : void
      97           6 : fd_tpu_reasm_reset( fd_tpu_reasm_t * reasm ) {
      98             : 
      99           6 :   uint depth    = reasm->depth;
     100           6 :   uint burst    = reasm->burst;
     101           6 :   uint node_cnt = depth+burst;
     102             : 
     103           6 :   fd_tpu_reasm_slot_t * slots     = fd_tpu_reasm_slots_laddr( reasm );
     104           6 :   uint *                pub_slots = fd_tpu_reasm_pub_slots_laddr( reasm );
     105           6 :   fd_tpu_reasm_map_t *  map       = fd_tpu_reasm_map_laddr( reasm );
     106             : 
     107             :   /* The initial state moves the first 'depth' slots to the mcache (PUB)
     108             :      and leaves the rest as FREE. */
     109             : 
     110         774 :   for( uint j=0U; j<depth; j++ ) {
     111         768 :     fd_tpu_reasm_slot_t * slot = slots + j;
     112         768 :     slot->k.state     = FD_TPU_REASM_STATE_PUB;
     113         768 :     slot->k.conn_uid  = ULONG_MAX;
     114         768 :     slot->k.stream_id = 0xffffffffffff;
     115         768 :     slot->k.sz        = 0;
     116         768 :     slot->chain_next = UINT_MAX;
     117         768 :     pub_slots[ j ]   = j;
     118         768 :   }
     119         774 :   for( uint j=depth; j<node_cnt; j++ ) {
     120         768 :     fd_tpu_reasm_slot_t * slot = slots + j;
     121         768 :     slot->k.state     = FD_TPU_REASM_STATE_FREE;
     122         768 :     slot->k.conn_uid  = ULONG_MAX;
     123         768 :     slot->k.stream_id = 0xffffffffffff;
     124         768 :     slot->k.sz        = 0;
     125         768 :     slot->lru_prev    = fd_uint_if( j<node_cnt-1U, j+1U, UINT_MAX );
     126         768 :     slot->lru_next    = fd_uint_if( j>depth,       j-1U, UINT_MAX );
     127         768 :     slot->chain_next  = UINT_MAX;
     128         768 :   }
     129             : 
     130             :   /* Clear the entire hash map */
     131             : 
     132           6 :   ulong  chain_cnt = fd_tpu_reasm_map_chain_cnt( map );
     133           6 :   uint * chains    = fd_tpu_reasm_map_private_chain( map );
     134         774 :   for( uint j=0U; j<chain_cnt; j++ ) {
     135         768 :     chains[ j ] = UINT_MAX;
     136         768 :   }
     137           6 : }
     138             : 
     139             : fd_tpu_reasm_t *
     140           3 : fd_tpu_reasm_join( void * shreasm ) {
     141           3 :   fd_tpu_reasm_t * reasm = shreasm;
     142           3 :   if( FD_UNLIKELY( reasm->magic != FD_TPU_REASM_MAGIC ) ) {
     143           0 :     FD_LOG_WARNING(( "bad magic" ));
     144           0 :     return NULL;
     145           0 :   }
     146           3 :   return reasm;
     147           3 : }
     148             : 
     149             : void *
     150           3 : fd_tpu_reasm_leave( fd_tpu_reasm_t * reasm ) {
     151           3 :   return reasm;
     152           3 : }
     153             : 
     154             : void *
     155           3 : fd_tpu_reasm_delete( void * shreasm ) {
     156           3 :   fd_tpu_reasm_t * reasm = shreasm;
     157           3 :   if( FD_UNLIKELY( !reasm ) ) return NULL;
     158           3 :   reasm->magic = 0UL;
     159           3 :   return shreasm;
     160           3 : }
     161             : 
     162             : fd_tpu_reasm_slot_t *
     163             : fd_tpu_reasm_query( fd_tpu_reasm_t * reasm,
     164             :                     ulong            conn_uid,
     165       76509 :                     ulong            stream_id ) {
     166       76509 :   return smap_query( reasm, conn_uid, stream_id );
     167       76509 : }
     168             : 
     169             : fd_tpu_reasm_slot_t *
     170             : fd_tpu_reasm_prepare( fd_tpu_reasm_t * reasm,
     171             :                       ulong            conn_uid,
     172             :                       ulong            stream_id,
     173       76125 :                       long             tsorig ) {
     174       76125 :   fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
     175       76125 :   smap_remove( reasm, slot );
     176       76125 :   slot_begin( slot );
     177       76125 :   slotq_push_head( reasm, slot );
     178       76125 :   slot->k.conn_uid  = conn_uid;
     179       76125 :   slot->k.stream_id = stream_id & FD_TPU_REASM_SID_MASK;
     180       76125 :   smap_insert( reasm, slot );
     181       76125 :   slot->tsorig_comp = (uint)fd_frag_meta_ts_comp( tsorig );
     182       76125 :   return slot;
     183       76125 : }
     184             : 
     185             : int
     186             : fd_tpu_reasm_frag( fd_tpu_reasm_t *      reasm,
     187             :                    fd_tpu_reasm_slot_t * slot,
     188             :                    uchar const *         data,
     189             :                    ulong                 data_sz,
     190       65841 :                    ulong                 data_off ) {
     191             : 
     192       65841 :   if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
     193           0 :     return FD_TPU_REASM_ERR_STATE;
     194             : 
     195       65841 :   ulong slot_idx = slot_get_idx( reasm, slot );
     196       65841 :   ulong mtu      = FD_TPU_REASM_MTU;
     197       65841 :   ulong sz0      = slot->k.sz;
     198             : 
     199       65841 :   if( FD_UNLIKELY( data_off>sz0 ) ) {
     200           0 :     fd_tpu_reasm_cancel( reasm, slot );
     201           0 :     return FD_TPU_REASM_ERR_SKIP;
     202           0 :   }
     203             : 
     204       65841 :   if( FD_UNLIKELY( data_off<sz0 ) ) {
     205             :     /* Fragment partially known ... should not happen */
     206           0 :     ulong skip = sz0 - data_off;
     207           0 :     if( skip>data_sz ) return FD_TPU_REASM_SUCCESS;
     208           0 :     data_off  += skip;
     209           0 :     data_sz   -= skip;
     210           0 :     data      += skip;
     211           0 :   }
     212             : 
     213       65841 :   ulong sz1 = sz0 + data_sz;
     214       65841 :   if( FD_UNLIKELY( (sz1<sz0)|(sz1>mtu) ) ) {
     215           0 :     fd_tpu_reasm_cancel( reasm, slot );
     216           0 :     return FD_TPU_REASM_ERR_SZ;
     217           0 :   }
     218             : 
     219       65841 :   uchar * msg = slot_get_data( reasm, slot_idx );
     220       65841 :   fd_memcpy( msg+sz0, data, data_sz );
     221             : 
     222       65841 :   slot->k.sz = (ushort)( sz1 & FD_TPU_REASM_SZ_MASK );
     223       65841 :   return FD_TPU_REASM_SUCCESS;
     224       65841 : }
     225             : 
     226             : int
     227             : fd_tpu_reasm_publish( fd_tpu_reasm_t *      reasm,
     228             :                       fd_tpu_reasm_slot_t * slot,
     229             :                       fd_frag_meta_t *      mcache,
     230             :                       void *                base,  /* Assumed aligned FD_CHUNK_ALIGN */
     231             :                       ulong                 seq,
     232       65841 :                       long                  tspub ) {
     233             : 
     234       65841 :   ulong depth = reasm->depth;
     235             : 
     236       65841 :   if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) )
     237           0 :     return FD_TPU_REASM_ERR_STATE;
     238             : 
     239             :   /* Derive chunk index */
     240       65841 :   uint    slot_idx = slot_get_idx( reasm, slot );
     241       65841 :   uchar * data     = slot_get_data( reasm, slot_idx );
     242       65841 :   ulong   chunk    = fd_laddr_to_chunk( base, data );
     243       65841 :   if( FD_UNLIKELY( ( (ulong)data<(ulong)base ) |
     244       65841 :                    ( chunk>UINT_MAX          ) ) ) {
     245           0 :     FD_LOG_CRIT(( "invalid base %p for slot %p in tpu_reasm %p",
     246           0 :                   base, (void *)slot, (void *)reasm ));
     247           0 :   }
     248             : 
     249             :   /* Find least recently published slot.  This is our "freed slot".
     250             :      (Every time a new slot is published, another slot is simultaneously
     251             :      freed) */
     252       65841 :   uint * pub_slot       = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
     253       65841 :   uint   freed_slot_idx = *pub_slot;
     254       65841 :   if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
     255             :     /* mcache corruption */
     256           0 :     FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
     257           0 :                      freed_slot_idx, reasm->slot_cnt ));
     258           0 :     fd_tpu_reasm_reset( reasm );
     259           0 :     return FD_TPU_REASM_ERR_STATE;
     260           0 :   }
     261             : 
     262             :   /* Publish to mcache */
     263       65841 :   ulong sz          = slot->k.sz;
     264       65841 :   ulong ctl         = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
     265       65841 :   ulong tsorig_comp = slot->tsorig_comp;
     266       65841 :   ulong tspub_comp  = fd_frag_meta_ts_comp( tspub );
     267             : 
     268       65841 : # if FD_HAS_AVX
     269       65841 :   fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
     270             : # elif FD_HAS_SSE
     271             :   fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
     272             : # else
     273             :   fd_mcache_publish    ( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
     274             : # endif
     275             : 
     276             :   /* Mark new slot as published */
     277       65841 :   slotq_remove( reasm, slot );
     278       65841 :   slot->k.state = FD_TPU_REASM_STATE_PUB;
     279       65841 :   *pub_slot = slot_idx;
     280             : 
     281             :   /* Free oldest published slot */
     282       65841 :   fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
     283       65841 :   uint free_slot_state = free_slot->k.state;
     284       65841 :   if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
     285             :     /* mcache/slots out of sync (memory leak) */
     286           0 :     FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
     287           0 :                      seq, freed_slot_idx, free_slot_state ));
     288           0 :     fd_tpu_reasm_reset( reasm );
     289           0 :     return FD_TPU_REASM_ERR_STATE;
     290           0 :   }
     291       65841 :   free_slot->k.state = FD_TPU_REASM_STATE_FREE;
     292       65841 :   slotq_push_tail( reasm, free_slot );
     293             : 
     294       65841 :   return FD_TPU_REASM_SUCCESS;
     295       65841 : }
     296             : 
     297             : void
     298             : fd_tpu_reasm_cancel( fd_tpu_reasm_t *      reasm,
     299        9348 :                      fd_tpu_reasm_slot_t * slot ) {
     300        9348 :   if( FD_UNLIKELY( slot->k.state != FD_TPU_REASM_STATE_BUSY ) ) return;
     301        9348 :   slotq_remove( reasm, slot );
     302        9348 :   smap_remove( reasm, slot );
     303        9348 :   slot->k.state     = FD_TPU_REASM_STATE_FREE;
     304        9348 :   slot->k.conn_uid  = ULONG_MAX;
     305        9348 :   slot->k.stream_id = 0UL;
     306        9348 :   slotq_push_tail( reasm, slot );
     307        9348 : }
     308             : 
     309             : int
     310             : fd_tpu_reasm_publish_fast( fd_tpu_reasm_t * reasm,
     311             :                            uchar const *    data,
     312             :                            ulong            sz,
     313             :                            fd_frag_meta_t * mcache,
     314             :                            void *           base,  /* Assumed aligned FD_CHUNK_ALIGN */
     315             :                            ulong            seq,
     316           0 :                            long             tspub ) {
     317             : 
     318           0 :   ulong depth = reasm->depth;
     319           0 :   if( FD_UNLIKELY( sz>FD_TPU_REASM_MTU ) ) return FD_TPU_REASM_ERR_SZ;
     320             : 
     321             :   /* Acquire least recent slot.  This is our "new slot" */
     322           0 :   fd_tpu_reasm_slot_t * slot = slotq_pop_tail( reasm );
     323           0 :   smap_remove( reasm, slot );
     324           0 :   slot_begin( slot );
     325             : 
     326             :   /* Derive buffer address of new slot */
     327           0 :   uint    slot_idx = slot_get_idx( reasm, slot );
     328           0 :   uchar * buf      = slot_get_data( reasm, slot_idx );
     329           0 :   ulong   chunk    = fd_laddr_to_chunk( base, buf );
     330           0 :   if( FD_UNLIKELY( ( (ulong)buf<(ulong)base ) |
     331           0 :                    ( chunk>UINT_MAX         ) ) ) {
     332           0 :     FD_LOG_ERR(( "Computed invalid chunk index (base=%p buf=%p chunk=%lx)",
     333           0 :                  base, (void *)buf, chunk ));
     334           0 :   }
     335             : 
     336             :   /* Find least recently published slot.  This is our "freed slot".
     337             :      (Every time a new slot is published, another slot is simultaneously
     338             :      freed) */
     339           0 :   uint * pub_slot       = fd_tpu_reasm_pub_slots_laddr( reasm ) + fd_mcache_line_idx( seq, depth );
     340           0 :   uint   freed_slot_idx = *pub_slot;
     341           0 :   if( FD_UNLIKELY( freed_slot_idx >= reasm->slot_cnt ) ) {
     342             :     /* mcache corruption */
     343           0 :     FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm slot %u out of bounds (max %u)",
     344           0 :                      freed_slot_idx, reasm->slot_cnt ));
     345           0 :     fd_tpu_reasm_reset( reasm );
     346           0 :     return FD_TPU_REASM_ERR_STATE;
     347           0 :   }
     348             : 
     349             :   /* Copy data into new slot */
     350           0 :   FD_COMPILER_MFENCE();
     351           0 :   slot->k.sz = sz & FD_TPU_REASM_SZ_MASK;
     352           0 :   fd_memcpy( buf, data, sz );
     353           0 :   FD_COMPILER_MFENCE();
     354           0 :   slot->k.state = FD_TPU_REASM_STATE_PUB;
     355           0 :   FD_COMPILER_MFENCE();
     356             : 
     357             :   /* Publish new slot, while simultaneously removing all references to
     358             :      the old slot */
     359           0 :   *pub_slot = slot_idx;
     360           0 :   ulong ctl         = fd_frag_meta_ctl( reasm->orig, 1, 1, 0 );
     361           0 :   uint  tsorig_comp = slot->tsorig_comp;
     362           0 :   uint  tspub_comp  = (uint)fd_frag_meta_ts_comp( tspub );
     363           0 : # if FD_HAS_AVX
     364           0 :   fd_mcache_publish_avx( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
     365             : # elif FD_HAS_SSE
     366             :   fd_mcache_publish_sse( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
     367             : # else
     368             :   fd_mcache_publish    ( mcache, depth, seq, 0UL, chunk, sz, ctl, tsorig_comp, tspub_comp );
     369             : # endif
     370             : 
     371             :   /* Free old slot */
     372           0 :   fd_tpu_reasm_slot_t * free_slot = fd_tpu_reasm_slots_laddr( reasm ) + freed_slot_idx;
     373           0 :   uint free_slot_state = free_slot->k.state;
     374           0 :   if( FD_UNLIKELY( free_slot_state != FD_TPU_REASM_STATE_PUB ) ) {
     375             :     /* mcache/slots out of sync (memory leak) */
     376           0 :     FD_LOG_WARNING(( "mcache corruption detected! tpu_reasm seq %lu owns slot %u, but it's state is %u",
     377           0 :                      seq, freed_slot_idx, free_slot_state ));
     378           0 :     fd_tpu_reasm_reset( reasm );
     379           0 :     return FD_TPU_REASM_ERR_STATE;
     380           0 :   }
     381           0 :   free_slot->k.state = FD_TPU_REASM_STATE_FREE;
     382           0 :   slotq_push_tail( reasm, free_slot );
     383           0 :   return FD_TPU_REASM_SUCCESS;
     384           0 : }

Generated by: LCOV version 1.14