LCOV - code coverage report
Current view: top level - discof/restore - fd_snaplh_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 575 0.0 %
Date: 2026-02-13 06:06:24 Functions: 0 31 0.0 %

          Line data    Source code
       1             : #include "../../disco/topo/fd_topo.h"
       2             : #include "../../disco/metrics/fd_metrics.h"
       3             : #include "../../ballet/lthash/fd_lthash.h"
       4             : #include "../../ballet/lthash/fd_lthash_adder.h"
       5             : #include "../../util/pod/fd_pod.h"
       6             : #include "../../vinyl/io/fd_vinyl_io.h"
       7             : #include "../../vinyl/io/ur/fd_vinyl_io_ur_private.h"
       8             : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
       9             : #include "../../util/io_uring/fd_io_uring_setup.h"
      10             : #include "../../util/io_uring/fd_io_uring_register.h"
      11             : #include "../../util/io_uring/fd_io_uring.h"
      12             : #include "generated/fd_snaplh_tile_seccomp.h"
      13             : 
      14             : #include "utils/fd_ssctrl.h"
      15             : #include "utils/fd_vinyl_admin.h"
      16             : 
      17             : #include <errno.h>
      18             : #include <sys/stat.h> /* fstat */
      19             : #include <fcntl.h>    /* open  */
      20             : #include <unistd.h>   /* close */
      21             : 
      22             : #include "../../vinyl/io/ur/fd_vinyl_io_ur.h"
      23             : 
      24             : #define NAME "snaplh"
      25             : 
                       /* Input links.  The tile context holds IN_CNT_MAX entries in its
                          in[] / in_kind[] arrays; IN_KIND_SNAPWH is the kind asserted by
                          handle_wh_data_frag. */
      26             : #define IN_CNT_MAX     (2UL)
      27           0 : #define IN_KIND_SNAPLV (0UL)
      28           0 : #define IN_KIND_SNAPWH (1UL)
      29             : 
                       /* Scratch block geometry.  VINYL_LTHASH_BLOCK_MAX_SZ (16 MiB) is the
                          size of the block-aligned scratch regions reserved in
                          scratch_footprint and the strict upper bound enforced on a blocking
                          read in handle_vinyl_lthash_request_bd.  The static assert requires
                          it to exceed one fd_snapshot_full_account_t plus one bstream block
                          plus two alignment units. */
      30             : #define VINYL_LTHASH_BLOCK_ALIGN  FD_VINYL_BSTREAM_BLOCK_SZ
      31           0 : #define VINYL_LTHASH_BLOCK_MAX_SZ (16UL<<20)
      32             : FD_STATIC_ASSERT( VINYL_LTHASH_BLOCK_MAX_SZ>(sizeof(fd_snapshot_full_account_t)+FD_VINYL_BSTREAM_BLOCK_SZ+2*VINYL_LTHASH_BLOCK_ALIGN), "VINYL_LTHASH_BLOCK_MAX_SZ" );
      33             : 
                       /* VINYL_LTHASH_RD_REQ_MAX is the number of read request slots in
                          vinyl.pending.  VINYL_LTHASH_IORING_DEPTH (twice that) is passed as
                          both depth arguments of fd_io_uring_shmem_footprint. */
      34           0 : #define VINYL_LTHASH_RD_REQ_MAX   (32UL)
      35           0 : #define VINYL_LTHASH_IORING_DEPTH (2*VINYL_LTHASH_RD_REQ_MAX)
      36             : 
                       /* Passed to fd_vinyl_io_ur_footprint in scratch_footprint.
                          NOTE(review): the UL suffix is on the shift count, not the shifted
                          value, so this expression has type int (value 2097152).
                          (2UL<<20) was probably intended - confirm. */
      37           0 : #define VINYL_LTHASH_IO_SPAD_MAX  (2<<20UL)
      38             : 
                       /* Read request slot status, stored in the upper 32 bits of
                          fd_vinyl_io_rd_t::ctx (see rd_req_ctx_get_status). */
      39           0 : #define VINYL_LTHASH_RD_REQ_FREE  (0UL)
      40           0 : #define VINYL_LTHASH_RD_REQ_PEND  (1UL)
      41           0 : #define VINYL_LTHASH_RD_REQ_SENT  (2UL)
      42             : 
                       /* Per input link state.  Of these fields, the code visible in this
                          listing only reads base (handle_wh_data_frag) and wksp
                          (handle_lv_data_frag), both as the first argument of
                          fd_chunk_to_laddr_const. */
      43             : struct in_link_private {
      44             :   fd_wksp_t *  wksp;
      45             :   ulong        chunk0;
      46             :   ulong        wmark;
      47             :   ulong        mtu;
      48             :   void const * base;
      49             :   ulong *      seq_sync;  /* fseq->seq[0] */
      50             : };
      51             : typedef struct in_link_private in_link_t;
      52             : 
                       /* Output link state.  handle_lthash_completion writes the result at
                          chunk (resolved against wksp) and then advances chunk with
                          fd_dcache_compact_next using chunk0 and wmark as bounds. */
      53             : struct out_link_private {
      54             :   fd_wksp_t *  wksp;
      55             :   ulong        chunk0;
      56             :   ulong        wmark;
      57             :   ulong        chunk;
      58             :   ulong        mtu;
      59             : };
      60             : typedef struct out_link_private out_link_t;
      61             : 
                       /* Tile context.  Holds two lthash accumulators: adder/running_lthash
                          is fed from wh data frags (handle_wh_data_frag) and
                          adder_sub/running_lthash_sub is fed from pairs read back for lv
                          requests.  handle_lthash_completion subtracts the latter from the
                          former and publishes the result. */
      62             : struct fd_snaplh_tile {
      63             :   uint state; /* compared against FD_SNAPSHOT_STATE_* values */
      64             :   int  full;  /* set to sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL; selects which accounts_hashed metric is bumped */
      65             : 
      66             :   ulong seed;
      67             :   ulong lthash_tile_cnt;
      68             :   ulong lthash_tile_idx;
      69             :   ulong lthash_tile_add_cnt; /* modulus used by should_hash_account */
      70             :   ulong lthash_tile_sub_cnt; /* modulus used by should_process_lthash_request */
      71             :   ulong lthash_tile_add_idx; /* residue of pairs_seen this tile hashes */
      72             :   ulong lthash_tile_sub_idx; /* residue of lthash_req_seen this tile processes */
      73             :   ulong pairs_seen;          /* incremented once per PAIR block in handle_wh_data_frag */
      74             :   ulong lthash_req_seen;     /* incremented once per lv data frag in handle_lv_data_frag */
      75             : 
      76             :   /* Database params */
      77             :   ulong const * io_seed; /* read through FD_VOLATILE_CONST and passed to fd_vinyl_bstream_pair_test */
      78             : 
      79             :   fd_lthash_adder_t   adder[1];
      80             :   fd_lthash_adder_t   adder_sub[1];
      81             :   uchar               data[FD_RUNTIME_ACC_SZ_MAX];
      82             : 
      83             :   fd_lthash_value_t   running_lthash;
      84             :   fd_lthash_value_t   running_lthash_sub;
      85             : 
      86             :   struct {
                           /* dev_fd / dev_sz / dev_base / pair_tmp are used by the blocking
                              read path (handle_vinyl_lthash_request_bd).  pair_mem is used
                              there and also as the copy buffer in handle_wh_data_frag. */
      87             :     int               dev_fd;
      88             :     ulong             dev_sz;
      89             :     ulong             dev_base;
      90             :     void *            pair_mem;
      91             :     void *            pair_tmp;
      92             : 
                           /* Slot i of phdr[] holds the header requested by slot i of
                              rd_req[]; the slot index lives in the low 32 bits of
                              rd_req[i].ctx. */
      93             :     struct {
      94             :       fd_vinyl_bstream_phdr_t phdr  [VINYL_LTHASH_RD_REQ_MAX];
      95             :       fd_vinyl_io_rd_t        rd_req[VINYL_LTHASH_RD_REQ_MAX];
      96             :     } pending;
      97             :     ulong             pending_rd_req_cnt; /* number of submitted reads not yet consumed */
      98             : 
      99             :     fd_vinyl_io_t *   io;
     100             :     fd_vinyl_admin_t * admin;
     101             :   } vinyl;
     102             : 
                         /* Counters exported by metrics_write. */
     103             :   struct {
     104             :     struct {
     105             :       ulong accounts_hashed;
     106             :     } full;
     107             : 
     108             :     struct {
     109             :       ulong accounts_hashed;
     110             :     } incremental;
     111             :   } metrics;
     112             : 
                         /* The completion handlers fire once
                            fd_seq_inc( wh_last_in_seq, 1UL )>=wh_finish_fseq. */
     113             :   ulong       wh_finish_fseq;
     114             :   ulong       wh_last_in_seq;
     115             : 
     116             :   in_link_t   in[IN_CNT_MAX];
     117             :   uchar       in_kind[IN_CNT_MAX];
     118             :   out_link_t  out;
     119             : 
     120             :   int         lthash_completion_pending; /* cleared by handle_lthash_completion after publishing */
     121             :   int         fail_completion_pending;   /* cleared by handle_fail_completion after publishing */
     122             : 
     123             :   /* io_uring setup */
     124             : 
     125             :   fd_io_uring_t ioring[1];
     126             :   int           io_uring_enabled; /* selects the _ur (non-zero) or _bd (zero) request handler */
     127             : };
     128             : 
     129             : typedef struct fd_snaplh_tile fd_snaplh_t;
     130             : 
                       /* should_shutdown returns non-zero iff the tile state is
                          FD_SNAPSHOT_STATE_SHUTDOWN. */
     131             : static inline int
     132           0 : should_shutdown( fd_snaplh_t * ctx ) {
     133           0 :   return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
     134           0 : }
     135             : 
                       /* scratch_align returns the alignment of the tile context
                          (alignof(fd_snaplh_t)). */
     136             : static ulong
     137           0 : scratch_align( void ) {
     138           0 :   return alignof(fd_snaplh_t);
     139           0 : }
     140             : 
                       /* scratch_footprint returns the number of bytes covered by the
                          layout below.  tile is unused.  The layout is, in order: the tile
                          context, three block-aligned regions of VINYL_LTHASH_BLOCK_MAX_SZ
                          bytes, one block-aligned region of
                          VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ bytes, a vinyl
                          io_ur object and an io_uring shmem region.
                          NOTE(review): which pointer each region backs is decided by
                          init code not visible here; that code must append regions in
                          this same order. */
     141             : static ulong
     142           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     143           0 :   (void)tile;
     144           0 :   ulong l = FD_LAYOUT_INIT;
     145           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaplh_t),      sizeof(fd_snaplh_t)                                );
     146           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          );
     147           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          );
     148           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          );
     149           0 :   l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ  );
     150           0 :   l = FD_LAYOUT_APPEND( l, fd_vinyl_io_ur_align(),    fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
     151           0 :   l = FD_LAYOUT_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
     152           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaplh_t) );
     153           0 : }
     154             : 
                       /* metrics_write exports the full and incremental accounts_hashed
                          counters and the current tile state as SNAPLH gauges. */
     155             : static void
     156           0 : metrics_write( fd_snaplh_t * ctx ) {
     157           0 :   FD_MGAUGE_SET( SNAPLH, FULL_ACCOUNTS_HASHED,        ctx->metrics.full.accounts_hashed );
     158           0 :   FD_MGAUGE_SET( SNAPLH, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
     159           0 :   FD_MGAUGE_SET( SNAPLH, STATE,                       (ulong)(ctx->state) );
     160           0 : }
     161             : 
                       /* should_hash_account returns non-zero iff the pair about to be
                          counted (index pairs_seen) falls on this tile's residue
                          lthash_tile_add_idx modulo lthash_tile_add_cnt.
                          NOTE(review): lthash_tile_add_cnt must be non-zero; it is set
                          outside the visible code. */
     162             : static inline int
     163           0 : should_hash_account( fd_snaplh_t * ctx ) {
     164           0 :   return (ctx->pairs_seen % ctx->lthash_tile_add_cnt)==ctx->lthash_tile_add_idx;
     165           0 : }
     166             : 
                       /* should_process_lthash_request returns non-zero iff the request
                          about to be counted (index lthash_req_seen) falls on this tile's
                          residue lthash_tile_sub_idx modulo lthash_tile_sub_cnt.
                          NOTE(review): lthash_tile_sub_cnt must be non-zero; it is set
                          outside the visible code. */
     167             : static inline int
     168           0 : should_process_lthash_request( fd_snaplh_t * ctx ) {
     169           0 :   return (ctx->lthash_req_seen % ctx->lthash_tile_sub_cnt)==ctx->lthash_tile_sub_idx;
     170           0 : }
     171             : 
                       /* streamlined_hash pushes one account into an lthash accumulator.

                          _pair is read as a fd_vinyl_bstream_phdr_t, immediately followed
                          by a fd_account_meta_t, immediately followed by the account data
                          (meta->dlen bytes).  adder / running_lthash select which
                          accumulator receives the account.

                          Accounts with zero lamports are skipped: nothing is pushed and no
                          metric is bumped.  A data length above FD_RUNTIME_ACC_SZ_MAX is
                          reported through FD_LOG_ERR.  Otherwise the account is pushed and
                          the full or incremental accounts_hashed metric is incremented
                          according to ctx->full. */
     172             : static void
     173             : streamlined_hash( fd_snaplh_t *       restrict ctx,
     174             :                   fd_lthash_adder_t * restrict adder,
     175             :                   fd_lthash_value_t * restrict running_lthash,
     176           0 :                   uchar const *       restrict _pair ) {
                         /* Walk the pair: header, then account meta, then data. */
     177           0 :   uchar const * pair = _pair;
     178           0 :   fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
     179           0 :   pair += sizeof(fd_vinyl_bstream_phdr_t);
     180           0 :   fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
     181           0 :   pair += sizeof(fd_account_meta_t);
     182           0 :   uchar const * data = pair;
     183             : 
     184           0 :   ulong data_len      = meta->dlen;
     185           0 :   const char * pubkey = phdr->key.c;
     186           0 :   ulong lamports      = meta->lamports;
     187           0 :   const uchar * owner = meta->owner;
                         /* Normalize the executable flag to exactly 0 or 1. */
     188           0 :   uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
     189             : 
     190           0 :   if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
     191           0 :   if( FD_UNLIKELY( lamports==0UL ) ) return;
     192             : 
     193           0 :   fd_lthash_adder_push_solana_account( adder,
     194           0 :                                        running_lthash,
     195           0 :                                        pubkey,
     196           0 :                                        data,
     197           0 :                                        data_len,
     198           0 :                                        lamports,
     199           0 :                                        executable,
     200           0 :                                        owner );
     201             : 
     202           0 :   if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
     203           0 :   else                         ctx->metrics.incremental.accounts_hashed++;
     204           0 : }
     205             : 
                       /* handle_vinyl_lthash_request_bd services one lthash request with
                          blocking reads.

                          seq is the bstream sequence of the pair and acc_hdr is the header
                          the pair is expected to carry.  The pair is read from
                          vinyl.dev_fd into vinyl.pair_mem, its header is compared with
                          acc_hdr, its integrity is checked with
                          fd_vinyl_bstream_pair_test, and it is then hashed into the "sub"
                          accumulator.

                          NOTE(review): the loop retries forever until both checks pass;
                          there is no retry limit or error exit. */
     206             : static void
     207             : handle_vinyl_lthash_request_bd( fd_snaplh_t *             ctx,
     208             :                                 ulong                     seq,
     209           0 :                                 fd_vinyl_bstream_phdr_t * acc_hdr ) {
     210             : 
     211             :   /* The bd version is blocking, therefore ctx->pending is not used. */
     212           0 :   ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
     213             : 
     214           0 :   ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
     215           0 :   ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     216             : 
     217             :   /* dev_seq shows where the seq is physically located in device. */
     218           0 :   ulong dev_seq  = ( seq + ctx->vinyl.dev_base ) % ctx->vinyl.dev_sz;
                         /* The read starts at the block boundary at or below dev_seq and
                            covers whole blocks up to the end of the pair.  pair_off is the
                            pair's offset inside the read buffer. */
     219           0 :   ulong rd_off   = fd_ulong_align_dn( dev_seq, FD_VINYL_BSTREAM_BLOCK_SZ );
     220           0 :   ulong pair_off = (dev_seq - rd_off);
     221           0 :   ulong rd_sz    = fd_ulong_align_up( pair_off + pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
     222           0 :   FD_TEST( rd_sz < VINYL_LTHASH_BLOCK_MAX_SZ );
     223             : 
     224           0 :   uchar * pair = ((uchar*)ctx->vinyl.pair_mem) + pair_off;
     225           0 :   fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)pair;
     226             : 
     227           0 :   for(;;) {
                           /* rsz is the part of the read that fits before the end of the
                              device; sz ends up as the part that wraps to the start. */
     228           0 :     ulong sz    = rd_sz;
     229           0 :     ulong rsz   = fd_ulong_min( rd_sz, ctx->vinyl.dev_sz - rd_off );
     230           0 :     uchar * dst = ctx->vinyl.pair_mem;
     231           0 :     uchar * tmp = ctx->vinyl.pair_tmp;
     232             : 
     233           0 :     bd_read( ctx->vinyl.dev_fd, rd_off, dst, rsz );
     234           0 :     sz -= rsz;
     235           0 :     if( FD_UNLIKELY( sz ) ) {
     236             :       /* When the dev wraps around, the dev_base needs to be skipped.
     237             :          This means: increase the size multiple of the alignment,
     238             :          read into a temporary buffer, and memcpy into the dst at the
     239             :          correct offset. */
                             /* NOTE(review): only one extra block is read, so this looks
                                like it assumes dev_base<=FD_VINYL_BSTREAM_BLOCK_SZ -
                                confirm. */
     240           0 :       bd_read( ctx->vinyl.dev_fd, 0, tmp, sz + FD_VINYL_BSTREAM_BLOCK_SZ );
     241           0 :       fd_memcpy( dst + rsz, tmp + ctx->vinyl.dev_base, sz );
     242           0 :     }
     243             : 
     244           0 :     if( FD_LIKELY( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) ) ) {
     245             : 
     246             :       /* test bstream pair integrity hashes */
     247           0 :       int test = !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz );
     248           0 :       if( FD_LIKELY( test ) ) break;
     249           0 :     }
                           /* NOTE(review): this warning is also reached when the header
                              matched but the integrity test failed, so the message text
                              can be misleading. */
     250           0 :     FD_LOG_WARNING(( "phdr mismatch! - this should not happen under bstream_seq" ));
     251           0 :     FD_SPIN_PAUSE();
     252           0 :   }
     253             : 
     254           0 :   streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
     255           0 : }
     256             : 
                       /* rd_req_ctx_get_idx returns the low 32 bits of a packed read
                          request ctx word, which hold the slot index. */
     257             : FD_FN_UNUSED static inline ulong
     258           0 : rd_req_ctx_get_idx( ulong rd_req_ctx ) {
     259           0 :   return ( rd_req_ctx >>  0 ) & ((1UL<<32)-1UL);
     260           0 : }
     261             : 
                       /* rd_req_ctx_get_status returns the high 32 bits of a packed read
                          request ctx word, which hold a VINYL_LTHASH_RD_REQ_* status. */
     262             : FD_FN_UNUSED static inline ulong
     263           0 : rd_req_ctx_get_status( ulong rd_req_ctx ) {
     264           0 :   return ( rd_req_ctx >> 32 ) & ((1UL<<32)-1UL);
     265           0 : }
     266             : 
                       /* rd_req_ctx_into_parts unpacks a read request ctx word: *idx
                          receives the low 32 bits and *status the high 32 bits.  Both
                          output pointers are written unconditionally. */
     267             : FD_FN_UNUSED static inline void
     268             : rd_req_ctx_into_parts( ulong   rd_req_ctx,
     269             :                        ulong * idx,
     270           0 :                        ulong * status ) {
     271           0 :   *idx    = rd_req_ctx_get_idx( rd_req_ctx );
     272           0 :   *status = rd_req_ctx_get_status( rd_req_ctx );
     273           0 : }
     274             : 
                       /* rd_req_ctx_from_parts packs idx (masked to 32 bits) into the low
                          half and status into the high half of a ctx word.  status is
                          shifted without masking, so any bits above bit 31 are lost. */
     275             : FD_FN_UNUSED static inline ulong
     276             : rd_req_ctx_from_parts( ulong idx,
     277           0 :                        ulong status ) {
     278           0 :   return ( idx & ((1UL<<32)-1UL) ) | ( status << 32 );
     279           0 : }
     280             : 
                       /* rd_req_ctx_update_status returns rd_req_ctx with its status half
                          replaced by status and its index half preserved. */
     281             : FD_FN_UNUSED static inline ulong
     282             : rd_req_ctx_update_status( ulong rd_req_ctx,
     283           0 :                           ulong status ) {
     284           0 :   return rd_req_ctx_from_parts( rd_req_ctx_get_idx( rd_req_ctx ), status );
     285           0 : }
     286             : 
                       /* handle_vinyl_lthash_compute_from_rd_req validates a completed read
                          request and hashes the pair it returned into the "sub"
                          accumulator.

                          The slot index in rd_req->ctx selects the header saved in
                          vinyl.pending.phdr when the request was submitted.  A header
                          mismatch or a failed integrity test trips FD_TEST; unlike the
                          blocking path there is no retry. */
     287             : static void
     288             : handle_vinyl_lthash_compute_from_rd_req( fd_snaplh_t *      ctx,
     289           0 :                                          fd_vinyl_io_rd_t * rd_req ) {
     290           0 :   ulong idx = rd_req_ctx_get_idx( rd_req->ctx );
     291             : 
     292           0 :   fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rd_req->dst;
     293           0 :   fd_vinyl_bstream_phdr_t * acc_hdr = &ctx->vinyl.pending.phdr[ idx ];
     294             : 
     295             :   /* test the retrieved header (it must match the request) */
     296           0 :   FD_TEST( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) );
     297             : 
     298           0 :   ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
     299           0 :   ulong   seq     = rd_req->seq;
     300           0 :   uchar * pair    = (uchar*)rd_req->dst;
     301           0 :   ulong   pair_sz = rd_req->sz;
     302             : 
     303             :   /* test the bstream pair integrity hashes */
     304           0 :   FD_TEST( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz ) );
     305             : 
     306           0 :   streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
     307           0 : }
     308             : 
     309             : /* Process next read completion */
     310             : 
                       /* consume_available_cqe processes at most one completed read without
                          blocking.  Returns 1UL if a completion was hashed and its slot
                          freed, 0UL otherwise (no reads pending, io_uring disabled, no
                          CQE ready, or the non-blocking poll did not return
                          FD_VINYL_SUCCESS). */
     311             : static inline ulong
     312           0 : consume_available_cqe( fd_snaplh_t * ctx ) {
     313           0 :   if( FD_LIKELY( !ctx->vinyl.pending_rd_req_cnt ) ) return 0UL;
     314           0 :   if( FD_UNLIKELY( !ctx->io_uring_enabled ) ) return 0UL;
     315           0 :   if( !fd_io_uring_cq_ready( ctx->ioring->cq ) ) return 0UL;
     316             : 
     317             :   /* At this point, there is at least one unconsumed CQE */
     318             : 
     319           0 :   fd_vinyl_io_rd_t * rd_req = NULL;
     320           0 :   if( FD_LIKELY( fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, 0/*non blocking*/ )==FD_VINYL_SUCCESS ) ) {
     321           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
                           /* Return the slot to the free pool. */
     322           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     323           0 :     rd_req->seq = ULONG_MAX;
     324           0 :     rd_req->sz  = 0UL;
     325           0 :     ctx->vinyl.pending_rd_req_cnt--;
     326           0 :     return 1UL;
     327           0 :   }
     328           0 :   return 0UL;
     329           0 : }
     330             : 
                       /* handle_vinyl_lthash_request_ur submits one asynchronous read for
                          the pair at bstream sequence seq with expected header acc_hdr.

                          If fewer than VINYL_LTHASH_RD_REQ_MAX reads are pending, the first
                          slot whose status is FREE is used.  Otherwise the function blocks
                          on fd_vinyl_io_poll for one completion, processes it and reuses
                          its slot.  The expected header is saved in vinyl.pending.phdr so
                          the completion can be validated later.

                          NOTE(review): the free-slot scan relies on every rd_req[i].ctx
                          having been initialized with index i and status FREE, and on
                          rd_req[i].dst pointing at a buffer; that setup is outside the
                          visible code. */
     331             : static void
     332             : handle_vinyl_lthash_request_ur( fd_snaplh_t *             ctx,
     333             :                                 ulong                     seq,
     334           0 :                                 fd_vinyl_bstream_phdr_t * acc_hdr ) {
     335             :   /* Find a free slot */
     336           0 :   ulong free_i = ULONG_MAX;
     337           0 :   if( FD_LIKELY( ctx->vinyl.pending_rd_req_cnt<VINYL_LTHASH_RD_REQ_MAX ) ) {
     338           0 :     for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     339           0 :       fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     340           0 :       if( FD_UNLIKELY( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE ) ) {
     341           0 :         free_i = i;
     342           0 :         break;
     343           0 :       }
     344           0 :     }
     345           0 :   } else {
                           /* All slots are in flight: wait for one completion, process it
                              and take over its slot.  The poll return value is not
                              checked; only rd_req!=NULL is tested. */
     346           0 :     fd_vinyl_io_rd_t * rd_req = NULL;
     347           0 :     fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
     348           0 :     FD_TEST( rd_req!=NULL );
     349           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     350           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     351           0 :     rd_req->seq = ULONG_MAX;
     352           0 :     rd_req->sz  = 0UL;
     353           0 :     free_i      = rd_req_ctx_get_idx( rd_req->ctx );
     354           0 :     ctx->vinyl.pending_rd_req_cnt--;
     355           0 :   }
     356           0 :   FD_CRIT( free_i<VINYL_LTHASH_RD_REQ_MAX, "read request free index exceeds max value" );
     357             : 
     358             :   /* Populate the empty slot and submit */
     359           0 :   fd_vinyl_bstream_phdr_t * in_phdr = &ctx->vinyl.pending.phdr[ free_i ];
     360           0 :   *in_phdr = *acc_hdr;
     361           0 :   ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
     362           0 :   ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     363             : 
     364             :   /* Fixup io addressable range */
                         /* The io object's [seq_past,seq_present) window is overwritten with
                            the block-aligned range that contains this pair.  For the UR
                            backend the private seq_clean / seq_cache / seq_write cursors
                            are set to seq_present as well. */
     365           0 :   fd_vinyl_io_t * io = ctx->vinyl.io;
     366           0 :   io->seq_past    = fd_ulong_align_dn( seq,         FD_VINYL_BSTREAM_BLOCK_SZ );
     367           0 :   io->seq_present = fd_ulong_align_up( seq+pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
     368           0 :   if( io->type==FD_VINYL_IO_TYPE_UR ) {
     369           0 :     fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io;
     370           0 :     ur->seq_clean = ur->seq_cache = ur->seq_write = io->seq_present;
     371           0 :   }
     372             : 
                         /* Status goes FREE -> PEND before the submit and PEND -> SENT after
                            fd_vinyl_io_read returns. */
     373           0 :   fd_vinyl_io_rd_t * rd_req  = &ctx->vinyl.pending.rd_req[ free_i ];
     374           0 :   rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_PEND );
     375           0 :   rd_req->seq = seq;
     376           0 :   rd_req->sz  = pair_sz;
     377           0 :   fd_vinyl_io_read( ctx->vinyl.io, rd_req );
     378           0 :   rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_SENT );
     379           0 :   ctx->vinyl.pending_rd_req_cnt++;
     380           0 : }
     381             : 
                       /* handle_vinyl_lthash_request_ur_consume_all drains every pending
                          asynchronous read: it blocks on fd_vinyl_io_poll, validates and
                          hashes each completion and frees its slot until
                          pending_rd_req_cnt reaches zero.  It then asserts that every slot
                          is marked FREE. */
     382             : static void
     383           0 : handle_vinyl_lthash_request_ur_consume_all( fd_snaplh_t * ctx ) {
     384           0 :   while( ctx->vinyl.pending_rd_req_cnt ) {
     385           0 :     fd_vinyl_io_rd_t * rd_req = NULL;
     386           0 :     fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
     387           0 :     FD_TEST( rd_req!=NULL );
     388           0 :     handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
     389           0 :     rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
     390           0 :     rd_req->seq = ULONG_MAX;
     391           0 :     rd_req->sz  = 0UL;
     392           0 :     ctx->vinyl.pending_rd_req_cnt--;
     393           0 :   }
                         /* Consistency checks: nothing pending and every slot free. */
     394           0 :   FD_CRIT( !ctx->vinyl.pending_rd_req_cnt, "pending read requests count not zero" );
     395           0 :   for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     396           0 :     fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     397           0 :     FD_CRIT( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE, "pending request status is not free" );
     398           0 :   }
     399           0 : }
     400             : 
                       /* handle_lthash_completion publishes the final lthash once all wh
                          data frags have been seen.

                          Does nothing unless lthash_completion_pending is set and
                          fd_seq_inc( wh_last_in_seq, 1UL )>=wh_finish_fseq.  When both
                          hold, it flushes both adders, subtracts running_lthash_sub from
                          running_lthash, copies the result into the current output chunk,
                          publishes it on output 0 with sig
                          FD_SNAPSHOT_HASH_MSG_RESULT_ADD, advances the output chunk and
                          clears the pending flag.

                          NOTE(review): the sequence test uses a plain >= rather than a
                          wrap-aware comparison - confirm this is intended. */
     401             : static void
     402             : handle_lthash_completion( fd_snaplh_t * ctx,
     403           0 :                           fd_stem_context_t * stem ) {
     404           0 :   if( FD_LIKELY( !ctx->lthash_completion_pending ) ) return;
     405             : 
     406           0 :   if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
     407           0 :     fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
     408           0 :     fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
     409           0 :     fd_lthash_sub( &ctx->running_lthash, &ctx->running_lthash_sub );
     410           0 :     uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
     411           0 :     fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
     412           0 :     fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT_ADD, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
     413           0 :     ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_LTHASH_LEN_BYTES, ctx->out.chunk0, ctx->out.wmark );
     414           0 :     ctx->lthash_completion_pending = 0;
     415           0 :   }
     416           0 : }
     417             : 
                       /* handle_fail_completion forwards a fail control message once all wh
                          data frags have been seen.

                          Does nothing unless fail_completion_pending is set and
                          fd_seq_inc( wh_last_in_seq, 1UL )>=wh_finish_fseq.  When both
                          hold, it flushes both adders, publishes
                          FD_SNAPSHOT_MSG_CTRL_FAIL on output 0 with no payload and clears
                          the pending flag. */
     418             : static void
     419             : handle_fail_completion( fd_snaplh_t * ctx,
     420           0 :                   fd_stem_context_t * stem ) {
     421           0 :   if( FD_LIKELY( !ctx->fail_completion_pending ) ) return;
     422             : 
     423           0 :   if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
     424           0 :     fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
     425           0 :     fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
     426           0 :     fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     427           0 :     ctx->fail_completion_pending = 0;
     428           0 :   }
     429           0 : }
     430             : 
                       /* before_credit tries to consume one io_uring read completion and
                          sets *charge_busy to 1 if one was processed, 0 otherwise.  stem is
                          unused. */
     431             : static void
     432             : before_credit( fd_snaplh_t *       ctx,
     433             :                fd_stem_context_t * stem FD_PARAM_UNUSED,
     434           0 :                int *               charge_busy ) {
     435           0 :   *charge_busy = !!consume_available_cqe( ctx );
     436           0 : }
     437             : 
                       /* handle_wh_data_frag walks the bstream blocks carried by one wh data
                          frag and hashes this tile's share of the account pairs into the
                          "add" accumulator.

                          in_idx must refer to an IN_KIND_SNAPWH link.  chunk is resolved
                          against that link's base; sz_comp is the payload size in units of
                          bstream blocks (it is shifted left by
                          FD_VINYL_BSTREAM_BLOCK_LG_SZ to get bytes).

                          The frag is ignored in error state.  While a fail completion is
                          pending, the frag is not parsed and only handle_fail_completion
                          is attempted.  After parsing, handle_lthash_completion is
                          attempted if the state is FINISHING.

                          NOTE(review): pair_sz comes from the block's ctl word and is not
                          checked against rem_sz before the subtraction, nor against the
                          capacity of vinyl.pair_mem before the copy.  A pair_sz larger
                          than rem_sz would wrap rem_sz and the loop-top check would still
                          pass - confirm the producer guarantees this cannot happen. */
     438             : static void
     439             : handle_wh_data_frag( fd_snaplh_t * ctx,
     440             :                      ulong         in_idx,
     441             :                      ulong         chunk,      /* compressed input pointer */
     442             :                      ulong         sz_comp,    /* compressed input size */
     443           0 :                      fd_stem_context_t * stem ) {
     444           0 :   FD_CRIT( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH, "incorrect in kind" );
     445             : 
     446           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     447             :     /* skip all wh data frags when in error state. */
     448           0 :     return;
     449           0 :   }
     450           0 :   if( FD_UNLIKELY( ctx->fail_completion_pending ) ) {
     451             :     /* handle_fail_completion may succeed (complete) either when the
     452             :        control frag that triggers it is received (conditional upon
     453             :        having no pending wh data frags) or after all wh data frags have
     454             :        been received and processed.  Once the fail control message
     455             :        is received, the state transitions into idle. */
     456           0 :     handle_fail_completion( ctx, stem );
     457           0 :     return;
     458           0 :   }
     459             : 
     460           0 :   uchar const * rem    = fd_chunk_to_laddr_const( ctx->in[ in_idx ].base, chunk );
     461           0 :   ulong         rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
     462           0 :   FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
     463           0 :   FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ),     "misaligned write request" );
     464             : 
     465           0 :   while( rem_sz ) {
     466           0 :     FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
     467             : 
     468           0 :     fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t *)rem;
     469           0 :     ulong ctl      = phdr->ctl;
     470           0 :     int   ctl_type = fd_vinyl_bstream_ctl_type( ctl );
     471           0 :     switch( ctl_type ) {
     472             : 
     473           0 :       case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
     474           0 :         ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
     475           0 :         ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
                               /* Only the pairs assigned to this tile are copied and
                                  hashed; every pair is counted in pairs_seen. */
     476           0 :         if( FD_LIKELY( should_hash_account( ctx ) ) ) {
     477           0 :           uchar * pair = ctx->vinyl.pair_mem;
     478           0 :           fd_memcpy( pair, rem, pair_sz );
     479           0 :           streamlined_hash( ctx, ctx->adder, &ctx->running_lthash, pair );
     480           0 :         }
     481           0 :         rem    += pair_sz;
     482           0 :         rem_sz -= pair_sz;
     483           0 :         ctx->pairs_seen++;
     484           0 :         break;
     485           0 :       }
     486             : 
                             /* Padding block: skip exactly one block. */
     487           0 :       case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
     488           0 :         rem    += FD_VINYL_BSTREAM_BLOCK_SZ;
     489           0 :         rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
     490           0 :         break;
     491           0 :       }
     492             : 
     493           0 :       default:
     494           0 :         FD_LOG_CRIT(( "unexpected vinyl bstream block ctl=%016lx", ctl ));
     495           0 :     }
     496           0 :   }
     497             : 
     498           0 :   if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
     499             :     /* handle_lthash_completion may succeed (complete) either when the
     500             :        control frag that triggers it is received (conditional upon
     501             :        having no pending wh data frags) or after all wh data frags have
     502             :        been received and processed. */
     503           0 :     handle_lthash_completion( ctx, stem );
     504           0 :   }
     505           0 : }
     506             : 
                       /* handle_lv_data_frag handles one lthash request frag.

                          The payload is read as a ulong bstream sequence followed
                          immediately by a fd_vinyl_bstream_phdr_t.  Requests are
                          distributed across tiles by should_process_lthash_request; every
                          frag is counted in lthash_req_seen whether or not this tile
                          processes it.  The request is served through io_uring when
                          io_uring_enabled is set and through blocking reads otherwise.

                          The frag is ignored in error state; any state other than
                          PROCESSING is reported through FD_LOG_ERR.

                          NOTE(review): chunk is resolved against in[in_idx].wksp here,
                          while handle_wh_data_frag resolves against in[in_idx].base -
                          confirm the difference is intended.
                          NOTE(review): the return after FD_LOG_ERR is presumably
                          unreachable if FD_LOG_ERR terminates the process - verify. */
     507             : static void
     508             : handle_lv_data_frag( fd_snaplh_t * ctx,
     509             :                      ulong         in_idx,
     510           0 :                      ulong         chunk ) { /* compressed input pointer */
     511             : 
     512           0 :   if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
     513             :     /* skip all lv data frags when in error state. */
     514           0 :     return;
     515           0 :   }
     516           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING  ) ) {
     517           0 :     FD_LOG_ERR(( "invalid state for lv data frag %u", ctx->state ));
     518           0 :     return;
     519           0 :   }
     520             : 
     521           0 :   if( FD_LIKELY( should_process_lthash_request( ctx ) ) ) {
     522           0 :     uchar const * indata = fd_chunk_to_laddr_const( ctx->in[ in_idx ].wksp, chunk );
     523           0 :     ulong seq;
     524           0 :     fd_vinyl_bstream_phdr_t acc_hdr[1];
                           /* memcpy is used to extract both fields from the payload. */
     525           0 :     memcpy( &seq,    indata, sizeof(ulong) );
     526           0 :     memcpy( acc_hdr, indata + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
     527           0 :     if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     528           0 :       handle_vinyl_lthash_request_ur( ctx, seq, acc_hdr );
     529           0 :     } else {
     530           0 :       handle_vinyl_lthash_request_bd( ctx, seq, acc_hdr );
     531           0 :     }
     532           0 :   }
     533           0 :   ctx->lthash_req_seen++;
     534           0 : }
     535             : 
                       /* tsorig_tspub_to_fseq combines two values into one ulong: tspub
                          shifted into the upper 32 bits, ORed with tsorig.
                          NOTE(review): tsorig is not masked, so any bits above bit 31
                          would overlap the tspub half - presumably callers pass 32-bit
                          values; verify. */
     536             : static inline ulong
     537             : tsorig_tspub_to_fseq( ulong tsorig,
     538           0 :                       ulong tspub ) {
     539           0 :   return (tspub<<32 ) | tsorig;
     540           0 : }
     541             : 
     542             : static void
     543             : handle_control_frag( fd_snaplh_t * ctx,
     544             :                      ulong         sig,
     545             :                      ulong         tsorig,
     546             :                      ulong         tspub,
     547           0 :                     fd_stem_context_t * stem  ) {
     548           0 :   if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
     549             :     /* Control messages move along the snapshot load pipeline.  Since
     550             :        error conditions can be triggered by any tile in the pipeline,
     551             :        it is possible to be in error state and still receive otherwise
     552             :        valid messages.  Only a fail message can revert this. */
     553           0 :     return;
     554           0 :   };
     555             : 
     556           0 :   switch( sig ) {
     557           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     558           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
     559           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     560           0 :       ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
     561           0 :       ctx->full  = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
     562           0 :       fd_lthash_zero( &ctx->running_lthash );
     563           0 :       fd_lthash_zero( &ctx->running_lthash_sub );
     564           0 :       fd_lthash_adder_new( ctx->adder );
     565           0 :       fd_lthash_adder_new( ctx->adder_sub );
     566           0 :       break;
     567           0 :     }
     568             : 
     569           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI: {
     570           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
     571           0 :       ctx->state = FD_SNAPSHOT_STATE_FINISHING;
     572           0 :       ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
     573           0 :       if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     574           0 :         handle_vinyl_lthash_request_ur_consume_all( ctx );
     575           0 :       }
     576           0 :       ctx->lthash_completion_pending = 1;
     577             :       /* handle_lthash_completion may succeed (complete) either here
     578             :          (if there are no pending wh data frags) or after all wh data
     579             :          frags have been received and processed. */
     580           0 :       handle_lthash_completion( ctx, stem );
     581           0 :       break;
     582           0 :     }
     583             : 
     584           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     585           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE: {
     586           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
     587           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     588           0 :       break;
     589           0 :     }
     590             : 
     591           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR: {
     592           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     593           0 :       ctx->state = FD_SNAPSHOT_STATE_ERROR;
     594           0 :       break;
     595           0 :     }
     596             : 
     597           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL: {
     598           0 :       FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     599           0 :       ctx->state = FD_SNAPSHOT_STATE_IDLE;
     600           0 :       ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
     601           0 :       if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     602           0 :         handle_vinyl_lthash_request_ur_consume_all( ctx );
     603           0 :       }
     604           0 :       ctx->fail_completion_pending = 1;
     605             :       /* handle_fail_completion may succeed (complete) either here (if
     606             :          there are no pending wh data frags) or after all wh data frags
     607             :          have been received and processed. */
     608           0 :       handle_fail_completion( ctx, stem );
     609           0 :       break;
     610           0 :     }
     611             : 
     612           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
     613           0 :       FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
     614           0 :       ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
     615           0 :       break;
     616             : 
     617           0 :     default: {
     618           0 :       FD_LOG_ERR(( "unexpected control sig %lu", sig ));
     619           0 :       break;
     620           0 :     }
     621           0 :   }
     622           0 : }
     623             : 
     624             : static inline int
     625             : returnable_frag( fd_snaplh_t *       ctx,
     626             :                  ulong               in_idx,
     627             :                  ulong               seq,
     628             :                  ulong               sig,
     629             :                  ulong               chunk,
     630             :                  ulong               sz,
     631             :                  ulong               ctl,
     632             :                  ulong               tsorig,
     633             :                  ulong               tspub,
     634           0 :                  fd_stem_context_t * stem ) {
     635           0 :   (void)sz; (void)ctl;
     636           0 :   FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
     637             : 
     638           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) )          handle_wh_data_frag( ctx, in_idx, chunk, tsorig, stem );
     639           0 :   else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_lv_data_frag( ctx, in_idx, chunk );
     640           0 :   else                                                               handle_control_frag( ctx, sig, tsorig, tspub, stem );
     641             : 
     642             :   /* Because fd_stem may not return flow control credits fast enough,
     643             :      always update fseq (consumer progress) here. */
     644           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) {
     645           0 :     ctx->wh_last_in_seq = seq;
     646           0 :     fd_fseq_update( ctx->in[ in_idx ].seq_sync, fd_seq_inc( seq, 1UL ) );
     647           0 :   }
     648             : 
     649           0 :   return 0;
     650           0 : }
     651             : 
     652             : static ulong
     653             : populate_allowed_fds( fd_topo_t      const * topo FD_PARAM_UNUSED,
     654             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     655             :                       ulong                  out_fds_cnt,
     656           0 :                       int *                  out_fds ) {
     657           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     658             : 
     659           0 :   ulong out_cnt = 0;
     660           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     661           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     662           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     663           0 :   }
     664             : 
     665           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     666           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     667           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
     668             : 
     669           0 :   out_fds[ out_cnt++ ] = ctx->vinyl.dev_fd;
     670             : 
     671           0 :   if( FD_LIKELY( ctx->ioring->ioring_fd>=0 ) ) {
     672           0 :     out_fds[ out_cnt++ ] = ctx->ioring->ioring_fd;
     673           0 :   }
     674             : 
     675           0 :   return out_cnt;
     676           0 : }
     677             : 
     678             : static void
     679           0 : during_housekeeping( fd_snaplh_t * ctx ) {
     680             : 
     681             :   /* Service io_uring instance */
     682             : 
     683           0 :   if( FD_LIKELY( ctx->io_uring_enabled ) ) {
     684           0 :     uint sq_drops = fd_io_uring_sq_dropped( ctx->ioring->sq );
     685           0 :     if( FD_UNLIKELY( sq_drops ) ) {
     686           0 :       FD_LOG_CRIT(( "kernel io_uring dropped I/O requests, cannot continue (sq_dropped=%u)", sq_drops ));
     687           0 :     }
     688             : 
     689           0 :     uint cq_drops = fd_io_uring_cq_overflow( ctx->ioring->cq );
     690           0 :     if( FD_UNLIKELY( cq_drops ) ) {
     691           0 :       FD_LOG_CRIT(( "kernel io_uring dropped I/O completions, cannot continue (cq_overflow=%u)", cq_drops ));
     692           0 :     }
     693           0 :   }
     694             : 
     695           0 : }
     696             : 
     697             : static ulong
     698             : populate_allowed_seccomp( fd_topo_t const *      topo,
     699             :                           fd_topo_tile_t const * tile,
     700             :                           ulong                  out_cnt,
     701           0 :                           struct sock_filter *   out ) {
     702           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     703           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     704           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
     705             : 
     706           0 :   populate_sock_filter_policy_fd_snaplh_tile( out_cnt, out,
     707           0 :       (uint)fd_log_private_logfile_fd(),
     708           0 :       (uint)ctx->vinyl.dev_fd,
     709           0 :       (uint)ctx->ioring->ioring_fd /* possibly -1 */ );
     710           0 :   return sock_filter_policy_fd_snaplh_tile_instr_cnt;
     711           0 : }
     712             : 
     713             : static fd_vinyl_io_t *
     714             : snaplh_io_uring_init( fd_snaplh_t * ctx,
     715             :                       void *        uring_shmem,
     716             :                       void *        vinyl_io_ur_mem,
     717           0 :                       int           dev_fd ) {
     718           0 :   ulong const uring_depth = VINYL_LTHASH_IORING_DEPTH;
     719           0 :   fd_io_uring_params_t params[1];
     720           0 :   fd_io_uring_params_init( params, uring_depth );
     721             : 
     722           0 :   if( FD_UNLIKELY( !fd_io_uring_init_shmem( ctx->ioring, params, uring_shmem, uring_depth, uring_depth ) ) ) {
     723           0 :     FD_LOG_ERR(( "fd_io_uring_init_shmem failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     724           0 :   }
     725           0 :   fd_io_uring_t * ioring = ctx->ioring;
     726             : 
     727           0 :   if( FD_UNLIKELY( fd_io_uring_register_files( ioring->ioring_fd, &dev_fd, 1 )<0 ) ) {
     728           0 :     FD_LOG_ERR(( "io_uring_register_files failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     729           0 :   }
     730             : 
     731           0 :   fd_io_uring_restriction_t res[3] = {
     732           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_OP,
     733           0 :       .sqe_op    = IORING_OP_READ },
     734           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_FLAGS_REQUIRED,
     735           0 :       .sqe_flags = IOSQE_FIXED_FILE },
     736           0 :     { .opcode    = FD_IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
     737           0 :       .sqe_flags = 0 }
     738           0 :   };
     739           0 :   if( FD_UNLIKELY( fd_io_uring_register_restrictions( ioring->ioring_fd, res, 3U )<0 ) ) {
     740           0 :     FD_LOG_ERR(( "io_uring_register_restrictions failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     741           0 :   }
     742             : 
     743           0 :   if( FD_UNLIKELY( fd_io_uring_enable_rings( ioring->ioring_fd )<0 ) ) {
     744           0 :     FD_LOG_ERR(( "io_uring_enable_rings failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     745           0 :   }
     746             : 
     747           0 :   ulong align = fd_vinyl_io_ur_align();
     748           0 :   FD_TEST( fd_ulong_is_pow2( align ) );
     749             : 
     750           0 :   ulong footprint = fd_vinyl_io_ur_footprint( VINYL_LTHASH_IO_SPAD_MAX );
     751           0 :   FD_TEST( fd_ulong_is_aligned( footprint, align ) );
     752             : 
     753             :   /* Before invoking fd_vinyl_io_ur_init, the sync block must be
     754             :      already available.  Although in principle one could keep
     755             :      calling fd_vinyl_io_ur_init until it returns !=NULL, doing this
     756             :      would log uncessary (and misleading) warnings. */
     757           0 :   FD_LOG_INFO(( "waiting for account database creation" ));
     758           0 :   for(;;) {
     759           0 :     fd_vinyl_bstream_block_t block[1];
     760           0 :     ulong dev_sync = 0UL; /* Use the beginning of the file for the sync block */
     761           0 :     bd_read( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
     762           0 :     int type = fd_vinyl_bstream_ctl_type( block->sync.ctl );
     763           0 :     if( FD_UNLIKELY( type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC ) ) continue;
     764           0 :     ulong io_seed = block->sync.hash_trail;
     765           0 :     if( FD_LIKELY( !fd_vinyl_bstream_block_test( io_seed, block ) ) ) break;
     766           0 :     fd_log_sleep( 1e6 ); /* 1ms */
     767           0 :   }
     768           0 :   FD_LOG_INFO(( "found valid account database sync block, attaching ..." ));
     769             : 
     770           0 :   fd_vinyl_io_t * io = fd_vinyl_io_ur_init( vinyl_io_ur_mem, VINYL_LTHASH_IO_SPAD_MAX, dev_fd, ioring );
     771           0 :   if( FD_UNLIKELY( !io ) ) FD_LOG_ERR(( "vinyl_io_ur_init failed" ));
     772           0 :   return io;
     773           0 : }
     774             : 
     775             : static void
     776             : privileged_init( fd_topo_t *      topo,
     777           0 :                  fd_topo_tile_t * tile ) {
     778           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     779             : 
     780           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     781           0 :   fd_snaplh_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t),      sizeof(fd_snaplh_t)                                );
     782           0 :   void * pair_mem    = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          ); (void)pair_mem;
     783           0 :   void * pair_tmp    = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_BLOCK_MAX_SZ                          ); (void)pair_tmp;
     784           0 :   void * rd_req_mem  = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN,  VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ  ); (void)rd_req_mem;
     785           0 :   void * uring_mem   = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_ur_align(),    fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
     786           0 :   void * uring_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
     787             : 
     788           0 :   FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
     789             : 
     790             :   /* Set up io_bd dependencies */
     791             : 
     792           0 :   char const * bstream_path = tile->snaplh.vinyl_path;
     793             :   /* Note: it would be possible to use O_DIRECT, but it would require
     794             :      VINYL_LTHASH_BLOCK_ALIGN to be 4096UL, which substantially
     795             :      increases the read overhead, making it slower (keep in mind that
     796             :      a rather large subset of mainnet accounts typically fits inside
     797             :      one FD_VINYL_BSTREAM_BLOCK_SZ. */
     798           0 :   int dev_fd = open( bstream_path, O_RDONLY|O_CLOEXEC, 0444 );
     799           0 :   if( FD_UNLIKELY( dev_fd<0 ) ) {
     800           0 :     FD_LOG_ERR(( "open(%s,O_RDONLY|O_CLOEXEC, 0444) failed (%i-%s)",
     801           0 :                  bstream_path, errno, fd_io_strerror( errno ) ));
     802           0 :   }
     803             : 
     804           0 :   struct stat st;
     805           0 :   if( FD_UNLIKELY( 0!=fstat( dev_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", bstream_path, errno, strerror( errno ) ));
     806             : 
     807           0 :   ctx->vinyl.dev_fd  = dev_fd;
     808           0 :   ulong bstream_sz   = (ulong)st.st_size;
     809           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
     810           0 :     FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
     811           0 :   }
     812           0 :   ctx->vinyl.dev_sz   = bstream_sz;
     813           0 :   ctx->vinyl.dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
     814             : 
     815           0 :   ctx->vinyl.io = NULL;
     816           0 :   ctx->ioring->ioring_fd = -1;
     817             : 
     818           0 :   if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
     819           0 :     ctx->vinyl.io = snaplh_io_uring_init( ctx, uring_shmem, uring_mem, dev_fd );
     820           0 :   }
     821           0 :   ctx->io_uring_enabled = tile->snaplh.io_uring_enabled;
     822           0 : }
     823             : 
     824             : static void
     825             : unprivileged_init( fd_topo_t *      topo,
     826           0 :                    fd_topo_tile_t * tile ) {
     827           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     828             : 
     829           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     830           0 :   fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t),     sizeof(fd_snaplh_t)                               );
     831           0 :   void *   pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                         );
     832           0 :   void *   pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ                         );
     833           0 :   void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
     834             : 
     835           0 :   FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
     836             : 
     837           0 :   ctx->vinyl.pair_mem = pair_mem;
     838           0 :   ctx->vinyl.pair_tmp = pair_tmp;
     839             : 
     840           0 :   if( FD_UNLIKELY( tile->in_cnt!=IN_CNT_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, IN_CNT_MAX ));
     841           0 :   if( FD_UNLIKELY( tile->out_cnt!=1UL       ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1",  tile->out_cnt            ));
     842             : 
     843           0 :   ctx->io_seed = NULL;
     844             : 
     845           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
     846           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     847           0 :     fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
     848           0 :     if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_lh" ) ) ) {
     849           0 :       ctx->in[ i ].wksp     = in_wksp->wksp;
     850           0 :       ctx->in[ i ].chunk0   = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache );
     851           0 :       ctx->in[ i ].wmark    = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu );
     852           0 :       ctx->in[ i ].mtu      = in_link->mtu;
     853           0 :       ctx->in[ i ].base     = NULL;
     854           0 :       ctx->in[ i ].seq_sync = NULL;
     855           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPLV;
     856           0 :     } else if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
     857           0 :       ctx->in[ i ].wksp     = in_wksp->wksp;
     858           0 :       ctx->in[ i ].chunk0   = 0;
     859           0 :       ctx->in[ i ].wmark    = 0;
     860           0 :       ctx->in[ i ].mtu      = 0;
     861           0 :       ctx->in[ i ].base     = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snaplh.dcache_obj_id ) );
     862           0 :       ctx->in[ i ].seq_sync = tile->in_link_fseq[ i ];
     863           0 :       ctx->wh_last_in_seq   = fd_fseq_query( tile->in_link_fseq[ i ] );
     864           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPWH;
     865           0 :       ctx->io_seed          = (ulong const *)fd_dcache_app_laddr_const( ctx->in[ i ].base );
     866           0 :       FD_TEST( ctx->in[ i ].base );
     867           0 :     } else {
     868           0 :       FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
     869           0 :     }
     870           0 :   }
     871             : 
     872           0 :   FD_TEST( ctx->io_seed );
     873             : 
     874           0 :   fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ 0UL ] ];
     875           0 :   ctx->out.wksp    = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
     876           0 :   ctx->out.chunk0  = fd_dcache_compact_chunk0( fd_wksp_containing( out_link->dcache ), out_link->dcache );
     877           0 :   ctx->out.wmark   = fd_dcache_compact_wmark ( ctx->out.wksp, out_link->dcache, out_link->mtu );
     878           0 :   ctx->out.chunk   = ctx->out.chunk0;
     879           0 :   ctx->out.mtu     = out_link->mtu;
     880           0 :   FD_TEST( 0==strcmp( out_link->name, "snaplh_lv" ) );
     881             : 
     882           0 :   fd_lthash_adder_new( ctx->adder );
     883           0 :   fd_lthash_adder_new( ctx->adder_sub );
     884             : 
     885           0 :   ctx->metrics.full.accounts_hashed        = 0UL;
     886           0 :   ctx->metrics.incremental.accounts_hashed = 0UL;
     887             : 
     888           0 :   memset( ctx->vinyl.pending.phdr,   0, sizeof(fd_vinyl_bstream_phdr_t) * VINYL_LTHASH_RD_REQ_MAX );
     889           0 :   memset( ctx->vinyl.pending.rd_req, 0, sizeof(fd_vinyl_io_rd_t)        * VINYL_LTHASH_RD_REQ_MAX );
     890           0 :   for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
     891           0 :     fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
     892           0 :     rd_req->ctx = rd_req_ctx_from_parts( i, VINYL_LTHASH_RD_REQ_FREE );
     893           0 :     rd_req->dst = NULL;
     894           0 :     if( rd_req_mem!=NULL ) {
     895           0 :       rd_req->dst = ((uchar*)rd_req_mem) + i*VINYL_LTHASH_BLOCK_MAX_SZ;
     896           0 :     }
     897           0 :   }
     898           0 :   ctx->vinyl.pending_rd_req_cnt = 0UL;
     899             : 
     900           0 :   ctx->state                   = FD_SNAPSHOT_STATE_IDLE;
     901           0 :   ctx->full                    = 1;
     902           0 :   ctx->lthash_tile_cnt         = fd_topo_tile_name_cnt( topo, "snaplh" );
     903           0 :   ctx->lthash_tile_idx         = tile->kind_id;
     904             :   /* This may seem redundant, but it provides flexibility around which
     905             :      tiles and do addition and subtraction of lthash. */
     906           0 :   ctx->lthash_tile_add_cnt     = ctx->lthash_tile_cnt;
     907           0 :   ctx->lthash_tile_sub_cnt     = ctx->lthash_tile_cnt;
     908           0 :   ctx->lthash_tile_add_idx     = ctx->lthash_tile_idx;
     909           0 :   ctx->lthash_tile_sub_idx     = ctx->lthash_tile_idx;
     910           0 :   if( ctx->lthash_tile_add_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_add_idx < ctx->lthash_tile_add_cnt );
     911           0 :   if( ctx->lthash_tile_sub_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_sub_idx < ctx->lthash_tile_sub_cnt );
     912           0 :   ctx->pairs_seen              = 0UL;
     913           0 :   ctx->lthash_req_seen         = 0UL;
     914           0 :   fd_lthash_zero( &ctx->running_lthash );
     915           0 :   fd_lthash_zero( &ctx->running_lthash_sub );
     916             : 
     917           0 :   ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
     918           0 :   FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
     919           0 :   fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
     920           0 :   FD_TEST( vinyl_admin );
     921           0 :   ctx->vinyl.admin = vinyl_admin;
     922           0 :   for(;;) {
     923             :     /* This query can be done without the need of an rwlock. */
     924           0 :     ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
     925           0 :     if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
     926           0 :                    vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
     927           0 :     fd_log_sleep( (long)1e6 /*1ms*/ );
     928           0 :     FD_SPIN_PAUSE();
     929           0 :   }
     930             : 
     931           0 :   ctx->lthash_completion_pending = 0;
     932           0 :   ctx->fail_completion_pending   = 0;
     933           0 : }
     934             : 
     935           0 : #define STEM_BURST 1UL
     936           0 : #define STEM_LAZY  1000L
     937             : 
     938           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplh_t
     939           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplh_t)
     940             : 
     941             : #define STEM_CALLBACK_SHOULD_SHUTDOWN     should_shutdown
     942           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
     943           0 : #define STEM_CALLBACK_RETURNABLE_FRAG     returnable_frag
     944           0 : #define STEM_CALLBACK_BEFORE_CREDIT       before_credit
     945           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     946             : 
     947             : #include "../../disco/stem/fd_stem.c"
     948             : 
     949             : fd_topo_run_tile_t fd_tile_snaplh = {
     950             :   .name                     = NAME,
     951             :   .populate_allowed_fds     = populate_allowed_fds,
     952             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     953             :   .scratch_align            = scratch_align,
     954             :   .scratch_footprint        = scratch_footprint,
     955             :   .privileged_init          = privileged_init,
     956             :   .unprivileged_init        = unprivileged_init,
     957             :   .run                      = stem_run,
     958             : };
     959             : 
     960             : #undef NAME

Generated by: LCOV version 1.14