LCOV - code coverage report
Current view: top level - discof/restore - fd_snapct_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 1121 0.0 %
Date: 2026-02-13 06:06:24 Functions: 0 28 0.0 %

          Line data    Source code
       1             : #include "fd_snapct_tile.h"
       2             : #include "utils/fd_ssping.h"
       3             : #include "utils/fd_ssctrl.h"
       4             : #include "utils/fd_ssarchive.h"
       5             : #include "utils/fd_http_resolver.h"
       6             : #include "utils/fd_ssmsg.h"
       7             : 
       8             : #include "../../disco/topo/fd_topo.h"
       9             : #include "../../disco/metrics/fd_metrics.h"
      10             : #include "../../flamenco/gossip/fd_gossip_types.h"
      11             : #include "../../waltz/openssl/fd_openssl_tile.h"
      12             : 
      13             : #include <errno.h>
      14             : #include <stdio.h>
      15             : #include <fcntl.h>
      16             : #include <unistd.h>
      17             : #include <sys/stat.h>
      18             : #include <netinet/tcp.h>
      19             : #include <netinet/in.h>
      20             : 
      21             : #include "generated/fd_snapct_tile_seccomp.h"
      22             : 
      23             : #define NAME "snapct"
      24             : 
      25             : /* FIXME: Implement full_effective_age_cancel_threshold */
      26             : /* FIXME: Add more timeout config options and have consistent behavior */
      27             : /* FIXME: Do a finishing pass over the default.toml config options / comments */
      28             : /* FIXME: Improve behavior when using incremental_snapshots = false */
      29             : /* FIXME: Handle cases where no explicitly allowed peers advertise RPC */
      30             : /* FIXME: Make the code more strict about duplicate IP:port's */
      31             : 
      32           0 : #define GOSSIP_PEERS_MAX (FD_CONTACT_INFO_TABLE_SIZE)
      33           0 : #define SERVER_PEERS_MAX (FD_TOPO_SNAPSHOTS_SERVERS_MAX_RESOLVED)
      34           0 : #define TOTAL_PEERS_MAX  (GOSSIP_PEERS_MAX + SERVER_PEERS_MAX)
      35             : 
      36           0 : #define IN_KIND_ACK    (0)
      37           0 : #define IN_KIND_SNAPLD (1)
      38           0 : #define IN_KIND_GOSSIP (2)
      39             : #define MAX_IN_LINKS   (3)
      40             : 
      41           0 : #define TEMP_FULL_SNAP_NAME ".snapshot.tar.bz2-partial"
      42           0 : #define TEMP_INCR_SNAP_NAME ".incremental-snapshot.tar.bz2-partial"
      43             : 
      44             : struct fd_snapct_out_link {
      45             :   ulong       idx;
      46             :   fd_wksp_t * mem;
      47             :   ulong       chunk0;
      48             :   ulong       wmark;
      49             :   ulong       chunk;
      50             :   ulong       mtu;
      51             : };
      52             : typedef struct fd_snapct_out_link fd_snapct_out_link_t;
      53             : 
      54             : #define FD_SNAPCT_GOSSIP_FRESH_DEADLINE_NANOS      (10L*1000L*1000L*1000L)    /* gossip contact info is pushed every ~7.5 seconds */
      55           0 : #define FD_SNAPCT_GOSSIP_SATURATION_CHECK_INTERVAL (      10L*1000L*1000L)
      56           0 : #define FD_SNAPCT_GOSSIP_SATURATION_THRESHOLD      (0.05)                     /* 5% fresh peers */
      57             : 
      58           0 : #define FD_SNAPCT_COLLECTING_PEERS_TIMEOUT         (2L*60L*1000L*1000L*1000L) /* 2 minutes */
      59           0 : #define FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT        (30L*1000L*1000L*1000L)    /* 30 seconds */
      60             : 
      61             : struct gossip_ci_entry {
      62             :   fd_pubkey_t   pubkey;
      63             :   int           allowed;
      64             :   fd_ip4_port_t rpc_addr;
      65             :   long          added_nanos;
      66             :   ulong         map_next;
      67             : };
      68             : typedef struct gossip_ci_entry gossip_ci_entry_t;
      69             : 
      70             : #define MAP_NAME               gossip_ci_map
      71           0 : #define MAP_KEY                pubkey
      72             : #define MAP_ELE_T              gossip_ci_entry_t
      73             : #define MAP_KEY_T              fd_pubkey_t
      74           0 : #define MAP_NEXT               map_next
      75           0 : #define MAP_KEY_EQ(k0,k1)      fd_pubkey_eq( k0, k1 )
      76           0 : #define MAP_KEY_HASH(key,seed) fd_hash( seed, key, sizeof(fd_pubkey_t) )
      77             : #include "../../util/tmpl/fd_map_chain.c"
      78             : 
      79             : struct fd_snapct_tile {
      80             :   struct fd_topo_tile_snapct config;
      81             :   int                        gossip_enabled;
      82             :   int                        download_enabled;
      83             : 
      84             :   fd_ssping_t *          ssping;
      85             :   fd_http_resolver_t *   ssresolver;
      86             :   fd_sspeer_selector_t * selector;
      87             : 
      88             :   int           state;
      89             :   int           malformed;
      90             :   long          deadline_nanos;
      91             :   int           flush_ack;
      92             :   fd_sspeer_t   peer;
      93             : 
      94             :   struct {
      95             :     int dir_fd;
      96             :     int full_snapshot_fd;
      97             :     int incremental_snapshot_fd;
      98             :   } local_out;
      99             : 
     100             :   char http_full_snapshot_name[ PATH_MAX ];
     101             :   char http_incr_snapshot_name[ PATH_MAX ];
     102             : 
     103             :   fd_wksp_t const * gossip_in_mem;
     104             :   fd_wksp_t const * snapld_in_mem;
     105             :   uchar             in_kind[ MAX_IN_LINKS ];
     106             : 
     107             :   struct {
     108             :     ulong full_slot;
     109             :     ulong slot;
     110             :     int   pending;
     111             :   } predicted_incremental;
     112             : 
     113             :   struct {
     114             :     ulong full_snapshot_slot;
     115             :     char  full_snapshot_path[ PATH_MAX ];
     116             :     ulong full_snapshot_size;
     117             :     int   full_snapshot_zstd;
     118             : 
     119             :     ulong incremental_snapshot_slot;
     120             :     char  incremental_snapshot_path[ PATH_MAX ];
     121             :     ulong incremental_snapshot_size;
     122             :     int   incremental_snapshot_zstd;
     123             :   } local_in;
     124             : 
     125             :   struct {
     126             :     struct {
     127             :       ulong bytes_read;
     128             :       ulong bytes_written;
     129             :       ulong bytes_total;
     130             :       uint  num_retries;
     131             :     } full;
     132             : 
     133             :     struct {
     134             :       ulong bytes_read;
     135             :       ulong bytes_written;
     136             :       ulong bytes_total;
     137             :       uint  num_retries;
     138             :     } incremental;
     139             :   } metrics;
     140             : 
     141             :   struct {
     142             :     gossip_ci_entry_t * ci_table;  /* flat array of all gossip entries, allowed or not */
     143             :     gossip_ci_map_t *   ci_map;    /* map from pubkey to only allowed gossip entries */
     144             :     ulong               fresh_cnt;
     145             :     ulong               total_cnt;
     146             :     int                 saturated;
     147             :     long                next_saturated_check;
     148             :   } gossip;
     149             : 
     150             :   fd_snapct_out_link_t out_ld;
     151             :   fd_snapct_out_link_t out_gui;
     152             :   fd_snapct_out_link_t out_rp;
     153             : };
     154             : typedef struct fd_snapct_tile fd_snapct_tile_t;
     155             : 
     156             : static int
     157           0 : gossip_enabled( fd_topo_tile_t const * tile ) {
     158           0 :   return tile->snapct.sources.gossip.allow_any || tile->snapct.sources.gossip.allow_list_cnt>0UL;
     159           0 : }
     160             : 
     161             : static int
     162           0 : download_enabled( fd_topo_tile_t const * tile ) {
     163           0 :   return gossip_enabled( tile ) || tile->snapct.sources.servers_cnt>0UL;
     164           0 : }
     165             : 
     166             : FD_FN_CONST static inline ulong
     167           0 : loose_footprint( fd_topo_tile_t const * tile ) {
     168           0 :   (void)tile;
     169             :   /* Leftover space for OpenSSL allocations */
     170           0 :   return 1<<26UL; /* 64 MiB */
     171           0 : }
     172             : 
     173             : static ulong
     174           0 : scratch_align( void ) {
     175           0 :   return fd_ulong_max( alignof(fd_snapct_tile_t),
     176           0 :          fd_ulong_max( fd_ssping_align(),
     177           0 :          fd_ulong_max( alignof(gossip_ci_entry_t),
     178           0 :          fd_ulong_max( gossip_ci_map_align(),
     179           0 :          fd_ulong_max( fd_http_resolver_align(),
     180           0 :                        fd_sspeer_selector_align() ) ) ) ) );
     181           0 : }
     182             : 
     183             : static ulong
     184           0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     185           0 :   ulong l = FD_LAYOUT_INIT;
     186           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t)                                                   );
     187           0 :   l = FD_LAYOUT_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX )                                     );
     188           0 :   l = FD_LAYOUT_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX                               );
     189           0 :   l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(),      gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
     190           0 :   l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(),   fd_http_resolver_footprint( SERVER_PEERS_MAX )                             );
     191           0 :   l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX )                            );
     192           0 :   l = FD_LAYOUT_APPEND( l, fd_alloc_align(),           fd_alloc_footprint()                                                       );
     193           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     194           0 : }
     195             : 
     196             : static inline int
     197           0 : should_shutdown( fd_snapct_tile_t * ctx ) {
     198           0 :   return ctx->state==FD_SNAPCT_STATE_SHUTDOWN;
     199           0 : }
     200             : 
     201             : static void
     202           0 : during_housekeeping( fd_snapct_tile_t * ctx ) {
     203           0 :   long now = fd_log_wallclock();
     204             : 
     205           0 :   if( FD_UNLIKELY( !ctx->gossip.saturated && now>ctx->gossip.next_saturated_check ) ) {
     206           0 :     ctx->gossip.next_saturated_check = now + FD_SNAPCT_GOSSIP_SATURATION_CHECK_INTERVAL;
     207             : 
     208           0 :     ulong fresh_cnt = 0UL;
     209           0 :     ulong total_cnt = 0UL;
     210           0 :     for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
     211           0 :          !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     212           0 :          iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
     213           0 :       gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     214           0 :       if( FD_UNLIKELY( ci_entry->added_nanos>(now-FD_SNAPCT_GOSSIP_FRESH_DEADLINE_NANOS) ) ) fresh_cnt++;
     215           0 :       total_cnt++;
     216           0 :     }
     217           0 :     ctx->gossip.fresh_cnt = fresh_cnt;
     218           0 :     ctx->gossip.total_cnt = total_cnt;
     219             : 
     220           0 :     if( total_cnt!=0UL && total_cnt==ctx->config.sources.gossip.allow_list_cnt ) ctx->gossip.saturated = 1;
     221           0 :     else {
     222           0 :       double fresh = total_cnt ? (double)fresh_cnt/(double)total_cnt : 1.0;
     223           0 :       ctx->gossip.saturated = fresh<FD_SNAPCT_GOSSIP_SATURATION_THRESHOLD;
     224           0 :     }
     225           0 :   }
     226           0 : }
     227             : 
     228             : static void
     229           0 : metrics_write( fd_snapct_tile_t * ctx ) {
     230           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_READ,               ctx->metrics.full.bytes_read );
     231           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_WRITTEN,            ctx->metrics.full.bytes_written );
     232           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_TOTAL,              ctx->metrics.full.bytes_total );
     233           0 :   FD_MGAUGE_SET( SNAPCT, FULL_DOWNLOAD_RETRIES,         ctx->metrics.full.num_retries );
     234             : 
     235           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_READ,        ctx->metrics.incremental.bytes_read );
     236           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_WRITTEN,     ctx->metrics.incremental.bytes_written );
     237           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_TOTAL,       ctx->metrics.incremental.bytes_total );
     238           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_DOWNLOAD_RETRIES,  ctx->metrics.incremental.num_retries );
     239             : 
     240           0 :   FD_MGAUGE_SET( SNAPCT, GOSSIP_FRESH_COUNT,            ctx->gossip.fresh_cnt );
     241           0 :   FD_MGAUGE_SET( SNAPCT, GOSSIP_TOTAL_COUNT,            ctx->gossip.total_cnt );
     242             : 
     243           0 :   FD_MGAUGE_SET( SNAPCT, PREDICTED_SLOT,                ctx->predicted_incremental.slot );
     244             : 
     245           0 : #if FD_HAS_OPENSSL
     246           0 :   FD_MCNT_SET(   SNAPCT, SSL_ALLOC_ERRORS,                fd_ossl_alloc_errors );
     247           0 : #endif
     248             : 
     249           0 :   FD_MGAUGE_SET( SNAPCT, STATE, (ulong)ctx->state );
     250           0 : }
     251             : 
     252             : static void
     253             : snapshot_path_gui_publish( fd_snapct_tile_t *  ctx,
     254             :                            fd_stem_context_t * stem,
     255             :                            char const *        path,
     256           0 :                            int                 is_full ) {
     257             :   /* FIXME: Consider whether we can get everything we need from metrics
     258             :      rather than creating an entire link for this rare message */
     259           0 :   fd_snapct_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
     260           0 :   FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
     261           0 :   out->is_download = 0;
     262           0 :   out->type = fd_int_if( is_full, FD_SNAPCT_SNAPSHOT_TYPE_FULL, FD_SNAPCT_SNAPSHOT_TYPE_INCREMENTAL );
     263           0 :   fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snapct_update_t) , 0UL, 0UL, 0UL );
     264           0 :   ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snapct_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
     265           0 : }
     266             : 
     267             : static void
     268           0 : predict_incremental( fd_snapct_tile_t * ctx ) {
     269           0 :   if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) return;
     270           0 :   if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==ULONG_MAX ) ) return;
     271             : 
     272           0 :   fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     273             : 
     274           0 :   if( FD_LIKELY( best.addr.l ) ) {
     275           0 :     if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.incr_slot ) ) {
     276           0 :       ctx->predicted_incremental.slot    = best.incr_slot;
     277           0 :       ctx->predicted_incremental.pending = 1;
     278           0 :     }
     279           0 :   }
     280           0 : }
     281             : 
     282             : static void
     283             : on_resolve( void *        _ctx,
     284             :             fd_ip4_port_t addr,
     285             :             ulong         full_slot,
     286             :             ulong         incr_slot,
     287             :             uchar         full_hash[ FD_HASH_FOOTPRINT ],
     288           0 :             uchar         incr_hash[ FD_HASH_FOOTPRINT ] ) {
     289           0 :   fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
     290             : 
     291           0 :   fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, full_slot, incr_slot, full_hash, incr_hash );
     292           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
     293           0 :   predict_incremental( ctx );
     294           0 : }
     295             : 
     296             : static void
     297             : on_ping( void *        _ctx,
     298             :          fd_ip4_port_t addr,
     299           0 :          ulong         latency ) {
     300           0 :   fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
     301             : 
     302           0 :   fd_sspeer_selector_add( ctx->selector, addr, latency, ULONG_MAX, ULONG_MAX, NULL, NULL );
     303           0 :   predict_incremental( ctx );
     304           0 : }
     305             : 
     306             : static void
     307             : on_snapshot_hash( fd_snapct_tile_t *                 ctx,
     308             :                   fd_ip4_port_t                      addr,
     309           0 :                   fd_gossip_update_message_t const * msg ) {
     310           0 :   ulong         full_slot = msg->snapshot_hashes.full->slot;
     311           0 :   ulong         incr_slot = 0UL;
     312           0 :   uchar const * incr_hash = NULL;
     313             : 
     314           0 :   for( ulong i=0UL; i<msg->snapshot_hashes.incremental_len; i++ ) {
     315           0 :     if( FD_LIKELY( msg->snapshot_hashes.incremental[ i ].slot>incr_slot ) ) {
     316           0 :       incr_slot = msg->snapshot_hashes.incremental[ i ].slot;
     317           0 :       incr_hash = msg->snapshot_hashes.incremental[ i ].hash;
     318           0 :     }
     319           0 :   }
     320             : 
     321           0 :   fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, full_slot, incr_slot, msg->snapshot_hashes.full->hash, incr_hash );
     322           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
     323           0 :   predict_incremental( ctx );
     324           0 : }
     325             : 
     326             : static void
     327             : send_expected_slot( fd_snapct_tile_t *  ctx,
     328             :                     fd_stem_context_t * stem,
     329           0 :                     ulong               slot ) {
     330           0 :   uint tsorig; uint tspub;
     331           0 :   fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
     332           0 :   fd_stem_publish( stem, ctx->out_rp.idx, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
     333           0 : }
     334             : 
     335             : static void
     336           0 : rename_snapshots( fd_snapct_tile_t * ctx ) {
     337           0 :   FD_TEST( -1!=ctx->local_out.dir_fd );
     338             : 
     339             :   /* FIXME: We should rename the full snapshot earlier as soon as the
     340             :      download is complete.  That way, if the validator crashes during the
     341             :      incremental load, we can still use the snapshot on the next run. */
     342             : 
     343           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && ctx->http_full_snapshot_name[ 0 ]!='\0' ) ) {
     344           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_full_snapshot_name ) ) )
     345           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     346           0 :   }
     347           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && ctx->http_incr_snapshot_name[ 0 ]!='\0' ) ) {
     348           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_incr_snapshot_name ) ) )
     349           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     350           0 :   }
     351           0 : }
     352             : 
     353             : static ulong
     354             : rlimit_file_cnt( fd_topo_t const *      topo FD_PARAM_UNUSED,
     355           0 :                  fd_topo_tile_t const * tile ) {
     356           0 :   ulong cnt = 1UL +                             /* stderr */
     357           0 :               1UL;                              /* logfile */
     358           0 :   if( download_enabled( tile ) ) {
     359           0 :     cnt +=    1UL +                             /* ssping socket */
     360           0 :               2UL +                             /* dirfd + full snapshot download temp fd */
     361           0 :               tile->snapct.sources.servers_cnt; /* http resolver peer full sockets */
     362           0 :     if( tile->snapct.incremental_snapshots ) {
     363           0 :       cnt +=  1UL +                             /* incr snapshot download temp fd */
     364           0 :               tile->snapct.sources.servers_cnt; /* http resolver peer incr sockets */
     365           0 :     }
     366           0 :   }
     367           0 :   return cnt;
     368           0 : }
     369             : 
     370             : static ulong
     371             : populate_allowed_seccomp( fd_topo_t const *      topo,
     372             :                           fd_topo_tile_t const * tile,
     373             :                           ulong                  out_cnt,
     374           0 :                           struct sock_filter *   out ) {
     375             : 
     376           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     377             : 
     378           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     379           0 :   fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
     380             : 
     381           0 :   int ping_fd = download_enabled( tile ) ? fd_ssping_get_sockfd( ctx->ssping ) : -1;
     382           0 :   populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)ping_fd );
     383           0 :   return sock_filter_policy_fd_snapct_tile_instr_cnt;
     384           0 : }
     385             : 
     386             : static ulong
     387             : populate_allowed_fds( fd_topo_t const *      topo,
     388             :                       fd_topo_tile_t const * tile,
     389             :                       ulong                  out_fds_cnt,
     390           0 :                       int *                  out_fds ) {
     391           0 :   if( FD_UNLIKELY( out_fds_cnt<6UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     392             : 
     393           0 :   ulong out_cnt = 0;
     394           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     395           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     396           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     397           0 :   }
     398             : 
     399           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     400             : 
     401           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     402           0 :   fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
     403           0 :   if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) )                  out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
     404           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) )        out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
     405           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
     406           0 :   if( FD_LIKELY( download_enabled( tile ) ) )                   out_fds[ out_cnt++ ] = fd_ssping_get_sockfd( ctx->ssping );
     407             : 
     408           0 :   return out_cnt;
     409           0 : }
     410             : 
     411             : static void
     412             : init_load( fd_snapct_tile_t *  ctx,
     413             :            fd_stem_context_t * stem,
     414             :            int                 full,
     415           0 :            int                 file ) {
     416           0 :   fd_ssctrl_init_t * out = fd_chunk_to_laddr( ctx->out_ld.mem, ctx->out_ld.chunk );
     417           0 :   out->file = file;
     418           0 :   out->zstd = !file || (full ? ctx->local_in.full_snapshot_zstd : ctx->local_in.incremental_snapshot_zstd);
     419           0 :   if( file ) out->slot = full ? ctx->local_in.full_snapshot_slot : ctx->local_in.incremental_snapshot_slot;
     420           0 :   else       out->slot = full ? ctx->predicted_incremental.full_slot : ctx->predicted_incremental.slot;
     421             : 
     422           0 :   if( !file ) {
     423           0 :     out->addr = ctx->peer.addr;
     424           0 :     char encoded_hash[ FD_BASE58_ENCODED_32_SZ ];
     425           0 :     if( full ) {
     426           0 :       fd_base58_encode_32( ctx->peer.full_hash, NULL, encoded_hash );
     427           0 :       FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
     428           0 :       FD_TEST( fd_cstr_printf_check( ctx->http_full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
     429           0 :     } else {
     430           0 :       fd_base58_encode_32( ctx->peer.incr_hash, NULL, encoded_hash );
     431           0 :       FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
     432           0 :       FD_TEST( fd_cstr_printf_check( ctx->http_incr_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
     433           0 :     }
     434             : 
     435           0 :     for( ulong i=0UL; i<SERVER_PEERS_MAX; i++ ) {
     436           0 :       if( FD_UNLIKELY( ctx->peer.addr.l==ctx->config.sources.servers[ i ].addr.l ) ) {
     437           0 :         fd_cstr_ncpy( out->hostname, ctx->config.sources.servers[ i ].hostname, sizeof(out->hostname) );
     438           0 :         out->is_https = ctx->config.sources.servers[ i ].is_https;
     439           0 :         break;
     440           0 :       }
     441           0 :     }
     442           0 :   }
     443           0 :   fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
     444           0 :   ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
     445           0 :   ctx->flush_ack = 0;
     446             : 
     447             :   /* If we are downloading the snapshot, we will get the snapshot size
     448             :      in bytes from a metadata message sent from snapld. */
     449           0 :   if( file ) {
     450           0 :     if( full ) ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
     451           0 :     else       ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
     452           0 :   }
     453             : 
     454           0 :   if( !file ) {
     455           0 :     if( full ) {
     456             :       /* reset any written content in the full output snapshot */
     457           0 :       if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.full_snapshot_fd, 0UL ) ) ) {
     458           0 :         FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
     459           0 :       }
     460           0 :       if( FD_UNLIKELY( -1==lseek( ctx->local_out.full_snapshot_fd, 0L, SEEK_SET ) ) ) {
     461           0 :         FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
     462           0 :       }
     463           0 :     } else {
     464             :       /* reset any written content in the incremental snapshot output
     465             :          file */
     466           0 :       if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.incremental_snapshot_fd, 0UL ) ) ) {
     467           0 :         FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
     468           0 :       }
     469           0 :       if( FD_UNLIKELY( -1==lseek( ctx->local_out.incremental_snapshot_fd, 0L, SEEK_SET ) ) ) {
     470           0 :         FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
     471           0 :       }
     472           0 :     }
     473           0 :   }
     474             : 
     475             :   /* Regardless of whether we load the snapshot from a file or download
     476             :      it, we know the name of the snapshot and can publish it to the gui
     477             :      here. */
     478           0 :   if( full ) {
     479           0 :     if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     480           0 :       if( file ) {
     481           0 :         fd_cstr_fini( ctx->http_full_snapshot_name );
     482           0 :         snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, 1 );
     483           0 :       }
     484           0 :       else {
     485           0 :         char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
     486           0 :         FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ) );
     487           0 :         snapshot_path_gui_publish( ctx, stem, snapshot_path, 1 );
     488           0 :       }
     489           0 :     }
     490           0 :   } else {
     491           0 :     if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     492           0 :       if( file ) {
     493           0 :         fd_cstr_fini( ctx->http_incr_snapshot_name );
     494           0 :         snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, 0 );
     495           0 :       } else {
     496           0 :         char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
     497           0 :         FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ) );
     498           0 :         snapshot_path_gui_publish( ctx, stem, snapshot_path, 0 );
     499           0 :       }
     500           0 :     }
     501           0 :   }
     502           0 : }
     503             : 
     504             : static void
     505             : log_download( fd_snapct_tile_t * ctx,
     506             :               int                full,
     507             :               fd_ip4_port_t      addr,
     508           0 :               ulong              slot ) {
     509           0 :   for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
     510           0 :       !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     511           0 :       iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
     512           0 :     gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     513           0 :     if( ci_entry->rpc_addr.l==addr.l ) {
     514           0 :       FD_TEST( ci_entry->allowed );
     515           0 :       FD_BASE58_ENCODE_32_BYTES( ci_entry->pubkey.uc, pubkey_b58 );
     516           0 :       FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from allowed gossip peer %s at http://" FD_IP4_ADDR_FMT ":%hu/%s",
     517           0 :                       full ? "full" : "incremental", slot, pubkey_b58,
     518           0 :                       FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
     519           0 :                       full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
     520           0 :       return;
     521           0 :     }
     522           0 :   }
     523             : 
     524           0 :   for( ulong i=0UL; i<ctx->config.sources.servers_cnt; i++ ) {
     525           0 :     if( addr.l==ctx->config.sources.servers[ i ].addr.l ) {
     526           0 :       if( ctx->config.sources.servers[ i ].is_https ) {
     527           0 :         FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at https://%s:%hu/%s",
     528           0 :                         full ? "full" : "incremental", slot, i,
     529           0 :                         ctx->config.sources.servers[ i ].hostname, fd_ushort_bswap( addr.port ),
     530           0 :                         full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
     531           0 :       } else {
     532           0 :         FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at http://" FD_IP4_ADDR_FMT ":%hu/%s",
     533           0 :                         full ? "full" : "incremental", slot, i,
     534           0 :                         FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
     535           0 :                         full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
     536           0 :       }
     537           0 :       return;
     538           0 :     }
     539           0 :   }
     540             : 
     541           0 :   FD_TEST( 0 ); /* should not be possible */
     542           0 : }
     543             : 
     544             : static void
     545             : after_credit( fd_snapct_tile_t *  ctx,
     546             :               fd_stem_context_t * stem,
     547             :               int *               opt_poll_in FD_PARAM_UNUSED,
     548           0 :               int *               charge_busy FD_PARAM_UNUSED ) {
     549           0 :   long now = fd_log_wallclock();
     550             : 
     551           0 :   if( FD_LIKELY( ctx->ssping ) ) fd_ssping_advance( ctx->ssping, now, ctx->selector );
     552           0 :   if( FD_LIKELY( ctx->ssresolver ) ) fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
     553             : 
     554             :   /* send an expected slot message as the predicted incremental
     555             :      could have changed as a result of the pinger, resolver, or from
     556             :      processing gossip frags in gossip_frag. */
     557           0 :   if( FD_LIKELY( ctx->predicted_incremental.pending ) ) {
     558           0 :     send_expected_slot( ctx, stem, ctx->predicted_incremental.slot );
     559           0 :     ctx->predicted_incremental.pending = 0;
     560           0 :   }
     561             : 
     562             :   /* Note: All state transitions should occur within this switch
     563             :      statement to make it easier to reason about the state management. */
     564             : 
     565             :   /* FIXME: Collapse WAITING_FOR_PEERS and COLLECTING_PEERS states for
     566             :      both full and incremental variants? */
     567             : 
     568           0 :   switch ( ctx->state ) {
     569             : 
     570             :     /* ============================================================== */
     571           0 :     case FD_SNAPCT_STATE_INIT: {
     572           0 :       if( FD_UNLIKELY( !ctx->download_enabled ) ) {
     573           0 :         ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
     574           0 :         send_expected_slot( ctx, stem, local_slot );
     575           0 :         FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
     576           0 :         ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
     577           0 :         ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
     578           0 :         init_load( ctx, stem, 1, 1 );
     579           0 :         break;
     580           0 :       }
     581           0 :       ctx->deadline_nanos = now+FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
     582           0 :       ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
     583           0 :       break;
     584           0 :     }
     585             : 
     586             :     /* ============================================================== */
     587           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS: {
     588           0 :       if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
     589             : 
     590           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
     591           0 :       if( FD_LIKELY( best.addr.l ) ) {
     592           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
     593           0 :         ctx->deadline_nanos = now+FD_SNAPCT_COLLECTING_PEERS_TIMEOUT;
     594           0 :       }
     595           0 :       break;
     596           0 :     }
     597             : 
     598             :     /* ============================================================== */
     599           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
     600           0 :       if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
     601             : 
     602           0 :       FD_TEST( ctx->predicted_incremental.full_slot!=ULONG_MAX );
     603           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     604           0 :       if( FD_LIKELY( best.addr.l ) ) {
     605           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     606           0 :         ctx->deadline_nanos = now;
     607           0 :       }
     608           0 :       break;
     609           0 :     }
     610             : 
     611             :     /* ============================================================== */
     612           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS: {
     613           0 :       if( FD_UNLIKELY( !ctx->gossip.saturated && now<ctx->deadline_nanos ) ) break;
     614             : 
     615           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
     616           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     617           0 :         ctx->deadline_nanos = now + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
     618           0 :         ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
     619           0 :         break;
     620           0 :       }
     621             : 
     622           0 :       fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
     623           0 :       if( FD_UNLIKELY( cluster.incremental==ULONG_MAX && ctx->config.incremental_snapshots ) ) {
     624             :         /* We must have a cluster full slot to be in this state. */
     625           0 :         FD_TEST( cluster.full!=ULONG_MAX );
     626             :         /* fall back to full snapshot only if the highest cluster slot
     627             :            is a full snapshot only */
     628           0 :         ctx->config.incremental_snapshots = 0;
     629           0 :       }
     630             : 
     631           0 :       ulong       cluster_slot    = ctx->config.incremental_snapshots ? cluster.incremental : cluster.full;
     632           0 :       ulong       local_slot      = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
     633           0 :       ulong       local_slot_with_download = local_slot;
     634           0 :       int         local_too_old   = local_slot!=ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX && local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
     635           0 :       int         local_full_only = ctx->local_in.incremental_snapshot_slot==ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX;
     636           0 :       if( FD_LIKELY( (ctx->config.incremental_snapshots && local_full_only) || local_too_old ) ) {
     637           0 :         fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
     638           0 :         if( FD_LIKELY( best_incremental.addr.l ) ) {
     639           0 :           ctx->predicted_incremental.slot = best_incremental.incr_slot;
     640           0 :           local_slot_with_download = best_incremental.incr_slot;
     641           0 :           ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental snapshot */
     642           0 :         }
     643           0 :       }
     644             : 
     645           0 :       int can_use_local_full = local_slot_with_download!=ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX &&
     646           0 :                                local_slot_with_download>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_full_effective_age );
     647           0 :       if( FD_LIKELY( can_use_local_full ) ) {
     648           0 :         send_expected_slot( ctx, stem, local_slot );
     649             : 
     650           0 :         FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
     651           0 :         ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
     652           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_FILE;
     653           0 :         init_load( ctx, stem, 1, 1 );
     654           0 :       } else {
     655           0 :         if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) send_expected_slot( ctx, stem, best.full_slot );
     656             : 
     657           0 :         fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.full_slot );
     658           0 :         if( FD_LIKELY( best_incremental.addr.l ) ) {
     659           0 :           ctx->predicted_incremental.slot = best_incremental.incr_slot;
     660           0 :           send_expected_slot( ctx, stem, best_incremental.incr_slot );
     661           0 :         }
     662             : 
     663           0 :         ctx->peer                            = best;
     664           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_HTTP;
     665           0 :         ctx->predicted_incremental.full_slot = best.full_slot;
     666           0 :         init_load( ctx, stem, 1, 0 );
     667           0 :         log_download( ctx, 1, best.addr, best.full_slot );
     668           0 :       }
     669           0 :       break;
     670           0 :     }
     671             : 
     672             :     /* ============================================================== */
     673           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL: {
     674           0 :       if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
     675             : 
     676           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     677           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     678           0 :         ctx->deadline_nanos = now + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
     679           0 :         ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL;
     680           0 :         break;
     681           0 :       }
     682             : 
     683             :       /* decide whether to use the local incremental snapshot if one
     684             :          exists and is not too old, otherwise download a new incremental
     685             :          snapshot. */
     686           0 :       ulong cluster_slot  = fd_sspeer_selector_cluster_slot( ctx->selector ).incremental;
     687           0 :       ulong local_slot    = ctx->local_in.incremental_snapshot_slot;
     688           0 :       int   local_too_old = local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
     689           0 :       if( FD_LIKELY( local_slot!=ULONG_MAX && !local_too_old ) ) {
     690           0 :         ctx->predicted_incremental.slot = local_slot;
     691           0 :         send_expected_slot( ctx, stem, local_slot );
     692             : 
     693           0 :         FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
     694           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
     695           0 :         init_load( ctx, stem, 0, 1 );
     696           0 :       } else {
     697           0 :         ctx->predicted_incremental.slot = best.incr_slot;
     698           0 :         send_expected_slot( ctx, stem, best.incr_slot );
     699             : 
     700           0 :         ctx->peer  = best;
     701           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
     702           0 :         init_load( ctx, stem, 0, 0 );
     703           0 :         log_download( ctx, 0, best.addr, best.incr_slot );
     704           0 :       }
     705           0 :       break;
     706           0 :     }
     707             : 
     708             :     /* ============================================================== */
     709           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
     710           0 :       if( !ctx->flush_ack ) break;
     711             : 
     712           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     713           0 :         ctx->malformed = 0;
     714           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     715           0 :         ctx->flush_ack = 0;
     716           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     717           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     718           0 :         break;
     719           0 :       }
     720             : 
     721           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE;
     722           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     723           0 :       ctx->flush_ack = 0;
     724           0 :       break;
     725             : 
     726             :     /* ============================================================== */
     727           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
     728           0 :       if( !ctx->flush_ack ) break;
     729             : 
     730           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     731           0 :         ctx->malformed = 0;
     732           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     733           0 :         ctx->flush_ack = 0;
     734           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     735           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     736           0 :         break;
     737           0 :       }
     738             : 
     739           0 :       ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     740           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     741           0 :       break;
     742             : 
     743             :     /* ============================================================== */
     744           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
     745           0 :       if( !ctx->flush_ack ) break;
     746             : 
     747           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     748           0 :         ctx->malformed = 0;
     749           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     750           0 :         ctx->flush_ack = 0;
     751           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
     752           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
     753           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
     754           0 :         fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
     755           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->peer.addr );
     756           0 :         break;
     757           0 :       }
     758             : 
     759           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE;
     760           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     761           0 :       ctx->flush_ack = 0;
     762           0 :       break;
     763             : 
     764             :     /* ============================================================== */
     765           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
     766           0 :       if( !ctx->flush_ack ) break;
     767             : 
     768           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     769           0 :         ctx->malformed = 0;
     770           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     771           0 :         ctx->flush_ack = 0;
     772           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
     773           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
     774           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
     775           0 :         fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
     776           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->peer.addr );
     777           0 :         break;
     778           0 :       }
     779             : 
     780           0 :       ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     781           0 :       rename_snapshots( ctx );
     782           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     783           0 :       break;
     784             : 
     785             :     /* ============================================================== */
     786           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
     787           0 :       if( !ctx->flush_ack ) break;
     788             : 
     789           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     790           0 :         ctx->malformed = 0;
     791           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     792           0 :         ctx->flush_ack = 0;
     793           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     794           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     795           0 :         break;
     796           0 :       }
     797             : 
     798           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE;
     799           0 :       ulong sig = ctx->config.incremental_snapshots &&
     800           0 :                   (ctx->local_in.incremental_snapshot_slot!=ULONG_MAX || ctx->download_enabled) ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
     801           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_DONE && ctx->config.incremental_snapshots ) {
     802             :         /* set incremental snapshots to 0 if there is no local
     803             :             incremental snapshot and download is not enabled. */
     804           0 :         FD_LOG_WARNING(( "incremental snapshots were enabled via [snapshots.incremental_snapshots] "
     805           0 :                           "but no incremental snapshot exists on disk and no snapshot peers are configured. "
     806           0 :                           "skipping incremental snapshot load." ));
     807           0 :         ctx->config.incremental_snapshots = 0;
     808           0 :       }
     809           0 :       fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     810           0 :       ctx->flush_ack = 0;
     811           0 :       break;
     812             : 
     813             :     /* ============================================================== */
     814           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
     815           0 :       if( !ctx->flush_ack ) break;
     816             : 
     817           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     818           0 :         ctx->malformed = 0;
     819           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     820           0 :         ctx->flush_ack = 0;
     821           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     822           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     823           0 :         break;
     824           0 :       }
     825             : 
     826           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
     827           0 :         ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     828           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     829           0 :         break;
     830           0 :       }
     831             : 
     832           0 :       if( FD_LIKELY( ctx->download_enabled ) ) {
     833           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     834           0 :         ctx->deadline_nanos = 0L;
     835           0 :       } else {
     836           0 :         FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
     837           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
     838           0 :         init_load( ctx, stem, 0, 1 );
     839           0 :       }
     840           0 :       break;
     841             : 
     842             :     /* ============================================================== */
     843           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
     844           0 :       if( !ctx->flush_ack ) break;
     845             : 
     846           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     847           0 :         ctx->malformed = 0;
     848           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     849           0 :         ctx->flush_ack = 0;
     850           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
     851           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
     852           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
     853           0 :         fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
     854           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->peer.addr );
     855           0 :         break;
     856           0 :       }
     857             : 
     858           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE;
     859           0 :       fd_stem_publish( stem, ctx->out_ld.idx, ctx->config.incremental_snapshots ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     860           0 :       ctx->flush_ack = 0;
     861           0 :       break;
     862             : 
     863             :     /* ============================================================== */
     864           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
     865           0 :       if( !ctx->flush_ack ) break;
     866             : 
     867           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     868           0 :         ctx->malformed = 0;
     869           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     870           0 :         ctx->flush_ack = 0;
     871           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
     872           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
     873           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
     874           0 :         fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
     875           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->peer.addr );
     876           0 :         break;
     877           0 :       }
     878             : 
     879           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
     880           0 :         ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     881           0 :         rename_snapshots( ctx );
     882           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     883           0 :         break;
     884           0 :       }
     885             : 
     886             :       /* Get the best incremental peer to download from */
     887           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     888           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     889           0 :         ctx->deadline_nanos = now;
     890           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     891           0 :         break;
     892           0 :       }
     893             : 
     894           0 :       ctx->predicted_incremental.slot = best.incr_slot;
     895           0 :       send_expected_slot( ctx, stem, best.incr_slot );
     896             : 
     897           0 :       ctx->peer  = best;
     898           0 :       ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
     899           0 :       init_load( ctx, stem, 0, 0 );
     900           0 :       log_download( ctx, 0, best.addr, best.incr_slot );
     901           0 :       break;
     902             : 
     903             :     /* ============================================================== */
     904           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
     905           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
     906           0 :       if( !ctx->flush_ack ) break;
     907             : 
     908           0 :       if( ctx->metrics.full.num_retries==ctx->config.max_retry_abort ) {
     909           0 :         FD_LOG_ERR(( "hit retry limit of %u for full snapshot, aborting", ctx->config.max_retry_abort ));
     910           0 :       }
     911             : 
     912           0 :       ctx->metrics.full.num_retries++;
     913             : 
     914           0 :       ctx->metrics.full.bytes_read           = 0UL;
     915           0 :       ctx->metrics.full.bytes_written        = 0UL;
     916           0 :       ctx->metrics.full.bytes_total          = 0UL;
     917             : 
     918           0 :       ctx->metrics.incremental.bytes_read    = 0UL;
     919           0 :       ctx->metrics.incremental.bytes_written = 0UL;
     920           0 :       ctx->metrics.incremental.bytes_total   = 0UL;
     921             : 
     922           0 :       if( !ctx->download_enabled ) {
     923             :         /* if we are unable to download new snapshots and unable to load
     924             :            our local snapshot, we must shutdown the validator. */
     925           0 :         FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.full_snapshot_path ));
     926           0 :       } else {
     927           0 :         if( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ctx->local_in.full_snapshot_slot = ULONG_MAX;
     928           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
     929           0 :         ctx->deadline_nanos = 0L;
     930           0 :       }
     931           0 :       break;
     932             : 
     933             :     /* ============================================================== */
     934           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
     935           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
     936           0 :       if( !ctx->flush_ack ) break;
     937             : 
     938           0 :       if( ctx->metrics.incremental.num_retries==ctx->config.max_retry_abort ) {
     939           0 :         FD_LOG_ERR(("hit retry limit of %u for incremental snapshot. aborting", ctx->config.max_retry_abort ));
     940           0 :       }
     941             : 
     942           0 :       ctx->metrics.incremental.num_retries++;
     943             : 
     944           0 :       ctx->metrics.incremental.bytes_read    = 0UL;
     945           0 :       ctx->metrics.incremental.bytes_written = 0UL;
     946           0 :       ctx->metrics.incremental.bytes_total   = 0UL;
     947             : 
     948           0 :       if( !ctx->download_enabled ) {
     949             :         /* if we are unable to download new snapshots and unable to load
     950             :            our local snapshot, we must shutdown the validator. */
     951           0 :         FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.full_snapshot_path ));
     952           0 :       } else {
     953           0 :         if( ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
     954           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     955           0 :         ctx->deadline_nanos = 0L;
     956           0 :       }
     957           0 :       break;
     958             : 
     959             :     /* ============================================================== */
     960           0 :     case FD_SNAPCT_STATE_READING_FULL_FILE:
     961           0 :       if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
     962           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     963           0 :         ctx->malformed = 0;
     964           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     965           0 :         ctx->flush_ack = 0;
     966           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     967           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     968           0 :         break;
     969           0 :       }
     970           0 :       FD_TEST( ctx->metrics.full.bytes_total!=0UL );
     971           0 :       if( FD_UNLIKELY( ctx->metrics.full.bytes_read == ctx->metrics.full.bytes_total ) ) {
     972           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
     973           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI;
     974           0 :         ctx->flush_ack = 0;
     975           0 :       }
     976           0 :       break;
     977             : 
     978             :     /* ============================================================== */
     979           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
     980           0 :       if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
     981           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     982           0 :         ctx->malformed = 0;
     983           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     984           0 :         ctx->flush_ack = 0;
     985           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     986           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     987           0 :         break;
     988           0 :       }
     989           0 :       FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
     990           0 :       if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_read == ctx->metrics.incremental.bytes_total ) ) {
     991           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
     992           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI;
     993           0 :         ctx->flush_ack = 0;
     994           0 :       }
     995           0 :       break;
     996             : 
     997             :     /* ============================================================== */
     998           0 :     case FD_SNAPCT_STATE_READING_FULL_HTTP:
     999           0 :       if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
    1000           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1001           0 :         ctx->malformed = 0;
    1002           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1003           0 :         ctx->flush_ack = 0;
    1004           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
    1005           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
    1006           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
    1007           0 :         fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
    1008           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->peer.addr );
    1009           0 :         break;
    1010           0 :       }
    1011           0 :       if( FD_UNLIKELY( ctx->metrics.full.bytes_total!=0UL && ctx->metrics.full.bytes_read==ctx->metrics.full.bytes_total ) ) {
    1012           0 :         ulong sig = FD_SNAPSHOT_MSG_CTRL_FINI;
    1013           0 :         fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
    1014           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI;
    1015           0 :         ctx->flush_ack = 0;
    1016           0 :       }
    1017           0 :       break;
    1018             : 
    1019             :     /* ============================================================== */
    1020           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
    1021           0 :       if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
    1022           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1023           0 :         ctx->malformed = 0;
    1024           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1025           0 :         ctx->flush_ack = 0;
    1026           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
    1027           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
    1028           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ) ));
    1029           0 :         fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
    1030           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->peer.addr );
    1031           0 :         break;
    1032           0 :       }
    1033           0 :       if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_total!=0UL && ctx->metrics.incremental.bytes_read==ctx->metrics.incremental.bytes_total ) ) {
    1034           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
    1035           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI;
    1036           0 :         ctx->flush_ack = 0;
    1037           0 :       }
    1038           0 :       break;
    1039             : 
    1040             :     /* ============================================================== */
    1041           0 :     case FD_SNAPCT_STATE_SHUTDOWN:
    1042             :       /* Transitioning to the shutdown state indicates snapshot load is
    1043             :          completed without errors.  Otherwise, snapct would have aborted
    1044             :          earlier. */
    1045           0 :       break;
    1046             : 
    1047             :     /* ============================================================== */
    1048           0 :     default: FD_LOG_ERR(( "unexpected state %d", ctx->state ));
    1049           0 :   }
    1050           0 : }
    1051             : 
    1052             : static void
    1053             : gossip_frag( fd_snapct_tile_t *  ctx,
    1054             :              ulong               sig,
    1055             :              ulong               sz FD_PARAM_UNUSED,
    1056           0 :              ulong               chunk ) {
    1057           0 :   FD_TEST( ctx->gossip_enabled );
    1058             : 
    1059           0 :   if( !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
    1060           0 :          sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
    1061           0 :          sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) ) return;
    1062             : 
    1063           0 :   fd_gossip_update_message_t const * msg    = fd_chunk_to_laddr_const( ctx->gossip_in_mem, chunk );
    1064           0 :   fd_pubkey_t const *                pubkey = (fd_pubkey_t const *)msg->origin_pubkey;
    1065           0 :   switch( msg->tag ) {
    1066           0 :     case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
    1067           0 :       FD_TEST( msg->contact_info.idx<GOSSIP_PEERS_MAX );
    1068           0 :       gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info.idx;
    1069           0 :       if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
    1070             :         /* Initialize the new gossip entry, which may or may not be allowed */
    1071           0 :         FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
    1072           0 :         entry->pubkey      = *pubkey;
    1073           0 :         entry->rpc_addr.l  = 0UL;
    1074           0 :         entry->added_nanos = fd_log_wallclock();
    1075           0 :         if( ctx->config.sources.gossip.allow_any ) {
    1076           0 :           entry->allowed = 1;
    1077           0 :           for( ulong i=0UL; i<ctx->config.sources.gossip.block_list_cnt; i++ ) {
    1078           0 :             if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.block_list[ i ] ) ) {
    1079           0 :               entry->allowed = 0;
    1080           0 :               break;
    1081           0 :             }
    1082           0 :           }
    1083           0 :         } else {
    1084           0 :           entry->allowed = 0;
    1085           0 :           for( ulong i=0UL; i<ctx->config.sources.gossip.allow_list_cnt; i++ ) {
    1086           0 :             if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.allow_list[ i ] ) ) {
    1087           0 :               entry->allowed = 1;
    1088           0 :               break;
    1089           0 :             }
    1090           0 :           }
    1091           0 :         }
    1092           0 :         FD_TEST(  ULONG_MAX==gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table ) );
    1093           0 :         if( entry->allowed ) gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info.idx, ctx->gossip.ci_table );
    1094           0 :       }
    1095           0 :       if( !entry->allowed ) break;
    1096             :       /* Maybe update the RPC address of a new or existing allowed gossip peer */
    1097           0 :       fd_ip4_port_t cur_addr = entry->rpc_addr;
    1098           0 :       fd_ip4_port_t new_addr = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
    1099           0 :       if( FD_UNLIKELY( new_addr.l!=cur_addr.l ) ) {
    1100           0 :         entry->rpc_addr = new_addr;
    1101           0 :         if( FD_LIKELY( !!cur_addr.l ) ) {
    1102           0 :           int removed = fd_ssping_remove( ctx->ssping, cur_addr );
    1103           0 :           if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, cur_addr );
    1104           0 :         }
    1105           0 :         if( FD_LIKELY( !!new_addr.l ) ) fd_ssping_add( ctx->ssping, new_addr );
    1106           0 :         if( !ctx->config.sources.gossip.allow_any ) {
    1107           0 :           FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
    1108           0 :           if( FD_LIKELY( !!new_addr.l ) ) {
    1109           0 :             FD_LOG_NOTICE(( "allowed gossip peer added with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
    1110           0 :                             pubkey_b58, FD_IP4_ADDR_FMT_ARGS( new_addr.addr ), fd_ushort_bswap( new_addr.port ) ));
    1111           0 :           } else {
    1112           0 :             FD_LOG_WARNING(( "allowed gossip peer with public key `%s` does not advertise an RPC address", pubkey_b58 ));
    1113           0 :           }
    1114           0 :         }
    1115           0 :       }
    1116           0 :       break;
    1117           0 :     }
    1118           0 :     case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
    1119           0 :       FD_TEST( msg->contact_info_remove.idx<GOSSIP_PEERS_MAX );
    1120           0 :       gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info_remove.idx;
    1121           0 :       if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
    1122           0 :         FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
    1123           0 :         break;
    1124           0 :       }
    1125           0 :       ulong rem_idx = gossip_ci_map_idx_remove( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table );
    1126           0 :       if( rem_idx==ULONG_MAX ) break;
    1127           0 :       FD_TEST( entry->allowed && rem_idx==msg->contact_info_remove.idx );
    1128           0 :       fd_ip4_port_t addr = entry->rpc_addr;
    1129           0 :       if( FD_LIKELY( !!addr.l ) ) {
    1130           0 :         int removed = fd_ssping_remove( ctx->ssping, addr );
    1131           0 :         if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, addr );
    1132           0 :       }
    1133           0 :       if( !ctx->config.sources.gossip.allow_any ) {
    1134           0 :         FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
    1135           0 :         FD_LOG_WARNING(( "allowed gossip peer removed with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
    1136           0 :                          pubkey_b58, FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
    1137           0 :       }
    1138           0 :       fd_memset( entry, 0, sizeof(*entry) );
    1139           0 :       break;
    1140           0 :     }
    1141           0 :     case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
    1142           0 :       ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table );
    1143           0 :       if( FD_LIKELY( idx!=ULONG_MAX ) ) {
    1144           0 :         gossip_ci_entry_t * entry = ctx->gossip.ci_table + idx;
    1145           0 :         FD_TEST( entry->allowed );
    1146           0 :         on_snapshot_hash( ctx, entry->rpc_addr, msg );
    1147           0 :       }
    1148           0 :       break;
    1149           0 :     }
    1150           0 :     default:
    1151           0 :       FD_LOG_ERR(( "snapct: unexpected gossip tag %u", (uint)msg->tag ));
    1152           0 :       break;
    1153           0 :   }
    1154           0 : }
    1155             : 
    1156             : static void
    1157             : snapld_frag( fd_snapct_tile_t *  ctx,
    1158             :              ulong               sig,
    1159             :              ulong               sz,
    1160           0 :              ulong               chunk ) {
    1161           0 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_META ) ) {
    1162             :     /* Before snapld starts sending down data fragments, it first sends
    1163             :        a metadata message containing the total size of the snapshot as
    1164             :        well as the filename.  This is only done for HTTP loading. */
    1165           0 :     int full;
    1166           0 :     switch( ctx->state ) {
    1167           0 :       case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; break;
    1168           0 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
    1169             : 
    1170           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
    1171           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
    1172           0 :         return; /* Ignore */
    1173           0 :       default: FD_LOG_ERR(( "invalid meta frag in state %d", ctx->state ));
    1174           0 :     }
    1175             : 
    1176           0 :     FD_TEST( sz==sizeof(fd_ssctrl_meta_t) );
    1177           0 :     fd_ssctrl_meta_t const * meta = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
    1178             : 
    1179           0 :     if( full ) ctx->metrics.full.bytes_total        = meta->total_sz;
    1180           0 :     else       ctx->metrics.incremental.bytes_total = meta->total_sz;
    1181             : 
    1182           0 :     return;
    1183           0 :   }
    1184           0 :   if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_DATA ) ) return;
    1185             : 
    1186           0 :   int full, file;
    1187           0 :   switch( ctx->state ) {
    1188             :     /* Expected cases, fall through below */
    1189           0 :     case FD_SNAPCT_STATE_READING_FULL_FILE:        full = 1; file = 1; break;
    1190           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE: full = 0; file = 1; break;
    1191           0 :     case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; file = 0; break;
    1192           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; file = 0; break;
    1193             : 
    1194           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
    1195           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
    1196           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
    1197           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
    1198             :       /* We are waiting for a reset to fully propagate through the
    1199             :          pipeline, just throw away any trailing data frags. */
    1200           0 :       return;
    1201             : 
    1202           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
    1203           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
    1204           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
    1205           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
    1206           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
    1207           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
    1208           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
    1209           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
    1210             :       /* Based on previously received data frags, we expected that the
    1211             :          current full / incremental snapshot was finished, but then we
    1212             :          received additional data frags.  Unsafe to continue so throw
    1213             :          away the whole snapshot. */
    1214           0 :       if( !ctx->malformed ) {
    1215           0 :         ctx->malformed = 1;
    1216           0 :         FD_LOG_WARNING(( "complete snapshot loaded but read %lu extra bytes", sz ));
    1217           0 :       }
    1218           0 :       return;
    1219             : 
    1220           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS:
    1221           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL:
    1222           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS:
    1223           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL:
    1224           0 :     case FD_SNAPCT_STATE_SHUTDOWN:
    1225           0 :     default:
    1226           0 :       FD_LOG_ERR(( "invalid data frag in state %d", ctx->state ));
    1227           0 :       return;
    1228           0 :   }
    1229             : 
    1230           0 :   if( full ) FD_TEST( ctx->metrics.full.bytes_total       !=0UL );
    1231           0 :   else       FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
    1232             : 
    1233           0 :   if( full ) ctx->metrics.full.bytes_read        += sz;
    1234           0 :   else       ctx->metrics.incremental.bytes_read += sz;
    1235             : 
    1236           0 :   if( !file && -1!=ctx->local_out.dir_fd ) {
    1237           0 :     ulong written_sz = 0;
    1238           0 :     while( written_sz<sz ) {
    1239           0 :       uchar const * data = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
    1240           0 :       int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
    1241           0 :       long result = write( fd, data, sz );
    1242           0 :       if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
    1243           0 :         FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", ctx->config.snapshots_path ));
    1244           0 :       } else if( FD_UNLIKELY( 0L>result ) ) {
    1245           0 :         FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    1246           0 :       }
    1247             : 
    1248           0 :       written_sz += (ulong)result;
    1249           0 :     }
    1250           0 :     if( full ) ctx->metrics.full.bytes_written        += sz;
    1251           0 :     else       ctx->metrics.incremental.bytes_written += sz;
    1252           0 :   }
    1253             : 
    1254           0 :   if( FD_UNLIKELY( ( full && ctx->metrics.full.bytes_read        > ctx->metrics.full.bytes_total ) ||
    1255           0 :                    (!full && ctx->metrics.incremental.bytes_read > ctx->metrics.incremental.bytes_total ) ) ) {
    1256           0 :     if( !ctx->malformed ) {
    1257           0 :       ctx->malformed = 1;
    1258           0 :       FD_LOG_WARNING(( "expected %s snapshot size of %lu bytes but read %lu bytes",
    1259           0 :                        full ? "full" : "incremental",
    1260           0 :                        full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total,
    1261           0 :                        full ? ctx->metrics.full.bytes_read  : ctx->metrics.incremental.bytes_read ));
    1262             : 
    1263           0 :     }
    1264           0 :   }
    1265           0 : }
    1266             : 
    1267             : /* FIXME rename */
    1268             : static void
    1269             : snapin_frag( fd_snapct_tile_t *  ctx,
    1270           0 :              ulong               sig ) {
    1271           0 :   switch( sig ) {
    1272           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
    1273           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_FULL_HTTP ||
    1274           0 :                      ctx->state==FD_SNAPCT_STATE_READING_FULL_FILE ) ) {
    1275           0 :         FD_TEST( !ctx->flush_ack );
    1276           0 :         ctx->flush_ack = 1;
    1277           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1278           0 :       break;
    1279             : 
    1280           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
    1281           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP ||
    1282           0 :                      ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_FILE ) ) {
    1283           0 :         FD_TEST( !ctx->flush_ack );
    1284           0 :         ctx->flush_ack = 1;
    1285           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1286           0 :       break;
    1287             : 
    1288           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
    1289           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
    1290           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ) ) {
    1291           0 :         FD_TEST( !ctx->flush_ack );
    1292           0 :         ctx->flush_ack = 1;
    1293           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1294           0 :       break;
    1295             : 
    1296           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:
    1297           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
    1298           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ||
    1299           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE ||
    1300           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE ) ) {
    1301           0 :         FD_TEST( !ctx->flush_ack );
    1302           0 :         ctx->flush_ack = 1;
    1303           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1304           0 :       break;
    1305             : 
    1306           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI:
    1307           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI ||
    1308           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI ||
    1309           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI ||
    1310           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI ) ) {
    1311           0 :         FD_TEST( !ctx->flush_ack );
    1312           0 :         ctx->flush_ack = 1;
    1313           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1314           0 :       break;
    1315             : 
    1316           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL:
    1317           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1318           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
    1319           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1320           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1321           0 :         FD_TEST( !ctx->flush_ack );
    1322           0 :         ctx->flush_ack = 1;
    1323           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1324           0 :       break;
    1325             : 
    1326           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
    1327           0 :       break;
    1328             : 
    1329           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR:
    1330           0 :       switch( ctx->state ) {
    1331           0 :         case FD_SNAPCT_STATE_READING_FULL_FILE:
    1332           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
    1333           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
    1334           0 :         case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
    1335           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
    1336           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
    1337           0 :         case FD_SNAPCT_STATE_READING_FULL_HTTP:
    1338           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
    1339           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
    1340           0 :         case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
    1341           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
    1342           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
    1343           0 :           ctx->malformed = 1;
    1344           0 :           ctx->flush_ack = 1;
    1345           0 :           break;
    1346           0 :         default:
    1347           0 :           break;
    1348           0 :       }
    1349           0 :       break;
    1350           0 :   }
    1351           0 : }
    1352             : 
    1353             : static int
    1354             : returnable_frag( fd_snapct_tile_t *  ctx,
    1355             :                  ulong               in_idx,
    1356             :                  ulong               seq    FD_PARAM_UNUSED,
    1357             :                  ulong               sig,
    1358             :                  ulong               chunk,
    1359             :                  ulong               sz,
    1360             :                  ulong               ctl    FD_PARAM_UNUSED,
    1361             :                  ulong               tsorig FD_PARAM_UNUSED,
    1362             :                  ulong               tspub  FD_PARAM_UNUSED,
    1363           0 :                  fd_stem_context_t * stem   FD_PARAM_UNUSED ) {
    1364           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
    1365           0 :     gossip_frag( ctx, sig, sz, chunk );
    1366           0 :   } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLD ) {
    1367           0 :     snapld_frag( ctx, sig, sz, chunk );
    1368           0 :   } else if( ctx->in_kind[ in_idx ]==IN_KIND_ACK ) {
    1369           0 :     snapin_frag( ctx, sig );
    1370           0 :   } else FD_LOG_ERR(( "invalid in_kind %lu %u", in_idx, (uint)ctx->in_kind[ in_idx ] ));
    1371           0 :   return 0;
    1372           0 : }
    1373             : 
    1374             : static void
    1375             : privileged_init( fd_topo_t *      topo,
    1376           0 :                  fd_topo_tile_t * tile ) {
    1377           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1378             : 
    1379           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1380           0 :   fd_snapct_tile_t * ctx         = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t) );
    1381           0 :   void *             _ssping     = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX ) );
    1382           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t)*GOSSIP_PEERS_MAX );
    1383           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(),      gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
    1384           0 :   void *             _ssresolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(),   fd_http_resolver_footprint( SERVER_PEERS_MAX )  );
    1385           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
    1386             : 
    1387           0 : #if FD_HAS_OPENSSL
    1388           0 :   void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
    1389           0 :   fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
    1390           0 :   fd_ossl_tile_init( alloc );
    1391           0 : #endif
    1392             : 
    1393           0 :   ctx->ssping = NULL;
    1394           0 :   if( FD_LIKELY( download_enabled( tile ) ) )         ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, TOTAL_PEERS_MAX, 1UL, on_ping, ctx ) );
    1395           0 :   if( FD_LIKELY( tile->snapct.sources.servers_cnt ) ) ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, SERVER_PEERS_MAX, tile->snapct.incremental_snapshots, on_resolve, ctx ) );
    1396           0 :   else                                                ctx->ssresolver = NULL;
    1397             : 
    1398             :   /* FIXME: We will keep too many snapshots if we have local snapshots
    1399             :      but elect not to use them due to their age. */
    1400           0 :   fd_ssarchive_remove_old_snapshots( tile->snapct.snapshots_path,
    1401           0 :                                      tile->snapct.max_full_snapshots_to_keep,
    1402           0 :                                      tile->snapct.max_incremental_snapshots_to_keep );
    1403             : 
    1404           0 :   ulong full_slot = ULONG_MAX;
    1405           0 :   ulong incremental_slot = ULONG_MAX;
    1406           0 :   int full_is_zstd = 0;
    1407           0 :   int incremental_is_zstd = 0;
    1408           0 :   char full_path[ PATH_MAX ] = {0};
    1409           0 :   char incremental_path[ PATH_MAX ] = {0};
    1410           0 :   if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snapct.snapshots_path,
    1411           0 :                                                  tile->snapct.incremental_snapshots,
    1412           0 :                                                  &full_slot,
    1413           0 :                                                  &incremental_slot,
    1414           0 :                                                  full_path,
    1415           0 :                                                  incremental_path,
    1416           0 :                                                  &full_is_zstd,
    1417           0 :                                                  &incremental_is_zstd ) ) ) {
    1418           0 :     if( FD_UNLIKELY( !download_enabled( tile ) ) ) {
    1419           0 :       FD_LOG_ERR(( "No snapshots found in `%s` and no download sources are enabled. "
    1420           0 :                    "Please enable downloading via [snapshots.sources] and restart.", tile->snapct.snapshots_path ));
    1421           0 :     }
    1422           0 :     ctx->local_in.full_snapshot_slot        = ULONG_MAX;
    1423           0 :     ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
    1424           0 :     ctx->local_in.full_snapshot_size        = 0UL;
    1425           0 :     ctx->local_in.incremental_snapshot_size = 0UL;
    1426           0 :     ctx->local_in.full_snapshot_zstd        = 0;
    1427           0 :     ctx->local_in.incremental_snapshot_zstd = 0;
    1428           0 :     fd_cstr_fini( ctx->local_in.full_snapshot_path );
    1429           0 :     fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
    1430           0 :   } else {
    1431           0 :     FD_TEST( full_slot!=ULONG_MAX );
    1432             : 
    1433           0 :     ctx->local_in.full_snapshot_slot        = full_slot;
    1434           0 :     ctx->local_in.incremental_snapshot_slot = incremental_slot;
    1435           0 :     ctx->local_in.full_snapshot_zstd        = full_is_zstd;
    1436           0 :     ctx->local_in.incremental_snapshot_zstd = incremental_is_zstd;
    1437             : 
    1438           0 :     strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
    1439           0 :     struct stat full_stat;
    1440           0 :     if( FD_UNLIKELY( -1==stat( ctx->local_in.full_snapshot_path, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
    1441           0 :     if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
    1442           0 :     ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
    1443             : 
    1444           0 :     if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
    1445           0 :       strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
    1446           0 :       struct stat incremental_stat;
    1447           0 :       if( FD_UNLIKELY( -1==stat( ctx->local_in.incremental_snapshot_path, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
    1448           0 :       if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
    1449           0 :       ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
    1450           0 :     } else {
    1451           0 :       ctx->local_in.incremental_snapshot_size = 0UL;
    1452           0 :       fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
    1453           0 :     }
    1454           0 :   }
    1455             : 
    1456           0 :   ctx->local_out.dir_fd                  = -1;
    1457           0 :   ctx->local_out.full_snapshot_fd        = -1;
    1458           0 :   ctx->local_out.incremental_snapshot_fd = -1;
    1459           0 :   if( FD_LIKELY( download_enabled( tile ) ) ) {
    1460           0 :     ctx->local_out.dir_fd = open( tile->snapct.snapshots_path, O_DIRECTORY|O_CLOEXEC );
    1461           0 :     if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open(%s) failed (%i-%s)", tile->snapct.snapshots_path, errno, fd_io_strerror( errno ) ));
    1462             : 
    1463           0 :     ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
    1464           0 :     if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_FULL_SNAP_NAME, errno, fd_io_strerror( errno ) ));
    1465             : 
    1466           0 :     if( FD_LIKELY( tile->snapct.incremental_snapshots ) ) {
    1467           0 :       ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
    1468           0 :       if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_INCR_SNAP_NAME, errno, fd_io_strerror( errno ) ));
    1469           0 :     }
    1470           0 :   }
    1471           0 : }
    1472             : 
    1473             : static inline fd_snapct_out_link_t
    1474             : out1( fd_topo_t const *      topo,
    1475             :       fd_topo_tile_t const * tile,
    1476           0 :       char const *           name ) {
    1477           0 :   ulong idx = ULONG_MAX;
    1478             : 
    1479           0 :   for( ulong i=0UL; i<tile->out_cnt; i++ ) {
    1480           0 :     fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
    1481           0 :     if( !strcmp( link->name, name ) ) {
    1482           0 :       if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
    1483           0 :       idx = i;
    1484           0 :     }
    1485           0 :   }
    1486             : 
    1487           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapct_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
    1488             : 
    1489           0 :   ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
    1490           0 :   if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapct_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
    1491             : 
    1492           0 :   void * mem   = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
    1493           0 :   ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
    1494           0 :   ulong wmark  = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
    1495           0 :   return (fd_snapct_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
    1496           0 : }
    1497             : 
    1498             : static void
    1499             : unprivileged_init( fd_topo_t *      topo,
    1500           0 :                    fd_topo_tile_t * tile ) {
    1501           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1502             : 
    1503           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1504           0 :   fd_snapct_tile_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t)       );
    1505           0 :                             FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX ) );
    1506           0 :   void * _ci_table        = FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
    1507           0 :   void * _ci_map          = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(),      gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
    1508           0 :                             FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(),   fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
    1509           0 :   void * _selector        = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
    1510             : 
    1511           0 :   ctx->config = tile->snapct;
    1512           0 :   ctx->gossip_enabled   = gossip_enabled( tile );
    1513           0 :   ctx->download_enabled = download_enabled( tile );
    1514             : 
    1515           0 :   if( ctx->config.sources.servers_cnt ) {
    1516           0 :     for( ulong i=0UL; i<tile->snapct.sources.servers_cnt; i++ ) {
    1517           0 :       fd_ssping_add       ( ctx->ssping, tile->snapct.sources.servers[ i ].addr );
    1518           0 :       fd_http_resolver_add( ctx->ssresolver,
    1519           0 :                             tile->snapct.sources.servers[ i ].addr,
    1520           0 :                             tile->snapct.sources.servers[ i ].hostname,
    1521           0 :                             tile->snapct.sources.servers[ i ].is_https );
    1522           0 :     }
    1523           0 :   }
    1524             : 
    1525           0 :   ctx->selector = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, TOTAL_PEERS_MAX, ctx->config.incremental_snapshots, 1UL ) );
    1526             : 
    1527           0 :   ctx->state          = FD_SNAPCT_STATE_INIT;
    1528           0 :   ctx->malformed      = 0;
    1529           0 :   ctx->deadline_nanos = fd_log_wallclock() + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
    1530           0 :   ctx->flush_ack      = 0;
    1531           0 :   ctx->peer.addr.l    = 0UL;
    1532             : 
    1533           0 :   fd_memset( ctx->http_full_snapshot_name, 0, PATH_MAX );
    1534           0 :   fd_memset( ctx->http_incr_snapshot_name, 0, PATH_MAX );
    1535             : 
    1536           0 :   ctx->gossip_in_mem = NULL;
    1537           0 :   int has_snapld_dc = 0, has_ack_loopback = 0;
    1538           0 :   FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
    1539           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    1540           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
    1541           0 :     if( 0==strcmp( in_link->name, "gossip_out" ) ) {
    1542           0 :       ctx->in_kind[ i ]  = IN_KIND_GOSSIP;
    1543           0 :       ctx->gossip_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
    1544           0 :     } else if( 0==strcmp( in_link->name, "snapld_dc" ) ) {
    1545           0 :       ctx->in_kind[ i ]  = IN_KIND_SNAPLD;
    1546           0 :       ctx->snapld_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
    1547           0 :       FD_TEST( !has_snapld_dc );
    1548           0 :       has_snapld_dc = 1;
    1549           0 :     } else if( 0==strcmp( in_link->name, "snapin_ct" ) || 0==strcmp( in_link->name, "snapls_ct" ) ||
    1550           0 :                0==strcmp( in_link->name, "snapwm_ct" ) || 0==strcmp( in_link->name, "snaplv_ct" ) ) {
    1551           0 :       ctx->in_kind[ i ] = IN_KIND_ACK;
    1552           0 :       FD_TEST( !has_ack_loopback );
    1553           0 :       has_ack_loopback = 1;
    1554           0 :     }
    1555           0 :   }
    1556           0 :   FD_TEST( has_snapld_dc && has_ack_loopback );
    1557           0 :   FD_TEST( ctx->gossip_enabled==(ctx->gossip_in_mem!=NULL) );
    1558             : 
    1559           0 :   ctx->predicted_incremental.full_slot = ULONG_MAX;
    1560           0 :   ctx->predicted_incremental.slot      = ULONG_MAX;
    1561           0 :   ctx->predicted_incremental.pending   = 0;
    1562             : 
    1563           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
    1564             : 
    1565           0 :   fd_memset( _ci_table, 0, sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
    1566           0 :   ctx->gossip.ci_table             = _ci_table;
    1567           0 :   ctx->gossip.ci_map               = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ), 0UL ) );
    1568           0 :   ctx->gossip.fresh_cnt            = 0UL;
    1569           0 :   ctx->gossip.total_cnt            = 0UL;
    1570           0 :   ctx->gossip.saturated            = !ctx->gossip_enabled;
    1571           0 :   ctx->gossip.next_saturated_check = 0;
    1572             : 
    1573           0 :   if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
    1574           0 :   ctx->out_ld  = out1( topo, tile, "snapct_ld"   );
    1575           0 :   ctx->out_gui = out1( topo, tile, "snapct_gui"  );
    1576           0 :   ctx->out_rp  = out1( topo, tile, "snapct_repr" );
    1577           0 : }
    1578             : 
    1579             : /* after_credit can result in as many as 5 stem publishes in some code
    1580             :    paths, and returnable_frag can result in 1. */
    1581           0 : #define STEM_BURST 6UL
    1582             : 
    1583           0 : #define STEM_LAZY  1000L
    1584             : 
    1585           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snapct_tile_t
    1586           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapct_tile_t)
    1587             : 
    1588             : #define STEM_CALLBACK_SHOULD_SHUTDOWN     should_shutdown
    1589           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
    1590           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
    1591           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
    1592           0 : #define STEM_CALLBACK_RETURNABLE_FRAG     returnable_frag
    1593             : 
    1594             : #include "../../disco/stem/fd_stem.c"
    1595             : 
    1596             : fd_topo_run_tile_t fd_tile_snapct = {
    1597             :   .name                     = NAME,
    1598             :   .rlimit_file_cnt_fn       = rlimit_file_cnt,
    1599             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    1600             :   .populate_allowed_fds     = populate_allowed_fds,
    1601             :   .scratch_align            = scratch_align,
    1602             :   .scratch_footprint        = scratch_footprint,
    1603             :   .loose_footprint          = loose_footprint,
    1604             :   .privileged_init          = privileged_init,
    1605             :   .unprivileged_init        = unprivileged_init,
    1606             :   .run                      = stem_run,
    1607             :   .keep_host_networking     = 1,
    1608             :   .allow_connect            = 1,
    1609             :   .allow_renameat           = 1,
    1610             : };
    1611             : 
    1612             : #undef NAME

Generated by: LCOV version 1.14