LCOV - code coverage report
Current view: top level - discof/restore - fd_snaprd_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 672 0.0 %
Date: 2025-09-19 04:41:14 Functions: 0 15 0.0 %

          Line data    Source code
       1             : #include "utils/fd_ssping.h"
       2             : #include "utils/fd_sshttp.h"
       3             : #include "utils/fd_ssctrl.h"
       4             : #include "utils/fd_ssarchive.h"
       5             : 
       6             : #include "../../disco/topo/fd_topo.h"
       7             : #include "../../disco/metrics/fd_metrics.h"
       8             : #include "../../flamenco/gossip/fd_gossip_types.h"
       9             : 
      10             : #include <errno.h>
      11             : #include <fcntl.h>
      12             : #include <string.h>
      13             : #include <unistd.h>
      14             : #include <stdio.h>
      15             : #include <sys/stat.h>
      16             : 
      17             : #define NAME "snaprd"
      18             : 
      19             : /* The snaprd tile at a high level is a state machine that downloads
      20             :    snapshots from the network or reads snapshots from disk and produces
      21             :    a byte stream that is parsed by downstream snapshot consumer tiles.
      22             :    The snaprd tile gathers the latest SnapshotHashes information from
      23             :    gossip to decide whether to download snapshots or read local
      24             :    snapshots from disk.  If the snaprd tile needs to download a snapshot,
       25             :    it goes through the process of discovering and selecting eligible
      26             :    peers from gossip to download from. */
      27             : 
      28           0 : #define FD_SNAPRD_STATE_WAITING_FOR_PEERS         ( 0) /* Waiting for first peer to arrive from gossip to download from */
      29           0 : #define FD_SNAPRD_STATE_COLLECTING_PEERS          ( 1) /* First peer arrived, wait a little longer to see if a better one arrives */
      30           0 : #define FD_SNAPRD_STATE_READING_FULL_FILE         ( 2) /* Full file looks better than peer, reading it from disk */
      31           0 : #define FD_SNAPRD_STATE_FLUSHING_FULL_FILE        ( 3) /* Full file was read ok, confirm it decompressed and inserted ok */
      32           0 : #define FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET  ( 4) /* Resetting to load full snapshot from file again, confirm decompress and inserter are reset too */
      33           0 : #define FD_SNAPRD_STATE_READING_INCREMENTAL_FILE  ( 5) /* Incremental file looks better than peer, reading it from disk */
      34           0 : #define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE ( 6) /* Incremental file was read ok, confirm it decompressed and inserted ok */
      35           0 : #define FD_SNAPRD_STATE_READING_FULL_HTTP         ( 7) /* Peer was selected, reading full snapshot from HTTP */
      36           0 : #define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP        ( 8) /* Full snapshot was downloaded ok, confirm it decompressed and inserted ok */
      37           0 : #define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET  ( 9) /* Resetting to load full snapshot from HTTP again, confirm decompress and inserter are reset too */
      38           0 : #define FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP  (10) /* Peer was selected, reading incremental snapshot from HTTP */
      39           0 : #define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP (11) /* Incremental snapshot was downloaded ok, confirm it decompressed and inserted ok */
      40           0 : #define FD_SNAPRD_STATE_SHUTDOWN                  (12) /* The tile is done, and has likely already exited */
      41             : 
      42           0 : #define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */
      43             : 
      44           0 : #define IN_KIND_SNAPCTL (0)
      45           0 : #define IN_KIND_GOSSIP  (1)
      46             : #define MAX_IN_LINKS    (3)
      47             : 
      48             : struct fd_snaprd_tile {
      49             :   fd_ssping_t * ssping;
      50             :   fd_sshttp_t * sshttp;
      51             : 
      52             :   int   state;
      53             :   int   malformed;
      54             :   long  deadline_nanos;
      55             :   ulong ack_cnt;
      56             :   int   peer_selection;
      57             : 
      58             :   long diagnostic_deadline_nanos;
      59             : 
      60             :   fd_ip4_port_t addr;
      61             : 
      62             :   struct {
      63             :     ulong write_buffer_pos;
      64             :     ulong write_buffer_len;
      65             :     uchar write_buffer[ SNAPRD_FILE_BUF_SZ ];
      66             : 
      67             :     char  full_snapshot_path[ PATH_MAX ];
      68             :     char  incremental_snapshot_path[ PATH_MAX ];
      69             : 
      70             :     int   dir_fd;
      71             :     int   full_snapshot_fd;
      72             :     int   incremental_snapshot_fd;
      73             :   } local_out;
      74             : 
      75             :   uchar in_kind[ MAX_IN_LINKS ];
      76             : 
      77             :   struct {
      78             :     ulong full_snapshot_slot;
      79             :     int   full_snapshot_fd;
      80             :     char  full_snapshot_path[ PATH_MAX ];
      81             :     ulong full_snapshot_size;
      82             : 
      83             :     ulong incremental_snapshot_slot;
      84             :     int   incremental_snapshot_fd;
      85             :     char  incremental_snapshot_path[ PATH_MAX ];
      86             :     ulong incremental_snapshot_size;
      87             :   } local_in;
      88             : 
      89             :   struct {
      90             :     char path[ PATH_MAX ];
      91             :     int  do_download;
      92             :     int  incremental_snapshot_fetch;
      93             :     uint maximum_local_snapshot_age;
      94             :     uint minimum_download_speed_mib;
      95             :     uint maximum_download_retry_abort;
      96             :     uint max_full_snapshots_to_keep;
      97             :     uint max_incremental_snapshots_to_keep;
      98             :   } config;
      99             : 
     100             :   struct {
     101             :     struct {
     102             :       ulong bytes_read;
     103             :       ulong bytes_written;
     104             :       ulong bytes_total;
     105             :       uint  num_retries;
     106             :     } full;
     107             : 
     108             :     struct {
     109             :       ulong bytes_read;
     110             :       ulong bytes_written;
     111             :       ulong bytes_total;
     112             :       uint  num_retries;
     113             :     } incremental;
     114             :   } metrics;
     115             : 
     116             :   /* TODO: Don't do this ... should be in the monitor instead */
     117             :   struct {
     118             :     ulong prev_bytes_read;
     119             :     ulong prev_accounts_inserted;    volatile ulong * cur_accounts_inserted;
     120             : 
     121             :     ulong prev_snaprd_backp_prefrag; volatile ulong * cur_snaprd_backp_prefrag;
     122             :     ulong prev_snaprd_wait;          volatile ulong * cur_snaprd_caughtup_postfrag;
     123             :     ulong prev_snapdc_backp_prefrag; volatile ulong * cur_snapdc_backp_prefrag;
     124             :     ulong prev_snapdc_wait;          volatile ulong * cur_snapdc_caughtup_postfrag;
     125             :     ulong prev_snapin_backp_prefrag; volatile ulong * cur_snapin_backp_prefrag;
     126             :     ulong prev_snapin_wait;          volatile ulong * cur_snapin_caughtup_postfrag;
     127             :   } diagnostics;
     128             : 
     129             :   struct {
     130             :     fd_wksp_t * mem;
     131             :     ulong       chunk0;
     132             :     ulong       wmark;
     133             :     ulong       mtu;
     134             :   } gossip_in;
     135             : 
     136             :   struct {
     137             :     fd_gossip_update_message_t tmp_upd_buf;
     138             :     fd_contact_info_t *        ci_table;
     139             :   } gossip;
     140             : 
     141             :   struct {
     142             :     fd_wksp_t * wksp;
     143             :     ulong       chunk0;
     144             :     ulong       wmark;
     145             :     ulong       chunk;
     146             :     ulong       mtu;
     147             :   } out;
     148             : };
     149             : 
     150             : typedef struct fd_snaprd_tile fd_snaprd_tile_t;
     151             : 
     152             : static ulong
     153           0 : scratch_align( void ) {
     154           0 :   return alignof(fd_snaprd_tile_t);
     155           0 : }
     156             : 
     157             : static ulong
     158           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     159           0 :   (void)tile;
     160           0 :   ulong l = FD_LAYOUT_INIT;
     161           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_snaprd_tile_t),  sizeof(fd_snaprd_tile_t)       );
     162           0 :   l = FD_LAYOUT_APPEND( l, fd_sshttp_align(),          fd_sshttp_footprint()          );
     163           0 :   l = FD_LAYOUT_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( 65536UL ) );
     164           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_contact_info_t), sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
     165           0 :   return FD_LAYOUT_FINI( l, alignof(fd_snaprd_tile_t) );
     166           0 : }
     167             : 
     168             : static inline int
     169           0 : should_shutdown( fd_snaprd_tile_t * ctx ) {
     170           0 :   return ctx->state==FD_SNAPRD_STATE_SHUTDOWN;
     171           0 : }
     172             : 
     173             : static void
     174           0 : metrics_write( fd_snaprd_tile_t * ctx ) {
     175           0 :   FD_MGAUGE_SET( SNAPRD, FULL_BYTES_READ,               ctx->metrics.full.bytes_read );
     176           0 :   FD_MGAUGE_SET( SNAPRD, FULL_BYTES_WRITTEN,            ctx->metrics.full.bytes_written );
     177           0 :   FD_MGAUGE_SET( SNAPRD, FULL_BYTES_TOTAL,              ctx->metrics.full.bytes_total );
     178           0 :   FD_MGAUGE_SET( SNAPRD, FULL_DOWNLOAD_RETRIES,         ctx->metrics.full.num_retries );
     179             : 
     180           0 :   FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_READ,        ctx->metrics.incremental.bytes_read );
     181           0 :   FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_WRITTEN,     ctx->metrics.incremental.bytes_written );
     182           0 :   FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_TOTAL,       ctx->metrics.incremental.bytes_total );
     183           0 :   FD_MGAUGE_SET( SNAPRD, INCREMENTAL_DOWNLOAD_RETRIES,  ctx->metrics.incremental.num_retries );
     184             : 
     185           0 :   FD_MGAUGE_SET( SNAPRD, STATE, (ulong)ctx->state );
     186           0 : }
     187             : 
     188             : static void
     189             : read_file_data( fd_snaprd_tile_t *  ctx,
     190           0 :                 fd_stem_context_t * stem ) {
     191           0 :   uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
     192             : 
     193           0 :   FD_TEST( ctx->state==FD_SNAPRD_STATE_READING_INCREMENTAL_FILE || ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE );
     194           0 :   int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE;
     195           0 :   long result = read( full ? ctx->local_in.full_snapshot_fd : ctx->local_in.incremental_snapshot_fd , out, ctx->out.mtu );
     196           0 :   if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) return;
     197           0 :   else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "read() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     198             : 
     199           0 :   switch( ctx->state ) {
     200           0 :     case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
     201           0 :       ctx->metrics.incremental.bytes_read += (ulong)result;
     202           0 :       break;
     203           0 :     case FD_SNAPRD_STATE_READING_FULL_FILE:
     204           0 :       ctx->metrics.full.bytes_read += (ulong)result;
     205           0 :       break;
     206           0 :     default:
     207           0 :       break;
     208           0 :   }
     209             : 
     210           0 :   if( FD_UNLIKELY( !result ) ) {
     211           0 :     switch( ctx->state ) {
     212           0 :       case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
     213           0 :         fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     214           0 :         ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE;
     215           0 :         break;
     216           0 :       case FD_SNAPRD_STATE_READING_FULL_FILE:
     217           0 :         if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
     218           0 :           fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
     219           0 :         } else {
     220           0 :           fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     221           0 :         }
     222           0 :         ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_FILE;
     223           0 :         break;
     224           0 :       default:
     225           0 :         break;
     226           0 :     }
     227           0 :     return;
     228           0 :   }
     229             : 
     230           0 :   fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, (ulong)result, 0UL, 0UL, 0UL );
     231           0 :   ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, (ulong)result, ctx->out.chunk0, ctx->out.wmark );
     232           0 : }
     233             : 
     234             : static void
     235             : read_http_data( fd_snaprd_tile_t *  ctx,
     236             :                 fd_stem_context_t * stem,
     237           0 :                 long                now ) {
     238           0 :   uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
     239             : 
     240           0 :   ulong buffer_avail = fd_ulong_if( -1!=ctx->local_out.dir_fd, SNAPRD_FILE_BUF_SZ-ctx->local_out.write_buffer_len, ULONG_MAX );
     241           0 :   ulong data_len = fd_ulong_min( buffer_avail, ctx->out.mtu );
     242           0 :   int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, now );
     243             : 
     244           0 :   switch( result ) {
     245           0 :     case FD_SSHTTP_ADVANCE_AGAIN: break;
     246           0 :     case FD_SSHTTP_ADVANCE_ERROR: {
     247           0 :       FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
     248           0 :                       FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
     249           0 :       fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
     250           0 :       fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
     251           0 :       ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
     252           0 :       ctx->deadline_nanos = now;
     253           0 :       break;
     254           0 :     }
     255           0 :     case FD_SSHTTP_ADVANCE_DONE: {
     256           0 :       switch( ctx->state ) {
     257           0 :         case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
     258           0 :           fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     259           0 :           ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP;
     260           0 :           break;
     261           0 :         case FD_SNAPRD_STATE_READING_FULL_HTTP:
     262           0 :           if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
     263           0 :             fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
     264           0 :           } else {
     265           0 :             fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
     266           0 :           }
     267           0 :           ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
     268           0 :           break;
     269           0 :         default:
     270           0 :           break;
     271           0 :       }
     272           0 :       break;
     273           0 :     }
     274           0 :     case FD_SSHTTP_ADVANCE_DATA: {
     275           0 :       if( FD_LIKELY( ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP ) ) ctx->metrics.full.bytes_total = fd_sshttp_content_len( ctx->sshttp );
     276           0 :       else                                                             ctx->metrics.incremental.bytes_total = fd_sshttp_content_len( ctx->sshttp );
     277             : 
     278           0 :       fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, data_len, 0UL, 0UL, 0UL );
     279           0 :       ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, data_len, ctx->out.chunk0, ctx->out.wmark );
     280             : 
     281           0 :       ulong written_sz = 0UL;
     282           0 :       if( FD_LIKELY( -1!=ctx->local_out.dir_fd && !ctx->local_out.write_buffer_len ) ) {
     283           0 :         while( written_sz<data_len ) {
     284           0 :           int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP;
     285           0 :           int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
     286           0 :           long result = write( fd, out+written_sz, data_len-written_sz );
     287           0 :           if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
     288           0 :           else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
     289           0 :             char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
     290           0 :             FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
     291           0 :           } else if( FD_UNLIKELY( -1==result ) ) {
     292           0 :             FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     293           0 :             break;
     294           0 :           }
     295           0 :           written_sz += (ulong)result;
     296           0 :         }
     297           0 :       }
     298             : 
     299           0 :       if( FD_UNLIKELY( written_sz<data_len ) ) {
     300           0 :         fd_memcpy( ctx->local_out.write_buffer+ctx->local_out.write_buffer_len, out+written_sz, data_len-written_sz );
     301           0 :       }
     302           0 :       ctx->local_out.write_buffer_len += data_len-written_sz;
     303             : 
     304           0 :       switch( ctx->state ) {
     305           0 :         case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
     306           0 :           ctx->metrics.incremental.bytes_read += data_len;
     307           0 :           ctx->metrics.incremental.bytes_written += written_sz;
     308           0 :           break;
     309           0 :         case FD_SNAPRD_STATE_READING_FULL_HTTP:
     310           0 :           ctx->metrics.full.bytes_read += data_len;
     311           0 :           ctx->metrics.full.bytes_written += written_sz;
     312           0 :           break;
     313           0 :         default:
     314           0 :           FD_LOG_ERR(( "unexpected state %d", ctx->state ));
     315           0 :           break;
     316           0 :       }
     317             : 
     318           0 :       break;
     319           0 :     }
     320           0 :     default:
     321           0 :       FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d", result ));
     322           0 :       break;
     323           0 :   }
     324           0 : }
     325             : 
     326             : static void
     327           0 : drain_buffer( fd_snaprd_tile_t * ctx ) {
     328           0 :   if( FD_UNLIKELY( ctx->state!=FD_SNAPRD_STATE_READING_FULL_HTTP &&
     329           0 :                    ctx->state!=FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP &&
     330           0 :                    ctx->state!=FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP &&
     331           0 :                    ctx->state!=FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ) ) return;
     332             : 
     333           0 :   if( FD_LIKELY( -1==ctx->local_out.dir_fd || !ctx->local_out.write_buffer_len ) ) return;
     334             : 
     335           0 :   int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP || ctx->state==FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
     336           0 :   int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
     337             : 
     338           0 :   ulong written_sz = 0UL;
     339           0 :   while( ctx->local_out.write_buffer_pos+written_sz<ctx->local_out.write_buffer_len ) {
     340           0 :     long result = write( fd, ctx->local_out.write_buffer+ctx->local_out.write_buffer_pos+written_sz, ctx->local_out.write_buffer_len-written_sz );
     341           0 :     if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
     342           0 :     else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
     343           0 :       char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
     344           0 :       FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
     345           0 :     } else if( FD_UNLIKELY( -1==result ) ) {
     346           0 :       FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     347           0 :       break;
     348           0 :     }
     349           0 :     written_sz += (ulong)result;
     350           0 :   }
     351             : 
     352           0 :   ctx->local_out.write_buffer_pos += written_sz;
     353             : 
     354           0 :   if( FD_LIKELY( ctx->local_out.write_buffer_pos==ctx->local_out.write_buffer_len ) ) {
     355           0 :     ctx->local_out.write_buffer_pos = 0UL;
     356           0 :     ctx->local_out.write_buffer_len = 0UL;
     357           0 :   }
     358             : 
     359           0 :   if( FD_LIKELY( full ) ) ctx->metrics.full.bytes_written += written_sz;
     360           0 :   else                    ctx->metrics.incremental.bytes_written += written_sz;
     361           0 : }
     362             : 
     363             : static void
     364           0 : rename_snapshots( fd_snaprd_tile_t * ctx ) {
     365           0 :   if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
     366           0 :   char const * full_snapshot_name;
     367           0 :   char const * incremental_snapshot_name;
     368           0 :   fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );
     369             : 
     370           0 :   if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
     371           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) )
     372           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     373           0 :   }
     374           0 :   if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
     375           0 :     if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) )
     376           0 :       FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     377           0 :   }
     378           0 : }
     379             : 
     380             : static void
     381           0 : print_diagnostics( fd_snaprd_tile_t * ctx ) {
     382           0 :   double bandwidth = (double)((ctx->metrics.full.bytes_read+ctx->metrics.incremental.bytes_read)-ctx->diagnostics.prev_bytes_read)/1e6;
     383             : 
     384           0 :   ulong snaprd_backp = *ctx->diagnostics.cur_snaprd_backp_prefrag;
     385           0 :   ulong snaprd_wait = *ctx->diagnostics.cur_snaprd_caughtup_postfrag + snaprd_backp;
     386           0 :   ulong snapdc_backp = *ctx->diagnostics.cur_snapdc_backp_prefrag;
     387           0 :   ulong snapdc_wait = *ctx->diagnostics.cur_snapdc_caughtup_postfrag + snapdc_backp;
     388           0 :   ulong snapin_backp = *ctx->diagnostics.cur_snapin_backp_prefrag;
     389           0 :   ulong snapin_wait = *ctx->diagnostics.cur_snapin_caughtup_postfrag + snapin_backp;
     390             : 
     391           0 :   ulong accounts_inserted = *ctx->diagnostics.cur_accounts_inserted;
     392             : 
     393           0 :   double ns_per_tick = 1.0/fd_tempo_tick_per_ns( NULL );
     394             : 
     395           0 :   switch( ctx->state ) {
     396           0 :     case FD_SNAPRD_STATE_WAITING_FOR_PEERS:
     397           0 :       FD_LOG_NOTICE(( "waiting for peers from gossip" ));
     398           0 :       break;
     399           0 :     case FD_SNAPRD_STATE_COLLECTING_PEERS:
     400           0 :       FD_LOG_NOTICE(( "collecting peers from gossip" ));
     401           0 :       break;
     402           0 :     case FD_SNAPRD_STATE_READING_FULL_FILE: {
     403           0 :       double progress = 0.0;
     404           0 :       if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total;
     405           0 :       FD_LOG_NOTICE(( "restoring full from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
     406           0 :         progress,
     407           0 :         bandwidth,
     408           0 :         ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
     409           0 :         ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
     410           0 :         ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
     411           0 :         100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
     412           0 :         100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
     413           0 :         100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
     414           0 :         (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted  )/1e6 ));
     415           0 :       break;
     416           0 :     }
     417           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_FILE: {
     418           0 :       FD_LOG_NOTICE(( "flushing full from file ... 100.0 %% bw=   0 MB/s" ));
     419           0 :       break;
     420           0 :     }
     421           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
     422           0 :       FD_LOG_NOTICE(( "resetting full from file ... 100.0 %% bw=   0 MB/s" ));
     423           0 :       break;
     424           0 :     case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE: {
     425           0 :       double progress = 0.0;
     426           0 :       if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total;
     427           0 :       FD_LOG_NOTICE(( "restoring incremental from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
     428           0 :         progress,
     429           0 :         bandwidth,
     430           0 :         ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
     431           0 :         ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
     432           0 :         ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
     433           0 :         100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
     434           0 :         100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
     435           0 :         100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
     436           0 :         (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted  )/1e6 ));
     437           0 :       break;
     438           0 :     }
     439           0 :     case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE: {
     440           0 :       FD_LOG_NOTICE(( "flushing incremental from file ... 100.0 %% bw=   0 MB/s" ));
     441           0 :       break;
     442           0 :     }
     443           0 :     case FD_SNAPRD_STATE_READING_FULL_HTTP: {
     444           0 :       double progress = 0.0;
     445           0 :       if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total;
     446           0 :       FD_LOG_NOTICE(( "restoring full from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
     447           0 :         progress,
     448           0 :         bandwidth,
     449           0 :         ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
     450           0 :         ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
     451           0 :         ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
     452           0 :         100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
     453           0 :         100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
     454           0 :         100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
     455           0 :         (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted  )/1e6 ));
     456           0 :       break;
     457           0 :     }
     458           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP: {
     459           0 :       FD_LOG_NOTICE(( "flushing full from http ... 100.0 %% bw=   0 MB/s" ));
     460           0 :       break;
     461           0 :     }
     462           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET: {
     463           0 :       FD_LOG_NOTICE(( "resetting full from http ... 100.0 %% bw=   0 MB/s" ));
     464           0 :       break;
     465           0 :     }
     466           0 :     case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
     467           0 :       double progress = 0.0;
     468           0 :       if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total;
     469           0 :       FD_LOG_NOTICE(( "restoring incremental from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
     470           0 :         progress,
     471           0 :         bandwidth,
     472           0 :         ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
     473           0 :         ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
     474           0 :         ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
     475           0 :         100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
     476           0 :         100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
     477           0 :         100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
     478           0 :         (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted  )/1e6 ));
     479           0 :       break;
     480           0 :     }
     481           0 :     case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP: {
     482           0 :       FD_LOG_NOTICE(( "flushing incremental from http ... 100.0 %% bw=   0 MB/s" ));
     483           0 :       break;
     484           0 :     }
     485           0 :     case FD_SNAPRD_STATE_SHUTDOWN: {
     486           0 :       break;
     487           0 :     }
     488           0 :     default:
     489           0 :       break;
     490           0 :   }
     491             : 
     492           0 :   ctx->diagnostics.prev_bytes_read = ctx->metrics.full.bytes_read+ctx->metrics.incremental.bytes_read;
     493             : 
     494           0 :   ctx->diagnostics.prev_snaprd_backp_prefrag = snaprd_backp;
     495           0 :   ctx->diagnostics.prev_snaprd_wait          = snaprd_wait;
     496           0 :   ctx->diagnostics.prev_snapdc_backp_prefrag = snapdc_backp;
     497           0 :   ctx->diagnostics.prev_snapdc_wait          = snapdc_wait;
     498           0 :   ctx->diagnostics.prev_snapin_backp_prefrag = snapin_backp;
     499           0 :   ctx->diagnostics.prev_snapin_wait          = snapin_wait;
     500             : 
     501           0 :   ctx->diagnostics.prev_accounts_inserted    = accounts_inserted;
     502           0 : }
     503             : /* after_credit runs the snaprd state machine: it advances peer pings (when peer selection is enabled), drains the local write buffer, prints diagnostics once the 1s deadline has passed and then acts on ctx->state.  opt_poll_in and charge_busy are unused. */
     504             : static void
     505             : after_credit( fd_snaprd_tile_t *  ctx,
     506             :               fd_stem_context_t * stem,
     507             :               int *               opt_poll_in,
     508           0 :               int *               charge_busy ) {
     509           0 :   (void)opt_poll_in;
     510           0 :   (void)charge_busy;
     511             : 
     512           0 :   long now = fd_log_wallclock();
     513           0 :   if( FD_LIKELY( ctx->peer_selection ) ) {
     514           0 :     fd_ssping_advance( ctx->ssping, now );
     515           0 :   }
     516             : 
     517           0 :   drain_buffer( ctx );
     518             : 
     519             :   /* All control fragments sent by the snaprd tile must be fully
     520             :      acknowledged by all downstream consumers before processing can
     521             :      proceed, to prevent tile state machines from getting out of sync
     522             :      (see fd_ssctrl.h for more details).  Currently there are two
     523             :      downstream consumers, snapdc and snapin. */
     524           0 : #define NUM_SNAP_CONSUMERS (2UL)
     525             : 
     526           0 :   if( FD_UNLIKELY( now>ctx->diagnostic_deadline_nanos ) ) {
     527           0 :     ctx->diagnostic_deadline_nanos = now+(long)1e9;
     528           0 :     print_diagnostics( ctx );
     529           0 :   }
     530             : 
     531           0 :   switch ( ctx->state ) {
     532           0 :     case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
     533           0 :       fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
     534           0 :       if( FD_LIKELY( best.l ) ) {
     535           0 :         ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
     536           0 :         ctx->deadline_nanos = now+500L*1000L*1000L; /* 500ms window in which a better peer may still arrive */
     537           0 :       }
     538           0 :       break;
     539           0 :     }
     540           0 :     case FD_SNAPRD_STATE_COLLECTING_PEERS: {
     541           0 :       if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
     542             : 
     543           0 :       fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
     544           0 :       if( FD_UNLIKELY( !best.l ) ) {
     545           0 :         ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
     546           0 :         break;
     547           0 :       }
     548             : 
     549           0 :       ulong highest_cluster_slot = 0UL; /* TODO: Implement, using incremental snapshot slot for age */
     550           0 :       if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX && ctx->local_in.full_snapshot_slot>=fd_ulong_sat_sub( highest_cluster_slot, ctx->config.maximum_local_snapshot_age ) ) ) {
     551           0 :         FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     552           0 :         ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
     553           0 :         ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
     554           0 :       } else {
     555           0 :         FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), fd_ushort_bswap( best.port ) ));
     556           0 :         ctx->addr  = best;
     557           0 :         ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
     558           0 :         fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now );
     559           0 :       }
     560           0 :       break;
     561           0 :     }
     562           0 :     case FD_SNAPRD_STATE_READING_FULL_FILE:
     563           0 :     case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
     564           0 :       read_file_data( ctx, stem );
     565           0 :       break;
     566           0 :     case FD_SNAPRD_STATE_READING_FULL_HTTP:
     567           0 :     case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
     568           0 :       read_http_data( ctx, stem, now );
     569           0 :       break;
     570           0 :     }
     571           0 :     case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
     572           0 :     case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
     573           0 :       if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
     574             :       /* Leave the acks pending while the write buffer drains; clearing ack_cnt first would stall this state forever. */
     575           0 :       if( FD_UNLIKELY( !ctx->malformed && ctx->local_out.write_buffer_len ) ) break;
     576           0 :       ctx->ack_cnt = 0UL;
     577             : 
     578           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     579           0 :         fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
     580           0 :         ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
     581           0 :         ctx->malformed = 0;
     582           0 :         break;
     583           0 :       }
     584             : 
     585           0 :       rename_snapshots( ctx );
     586           0 :       ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
     587           0 :       metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     588           0 :       fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     589           0 :       break;
     590           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
     591           0 :       if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
     592           0 :       ctx->ack_cnt = 0UL;
     593             : 
     594           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
     595           0 :         ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
     596           0 :         metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     597           0 :         fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     598           0 :         break;
     599           0 :       }
     600             : 
     601           0 :       FD_LOG_NOTICE(( "reading incremental snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     602           0 :       ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
     603           0 :       ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_FILE;
     604           0 :       break;
     605           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
     606           0 :       if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
     607             :       /* Leave the acks pending while the write buffer drains; clearing ack_cnt first would stall this state forever. */
     608           0 :       if( FD_UNLIKELY( !ctx->malformed && ctx->local_out.write_buffer_len ) ) break;
     609           0 :       ctx->ack_cnt = 0UL;
     610             : 
     611           0 :       if( FD_UNLIKELY( ctx->malformed ) ) {
     612           0 :         fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
     613           0 :         ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
     614           0 :         ctx->malformed = 0;
     615           0 :         break;
     616           0 :       }
     617             : 
     618           0 :       if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
     619           0 :         rename_snapshots( ctx );
     620           0 :         ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
     621           0 :         metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
     622           0 :         fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
     623           0 :         break;
     624           0 :       }
     625             : 
     626           0 :       FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
     627           0 :       fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
     628           0 :       ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
     629           0 :       break;
     630           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
     631           0 :     case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
     632           0 :       if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
     633           0 :       ctx->ack_cnt = 0UL;
     634             : 
     635           0 :       ctx->metrics.full.bytes_read = 0UL;
     636           0 :       ctx->metrics.full.bytes_written = 0UL;
     637           0 :       ctx->metrics.full.bytes_total = 0UL;
     638             : 
     639           0 :       ctx->metrics.incremental.bytes_read = 0UL;
     640           0 :       ctx->metrics.incremental.bytes_written = 0UL;
     641           0 :       ctx->metrics.incremental.bytes_total = 0UL;
     642             : 
     643           0 :       ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
     644           0 :       ctx->deadline_nanos = 0L; /* deadline already in the past, so COLLECTING_PEERS picks a peer on the next call */
     645           0 :       break;
     646           0 :     case FD_SNAPRD_STATE_SHUTDOWN:
     647           0 :       break;
     648           0 :     default: {
     649           0 :       FD_LOG_ERR(( "unexpected state %d", ctx->state ));
     650           0 :       break;
     651           0 :     }
     652           0 :   }
     653           0 : }
     654             : /* before_frag returns 1 for every frag arriving on a gossip input and 0 for all other inputs.  NOTE(review): presumably a nonzero return makes stem skip the frag, which would leave gossip handling disabled until the tag filter below is restored -- confirm. */
     655             : static int
     656             : before_frag( fd_snaprd_tile_t * ctx,
     657             :              ulong              in_idx,
     658             :              ulong              seq FD_PARAM_UNUSED,
     659           0 :              ulong              sig ) {
     660           0 :   if( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ){
     661           0 :     (void)sig; /* only needed once the tag filter below is re-enabled */
     662             :     // return !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
     663             :     //           sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
     664             :     //           sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES );
     665           0 :     return 1;
     666           0 :   }
     667           0 :   return 0;
     668           0 : }
     669             : 
     670             : static void
     671             : during_frag( fd_snaprd_tile_t * ctx,
     672             :              ulong              in_idx,
     673             :              ulong              seq FD_PARAM_UNUSED,
     674             :              ulong              sig FD_PARAM_UNUSED,
     675             :              ulong              chunk,
     676             :              ulong              sz,
     677           0 :              ulong              ctl FD_PARAM_UNUSED) {
     678           0 :   if( ctx->in_kind[ in_idx ]!= IN_KIND_GOSSIP ) return;
     679             : 
     680           0 :   if( FD_UNLIKELY( chunk<ctx->gossip_in.chunk0 ||
     681           0 :                    chunk>ctx->gossip_in.wmark ) ) {
     682           0 :     FD_LOG_ERR(( "snaprd: unexpected chunk %lu", chunk ));
     683           0 :   }
     684             :   /* TODO: Size checks */
     685           0 :   fd_memcpy( &ctx->gossip.tmp_upd_buf, fd_chunk_to_laddr( ctx->gossip_in.mem, chunk ), sz );
     686           0 : }
     687             : 
     688             : static void
     689             : after_frag( fd_snaprd_tile_t *  ctx,
     690             :             ulong               in_idx,
     691             :             ulong               seq,
     692             :             ulong               sig,
     693             :             ulong               sz,
     694             :             ulong               tsorig,
     695             :             ulong               tspub,
     696           0 :             fd_stem_context_t * stem ) {
     697           0 :   (void)in_idx;
     698             : 
     699           0 :   (void)seq;
     700           0 :   (void)tsorig;
     701           0 :   (void)tspub;
     702           0 :   (void)sz;
     703             : 
     704           0 :   if( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) {
     705           0 :     fd_gossip_update_message_t * msg = &ctx->gossip.tmp_upd_buf;
     706           0 :     switch( msg->tag ) {
     707           0 :       case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
     708           0 :           fd_contact_info_t * cur      = &ctx->gossip.ci_table[ msg->contact_info.idx ];
     709           0 :           fd_ip4_port_t       cur_addr = ctx->gossip.ci_table[ msg->contact_info.idx ].sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
     710           0 :           if( cur_addr.l ){
     711           0 :             fd_ssping_remove( ctx->ssping, cur_addr );
     712           0 :           }
     713           0 :           fd_contact_info_t * new = msg->contact_info.contact_info;
     714           0 :           fd_ip4_port_t new_addr  = new->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
     715           0 :           if( new_addr.l ) {
     716           0 :             fd_ssping_add( ctx->ssping, new_addr );
     717           0 :           }
     718           0 :           *cur = *new;
     719           0 :         }
     720           0 :         break;
     721           0 :       case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
     722           0 :           fd_contact_info_t * cur  = &ctx->gossip.ci_table[ msg->contact_info_remove.idx ];
     723           0 :           fd_ip4_port_t       addr = cur->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
     724           0 :           if( addr.l ) {
     725           0 :             fd_ssping_remove( ctx->ssping, addr );
     726           0 :           }
     727           0 :         }
     728           0 :         break;
     729           0 :       case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES:
     730             :         /* TODO */
     731           0 :         break;
     732           0 :     }
     733             : 
     734           0 :   } else {
     735           0 :     FD_TEST( sig==FD_SNAPSHOT_MSG_CTRL_ACK || sig==FD_SNAPSHOT_MSG_CTRL_MALFORMED );
     736             : 
     737           0 :     if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_CTRL_ACK ) ) ctx->ack_cnt++;
     738           0 :     else {
     739           0 :       FD_TEST( ctx->state!=FD_SNAPRD_STATE_SHUTDOWN &&
     740           0 :                ctx->state!=FD_SNAPRD_STATE_COLLECTING_PEERS &&
     741           0 :                ctx->state!=FD_SNAPRD_STATE_WAITING_FOR_PEERS );
     742             : 
     743           0 :       switch( ctx->state) {
     744           0 :         case FD_SNAPRD_STATE_READING_FULL_FILE:
     745           0 :         case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
     746           0 :         case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
     747           0 :           FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     748           0 :         case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
     749           0 :         case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
     750           0 :           FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
     751           0 :         case FD_SNAPRD_STATE_READING_FULL_HTTP:
     752           0 :         case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
     753           0 :           FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
     754           0 :                           FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     755           0 :           fd_sshttp_cancel( ctx->sshttp );
     756           0 :           fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
     757           0 :           fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
     758           0 :           ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
     759           0 :           break;
     760           0 :         case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
     761           0 :         case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
     762           0 :           if( FD_UNLIKELY( ctx->malformed ) ) break;
     763             : 
     764           0 :           FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
     765           0 :                           FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
     766           0 :           fd_sshttp_cancel( ctx->sshttp );
     767           0 :           fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
     768             :           /* We would like to transition to FULL_HTTP_RESET, but we
     769             :              can't do it just yet, because we have already sent a DONE
     770             :              control fragment, and need to wait for acknowledges to come
     771             :              back first, to ensure there's only one control message
     772             :              outstanding at a time. */
     773           0 :           ctx->malformed = 1;
     774           0 :           break;
     775           0 :         case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
     776           0 :           break;
     777           0 :         default:
     778           0 :           FD_LOG_ERR(( "unexpected state %d", ctx->state ));
     779           0 :           break;
     780           0 :       }
     781           0 :     }
     782           0 :   }
     783           0 : }
     784             : 
     785             : static void
     786             : privileged_init( fd_topo_t *      topo,
     787           0 :                  fd_topo_tile_t * tile ) {
     788           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     789             : 
     790           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     791           0 :   fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t),  sizeof(fd_snaprd_tile_t) );
     792             : 
     793           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
     794             : 
     795             :   /* By default, the snaprd tile selects peers and its initial state is
     796             :      WAITING_FOR_PEERS. */
     797           0 :   ctx->peer_selection = 1;
     798           0 :   ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
     799             : 
     800           0 :   fd_ssarchive_remove_old_snapshots( tile->snaprd.snapshots_path,
     801           0 :                                      tile->snaprd.max_full_snapshots_to_keep,
     802           0 :                                      tile->snaprd.max_incremental_snapshots_to_keep );
     803             : 
     804           0 :   ulong full_slot = ULONG_MAX;
     805           0 :   ulong incremental_slot = ULONG_MAX;
     806           0 :   char full_path[ PATH_MAX ] = {0};
     807           0 :   char incremental_path[ PATH_MAX ] = {0};
     808           0 :   if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snaprd.snapshots_path,
     809           0 :                                                  tile->snaprd.incremental_snapshot_fetch,
     810           0 :                                                  &full_slot,
     811           0 :                                                  &incremental_slot,
     812           0 :                                                  full_path,
     813           0 :                                                  incremental_path ) ) ) {
     814           0 :     if( FD_UNLIKELY( !tile->snaprd.do_download ) ) {
     815           0 :       FD_LOG_ERR(( "No snapshots found in `%s` and downloading is disabled. "
     816           0 :                    "Please enable downloading via [snapshots.download] and restart.", tile->snaprd.snapshots_path ));
     817           0 :     }
     818             : 
     819           0 :     ctx->local_in.full_snapshot_slot        = ULONG_MAX;
     820           0 :     ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
     821             : 
     822           0 :     ctx->local_out.dir_fd = open( tile->snaprd.snapshots_path, O_DIRECTORY|O_CLOEXEC );
     823           0 :     if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", tile->snaprd.snapshots_path, errno, fd_io_strerror( errno ) ));
     824             : 
     825           0 :     FD_TEST( fd_cstr_printf_check( ctx->local_out.full_snapshot_path, PATH_MAX, NULL, "%s/snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
     826           0 :     ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
     827           0 :     if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.full_snapshot_path, errno, fd_io_strerror( errno ) ));
     828             : 
     829           0 :     if( FD_LIKELY( tile->snaprd.incremental_snapshot_fetch ) ) {
     830           0 :       FD_TEST( fd_cstr_printf_check( ctx->local_out.incremental_snapshot_path, PATH_MAX, NULL, "%s/incremental-snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
     831           0 :       ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
     832           0 :       if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
     833           0 :     } else {
     834           0 :       ctx->local_out.incremental_snapshot_fd = -1;
     835           0 :     }
     836           0 :   } else {
     837           0 :     FD_TEST( full_slot!=ULONG_MAX );
     838             : 
     839           0 :     ctx->local_in.full_snapshot_slot = full_slot;
     840           0 :     ctx->local_in.incremental_snapshot_slot = incremental_slot;
     841             : 
     842           0 :     strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
     843           0 :     ctx->local_in.full_snapshot_fd = open( ctx->local_in.full_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
     844           0 :     if( FD_UNLIKELY( -1==ctx->local_in.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.full_snapshot_path, errno, fd_io_strerror( errno ) ));
     845             : 
     846           0 :     struct stat full_stat;
     847           0 :     if( FD_UNLIKELY( -1==fstat( ctx->local_in.full_snapshot_fd, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
     848           0 :     if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
     849           0 :     ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
     850             : 
     851           0 :     if( FD_LIKELY( tile->snaprd.incremental_snapshot_fetch ) ) FD_TEST( incremental_slot!=ULONG_MAX );
     852             : 
     853           0 :     if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
     854           0 :       strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
     855           0 :       ctx->local_in.incremental_snapshot_fd = open( ctx->local_in.incremental_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
     856           0 :       if( FD_UNLIKELY( -1==ctx->local_in.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
     857             : 
     858           0 :       struct stat incremental_stat;
     859           0 :       if( FD_UNLIKELY( -1==fstat( ctx->local_in.incremental_snapshot_fd, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
     860           0 :       if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
     861           0 :       ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
     862           0 :     }
     863             : 
     864           0 :     ctx->local_out.dir_fd = -1;
     865           0 :     ctx->local_out.full_snapshot_fd = -1;
     866           0 :     ctx->local_out.incremental_snapshot_fd = -1;
     867             : 
     868           0 :     if( FD_UNLIKELY( tile->snaprd.maximum_local_snapshot_age==0U ) ) {
     869             :       /* Disable peer selection if we are reading snapshots from disk
     870             :          and there is no maximum local snapshot age set.  Set the
     871             :          initial state to READING_FULL_FILE to avoid peer selection
     872             :          logic.
     873             : 
     874             :          TODO: Why? Document in TOML. */
     875           0 :       ctx->peer_selection = 0;
     876           0 :       ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
     877           0 :       ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
     878           0 :       FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
     879           0 :     }
     880           0 :   }
     881           0 : }
     882             : 
     883             : static void
     884             : unprivileged_init( fd_topo_t *      topo,
     885           0 :                    fd_topo_tile_t * tile ) {
     886           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     887             : 
     888           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     889           0 :   fd_snaprd_tile_t * ctx  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t),  sizeof(fd_snaprd_tile_t)       );
     890           0 :   void * _sshttp          = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(),          fd_sshttp_footprint()          );
     891           0 :   void * _ssping          = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(),          fd_ssping_footprint( 65536UL ) );
     892             :   // void * _gossip_peers_rx = FD_SCRATCH_ALLOC_APPEND( l, gossip_peers_rx_align(),    gossip_peers_rx_footprint()    );
     893           0 :   void * _ci_table        = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_contact_info_t), sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
     894             : 
     895           0 :   ctx->ack_cnt = 0UL;
     896           0 :   ctx->malformed = 0;
     897             : 
     898           0 :   if( FD_UNLIKELY( tile->snaprd.diagnostics ) ) ctx->diagnostic_deadline_nanos = fd_log_wallclock()+(long)1e9;
     899           0 :   else                                          ctx->diagnostic_deadline_nanos = LONG_MAX;
     900             : 
     901           0 :   ctx->local_out.write_buffer_pos = 0UL;
     902           0 :   ctx->local_out.write_buffer_len = 0UL;
     903             : 
     904           0 :   fd_memcpy( ctx->config.path, tile->snaprd.snapshots_path, PATH_MAX );
     905           0 :   ctx->config.incremental_snapshot_fetch        = tile->snaprd.incremental_snapshot_fetch;
     906           0 :   ctx->config.do_download                       = tile->snaprd.do_download;
     907           0 :   ctx->config.maximum_local_snapshot_age        = tile->snaprd.maximum_local_snapshot_age;
     908           0 :   ctx->config.minimum_download_speed_mib        = tile->snaprd.minimum_download_speed_mib;
     909           0 :   ctx->config.max_full_snapshots_to_keep        = tile->snaprd.max_full_snapshots_to_keep;
     910           0 :   ctx->config.max_incremental_snapshots_to_keep = tile->snaprd.max_incremental_snapshots_to_keep;
     911             : 
     912           0 :   if( FD_UNLIKELY( !tile->snaprd.maximum_download_retry_abort ) ) ctx->config.maximum_download_retry_abort = UINT_MAX;
     913           0 :   else                                                            ctx->config.maximum_download_retry_abort = tile->snaprd.maximum_download_retry_abort;
     914             : 
     915           0 :   ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, 65536UL, 1UL ) );
     916           0 :   FD_TEST( ctx->ssping );
     917             : 
     918           0 :   ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
     919           0 :   FD_TEST( ctx->sshttp );
     920             : 
     921           0 :   memset( &ctx->diagnostics, 0, sizeof(ctx->diagnostics) );
     922             : 
     923           0 :   fd_topo_tile_t * snaprd_tile = &topo->tiles[ fd_topo_find_tile( topo, "snaprd", 0UL ) ];
     924           0 :   fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
     925           0 :   fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
     926           0 :   ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
     927           0 :   ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
     928           0 :   ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
     929           0 :   ctx->diagnostics.cur_snaprd_backp_prefrag     = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
     930           0 :   ctx->diagnostics.cur_snaprd_caughtup_postfrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
     931           0 :   ctx->diagnostics.cur_snapdc_backp_prefrag     = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
     932           0 :   ctx->diagnostics.cur_snapdc_caughtup_postfrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
     933           0 :   ctx->diagnostics.cur_snapin_backp_prefrag     = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
     934           0 :   ctx->diagnostics.cur_snapin_caughtup_postfrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
     935           0 :   ctx->diagnostics.cur_accounts_inserted = snapin_metrics+MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED );
     936             : 
     937           0 :   ctx->gossip.ci_table = _ci_table;
     938             :   /* zero-out memory so that we can perform null checks in after_frag */
     939           0 :   fd_memset( ctx->gossip.ci_table, 0, sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
     940             : 
     941           0 :   FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
     942           0 :   for( ulong i=0UL; i<(tile->in_cnt); i++ ){
     943           0 :     fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
     944           0 :     if( 0==strcmp( in_link->name, "gossip_out" ) ) {
     945             :       // has_gossip_in         = 1;
     946           0 :       ctx->in_kind[ i ]     = IN_KIND_GOSSIP;
     947             :       // ctx->gossip_in.mem    = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
     948             :       // ctx->gossip_in.chunk0 = fd_dcache_compact_chunk0( ctx->gossip_in.mem, in_link->dcache );
     949             :       // ctx->gossip_in.wmark  = fd_dcache_compact_wmark ( ctx->gossip_in.mem, in_link->dcache, in_link->mtu );
     950             :       // ctx->gossip_in.mtu    = in_link->mtu;
     951           0 :     } else if( 0==strcmp( in_link->name, "snapdc_rd" ) ||
     952           0 :                0==strcmp( in_link->name, "snapin_rd" ) ) {
     953           0 :       ctx->in_kind[ i ] = IN_KIND_SNAPCTL;
     954           0 :     }
     955           0 :   }
     956             : 
     957           0 :   for( ulong i=0UL; i<tile->snaprd.http.peers_cnt; i++ ) fd_ssping_add( ctx->ssping, tile->snaprd.http.peers[ i ] );
     958             : 
     959           0 :   if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
     960             : 
     961           0 :   ctx->out.wksp   = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp;
     962           0 :   ctx->out.chunk0 = fd_dcache_compact_chunk0( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache );
     963           0 :   ctx->out.wmark  = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu );
     964           0 :   ctx->out.chunk  = ctx->out.chunk0;
     965           0 :   ctx->out.mtu    = topo->links[ tile->out_link_id[ 0 ] ].mtu;
     966           0 : }
     967             : /* Parameters for the fd_stem.c include at the end of this group (presumably a run loop template configured via STEM_* macros -- confirm against fd_stem.c). */
     968           0 : #define STEM_BURST 2UL /* One control message, and one data message */
     969           0 : #define STEM_LAZY  1000L /* NOTE(review): units and exact meaning are defined by fd_stem.c, not visible here -- confirm */
     970             : /* Context type and its alignment; presumably the object passed to each callback named below -- confirm. */
     971           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_snaprd_tile_t
     972           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaprd_tile_t)
     973             : /* Bind each STEM_CALLBACK_* hook to a function name; the named functions are defined outside this excerpt. */
     974             : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
     975           0 : #define STEM_CALLBACK_METRICS_WRITE   metrics_write
     976           0 : #define STEM_CALLBACK_AFTER_CREDIT    after_credit
     977           0 : #define STEM_CALLBACK_BEFORE_FRAG     before_frag
     978           0 : #define STEM_CALLBACK_DURING_FRAG     during_frag
     979           0 : #define STEM_CALLBACK_AFTER_FRAG      after_frag
     980             : /* NOTE(review): presumably must come after the STEM_* defines above so the included source sees them; stem_run (used by fd_tile_snaprd) presumably comes from here -- confirm. */
     981             : #include "../../disco/stem/fd_stem.c"
     982             : /* Descriptor object for the snaprd tile: records the tile name and the scratch-sizing, init and run function names for this tile. */
     983             : fd_topo_run_tile_t fd_tile_snaprd = {
     984             :   .name                 = NAME, /* NAME is "snaprd", defined near the top of this file */
     985             :   .scratch_align        = scratch_align,
     986             :   .scratch_footprint    = scratch_footprint,
     987             :   .privileged_init      = privileged_init,
     988             :   .unprivileged_init    = unprivileged_init,
     989             :   .run                  = stem_run, /* presumably provided by the fd_stem.c include above -- confirm */
     990             :   .keep_host_networking = 1, /* NOTE(review): presumably set because this tile downloads snapshots from network peers -- confirm against fd_topo_run_tile_t */
     991             :   .allow_connect        = 1 /* NOTE(review): presumably permits outbound connections to snapshot peers -- confirm against fd_topo_run_tile_t */
     992             : };
     993             : 
     994             : #undef NAME

Generated by: LCOV version 1.14