Line data    Source code 
       1             : #include "fd_snapct_tile.h"
       2             : #include "utils/fd_ssping.h"
       3             : #include "utils/fd_ssctrl.h"
       4             : #include "utils/fd_ssarchive.h"
       5             : #include "utils/fd_http_resolver.h"
       6             : #include "utils/fd_ssmsg.h"
       7             : 
       8             : #include "../../disco/topo/fd_topo.h"
       9             : #include "../../disco/metrics/fd_metrics.h"
      10             : #include "../../flamenco/gossip/fd_gossip_types.h"
      11             : 
      12             : #include <errno.h>
      13             : #include <stdio.h>
      14             : #include <fcntl.h>
      15             : #include <unistd.h>
      16             : #include <sys/stat.h>
      17             : #include <netinet/tcp.h>
      18             : #include <netinet/in.h>
      19             : 
      20             : #include "generated/fd_snapct_tile_seccomp.h"
      21             : 
      22             : #define NAME "snapct"
      23             : 
/* Peer capacity limits.  These size the ssping, gossip contact info
   table / map, http resolver and peer selector regions laid out in
   scratch_footprint. */
#define GOSSIP_PEERS_MAX (FD_CONTACT_INFO_TABLE_SIZE)
#define SERVER_PEERS_MAX (16UL) /* Maximum number of configured http peers */
#define TOTAL_PEERS_MAX  (GOSSIP_PEERS_MAX + SERVER_PEERS_MAX) /* gossip peers plus configured http peers */
      27             : 
/* Input link kinds.  Presumably these are the values stored in
   fd_snapct_tile_t::in_kind to tag each input link by its producer
   (TODO confirm against the tile init code).  MAX_IN_LINKS bounds the
   in_kind array. */
#define IN_KIND_SNAPIN  (0)
#define IN_KIND_SNAPLD  (1)
#define IN_KIND_GOSSIP  (2)
#define MAX_IN_LINKS    (3)
      32             : 
      33             : struct fd_restore_out_link {
      34             :   ulong       idx;
      35             :   fd_wksp_t * mem;
      36             :   ulong       chunk0;
      37             :   ulong       wmark;
      38             :   ulong       chunk;
      39             :   ulong       mtu;
      40             : };
      41             : 
      42             : typedef struct fd_restore_out_link fd_restore_out_link_t;
      43             : 
      44             : #define FD_SNAPCT_GOSSIP_FRESH_DEADLINE_NANOS              (7.5L*1000L*1000L*1000L)   /* gossip contact info is pushed every 7.5 seconds */
      45           0 : #define FD_SNAPCT_GOSSIP_SATURATION_THRESHOLD              (0.05)                     /* 5% fresh peers */
      46           0 : #define FD_SNAPCT_GOSSIP_TIMEOUT_DEADLINE_NANOS            (2L*60L*1000L*1000L*1000L) /* 2 minutes */
      47           0 : #define FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT_DEADLINE_NANOS (2L*60L*1000L*1000L*1000L) /* 2 minutes */
      48             : 
      49             : struct fd_snapct_gossip_ci_entry {
      50             :   fd_ip4_port_t rpc_addr;
      51             :   fd_pubkey_t   pubkey;
      52             :   long          wallclock_nanos;
      53             :   ulong         map_next;
      54             : };
      55             : typedef struct fd_snapct_gossip_ci_entry fd_snapct_gossip_ci_entry_t;
      56             : 
/* gossip_ci_map: instantiation of the fd_map_chain template over
   fd_snapct_gossip_ci_entry_t, keyed by pubkey and chained through
   map_next.  The hash xors the seed with the result of
   fd_ulong_load_8 on the key bytes (presumably the leading 8 bytes of
   the pubkey). */
#define MAP_NAME               gossip_ci_map
#define MAP_KEY                pubkey
#define MAP_ELE_T              fd_snapct_gossip_ci_entry_t
#define MAP_KEY_T              fd_pubkey_t
#define MAP_NEXT               map_next
#define MAP_KEY_EQ(k0,k1)      fd_pubkey_eq( k0, k1 )
#define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
#include "../../util/tmpl/fd_map_chain.c"
      65             : 
/* fd_snapct_tile is the private context of the snapct tile.  It is the
   first object laid out in the tile scratch region (see
   scratch_footprint). */
struct fd_snapct_tile {
  fd_ssping_t *          ssping;     /* advanced every after_credit; exposes a socket via fd_ssping_get_sockfd */
  fd_http_resolver_t *   ssresolver; /* advanced every after_credit */
  fd_sspeer_selector_t * selector;   /* peer set queried with fd_sspeer_selector_best */

  int   state;          /* one of FD_SNAPCT_STATE_*; drives the switch in after_credit */
  int   malformed;
  long  deadline_nanos; /* wallclock deadline for the current state, compared against fd_log_wallclock() */
  int   flush_ack;

  fd_ip4_port_t addr;   /* copied into the init control message when loading is not from a file (see init_load) */

  /* Locally written snapshot files.  A value of -1 for any fd means
     "not in use" (see rename_snapshots / remove_temp_files). */
  struct {
    char  full_snapshot_path[ PATH_MAX ];
    char  incremental_snapshot_path[ PATH_MAX ];
    char  full_snapshot_name[ PATH_MAX ];        /* rename target for the full partial file; empty string means do not rename */
    char  incremental_snapshot_name[ PATH_MAX ]; /* rename target for the incremental partial file; empty string means do not rename */

    int   dir_fd;                   /* directory fd used with renameat / unlinkat */
    int   full_snapshot_fd;
    int   incremental_snapshot_fd;
  } local_out;

  uchar in_kind[ MAX_IN_LINKS ];    /* presumably one IN_KIND_* per input link -- TODO confirm */

  /* Prediction of the incremental snapshot slot that will be loaded,
     maintained by predict_incremental. */
  struct {
    ulong full_slot; /* base full snapshot slot; ULONG_MAX disables prediction */
    ulong slot;      /* last predicted incremental slot */
    int   dirty;     /* set when slot changes; cleared in after_credit once the expected slot message is sent */
  } predicted_incremental;

  /* Snapshots available on local disk.  Slots are compared against
     ULONG_MAX in after_credit to detect "no local snapshot". */
  struct {
    ulong full_snapshot_slot;
    char  full_snapshot_path[ PATH_MAX ];
    ulong full_snapshot_size;        /* reported as metrics.full.bytes_total when loading from file */

    ulong incremental_snapshot_slot;
    char  incremental_snapshot_path[ PATH_MAX ];
    ulong incremental_snapshot_size; /* reported as metrics.incremental.bytes_total when loading from file */
  } local_in;

  struct {
    char path[ PATH_MAX ];
    int  do_download;                 /* if zero, after_credit loads the local full snapshot without selecting a peer */
    int  incremental_snapshot_fetch;  /* may be cleared at runtime when the cluster has no incremental slot (see after_credit) */
    uint maximum_local_snapshot_age;  /* max slots a local snapshot may trail the cluster slot and still be used */
    uint minimum_download_speed_mib;
    uint maximum_download_retry_abort;
    uint max_full_snapshots_to_keep;
    uint max_incremental_snapshots_to_keep;
    int  gossip_peers_enabled;        /* if zero, gossip_saturated always reports saturated */
  } config;

  /* Counters exported as gauges by metrics_write. */
  struct {
    struct {
      ulong bytes_read;
      ulong bytes_written;
      ulong bytes_total;
      uint  num_retries;
    } full;

    struct {
      ulong bytes_read;
      ulong bytes_written;
      ulong bytes_total;
      uint  num_retries;
    } incremental;
  } metrics;

  struct {
    fd_wksp_t * mem;
    ulong       chunk0;
    ulong       wmark;
    ulong       mtu;
  } gossip_in;

  struct {
    fd_wksp_t * mem;
    ulong       chunk0;
    ulong       wmark;
    ulong       mtu;
  } snapld_in;

  /* Gossip contact info tracking; the summary fields are refreshed by
     gossip_saturated. */
  struct {
    fd_snapct_gossip_ci_entry_t * ci_table;
    gossip_ci_map_t *             ci_map;
    double                        fresh;     /* fresh_cnt/total_cnt, or 1.0 when the map is empty */
    ulong                         fresh_cnt;
    ulong                         total_cnt;
    int                           saturated; /* sticky: once set, gossip_saturated returns 1 without rescanning */
  } gossip;

  fd_restore_out_link_t out_ld;  /* init control messages (see init_load) */
  fd_restore_out_link_t out_gui; /* snapshot path updates; mem is checked for NULL before use in init_load */
  fd_restore_out_link_t out_rp;  /* expected slot messages (see send_expected_slot) */
};
     162             : 
     163             : typedef struct fd_snapct_tile fd_snapct_tile_t;
     164             : 
     165             : static ulong
     166           0 : scratch_align( void ) {
     167           0 :   return fd_ulong_max( alignof(fd_snapct_tile_t),
     168           0 :          fd_ulong_max( fd_ssping_align(),
     169           0 :          fd_ulong_max( alignof(fd_snapct_gossip_ci_entry_t),
     170           0 :          fd_ulong_max( gossip_ci_map_align(),
     171           0 :          fd_ulong_max( fd_http_resolver_align(),
     172           0 :                        fd_sspeer_selector_align() ) ) ) ) );
     173           0 : }
     174             : 
     175             : static ulong
     176           0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     177           0 :   ulong l = FD_LAYOUT_INIT;
     178           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_tile_t),            sizeof(fd_snapct_tile_t)                                                   );
     179           0 :   l = FD_LAYOUT_APPEND( l, fd_ssping_align(),                    fd_ssping_footprint( TOTAL_PEERS_MAX )                                     );
     180           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_gossip_ci_entry_t), sizeof(fd_snapct_gossip_ci_entry_t) * GOSSIP_PEERS_MAX                     );
     181           0 :   l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(),                gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
     182           0 :   l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(),             fd_http_resolver_footprint( SERVER_PEERS_MAX )                             );
     183           0 :   l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(),           fd_sspeer_selector_footprint( TOTAL_PEERS_MAX )                            );
     184           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     185           0 : }
     186             : 
     187             : static inline int
     188           0 : should_shutdown( fd_snapct_tile_t * ctx ) {
     189           0 :   return ctx->state==FD_SNAPCT_STATE_SHUTDOWN;
     190           0 : }
     191             : 
     192             : static int
     193             : gossip_saturated( fd_snapct_tile_t * ctx,
     194           0 :                   long               now ) {
     195           0 :   if( FD_UNLIKELY( !ctx->config.gossip_peers_enabled ) ) return 1;
     196           0 :   if( FD_UNLIKELY( ctx->gossip.saturated ) ) return 1;
     197             : 
     198           0 :   ulong fresh_cnt = 0UL;
     199           0 :   ulong total_cnt = 0UL;
     200           0 :   for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
     201           0 :         !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     202           0 :         iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
     203           0 :     fd_snapct_gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
     204           0 :     if( FD_UNLIKELY( ci_entry->wallclock_nanos>(now-FD_SNAPCT_GOSSIP_FRESH_DEADLINE_NANOS) ) ) fresh_cnt++;
     205           0 :     total_cnt++;
     206           0 :   }
     207             : 
     208           0 :   double fresh = total_cnt ? (double)fresh_cnt/(double)total_cnt : 1.0;
     209           0 :   ctx->gossip.fresh_cnt = fresh_cnt;
     210           0 :   ctx->gossip.total_cnt = total_cnt;
     211           0 :   ctx->gossip.fresh     = fresh;
     212           0 :   ctx->gossip.saturated = fresh<FD_SNAPCT_GOSSIP_SATURATION_THRESHOLD;
     213           0 :   return ctx->gossip.saturated;
     214           0 : }
     215             : 
     216             : static void
     217           0 : metrics_write( fd_snapct_tile_t * ctx ) {
     218           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_READ,               ctx->metrics.full.bytes_read );
     219           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_WRITTEN,            ctx->metrics.full.bytes_written );
     220           0 :   FD_MGAUGE_SET( SNAPCT, FULL_BYTES_TOTAL,              ctx->metrics.full.bytes_total );
     221           0 :   FD_MGAUGE_SET( SNAPCT, FULL_DOWNLOAD_RETRIES,         ctx->metrics.full.num_retries );
     222             : 
     223           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_READ,        ctx->metrics.incremental.bytes_read );
     224           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_WRITTEN,     ctx->metrics.incremental.bytes_written );
     225           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_TOTAL,       ctx->metrics.incremental.bytes_total );
     226           0 :   FD_MGAUGE_SET( SNAPCT, INCREMENTAL_DOWNLOAD_RETRIES,  ctx->metrics.incremental.num_retries );
     227             : 
     228           0 :   FD_MGAUGE_SET( SNAPCT, GOSSIP_FRESH_COUNT,            ctx->gossip.fresh_cnt );
     229           0 :   FD_MGAUGE_SET( SNAPCT, GOSSIP_TOTAL_COUNT,            ctx->gossip.total_cnt );
     230             : 
     231           0 :   FD_MGAUGE_SET( SNAPCT, PREDICTED_SLOT,                ctx->predicted_incremental.slot );
     232             : 
     233           0 :   FD_MGAUGE_SET( SNAPCT, STATE, (ulong)ctx->state );
     234           0 : }
     235             : 
     236             : static void
     237             : snapshot_path_gui_publish( fd_snapct_tile_t *  ctx,
     238             :                            fd_stem_context_t * stem,
     239             :                            char const *        path,
     240           0 :                            int                 is_full ) {
     241           0 :   fd_snapct_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
     242           0 :   FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
     243           0 :   out->is_download = 0;
     244           0 :   out->type = fd_int_if( is_full, FD_SNAPCT_SNAPSHOT_TYPE_FULL, FD_SNAPCT_SNAPSHOT_TYPE_INCREMENTAL );
     245           0 :   fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snapct_update_t) , 0UL, 0UL, 0UL );
     246           0 :   ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snapct_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
     247           0 : }
     248             : 
     249             : static void
     250           0 : predict_incremental( fd_snapct_tile_t * ctx ) {
     251           0 :   if( FD_UNLIKELY( !ctx->config.incremental_snapshot_fetch ) ) return;
     252           0 :   if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==ULONG_MAX ) ) return;
     253             : 
     254           0 :   fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     255             : 
     256           0 :   if( FD_LIKELY( best.addr.l ) ) {
     257           0 :     if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.ssinfo.incremental.slot ) ) {
     258           0 :       ctx->predicted_incremental.slot  = best.ssinfo.incremental.slot;
     259           0 :       ctx->predicted_incremental.dirty = 1;
     260           0 :     }
     261           0 :   }
     262           0 : }
     263             : 
     264             : static void
     265             : on_resolve( void *              _ctx,
     266             :             fd_ip4_port_t       addr,
     267           0 :             fd_ssinfo_t const * ssinfo ) {
     268           0 :   fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
     269             : 
     270           0 :   fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, ssinfo );
     271           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector, ssinfo->full.slot, ssinfo->incremental.slot );
     272           0 :   predict_incremental( ctx );
     273           0 : }
     274             : 
     275             : static void
     276             : on_ping( void *        _ctx,
     277             :          fd_ip4_port_t addr,
     278           0 :          ulong         latency ) {
     279           0 :   fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
     280             : 
     281           0 :   fd_sspeer_selector_add( ctx->selector, addr, latency, NULL );
     282           0 :   predict_incremental( ctx );
     283           0 : }
     284             : 
     285             : static void
     286             : on_snapshot_hash( fd_snapct_tile_t *                 ctx,
     287             :                   fd_ip4_port_t                      addr,
     288           0 :                   fd_gossip_update_message_t const * msg ) {
     289           0 :   ulong full_slot = msg->snapshot_hashes.full->slot;
     290           0 :   ulong incr_slot = 0UL;
     291             : 
     292           0 :   for( ulong i=0UL; i<msg->snapshot_hashes.incremental_len; i++ ) {
     293           0 :     if( FD_LIKELY( msg->snapshot_hashes.incremental[ i ].slot>incr_slot ) ) {
     294           0 :       incr_slot = msg->snapshot_hashes.incremental[ i ].slot;
     295           0 :     }
     296           0 :   }
     297             : 
     298           0 :   fd_ssinfo_t ssinfo = { .full = { .slot = msg->snapshot_hashes.full->slot },
     299           0 :                         .incremental = { .slot = incr_slot, .base_slot = full_slot } };
     300             : 
     301           0 :   fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, &ssinfo );
     302           0 :   fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
     303           0 :   predict_incremental( ctx );
     304           0 : }
     305             : 
     306             : static void
     307             : send_expected_slot( fd_snapct_tile_t *  ctx,
     308             :                     fd_stem_context_t * stem,
     309           0 :                     ulong               slot ) {
     310           0 :   uint tsorig; uint tspub;
     311           0 :   fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
     312           0 :   fd_stem_publish( stem, ctx->out_rp.idx, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
     313           0 : }
     314             : 
     315             : static void
     316           0 : rename_snapshots( fd_snapct_tile_t * ctx ) {
     317           0 :   if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
     318             : 
     319           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && ctx->local_out.full_snapshot_name[ 0 ]!='\0' ) ) {
     320           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->local_out.full_snapshot_name ) ) )
     321           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     322           0 :   }
     323           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && ctx->local_out.incremental_snapshot_name[ 0 ]!='\0' ) ) {
     324           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, ctx->local_out.incremental_snapshot_name ) ) )
     325           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     326           0 :   }
     327           0 : }
     328             : 
     329             : static void
     330           0 : remove_temp_files( fd_snapct_tile_t * ctx ) {
     331           0 :   if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
     332             : 
     333           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
     334           0 :     if( FD_UNLIKELY( -1==unlinkat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", 0 ) ) )
     335           0 :       FD_LOG_ERR(( "unlinkat(snapshot.tar.bz2-partial) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     336           0 :   }
     337           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
     338           0 :     if( FD_UNLIKELY( -1==unlinkat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", 0 ) ) )
     339           0 :       FD_LOG_ERR(( "unlinkat(incremental-snapshot.tar.bz2-partial) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     340           0 :   }
     341           0 : }
     342             : 
     343             : static ulong
     344             : rlimit_file_cnt( fd_topo_t const *      topo FD_PARAM_UNUSED,
     345           0 :                  fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     346             :   /* stderr, logfile, dirfd, local out full fd, local out incremental
     347             :      fd, and one spare for a socket(). */
     348             : 
     349           0 :   return 1UL +              /* stderr */
     350           0 :          1UL +              /* logfile */
     351           0 :          1UL +              /* ssping socket */
     352           0 :          SERVER_PEERS_MAX + /* http resolver max peers sockets */
     353           0 :          3UL;               /* dirfd + 2 snapshot file fds in the worst case */
     354           0 : }
     355             : 
     356             : static ulong
     357             : populate_allowed_seccomp( fd_topo_t const *      topo,
     358             :                           fd_topo_tile_t const * tile,
     359             :                           ulong                  out_cnt,
     360           0 :                           struct sock_filter *   out ) {
     361             : 
     362           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     363             : 
     364           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     365           0 :   fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
     366             : 
     367           0 :   populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)fd_ssping_get_sockfd( ctx->ssping ) );
     368           0 :   return sock_filter_policy_fd_snapct_tile_instr_cnt;
     369           0 : }
     370             : 
     371             : static ulong
     372             : populate_allowed_fds( fd_topo_t const *      topo,
     373             :                       fd_topo_tile_t const * tile,
     374             :                       ulong                  out_fds_cnt,
     375           0 :                       int *                  out_fds ) {
     376           0 :   if( FD_UNLIKELY( out_fds_cnt<5UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     377             : 
     378           0 :   ulong out_cnt = 0;
     379           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     380           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
     381           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     382           0 :   }
     383             : 
     384           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     385             : 
     386           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     387           0 :   fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
     388           0 :   if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) )                  out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
     389           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) )        out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
     390           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
     391           0 :   out_fds[ out_cnt++ ] = fd_ssping_get_sockfd( ctx->ssping );
     392             : 
     393           0 :   return out_cnt;
     394           0 : }
     395             : 
     396             : static void
     397             : init_load( fd_snapct_tile_t *  ctx,
     398             :            fd_stem_context_t * stem,
     399             :            int full,
     400           0 :            int file ) {
     401           0 :   fd_ssctrl_init_t * out = fd_chunk_to_laddr( ctx->out_ld.mem, ctx->out_ld.chunk );
     402           0 :   out->file = file;
     403           0 :   if( !file ) out->addr = ctx->addr;
     404           0 :   fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
     405           0 :   ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
     406             : 
     407           0 :   if( file ) {
     408             :     /* When loading from a local file and not from HTTP, there is no
     409             :        future metadata message to initialize total size / filename, as
     410             :        these are already known immediately. */
     411           0 :     if( full ) {
     412           0 :       ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
     413           0 :       fd_cstr_fini( ctx->local_out.full_snapshot_name );
     414           0 :       if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     415           0 :         snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, 1 );
     416           0 :       }
     417           0 :     } else {
     418           0 :       ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
     419           0 :       fd_cstr_fini( ctx->local_out.incremental_snapshot_name );
     420           0 :       if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     421           0 :         snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, 0 );
     422           0 :       }
     423           0 :     }
     424           0 :   }
     425           0 : }
     426             : 
     427             : static void
     428             : after_credit( fd_snapct_tile_t *  ctx,
     429             :               fd_stem_context_t * stem,
     430             :               int *               opt_poll_in FD_PARAM_UNUSED,
     431           0 :               int *               charge_busy FD_PARAM_UNUSED ) {
     432           0 :   long now = fd_log_wallclock();
     433             : 
     434           0 :   fd_ssping_advance( ctx->ssping, now, ctx->selector );
     435           0 :   fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
     436             : 
     437             :   /* send an expected slot message as the predicted incremental
     438             :      could have changed as a result of the pinger, resolver, or from
     439             :      processing gossip frags in gossip_frag. */
     440           0 :   if( FD_LIKELY( ctx->predicted_incremental.dirty ) ) {
     441           0 :     send_expected_slot( ctx, stem, ctx->predicted_incremental.slot );
     442           0 :     ctx->predicted_incremental.dirty = 0;
     443           0 :   }
     444             : 
     445             :   /* Note: All state transitions should occur within this switch
     446             :      statement to make it easier to reason about the state management. */
     447             : 
     448           0 :   switch ( ctx->state ) {
     449             : 
     450             :     /* ============================================================== */
     451           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS: {
     452           0 :       if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
     453             : 
     454           0 :       if( FD_UNLIKELY( !ctx->config.do_download ) ) {
     455           0 :         ulong local_slot = ctx->config.incremental_snapshot_fetch ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
     456           0 :         send_expected_slot( ctx, stem, local_slot );
     457           0 :         FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     458           0 :         ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
     459           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_FILE;
     460           0 :         init_load( ctx, stem, 1, 1 );
     461           0 :         break;
     462           0 :       }
     463             : 
     464           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
     465           0 :       if( FD_LIKELY( best.addr.l ) ) {
     466           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
     467           0 :         ctx->deadline_nanos = now+FD_SNAPCT_GOSSIP_TIMEOUT_DEADLINE_NANOS;
     468           0 :       }
     469           0 :       break;
     470           0 :     }
     471             : 
     472             :     /* ============================================================== */
     473           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
     474           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
     475           0 :       if( FD_LIKELY( best.addr.l ) ) {
     476           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     477           0 :         ctx->deadline_nanos = now;
     478           0 :       }
     479           0 :       break;
     480           0 :     }
     481             : 
     482             :     /* ============================================================== */
     483           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS: {
     484           0 :       if( FD_UNLIKELY( !gossip_saturated( ctx, now ) && now<ctx->deadline_nanos ) ) break;
     485             : 
     486           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
     487           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     488           0 :         ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
     489           0 :         break;
     490           0 :       }
     491             : 
     492           0 :       fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
     493           0 :       if( FD_UNLIKELY( cluster.incremental==ULONG_MAX && ctx->config.incremental_snapshot_fetch ) ) {
     494             :         /* We must have a cluster full slot to be in this state. */
     495           0 :         FD_TEST( cluster.full!=ULONG_MAX );
     496             :         /* fall back to full snapshot only if the highest cluster slot
     497             :            is a full snapshot only */
     498           0 :         ctx->config.incremental_snapshot_fetch = 0;
     499           0 :       }
     500             : 
     501           0 :       ulong       cluster_slot    = ctx->config.incremental_snapshot_fetch ? cluster.incremental : cluster.full;
     502           0 :       ulong       local_slot      = ctx->config.incremental_snapshot_fetch ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
     503           0 :       ulong       local_slot_with_download = local_slot;
     504           0 :       int         local_too_old   = local_slot!=ULONG_MAX && local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.maximum_local_snapshot_age );
     505           0 :       int         local_full_only = ctx->local_in.incremental_snapshot_slot==ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX;
     506           0 :       if( FD_LIKELY( (ctx->config.incremental_snapshot_fetch && local_full_only) || local_too_old ) ) {
     507           0 :         fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
     508           0 :         if( FD_LIKELY( best_incremental.addr.l ) ) {
     509           0 :           ctx->predicted_incremental.slot = best_incremental.ssinfo.incremental.slot;
     510           0 :           local_slot_with_download = best_incremental.ssinfo.incremental.slot;
     511           0 :           ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental snapshot */
     512           0 :         }
     513           0 :       }
     514             : 
     515           0 :       int can_use_local_full = local_slot_with_download!=ULONG_MAX && local_slot_with_download>=fd_ulong_sat_sub( cluster_slot, ctx->config.maximum_local_snapshot_age );
     516           0 :       if( FD_LIKELY( can_use_local_full ) ) {
     517           0 :         send_expected_slot( ctx, stem, local_slot );
     518             : 
     519           0 :         FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     520           0 :         ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
     521           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_FILE;
     522           0 :         init_load( ctx, stem, 1, 1 );
     523           0 :       } else {
     524           0 :         if( FD_UNLIKELY( !ctx->config.incremental_snapshot_fetch ) ) send_expected_slot( ctx, stem, best.ssinfo.full.slot );
     525             : 
     526           0 :         fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.ssinfo.full.slot );
     527           0 :         if( FD_LIKELY( best_incremental.addr.l ) ) {
     528           0 :           ctx->predicted_incremental.slot = best_incremental.ssinfo.incremental.slot;
     529           0 :           send_expected_slot( ctx, stem, best_incremental.ssinfo.incremental.slot );
     530           0 :         }
     531             : 
     532           0 :         FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port ));
     533           0 :         ctx->addr                            = best.addr;
     534           0 :         ctx->state                           = FD_SNAPCT_STATE_READING_FULL_HTTP;
     535           0 :         ctx->predicted_incremental.full_slot = best.ssinfo.full.slot;
     536           0 :         init_load( ctx, stem, 1, 0 );
     537           0 :       }
     538           0 :       break;
     539           0 :     }
     540             : 
     541             :     /* ============================================================== */
     542           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL: {
     543           0 :       if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
     544             : 
     545           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     546           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     547           0 :         ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL;
     548           0 :         break;
     549           0 :       }
     550             : 
     551           0 :       ctx->addr = best.addr;
     552           0 :       FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port ));
     553           0 :       ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
     554           0 :       init_load( ctx, stem, 0, 0 );
     555           0 :       break;
     556           0 :     }
     557             : 
     558             :     /* ============================================================== */
     559           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE:
     560           0 :       if( !ctx->flush_ack ) break;
     561             : 
     562           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     563           0 :         ctx->malformed = 0;
     564           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     565           0 :         ctx->flush_ack = 0;
     566           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     567           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     568           0 :         break;
     569           0 :       }
     570             : 
     571           0 :       ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     572           0 :       remove_temp_files( ctx );
     573           0 :       metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     574           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     575           0 :       break;
     576             : 
     577             :     /* ============================================================== */
     578           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP:
     579           0 :       if( !ctx->flush_ack ) break;
     580             : 
     581           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     582           0 :         ctx->malformed = 0;
     583           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     584           0 :         ctx->flush_ack = 0;
     585           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
     586           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
     587           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     588           0 :         fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
     589           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->addr );
     590           0 :         break;
     591           0 :       }
     592             : 
     593           0 :       ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     594           0 :       rename_snapshots( ctx );
     595           0 :       metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     596           0 :       fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     597           0 :       break;
     598             : 
     599             :     /* ============================================================== */
     600           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE:
     601           0 :       if( !ctx->flush_ack ) break;
     602             : 
     603           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     604           0 :         ctx->malformed = 0;
     605           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     606           0 :         ctx->flush_ack = 0;
     607           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     608           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     609           0 :         break;
     610           0 :       }
     611             : 
     612           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
     613           0 :         ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     614           0 :         remove_temp_files( ctx );
     615           0 :         metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     616           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     617           0 :         break;
     618           0 :       }
     619             : 
     620           0 :       if( FD_LIKELY( ctx->local_in.incremental_snapshot_slot==ULONG_MAX ) ) {
     621           0 :         ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     622           0 :         ctx->deadline_nanos = 0L;
     623           0 :       } else {
     624           0 :         FD_LOG_NOTICE(( "reading incremental snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     625           0 :         ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
     626           0 :         init_load( ctx, stem, 0, 1 );
     627           0 :       }
     628           0 :       break;
     629             : 
     630             :     /* ============================================================== */
     631           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP:
     632           0 :       if( !ctx->flush_ack ) break;
     633             : 
     634           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     635           0 :         ctx->malformed = 0;
     636           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     637           0 :         ctx->flush_ack = 0;
     638           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
     639           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
     640           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     641           0 :         fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
     642           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->addr );
     643           0 :         break;
     644           0 :       }
     645             : 
     646           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
     647           0 :         ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
     648           0 :         rename_snapshots( ctx );
     649           0 :         metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     650           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     651           0 :         break;
     652           0 :       }
     653             : 
     654             :       /* Get the best incremental peer to download from */
     655             :       /* TODO: We should just transition to collecting_peers_incremental
     656             :          here rather than failing the full snapshot? */
     657           0 :       fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
     658           0 :       if( FD_UNLIKELY( !best.addr.l ) ) {
     659           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     660           0 :         ctx->flush_ack = 0;
     661           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
     662           0 :         break;
     663           0 :       }
     664             : 
     665           0 :       if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.ssinfo.incremental.slot ) ) {
     666           0 :         ctx->predicted_incremental.slot = best.ssinfo.incremental.slot;
     667           0 :         send_expected_slot( ctx, stem, best.ssinfo.incremental.slot );
     668           0 :       }
     669             : 
     670           0 :       ctx->addr = best.addr;
     671           0 :       FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     672           0 :       ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
     673           0 :       init_load( ctx, stem, 0, 0 );
     674           0 :       break;
     675             : 
     676             :     /* ============================================================== */
     677           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
     678           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
     679           0 :       if( !ctx->flush_ack ) break;
     680             : 
     681           0 :       ctx->metrics.full.bytes_read    = 0UL;
     682           0 :       ctx->metrics.full.bytes_written = 0UL;
     683           0 :       ctx->metrics.full.bytes_total   = 0UL;
     684             : 
     685           0 :       ctx->metrics.incremental.bytes_read    = 0UL;
     686           0 :       ctx->metrics.incremental.bytes_written = 0UL;
     687           0 :       ctx->metrics.incremental.bytes_total   = 0UL;
     688             : 
     689           0 :       ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
     690           0 :       ctx->deadline_nanos = 0L;
     691           0 :       break;
     692             : 
     693             :     /* ============================================================== */
     694           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
     695           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
     696           0 :       if( !ctx->flush_ack ) break;
     697             : 
     698           0 :       ctx->metrics.incremental.bytes_read    = 0UL;
     699           0 :       ctx->metrics.incremental.bytes_written = 0UL;
     700           0 :       ctx->metrics.incremental.bytes_total   = 0UL;
     701             : 
     702           0 :       ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
     703           0 :       ctx->deadline_nanos = 0L;
     704           0 :       break;
     705             : 
     706             :     /* ============================================================== */
     707           0 :     case FD_SNAPCT_STATE_READING_FULL_FILE:
     708           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     709           0 :         ctx->malformed = 0;
     710           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     711           0 :         ctx->flush_ack = 0;
     712           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
     713           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     714           0 :         break;
     715           0 :       }
     716           0 :       FD_TEST( ctx->metrics.full.bytes_total!=0UL );
     717           0 :       if( FD_UNLIKELY( ctx->metrics.full.bytes_read == ctx->metrics.full.bytes_total ) ) {
     718           0 :         ulong sig = ctx->config.incremental_snapshot_fetch ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
     719           0 :         fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     720           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE;
     721           0 :         ctx->flush_ack = 0;
     722           0 :       }
     723           0 :       break;
     724             : 
     725             :     /* ============================================================== */
     726           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
     727           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     728           0 :         ctx->malformed = 0;
     729           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     730           0 :         ctx->flush_ack = 0;
     731           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
     732           0 :         FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     733           0 :         break;
     734           0 :       }
     735           0 :       FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
     736           0 :       if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_read == ctx->metrics.incremental.bytes_total ) ) {
     737           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     738           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE;
     739           0 :         ctx->flush_ack = 0;
     740           0 :       }
     741           0 :       break;
     742             : 
     743             :     /* ============================================================== */
     744           0 :     case FD_SNAPCT_STATE_READING_FULL_HTTP:
     745           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     746           0 :         ctx->malformed = 0;
     747           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     748           0 :         ctx->flush_ack = 0;
     749           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
     750           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
     751           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     752           0 :         fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
     753           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->addr );
     754           0 :         break;
     755           0 :       }
     756           0 :       if( FD_UNLIKELY( ctx->metrics.full.bytes_total!=0UL && ctx->metrics.full.bytes_read==ctx->metrics.full.bytes_total ) ) {
     757           0 :         ulong sig = ctx->config.incremental_snapshot_fetch ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
     758           0 :         fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
     759           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP;
     760           0 :         ctx->flush_ack = 0;
     761           0 :       }
     762           0 :       break;
     763             : 
     764             :     /* ============================================================== */
     765           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
     766           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     767           0 :         ctx->malformed = 0;
     768           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
     769           0 :         ctx->flush_ack = 0;
     770           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
     771           0 :         FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
     772           0 :                          FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     773           0 :         fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
     774           0 :         fd_sspeer_selector_remove( ctx->selector, ctx->addr );
     775           0 :         break;
     776           0 :       }
     777           0 :       if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_total!=0UL && ctx->metrics.incremental.bytes_read==ctx->metrics.incremental.bytes_total ) ) {
     778           0 :         fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     779           0 :         ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP;
     780           0 :         ctx->flush_ack = 0;
     781           0 :       }
     782           0 :       break;
     783             : 
     784             :     /* ============================================================== */
     785           0 :     case FD_SNAPCT_STATE_SHUTDOWN:
     786           0 :       break;
     787             : 
     788             :     /* ============================================================== */
     789           0 :     default: FD_LOG_ERR(( "unexpected state %d", ctx->state ));
     790           0 :   }
     791           0 : }
     792             : 
     793             : static void
     794             : gossip_frag( fd_snapct_tile_t *  ctx,
     795             :              ulong               sig,
     796             :              ulong               sz FD_PARAM_UNUSED,
     797           0 :              ulong               chunk ) {
     798             : 
     799           0 :   if( !( ( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
     800           0 :            sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
     801           0 :            sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) &&
     802           0 :          ctx->config.gossip_peers_enabled ) ) return;
     803             : 
     804           0 :   FD_TEST( chunk>=ctx->gossip_in.chunk0 && chunk<=ctx->gossip_in.wmark );
     805           0 :   fd_gossip_update_message_t const * msg = fd_chunk_to_laddr_const( ctx->gossip_in.mem, chunk );
     806           0 :   switch( msg->tag ) {
     807           0 :     case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
     808             :       /* FIXME: Keep ports as network byte order */
     809           0 :       fd_ip4_port_t new_addr    = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
     810           0 :       new_addr.port             = fd_ushort_bswap( new_addr.port );
     811             : 
     812           0 :       if( FD_LIKELY( ctx->config.gossip_peers_enabled ) ) {
     813           0 :         fd_snapct_gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info.idx;
     814           0 :         if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, (fd_pubkey_t const *)msg->origin_pubkey ) ) ) {
     815           0 :           FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
     816           0 :           FD_TEST( ULONG_MAX==gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin_pubkey, ULONG_MAX, ctx->gossip.ci_table ) );
     817           0 :           gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info.idx, ctx->gossip.ci_table );
     818           0 :           entry->pubkey = *(fd_pubkey_t const *)msg->origin_pubkey;
     819           0 :           entry->wallclock_nanos = msg->wallclock_nanos;
     820           0 :         }
     821           0 :         fd_ip4_port_t cur_addr = entry->rpc_addr;
     822           0 :         if( new_addr.l!=cur_addr.l ) {
     823           0 :           entry->rpc_addr    = new_addr;
     824           0 :           if( FD_LIKELY( !!cur_addr.l ) ) {
     825           0 :             int removed = fd_ssping_remove( ctx->ssping, cur_addr );
     826           0 :             if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, cur_addr );
     827           0 :           }
     828           0 :           if( FD_LIKELY( !!new_addr.l ) ) fd_ssping_add( ctx->ssping, new_addr );
     829           0 :         }
     830           0 :       }
     831           0 :       break;
     832           0 :     }
     833           0 :     case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
     834           0 :       fd_snapct_gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info.idx;
     835           0 :       if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, (fd_pubkey_t const *)msg->origin_pubkey ) ) ) {
     836           0 :         FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
     837           0 :         break;
     838           0 :       }
     839           0 :       ulong rem_idx = gossip_ci_map_idx_remove( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin_pubkey, ULONG_MAX, ctx->gossip.ci_table );
     840           0 :       FD_TEST( msg->contact_info.idx==rem_idx );
     841           0 :       fd_ip4_port_t addr = entry->rpc_addr;
     842           0 :       if( FD_LIKELY( !!addr.l ) ) {
     843           0 :         int removed = fd_ssping_remove( ctx->ssping, addr );
     844           0 :         if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, addr );
     845           0 :       }
     846           0 :       fd_memset( entry, 0, sizeof(*entry) );
     847           0 :       break;
     848           0 :     }
     849           0 :     case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
     850           0 :       ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin_pubkey, ULONG_MAX, ctx->gossip.ci_table );
     851           0 :       if( FD_LIKELY( idx!=ULONG_MAX ) ) {
     852           0 :         fd_snapct_gossip_ci_entry_t * entry = ctx->gossip.ci_table + idx;
     853           0 :         if( FD_LIKELY( ctx->config.gossip_peers_enabled ) ) {
     854           0 :           on_snapshot_hash( ctx, entry->rpc_addr, msg );
     855           0 :         }
     856           0 :       }
     857           0 :       break;
     858           0 :     }
     859           0 :     default:
     860           0 :       FD_LOG_ERR(( "snapct: unexpected gossip tag %u", (uint)msg->tag ));
     861           0 :       break;
     862           0 :   }
     863           0 : }
     864             : 
     865             : static void
     866             : snapld_frag( fd_snapct_tile_t *  ctx,
     867             :              ulong               sig,
     868             :              ulong               sz,
     869             :              ulong               chunk,
     870           0 :              fd_stem_context_t * stem ) {
     871           0 :   if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_META ) ) {
     872             :     /* Before snapld starts sending down data fragments, it first sends
     873             :        a metadata message containing the total size of the snapshot as
     874             :        well as the filename.  This is only done for HTTP loading. */
     875           0 :     int full;
     876           0 :     switch( ctx->state ) {
     877           0 :       case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; break;
     878           0 :       case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
     879             : 
     880           0 :       case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
     881           0 :       case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
     882           0 :         return; /* Ignore */
     883           0 :       default: FD_LOG_ERR(( "invalid meta frag in state %d", ctx->state ));
     884           0 :     }
     885             : 
     886           0 :     FD_TEST( sz==sizeof(fd_ssctrl_meta_t) );
     887           0 :     fd_ssctrl_meta_t const * meta = fd_chunk_to_laddr_const( ctx->snapld_in.mem, chunk );
     888             : 
     889           0 :     if( full ) fd_memcpy( ctx->local_out.full_snapshot_name,        meta->name, PATH_MAX );
     890           0 :     else       fd_memcpy( ctx->local_out.incremental_snapshot_name, meta->name, PATH_MAX );
     891             : 
     892           0 :     if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
     893           0 :       char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
     894           0 :       FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ), meta->name ) );
     895           0 :       snapshot_path_gui_publish( ctx, stem, snapshot_path, full );
     896           0 :     }
     897             : 
     898           0 :     if( full ) ctx->metrics.full.bytes_total        = meta->total_sz;
     899           0 :     else       ctx->metrics.incremental.bytes_total = meta->total_sz;
     900             : 
     901           0 :     return;
     902           0 :   }
     903           0 :   if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_DATA ) ) return;
     904             : 
     905           0 :   int full, file;
     906           0 :   switch( ctx->state ) {
     907             :     /* Expected cases, fall through below */
     908           0 :     case FD_SNAPCT_STATE_READING_FULL_FILE:        full = 1; file = 1; break;
     909           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE: full = 0; file = 1; break;
     910           0 :     case FD_SNAPCT_STATE_READING_FULL_HTTP:        full = 1; file = 0; break;
     911           0 :     case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; file = 0; break;
     912             : 
     913           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
     914           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
     915           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
     916           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
     917             :       /* We are waiting for a reset to fully propagate through the
     918             :          pipeline, just throw away any trailing data frags. */
     919           0 :       return;
     920             : 
     921           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_FILE:
     922           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE:
     923           0 :     case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP:
     924           0 :     case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP:
     925             :       /* Based on previously received data frags, we expected that the
     926             :          current full / incremental snapshot was finished, but then we
     927             :          received additional data frags.  Unsafe to continue so throw
     928             :          away the whole snapshot. */
     929           0 :       if( !ctx->malformed ) {
     930           0 :         ctx->malformed = 1;
     931           0 :         FD_LOG_WARNING(( "complete snapshot loaded but read %lu extra bytes", sz ));
     932           0 :       }
     933           0 :       return;
     934             : 
     935           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS:
     936           0 :     case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL:
     937           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS:
     938           0 :     case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL:
     939           0 :     case FD_SNAPCT_STATE_SHUTDOWN:
     940           0 :     default:
     941           0 :       FD_LOG_ERR(( "invalid data frag in state %d", ctx->state ));
     942           0 :       return;
     943           0 :   }
     944             : 
     945           0 :   if( full ) FD_TEST( ctx->metrics.full.bytes_total       !=0UL );
     946           0 :   else       FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
     947             : 
     948           0 :   if( full ) ctx->metrics.full.bytes_read        += sz;
     949           0 :   else       ctx->metrics.incremental.bytes_read += sz;
     950             : 
     951           0 :   if( !file && -1!=ctx->local_out.dir_fd ) {
     952           0 :     uchar const * data = fd_chunk_to_laddr_const( ctx->snapld_in.mem, chunk );
     953           0 :     int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
     954           0 :     long result = write( fd, data, sz );
     955           0 :     if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
     956           0 :       char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
     957           0 :       FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
     958           0 :     } else if( FD_UNLIKELY( 0L>result ) ) {
     959           0 :       FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     960           0 :     } else if( FD_UNLIKELY( sz!=(ulong)result ) ) {
     961           0 :       FD_LOG_ERR(( "paritial write(%lu)=%ld", sz, result ));
     962           0 :     }
     963           0 :     if( full ) ctx->metrics.full.bytes_written        += sz;
     964           0 :     else       ctx->metrics.incremental.bytes_written += sz;
     965           0 :   }
     966             : 
     967           0 :   if( FD_UNLIKELY( ( full && ctx->metrics.full.bytes_read        > ctx->metrics.full.bytes_total ) ||
     968           0 :                    (!full && ctx->metrics.incremental.bytes_read > ctx->metrics.incremental.bytes_total ) ) ) {
     969           0 :     if( !ctx->malformed ) {
     970           0 :       ctx->malformed = 1;
     971           0 :       FD_LOG_WARNING(( "expected %s snapshot size of %lu bytes but read %lu bytes",
     972           0 :                        full ? "full" : "incremental",
     973           0 :                        full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total,
     974           0 :                        full ? ctx->metrics.full.bytes_read : ctx->metrics.incremental.bytes_read ));
     975             : 
     976           0 :     }
     977           0 :   }
     978           0 : }
     979             : 
     980             : static void
     981             : snapin_frag( fd_snapct_tile_t *  ctx,
     982           0 :              ulong               sig ) {
     983           0 :   switch( sig ) {
     984           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
     985           0 :     case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
     986             :       /* Note: We do not need to wait for the init control message to
     987             :          be flushed through the entire pipeline, like we do for fail and
     988             :          done.  It is safe to immediately send a fail message downstream. */
     989           0 :       break;
     990             : 
     991           0 :     case FD_SNAPSHOT_MSG_CTRL_NEXT:
     992           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP ||
     993           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE ) ) {
     994           0 :         FD_TEST( !ctx->flush_ack );
     995           0 :         ctx->flush_ack = 1;
     996           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
     997           0 :       break;
     998             : 
     999           0 :     case FD_SNAPSHOT_MSG_CTRL_DONE:
    1000           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP ||
    1001           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE ||
    1002           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP ||
    1003           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE ) ) {
    1004           0 :         FD_TEST( !ctx->flush_ack );
    1005           0 :         ctx->flush_ack = 1;
    1006           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1007           0 :       break;
    1008             : 
    1009           0 :     case FD_SNAPSHOT_MSG_CTRL_FAIL:
    1010           0 :       if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
    1011           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
    1012           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
    1013           0 :                      ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
    1014           0 :         FD_TEST( !ctx->flush_ack );
    1015           0 :         ctx->flush_ack = 1;
    1016           0 :       } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
    1017           0 :       break;
    1018             : 
    1019           0 :     case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
    1020           0 :       break;
    1021             : 
    1022           0 :     case FD_SNAPSHOT_MSG_CTRL_ERROR:
    1023           0 :       switch( ctx->state ) {
    1024           0 :         case FD_SNAPCT_STATE_READING_FULL_FILE:
    1025           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_FILE:
    1026           0 :         case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
    1027           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE:
    1028           0 :         case FD_SNAPCT_STATE_READING_FULL_HTTP:
    1029           0 :         case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP:
    1030           0 :         case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
    1031           0 :         case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP:
    1032           0 :           ctx->malformed = 1;
    1033           0 :           break;
    1034           0 :         default:
    1035           0 :           break;
    1036           0 :       }
    1037           0 :       break;
    1038           0 :   }
    1039           0 : }
    1040             : 
    1041             : static int
    1042             : returnable_frag( fd_snapct_tile_t *  ctx,
    1043             :                  ulong               in_idx,
    1044             :                  ulong               seq    FD_PARAM_UNUSED,
    1045             :                  ulong               sig,
    1046             :                  ulong               chunk,
    1047             :                  ulong               sz,
    1048             :                  ulong               ctl    FD_PARAM_UNUSED,
    1049             :                  ulong               tsorig FD_PARAM_UNUSED,
    1050             :                  ulong               tspub  FD_PARAM_UNUSED,
    1051           0 :                  fd_stem_context_t * stem ) {
    1052           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
    1053           0 :     gossip_frag( ctx, sig, sz, chunk );
    1054           0 :   } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLD ) {
    1055           0 :     snapld_frag( ctx, sig, sz, chunk, stem );
    1056           0 :   } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPIN ) {
    1057           0 :     snapin_frag( ctx, sig );
    1058           0 :   } else FD_LOG_ERR(( "invalid in_kind %lu %hhu", in_idx, ctx->in_kind[ in_idx ] ));
    1059           0 :   return 0;
    1060           0 : }
    1061             : 
    1062             : static void
    1063             : privileged_init( fd_topo_t *      topo,
    1064           0 :                  fd_topo_tile_t * tile ) {
    1065           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1066             : 
    1067           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1068           0 :   fd_snapct_tile_t * ctx     = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t),  sizeof(fd_snapct_tile_t) );
    1069           0 :   void *             _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( TOTAL_PEERS_MAX ) );
    1070             : 
    1071           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
    1072             : 
    1073           0 :   ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, TOTAL_PEERS_MAX, 1UL, on_ping, ctx ) );
    1074           0 :   FD_TEST( ctx->ssping );
    1075             : 
    1076             :   /* By default, the snapct tile selects peers and its initial state is
    1077             :      WAITING_FOR_PEERS. */
    1078           0 :   ctx->state          = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
    1079           0 :   ctx->deadline_nanos = fd_log_wallclock() + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT_DEADLINE_NANOS;
    1080             : 
    1081           0 :   ctx->local_out.dir_fd                  = -1;
    1082           0 :   ctx->local_out.full_snapshot_fd        = -1;
    1083           0 :   ctx->local_out.incremental_snapshot_fd = -1;
    1084           0 :   fd_memset( ctx->local_out.full_snapshot_name, 0, PATH_MAX );
    1085           0 :   fd_memset( ctx->local_out.incremental_snapshot_name, 0, PATH_MAX );
    1086             : 
    1087           0 :   fd_ssarchive_remove_old_snapshots( tile->snapct.snapshots_path,
    1088           0 :                                      tile->snapct.max_full_snapshots_to_keep,
    1089           0 :                                      tile->snapct.max_incremental_snapshots_to_keep );
    1090             : 
    1091           0 :   ulong full_slot = ULONG_MAX;
    1092           0 :   ulong incremental_slot = ULONG_MAX;
    1093           0 :   char full_path[ PATH_MAX ] = {0};
    1094           0 :   char incremental_path[ PATH_MAX ] = {0};
    1095           0 :   if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snapct.snapshots_path,
    1096           0 :                                                  tile->snapct.incremental_snapshot_fetch,
    1097           0 :                                                  &full_slot,
    1098           0 :                                                  &incremental_slot,
    1099           0 :                                                  full_path,
    1100           0 :                                                  incremental_path ) ) ) {
    1101           0 :     if( FD_UNLIKELY( !tile->snapct.do_download ) ) {
    1102           0 :       FD_LOG_ERR(( "No snapshots found in `%s` and downloading is disabled. "
    1103           0 :                    "Please enable downloading via [snapshots.download] and restart.", tile->snapct.snapshots_path ));
    1104           0 :     }
    1105           0 :     ctx->local_in.full_snapshot_slot        = ULONG_MAX;
    1106           0 :     ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
    1107           0 :   } else {
    1108           0 :     FD_TEST( full_slot!=ULONG_MAX );
    1109             : 
    1110           0 :     ctx->local_in.full_snapshot_slot        = full_slot;
    1111           0 :     ctx->local_in.incremental_snapshot_slot = incremental_slot;
    1112             : 
    1113           0 :     strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
    1114           0 :     struct stat full_stat;
    1115           0 :     if( FD_UNLIKELY( -1==stat( ctx->local_in.full_snapshot_path, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
    1116           0 :     if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
    1117           0 :     ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
    1118             : 
    1119           0 :     if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
    1120           0 :       strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
    1121           0 :       struct stat incremental_stat;
    1122           0 :       if( FD_UNLIKELY( -1==stat( ctx->local_in.incremental_snapshot_path, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
    1123           0 :       if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
    1124           0 :       ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
    1125           0 :     }
    1126             : 
    1127           0 :     ctx->local_out.dir_fd                  = -1;
    1128           0 :     ctx->local_out.full_snapshot_fd        = -1;
    1129           0 :     ctx->local_out.incremental_snapshot_fd = -1;
    1130           0 :   }
    1131             : 
    1132             :   /* Set up download descriptors because even if we have local
    1133             :      snapshots, we may need to download new snapshots if the local
    1134             :      snapshots are too old. */
    1135           0 :   ctx->local_out.dir_fd = open( tile->snapct.snapshots_path, O_DIRECTORY|O_CLOEXEC );
    1136           0 :   if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", tile->snapct.snapshots_path, errno, fd_io_strerror( errno ) ));
    1137             : 
    1138           0 :   FD_TEST( fd_cstr_printf_check( ctx->local_out.full_snapshot_path, PATH_MAX, NULL, "%s/snapshot.tar.bz2-partial", tile->snapct.snapshots_path ) );
    1139           0 :   ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
    1140           0 :   if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.full_snapshot_path, errno, fd_io_strerror( errno ) ));
    1141             : 
    1142           0 :   if( FD_LIKELY( tile->snapct.incremental_snapshot_fetch ) ) {
    1143           0 :     FD_TEST( fd_cstr_printf_check( ctx->local_out.incremental_snapshot_path, PATH_MAX, NULL, "%s/incremental-snapshot.tar.bz2-partial", tile->snapct.snapshots_path ) );
    1144           0 :     ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
    1145           0 :     if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
    1146           0 :   } else {
    1147           0 :     ctx->local_out.incremental_snapshot_fd = -1;
    1148           0 :   }
    1149           0 : }
    1150             : 
    1151             : static inline fd_restore_out_link_t
    1152             : out1( fd_topo_t const *      topo,
    1153             :       fd_topo_tile_t const * tile,
    1154           0 :       char const *           name ) {
    1155           0 :   ulong idx = ULONG_MAX;
    1156             : 
    1157           0 :   for( ulong i=0UL; i<tile->out_cnt; i++ ) {
    1158           0 :     fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
    1159           0 :     if( !strcmp( link->name, name ) ) {
    1160           0 :       if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
    1161           0 :       idx = i;
    1162           0 :     }
    1163           0 :   }
    1164             : 
    1165           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_restore_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
    1166             : 
    1167           0 :   ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
    1168           0 :   if( FD_UNLIKELY( mtu==0UL ) ) return (fd_restore_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
    1169             : 
    1170           0 :   void * mem   = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
    1171           0 :   ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
    1172           0 :   ulong wmark  = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
    1173           0 :   return (fd_restore_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
    1174           0 : }
    1175             : 
    1176             : static void
    1177             : unprivileged_init( fd_topo_t *      topo,
    1178           0 :                    fd_topo_tile_t * tile ) {
    1179           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1180             : 
    1181           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1182           0 :   fd_snapct_tile_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t),            sizeof(fd_snapct_tile_t)       );
    1183           0 :                             FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),                    fd_ssping_footprint( TOTAL_PEERS_MAX ) );
    1184           0 :   void * _ci_table        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_gossip_ci_entry_t), sizeof(fd_snapct_gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
    1185           0 :   void * _ci_map          = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(),                gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
    1186           0 :   void * _ssresolver      = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(),             fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
    1187           0 :   void * _selector        = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(),           fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
    1188             : 
    1189           0 :   ctx->malformed = 0;
    1190             : 
    1191           0 :   fd_memcpy( ctx->config.path, tile->snapct.snapshots_path, PATH_MAX );
    1192           0 :   ctx->config.incremental_snapshot_fetch        = tile->snapct.incremental_snapshot_fetch;
    1193           0 :   ctx->config.do_download                       = tile->snapct.do_download;
    1194           0 :   ctx->config.maximum_local_snapshot_age        = tile->snapct.maximum_local_snapshot_age;
    1195           0 :   ctx->config.minimum_download_speed_mib        = tile->snapct.minimum_download_speed_mib;
    1196           0 :   ctx->config.max_full_snapshots_to_keep        = tile->snapct.max_full_snapshots_to_keep;
    1197           0 :   ctx->config.max_incremental_snapshots_to_keep = tile->snapct.max_incremental_snapshots_to_keep;
    1198           0 :   ctx->config.gossip_peers_enabled              = tile->snapct.gossip_peers_enabled;
    1199             : 
    1200           0 :   if( FD_UNLIKELY( !tile->snapct.maximum_download_retry_abort ) ) ctx->config.maximum_download_retry_abort = UINT_MAX;
    1201           0 :   else                                                            ctx->config.maximum_download_retry_abort = tile->snapct.maximum_download_retry_abort;
    1202             : 
    1203           0 :   ctx->selector = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, TOTAL_PEERS_MAX, ctx->config.incremental_snapshot_fetch, 1UL ) );
    1204             : 
    1205           0 :   fd_memset( _ci_table, 0, sizeof(fd_snapct_gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
    1206           0 :   ctx->gossip.ci_table = _ci_table;
    1207             : 
    1208           0 :   ctx->gossip.ci_map = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ), 0UL ) );
    1209             : 
    1210           0 :   int has_snapld_dc = 0, has_snapin_rd = 0;
    1211           0 :   FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
    1212           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
    1213           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
    1214           0 :     if( 0==strcmp( in_link->name, "gossip_out" ) ) {
    1215           0 :       ctx->in_kind[ i ]     = IN_KIND_GOSSIP;
    1216           0 :       ctx->gossip_in.mem    = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
    1217           0 :       ctx->gossip_in.chunk0 = fd_dcache_compact_chunk0( ctx->gossip_in.mem, in_link->dcache );
    1218           0 :       ctx->gossip_in.wmark  = fd_dcache_compact_wmark ( ctx->gossip_in.mem, in_link->dcache, in_link->mtu );
    1219           0 :       ctx->gossip_in.mtu    = in_link->mtu;
    1220           0 :     } else if( 0==strcmp( in_link->name, "snapld_dc" ) ) {
    1221           0 :       ctx->in_kind[ i ]     = IN_KIND_SNAPLD;
    1222           0 :       ctx->snapld_in.mem    = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
    1223           0 :       ctx->snapld_in.chunk0 = fd_dcache_compact_chunk0( ctx->snapld_in.mem, in_link->dcache );
    1224           0 :       ctx->snapld_in.wmark  = fd_dcache_compact_wmark ( ctx->snapld_in.mem, in_link->dcache, in_link->mtu );
    1225           0 :       ctx->snapld_in.mtu    = in_link->mtu;
    1226           0 :       FD_TEST( !has_snapld_dc );
    1227           0 :       has_snapld_dc = 1;
    1228           0 :     } else if( 0==strcmp( in_link->name, "snapin_rd" ) ) {
    1229           0 :       ctx->in_kind[ i ] = IN_KIND_SNAPIN;
    1230           0 :       FD_TEST( !has_snapin_rd );
    1231           0 :       has_snapin_rd = 1;
    1232           0 :     }
    1233           0 :   }
    1234           0 :   FD_TEST( has_snapld_dc && has_snapin_rd );
    1235             : 
    1236           0 :   ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, SERVER_PEERS_MAX, ctx->config.incremental_snapshot_fetch, on_resolve, ctx ) );
    1237           0 :   FD_TEST( ctx->ssresolver );
    1238             : 
    1239           0 :   if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
    1240           0 :   ctx->out_ld  = out1( topo, tile, "snapct_ld"   );
    1241           0 :   ctx->out_gui = out1( topo, tile, "snapct_gui"  );
    1242           0 :   ctx->out_rp  = out1( topo, tile, "snapct_repr" );
    1243             : 
    1244           0 :   for( ulong i=0UL; i<tile->snapct.http.peers_cnt; i++ ) {
    1245           0 :     tile->snapct.http.peers[ i ].port = fd_ushort_bswap( tile->snapct.http.peers[ i ].port ); /* TODO: should be fixed in a future PR */
    1246           0 :     fd_ssping_add( ctx->ssping, tile->snapct.http.peers[ i ] );
    1247           0 :     fd_http_resolver_add( ctx->ssresolver, tile->snapct.http.peers[ i ] );
    1248           0 :   }
    1249             : 
    1250           0 :   ctx->predicted_incremental.full_slot = ULONG_MAX;
    1251           0 :   ctx->predicted_incremental.slot      = ULONG_MAX;
    1252           0 :   ctx->predicted_incremental.dirty     = 0;
    1253             : 
    1254           0 :   ctx->gossip.saturated            = 0;
    1255           0 : }
    1256             : 
    1257             : /* after_credit can result in as many as 5 stem publishes in some code
    1258             :    paths, and returnable_frag can result in 1. */
    1259           0 : #define STEM_BURST 6UL
    1260             : 
    1261           0 : #define STEM_LAZY  1000L
    1262             : 
    1263           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snapct_tile_t
    1264           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapct_tile_t)
    1265             : 
    1266             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
    1267           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
    1268           0 : #define STEM_CALLBACK_AFTER_CREDIT    after_credit
    1269           0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
    1270             : 
    1271             : #include "../../disco/stem/fd_stem.c"
    1272             : 
    1273             : fd_topo_run_tile_t fd_tile_snapct = {
    1274             :   .name                     = NAME,
    1275             :   .rlimit_file_cnt_fn       = rlimit_file_cnt,
    1276             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    1277             :   .populate_allowed_fds     = populate_allowed_fds,
    1278             :   .scratch_align            = scratch_align,
    1279             :   .scratch_footprint        = scratch_footprint,
    1280             :   .privileged_init          = privileged_init,
    1281             :   .unprivileged_init        = unprivileged_init,
    1282             :   .run                      = stem_run,
    1283             :   .keep_host_networking     = 1,
    1284             :   .allow_connect            = 1,
    1285             :   .allow_renameat           = 1,
    1286             : };
    1287             : 
    1288             : #undef NAME
       |