LCOV - code coverage report
Current view: top level - disco/events - fd_circq.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 103 162 63.6 %
Date: 2026-02-13 06:06:24 Functions: 7 14 50.0 %

          Line data    Source code
       1             : #include "fd_circq.h"
       2             : 
       3             : #include "../../util/log/fd_log.h"
       4             : 
       5             : struct __attribute__((aligned(8UL))) fd_circq_message_private {
       6             :   ulong align;
       7             :   ulong footprint;
       8             : 
       9             :   /* Offset withn the circular buffer data region of where the next
      10             :      message starts, if there is one.  This is not always the same as
      11             :      aligning up this message + footprint, because the next message may
      12             :      have wrapped around to the start of the buffer. */
      13             :   ulong next;
      14             : };
      15             : 
      16             : typedef struct fd_circq_message_private fd_circq_message_t;
      17             : 
      18             : FD_FN_CONST ulong
      19           0 : fd_circq_align( void ) {
      20           0 :   return FD_CIRCQ_ALIGN;
      21           0 : }
      22             : 
      23             : FD_FN_CONST ulong
      24           0 : fd_circq_footprint( ulong sz ) {
      25           0 :   return sizeof( fd_circq_t ) + sz;
      26           0 : }
      27             : 
      28             : void *
      29             : fd_circq_new( void * shmem,
      30          27 :               ulong  sz ) {
      31          27 :   fd_circq_t * circq = (fd_circq_t *)shmem;
      32          27 :   circq->cnt  = 0UL;
      33          27 :   circq->head = 0UL;
      34          27 :   circq->tail = 0UL;
      35          27 :   circq->size = sz;
      36          27 :   circq->cursor = ULONG_MAX;
      37          27 :   circq->cursor_seq = 0UL;
      38          27 :   circq->cursor_push_seq = 0UL;
      39             : 
      40          27 :   memset( &circq->metrics, 0, sizeof( circq->metrics ) );
      41             : 
      42          27 :   return shmem;
      43          27 : }
      44             : 
      45             : fd_circq_t *
      46          27 : fd_circq_join( void * shbuf ) {
      47          27 :   return (fd_circq_t *)shbuf;
      48          27 : }
      49             : 
      50             : void *
      51           0 : fd_circq_leave( fd_circq_t * buf ) {
      52           0 :   return (void *)buf;
      53           0 : }
      54             : 
      55             : void *
      56           0 : fd_circq_delete( void * shbuf ) {
      57           0 :   return shbuf;
      58           0 : }
      59             : 
      60             : static inline void FD_FN_UNUSED
      61           0 : verify( fd_circq_t * circq ) {
      62           0 :   FD_TEST( circq->head<circq->size );
      63           0 :   FD_TEST( circq->tail<circq->size );
      64           0 :   FD_TEST( circq->tail!=circq->head || circq->cnt<=1 );
      65           0 :   if( !circq->cnt ) {
      66           0 :     FD_TEST( circq->head==0UL );
      67           0 :     FD_TEST( circq->tail==0UL );
      68           0 :   } else if( circq->cnt==1UL ) {
      69           0 :     FD_TEST( circq->head==circq->tail );
      70           0 :   }
      71           0 : 
      72           0 :   uchar * buf = (uchar *)(circq+1);
      73           0 : 
      74           0 :   ulong current = circq->head;
      75           0 :   int wrapped = 0;
      76           0 :   for( ulong i=0UL; i<circq->cnt; i++ ) {
      77           0 :     fd_circq_message_t * message = (fd_circq_message_t *)(buf+current);
      78           0 :     ulong start = current;
      79           0 :     ulong end = fd_ulong_align_up( start+sizeof( fd_circq_message_t ), message->align ) + message->footprint;
      80           0 :     if( wrapped ) FD_TEST( end<=circq->head );
      81           0 :     FD_TEST( start<end );
      82           0 :     FD_TEST( end<=circq->size );
      83           0 :     current = message->next;
      84           0 :     if( current<start ) wrapped = 1;
      85           0 :   }
      86           0 : }
      87             : 
      88             : static void
      89             : evict( fd_circq_t * circq,
      90             :        ulong        from,
      91   330266850 :        ulong        to ) {
      92   330266850 :   uchar * buf = (uchar *)(circq+1);
      93             : 
      94   531593781 :   for(;;) {
      95   531593781 :     if( FD_UNLIKELY( !circq->cnt ) ) return;
      96             : 
      97   439888764 :     fd_circq_message_t * head = (fd_circq_message_t *)(buf+circq->head);
      98             : 
      99   439888764 :     ulong start = circq->head;
     100   439888764 :     ulong end = fd_ulong_align_up( start + sizeof( fd_circq_message_t ), head->align ) + head->footprint;
     101             : 
     102   439888764 :     if( FD_UNLIKELY( (start<to && end>from) ) ) {
     103   201326931 :       circq->cnt--;
     104   201326931 :       circq->metrics.drop_cnt++;
     105   201326931 :       if( FD_LIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL;
     106   109621944 :       else                           circq->head = head->next;
     107   238561833 :     } else {
     108   238561833 :       break;
     109   238561833 :     }
     110   439888764 :   }
     111   330266850 : }
     112             : 
     113             : uchar *
     114             : fd_circq_push_back( fd_circq_t * circq,
     115             :                     ulong        align,
     116   201327063 :                     ulong        footprint ) {
     117   201327063 :   if( FD_UNLIKELY( !fd_ulong_is_pow2( align ) ) ) {
     118           0 :     FD_LOG_WARNING(( "align must be a power of 2" ));
     119           0 :     return NULL;
     120           0 :   }
     121   201327063 :   if( FD_UNLIKELY( align>FD_CIRCQ_ALIGN ) ) {
     122           0 :     FD_LOG_WARNING(( "align must be at most %lu", FD_CIRCQ_ALIGN ));
     123           0 :     return NULL;
     124           0 :   }
     125             : 
     126   201327063 :   ulong required = fd_ulong_align_up( sizeof( fd_circq_message_t ), align ) + footprint;
     127   201327063 :   if( FD_UNLIKELY( required>circq->size ) ) {
     128           3 :     FD_LOG_WARNING(( "tried to push message which was too large %lu>%lu", required, circq->size ));
     129           3 :     return NULL;
     130           3 :   }
     131             : 
     132   201327060 :   uchar * buf = (uchar *)(circq+1);
     133             : 
     134   201327060 :   ulong current = 0UL;
     135   201327060 :   fd_circq_message_t * message = NULL;
     136   201327060 :   if( FD_LIKELY( circq->cnt ) ) {
     137   201327030 :     message = (fd_circq_message_t *)(buf+circq->tail);
     138   201327030 :     current = fd_ulong_align_up( fd_ulong_align_up( circq->tail+sizeof( fd_circq_message_t ), message->align )+message->footprint, alignof( fd_circq_message_t ) );
     139   201327030 :   }
     140             : 
     141   201327060 :   if( FD_UNLIKELY( current+required>circq->size ) ) {
     142   128939790 :     evict( circq, current, circq->size );
     143   128939790 :     evict( circq, 0UL, required );
     144             : 
     145   128939790 :     circq->tail = 0UL;
     146   128939790 :     if( FD_LIKELY( circq->cnt && message ) ) message->next = 0UL;
     147   128939790 :   } else {
     148    72387270 :     evict( circq, current, current+required );
     149             : 
     150    72387270 :     circq->tail = current;
     151    72387270 :     if( FD_LIKELY( circq->cnt && message ) ) message->next = current;
     152    72387270 :   }
     153             : 
     154   201327060 :   circq->cnt++;
     155   201327060 :   fd_circq_message_t * next_message = (fd_circq_message_t *)(buf+circq->tail);
     156   201327060 :   next_message->align = align;
     157   201327060 :   next_message->footprint = footprint;
     158   201327060 :   circq->cursor_push_seq++;
     159   201327060 :   return (uchar *)(next_message+1);
     160   201327063 : }
     161             : 
     162             : void
     163             : fd_circq_resize_back( fd_circq_t * circq,
     164           0 :                       ulong        new_footprint ) {
     165           0 :   FD_TEST( circq->cnt );
     166             : 
     167           0 :   uchar * buf = (uchar *)(circq+1);
     168           0 :   fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->tail);
     169           0 :   FD_TEST( new_footprint<=message->footprint );
     170             : 
     171           0 :   message->footprint = new_footprint;
     172           0 : }
     173             : 
     174             : uchar const *
     175             : fd_circq_cursor_advance( fd_circq_t * circq,
     176         414 :                          ulong *      msg_sz ) {
     177             :   /* First call or after reset - start from head */
     178         414 :   if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) {
     179          72 :     if( FD_UNLIKELY( !circq->cnt ) ) return NULL;
     180          66 :     circq->cursor = circq->head;
     181          66 :     circq->cursor_seq = circq->cursor_push_seq - circq->cnt;
     182         342 :   } else {
     183             :     /* Already iterating - move to next */
     184         342 :     if( FD_UNLIKELY( circq->cursor_seq >= circq->cursor_push_seq ) ) return NULL;
     185             : 
     186         333 :     uchar * buf = (uchar *)(circq+1);
     187         333 :     fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->cursor);
     188         333 :     circq->cursor = message->next;
     189         333 :   }
     190             : 
     191         399 :   uchar * buf = (uchar *)(circq+1);
     192         399 :   fd_circq_message_t * current_msg = (fd_circq_message_t *)(buf+circq->cursor);
     193         399 :   circq->cursor_seq++;
     194         399 :   if( FD_LIKELY( msg_sz ) ) *msg_sz = current_msg->footprint;
     195         399 :   return (uchar *)(current_msg+1);
     196         414 : }
     197             : 
     198             : int
     199             : fd_circq_pop_until( fd_circq_t * circq,
     200          18 :                     ulong        cursor ) {
     201          18 :   if( FD_UNLIKELY( cursor>=circq->cursor_push_seq ) ) return -1;
     202             : 
     203          15 :   ulong oldest_seq = circq->cursor_push_seq-circq->cnt;
     204          15 :   if( FD_UNLIKELY( cursor<oldest_seq ) ) return 0;
     205             : 
     206          12 :   ulong to_pop = fd_ulong_min( cursor-oldest_seq+1UL, circq->cnt );
     207             : 
     208          12 :   uchar * buf = (uchar *)(circq+1);
     209          36 :   for( ulong i=0UL; i<to_pop; i++ ) {
     210          24 :     fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->head);
     211          24 :     circq->cnt--;
     212             : 
     213          24 :     if( FD_UNLIKELY( !circq->cnt ) ) {
     214           6 :       circq->head = circq->tail = 0UL;
     215          18 :     } else {
     216          18 :       circq->head = message->next;
     217          18 :       FD_TEST( circq->head<circq->size );
     218          18 :     }
     219          24 :   }
     220             : 
     221          12 :   if( FD_UNLIKELY( !circq->cnt ) ) circq->cursor = ULONG_MAX;
     222          12 :   return 0;
     223          12 : }
     224             : 
     225             : void
     226          60 : fd_circq_reset_cursor( fd_circq_t * circq ) {
     227          60 :   circq->cursor = ULONG_MAX;
     228          60 : }
     229             : 
     230             : ulong
     231           0 : fd_circq_bytes_used( fd_circq_t const * circq ) {
     232           0 :   if( FD_UNLIKELY( !circq->cnt ) ) return 0UL;
     233             : 
     234           0 :   uchar const * buf = (uchar const *)(circq+1);
     235             : 
     236           0 :   fd_circq_message_t const * tail_msg = (fd_circq_message_t const *)(buf+circq->tail);
     237           0 :   ulong tail_end = fd_ulong_align_up( circq->tail + sizeof(fd_circq_message_t), tail_msg->align ) + tail_msg->footprint;
     238             : 
     239           0 :   if( FD_LIKELY( circq->tail>=circq->head ) ) return tail_end - circq->head;
     240           0 :   else return (circq->size - circq->head) + tail_end;
     241           0 : }

Generated by: LCOV version 1.14