LCOV - code coverage report
Current view: top level - discof/restore - fd_snapin_tile_funk.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 215 0.0 %
Date: 2025-12-07 04:58:33 Functions: 0 5 0.0 %

          Line data    Source code
       1             : /* fd_snapin_tile_funk.c contains APIs to load accounts into funk. */
       2             : 
       3             : #include "fd_snapin_tile_private.h"
       4             : #include "../../flamenco/accdb/fd_accdb_sync.h"
       5             : /* fd_snapin_process_account_header_funk handles one parsed account
                     :    header (result->account_header).
                     : 
                     :    The pubkey is looked up in funk under ctx->xid.  If that query finds
                     :    nothing and ctx->full is zero, fd_accdb_peek is tried as well.
                     : 
                     :    - If an existing value has a strictly greater slot than the incoming
                     :      account, the incoming account is passed to
                     :      fd_snapin_send_duplicate_account, ctx->acc_data is set to NULL and
                     :      nothing is written to funk.
                     :    - Otherwise any existing value is passed to
                     :      fd_snapin_send_duplicate_account, the record's value is replaced by
                     :      a newly allocated buffer (fd_account_meta_t followed by data_len
                     :      bytes) with the metadata filled in, and ctx->acc_data is pointed at
                     :      the still unwritten data region.
                     : 
                     :    Returns early_exit, which starts at 0 and is only modified through
                     :    the pointer handed to fd_snapin_send_duplicate_account. */
       6             : int
       7             : fd_snapin_process_account_header_funk( fd_snapin_tile_t *            ctx,
       8           0 :                                        fd_ssparse_advance_result_t * result ) {
       9           0 :   fd_funk_t * funk = ctx->accdb_admin->funk;
      10             :   /* Query funk for this pubkey under ctx->xid. */
      11           0 :   fd_funk_rec_key_t id = FD_LOAD( fd_funk_rec_key_t, result->account_header.pubkey );
      12           0 :   fd_funk_rec_query_t query[1];
      13           0 :   fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
      14           0 :   fd_funk_rec_t const * existing_rec = rec;
      15             :   /* existing_rec may also come from fd_accdb_peek below (only tried
                     :      when ctx->full is zero); rec itself stays NULL in that case. */
      16           0 :   int early_exit = 0;
      17           0 :   if( !ctx->full && !existing_rec ) {
      18           0 :     fd_accdb_peek_t peek[1];
      19           0 :     if( fd_accdb_peek( ctx->accdb, peek, ctx->xid, result->account_header.pubkey ) ) {
      20           0 :       existing_rec = peek->acc->rec;
      21           0 :     }
      22           0 :   }
      23           0 :   if( FD_UNLIKELY( existing_rec ) ) {
      24           0 :     fd_account_meta_t * meta = fd_funk_val( existing_rec, funk->wksp );
      25           0 :     if( FD_UNLIKELY( meta ) ) {
      26           0 :       if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
      27           0 :         ctx->acc_data = NULL;
      28           0 :         fd_snapin_send_duplicate_account( ctx, result->account_header.lamports, NULL, result->account_header.data_len, (uchar)result->account_header.executable, result->account_header.owner, result->account_header.pubkey, 0, &early_exit );
      29           0 :         return early_exit;
      30           0 :       }
      31           0 :       fd_snapin_send_duplicate_account( ctx, meta->lamports, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen, meta->executable, meta->owner, result->account_header.pubkey, 1, &early_exit);
      32           0 :     }
      33           0 :   }
      34             :   /* Only a record returned by the funk query above is reused in place.
                     :      Otherwise a new record is prepared here and published at the end
                     :      of this function. */
      35           0 :   int should_publish = 0;
      36           0 :   fd_funk_rec_prepare_t prepare[1];
      37           0 :   if( FD_LIKELY( !rec ) ) {
      38           0 :     should_publish = 1;
      39           0 :     rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
      40           0 :     FD_TEST( rec );
      41           0 :   }
      42             :   /* NOTE: the value loaded into meta on the next line is never read;
                     :      meta is reassigned by the allocation further down. */
      43           0 :   fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp );
      44             :   /* Allocate data space from heap, free old value (if any) */
      45           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
      46           0 :   ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
      47           0 :   ulong       alloc_max;
      48           0 :   meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
      49           0 :   if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
      50           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
      51           0 :   rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
      52           0 :   rec->val_max   = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
      53           0 :   rec->val_sz    = (uint)( alloc_sz  & FD_FUNK_REC_VAL_MAX );
      54             :   /* NOTE(review): unlike streamlined_insert, data_len is not checked
                     :      against FD_RUNTIME_ACC_SZ_MAX here, and val_sz is masked rather
                     :      than range checked.  Presumably the header has been validated
                     :      before it reaches this function - confirm. */
      55           0 :   meta->dlen       = (uint)result->account_header.data_len;
      56           0 :   meta->slot       = result->account_header.slot;
      57           0 :   memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
      58           0 :   meta->lamports   = result->account_header.lamports;
      59           0 :   meta->executable = (uchar)result->account_header.executable;
      60             :   /* The account data region starts right after the metadata.
                     :      fd_snapin_process_account_data_funk appends to it through
                     :      ctx->acc_data. */
      61           0 :   ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
      62           0 :   ctx->metrics.accounts_inserted++;
      63             :   /* Publish only if the record was prepared by this call. */
      64           0 :   if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
      65           0 :   return early_exit;
      66           0 : }
      67             : /* fd_snapin_process_account_data_funk handles one chunk of account
                     :    data (result->account_data).
                     : 
                     :    If ctx->acc_data is NULL (fd_snapin_process_account_header_funk sets
                     :    it to NULL when it skips the incoming account), the chunk is passed
                     :    to fd_snapin_send_duplicate_account_data and the early_exit value
                     :    written through that call is returned.  Otherwise the chunk is
                     :    copied to ctx->acc_data, ctx->acc_data is advanced by data_sz, and
                     :    0 is returned. */
      68             : int
      69             : fd_snapin_process_account_data_funk( fd_snapin_tile_t *            ctx,
      70           0 :                                      fd_ssparse_advance_result_t * result ) {
      71           0 :   int early_exit = 0;
      72           0 :   if( FD_UNLIKELY( !ctx->acc_data ) ) {
      73           0 :     fd_snapin_send_duplicate_account_data( ctx, result->account_data.data, result->account_data.data_sz, &early_exit );
      74           0 :     return early_exit;
      75           0 :   }
      76             :   /* NOTE(review): no bound is checked before the copy.  This relies on
                     :      the chunks adding up to no more than the data_len the buffer was
                     :      allocated for - confirm against the parser. */
      77           0 :   fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
      78           0 :   ctx->acc_data += result->account_data.data_sz;
      79           0 :   return 0;
      80           0 : }
      81             : 
      82             : /* streamlined_insert inserts an unfragmented account.
      83             :    Only used while loading a full snapshot, not an incremental.
                     : 
                     :    rec is the record whose value is replaced, frame points to the
                     :    serialized account and slot is stored in the new metadata.  The
                     :    fields read from frame are:
                     : 
                     :      +0x08  data_len   (8 bytes)
                     :      +0x30  lamports   (8 bytes)
                     :      +0x38  rent_epoch (8 bytes, loaded but unused)
                     :      +0x40  owner      (32 bytes)
                     :      +0x60  executable (1 byte, any non-zero value counts as set)
                     :      +0x88  account data (data_len bytes)
                     : 
                     :    Any previous value of rec is flushed and a new buffer holding
                     :    fd_account_meta_t followed by the account data is allocated from
                     :    funk->alloc.  A data_len above FD_RUNTIME_ACC_SZ_MAX or a failed
                     :    allocation is reported with FD_LOG_ERR.
                     :    ctx->metrics.accounts_inserted is incremented. */
      84             : 
      85             : static void
      86             : streamlined_insert( fd_snapin_tile_t * ctx,
      87             :                     fd_funk_rec_t *    rec,
      88             :                     uchar const *      frame,
      89           0 :                     ulong              slot ) {
      90           0 :   ulong data_len   = fd_ulong_load_8_fast( frame+0x08UL );
      91           0 :   ulong lamports   = fd_ulong_load_8_fast( frame+0x30UL );
      92           0 :   ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
      93           0 :   uchar owner[32];   memcpy( owner, frame+0x40UL, 32UL );
      94           0 :   _Bool executable = !!frame[ 0x60UL ];
      95             :   /* Replace the record's value: bound check, flush the old value,
                     :      then allocate room for the metadata plus data_len bytes. */
      96           0 :   fd_funk_t * funk = ctx->accdb_admin->funk;
      97           0 :   if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
      98           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
      99           0 :   ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len;
     100           0 :   ulong       alloc_max;
     101           0 :   fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
     102           0 :   if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
     103           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
     104           0 :   rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
     105           0 :   rec->val_max   = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
     106           0 :   rec->val_sz    = (uint)( alloc_sz  & FD_FUNK_REC_VAL_MAX );
     107             : 
     108             :   /* Write metadata */
     109           0 :   meta->dlen = (uint)data_len;
     110           0 :   meta->slot = slot;
     111           0 :   memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
     112           0 :   meta->lamports   = lamports;
     113           0 :   meta->executable = (uchar)executable;
     114             : 
     115             :   /* Write data (stored directly after the metadata) */
     116           0 :   uchar * acc_data = (uchar *)( meta+1 );
     117           0 :   fd_memcpy( acc_data, frame+0x88UL, data_len );
     118             : 
     119           0 :   ctx->metrics.accounts_inserted++;
     120           0 : }
     121             : 
     122             : /* process_account_batch is a happy path performance optimization
     123             :    handling insertion of lots of small accounts.
     124             : 
     125             :    The main optimization implemented for funk is doing hash map memory
     126             :    accesses in parallel to amortize DRAM latency.
                     : 
                     :    Frames are taken from result->account_batch (starting at index 0)
                     :    when result is non-NULL, and from buffered_batch (starting at
                     :    buffered_batch->remaining_idx) otherwise.  Every loop runs up to
                     :    FD_SSPARSE_ACC_BATCH_MAX rather than up to batch_cnt.
                     :    NOTE(review): this presumably relies on batches always being
                     :    full - confirm.
                     : 
                     :    For each frame: if no record with the same key is found in its hash
                     :    chain, a new record is acquired and pushed onto the chain head.  If
                     :    a record exists with a greater slot, the incoming account is
                     :    skipped and passed to fd_snapin_send_duplicate_account.  If the
                     :    incoming slot is greater, the existing account is passed instead
                     :    and then overwritten.  Equal slots fail FD_TEST.
                     : 
                     :    If early_exit gets set through fd_snapin_send_duplicate_account,
                     :    processing stops after the current index.  When result is non-NULL
                     :    and indices remain, the frame pointers are saved in
                     :    ctx->buffered_batch, presumably so a later call can resume at
                     :    remaining_idx.  Returns early_exit. */
     127             : 
     128             : int
     129             : fd_snapin_process_account_batch_funk( fd_snapin_tile_t *            ctx,
     130             :                                       fd_ssparse_advance_result_t * result,
     131           0 :                                       buffered_account_batch_t *    buffered_batch ) {
     132           0 :   int early_exit  = 0;
     133           0 :   ulong start_idx = result ? 0 : buffered_batch->remaining_idx;
     134           0 :   fd_funk_t *         funk    = ctx->accdb_admin->funk;
     135           0 :   fd_funk_rec_map_t * rec_map = funk->rec_map;
     136           0 :   fd_funk_rec_t *     rec_tbl = funk->rec_pool->ele;
     137           0 :   fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );
     138             : 
     139             :   /* Derive map chains.  The pubkey at frame+0x10 is hashed with the
                     :      map seed and masked with chain_cnt-1.
                     :      NOTE(review): the mask presumably relies on chain_cnt being a
                     :      power of two - confirm. */
     140           0 :   uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
     141           0 :   ulong chain_mask = rec_map->map->chain_cnt-1UL;
     142           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     143           0 :     uchar const * frame  = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     144           0 :     uchar const * pubkey = frame+0x10UL;
     145           0 :     ulong         memo   = fd_funk_rec_key_hash1( pubkey, rec_map->map->seed );
     146           0 :     chain_idx[ i ] = (uint)( memo&chain_mask );
     147           0 :   }
     148             : 
     149             :   /* Parallel load hash chain heads.  chain_cnt[ i ] is ver_cnt
                     :      truncated to uint and is used below as the number of nodes to
                     :      visit.
                     :      NOTE(review): presumably the low 32 bits of ver_cnt hold the
                     :      element count - confirm. */
     150           0 :   uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
     151           0 :   uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
     152           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     153           0 :     map_node [ i ] =       chain_tbl[ chain_idx[ i ] ].head_cidx;
     154           0 :     chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt;
     155           0 :   }
     156           0 :   uint chain_max = 0U;
     157           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     158           0 :     chain_max = fd_uint_max( chain_max, chain_cnt[ i ] );
     159           0 :   }
     160             : 
     161             :   /* Parallel walk hash chains.  Every lane advances one node per outer
                     :      iteration, up to the longest chain length (chain_max).  A lane
                     :      that has run out of nodes reads dummy_rec instead, so the compare
                     :      and the map_next load still run for it; has_node keeps such a
                     :      lane from recording a match.  The walk does not stop at a match,
                     :      so rec[ i ] ends up as the last matching node along the chain. */
     162           0 :   static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX };
     163           0 :   fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
     164           0 :   for( ulong j=0UL; j<chain_max; j++ ) {
     165           0 :     for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     166           0 :       uchar const *   frame     = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     167           0 :       uchar const *   pubkey    = frame+0x10UL;
     168           0 :       int const       has_node  = j<chain_cnt[ i ];
     169           0 :       fd_funk_rec_t * node      = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
     170           0 :       int const       key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) );
     171           0 :       if( has_node && key_match ) rec[ i ] = node;
     172           0 :       map_node[ i ] = node->map_next;
     173           0 :     }
     174           0 :   }
     175             : 
     176             :   /* Create map entries.  New keys get a record acquired from the pool
                     :      and linked into their chain; existing keys are resolved by slot.
                     :      insert_limit is lowered if early_exit gets set. */
     177           0 :   ulong insert_limit = FD_SSPARSE_ACC_BATCH_MAX;
     178           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     179           0 :     ulong         slot       = result ? result->account_batch.slot : buffered_batch->slot;
     180           0 :     uchar const * frame      = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     181           0 :     uchar const * pubkey     = frame+0x10UL;
     182           0 :     ulong         data_len   = fd_ulong_load_8_fast( frame+0x08UL );
     183           0 :     ulong         lamports   = fd_ulong_load_8_fast( frame+0x30UL );
     184           0 :     ulong         rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
     185           0 :     _Bool         executable = !!frame[ 0x60UL ];
     186           0 :     uchar const * data       = frame+0x88UL;
     187           0 :     uchar owner[32];   memcpy( owner, frame+0x40UL, 32UL );
     188           0 :     fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );
     189             : 
     190           0 :     fd_funk_rec_t * r = rec[ i ];
     191           0 :     if( FD_LIKELY( !r ) ) {  /* optimize for new account */
     192           0 :       r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
     193           0 :       FD_TEST( r );
     194           0 :       memset( r, 0, sizeof(fd_funk_rec_t) );
     195           0 :       fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
     196           0 :       fd_funk_rec_key_copy( r->pair.key, &key );
     197           0 :       r->prev_idx = UINT_MAX;
     198           0 :       r->next_idx = UINT_MAX;
     199             : 
     200             :       /* Insert to hash map.  In theory, a key could appear twice in the
     201             :          same batch.  All accounts in a batch are guaranteed to be from
     202             :          the same slot though, so this is fine, assuming that accdb code
     203             :          gracefully handles duplicate hash map entries.
                     : 
                     :          The record is pushed at the head of its chain: going by the
                     :          helper names, the count part of ver_cnt is incremented with
                     :          the version part carried over, and the previous head becomes
                     :          r->map_next. */
     204           0 :       fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
     205           0 :       ulong ver_cnt    = chain->ver_cnt;
     206           0 :       uint  head_cidx  = chain->head_cidx;
     207           0 :       chain->ver_cnt   = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL );
     208           0 :       chain->head_cidx = (uint)( r-rec_tbl );
     209           0 :       r->map_next      = head_cidx;
     210           0 :       rec[ i ]         = r;
     211           0 :     } else {  /* existing record for key found */
     212           0 :       fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
     213           0 :       if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) ));
     214           0 :       FD_TEST( existing );
     215           0 :       if( existing->slot > slot ) {
     216           0 :         rec[ i ] = NULL;  /* skip record if existing value is newer */
     217             :         /* send the skipped account to the subtracting hash tile */
     218           0 :         fd_snapin_send_duplicate_account( ctx, lamports, data, data_len, executable, owner, pubkey, 1, &early_exit );
     219           0 :       } else if( slot > existing->slot) {
     220             :         /* send the to-be-replaced account to the subtracting hash tile */
     221           0 :         fd_snapin_send_duplicate_account( ctx, existing->lamports, (uchar const *)existing + sizeof(fd_account_meta_t), existing->dlen, existing->executable, existing->owner, pubkey, 1, &early_exit );
     222           0 :       } else { /* slot==existing->slot */
     223           0 :         FD_TEST( 0 );
     224           0 :       }
     225             : 
     226           0 :       if( FD_LIKELY( early_exit ) ) {
     227             :         /* buffer account batch if not already buffered.  Skipped when i
                     :            is the last index, as nothing would remain.  Only the frame
                     :            pointers are copied, not the frames themselves.
                     :            NOTE(review): the frames presumably stay valid until the
                     :            buffered batch is processed - confirm. */
     228           0 :         if( FD_LIKELY( result && i<FD_SSPARSE_ACC_BATCH_MAX-1UL ) ) {
     229           0 :           FD_TEST( ctx->buffered_batch.batch_cnt==0UL );
     230           0 :           fd_memcpy( ctx->buffered_batch.batch, result->account_batch.batch, sizeof(uchar const*)*FD_SSPARSE_ACC_BATCH_MAX );
     231           0 :           ctx->buffered_batch.slot          = result->account_batch.slot;
     232           0 :           ctx->buffered_batch.batch_cnt     = result->account_batch.batch_cnt;
     233           0 :           ctx->buffered_batch.remaining_idx = i + 1UL;
     234           0 :         }
     235             :         /* Index i itself is still inserted below if rec[ i ] is set. */
     236           0 :         insert_limit = i+1UL;
     237           0 :         break;
     238           0 :       }
     239           0 :     }
     240           0 :   }
     241             : 
     242             :   /* Actually insert accounts.  rec[ i ]==NULL marks an account that was
                     :      skipped above because the existing value is newer. */
     243           0 :   for( ulong i=start_idx; i<insert_limit; i++ ) {
     244           0 :     uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     245           0 :     ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
     246           0 :     if( rec[ i ] ) {
     247           0 :       streamlined_insert( ctx, rec[ i ], frame, slot );
     248           0 :     }
     249           0 :   }
     250             :   /* Update buffered_batch (if given): reset it once all indices were
                     :      processed, otherwise record the index to resume from. */
     251           0 :   if( FD_LIKELY( buffered_batch ) ) {
     252           0 :     if( FD_LIKELY( insert_limit==FD_SSPARSE_ACC_BATCH_MAX ) ) {
     253           0 :       buffered_batch->batch_cnt     = 0UL;
     254           0 :       buffered_batch->remaining_idx = 0UL;
     255           0 :     } else {
     256           0 :       buffered_batch->remaining_idx = insert_limit;
     257           0 :     }
     258           0 :   }
     259             : 
     260           0 :   return early_exit;
     261           0 : }
     262             : /* fd_snapin_read_account_funk reads the account at acct_addr, as seen
                     :    under ctx->xid, into meta and data.
                     : 
                     :    meta is zeroed first.  If fd_accdb_peek finds nothing, the function
                     :    returns with meta all zero and data untouched.  Otherwise owner,
                     :    lamports, slot, dlen and the executable flag are copied into meta
                     :    and the account data (meta->dlen bytes) into data.
                     : 
                     :    data_max is the capacity of data in bytes.  An account whose data
                     :    does not fit is treated as fatal (FD_LOG_ERR) before anything is
                     :    copied, so data is never written past data_max.
                     : 
                     :    After copying, fd_accdb_peek_test is checked via FD_CRIT and the
                     :    peek is released with fd_accdb_peek_drop. */
     263             : void
     264             : fd_snapin_read_account_funk( fd_snapin_tile_t *  ctx,
     265             :                              void const *        acct_addr,
     266             :                              fd_account_meta_t * meta,
     267             :                              uchar *             data,
     268           0 :                              ulong               data_max ) {
     269           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
     270             : 
     271             :   /* Start a speculative database query.
     272             :      It is assumed that no conflicting database accesses take place
     273             :      while the account is being read from funk. */
     274             : 
     275           0 :   fd_accdb_peek_t peek_[1];
     276           0 :   fd_accdb_peek_t * peek = fd_accdb_peek( ctx->accdb, peek_, ctx->xid, acct_addr );
     277           0 :   if( FD_UNLIKELY( !peek ) ) return;
     278             : 
     279           0 :   ulong data_sz = fd_accdb_ref_data_sz( peek->acc );
     280           0 :   if( FD_UNLIKELY( data_sz>data_max ) ) {
                     :     /* Must not fall through to the copy below, which would write
                     :        data_sz bytes into a buffer of only data_max bytes.  FD_LOG_ERR
                     :        is the fatal path used for the other unrecoverable conditions
                     :        in this file.  The size reported is data_sz; meta->dlen is
                     :        still zero at this point. */
     281           0 :     FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
     282           0 :     FD_LOG_ERR(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
     283           0 :                  acct_addr_b58, data_sz, data_max ));
     284           0 :   }
     285             : 
     286           0 :   memcpy( meta->owner, fd_accdb_ref_owner( peek->acc ), sizeof(fd_pubkey_t) );
     287           0 :   meta->lamports   = fd_accdb_ref_lamports( peek->acc );
     288           0 :   meta->slot       = fd_accdb_ref_slot( peek->acc );
     289           0 :   meta->dlen       = (uint)data_sz;
     290           0 :   meta->executable = !!fd_accdb_ref_exec_bit( peek->acc );
     291           0 :   fd_memcpy( data, fd_accdb_ref_data_const( peek->acc ), data_sz );
     292             : 
     293           0 :   FD_CRIT( fd_accdb_peek_test( peek ), "invalid read" );
     294           0 :   fd_accdb_peek_drop( peek );
     295           0 : }

Generated by: LCOV version 1.14