LCOV - code coverage report
Current view: top level - discof/restore - fd_snaplv_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 461 0.0 %
Date: 2026-03-31 06:22:16 Functions: 0 21 0.0 %

          Line data    Source code
       1             : #include "fd_snaplv_tile_private.h"
       2             : #include "../../disco/topo/fd_topo.h"
       3             : #include "../../disco/metrics/fd_metrics.h"
       4             : #include "../../ballet/lthash/fd_lthash.h"
       5             : #include "../../util/pod/fd_pod.h"
       6             : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
       7             : #include "generated/fd_snaplv_tile_seccomp.h"
       8             : 
       9             : #include "utils/fd_ssctrl.h"
      10             : #include "utils/fd_vinyl_admin.h"
      11             : 
      12             : #define NAME "snaplv"
      13             : 
      14           0 : #define IN_KIND_SNAPWM (0)
      15           0 : #define IN_KIND_SNAPLH (1)
      16             : #define MAX_IN_LINKS   (16UL) /* at least 1 snapwm and FD_SNAPSHOT_MAX_SNAPLH_TILES */
      17             : 
      18             : #define OUT_LINK_CNT (3UL)
      19           0 : #define OUT_LINK_LH  (0)
      20           0 : #define OUT_LINK_CT  (1)
      21             : #define OUT_LINK_WR  (2)
      22             : 
      23             : struct out_link {
      24             :   ulong       idx;
      25             :   fd_wksp_t * mem;
      26             :   ulong       chunk0;
      27             :   ulong       wmark;
      28             :   ulong       chunk;
      29             : };
      30             : typedef struct out_link out_link_t;
      31             : 
      32             : struct fd_snaplv_tile {
      33             :   uint                state;
      34             :   int                 full;
      35             : 
      36             :   ulong               num_hash_tiles;
      37             :   ulong               num_write_tiles;
      38             : 
      39             :   uchar               in_kind[MAX_IN_LINKS];
      40             :   ulong               adder_in_offset;
      41             : 
      42             :   out_link_t          out_link[OUT_LINK_CNT];
      43             : 
      44             :   long running_capitalization;
      45             :   long dup_capitalization;
      46             :   long manifest_capitalization;
      47             : 
      48             :   struct {
      49             :     ulong             bstream_seq_last;
      50             :     struct {
      51             :       int                     active[FD_SNAPLV_DUP_PENDING_CNT_MAX];
      52             :       ulong                   seq   [FD_SNAPLV_DUP_PENDING_CNT_MAX];
      53             :       fd_vinyl_bstream_phdr_t phdr  [FD_SNAPLV_DUP_PENDING_CNT_MAX];
      54             :     } pending;
      55             :     ulong             pending_cnt;
      56             :     fd_vinyl_admin_t *  admin;
      57             :   } vinyl;
      58             : 
      59             :   struct {
      60             :     fd_lthash_value_t expected_lthash;
      61             :     fd_lthash_value_t calculated_lthash;
      62             :     ulong             received_lthashes;
      63             :     ulong             ack_sig;
      64             :     int               awaiting_results;
      65             :     int               hash_check_done;
      66             :   } hash_accum;
      67             : 
      68             :   struct {
      69             :     ulong exp_sig;
      70             :     ulong ack_cnt;
      71             :     int   wait;
      72             :   } fail;
      73             : 
      74             :   struct {
      75             :     fd_lthash_value_t full_lthash;
      76             :     long              capitalization;
      77             :   } recovery;
      78             : 
      79             :   struct {
      80             :     struct {
      81             :       ulong           duplicate_accounts_hashed;
      82             :     } full;
      83             : 
      84             :     struct {
      85             :       ulong           duplicate_accounts_hashed;
      86             :     } incremental;
      87             :   } metrics;
      88             : 
      89             :   struct {
      90             :     fd_wksp_t *       wksp;
      91             :     ulong             chunk0;
      92             :     ulong             wmark;
      93             :     ulong             mtu;
      94             :     ulong             pos;
      95             :   } in;
      96             : 
      97             :   struct {
      98             :     fd_wksp_t *       wksp;
      99             :     ulong             chunk0;
     100             :     ulong             wmark;
     101             :     ulong             mtu;
     102             :   } adder_in[FD_SNAPSHOT_MAX_SNAPLH_TILES];
     103             : };
     104             : 
     105             : typedef struct fd_snaplv_tile fd_snaplv_t;
     106             : 
     107             : static inline int
     108           0 : should_shutdown( fd_snaplv_t * ctx ) {
     109           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     110           0 : }
     111             : 
     112             : static ulong
     113           0 : scratch_align( void ) {
     114           0 :   return alignof(fd_snaplv_t);
     115           0 : }
     116             : 
     117             : static ulong
     118           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     119           0 :   (void)tile;
     120           0 :   ulong l = FD_LAYOUT_INIT;
     121           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
     122           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaplv_t) );
     123           0 : }
     124             : 
     125             : static void
     126           0 : metrics_write( fd_snaplv_t * ctx ) {
     127           0 :   (void)ctx;
     128           0 :   FD_MGAUGE_SET( SNAPLV, FULL_DUPLICATE_ACCOUNTS_HASHED,        ctx->metrics.full.duplicate_accounts_hashed );
     129           0 :   FD_MGAUGE_SET( SNAPLV, INCREMENTAL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.incremental.duplicate_accounts_hashed );
     130           0 :   FD_MGAUGE_SET( SNAPLV, STATE,                                 (ulong)(ctx->state) );
     131           0 : }
     132             : 
     133             : static void
     134             : transition_malformed( fd_snaplv_t *  ctx,
     135           0 :                       fd_stem_context_t * stem ) {
     136           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
     137           0 :   ctx->state = FD_SNAPSHOT_STATE_ERROR;
     138           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     139           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
     140           0 : }
     141             : 
     142             : static void
     143             : handle_vinyl_lthash_request( fd_snaplv_t *             ctx,
     144             :                              fd_stem_context_t *       stem,
     145             :                              ulong                     seq,
     146           0 :                              fd_vinyl_bstream_phdr_t * acc_hdr ) {
     147             : 
     148           0 :   out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
     149           0 :   uchar * data = fd_chunk_to_laddr( o_link->mem, o_link->chunk );
     150           0 :   memcpy( data, &seq, sizeof(ulong) );
     151           0 :   memcpy( data + sizeof(ulong), acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
     152           0 :   ulong data_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
     153           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, o_link->chunk, data_sz, 0UL, 0UL, 0UL );
     154           0 :   o_link->chunk = fd_dcache_compact_next( o_link->chunk, data_sz, o_link->chunk0, o_link->wmark );
     155             : 
     156           0 :   if( ctx->full ) ctx->metrics.full.duplicate_accounts_hashed++;
     157           0 :   else            ctx->metrics.incremental.duplicate_accounts_hashed++;
     158           0 : }
     159             : 
     160             : static inline void
     161           0 : handle_vinyl_lthash_seq_sync( fd_snaplv_t * ctx ) {
     162           0 :   ulong bstream_seq_min = ULONG_MAX;
     163           0 :   for( ulong i=0; i<ctx->num_write_tiles; i++ ) {
     164             :     /* There is a way to avoid a lock here: every wr_seq[i] is a ulong,
     165             :        each assigned to a unique write tile, and it works the same way
     166             :        as a stem's fseq or an mcache's seq.  Therefore, from the point
     167             :        of view snaplv, it can directly read them at any time.
     168             :        Only snapwm is allowed to overwrite the wr_seq array during the
     169             :        initialization of a full/incr snapshot, but it does so after
     170             :        synchronizing with the write tiles (making sure that they have
     171             :        already completed all pending writes) and before instructing
     172             :        snaplv to start processing the snapshot. */
     173           0 :     ulong bstream_seq = fd_vinyl_admin_ulong_query( &ctx->vinyl.admin->wr_seq[ i ] );
     174           0 :     bstream_seq_min = fd_ulong_min( bstream_seq_min, bstream_seq );
     175           0 :   }
     176           0 :   ctx->vinyl.bstream_seq_last = bstream_seq_min;
     177           0 : }
     178             : 
     179             : static inline int
     180             : handle_vinyl_lthash_seq_check_fast( fd_snaplv_t * ctx,
     181           0 :                                     ulong              seq ) {
     182           0 :   return seq < ctx->vinyl.bstream_seq_last;
     183           0 : }
     184             : 
     185             : static inline void
     186             : handle_vinyl_lthash_seq_check_until_match( fd_snaplv_t * ctx,
     187             :                                            ulong         seq,
     188           0 :                                            int           do_sleep ) {
     189           0 :   for(;;) {
     190           0 :     if( handle_vinyl_lthash_seq_check_fast( ctx, seq ) ) break;
     191           0 :     FD_SPIN_PAUSE();
     192           0 :     if( do_sleep ) fd_log_sleep( (long)1e3 ); /* 1 microsecond */
     193           0 :     handle_vinyl_lthash_seq_sync( ctx );
     194           0 :   }
     195           0 : }
     196             : 
     197             : static inline void
     198             : handle_vinyl_lthash_request_drain_all( fd_snaplv_t *       ctx,
     199           0 :                                        fd_stem_context_t * stem ) {
     200           0 :   for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     201           0 :     if( !ctx->vinyl.pending.active[ i ] ) continue;
     202           0 :     handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ i ], 1/*do_sleep*/ );
     203           0 :     handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
     204           0 :     ctx->vinyl.pending.active[ i ] = 0;
     205           0 :     ctx->vinyl.pending_cnt--;
     206           0 :   }
     207           0 :   FD_TEST( !ctx->vinyl.pending_cnt );
     208           0 : }
     209             : 
     210             : static inline void
     211             : handle_vinyl_lthash_pending_list( fd_snaplv_t *        ctx,
     212           0 :                                   fd_stem_context_t *  stem ) {
     213             :   /* Try to consume as many pending requests as possible. */
     214           0 :   for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     215           0 :     if( FD_LIKELY( !ctx->vinyl.pending.active[ i ] ) ) continue;
     216           0 :     if( handle_vinyl_lthash_seq_check_fast( ctx, ctx->vinyl.pending.seq[ i ] ) ) {
     217           0 :       handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
     218           0 :       ctx->vinyl.pending.active[ i ] = 0;
     219           0 :       ctx->vinyl.pending_cnt--;
     220           0 :     }
     221           0 :   }
     222           0 : }
     223             : 
     224             : static void
     225             : handle_data_frag( fd_snaplv_t *       ctx,
     226             :                   fd_stem_context_t * stem,
     227             :                   ulong               sig,
     228             :                   ulong               chunk,
     229             :                   ulong               sz,
     230           0 :                   ulong               tspub ) {
     231           0 :   (void)chunk; (void)sz;
     232             : 
     233           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     234             :     /* skip all data frags when in error state. */
     235           0 :     return;
     236           0 :   }
     237           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
     238           0 :     FD_LOG_ERR(( "received data frag %s (%lu) while in unexpected state %s (%u)",
     239           0 :                  fd_ssctrl_msg_ctrl_str( sig ), sig,
     240           0 :                  fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     241           0 :     return;
     242           0 :   }
     243           0 :   if( FD_UNLIKELY( sig!=FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) {
     244           0 :     FD_LOG_ERR(( "received incorrect data frag %s (%lu) in state %s (%u)",
     245           0 :                  fd_ssctrl_msg_ctrl_str( sig ), sig,
     246           0 :                  fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     247           0 :     return;
     248           0 :   }
     249             : 
     250           0 :   uchar const * in_data = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     251             : 
     252           0 :   ulong const acc_sz    = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
     253           0 :   ulong const batch_sz  = sz;
     254           0 :   ulong const batch_cnt = tspub;
     255           0 :   if( FD_UNLIKELY( batch_cnt>FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ) ) {
     256           0 :     FD_LOG_CRIT(( "batch count %lu exceeds batch count max %lu", batch_cnt, FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ));
     257           0 :   }
     258           0 :   if( FD_UNLIKELY( (batch_cnt*acc_sz)!=batch_sz ) ) {
     259           0 :     FD_LOG_CRIT(( "batch count %lu with account size %lu does not match batch size %lu", batch_cnt, acc_sz, batch_sz ));
     260           0 :   }
     261             : 
     262           0 :   for( ulong acc_i=0UL; acc_i<batch_cnt; acc_i++ ) {
     263             : 
     264             :     /* move in_data pointer forward */
     265           0 :     uchar const * acc_data = in_data;
     266           0 :     in_data += acc_sz;
     267             : 
     268           0 :     ulong acc_data_seq = fd_ulong_load_8( acc_data );
     269             : 
     270           0 :     if( FD_LIKELY( handle_vinyl_lthash_seq_check_fast( ctx, acc_data_seq ) ) ) {
     271             :       /* The request can be processed immediately, skipping the pending list. */
     272           0 :       fd_vinyl_bstream_phdr_t acc_data_phdr[1];
     273           0 :       memcpy( acc_data_phdr, acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     274           0 :       handle_vinyl_lthash_request( ctx, stem, acc_data_seq, acc_data_phdr );
     275           0 :       continue;
     276           0 :     }
     277             : 
     278             :     /* Find an empty slot in the pending list. */
     279           0 :     ulong seq_min_i = ULONG_MAX;
     280           0 :     ulong seq_min   = ULONG_MAX;
     281           0 :     ulong free_i    = ULONG_MAX;
     282           0 :     if( FD_UNLIKELY( ctx->vinyl.pending_cnt==FD_SNAPLV_DUP_PENDING_CNT_MAX ) ) {
     283             :       /* an entry must be consumed to free a slot */
     284           0 :       for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
     285           0 :         ulong seq = ctx->vinyl.pending.seq[ i ];
     286           0 :         seq_min_i = fd_ulong_if( seq_min > seq, i, seq_min_i );
     287           0 :         seq_min   = fd_ulong_min( seq_min, seq );
     288           0 :       }
     289           0 :       handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ seq_min_i ], 1/*do_sleep*/ );
     290           0 :       handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ seq_min_i ], &ctx->vinyl.pending.phdr[ seq_min_i ] );
     291           0 :       ctx->vinyl.pending.active[ seq_min_i ] = 0;
     292           0 :       ctx->vinyl.pending_cnt--;
     293           0 :       free_i = seq_min_i;
     294           0 :     } else {
     295             :       /* Pick a free slot. */
     296           0 :       free_i = 0UL;
     297           0 :       for( ; free_i<FD_SNAPLV_DUP_PENDING_CNT_MAX; free_i++ ) {
     298           0 :         if( !ctx->vinyl.pending.active[ free_i ] ) break;
     299           0 :       }
     300           0 :     }
     301             : 
     302             :     /* Populate the free slot. */
     303           0 :     ctx->vinyl.pending.seq[ free_i ] = acc_data_seq;
     304           0 :     memcpy( &ctx->vinyl.pending.phdr[ free_i ], acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     305           0 :     ctx->vinyl.pending.active[ free_i ] = 1;
     306           0 :     ctx->vinyl.pending_cnt++;
     307           0 :   }
     308           0 : }
     309             : 
     310             : static void
     311             : handle_control_frag( fd_snaplv_t *       ctx,
     312             :                      fd_stem_context_t * stem,
     313             :                      ulong               sig,
     314             :                      ulong               in_idx,
     315             :                      ulong               tsorig,
     316           0 :                      ulong               tspub ) {
     317           0 :   (void)in_idx;
     318             : 
     319           0 :   if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH ) {
     320           0 :     if( FD_UNLIKELY( !ctx->fail.wait ) ) {
     321           0 :       FD_LOG_CRIT(( "received unexpected control frag %s (%lu) from snaplh in state %s (%u)",
     322           0 :                     fd_ssctrl_msg_ctrl_str( sig ), sig,
     323           0 :                     fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     324           0 :     }
     325           0 :     if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) ) {
     326           0 :       FD_LOG_CRIT(( "received incorrect control frag %s (%lu) from snaplh in state %s (%u)",
     327           0 :                     fd_ssctrl_msg_ctrl_str( sig ), sig,
     328           0 :                     fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     329           0 :     }
     330           0 :     ctx->fail.ack_cnt++;
     331           0 :     return;
     332           0 :   }
     333             : 
     334           0 :   if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
     335             :     /* Control messages move along the snapshot load pipeline.  Since
     336             :        error conditions can be triggered by any tile in the pipeline,
     337             :        it is possible to be in error state and still receive otherwise
     338             :        valid messages.  Only a fail message can revert this. */
     339           0 :     return;
     340           0 :   };
     341             : 
     342           0 :   int forward_to_ct = 1;
     343             : 
     344           0 :   switch( sig ) {
     345           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     346           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
     347           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     348           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     349           0 :       ctx->full  = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     350             : 
     351           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
     352           0 :         fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
     353           0 :         fd_lthash_zero( &ctx->recovery.full_lthash );
     354           0 :         ctx->running_capitalization = 0L;
     355           0 :         ctx->dup_capitalization     = 0L;
     356           0 :       } else {
     357             :         /* The lthash for the incremental snapshot is computed starting
     358             :            from the full snapshot lthash.  Since an init message may
     359             :            be received after a fail message, always start from the
     360             :            recovery value. */
     361           0 :         ctx->hash_accum.calculated_lthash = ctx->recovery.full_lthash;
     362           0 :         ctx->running_capitalization = ctx->recovery.capitalization;
     363           0 :         ctx->dup_capitalization     = 0L;
     364           0 :       }
     365             : 
     366           0 :       break;
     367           0 :     }
     368             : 
     369           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI: {
     370           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
     371           0 :       ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     372           0 :       ctx->hash_accum.ack_sig          = sig;
     373           0 :       ctx->hash_accum.awaiting_results = 1;
     374           0 :       handle_vinyl_lthash_request_drain_all( ctx, stem );
     375           0 :       forward_to_ct = 0;
     376           0 :       break; /* the ack is sent when all hashes are received */
     377           0 :     }
     378             : 
     379           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     380           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE: {
     381           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
     382           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     383             : 
     384             :       /* back up full_lthash for future recovery. */
     385           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_NEXT ) {
     386           0 :         ctx->recovery.full_lthash    = ctx->hash_accum.calculated_lthash;
     387           0 :         ctx->recovery.capitalization = ctx->running_capitalization;
     388           0 :       }
     389           0 :       break;
     390           0 :     }
     391             : 
     392           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR: {
     393           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     394           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     395           0 :       break;
     396           0 :     }
     397             : 
     398           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL: {
     399           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     400           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     401           0 :       ctx->fail.exp_sig = FD_SNAPSHOT_MSG_CTRL_FAIL;
     402           0 :       ctx->fail.ack_cnt = 0UL;
     403           0 :       ctx->fail.wait = 1;
     404           0 :       forward_to_ct = 0;
     405           0 :       break;
     406           0 :     }
     407             : 
     408           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
     409           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     410           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     411           0 :       break;
     412           0 :     }
     413             : 
     414           0 :     default: {
     415           0 :       FD_LOG_ERR(( "unexpected control frag %s (%lu) in state %s (%u)",
     416           0 :                    fd_ssctrl_msg_ctrl_str( sig ), sig,
     417           0 :                    fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     418           0 :       break;
     419           0 :     }
     420           0 :   }
     421             : 
     422             :   /* Forward the control message down the pipeline */
     423           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
     424           0 :   if( !forward_to_ct ) return;
     425           0 :   fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
     426           0 : }
     427             : 
     428             : static void
     429             : handle_hash_frag( fd_snaplv_t *       ctx,
     430             :                   fd_stem_context_t * stem,
     431             :                   ulong               in_idx,
     432             :                   ulong               sig,
     433             :                   ulong               chunk,
     434           0 :                   ulong               sz ) {
     435           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     436             :     /* skip all hash frags when in error state. */
     437           0 :     return;
     438           0 :   }
     439           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING &&
     440           0 :                    ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
     441           0 :     if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM) ) {
     442           0 :       FD_LOG_WARNING(( "received invalid data frag %s (%lu) from snapwm in state %s (%u)",
     443           0 :                        fd_ssctrl_msg_ctrl_str( sig ), sig,
     444           0 :                        fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     445           0 :       transition_malformed( ctx, stem );
     446           0 :     }
     447           0 :     return;
     448           0 :   }
     449           0 :   switch( sig ) {
     450           0 :     case FD_SNAPSHOT_HASH_MSG_RESULT_ADD: {
     451           0 :       FD_TEST( sz==sizeof(fd_ssctrl_hash_result_t) );
     452           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH );
     453           0 :       fd_ssctrl_hash_result_t const * result = fd_chunk_to_laddr_const( ctx->adder_in[ in_idx-ctx->adder_in_offset ].wksp, chunk );
     454           0 :       fd_lthash_add( &ctx->hash_accum.calculated_lthash, &result->lthash );
     455           0 :       ctx->running_capitalization = fd_long_sat_add( ctx->running_capitalization, result->capitalization );
     456           0 :       if( FD_UNLIKELY( ctx->running_capitalization==LONG_MAX ) ) {
     457           0 :         FD_LOG_WARNING(( "capitalization overflow, running_capitalization=%ld, result_capitalization=%ld", ctx->running_capitalization, result->capitalization ));
     458           0 :         transition_malformed( ctx, stem );
     459           0 :         return;
     460           0 :       }
     461           0 :       ctx->hash_accum.received_lthashes++;
     462           0 :       break;
     463           0 :     }
     464           0 :     case FD_SNAPSHOT_HASH_MSG_RESULT_SUB: {
     465           0 :       FD_TEST( sz==sizeof(fd_ssctrl_hash_result_t) );
     466           0 :       FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
     467           0 :       fd_ssctrl_hash_result_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     468           0 :       fd_lthash_sub( &ctx->hash_accum.calculated_lthash, &result->lthash );
     469           0 :       FD_TEST( result->capitalization>=0L );
     470           0 :       ctx->dup_capitalization = fd_long_sat_add( ctx->dup_capitalization, result->capitalization );
     471           0 :       break;
     472           0 :     }
     473           0 :     default: {
     474           0 :       FD_LOG_ERR(( "unexpected hash frag %s (%lu) in state %s (%lu)",
     475           0 :                    fd_ssctrl_msg_ctrl_str( sig ), sig,
     476           0 :                    fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
     477           0 :       break;
     478           0 :     }
     479           0 :   }
     480           0 : }
     481             : 
     482             : static inline void
     483             : handle_expected_hash_and_capitalization_message( fd_snaplv_t *       ctx,
     484             :                                                  fd_stem_context_t * stem,
     485             :                                                  ulong               sig,
     486             :                                                  ulong               chunk,
     487           0 :                                                  ulong               sz ) {
     488           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     489             :     /* skip this message when in error state. */
     490           0 :     return;
     491           0 :   }
     492             : 
     493           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
     494           0 :     FD_LOG_WARNING(( "received invalid message %s (%lu) from snapwm in state %s (%u)",
     495           0 :                      fd_ssctrl_msg_ctrl_str( sig ), sig,
     496           0 :                      fd_ssctrl_state_str( (ulong)ctx->state ), ctx->state ));
     497           0 :     transition_malformed( ctx, stem );
     498           0 :     return;
     499           0 :   }
     500             : 
     501           0 :   if( FD_UNLIKELY( sz!=sizeof(fd_ssctrl_hash_result_t) ) ) {
     502           0 :     FD_LOG_ERR(( "unexpected msg sz %lu for sig FD_SNAPSHOT_HASH_MSG_EXP_AND_CAPITAL", sz ));
     503           0 :     return;
     504           0 :   }
     505             : 
     506           0 :   fd_ssctrl_hash_result_t const * expected = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
     507           0 :   ctx->hash_accum.expected_lthash = expected->lthash;
     508           0 :   ctx->manifest_capitalization    = expected->capitalization;
     509           0 : }
     510             : 
     511             : static inline int
     512             : returnable_frag( fd_snaplv_t *       ctx,
     513             :                  ulong               in_idx,
     514             :                  ulong               seq    FD_PARAM_UNUSED,
     515             :                  ulong               sig,
     516             :                  ulong               chunk,
     517             :                  ulong               sz,
     518             :                  ulong               ctl    FD_PARAM_UNUSED,
     519             :                  ulong               tsorig,
     520             :                  ulong               tspub,
     521           0 :                  fd_stem_context_t * stem ) {
     522           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     523             : 
     524           0 :   if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) )       handle_data_frag( ctx, stem, sig, chunk, sz, tspub );
     525           0 :   else if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_RESULT_ADD ||
     526           0 :                       sig==FD_SNAPSHOT_HASH_MSG_RESULT_SUB ) )      handle_hash_frag( ctx, stem, in_idx, sig, chunk, sz );
     527           0 :   else if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_EXP_AND_CAPITAL ) ) handle_expected_hash_and_capitalization_message( ctx, stem, sig, chunk, sz );
     528           0 :   else                                                              handle_control_frag( ctx, stem, sig, in_idx, tsorig, tspub );
     529             : 
     530           0 :   return 0;
     531           0 : }
     532             : 
     533             : static void
     534             : before_credit( fd_snaplv_t *       ctx,
     535             :                fd_stem_context_t * stem FD_PARAM_UNUSED,
     536           0 :                int *               charge_busy ) {
     537           0 :   *charge_busy = 0;
     538           0 :   handle_vinyl_lthash_seq_sync( ctx );
     539           0 : }
     540             : 
     541             : static void
     542             : after_credit( fd_snaplv_t *        ctx,
     543             :               fd_stem_context_t *  stem,
     544             :               int *                opt_poll_in FD_PARAM_UNUSED,
     545           0 :               int *                charge_busy FD_PARAM_UNUSED ) {
     546             : 
     547           0 :   handle_vinyl_lthash_pending_list( ctx, stem );
     548             : 
     549           0 :   if( FD_UNLIKELY( ctx->hash_accum.awaiting_results && ctx->hash_accum.received_lthashes==ctx->num_hash_tiles ) ) {
     550             : 
     551           0 :     ctx->hash_accum.awaiting_results  = 0;
     552           0 :     ctx->hash_accum.received_lthashes = 0UL;
     553           0 :     ctx->running_capitalization       = fd_long_sat_sub( ctx->running_capitalization, ctx->dup_capitalization );
     554           0 :     if( FD_UNLIKELY( ctx->running_capitalization==LONG_MIN ) ) {
     555           0 :       FD_LOG_WARNING(( "capitalization underflow, running_capitalization=%ld, dup_capitalization=%ld", ctx->running_capitalization, ctx->dup_capitalization ));
     556           0 :       transition_malformed( ctx, stem );
     557           0 :       return;
     558           0 :     }
     559           0 :     if( FD_UNLIKELY( ctx->running_capitalization<0L ) ) {
     560           0 :       FD_LOG_WARNING(( "computed capitalization %ld is invalid", ctx->running_capitalization ));
     561           0 :       transition_malformed( ctx, stem );
     562           0 :       return;
     563           0 :     }
     564           0 :     long computed_capitalization = ctx->running_capitalization;
     565           0 :     int capitalization_match     = computed_capitalization==ctx->manifest_capitalization;
     566             : 
     567           0 :     int lthash_match = !memcmp( &ctx->hash_accum.expected_lthash, &ctx->hash_accum.calculated_lthash, sizeof(fd_lthash_value_t) );
     568             : 
     569           0 :     if( FD_UNLIKELY( !lthash_match ) ) {
     570             :       /* SnapshotError::MismatchedHash
     571             :          https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L479 */
     572           0 :       FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in %s snapshot manifest",
     573           0 :                         FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
     574           0 :                         FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ),
     575           0 :                         ctx->full?"full":"incremental" ));
     576           0 :       transition_malformed( ctx, stem );
     577           0 :       return;
     578           0 :     } else {
     579           0 :       FD_LOG_INFO(( "calculated accounts lthash %s matches accounts lthash %s in %s snapshot manifest",
     580           0 :                      FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
     581           0 :                      FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ),
     582           0 :                      ctx->full?"full":"incremental" ));
     583           0 :     }
     584             : 
     585           0 :     if( FD_UNLIKELY( !capitalization_match ) ) {
     586             :       /* SnapshotError::MismatchedCapitalization
     587             :          https://github.com/anza-xyz/agave/blob/v4.0.0-beta.2/runtime/src/snapshot_bank_utils.rs#L217 */
     588           0 :       FD_LOG_WARNING(( "%s snapshot manifest capitalization %ld does not match computed capitalization %ld",
     589           0 :                        ctx->full?"full":"incremental", ctx->manifest_capitalization, computed_capitalization ));
     590           0 :       transition_malformed( ctx, stem );
     591           0 :       return;
     592           0 :     }
     593             : 
     594           0 :     fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->hash_accum.ack_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     595           0 :   }
     596             : 
     597           0 :   if( FD_UNLIKELY( ctx->fail.wait && ctx->fail.ack_cnt==ctx->num_hash_tiles ) ) {
     598           0 :     fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->fail.exp_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     599           0 :     ctx->fail.exp_sig = 0UL;
     600           0 :     ctx->fail.ack_cnt = 0UL;
     601           0 :     ctx->fail.wait    = 0;
     602           0 :     return;
     603           0 :   }
     604           0 : }
     605             : 
     606             : static ulong
     607             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     608             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     609             :                       ulong                  out_fds_cnt,
     610           0 :                       int *                  out_fds ) {
     611           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "unexpected out_fds_cnt %lu", out_fds_cnt ));
     612             : 
     613           0 :   ulong out_cnt = 0;
     614           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     615           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     616           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     617           0 :   }
     618           0 :   return out_cnt;
     619           0 : }
     620             : 
     621             : static ulong
     622             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
     623             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     624             :                           ulong                  out_cnt,
     625           0 :                           struct sock_filter *   out ) {
     626           0 :   populate_sock_filter_policy_fd_snaplv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     627           0 :   return sock_filter_policy_fd_snaplv_tile_instr_cnt;
     628           0 : }
     629             : 
     630             : static void
     631             : unprivileged_init( fd_topo_t *      topo,
     632           0 :                    fd_topo_tile_t * tile ) {
     633           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     634             : 
     635           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     636           0 :   fd_snaplv_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t)         );
     637             : 
     638           0 :   FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
     639             : 
     640           0 :   ulong expected_in_cnt = 1UL + fd_topo_tile_name_cnt( topo, "snaplh" );
     641           0 :   if( FD_UNLIKELY( tile->in_cnt!=expected_in_cnt ) )  FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu",  tile->in_cnt, expected_in_cnt ));
     642           0 :   if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
     643             : 
     644           0 :   ulong adder_idx = 0UL;
     645           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
     646           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     647           0 :     fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     648             : 
     649           0 :     if( FD_LIKELY( 0==strcmp( in_link->name, "snapwm_lv" ) ) ) {
     650           0 :       ctx->in.wksp                   = in_wksp->wksp;
     651           0 :       ctx->in.chunk0                 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
     652           0 :       ctx->in.wmark                  = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
     653           0 :       ctx->in.mtu                    = in_link->mtu;
     654           0 :       ctx->in.pos                    = 0UL;
     655           0 :       ctx->in_kind[ i ]              = IN_KIND_SNAPWM;
     656             : 
     657           0 :     } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplh_lv" ) ) ) {
     658           0 :       ctx->adder_in[ adder_idx ].wksp    = in_wksp->wksp;
     659           0 :       ctx->adder_in[ adder_idx ].chunk0  = fd_dcache_compact_chunk0( ctx->adder_in[ adder_idx ].wksp, in_link->dcache );
     660           0 :       ctx->adder_in[ adder_idx ].wmark   = fd_dcache_compact_wmark ( ctx->adder_in[ adder_idx ].wksp, in_link->dcache, in_link->mtu );
     661           0 :       ctx->adder_in[ adder_idx ].mtu     = in_link->mtu;
     662           0 :       ctx->in_kind[ i ]                  = IN_KIND_SNAPLH;
     663           0 :       if( FD_LIKELY( adder_idx==0UL ) ) ctx->adder_in_offset = i;
     664           0 :       adder_idx++;
     665             : 
     666           0 :     } else {
     667           0 :       FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
     668           0 :     }
     669           0 :   }
     670             : 
     671           0 :   ctx->vinyl.bstream_seq_last = 0UL;
     672             : 
     673           0 :   for( uint i=0U; i<(tile->out_cnt); i++ ) {
     674           0 :     fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
     675             : 
     676           0 :     if( 0==strcmp( link->name, "snaplv_ct" ) ) {
     677           0 :       out_link_t * o_link = &ctx->out_link[ OUT_LINK_CT ];
     678           0 :       o_link->idx    = i;
     679           0 :       o_link->mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     680           0 :       o_link->chunk0 = 0UL;
     681           0 :       o_link->wmark  = 0UL;
     682           0 :       o_link->chunk  = 0UL;
     683             : 
     684           0 :     } else if( 0==strcmp( link->name, "snaplv_lh" ) ) {
     685           0 :       out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
     686           0 :       o_link->idx    = i;
     687           0 :       o_link->mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
     688           0 :       o_link->chunk0 = fd_dcache_compact_chunk0( o_link->mem, link->dcache );
     689           0 :       o_link->wmark  = fd_dcache_compact_wmark( o_link->mem, link->dcache, link->mtu );
     690           0 :       o_link->chunk  = o_link->chunk0;
     691             : 
     692           0 :     } else {
     693           0 :       FD_LOG_ERR(( "unexpected output link %s", link->name ));
     694           0 :     }
     695           0 :   }
     696             : 
     697           0 :   memset( ctx->vinyl.pending.active, 0, FD_SNAPLV_DUP_PENDING_CNT_MAX*sizeof(int) );
     698           0 :   ctx->vinyl.pending_cnt = 0;
     699             : 
     700           0 :   ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
     701           0 :   FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
     702           0 :   fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
     703           0 :   FD_TEST( vinyl_admin );
     704           0 :   ctx->vinyl.admin = vinyl_admin;
     705           0 :   for(;;) {
     706             :     /* This query can be done without the need of an rwlock. */
     707           0 :     ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
     708           0 :     if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
     709           0 :                    vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
     710           0 :     fd_log_sleep( (long)1e6 /*1ms*/ );
     711           0 :     FD_SPIN_PAUSE();
     712           0 :   }
     713             : 
     714           0 :   ctx->metrics.full.duplicate_accounts_hashed        = 0UL;
     715           0 :   ctx->metrics.incremental.duplicate_accounts_hashed = 0UL;
     716             : 
     717           0 :   ctx->state                        = FD_SNAPSHOT_STATE_IDLE;
     718           0 :   ctx->full                         = 1;
     719             : 
     720           0 :   ctx->num_hash_tiles               = fd_topo_tile_name_cnt( topo, "snaplh" );
     721           0 :   ctx->num_write_tiles              = fd_topo_tile_name_cnt( topo, "snapwr" );
     722           0 :   FD_TEST( ctx->num_write_tiles<=FD_VINYL_ADMIN_WR_SEQ_CNT_MAX );
     723             : 
     724           0 :   ctx->hash_accum.received_lthashes = 0UL;
     725           0 :   ctx->hash_accum.awaiting_results  = 0;
     726           0 :   ctx->hash_accum.hash_check_done   = 0;
     727             : 
     728           0 :   fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
     729           0 :   fd_lthash_zero( &ctx->recovery.full_lthash );
     730             : 
     731           0 :   ctx->recovery.capitalization = 0L;
     732           0 :   ctx->running_capitalization  = 0L;
     733           0 :   ctx->dup_capitalization      = 0L;
     734           0 :   ctx->manifest_capitalization = 0L;
     735             : 
     736           0 :   ctx->fail.exp_sig = 0UL;
     737           0 :   ctx->fail.ack_cnt = 0UL;
     738           0 :   ctx->fail.wait    = 0;
     739           0 : }
     740             : 
     741           0 : #define STEM_BURST (FD_SNAPLV_STEM_BURST)
     742           0 : #define STEM_LAZY  1000L
     743             : 
     744           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplv_t
     745           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplv_t)
     746             : 
     747             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     748           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     749           0 : #define STEM_CALLBACK_BEFORE_CREDIT   before_credit
     750           0 : #define STEM_CALLBACK_AFTER_CREDIT    after_credit
     751           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     752             : 
     753             : 
     754             : #include "../../disco/stem/fd_stem.c"
     755             : 
     756             : fd_topo_run_tile_t fd_tile_snaplv = {
     757             :   .name                     = NAME,
     758             :   .populate_allowed_fds     = populate_allowed_fds,
     759             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     760             :   .scratch_align            = scratch_align,
     761             :   .scratch_footprint        = scratch_footprint,
     762             :   .unprivileged_init        = unprivileged_init,
     763             :   .run                      = stem_run,
     764             : };
     765             : 
     766             : #undef NAME

Generated by: LCOV version 1.14