LCOV - code coverage report
Current view: top level - discof/restore - fd_snapct_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 145 1429 10.1 %
Date: 2026-06-29 05:51:35 Functions: 5 62 8.1 %

          Line data    Source code
       1             : #define _GNU_SOURCE
       2             : #include "fd_snapct_tile.h"
       3             : #include "utils/fd_sspeer.h"
       4             : #include "utils/fd_ssping.h"
       5             : #include "utils/fd_ssctrl.h"
       6             : #include "utils/fd_ssarchive.h"
       7             : #include "utils/fd_http_resolver.h"
       8             : #include "utils/fd_ssmsg.h"
       9             : 
      10             : #include "../../disco/topo/fd_topo.h"
      11             : #include "../../disco/metrics/fd_metrics.h"
      12             : #include "../../flamenco/gossip/fd_gossip_message.h"
      13             : #include "../../waltz/openssl/fd_openssl_tile.h"
      14             : 
      15             : #include <errno.h>
      16             : #include <stdio.h>
      17             : #include <fcntl.h>
      18             : #include <unistd.h>
      19             : #include <sys/stat.h>
      20             : #include <sys/syscall.h>
      21             : #include <netinet/tcp.h>
      22             : #include <netinet/in.h>
      23             : 
      24             : #include "generated/fd_snapct_tile_seccomp.h"
      25             : 
      26             : #define NAME "snapct"
      27             : 
      28             : /* FIXME: Do a finishing pass over the default.toml config options / comments */
      29             : 
      30          57 : #define GOSSIP_PEERS_MAX (FD_CONTACT_INFO_TABLE_SIZE)
      31          27 : #define SERVER_PEERS_MAX (FD_TOPO_SNAPSHOTS_SERVERS_MAX_RESOLVED)
      32          27 : #define TOTAL_PEERS_MAX  (GOSSIP_PEERS_MAX + SERVER_PEERS_MAX)
      33             : 
      34           0 : #define IN_KIND_ACK    (0)
      35           0 : #define IN_KIND_SNAPLD (1)
      36           0 : #define IN_KIND_GOSSIP (2)
      37             : #define MAX_IN_LINKS   (4)
      38             : 
      39           0 : #define TEMP_FULL_SNAP_NAME ".snapshot.tar.bz2-partial"
      40           0 : #define TEMP_INCR_SNAP_NAME ".incremental-snapshot.tar.bz2-partial"
      41             : 
      42             : struct fd_snapct_out_link {
      43             :   ulong       idx;
      44             :   fd_wksp_t * mem;
      45             :   ulong       chunk0;
      46             :   ulong       wmark;
      47             :   ulong       chunk;
      48             :   ulong       mtu;
      49             : };
      50             : typedef struct fd_snapct_out_link fd_snapct_out_link_t;
      51             : 
      52           0 : #define FD_SNAPCT_COLLECTING_PEERS_TIMEOUT (90L*1000L*1000L*1000L) /* 1.5 minutes */
      53             : 
      54             : struct gossip_ci_entry {
      55             :   fd_pubkey_t   pubkey;
      56             :   int           allowed;
      57             :   fd_ip4_port_t rpc_addr;
      58             :   ulong         map_next;
      59             : };
      60             : typedef struct gossip_ci_entry gossip_ci_entry_t;
      61             : 
      62             : #define MAP_NAME               gossip_ci_map
      63           9 : #define MAP_KEY                pubkey
      64             : #define MAP_ELE_T              gossip_ci_entry_t
      65             : #define MAP_KEY_T              fd_pubkey_t
      66          12 : #define MAP_NEXT               map_next
      67          12 : #define MAP_KEY_EQ(k0,k1)      fd_pubkey_eq( k0, k1 )
      68          51 : #define MAP_KEY_HASH(key,seed) fd_hash( seed, key, sizeof(fd_pubkey_t) )
      69             : #include "../../util/tmpl/fd_map_chain.c"
      70             : 
      71             : /* Standalone blacklist keyed on fd_sspeer_key_t (peer identity).
      72             :    Keying on identity rather than network address is intentional:
      73             :    a peer's address can change freely, but its identity cannot.
      74             :    For gossip peers the identity is the pubkey; for URL peers it is
      75             :    the (hostname, resolved_addr) pair.  Blacklisted peers are
      76             :    permanently banned for the bootstrap lifetime.  The blacklist uses
      77             :    its own dedicated pool so entries are never evicted by pool
      78             :    pressure, unlike the ssping peer pool. */
      79             : 
      80             : struct fd_sspeer_blacklist_entry {
      81             :   fd_sspeer_key_t key;
      82             :   ulong           pool_next;
      83             :   ulong           map_next;
      84             : };
      85             : typedef struct fd_sspeer_blacklist_entry fd_sspeer_blacklist_entry_t;
      86             : 
      87             : #define POOL_NAME  blacklist_pool
      88          30 : #define POOL_T     fd_sspeer_blacklist_entry_t
      89             : #define POOL_IDX_T ulong
      90      394008 : #define POOL_NEXT  pool_next
      91             : #include "../../util/tmpl/fd_pool.c"
      92             : 
      93             : #define MAP_NAME               blacklist_map
      94          18 : #define MAP_KEY                key
      95             : #define MAP_ELE_T              fd_sspeer_blacklist_entry_t
      96             : #define MAP_KEY_T              fd_sspeer_key_t
      97          51 : #define MAP_NEXT               map_next
      98          39 : #define MAP_KEY_EQ(k0,k1)      (fd_sspeer_key_eq(k0,k1))
      99          63 : #define MAP_KEY_HASH(key,seed) (fd_sspeer_key_hash(key,seed))
     100             : #include "../../util/tmpl/fd_map_chain.c"
     101             : 
     102             : struct fd_snapct_tile {
     103             :   struct fd_topo_tile_snapct config;
     104             :   int                        gossip_enabled;
     105             :   int                        download_enabled;
     106             : 
     107             :   fd_ssping_t *          ssping;
     108             :   fd_http_resolver_t *   ssresolver;
     109             :   fd_sspeer_selector_t * selector;
     110             :   ulong                  selector_seed;
     111             : 
     112             :   fd_sspeer_blacklist_entry_t * blacklist_pool;
     113             :   blacklist_map_t *             blacklist_map;
     114             : 
     115             :   int           state;
     116             :   int           malformed;
     117             :   int           load_complete;
     118             :   long          deadline_nanos;
     119             :   int           flush_ack;
     120             :   int           flush_ack_cnt;
     121             :   fd_sspeer_t   peer;
     122             : 
     123             :   struct {
     124             :     int dir_fd;
     125             :     int full_snapshot_fd;
     126             :     int incremental_snapshot_fd;
     127             :   } local_out;
     128             : 
     129             :   char http_full_snapshot_name[ PATH_MAX ];
     130             :   char http_incr_snapshot_name[ PATH_MAX ];
     131             : 
     132             :   void const * gossip_in_mem;
     133             :   void const * snapld_in_mem;
     134             :   uchar        in_kind[ MAX_IN_LINKS ];
     135             : 
     136             :   struct {
     137             :     ulong full_slot;
     138             :     ulong slot;
     139             :     int   pending;
     140             :   } predicted_incremental;
     141             : 
     142             :   struct {
     143             :     ulong full_snapshot_slot;
     144             :     char  full_snapshot_path[ PATH_MAX ];
     145             :     ulong full_snapshot_size;
     146             :     int   full_snapshot_zstd;
     147             : 
     148             :     uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ];
     149             :     uchar incremental_snapshot_hash[ FD_HASH_FOOTPRINT ];
     150             : 
     151             :     ulong incremental_snapshot_slot;
     152             :     char  incremental_snapshot_path[ PATH_MAX ];
     153             :     ulong incremental_snapshot_size;
     154             :     int   incremental_snapshot_zstd;
     155             :   } local_in;
     156             : 
     157             :   struct {
     158             :     struct {
     159             :       ulong bytes_read;
     160             :       ulong bytes_written;
     161             :       ulong bytes_total;
     162             :       uint  num_retries;
     163             :     } full;
     164             : 
     165             :     struct {
     166             :       ulong bytes_read;
     167             :       ulong bytes_written;
     168             :       ulong bytes_total;
     169             :       uint  num_retries;
     170             :     } incremental;
     171             :   } metrics;
     172             : 
     173             :   struct {
     174             :     gossip_ci_entry_t * ci_table;  /* flat array of all gossip entries, allowed or not */
     175             :     gossip_ci_map_t *   ci_map;    /* map from pubkey to only allowed gossip entries */
     176             :     ulong               allowed_cnt; /* number of allowed entries in ci_map */
     177             :     int                 saturated;
     178             :   } gossip;
     179             : 
     180             :   long snapshot_start_timestamp_ns;
     181             : 
     182             :   fd_snapct_out_link_t out_ld;
     183             :   fd_snapct_out_link_t out_gui;
     184             :   fd_snapct_out_link_t out_rp;
     185             : };
     186             : typedef struct fd_snapct_tile fd_snapct_tile_t;
     187             : 
     188             : static int
     189           0 : gossip_enabled( fd_topo_tile_t const * tile ) {
     190           0 :   return tile->snapct.sources.gossip.allow_any || tile->snapct.sources.gossip.allow_list_cnt>0UL;
     191           0 : }
     192             : 
     193             : static int
     194           0 : download_enabled( fd_topo_tile_t const * tile ) {
     195           0 :   return gossip_enabled( tile ) || tile->snapct.sources.servers_cnt>0UL;
     196           0 : }
     197             : 
     198             : FD_FN_CONST static inline ulong
     199           0 : loose_footprint( fd_topo_tile_t const * tile ) {
     200           0 :   (void)tile;
     201             :   /* Leftover space for OpenSSL allocations */
     202           0 :   return 1<<26UL; /* 64 MiB */
     203           0 : }
     204             : 
     205             : static ulong
     206          90 : scratch_align( void ) {
     207          90 :   return fd_ulong_max( alignof(fd_snapct_tile_t),
     208          90 :          fd_ulong_max( fd_ssping_align(),
     209          90 :          fd_ulong_max( alignof(gossip_ci_entry_t),
     210          90 :          fd_ulong_max( gossip_ci_map_align(),
     211          90 :          fd_ulong_max( fd_http_resolver_align(),
     212          90 :          fd_ulong_max( fd_sspeer_selector_align(),
     213          90 :          fd_ulong_max( blacklist_pool_align(),
     214          90 :                        blacklist_map_align() ) ) ) ) ) ) );
     215          90 : }
     216             : 
     217             : static ulong
     218          30 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     219          30 :   ulong l = FD_LAYOUT_INIT;
     220          30 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t)                                                   );
     221          30 :   l = FD_LAYOUT_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX )                                     );
     222          30 :   l = FD_LAYOUT_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX                               );
     223          30 :   l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(),      gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
     224          30 :   l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(),   fd_http_resolver_footprint( SERVER_PEERS_MAX )                             );
     225          30 :   l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX )                            );
     226          30 :   l = FD_LAYOUT_APPEND( l, blacklist_pool_align(),     blacklist_pool_footprint( TOTAL_PEERS_MAX )                               );
     227          30 :   l = FD_LAYOUT_APPEND( l, blacklist_map_align(),      blacklist_map_footprint( blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ) )  );
     228          30 :   l = FD_LAYOUT_APPEND( l, fd_alloc_align(),           fd_alloc_footprint()                                                       );
     229          30 :   return FD_LAYOUT_FINI( l, scratch_align() );
     230          30 : }
     231             : 
     232             : static inline int
     233           0 : should_shutdown( fd_snapct_tile_t * ctx ) {
     234           0 :   return ctx->state==FD_SNAPCT_STATE_SHUTDOWN;
     235           0 : }
     236             : 
     237             : static void
     238           0 : metrics_write( fd_snapct_tile_t * ctx ) {
     239           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_READ,               ctx->metrics.full.bytes_read );
     240           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_WRITTEN,            ctx->metrics.full.bytes_written );
     241           0 :   FD_MGAUGE_SET( SNAPCT, FULL_SIZE_BYTES,              ctx->metrics.full.bytes_total );
     242           0 :   FD_MGAUGE_SET( SNAPCT, FULL_RETRY,          ctx->metrics.full.num_retries );
     243             : 
     244           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_READ,        ctx->metrics.incremental.bytes_read );
     245           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_WRITTEN,     ctx->metrics.incremental.bytes_written );
     246           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_SIZE_BYTES,       ctx->metrics.incremental.bytes_total );
     247           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_RETRY,   ctx->metrics.incremental.num_retries );
     248             : 
     249           0 :   FD_MGAUGE_SET( SNAPCT, PREDICTED_SLOT,                ctx->predicted_incremental.slot );
     250             : 
     251           0 : #if FD_HAS_OPENSSL
     252           0 :   FD_MCNT_SET(   SNAPCT, SSL_ALLOC_FAILED,                fd_ossl_alloc_errors );
     253           0 : #endif
     254             : 
     255           0 :   FD_MGAUGE_SET( SNAPCT, STATE, (ulong)ctx->state );
     256           0 : }
     257             : 
     258             : static void
     259             : snapshot_path_gui_publish( fd_snapct_tile_t *  ctx,
     260             :                            fd_stem_context_t * stem,
     261             :                            char const *        path,
     262           0 :                            int                 is_full ) {
     263             :   /* The messages below cannot be obtained directly from metrics. */
     264           0 :   fd_snapct_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
     265           0 :   FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
     266           0 :   out->is_download = 0;
     267           0 :   out->type = fd_int_if( is_full, FD_SNAPCT_SNAPSHOT_TYPE_FULL, FD_SNAPCT_SNAPSHOT_TYPE_INCREMENTAL );
     268           0 :   fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snapct_update_t) , 0UL, 0UL, 0UL );
     269           0 :   ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snapct_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
     270           0 : }
     271             : 
     272             : static void
     273           0 : predict_incremental( fd_snapct_tile_t * ctx ) {
     274           0 :   if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) return;
     275           0 :   if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==FD_SSPEER_SLOT_UNKNOWN ) ) return;
     276             : 
     277           0 :   fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     278             : 
     279           0 :   if( FD_LIKELY( best.addr.l ) ) {
     280           0 :     if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.incr_slot ) ) {
     281           0 :       ctx->predicted_incremental.slot    = best.incr_slot;
     282           0 :       ctx->predicted_incremental.pending = 1;
     283           0 :     }
     284           0 :   }
     285           0 : }
     286             : 
     287             : static void
     288             : on_resolve( void *                  _ctx,
     289             :             fd_sspeer_key_t const * key,
     290             :             fd_ip4_port_t           addr,
     291             :             ulong                   full_slot,
     292             :             ulong                   incr_slot,
     293             :             uchar                   full_hash[ FD_HASH_FOOTPRINT ],
     294           0 :             uchar                   incr_hash[ FD_HASH_FOOTPRINT ] ) {
     295           0 :   fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
     296             : 
     297           0 :   if( FD_UNLIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN && full_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
     298           0 :   if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
     299             : 
     300             :   /* Do not update peers that have been permanently blacklisted. */
     301           0 :   if( FD_UNLIKELY( key && blacklist_map_ele_query( ctx->blacklist_map, key, NULL, ctx->blacklist_pool ) ) ) return;
     302             :   /* Do not re-add peers whose addr is temporarily banned by ssping. */
     303           0 :   if( FD_UNLIKELY( fd_ssping_is_invalidated( ctx->ssping, addr ) ) ) return;
     304             : 
     305             :   /* add() handles both new and existing peers, so peers removed from
     306             :      the selector during a previous blacklist, timeout, or failed
     307             :      re-resolve are re-added with the freshly resolved data. */
     308           0 :   ulong score = fd_sspeer_selector_add( ctx->selector, key, addr, FD_SSPEER_LATENCY_UNKNOWN,
     309           0 :                                         full_slot, incr_slot, full_hash, incr_hash );
     310           0 :   if( FD_UNLIKELY( score==FD_SSPEER_SCORE_INVALID ) ) {
     311           0 :     if( FD_UNLIKELY( key==NULL ) ) {
     312           0 :       FD_LOG_DEBUG(( "selector add on resolve failed for NULL peer key" ));
     313           0 :     } else {
     314           0 :       if( FD_UNLIKELY( !key->is_url ) ) {
     315           0 :         FD_BASE58_ENCODE_32_BYTES( key->pubkey->key, pubkey_b58 );
     316           0 :         FD_LOG_DEBUG(( "selector add on resolve failed for peer with pubkey %s", pubkey_b58 ));
     317           0 :       } else {
     318           0 :         FD_LOG_DEBUG(( "selector add on resolve failed for peer %s with addr " FD_IP4_ADDR_FMT ":%hu", key->url.hostname,
     319           0 :                        FD_IP4_ADDR_FMT_ARGS( key->url.resolved_addr.addr ), fd_ushort_bswap( key->url.resolved_addr.port ) ));
     320           0 :       }
     321           0 :     }
     322           0 :   }
     323           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector );
     324           0 :   predict_incremental( ctx );
     325           0 : }
     326             : 
     327             : static void
     328             : on_ping( void *        _ctx,
     329             :          fd_ip4_port_t addr,
     330           0 :          ulong         latency ) {
     331           0 :   fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
     332             : 
     333           0 :   ulong cnt = fd_sspeer_selector_update_on_ping( ctx->selector, addr, latency );
     334           0 :   if( FD_UNLIKELY( !cnt ) ) {
     335             :     /* The update may fail in normal operation, e.g. after a peer has
     336             :        been removed from the selector.  The log level is set to a
     337             :        minimum accordingly. */
     338           0 :     FD_LOG_DEBUG(( "selector update on ping did not find address " FD_IP4_ADDR_FMT ":%hu",
     339           0 :                    FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
     340           0 :   }
     341           0 :   predict_incremental( ctx );
     342           0 : }
     343             : 
     344             : static void
     345             : on_snapshot_hash( fd_snapct_tile_t *                 ctx,
     346             :                   fd_sspeer_key_t const *            key,
     347             :                   fd_ip4_port_t                      addr,
     348           0 :                   fd_gossip_update_message_t const * msg ) {
     349           0 :   ulong         full_slot = msg->snapshot_hashes->full_slot;
     350           0 :   ulong         incr_slot = FD_SSPEER_SLOT_UNKNOWN;
     351           0 :   uchar const * incr_hash = NULL;
     352             : 
     353           0 :   for( ulong i=0UL; i<msg->snapshot_hashes->incremental_len; i++ ) {
     354           0 :     if( FD_LIKELY( !incr_hash || msg->snapshot_hashes->incremental[ i ].slot>incr_slot ) ) {
     355           0 :       incr_slot = msg->snapshot_hashes->incremental[ i ].slot;
     356           0 :       incr_hash = msg->snapshot_hashes->incremental[ i ].hash;
     357           0 :     }
     358           0 :   }
     359             : 
     360           0 :   if( FD_UNLIKELY( full_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
     361           0 :   if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
     362             : 
     363           0 :   if( FD_UNLIKELY( !addr.l ) ) {
     364             :     /* A peer that does not advertise an rpc_addr cannot be added to
     365             :        the selector: if previously added, remove it.  The remove
     366             :        operation becomes a no-op if the peer is not found. */
     367           0 :     fd_sspeer_selector_remove( ctx->selector, key );
     368           0 :     return;
     369           0 :   }
     370             :   /* Do not re-add peers that have been permanently blacklisted. */
     371           0 :   if( FD_UNLIKELY( blacklist_map_ele_query( ctx->blacklist_map, key, NULL, ctx->blacklist_pool ) ) ) return;
     372             :   /* Do not re-add peers whose addr is temporarily banned by ssping. */
     373           0 :   if( FD_UNLIKELY( fd_ssping_is_invalidated( ctx->ssping, addr ) ) ) return;
     374             :   /* The add may fail due to capacity/pool exhaustion.  The cluster
     375             :      slot is recomputed from tracked peers only, so a failed add will
     376             :      not influence the cluster slot. */
     377           0 :   fd_sspeer_selector_add( ctx->selector, key, addr, FD_SSPEER_LATENCY_UNKNOWN,
     378           0 :                           full_slot, incr_slot,
     379           0 :                           msg->snapshot_hashes->full_hash, incr_hash );
     380           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector );
     381           0 :   predict_incremental( ctx );
     382           0 : }
     383             : 
     384             : static void
     385             : send_expected_slot( fd_snapct_tile_t *  ctx,
     386             :                     fd_stem_context_t * stem,
     387           0 :                     ulong               slot ) {
     388           0 :   uint tsorig; uint tspub;
     389           0 :   fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
     390           0 :   fd_stem_publish( stem, ctx->out_rp.idx, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
     391           0 : }
     392             : 
     393             : static void
     394           0 : rename_full_snapshot( fd_snapct_tile_t * ctx ) {
     395           0 :   FD_TEST( -1!=ctx->local_out.dir_fd );
     396             : 
     397           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && ctx->http_full_snapshot_name[ 0 ]!='\0' ) ) {
     398           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_full_snapshot_name ) ) )
     399           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     400           0 :   }
     401           0 : }
     402             : 
     403             : static void
     404           0 : rename_incr_snapshot( fd_snapct_tile_t * ctx ) {
     405           0 :   FD_TEST( -1!=ctx->local_out.dir_fd );
     406             : 
     407           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && ctx->http_incr_snapshot_name[ 0 ]!='\0' ) ) {
     408           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_incr_snapshot_name ) ) )
     409           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     410           0 :   }
     411           0 : }
     412             : 
     413             : static ulong
     414             : rlimit_file_cnt( fd_topo_t const *      topo FD_PARAM_UNUSED,
     415           0 :                  fd_topo_tile_t const * tile ) {
     416           0 :   ulong cnt = 1UL +                             /* stderr */
     417           0 :               1UL +                             /* logfile */
     418           0 :               1UL;                              /* boot control pipe */
     419           0 :   if( download_enabled( tile ) ) {
     420           0 :     cnt +=    FD_SSPING_FD_CNT +                /* ssping sockets */
     421           0 :               2UL +                             /* dirfd + full snapshot download temp fd */
     422           0 :               tile->snapct.sources.servers_cnt; /* http resolver peer full sockets */
     423           0 :     if( tile->snapct.incremental_snapshots ) {
     424           0 :       cnt +=  1UL +                             /* incr snapshot download temp fd */
     425           0 :               tile->snapct.sources.servers_cnt; /* http resolver peer incr sockets */
     426           0 :     }
     427           0 :   }
     428           0 :   return cnt;
     429           0 : }
     430             : 
     431             : static ulong
     432             : populate_allowed_seccomp( fd_topo_t const *      topo,
     433             :                           fd_topo_tile_t const * tile,
     434             :                           ulong                  out_cnt,
     435           0 :                           struct sock_filter *   out ) {
     436             : 
     437           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     438             : 
     439           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     440           0 :   fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
     441             : 
     442           0 :   int min_ping_fd = INT_MAX;
     443           0 :   int max_ping_fd = 0;
     444           0 :   if( download_enabled( tile ) ) {
     445           0 :     min_ping_fd = FD_SSPING_FD_MIN;
     446           0 :     max_ping_fd = FD_SSPING_FD_MIN + (int)FD_SSPING_FD_CNT - 1;
     447           0 :   }
     448             : 
     449           0 :   populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)min_ping_fd, (uint)max_ping_fd );
     450           0 :   return sock_filter_policy_fd_snapct_tile_instr_cnt;
     451           0 : }
     452             : 
     453             : static ulong
     454             : populate_allowed_fds( fd_topo_t const *      topo,
     455             :                       fd_topo_tile_t const * tile,
     456             :                       ulong                  out_fds_cnt,
     457           0 :                       int *                  out_fds ) {
     458           0 :   if( FD_UNLIKELY( out_fds_cnt<5UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu is too small", out_fds_cnt ));
     459             : 
     460           0 :   ulong out_cnt = 0;
     461           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     462           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     463           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     464           0 :   }
     465             : 
     466           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     467             : 
     468           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     469           0 :   fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
     470           0 :   if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) )                  out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
     471           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) )        out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
     472           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
     473           0 :   if( FD_LIKELY( download_enabled( tile ) ) ) {
     474           0 :     if( FD_UNLIKELY( out_cnt+FD_SSPING_FD_CNT > out_fds_cnt ) ) {
     475           0 :       FD_LOG_ERR(( "out_fds_cnt %lu must be at least %lu", out_fds_cnt, out_cnt + FD_SSPING_FD_CNT ));
     476           0 :     }
     477           0 :     for( int i=FD_SSPING_FD_MIN; i<FD_SSPING_FD_MIN+(int)FD_SSPING_FD_CNT; i++ ) {
     478           0 :       out_fds[ out_cnt++ ] = i;
     479           0 :     }
     480           0 :   }
     481             : 
     482           0 :   return out_cnt;
     483           0 : }
     484             : 
     485             : static void
     486             : init_load( fd_snapct_tile_t *  ctx,
     487             :            fd_stem_context_t * stem,
     488             :            int                 full,
     489           0 :            int                 file ) {
     490           0 :   ctx->snapshot_start_timestamp_ns = fd_log_wallclock();
     491           0 :   fd_ssctrl_init_t * out = fd_chunk_to_laddr( ctx->out_ld.mem, ctx->out_ld.chunk );
     492           0 :   out->file = file;
     493           0 :   out->zstd = !file || (full ? ctx->local_in.full_snapshot_zstd : ctx->local_in.incremental_snapshot_zstd);
     494           0 :   if( file ) {
     495           0 :     out->slot = full ? ctx->local_in.full_snapshot_slot : ctx->local_in.incremental_snapshot_slot;
     496           0 :     if( full ) fd_memcpy( out->snapshot_hash, ctx->local_in.full_snapshot_hash, FD_HASH_FOOTPRINT );
     497           0 :     else       fd_memcpy( out->snapshot_hash, ctx->local_in.incremental_snapshot_hash, FD_HASH_FOOTPRINT );
     498           0 :   } else {
     499           0 :     out->slot = full ? ctx->predicted_incremental.full_slot : ctx->predicted_incremental.slot;
     500           0 :     if( full ) fd_memcpy( out->snapshot_hash, ctx->peer.full_hash, FD_HASH_FOOTPRINT );
     501           0 :     else       fd_memcpy( out->snapshot_hash, ctx->peer.incr_hash, FD_HASH_FOOTPRINT );
     502           0 :   }
     503             : 
     504           0 :   if( !file ) {
     505           0 :     out->addr = ctx->peer.addr;
     506           0 :     char encoded_hash[ FD_BASE58_ENCODED_32_SZ ];
     507           0 :     if( full ) {
     508           0 :       fd_base58_encode_32( ctx->peer.full_hash, NULL, encoded_hash );
     509           0 :       FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
     510           0 :       FD_TEST( fd_cstr_printf_check( ctx->http_full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
     511           0 :     } else {
     512           0 :       fd_base58_encode_32( ctx->peer.incr_hash, NULL, encoded_hash );
     513           0 :       FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
     514           0 :       FD_TEST( fd_cstr_printf_check( ctx->http_incr_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
     515           0 :     }
     516             : 
     517           0 :     out->is_https = 0; /* if not found in the config list, it's not https */
     518           0 :     out->hostname[0] = '\0'; /* .. and it doesn't have a hostname either. */
     519           0 :     for( ulong i=0UL; i<SERVER_PEERS_MAX; i++ ) {
     520           0 :       if( FD_UNLIKELY( ctx->peer.addr.l==ctx->config.sources.servers[ i ].addr.l ) ) {
     521           0 :         fd_cstr_ncpy( out->hostname, ctx->config.sources.servers[ i ].hostname, sizeof(out->hostname) );
     522           0 :         out->is_https = ctx->config.sources.servers[ i ].is_https;
     523           0 :         break;
     524           0 :       }
     525           0 :     }
     526           0 :   }
     527           0 :   fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
     528           0 :   ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
     529           0 :   ctx->flush_ack = 0;
     530           0 :   ctx->load_complete = 0;
     531             : 
     532             :   /* If we are downloading the snapshot, we will get the snapshot size
     533             :      in bytes from a metadata message sent from snapld. */
     534           0 :   if( file ) {
     535           0 :     if( full ) ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
     536           0 :     else       ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
     537           0 :   }
     538             : 
     539           0 :   if( !file ) {
     540           0 :     if( full ) {
     541             :       /* reset any written content in the full output snapshot */
     542           0 :       if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.full_snapshot_fd, 0UL ) ) ) {
     543           0 :         FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
     544           0 :       }
     545           0 :       if( FD_UNLIKELY( -1==lseek( ctx->local_out.full_snapshot_fd, 0L, SEEK_SET ) ) ) {
     546           0 :         FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
     547           0 :       }
     548           0 :     } else {
     549             :       /* reset any written content in the incremental snapshot output
     550             :          file */
     551           0 :       if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.incremental_snapshot_fd, 0UL ) ) ) {
     552           0 :         FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
     553           0 :       }
     554           0 :       if( FD_UNLIKELY( -1==lseek( ctx->local_out.incremental_snapshot_fd, 0L, SEEK_SET ) ) ) {
     555           0 :         FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
     556           0 :       }
     557           0 :     }
     558           0 :   }
     559             : 
     560             :   /* Regardless of whether we load the snapshot from a file or download
     561             :      it, we know the name of the snapshot and can publish it to the gui
     562             :      here. */
     563           0 :   if( full ) {
     564           0 :     if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     565           0 :       if( file ) {
     566           0 :         fd_cstr_fini( ctx->http_full_snapshot_name );
     567           0 :         snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, 1 );
     568           0 :       }
     569           0 :       else {
     570           0 :         char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
     571           0 :         FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ) );
     572           0 :         snapshot_path_gui_publish( ctx, stem, snapshot_path, 1 );
     573           0 :       }
     574           0 :     }
     575           0 :   } else {
     576           0 :     if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     577           0 :       if( file ) {
     578           0 :         fd_cstr_fini( ctx->http_incr_snapshot_name );
     579           0 :         snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, 0 );
     580           0 :       } else {
     581           0 :         char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
     582           0 :         FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ) );
     583           0 :         snapshot_path_gui_publish( ctx, stem, snapshot_path, 0 );
     584           0 :       }
     585           0 :     }
     586           0 :   }
     587           0 : }
     588             : 
     589             : static void
     590             : log_download( fd_snapct_tile_t * ctx,
     591             :               int                full,
     592             :               fd_ip4_port_t      addr,
     593           0 :               ulong              slot ) {
     594           0 :   for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
     595           0 :       !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     596           0 :       iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
     597           0 :     gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     598           0 :     if( ci_entry->rpc_addr.l==addr.l ) {
     599           0 :       FD_TEST( ci_entry->allowed );
     600           0 :       FD_BASE58_ENCODE_32_BYTES( ci_entry->pubkey.uc, pubkey_b58 );
     601           0 :       FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from allowed gossip peer %s at http://" FD_IP4_ADDR_FMT ":%hu/%s",
     602           0 :                       full ? "full" : "incremental", slot, pubkey_b58,
     603           0 :                       FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
     604           0 :                       full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
     605           0 :       return;
     606           0 :     }
     607           0 :   }
     608             : 
     609           0 :   for( ulong i=0UL; i<ctx->config.sources.servers_cnt; i++ ) {
     610           0 :     if( addr.l==ctx->config.sources.servers[ i ].addr.l ) {
     611           0 :       if( ctx->config.sources.servers[ i ].is_https ) {
     612           0 :         FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at https://%s:%hu/%s",
     613           0 :                         full ? "full" : "incremental", slot, i,
     614           0 :                         ctx->config.sources.servers[ i ].hostname, fd_ushort_bswap( addr.port ),
     615           0 :                         full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
     616           0 :       } else {
     617           0 :         FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at http://" FD_IP4_ADDR_FMT ":%hu/%s",
     618           0 :                         full ? "full" : "incremental", slot, i,
     619           0 :                         FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
     620           0 :                         full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
     621           0 :       }
     622           0 :       return;
     623           0 :     }
     624           0 :   }
     625             : 
     626           0 :   FD_TEST( 0 ); /* should not be possible */
     627           0 : }
     628             : 
     629             : static void
     630             : log_completion( fd_snapct_tile_t * ctx,
     631           0 :                 int                full ) {
     632           0 :   double elapsed = (double)(fd_log_wallclock() - ctx->snapshot_start_timestamp_ns) / 1e9;
     633           0 :   FD_LOG_INFO(( "%s snapshot load completed in %.3f seconds", full ? "full" : "incremental", elapsed ));
     634           0 : }
     635             : 
     636             : /* Blacklist the current peer: invalidate in ssping, remove from the
     637             :    selector, and add to the dedicated blacklist map.  Skips the insert
     638             :    if the peer is already blacklisted (dedup).  Warns if the pool is
     639             :    full (the ssping ban still provides temporary protection). */
     640             : static void
     641          24 : blacklist_peer( fd_snapct_tile_t * ctx ) {
     642          24 :   fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
     643          24 :   fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
     644          24 :   fd_sspeer_selector_process_cluster_slot( ctx->selector );
     645          24 :   if( FD_UNLIKELY( blacklist_map_ele_query( ctx->blacklist_map, &ctx->peer.key, NULL, ctx->blacklist_pool ) ) ) return;
     646          21 :   if( FD_LIKELY( blacklist_pool_free( ctx->blacklist_pool ) ) ) {
     647          18 :     fd_sspeer_blacklist_entry_t * bl = blacklist_pool_ele_acquire( ctx->blacklist_pool );
     648          18 :     bl->key = ctx->peer.key;
     649          18 :     blacklist_map_ele_insert( ctx->blacklist_map, bl, ctx->blacklist_pool );
     650          18 :     if( FD_UNLIKELY( !ctx->peer.key.is_url ) ) {
     651          18 :       FD_BASE58_ENCODE_32_BYTES( ctx->peer.key.pubkey->uc, pubkey_b58 );
     652          18 :       FD_LOG_WARNING(( "permanently blacklisted peer identity %s", pubkey_b58 ));
     653          18 :     } else {
     654           0 :       FD_LOG_WARNING(( "permanently blacklisted peer identity %s",
     655           0 :                        ctx->peer.key.url.hostname[0] ? ctx->peer.key.url.hostname : "(none)" ));
     656           0 :     }
     657          18 :   } else {
     658           3 :     FD_LOG_WARNING(( "blacklist pool full, peer banned via ssping only" ));
     659           3 :   }
     660          21 : }
     661             : 
     662             : static void
     663             : after_credit( fd_snapct_tile_t *  ctx,
     664             :               fd_stem_context_t * stem,
     665             :               int *               opt_poll_in FD_PARAM_UNUSED,
     666           0 :               int *               charge_busy FD_PARAM_UNUSED ) {
     667           0 :   long now = fd_log_wallclock();
     668             : 
     669           0 :   if( FD_LIKELY( ctx->ssping ) ) fd_ssping_advance( ctx->ssping, now, ctx->selector );
     670           0 :   if( FD_LIKELY( ctx->ssresolver ) ) fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
     671             : 
     672             :   /* Advances above may remove peers, making cluster_slot dirty.
     673             :      Recompute so best() calls below use up-to-date scores.
     674             :      No-op when the dirty flag is not set internally. */
     675           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector );
     676             : 
     677             :   /* send an expected slot message as the predicted incremental
     678             :      could have changed as a result of the pinger, resolver, or from
     679             :      processing gossip frags in gossip_frag. */
     680           0 :   if( FD_LIKELY( ctx->predicted_incremental.pending ) ) {
     681           0 :     send_expected_slot( ctx, stem, ctx->predicted_incremental.slot );
     682           0 :     ctx->predicted_incremental.pending = 0;
     683           0 :   }
     684             : 
     685             :   /* Note: All state transitions should occur within this switch
     686             :      statement to make it easier to reason about the state management. */
     687             : 
     688           0 :   switch ( ctx->state ) {
     689             : 
     690             :     /* ============================================================== */
     691           0 :     case FD_SNAPCT_STATE_INIT: {
     692           0 :       if( FD_UNLIKELY( !ctx->download_enabled ) ) {
     693           0 :         ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
     694           0 :         send_expected_slot( ctx, stem, local_slot );
     695           0 :         FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
     696           0 :         ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
     697           0 :         ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
     698           0 :         init_load( ctx, stem, 1, 1 );
     699           0 :         break;
     700           0 :       }
     701           0 :       ctx->deadline_nanos = now+ctx->config.wait_for_peers_timeout_nanos;
     702           0 :       ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
     703           0 :       break;
     704           0 :     }
     705             : 
     706             :     /* ============================================================== */
     707           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS: {
     708           0 :       if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
     709             : 
     710           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, FD_SSPEER_SLOT_UNKNOWN );
     711           0 :       if( FD_LIKELY( best.addr.l ) ) {
     712           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
     713           0 :         ctx->deadline_nanos = now+FD_SNAPCT_COLLECTING_PEERS_TIMEOUT;
     714           0 :       }
     715           0 :       break;
     716           0 :     }
     717             : 
     718             :     /* ============================================================== */
     719           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
     720           0 :       if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for incremental snapshot peers." ));
     721             : 
     722           0 :       FD_TEST( ctx->predicted_incremental.full_slot!=FD_SSPEER_SLOT_UNKNOWN );
     723           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     724           0 :       if( FD_LIKELY( best.addr.l ) ) {
     725           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     726           0 :         ctx->deadline_nanos = now;
     727           0 :       }
     728           0 :       break;
     729           0 :     }
     730             : 
     731             :     /* ============================================================== */
     732           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS: {
     733           0 :       if( FD_UNLIKELY( !ctx->gossip.saturated && now<ctx->deadline_nanos ) ) break;
     734             : 
     735           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, FD_SSPEER_SLOT_UNKNOWN );
     736           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     737           0 :         if( !ctx->gossip_enabled ) {
     738           0 :           FD_LOG_ERR(( "no peers are available and discovery of new peers via gossip is disabled. aborting." ));
     739           0 :         }
     740           0 :         ctx->deadline_nanos = now + ctx->config.wait_for_peers_timeout_nanos;
     741           0 :         ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
     742           0 :         break;
     743           0 :       }
     744             : 
     745           0 :       fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
     746           0 :       if( FD_UNLIKELY( cluster.incremental==FD_SSPEER_SLOT_UNKNOWN && ctx->config.incremental_snapshots ) ) {
     747             :         /* We must have a cluster full slot to be in this state. */
     748           0 :         FD_TEST( cluster.full!=FD_SSPEER_SLOT_UNKNOWN );
     749             :         /* fall back to full snapshot only if the highest cluster slot
     750             :            is a full snapshot only */
     751           0 :         FD_LOG_WARNING(( "incremental snapshots were enabled via [snapshots.incremental_snapshots], but no incremental snapshot is available in the cluster. "
     752           0 :                          "falling back to full snapshots only." ));
     753           0 :         ctx->config.incremental_snapshots = 0;
     754           0 :       }
     755             : 
     756           0 :       ulong cluster_slot = ctx->config.incremental_snapshots ? cluster.incremental : cluster.full;
     757             : 
     758             :       /* Determine the best effective slot achievable using the local
     759             :          full snapshot.  When incrementals are disabled, the effective
     760             :          slot is the full snapshot slot itself.  When enabled, it is the
     761             :          best incremental we can pair with the local full (either from
     762             :          a local file or downloaded from a peer). */
     763             : 
     764           0 :       ulong local_effective_slot = ULONG_MAX;
     765           0 :       if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX ) ) {
     766           0 :         if( FD_LIKELY( ctx->config.incremental_snapshots ) ) {
     767           0 :           ulong local_incr = ctx->local_in.incremental_snapshot_slot;
     768           0 :           if( local_incr!=ULONG_MAX && local_incr>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age ) ) {
     769           0 :             local_effective_slot = local_incr;
     770           0 :           } else {
     771           0 :             fd_sspeer_t best_incr = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
     772           0 :             if( FD_LIKELY( best_incr.addr.l ) ) {
     773           0 :               ctx->predicted_incremental.slot         = best_incr.incr_slot;
     774           0 :               ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental */
     775           0 :               local_effective_slot                    = best_incr.incr_slot;
     776           0 :             }
     777           0 :           }
     778           0 :         } else {
     779           0 :           local_effective_slot = ctx->local_in.full_snapshot_slot;
     780           0 :         }
     781           0 :       }
     782             : 
     783           0 :       int can_use_local_full = local_effective_slot!=ULONG_MAX &&
     784           0 :                                local_effective_slot>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_full_effective_age );
     785           0 :       if( FD_LIKELY( can_use_local_full ) ) {
     786           0 :         send_expected_slot( ctx, stem, local_effective_slot );
     787             : 
     788           0 :         FD_LOG_NOTICE(( "reading full snapshot at slot %lu with cluster slot %lu from local file `%s`",
     789           0 :                         ctx->local_in.full_snapshot_slot, cluster_slot, ctx->local_in.full_snapshot_path ));
     790           0 :         ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
     791           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_FILE;
     792           0 :         init_load( ctx, stem, 1, 1 );
     793           0 :       } else {
     794           0 :         if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX ) ) {
     795           0 :           if( local_effective_slot==ULONG_MAX ) {
     796           0 :             if( ctx->local_in.incremental_snapshot_slot!=ULONG_MAX ) {
     797           0 :               FD_LOG_NOTICE(( "local full snapshot at slot %lu cannot be used because local incremental snapshot at slot %lu "
     798           0 :                               "is too old and no downloadable incremental could be found (cluster slot %lu), downloading instead",
     799           0 :                               ctx->local_in.full_snapshot_slot, ctx->local_in.incremental_snapshot_slot, cluster_slot ));
     800           0 :             } else {
     801           0 :               FD_LOG_NOTICE(( "local full snapshot at slot %lu cannot be used because no matching incremental snapshot "
     802           0 :                               "could be found (cluster slot %lu), downloading instead",
     803           0 :                               ctx->local_in.full_snapshot_slot, cluster_slot ));
     804           0 :             }
     805           0 :           } else {
     806           0 :             FD_LOG_NOTICE(( "local full snapshot at slot %lu (effective slot %lu) is too old for cluster slot %lu max age %u, downloading instead",
     807           0 :                             ctx->local_in.full_snapshot_slot, local_effective_slot, cluster_slot, ctx->config.sources.max_local_full_effective_age ));
     808           0 :           }
     809           0 :         } else {
     810           0 :           FD_LOG_INFO(( "no local snapshot available, downloading from peer" ));
     811           0 :         }
     812             : 
     813           0 :         if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) {
     814           0 :           send_expected_slot( ctx, stem, best.full_slot );
     815           0 :         } else {
     816           0 :           fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.full_slot );
     817           0 :           if( FD_LIKELY( best_incremental.addr.l ) ) {
     818           0 :             ctx->predicted_incremental.slot = best_incremental.incr_slot;
     819           0 :             send_expected_slot( ctx, stem, best_incremental.incr_slot );
     820           0 :           }
     821           0 :         }
     822             : 
     823           0 :         ctx->peer                            = best;
     824           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_HTTP;
     825           0 :         ctx->predicted_incremental.full_slot = best.full_slot;
     826           0 :         init_load( ctx, stem, 1, 0 );
     827           0 :         log_download( ctx, 1, best.addr, best.full_slot );
     828           0 :       }
     829           0 :       break;
     830           0 :     }
     831             : 
     832             :     /* ============================================================== */
     833           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL: {
     834           0 :       if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
     835             : 
     836           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     837           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     838           0 :         if( !ctx->gossip_enabled ) {
     839           0 :           FD_LOG_ERR(( "no incremental snapshot peers are available and discovery of new peers via gossip is disabled. aborting." ));
     840           0 :         }
     841           0 :         ctx->deadline_nanos = now + ctx->config.wait_for_peers_timeout_nanos;
     842           0 :         ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL;
     843           0 :         break;
     844           0 :       }
     845             : 
     846             :       /* decide whether to use the local incremental snapshot if one
     847             :          exists and is not too old, otherwise download a new incremental
     848             :          snapshot. */
     849           0 :       ulong cluster_slot  = fd_sspeer_selector_cluster_slot( ctx->selector ).incremental;
     850           0 :       ulong local_slot    = ctx->local_in.incremental_snapshot_slot;
     851           0 :       int   local_too_old = local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
     852           0 :       if( FD_LIKELY( local_slot!=ULONG_MAX && !local_too_old ) ) {
     853           0 :         ctx->predicted_incremental.slot = local_slot;
     854           0 :         send_expected_slot( ctx, stem, local_slot );
     855             : 
     856           0 :         FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
     857           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
     858           0 :         init_load( ctx, stem, 0, 1 );
     859           0 :       } else {
     860           0 :         ctx->predicted_incremental.slot = best.incr_slot;
     861           0 :         send_expected_slot( ctx, stem, best.incr_slot );
     862             : 
     863           0 :         ctx->peer  = best;
     864           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
     865           0 :         init_load( ctx, stem, 0, 0 );
     866           0 :         log_download( ctx, 0, best.addr, best.incr_slot );
     867           0 :       }
     868           0 :       break;
     869           0 :     }
     870             : 
     871             :     /* ============================================================== */
     872           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
     873           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     874           0 :         ctx->malformed = 0;
     875           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     876           0 :         ctx->flush_ack = 0;
     877           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     878           0 :         FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
     879           0 :                          ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
     880           0 :         break;
     881           0 :       }
     882             : 
     883           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
     884             : 
     885           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE;
     886           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     887           0 :       ctx->flush_ack = 0;
     888           0 :       break;
     889             : 
     890             :     /* ============================================================== */
     891           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
     892           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     893           0 :         ctx->malformed = 0;
     894           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     895           0 :         ctx->flush_ack = 0;
     896           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     897           0 :         FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
     898           0 :                          ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
     899           0 :         break;
     900           0 :       }
     901             : 
     902           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
     903             : 
     904           0 :       log_completion( ctx, 0/*incr*/ );
     905           0 :       ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     906           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     907           0 :       break;
     908             : 
     909             :     /* ============================================================== */
     910           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
     911           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     912           0 :         ctx->malformed = 0;
     913           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     914           0 :         ctx->flush_ack = 0;
     915           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
     916           0 :         FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
     917           0 :                          "blacklisting peer due to download failure.",
     918           0 :                          ctx->predicted_incremental.slot,
     919           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
     920           0 :         blacklist_peer( ctx );
     921           0 :         break;
     922           0 :       }
     923             : 
     924           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
     925             : 
     926           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE;
     927           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     928           0 :       ctx->flush_ack = 0;
     929           0 :       break;
     930             : 
     931             :     /* ============================================================== */
     932           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
     933           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     934           0 :         ctx->malformed = 0;
     935           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     936           0 :         ctx->flush_ack = 0;
     937           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
     938           0 :         FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
     939           0 :                          "blacklisting peer due to download failure.",
     940           0 :                          ctx->predicted_incremental.slot,
     941           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
     942           0 :         blacklist_peer( ctx );
     943           0 :         break;
     944           0 :       }
     945             : 
     946           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
     947             : 
     948           0 :       log_completion( ctx, 0/*incr*/ );
     949           0 :       ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     950           0 :       rename_incr_snapshot( ctx );
     951           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     952           0 :       break;
     953             : 
     954             :     /* ============================================================== */
     955           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
     956           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     957           0 :         ctx->malformed = 0;
     958           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     959           0 :         ctx->flush_ack = 0;
     960           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     961           0 :         FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
     962           0 :                          ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
     963           0 :         break;
     964           0 :       }
     965             : 
     966           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
     967             : 
     968           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE;
     969           0 :       ulong sig = ctx->config.incremental_snapshots &&
     970           0 :                   (ctx->local_in.incremental_snapshot_slot!=ULONG_MAX || ctx->download_enabled) ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
     971           0 :       if( sig==FD_SNAPSHOT_MSG_CTRL_DONE && ctx->config.incremental_snapshots ) {
     972             :         /* set incremental snapshots to 0 if there is no local
     973             :             incremental snapshot and download is not enabled. */
     974           0 :         FD_LOG_INFO(( "incremental snapshots were enabled via [snapshots.incremental_snapshots] "
     975           0 :                       "but no incremental snapshot exists on disk and no snapshot peers are configured. "
     976           0 :                       "skipping incremental snapshot load." ));
     977           0 :         ctx->config.incremental_snapshots = 0;
     978           0 :       }
     979           0 :       fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     980           0 :       ctx->flush_ack = 0;
     981           0 :       break;
     982             : 
     983             :     /* ============================================================== */
     984           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
     985           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     986           0 :         ctx->malformed = 0;
     987           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     988           0 :         ctx->flush_ack = 0;
     989           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     990           0 :         FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
     991           0 :                          ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
     992           0 :         break;
     993           0 :       }
     994             : 
     995           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
     996             : 
     997           0 :       log_completion( ctx, 1/*full*/ );
     998           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
     999           0 :         ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
    1000           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
    1001           0 :         break;
    1002           0 :       }
    1003             : 
    1004           0 :       if( FD_LIKELY( ctx->download_enabled ) ) {
    1005           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
    1006           0 :         ctx->deadline_nanos = 0L;
    1007           0 :       } else {
    1008           0 :         FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
    1009           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
    1010           0 :         init_load( ctx, stem, 0, 1 );
    1011           0 :       }
    1012           0 :       break;
    1013             : 
    1014             :     /* ============================================================== */
    1015           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
    1016           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1017           0 :         ctx->malformed = 0;
    1018           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1019           0 :         ctx->flush_ack = 0;
    1020           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
    1021           0 :         FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
    1022           0 :                          "blacklisting peer due to download failure.",
    1023           0 :                          ctx->predicted_incremental.full_slot,
    1024           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
    1025           0 :         blacklist_peer( ctx );
    1026           0 :         break;
    1027           0 :       }
    1028             : 
    1029           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
    1030             : 
    1031           0 :       ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE;
    1032           0 :       fd_stem_publish( stem, ctx->out_ld.idx, ctx->config.incremental_snapshots ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
    1033           0 :       ctx->flush_ack = 0;
    1034           0 :       break;
    1035             : 
    1036             :     /* ============================================================== */
    1037           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
    1038           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1039           0 :         ctx->malformed = 0;
    1040           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1041           0 :         ctx->flush_ack = 0;
    1042           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
    1043           0 :         FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
    1044           0 :                          "blacklisting peer due to download failure.",
    1045           0 :                          ctx->predicted_incremental.full_slot,
    1046           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
    1047           0 :         blacklist_peer( ctx );
    1048           0 :         break;
    1049           0 :       }
    1050             : 
    1051           0 :       if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
    1052             : 
    1053           0 :       rename_full_snapshot( ctx );
    1054             : 
    1055           0 :       log_completion( ctx, 1/*full*/ );
    1056           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
    1057           0 :         ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
    1058           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
    1059           0 :         break;
    1060           0 :       }
    1061             : 
    1062             :       /* Get the best incremental peer to download from */
    1063           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
    1064           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
    1065           0 :         ctx->deadline_nanos = now;
    1066           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
    1067           0 :         break;
    1068           0 :       }
    1069             : 
    1070           0 :       ctx->predicted_incremental.slot = best.incr_slot;
    1071           0 :       send_expected_slot( ctx, stem, best.incr_slot );
    1072             : 
    1073           0 :       ctx->peer  = best;
    1074           0 :       ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
    1075           0 :       init_load( ctx, stem, 0, 0 );
    1076           0 :       log_download( ctx, 0, best.addr, best.incr_slot );
    1077           0 :       break;
    1078             : 
    1079             :     /* ============================================================== */
    1080           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
    1081           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
    1082           0 :       if( FD_UNLIKELY( ctx->flush_ack<ctx->flush_ack_cnt ) ) break;
    1083             : 
    1084           0 :       if( ctx->metrics.full.num_retries==ctx->config.max_retry_abort ) {
    1085           0 :         FD_LOG_ERR(( "hit retry limit of %u for full snapshot, aborting", ctx->config.max_retry_abort ));
    1086           0 :       }
    1087             : 
    1088           0 :       ctx->metrics.full.num_retries++;
    1089           0 :       FD_LOG_NOTICE(( "retrying full snapshot download (attempt %u/%u)",
    1090           0 :                       ctx->metrics.full.num_retries, ctx->config.max_retry_abort ));
    1091             : 
    1092           0 :       ctx->metrics.full.bytes_read           = 0UL;
    1093           0 :       ctx->metrics.full.bytes_written        = 0UL;
    1094           0 :       ctx->metrics.full.bytes_total          = 0UL;
    1095             : 
    1096           0 :       ctx->metrics.incremental.bytes_read    = 0UL;
    1097           0 :       ctx->metrics.incremental.bytes_written = 0UL;
    1098           0 :       ctx->metrics.incremental.bytes_total   = 0UL;
    1099             : 
    1100           0 :       if( !ctx->download_enabled ) {
    1101             :         /* if we are unable to download new snapshots and unable to load
    1102             :            our local snapshot, we must shutdown the validator. */
    1103           0 :         FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.full_snapshot_path ));
    1104           0 :       } else {
    1105           0 :         if( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ctx->local_in.full_snapshot_slot = ULONG_MAX;
    1106           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
    1107           0 :         ctx->deadline_nanos = 0L;
    1108           0 :       }
    1109           0 :       break;
    1110             : 
    1111             :     /* ============================================================== */
    1112           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
    1113           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
    1114           0 :       if( FD_UNLIKELY( ctx->flush_ack<ctx->flush_ack_cnt ) ) break;
    1115             : 
    1116           0 :       if( ctx->metrics.incremental.num_retries==ctx->config.max_retry_abort ) {
    1117           0 :         FD_LOG_ERR(("hit retry limit of %u for incremental snapshot. aborting", ctx->config.max_retry_abort ));
    1118           0 :       }
    1119             : 
    1120           0 :       ctx->metrics.incremental.num_retries++;
    1121           0 :       FD_LOG_NOTICE(( "retrying incremental snapshot download (attempt %u/%u)",
    1122           0 :                       ctx->metrics.incremental.num_retries, ctx->config.max_retry_abort ));
    1123             : 
    1124           0 :       ctx->metrics.incremental.bytes_read    = 0UL;
    1125           0 :       ctx->metrics.incremental.bytes_written = 0UL;
    1126           0 :       ctx->metrics.incremental.bytes_total   = 0UL;
    1127             : 
    1128           0 :       if( !ctx->download_enabled ) {
    1129             :         /* if we are unable to download new snapshots and unable to load
    1130             :            our local snapshot, we must shutdown the validator. */
    1131           0 :         FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.incremental_snapshot_path ));
    1132           0 :       } else {
    1133           0 :         if( ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
    1134           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
    1135           0 :         ctx->deadline_nanos = 0L;
    1136           0 :       }
    1137           0 :       break;
    1138             : 
    1139             :     /* ============================================================== */
    1140           0 :     case FD_SNAPCT_STATE_READING_FULL_FILE:
    1141           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1142           0 :         ctx->malformed = 0;
    1143           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1144           0 :         ctx->flush_ack = 0;
    1145           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
    1146           0 :         FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
    1147           0 :                          ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
    1148           0 :         break;
    1149           0 :       }
    1150           0 :       if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
    1151           0 :       if( FD_UNLIKELY( ctx->load_complete ) ) {
    1152           0 :         ctx->load_complete = 0;
    1153           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
    1154           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI;
    1155           0 :         ctx->flush_ack = 0;
    1156           0 :       }
    1157           0 :       break;
    1158             : 
    1159             :     /* ============================================================== */
    1160           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
    1161           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1162           0 :         ctx->malformed = 0;
    1163           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1164           0 :         ctx->flush_ack = 0;
    1165           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
    1166           0 :         FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
    1167           0 :                          ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
    1168           0 :         break;
    1169           0 :       }
    1170           0 :       if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
    1171           0 :       if( FD_UNLIKELY( ctx->load_complete ) ) {
    1172           0 :         ctx->load_complete = 0;
    1173           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
    1174           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI;
    1175           0 :         ctx->flush_ack = 0;
    1176           0 :       }
    1177           0 :       break;
    1178             : 
    1179             :     /* ============================================================== */
    1180           0 :     case FD_SNAPCT_STATE_READING_FULL_HTTP:
    1181           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1182           0 :         ctx->malformed = 0;
    1183           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1184           0 :         ctx->flush_ack = 0;
    1185           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
    1186           0 :         FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
    1187           0 :                          "blacklisting peer due to download failure",
    1188           0 :                          ctx->predicted_incremental.full_slot,
    1189           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
    1190           0 :         blacklist_peer( ctx );
    1191           0 :         break;
    1192           0 :       }
    1193           0 :       if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
    1194           0 :       if( FD_UNLIKELY( ctx->load_complete ) ) {
    1195           0 :         ctx->load_complete = 0;
    1196           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
    1197           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI;
    1198           0 :         ctx->flush_ack = 0;
    1199           0 :       }
    1200           0 :       break;
    1201             : 
    1202             :     /* ============================================================== */
    1203           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
    1204           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
    1205           0 :         ctx->malformed = 0;
    1206           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
    1207           0 :         ctx->flush_ack = 0;
    1208           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
    1209           0 :         FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
    1210           0 :                          "blacklisting peer due to download failure",
    1211           0 :                          ctx->predicted_incremental.slot,
    1212           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
    1213           0 :         blacklist_peer( ctx );
    1214           0 :         break;
    1215           0 :       }
    1216           0 :       if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
    1217           0 :       if( FD_UNLIKELY( ctx->load_complete ) ) {
    1218           0 :         ctx->load_complete = 0;
    1219           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
    1220           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI;
    1221           0 :         ctx->flush_ack = 0;
    1222           0 :       }
    1223           0 :       break;
    1224             : 
    1225             :     /* ============================================================== */
    1226           0 :     case FD_SNAPCT_STATE_SHUTDOWN:
    1227             :       /* Transitioning to the shutdown state indicates snapshot load is
    1228             :          completed without errors.  Otherwise, snapct would have aborted
    1229             :          earlier. */
    1230           0 :       break;
    1231             : 
    1232             :     /* ============================================================== */
    1233           0 :     default: FD_LOG_ERR(( "unexpected state %s", fd_snapct_state_str( ctx->state ) ));
    1234           0 :   }
    1235           0 : }
    1236             : 
    1237             : static void
    1238             : gossip_frag( fd_snapct_tile_t *  ctx,
    1239             :              ulong               sig,
    1240             :              ulong               sz FD_PARAM_UNUSED,
    1241          27 :              ulong               chunk ) {
    1242          27 :   FD_TEST( ctx->gossip_enabled );
    1243             : 
    1244          27 :   if( FD_UNLIKELY( sig==FD_GOSSIP_UPDATE_TAG_PEER_SATURATED ) ) {
    1245           0 :     ctx->gossip.saturated = 1;
    1246           0 :     return;
    1247           0 :   }
    1248             : 
    1249          27 :   if( !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
    1250          27 :          sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
    1251          27 :          sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) ) return;
    1252             : 
    1253          27 :   fd_gossip_update_message_t const * msg = fd_chunk_to_laddr_const( ctx->gossip_in_mem, chunk );
    1254          27 :   switch( msg->tag ) {
    1255          21 :     case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
    1256          21 :       FD_TEST( msg->contact_info->idx<GOSSIP_PEERS_MAX );
    1257          21 :       fd_pubkey_t const * pubkey = (fd_pubkey_t const *)msg->origin;
    1258          21 :       gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info->idx;
    1259          21 :       if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
    1260             :         /* Initialize the new gossip entry, which may or may not be allowed */
    1261          18 :         FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
    1262          18 :         entry->pubkey      = *pubkey;
    1263          18 :         entry->rpc_addr.l  = 0UL;
    1264          18 :         if( ctx->config.sources.gossip.allow_any ) {
    1265           9 :           entry->allowed = 1;
    1266           9 :           for( ulong i=0UL; i<ctx->config.sources.gossip.block_list_cnt; i++ ) {
    1267           3 :             if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.block_list[ i ] ) ) {
    1268           3 :               entry->allowed = 0;
    1269           3 :               break;
    1270           3 :             }
    1271           3 :           }
    1272           9 :         } else {
    1273           9 :           entry->allowed = 0;
    1274           9 :           for( ulong i=0UL; i<ctx->config.sources.gossip.allow_list_cnt; i++ ) {
    1275           3 :             if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.allow_list[ i ] ) ) {
    1276           3 :               entry->allowed = 1;
    1277           3 :               break;
    1278           3 :             }
    1279           3 :           }
    1280           9 :         }
    1281          18 :         FD_TEST(  ULONG_MAX==gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table ) );
    1282          18 :         if( entry->allowed ) {
    1283           9 :           gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info->idx, ctx->gossip.ci_table );
    1284           9 :           ctx->gossip.allowed_cnt++;
    1285             :           /* Allow-list shortcut: if an explicit allow list is
    1286             :              configured and all expected peers have arrived, declare
    1287             :              saturation immediately without waiting for the gossip
    1288             :              tile's general saturation signal. */
    1289           9 :           if( FD_UNLIKELY( !ctx->config.sources.gossip.allow_any &&
    1290           9 :                            ctx->config.sources.gossip.allow_list_cnt>0UL &&
    1291           9 :                            ctx->gossip.allowed_cnt==ctx->config.sources.gossip.allow_list_cnt ) ) {
    1292           3 :             FD_LOG_NOTICE(( "all %lu allowed gossip peers discovered", ctx->config.sources.gossip.allow_list_cnt ));
    1293           3 :             ctx->gossip.saturated = 1;
    1294           3 :           }
    1295           9 :         }
    1296          18 :       }
    1297          21 :       if( !entry->allowed ) break;
    1298             :       /* Maybe update the RPC address of a new or existing allowed gossip peer */
    1299          12 :       fd_ip4_port_t cur_addr = entry->rpc_addr;
    1300          12 :       fd_ip4_port_t new_addr;
    1301          12 :       new_addr.addr = msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].is_ipv6 ? 0 : msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].ip4;
    1302          12 :       new_addr.port = msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].port;
    1303             : 
    1304          12 :       if( FD_UNLIKELY( new_addr.l!=cur_addr.l ) ) {
    1305           0 :         fd_sspeer_key_t entry_key = {0};
    1306           0 :         *entry_key.pubkey = entry->pubkey;
    1307           0 :         entry_key.is_url  = 0;
    1308           0 :         if( FD_LIKELY( !!new_addr.l ) ) {
    1309             :           /* Do not re-add peers that have been permanently blacklisted
    1310             :              or whose new address is temporarily banned by ssping.
    1311             :              Bookkeeping (ssping cleanup, rpc_addr update) still runs
    1312             :              so the CI table and ssping pool stay consistent. */
    1313           0 :           if( FD_LIKELY( !blacklist_map_ele_query( ctx->blacklist_map, &entry_key, NULL, ctx->blacklist_pool ) &&
    1314           0 :                          !fd_ssping_is_invalidated( ctx->ssping, new_addr ) ) ) {
    1315           0 :             if( FD_LIKELY( FD_SSPEER_SCORE_INVALID!=fd_sspeer_selector_add( ctx->selector, &entry_key, new_addr,
    1316           0 :                                                                             FD_SSPEER_LATENCY_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
    1317           0 :                                                                             FD_SSPEER_SLOT_UNKNOWN, NULL, NULL ) ) ) {
    1318           0 :               fd_ssping_add( ctx->ssping, new_addr );
    1319           0 :             }
    1320           0 :           }
    1321           0 :         } else {
    1322           0 :           fd_sspeer_selector_remove( ctx->selector, &entry_key );
    1323           0 :         }
    1324           0 :         if( FD_LIKELY( !!cur_addr.l ) ) {
    1325           0 :           fd_ssping_remove( ctx->ssping, cur_addr );
    1326           0 :         }
    1327           0 :         entry->rpc_addr = new_addr;
    1328           0 :         if( !ctx->config.sources.gossip.allow_any ) {
    1329           0 :           FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
    1330           0 :           if( FD_LIKELY( !!new_addr.l ) ) {
    1331           0 :             FD_LOG_NOTICE(( "allowed gossip peer added with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
    1332           0 :                             pubkey_b58, FD_IP4_ADDR_FMT_ARGS( new_addr.addr ), fd_ushort_bswap( new_addr.port ) ));
    1333           0 :           } else {
    1334           0 :             FD_LOG_WARNING(( "allowed gossip peer with public key `%s` does not advertise an RPC address", pubkey_b58 ));
    1335           0 :           }
    1336           0 :         }
    1337           0 :       }
    1338          12 :       break;
    1339          21 :     }
    1340           6 :     case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
    1341           6 :       FD_TEST( msg->contact_info_remove->idx<GOSSIP_PEERS_MAX );
    1342           6 :       gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info_remove->idx;
    1343           6 :       ulong rem_idx = gossip_ci_map_idx_remove( ctx->gossip.ci_map, &entry->pubkey, ULONG_MAX, ctx->gossip.ci_table );
    1344           6 :       if( rem_idx!=ULONG_MAX ) {
    1345           3 :         FD_TEST( entry->allowed && rem_idx==msg->contact_info_remove->idx );
    1346           3 :         ctx->gossip.allowed_cnt--;
    1347           3 :         fd_ip4_port_t addr = entry->rpc_addr;
    1348           3 :         if( FD_LIKELY( !!addr.l ) ) {
    1349           0 :           fd_ssping_remove( ctx->ssping, addr );
    1350           0 :           fd_sspeer_key_t entry_key = {0};
    1351           0 :           *entry_key.pubkey = entry->pubkey;
    1352           0 :           entry_key.is_url  = 0;
    1353           0 :           fd_sspeer_selector_remove( ctx->selector, &entry_key );
    1354           0 :         }
    1355           3 :         if( !ctx->config.sources.gossip.allow_any ) {
    1356           0 :           FD_BASE58_ENCODE_32_BYTES( entry->pubkey.uc, pubkey_b58 );
    1357           0 :           FD_LOG_WARNING(( "allowed gossip peer removed with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
    1358           0 :                            pubkey_b58, FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
    1359           0 :         }
    1360           3 :       }
    1361           6 :       fd_memset( entry, 0, sizeof(*entry) );
    1362           6 :       break;
    1363           6 :     }
    1364           0 :     case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
    1365           0 :       ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin, ULONG_MAX, ctx->gossip.ci_table );
    1366           0 :       if( FD_LIKELY( idx!=ULONG_MAX ) ) {
    1367           0 :         gossip_ci_entry_t * entry = ctx->gossip.ci_table + idx;
    1368           0 :         FD_TEST( entry->allowed );
    1369           0 :         fd_sspeer_key_t entry_key = {0};
    1370           0 :         *entry_key.pubkey = entry->pubkey;
    1371           0 :         entry_key.is_url  = 0;
    1372           0 :         on_snapshot_hash( ctx, &entry_key, entry->rpc_addr, msg );
    1373           0 :       }
    1374           0 :       break;
    1375           0 :     }
    1376           0 :     default:
    1377           0 :       FD_LOG_ERR(( "snapct: unexpected gossip tag %u", (uint)msg->tag ));
    1378           0 :       break;
    1379          27 :   }
    1380          27 : }
    1381             : 
    1382             : /* Validate and handle a pipeline control ack for INIT_{FULL,INCR},
    1383             :    NEXT, DONE, and FINI.  Returns 0 on success, and -1 if the sig was
    1384             :    unrecognized or the state was unexpected. */
    1385             : static int
    1386             : process_ctrl_ack( fd_snapct_tile_t * ctx,
    1387           0 :                   ulong              sig ) {
    1388           0 :   switch( sig ) {
    1389           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
    1390           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_FULL_HTTP ||
    1391           0 :                      ctx->state==FD_SNAPCT_STATE_READING_FULL_FILE ) ) {
    1392           0 :         ctx->flush_ack++;
    1393           0 :         FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1394           0 :       } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1395           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ) {
    1396             :         /* Safe to ignore -- stale ack from before RESET. */
    1397           0 :       } else return -1;
    1398           0 :       return 0;
    1399             : 
    1400           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
    1401           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP ||
    1402           0 :                      ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_FILE ) ) {
    1403           0 :         ctx->flush_ack++;
    1404           0 :         FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1405           0 :       } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1406           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1407             :         /* Safe to ignore -- stale ack from before RESET. */
    1408           0 :       } else return -1;
    1409           0 :       return 0;
    1410             : 
    1411           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
    1412           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
    1413           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ) ) {
    1414           0 :         ctx->flush_ack++;
    1415           0 :         FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1416           0 :       } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1417           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ) {
    1418             :         /* Safe to ignore -- stale ack from before RESET. */
    1419           0 :       } else return -1;
    1420           0 :       return 0;
    1421             : 
    1422           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:
    1423           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
    1424           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ||
    1425           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE ||
    1426           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE ) ) {
    1427           0 :         ctx->flush_ack++;
    1428           0 :         FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1429           0 :       } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1430           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
    1431           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1432           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1433             :         /* Safe to ignore -- stale ack from before RESET. */
    1434           0 :       } else return -1;
    1435           0 :       return 0;
    1436             : 
    1437           0 :     case FD_SNAPSHOT_MSG_CTRL_FINI:
    1438           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI ||
    1439           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI ||
    1440           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI ||
    1441           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI ) ) {
    1442           0 :         ctx->flush_ack++;
    1443           0 :         FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1444           0 :       } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1445           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
    1446           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1447           0 :                               ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1448             :         /* Safe to ignore -- stale ack from before RESET. */
    1449           0 :       } else return -1;
    1450           0 :       return 0;
    1451             : 
    1452           0 :     default:
    1453           0 :       return -1;
    1454           0 :   }
    1455           0 : }
    1456             : 
    1457             : static void
    1458             : snapld_frag( fd_snapct_tile_t *  ctx,
    1459             :              ulong               sig,
    1460             :              ulong               sz,
    1461             :              ulong               chunk,
    1462          15 :              fd_stem_context_t * stem ) {
    1463          15 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_META ) ) {
    1464             :     /* Before snapld starts sending down data fragments, it first sends
    1465             :        a metadata message containing the total size of the snapshot as
    1466             :        well as the filename.  This is only done for HTTP loading. */
    1467           0 :     int full;
    1468           0 :     switch( ctx->state ) {
    1469           0 :       case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; break;
    1470           0 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
    1471             : 
    1472           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
    1473           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
    1474           0 :         return; /* Ignore */
    1475           0 :       default: FD_LOG_ERR(( "invalid meta frag in state %s", fd_snapct_state_str( ctx->state ) ));
    1476           0 :     }
    1477             : 
    1478           0 :     FD_TEST( sz==sizeof(fd_ssctrl_meta_t) );
    1479           0 :     fd_ssctrl_meta_t const * meta = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
    1480             : 
    1481           0 :     if( FD_UNLIKELY( meta->total_sz==0UL ) ) {
    1482           0 :       if( FD_UNLIKELY( !ctx->malformed ) ) {
    1483           0 :         FD_LOG_WARNING(( "received zero Content-Length metadata for %s snapshot, marking malformed", full ? "full" : "incremental" ));
    1484           0 :         ctx->malformed = 1;
    1485           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
    1486           0 :       }
    1487           0 :       return;
    1488           0 :     }
    1489             : 
    1490           0 :     if( full ) ctx->metrics.full.bytes_total        = meta->total_sz;
    1491           0 :     else       ctx->metrics.incremental.bytes_total = meta->total_sz;
    1492             : 
    1493           0 :     return;
    1494           0 :   }
    1495          15 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_CTRL_FAIL ) ) {
    1496           0 :     if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1497           0 :                    ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
    1498           0 :                    ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1499           0 :                    ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1500           0 :       ctx->flush_ack++;
    1501           0 :       FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1502           0 :     } else {
    1503           0 :       FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
    1504           0 :     }
    1505           0 :     return;
    1506           0 :   }
    1507          15 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_CTRL_ERROR ) ) {
    1508             :     /* CTRL_ERROR message directly from snapld can be snapld-generated
    1509             :        or snapct-generated-and-forwarded. */
    1510           0 :     switch( ctx->state ) {
    1511           0 :       case FD_SNAPCT_STATE_READING_FULL_FILE:
    1512           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
    1513           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
    1514           0 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
    1515           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
    1516           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
    1517           0 :       case FD_SNAPCT_STATE_READING_FULL_HTTP:
    1518           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
    1519           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
    1520           0 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
    1521           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
    1522           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
    1523           0 :         FD_LOG_WARNING(( "received error from snapld in state %d (%s)",
    1524           0 :                          ctx->state, fd_snapct_state_str( ctx->state ) ));
    1525           0 :         ctx->malformed = 1;
    1526           0 :         break;
    1527           0 :       default:
    1528           0 :         break;
    1529           0 :     }
    1530           0 :     return;
    1531           0 :   }
    1532          15 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_LOAD_COMPLETE ) ) {
    1533          15 :     int full = 0;
    1534          15 :     switch( ctx->state ) {
    1535           3 :       case FD_SNAPCT_STATE_READING_FULL_FILE:
    1536           6 :       case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; break;
    1537           0 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
    1538           3 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
    1539           3 :       case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
    1540           3 :       case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
    1541           3 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
    1542           6 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
    1543           6 :         return; /* Ignore during reset. */
    1544           0 :       default:
    1545           0 :         FD_LOG_ERR(( "invalid load_complete in state %s", fd_snapct_state_str( ctx->state ) ));
    1546           0 :         return;
    1547          15 :     }
    1548           9 :     if( FD_UNLIKELY( ctx->malformed ) ) return;
    1549             :     /* Validate that all expected bytes were received. */
    1550           6 :     ulong bytes_read  = full ? ctx->metrics.full.bytes_read  : ctx->metrics.incremental.bytes_read;
    1551           6 :     ulong bytes_total = full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total;
    1552           6 :     if( FD_UNLIKELY( !bytes_total || bytes_read!=bytes_total ) ) {
    1553           0 :       ctx->malformed = 1;
    1554           0 :       FD_LOG_WARNING(( "load_complete but bytes_read %lu != bytes_total %lu for %s snapshot",
    1555           0 :                        bytes_read, bytes_total, full ? "full" : "incremental" ));
    1556           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
    1557           0 :       return;
    1558           0 :     }
    1559           6 :     ctx->load_complete = 1;
    1560           6 :     return;
    1561           6 :   }
    1562           0 :   if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_DATA ) ) {
    1563           0 :     if( process_ctrl_ack( ctx, sig ) ) {
    1564           0 :       FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
    1565           0 :     }
    1566           0 :     return;
    1567           0 :   }
    1568             : 
    1569           0 :   int full, file;
    1570           0 :   switch( ctx->state ) {
    1571             :     /* Expected cases, fall through below */
    1572           0 :     case FD_SNAPCT_STATE_READING_FULL_FILE:        full = 1; file = 1; break;
    1573           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE: full = 0; file = 1; break;
    1574           0 :     case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; file = 0; break;
    1575           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; file = 0; break;
    1576             : 
    1577           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
    1578           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
    1579           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
    1580           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
    1581             :       /* We are waiting for a reset to fully propagate through the
    1582             :          pipeline, just throw away any trailing data frags. */
    1583           0 :       return;
    1584             : 
    1585           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
    1586           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
    1587           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
    1588           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
    1589           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
    1590           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
    1591           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
    1592           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
    1593             :       /* Based on previously received data frags, we expected that the
    1594             :          current full / incremental snapshot was finished, but then we
    1595             :          received additional data frags.  Unsafe to continue so throw
    1596             :          away the whole snapshot.  Do not publish MSG_CTRL_ERROR here:
    1597             :          data forwarding is already complete, and the state machine
    1598             :          will publish MSG_CTRL_FAIL on the next tick. */
    1599           0 :       if( !ctx->malformed ) {
    1600           0 :         ctx->malformed = 1;
    1601           0 :         FD_LOG_WARNING(( "complete snapshot loaded but read %lu extra bytes", sz ));
    1602           0 :       }
    1603           0 :       return;
    1604             : 
    1605           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS:
    1606           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL:
    1607           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS:
    1608           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL:
    1609           0 :     case FD_SNAPCT_STATE_SHUTDOWN:
    1610           0 :     default:
    1611           0 :       FD_LOG_ERR(( "invalid data frag in state %s", fd_snapct_state_str( ctx->state ) ));
    1612           0 :       return;
    1613           0 :   }
    1614             : 
    1615           0 :   if( FD_UNLIKELY( full  && ctx->metrics.full.bytes_total==0UL ) ) {
    1616           0 :     if( !ctx->malformed ) {
    1617           0 :       ctx->malformed = 1;
    1618           0 :       FD_LOG_WARNING(( "received data frag for full snapshot with zero bytes_total" ));
    1619           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
    1620           0 :     }
    1621           0 :     return;
    1622           0 :   }
    1623           0 :   if( FD_UNLIKELY( !full && ctx->metrics.incremental.bytes_total==0UL ) ) {
    1624           0 :     if( !ctx->malformed ) {
    1625           0 :       ctx->malformed = 1;
    1626           0 :       FD_LOG_WARNING(( "received data frag for incremental snapshot with zero bytes_total" ));
    1627           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
    1628           0 :     }
    1629           0 :     return;
    1630           0 :   }
    1631             : 
    1632           0 :   if( full ) ctx->metrics.full.bytes_read        += sz;
    1633           0 :   else       ctx->metrics.incremental.bytes_read += sz;
    1634             : 
    1635           0 :   if( !file && -1!=ctx->local_out.dir_fd ) {
    1636           0 :     uchar const * data = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
    1637           0 :     int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
    1638           0 :     ulong written_sz = 0;
    1639           0 :     while( written_sz<sz ) {
    1640           0 :       long result = write( fd, data+written_sz, sz-written_sz );
    1641           0 :       if( FD_UNLIKELY( -1==result && errno==EINTR ) ) continue;
    1642           0 :       if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
    1643           0 :         FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", ctx->config.snapshots_path ));
    1644           0 :       } else if( FD_UNLIKELY( 0L>result ) ) {
    1645           0 :         FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    1646           0 :       } else if( FD_UNLIKELY( 0L==result ) ) {
    1647           0 :         FD_LOG_ERR(( "write() returned 0 for non-zero count" ));
    1648           0 :       }
    1649             : 
    1650           0 :       written_sz += (ulong)result;
    1651           0 :     }
    1652           0 :     if( full ) ctx->metrics.full.bytes_written        += sz;
    1653           0 :     else       ctx->metrics.incremental.bytes_written += sz;
    1654           0 :   }
    1655             : 
    1656           0 :   if( FD_UNLIKELY( ( full && ctx->metrics.full.bytes_read        > ctx->metrics.full.bytes_total ) ||
    1657           0 :                    (!full && ctx->metrics.incremental.bytes_read > ctx->metrics.incremental.bytes_total ) ) ) {
    1658           0 :     if( !ctx->malformed ) {
    1659           0 :       ctx->malformed = 1;
    1660           0 :       FD_LOG_WARNING(( "expected %s snapshot size of %lu bytes but read %lu bytes",
    1661           0 :                        full ? "full" : "incremental",
    1662           0 :                        full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total,
    1663           0 :                        full ? ctx->metrics.full.bytes_read  : ctx->metrics.incremental.bytes_read ));
    1664           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
    1665           0 :     }
    1666           0 :     return;
    1667           0 :   }
    1668           0 : }
    1669             : 
    1670             : static void
    1671             : ctrl_ack_frag( fd_snapct_tile_t *  ctx,
    1672           0 :                ulong               sig ) {
    1673           0 :   switch( sig ) {
    1674           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL:
    1675           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1676           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
    1677           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1678           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1679           0 :         ctx->flush_ack++;
    1680           0 :         FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
    1681           0 :       } else {
    1682           0 :         FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
    1683           0 :       }
    1684           0 :       return;
    1685             : 
    1686           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
    1687           0 :       return;
    1688             : 
    1689           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR:
    1690           0 :       switch( ctx->state ) {
    1691           0 :         case FD_SNAPCT_STATE_READING_FULL_FILE:
    1692           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
    1693           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
    1694           0 :         case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
    1695           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
    1696           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
    1697           0 :         case FD_SNAPCT_STATE_READING_FULL_HTTP:
    1698           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
    1699           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
    1700           0 :         case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
    1701           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
    1702           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
    1703             :           /* Do not publish MSG_CTRL_ERROR: the error originated
    1704             :              downstream, so re-publishing would be redundant.
    1705             :              MSG_CTRL_FAIL follows on the next state machine tick. */
    1706           0 :           FD_LOG_WARNING(( "received error from downstream tile while in state %s",
    1707           0 :                            fd_snapct_state_str( ctx->state ) ));
    1708           0 :           ctx->malformed = 1;
    1709           0 :           break;
    1710           0 :         default:
    1711           0 :           break;
    1712           0 :       }
    1713           0 :       return;
    1714             : 
    1715           0 :     default:
    1716           0 :       break;
    1717           0 :   }
    1718           0 :   if( process_ctrl_ack( ctx, sig ) ) {
    1719           0 :     FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
    1720           0 :   }
    1721           0 : }
    1722             : 
    1723             : static int
    1724             : returnable_frag( fd_snapct_tile_t *  ctx,
    1725             :                  ulong               in_idx,
    1726             :                  ulong               seq    FD_PARAM_UNUSED,
    1727             :                  ulong               sig,
    1728             :                  ulong               chunk,
    1729             :                  ulong               sz,
    1730             :                  ulong               ctl    FD_PARAM_UNUSED,
    1731             :                  ulong               tsorig FD_PARAM_UNUSED,
    1732             :                  ulong               tspub  FD_PARAM_UNUSED,
    1733           0 :                  fd_stem_context_t * stem ) {
    1734           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
    1735           0 :     gossip_frag( ctx, sig, sz, chunk );
    1736           0 :   } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLD ) {
    1737           0 :     snapld_frag( ctx, sig, sz, chunk, stem );
    1738           0 :   } else if( ctx->in_kind[ in_idx ]==IN_KIND_ACK ) {
    1739           0 :     ctrl_ack_frag( ctx, sig );
    1740           0 :   } else FD_LOG_ERR(( "invalid in_kind %lu %u", in_idx, (uint)ctx->in_kind[ in_idx ] ));
    1741           0 :   return 0;
    1742           0 : }
    1743             : 
    1744             : static void
    1745             : privileged_init( fd_topo_t const *      topo,
    1746           0 :                  fd_topo_tile_t const * tile ) {
    1747           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1748             : 
    1749           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1750           0 :   fd_snapct_tile_t * ctx         = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t) );
    1751           0 :   void *             _ssping     = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX ) );
    1752           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t)*GOSSIP_PEERS_MAX );
    1753           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(),      gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
    1754           0 :   void *             _ssresolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(),   fd_http_resolver_footprint( SERVER_PEERS_MAX )  );
    1755           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
    1756           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, blacklist_pool_align(),     blacklist_pool_footprint( TOTAL_PEERS_MAX ) );
    1757           0 :                                    FD_SCRATCH_ALLOC_APPEND( l, blacklist_map_align(),      blacklist_map_footprint( blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ) ) );
    1758             : 
    1759           0 : #if FD_HAS_OPENSSL
    1760           0 :   void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
    1761           0 :   fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
    1762           0 :   fd_ossl_tile_init( alloc );
    1763           0 : #endif
    1764             : 
    1765           0 :   ctx->ssping = NULL;
    1766           0 :   if( FD_LIKELY( download_enabled( tile ) ) )         ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, TOTAL_PEERS_MAX, 1UL, on_ping, ctx ) );
    1767           0 :   if( FD_LIKELY( tile->snapct.sources.servers_cnt ) ) ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, SERVER_PEERS_MAX, tile->snapct.incremental_snapshots, on_resolve, ctx ) );
    1768           0 :   else                                                ctx->ssresolver = NULL;
    1769             : 
    1770           0 :   fd_ssarchive_remove_old_snapshots( tile->snapct.snapshots_path,
    1771           0 :                                      tile->snapct.max_full_snapshots_to_keep,
    1772           0 :                                      tile->snapct.max_incremental_snapshots_to_keep );
    1773             : 
    1774           0 :   ulong full_slot = ULONG_MAX;
    1775           0 :   ulong incremental_slot = ULONG_MAX;
    1776           0 :   int full_is_zstd = 0;
    1777           0 :   int incremental_is_zstd = 0;
    1778           0 :   char full_path[ PATH_MAX ] = {0};
    1779           0 :   char incremental_path[ PATH_MAX ] = {0};
    1780           0 :   uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ] = {0};
    1781           0 :   uchar incremental_snapshot_hash[ FD_HASH_FOOTPRINT ] = {0};
    1782           0 :   if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snapct.snapshots_path,
    1783           0 :                                                  tile->snapct.incremental_snapshots,
    1784           0 :                                                  &full_slot,
    1785           0 :                                                  &incremental_slot,
    1786           0 :                                                  full_path,
    1787           0 :                                                  incremental_path,
    1788           0 :                                                  &full_is_zstd,
    1789           0 :                                                  &incremental_is_zstd,
    1790           0 :                                                  full_snapshot_hash,
    1791           0 :                                                  incremental_snapshot_hash ) ) ) {
    1792           0 :     if( FD_UNLIKELY( !download_enabled( tile ) ) ) {
    1793           0 :       FD_LOG_ERR(( "No snapshots found in `%s` and no download sources are enabled. "
    1794           0 :                    "Please enable downloading via [snapshots.sources] and restart.", tile->snapct.snapshots_path ));
    1795           0 :     }
    1796           0 :     ctx->local_in.full_snapshot_slot        = ULONG_MAX;
    1797           0 :     ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
    1798           0 :     ctx->local_in.full_snapshot_size        = 0UL;
    1799           0 :     ctx->local_in.incremental_snapshot_size = 0UL;
    1800           0 :     ctx->local_in.full_snapshot_zstd        = 0;
    1801           0 :     ctx->local_in.incremental_snapshot_zstd = 0;
    1802           0 :     fd_cstr_fini( ctx->local_in.full_snapshot_path );
    1803           0 :     fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
    1804           0 :     fd_memset( ctx->local_in.full_snapshot_hash, 0, FD_HASH_FOOTPRINT );
    1805           0 :     fd_memset( ctx->local_in.incremental_snapshot_hash, 0, FD_HASH_FOOTPRINT );
    1806           0 :   } else {
    1807           0 :     FD_TEST( full_slot!=ULONG_MAX );
    1808             : 
    1809           0 :     ctx->local_in.full_snapshot_slot        = full_slot;
    1810           0 :     ctx->local_in.incremental_snapshot_slot = incremental_slot;
    1811           0 :     ctx->local_in.full_snapshot_zstd        = full_is_zstd;
    1812           0 :     ctx->local_in.incremental_snapshot_zstd = incremental_is_zstd;
    1813             : 
    1814           0 :     fd_cstr_ncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
    1815           0 :     fd_memcpy( ctx->local_in.full_snapshot_hash, full_snapshot_hash, FD_HASH_FOOTPRINT );
    1816           0 :     struct stat full_stat;
    1817           0 :     if( FD_UNLIKELY( -1==stat( ctx->local_in.full_snapshot_path, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
    1818           0 :     if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
    1819           0 :     ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
    1820             : 
    1821           0 :     if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
    1822           0 :       fd_cstr_ncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
    1823           0 :       fd_memcpy( ctx->local_in.incremental_snapshot_hash, incremental_snapshot_hash, FD_HASH_FOOTPRINT );
    1824           0 :       struct stat incremental_stat;
    1825           0 :       if( FD_UNLIKELY( -1==stat( ctx->local_in.incremental_snapshot_path, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
    1826           0 :       if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
    1827           0 :       ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
    1828           0 :     } else {
    1829           0 :       ctx->local_in.incremental_snapshot_size = 0UL;
    1830           0 :       fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
    1831           0 :     }
    1832           0 :   }
    1833             : 
    1834           0 :   ctx->local_out.dir_fd                  = -1;
    1835           0 :   ctx->local_out.full_snapshot_fd        = -1;
    1836           0 :   ctx->local_out.incremental_snapshot_fd = -1;
    1837           0 :   if( FD_LIKELY( download_enabled( tile ) ) ) {
    1838             :     /* Switch to non-root uid/gid for file creation so snapshot files
    1839             :        are owned by the target user, not root. */
    1840           0 :     gid_t gid = getgid();
    1841           0 :     uid_t uid = getuid();
    1842           0 :     if( FD_LIKELY( !gid && -1==syscall( __NR_setresgid, -1, tile->snapct.target_gid, -1 ) ) )
    1843           0 :       FD_LOG_ERR(( "setresgid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    1844           0 :     if( FD_LIKELY( !uid && -1==syscall( __NR_setresuid, -1, tile->snapct.target_uid, -1 ) ) )
    1845           0 :       FD_LOG_ERR(( "setresuid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    1846             : 
    1847           0 :     ctx->local_out.dir_fd = open( tile->snapct.snapshots_path, O_DIRECTORY|O_CLOEXEC );
    1848           0 :     if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open(%s) failed (%i-%s)", tile->snapct.snapshots_path, errno, fd_io_strerror( errno ) ));
    1849             : 
    1850           0 :     ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
    1851           0 :     if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_FULL_SNAP_NAME, errno, fd_io_strerror( errno ) ));
    1852             : 
    1853           0 :     if( FD_LIKELY( tile->snapct.incremental_snapshots ) ) {
    1854           0 :       ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
    1855           0 :       if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_INCR_SNAP_NAME, errno, fd_io_strerror( errno ) ));
    1856           0 :     }
    1857             : 
    1858           0 :     if( FD_UNLIKELY( -1==syscall( __NR_setresuid, -1, uid, -1 ) ) )
    1859           0 :       FD_LOG_ERR(( "setresuid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    1860           0 :     if( FD_UNLIKELY( -1==syscall( __NR_setresgid, -1, gid, -1 ) ) )
    1861           0 :       FD_LOG_ERR(( "setresgid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    1862           0 :   }
    1863             : 
    1864           0 :   FD_TEST( fd_rng_secure( &ctx->selector_seed, 8UL ) );
    1865           0 : }
    1866             : 
    1867             : static inline fd_snapct_out_link_t
    1868             : out1( fd_topo_t const *      topo,
    1869             :       fd_topo_tile_t const * tile,
    1870           0 :       char const *           name ) {
    1871           0 :   ulong idx = ULONG_MAX;
    1872             : 
    1873           0 :   for( ulong i=0UL; i<tile->out_cnt; i++ ) {
    1874           0 :     fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
    1875           0 :     if( !strcmp( link->name, name ) ) {
    1876           0 :       if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
    1877           0 :       idx = i;
    1878           0 :     }
    1879           0 :   }
    1880             : 
    1881           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapct_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
    1882             : 
    1883           0 :   ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
    1884           0 :   if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapct_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
    1885             : 
    1886           0 :   void * mem   = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
    1887           0 :   ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
    1888           0 :   ulong wmark  = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
    1889           0 :   return (fd_snapct_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
    1890           0 : }
    1891             : 
    1892             : static void
    1893             : unprivileged_init( fd_topo_t const *      topo,
    1894           0 :                    fd_topo_tile_t const * tile ) {
    1895           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1896             : 
    1897           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1898           0 :   fd_snapct_tile_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t)       );
    1899           0 :                             FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX ) );
    1900           0 :   void * _ci_table        = FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
    1901           0 :   void * _ci_map          = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(),      gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
    1902           0 :                             FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(),   fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
    1903           0 :   void * _selector        = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
    1904           0 :   void * _bl_pool         = FD_SCRATCH_ALLOC_APPEND( l, blacklist_pool_align(),     blacklist_pool_footprint( TOTAL_PEERS_MAX ) );
    1905           0 :   void * _bl_map          = FD_SCRATCH_ALLOC_APPEND( l, blacklist_map_align(),      blacklist_map_footprint( blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ) ) );
    1906             : 
    1907           0 :   ctx->config = tile->snapct;
    1908           0 :   ctx->gossip_enabled   = gossip_enabled( tile );
    1909           0 :   ctx->download_enabled = download_enabled( tile );
    1910             : 
    1911           0 :   ctx->selector       = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, TOTAL_PEERS_MAX, ctx->selector_seed ) );
    1912           0 :   ctx->blacklist_pool = blacklist_pool_join( blacklist_pool_new( _bl_pool, TOTAL_PEERS_MAX ) );
    1913           0 :   ctx->blacklist_map  = blacklist_map_join( blacklist_map_new( _bl_map, blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ), ctx->selector_seed ) );
    1914             : 
    1915           0 :   if( ctx->config.sources.servers_cnt ) {
    1916           0 :     for( ulong i=0UL; i<tile->snapct.sources.servers_cnt; i++ ) {
    1917             :       /* The peers needs to be added to resolver and to selector.
    1918             :          Only if this succeeds, add the peer to ssping list. */
    1919           0 :       if( FD_LIKELY( !fd_http_resolver_add( ctx->ssresolver,
    1920           0 :                                             tile->snapct.sources.servers[ i ].addr,
    1921           0 :                                             tile->snapct.sources.servers[ i ].hostname,
    1922           0 :                                             tile->snapct.sources.servers[ i ].is_https,
    1923           0 :                                             ctx->selector ) ) ) {
    1924           0 :         fd_ssping_add( ctx->ssping, tile->snapct.sources.servers[ i ].addr );
    1925           0 :       }
    1926           0 :     }
    1927           0 :   }
    1928             : 
    1929           0 :   if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) {
    1930           0 :     FD_LOG_WARNING(( "incremental snapshots disabled via [snapshots.incremental_snapshots]." ));
    1931           0 :   }
    1932             : 
    1933           0 :   ctx->state          = FD_SNAPCT_STATE_INIT;
    1934           0 :   ctx->malformed      = 0;
    1935           0 :   ctx->load_complete  = 0;
    1936           0 :   FD_CHECK_ERR( ctx->config.wait_for_peers_timeout_nanos>0L, "snapct wait_for_peers_timeout_nanos must be positive" );
    1937           0 :   ctx->deadline_nanos = fd_log_wallclock() + ctx->config.wait_for_peers_timeout_nanos;
    1938           0 :   ctx->flush_ack      = 0;
    1939           0 :   ctx->flush_ack_cnt  = 0;
    1940           0 :   ctx->peer.addr.l    = 0UL;
    1941             : 
    1942           0 :   fd_memset( ctx->http_full_snapshot_name, 0, PATH_MAX );
    1943           0 :   fd_memset( ctx->http_incr_snapshot_name, 0, PATH_MAX );
    1944             : 
    1945           0 :   ctx->gossip_in_mem = NULL;
    1946           0 :   int has_snapld_dc = 0, ack_cnt = 0;
    1947           0 :   FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
    1948           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    1949           0 :     fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ i ] ];
    1950           0 :     if( 0==strcmp( in_link->name, "gossip_out" ) ) {
    1951           0 :       ctx->in_kind[ i ]  = IN_KIND_GOSSIP;
    1952           0 :       ctx->gossip_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
    1953           0 :     } else if( 0==strcmp( in_link->name, "snapld_dc" ) ) {
    1954           0 :       ctx->in_kind[ i ]  = IN_KIND_SNAPLD;
    1955           0 :       ctx->snapld_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
    1956           0 :       FD_TEST( !has_snapld_dc );
    1957           0 :       has_snapld_dc = 1;
    1958           0 :     } else if( 0==strcmp( in_link->name, "snapin_ct" ) || 0==strcmp( in_link->name, "snapwr_ct" ) ){
    1959           0 :       ctx->in_kind[ i ] = IN_KIND_ACK;
    1960           0 :       ack_cnt++;
    1961           0 :     }
    1962           0 :   }
    1963           0 :   FD_TEST( has_snapld_dc && ack_cnt>0 );
    1964           0 :   ctx->flush_ack_cnt = ack_cnt + 1; /* +1 for snapld (acks via snapld_dc) */
    1965           0 :   FD_TEST( ctx->gossip_enabled==(ctx->gossip_in_mem!=NULL) );
    1966             : 
    1967           0 :   ctx->predicted_incremental.full_slot = FD_SSPEER_SLOT_UNKNOWN;
    1968           0 :   ctx->predicted_incremental.slot      = FD_SSPEER_SLOT_UNKNOWN;
    1969           0 :   ctx->predicted_incremental.pending   = 0;
    1970             : 
    1971           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
    1972             : 
    1973           0 :   fd_memset( _ci_table, 0, sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
    1974           0 :   ctx->gossip.ci_table             = _ci_table;
    1975           0 :   ctx->gossip.ci_map               = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ), 0UL ) );
    1976           0 :   ctx->gossip.allowed_cnt          = 0UL;
    1977           0 :   ctx->gossip.saturated            = !ctx->gossip_enabled;
    1978             : 
    1979           0 :   if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
    1980           0 :   ctx->out_ld  = out1( topo, tile, "snapct_ld"   );
    1981           0 :   ctx->out_gui = out1( topo, tile, "snapct_gui"  );
    1982           0 :   ctx->out_rp  = out1( topo, tile, "snapct_repr" );
    1983           0 : }
    1984             : 
    1985             : /* after_credit can result in as many as 5 stem publishes in some code
    1986             :    paths, and returnable_frag can result in 1. */
    1987           0 : #define STEM_BURST 6UL
    1988             : 
    1989           0 : #define STEM_LAZY  1000L
    1990             : 
    1991           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snapct_tile_t
    1992           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapct_tile_t)
    1993             : 
    1994             : #define STEM_CALLBACK_SHOULD_SHUTDOWN     should_shutdown
    1995           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
    1996           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
    1997           0 : #define STEM_CALLBACK_RETURNABLE_FRAG     returnable_frag
    1998             : 
    1999             : #include "../../disco/stem/fd_stem.c"
    2000             : 
    2001             : #ifndef FD_TILE_TEST
    2002             : fd_topo_run_tile_t fd_tile_snapct = {
    2003             :   .name                     = NAME,
    2004             :   .rlimit_file_cnt_fn       = rlimit_file_cnt,
    2005             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    2006             :   .populate_allowed_fds     = populate_allowed_fds,
    2007             :   .scratch_align            = scratch_align,
    2008             :   .scratch_footprint        = scratch_footprint,
    2009             :   .loose_footprint          = loose_footprint,
    2010             :   .privileged_init          = privileged_init,
    2011             :   .unprivileged_init        = unprivileged_init,
    2012             :   .run                      = stem_run,
    2013             :   .keep_host_networking     = 1,
    2014             :   .allow_connect            = 1,
    2015             :   .allow_renameat           = 1,
    2016             : };
    2017             : #endif
    2018             : 
    2019             : #undef NAME

Generated by: LCOV version 1.14