LCOV - code coverage report
Current view: top level - discof/restore - fd_snapin_tile_funk.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 221 0.0 %
Date: 2026-01-23 05:02:40 Functions: 0 5 0.0 %

          Line data    Source code
       1             : /* fd_snapin_tile_funk.c contains APIs to load accounts into funk. */
       2             : 
       3             : #include "fd_snapin_tile_private.h"
       4             : #include "../../flamenco/accdb/fd_accdb_sync.h"
       5             : #include "../../funk/fd_funk.h"
       6             : /* fd_snapin_process_account_header_funk handles one parsed account
                     :    header (result->account_header) and sets funk up to receive that
                     :    account's data.
                     : 
                     :    The account is looked up by pubkey under ctx->xid and, if ctx->full
                     :    is zero and that lookup missed, under the funk root.  If a copy
                     :    with a readable value is found:
                     : 
                     :    - existing slot > incoming slot: the incoming account is dropped.
                     :      ctx->acc_data is set to NULL, metrics.accounts_ignored is
                     :      bumped, the incoming header fields are handed to
                     :      fd_snapin_send_duplicate_account and the function returns.
                     :    - otherwise: metrics.accounts_replaced is bumped and the existing
                     :      copy is handed to fd_snapin_send_duplicate_account before it is
                     :      overwritten below.
                     : 
                     :    On the non-dropped path any old value of the record is flushed, a
                     :    buffer of sizeof(fd_account_meta_t)+data_len bytes is allocated
                     :    from funk->alloc, the metadata is filled in from the header and
                     :    ctx->acc_data is pointed just past the metadata.  If no record
                     :    existed under ctx->xid, one is created with fd_funk_rec_prepare
                     :    and published at the end.
                     : 
                     :    ctx     tile state (funk handle, xid, metrics, acc_data cursor)
                     :    result  parser output; only result->account_header is read
                     : 
                     :    Returns the early_exit flag as written by
                     :    fd_snapin_send_duplicate_account, or 0 if that was never called.
                     : 
                     :    NOTE(review): the existing value is passed by pointer to
                     :    fd_snapin_send_duplicate_account and may then be freed by
                     :    fd_funk_val_flush; presumably the callee is done with the bytes
                     :    before it returns -- confirm.
                     :    NOTE(review): unlike streamlined_insert, data_len is not checked
                     :    against FD_RUNTIME_ACC_SZ_MAX here; presumably validated by the
                     :    parser -- confirm. */
       7             : int
       8             : fd_snapin_process_account_header_funk( fd_snapin_tile_t *            ctx,
       9           0 :                                        fd_ssparse_advance_result_t * result ) {
      10           0 :   fd_funk_t * funk = ctx->funk;
      11             : /* Look the account up under ctx->xid.  rec keeps pointing at that
                     :    record (or NULL); existing_rec may additionally be redirected to a
                     :    copy found in the funk root below. */
      12           0 :   fd_funk_rec_key_t id = FD_LOAD( fd_funk_rec_key_t, result->account_header.pubkey );
      13           0 :   fd_funk_rec_query_t query[1];
      14           0 :   fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
      15           0 :   fd_funk_rec_t const * existing_rec = rec;
      16             : 
      17           0 :   ctx->metrics.accounts_loaded++;
      18             : /* Fall back to the funk root when ctx->full is zero (presumably the
                     :    incremental snapshot case, with the root holding the previously
                     :    loaded state -- confirm). */
      19           0 :   int early_exit = 0;
      20           0 :   if( !ctx->full && !existing_rec ) {
      21           0 :     existing_rec = fd_funk_rec_query_try( funk, fd_funk_root( funk ), &id, query );
      22           0 :   }
      23           0 :   if( FD_UNLIKELY( existing_rec ) ) {
      24           0 :     fd_account_meta_t * meta = fd_funk_val( existing_rec, funk->wksp );
      25           0 :     if( FD_UNLIKELY( meta ) ) {
      26           0 :       if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
      27           0 :         ctx->acc_data = NULL;
      28           0 :         ctx->metrics.accounts_ignored++;
      29           0 :         fd_snapin_send_duplicate_account( ctx, result->account_header.lamports, NULL, result->account_header.data_len, (uchar)result->account_header.executable, result->account_header.owner, result->account_header.pubkey, 0, &early_exit );
      30           0 :         return early_exit;
      31           0 :       }
      32           0 :       ctx->metrics.accounts_replaced++;
      33           0 :       fd_snapin_send_duplicate_account( ctx, meta->lamports, (uchar const *)meta + sizeof(fd_account_meta_t), meta->dlen, meta->executable, meta->owner, result->account_header.pubkey, 1, &early_exit);
      34           0 :     }
      35           0 :   }
      36             : /* No record under ctx->xid yet: prepare a new one and remember to
                     :    publish it once its value has been filled in. */
      37           0 :   int should_publish = 0;
      38           0 :   fd_funk_rec_prepare_t prepare[1];
      39           0 :   if( FD_LIKELY( !rec ) ) {
      40           0 :     should_publish = 1;
      41           0 :     rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
      42           0 :     FD_TEST( rec );
      43           0 :   }
      44             : /* NOTE(review): the value loaded into meta on the next line is never
                     :    read; meta is reassigned by the allocation below. */
      45           0 :   fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp );
      46             :   /* Allocate data space from heap, free old value (if any) */
      47           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
      48           0 :   ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
      49           0 :   ulong       alloc_max;
      50           0 :   meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
      51           0 :   if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
      52           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
      53           0 :   rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
      54           0 :   rec->val_max   = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
      55           0 :   rec->val_sz    = (uint)( alloc_sz  & FD_FUNK_REC_VAL_MAX );
      56             : /* Fill in the account metadata from the parsed header. */
      57           0 :   meta->dlen       = (uint)result->account_header.data_len;
      58           0 :   meta->slot       = result->account_header.slot;
      59           0 :   memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
      60           0 :   meta->lamports   = result->account_header.lamports;
      61           0 :   meta->executable = (uchar)result->account_header.executable;
      62             : /* Account data will be appended directly after the metadata. */
      63           0 :   ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
      64             : 
      65           0 :   if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
      66           0 :   return early_exit;
      67           0 : }
      68             : /* fd_snapin_process_account_data_funk consumes one fragment of account
                     :    data (result->account_data.data, data_sz bytes).
                     : 
                     :    If ctx->acc_data is NULL (fd_snapin_process_account_header_funk sets
                     :    it to NULL when it drops an account), the fragment is not stored
                     :    but handed to fd_snapin_send_duplicate_account_data, and the
                     :    early_exit flag written by that call is returned.
                     : 
                     :    Otherwise the fragment is copied to ctx->acc_data, the cursor is
                     :    advanced by data_sz, and 0 is returned.
                     : 
                     :    NOTE(review): the copy is not bounds checked here; presumably the
                     :    fragments of one account never add up to more than the data_len
                     :    the header handler sized the buffer for -- confirm. */
      69             : int
      70             : fd_snapin_process_account_data_funk( fd_snapin_tile_t *            ctx,
      71           0 :                                      fd_ssparse_advance_result_t * result ) {
      72           0 :   int early_exit = 0;
      73           0 :   if( FD_UNLIKELY( !ctx->acc_data ) ) {
      74           0 :     fd_snapin_send_duplicate_account_data( ctx, result->account_data.data, result->account_data.data_sz, &early_exit );
      75           0 :     return early_exit;
      76           0 :   }
      77             : /* Append the fragment and advance the write cursor. */
      78           0 :   fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
      79           0 :   ctx->acc_data += result->account_data.data_sz;
      80           0 :   return 0;
      81           0 : }
      82             : 
      83             : /* streamlined_insert inserts an unfragmented account.
      84             :    Only used while loading a full snapshot, not an incremental.
                     : 
                     :    rec is the funk record that will own the account; any value it
                     :    currently holds is flushed and replaced by a freshly allocated
                     :    buffer holding an fd_account_meta_t followed by the account data.
                     : 
                     :    frame points at one serialized account.  The offsets read here are:
                     :      0x08  data_len   (8 bytes)
                     :      0x30  lamports   (8 bytes)
                     :      0x38  rent_epoch (8 bytes, loaded but unused)
                     :      0x40  owner      (32 bytes)
                     :      0x60  executable (1 byte, any non-zero value means set)
                     :      0x88  start of account data (data_len bytes)
                     : 
                     :    slot is stored as the account's slot in the metadata.
                     : 
                     :    An account larger than FD_RUNTIME_ACC_SZ_MAX or a failed heap
                     :    allocation is reported with FD_LOG_ERR (presumably fatal, as the
                     :    code continues to use the pointer unconditionally -- confirm). */
      85             : 
      86             : static void
      87             : streamlined_insert( fd_snapin_tile_t * ctx,
      88             :                     fd_funk_rec_t *    rec,
      89             :                     uchar const *      frame,
      90           0 :                     ulong              slot ) {
      91           0 :   ulong data_len   = fd_ulong_load_8_fast( frame+0x08UL );
      92           0 :   ulong lamports   = fd_ulong_load_8_fast( frame+0x30UL );
      93           0 :   ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
      94           0 :   uchar owner[32];   memcpy( owner, frame+0x40UL, 32UL );
      95           0 :   _Bool executable = !!frame[ 0x60UL ];
      96             : /* Release the record's old value (if any) and allocate room for the
                     :    metadata plus data_len bytes of account data. */
      97           0 :   fd_funk_t * funk = ctx->funk;
      98           0 :   if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
      99           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
     100           0 :   ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len;
     101           0 :   ulong       alloc_max;
     102           0 :   fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
     103           0 :   if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
     104           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
     105           0 :   rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
     106           0 :   rec->val_max   = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
     107           0 :   rec->val_sz    = (uint)( alloc_sz  & FD_FUNK_REC_VAL_MAX );
     108             : 
     109             :   /* Write metadata */
     110           0 :   meta->dlen = (uint)data_len;
     111           0 :   meta->slot = slot;
     112           0 :   memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
     113           0 :   meta->lamports   = lamports;
     114           0 :   meta->executable = (uchar)executable;
     115             : 
     116             :   /* Write data (stored immediately after the metadata struct) */
     117           0 :   uchar * acc_data = (uchar *)( meta+1 );
     118           0 :   fd_memcpy( acc_data, frame+0x88UL, data_len );
     119           0 : }
     120             : 
     121             : /* process_account_batch is a happy path performance optimization
     122             :    handling insertion of lots of small accounts.
     123             : 
     124             :    The main optimization implemented for funk is doing hash map memory
     125             :    accesses in parallel to amortize DRAM latency.
                     : 
                     :    Input selection: if result is non-NULL, frames and slot are taken
                     :    from result->account_batch and processing starts at index 0.
                     :    Otherwise they are taken from buffered_batch and processing starts
                     :    at buffered_batch->remaining_idx (buffered_batch is dereferenced
                     :    unconditionally in that case, so it must be non-NULL).
                     : 
                     :    The function works in phases over frames
                     :    [start_idx,FD_SSPARSE_ACC_BATCH_MAX):
                     :      1. hash every pubkey to its map chain,
                     :      2. load all chain heads and lengths,
                     :      3. walk all chains in lock step looking for an existing record,
                     :      4. create map entries for new keys / resolve conflicts with
                     :         existing records by slot,
                     :      5. write the account values via streamlined_insert.
                     : 
                     :    Phase 4 stops early as soon as fd_snapin_send_duplicate_account
                     :    sets early_exit.  If frames remain and the batch came from result,
                     :    the frame pointers are stashed in ctx->buffered_batch so that a
                     :    later call can resume after the frame that triggered the exit.
                     : 
                     :    Returns early_exit (0 if the whole range was processed without
                     :    fd_snapin_send_duplicate_account requesting an early exit).
                     : 
                     :    NOTE(review): all loops run up to FD_SSPARSE_ACC_BATCH_MAX and
                     :    never consult batch_cnt; presumably this path is only taken for
                     :    completely filled batches -- confirm.
                     :    NOTE(review): the map chains are read and modified with plain
                     :    loads and stores; presumably nothing else accesses funk while a
                     :    snapshot is being loaded -- confirm. */
     126             : 
     127             : int
     128             : fd_snapin_process_account_batch_funk( fd_snapin_tile_t *            ctx,
     129             :                                       fd_ssparse_advance_result_t * result,
     130           0 :                                       buffered_account_batch_t *    buffered_batch ) {
     131           0 :   int early_exit  = 0;
     132           0 :   ulong start_idx = result ? 0 : buffered_batch->remaining_idx;
     133           0 :   fd_funk_t *         funk    = ctx->funk;
     134           0 :   fd_funk_rec_map_t * rec_map = funk->rec_map;
     135           0 :   fd_funk_rec_t *     rec_tbl = funk->rec_pool->ele;
     136           0 :   fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );
     137             : 
     138             :   /* Derive map chains: hash the pubkey found at frame+0x10 with the
                     :      map seed and mask the hash down to a chain index.  The mask
                     :      chain_cnt-1 assumes chain_cnt is a power of two -- TODO confirm */
     139           0 :   uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
     140           0 :   ulong chain_mask = rec_map->map->chain_cnt-1UL;
     141           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     142           0 :     uchar const * frame  = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     143           0 :     uchar const * pubkey = frame+0x10UL;
     144           0 :     ulong         memo   = fd_funk_rec_key_hash1( pubkey, rec_map->map->seed );
     145           0 :     chain_idx[ i ] = (uint)( memo&chain_mask );
     146           0 :   }
     147             : 
     148             :   /* Parallel load hash chain heads.  chain_cnt[i] keeps the low 32
                     :      bits of the chain's ver_cnt word (presumably the element count
                     :      lives in the low bits of that word -- confirm).  chain_max is
                     :      the largest such count among the frames being processed. */
     149           0 :   uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
     150           0 :   uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
     151           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     152           0 :     map_node [ i ] =       chain_tbl[ chain_idx[ i ] ].head_cidx;
     153           0 :     chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt;
     154           0 :   }
     155           0 :   uint chain_max = 0U;
     156           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     157           0 :     chain_max = fd_uint_max( chain_max, chain_cnt[ i ] );
     158           0 :   }
     159             : 
     160             :   /* Parallel walk hash chains.  Step j advances every frame's cursor
                     :      by one node.  Frames whose chain is already exhausted read the
                     :      static dummy_rec instead, so the inner loop body performs the
                     :      same operations for every frame.  When a node's key equals the
                     :      frame's pubkey it is remembered in rec[i]; if several nodes on
                     :      a chain match, the last one visited wins.
                     :      NOTE(review): only the key is compared, not the xid; presumably
                     :      fine because a single xid is in play on this path -- confirm. */
     161           0 :   static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX };
     162           0 :   fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
     163           0 :   for( ulong j=0UL; j<chain_max; j++ ) {
     164           0 :     for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     165           0 :       uchar const *   frame     = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     166           0 :       uchar const *   pubkey    = frame+0x10UL;
     167           0 :       int const       has_node  = j<chain_cnt[ i ];
     168           0 :       fd_funk_rec_t * node      = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
     169           0 :       int const       key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) );
     170           0 :       if( has_node && key_match ) rec[ i ] = node;
     171           0 :       map_node[ i ] = node->map_next;
     172           0 :     }
     173           0 :   }
     174             : 
     175             :   /* Create map entries.  For every frame either a new record is
                     :      acquired and linked at the head of its chain, or the existing
                     :      record is compared against the incoming slot.  insert_limit is
                     :      one past the last frame this pass is allowed to insert. */
     176           0 :   ulong insert_limit = FD_SSPARSE_ACC_BATCH_MAX;
     177           0 :   for( ulong i=start_idx; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     178           0 :     ulong         slot       = result ? result->account_batch.slot : buffered_batch->slot;
     179           0 :     uchar const * frame      = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     180           0 :     uchar const * pubkey     = frame+0x10UL;
     181           0 :     ulong         data_len   = fd_ulong_load_8_fast( frame+0x08UL );
     182           0 :     ulong         lamports   = fd_ulong_load_8_fast( frame+0x30UL );
     183           0 :     ulong         rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
     184           0 :     _Bool         executable = !!frame[ 0x60UL ];
     185           0 :     uchar const * data       = frame+0x88UL;
     186           0 :     uchar owner[32];   memcpy( owner, frame+0x40UL, 32UL );
     187           0 :     fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );
     188             : 
     189           0 :     ctx->metrics.accounts_loaded++;
     190           0 :     fd_funk_rec_t * r = rec[ i ];
     191           0 :     if( FD_LIKELY( !r ) ) {  /* optimize for new account */
     192           0 :       r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
     193           0 :       FD_TEST( r );
     194             : /* Initialize the new record with an empty value; the value is
                     :    filled in by streamlined_insert in the final phase. */
     195           0 :       fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
     196           0 :       fd_funk_rec_key_copy( r->pair.key, &key );
     197           0 :       r->map_next  = 0U;
     198           0 :       r->ver_lock  = fd_funk_rec_ver_lock( 1UL, 0UL );
     199           0 :       r->next_idx  = UINT_MAX;
     200           0 :       r->prev_idx  = UINT_MAX;
     201           0 :       r->val_sz    = 0;
     202           0 :       r->val_max   = 0;
     203           0 :       r->tag       = 0;
     204           0 :       r->val_gaddr = 0UL;
     205             : 
     206             :       /* Insert to hash map.  In theory, a key could appear twice in the
     207             :          same batch.  All accounts in a batch are guaranteed to be from
     208             :          the same slot though, so this is fine, assuming that accdb code
     209             :          gracefully handles duplicate hash map entries.
                     : 
                     :          The record becomes the new chain head: the chain's version
                     :          is kept, its count is incremented, and the previous head
                     :          becomes this record's successor. */
     210           0 :       fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
     211           0 :       ulong ver_cnt    = chain->ver_cnt;
     212           0 :       uint  head_cidx  = chain->head_cidx;
     213           0 :       chain->ver_cnt   = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL );
     214           0 :       chain->head_cidx = (uint)( r-rec_tbl );
     215           0 :       r->map_next      = head_cidx;
     216           0 :       rec[ i ]         = r;
     217           0 :     } else {  /* existing record for key found */
     218           0 :       fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
     219           0 :       if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) ));
     220           0 :       FD_TEST( existing );
     221           0 :       if( existing->slot > slot ) {
     222           0 :         rec[ i ] = NULL;  /* skip record if existing value is newer */
     223             :         /* send the skipped account to the subtracting hash tile */
     224           0 :         ctx->metrics.accounts_ignored++;
     225           0 :         fd_snapin_send_duplicate_account( ctx, lamports, data, data_len, executable, owner, pubkey, 1, &early_exit );
     226           0 :       } else if( slot > existing->slot) {
     227             :         /* send the to-be-replaced account to the subtracting hash tile */
     228           0 :         ctx->metrics.accounts_replaced++;
     229           0 :         fd_snapin_send_duplicate_account( ctx, existing->lamports, (uchar const *)existing + sizeof(fd_account_meta_t), existing->dlen, existing->executable, existing->owner, pubkey, 1, &early_exit );
     230           0 :       } else { /* slot==existing->slot: not expected, FD_TEST( 0 ) always fails */
     231           0 :         FD_TEST( 0 );
     232           0 :       }
     233             : /* NOTE(review): FD_LIKELY on early_exit looks surprising for a path
                     :    that interrupts the batch -- confirm the branch hint. */
     234           0 :       if( FD_LIKELY( early_exit ) ) {
     235             :         /* buffer account batch if not already buffered.  Only done when
                     :            the batch came from result and frames remain after i.  Only
                     :            the frame pointers are copied, not the frames themselves
                     :            (presumably the pointed-to memory stays valid until the
                     :            batch is resumed -- confirm). */
     236           0 :         if( FD_LIKELY( result && i<FD_SSPARSE_ACC_BATCH_MAX-1UL ) ) {
     237           0 :           FD_TEST( ctx->buffered_batch.batch_cnt==0UL );
     238           0 :           fd_memcpy( ctx->buffered_batch.batch, result->account_batch.batch, sizeof(uchar const*)*FD_SSPARSE_ACC_BATCH_MAX );
     239           0 :           ctx->buffered_batch.slot          = result->account_batch.slot;
     240           0 :           ctx->buffered_batch.batch_cnt     = result->account_batch.batch_cnt;
     241           0 :           ctx->buffered_batch.remaining_idx = i + 1UL;
     242           0 :         }
     243             : /* Frame i itself has been decided; frames after it are left for a
                     :    later call. */
     244           0 :         insert_limit = i+1UL;
     245           0 :         break;
     246           0 :       }
     247           0 :     }
     248           0 :   }
     249             : 
     250             :   /* Actually insert accounts: write the values of frames
                     :      [start_idx,insert_limit) whose rec[i] is set.  Frames that were
                     :      skipped above have rec[i]==NULL. */
     251           0 :   for( ulong i=start_idx; i<insert_limit; i++ ) {
     252           0 :     uchar const * frame = result ? result->account_batch.batch[ i ] : buffered_batch->batch[ i ];
     253           0 :     ulong slot = result ? result->account_batch.slot : buffered_batch->slot;
     254           0 :     if( rec[ i ] ) {
     255           0 :       streamlined_insert( ctx, rec[ i ], frame, slot );
     256           0 :     }
     257           0 :   }
     258             : /* Update the resume state of the caller-provided buffered batch (if
                     :    any): cleared when the whole batch was handled, otherwise advanced
                     :    to the first frame not yet handled. */
     259           0 :   if( FD_LIKELY( buffered_batch ) ) {
     260           0 :     if( FD_LIKELY( insert_limit==FD_SSPARSE_ACC_BATCH_MAX ) ) {
     261           0 :       buffered_batch->batch_cnt     = 0UL;
     262           0 :       buffered_batch->remaining_idx = 0UL;
     263           0 :     } else {
     264           0 :       buffered_batch->remaining_idx = insert_limit;
     265           0 :     }
     266           0 :   }
     267             : 
     268           0 :   return early_exit;
     269           0 : }
     270             : /* fd_snapin_read_account_funk copies an account out of the account
                     :    database into caller-provided buffers.
                     : 
                     :    ctx        tile state; ctx->accdb and ctx->xid select the database
                     :               and transaction to read from
                     :    acct_addr  address of the account to read (encoded as 32 bytes in
                     :               the error message below)
                     :    meta       receives owner, lamports, slot, dlen and executable;
                     :               always zeroed first
                     :    data       receives the account data
                     :    data_max   capacity of data in bytes
                     : 
                     :    If fd_accdb_open_ro fails (returns NULL/zero) the function returns
                     :    with *meta all zero and data untouched.  An account whose data is
                     :    larger than data_max is reported with FD_LOG_CRIT (presumably does
                     :    not return; the copy below would otherwise overrun data --
                     :    confirm). */
     271             : void
     272             : fd_snapin_read_account_funk( fd_snapin_tile_t *  ctx,
     273             :                              void const *        acct_addr,
     274             :                              fd_account_meta_t * meta,
     275             :                              uchar *             data,
     276           0 :                              ulong               data_max ) {
     277           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
     278             : 
     279             :   /* Start a speculative database query.
     280             :      It is assumed that no conflicting database accesses take place
     281             :      while the account is being read from funk. */
     282             : 
     283           0 :   fd_accdb_ro_t ro[1];
     284           0 :   if( FD_UNLIKELY( !fd_accdb_open_ro( ctx->accdb, ro, ctx->xid, acct_addr ) ) ) {
     285           0 :     return;
     286           0 :   }
     287             : /* Refuse to copy more than the caller's buffer can hold. */
     288           0 :   ulong data_sz = fd_accdb_ref_data_sz( ro );
     289           0 :   if( FD_UNLIKELY( data_sz>data_max ) ) {
     290           0 :     FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
     291           0 :     FD_LOG_CRIT(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
     292           0 :                   acct_addr_b58, data_sz, data_max ));
     293           0 :   }
     294             : /* Copy metadata and data out of the read-only reference, then
                     :    release the reference. */
     295           0 :   memcpy( meta->owner, fd_accdb_ref_owner( ro ), sizeof(fd_pubkey_t) );
     296           0 :   meta->lamports   = fd_accdb_ref_lamports( ro );
     297           0 :   meta->slot       = fd_accdb_ref_slot( ro );
     298           0 :   meta->dlen       = (uint)data_sz;
     299           0 :   meta->executable = !!fd_accdb_ref_exec_bit( ro );
     300           0 :   fd_memcpy( data, fd_accdb_ref_data_const( ro ), data_sz );
     301             : 
     302           0 :   fd_accdb_close_ro( ctx->accdb, ro );
     303           0 : }

Generated by: LCOV version 1.14