LCOV - code coverage report
Current view: top level - disco/store - fd_shredb.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 137 202 67.8 %
Date: 2026-06-23 08:01:55 Functions: 11 12 91.7 %

          Line data    Source code
       1             : #include "fd_shredb.h"
       2             : 
       3             : #include <errno.h>
       4             : #include <sys/mman.h>
       5             : #include <fcntl.h>
       6             : #include <unistd.h>
       7             : 
       8             : static inline ulong
       9         180 : fd_shredb_max_shreds_for_gib( ulong max_size_gib ) {
      10         180 :   return (max_size_gib*1024UL*1024UL*1024UL) / sizeof(fd_shredb_entry_t);
      11         180 : }
      12             : 
      13             : /* We size the slot map such that it will never fill before we start
      14             :    evicting from the shred_map/ring buffer. The minimum number of shreds
      15             :    per slot is 32 (one FEC set), so it is guaranteed that in the worst case
      16             :    we will be able to represent every FEC set inserted into the database.
      17             : 
      18             :    Remember that we will always be inserting complete sets, consisting of
      19             :    32 data shreds at a time. */
      20             : static inline ulong
      21          90 : fd_shredb_max_slots_for_gib( ulong max_size_gib ) {
      22          90 :   return fd_shredb_max_shreds_for_gib( max_size_gib ) / 32UL;
      23          90 : }
      24             : 
      25             : FD_FN_CONST ulong
      26          66 : fd_shredb_footprint( ulong max_size_gib ) {
      27          66 :   if( FD_UNLIKELY( !max_size_gib ) ) return 0UL;
      28             : 
      29          63 :   ulong max_shreds = fd_shredb_max_shreds_for_gib( max_size_gib );
      30          63 :   ulong max_slots  = fd_shredb_max_slots_for_gib ( max_size_gib );
      31             : 
      32          63 :   int lg_shred_cnt = fd_ulong_find_msb( fd_ulong_pow2_up( max_shreds ) );
      33          63 :   int lg_slot_cnt  = fd_ulong_find_msb( fd_ulong_pow2_up( max_slots  ) );
      34             : 
      35          63 :   ulong l = FD_LAYOUT_INIT;
      36          63 :   l = FD_LAYOUT_APPEND( l, alignof(fd_shredb_t),        sizeof(fd_shredb_t)                           );
      37          63 :   l = FD_LAYOUT_APPEND( l, fd_shredb_shred_map_align(), fd_shredb_shred_map_footprint( lg_shred_cnt ) );
      38          63 :   l = FD_LAYOUT_APPEND( l, fd_shredb_slot_map_align(),  fd_shredb_slot_map_footprint ( lg_slot_cnt  ) );
      39          63 :   return FD_LAYOUT_FINI( l, fd_shredb_align() );
      40          66 : }
      41             : 
      42             : void *
      43             : fd_shredb_new( void       * shmem,
      44             :                ulong        max_size_gib,
      45             :                char const * file_path,
      46          27 :                ulong        seed ) {
      47          27 :   if( FD_UNLIKELY( !shmem ) ) {
      48           0 :     FD_LOG_WARNING(( "NULL shmem" ));
      49           0 :     return NULL;
      50           0 :   }
      51             : 
      52          27 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_shredb_align() ) ) ) {
      53           0 :     FD_LOG_WARNING(( "misaligned shmem" ));
      54           0 :     return NULL;
      55           0 :   }
      56             : 
      57          27 :   if( FD_UNLIKELY( !file_path ) ) {
      58           0 :     FD_LOG_WARNING(( "NULL file_path" ));
      59           0 :     return NULL;
      60           0 :   }
      61             : 
      62          27 :   ulong footprint = fd_shredb_footprint( max_size_gib );
      63          27 :   if( FD_UNLIKELY( !footprint ) ) {
      64           0 :     FD_LOG_WARNING(( "bad max_size_gib (%lu)", max_size_gib ));
      65           0 :     return NULL;
      66           0 :   }
      67             : 
      68          27 :   ulong max_shreds = fd_shredb_max_shreds_for_gib( max_size_gib );
      69          27 :   ulong max_slots  = fd_shredb_max_slots_for_gib ( max_size_gib );
      70             : 
      71          27 :   int lg_shred_cnt = fd_ulong_find_msb( fd_ulong_pow2_up( max_shreds ) );
      72          27 :   int lg_slot_cnt  = fd_ulong_find_msb( fd_ulong_pow2_up( max_slots  ) );
      73             : 
      74          27 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
      75          27 :   /**/                   FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_shredb_t),        sizeof(fd_shredb_t)                           );
      76          27 :   void * shred_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_shredb_shred_map_align(), fd_shredb_shred_map_footprint( lg_shred_cnt ) );
      77          27 :   void * slot_map_mem  = FD_SCRATCH_ALLOC_APPEND( l, fd_shredb_slot_map_align(),  fd_shredb_slot_map_footprint ( lg_slot_cnt  ) );
      78             : 
      79          27 :   fd_shredb_t * store = (fd_shredb_t *)shmem;
      80          27 :   store->shred_map    = fd_shredb_shred_map_new( shred_map_mem, lg_shred_cnt, seed );
      81          27 :   store->slot_map     = fd_shredb_slot_map_new ( slot_map_mem,  lg_slot_cnt,  seed );
      82             : 
      83          27 :   int fd = open( file_path, O_RDWR | O_CREAT | O_TRUNC, (mode_t)0600 );
      84          27 :   if( FD_UNLIKELY( fd<0 ) ) {
      85           0 :     FD_LOG_WARNING(( "open(%s) failed (%i-%s)", file_path, errno, fd_io_strerror( errno ) ));
      86           0 :     return NULL;
      87           0 :   }
      88             : 
      89          27 :   ulong initial_shreds = 128UL;
      90          27 :   if( FD_UNLIKELY( ftruncate( fd, (off_t)(initial_shreds * sizeof(fd_shredb_entry_t)) ) ) ) {
      91           0 :     FD_LOG_WARNING(( "ftruncate failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      92           0 :     close( fd );
      93           0 :     return NULL;
      94           0 :   }
      95             : 
      96          27 :   ulong mapped_sz = max_shreds * sizeof(fd_shredb_entry_t);
      97          27 :   void * mapped = mmap( NULL, mapped_sz, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 );
      98          27 :   if( FD_UNLIKELY( mapped==MAP_FAILED ) ) {
      99           0 :     FD_LOG_WARNING(( "mmap failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     100           0 :     close( fd );
     101           0 :     return NULL;
     102           0 :   }
     103             : 
     104          27 :   store->max_shreds  = max_shreds;
     105          27 :   store->write_head  = 0UL;
     106          27 :   store->cnt         = 0UL;
     107          27 :   store->mapped_sz   = mapped_sz;
     108          27 :   store->mapped      = mapped;
     109          27 :   store->fd          = fd;
     110          27 :   store->file_shreds = initial_shreds;
     111             : 
     112          27 :   FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_shredb_align() )==(ulong)shmem + footprint );
     113             : 
     114          27 :   return shmem;
     115          27 : }
     116             : 
     117             : fd_shredb_t *
     118          27 : fd_shredb_join( void * shstore ) {
     119          27 :   if( FD_UNLIKELY( !shstore ) ) {
     120           0 :     FD_LOG_WARNING(( "NULL shstore" ));
     121           0 :     return NULL;
     122           0 :   }
     123             : 
     124          27 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shstore, fd_shredb_align() ) ) ) {
     125           0 :     FD_LOG_WARNING(( "misaligned shstore" ));
     126           0 :     return NULL;
     127           0 :   }
     128             : 
     129          27 :   fd_shredb_t * store = (fd_shredb_t *)shstore;
     130          27 :   store->shred_map = fd_shredb_shred_map_join( store->shred_map );
     131          27 :   store->slot_map  = fd_shredb_slot_map_join ( store->slot_map  );
     132             : 
     133          27 :   return (fd_shredb_t *)shstore;
     134          27 : }
     135             : 
     136             : void *
     137          27 : fd_shredb_leave( fd_shredb_t const * store ) {
     138          27 :   if( FD_UNLIKELY( !store ) ) {
     139           0 :     FD_LOG_WARNING(( "NULL store" ));
     140           0 :     return NULL;
     141           0 :   }
     142             : 
     143          27 :   return (void *)store;
     144          27 : }
     145             : 
     146             : void *
     147          27 : fd_shredb_delete( void * shstore ) {
     148          27 :   if( FD_UNLIKELY( !shstore ) ) {
     149           0 :     FD_LOG_WARNING(( "NULL shstore" ));
     150           0 :     return NULL;
     151           0 :   }
     152             : 
     153          27 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shstore, fd_shredb_align() ) ) ) {
     154           0 :     FD_LOG_WARNING(( "misaligned shstore" ));
     155           0 :     return NULL;
     156           0 :   }
     157             : 
     158          27 :   fd_shredb_t * store = (fd_shredb_t *)shstore;
     159          27 :   if( FD_UNLIKELY( !store->mapped ) ) {
     160           0 :     FD_LOG_WARNING(( "NULL mapped" ));
     161           0 :     return NULL;
     162           0 :   }
     163          27 :   munmap( store->mapped, store->mapped_sz );
     164          27 :   close( store->fd );
     165             : 
     166          27 :   return shstore;
     167          27 : }
     168             : 
     169             : static void
     170             : fd_shredb_slot_evict( fd_shredb_t * store,
     171             :                       ulong         slot,
     172      483852 :                       uint          evicted_shred_idx ) {
     173      483852 :   fd_shredb_slot_entry_t * se = fd_shredb_slot_map_query( store->slot_map, slot, NULL );
     174      483852 :   FD_TEST( se );
     175             : 
     176      483852 :   se->cnt--;
     177      483852 :   if( FD_UNLIKELY( se->cnt==0UL ) ) {
     178       15213 :     fd_shredb_slot_map_remove( store->slot_map, se );
     179       15213 :     return;
     180       15213 :   }
     181             : 
     182             :   /* If the shred evicted was the highest in that slot, walk down and
     183             :      find the new highest that still exists in the per-shred map. */
     184      468639 :   if( evicted_shred_idx==se->highest_shred_idx ) {
     185           0 :     for( uint idx = evicted_shred_idx; ; idx-- ) {
     186           0 :       ulong key = fd_shredb_key_pack( slot, idx );
     187           0 :       if( fd_shredb_shred_map_query( store->shred_map, key, NULL ) ) {
     188           0 :         se->highest_shred_idx = idx;
     189           0 :         return;
     190           0 :       }
     191           0 :       if( FD_UNLIKELY( idx==0U ) ) break;
     192           0 :     }
     193           0 :     FD_LOG_ERR(( "corrupt store state" ));
     194           0 :   }
     195      468639 : }
     196             : 
     197             : static inline fd_shredb_entry_t *
     198     3120963 : fd_shredb_ring( fd_shredb_t * store ) {
     199     3120963 :   return (fd_shredb_entry_t *)store->mapped;
     200     3120963 : }
     201             : 
     202             : void
     203             : fd_shredb_insert( fd_shredb_t      * store,
     204             :                   fd_shred_t const * shred,
     205     3060699 :                   ulong              shred_sz ) {
     206     3060699 :   ulong slot      = shred->slot;
     207     3060699 :   uint  shred_idx = shred->idx;
     208             : 
     209     3060699 :   FD_LOG_DEBUG(( "inserting shred into store (slot=%lu, shred_idx=%u)", slot, shred_idx ));
     210             : 
     211     3060699 :   ulong key = fd_shredb_key_pack( slot, shred_idx );
     212     3060699 :   if( fd_shredb_shred_map_query( store->shred_map, key, NULL ) ) return;
     213             : 
     214             :   /* Grow the backing file if the write head has reached the current
     215             :      file capacity.  Double the file size each time (superlinear growth)
     216             :      until we hit max_shreds, after which the ring simply evicts. */
     217     3060699 :   if( FD_UNLIKELY( store->write_head>=store->file_shreds ) ) {
     218          81 :     ulong new_file_shreds = fd_ulong_min( store->file_shreds * 2UL, store->max_shreds );
     219          81 :     if( FD_UNLIKELY( ftruncate( store->fd, (off_t)(new_file_shreds * sizeof(fd_shredb_entry_t)) ) ) ) {
     220           0 :       FD_LOG_ERR(( "ftruncate failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     221           0 :     }
     222          81 :     store->file_shreds = new_file_shreds;
     223          81 :   }
     224             : 
     225     3060699 :   fd_shredb_entry_t * ring  = fd_shredb_ring( store );
     226     3060699 :   fd_shredb_entry_t * entry = &ring[ store->write_head ];
     227             : 
     228             :   /* If this ring entry is occupied, evict the old entry. */
     229     3060699 :   if( FD_LIKELY( entry->occupied ) ) {
     230      483852 :     ulong old_slot = fd_shredb_key_slot( entry->key );
     231      483852 :     uint  old_idx  = fd_shredb_key_shred_idx( entry->key );
     232             : 
     233      483852 :     fd_shredb_shred_entry_t * old = fd_shredb_shred_map_query( store->shred_map, entry->key, NULL );
     234      483852 :     if( FD_LIKELY( old ) ) fd_shredb_shred_map_remove( store->shred_map, old );
     235             : 
     236      483852 :     fd_shredb_slot_evict( store, old_slot, old_idx );
     237      483852 :     store->cnt--;
     238      483852 :   }
     239             : 
     240     3060699 :   entry->key      = key;
     241     3060699 :   entry->occupied = 1;
     242     3060699 :   entry->shred_sz = (ushort)shred_sz;
     243     3060699 :   fd_memcpy( entry->shred, shred, shred_sz );
     244             : 
     245     3060699 :   fd_shredb_shred_entry_t * map_entry = fd_shredb_shred_map_insert( store->shred_map, key );
     246     3060699 :   FD_TEST( map_entry );
     247     3060699 :   map_entry->ring_idx = store->write_head;
     248             : 
     249     3060699 :   fd_shredb_slot_entry_t * se = fd_shredb_slot_map_query( store->slot_map, slot, NULL );
     250     3060699 :   if( FD_LIKELY( se ) ) {
     251     2964915 :     se->cnt++;
     252     2964915 :     se->highest_shred_idx = fd_uint_max( se->highest_shred_idx, shred_idx );
     253     2964915 :   } else {
     254       95784 :     se = fd_shredb_slot_map_insert( store->slot_map, slot );
     255       95784 :     FD_TEST( se );
     256       95784 :     se->highest_shred_idx = shred_idx;
     257       95784 :     se->cnt               = 1UL;
     258       95784 :   }
     259             : 
     260     3060699 :   store->cnt++;
     261     3060699 :   store->write_head = (store->write_head + 1UL) % store->max_shreds;
     262     3060699 : }
     263             : 
     264             : int
     265             : fd_shredb_query( fd_shredb_t * store,
     266             :                  ulong         slot,
     267             :                  uint          shred_idx,
     268       90279 :                  uchar         out[ FD_SHRED_MAX_SZ ] ) {
     269             :   /* Fast-fail, if we have never heard of this slot, we must have no shreds for it. */
     270       90279 :   if( !fd_shredb_slot_map_query( store->slot_map, slot, NULL ) ) return -1;
     271             : 
     272       60267 :   ulong key = fd_shredb_key_pack( slot, shred_idx );
     273       60267 :   fd_shredb_shred_entry_t const * map_entry = fd_shredb_shred_map_query( store->shred_map, key, NULL );
     274       60267 :   if( FD_UNLIKELY( !map_entry ) ) return -1; /* No such shred. */
     275             : 
     276       60264 :   fd_shredb_entry_t * ring  = fd_shredb_ring( store );
     277       60264 :   fd_shredb_entry_t * entry = &ring[ map_entry->ring_idx ];
     278             : 
     279       60264 :   fd_memcpy( out, entry->shred, entry->shred_sz );
     280       60264 :   return entry->shred_sz;
     281       60267 : }
     282             : 
     283             : int fd_shredb_query_highest( fd_shredb_t * store,
     284             :                              ulong         slot,
     285             :                              uint          min_shred_idx,
     286           0 :                              uchar         out[ FD_SHRED_MAX_SZ ] ) {
     287           0 :   fd_shredb_slot_entry_t * se = fd_shredb_slot_map_query( store->slot_map, slot, NULL );
     288           0 :   if( FD_UNLIKELY( !se ) ) return -1;
     289             : 
     290             :   /* Check if the highest known index meets the threshold. */
     291           0 :   if( se->highest_shred_idx < min_shred_idx ) return -1;
     292             : 
     293           0 :   ulong key = fd_shredb_key_pack( slot, se->highest_shred_idx );
     294           0 :   fd_shredb_shred_entry_t const * map_entry = fd_shredb_shred_map_query( store->shred_map, key, NULL );
     295           0 :   FD_TEST( map_entry );
     296             : 
     297           0 :   fd_shredb_entry_t * ring  = fd_shredb_ring( store );
     298           0 :   fd_shredb_entry_t * entry = &ring[ map_entry->ring_idx ];
     299             : 
     300           0 :   fd_memcpy( out, entry->shred, entry->shred_sz );
     301           0 :   return entry->shred_sz;
     302           0 : }

Generated by: LCOV version 1.14