LCOV - code coverage report
Current view: top level - discof/restore - fd_snaplh_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 451 0.0 %
Date: 2026-01-24 04:58:51 Functions: 0 29 0.0 %

          Line data    Source code
       1             : #include "../../disco/topo/fd_topo.h"
       2             : #include "../../disco/metrics/fd_metrics.h"
       3             : #include "../../ballet/lthash/fd_lthash.h"
       4             : #include "../../ballet/lthash/fd_lthash_adder.h"
       5             : #include "../../vinyl/io/fd_vinyl_io.h"
       6             : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
       7             : #include "generated/fd_snaplh_tile_seccomp.h"
       8             : 
       9             : #include "utils/fd_ssctrl.h"
      10             : 
      11             : #include <errno.h>
      12             : #include <sys/stat.h> /* fstat */
      13             : #include <fcntl.h>    /* open  */
      14             : #include <unistd.h>   /* close */
      15             : 
      16             : #if FD_HAS_LIBURING
      17             : #include "../../vinyl/io/fd_vinyl_io_ur.h"
      18             : #include <liburing.h>
      19             : typedef struct io_uring fd_io_uring_t;
      20             : #else
      21             : typedef char            fd_io_uring_t;
      22             : #endif
      23             : 
      24             : #define NAME "snaplh"
      25             : 
      26             : #define IN_CNT_MAX     (2UL)
      27           0 : #define IN_KIND_SNAPLV (0UL)
      28           0 : #define IN_KIND_SNAPWH (1UL)
      29             : 
      30           0 : #define VINYL_LTHASH_BLOCK_ALIGN  (512UL) /* O_DIRECT would require 4096UL */
      31           0 : #define VINYL_LTHASH_BLOCK_MAX_SZ (16UL<<20)
      32             : FD_STATIC_ASSERT( VINYL_LTHASH_BLOCK_MAX_SZ>(sizeof(fd_snapshot_full_account_t)+FD_VINYL_BSTREAM_BLOCK_SZ+2*VINYL_LTHASH_BLOCK_ALIGN), "VINYL_LTHASH_BLOCK_MAX_SZ" );
      33             : 
      34           0 : #define VINYL_LTHASH_RD_REQ_MAX   (32UL)
      35             : 
      36             : #define VINYL_LTHASH_IO_SPAD_MAX  (2<<20UL)
      37             : 
      38           0 : #define VINYL_LTHASH_RD_REQ_FREE  (0UL)
      39           0 : #define VINYL_LTHASH_RD_REQ_PEND  (1UL)
      40           0 : #define VINYL_LTHASH_RD_REQ_SENT  (2UL)
      41             : 
      42             : struct in_link_private {
      43             :   fd_wksp_t *  wksp;
      44             :   ulong        chunk0;
      45             :   ulong        wmark;
      46             :   ulong        mtu;
      47             :   void const * base;
      48             :   ulong *      seq_sync;  /* fseq->seq[0] */
      49             : };
      50             : typedef struct in_link_private in_link_t;
      51             : 
      52             : struct out_link_private {
      53             :   fd_wksp_t *  wksp;
      54             :   ulong        chunk0;
      55             :   ulong        wmark;
      56             :   ulong        chunk;
      57             :   ulong        mtu;
      58             : };
      59             : typedef struct out_link_private out_link_t;
      60             : 
      61             : struct fd_snaplh_tile {
      62             :   uint state;
      63             :   int  full;
      64             : 
      65             :   ulong seed;
      66             :   ulong lthash_tile_cnt;
      67             :   ulong lthash_tile_idx;
      68             :   ulong lthash_tile_add_cnt;
      69             :   ulong lthash_tile_sub_cnt;
      70             :   ulong lthash_tile_add_idx;
      71             :   ulong lthash_tile_sub_idx;
      72             :   ulong pairs_seen;
      73             :   ulong lthash_req_seen;
      74             : 
      75             :   /* Database params */
      76             :   ulong const * io_seed;
      77             : 
      78             :   fd_lthash_adder_t   adder[1];
      79             :   fd_lthash_adder_t   adder_sub[1];
      80             :   uchar               data[FD_RUNTIME_ACC_SZ_MAX];
      81             : 
      82             :   fd_lthash_value_t   running_lthash;
      83             :   fd_lthash_value_t   running_lthash_sub;
      84             : 
      85             :   struct {
      86             :     int               dev_fd;
      87             :     ulong             dev_sz;
      88             :     ulong             dev_base;
      89             :     void *            pair_mem;
      90             :     void *            pair_tmp;
      91             : 
      92             :     struct {
      93             :       fd_vinyl_bstream_phdr_t phdr  [VINYL_LTHASH_RD_REQ_MAX];
      94             :       fd_vinyl_io_rd_t        rd_req[VINYL_LTHASH_RD_REQ_MAX];
      95             :     } pending;
      96             :     ulong             pending_rd_req_cnt;
      97             : 
      98             :     int               io_uring_enabled;
      99             :     fd_vinyl_io_t *   io;
     100             :     fd_io_uring_t *   ring;
     101             :   } vinyl;
     102             : 
     103             :   struct {
     104             :     struct {
     105             :       ulong accounts_hashed;
     106             :     } full;
     107             : 
     108             :     struct {
     109             :       ulong accounts_hashed;
     110             :     } incremental;
     111             :   } metrics;
     112             : 
     113             :   ulong       wh_finish_fseq;
     114             :   ulong       wh_last_in_seq;
     115             : 
     116             :   in_link_t   in[IN_CNT_MAX];
     117             :   uchar       in_kind[IN_CNT_MAX];
     118             :   out_link_t  out;
     119             : };
     120             : 
     121             : typedef struct fd_snaplh_tile fd_snaplh_t;
     122             : 
     123             : static inline int
     124           0 : should_shutdown( fd_snaplh_t * ctx ) {
     125           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     126           0 : }
     127             : 
     128             : static ulong
     129           0 : scratch_align( void ) {
     130           0 :   return alignof(fd_snaplh_t);
     131           0 : }
     132             : 
     133             : static ulong
     134           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     135           0 :   (void)tile;
     136           0 :   ulong l = FD_LAYOUT_INIT;
     137           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaplh_t),     sizeof(fd_snaplh_t)                                );
     138           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                          );
     139           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                          );
     140           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                          );
     141             :   #if FD_HAS_LIBURING
     142             :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ  );
     143             :   l = FD_LAYOUT_APPEND( l, alignof(struct io_uring), sizeof(struct io_uring)                            );
     144             :   l = FD_LAYOUT_APPEND( l, fd_vinyl_io_ur_align(),   fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
     145             :   #endif
     146           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaplh_t) );
     147           0 : }
     148             : 
     149             : static void
     150           0 : metrics_write( fd_snaplh_t * ctx ) {
     151           0 :   FD_MGAUGE_SET( SNAPLH, FULL_ACCOUNTS_HASHED,        ctx->metrics.full.accounts_hashed );
     152           0 :   FD_MGAUGE_SET( SNAPLH, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
     153           0 :   FD_MGAUGE_SET( SNAPLH, STATE,                       (ulong)(ctx->state) );
     154           0 : }
     155             : 
     156             : static inline int
     157           0 : should_hash_account( fd_snaplh_t * ctx ) {
     158           0 :   return (ctx->pairs_seen % ctx->lthash_tile_add_cnt)==ctx->lthash_tile_add_idx;
     159           0 : }
     160             : 
     161             : static inline int
     162           0 : should_process_lthash_request( fd_snaplh_t * ctx ) {
     163           0 :   return (ctx->lthash_req_seen % ctx->lthash_tile_sub_cnt)==ctx->lthash_tile_sub_idx;
     164           0 : }
     165             : 
     166             : FD_FN_UNUSED static void
     167             : streamlined_hash( fd_snaplh_t *       restrict ctx,
     168             :                   fd_lthash_adder_t * restrict adder,
     169             :                   fd_lthash_value_t * restrict running_lthash,
     170           0 :                   uchar const *       restrict _pair ) {
     171           0 :   uchar const * pair = _pair;
     172           0 :   fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
     173           0 :   pair += sizeof(fd_vinyl_bstream_phdr_t);
     174           0 :   fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
     175           0 :   pair += sizeof(fd_account_meta_t);
     176           0 :   uchar const * data = pair;
     177             : 
     178           0 :   ulong data_len      = meta->dlen;
     179           0 :   const char * pubkey = phdr->key.c;
     180           0 :   ulong lamports      = meta->lamports;
     181           0 :   const uchar * owner = meta->owner;
     182           0 :   uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
     183             : 
     184           0 :   if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
     185           0 :   if( FD_UNLIKELY( lamports==0UL ) ) return;
     186             : 
     187           0 :   fd_lthash_adder_push_solana_account( adder,
     188           0 :                                        running_lthash,
     189           0 :                                        pubkey,
     190           0 :                                        data,
     191           0 :                                        data_len,
     192           0 :                                        lamports,
     193           0 :                                        executable,
     194           0 :                                        owner );
     195             : 
     196           0 :   if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
     197           0 :   else                         ctx->metrics.incremental.accounts_hashed++;
     198           0 : }
     199             : 
     200             : FD_FN_UNUSED static inline void
     201             : bd_read( int    fd,
     202             :          ulong  off,
     203             :          void * buf,
     204           0 :          ulong  sz ) {
     205           0 :   ssize_t ssz = pread( fd, buf, sz, (off_t)off );
     206           0 :   if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
     207           0 :   if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
     208           0 :   /**/                 FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
     209           0 : }
     210             : 
     211             : FD_FN_UNUSED static void
     212             : handle_vinyl_lthash_request_bd( fd_snaplh_t *             ctx,
     213             :                                 ulong                     seq,
     214           0 :                                 fd_vinyl_bstream_phdr_t * acc_hdr ) {
     215             : 
     216             :   /* The bd version is blocking, therefore ctx->pending is not used. */
     217           0 :   ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
     218             : 
     219           0 :   ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
     220           0 :   ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     221             : 
     222             :   /* dev_seq shows where the seq is physically located in device. */
     223           0 :   ulong dev_seq  = ( seq + ctx->vinyl.dev_base ) % ctx->vinyl.dev_sz;
     224           0 :   ulong rd_off   = fd_ulong_align_dn( dev_seq, VINYL_LTHASH_BLOCK_ALIGN );
     225           0 :   ulong pair_off = (dev_seq - rd_off);
     226           0 :   ulong rd_sz    = fd_ulong_align_up( pair_off + pair_sz, VINYL_LTHASH_BLOCK_ALIGN );
     227           0 :   FD_TEST( rd_sz < VINYL_LTHASH_BLOCK_MAX_SZ );
     228             : 
     229           0 :   uchar * pair = ((uchar*)ctx->vinyl.pair_mem) + pair_off;
     230           0 :   fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)pair;
     231             : 
     232           0 :   for(;;) {
     233           0 :     ulong sz    = rd_sz;
     234           0 :     ulong rsz   = fd_ulong_min( rd_sz, ctx->vinyl.dev_sz - rd_off );
     235           0 :     uchar * dst = ctx->vinyl.pair_mem;
     236           0 :     uchar * tmp = ctx->vinyl.pair_tmp;
     237             : 
     238           0 :     bd_read( ctx->vinyl.dev_fd, rd_off, dst, rsz );
     239           0 :     sz -= rsz;
     240           0 :     if( FD_UNLIKELY( sz ) ) {
     241             :       /* When the dev wraps around, the dev_base needs to be skipped.
     242             :          This means: increase the size multiple of the alignment,
     243             :          read into a temporary buffer, and memcpy into the dst at the
     244             :          correct offset. */
     245           0 :       bd_read( ctx->vinyl.dev_fd, 0, tmp, sz + VINYL_LTHASH_BLOCK_ALIGN );
     246           0 :       fd_memcpy( dst + rsz, tmp + ctx->vinyl.dev_base, sz );
     247           0 :     }
     248             : 
     249           0 :     if( FD_LIKELY( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) ) ) {
     250             : 
     251             :       /* test bstream pair integrity hashes */
     252           0 :       int test = !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz );
     253           0 :       if( FD_LIKELY( test ) ) break;
     254           0 :     }
     255           0 :     FD_LOG_WARNING(( "phdr mismatch! - this should not happen under bstream_seq" ));
     256           0 :     FD_SPIN_PAUSE();
     257           0 :   }
     258             : 
     259           0 :   streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
     260           0 : }
     261             : 
     262             : FD_FN_UNUSED static inline ulong
     263           0 : rd_req_ctx_get_idx( ulong rd_req_ctx ) {
     264           0 :   return ( rd_req_ctx >>  0 ) & ((1UL<<32)-1UL);
     265           0 : }
     266             : 
     267             : FD_FN_UNUSED static inline ulong
     268           0 : rd_req_ctx_get_status( ulong rd_req_ctx ) {
     269           0 :   return ( rd_req_ctx >> 32 ) & ((1UL<<32)-1UL);
     270           0 : }
     271             : 
     272             : FD_FN_UNUSED static inline void
     273             : rd_req_ctx_into_parts( ulong   rd_req_ctx,
     274             :                        ulong * idx,
     275           0 :                        ulong * status ) {
     276           0 :   *idx    = rd_req_ctx_get_idx( rd_req_ctx );
     277           0 :   *status = rd_req_ctx_get_status( rd_req_ctx );
     278           0 : }
     279             : 
     280             : FD_FN_UNUSED static inline ulong
     281             : rd_req_ctx_from_parts( ulong idx,
     282           0 :                        ulong status ) {
     283           0 :   return ( idx & ((1UL<<32)-1UL) ) | ( status << 32 );
     284           0 : }
     285             : 
     286             : FD_FN_UNUSED static inline ulong
     287             : rd_req_ctx_update_status( ulong rd_req_ctx,
     288           0 :                           ulong status ) {
     289           0 :   return rd_req_ctx_from_parts( rd_req_ctx_get_idx( rd_req_ctx ), status );
     290           0 : }
     291             : 
     292             : FD_FN_UNUSED static void
     293             : handle_vinyl_lthash_compute_from_rd_req( fd_snaplh_t *      ctx,
     294           0 :                                          fd_vinyl_io_rd_t * rd_req ) {
     295           0 :   ulong idx = rd_req_ctx_get_idx( rd_req->ctx );
     296             : 
     297           0 :   fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rd_req->dst;
     298           0 :   fd_vinyl_bstream_phdr_t * acc_hdr = &ctx->vinyl.pending.phdr[ idx ];
     299             : 
     300             :   /* test the retrieved header (it must mach the request) */
     301           0 :   FD_TEST( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) );
     302             : 
     303           0 :   ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
     304           0 :   ulong   seq     = rd_req->seq;
     305           0 :   uchar * pair    = (uchar*)rd_req->dst;
     306           0 :   ulong   pair_sz = rd_req->sz;
     307             : 
     308             :   /* test the bstream pair integrity hashes */
     309           0 :   FD_TEST( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz ) );
     310             : 
     311           0 :   streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
     312           0 : }
     313             : 
     314             : static inline ulong
     315             : consume_available_cqe( fd_snaplh_t * ctx,
     316           0 :                        int           peek_first ) {
     317           0 :   if( FD_LIKELY( !ctx->vinyl.pending_rd_req_cnt ) ) return 0UL;
     318           0 :   if( FD_UNLIKELY( !ctx->vinyl.io_uring_enabled ) ) return 0UL;
     319             :   /* Consume one at a time, so that incoming frags can be processed
     320             :      with minimum delay. */
     321             :   #if FD_HAS_LIBURING
     322             :   if( FD_UNLIKELY( !!peek_first ) ) {
     323             :     struct io_uring_cqe * cqe = NULL;
     324             :     io_uring_peek_cqe( ctx->vinyl.ring, &cqe ); if( !cqe ) return 0UL;
     325             :   }
     326             :   fd_vinyl_io_rd_t * rd_req = NULL;
     327             :   if( FD_LIKELY( fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, 0/*non blocking*/ )==FD_VINYL_SUCCESS ) ) {
     328             :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     329             :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     330             :     rd_req->seq = ULONG_MAX;
     331             :     rd_req->sz  = 0UL;
     332             :     ctx->vinyl.pending_rd_req_cnt--;
     333             :     return 1UL;
     334             :   }
     335             :   #else
     336           0 :   (void)peek_first;
     337           0 :   #endif
     338           0 :   return 0UL;
     339           0 : }
     340             : 
     341             : FD_FN_UNUSED static void
     342             : handle_vinyl_lthash_request_ur( fd_snaplh_t *             ctx,
     343             :                                 ulong                     seq,
     344           0 :                                 fd_vinyl_bstream_phdr_t * acc_hdr ) {
     345             :   /* Find a free slot */
     346           0 :   ulong free_i = ULONG_MAX;
     347           0 :   if( FD_LIKELY( ctx->vinyl.pending_rd_req_cnt<VINYL_LTHASH_RD_REQ_MAX ) ) {
     348           0 :     for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     349           0 :       fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     350           0 :       if( FD_UNLIKELY( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE ) ) {
     351           0 :         free_i = i;
     352           0 :         break;
     353           0 :       }
     354           0 :     }
     355           0 :   } else {
     356           0 :     fd_vinyl_io_rd_t * rd_req = NULL;
     357           0 :     fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
     358           0 :     FD_TEST( rd_req!=NULL );
     359           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     360           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     361           0 :     rd_req->seq = ULONG_MAX;
     362           0 :     rd_req->sz  = 0UL;
     363           0 :     free_i      = rd_req_ctx_get_idx( rd_req->ctx );
     364           0 :     ctx->vinyl.pending_rd_req_cnt--;
     365           0 :   }
     366           0 :   FD_CRIT( free_i<VINYL_LTHASH_RD_REQ_MAX, "read request free index exceeds max value" );
     367             : 
     368             :   /* Populate the empty slot and submit */
     369           0 :   fd_vinyl_bstream_phdr_t * in_phdr = &ctx->vinyl.pending.phdr[ free_i ];
     370           0 :   memcpy( in_phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
     371           0 :   ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
     372           0 :   ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     373             : 
     374           0 :   fd_vinyl_io_rd_t * rd_req  = &ctx->vinyl.pending.rd_req[ free_i ];
     375           0 :   rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_PEND );
     376           0 :   rd_req->seq = seq;
     377           0 :   rd_req->sz  = pair_sz;
     378           0 :   fd_vinyl_io_read( ctx->vinyl.io, rd_req );
     379           0 :   rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_SENT );
     380           0 :   ctx->vinyl.pending_rd_req_cnt++;
     381           0 : }
     382             : 
     383             : FD_FN_UNUSED static void
     384           0 : handle_vinyl_lthash_request_ur_consume_all( fd_snaplh_t * ctx ) {
     385           0 :   while( ctx->vinyl.pending_rd_req_cnt ) {
     386           0 :     fd_vinyl_io_rd_t * rd_req = NULL;
     387           0 :     fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
     388           0 :     FD_TEST( rd_req!=NULL );
     389           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     390           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     391           0 :     rd_req->seq = ULONG_MAX;
     392           0 :     rd_req->sz  = 0UL;
     393           0 :     ctx->vinyl.pending_rd_req_cnt--;
     394           0 :   }
     395           0 :   FD_CRIT( !ctx->vinyl.pending_rd_req_cnt, "pending read requests count not zero" );
     396           0 :   for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     397           0 :     fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     398           0 :     FD_CRIT( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE, "pending request status is not free" );
     399           0 :   }
     400           0 : }
     401             : 
     402             : FD_FN_UNUSED static uint
     403             : handle_lthash_completion( fd_snaplh_t * ctx,
     404           0 :                           fd_stem_context_t * stem ) {
     405           0 :   fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
     406           0 :   fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
     407           0 :   if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )==ctx->wh_finish_fseq ) {
     408           0 :     fd_lthash_sub( &ctx->running_lthash, &ctx->running_lthash_sub );
     409           0 :     uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
     410           0 :     fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
     411           0 :     fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT_ADD, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
     412           0 :     ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_LTHASH_LEN_BYTES, ctx->out.chunk0, ctx->out.wmark );
     413           0 :     return FD_SNAPSHOT_STATE_IDLE;
     414           0 :   }
     415           0 :   return ctx->state;
     416           0 : }
     417             : 
     418             : static void
     419             : before_credit( fd_snaplh_t *       ctx,
     420             :                fd_stem_context_t * stem FD_PARAM_UNUSED,
     421           0 :                int *               charge_busy ) {
     422           0 :   *charge_busy = !!consume_available_cqe( ctx, 1/*peek_first*/ );
     423           0 : }
     424             : 
     425             : static void
     426             : handle_wh_data_frag( fd_snaplh_t * ctx,
     427             :                      ulong         in_idx,
     428             :                      ulong         chunk,      /* compressed input pointer */
     429             :                      ulong         sz_comp,    /* compressed input size */
     430           0 :                      fd_stem_context_t * stem ) {
     431           0 :   FD_CRIT( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH, "incorrect in kind" );
     432             : 
     433           0 :   uchar const * rem    = fd_chunk_to_laddr_const( ctx->in[ in_idx ].base, chunk );
     434           0 :   ulong         rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
     435           0 :   FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
     436           0 :   FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ),     "misaligned write request" );
     437             : 
     438           0 :   while( rem_sz ) {
     439           0 :     FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
     440             : 
     441           0 :     fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t *)rem;
     442           0 :     ulong ctl      = phdr->ctl;
     443           0 :     int   ctl_type = fd_vinyl_bstream_ctl_type( ctl );
     444           0 :     switch( ctl_type ) {
     445             : 
     446           0 :       case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
     447           0 :         ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
     448           0 :         ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     449           0 :         if( FD_LIKELY( should_hash_account( ctx ) ) ) {
     450           0 :           uchar * pair = ctx->vinyl.pair_mem;
     451           0 :           fd_memcpy( pair, rem, pair_sz );
     452           0 :           streamlined_hash( ctx, ctx->adder, &ctx->running_lthash, pair );
     453           0 :         }
     454           0 :         rem    += pair_sz;
     455           0 :         rem_sz -= pair_sz;
     456           0 :         ctx->pairs_seen++;
     457           0 :         break;
     458           0 :       }
     459             : 
     460           0 :       case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
     461           0 :         rem    += FD_VINYL_BSTREAM_BLOCK_SZ;
     462           0 :         rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
     463           0 :         break;
     464           0 :       }
     465             : 
     466           0 :       default:
     467           0 :         FD_LOG_CRIT(( "unexpected vinyl bstream block ctl=%016lx", ctl ));
     468           0 :     }
     469           0 :   }
     470             : 
     471           0 :   if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
     472           0 :     ctx->state = handle_lthash_completion( ctx, stem );
     473           0 :   }
     474           0 : }
     475             : 
     476             : static void
     477             : handle_lv_data_frag( fd_snaplh_t * ctx,
     478             :                      ulong         in_idx,
     479             :                      ulong         chunk,      /* compressed input pointer */
     480           0 :                      ulong         sz_comp ) { /* compressed input size */
     481           0 :   (void)sz_comp;
     482           0 :   if( FD_LIKELY( should_process_lthash_request( ctx ) ) ) {
     483           0 :     uchar const * indata = fd_chunk_to_laddr_const( ctx->in[ in_idx ].wksp, chunk );
     484           0 :     ulong seq;
     485           0 :     fd_vinyl_bstream_phdr_t acc_hdr[1];
     486           0 :     memcpy( &seq,    indata, sizeof(ulong) );
     487           0 :     memcpy( acc_hdr, indata + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     488           0 :     if( FD_LIKELY( ctx->vinyl.io_uring_enabled ) ) {
     489           0 :       handle_vinyl_lthash_request_ur( ctx, seq, acc_hdr );
     490           0 :     } else {
     491           0 :       handle_vinyl_lthash_request_bd( ctx, seq, acc_hdr );
     492           0 :     }
     493           0 :   }
     494           0 :   ctx->lthash_req_seen++;
     495           0 : }
     496             : 
     497             : static inline ulong
     498             : tsorig_tspub_to_fseq( ulong tsorig,
     499           0 :                       ulong tspub ) {
     500           0 :   return (tspub<<32 ) | tsorig;
     501           0 : }
     502             : 
     503             : static void
     504             : handle_control_frag( fd_snaplh_t * ctx,
     505             :                      ulong         sig,
     506             :                      ulong         tsorig,
     507             :                      ulong         tspub,
     508           0 :                     fd_stem_context_t * stem  ) {
     509           0 :   switch( sig ) {
     510           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     511           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
     512           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     513           0 :       ctx->full  = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     514           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     515           0 :       fd_lthash_zero( &ctx->running_lthash );
     516           0 :       fd_lthash_zero( &ctx->running_lthash_sub );
     517           0 :       fd_lthash_adder_new( ctx->adder );
     518           0 :       fd_lthash_adder_new( ctx->adder_sub );
     519           0 :       break;
     520             : 
     521           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL:
     522           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
     523           0 :                ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
     524           0 :                ctx->state==FD_SNAPSHOT_STATE_ERROR );
     525           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     526           0 :       fd_lthash_zero( &ctx->running_lthash );
     527           0 :       fd_lthash_zero( &ctx->running_lthash_sub );
     528           0 :       fd_lthash_adder_new( ctx->adder );
     529           0 :       fd_lthash_adder_new( ctx->adder_sub );
     530           0 :       break;
     531             : 
     532           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     533           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:{
     534           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
     535           0 :                ctx->state==FD_SNAPSHOT_STATE_FINISHING  ||
     536           0 :                ctx->state==FD_SNAPSHOT_STATE_ERROR );
     537           0 :       ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
     538           0 :       ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     539             : 
     540           0 :       if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
     541           0 :         if( FD_LIKELY( ctx->vinyl.io_uring_enabled ) ) {
     542           0 :           handle_vinyl_lthash_request_ur_consume_all( ctx );
     543           0 :         }
     544           0 :         ctx->state = handle_lthash_completion( ctx, stem );
     545           0 :       }
     546           0 :       break;
     547           0 :     }
     548             : 
     549           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     550           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     551           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     552           0 :       break;
     553             : 
     554           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR:
     555           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     556           0 :       break;
     557             : 
     558           0 :     default:
     559           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     560           0 :       return;
     561           0 :   }
     562           0 : }
     563             : 
     564             : static inline int
     565             : returnable_frag( fd_snaplh_t *       ctx,
     566             :                  ulong               in_idx,
     567             :                  ulong               seq,
     568             :                  ulong               sig,
     569             :                  ulong               chunk,
     570             :                  ulong               sz,
     571             :                  ulong               ctl    FD_PARAM_UNUSED,
     572             :                  ulong               tsorig,
     573             :                  ulong               tspub,
     574           0 :                  fd_stem_context_t * stem ) {
     575           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     576             : 
     577           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) )          handle_wh_data_frag( ctx, in_idx, chunk, sz/*sz_comp*/, stem );
     578           0 :   else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_lv_data_frag( ctx, in_idx, chunk, sz );
     579           0 :   else                                                               handle_control_frag( ctx, sig, tsorig, tspub, stem );
     580             : 
     581             :   /* Because fd_stem may not return flow control credits fast enough,
     582             :      always update fseq (consumer progress) here. */
     583           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) {
     584           0 :     ctx->wh_last_in_seq = seq;
     585           0 :     fd_fseq_update( ctx->in[ in_idx ].seq_sync, fd_seq_inc( seq, 1UL ) );
     586           0 :   }
     587             : 
     588           0 :   return 0;
     589           0 : }
     590             : 
     591             : static ulong
     592             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     593             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     594             :                       ulong                  out_fds_cnt,
     595           0 :                       int *                  out_fds ) {
     596           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     597             : 
     598           0 :   ulong out_cnt = 0;
     599           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     600           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     601           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     602           0 :   }
     603             : 
     604           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     605           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     606           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
     607             : 
     608           0 :   out_fds[ out_cnt++ ] = ctx->vinyl.dev_fd;
     609             : 
     610             :   #if FD_HAS_LIBURING
     611             :   if( FD_LIKELY( !!ctx->vinyl.ring ) ) out_fds[ out_cnt++ ] = ctx->vinyl.ring->ring_fd;
     612             :   #endif
     613             : 
     614           0 :   return out_cnt;
     615           0 : }
     616             : 
     617             : static ulong
     618             : populate_allowed_seccomp( fd_topo_t const *      topo,
     619             :                           fd_topo_tile_t const * tile,
     620             :                           ulong                  out_cnt,
     621           0 :                           struct sock_filter *   out ) {
     622           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     623           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     624           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
     625             : 
     626           0 :   int ring_fd = INT_MAX;
     627             :   #if FD_HAS_LIBURING
     628             :   if( !!ctx->vinyl.ring ) ring_fd = ctx->vinyl.ring->ring_fd;
     629             :   #endif
     630           0 :   populate_sock_filter_policy_fd_snaplh_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->vinyl.dev_fd, (uint)ring_fd );
     631           0 :   return sock_filter_policy_fd_snaplh_tile_instr_cnt;
     632           0 : }
     633             : 
     634             : static void
     635             : privileged_init( fd_topo_t *      topo,
     636           0 :                  fd_topo_tile_t * tile ) {
     637           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     638             : 
     639           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     640           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t),     sizeof(fd_snaplh_t)                                );
     641           0 :   void *   pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                          ); (void)pair_mem;
     642           0 :   void *   pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                          ); (void)pair_tmp;
     643             :   #if FD_HAS_LIBURING
     644             :   void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ  ); (void)rd_req_mem;
     645             :   void *  _ring_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct io_uring), sizeof(struct io_uring)                            );
     646             :   void *  uring_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_ur_align(),   fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
     647             :   #endif
     648             : 
     649           0 :   FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
     650             : 
     651             :   /* Set up io_bd dependencies */
     652             : 
     653           0 :   char const * bstream_path = tile->snaplh.vinyl_path;
     654             :   /* Note: it would be possible to use O_DIRECT, but it would require
     655             :      VINYL_LTHASH_BLOCK_ALIGN to be 4096UL, which substantially
     656             :      increases the read overhead, making it slower (keep in mind that
     657             :      a rather large subset of mainnet accounts typically fits inside
     658             :      one FD_VINYL_BSTREAM_BLOCK_SZ. */
     659           0 :   int dev_fd = open( bstream_path, O_RDONLY|O_CLOEXEC, 0444 );
     660           0 :   if( FD_UNLIKELY( dev_fd<0 ) ) {
     661           0 :     FD_LOG_ERR(( "open(%s,O_RDONLY|O_CLOEXEC, 0444) failed (%i-%s)",
     662           0 :                  bstream_path, errno, fd_io_strerror( errno ) ));
     663           0 :   }
     664             : 
     665           0 :   struct stat st;
     666           0 :   if( FD_UNLIKELY( 0!=fstat( dev_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", bstream_path, errno, strerror( errno ) ));
     667             : 
     668           0 :   ctx->vinyl.dev_fd  = dev_fd;
     669           0 :   ulong bstream_sz   = (ulong)st.st_size;
     670           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
     671           0 :     FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
     672           0 :   }
     673           0 :   ctx->vinyl.dev_sz   = bstream_sz;
     674           0 :   ctx->vinyl.dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
     675             : 
     676           0 :   if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
     677           0 :     #if !FD_HAS_LIBURING
     678           0 :       FD_LOG_ERR(( "config has enabled io_uring but liburing is not available" ));
     679           0 :     #endif
     680           0 :   }
     681           0 :   ctx->vinyl.io_uring_enabled = !!tile->snaplh.io_uring_enabled;
     682             : 
     683           0 :   ctx->vinyl.io               = NULL;
     684           0 :   ctx->vinyl.ring             = NULL;
     685             :   #if FD_HAS_LIBURING
     686             :   if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
     687             :     /* Join the bstream using io_ur */
     688             :     struct io_uring * ring = _ring_mem;
     689             :     uint depth = 2 * VINYL_LTHASH_RD_REQ_MAX;
     690             :     struct io_uring_params params = {
     691             :       .flags = IORING_SETUP_CQSIZE |
     692             :               IORING_SETUP_DEFER_TASKRUN |
     693             :               IORING_SETUP_SINGLE_ISSUER |
     694             :               IORING_SETUP_COOP_TASKRUN,
     695             :       .cq_entries = depth
     696             :     };
     697             :     int init_err = io_uring_queue_init_params( depth, ring, &params );
     698             :     if( FD_UNLIKELY( init_err==-EPERM ) ) {
     699             :       FD_LOG_ERR(( "missing privileges to setup io_uring" ));
     700             :     } else if( init_err<0 ) {
     701             :       FD_LOG_ERR(( "io_uring_queue_init_params failed (%i-%s)", init_err, fd_io_strerror( -init_err ) ));
     702             :     }
     703             :     FD_TEST( 0==io_uring_register_files( ring, &dev_fd, 1 ) );
     704             : 
     705             :     ulong align = fd_vinyl_io_ur_align();
     706             :     FD_TEST( fd_ulong_is_pow2( align ) );
     707             : 
     708             :     ulong footprint = fd_vinyl_io_ur_footprint( VINYL_LTHASH_IO_SPAD_MAX );
     709             :     FD_TEST( fd_ulong_is_aligned( footprint, align ) );
     710             : 
     711             :     /* Before invoking fd_vinyl_io_ur_init, the sync block must be
     712             :        already available.  Although in principle one could keep
     713             :        calling fd_vinyl_io_ur_init until it returns !=NULL, doing this
     714             :        would log uncessary (and misleading) warnings. */
     715             :     for(;;) {
     716             :       fd_vinyl_bstream_block_t block[1];
     717             :       ulong dev_sync = 0UL; /* Use the beginning of the file for the sync block */
     718             :       bd_read( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
     719             :       int type = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
     720             :       if( FD_UNLIKELY( type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC ) ) continue;
     721             :       ulong io_seed = block->sync.hash_trail;
     722             :       if( FD_LIKELY( !fd_vinyl_bstream_block_test( io_seed, block ) ) ) break;
     723             :       FD_SPIN_PAUSE();
     724             :     }
     725             : 
     726             :     fd_vinyl_io_t * io = fd_vinyl_io_ur_init( uring_mem, VINYL_LTHASH_IO_SPAD_MAX, dev_fd, ring );
     727             :     FD_TEST( io );
     728             :     ctx->vinyl.io = io;
     729             : 
     730             :     ctx->vinyl.ring = ring;
     731             :   }
     732             :   #endif
     733           0 : }
     734             : 
     735             : static void
     736             : unprivileged_init( fd_topo_t *      topo,
     737           0 :                    fd_topo_tile_t * tile ) {
     738           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     739             : 
     740           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     741           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t),     sizeof(fd_snaplh_t)                               );
     742           0 :   void *   pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                         );
     743           0 :   void *   pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                         );
     744           0 :   void * rd_req_mem = NULL;
     745             :   #if FD_HAS_LIBURING
     746             :   rd_req_mem        = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
     747             :   #endif
     748             : 
     749           0 :   FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
     750             : 
     751           0 :   ctx->vinyl.pair_mem = pair_mem;
     752           0 :   ctx->vinyl.pair_tmp = pair_tmp;
     753             : 
     754           0 :   if( FD_UNLIKELY( tile->in_cnt!=IN_CNT_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, IN_CNT_MAX ));
     755           0 :   if( FD_UNLIKELY( tile->out_cnt!=1UL       ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1",  tile->out_cnt            ));
     756             : 
     757           0 :   ctx->io_seed = NULL;
     758             : 
     759           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
     760           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     761           0 :     fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     762           0 :     if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_lh" ) ) ) {
     763           0 :       ctx->in[ i ].wksp     = in_wksp->wksp;
     764           0 :       ctx->in[ i ].chunk0   = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache );
     765           0 :       ctx->in[ i ].wmark    = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu );
     766           0 :       ctx->in[ i ].mtu      = in_link->mtu;
     767           0 :       ctx->in[ i ].base     = NULL;
     768           0 :       ctx->in[ i ].seq_sync = NULL;
     769           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPLV;
     770           0 :     } else if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
     771           0 :       ctx->in[ i ].wksp     = in_wksp->wksp;
     772           0 :       ctx->in[ i ].chunk0   = 0;
     773           0 :       ctx->in[ i ].wmark    = 0;
     774           0 :       ctx->in[ i ].mtu      = 0;
     775           0 :       ctx->in[ i ].base     = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snaplh.dcache_obj_id ) );
     776           0 :       ctx->in[ i ].seq_sync = tile->in_link_fseq[ i ];
     777           0 :       ctx->wh_last_in_seq   = fd_fseq_query( tile->in_link_fseq[ i ] );
     778           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPWH;
     779           0 :       ctx->io_seed          = (ulong const *)fd_dcache_app_laddr_const( ctx->in[ i ].base );
     780           0 :       FD_TEST( ctx->in[ i ].base );
     781           0 :     } else {
     782           0 :       FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
     783           0 :     }
     784           0 :   }
     785             : 
     786           0 :   FD_TEST( ctx->io_seed );
     787             : 
     788           0 :   fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ 0UL ] ];
     789           0 :   ctx->out.wksp    = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
     790           0 :   ctx->out.chunk0  = fd_dcache_compact_chunk0( fd_wksp_containing( out_link->dcache ), out_link->dcache );
     791           0 :   ctx->out.wmark   = fd_dcache_compact_wmark ( ctx->out.wksp, out_link->dcache, out_link->mtu );
     792           0 :   ctx->out.chunk   = ctx->out.chunk0;
     793           0 :   ctx->out.mtu     = out_link->mtu;
     794           0 :   FD_TEST( 0==strcmp( out_link->name, "snaplh_lv" ) );
     795             : 
     796           0 :   fd_lthash_adder_new( ctx->adder );
     797           0 :   fd_lthash_adder_new( ctx->adder_sub );
     798             : 
     799           0 :   ctx->metrics.full.accounts_hashed        = 0UL;
     800           0 :   ctx->metrics.incremental.accounts_hashed = 0UL;
     801             : 
     802           0 :   memset( ctx->vinyl.pending.phdr,   0, sizeof(fd_vinyl_bstream_phdr_t) * VINYL_LTHASH_RD_REQ_MAX );
     803           0 :   memset( ctx->vinyl.pending.rd_req, 0, sizeof(fd_vinyl_io_rd_t)        * VINYL_LTHASH_RD_REQ_MAX );
     804           0 :   for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     805           0 :     fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     806           0 :     rd_req->ctx = rd_req_ctx_from_parts( i, VINYL_LTHASH_RD_REQ_FREE );
     807           0 :     rd_req->dst = NULL;
     808           0 :     if( rd_req_mem!=NULL ) {
     809           0 :       rd_req->dst = ((uchar*)rd_req_mem) + i*VINYL_LTHASH_BLOCK_MAX_SZ;
     810           0 :     }
     811           0 :   }
     812           0 :   ctx->vinyl.pending_rd_req_cnt = 0UL;
     813             : 
     814           0 :   ctx->state                   = FD_SNAPSHOT_STATE_IDLE;
     815           0 :   ctx->full                    = 1;
     816           0 :   ctx->lthash_tile_cnt         = fd_topo_tile_name_cnt( topo, "snaplh" );
     817           0 :   ctx->lthash_tile_idx         = tile->kind_id;
     818             :   /* This may seem redundant, but it provides flexibility around which
     819             :      tiles and do addition and subtraction of lthash. */
     820           0 :   ctx->lthash_tile_add_cnt     = ctx->lthash_tile_cnt;
     821           0 :   ctx->lthash_tile_sub_cnt     = ctx->lthash_tile_cnt;
     822           0 :   ctx->lthash_tile_add_idx     = ctx->lthash_tile_idx;
     823           0 :   ctx->lthash_tile_sub_idx     = ctx->lthash_tile_idx;
     824           0 :   if( ctx->lthash_tile_add_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_add_idx < ctx->lthash_tile_add_cnt );
     825           0 :   if( ctx->lthash_tile_sub_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_sub_idx < ctx->lthash_tile_sub_cnt );
     826           0 :   ctx->pairs_seen              = 0UL;
     827           0 :   ctx->lthash_req_seen         = 0UL;
     828           0 :   fd_lthash_zero( &ctx->running_lthash );
     829           0 :   fd_lthash_zero( &ctx->running_lthash_sub );
     830           0 : }
     831             : 
     832           0 : #define STEM_BURST 1UL
     833           0 : #define STEM_LAZY  1000L
     834             : 
     835           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplh_t
     836           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplh_t)
     837             : 
     838             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     839           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     840           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
     841           0 : #define STEM_CALLBACK_BEFORE_CREDIT   before_credit
     842             : 
     843             : #include "../../disco/stem/fd_stem.c"
     844             : 
     845             : fd_topo_run_tile_t fd_tile_snaplh = {
     846             :   .name                     = NAME,
     847             :   .populate_allowed_fds     = populate_allowed_fds,
     848             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     849             :   .scratch_align            = scratch_align,
     850             :   .scratch_footprint        = scratch_footprint,
     851             :   .privileged_init          = privileged_init,
     852             :   .unprivileged_init        = unprivileged_init,
     853             :   .run                      = stem_run,
     854             : };
     855             : 
     856             : #undef NAME

Generated by: LCOV version 1.14