LCOV - code coverage report
Current view: top level - discof/restore - fd_snapin_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 207 0.0 %
Date: 2025-08-05 05:04:49 Functions: 0 14 0.0 %

          Line data    Source code
       1             : #include "utils/fd_ssctrl.h"
       2             : #include "utils/fd_snapshot_parser.h"
       3             : #include "utils/fd_ssmsg.h"
       4             : 
       5             : #include "../../disco/topo/fd_topo.h"
       6             : #include "../../disco/metrics/fd_metrics.h"
       7             : #include "../../flamenco/runtime/fd_acc_mgr.h"
       8             : #include "../../flamenco/types/fd_types.h"
       9             : #include "../../funk/fd_funk.h"
      10             : 
      11             : #define NAME "snapin"
      12             : 
      13             : /* The snapin tile is a state machine that parses and loads a full
      14             :    and optionally an incremental snapshot.  It is currently responsible
      15             :    for loading accounts into an in-memory database, though this may
      16             :    change. */
      17             : 
      18           0 : #define FD_SNAPIN_STATE_LOADING   (0) /* We are inserting accounts from a snapshot */
      19           0 : #define FD_SNAPIN_STATE_DONE      (1) /* We are done inserting accounts from a snapshot */
      20           0 : #define FD_SNAPIN_STATE_MALFORMED (1) /* The snapshot is malformed, we are waiting for a reset notification */
      21           0 : #define FD_SNAPIN_STATE_SHUTDOWN  (2) /* The tile is done, been told to shut down, and has likely already exited */
      22             : 
      23             : struct fd_snapin_tile {
      24             :   int full;
      25             :   int state;
      26             : 
      27             :   ulong seed;
      28             :   long boot_timestamp;
      29             : 
      30             :   fd_funk_t       funk[1];
      31             :   fd_funk_txn_t * funk_txn;
      32             :   uchar *         acc_data;
      33             : 
      34             :   fd_stem_context_t * stem;
      35             :   fd_snapshot_parser_t * ssparse;
      36             : 
      37             :   struct {
      38             :     ulong full_bytes_read;
      39             :     ulong incremental_bytes_read;
      40             :     ulong accounts_inserted;
      41             :   } metrics;
      42             : 
      43             :   struct {
      44             :     fd_wksp_t * wksp;
      45             :     ulong       chunk0;
      46             :     ulong       wmark;
      47             :     ulong       mtu;
      48             :   } in;
      49             : 
      50             :   struct {
      51             :     fd_wksp_t * wksp;
      52             :     ulong       chunk0;
      53             :     ulong       wmark;
      54             :     ulong       chunk;
      55             :     ulong       mtu;
      56             :   } manifest_out;
      57             : };
      58             : 
      59             : typedef struct fd_snapin_tile fd_snapin_tile_t;
      60             : 
      61             : static inline int
      62           0 : should_shutdown( fd_snapin_tile_t * ctx ) {
      63           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_SHUTDOWN ) ) {
      64           0 :     FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.1f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
      65           0 :   }
      66           0 :   return ctx->state==FD_SNAPIN_STATE_SHUTDOWN;
      67           0 : }
      68             : 
      69             : static ulong
      70           0 : scratch_align( void ) {
      71           0 :   return 128UL;
      72           0 : }
      73             : 
      74             : static ulong
      75           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
      76           0 :   (void)tile;
      77           0 :   ulong l = FD_LAYOUT_INIT;
      78           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t),  sizeof(fd_snapin_tile_t)                  );
      79           0 :   l = FD_LAYOUT_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
      80           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
      81           0 : }
      82             : 
      83             : static void
      84           0 : metrics_write( fd_snapin_tile_t * ctx ) {
      85           0 :   FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
      86           0 :   FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
      87             : 
      88           0 :   FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted );
      89           0 :   FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
      90           0 : }
      91             : 
      92             : static void
      93             : manifest_cb( void * _ctx,
      94           0 :              ulong  manifest_sz ) {
      95           0 :   fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
      96             : 
      97           0 :   ulong sz = sizeof(fd_snapshot_manifest_t)+manifest_sz;
      98           0 :   FD_TEST( sz<=ctx->manifest_out.mtu );
      99           0 :   ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL, manifest_sz ) :
     100           0 :                           fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL, manifest_sz );
     101           0 :   fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sz, 0UL, 0UL, 0UL );
     102           0 :   ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sz, ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
     103           0 : }
     104             : 
     105             : static int
     106             : is_duplicate_account( fd_snapin_tile_t * ctx,
     107           0 :                       uchar const *      account_pubkey ) {
     108           0 :   fd_account_meta_t const * rec_meta = fd_funk_get_acc_meta_readonly( ctx->funk,
     109           0 :                                                                       ctx->funk_txn,
     110           0 :                                                                       (fd_pubkey_t*)account_pubkey,
     111           0 :                                                                       NULL,
     112           0 :                                                                       NULL,
     113           0 :                                                                       NULL );
     114           0 :   if( FD_UNLIKELY( rec_meta ) ) {
     115           0 :     if( FD_LIKELY( rec_meta->slot>ctx->ssparse->accv_slot ) ) return 1;
     116             : 
     117             :     /* TODO: Reaching here means the existing value is a duplicate
     118             :        account.  We need to hash the existing account and subtract that
     119             :        hash from the running lthash. */
     120           0 :   }
     121             : 
     122           0 :   return 0;
     123           0 : }
     124             : 
     125             : static void
     126             : account_cb( void *                          _ctx,
     127           0 :             fd_solana_account_hdr_t const * hdr ) {
     128           0 :   fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
     129             : 
     130           0 :   if( FD_UNLIKELY( is_duplicate_account( ctx, hdr->meta.pubkey ) ) ) {
     131           0 :     ctx->acc_data = NULL;
     132           0 :     return;
     133           0 :   }
     134             : 
     135           0 :   FD_TXN_ACCOUNT_DECL( rec );
     136           0 :   int err = fd_txn_account_init_from_funk_mutable( rec,
     137           0 :                                                    (fd_pubkey_t*)hdr->meta.pubkey,
     138           0 :                                                    ctx->funk,
     139           0 :                                                    ctx->funk_txn,
     140           0 :                                                    /* do_create */ 1,
     141           0 :                                                    hdr->meta.data_len );
     142           0 :   if( FD_UNLIKELY( err!=FD_ACC_MGR_SUCCESS ) ) FD_LOG_ERR(( "fd_txn_account_init_from_funk_mutable failed (%d)", err ));
     143             : 
     144           0 :   rec->vt->set_data_len( rec, hdr->meta.data_len );
     145           0 :   rec->vt->set_slot( rec, ctx->ssparse->accv_slot );
     146           0 :   rec->vt->set_hash( rec, &hdr->hash );
     147           0 :   rec->vt->set_info( rec, &hdr->info );
     148             : 
     149           0 :   ctx->acc_data = rec->vt->get_data_mut( rec );
     150           0 :   ctx->metrics.accounts_inserted++;
     151           0 :   fd_txn_account_mutable_fini( rec, ctx->funk, ctx->funk_txn );
     152           0 : }
     153             : 
     154             : static void
     155             : account_data_cb( void *        _ctx,
     156             :                  uchar const * buf,
     157           0 :                  ulong         data_sz ) {
     158           0 :   fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
     159           0 :   if( FD_UNLIKELY( !ctx->acc_data ) ) return;
     160             : 
     161           0 :   fd_memcpy( ctx->acc_data, buf, data_sz );
     162           0 :   ctx->acc_data += data_sz;
     163           0 : }
     164             : 
     165             : static void
     166             : transition_malformed( fd_snapin_tile_t * ctx,
     167           0 :                      fd_stem_context_t * stem ) {
     168           0 :   ctx->state = FD_SNAPIN_STATE_MALFORMED;
     169           0 :   fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
     170           0 : }
     171             : 
     172             : static void
     173             : handle_data_frag( fd_snapin_tile_t *  ctx,
     174             :                   ulong               chunk,
     175             :                   ulong               sz,
     176           0 :                   fd_stem_context_t * stem ) {
     177           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return;
     178             : 
     179           0 :   FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE );
     180           0 :   FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
     181             : 
     182           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) {
     183           0 :     FD_LOG_WARNING(( "received data fragment while in done state" ));
     184           0 :     transition_malformed( ctx, stem );
     185           0 :     return;
     186           0 :   }
     187             : 
     188           0 :   uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     189           0 :   uchar const * const chunk_end = chunk_start + sz;
     190           0 :   uchar const *       cur       = chunk_start;
     191             : 
     192           0 :   for(;;) {
     193           0 :     if( FD_UNLIKELY( cur>=chunk_end ) ) {
     194           0 :       break;
     195           0 :     }
     196             : 
     197           0 :     cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) );
     198           0 :     if( FD_UNLIKELY( ctx->ssparse->flags ) ) {
     199           0 :       if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) {
     200           0 :         transition_malformed( ctx, stem );
     201           0 :         return;
     202           0 :       }
     203           0 :     }
     204           0 :   }
     205             : 
     206           0 :   if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_DONE ) ) ctx->state = FD_SNAPIN_STATE_DONE;
     207             : 
     208           0 :   if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += sz;
     209           0 :   else                         ctx->metrics.incremental_bytes_read += sz;
     210           0 : }
     211             : 
     212             : static void
     213             : handle_control_frag( fd_snapin_tile_t *  ctx,
     214             :                      fd_stem_context_t * stem,
     215           0 :                      ulong               sig ) {
     216           0 :   switch( sig ) {
     217           0 :     case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
     218           0 :       ctx->full = 1;
     219           0 :       fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     220           0 :       fd_funk_txn_cancel_root( ctx->funk );
     221           0 :       ctx->state = FD_SNAPIN_STATE_LOADING;
     222           0 :       break;
     223           0 :     case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
     224           0 :       ctx->full = 0;
     225           0 :       fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     226           0 :       if( FD_UNLIKELY( !ctx->funk_txn ) ) fd_funk_txn_cancel_root( ctx->funk );
     227           0 :       else                                fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
     228           0 :       ctx->state = FD_SNAPIN_STATE_LOADING;
     229           0 :       break;
     230           0 :     case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
     231           0 :       FD_TEST( ctx->full );
     232           0 :       if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
     233           0 :         FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
     234           0 :         transition_malformed( ctx, stem );
     235           0 :         break;
     236           0 :       }
     237             : 
     238           0 :       fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     239             : 
     240           0 :       fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
     241           0 :       ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
     242           0 :       ctx->full     = 0;
     243           0 :       ctx->state    = FD_SNAPIN_STATE_LOADING;
     244           0 :       break;
     245           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:
     246           0 :       if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
     247           0 :         FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
     248           0 :         transition_malformed( ctx, stem );
     249           0 :         break;
     250           0 :       }
     251             : 
     252           0 :       if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
     253           0 :       fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE, 0UL ), 0UL, 0UL, 0UL, 0UL, 0UL );
     254           0 :       break;
     255           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     256           0 :       ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
     257           0 :       break;
     258           0 :     default:
     259           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     260           0 :       return;
     261           0 :   }
     262             : 
     263             :   /* We must acknowledge after handling the control frag, because if it
     264             :      causes us to generate a malformed transition, that must be sent
     265             :      back to the snaprd controller before the acknowledgement. */
     266           0 :   fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
     267           0 : }
     268             : 
     269             : static inline int
     270             : returnable_frag( fd_snapin_tile_t *  ctx,
     271             :                  ulong               in_idx,
     272             :                  ulong               seq,
     273             :                  ulong               sig,
     274             :                  ulong               chunk,
     275             :                  ulong               sz,
     276             :                  ulong               tsorig,
     277             :                  ulong               tspub,
     278           0 :                  fd_stem_context_t * stem ) {
     279           0 :   (void)in_idx;
     280           0 :   (void)seq;
     281           0 :   (void)sig;
     282           0 :   (void)tsorig;
     283           0 :   (void)tspub;
     284             : 
     285           0 :   ctx->stem = stem;
     286             : 
     287           0 :   FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
     288             : 
     289           0 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
     290           0 :   else                                           handle_control_frag( ctx, stem, sig  );
     291             : 
     292           0 :   return 0;
     293           0 : }
     294             : 
     295             : static void
     296             : privileged_init( fd_topo_t *      topo,
     297           0 :                  fd_topo_tile_t * tile ) {
     298           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     299             : 
     300           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     301           0 :   fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
     302             : 
     303           0 :   FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
     304           0 : }
     305             : 
     306             : FD_FN_UNUSED static void
     307             : unprivileged_init( fd_topo_t *      topo,
     308           0 :                    fd_topo_tile_t * tile ) {
     309           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     310             : 
     311           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     312           0 :   fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t),  sizeof(fd_snapin_tile_t)                  );
     313           0 :   void * _ssparse        = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
     314             : 
     315           0 :   ctx->full = 1;
     316           0 :   ctx->state = FD_SNAPIN_STATE_LOADING;
     317             : 
     318           0 :   ctx->boot_timestamp = fd_log_wallclock();
     319             : 
     320           0 :   FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
     321           0 :   ctx->funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map );
     322             : 
     323           0 :   ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb );
     324             : 
     325           0 :   FD_TEST( ctx->ssparse );
     326             : 
     327           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
     328             : 
     329           0 :   if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
     330           0 :   if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1",  tile->in_cnt  ));
     331           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2",  tile->out_cnt  ));
     332             : 
     333           0 :   fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
     334           0 :   ctx->manifest_out.wksp    = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
     335           0 :   ctx->manifest_out.chunk0  = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
     336           0 :   ctx->manifest_out.wmark   = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
     337           0 :   ctx->manifest_out.chunk   = ctx->manifest_out.chunk0;
     338           0 :   ctx->manifest_out.mtu     = writer_link->mtu;
     339             : 
     340           0 :   fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     341             : 
     342           0 :   fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
     343           0 :   fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     344           0 :   ctx->in.wksp                   = in_wksp->wksp;;
     345           0 :   ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     346           0 :   ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     347           0 :   ctx->in.mtu                    = in_link->mtu;
     348           0 : }
     349             : 
     350           0 : #define STEM_BURST 2UL /* For control fragments, one acknowledgement, and one malformed message */
     351           0 : #define STEM_LAZY  1000L
     352             : 
     353           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snapin_tile_t
     354           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)
     355             : 
     356             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     357           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     358           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     359             : 
     360             : #include "../../disco/stem/fd_stem.c"
     361             : 
     362             : fd_topo_run_tile_t fd_tile_snapin = {
     363             :   .name              = NAME,
     364             :   .scratch_align     = scratch_align,
     365             :   .scratch_footprint = scratch_footprint,
     366             :   .privileged_init   = privileged_init,
     367             :   .unprivileged_init = unprivileged_init,
     368             :   .run               = stem_run,
     369             : };
     370             : 
     371             : #undef NAME

Generated by: LCOV version 1.14