LCOV - code coverage report
Current view: top level - discof/restore - fd_snapin_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 559 0.0 %
Date: 2025-10-27 04:40:00 Functions: 0 20 0.0 %

          Line data    Source code
       1             : #include "utils/fd_ssctrl.h"
       2             : #include "utils/fd_ssparse.h"
       3             : #include "utils/fd_ssmanifest_parser.h"
       4             : #include "utils/fd_slot_delta_parser.h"
       5             : #include "utils/fd_ssmsg.h"
       6             : 
       7             : #include "../../disco/topo/fd_topo.h"
       8             : #include "../../disco/metrics/fd_metrics.h"
       9             : #include "../../flamenco/accdb/fd_accdb_admin.h"
      10             : #include "../../flamenco/accdb/fd_accdb_user.h"
      11             : #include "../../flamenco/runtime/fd_acc_mgr.h"
      12             : #include "../../flamenco/runtime/fd_txncache.h"
      13             : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
      14             : 
      15             : #include "generated/fd_snapin_tile_seccomp.h"
      16             : 
      17             : #define NAME "snapin"
      18             : 
      19             : /* The snapin tile is a state machine that parses and loads a full
      20             :    and optionally an incremental snapshot.  It is currently responsible
      21             :    for loading accounts into an in-memory database, though this may
      22             :    change. */
      23             : 
      24             : /* 300 here is from status_cache.rs::MAX_CACHE_ENTRIES which is the most
      25             :    root slots Agave could possibly serve in a snapshot. */
      26             : #define FD_SNAPIN_TXNCACHE_MAX_ENTRIES (300UL*FD_PACK_MAX_TXNCACHE_TXN_PER_SLOT)
      27             : 
      28             : /* 300 root slots in the slot deltas array, and each one references all
      29             :    151 prior blockhashes that it's able to. */
      30             : #define FD_SNAPIN_MAX_SLOT_DELTA_GROUPS (300UL*151UL)
      31             : /* Values passed as the second argument of fd_stem_publish (presumably the output link index). */
      32           0 : #define FD_SNAPIN_OUT_SNAPCT   0UL /* used by transition_malformed for FD_SNAPSHOT_MSG_CTRL_ERROR */
      33           0 : #define FD_SNAPIN_OUT_MANIFEST 1UL /* used by process_manifest to publish the manifest */
      34             : /* Element type of blockhash_map: a blockhash key plus the prev/next links named by MAP_PREV/MAP_NEXT. */
      35             : struct fd_blockhash_entry {
      36             :   fd_hash_t blockhash; /* map key (MAP_KEY) */
      37             : 
      38             :   struct {
      39             :     ulong prev; /* MAP_PREV */
      40             :     ulong next; /* MAP_NEXT */
      41             :   } map;
      42             : };
      43             : 
      44             : typedef struct fd_blockhash_entry fd_blockhash_entry_t;
      45             : /* Instantiate blockhash_map from the fd_map_chain template: fd_blockhash_entry_t elements keyed by their fd_hash_t blockhash. */
      46             : #define MAP_NAME                           blockhash_map
      47           0 : #define MAP_KEY                            blockhash
      48             : #define MAP_KEY_T                          fd_hash_t
      49             : #define MAP_ELE_T                          fd_blockhash_entry_t
      50           0 : #define MAP_KEY_EQ(k0,k1)                  (!memcmp((k0),(k1), sizeof(fd_hash_t))) /* bytewise key equality */
      51           0 : #define MAP_KEY_HASH(key,seed)             (fd_hash((seed),(key),sizeof(fd_hash_t))) /* seeded hash over the whole key */
      52           0 : #define MAP_PREV                           map.prev
      53           0 : #define MAP_NEXT                           map.next
      54             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      55             : #include "../../util/tmpl/fd_map_chain.c"
      56             : /* A blockhash paired with a txnhash offset; populate_txncache reads these from ctx->blockhash_offsets. */
      57             : struct blockhash_group {
      58             :   uchar blockhash[ 32UL ];
      59             :   ulong txnhash_offset; /* copied into the matching bank's txnhash_offset by populate_txncache */
      60             : };
      61             : 
      62             : typedef struct blockhash_group blockhash_group_t;
      63             : /* Context of the snapin tile; passed as ctx to every function in this file. */
      64             : struct fd_snapin_tile {
      65             :   int full;  /* nonzero: process_manifest publishes FD_SSMSG_MANIFEST_FULL, else _INCREMENTAL */
      66             :   int state; /* compared against / set to FD_SNAPSHOT_STATE_* values */
      67             : 
      68             :   ulong seed;          /* seed given to blockhash_map_new in populate_txncache */
      69             :   long boot_timestamp; /* subtracted from fd_log_wallclock() for the load-time log in should_shutdown */
      70             : 
      71             :   fd_accdb_admin_t accdb_admin[1];
      72             :   fd_accdb_user_t  accdb[1]; /* accdb->funk is the funk instance accounts are written into */
      73             : 
      74             :   fd_txncache_t * txncache; /* filled by populate_txncache */
      75             :   uchar *         acc_data; /* write cursor for the current account's data; NULL means skip the data */
      76             : 
      77             :   fd_funk_txn_xid_t xid[1]; /* txn XID */
      78             : 
      79             :   fd_stem_context_t *      stem; /* used for publishing from process_manifest / populate_txncache */
      80             :   fd_ssparse_t *           ssparse;
      81             :   fd_ssmanifest_parser_t * manifest_parser;
      82             :   fd_slot_delta_parser_t * slot_delta_parser;
      83             : 
      84             :   struct {
      85             :     int manifest_done;
      86             :     int status_cache_done;
      87             :     int manifest_processed;
      88             :   } flags;
      89             : 
      90             :   ulong bank_slot; /* set to manifest->slot in process_manifest */
      91             : 
      92             :   ulong blockhash_offsets_len; /* number of valid entries in blockhash_offsets */
      93             :   blockhash_group_t * blockhash_offsets;
      94             : 
      95             :   ulong txncache_entries_len; /* number of valid entries in txncache_entries */
      96             :   fd_sstxncache_entry_t * txncache_entries;
      97             : 
      98             :   fd_txncache_fork_id_t txncache_root_fork_id; /* set by populate_txncache, copied into the manifest */
      99             : 
     100             :   struct {
     101             :     ulong full_bytes_read;
     102             :     ulong incremental_bytes_read;
     103             :     ulong accounts_inserted; /* incremented once per call of process_account_header that stores an account */
     104             :   } metrics; /* exported by metrics_write */
     105             : 
     106             :   struct {
     107             :     fd_wksp_t * wksp;
     108             :     ulong       chunk0;
     109             :     ulong       wmark;
     110             :     ulong       mtu;
     111             :     ulong       pos;
     112             :   } in;
     113             : 
     114             :   struct {
     115             :     fd_wksp_t * wksp;
     116             :     ulong       chunk0;
     117             :     ulong       wmark;
     118             :     ulong       chunk; /* chunk holding the manifest being built; advanced after publishing */
     119             :     ulong       mtu;
     120             :   } manifest_out;
     121             : };
     122             : 
     123             : typedef struct fd_snapin_tile fd_snapin_tile_t;
     124             : /* should_shutdown returns nonzero iff ctx->state is FD_SNAPSHOT_STATE_SHUTDOWN, logging load statistics when it is. */
     125             : static inline int
     126           0 : should_shutdown( fd_snapin_tile_t * ctx ) {
     127           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN ) ) {
     128           0 :     FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.3f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
     129           0 :   }
     130           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     131           0 : }
     132             : /* scratch_align returns 128UL (presumably the alignment of the region sized by scratch_footprint). */
     133             : static ulong
     134           0 : scratch_align( void ) {
     135           0 :   return 128UL;
     136           0 : }
     137             : /* scratch_footprint returns the layout size of the tile context, the parsers, the txncache and the two staging arrays. */
     138             : static ulong
     139           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     140           0 :   (void)tile; /* NOTE(review): redundant, tile->snapin.max_live_slots is read below */
     141           0 :   ulong l = FD_LAYOUT_INIT;
     142           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t),      sizeof(fd_snapin_tile_t)                             );
     143           0 :   l = FD_LAYOUT_APPEND( l, fd_ssparse_align(),             fd_ssparse_footprint( 1UL<<24UL )                    );
     144           0 :   l = FD_LAYOUT_APPEND( l, fd_txncache_align(),            fd_txncache_footprint( tile->snapin.max_live_slots ) );
     145           0 :   l = FD_LAYOUT_APPEND( l, fd_ssmanifest_parser_align(),   fd_ssmanifest_parser_footprint()                     );
     146           0 :   l = FD_LAYOUT_APPEND( l, fd_slot_delta_parser_align(),   fd_slot_delta_parser_footprint()                     );
     147           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
     148           0 :   l = FD_LAYOUT_APPEND( l, alignof(blockhash_group_t),     sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS    );
     149           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
     150           0 : }
     151             : /* metrics_write sets the SNAPIN gauges from the ctx->metrics counters and the current ctx->state. */
     152             : static void
     153           0 : metrics_write( fd_snapin_tile_t * ctx ) {
     154           0 :   FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ,        ctx->metrics.full_bytes_read );
     155           0 :   FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
     156           0 :   FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED,      ctx->metrics.accounts_inserted );
     157           0 :   FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
     158           0 : }
     159             : /* Returns 0 if the slot of every entry in ctx->txncache_entries is found in slot_history, -1 at the first one that is not. */
     160             : static int
     161             : verify_slot_deltas_with_slot_history( fd_snapin_tile_t *         ctx,
     162           0 :                                       fd_slot_history_global_t * slot_history ) {
     163             : 
     164           0 :   for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
     165           0 :     fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
     166           0 :     if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( slot_history, entry->slot )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) return -1;
     167           0 :   }
     168           0 :   return 0;
     169           0 : }
     170             : /* Returns 0 if no entry in ctx->txncache_entries has a slot greater than bank_slot, -1 otherwise. */
     171             : static int
     172             : verify_slot_deltas_with_bank_slot( fd_snapin_tile_t * ctx,
     173           0 :                                    ulong              bank_slot ) {
     174           0 :   for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
     175           0 :     fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
     176           0 :     if( FD_UNLIKELY( entry->slot>bank_slot ) ) return -1;
     177           0 :   }
     178           0 :   return 0;
     179           0 : }
     180             : /* transition_malformed sets ctx->state to FD_SNAPSHOT_STATE_ERROR and publishes FD_SNAPSHOT_MSG_CTRL_ERROR on FD_SNAPIN_OUT_SNAPCT. */
     181             : static void
     182             : transition_malformed( fd_snapin_tile_t *  ctx,
     183           0 :                       fd_stem_context_t * stem ) {
     184           0 :   ctx->state = FD_SNAPSHOT_STATE_ERROR;
     185           0 :   fd_stem_publish( stem, FD_SNAPIN_OUT_SNAPCT, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     186           0 : }
     187             : /* populate_txncache returns 0 on success, or 1 after calling transition_malformed when the snapshot data is corrupt. */
     188             : static int
     189             : populate_txncache( fd_snapin_tile_t *                     ctx,
     190             :                    fd_snapshot_manifest_blockhash_t const blockhashes[ static 301UL ],
     191           0 :                    ulong                                  blockhashes_len ) {
     192             :   /* Our txncache internally contains the fork structure for the chain,
     193             :      which we need to recreate here.  Because snapshots are only served
     194             :      for rooted slots, there is actually no forking, and the bank forks
     195             :      are just a single bank, the root, like
     196             : 
     197             :        _root
     198             : 
     199             :      But the txncache also must contain the 150 more recent banks prior
     200             :      to the root (151 rooted banks total), looking like,
     201             : 
     202             : 
     203             :        _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
     204             : 
     205             :      Our txncache is "slot agnostic" meaning there is no concept of a
     206             :      slot number in it.  It just has a fork tree structure.  So long as
     207             :      the fork tree is isomorphic to the actual bank forks, and each bank
     208             :      has the correct blockhash, it works.
     209             : 
     210             :      So the challenge is simply to create this chain of 151 forks in the
     211             :      txncache, with correct blockhashes, and then insert all the
     212             :      transactions into it.
     213             : 
     214             :      Constructing the chain of blockhashes is easy.  It is just the
     215             :      BLOCKHASH_QUEUE array in the manifest.  This array is unfortunately
     216             :      not sorted and appears in random order, but it has a hash_index
     217             :      field which is a gapless index, starting at some arbitrary offset,
     218             :      so we can back out the 151 blockhashes we need from this, by first
     219             :      finding the max hash_index as _max and then collecting hash entries
     220             :      via,
     221             : 
     222             :        _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
     223             :        _max-150  -> _max-149  -> ... -> _max-2  -> _max-1  -> _max
     224             : 
     225             :      Now the remaining problem is inserting transactions into this
     226             :      chain.  Remember each transaction needs to be inserted with:
     227             : 
     228             :       (a) The fork ID (position of the bank in the chain) it was executed in.
     229             :       (b) The blockhash of the bank it referenced.
     230             : 
     231             :     (b) is trivial to retrieve, as it's in the actual slot_deltas entry
     232             :     in the manifest served by Agave.  But (a) is mildly annoying.  Agave
     233             :     serves slot_deltas based on slot, so we need an additional mapping
     234             :     from slot to position in our banks chain.  It turns out we have to
     235             :     go to yet another structure in the manifest to retrieve this, the
     236             :     ancestors array.  This is just an array of slot values,  so we need
     237             :     to sort it, and line it up against our banks chain like so,
     238             : 
     239             :        _root_150  -> _root_149  -> ... -> _root_2  -> _root_1  -> _root
     240             :        _max-150   -> _max-149   -> ... -> _max-2   -> _max-1   -> _max
     241             :        _slots_150 -> _slots_149 -> ... -> _slots_2 -> _slots_1 -> _slots
     242             : 
     243             :     From there we are done.
     244             : 
     245             :     Well almost ... if you were paying attention you might have noticed
     246             :     this is a lot of work and we are lazy.  Why don't we just ignore the
     247             :     slot mapping and assume everything executed at the root slot
     248             :     exactly?  The only invariant we should maintain from a memory
     249             :     perspective is that at most, across all active banks,
     250             :     FD_MAX_TXN_PER_SLOT transactions are stored per slot, but we
     251             :     have preserved that.  It is not true "per slot" technically, but
     252             :     it's true across all slots, and the memory is aggregated.  It will
     253             :     also always be true, even as slots are garbage collected, because
     254             :     entries are collected by reference blockhash, not executed slot.
     255             : 
     256             :     ... actually we can't do this.  There's more broken things here.
     257             :     The Agave status cache decided to only store 20 bytes for 32 byte
     258             :     transaction hashes to save on memory.  That's OK, but they didn't
     259             :     just take the first 20 bytes.  They instead, for each blockhash,
     260             :     take a random offset between 0 and 12, and store bytes
     261             :     [ offset, offset+20 ) of the transaction hash.  We need to know this
     262             :     offset to be able to query the txncache later, so we need to
     263             :     retrieve it from the slot_deltas entry in the manifest, and key it
     264             :     into our txncache.  Unfortunately this offset is stored per slot in
     265             :     the slot_deltas entry.  So we need to first go and retrieve the
     266             :     ancestors array, sort it, and line it up against our banks chain as
     267             :     described above, and then go through slot deltas, to retrieve the
     268             :     offset for each slot, and stick it into the appropriate bank in
     269             :     our chain. */
     270             : 
     271           0 :   FD_TEST( blockhashes_len<=301UL );
     272           0 :   FD_TEST( blockhashes_len>0UL );
     273             : 
     274           0 :   ulong seq_min = ULONG_MAX;
     275           0 :   for( ulong i=0UL; i<blockhashes_len; i++ ) seq_min = fd_ulong_min( seq_min, blockhashes[ i ].hash_index );
     276             : 
     277           0 :   ulong seq_max; /* exclusive upper bound seq_min+blockhashes_len; only used for overflow detection and logging */
     278           0 :   if( FD_UNLIKELY( __builtin_uaddl_overflow( seq_min, blockhashes_len, &seq_max ) ) ) {
     279           0 :     FD_LOG_WARNING(( "corrupt snapshot: blockhash queue sequence number wraparound (seq_min=%lu age_cnt=%lu)", seq_min, blockhashes_len ));
     280           0 :     transition_malformed( ctx, ctx->stem );
     281           0 :     return 1;
     282           0 :   }
     283             : 
     284             :   /* First let's construct the chain array as described above.  But
     285             :      index 0 will be the root, index 1 the root's parent, etc. */
     286             : 
     287           0 :   struct {
     288           0 :     int exists;
     289           0 :     uchar blockhash[ 32UL ];
     290           0 :     fd_txncache_fork_id_t fork_id;
     291           0 :     ulong txnhash_offset;
     292           0 :   } banks[ 301UL ] = {0};
     293             : 
     294           0 :   for( ulong i=0UL; i<blockhashes_len; i++ ) {
     295           0 :     fd_snapshot_manifest_blockhash_t const * elem = &blockhashes[ i ];
     296           0 :     ulong idx;
     297           0 :     if( FD_UNLIKELY( __builtin_usubl_overflow( elem->hash_index, seq_min, &idx ) ) ) { /* seq_min is the minimum hash_index, so this cannot underflow */
     298           0 :       FD_LOG_WARNING(( "corrupt snapshot: gap in blockhash queue (seq=[%lu,%lu) idx=%lu)", seq_min, seq_max, blockhashes[ i ].hash_index ));
     299           0 :       transition_malformed( ctx, ctx->stem );
     300           0 :       return 1;
     301           0 :     }
     302             : 
     303           0 :     if( FD_UNLIKELY( idx>=blockhashes_len ) ) {
     304           0 :       FD_LOG_WARNING(( "corrupt snapshot: blockhash queue index out of range (seq_min=%lu age_cnt=%lu idx=%lu)", seq_min, blockhashes_len, idx ));
     305           0 :       transition_malformed( ctx, ctx->stem );
     306           0 :       return 1;
     307           0 :     }
     308             : 
     309           0 :     if( FD_UNLIKELY( banks[ blockhashes_len-1UL-idx ].exists ) ) {
     310           0 :       FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash hash_index %lu", elem->hash_index ));
     311           0 :       transition_malformed( ctx, ctx->stem );
     312           0 :       return 1;
     313           0 :     }
     314             : 
     315           0 :     banks[ blockhashes_len-1UL-idx ].fork_id.val = USHORT_MAX;
     316           0 :     banks[ blockhashes_len-1UL-idx ].txnhash_offset = ULONG_MAX; /* ULONG_MAX marks "no offset seen yet" */
     317           0 :     memcpy( banks[ blockhashes_len-1UL-idx ].blockhash, elem->hash, 32UL );
     318           0 :     banks[ blockhashes_len-1UL-idx ].exists = 1;
     319           0 :   }
     320             : 
     321           0 :   ulong chain_len = fd_ulong_min( blockhashes_len, 151UL );
     322             : 
     323             :   /* Now we need a hashset of just the 151 most recent blockhashes,
     324             :      anything else is a nonce transaction which we do not insert, or an
     325             :      already expired transaction which can also be discarded. */
     326             : 
     327           0 :   uchar * _map = fd_alloca_check( alignof(blockhash_map_t), blockhash_map_footprint( 1024UL ) );
     328           0 :   blockhash_map_t * blockhash_map = blockhash_map_join( blockhash_map_new( _map, 1024UL, ctx->seed ) );
     329           0 :   FD_TEST( blockhash_map );
     330             : 
     331           0 :   fd_blockhash_entry_t blockhash_pool[ 151UL ]; /* blockhash_pool[ i ] mirrors banks[ i ] for i<chain_len */
     332           0 :   for( ulong i=0UL; i<chain_len; i++ ) {
     333           0 :     fd_memcpy( blockhash_pool[ i ].blockhash.uc, banks[ i ].blockhash, 32UL );
     334             : 
     335           0 :     if( FD_UNLIKELY( blockhash_map_ele_query_const( blockhash_map, &blockhash_pool[ i ].blockhash, NULL, blockhash_pool ) ) ) {
     336           0 :       FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash %s in 151 most recent blockhashes", FD_BASE58_ENC_32_ALLOCA( banks[ i ].blockhash ) ));
     337           0 :       transition_malformed( ctx, ctx->stem );
     338           0 :       return 1;
     339           0 :     }
     340             : 
     341           0 :     blockhash_map_ele_insert( blockhash_map, &blockhash_pool[ i ], blockhash_pool );
     342           0 :   }
     343             : 
     344             :   /* Now load the blockhash offsets for these blockhashes ... */
     345           0 :   FD_TEST( ctx->blockhash_offsets_len ); /* Must be at least one else nothing would be rooted */
     346           0 :   for( ulong i=0UL; i<ctx->blockhash_offsets_len; i++ ) {
     347           0 :     fd_hash_t key;
     348           0 :     fd_memcpy( key.uc, ctx->blockhash_offsets[ i ].blockhash, 32UL );
     349           0 :     fd_blockhash_entry_t * entry = blockhash_map_ele_query( blockhash_map, &key, NULL, blockhash_pool );
     350           0 :     if( FD_UNLIKELY( !entry ) ) continue; /* Not in the most recent 151 blockhashes */
     351             : 
     352           0 :     ulong chain_idx = (ulong)(entry - blockhash_pool); /* pool index equals the banks index */
     353             : 
     354           0 :     if( FD_UNLIKELY( banks[ chain_idx ].txnhash_offset!=ULONG_MAX && banks[ chain_idx ].txnhash_offset!=ctx->blockhash_offsets[ i ].txnhash_offset ) ) {
     355           0 :       FD_LOG_WARNING(( "corrupt snapshot: conflicting txnhash offsets for blockhash %s", FD_BASE58_ENC_32_ALLOCA( entry->blockhash.uc ) ));
     356           0 :       transition_malformed( ctx, ctx->stem );
     357           0 :       return 1;
     358           0 :     }
     359             : 
     360           0 :     banks[ chain_idx ].txnhash_offset = ctx->blockhash_offsets[ i ].txnhash_offset;
     361           0 :   }
     362             : 
     363             :   /* Construct the linear fork chain in the txncache. */
     364             : 
     365           0 :   fd_txncache_fork_id_t parent = { .val = USHORT_MAX };
     366           0 :   for( ulong i=0UL; i<chain_len; i++ ) banks[ chain_len-1UL-i ].fork_id = parent = fd_txncache_attach_child( ctx->txncache, parent );
     367           0 :   for( ulong i=0UL; i<chain_len; i++ ) fd_txncache_attach_blockhash( ctx->txncache, banks[ i ].fork_id, banks[ i ].blockhash );
     368             : 
     369             :   /* Now insert all transactions as if they executed at the current
     370             :      root, per above. */
     371             : 
     372           0 :   ulong insert_cnt = 0UL;
     373           0 :   for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
     374           0 :     fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[ i ];
     375           0 :     fd_hash_t key;
     376           0 :     fd_memcpy( key.uc, entry->blockhash, 32UL );
     377           0 :     if( FD_UNLIKELY( !blockhash_map_ele_query_const( blockhash_map, &key, NULL, blockhash_pool ) ) ) continue;
     378             : 
     379           0 :     insert_cnt++;
     380           0 :     fd_txncache_insert( ctx->txncache, banks[ 0UL ].fork_id, entry->blockhash, entry->txnhash );
     381           0 :   }
     382             : 
     383           0 :   FD_LOG_INFO(( "inserted %lu/%lu transactions into the txncache", insert_cnt, ctx->txncache_entries_len ));
     384             : 
     385             :   /* Then finalize all the banks (freezing them) and setting the txnhash
     386             :      offset so future queries use the correct offset.  If the offset is
     387             :      ULONG_MAX this is valid, it means the blockhash had no transactions
     388             :      in it, so there's nothing in the status cache under that blockhash.
     389             : 
     390             :      Just set the offset to 0 in this case, it doesn't matter, but
     391             :      should be valid between 0 and 12 inclusive. */
     392           0 :   for( ulong i=0UL; i<chain_len; i++ ) {
     393           0 :     ulong txnhash_offset = banks[ chain_len-1UL-i ].txnhash_offset==ULONG_MAX ? 0UL : banks[ chain_len-1UL-i ].txnhash_offset;
     394           0 :     fd_txncache_finalize_fork( ctx->txncache, banks[ chain_len-1UL-i ].fork_id, txnhash_offset, banks[ chain_len-1UL-i ].blockhash );
     395           0 :   }
     396             : 
     397           0 :   for( ulong i=1UL; i<chain_len; i++ ) fd_txncache_advance_root( ctx->txncache, banks[ chain_len-1UL-i ].fork_id );
     398             : 
     399           0 :   ctx->txncache_root_fork_id = parent; /* parent is the last fork attached above, i.e. banks[ 0UL ].fork_id */
     400             : 
     401           0 :   return 0;
     402           0 : }
     403             : /* process_manifest checks slot deltas against the manifest slot, populates the txncache, then publishes the manifest. */
     404             : static void
     405           0 : process_manifest( fd_snapin_tile_t * ctx ) {
     406           0 :   fd_snapshot_manifest_t * manifest = fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk );
     407             : 
     408           0 :   ctx->bank_slot = manifest->slot;
     409           0 :   if( FD_UNLIKELY( verify_slot_deltas_with_bank_slot( ctx, manifest->slot ) ) ) {
     410           0 :     FD_LOG_WARNING(( "slot deltas verification failed" ));
     411           0 :     transition_malformed( ctx, ctx->stem );
     412           0 :     return;
     413           0 :   }
     414             : /* NOTE(review): populate_txncache already calls transition_malformed before returning 1, so the call below publishes a second error message -- confirm intended. */
     415           0 :   if( FD_UNLIKELY( populate_txncache( ctx, manifest->blockhashes, manifest->blockhashes_len ) ) ) {
     416           0 :     FD_LOG_WARNING(( "populating txncache failed" ));
     417           0 :     transition_malformed( ctx, ctx->stem );
     418           0 :     return;
     419           0 :   }
     420             : 
     421           0 :   manifest->txncache_fork_id = ctx->txncache_root_fork_id.val; /* set by populate_txncache above */
     422             : 
     423           0 :   ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
     424           0 :                           fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
     425           0 :   fd_stem_publish( ctx->stem, FD_SNAPIN_OUT_MANIFEST, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
     426           0 :   ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
     427           0 : }
     428             : /* process_account_header stores the account metadata in funk and points ctx->acc_data at the space for its data (NULL to skip it). */
     429             : static void
     430             : process_account_header( fd_snapin_tile_t *            ctx,
     431           0 :                         fd_ssparse_advance_result_t * result ) {
     432           0 :   fd_funk_t * funk = ctx->accdb->funk;
     433             : 
     434           0 :   fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t const*)result->account_header.pubkey );
     435           0 :   fd_funk_rec_query_t query[1];
     436           0 :   fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
     437             : 
     438           0 :   int should_publish = 0; /* set only when the record is newly prepared below */
     439           0 :   fd_funk_rec_prepare_t prepare[1];
     440           0 :   if( FD_LIKELY( !rec ) ) {
     441           0 :     should_publish = 1;
     442           0 :     rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
     443           0 :     FD_TEST( rec );
     444           0 :   }
     445             : 
     446           0 :   fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp ); /* presumably NULL for a freshly prepared record -- TODO confirm */
     447           0 :   if( FD_UNLIKELY( meta ) ) {
     448           0 :     if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
     449           0 :       ctx->acc_data = NULL; /* makes process_account_data drop this account's data */
     450           0 :       return;
     451           0 :     }
     452             : 
     453             :     /* TODO: Reaching here means the existing value is a duplicate
     454             :        account.  We need to hash the existing account and subtract that
     455             :        hash from the running lthash. */
     456           0 :   }
     457             : /* NOTE(review): unlike streamlined_insert, data_len is not checked against FD_RUNTIME_ACC_SZ_MAX here; val_sz is only masked -- confirm the parser bounds it. */
     458             :   /* Allocate data space from heap, free old value (if any) */
     459           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
     460           0 :   ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
     461           0 :   ulong       alloc_max;
     462           0 :   meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
     463           0 :   if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
     464           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
     465           0 :   rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
     466           0 :   rec->val_max   = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
     467           0 :   rec->val_sz    = (uint)( alloc_sz  & FD_FUNK_REC_VAL_MAX );
     468             : 
     469           0 :   meta->dlen       = (uint)result->account_header.data_len;
     470           0 :   meta->slot       = result->account_header.slot;
     471           0 :   memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
     472           0 :   meta->lamports   = result->account_header.lamports;
     473           0 :   meta->executable = (uchar)result->account_header.executable;
     474             : 
     475           0 :   ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t); /* account data is stored directly after the metadata */
     476           0 :   ctx->metrics.accounts_inserted++; /* also counts overwrites of an existing record */
     477             : 
     478           0 :   if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
     479           0 : }
     480             : /* process_account_data copies one piece of account data to ctx->acc_data and advances the cursor; no-op when acc_data is NULL. */
     481             : static void
     482             : process_account_data( fd_snapin_tile_t *            ctx,
     483           0 :                       fd_ssparse_advance_result_t * result ) {
     484           0 :   if( FD_UNLIKELY( !ctx->acc_data ) ) return; /* process_account_header chose to skip this account */
     485             : /* NOTE(review): nothing here bounds the copy to the space allocated in process_account_header -- presumably guaranteed by the parser; confirm. */
     486           0 :   fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
     487           0 :   ctx->acc_data += result->account_data.data_sz;
     488           0 : }
     489             : 
     490             : /* streamlined_insert inserts an unfragmented account: it decodes the raw snapshot frame at `frame` and stores it as the value of funk record `rec`, tagged with `slot`.  Any value rec previously held is flushed first.
     491             :    Only used while loading a full snapshot, not an incremental.  Raises FD_LOG_ERR if data_len exceeds FD_RUNTIME_ACC_SZ_MAX or if the funk heap allocation fails. */
     492             : 
     493             : static void
     494             : streamlined_insert( fd_snapin_tile_t * ctx,
     495             :                     fd_funk_rec_t *    rec,
     496             :                     uchar const *      frame,
     497           0 :                     ulong              slot ) {
     498           0 :   ulong data_len   = fd_ulong_load_8_fast( frame+0x08UL ); /* frame offsets read here: 0x08 data_len, 0x30 lamports, 0x38 rent_epoch, 0x40 owner, 0x60 executable, 0x88 account data */
     499           0 :   ulong lamports   = fd_ulong_load_8_fast( frame+0x30UL );
     500           0 :   ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch; /* loaded but not stored anywhere */
     501           0 :   uchar owner[32];   memcpy( owner, frame+0x40UL, 32UL );
     502           0 :   _Bool executable = !!frame[ 0x60UL ];
     503             : 
     504           0 :   fd_funk_t * funk = ctx->accdb->funk;
     505           0 :   if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
     506           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp ); /* drop whatever value rec held before */
     507           0 :   ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len; /* value = metadata header immediately followed by account data */
     508           0 :   ulong       alloc_max;
     509           0 :   fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
     510           0 :   if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
     511           0 :   memset( meta, 0, sizeof(fd_account_meta_t) );
     512           0 :   rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
     513           0 :   rec->val_max   = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
     514           0 :   rec->val_sz    = (uint)( alloc_sz  & FD_FUNK_REC_VAL_MAX );
     515             : 
     516             :   /* Write metadata */
     517           0 :   meta->dlen = (uint)data_len;
     518           0 :   meta->slot = slot;
     519           0 :   memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
     520           0 :   meta->lamports   = lamports;
     521           0 :   meta->executable = (uchar)executable;
     522             : 
     523             :   /* Write data */
     524           0 :   uchar * acc_data = (uchar *)( meta+1 ); /* data starts right after the metadata header */
     525           0 :   fd_memcpy( acc_data, frame+0x88UL, data_len );
     526             : 
     527           0 :   ctx->metrics.accounts_inserted++;
     528           0 : }
     529             : 
     530             : /* process_account_batch is a happy path performance optimization
     531             :    handling insertion of lots of small accounts.  All FD_SSPARSE_ACC_BATCH_MAX frames of result->account_batch go through each pass together: hash the keys, load the chain heads, walk the chains, create or filter records, then write the values via streamlined_insert.
     532             : 
     533             :    The main optimization implemented for funk is prefetching hash map
     534             :    accesses. */
     535             : 
     536             : __attribute__((noinline)) static void
     537             : process_account_batch( fd_snapin_tile_t *            ctx,
     538           0 :                        fd_ssparse_advance_result_t * result ) {
     539           0 :   fd_funk_t *         funk    = ctx->accdb->funk;
     540           0 :   fd_funk_rec_map_t * rec_map = funk->rec_map;
     541           0 :   fd_funk_rec_t *     rec_tbl = funk->rec_pool->ele;
     542           0 :   fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );
     543             : 
     544             :   /* Derive map chains */
     545           0 :   uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
     546           0 :   ulong chain_mask = rec_map->map->chain_cnt-1UL; /* assumes chain_cnt is a power of two -- TODO confirm */
     547           0 :   for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     548           0 :     uchar const * frame  = result->account_batch.batch[ i ];
     549           0 :     uchar const * pubkey = frame+0x10UL; /* account key sits at frame offset 0x10 */
     550           0 :     ulong         memo   = fd_funk_rec_key_hash1( pubkey, 0UL, rec_map->map->seed );
     551           0 :     chain_idx[ i ] = (uint)( memo&chain_mask );
     552           0 :   }
     553             : 
     554             :   /* Parallel load hash chain heads */
     555           0 :   uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
     556           0 :   uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
     557           0 :   for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     558           0 :     map_node [ i ] =       chain_tbl[ chain_idx[ i ] ].head_cidx;
     559           0 :     chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt; /* NOTE(review): the raw ver_cnt truncated to uint is used as the element count, while the insert path below decodes it with fd_funk_rec_map_private_vcnt_cnt -- confirm the two agree */
     560           0 :   }
     561           0 :   uint chain_max = 0U;
     562           0 :   for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     563           0 :     chain_max = fd_uint_max( chain_max, chain_cnt[ i ] ); /* longest chain bounds the number of walk steps */
     564           0 :   }
     565             : 
     566             :   /* Parallel walk hash chains */
     567           0 :   static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX }; /* stand-in node read by lanes whose own chain has no node at step j */
     568           0 :   fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
     569           0 :   for( ulong j=0UL; j<chain_max; j++ ) {
     570           0 :     for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     571           0 :       uchar const *   frame     = result->account_batch.batch[ i ];
     572           0 :       uchar const *   pubkey    = frame+0x10UL;
     573           0 :       int const       has_node  = j<chain_cnt[ i ];
     574           0 :       fd_funk_rec_t * node      = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
     575           0 :       int const       key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) ); /* only the key is compared, not the xid */
     576           0 :       if( has_node && key_match ) rec[ i ] = node; /* no early exit: a later match on the same chain replaces an earlier one */
     577           0 :       map_node[ i ] = node->map_next;
     578           0 :     }
     579           0 :   }
     580             : 
     581             :   /* Create map entries */
     582           0 :   for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     583           0 :     uchar const * frame  = result->account_batch.batch[ i ];
     584           0 :     uchar const * pubkey = frame+0x10UL;
     585           0 :     fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );
     586             : 
     587           0 :     fd_funk_rec_t * r = rec[ i ];
     588           0 :     if( FD_LIKELY( !r ) ) {  /* optimize for new account */
     589           0 :       r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
     590           0 :       FD_TEST( r );
     591           0 :       memset( r, 0, sizeof(fd_funk_rec_t) );
     592           0 :       fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
     593           0 :       fd_funk_rec_key_copy( r->pair.key, &key );
     594             : 
     595             :       /* Insert to hash map.  In theory, a key could appear twice in the
     596             :          same batch.  All accounts in a batch are guaranteed to be from
     597             :          the same slot though, so this is fine, assuming that accdb code
     598             :          gracefully handles duplicate hash map entries. */
     599           0 :       fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
     600           0 :       ulong ver_cnt    = chain->ver_cnt;
     601           0 :       uint  head_cidx  = chain->head_cidx;
     602           0 :       chain->ver_cnt   = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL ); /* same version, element count plus one */
     603           0 :       chain->head_cidx = (uint)( r-rec_tbl ); /* the new record becomes the chain head */
     604           0 :       r->map_next      = head_cidx;
     605           0 :       rec[ i ]         = r;
     606           0 :     } else {  /* existing record for key found */
     607           0 :       fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
     608           0 :       if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) )); /* dump the record before the FD_TEST below fires */
     609           0 :       FD_TEST( existing );
     610           0 :       if( existing->slot > result->account_batch.slot ) {
     611           0 :         rec[ i ] = NULL;  /* skip record if existing value is newer */
     612           0 :       }
     613           0 :     }
     614           0 :   }
     615             : 
     616             :   /* Actually insert accounts */
     617           0 :   for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
     618           0 :     if( rec[ i ] ) {
     619           0 :       streamlined_insert( ctx, rec[ i ], result->account_batch.batch[ i ], result->account_batch.slot );
     620           0 :     }
     621           0 :   }
     622           0 : }
     623             : /* handle_data_frag feeds the unread part of an input data frag (bytes ctx->in.pos..sz of the chunk) through fd_ssparse_advance and dispatches every parse result.  Returns nonzero if the frag should be delivered again; as written the loop only ends normally once ctx->in.pos==sz, so the value returned is always 0. */
     624             : static int
     625             : handle_data_frag( fd_snapin_tile_t *  ctx,
     626             :                   ulong               chunk,
     627             :                   ulong               sz,
     628           0 :                   fd_stem_context_t * stem ) {
     629           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) { /* data arriving after the parser reported DONE */
     630           0 :     transition_malformed( ctx, stem );
     631           0 :     return 0;
     632           0 :   }
     633           0 :   else if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     634             :     /* Ignore all data frags after observing an error in the stream until
     635             :        we receive fail & init control messages to restart processing. */
     636           0 :     return 0;
     637           0 :   }
     638           0 :   else if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
     639           0 :     FD_LOG_ERR(( "invalid state for data frag %d", ctx->state ));
     640           0 :   }
     641             : 
     642           0 :   FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu ); /* bounds recorded from the input link in unprivileged_init */
     643             : 
     644           0 :   for(;;) {
     645           0 :     if( FD_UNLIKELY( sz-ctx->in.pos==0UL ) ) break; /* whole frag consumed; the breaks inside the switch below leave only the switch */
     646             : 
     647           0 :     uchar const * data = (uchar const *)fd_chunk_to_laddr_const( ctx->in.wksp, chunk ) + ctx->in.pos;
     648             : 
     649           0 :     fd_ssparse_advance_result_t result[1];
     650           0 :     int res = fd_ssparse_advance( ctx->ssparse, data, sz-ctx->in.pos, result );
     651           0 :     switch( res ) {
     652           0 :       case FD_SSPARSE_ADVANCE_ERROR: /* NOTE(review): this and the other error returns leave ctx->in.pos as is; confirm it is cleared before the next stream starts */
     653           0 :         transition_malformed( ctx, stem );
     654           0 :         return 0;
     655           0 :       case FD_SSPARSE_ADVANCE_AGAIN:
     656           0 :         break;
     657           0 :       case FD_SSPARSE_ADVANCE_MANIFEST: {
     658           0 :         int res = fd_ssmanifest_parser_consume( ctx->manifest_parser, /* this res shadows the outer res */
     659           0 :                                                 result->manifest.data,
     660           0 :                                                 result->manifest.data_sz,
     661           0 :                                                 result->manifest.acc_vec_map,
     662           0 :                                                 result->manifest.acc_vec_pool );
     663           0 :         if( FD_UNLIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_ERROR ) ) {
     664           0 :           transition_malformed( ctx, stem );
     665           0 :           return 0;
     666           0 :         } else if( FD_LIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_DONE ) ) {
     667           0 :           ctx->flags.manifest_done = 1;
     668           0 :         }
     669           0 :         break;
     670           0 :       }
     671           0 :       case FD_SSPARSE_ADVANCE_STATUS_CACHE: {
     672           0 :         fd_slot_delta_parser_advance_result_t sd_result[1];
     673           0 :         ulong bytes_remaining = result->status_cache.data_sz;
     674             : 
     675           0 :         while( bytes_remaining ) { /* each call consumes sd_result->bytes_consumed bytes and may yield one group or one entry */
     676           0 :           int res = fd_slot_delta_parser_consume( ctx->slot_delta_parser,
     677           0 :                                                   result->status_cache.data,
     678           0 :                                                   bytes_remaining,
     679           0 :                                                   sd_result );
     680           0 :           if( FD_UNLIKELY( res<0 ) ) {
     681           0 :             transition_malformed( ctx, stem );
     682           0 :             return 0;
     683           0 :           } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_GROUP ) ) {
     684           0 :             if( FD_UNLIKELY( ctx->blockhash_offsets_len>=FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ) ) FD_LOG_ERR(( "blockhash offsets overflow, max is %lu", FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ));
     685             : 
     686           0 :             memcpy( ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].blockhash, sd_result->group.blockhash, 32UL );
     687           0 :             ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].txnhash_offset = sd_result->group.txnhash_offset;
     688           0 :             ctx->blockhash_offsets_len++;
     689           0 :           } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_ENTRY ) ) {
     690           0 :             if( FD_UNLIKELY( ctx->txncache_entries_len>=FD_SNAPIN_TXNCACHE_MAX_ENTRIES ) ) FD_LOG_ERR(( "txncache entries overflow, max is %lu", FD_SNAPIN_TXNCACHE_MAX_ENTRIES ));
     691           0 :             ctx->txncache_entries[ ctx->txncache_entries_len++ ] = *sd_result->entry;
     692           0 :           }
     693             : 
     694           0 :           bytes_remaining           -= sd_result->bytes_consumed;
     695           0 :           result->status_cache.data += sd_result->bytes_consumed;
     696           0 :         }
     697             : 
     698           0 :         ctx->flags.status_cache_done = fd_slot_delta_parser_consume( ctx->slot_delta_parser, result->status_cache.data, 0UL, sd_result )==FD_SLOT_DELTA_PARSER_ADVANCE_DONE; /* zero-length call: only asks whether the parser has reached DONE */
     699           0 :         break;
     700           0 :       }
     701           0 :       case FD_SSPARSE_ADVANCE_ACCOUNT_HEADER:
     702           0 :         process_account_header( ctx, result );
     703           0 :         break;
     704           0 :       case FD_SSPARSE_ADVANCE_ACCOUNT_DATA:
     705           0 :         process_account_data( ctx, result );
     706           0 :         break;
     707           0 :       case FD_SSPARSE_ADVANCE_ACCOUNT_BATCH:
     708           0 :         process_account_batch( ctx, result );
     709           0 :         break;
     710           0 :       case FD_SSPARSE_ADVANCE_DONE:
     711           0 :         ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     712           0 :         break;
     713           0 :       default:
     714           0 :         FD_LOG_ERR(( "unexpected fd_ssparse_advance result %d", res ));
     715           0 :         break;
     716           0 :     }
     717             : 
     718           0 :     if( FD_UNLIKELY( !ctx->flags.manifest_processed && ctx->flags.manifest_done && ctx->flags.status_cache_done ) ) { /* runs once, after both the manifest and the status cache are fully parsed */
     719           0 :       process_manifest( ctx );
     720           0 :       ctx->flags.manifest_processed = 1;
     721           0 :     }
     722             : 
     723           0 :     ctx->in.pos += result->bytes_consumed;
     724           0 :     if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read        += result->bytes_consumed;
     725           0 :     else                         ctx->metrics.incremental_bytes_read += result->bytes_consumed;
     726           0 :   }
     727             : 
     728           0 :   int reprocess_frag = ctx->in.pos<sz;
     729           0 :   if( FD_LIKELY( !reprocess_frag ) ) ctx->in.pos = 0UL; /* rewind the cursor for the next frag */
     730           0 :   return reprocess_frag;
     731           0 : }
     732             : /* handle_control_frag applies the state transition for control message `sig` and, unless the case returned early, forwards sig unchanged on FD_SNAPIN_OUT_SNAPCT.  An unknown sig raises FD_LOG_ERR. */
     733             : static void
     734             : handle_control_frag( fd_snapin_tile_t *  ctx,
     735             :                      fd_stem_context_t * stem,
     736           0 :                      ulong               sig ) {
     737           0 :   fd_funk_t * funk = ctx->accdb->funk;
     738           0 :   switch( sig ) {
     739           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     740           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: /* start of a new snapshot stream: reset all parser state.  NOTE(review): ctx->in.pos is not reset here; confirm it cannot be stale after an error mid-frag */
     741           0 :       fd_ssparse_batch_enable( ctx->ssparse, sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ); /* account batching is enabled for full snapshots only */
     742           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     743           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     744           0 :       ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     745           0 :       ctx->txncache_entries_len  = 0UL;
     746           0 :       ctx->blockhash_offsets_len = 0UL;
     747           0 :       fd_txncache_reset( ctx->txncache );
     748           0 :       fd_ssparse_reset( ctx->ssparse );
     749           0 :       fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
     750           0 :       fd_slot_delta_parser_init( ctx->slot_delta_parser );
     751           0 :       fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
     752           0 :       break;
     753             : 
     754           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL: /* abandon the snapshot currently being loaded and return to IDLE */
     755           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
     756           0 :                ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
     757           0 :                ctx->state==FD_SNAPSHOT_STATE_ERROR );
     758           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     759             : 
     760           0 :       if( ctx->full ) {
     761           0 :         fd_accdb_clear( ctx->accdb_admin ); /* full snapshot failed */
     762           0 :       } else {
     763           0 :         fd_accdb_cancel( ctx->accdb_admin, ctx->xid ); /* incremental failed: cancel its txn and fall back to the last published xid */
     764           0 :         fd_funk_txn_xid_copy( ctx->xid, fd_funk_last_publish( funk ) );
     765           0 :       }
     766           0 :       break;
     767             : 
     768           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT: { /* current snapshot finished and another one follows */
     769           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
     770           0 :                ctx->state==FD_SNAPSHOT_STATE_FINISHING  ||
     771           0 :                ctx->state==FD_SNAPSHOT_STATE_ERROR );
     772           0 :       if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
     773           0 :         transition_malformed( ctx, stem );
     774           0 :         return; /* early return: sig is not forwarded */
     775           0 :       }
     776           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     777             : 
     778           0 :       fd_funk_txn_xid_t incremental_xid = { .ul={ LONG_MAX, LONG_MAX } }; /* fixed placeholder XID for the child txn that later inserts go into (presumably the incremental snapshot's accounts) */
     779           0 :       fd_accdb_attach_child( ctx->accdb_admin, ctx->xid, &incremental_xid );
     780           0 :       fd_funk_txn_xid_copy( ctx->xid, &incremental_xid );
     781           0 :       break;
     782           0 :     }
     783             : 
     784           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE: { /* last snapshot finished: verify, publish, announce */
     785           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
     786           0 :                ctx->state==FD_SNAPSHOT_STATE_FINISHING  ||
     787           0 :                ctx->state==FD_SNAPSHOT_STATE_ERROR );
     788           0 :       if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
     789           0 :         transition_malformed( ctx, stem );
     790           0 :         return; /* early return: sig is not forwarded */
     791           0 :       }
     792           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     793             : 
     794           0 :       uchar slot_history_mem[ FD_SYSVAR_SLOT_HISTORY_FOOTPRINT ];
     795           0 :       fd_slot_history_global_t * slot_history = fd_sysvar_slot_history_read( funk, ctx->xid, slot_history_mem );
     796           0 :       if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx, slot_history ) ) ) { /* nonzero result is treated as failure */
     797           0 :         FD_LOG_WARNING(( "slot deltas verification failed" ));
     798           0 :         transition_malformed( ctx, stem );
     799           0 :         break; /* NOTE(review): unlike the returns above, this break still forwards CTRL_DONE below after transition_malformed, and state was already set to IDLE -- confirm intended */
     800           0 :       }
     801             : 
     802             :       /* Publish any remaining funk txn */
     803           0 :       if( FD_LIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
     804           0 :         fd_accdb_advance_root( ctx->accdb_admin, ctx->xid );
     805           0 :       }
     806           0 :       FD_TEST( !fd_funk_last_publish_is_frozen( funk ) );
     807             : 
     808             :       /* Make 'Last published' XID equal the restored slot number */
     809           0 :       fd_funk_txn_xid_t target_xid = { .ul = { ctx->bank_slot, 0UL } };
     810           0 :       fd_accdb_attach_child( ctx->accdb_admin, ctx->xid, &target_xid );
     811           0 :       fd_accdb_advance_root( ctx->accdb_admin,           &target_xid );
     812           0 :       fd_funk_txn_xid_copy( ctx->xid, &target_xid );
     813             : 
     814           0 :       fd_stem_publish( stem, FD_SNAPIN_OUT_MANIFEST, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL ); /* signal completion on the manifest output link */
     815           0 :       break;
     816           0 :     }
     817             : 
     818           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     819           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     820           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     821           0 :       metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     822           0 :       break;
     823             : 
     824           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR: /* accepted in any state; no precondition is checked */
     825           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     826           0 :       break;
     827             : 
     828           0 :     default:
     829           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     830           0 :       return;
     831           0 :   }
     832             : 
     833             :   /* Forward the control message down the pipeline */
     834           0 :   fd_stem_publish( stem, FD_SNAPIN_OUT_SNAPCT, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     835           0 : }
     836             : /* returnable_frag is the stem callback run for every input frag (STEM_CALLBACK_RETURNABLE_FRAG).  Data frags (sig==FD_SNAPSHOT_MSG_DATA) go to handle_data_frag, everything else to handle_control_frag.  Returns handle_data_frag's result for data frags and 0 for control frags.  ctx->stem is set only for the duration of the call and is cleared on every path. */
     837             : static inline int
     838             : returnable_frag( fd_snapin_tile_t *  ctx,
     839             :                  ulong               in_idx FD_PARAM_UNUSED,
     840             :                  ulong               seq    FD_PARAM_UNUSED,
     841             :                  ulong               sig,
     842             :                  ulong               chunk,
     843             :                  ulong               sz,
     844             :                  ulong               ctl    FD_PARAM_UNUSED,
     845             :                  ulong               tsorig FD_PARAM_UNUSED,
     846             :                  ulong               tspub  FD_PARAM_UNUSED,
     847           0 :                  fd_stem_context_t * stem ) {
     848           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN ); /* no frag may arrive after shutdown */
     849             : 
     850           0 :   ctx->stem = stem;
     851           0 :   int reprocess = 0; /* control frags are never redelivered */
     852           0 :   if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) reprocess = handle_data_frag( ctx, chunk, sz, stem ); /* data frags are the bulk of the stream */
     853           0 :   else                                         handle_control_frag( ctx, stem, sig );
     854           0 :   ctx->stem = NULL; /* single exit: the data path no longer leaves a stale stem pointer behind */
     855           0 :   return reprocess;
     856           0 : }
     857             : /* populate_allowed_fds fills out_fds with fd 2 (stderr) and, when fd_log_private_logfile_fd() is not -1, the log file fd.  Returns the number of entries written.  Raises FD_LOG_ERR if out_fds_cnt<2. */
     858             : static ulong
     859             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     860             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     861             :                       ulong                  out_fds_cnt,
     862           0 :                       int *                  out_fds ) {
     863           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt )); /* up to two entries are written below */
     864             : 
     865           0 :   ulong out_cnt = 0;
     866           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     867           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     868           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     869           0 :   }
     870             : 
     871           0 :   return out_cnt;
     872           0 : }
     873             : /* populate_allowed_seccomp fills `out` (capacity out_cnt) with the generated seccomp filter policy for this tile, parameterised by the log file fd, and returns the policy's instruction count. */
     874             : static ulong
     875             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     876             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     877             :                           ulong                  out_cnt,
     878           0 :                           struct sock_filter *   out ) {
     879             : 
     880           0 :   populate_sock_filter_policy_fd_snapin_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     881           0 :   return sock_filter_policy_fd_snapin_tile_instr_cnt;
     882           0 : }
     883             : /* privileged_init locates the tile context at the start of the tile's scratch region and fills ctx->seed with 8 bytes from fd_rng_secure.
     884             :    unprivileged_init later passes ctx->seed to fd_ssparse_new. */
     885             : static void
     886             : privileged_init( fd_topo_t *      topo,
     887           0 :                  fd_topo_tile_t * tile ) {
     888           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     889             : 
     890           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     891           0 :   fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) ); /* same first allocation as in unprivileged_init, so both see the same ctx */
     892             : 
     893           0 :   FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
     894           0 : }
     895             : /* unprivileged_init carves the tile's scratch region into the context and its sub-objects, joins the account database and txncache, constructs the three parsers, validates the tile's link topology (1 input, 2 outputs with fixed names) and records the input/output dcache bounds in ctx. */
     896             : FD_FN_UNUSED static void
     897             : unprivileged_init( fd_topo_t *      topo,
     898           0 :                    fd_topo_tile_t * tile ) {
     899           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     900             : 
     901           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch ); /* allocation order below defines the scratch layout; ctx comes first, matching privileged_init */
     902           0 :   fd_snapin_tile_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t),     sizeof(fd_snapin_tile_t)                             );
     903           0 :   void * _ssparse         = FD_SCRATCH_ALLOC_APPEND( l, fd_ssparse_align(),            fd_ssparse_footprint( 1UL<<24UL )                    );
     904           0 :   void * _txncache        = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(),           fd_txncache_footprint( tile->snapin.max_live_slots ) );
     905           0 :   void * _manifest_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_ssmanifest_parser_align(),  fd_ssmanifest_parser_footprint()                              );
     906           0 :   void * _sd_parser       = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_delta_parser_align(),  fd_slot_delta_parser_footprint()                              );
     907           0 :   ctx->txncache_entries   = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
     908           0 :   ctx->blockhash_offsets  = FD_SCRATCH_ALLOC_APPEND( l, alignof(blockhash_group_t),     sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS    );
     909             : 
     910           0 :   ctx->full = 1;
     911           0 :   ctx->state = FD_SNAPSHOT_STATE_IDLE; /* wait for an INIT control message */
     912             : 
     913           0 :   ctx->boot_timestamp = fd_log_wallclock();
     914             : 
     915           0 :   FD_TEST( fd_accdb_admin_join( ctx->accdb_admin, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) ); /* admin and user handles are joined to the same funk object */
     916           0 :   FD_TEST( fd_accdb_user_join ( ctx->accdb,       fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
     917           0 :   fd_funk_txn_xid_copy( ctx->xid, fd_funk_root( ctx->accdb_admin->funk ) ); /* inserts start out targeting the funk root xid */
     918             : 
     919           0 :   void * _txncache_shmem = fd_topo_obj_laddr( topo, tile->snapin.txncache_obj_id );
     920           0 :   fd_txncache_shmem_t * txncache_shmem = fd_txncache_shmem_join( _txncache_shmem );
     921           0 :   FD_TEST( txncache_shmem );
     922           0 :   ctx->txncache = fd_txncache_join( fd_txncache_new( _txncache, txncache_shmem ) );
     923           0 :   FD_TEST( ctx->txncache );
     924             : 
     925           0 :   ctx->txncache_entries_len = 0UL;
     926           0 :   ctx->blockhash_offsets_len = 0UL;
     927             : 
     928           0 :   ctx->ssparse = fd_ssparse_new( _ssparse, 1UL<<24UL, ctx->seed ); /* same 1UL<<24UL as passed to fd_ssparse_footprint above; seed was set in privileged_init */
     929           0 :   FD_TEST( ctx->ssparse );
     930             : 
     931           0 :   ctx->manifest_parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( _manifest_parser ) );
     932           0 :   FD_TEST( ctx->manifest_parser );
     933             : 
     934           0 :   ctx->slot_delta_parser = fd_slot_delta_parser_join( fd_slot_delta_parser_new( _sd_parser ) );
     935           0 :   FD_TEST( ctx->slot_delta_parser );
     936             : 
     937           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
     938             : 
     939           0 :   if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
     940           0 :   if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1",  tile->in_cnt  ));
     941           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2",  tile->out_cnt  ));
     942             : 
     943           0 :   fd_topo_link_t * snapct_link = &topo->links[ tile->out_link_id[ FD_SNAPIN_OUT_SNAPCT ] ];
     944           0 :   FD_TEST( 0==strcmp( snapct_link->name, "snapin_rd" ) ); /* output indices are fixed, so the link names must match */
     945             : 
     946           0 :   fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ FD_SNAPIN_OUT_MANIFEST ] ];
     947           0 :   FD_TEST( 0==strcmp( writer_link->name, "snapin_manif" ) );
     948           0 :   ctx->manifest_out.wksp   = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
     949           0 :   ctx->manifest_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
     950           0 :   ctx->manifest_out.wmark  = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
     951           0 :   ctx->manifest_out.chunk  = ctx->manifest_out.chunk0;
     952           0 :   ctx->manifest_out.mtu    = writer_link->mtu;
     953             : 
     954           0 :   fd_ssparse_reset( ctx->ssparse );
     955           0 :   fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) ); /* the manifest parser is pointed at the current manifest output chunk */
     956           0 :   fd_slot_delta_parser_init( ctx->slot_delta_parser );
     957             : 
     958           0 :   fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
     959           0 :   fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     960           0 :   ctx->in.wksp                   = in_wksp->wksp;;
     961           0 :   ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     962           0 :   ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     963           0 :   ctx->in.mtu                    = in_link->mtu;
     964           0 :   ctx->in.pos                    = 0UL; /* read offset within the frag currently being parsed; see handle_data_frag */
     965             : 
     966           0 :   fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
     967           0 : }
     968             : 
     969             : /* A single control fragment can cause up to two publishes: the
     970             :    result / malformed message / etc. itself, plus one extra publish to
     971             :    forward the control message down the pipeline. */
     972           0 : #define STEM_BURST 2UL
     973             : 
     974           0 : #define STEM_LAZY  1000L
     975             : 
     976           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snapin_tile_t
     977           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)
     978             : 
     979             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     980           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     981           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     982             : 
     983             : #include "../../disco/stem/fd_stem.c" /* textual include, configured by the STEM_* macros above; presumably where stem_run (used below) is defined */
     984             : /* fd_tile_snapin is the tile descriptor for the snapin tile: its name plus the fd / seccomp allow-list, scratch sizing, init and run callbacks.  scratch_align and scratch_footprint are defined earlier in this file. */
     985             : fd_topo_run_tile_t fd_tile_snapin = {
     986             :   .name                     = NAME,
     987             :   .populate_allowed_fds     = populate_allowed_fds,
     988             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     989             :   .scratch_align            = scratch_align,
     990             :   .scratch_footprint        = scratch_footprint,
     991             :   .privileged_init          = privileged_init,
     992             :   .unprivileged_init        = unprivileged_init,
     993             :   .run                      = stem_run,
     994             : };
     995             : 
     996             : #undef NAME

Generated by: LCOV version 1.14