LCOV - code coverage report
Current view: top level - flamenco/runtime - fd_blockstore.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 640 0.0 %
Date: 2025-08-01 05:13:22 Functions: 0 24 0.0 %

          Line data    Source code
       1             : #include "fd_blockstore.h"
       2             : 
       3             : #include <fcntl.h>
       4             : #include <string.h>
       5             : #include <stdio.h> /* snprintf */
       6             : #include <unistd.h>
       7             : /* fd_blockstore_new formats the region at shmem as a blockstore. Returns the header pointer on success; returns NULL after a warning if shmem is NULL or misaligned, wksp_tag is 0, or shmem is not inside a workspace. */
       8             : void *
       9             : fd_blockstore_new( void * shmem,
      10             :                    ulong  wksp_tag,
      11             :                    ulong  seed,
      12             :                    ulong  shred_max,
      13             :                    ulong  block_max,
      14           0 :                    ulong  idx_max ) {
      15             :   /* TODO temporary fix to make sure block_max is a power of 2, as
      16             :      required for slot map para. We should change to err in config
      17             :      verification eventually */
      18           0 :   block_max = fd_ulong_pow2_up( block_max );
      19           0 :   ulong lock_cnt = fd_ulong_min( block_max, BLOCK_INFO_LOCK_CNT );
      20             : 
      21           0 :   fd_blockstore_shmem_t * blockstore_shmem = (fd_blockstore_shmem_t *)shmem;
      22             :   /* Argument checks: each failure logs a warning and returns NULL before anything is written. */
      23           0 :   if( FD_UNLIKELY( !blockstore_shmem ) ) {
      24           0 :     FD_LOG_WARNING(( "NULL blockstore_shmem" ));
      25           0 :     return NULL;
      26           0 :   }
      27             : 
      28           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)blockstore_shmem, fd_blockstore_align() ) )) {
      29           0 :     FD_LOG_WARNING(( "misaligned blockstore_shmem" ));
      30           0 :     return NULL;
      31           0 :   }
      32             : 
      33           0 :   if( FD_UNLIKELY( !wksp_tag ) ) {
      34           0 :     FD_LOG_WARNING(( "bad wksp_tag" ));
      35           0 :     return NULL;
      36           0 :   }
      37             : 
      38           0 :   fd_wksp_t * wksp = fd_wksp_containing( blockstore_shmem );
      39           0 :   if( FD_UNLIKELY( !wksp ) ) {
      40           0 :     FD_LOG_WARNING(( "shmem must be part of a workspace" ));
      41           0 :     return NULL;
      42           0 :   }
      43             :   /* A non-power-of-two shred_max is rounded up (with a warning) rather than rejected. */
      44           0 :   if( FD_UNLIKELY( !fd_ulong_is_pow2( shred_max ) ) ) {
      45           0 :     shred_max = fd_ulong_pow2_up( shred_max );
      46           0 :     FD_LOG_WARNING(( "blockstore implementation requires shred_max to be a power of two, rounding it up to %lu", shred_max ));
      47           0 :   }
      48             :   /* Zero the header before the sub-regions are laid out. */
      49           0 :   fd_memset( blockstore_shmem, 0, sizeof( fd_blockstore_shmem_t ) );
      50             : 
      51           0 :   int   lg_idx_max   = fd_ulong_find_msb( fd_ulong_pow2_up( idx_max ) );
      52             :   /* Lay out the sub-regions in order after the header; the FD_TEST at the end cross-checks the resulting size against fd_blockstore_footprint. */
      53           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
      54           0 :   blockstore_shmem  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_shmem_t), sizeof(fd_blockstore_shmem_t) );
      55           0 :   void * shreds     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_buf_shred_t),        sizeof(fd_buf_shred_t) * shred_max );
      56           0 :   void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(),      fd_buf_shred_pool_footprint() );
      57           0 :   void * shred_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(),       fd_buf_shred_map_footprint( shred_max ) );
      58           0 :   void * blocks     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_info_t),       sizeof(fd_block_info_t) * block_max );
      59           0 :   void * block_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(),           fd_block_map_footprint( block_max, lock_cnt, BLOCK_INFO_PROBE_CNT ) );
      60           0 :   void * block_idx  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_idx_align(),           fd_block_idx_footprint( lg_idx_max ) );
      61           0 :   void * slot_deque = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_deque_align(),          fd_slot_deque_footprint( block_max ) );
      62           0 :   void * alloc      = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(),               fd_alloc_footprint() );
      63           0 :   ulong  top        = FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );
      64           0 :   FD_TEST( fd_ulong_align_up( top - (ulong)shmem, fd_alloc_align() ) == fd_ulong_align_up( fd_blockstore_footprint( shred_max, block_max, idx_max ), fd_alloc_align() ) );
      65             :   /* The shreds array is only reserved here; the pool and map are constructed without referencing it. */
      66           0 :   (void)shreds;
      67           0 :   fd_buf_shred_pool_new( shred_pool );
      68           0 :   fd_buf_shred_map_new ( shred_map, shred_max, seed );
      69           0 :   memset( blocks, 0, sizeof(fd_block_info_t) * block_max );
      70           0 :   FD_TEST( fd_block_map_new ( block_map, block_max, lock_cnt, BLOCK_INFO_PROBE_CNT, seed ) );
      71             : 
      72             :   /* Caller is in charge of freeing map_slot_para element store set.
      73             :      We need to explicitly do this since blocks is memset to 0, which
      74             :      is not a "freed" state in map_slot_para (slot 0 is a valid key). */
      75           0 :   fd_block_info_t * blocks_ = (fd_block_info_t *)blocks;
      76           0 :   for( ulong i=0UL; i<block_max; i++ ) {
      77           0 :     fd_block_map_private_ele_free( NULL, /* Not needed, avoids a join on block_map */
      78           0 :                                    &blocks_[i] );
      79           0 :   }
      80             :   /* Construct and join the block index, slot deque and allocator; their workspace global addresses are stored in the header. */
      81           0 :   blockstore_shmem->block_idx_gaddr  = fd_wksp_gaddr( wksp, fd_block_idx_join( fd_block_idx_new( block_idx, lg_idx_max ) ) );
      82           0 :   blockstore_shmem->slot_deque_gaddr = fd_wksp_gaddr( wksp, fd_slot_deque_join (fd_slot_deque_new( slot_deque, block_max ) ) );
      83           0 :   blockstore_shmem->alloc_gaddr      = fd_wksp_gaddr( wksp, fd_alloc_join (fd_alloc_new( alloc, wksp_tag ), wksp_tag ) );
      84             :   /* A zero gaddr for any of the three is checked with FD_TEST. */
      85           0 :   FD_TEST( blockstore_shmem->block_idx_gaddr  );
      86           0 :   FD_TEST( blockstore_shmem->slot_deque_gaddr );
      87           0 :   FD_TEST( blockstore_shmem->alloc_gaddr      );
      88             :   /* Record the blockstore's own gaddr together with the tag and seed it was created with. */
      89           0 :   blockstore_shmem->blockstore_gaddr = fd_wksp_gaddr_fast( wksp, blockstore_shmem );
      90           0 :   blockstore_shmem->wksp_tag         = wksp_tag;
      91           0 :   blockstore_shmem->seed             = seed;
      92             :   /* The archiver starts with head == tail == FD_BLOCKSTORE_ARCHIVE_START and no blocks. */
      93           0 :   blockstore_shmem->archiver = (fd_blockstore_archiver_t){
      94           0 :       .fd_size_max = FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
      95           0 :       .head        = FD_BLOCKSTORE_ARCHIVE_START,
      96           0 :       .tail        = FD_BLOCKSTORE_ARCHIVE_START,
      97           0 :       .num_blocks  = 0,
      98           0 :   };
      99             :   /* lps, hcs and wmk start at FD_SLOT_NULL; fd_blockstore_init assigns them a concrete slot. */
     100           0 :   blockstore_shmem->lps = FD_SLOT_NULL;
     101           0 :   blockstore_shmem->hcs = FD_SLOT_NULL;
     102           0 :   blockstore_shmem->wmk = FD_SLOT_NULL;
     103             :   /* Stored sizes are the rounded shred_max / block_max and the idx_max as passed in. */
     104           0 :   blockstore_shmem->shred_max  = shred_max;
     105           0 :   blockstore_shmem->block_max  = block_max;
     106           0 :   blockstore_shmem->idx_max    = idx_max;
     107             :   /* The magic is written last, bracketed by compiler fences. */
     108           0 :   FD_COMPILER_MFENCE();
     109           0 :   FD_VOLATILE( blockstore_shmem->magic ) = FD_BLOCKSTORE_MAGIC;
     110           0 :   FD_COMPILER_MFENCE();
     111             : 
     112           0 :   return (void *)blockstore_shmem;
     113           0 : }
     114             : /* fd_blockstore_join fills the caller-provided local join ljoin from the shared region shblockstore. Returns ljoin on success; returns NULL after a warning if either pointer is NULL or misaligned or the magic does not match. */
     115             : fd_blockstore_t *
     116           0 : fd_blockstore_join( void * ljoin, void * shblockstore ) {
     117           0 :   fd_blockstore_t *       join       = (fd_blockstore_t *)ljoin;
     118           0 :   fd_blockstore_shmem_t * blockstore = (fd_blockstore_shmem_t *)shblockstore;
     119             : 
     120           0 :   if( FD_UNLIKELY( !join ) ) {
     121           0 :     FD_LOG_WARNING(( "NULL ljoin" ));
     122           0 :     return NULL;
     123           0 :   }
     124             : 
     125           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)join, alignof(fd_blockstore_t) ) ) ) {
     126           0 :     FD_LOG_WARNING(( "misaligned ljoin" ));
     127           0 :     return NULL;
     128           0 :   }
     129             : 
     130           0 :   if( FD_UNLIKELY( !blockstore ) ) {
     131           0 :     FD_LOG_WARNING(( "NULL shblockstore" ));
     132           0 :     return NULL;
     133           0 :   }
     134             : 
     135           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)blockstore, fd_blockstore_align() ) )) {
     136           0 :     FD_LOG_WARNING(( "misaligned shblockstore" ));
     137           0 :     return NULL;
     138           0 :   }
     139             : 
     140           0 :   if( FD_UNLIKELY( blockstore->magic != FD_BLOCKSTORE_MAGIC ) ) {
     141           0 :     FD_LOG_WARNING(( "bad magic" ));
     142           0 :     return NULL;
     143           0 :   }
     144             :   /* Recompute the sub-region addresses in the same order fd_blockstore_new laid them out, sized from the stored shred_max / block_max; only the regions through block_map are recomputed here. */
     145           0 :   FD_SCRATCH_ALLOC_INIT( l, shblockstore );
     146           0 :   blockstore        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_blockstore_shmem_t), sizeof(fd_blockstore_shmem_t) );
     147           0 :   void * shreds     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_buf_shred_t),        sizeof(fd_buf_shred_t) * blockstore->shred_max );
     148           0 :   void * shred_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_pool_align(),      fd_buf_shred_pool_footprint() );
     149           0 :   void * shred_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_buf_shred_map_align(),       fd_buf_shred_map_footprint( blockstore->shred_max ) );
     150           0 :   void * blocks     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_block_info_t),        sizeof(fd_block_info_t) * blockstore->block_max );
     151           0 :   void * block_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_block_map_align(),           fd_block_map_footprint( blockstore->block_max,
     152           0 :                                                                                                           fd_ulong_min(blockstore->block_max, BLOCK_INFO_LOCK_CNT),
     153           0 :                                                                                                           BLOCK_INFO_PROBE_CNT ) );
     154           0 :   FD_SCRATCH_ALLOC_FINI( l, fd_blockstore_align() );
     155             :   /* Populate the local join: header pointer first, then the shred pool, shred map and block map joins. */
     156           0 :   join->shmem = blockstore;
     157           0 :   fd_buf_shred_pool_join( join->shred_pool, shred_pool, shreds, blockstore->shred_max );
     158           0 :   fd_buf_shred_map_join ( join->shred_map,  shred_map,  shreds, blockstore->shred_max );
     159           0 :   fd_block_map_join     ( join->block_map,  block_map,  blocks );
     160             :   /* Each joined structure is verified; a non-success result is caught by FD_TEST. */
     161           0 :   FD_TEST( fd_buf_shred_pool_verify( join->shred_pool ) == FD_POOL_SUCCESS );
     162           0 :   FD_TEST( fd_buf_shred_map_verify ( join->shred_map  ) == FD_MAP_SUCCESS );
     163           0 :   FD_TEST( fd_block_map_verify     ( join->block_map  ) == FD_MAP_SUCCESS );
     164             : 
     165           0 :   return join;
     166           0 : }
     167             : /* fd_blockstore_leave leaves every structure referenced by a local join. Returns the join pointer as void * on success, or NULL after a warning. */
     168             : void *
     169           0 : fd_blockstore_leave( fd_blockstore_t * blockstore ) {
     170             : 
     171           0 :   if( FD_UNLIKELY( !blockstore ) ) {
     172           0 :     FD_LOG_WARNING(( "NULL blockstore" ));
     173           0 :     return NULL;
     174           0 :   }
     175             :   /* NOTE(review): the workspace lookup is done on the join pointer itself, and wksp is used only for this NULL check -- confirm this is intended. */
     176           0 :   fd_wksp_t * wksp = fd_wksp_containing( blockstore );
     177           0 :   if( FD_UNLIKELY( !wksp ) ) {
     178           0 :     FD_LOG_WARNING(( "shmem must be part of a workspace" ));
     179           0 :     return NULL;
     180           0 :   }
     181             :   /* Leave each sub-structure; every leave result is checked with FD_TEST. */
     182           0 :   FD_TEST( fd_buf_shred_pool_leave( blockstore->shred_pool ) );
     183           0 :   FD_TEST( fd_buf_shred_map_leave( blockstore->shred_map ) );
     184           0 :   FD_TEST( fd_block_map_leave( blockstore->block_map ) );
     185           0 :   FD_TEST( fd_block_idx_leave( fd_blockstore_block_idx( blockstore ) ) );
     186           0 :   FD_TEST( fd_slot_deque_leave( fd_blockstore_slot_deque( blockstore ) ) );
     187           0 :   FD_TEST( fd_alloc_leave( fd_blockstore_alloc( blockstore ) ) );
     188             : 
     189           0 :   return (void *)blockstore;
     190           0 : }
     191             : /* fd_blockstore_delete deletes the sub-structures and clears the magic. Returns the pointer it was given on success, or NULL after a warning. */
     192             : void *
     193           0 : fd_blockstore_delete( void * shblockstore ) {
     194           0 :   fd_blockstore_t * blockstore = (fd_blockstore_t *)shblockstore;
     195             :   /* NOTE(review): the void * argument is treated as a fd_blockstore_t (it is dereferenced through ->shmem below) -- confirm callers pass a join rather than the shared region. */
     196           0 :   if( FD_UNLIKELY( !blockstore ) ) {
     197           0 :     FD_LOG_WARNING(( "NULL shblockstore" ));
     198           0 :     return NULL;
     199           0 :   }
     200             : 
     201           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)blockstore, fd_blockstore_align() ) )) {
     202           0 :     FD_LOG_WARNING(( "misaligned shblockstore" ));
     203           0 :     return NULL;
     204           0 :   }
     205             : 
     206           0 :   if( FD_UNLIKELY( blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) ) {
     207           0 :     FD_LOG_WARNING(( "bad magic" ));
     208           0 :     return NULL;
     209           0 :   }
     210             : 
     211           0 :   fd_wksp_t * wksp = fd_wksp_containing( blockstore );
     212           0 :   if( FD_UNLIKELY( !wksp ) ) {
     213           0 :     FD_LOG_WARNING(( "shmem must be part of a workspace" ));
     214           0 :     return NULL;
     215           0 :   }
     216             : 
     217             :   /* Delete all structures. */
     218             : 
     219           0 :   FD_TEST( fd_buf_shred_pool_delete( &blockstore->shred_pool ) );
     220           0 :   FD_TEST( fd_buf_shred_map_delete( &blockstore->shred_map ) );
     221           0 :   FD_TEST( fd_block_map_delete( &blockstore->block_map ) );
     222           0 :   FD_TEST( fd_block_idx_delete( fd_blockstore_block_idx( blockstore ) ) );
     223           0 :   FD_TEST( fd_slot_deque_delete( fd_blockstore_slot_deque( blockstore ) ) );
     224           0 :   FD_TEST( fd_alloc_delete( fd_blockstore_alloc( blockstore ) ) );
     225             :   /* The magic is cleared last, bracketed by compiler fences. */
     226           0 :   FD_COMPILER_MFENCE();
     227           0 :   FD_VOLATILE( blockstore->shmem->magic ) = 0UL;
     228           0 :   FD_COMPILER_MFENCE();
     229             : 
     230           0 :   return blockstore;
     231           0 : }
     232             : /* check_read_err_safe: if cond holds, log msg prefixed with the enclosing function's name and return FD_BLOCKSTORE_ERR_SLOT_MISSING from that function. NOTE(review): the expansion ends in "while(0);", so it already carries its own ';'. */
     233             : #define check_read_err_safe( cond, msg )            \
     234             :   do {                                              \
     235             :     if( FD_UNLIKELY( cond ) ) {                     \
     236             :       FD_LOG_WARNING(( "[%s] %s", __func__, msg )); \
     237             :       return FD_BLOCKSTORE_ERR_SLOT_MISSING;        \
     238             :     }                                               \
     239             : } while(0);
     240             : /* fd_blockstore_init records the archive size limit, sets lps/hcs/wmk to slot, and publishes a block map entry for slot with the COMPLETED, PROCESSED, EQVOCSAFE, CONFIRMED and FINALIZED flags set. Returns blockstore. */
     241             : fd_blockstore_t *
     242             : fd_blockstore_init( fd_blockstore_t *      blockstore,
     243             :                     int                    fd,
     244             :                     ulong                  fd_size_max,
     245           0 :                     ulong                  slot ) {
     246             :   /* Reject an archive size below the minimum. NOTE(review): whether the return NULL is reachable depends on FD_LOG_ERR, which is defined elsewhere. */
     247           0 :   if( fd_size_max < FD_BLOCKSTORE_ARCHIVE_MIN_SIZE ) {
     248           0 :     FD_LOG_ERR(( "archive file size too small" ));
     249           0 :     return NULL;
     250           0 :   }
     251           0 :   blockstore->shmem->archiver.fd_size_max = fd_size_max;
     252             :   /* The result of the lseek below is neither checked nor used. */
     253             :   //build_idx( blockstore, fd );
     254           0 :   lseek( fd, 0, SEEK_END );
     255             : 
     256             :   /* initialize fields using slot bank */
     257             : 
     258           0 :   ulong smr = slot;
     259             : 
     260           0 :   blockstore->shmem->lps = smr;
     261           0 :   blockstore->shmem->hcs = smr;
     262           0 :   blockstore->shmem->wmk = smr;
     263             : 
     264           0 :   fd_block_map_query_t query[1];
     265             :   /* Blocking prepare of the entry keyed by smr; note the element pointer is fetched before err is checked. */
     266           0 :   int err = fd_block_map_prepare( blockstore->block_map, &smr, NULL, query, FD_MAP_FLAG_BLOCKING );
     267           0 :   fd_block_info_t * ele = fd_block_map_query_ele( query );
     268           0 :   if ( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "failed to prepare block map for slot %lu", smr ));
     269             :   /* NOTE(review): the flag update below starts from the element's existing flags value rather than from 0 -- confirm the element's flags are known to be clear here. */
     270           0 :   ele->slot = smr;
     271           0 :   memset( ele->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof( ulong ) );
     272           0 :   ele->child_slot_cnt = 0;
     273           0 :   ele->flags          = fd_uchar_set_bit(
     274           0 :                       fd_uchar_set_bit(
     275           0 :                       fd_uchar_set_bit(
     276           0 :                       fd_uchar_set_bit(
     277           0 :                       fd_uchar_set_bit( ele->flags,
     278           0 :                                         FD_BLOCK_FLAG_COMPLETED ),
     279           0 :                                         FD_BLOCK_FLAG_PROCESSED ),
     280           0 :                                         FD_BLOCK_FLAG_EQVOCSAFE ),
     281           0 :                                         FD_BLOCK_FLAG_CONFIRMED ),
     282           0 :                                         FD_BLOCK_FLAG_FINALIZED );
     283             :   // ele->ref_tick = 0;
     284           0 :   ele->ts             = 0;
     285           0 :   ele->consumed_idx   = 0;
     286           0 :   ele->received_idx   = 0;
     287           0 :   ele->buffered_idx   = 0;
     288           0 :   ele->data_complete_idx = 0;
     289           0 :   ele->slot_complete_idx = 0;
     290           0 :   ele->ticks_consumed        = 0;
     291           0 :   ele->tick_hash_count_accum = 0;
     292           0 :   fd_block_set_null( ele->data_complete_idxs );
     293             : 
     294             :   /* Set all fields to 0. Caller's responsibility to check gaddr and sz != 0. */
     295             :   /* Publish the prepared element. */
     296           0 :   fd_block_map_publish( query );
     297             : 
     298           0 :   return blockstore;
     299           0 : }
     300             : /* fd_blockstore_fini walks every element of the block map's element store and calls fd_blockstore_slot_remove for each one whose slot field is nonzero. */
     301             : void
     302           0 : fd_blockstore_fini( fd_blockstore_t * blockstore ) {
     303             :   /* Free all allocations by removing all slots (whether they are
     304             :      complete or not). */
     305           0 :   fd_block_info_t * ele0 = (fd_block_info_t *)fd_block_map_shele( blockstore->block_map );
     306           0 :   ulong block_max = fd_block_map_ele_max( blockstore->block_map );
     307           0 :   for( ulong ele_idx=0; ele_idx<block_max; ele_idx++ ) {
     308           0 :     fd_block_info_t * ele = ele0 + ele_idx;
     309           0 :     if( ele->slot == 0 ) continue; /* slot 0 is treated as unused here. NOTE(review): the comment in fd_blockstore_new says slot 0 is a valid key -- confirm. */
     310           0 :     fd_blockstore_slot_remove( blockstore, ele->slot );
     311           0 :   }
     312           0 : }
     313             : /* fd_blockstore_slot_remove returns without changes if slot is not in the block map or is flagged REPLAYING; otherwise it removes the block map entry, unlinks slot from its parent's child list, and removes shreds [0, received_idx). */
     314             : /* Remove a slot from blockstore */
     315             : void
     316           0 : fd_blockstore_slot_remove( fd_blockstore_t * blockstore, ulong slot ) {
     317           0 :   FD_LOG_DEBUG(( "[%s] slot: %lu", __func__, slot ));
     318             :   /* Read parent_slot and received_idx with a query_try / query_test loop that retries while the result is FD_MAP_ERR_AGAIN. */
     319             :   /* It is not safe to remove a replaying block. */
     320           0 :   fd_block_map_query_t query[1] = { 0 };
     321           0 :   ulong parent_slot  = FD_SLOT_NULL;
     322           0 :   ulong received_idx = 0;
     323           0 :   int    err  = FD_MAP_ERR_AGAIN;
     324           0 :   while( err == FD_MAP_ERR_AGAIN ) {
     325           0 :     err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
     326           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     327           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return; /* slot not found */
     328           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     329           0 :     if( FD_UNLIKELY( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
     330           0 :       FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
     331           0 :       return;
     332           0 :     }
     333           0 :     parent_slot  = block_info->parent_slot;
     334           0 :     received_idx = block_info->received_idx;
     335           0 :     err = fd_block_map_query_test( query );
     336           0 :   }
     337             :   /* NOTE(review): err from the remove below is stored but never inspected; only the FD_TEST that follows checks the outcome. */
     338           0 :   err = fd_block_map_remove( blockstore->block_map, &slot, query, FD_MAP_FLAG_BLOCKING );
     339             :   /* not possible to fail */
     340           0 :   FD_TEST( !fd_blockstore_block_info_test( blockstore, slot ) );
     341             : 
     342             :   /* Unlink slot from its parent only if it is not published. */
     343           0 :   err = fd_block_map_prepare( blockstore->block_map, &parent_slot, NULL, query, FD_MAP_FLAG_BLOCKING ); /* NOTE(review): err is not checked after this call */
     344           0 :   fd_block_info_t * parent_block_info = fd_block_map_query_ele( query );
     345           0 :   if( FD_LIKELY( parent_block_info ) ) {
     346           0 :     for( ulong i = 0; i < parent_block_info->child_slot_cnt; i++ ) {
     347           0 :       if( FD_LIKELY( parent_block_info->child_slots[i] == slot ) ) {
     348           0 :         parent_block_info->child_slots[i] = /* overwrite the match with the last child and shrink the count */
     349           0 :             parent_block_info->child_slots[--parent_block_info->child_slot_cnt];
     350           0 :       }
     351           0 :     }
     352           0 :   }
     353           0 :   fd_block_map_publish( query );
     354             : 
     355             :   /* Remove buf_shreds. */
     356           0 :   for( uint idx = 0; idx < received_idx; idx++ ) {
     357           0 :     fd_blockstore_shred_remove( blockstore, slot, idx );
     358           0 :   }
     359             : 
     360           0 :   return;
     361           0 : }
     362             : /* fd_blockstore_publish moves the watermark to wmk. It walks child links breadth-first from the current watermark, removing each slot it visits and never enqueuing wmk itself; it then removes every slot in [old wmk, wmk) and stores wmk. The fd parameter is marked unused. */
     363             : void
     364             : fd_blockstore_publish( fd_blockstore_t * blockstore,
     365             :                        int fd FD_PARAM_UNUSED,
     366           0 :                        ulong wmk ) {
     367           0 :   FD_LOG_NOTICE(( "[%s] wmk %lu => smr %lu", __func__, blockstore->shmem->wmk, wmk ));
     368             : 
     369             :   /* Caller is incorrectly calling publish. */
     370             : 
     371           0 :   if( FD_UNLIKELY( blockstore->shmem->wmk == wmk ) ) {
     372           0 :     FD_LOG_WARNING(( "[%s] attempting to re-publish when wmk %lu already at smr %lu", __func__, blockstore->shmem->wmk, wmk ));
     373           0 :     return;
     374           0 :   }
     375             : 
     376             :   /* q uses the slot_deque as the BFS queue */
     377             : 
     378           0 :   ulong * q = fd_blockstore_slot_deque( blockstore );
     379             : 
     380             :   /* Clear the deque, preparing it to be reused. */
     381             : 
     382           0 :   fd_slot_deque_remove_all( q );
     383             : 
     384             :   /* Push the watermark onto the queue. */
     385             : 
     386           0 :   fd_slot_deque_push_tail( q, blockstore->shmem->wmk );
     387             : 
     388             :   /* Conduct a BFS to find slots to prune or archive. */
     389             : 
     390           0 :   while( !fd_slot_deque_empty( q ) ) {
     391           0 :     ulong slot = fd_slot_deque_pop_head( q );
     392           0 :     fd_block_map_query_t query[1];
     393             :     /* Blocking read -- we need the block_info ptr to be valid for the
     394             :        whole time that we are writing stuff to the archiver file. */
     395           0 :     int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
     396           0 :     if( FD_UNLIKELY( err ) ) {
     397           0 :       FD_LOG_WARNING(( "[%s] failed to prepare block map for blockstore publishing %lu", __func__, slot ));
     398           0 :       continue; /* a slot whose prepare fails is skipped: neither its children nor the slot itself are processed */
     399           0 :     }
     400           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     401             : 
     402             :     /* Add slot's children to the queue. */
     403             : 
     404           0 :     for( ulong i = 0; i < block_info->child_slot_cnt; i++ ) {
     405             : 
     406             :       /* Stop upon reaching the SMR. */
     407             : 
     408           0 :       if( FD_LIKELY( block_info->child_slots[i] != wmk ) ) {
     409           0 :         fd_slot_deque_push_tail( q, block_info->child_slots[i] );
     410           0 :       }
     411           0 :     }
     412             : 
     413             :     /* Archive the block into a file if it is finalized. */
     414             : 
     415             :     /* if( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_FINALIZED ) ) {
     416             :       fd_block_t * block = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block_info->block_gaddr );
     417             :       uchar * data = fd_wksp_laddr_fast( fd_blockstore_wksp( blockstore ), block->data_gaddr );
     418             : 
     419             :       fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
     420             : 
     421             :       if( FD_UNLIKELY( fd_block_idx_query( block_idx, slot, NULL ) ) ) {
     422             :         FD_LOG_ERR(( "[%s] invariant violation. attempted to re-archive finalized block: %lu", __func__, slot ));
     423             :       } else {
     424             :         fd_blockstore_ser_t ser = {
     425             :           .block_map = block_info,
     426             :           .block     = block,
     427             :           .data      = data
     428             :         };
     429             :         fd_blockstore_block_checkpt( blockstore, &ser, fd, slot );
     430             :       }
     431             :     } */
     432           0 :     fd_block_map_cancel( query ); // TODO: maybe we should not make prepare so large and instead call prepare again in helpers
     433           0 :     fd_blockstore_slot_remove( blockstore, slot );
     434           0 :   }
     435             : 
     436             :   /* Scan to clean up any orphaned blocks or shreds < new SMR. */
     437             : 
     438           0 :   for (ulong slot = blockstore->shmem->wmk; slot < wmk; slot++) {
     439           0 :     fd_blockstore_slot_remove( blockstore, slot );
     440           0 :   }
     441             :   /* Record the new watermark. */
     442           0 :   blockstore->shmem->wmk = wmk;
     443             : 
     444           0 :   return;
     445           0 : }
     446             : /* fd_blockstore_shred_remove removes the shred keyed by (slot, idx) from the shred map and, when the removal succeeds, releases the element back to the shred pool. */
     447             : void
     448           0 : fd_blockstore_shred_remove( fd_blockstore_t * blockstore, ulong slot, uint idx ) {
     449           0 :   fd_shred_key_t key = { slot, idx };
     450             : 
     451           0 :   fd_buf_shred_map_query_t query[1] = { 0 };
     452           0 :   int err = fd_buf_shred_map_remove( blockstore->shred_map, &key, NULL, query, FD_MAP_FLAG_BLOCKING );
     453           0 :   if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt: shred %lu %u", __func__, slot, idx ));
     454             :   /* Any map result other than FD_MAP_SUCCESS or FD_MAP_ERR_CORRUPT falls through without releasing anything. */
     455           0 :   if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
     456           0 :     fd_buf_shred_t * shred = fd_buf_shred_map_query_ele( query );
     457           0 :     int err = fd_buf_shred_pool_release( blockstore->shred_pool, shred, 1 ); /* NOTE(review): this err shadows the outer err */
     458           0 :     if( FD_UNLIKELY( err == FD_POOL_ERR_INVAL ) ) FD_LOG_ERR(( "[%s] pool error: shred %lu %u not in pool", __func__, slot, idx ));
     459           0 :     if( FD_UNLIKELY( err == FD_POOL_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] pool corrupt: shred %lu %u", __func__, slot, idx ));
     460           0 :     FD_TEST( !err );
     461           0 :   }
     462             :   // FD_TEST( fd_buf_shred_pool_verify( blockstore->shred_pool ) == FD_POOL_SUCCESS );
     463             :   // FD_TEST( fd_buf_shred_map_verify ( blockstore->shred_map  ) == FD_MAP_SUCCESS );
     464           0 : }
     465             : 
     466             : void
     467           0 : fd_blockstore_shred_insert( fd_blockstore_t * blockstore, fd_shred_t const * shred ) {
     468             :   // FD_LOG_NOTICE(( "[%s] slot %lu idx %u", __func__, shred->slot, shred->idx ));
     469             : 
     470           0 :   ulong slot = shred->slot;
     471             : 
     472           0 :   if( FD_UNLIKELY( !fd_shred_is_data( shred->variant ) ) ) FD_LOG_ERR(( "Expected data shred" ));
     473             : 
     474           0 :   if( FD_UNLIKELY( slot < blockstore->shmem->wmk ) ) {
     475           0 :     FD_LOG_DEBUG(( "[%s] slot %lu < wmk %lu. not inserting shred", __func__, slot, blockstore->shmem->wmk ));
     476           0 :     return;
     477           0 :   }
     478             : 
     479           0 :   fd_shred_key_t key = { slot, .idx = shred->idx };
     480             : 
     481             :   /* Test if the blockstore already contains this shred key. */
     482             : 
     483           0 :   if( FD_UNLIKELY( fd_blockstore_shred_test( blockstore, slot, shred->idx ) ) ) {
     484             : 
     485             :     /* If we receive a shred with the same key (slot and shred idx) but
     486             :        different payload as one we already have, we'll only keep the
     487             :        first. Once we receive the full block, we'll use merkle chaining
     488             :        from the last FEC set to determine whether we have the correct
     489             :        shred at every index.
     490             : 
     491             :        Later, if the block fails to replay (dead block) or the block
     492             :        hash doesn't match the one we observe from votes, we'll dump the
     493             :        entire block and use repair to recover the one a majority (52%)
     494             :        of the cluster has voted on. */
     495             : 
     496           0 :     for(;;) {
     497           0 :       fd_buf_shred_map_query_t query[1]  = { 0 };
     498           0 :       int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
     499           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] %s. shred: (%lu, %u)", __func__, fd_buf_shred_map_strerror( err ), slot, shred->idx ));
     500           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     501           0 :       fd_buf_shred_t * buf_shred = fd_buf_shred_map_query_ele( query );
     502             :       /* An existing shred has the same key.  Eqvoc iff the payload is different */
     503           0 :       buf_shred->eqvoc = fd_shred_payload_sz( &buf_shred->hdr ) != fd_shred_payload_sz( shred ) ||
     504           0 :                          0!=memcmp( fd_shred_data_payload( &buf_shred->hdr ), fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) );
     505           0 :       err = fd_buf_shred_map_query_test( query );
     506           0 :       if( FD_LIKELY( err == FD_MAP_SUCCESS) ) break;
     507           0 :     }
     508           0 :     return;
     509           0 :   }
     510             : 
     511             :   /* Insert the new shred. */
     512             : 
     513           0 :   int err;
     514           0 :   fd_buf_shred_t * ele = fd_buf_shred_pool_acquire( blockstore->shred_pool, NULL, 1, &err );
     515           0 :   if( FD_UNLIKELY( err == FD_POOL_ERR_EMPTY ) )   FD_LOG_ERR(( "[%s] %s. increase blockstore shred_max.", __func__, fd_buf_shred_pool_strerror( err ) ));
     516           0 :   if( FD_UNLIKELY( err == FD_POOL_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] %s.", __func__, fd_buf_shred_pool_strerror( err ) ));
     517             : 
     518           0 :   ele->key = key;
     519           0 :   ele->hdr = *shred;
     520           0 :   fd_memcpy( &ele->buf, shred, fd_shred_sz( shred ) );
     521           0 :   err = fd_buf_shred_map_insert( blockstore->shred_map, ele, FD_MAP_FLAG_BLOCKING );
     522           0 :   if( FD_UNLIKELY( err == FD_MAP_ERR_INVAL ) ) FD_LOG_ERR(( "[%s] map error. ele not in pool.", __func__ ));
     523             : 
     524             :   /* Update shred's associated slot meta */
     525             : 
     526           0 :   if( FD_UNLIKELY( !fd_blockstore_block_info_test( blockstore, slot ) ) ) {
     527           0 :     fd_block_map_query_t query[1] = { 0 };
     528             :     /* Prepare will succeed regardless of if the key is in the map or not. It either returns
     529             :        the element at that idx, or it will return a spot to insert new stuff. So we need to check
     530             :        if that space is actually unused, to signify that we are adding a new entry. */
     531             : 
     532             :     /* Try to insert slot into block_map TODO make non blocking? */
     533             : 
     534           0 :     err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
     535           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     536             : 
     537           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ){
     538           0 :       FD_LOG_ERR(( "[%s] OOM: failed to insert new block map entry. blockstore needs to save metadata for all slots >= SMR, so increase memory or check for issues with publishing new SMRs.", __func__ ));
     539           0 :     }
     540             : 
     541             :     /* Initialize the block_info. Note some fields are initialized
     542             :        to dummy values because we do not have all the necessary metadata
     543             :        yet. */
     544             : 
     545           0 :     block_info->slot = slot;
     546             : 
     547           0 :     block_info->parent_slot = slot - shred->data.parent_off;
     548           0 :     memset( block_info->child_slots, UCHAR_MAX, FD_BLOCKSTORE_CHILD_SLOT_MAX * sizeof(ulong) );
     549           0 :     block_info->child_slot_cnt = 0;
     550             : 
     551           0 :     block_info->block_hash     = ( fd_hash_t ){ 0 };
     552           0 :     block_info->bank_hash      = ( fd_hash_t ){ 0 };
     553           0 :     block_info->flags          = fd_uchar_set_bit( 0, FD_BLOCK_FLAG_RECEIVING );
     554           0 :     block_info->ts             = 0;
     555             :     // block_info->ref_tick = (uchar)( (int)shred->data.flags &
     556             :                                               //  (int)FD_SHRED_DATA_REF_TICK_MASK );
     557           0 :     block_info->buffered_idx   = UINT_MAX;
     558           0 :     block_info->received_idx   = 0;
     559           0 :     block_info->consumed_idx   = UINT_MAX;
     560             : 
     561           0 :     block_info->data_complete_idx = UINT_MAX;
     562           0 :     block_info->slot_complete_idx = UINT_MAX;
     563             : 
     564           0 :     block_info->ticks_consumed        = 0;
     565           0 :     block_info->tick_hash_count_accum = 0;
     566             : 
     567           0 :     fd_block_set_null( block_info->data_complete_idxs );
     568             : 
     569           0 :     block_info->block_gaddr    = 0;
     570             : 
     571           0 :     fd_block_map_publish( query );
     572             : 
     573           0 :     FD_TEST( fd_blockstore_block_info_test( blockstore, slot ) );
     574           0 :   }
     575           0 :   fd_block_map_query_t query[1] = { 0 };
     576           0 :   err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
     577           0 :   fd_block_info_t * block_info = fd_block_map_query_ele( query );   /* should be impossible for this to fail */
     578             : 
     579             :   /* Advance the buffered_idx watermark. */
     580             : 
     581           0 :   uint prev_buffered_idx = block_info->buffered_idx;
     582           0 :   while( FD_LIKELY( fd_blockstore_shred_test( blockstore, slot, block_info->buffered_idx + 1 ) ) ) {
     583           0 :     block_info->buffered_idx++;
     584           0 :   }
     585             : 
     586             :   /* Mark the ending shred idxs of entry batches. */
     587             : 
     588           0 :   fd_block_set_insert_if( block_info->data_complete_idxs, shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE, shred->idx );
     589             : 
     590             :   /* Advance the data_complete_idx watermark using the shreds in between
     591             :      the previous buffered_idx and current buffered_idx. */
     592             : 
     593           0 :   for (uint idx = prev_buffered_idx + 1; block_info->buffered_idx != FD_SHRED_IDX_NULL && idx <= block_info->buffered_idx; idx++) {
     594           0 :     if( FD_UNLIKELY( fd_block_set_test( block_info->data_complete_idxs, idx ) ) ) {
     595           0 :       block_info->data_complete_idx = idx;
     596           0 :     }
     597           0 :   }
     598             : 
     599             :   /* Update received_idx and slot_complete_idx.  */
     600             : 
     601           0 :   block_info->received_idx = fd_uint_max( block_info->received_idx, shred->idx + 1 );
     602           0 :   if( FD_UNLIKELY( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ) {
     603             :     // FD_LOG_NOTICE(( "slot %lu %u complete", slot, shred->idx ));
     604           0 :     block_info->slot_complete_idx = shred->idx;
     605           0 :   }
     606             : 
     607           0 :   ulong parent_slot       = block_info->parent_slot;
     608             : 
     609             :   // FD_LOG_DEBUG(( "shred: (%lu, %u). consumed: %u, received: %u, complete: %u",
     610             :   //              slot,
     611             :   //              shred->idx,
     612             :   //              block_info->buffered_idx,
     613             :   //              block_info->received_idx,
     614             :   //              block_info->slot_complete_idx ));
     615           0 :   fd_block_map_publish( query );
     616             : 
     617             :   /* Update ancestry metadata: parent_slot, is_connected, next_slot.
     618             : 
     619             :      If the parent_slot happens to be very old, there's a chance that
     620             :      its hash probe could collide with an existing slot in the block
     621             :      map, and cause what looks like an OOM. Instead of using map_prepare
     622             :      and hitting this collision, we can either check that the
     623             :      parent_slot lives in the map with a block_info_test, or use the
     624             :      shmem wmk value as a more general guard against querying for
     625             :      parents that are too old. */
     626             : 
     627           0 :   if( FD_LIKELY( parent_slot < blockstore->shmem->wmk ) ) return;
     628             : 
     629           0 :   err = fd_block_map_prepare( blockstore->block_map, &parent_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
     630           0 :   fd_block_info_t * parent_block_info = fd_block_map_query_ele( query );
     631             : 
     632             :   /* Add this slot to its parent's child slots if not already there. */
     633             : 
     634           0 :   if( FD_LIKELY( parent_block_info && parent_block_info->slot == parent_slot ) ) {
     635           0 :     int found = 0;
     636           0 :     for( ulong i = 0; i < parent_block_info->child_slot_cnt; i++ ) {
     637           0 :       if( FD_LIKELY( parent_block_info->child_slots[i] == slot ) ) {
     638           0 :         found = 1;
     639           0 :         break;
     640           0 :       }
     641           0 :     }
     642           0 :     if( FD_UNLIKELY( !found ) ) { /* add to parent's child slots if not already there */
     643           0 :       if( FD_UNLIKELY( parent_block_info->child_slot_cnt == FD_BLOCKSTORE_CHILD_SLOT_MAX ) ) {
     644           0 :         FD_LOG_ERR(( "failed to add slot %lu to parent %lu's children. exceeding child slot max",
     645           0 :                       slot,
     646           0 :                       parent_block_info->slot ));
     647           0 :       }
     648           0 :       parent_block_info->child_slots[parent_block_info->child_slot_cnt++] = slot;
     649           0 :     }
     650           0 :   }
     651           0 :   if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
     652           0 :     fd_block_map_publish( query );
     653           0 :   } else {
     654             :     /* err is FD_MAP_ERR_FULL. Not in a valid prepare. Can happen if we
     655             :        are about to OOM, or if the parents are so far away that it just
     656             :        happens to chain longer than the probe_max. Somewhat covered by
     657             :        the early return, but there are some edge cases where we reach
     658             :        here, and it shouldn't be a LOG_ERR */
     659           0 :     FD_LOG_WARNING(( "block info not found for parent slot %lu. Have we seen it before?", parent_slot ));
     660           0 :   }
     661             : 
     662             :   //FD_TEST( fd_block_map_verify( blockstore->block_map ) == FD_MAP_SUCCESS );
     663           0 : }
     664             : 
     665             : int
     666           0 : fd_blockstore_shred_test( fd_blockstore_t * blockstore, ulong slot, uint idx ) {
     667           0 :   fd_shred_key_t key = { slot, idx };
     668           0 :   fd_buf_shred_map_query_t query[1] = { 0 };
     669             : 
     670           0 :   for(;;) {
     671           0 :     int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
     672           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] slot: %lu idx: %u. %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
     673           0 :     if( FD_LIKELY( !fd_buf_shred_map_query_test( query ) ) ) return err != FD_MAP_ERR_KEY;
     674           0 :   }
     675           0 : }
     676             : 
     677             : int
     678           0 : fd_blockstore_block_info_test( fd_blockstore_t * blockstore, ulong slot ) {
     679           0 :   int err = FD_MAP_ERR_AGAIN;
     680           0 :   while( err == FD_MAP_ERR_AGAIN ){
     681           0 :     fd_block_map_query_t query[1] = { 0 };
     682           0 :     err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
     683           0 :     if( err == FD_MAP_ERR_AGAIN ) continue;
     684           0 :     if( err == FD_MAP_ERR_KEY ) return 0;
     685           0 :     err = fd_block_map_query_test( query );
     686           0 :   }
     687           0 :   return 1;
     688           0 : }
     689             : 
     690             : fd_block_info_t *
     691           0 : fd_blockstore_block_map_query( fd_blockstore_t * blockstore, ulong slot ){
     692           0 :   fd_block_map_query_t quer[1] = { 0 };
     693           0 :   int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, quer, FD_MAP_FLAG_BLOCKING );
     694           0 :   fd_block_info_t * meta = fd_block_map_query_ele( quer );
     695           0 :   if( err ) return NULL;
     696           0 :   return meta;
     697           0 : }
     698             : 
     699             : int
     700           0 : fd_blockstore_block_info_remove( fd_blockstore_t * blockstore, ulong slot ){
     701           0 :    int err = FD_MAP_ERR_AGAIN;
     702           0 :    while( err == FD_MAP_ERR_AGAIN ){
     703           0 :      err = fd_block_map_remove( blockstore->block_map, &slot, NULL, 0 );
     704           0 :      if( err == FD_MAP_ERR_KEY ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
     705           0 :    }
     706           0 :   return FD_BLOCKSTORE_SUCCESS;
     707           0 : }
     708             : 
     709             : long
     710           0 : fd_buf_shred_query_copy_data( fd_blockstore_t * blockstore, ulong slot, uint idx, void * buf, ulong buf_sz ) {
     711           0 :   if( buf_sz < FD_SHRED_MAX_SZ ) return -1;
     712           0 :   fd_shred_key_t key = { slot, idx };
     713           0 :   ulong          sz  = 0;
     714           0 :   int            err = FD_MAP_ERR_AGAIN;
     715           0 :   while( err == FD_MAP_ERR_AGAIN ) {
     716           0 :     fd_buf_shred_map_query_t query[1] = { 0 };
     717           0 :     err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
     718           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return -1;
     719           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
     720           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     721           0 :     fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
     722           0 :     sz = fd_shred_sz( &shred->hdr );
     723           0 :     memcpy( buf, shred->buf, sz );
     724           0 :     err = fd_buf_shred_map_query_test( query );
     725           0 :   }
     726           0 :   FD_TEST( !err );
     727           0 :   return (long)sz;
     728           0 : }
     729             : 
     730             : int
     731           0 : fd_blockstore_block_hash_query( fd_blockstore_t * blockstore, ulong slot, fd_hash_t * hash_out ) {
     732           0 :   for(;;) { /* Speculate */
     733           0 :     fd_block_map_query_t query[1] = { 0 };
     734           0 :     int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
     735           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) )   return FD_BLOCKSTORE_ERR_KEY;
     736           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     737           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     738           0 :     *hash_out = block_info->block_hash;
     739           0 :     if( FD_LIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) return FD_BLOCKSTORE_SUCCESS;
     740           0 :   }
     741           0 : }
     742             : 
     743             : int
     744           0 : fd_blockstore_bank_hash_query( fd_blockstore_t * blockstore, ulong slot, fd_hash_t * hash_out ) {
     745           0 :   for(;;) { /* Speculate */
     746           0 :     fd_block_map_query_t query[1] = { 0 };
     747           0 :     int err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
     748           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) )   return FD_BLOCKSTORE_ERR_KEY;
     749           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     750           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     751           0 :     *hash_out = block_info->bank_hash;
     752           0 :     if( FD_LIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) return FD_BLOCKSTORE_SUCCESS;
     753           0 :   }
     754           0 : }
     755             : 
     756             : ulong
     757           0 : fd_blockstore_parent_slot_query( fd_blockstore_t * blockstore, ulong slot ) {
     758           0 :   int err = FD_MAP_ERR_AGAIN;
     759           0 :   ulong parent_slot = FD_SLOT_NULL;
     760           0 :   while( err == FD_MAP_ERR_AGAIN ){
     761           0 :     fd_block_map_query_t query[1] = { 0 };
     762           0 :     err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
     763           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     764             : 
     765           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_SLOT_NULL;
     766           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     767             : 
     768           0 :     parent_slot = block_info->parent_slot;
     769           0 :     err = fd_block_map_query_test( query );
     770           0 :   }
     771           0 :   return parent_slot;
     772           0 : }
     773             : 
     774             : int
     775             : fd_blockstore_slice_query( fd_blockstore_t * blockstore,
     776             :                            ulong             slot,
     777             :                            uint              start_idx,
     778             :                            uint              end_idx /* inclusive */,
     779             :                            ulong             max,
     780             :                            uchar *           buf,
     781           0 :                            ulong *           buf_sz ) {
     782             :   /* verify that the batch idxs provided is at batch boundaries*/
     783             : 
     784             :   // FD_LOG_NOTICE(( "querying for %lu %u %u", slot, start_idx, end_idx ));
     785             : 
     786           0 :   ulong off = 0;
     787           0 :   for(uint idx = start_idx; idx <= end_idx; idx++) {
     788           0 :     ulong payload_sz = 0;
     789             : 
     790           0 :     for(;;) { /* speculative copy one shred */
     791           0 :       fd_shred_key_t key = { slot, idx };
     792           0 :       fd_buf_shred_map_query_t query[1] = { 0 };
     793           0 :       int err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
     794           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ){
     795           0 :         FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
     796           0 :         return FD_BLOCKSTORE_ERR_CORRUPT;
     797           0 :       }
     798           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ){
     799           0 :         FD_LOG_WARNING(( "[%s] key: (%lu, %u) %s", __func__, slot, idx, fd_buf_shred_map_strerror( err ) ));
     800           0 :         return FD_BLOCKSTORE_ERR_KEY;
     801           0 :       }
     802           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     803             : 
     804           0 :       fd_buf_shred_t const * shred      = fd_buf_shred_map_query_ele_const( query );
     805           0 :       uchar const *          payload    = fd_shred_data_payload( &shred->hdr );
     806           0 :       payload_sz                        = fd_shred_payload_sz( &shred->hdr );
     807           0 :       if( FD_UNLIKELY( off + payload_sz > max ) ) {
     808           0 :         FD_LOG_WARNING(( "[%s] increase `max`", __func__ )); /* caller needs to increase max */
     809           0 :         return FD_BLOCKSTORE_ERR_INVAL;
     810           0 :       }
     811             : 
     812           0 :       if( FD_UNLIKELY( payload_sz > FD_SHRED_DATA_PAYLOAD_MAX ) ) return FD_BLOCKSTORE_ERR_SHRED_INVALID;
     813           0 :       if( FD_UNLIKELY( off + payload_sz > max ) ) return FD_BLOCKSTORE_ERR_NO_MEM;
     814           0 :       fd_memcpy( buf + off, payload, payload_sz );
     815           0 :       err = fd_buf_shred_map_query_test( query );
     816           0 :       if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) break;
     817           0 :     }; /* successful speculative copy */
     818             : 
     819           0 :     off += payload_sz;
     820           0 :   }
     821           0 :   *buf_sz = off;
     822           0 :   return FD_BLOCKSTORE_SUCCESS;
     823           0 : }
     824             : 
     825             : int
     826           0 : fd_blockstore_shreds_complete( fd_blockstore_t * blockstore, ulong slot ){
     827             :   //fd_block_t * block_exists = fd_blockstore_block_query( blockstore,  slot );
     828           0 :   fd_block_map_query_t query[1];
     829           0 :   int complete = 0;
     830           0 :   int err     = FD_MAP_ERR_AGAIN;
     831           0 :   while( err == FD_MAP_ERR_AGAIN ){
     832           0 :     err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
     833           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     834           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return 0;
     835           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     836           0 :     complete = ( block_info->buffered_idx != FD_SHRED_IDX_NULL ) &&
     837           0 :                ( block_info->slot_complete_idx == block_info->buffered_idx );
     838           0 :     err = fd_block_map_query_test( query );
     839           0 :   }
     840           0 :   return complete;
     841             : 
     842             :   /* When replacing block_query( slot ) != NULL with this function:
     843             :      There are other things verified in a successful deshred & scan block that are not verified here.
     844             :      scan_block does a round of well-formedness checks like parsing txns, and no premature end of batch
     845             :      like needing cnt, microblock, microblock format.
     846             : 
     847             :      This maybe should be fine in places where we check both
     848             :      shreds_complete and flag PROCESSED/REPLAYING is set, because validation has been for sure done
     849             :      if the block has been replayed
     850             : 
     851             :      Should be careful in places that call this now that happen before the block is replayed, if we want
     852             :      to assume the shreds are well-formed we can't. */
     853             : 
     854           0 : }
     855             : 
     856             : int
     857             : fd_blockstore_block_map_query_volatile( fd_blockstore_t * blockstore,
     858             :                                         int               fd,
     859             :                                         ulong             slot,
     860           0 :                                         fd_block_info_t *  block_info_out ) {
     861             : 
     862             :   /* WARNING: this code is extremely delicate. Do NOT modify without
     863             :      understanding all the invariants. In particular, we must never
     864             :      dereference through a corrupt pointer. It's OK for the destination
     865             :      data to be overwritten/invalid as long as the memory location is
     866             :      valid. As long as we don't crash, we can validate the data after it
     867             :      is read. */
     868             : 
     869           0 :   fd_block_idx_t * block_idx = fd_blockstore_block_idx( blockstore );
     870             : 
     871           0 :   ulong off = ULONG_MAX;
     872           0 :   for( ;; ) {
     873           0 :     fd_block_idx_t * idx_entry = fd_block_idx_query( block_idx, slot, NULL );
     874           0 :     if( FD_LIKELY( idx_entry ) ) off = idx_entry->off;
     875           0 :     break;
     876           0 :   }
     877             : 
     878           0 :   if( FD_UNLIKELY( off < ULONG_MAX ) ) { /* optimize for non-archival queries */
     879           0 :     if( FD_UNLIKELY( lseek( fd, (long)off, SEEK_SET ) == -1 ) ) {
     880           0 :       FD_LOG_WARNING(( "failed to seek" ));
     881           0 :       return FD_BLOCKSTORE_ERR_SLOT_MISSING;
     882           0 :     }
     883           0 :     ulong rsz;
     884           0 :     int   err = fd_io_read( fd, block_info_out, sizeof( fd_block_info_t ), sizeof( fd_block_info_t ), &rsz );
     885           0 :     if( FD_UNLIKELY( err ) ) {
     886           0 :       FD_LOG_WARNING(( "failed to read block map entry" ));
     887           0 :       return FD_BLOCKSTORE_ERR_SLOT_MISSING;
     888           0 :     }
     889           0 :     return FD_BLOCKSTORE_SUCCESS;
     890           0 :   }
     891             : 
     892           0 :   int err = FD_MAP_ERR_AGAIN;
     893           0 :   while( err == FD_MAP_ERR_AGAIN ) {
     894           0 :     fd_block_map_query_t quer[1] = { 0 };
     895           0 :     err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, quer, 0 );
     896           0 :     fd_block_info_t const * query = fd_block_map_query_ele_const( quer );
     897           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return FD_BLOCKSTORE_ERR_SLOT_MISSING;
     898             : 
     899           0 :     *block_info_out = *query;
     900             : 
     901           0 :     err = fd_block_map_query_test( quer );
     902           0 :   }
     903           0 :   return FD_BLOCKSTORE_SUCCESS;
     904           0 : }
     905             : 
     906             : void
     907           0 : fd_blockstore_log_block_status( fd_blockstore_t * blockstore, ulong around_slot ) {
     908           0 :   fd_block_map_query_t query[1] = { 0 };
     909           0 :   uint received_idx = 0;
     910           0 :   uint buffered_idx = 0;
     911           0 :   uint slot_complete_idx = 0;
     912             : 
     913           0 :   for( ulong i = around_slot - 5; i < around_slot + 20; ++i ) {
     914           0 :     int err = FD_MAP_ERR_AGAIN;
     915           0 :     while( err == FD_MAP_ERR_AGAIN ){
     916           0 :       err = fd_block_map_query_try( blockstore->block_map, &i, NULL, query, 0 );
     917           0 :       fd_block_info_t * slot_entry = fd_block_map_query_ele( query );
     918           0 :       if( err == FD_MAP_ERR_KEY ) break;
     919           0 :       if( err == FD_MAP_ERR_AGAIN ) continue;
     920           0 :       received_idx = slot_entry->received_idx;
     921           0 :       buffered_idx = slot_entry->buffered_idx;
     922           0 :       slot_complete_idx = slot_entry->slot_complete_idx;
     923           0 :       err = fd_block_map_query_test( query );
     924           0 :       if( err == FD_MAP_ERR_KEY ) break;
     925           0 :     }
     926             : 
     927           0 :     if( err == FD_MAP_ERR_KEY ) continue;
     928             : 
     929           0 :     FD_LOG_NOTICE(( "%sslot=%lu received=%u consumed=%u finished=%u",
     930           0 :                     ( i == around_slot ? "*" : " " ),
     931           0 :                     i,
     932           0 :                     received_idx,
     933           0 :                     buffered_idx,
     934           0 :                     slot_complete_idx ));
     935           0 :   }
     936           0 : }
     937             : 
     /* fd_smart_size formats the byte count sz into tmp (at most tmpsz
        bytes, via snprintf) and returns tmp.  Sizes up to 2^7 are printed
        in bytes, up to 2^17 in KB, up to 2^27 in MB and anything larger in
        GB; the scaled forms use three decimals. */

     static char *
     fd_smart_size( ulong sz, char * tmp, size_t tmpsz ) {
       double const bytes = (double)sz;

       if( sz <= (1UL<<7) ) {
         snprintf( tmp, tmpsz, "%lu B", sz );
         return tmp;
       }
       if( sz <= (1UL<<17) ) {
         snprintf( tmp, tmpsz, "%.3f KB", bytes/((double)(1UL<<10)) );
         return tmp;
       }
       if( sz <= (1UL<<27) ) {
         snprintf( tmp, tmpsz, "%.3f MB", bytes/((double)(1UL<<20)) );
         return tmp;
       }
       snprintf( tmp, tmpsz, "%.3f GB", bytes/((double)(1UL<<30)) );
       return tmp;
     }
     950             : 
     951             : void
     952           0 : fd_blockstore_log_mem_usage( fd_blockstore_t * blockstore ) {
     953           0 :   char tmp1[100];
     954             : 
     955           0 :   FD_LOG_NOTICE(( "blockstore base footprint: %s",
     956           0 :                   fd_smart_size( sizeof(fd_blockstore_t), tmp1, sizeof(tmp1) ) ));
     957           0 :   ulong shred_max = fd_buf_shred_pool_ele_max( blockstore->shred_pool );
     958           0 :   FD_LOG_NOTICE(( "shred pool footprint: %s %lu entries)",
     959           0 :                   fd_smart_size( fd_buf_shred_pool_footprint(), tmp1, sizeof(tmp1) ),
     960           0 :                   shred_max ));
     961           0 :   ulong shred_map_cnt = fd_buf_shred_map_chain_cnt( blockstore->shred_map );
     962           0 :   FD_LOG_NOTICE(( "shred map footprint: %s (%lu chains, load is %.3f)",
     963           0 :                   fd_smart_size( fd_buf_shred_map_footprint( shred_map_cnt ), tmp1, sizeof(tmp1) ),
     964           0 :                   shred_map_cnt,
     965           0 :                   (double)shred_map_cnt) );
     966             : 
     967             :   /*fd_block_info_t * slot_map = fd_blockstore_block_map( blockstore );
     968             :   ulong slot_map_cnt = fd_block_map_key_cnt( slot_map );
     969             :   ulong slot_map_max = fd_block_map_key_max( slot_map );
     970             :   FD_LOG_NOTICE(( "slot map footprint: %s (%lu entries used out of %lu, %lu%%)",
     971             :                   fd_smart_size( fd_block_map_footprint( slot_map_max ), tmp1, sizeof(tmp1) ),
     972             :                   slot_map_cnt,
     973             :                   slot_map_max,
     974             :                   (100U*slot_map_cnt)/slot_map_max )); */
     975             : 
     976           0 :   ulong block_cnt = 0;
     977             : 
     978           0 :   ulong * q = fd_blockstore_slot_deque( blockstore );
     979           0 :   fd_slot_deque_remove_all( q );
     980           0 :   fd_slot_deque_push_tail( q, blockstore->shmem->wmk );
     981           0 :   while( !fd_slot_deque_empty( q ) ) {
     982           0 :     ulong curr = fd_slot_deque_pop_head( q );
     983             : 
     984           0 :     fd_block_map_query_t query[1] = { 0 };
     985           0 :     int err = fd_block_map_query_try( blockstore->block_map, &curr, NULL, query, FD_MAP_FLAG_BLOCKING );
     986           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     987           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY || !block_info ) ) continue;
     988             : 
     989           0 :     for( ulong i = 0; i < block_info->child_slot_cnt; i++ ) {
     990           0 :       fd_slot_deque_push_tail( q, block_info->child_slots[i] );
     991           0 :     }
     992           0 :   }
     993             : 
     994           0 :   if( block_cnt )
     995           0 :     FD_LOG_NOTICE(( "block cnt: %lu",
     996           0 :                     block_cnt ));
     997           0 : }

Generated by: LCOV version 1.14