LCOV - code coverage report
Current view: top level - discof/restore - fd_snaplv_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 393 0.0 %
Date: 2026-02-14 05:50:46 Functions: 0 20 0.0 %

          Line data    Source code
       1             : #include "fd_snaplv_tile_private.h"
       2             : #include "../../disco/topo/fd_topo.h"
       3             : #include "../../disco/metrics/fd_metrics.h"
       4             : #include "../../ballet/lthash/fd_lthash.h"
       5             : #include "../../util/pod/fd_pod.h"
       6             : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
       7             : #include "generated/fd_snaplv_tile_seccomp.h"
       8             : 
       9             : #include "utils/fd_ssctrl.h"
      10             : #include "utils/fd_vinyl_admin.h"
      11             : 
      12             : #define NAME "snaplv"
      13             : 
      14           0 : #define IN_KIND_SNAPWM (0)
      15           0 : #define IN_KIND_SNAPLH (1)
      16             : #define MAX_IN_LINKS   (16UL) /* at least 1 snapwm and FD_SNAPSHOT_MAX_SNAPLH_TILES */
      17             : 
      18             : #define OUT_LINK_CNT (3UL)
      19           0 : #define OUT_LINK_LH  (0)
      20           0 : #define OUT_LINK_CT  (1)
      21             : #define OUT_LINK_WR  (2)
      22             : 
      23             : struct out_link {
      24             :   ulong       idx;
      25             :   fd_wksp_t * mem;
      26             :   ulong       chunk0;
      27             :   ulong       wmark;
      28             :   ulong       chunk;
      29             : };
      30             : typedef struct out_link out_link_t;
      31             : 
      32             : struct fd_snaplv_tile {
      33             :   uint                state;
      34             :   int                 full;
      35             : 
      36             :   ulong               num_hash_tiles;
      37             :   ulong               num_write_tiles;
      38             : 
      39             :   uchar               in_kind[MAX_IN_LINKS];
      40             :   ulong               adder_in_offset;
      41             : 
      42             :   out_link_t          out_link[OUT_LINK_CNT];
      43             : 
      44             :   struct {
      45             :     ulong             bstream_seq_last;
      46             :     struct {
      47             :       int                     active[FD_SNAPLV_DUP_PENDING_CNT_MAX];
      48             :       ulong                   seq   [FD_SNAPLV_DUP_PENDING_CNT_MAX];
      49             :       fd_vinyl_bstream_phdr_t phdr  [FD_SNAPLV_DUP_PENDING_CNT_MAX];
      50             :     } pending;
      51             :     ulong             pending_cnt;
      52             :     fd_vinyl_admin_t *  admin;
      53             :   } vinyl;
      54             : 
      55             :   struct {
      56             :     fd_lthash_value_t expected_lthash;
      57             :     fd_lthash_value_t calculated_lthash;
      58             :     ulong             received_lthashes;
      59             :     ulong             ack_sig;
      60             :     int               awaiting_results;
      61             :     int               hash_check_done;
      62             :   } hash_accum;
      63             : 
      64             :   fd_lthash_value_t   running_lthash;
      65             : 
      66             :   struct {
      67             :     ulong exp_sig;
      68             :     ulong ack_cnt;
      69             :     int   wait;
      70             :   } fail;
      71             : 
      72             :   struct {
      73             :     fd_lthash_value_t full_lthash;
      74             :   } recovery;
      75             : 
      76             :   struct {
      77             :     struct {
      78             :       ulong           duplicate_accounts_hashed;
      79             :     } full;
      80             : 
      81             :     struct {
      82             :       ulong           duplicate_accounts_hashed;
      83             :     } incremental;
      84             :   } metrics;
      85             : 
      86             :   struct {
      87             :     fd_wksp_t *       wksp;
      88             :     ulong             chunk0;
      89             :     ulong             wmark;
      90             :     ulong             mtu;
      91             :     ulong             pos;
      92             :   } in;
      93             : 
      94             :   struct {
      95             :     fd_wksp_t *       wksp;
      96             :     ulong             chunk0;
      97             :     ulong             wmark;
      98             :     ulong             mtu;
      99             :   } adder_in[FD_SNAPSHOT_MAX_SNAPLH_TILES];
     100             : };
     101             : 
     102             : typedef struct fd_snaplv_tile fd_snaplv_t;
     103             : 
     104             : static inline int
     105           0 : should_shutdown( fd_snaplv_t * ctx ) {
     106           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     107           0 : }
     108             : 
     109             : static ulong
     110           0 : scratch_align( void ) {
     111           0 :   return alignof(fd_snaplv_t);
     112           0 : }
     113             : 
     114             : static ulong
     115           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     116           0 :   (void)tile;
     117           0 :   ulong l = FD_LAYOUT_INIT;
     118           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
     119           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaplv_t) );
     120           0 : }
     121             : 
     122             : static void
     123           0 : metrics_write( fd_snaplv_t * ctx ) {
     124           0 :   (void)ctx;
     125           0 :   FD_MGAUGE_SET( SNAPLV, FULL_DUPLICATE_ACCOUNTS_HASHED,        ctx->metrics.full.duplicate_accounts_hashed );
     126           0 :   FD_MGAUGE_SET( SNAPLV, INCREMENTAL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.incremental.duplicate_accounts_hashed );
     127           0 :   FD_MGAUGE_SET( SNAPLV, STATE,                                 (ulong)(ctx->state) );
     128           0 : }
     129             : 
     130             : static void
     131             : transition_malformed( fd_snaplv_t *  ctx,
     132           0 :                       fd_stem_context_t * stem ) {
     133           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
     134           0 :   ctx->state = FD_SNAPSHOT_STATE_ERROR;
     135           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     136           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     137           0 : }
     138             : 
     139             : static void
     140             : handle_vinyl_lthash_request( fd_snaplv_t *             ctx,
     141             :                              fd_stem_context_t *       stem,
     142             :                              ulong                     seq,
     143           0 :                              fd_vinyl_bstream_phdr_t * acc_hdr ) {
     144             : 
     145           0 :   out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
     146           0 :   uchar * data = fd_chunk_to_laddr( o_link->mem, o_link->chunk );
     147           0 :   memcpy( data, &seq, sizeof(ulong) );
     148           0 :   memcpy( data + sizeof(ulong), acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
     149           0 :   ulong data_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
     150           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, o_link->chunk, data_sz, 0UL, 0UL, 0UL );
     151           0 :   o_link->chunk = fd_dcache_compact_next( o_link->chunk, data_sz, o_link->chunk0, o_link->wmark );
     152             : 
     153           0 :   if( ctx->full ) ctx->metrics.full.duplicate_accounts_hashed++;
     154           0 :   else            ctx->metrics.incremental.duplicate_accounts_hashed++;
     155           0 : }
     156             : 
     157             : static inline void
     158           0 : handle_vinyl_lthash_seq_sync( fd_snaplv_t * ctx ) {
     159           0 :   ulong bstream_seq_min = ULONG_MAX;
     160           0 :   for( ulong i=0; i<ctx->num_write_tiles; i++ ) {
     161             :     /* There is a way to avoid a lock here: every wr_seq[i] is a ulong,
     162             :        each assigned to a unique write tile, and it works the same way
     163             :        as a stem's fseq or an mcache's seq.  Therefore, from the point
     164             :        of view snaplv, it can directly read them at any time.
     165             :        Only snapwm is allowed to overwrite the wr_seq array during the
     166             :        initialization of a full/incr snapshot, but it does so after
     167             :        synchronizing with the write tiles (making sure that they have
     168             :        already completed all pending writes) and before instructing
     169             :        snaplv to start processing the snapshot. */
     170           0 :     ulong bstream_seq = fd_vinyl_admin_ulong_query( &ctx->vinyl.admin->wr_seq[ i ] );
     171           0 :     bstream_seq_min = fd_ulong_min( bstream_seq_min, bstream_seq );
     172           0 :   }
     173           0 :   ctx->vinyl.bstream_seq_last = bstream_seq_min;
     174           0 : }
     175             : 
     176             : static inline int
     177             : handle_vinyl_lthash_seq_check_fast( fd_snaplv_t * ctx,
     178           0 :                                     ulong              seq ) {
     179           0 :   return seq < ctx->vinyl.bstream_seq_last;
     180           0 : }
     181             : 
     182             : static inline void
     183             : handle_vinyl_lthash_seq_check_until_match( fd_snaplv_t * ctx,
     184             :                                            ulong         seq,
     185           0 :                                            int           do_sleep ) {
     186           0 :   for(;;) {
     187           0 :     if( handle_vinyl_lthash_seq_check_fast( ctx, seq ) ) break;
     188           0 :     FD_SPIN_PAUSE();
     189           0 :     if( do_sleep ) fd_log_sleep( (long)1e3 ); /* 1 microsecond */
     190           0 :     handle_vinyl_lthash_seq_sync( ctx );
     191           0 :   }
     192           0 : }
     193             : 
     194             : static inline void
     195             : handle_vinyl_lthash_request_drain_all( fd_snaplv_t *       ctx,
     196           0 :                                        fd_stem_context_t * stem ) {
     197           0 :   for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     198           0 :     if( !ctx->vinyl.pending.active[ i ] ) continue;
     199           0 :     handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ i ], 1/*do_sleep*/ );
     200           0 :     handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
     201           0 :     ctx->vinyl.pending.active[ i ] = 0;
     202           0 :     ctx->vinyl.pending_cnt--;
     203           0 :   }
     204           0 :   FD_TEST( !ctx->vinyl.pending_cnt );
     205           0 : }
     206             : 
     207             : static inline void
     208             : handle_vinyl_lthash_pending_list( fd_snaplv_t *        ctx,
     209           0 :                                   fd_stem_context_t *  stem ) {
     210             :   /* Try to consume as many pending requests as possible. */
     211           0 :   for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     212           0 :     if( FD_LIKELY( !ctx->vinyl.pending.active[ i ] ) ) continue;
     213           0 :     if( handle_vinyl_lthash_seq_check_fast( ctx, ctx->vinyl.pending.seq[ i ] ) ) {
     214           0 :       handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
     215           0 :       ctx->vinyl.pending.active[ i ] = 0;
     216           0 :       ctx->vinyl.pending_cnt--;
     217           0 :     }
     218           0 :   }
     219           0 : }
     220             : 
     221             : static void
     222             : handle_data_frag( fd_snaplv_t *       ctx,
     223             :                   fd_stem_context_t * stem,
     224             :                   ulong               sig,
     225             :                   ulong               chunk,
     226             :                   ulong               sz,
     227           0 :                   ulong               tspub ) {
     228           0 :   (void)chunk; (void)sz;
     229             : 
     230           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     231             :     /* skip all data frags when in error state. */
     232           0 :     return;
     233           0 :   }
     234           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
     235           0 :     FD_LOG_ERR(( "invalid state %u for data frag %lu", ctx->state, sig ));
     236           0 :     return;
     237           0 :   }
     238           0 :   if( FD_UNLIKELY( sig!=FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) {
     239           0 :     FD_LOG_ERR(( "unexpected sig %lu in handle_data_frag", sig ));
     240           0 :     return;
     241           0 :   }
     242             : 
     243           0 :   uchar const * in_data = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     244             : 
     245           0 :   ulong const acc_sz    = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
     246           0 :   ulong const batch_sz  = sz;
     247           0 :   ulong const batch_cnt = tspub;
     248           0 :   if( FD_UNLIKELY( batch_cnt>FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ) ) {
     249           0 :     FD_LOG_CRIT(( "batch count %lu exceeds FD_SNAPLV_DUP_BATCH_IN_CNT_MAX %lu", batch_cnt, FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ));
     250           0 :   }
     251           0 :   if( FD_UNLIKELY( (batch_cnt*acc_sz)!=batch_sz ) ) {
     252           0 :     FD_LOG_CRIT(( "batch count %lu with account size %lu does not match batch size %lu", batch_cnt, acc_sz, batch_sz ));
     253           0 :   }
     254             : 
     255           0 :   for( ulong acc_i=0UL; acc_i<batch_cnt; acc_i++ ) {
     256             : 
     257             :     /* move in_data pointer forward */
     258           0 :     uchar const * acc_data = in_data;
     259           0 :     in_data += acc_sz;
     260             : 
     261           0 :     ulong acc_data_seq = fd_ulong_load_8( acc_data );
     262             : 
     263           0 :     if( FD_LIKELY( handle_vinyl_lthash_seq_check_fast( ctx, acc_data_seq ) ) ) {
     264             :       /* The request can be processed immediately, skipping the pending list. */
     265           0 :       fd_vinyl_bstream_phdr_t acc_data_phdr[1];
     266           0 :       memcpy( acc_data_phdr, acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     267           0 :       handle_vinyl_lthash_request( ctx, stem, acc_data_seq, acc_data_phdr );
     268           0 :       continue;
     269           0 :     }
     270             : 
     271             :     /* Find an empty slot in the pending list. */
     272           0 :     ulong seq_min_i = ULONG_MAX;
     273           0 :     ulong seq_min   = ULONG_MAX;
     274           0 :     ulong free_i    = ULONG_MAX;
     275           0 :     if( FD_UNLIKELY( ctx->vinyl.pending_cnt==FD_SNAPLV_DUP_PENDING_CNT_MAX ) ) {
     276             :       /* an entry must be consumed to free a slot */
     277           0 :       for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     278           0 :         ulong seq = ctx->vinyl.pending.seq[ i ];
     279           0 :         seq_min_i = fd_ulong_if( seq_min > seq, i, seq_min_i );
     280           0 :         seq_min   = fd_ulong_min( seq_min, seq );
     281           0 :       }
     282           0 :       handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ seq_min_i ], 1/*do_sleep*/ );
     283           0 :       handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ seq_min_i ], &ctx->vinyl.pending.phdr[ seq_min_i ] );
     284           0 :       ctx->vinyl.pending.active[ seq_min_i ] = 0;
     285           0 :       ctx->vinyl.pending_cnt--;
     286           0 :       free_i = seq_min_i;
     287           0 :     } else {
     288             :       /* Pick a free slot. */
     289           0 :       free_i = 0UL;
     290           0 :       for( ; free_i<FD_SNAPLV_DUP_PENDING_CNT_MAX; free_i++ ) {
     291           0 :         if( !ctx->vinyl.pending.active[ free_i ] ) break;
     292           0 :       }
     293           0 :     }
     294             : 
     295             :     /* Populate the free slot. */
     296           0 :     ctx->vinyl.pending.seq[ free_i ] = acc_data_seq;
     297           0 :     memcpy( &ctx->vinyl.pending.phdr[ free_i ], acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     298           0 :     ctx->vinyl.pending.active[ free_i ] = 1;
     299           0 :     ctx->vinyl.pending_cnt++;
     300           0 :   }
     301           0 : }
     302             : 
     303             : static void
     304             : handle_control_frag( fd_snaplv_t *       ctx,
     305             :                      fd_stem_context_t * stem,
     306             :                      ulong               sig,
     307             :                      ulong               in_idx,
     308             :                      ulong               tsorig,
     309           0 :                      ulong               tspub ) {
     310           0 :   (void)in_idx;
     311             : 
     312           0 :   if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH ) {
     313           0 :     if( FD_UNLIKELY( !ctx->fail.wait ) ) FD_LOG_CRIT(( "received unexpected sig %lu msg from snaplh", sig ));
     314           0 :     if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) ) FD_LOG_CRIT(( "received incorrect sig %lu msg from snaplh", sig ));
     315           0 :     ctx->fail.ack_cnt++;
     316           0 :     return;
     317           0 :   }
     318             : 
     319           0 :   if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
     320             :     /* Control messages move along the snapshot load pipeline.  Since
     321             :        error conditions can be triggered by any tile in the pipeline,
     322             :        it is possible to be in error state and still receive otherwise
     323             :        valid messages.  Only a fail message can revert this. */
     324           0 :     return;
     325           0 :   };
     326             : 
     327           0 :   int forward_to_ct = 1;
     328             : 
     329           0 :   switch( sig ) {
     330           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     331           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
     332           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     333           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     334           0 :       ctx->full  = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     335           0 :       fd_lthash_zero( &ctx->running_lthash );
     336             : 
     337           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
     338           0 :         fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
     339           0 :         fd_lthash_zero( &ctx->recovery.full_lthash );
     340           0 :       } else {
     341             :         /* The lthash for the incremental snapshot is computed starting
     342             :            from the full snapshot lthash.  Since an init message may
     343             :            be received after a fail message, always start from the
     344             :            recovery value. */
     345           0 :         ctx->hash_accum.calculated_lthash = ctx->recovery.full_lthash;
     346           0 :       }
     347             : 
     348           0 :       break;
     349           0 :     }
     350             : 
     351           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI: {
     352           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
     353           0 :       ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     354           0 :       ctx->hash_accum.ack_sig          = sig;
     355           0 :       ctx->hash_accum.awaiting_results = 1;
     356           0 :       handle_vinyl_lthash_request_drain_all( ctx, stem );
     357           0 :       forward_to_ct = 0;
     358           0 :       break; /* the ack is sent when all hashes are received */
     359           0 :     }
     360             : 
     361           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     362           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE: {
     363           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
     364           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     365             :       /* back up full_lthash for future recovery. */
     366           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_NEXT ) {
     367           0 :         ctx->recovery.full_lthash = ctx->hash_accum.calculated_lthash;
     368           0 :       }
     369           0 :       break;
     370           0 :     }
     371             : 
     372           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR: {
     373           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     374           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     375           0 :       break;
     376           0 :     }
     377             : 
     378           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL: {
     379           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     380           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     381           0 :       ctx->fail.exp_sig = FD_SNAPSHOT_MSG_CTRL_FAIL;
     382           0 :       ctx->fail.ack_cnt = 0UL;
     383           0 :       ctx->fail.wait = 1;
     384           0 :       forward_to_ct = 0;
     385           0 :       break;
     386           0 :     }
     387             : 
     388           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
     389           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     390           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     391           0 :       break;
     392           0 :     }
     393             : 
     394           0 :     default: {
     395           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     396           0 :       break;
     397           0 :     }
     398           0 :   }
     399             : 
     400             :   /* Forward the control message down the pipeline */
     401           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
     402           0 :   if( !forward_to_ct ) return;
     403           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
     404           0 : }
     405             : 
     406             : static void
     407             : handle_hash_frag( fd_snaplv_t * ctx,
     408             :                   ulong         in_idx,
     409             :                   ulong         sig,
     410             :                   ulong         chunk,
     411           0 :                   ulong         sz ) {
     412           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     413             :     /* skip all hash frags when in error state. */
     414           0 :     return;
     415           0 :   }
     416           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING &&
     417           0 :                    ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
     418           0 :     if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM) ) FD_LOG_ERR(( "invalid state for data frag %u", ctx->state ));
     419           0 :     return;
     420           0 :   }
     421           0 :   switch( sig ) {
     422           0 :     case FD_SNAPSHOT_HASH_MSG_RESULT_ADD: {
     423           0 :       FD_TEST( sz==sizeof(fd_lthash_value_t) );
     424           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH );
     425           0 :       fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->adder_in[ in_idx-ctx->adder_in_offset ].wksp, chunk );
     426           0 :       fd_lthash_add( &ctx->hash_accum.calculated_lthash, result );
     427           0 :       ctx->hash_accum.received_lthashes++;
     428           0 :       break;
     429           0 :     }
     430           0 :     case FD_SNAPSHOT_HASH_MSG_RESULT_SUB: {
     431           0 :       FD_TEST( sz==sizeof(fd_lthash_value_t) );
     432           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
     433           0 :       fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     434           0 :       fd_lthash_sub( &ctx->hash_accum.calculated_lthash, result );
     435           0 :       break;
     436           0 :     }
     437           0 :     case FD_SNAPSHOT_HASH_MSG_EXPECTED: {
     438           0 :       FD_TEST( sz==sizeof(fd_lthash_value_t) );
     439           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
     440           0 :       fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     441           0 :       ctx->hash_accum.expected_lthash = *result;
     442           0 :       break;
     443           0 :     }
     444           0 :     default: {
     445           0 :       FD_LOG_ERR(( "unexpected hash sig %lu", sig ));
     446           0 :       break;
     447           0 :     }
     448           0 :   }
     449           0 : }
     450             : 
     451             : static inline int
     452             : returnable_frag( fd_snaplv_t *       ctx,
     453             :                  ulong               in_idx,
     454             :                  ulong               seq    FD_PARAM_UNUSED,
     455             :                  ulong               sig,
     456             :                  ulong               chunk,
     457             :                  ulong               sz,
     458             :                  ulong               ctl    FD_PARAM_UNUSED,
     459             :                  ulong               tsorig,
     460             :                  ulong               tspub,
     461           0 :                  fd_stem_context_t * stem ) {
     462           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     463             : 
     464           0 :   if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_data_frag( ctx, stem, sig, chunk, sz, tspub );
     465           0 :   else if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_RESULT_ADD ||
     466           0 :                       sig==FD_SNAPSHOT_HASH_MSG_RESULT_SUB ||
     467           0 :                       sig==FD_SNAPSHOT_HASH_MSG_EXPECTED ) )  handle_hash_frag( ctx, in_idx, sig, chunk, sz );
     468           0 :   else                                                        handle_control_frag( ctx, stem, sig, in_idx, tsorig, tspub );
     469             : 
     470           0 :   return 0;
     471           0 : }
     472             : 
     473             : static void
     474             : before_credit( fd_snaplv_t *       ctx,
     475             :                fd_stem_context_t * stem FD_PARAM_UNUSED,
     476           0 :                int *               charge_busy ) {
     477           0 :   *charge_busy = 0;
     478           0 :   handle_vinyl_lthash_seq_sync( ctx );
     479           0 : }
     480             : 
     481             : static void
     482             : after_credit( fd_snaplv_t *        ctx,
     483             :               fd_stem_context_t *  stem,
     484             :               int *                opt_poll_in FD_PARAM_UNUSED,
     485           0 :               int *                charge_busy FD_PARAM_UNUSED ) {
     486             : 
     487           0 :   handle_vinyl_lthash_pending_list( ctx, stem );
     488             : 
     489           0 :   if( FD_UNLIKELY( ctx->hash_accum.awaiting_results && ctx->hash_accum.received_lthashes==ctx->num_hash_tiles ) ) {
     490             : 
     491           0 :     ctx->hash_accum.awaiting_results  = 0;
     492           0 :     ctx->hash_accum.received_lthashes = 0UL;
     493             : 
     494           0 :     fd_lthash_sub( &ctx->hash_accum.calculated_lthash, &ctx->running_lthash );
     495             : 
     496           0 :     int test = memcmp( &ctx->hash_accum.expected_lthash, &ctx->hash_accum.calculated_lthash, sizeof(fd_lthash_value_t) );
     497             : 
     498           0 :     if( FD_UNLIKELY( test ) ) {
     499             :       /* SnapshotError::MismatchedHash
     500             :          https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L479 */
     501           0 :       FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in snapshot manifest",
     502           0 :                         FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
     503           0 :                         FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ) ));
     504           0 :       transition_malformed( ctx, stem );
     505           0 :       return;
     506           0 :     } else {
     507           0 :       FD_LOG_NOTICE(( "calculated accounts lthash %s matches accounts lthash %s in snapshot manifest",
     508           0 :                       FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
     509           0 :                       FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ) ));
     510           0 :       fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->hash_accum.ack_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     511           0 :     }
     512           0 :   }
     513             : 
     514           0 :   if( FD_UNLIKELY( ctx->fail.wait && ctx->fail.ack_cnt==ctx->num_hash_tiles ) ) {
     515           0 :     fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->fail.exp_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     516           0 :     ctx->fail.exp_sig = 0UL;
     517           0 :     ctx->fail.ack_cnt = 0UL;
     518           0 :     ctx->fail.wait = 0;
     519           0 :     return;
     520           0 :   }
     521           0 : }
     522             : 
     523             : static ulong
     524             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     525             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     526             :                       ulong                  out_fds_cnt,
     527           0 :                       int *                  out_fds ) {
     528           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     529             : 
     530           0 :   ulong out_cnt = 0;
     531           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     532           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     533           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     534           0 :   }
     535           0 :   return out_cnt;
     536           0 : }
     537             : 
     538             : static ulong
     539             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     540             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     541             :                           ulong                  out_cnt,
     542           0 :                           struct sock_filter *   out ) {
     543           0 :   populate_sock_filter_policy_fd_snaplv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     544           0 :   return sock_filter_policy_fd_snaplv_tile_instr_cnt;
     545           0 : }
     546             : 
     547             : static void
     548             : unprivileged_init( fd_topo_t *      topo,
     549           0 :                    fd_topo_tile_t * tile ) {
     550           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     551             : 
     552           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     553           0 :   fd_snaplv_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t)         );
     554             : 
     555           0 :   FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
     556             : 
     557           0 :   ulong expected_in_cnt = 1UL + fd_topo_tile_name_cnt( topo, "snaplh" );
     558           0 :   if( FD_UNLIKELY( tile->in_cnt!=expected_in_cnt ) )  FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu",  tile->in_cnt, expected_in_cnt ));
     559           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
     560             : 
     561           0 :   ulong adder_idx = 0UL;
     562           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
     563           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     564           0 :     fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     565             : 
     566           0 :     if( FD_LIKELY( 0==strcmp( in_link->name, "snapwm_lv" ) ) ) {
     567           0 :       ctx->in.wksp                   = in_wksp->wksp;
     568           0 :       ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     569           0 :       ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     570           0 :       ctx->in.mtu                    = in_link->mtu;
     571           0 :       ctx->in.pos                    = 0UL;
     572           0 :       ctx->in_kind[ i ]              = IN_KIND_SNAPWM;
     573             : 
     574           0 :     } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplh_lv" ) ) ) {
     575           0 :       ctx->adder_in[ adder_idx ].wksp    = in_wksp->wksp;
     576           0 :       ctx->adder_in[ adder_idx ].chunk0  = fd_dcache_compact_chunk0( ctx->adder_in[ adder_idx ].wksp, in_link->dcache );
     577           0 :       ctx->adder_in[ adder_idx ].wmark   = fd_dcache_compact_wmark ( ctx->adder_in[ adder_idx ].wksp, in_link->dcache, in_link->mtu );
     578           0 :       ctx->adder_in[ adder_idx ].mtu     = in_link->mtu;
     579           0 :       ctx->in_kind[ i ]                  = IN_KIND_SNAPLH;
     580           0 :       if( FD_LIKELY( adder_idx==0UL ) ) ctx->adder_in_offset = i;
     581           0 :       adder_idx++;
     582             : 
     583           0 :     } else {
     584           0 :       FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
     585           0 :     }
     586           0 :   }
     587             : 
     588           0 :   ctx->vinyl.bstream_seq_last = 0UL;
     589             : 
     590           0 :   for( uint i=0U; i<(tile->out_cnt); i++ ) {
     591           0 :     fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
     592             : 
     593           0 :     if( 0==strcmp( link->name, "snaplv_ct" ) ) {
     594           0 :       out_link_t * o_link = &ctx->out_link[ OUT_LINK_CT ];
     595           0 :       o_link->idx    = i;
     596           0 :       o_link->mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     597           0 :       o_link->chunk0 = 0UL;
     598           0 :       o_link->wmark  = 0UL;
     599           0 :       o_link->chunk  = 0UL;
     600             : 
     601           0 :     } else if( 0==strcmp( link->name, "snaplv_lh" ) ) {
     602           0 :       out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
     603           0 :       o_link->idx    = i;
     604           0 :       o_link->mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     605           0 :       o_link->chunk0 = fd_dcache_compact_chunk0( o_link->mem, link->dcache );
     606           0 :       o_link->wmark  = fd_dcache_compact_wmark( o_link->mem, link->dcache, link->mtu );
     607           0 :       o_link->chunk  = o_link->chunk0;
     608             : 
     609           0 :     } else {
     610           0 :       FD_LOG_ERR(( "unexpected output link %s", link->name ));
     611           0 :     }
     612           0 :   }
     613             : 
     614           0 :   memset( ctx->vinyl.pending.active, 0, FD_SNAPLV_DUP_PENDING_CNT_MAX*sizeof(int) );
     615           0 :   ctx->vinyl.pending_cnt = 0;
     616             : 
     617           0 :   ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
     618           0 :   FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
     619           0 :   fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
     620           0 :   FD_TEST( vinyl_admin );
     621           0 :   ctx->vinyl.admin = vinyl_admin;
     622           0 :   for(;;) {
     623             :     /* This query can be done without the need of an rwlock. */
     624           0 :     ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
     625           0 :     if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
     626           0 :                    vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
     627           0 :     fd_log_sleep( (long)1e6 /*1ms*/ );
     628           0 :     FD_SPIN_PAUSE();
     629           0 :   }
     630             : 
     631           0 :   ctx->metrics.full.duplicate_accounts_hashed        = 0UL;
     632           0 :   ctx->metrics.incremental.duplicate_accounts_hashed = 0UL;
     633             : 
     634           0 :   ctx->state                        = FD_SNAPSHOT_STATE_IDLE;
     635           0 :   ctx->full                         = 1;
     636             : 
     637           0 :   ctx->num_hash_tiles               = fd_topo_tile_name_cnt( topo, "snaplh" );
     638           0 :   ctx->num_write_tiles              = fd_topo_tile_name_cnt( topo, "snapwr" );
     639           0 :   FD_TEST( ctx->num_write_tiles<=FD_VINYL_ADMIN_WR_SEQ_CNT_MAX );
     640             : 
     641           0 :   ctx->hash_accum.received_lthashes = 0UL;
     642           0 :   ctx->hash_accum.awaiting_results  = 0;
     643           0 :   ctx->hash_accum.hash_check_done   = 0;
     644             : 
     645           0 :   fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
     646           0 :   fd_lthash_zero( &ctx->running_lthash );
     647           0 :   fd_lthash_zero( &ctx->recovery.full_lthash );
     648             : 
     649           0 :   ctx->fail.exp_sig = 0UL;
     650           0 :   ctx->fail.ack_cnt = 0UL;
     651           0 :   ctx->fail.wait    = 0;
     652           0 : }
     653             : 
     654           0 : #define STEM_BURST (FD_SNAPLV_STEM_BURST)
     655           0 : #define STEM_LAZY  1000L
     656             : 
     657           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplv_t
     658           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplv_t)
     659             : 
     660             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     661           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     662           0 : #define STEM_CALLBACK_BEFORE_CREDIT   before_credit
     663           0 : #define STEM_CALLBACK_AFTER_CREDIT    after_credit
     664           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     665             : 
     666             : 
     667             : #include "../../disco/stem/fd_stem.c"
     668             : 
     669             : fd_topo_run_tile_t fd_tile_snaplv = {
     670             :   .name                     = NAME,
     671             :   .populate_allowed_fds     = populate_allowed_fds,
     672             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     673             :   .scratch_align            = scratch_align,
     674             :   .scratch_footprint        = scratch_footprint,
     675             :   .unprivileged_init        = unprivileged_init,
     676             :   .run                      = stem_run,
     677             : };
     678             : 
     679             : #undef NAME

Generated by: LCOV version 1.14