LCOV - code coverage report
Current view: top level - disco/events - fd_circq.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 111 174 63.8 %
Date: 2026-06-29 05:51:35 Functions: 8 16 50.0 %

          Line data    Source code
       1             : #include "fd_circq.h"
       2             : 
       3             : #include "../../util/log/fd_log.h"
       4             : 
       5             : struct __attribute__((aligned(8UL))) fd_circq_message_private {
       6             :   ulong align;
       7             :   ulong footprint;
       8             : 
       9             :   /* Offset within the circular buffer data region of where the next
      10             :      message starts, if there is one.  This is not always the same as
      11             :      aligning up this message + footprint, because the next message may
      12             :      have wrapped around to the start of the buffer. */
      13             :   ulong next;
      14             : };
      15             : 
      16             : typedef struct fd_circq_message_private fd_circq_message_t;
      17             : 
      18             : FD_FN_CONST ulong
      19           0 : fd_circq_align( void ) {
      20           0 :   return FD_CIRCQ_ALIGN;
      21           0 : }
      22             : 
      23             : FD_FN_CONST ulong
      24           0 : fd_circq_footprint( ulong sz ) {
      25           0 :   return sizeof( fd_circq_t ) + sz;
      26           0 : }
      27             : 
      28             : void *
      29             : fd_circq_new( void * shmem,
      30          36 :               ulong  sz ) {
      31          36 :   fd_circq_t * circq = (fd_circq_t *)shmem;
      32          36 :   circq->cnt  = 0UL;
      33          36 :   circq->head = 0UL;
      34          36 :   circq->tail = 0UL;
      35          36 :   circq->size = sz;
      36          36 :   circq->cursor = ULONG_MAX;
      37          36 :   circq->cursor_seq = 0UL;
      38          36 :   circq->cursor_push_seq = 0UL;
      39             : 
      40          36 :   memset( &circq->metrics, 0, sizeof( circq->metrics ) );
      41             : 
      42          36 :   return shmem;
      43          36 : }
      44             : 
      45             : fd_circq_t *
      46          36 : fd_circq_join( void * shbuf ) {
      47          36 :   return (fd_circq_t *)shbuf;
      48          36 : }
      49             : 
      50             : void *
      51           0 : fd_circq_leave( fd_circq_t * buf ) {
      52           0 :   return (void *)buf;
      53           0 : }
      54             : 
      55             : void *
      56           0 : fd_circq_delete( void * shbuf ) {
      57           0 :   return shbuf;
      58           0 : }
      59             : 
      60             : static inline void FD_FN_UNUSED
      61           0 : verify( fd_circq_t * circq ) {
      62           0 :   FD_TEST( circq->head<circq->size );
      63           0 :   FD_TEST( circq->tail<circq->size );
      64           0 :   FD_TEST( circq->tail!=circq->head || circq->cnt<=1 );
      65           0 :   if( !circq->cnt ) {
      66           0 :     FD_TEST( circq->head==0UL );
      67           0 :     FD_TEST( circq->tail==0UL );
      68           0 :   } else if( circq->cnt==1UL ) {
      69           0 :     FD_TEST( circq->head==circq->tail );
      70           0 :   }
      71           0 : 
      72           0 :   uchar * buf = (uchar *)(circq+1);
      73           0 : 
      74           0 :   ulong current = circq->head;
      75           0 :   int wrapped = 0;
      76           0 :   for( ulong i=0UL; i<circq->cnt; i++ ) {
      77           0 :     fd_circq_message_t * message = (fd_circq_message_t *)(buf+current);
      78           0 :     ulong start = current;
      79           0 :     ulong end = fd_ulong_align_up( start+sizeof( fd_circq_message_t ), message->align ) + message->footprint;
      80           0 :     if( wrapped ) FD_TEST( end<=circq->head );
      81           0 :     FD_TEST( start<end );
      82           0 :     FD_TEST( end<=circq->size );
      83           0 :     current = message->next;
      84           0 :     if( current<start ) wrapped = 1;
      85           0 :   }
      86           0 : }
      87             : 
      88             : /* Recover from eviction logic removing elements at the cursor */
      89             : 
      90             : static inline void
      91    12583278 : overrun_recover( fd_circq_t * circq ) {
      92    12583278 :   if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) return;
      93             : 
      94         300 :   ulong oldest_seq = circq->cursor_push_seq - circq->cnt;
      95         300 :   if( FD_UNLIKELY( circq->cursor_seq<=oldest_seq ) ) circq->cursor = ULONG_MAX;
      96         300 : }
      97             : 
      98             : static void
      99             : evict( fd_circq_t * circq,
     100             :        ulong        from,
     101    20642571 :        ulong        to ) {
     102    20642571 :   uchar * buf = (uchar *)(circq+1);
     103             : 
     104    33225831 :   for(;;) {
     105    33225831 :     if( FD_UNLIKELY( !circq->cnt ) ) return;
     106             : 
     107    27493179 :     fd_circq_message_t * head = (fd_circq_message_t *)(buf+circq->head);
     108             : 
     109    27493179 :     ulong start = circq->head;
     110    27493179 :     ulong end = fd_ulong_align_up( start + sizeof( fd_circq_message_t ), head->align ) + head->footprint;
     111             : 
     112    27493179 :     if( FD_UNLIKELY( (start<to && end>from) ) ) {
     113    12583260 :       circq->cnt--;
     114    12583260 :       circq->metrics.drop_cnt++;
     115    12583260 :       if( FD_LIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL;
     116     6850647 :       else                           circq->head = head->next;
     117    12583260 :       overrun_recover( circq );
     118    14909919 :     } else {
     119    14909919 :       break;
     120    14909919 :     }
     121    27493179 :   }
     122    20642571 : }
     123             : 
     124             : uchar *
     125             : fd_circq_push_back( fd_circq_t * circq,
     126             :                     ulong        align,
     127    12583410 :                     ulong        footprint ) {
     128    12583410 :   if( FD_UNLIKELY( !fd_ulong_is_pow2( align ) ) ) {
     129           0 :     FD_LOG_WARNING(( "align must be a power of 2" ));
     130           0 :     return NULL;
     131           0 :   }
     132    12583410 :   if( FD_UNLIKELY( align>FD_CIRCQ_ALIGN ) ) {
     133           0 :     FD_LOG_WARNING(( "align must be at most %lu", FD_CIRCQ_ALIGN ));
     134           0 :     return NULL;
     135           0 :   }
     136             : 
     137    12583410 :   ulong required = fd_ulong_align_up( sizeof( fd_circq_message_t ), align ) + footprint;
     138    12583410 :   if( FD_UNLIKELY( required>circq->size ) ) {
     139           3 :     FD_LOG_WARNING(( "tried to push message which was too large %lu>%lu", required, circq->size ));
     140           3 :     return NULL;
     141           3 :   }
     142             : 
     143    12583407 :   uchar * buf = (uchar *)(circq+1);
     144             : 
     145    12583407 :   ulong current = 0UL;
     146    12583407 :   fd_circq_message_t * message = NULL;
     147    12583407 :   if( FD_LIKELY( circq->cnt ) ) {
     148    12583368 :     message = (fd_circq_message_t *)(buf+circq->tail);
     149    12583368 :     current = fd_ulong_align_up( fd_ulong_align_up( circq->tail+sizeof( fd_circq_message_t ), message->align )+message->footprint, alignof( fd_circq_message_t ) );
     150    12583368 :   }
     151             : 
     152    12583407 :   if( FD_UNLIKELY( current+required>circq->size ) ) {
     153     8059164 :     evict( circq, current, circq->size );
     154     8059164 :     evict( circq, 0UL, required );
     155             : 
     156     8059164 :     circq->tail = 0UL;
     157     8059164 :     if( FD_LIKELY( circq->cnt && message ) ) message->next = 0UL;
     158     8059164 :   } else {
     159     4524243 :     evict( circq, current, current+required );
     160             : 
     161     4524243 :     circq->tail = current;
     162     4524243 :     if( FD_LIKELY( circq->cnt && message ) ) message->next = current;
     163     4524243 :   }
     164             : 
     165    12583407 :   circq->cnt++;
     166    12583407 :   fd_circq_message_t * next_message = (fd_circq_message_t *)(buf+circq->tail);
     167    12583407 :   next_message->align = align;
     168    12583407 :   next_message->footprint = footprint;
     169    12583407 :   next_message->next = ULONG_MAX;
     170    12583407 :   circq->cursor_push_seq++;
     171    12583407 :   return (uchar *)(next_message+1);
     172    12583410 : }
     173             : 
     174             : void
     175             : fd_circq_resize_back( fd_circq_t * circq,
     176           0 :                       ulong        new_footprint ) {
     177           0 :   FD_TEST( circq->cnt );
     178             : 
     179           0 :   uchar * buf = (uchar *)(circq+1);
     180           0 :   fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->tail);
     181           0 :   FD_TEST( new_footprint<=message->footprint );
     182             : 
     183           0 :   message->footprint = new_footprint;
     184           0 : }
     185             : 
     186             : uchar const *
     187             : fd_circq_cursor_advance( fd_circq_t * circq,
     188         441 :                          ulong *      msg_sz ) {
     189             :   /* First call or after reset - start from head */
     190         441 :   if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) {
     191         309 :     if( FD_UNLIKELY( !circq->cnt ) ) return NULL;
     192         303 :     circq->cursor = circq->head;
     193         303 :     circq->cursor_seq = circq->cursor_push_seq - circq->cnt;
     194         303 :   } else {
     195             :     /* Already iterating - move to next */
     196         132 :     if( FD_UNLIKELY( circq->cursor_seq >= circq->cursor_push_seq ) ) return NULL;
     197             : 
     198         120 :     uchar * buf = (uchar *)(circq+1);
     199         120 :     fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->cursor);
     200         120 :     circq->cursor = message->next;
     201         120 :   }
     202             : 
     203         423 :   uchar * buf = (uchar *)(circq+1);
     204         423 :   fd_circq_message_t * current_msg = (fd_circq_message_t *)(buf+circq->cursor);
     205         423 :   circq->cursor_seq++;
     206         423 :   if( FD_LIKELY( msg_sz ) ) *msg_sz = current_msg->footprint;
     207         423 :   return (uchar *)(current_msg+1);
     208         441 : }
     209             : 
     210             : int
     211             : fd_circq_pop_until( fd_circq_t * circq,
     212          27 :                     ulong        cursor ) {
     213          27 :   if( FD_UNLIKELY( cursor>=circq->cursor_seq ) ) return -1;
     214             : 
     215          21 :   ulong oldest_seq = circq->cursor_push_seq-circq->cnt;
     216          21 :   if( FD_UNLIKELY( cursor<oldest_seq ) ) return 0;
     217             : 
     218          18 :   ulong to_pop = fd_ulong_min( cursor-oldest_seq+1UL, circq->cnt );
     219             : 
     220          18 :   uchar * buf = (uchar *)(circq+1);
     221          48 :   for( ulong i=0UL; i<to_pop; i++ ) {
     222          30 :     fd_circq_message_t * message = (fd_circq_message_t *)(buf+circq->head);
     223          30 :     circq->cnt--;
     224             : 
     225          30 :     if( FD_UNLIKELY( !circq->cnt ) ) {
     226           6 :       circq->head = circq->tail = 0UL;
     227          24 :     } else {
     228          24 :       circq->head = message->next;
     229          24 :       FD_TEST( circq->head<circq->size );
     230          24 :     }
     231          30 :   }
     232             : 
     233          18 :   if( FD_UNLIKELY( !circq->cnt ) ) circq->cursor = ULONG_MAX;
     234          18 :   overrun_recover( circq );
     235          18 :   return 0;
     236          18 : }
     237             : 
     238             : void
     239          63 : fd_circq_reset_cursor( fd_circq_t * circq ) {
     240          63 :   circq->cursor = ULONG_MAX;
     241          63 : }
     242             : 
     243             : ulong
     244           0 : fd_circq_bytes_used( fd_circq_t const * circq ) {
     245           0 :   if( FD_UNLIKELY( !circq->cnt ) ) return 0UL;
     246             : 
     247           0 :   uchar const * buf = (uchar const *)(circq+1);
     248             : 
     249           0 :   fd_circq_message_t const * tail_msg = (fd_circq_message_t const *)(buf+circq->tail);
     250           0 :   ulong tail_end = fd_ulong_align_up( circq->tail + sizeof(fd_circq_message_t), tail_msg->align ) + tail_msg->footprint;
     251             : 
     252           0 :   if( FD_LIKELY( circq->tail>=circq->head ) ) return tail_end - circq->head;
     253           0 :   else return (circq->size - circq->head) + tail_end;
     254           0 : }
     255             : 
     256             : ulong
     257           0 : fd_circq_unsent_cnt( fd_circq_t const * circq ) {
     258           0 :   if( FD_UNLIKELY( circq->cursor==ULONG_MAX ) ) return circq->cnt;
     259           0 :   return fd_ulong_min( circq->cursor_push_seq - circq->cursor_seq, circq->cnt );
     260           0 : }

Generated by: LCOV version 1.14