LCOV - code coverage report
Current view: top level - discof/restore - fd_snapin_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 205 0.0 %
Date: 2025-09-19 04:41:14 Functions: 0 13 0.0 %

          Line data    Source code
       1             : #include "utils/fd_ssctrl.h"
       2             : #include "utils/fd_snapshot_parser.h"
       3             : #include "utils/fd_ssmsg.h"
       4             : 
       5             : #include "../../disco/topo/fd_topo.h"
       6             : #include "../../disco/metrics/fd_metrics.h"
       7             : #include "../../flamenco/runtime/fd_acc_mgr.h"
       8             : #include "../../flamenco/types/fd_types.h"
       9             : #include "../../funk/fd_funk.h"
      10             : 
      11             : #define NAME "snapin"
      12             : 
      13             : /* The snapin tile is a state machine that parses and loads a full
      14             :    and optionally an incremental snapshot.  It is currently responsible
      15             :    for loading accounts into an in-memory database, though this may
      16             :    change. */
      17             : 
      18           0 : #define FD_SNAPIN_STATE_LOADING   (0) /* We are inserting accounts from a snapshot */
      19           0 : #define FD_SNAPIN_STATE_DONE      (1) /* We are done inserting accounts from a snapshot */
      20           0 : #define FD_SNAPIN_STATE_MALFORMED (1) /* The snapshot is malformed, we are waiting for a reset notification */
      21           0 : #define FD_SNAPIN_STATE_SHUTDOWN  (2) /* The tile is done, been told to shut down, and has likely already exited */
      22             : 
      23             : struct fd_snapin_tile {
      24             :   int full;
      25             :   int state;
      26             : 
      27             :   ulong seed;
      28             :   long boot_timestamp;
      29             : 
      30             :   fd_funk_t       funk[1];
      31             :   fd_funk_txn_t * funk_txn;
      32             :   uchar *         acc_data;
      33             : 
      34             :   fd_stem_context_t * stem;
      35             :   fd_snapshot_parser_t * ssparse;
      36             : 
      37             :   struct {
      38             :     ulong full_bytes_read;
      39             :     ulong incremental_bytes_read;
      40             :     ulong accounts_inserted;
      41             :   } metrics;
      42             : 
      43             :   struct {
      44             :     fd_wksp_t * wksp;
      45             :     ulong       chunk0;
      46             :     ulong       wmark;
      47             :     ulong       mtu;
      48             :   } in;
      49             : 
      50             :   struct {
      51             :     fd_wksp_t * wksp;
      52             :     ulong       chunk0;
      53             :     ulong       wmark;
      54             :     ulong       chunk;
      55             :     ulong       mtu;
      56             :   } manifest_out;
      57             : };
      58             : 
      59             : typedef struct fd_snapin_tile fd_snapin_tile_t;
      60             : 
      61             : static inline int
      62           0 : should_shutdown( fd_snapin_tile_t * ctx ) {
      63           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_SHUTDOWN ) ) {
      64           0 :     FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.1f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
      65           0 :   }
      66           0 :   return ctx->state==FD_SNAPIN_STATE_SHUTDOWN;
      67           0 : }
      68             : 
      69             : static ulong
      70           0 : scratch_align( void ) {
      71           0 :   return 128UL;
      72           0 : }
      73             : 
      74             : static ulong
      75           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
      76           0 :   (void)tile;
      77           0 :   ulong l = FD_LAYOUT_INIT;
      78           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t),  sizeof(fd_snapin_tile_t)                  );
      79           0 :   l = FD_LAYOUT_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
      80           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
      81           0 : }
      82             : 
      83             : static void
      84           0 : metrics_write( fd_snapin_tile_t * ctx ) {
      85           0 :   FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
      86           0 :   FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
      87             : 
      88           0 :   FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted );
      89           0 :   FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
      90           0 : }
      91             : 
      92             : static void
      93           0 : manifest_cb( void * _ctx ) {
      94           0 :   fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
      95             : 
      96           0 :   ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
      97           0 :                           fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
      98           0 :   fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
      99           0 :   ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
     100           0 : }
     101             : 
     102             : static void
     103             : account_cb( void *                          _ctx,
     104           0 :             fd_solana_account_hdr_t const * hdr ) {
     105           0 :   fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
     106             : 
     107           0 :   fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t*)hdr->meta.pubkey );
     108           0 :   fd_funk_rec_query_t query[1];
     109           0 :   fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->funk_txn, &id, query );
     110             : 
     111           0 :   int should_publish = 0;
     112           0 :   fd_funk_rec_prepare_t prepare[1];
     113           0 :   if( FD_LIKELY( !rec ) ) {
     114           0 :     should_publish = 1;
     115           0 :     rec = fd_funk_rec_prepare( ctx->funk, ctx->funk_txn, &id, prepare, NULL );
     116           0 :     FD_TEST( rec );
     117           0 :   }
     118             : 
     119           0 :   fd_account_meta_t * meta = fd_funk_val( rec, ctx->funk->wksp );
     120           0 :   if( FD_UNLIKELY( meta ) ) {
     121           0 :     if( FD_LIKELY( meta->slot>ctx->ssparse->accv_slot ) ) {
     122           0 :       ctx->acc_data = NULL;
     123           0 :       return;
     124           0 :     }
     125             : 
     126             :     /* TODO: Reaching here means the existing value is a duplicate
     127             :        account.  We need to hash the existing account and subtract that
     128             :        hash from the running lthash. */
     129           0 :   }
     130             : 
     131           0 :   if( FD_LIKELY( rec->val_sz<sizeof(fd_account_meta_t)+hdr->meta.data_len ) ) {
     132           0 :     meta = fd_funk_val_truncate( (fd_funk_rec_t*)rec, ctx->funk->alloc, ctx->funk->wksp, 0UL, sizeof(fd_account_meta_t)+hdr->meta.data_len, NULL );
     133           0 :     FD_TEST( meta );
     134           0 :   }
     135             : 
     136           0 :   meta->dlen       = (uint)hdr->meta.data_len;
     137           0 :   meta->slot       = ctx->ssparse->accv_slot;
     138           0 :   memcpy( meta->owner, hdr->info.owner, sizeof(fd_pubkey_t) );
     139           0 :   meta->lamports   = hdr->info.lamports;
     140           0 :   meta->executable = hdr->info.executable;
     141             : 
     142           0 :   ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
     143           0 :   ctx->metrics.accounts_inserted++;
     144             : 
     145           0 :   if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( ctx->funk, prepare );
     146           0 : }
     147             : 
     148             : static void
     149             : account_data_cb( void *        _ctx,
     150             :                  uchar const * buf,
     151           0 :                  ulong         data_sz ) {
     152           0 :   fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
     153           0 :   if( FD_UNLIKELY( !ctx->acc_data ) ) return;
     154             : 
     155           0 :   fd_memcpy( ctx->acc_data, buf, data_sz );
     156           0 :   ctx->acc_data += data_sz;
     157           0 : }
     158             : 
     159             : static void
     160             : transition_malformed( fd_snapin_tile_t * ctx,
     161           0 :                      fd_stem_context_t * stem ) {
     162           0 :   ctx->state = FD_SNAPIN_STATE_MALFORMED;
     163           0 :   fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
     164           0 : }
     165             : 
     166             : static void
     167             : handle_data_frag( fd_snapin_tile_t *  ctx,
     168             :                   ulong               chunk,
     169             :                   ulong               sz,
     170           0 :                   fd_stem_context_t * stem ) {
     171           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return;
     172             : 
     173           0 :   FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE );
     174           0 :   FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
     175             : 
     176           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) {
     177           0 :     FD_LOG_WARNING(( "received data fragment while in done state" ));
     178           0 :     transition_malformed( ctx, stem );
     179           0 :     return;
     180           0 :   }
     181             : 
     182           0 :   uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     183           0 :   uchar const * const chunk_end = chunk_start + sz;
     184           0 :   uchar const *       cur       = chunk_start;
     185             : 
     186           0 :   for(;;) {
     187           0 :     if( FD_UNLIKELY( cur>=chunk_end ) ) {
     188           0 :       break;
     189           0 :     }
     190             : 
     191           0 :     cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) );
     192           0 :     if( FD_UNLIKELY( ctx->ssparse->flags ) ) {
     193           0 :       if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) {
     194           0 :         transition_malformed( ctx, stem );
     195           0 :         return;
     196           0 :       }
     197           0 :     }
     198           0 :   }
     199             : 
     200           0 :   if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_DONE ) ) ctx->state = FD_SNAPIN_STATE_DONE;
     201             : 
     202           0 :   if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += sz;
     203           0 :   else                         ctx->metrics.incremental_bytes_read += sz;
     204           0 : }
     205             : 
     206             : static void
     207             : handle_control_frag( fd_snapin_tile_t *  ctx,
     208             :                      fd_stem_context_t * stem,
     209           0 :                      ulong               sig ) {
     210           0 :   switch( sig ) {
     211           0 :     case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
     212           0 :       ctx->full = 1;
     213           0 :       fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     214           0 :       fd_funk_txn_cancel_root( ctx->funk );
     215           0 :       ctx->state = FD_SNAPIN_STATE_LOADING;
     216           0 :       break;
     217           0 :     case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
     218           0 :       ctx->full = 0;
     219           0 :       fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     220           0 :       if( FD_UNLIKELY( !ctx->funk_txn ) ) fd_funk_txn_cancel_root( ctx->funk );
     221           0 :       else                                fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
     222           0 :       ctx->state = FD_SNAPIN_STATE_LOADING;
     223           0 :       break;
     224           0 :     case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
     225           0 :       FD_TEST( ctx->full );
     226           0 :       if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
     227           0 :         FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
     228           0 :         transition_malformed( ctx, stem );
     229           0 :         break;
     230           0 :       }
     231             : 
     232           0 :       fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     233             : 
     234           0 :       fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
     235           0 :       ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
     236           0 :       FD_TEST( ctx->funk_txn );
     237             : 
     238           0 :       ctx->full     = 0;
     239           0 :       ctx->state    = FD_SNAPIN_STATE_LOADING;
     240           0 :       break;
     241           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:
     242           0 :       if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
     243           0 :         FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
     244           0 :         transition_malformed( ctx, stem );
     245           0 :         break;
     246           0 :       }
     247             : 
     248           0 :       if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
     249           0 :       fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
     250           0 :       break;
     251           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     252           0 :       ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
     253           0 :       metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     254           0 :       break;
     255           0 :     default:
     256           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     257           0 :       return;
     258           0 :   }
     259             : 
     260             :   /* We must acknowledge after handling the control frag, because if it
     261             :      causes us to generate a malformed transition, that must be sent
     262             :      back to the snaprd controller before the acknowledgement. */
     263           0 :   fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
     264           0 : }
     265             : 
     266             : static inline int
     267             : returnable_frag( fd_snapin_tile_t *  ctx,
     268             :                  ulong               in_idx,
     269             :                  ulong               seq,
     270             :                  ulong               sig,
     271             :                  ulong               chunk,
     272             :                  ulong               sz,
     273             :                  ulong               ctl,
     274             :                  ulong               tsorig,
     275             :                  ulong               tspub,
     276           0 :                  fd_stem_context_t * stem ) {
     277           0 :   (void)in_idx;
     278           0 :   (void)seq;
     279           0 :   (void)ctl;
     280           0 :   (void)tsorig;
     281           0 :   (void)tspub;
     282             : 
     283           0 :   ctx->stem = stem;
     284             : 
     285           0 :   FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
     286             : 
     287           0 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
     288           0 :   else                                           handle_control_frag( ctx, stem, sig  );
     289             : 
     290           0 :   return 0;
     291           0 : }
     292             : 
     293             : static void
     294             : privileged_init( fd_topo_t *      topo,
     295           0 :                  fd_topo_tile_t * tile ) {
     296           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     297             : 
     298           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     299           0 :   fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
     300             : 
     301           0 :   FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
     302           0 : }
     303             : 
     304             : FD_FN_UNUSED static void
     305             : unprivileged_init( fd_topo_t *      topo,
     306           0 :                    fd_topo_tile_t * tile ) {
     307           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     308             : 
     309           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     310           0 :   fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t),  sizeof(fd_snapin_tile_t)                  );
     311           0 :   void * _ssparse        = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
     312             : 
     313           0 :   ctx->full = 1;
     314           0 :   ctx->state = FD_SNAPIN_STATE_LOADING;
     315             : 
     316           0 :   ctx->boot_timestamp = fd_log_wallclock();
     317             : 
     318           0 :   FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
     319           0 :   ctx->funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map );
     320             : 
     321           0 :   ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb );
     322             : 
     323           0 :   FD_TEST( ctx->ssparse );
     324             : 
     325           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
     326             : 
     327           0 :   if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
     328           0 :   if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1",  tile->in_cnt  ));
     329           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2",  tile->out_cnt  ));
     330             : 
     331           0 :   fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
     332           0 :   ctx->manifest_out.wksp    = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
     333           0 :   ctx->manifest_out.chunk0  = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
     334           0 :   ctx->manifest_out.wmark   = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
     335           0 :   ctx->manifest_out.chunk   = ctx->manifest_out.chunk0;
     336           0 :   ctx->manifest_out.mtu     = writer_link->mtu;
     337             : 
     338           0 :   fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
     339             : 
     340           0 :   fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
     341           0 :   fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     342           0 :   ctx->in.wksp                   = in_wksp->wksp;;
     343           0 :   ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     344           0 :   ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     345           0 :   ctx->in.mtu                    = in_link->mtu;
     346           0 : }
     347             : 
     348           0 : #define STEM_BURST 2UL /* For control fragments, one acknowledgement, and one malformed message */
     349           0 : #define STEM_LAZY  1000L
     350             : 
     351           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snapin_tile_t
     352           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)
     353             : 
     354             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     355           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     356           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     357             : 
     358             : #include "../../disco/stem/fd_stem.c"
     359             : 
     360             : fd_topo_run_tile_t fd_tile_snapin = {
     361             :   .name              = NAME,
     362             :   .scratch_align     = scratch_align,
     363             :   .scratch_footprint = scratch_footprint,
     364             :   .privileged_init   = privileged_init,
     365             :   .unprivileged_init = unprivileged_init,
     366             :   .run               = stem_run,
     367             : };
     368             : 
     369             : #undef NAME

Generated by: LCOV version 1.14