Line data Source code
1 : #include "fd_snapct_tile.h"
2 : #include "utils/fd_sspeer.h"
3 : #include "utils/fd_ssping.h"
4 : #include "utils/fd_ssctrl.h"
5 : #include "utils/fd_ssarchive.h"
6 : #include "utils/fd_http_resolver.h"
7 : #include "utils/fd_ssmsg.h"
8 :
9 : #include "../../disco/topo/fd_topo.h"
10 : #include "../../disco/metrics/fd_metrics.h"
11 : #include "../../flamenco/gossip/fd_gossip_message.h"
12 : #include "../../waltz/openssl/fd_openssl_tile.h"
13 :
14 : #include <errno.h>
15 : #include <stdio.h>
16 : #include <fcntl.h>
17 : #include <unistd.h>
18 : #include <sys/stat.h>
19 : #include <netinet/tcp.h>
20 : #include <netinet/in.h>
21 :
22 : #include "generated/fd_snapct_tile_seccomp.h"
23 :
24 : #define NAME "snapct"
25 :
26 : /* FIXME: Do a finishing pass over the default.toml config options / comments */
27 :
28 0 : #define GOSSIP_PEERS_MAX (FD_CONTACT_INFO_TABLE_SIZE)
29 0 : #define SERVER_PEERS_MAX (FD_TOPO_SNAPSHOTS_SERVERS_MAX_RESOLVED)
30 0 : #define TOTAL_PEERS_MAX (GOSSIP_PEERS_MAX + SERVER_PEERS_MAX)
31 :
32 0 : #define IN_KIND_ACK (0)
33 0 : #define IN_KIND_SNAPLD (1)
34 0 : #define IN_KIND_GOSSIP (2)
35 : #define MAX_IN_LINKS (3)
36 :
37 0 : #define TEMP_FULL_SNAP_NAME ".snapshot.tar.bz2-partial"
38 0 : #define TEMP_INCR_SNAP_NAME ".incremental-snapshot.tar.bz2-partial"
39 :
/* fd_snapct_out_link_t holds the per-output-link state this tile needs
   to write a payload into a link's data region and publish it. */
40 : struct fd_snapct_out_link {
41 : ulong idx; /* output index handed to fd_stem_publish */
42 : fd_wksp_t * mem; /* base used with fd_chunk_to_laddr to locate a chunk; also tested for NULL to detect an absent link */
43 : ulong chunk0; /* lower bound passed to fd_dcache_compact_next */
44 : ulong wmark; /* upper bound (watermark) passed to fd_dcache_compact_next */
45 : ulong chunk; /* chunk that the next published payload is written to */
46 : ulong mtu; /* presumably the largest payload size for the link -- TODO confirm, not used in the visible code */
47 : };
48 : typedef struct fd_snapct_out_link fd_snapct_out_link_t;
49 :
50 0 : #define FD_SNAPCT_COLLECTING_PEERS_TIMEOUT (90L*1000L*1000L*1000L) /* 1.5 minutes */
51 0 : #define FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT (30L*1000L*1000L*1000L) /* 30 seconds */
52 :
/* gossip_ci_entry_t is one element of the gossip contact info table.
   It doubles as the element type of gossip_ci_map, keyed by pubkey. */
53 : struct gossip_ci_entry {
54 : fd_pubkey_t pubkey; /* map key */
55 : int allowed; /* non-zero if the peer may be used as a download source; log_download asserts this for every entry it finds in the map */
56 : fd_ip4_port_t rpc_addr; /* compared against the download address in log_download */
57 : ulong map_next; /* chain link used by gossip_ci_map */
58 : };
59 : typedef struct gossip_ci_entry gossip_ci_entry_t;
60 :
/* Instantiate the fd_map_chain template as gossip_ci_map: a chained map
   over gossip_ci_entry_t elements, keyed by the pubkey field, chained
   through map_next, with keys compared via fd_pubkey_eq and hashed
   via fd_hash over the full fd_pubkey_t. */
61 : #define MAP_NAME gossip_ci_map
62 0 : #define MAP_KEY pubkey
63 : #define MAP_ELE_T gossip_ci_entry_t
64 : #define MAP_KEY_T fd_pubkey_t
65 0 : #define MAP_NEXT map_next
66 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
67 0 : #define MAP_KEY_HASH(key,seed) fd_hash( seed, key, sizeof(fd_pubkey_t) )
68 : #include "../../util/tmpl/fd_map_chain.c"
69 :
/* fd_snapct_tile_t is the tile context.  It is the first object laid
   out in the tile scratch region (see scratch_footprint) and is
   threaded through every callback in this file. */
70 : struct fd_snapct_tile {
71 : struct fd_topo_tile_snapct config; /* tile configuration (sources, incremental_snapshots, ...) */
72 : int gossip_enabled; /* presumably the cached result of gossip_enabled() -- TODO confirm where it is set */
73 : int download_enabled; /* presumably the cached result of download_enabled() -- TODO confirm where it is set */
74 :
/* Peer discovery / ranking helpers.  ssping and ssresolver are
   advanced from after_credit only when non-NULL. */
75 : fd_ssping_t * ssping;
76 : fd_http_resolver_t * ssresolver;
77 : fd_sspeer_selector_t * selector;
78 : ulong selector_seed;
79 :
80 : int state; /* one of FD_SNAPCT_STATE_*; drives the switch in after_credit */
81 : int malformed;
82 : long deadline_nanos; /* wallclock deadline compared against fd_log_wallclock() in the waiting / collecting states */
83 : int flush_ack; /* reset to 0 by init_load */
84 : fd_sspeer_t peer; /* peer chosen for the current download */
85 :
/* Descriptors for persisting downloaded snapshots.  A value of -1 is
   treated as "not open" throughout this file. */
86 : struct {
87 : int dir_fd;
88 : int full_snapshot_fd;
89 : int incremental_snapshot_fd;
90 : } local_out;
91 :
/* Final file names for downloaded snapshots.  rename_full_snapshot /
   rename_incr_snapshot skip the rename when the name is empty. */
92 : char http_full_snapshot_name[ PATH_MAX ];
93 : char http_incr_snapshot_name[ PATH_MAX ];
94 :
95 : fd_wksp_t const * gossip_in_mem;
96 : fd_wksp_t const * snapld_in_mem;
97 : uchar in_kind[ MAX_IN_LINKS ]; /* presumably holds IN_KIND_* per input link -- TODO confirm */
98 :
/* Slot prediction published downstream via send_expected_slot. */
99 : struct {
100 : ulong full_slot; /* full snapshot slot the prediction is based on; FD_SSPEER_SLOT_UNKNOWN disables predict_incremental */
101 : ulong slot; /* currently predicted slot */
102 : int pending; /* set when slot changes; cleared by after_credit once the new value was sent */
103 : } predicted_incremental;
104 :
/* Metadata about snapshots found on local disk.  A slot of ULONG_MAX
   is treated as "no such snapshot" by after_credit. */
105 : struct {
106 : ulong full_snapshot_slot;
107 : char full_snapshot_path[ PATH_MAX ];
108 : ulong full_snapshot_size;
109 : int full_snapshot_zstd;
110 :
111 : uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ];
112 : uchar incremental_snapshot_hash[ FD_HASH_FOOTPRINT ];
113 :
114 : ulong incremental_snapshot_slot;
115 : char incremental_snapshot_path[ PATH_MAX ];
116 : ulong incremental_snapshot_size;
117 : int incremental_snapshot_zstd;
118 : } local_in;
119 :
/* Counters exported by metrics_write. */
120 : struct {
121 : struct {
122 : ulong bytes_read;
123 : ulong bytes_written;
124 : ulong bytes_total;
125 : uint num_retries;
126 : } full;
127 :
128 : struct {
129 : ulong bytes_read;
130 : ulong bytes_written;
131 : ulong bytes_total;
132 : uint num_retries;
133 : } incremental;
134 : } metrics;
135 :
136 : struct {
137 : gossip_ci_entry_t * ci_table; /* flat array of all gossip entries, allowed or not */
138 : gossip_ci_map_t * ci_map; /* map from pubkey to only allowed gossip entries */
139 : ulong allowed_cnt; /* number of allowed entries in ci_map */
140 : int saturated; /* when non-zero, COLLECTING_PEERS proceeds without waiting for its deadline */
141 : } gossip;
142 :
143 : long snapshot_start_timestamp_ns; /* set by init_load, read by log_completion */
144 :
145 : fd_snapct_out_link_t out_ld; /* receives the fd_ssctrl_init_t control messages built by init_load */
146 : fd_snapct_out_link_t out_gui; /* receives fd_snapct_update_t path updates; mem is NULL-checked before use */
147 : fd_snapct_out_link_t out_rp; /* receives FD_SSMSG_EXPECTED_SLOT messages */
148 : };
149 : typedef struct fd_snapct_tile fd_snapct_tile_t;
150 :
151 : static int
152 0 : gossip_enabled( fd_topo_tile_t const * tile ) {
153 0 : return tile->snapct.sources.gossip.allow_any || tile->snapct.sources.gossip.allow_list_cnt>0UL;
154 0 : }
155 :
156 : static int
157 0 : download_enabled( fd_topo_tile_t const * tile ) {
158 0 : return gossip_enabled( tile ) || tile->snapct.sources.servers_cnt>0UL;
159 0 : }
160 :
161 : FD_FN_CONST static inline ulong
162 0 : loose_footprint( fd_topo_tile_t const * tile ) {
163 0 : (void)tile;
164 : /* Leftover space for OpenSSL allocations */
165 0 : return 1<<26UL; /* 64 MiB */
166 0 : }
167 :
168 : static ulong
169 0 : scratch_align( void ) {
170 0 : return fd_ulong_max( alignof(fd_snapct_tile_t),
171 0 : fd_ulong_max( fd_ssping_align(),
172 0 : fd_ulong_max( alignof(gossip_ci_entry_t),
173 0 : fd_ulong_max( gossip_ci_map_align(),
174 0 : fd_ulong_max( fd_http_resolver_align(),
175 0 : fd_sspeer_selector_align() ) ) ) ) );
176 0 : }
177 :
178 : static ulong
179 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
180 0 : ulong l = FD_LAYOUT_INIT;
181 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
182 0 : l = FD_LAYOUT_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
183 0 : l = FD_LAYOUT_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
184 0 : l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
185 0 : l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
186 0 : l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
187 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
188 0 : return FD_LAYOUT_FINI( l, scratch_align() );
189 0 : }
190 :
191 : static inline int
192 0 : should_shutdown( fd_snapct_tile_t * ctx ) {
193 0 : return ctx->state==FD_SNAPCT_STATE_SHUTDOWN;
194 0 : }
195 :
196 : static void
197 0 : metrics_write( fd_snapct_tile_t * ctx ) {
198 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_READ, ctx->metrics.full.bytes_read );
199 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_WRITTEN, ctx->metrics.full.bytes_written );
200 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_TOTAL, ctx->metrics.full.bytes_total );
201 0 : FD_MGAUGE_SET( SNAPCT, FULL_DOWNLOAD_RETRIES, ctx->metrics.full.num_retries );
202 :
203 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_READ, ctx->metrics.incremental.bytes_read );
204 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_WRITTEN, ctx->metrics.incremental.bytes_written );
205 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_TOTAL, ctx->metrics.incremental.bytes_total );
206 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_DOWNLOAD_RETRIES, ctx->metrics.incremental.num_retries );
207 :
208 0 : FD_MGAUGE_SET( SNAPCT, PREDICTED_SLOT, ctx->predicted_incremental.slot );
209 :
210 0 : #if FD_HAS_OPENSSL
211 0 : FD_MCNT_SET( SNAPCT, SSL_ALLOC_ERRORS, fd_ossl_alloc_errors );
212 0 : #endif
213 :
214 0 : FD_MGAUGE_SET( SNAPCT, STATE, (ulong)ctx->state );
215 0 : }
216 :
217 : static void
218 : snapshot_path_gui_publish( fd_snapct_tile_t * ctx,
219 : fd_stem_context_t * stem,
220 : char const * path,
221 0 : int is_full ) {
222 : /* The messages below cannot be obtained directly from metrics. */
223 0 : fd_snapct_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
224 0 : FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
225 0 : out->is_download = 0;
226 0 : out->type = fd_int_if( is_full, FD_SNAPCT_SNAPSHOT_TYPE_FULL, FD_SNAPCT_SNAPSHOT_TYPE_INCREMENTAL );
227 0 : fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snapct_update_t) , 0UL, 0UL, 0UL );
228 0 : ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snapct_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
229 0 : }
230 :
231 : static void
232 0 : predict_incremental( fd_snapct_tile_t * ctx ) {
233 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) return;
234 0 : if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==FD_SSPEER_SLOT_UNKNOWN ) ) return;
235 :
236 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
237 :
238 0 : if( FD_LIKELY( best.addr.l ) ) {
239 0 : if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.incr_slot ) ) {
240 0 : ctx->predicted_incremental.slot = best.incr_slot;
241 0 : ctx->predicted_incremental.pending = 1;
242 0 : }
243 0 : }
244 0 : }
245 :
246 : static void
247 : on_resolve( void * _ctx,
248 : fd_sspeer_key_t const * key,
249 : ulong full_slot,
250 : ulong incr_slot,
251 : uchar full_hash[ FD_HASH_FOOTPRINT ],
252 0 : uchar incr_hash[ FD_HASH_FOOTPRINT ] ) {
253 0 : fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
254 :
255 0 : int status = fd_sspeer_selector_update_on_resolve( ctx->selector, key, full_slot, incr_slot, full_hash, incr_hash );
256 0 : if( FD_UNLIKELY( status<0 ) ) {
257 : /* The update may fail in normal operation, e.g. after a peer has
258 : been removed from the selector. The log level is set to a
259 : minimum accordingly. */
260 0 : if( FD_UNLIKELY( key==NULL ) ) {
261 0 : FD_LOG_DEBUG(( "selector update on resolve returned %d for NULL peer key", status ) );
262 0 : } else {
263 0 : if( FD_UNLIKELY( !key->is_url ) ) {
264 0 : FD_BASE58_ENCODE_32_BYTES( key->pubkey->key, pubkey_b58 );
265 0 : FD_LOG_DEBUG(( "selector update on resolve returned %d for peer with pubkey %s", status, pubkey_b58 ) );
266 0 : } else {
267 0 : FD_LOG_DEBUG(( "selector update on resolve returned %d for peer %s with addr " FD_IP4_ADDR_FMT ":%hu", status, key->url.hostname,
268 0 : FD_IP4_ADDR_FMT_ARGS( key->url.resolved_addr.addr ), fd_ushort_bswap( key->url.resolved_addr.port ) ));
269 0 : }
270 0 : }
271 0 : }
272 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
273 0 : predict_incremental( ctx );
274 0 : }
275 :
276 : static void
277 : on_ping( void * _ctx,
278 : fd_ip4_port_t addr,
279 0 : ulong latency ) {
280 0 : fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
281 :
282 0 : ulong cnt = fd_sspeer_selector_update_on_ping( ctx->selector, addr, latency );
283 0 : if( FD_UNLIKELY( !cnt ) ) {
284 : /* The update may fail in normal operation, e.g. after a peer has
285 : been removed from the selector. The log level is set to a
286 : minimum accordingly. */
287 0 : FD_LOG_DEBUG(( "selector update on ping did not find address " FD_IP4_ADDR_FMT ":%hu",
288 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
289 0 : }
290 0 : predict_incremental( ctx );
291 0 : }
292 :
293 : static void
294 : on_snapshot_hash( fd_snapct_tile_t * ctx,
295 : fd_sspeer_key_t const * key,
296 : fd_ip4_port_t addr,
297 0 : fd_gossip_update_message_t const * msg ) {
298 0 : ulong full_slot = msg->snapshot_hashes->full_slot;
299 0 : ulong incr_slot = FD_SSPEER_SLOT_UNKNOWN;
300 0 : uchar const * incr_hash = NULL;
301 :
302 0 : for( ulong i=0UL; i<msg->snapshot_hashes->incremental_len; i++ ) {
303 0 : if( FD_LIKELY( !incr_hash || msg->snapshot_hashes->incremental[ i ].slot>incr_slot ) ) {
304 0 : incr_slot = msg->snapshot_hashes->incremental[ i ].slot;
305 0 : incr_hash = msg->snapshot_hashes->incremental[ i ].hash;
306 0 : }
307 0 : }
308 :
309 0 : if( FD_UNLIKELY( !addr.l ) ) {
310 : /* A peer that does not advertise an rpc_addr cannot be added to
311 : the selector: if previously added, remove it. The remove
312 : operation becomes a no-op if the peer is not found. */
313 0 : fd_sspeer_selector_remove( ctx->selector, key );
314 0 : return;
315 0 : }
316 : /* The add may fail due to capacity/pool exhaustion, but the cluster
317 : slot should still be updated since the gossip observation is valid
318 : cluster-level intelligence regardless of whether this particular
319 : peer can be tracked. */
320 0 : fd_sspeer_selector_add( ctx->selector, key, addr, FD_SSPEER_LATENCY_UNKNOWN,
321 0 : full_slot, incr_slot,
322 0 : msg->snapshot_hashes->full_hash, incr_hash );
323 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
324 0 : predict_incremental( ctx );
325 0 : }
326 :
327 : static void
328 : send_expected_slot( fd_snapct_tile_t * ctx,
329 : fd_stem_context_t * stem,
330 0 : ulong slot ) {
331 0 : uint tsorig; uint tspub;
332 0 : fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
333 0 : fd_stem_publish( stem, ctx->out_rp.idx, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
334 0 : }
335 :
336 : static void
337 0 : rename_full_snapshot( fd_snapct_tile_t * ctx ) {
338 0 : FD_TEST( -1!=ctx->local_out.dir_fd );
339 :
340 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && ctx->http_full_snapshot_name[ 0 ]!='\0' ) ) {
341 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_full_snapshot_name ) ) )
342 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
343 0 : }
344 0 : }
345 :
346 : static void
347 0 : rename_incr_snapshot( fd_snapct_tile_t * ctx ) {
348 0 : FD_TEST( -1!=ctx->local_out.dir_fd );
349 :
350 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && ctx->http_incr_snapshot_name[ 0 ]!='\0' ) ) {
351 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_incr_snapshot_name ) ) )
352 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
353 0 : }
354 0 : }
355 :
356 : static ulong
357 : rlimit_file_cnt( fd_topo_t const * topo FD_PARAM_UNUSED,
358 0 : fd_topo_tile_t const * tile ) {
359 0 : ulong cnt = 1UL + /* stderr */
360 0 : 1UL; /* logfile */
361 0 : if( download_enabled( tile ) ) {
362 0 : cnt += 1UL + /* ssping socket */
363 0 : 2UL + /* dirfd + full snapshot download temp fd */
364 0 : tile->snapct.sources.servers_cnt; /* http resolver peer full sockets */
365 0 : if( tile->snapct.incremental_snapshots ) {
366 0 : cnt += 1UL + /* incr snapshot download temp fd */
367 0 : tile->snapct.sources.servers_cnt; /* http resolver peer incr sockets */
368 0 : }
369 0 : }
370 0 : return cnt;
371 0 : }
372 :
373 : static ulong
374 : populate_allowed_seccomp( fd_topo_t const * topo,
375 : fd_topo_tile_t const * tile,
376 : ulong out_cnt,
377 0 : struct sock_filter * out ) {
378 :
379 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
380 :
381 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
382 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
383 :
384 0 : int ping_fd = download_enabled( tile ) ? fd_ssping_get_sockfd( ctx->ssping ) : -1;
385 0 : populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)ping_fd );
386 0 : return sock_filter_policy_fd_snapct_tile_instr_cnt;
387 0 : }
388 :
389 : static ulong
390 : populate_allowed_fds( fd_topo_t const * topo,
391 : fd_topo_tile_t const * tile,
392 : ulong out_fds_cnt,
393 0 : int * out_fds ) {
394 0 : if( FD_UNLIKELY( out_fds_cnt<6UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
395 :
396 0 : ulong out_cnt = 0;
397 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
398 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
399 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
400 0 : }
401 :
402 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
403 :
404 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
405 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
406 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
407 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
408 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
409 0 : if( FD_LIKELY( download_enabled( tile ) ) ) out_fds[ out_cnt++ ] = fd_ssping_get_sockfd( ctx->ssping );
410 :
411 0 : return out_cnt;
412 0 : }
413 :
/* init_load starts loading one snapshot.

   full is non-zero for the full snapshot and zero for the incremental
   snapshot.  file is non-zero to read a local file and zero to
   download from ctx->peer over HTTP.

   In order, it:
    - records the wallclock start time (used by log_completion),
    - fills a fd_ssctrl_init_t in the current out_ld chunk and
      publishes it with sig FD_SNAPSHOT_MSG_CTRL_INIT_FULL / _INCR,
    - clears flush_ack,
    - for local files, sets the bytes_total metric from the known file
      size,
    - for downloads, truncates and rewinds the matching temp file,
    - if the gui link exists, publishes the snapshot path to it.

   Terminates via FD_TEST / FD_LOG_ERR on formatting or I/O failure. */
414 : static void
415 : init_load( fd_snapct_tile_t * ctx,
416 : fd_stem_context_t * stem,
417 : int full,
418 0 : int file ) {
419 0 : ctx->snapshot_start_timestamp_ns = fd_log_wallclock();
420 0 : fd_ssctrl_init_t * out = fd_chunk_to_laddr( ctx->out_ld.mem, ctx->out_ld.chunk );
421 0 : out->file = file;
/* Downloads are always flagged zstd (the requested names end in
   .tar.zst); local files use the flag recorded in local_in. */
422 0 : out->zstd = !file || (full ? ctx->local_in.full_snapshot_zstd : ctx->local_in.incremental_snapshot_zstd);
/* Slot and hash come from local metadata for files, and from the
   prediction / selected peer for downloads. */
423 0 : if( file ) {
424 0 : out->slot = full ? ctx->local_in.full_snapshot_slot : ctx->local_in.incremental_snapshot_slot;
425 0 : if( full ) fd_memcpy( out->snapshot_hash, ctx->local_in.full_snapshot_hash, FD_HASH_FOOTPRINT );
426 0 : else fd_memcpy( out->snapshot_hash, ctx->local_in.incremental_snapshot_hash, FD_HASH_FOOTPRINT );
427 0 : } else {
428 0 : out->slot = full ? ctx->predicted_incremental.full_slot : ctx->predicted_incremental.slot;
429 0 : if( full ) fd_memcpy( out->snapshot_hash, ctx->peer.full_hash, FD_HASH_FOOTPRINT );
430 0 : else fd_memcpy( out->snapshot_hash, ctx->peer.incr_hash, FD_HASH_FOOTPRINT );
431 0 : }
432 :
/* Download only: record the peer address, build the request path
   (with leading '/') in the message and the final on-disk file name
   (without it) in ctx. */
433 0 : if( !file ) {
434 0 : out->addr = ctx->peer.addr;
435 0 : char encoded_hash[ FD_BASE58_ENCODED_32_SZ ];
436 0 : if( full ) {
437 0 : fd_base58_encode_32( ctx->peer.full_hash, NULL, encoded_hash );
438 0 : FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
439 0 : FD_TEST( fd_cstr_printf_check( ctx->http_full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
440 0 : } else {
441 0 : fd_base58_encode_32( ctx->peer.incr_hash, NULL, encoded_hash );
442 0 : FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
443 0 : FD_TEST( fd_cstr_printf_check( ctx->http_incr_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
444 0 : }
445 :
/* If the peer is a configured server, pass along its hostname and
   https flag.
   NOTE(review): this scans SERVER_PEERS_MAX entries whereas
   log_download scans servers_cnt -- confirm unused entries can never
   match a peer address.
   NOTE(review): out->hostname / out->is_https are left untouched
   when no server matches (e.g. gossip peers) -- confirm the consumer
   does not read stale chunk contents in that case. */
446 0 : for( ulong i=0UL; i<SERVER_PEERS_MAX; i++ ) {
447 0 : if( FD_UNLIKELY( ctx->peer.addr.l==ctx->config.sources.servers[ i ].addr.l ) ) {
448 0 : fd_cstr_ncpy( out->hostname, ctx->config.sources.servers[ i ].hostname, sizeof(out->hostname) );
449 0 : out->is_https = ctx->config.sources.servers[ i ].is_https;
450 0 : break;
451 0 : }
452 0 : }
453 0 : }
/* Publish the init message and advance to the next chunk.  Note this
   happens before the temp file is truncated below. */
454 0 : fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
455 0 : ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
456 0 : ctx->flush_ack = 0;
457 :
458 : /* If we are downloading the snapshot, we will get the snapshot size
459 : in bytes from a metadata message sent from snapld. */
460 0 : if( file ) {
461 0 : if( full ) ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
462 0 : else ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
463 0 : }
464 :
465 0 : if( !file ) {
466 0 : if( full ) {
467 : /* reset any written content in the full output snapshot */
468 0 : if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.full_snapshot_fd, 0UL ) ) ) {
469 0 : FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
470 0 : }
471 0 : if( FD_UNLIKELY( -1==lseek( ctx->local_out.full_snapshot_fd, 0L, SEEK_SET ) ) ) {
472 0 : FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
473 0 : }
474 0 : } else {
475 : /* reset any written content in the incremental snapshot output
476 : file */
477 0 : if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.incremental_snapshot_fd, 0UL ) ) ) {
478 0 : FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
479 0 : }
480 0 : if( FD_UNLIKELY( -1==lseek( ctx->local_out.incremental_snapshot_fd, 0L, SEEK_SET ) ) ) {
481 0 : FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
482 0 : }
483 0 : }
484 0 : }
485 :
486 : /* Regardless of whether we load the snapshot from a file or download
487 : it, we know the name of the snapshot and can publish it to the gui
488 : here. */
/* NOTE(review): the fd_cstr_fini calls below presumably empty the
   download name so that rename_full_snapshot / rename_incr_snapshot
   become no-ops for local files.  They only run when the gui link is
   present -- confirm this gating is intended.
   NOTE(review): the URL published to the gui always uses "http://",
   even when the selected server is https -- confirm. */
489 0 : if( full ) {
490 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
491 0 : if( file ) {
492 0 : fd_cstr_fini( ctx->http_full_snapshot_name );
493 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, 1 );
494 0 : }
495 0 : else {
496 0 : char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
497 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ) );
498 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, 1 );
499 0 : }
500 0 : }
501 0 : } else {
502 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
503 0 : if( file ) {
504 0 : fd_cstr_fini( ctx->http_incr_snapshot_name );
505 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, 0 );
506 0 : } else {
507 0 : char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
508 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ) );
509 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, 0 );
510 0 : }
511 0 : }
512 0 : }
513 0 : }
514 :
515 : static void
516 : log_download( fd_snapct_tile_t * ctx,
517 : int full,
518 : fd_ip4_port_t addr,
519 0 : ulong slot ) {
520 0 : for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
521 0 : !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
522 0 : iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
523 0 : gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
524 0 : if( ci_entry->rpc_addr.l==addr.l ) {
525 0 : FD_TEST( ci_entry->allowed );
526 0 : FD_BASE58_ENCODE_32_BYTES( ci_entry->pubkey.uc, pubkey_b58 );
527 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from allowed gossip peer %s at http://" FD_IP4_ADDR_FMT ":%hu/%s",
528 0 : full ? "full" : "incremental", slot, pubkey_b58,
529 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
530 0 : full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
531 0 : return;
532 0 : }
533 0 : }
534 :
535 0 : for( ulong i=0UL; i<ctx->config.sources.servers_cnt; i++ ) {
536 0 : if( addr.l==ctx->config.sources.servers[ i ].addr.l ) {
537 0 : if( ctx->config.sources.servers[ i ].is_https ) {
538 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at https://%s:%hu/%s",
539 0 : full ? "full" : "incremental", slot, i,
540 0 : ctx->config.sources.servers[ i ].hostname, fd_ushort_bswap( addr.port ),
541 0 : full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
542 0 : } else {
543 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at http://" FD_IP4_ADDR_FMT ":%hu/%s",
544 0 : full ? "full" : "incremental", slot, i,
545 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
546 0 : full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
547 0 : }
548 0 : return;
549 0 : }
550 0 : }
551 :
552 0 : FD_TEST( 0 ); /* should not be possible */
553 0 : }
554 :
555 : static void
556 : log_completion( fd_snapct_tile_t * ctx,
557 0 : int full ) {
558 0 : double elapsed = (double)(fd_log_wallclock() - ctx->snapshot_start_timestamp_ns) / 1e9;
559 0 : FD_LOG_NOTICE(( "%s snapshot load completed in %.3f seconds", full ? "full" : "incremental", elapsed ));
560 0 : }
561 :
562 : static void
563 : after_credit( fd_snapct_tile_t * ctx,
564 : fd_stem_context_t * stem,
565 : int * opt_poll_in FD_PARAM_UNUSED,
566 0 : int * charge_busy FD_PARAM_UNUSED ) {
567 0 : long now = fd_log_wallclock();
568 :
569 0 : if( FD_LIKELY( ctx->ssping ) ) fd_ssping_advance( ctx->ssping, now, ctx->selector );
570 0 : if( FD_LIKELY( ctx->ssresolver ) ) fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
571 :
572 : /* send an expected slot message as the predicted incremental
573 : could have changed as a result of the pinger, resolver, or from
574 : processing gossip frags in gossip_frag. */
575 0 : if( FD_LIKELY( ctx->predicted_incremental.pending ) ) {
576 0 : send_expected_slot( ctx, stem, ctx->predicted_incremental.slot );
577 0 : ctx->predicted_incremental.pending = 0;
578 0 : }
579 :
580 : /* Note: All state transitions should occur within this switch
581 : statement to make it easier to reason about the state management. */
582 :
583 0 : switch ( ctx->state ) {
584 :
585 : /* ============================================================== */
586 0 : case FD_SNAPCT_STATE_INIT: {
587 0 : if( FD_UNLIKELY( !ctx->download_enabled ) ) {
588 0 : ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
589 0 : send_expected_slot( ctx, stem, local_slot );
590 0 : FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
591 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
592 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
593 0 : init_load( ctx, stem, 1, 1 );
594 0 : break;
595 0 : }
596 0 : ctx->deadline_nanos = now+FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
597 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
598 0 : break;
599 0 : }
600 :
601 : /* ============================================================== */
602 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS: {
603 0 : if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
604 :
605 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, FD_SSPEER_SLOT_UNKNOWN );
606 0 : if( FD_LIKELY( best.addr.l ) ) {
607 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
608 0 : ctx->deadline_nanos = now+FD_SNAPCT_COLLECTING_PEERS_TIMEOUT;
609 0 : }
610 0 : break;
611 0 : }
612 :
613 : /* ============================================================== */
614 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
615 0 : if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for incremental snapshot peers." ));
616 :
617 0 : FD_TEST( ctx->predicted_incremental.full_slot!=FD_SSPEER_SLOT_UNKNOWN );
618 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
619 0 : if( FD_LIKELY( best.addr.l ) ) {
620 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
621 0 : ctx->deadline_nanos = now;
622 0 : }
623 0 : break;
624 0 : }
625 :
626 : /* ============================================================== */
627 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS: {
628 0 : if( FD_UNLIKELY( !ctx->gossip.saturated && now<ctx->deadline_nanos ) ) break;
629 :
630 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, FD_SSPEER_SLOT_UNKNOWN );
631 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
632 0 : if( !ctx->gossip_enabled ) {
633 0 : FD_LOG_ERR(( "no peers are available and discovery of new peers via gossip is disabled. aborting." ));
634 0 : }
635 0 : ctx->deadline_nanos = now + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
636 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
637 0 : break;
638 0 : }
639 :
640 0 : fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
641 0 : if( FD_UNLIKELY( cluster.incremental==FD_SSPEER_SLOT_UNKNOWN && ctx->config.incremental_snapshots ) ) {
642 : /* We must have a cluster full slot to be in this state. */
643 0 : FD_TEST( cluster.full!=FD_SSPEER_SLOT_UNKNOWN );
644 : /* fall back to full snapshot only if the highest cluster slot
645 : is a full snapshot only */
646 0 : FD_LOG_WARNING(( "incremental snapshots were enabled via [snapshots.incremental_snapshots], but no incremental snapshot is available in the cluster. "
647 0 : "falling back to full snapshots only." ));
648 0 : ctx->config.incremental_snapshots = 0;
649 0 : }
650 :
651 0 : ulong cluster_slot = ctx->config.incremental_snapshots ? cluster.incremental : cluster.full;
652 0 : ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
653 0 : ulong local_slot_with_download = local_slot;
654 0 : int local_too_old = local_slot!=ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX && local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
655 0 : int local_full_only = ctx->local_in.incremental_snapshot_slot==ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX;
656 0 : if( FD_LIKELY( (ctx->config.incremental_snapshots && local_full_only) || local_too_old ) ) {
657 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
658 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
659 0 : ctx->predicted_incremental.slot = best_incremental.incr_slot;
660 0 : local_slot_with_download = best_incremental.incr_slot;
661 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental snapshot */
662 0 : }
663 0 : }
664 :
665 0 : int can_use_local_full = local_slot_with_download!=ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX &&
666 0 : local_slot_with_download>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_full_effective_age );
667 0 : if( FD_LIKELY( can_use_local_full ) ) {
668 0 : send_expected_slot( ctx, stem, local_slot_with_download );
669 :
670 0 : FD_LOG_NOTICE(( "reading full snapshot at slot %lu with cluster slot %lu from local file `%s`",
671 0 : ctx->local_in.full_snapshot_slot, cluster_slot, ctx->local_in.full_snapshot_path ));
672 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
673 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
674 0 : init_load( ctx, stem, 1, 1 );
675 0 : } else {
676 0 : if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX ) ) {
677 0 : FD_LOG_NOTICE(( "local snapshot at slot %lu is too old for cluster slot %lu max age %u, downloading instead",
678 0 : local_slot, cluster_slot, ctx->config.sources.max_local_full_effective_age ));
679 0 : } else {
680 0 : FD_LOG_NOTICE(( "no local snapshot available, downloading from peer" ));
681 0 : }
682 :
683 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) send_expected_slot( ctx, stem, best.full_slot );
684 :
685 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.full_slot );
686 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
687 0 : ctx->predicted_incremental.slot = best_incremental.incr_slot;
688 0 : send_expected_slot( ctx, stem, best_incremental.incr_slot );
689 0 : }
690 :
691 0 : ctx->peer = best;
692 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_HTTP;
693 0 : ctx->predicted_incremental.full_slot = best.full_slot;
694 0 : init_load( ctx, stem, 1, 0 );
695 0 : log_download( ctx, 1, best.addr, best.full_slot );
696 0 : }
697 0 : break;
698 0 : }
699 :
700 : /* ============================================================== */
701 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL: {
702 0 : if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
703 :
704 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
705 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
706 0 : if( !ctx->gossip_enabled ) {
707 0 : FD_LOG_ERR(( "no incremental snapshot peers are available and discovery of new peers via gossip is disabled. aborting." ));
708 0 : }
709 0 : ctx->deadline_nanos = now + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
710 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL;
711 0 : break;
712 0 : }
713 :
714 : /* decide whether to use the local incremental snapshot if one
715 : exists and is not too old, otherwise download a new incremental
716 : snapshot. */
717 0 : ulong cluster_slot = fd_sspeer_selector_cluster_slot( ctx->selector ).incremental;
718 0 : ulong local_slot = ctx->local_in.incremental_snapshot_slot;
719 0 : int local_too_old = local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
720 0 : if( FD_LIKELY( local_slot!=ULONG_MAX && !local_too_old ) ) {
721 0 : ctx->predicted_incremental.slot = local_slot;
722 0 : send_expected_slot( ctx, stem, local_slot );
723 :
724 0 : FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
725 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
726 0 : init_load( ctx, stem, 0, 1 );
727 0 : } else {
728 0 : ctx->predicted_incremental.slot = best.incr_slot;
729 0 : send_expected_slot( ctx, stem, best.incr_slot );
730 :
731 0 : ctx->peer = best;
732 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
733 0 : init_load( ctx, stem, 0, 0 );
734 0 : log_download( ctx, 0, best.addr, best.incr_slot );
735 0 : }
736 0 : break;
737 0 : }
738 :
739 : /* ============================================================== */
740 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
741 0 : if( !ctx->flush_ack ) break;
742 :
743 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
744 0 : ctx->malformed = 0;
745 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
746 0 : ctx->flush_ack = 0;
747 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
748 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
749 0 : ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
750 0 : break;
751 0 : }
752 :
753 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE;
754 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
755 0 : ctx->flush_ack = 0;
756 0 : break;
757 :
758 : /* ============================================================== */
759 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
760 0 : if( !ctx->flush_ack ) break;
761 :
762 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
763 0 : ctx->malformed = 0;
764 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
765 0 : ctx->flush_ack = 0;
766 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
767 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
768 0 : ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
769 0 : break;
770 0 : }
771 :
772 0 : log_completion( ctx, 0/*incr*/ );
773 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
774 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
775 0 : break;
776 :
777 : /* ============================================================== */
778 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
779 0 : if( !ctx->flush_ack ) break;
780 :
781 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
782 0 : ctx->malformed = 0;
783 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
784 0 : ctx->flush_ack = 0;
785 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
786 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
787 0 : "blacklisting peer due to download failure.",
788 0 : ctx->predicted_incremental.slot,
789 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
790 0 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
791 0 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
792 0 : break;
793 0 : }
794 :
795 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE;
796 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
797 0 : ctx->flush_ack = 0;
798 0 : break;
799 :
800 : /* ============================================================== */
801 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
802 0 : if( !ctx->flush_ack ) break;
803 :
804 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
805 0 : ctx->malformed = 0;
806 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
807 0 : ctx->flush_ack = 0;
808 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
809 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
810 0 : "blacklisting peer due to download failure.",
811 0 : ctx->predicted_incremental.slot,
812 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
813 0 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
814 0 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
815 0 : break;
816 0 : }
817 :
818 0 : log_completion( ctx, 0/*incr*/ );
819 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
820 0 : rename_incr_snapshot( ctx );
821 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
822 0 : break;
823 :
824 : /* ============================================================== */
825 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
826 0 : if( !ctx->flush_ack ) break;
827 :
828 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
829 0 : ctx->malformed = 0;
830 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
831 0 : ctx->flush_ack = 0;
832 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
833 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
834 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
835 0 : break;
836 0 : }
837 :
838 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE;
839 0 : ulong sig = ctx->config.incremental_snapshots &&
840 0 : (ctx->local_in.incremental_snapshot_slot!=ULONG_MAX || ctx->download_enabled) ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
841 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_DONE && ctx->config.incremental_snapshots ) {
842 : /* set incremental snapshots to 0 if there is no local
843 : incremental snapshot and download is not enabled. */
844 0 : FD_LOG_WARNING(( "incremental snapshots were enabled via [snapshots.incremental_snapshots] "
845 0 : "but no incremental snapshot exists on disk and no snapshot peers are configured. "
846 0 : "skipping incremental snapshot load." ));
847 0 : ctx->config.incremental_snapshots = 0;
848 0 : }
849 0 : fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
850 0 : ctx->flush_ack = 0;
851 0 : break;
852 :
853 : /* ============================================================== */
854 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
855 0 : if( !ctx->flush_ack ) break;
856 :
857 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
858 0 : ctx->malformed = 0;
859 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
860 0 : ctx->flush_ack = 0;
861 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
862 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
863 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
864 0 : break;
865 0 : }
866 :
867 0 : log_completion( ctx, 1/*full*/ );
868 0 : if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
869 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
870 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
871 0 : break;
872 0 : }
873 :
874 0 : if( FD_LIKELY( ctx->download_enabled ) ) {
875 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
876 0 : ctx->deadline_nanos = 0L;
877 0 : } else {
878 0 : FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
879 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
880 0 : init_load( ctx, stem, 0, 1 );
881 0 : }
882 0 : break;
883 :
884 : /* ============================================================== */
885 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
886 0 : if( !ctx->flush_ack ) break;
887 :
888 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
889 0 : ctx->malformed = 0;
890 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
891 0 : ctx->flush_ack = 0;
892 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
893 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
894 0 : "blacklisting peer due to download failure.",
895 0 : ctx->predicted_incremental.full_slot,
896 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
897 0 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
898 0 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
899 0 : break;
900 0 : }
901 :
902 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE;
903 0 : fd_stem_publish( stem, ctx->out_ld.idx, ctx->config.incremental_snapshots ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
904 0 : ctx->flush_ack = 0;
905 0 : break;
906 :
907 : /* ============================================================== */
908 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
909 0 : if( !ctx->flush_ack ) break;
910 :
911 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
912 0 : ctx->malformed = 0;
913 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
914 0 : ctx->flush_ack = 0;
915 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
916 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
917 0 : "blacklisting peer due to download failure.",
918 0 : ctx->predicted_incremental.full_slot,
919 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
920 0 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
921 0 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
922 0 : break;
923 0 : }
924 :
925 0 : rename_full_snapshot( ctx );
926 :
927 0 : log_completion( ctx, 1/*full*/ );
928 0 : if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
929 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
930 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
931 0 : break;
932 0 : }
933 :
934 : /* Get the best incremental peer to download from */
935 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
936 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
937 0 : ctx->deadline_nanos = now;
938 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
939 0 : break;
940 0 : }
941 :
942 0 : ctx->predicted_incremental.slot = best.incr_slot;
943 0 : send_expected_slot( ctx, stem, best.incr_slot );
944 :
945 0 : ctx->peer = best;
946 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
947 0 : init_load( ctx, stem, 0, 0 );
948 0 : log_download( ctx, 0, best.addr, best.incr_slot );
949 0 : break;
950 :
951 : /* ============================================================== */
952 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
953 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
954 0 : if( !ctx->flush_ack ) break;
955 :
956 0 : if( ctx->metrics.full.num_retries==ctx->config.max_retry_abort ) {
957 0 : FD_LOG_ERR(( "hit retry limit of %u for full snapshot, aborting", ctx->config.max_retry_abort ));
958 0 : }
959 :
960 0 : ctx->metrics.full.num_retries++;
961 0 : FD_LOG_NOTICE(( "retrying full snapshot download (attempt %u/%u)",
962 0 : ctx->metrics.full.num_retries, ctx->config.max_retry_abort ));
963 :
964 0 : ctx->metrics.full.bytes_read = 0UL;
965 0 : ctx->metrics.full.bytes_written = 0UL;
966 0 : ctx->metrics.full.bytes_total = 0UL;
967 :
968 0 : ctx->metrics.incremental.bytes_read = 0UL;
969 0 : ctx->metrics.incremental.bytes_written = 0UL;
970 0 : ctx->metrics.incremental.bytes_total = 0UL;
971 :
972 0 : if( !ctx->download_enabled ) {
973 : /* if we are unable to download new snapshots and unable to load
974 : our local snapshot, we must shutdown the validator. */
975 0 : FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.full_snapshot_path ));
976 0 : } else {
977 0 : if( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ctx->local_in.full_snapshot_slot = ULONG_MAX;
978 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
979 0 : ctx->deadline_nanos = 0L;
980 0 : }
981 0 : break;
982 :
983 : /* ============================================================== */
984 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
985 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
986 0 : if( !ctx->flush_ack ) break;
987 :
988 0 : if( ctx->metrics.incremental.num_retries==ctx->config.max_retry_abort ) {
989 0 : FD_LOG_ERR(("hit retry limit of %u for incremental snapshot. aborting", ctx->config.max_retry_abort ));
990 0 : }
991 :
992 0 : ctx->metrics.incremental.num_retries++;
993 0 : FD_LOG_NOTICE(( "retrying incremental snapshot download (attempt %u/%u)",
994 0 : ctx->metrics.incremental.num_retries, ctx->config.max_retry_abort ));
995 :
996 0 : ctx->metrics.incremental.bytes_read = 0UL;
997 0 : ctx->metrics.incremental.bytes_written = 0UL;
998 0 : ctx->metrics.incremental.bytes_total = 0UL;
999 :
1000 0 : if( !ctx->download_enabled ) {
1001 : /* if we are unable to download new snapshots and unable to load
1002 : our local snapshot, we must shutdown the validator. */
1003 0 : FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.incremental_snapshot_path ));
1004 0 : } else {
1005 0 : if( ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
1006 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
1007 0 : ctx->deadline_nanos = 0L;
1008 0 : }
1009 0 : break;
1010 :
1011 : /* ============================================================== */
1012 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1013 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
1014 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1015 0 : ctx->malformed = 0;
1016 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1017 0 : ctx->flush_ack = 0;
1018 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
1019 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
1020 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
1021 0 : break;
1022 0 : }
1023 0 : FD_TEST( ctx->metrics.full.bytes_total!=0UL );
1024 0 : if( FD_UNLIKELY( ctx->metrics.full.bytes_read == ctx->metrics.full.bytes_total ) ) {
1025 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1026 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI;
1027 0 : ctx->flush_ack = 0;
1028 0 : }
1029 0 : break;
1030 :
1031 : /* ============================================================== */
1032 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1033 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
1034 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1035 0 : ctx->malformed = 0;
1036 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1037 0 : ctx->flush_ack = 0;
1038 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
1039 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
1040 0 : ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
1041 0 : break;
1042 0 : }
1043 0 : FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
1044 0 : if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_read == ctx->metrics.incremental.bytes_total ) ) {
1045 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1046 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI;
1047 0 : ctx->flush_ack = 0;
1048 0 : }
1049 0 : break;
1050 :
1051 : /* ============================================================== */
1052 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
1053 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
1054 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1055 0 : ctx->malformed = 0;
1056 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1057 0 : ctx->flush_ack = 0;
1058 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
1059 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
1060 0 : "blacklisting peer due to download failure",
1061 0 : ctx->predicted_incremental.full_slot,
1062 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
1063 0 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
1064 0 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
1065 0 : break;
1066 0 : }
1067 0 : if( FD_UNLIKELY( ctx->metrics.full.bytes_total!=0UL && ctx->metrics.full.bytes_read==ctx->metrics.full.bytes_total ) ) {
1068 0 : ulong sig = FD_SNAPSHOT_MSG_CTRL_FINI;
1069 0 : fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
1070 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI;
1071 0 : ctx->flush_ack = 0;
1072 0 : }
1073 0 : break;
1074 :
1075 : /* ============================================================== */
1076 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
1077 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
1078 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1079 0 : ctx->malformed = 0;
1080 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1081 0 : ctx->flush_ack = 0;
1082 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
1083 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
1084 0 : "blacklisting peer due to download failure",
1085 0 : ctx->predicted_incremental.slot,
1086 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
1087 0 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
1088 0 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
1089 0 : break;
1090 0 : }
1091 0 : if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_total!=0UL && ctx->metrics.incremental.bytes_read==ctx->metrics.incremental.bytes_total ) ) {
1092 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1093 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI;
1094 0 : ctx->flush_ack = 0;
1095 0 : }
1096 0 : break;
1097 :
1098 : /* ============================================================== */
1099 0 : case FD_SNAPCT_STATE_SHUTDOWN:
1100 : /* Transitioning to the shutdown state indicates snapshot load is
1101 : completed without errors. Otherwise, snapct would have aborted
1102 : earlier. */
1103 0 : break;
1104 :
1105 : /* ============================================================== */
1106 0 : default: FD_LOG_ERR(( "unexpected state %d", ctx->state ));
1107 0 : }
1108 0 : }
1109 :
1110 : static void
1111 : gossip_frag( fd_snapct_tile_t * ctx,
1112 : ulong sig,
1113 : ulong sz FD_PARAM_UNUSED,
1114 0 : ulong chunk ) {
1115 0 : FD_TEST( ctx->gossip_enabled );
1116 :
1117 0 : if( FD_UNLIKELY( sig==FD_GOSSIP_UPDATE_TAG_PEER_SATURATED ) ) {
1118 0 : FD_LOG_NOTICE(( "gossip peer discovery saturated" ));
1119 0 : ctx->gossip.saturated = 1;
1120 0 : return;
1121 0 : }
1122 :
1123 0 : if( !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
1124 0 : sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
1125 0 : sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) ) return;
1126 :
1127 0 : fd_gossip_update_message_t const * msg = fd_chunk_to_laddr_const( ctx->gossip_in_mem, chunk );
1128 0 : switch( msg->tag ) {
1129 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
1130 0 : FD_TEST( msg->contact_info->idx<GOSSIP_PEERS_MAX );
1131 0 : fd_pubkey_t const * pubkey = (fd_pubkey_t const *)msg->origin;
1132 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info->idx;
1133 0 : if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
1134 : /* Initialize the new gossip entry, which may or may not be allowed */
1135 0 : FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
1136 0 : entry->pubkey = *pubkey;
1137 0 : entry->rpc_addr.l = 0UL;
1138 0 : if( ctx->config.sources.gossip.allow_any ) {
1139 0 : entry->allowed = 1;
1140 0 : for( ulong i=0UL; i<ctx->config.sources.gossip.block_list_cnt; i++ ) {
1141 0 : if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.block_list[ i ] ) ) {
1142 0 : entry->allowed = 0;
1143 0 : break;
1144 0 : }
1145 0 : }
1146 0 : } else {
1147 0 : entry->allowed = 0;
1148 0 : for( ulong i=0UL; i<ctx->config.sources.gossip.allow_list_cnt; i++ ) {
1149 0 : if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.allow_list[ i ] ) ) {
1150 0 : entry->allowed = 1;
1151 0 : break;
1152 0 : }
1153 0 : }
1154 0 : }
1155 0 : FD_TEST( ULONG_MAX==gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table ) );
1156 0 : if( entry->allowed ) {
1157 0 : gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info->idx, ctx->gossip.ci_table );
1158 0 : ctx->gossip.allowed_cnt++;
1159 : /* Allow-list shortcut: if an explicit allow list is
1160 : configured and all expected peers have arrived, declare
1161 : saturation immediately without waiting for the gossip
1162 : tile's general saturation signal. */
1163 0 : if( FD_UNLIKELY( !ctx->config.sources.gossip.allow_any &&
1164 0 : ctx->config.sources.gossip.allow_list_cnt>0UL &&
1165 0 : ctx->gossip.allowed_cnt==ctx->config.sources.gossip.allow_list_cnt ) ) {
1166 0 : FD_LOG_NOTICE(( "all %lu allowed gossip peers discovered", ctx->config.sources.gossip.allow_list_cnt ));
1167 0 : ctx->gossip.saturated = 1;
1168 0 : }
1169 0 : }
1170 0 : }
1171 0 : if( !entry->allowed ) break;
1172 : /* Maybe update the RPC address of a new or existing allowed gossip peer */
1173 0 : fd_ip4_port_t cur_addr = entry->rpc_addr;
1174 0 : fd_ip4_port_t new_addr;
1175 0 : new_addr.addr = msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].is_ipv6 ? 0 : msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].ip4;
1176 0 : new_addr.port = msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].port;
1177 :
1178 0 : if( FD_UNLIKELY( new_addr.l!=cur_addr.l ) ) {
1179 0 : fd_sspeer_key_t entry_key = {0};
1180 0 : *entry_key.pubkey = entry->pubkey;
1181 0 : entry_key.is_url = 0;
1182 0 : if( FD_LIKELY( !!new_addr.l ) ) {
1183 0 : if( FD_UNLIKELY( FD_SSPEER_SCORE_INVALID==fd_sspeer_selector_add( ctx->selector, &entry_key, new_addr,
1184 0 : FD_SSPEER_LATENCY_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
1185 0 : FD_SSPEER_SLOT_UNKNOWN, NULL, NULL ) ) ) {
1186 0 : break;
1187 0 : }
1188 0 : fd_ssping_add( ctx->ssping, new_addr );
1189 0 : } else {
1190 0 : fd_sspeer_selector_remove( ctx->selector, &entry_key );
1191 0 : }
1192 0 : if( FD_LIKELY( !!cur_addr.l ) ) {
1193 0 : fd_ssping_remove( ctx->ssping, cur_addr );
1194 0 : }
1195 0 : entry->rpc_addr = new_addr;
1196 0 : if( !ctx->config.sources.gossip.allow_any ) {
1197 0 : FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
1198 0 : if( FD_LIKELY( !!new_addr.l ) ) {
1199 0 : FD_LOG_NOTICE(( "allowed gossip peer added with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
1200 0 : pubkey_b58, FD_IP4_ADDR_FMT_ARGS( new_addr.addr ), fd_ushort_bswap( new_addr.port ) ));
1201 0 : } else {
1202 0 : FD_LOG_WARNING(( "allowed gossip peer with public key `%s` does not advertise an RPC address", pubkey_b58 ));
1203 0 : }
1204 0 : }
1205 0 : }
1206 0 : break;
1207 0 : }
1208 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
1209 0 : FD_TEST( msg->contact_info_remove->idx<GOSSIP_PEERS_MAX );
1210 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info_remove->idx;
1211 0 : ulong rem_idx = gossip_ci_map_idx_remove( ctx->gossip.ci_map, &entry->pubkey, ULONG_MAX, ctx->gossip.ci_table );
1212 0 : if( rem_idx==ULONG_MAX ) break;
1213 0 : FD_TEST( entry->allowed && rem_idx==msg->contact_info_remove->idx );
1214 0 : ctx->gossip.allowed_cnt--;
1215 0 : fd_ip4_port_t addr = entry->rpc_addr;
1216 0 : if( FD_LIKELY( !!addr.l ) ) {
1217 0 : fd_ssping_remove( ctx->ssping, addr );
1218 0 : fd_sspeer_key_t entry_key = {0};
1219 0 : *entry_key.pubkey = entry->pubkey;
1220 0 : entry_key.is_url = 0;
1221 0 : fd_sspeer_selector_remove( ctx->selector, &entry_key );
1222 0 : }
1223 0 : if( !ctx->config.sources.gossip.allow_any ) {
1224 0 : FD_BASE58_ENCODE_32_BYTES( entry->pubkey.uc, pubkey_b58 );
1225 0 : FD_LOG_WARNING(( "allowed gossip peer removed with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
1226 0 : pubkey_b58, FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
1227 0 : }
1228 0 : fd_memset( entry, 0, sizeof(*entry) );
1229 0 : break;
1230 0 : }
1231 0 : case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
1232 0 : ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin, ULONG_MAX, ctx->gossip.ci_table );
1233 0 : if( FD_LIKELY( idx!=ULONG_MAX ) ) {
1234 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + idx;
1235 0 : FD_TEST( entry->allowed );
1236 0 : fd_sspeer_key_t entry_key = {0};
1237 0 : *entry_key.pubkey = entry->pubkey;
1238 0 : entry_key.is_url = 0;
1239 0 : on_snapshot_hash( ctx, &entry_key, entry->rpc_addr, msg );
1240 0 : }
1241 0 : break;
1242 0 : }
1243 0 : default:
1244 0 : FD_LOG_ERR(( "snapct: unexpected gossip tag %u", (uint)msg->tag ));
1245 0 : break;
1246 0 : }
1247 0 : }
1248 :
1249 : static void
1250 : snapld_frag( fd_snapct_tile_t * ctx,
1251 : ulong sig,
1252 : ulong sz,
1253 0 : ulong chunk ) {
1254 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_META ) ) {
1255 : /* Before snapld starts sending down data fragments, it first sends
1256 : a metadata message containing the total size of the snapshot as
1257 : well as the filename. This is only done for HTTP loading. */
1258 0 : int full;
1259 0 : switch( ctx->state ) {
1260 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; break;
1261 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
1262 :
1263 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1264 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1265 0 : return; /* Ignore */
1266 0 : default: FD_LOG_ERR(( "invalid meta frag in state %d", ctx->state ));
1267 0 : }
1268 :
1269 0 : FD_TEST( sz==sizeof(fd_ssctrl_meta_t) );
1270 0 : fd_ssctrl_meta_t const * meta = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
1271 :
1272 0 : if( full ) ctx->metrics.full.bytes_total = meta->total_sz;
1273 0 : else ctx->metrics.incremental.bytes_total = meta->total_sz;
1274 :
1275 0 : return;
1276 0 : }
1277 0 : if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_DATA ) ) return;
1278 :
1279 0 : int full, file;
1280 0 : switch( ctx->state ) {
1281 : /* Expected cases, fall through below */
1282 0 : case FD_SNAPCT_STATE_READING_FULL_FILE: full = 1; file = 1; break;
1283 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE: full = 0; file = 1; break;
1284 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; file = 0; break;
1285 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; file = 0; break;
1286 :
1287 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
1288 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
1289 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1290 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1291 : /* We are waiting for a reset to fully propagate through the
1292 : pipeline, just throw away any trailing data frags. */
1293 0 : return;
1294 :
1295 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
1296 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
1297 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
1298 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
1299 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
1300 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
1301 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
1302 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
1303 : /* Based on previously received data frags, we expected that the
1304 : current full / incremental snapshot was finished, but then we
1305 : received additional data frags. Unsafe to continue so throw
1306 : away the whole snapshot. */
1307 0 : if( !ctx->malformed ) {
1308 0 : ctx->malformed = 1;
1309 0 : FD_LOG_WARNING(( "complete snapshot loaded but read %lu extra bytes", sz ));
1310 0 : }
1311 0 : return;
1312 :
1313 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS:
1314 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL:
1315 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS:
1316 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL:
1317 0 : case FD_SNAPCT_STATE_SHUTDOWN:
1318 0 : default:
1319 0 : FD_LOG_ERR(( "invalid data frag in state %d", ctx->state ));
1320 0 : return;
1321 0 : }
1322 :
1323 0 : if( full ) FD_TEST( ctx->metrics.full.bytes_total !=0UL );
1324 0 : else FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
1325 :
1326 0 : if( full ) ctx->metrics.full.bytes_read += sz;
1327 0 : else ctx->metrics.incremental.bytes_read += sz;
1328 :
1329 0 : if( !file && -1!=ctx->local_out.dir_fd ) {
1330 0 : ulong written_sz = 0;
1331 0 : while( written_sz<sz ) {
1332 0 : uchar const * data = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
1333 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
1334 0 : long result = write( fd, data, sz );
1335 0 : if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
1336 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", ctx->config.snapshots_path ));
1337 0 : } else if( FD_UNLIKELY( 0L>result ) ) {
1338 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1339 0 : }
1340 :
1341 0 : written_sz += (ulong)result;
1342 0 : }
1343 0 : if( full ) ctx->metrics.full.bytes_written += sz;
1344 0 : else ctx->metrics.incremental.bytes_written += sz;
1345 0 : }
1346 :
1347 0 : if( FD_UNLIKELY( ( full && ctx->metrics.full.bytes_read > ctx->metrics.full.bytes_total ) ||
1348 0 : (!full && ctx->metrics.incremental.bytes_read > ctx->metrics.incremental.bytes_total ) ) ) {
1349 0 : if( !ctx->malformed ) {
1350 0 : ctx->malformed = 1;
1351 0 : FD_LOG_WARNING(( "expected %s snapshot size of %lu bytes but read %lu bytes",
1352 0 : full ? "full" : "incremental",
1353 0 : full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total,
1354 0 : full ? ctx->metrics.full.bytes_read : ctx->metrics.incremental.bytes_read ));
1355 :
1356 0 : }
1357 0 : }
1358 0 : }
1359 :
1360 : static void
1361 : ctrl_ack_frag( fd_snapct_tile_t * ctx,
1362 0 : ulong sig ) {
1363 0 : switch( sig ) {
1364 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
1365 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_FULL_HTTP ||
1366 0 : ctx->state==FD_SNAPCT_STATE_READING_FULL_FILE ) ) {
1367 0 : FD_TEST( !ctx->flush_ack );
1368 0 : ctx->flush_ack = 1;
1369 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1370 0 : break;
1371 :
1372 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
1373 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP ||
1374 0 : ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_FILE ) ) {
1375 0 : FD_TEST( !ctx->flush_ack );
1376 0 : ctx->flush_ack = 1;
1377 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1378 0 : break;
1379 :
1380 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
1381 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
1382 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ) ) {
1383 0 : FD_TEST( !ctx->flush_ack );
1384 0 : ctx->flush_ack = 1;
1385 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1386 0 : break;
1387 :
1388 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:
1389 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
1390 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ||
1391 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE ||
1392 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE ) ) {
1393 0 : FD_TEST( !ctx->flush_ack );
1394 0 : ctx->flush_ack = 1;
1395 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1396 0 : break;
1397 :
1398 0 : case FD_SNAPSHOT_MSG_CTRL_FINI:
1399 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI ||
1400 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI ||
1401 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI ||
1402 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI ) ) {
1403 0 : FD_TEST( !ctx->flush_ack );
1404 0 : ctx->flush_ack = 1;
1405 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1406 0 : break;
1407 :
1408 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
1409 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1410 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
1411 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1412 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1413 0 : FD_TEST( !ctx->flush_ack );
1414 0 : ctx->flush_ack = 1;
1415 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1416 0 : break;
1417 :
1418 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
1419 0 : break;
1420 :
1421 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
1422 0 : switch( ctx->state ) {
1423 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1424 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
1425 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
1426 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1427 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
1428 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
1429 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
1430 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
1431 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
1432 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
1433 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
1434 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
1435 0 : FD_LOG_WARNING(( "received error from downstream tile while in state %s",
1436 0 : fd_snapct_state_str( (ulong)ctx->state ) ));
1437 0 : ctx->malformed = 1;
1438 0 : ctx->flush_ack = 1;
1439 0 : break;
1440 0 : default:
1441 0 : break;
1442 0 : }
1443 0 : break;
1444 0 : }
1445 0 : }
1446 :
1447 : static int
1448 : returnable_frag( fd_snapct_tile_t * ctx,
1449 : ulong in_idx,
1450 : ulong seq FD_PARAM_UNUSED,
1451 : ulong sig,
1452 : ulong chunk,
1453 : ulong sz,
1454 : ulong ctl FD_PARAM_UNUSED,
1455 : ulong tsorig FD_PARAM_UNUSED,
1456 : ulong tspub FD_PARAM_UNUSED,
1457 0 : fd_stem_context_t * stem FD_PARAM_UNUSED ) {
1458 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
1459 0 : gossip_frag( ctx, sig, sz, chunk );
1460 0 : } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLD ) {
1461 0 : snapld_frag( ctx, sig, sz, chunk );
1462 0 : } else if( ctx->in_kind[ in_idx ]==IN_KIND_ACK ) {
1463 0 : ctrl_ack_frag( ctx, sig );
1464 0 : } else FD_LOG_ERR(( "invalid in_kind %lu %u", in_idx, (uint)ctx->in_kind[ in_idx ] ));
1465 0 : return 0;
1466 0 : }
1467 :
1468 : static void
1469 : privileged_init( fd_topo_t * topo,
1470 0 : fd_topo_tile_t * tile ) {
1471 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1472 :
1473 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1474 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
1475 0 : void * _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
1476 0 : FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t)*GOSSIP_PEERS_MAX );
1477 0 : FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
1478 0 : void * _ssresolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
1479 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
1480 :
1481 0 : #if FD_HAS_OPENSSL
1482 0 : void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
1483 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
1484 0 : fd_ossl_tile_init( alloc );
1485 0 : #endif
1486 :
1487 0 : ctx->ssping = NULL;
1488 0 : if( FD_LIKELY( download_enabled( tile ) ) ) ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, TOTAL_PEERS_MAX, 1UL, on_ping, ctx ) );
1489 0 : if( FD_LIKELY( tile->snapct.sources.servers_cnt ) ) ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, SERVER_PEERS_MAX, tile->snapct.incremental_snapshots, on_resolve, ctx ) );
1490 0 : else ctx->ssresolver = NULL;
1491 :
1492 0 : fd_ssarchive_remove_old_snapshots( tile->snapct.snapshots_path,
1493 0 : tile->snapct.max_full_snapshots_to_keep,
1494 0 : tile->snapct.max_incremental_snapshots_to_keep );
1495 :
1496 0 : ulong full_slot = ULONG_MAX;
1497 0 : ulong incremental_slot = ULONG_MAX;
1498 0 : int full_is_zstd = 0;
1499 0 : int incremental_is_zstd = 0;
1500 0 : char full_path[ PATH_MAX ] = {0};
1501 0 : char incremental_path[ PATH_MAX ] = {0};
1502 0 : uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ] = {0};
1503 0 : uchar incremental_snapshot_hash[ FD_HASH_FOOTPRINT ] = {0};
1504 0 : if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snapct.snapshots_path,
1505 0 : tile->snapct.incremental_snapshots,
1506 0 : &full_slot,
1507 0 : &incremental_slot,
1508 0 : full_path,
1509 0 : incremental_path,
1510 0 : &full_is_zstd,
1511 0 : &incremental_is_zstd,
1512 0 : full_snapshot_hash,
1513 0 : incremental_snapshot_hash ) ) ) {
1514 0 : if( FD_UNLIKELY( !download_enabled( tile ) ) ) {
1515 0 : FD_LOG_ERR(( "No snapshots found in `%s` and no download sources are enabled. "
1516 0 : "Please enable downloading via [snapshots.sources] and restart.", tile->snapct.snapshots_path ));
1517 0 : }
1518 0 : ctx->local_in.full_snapshot_slot = ULONG_MAX;
1519 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
1520 0 : ctx->local_in.full_snapshot_size = 0UL;
1521 0 : ctx->local_in.incremental_snapshot_size = 0UL;
1522 0 : ctx->local_in.full_snapshot_zstd = 0;
1523 0 : ctx->local_in.incremental_snapshot_zstd = 0;
1524 0 : fd_cstr_fini( ctx->local_in.full_snapshot_path );
1525 0 : fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
1526 0 : fd_memset( ctx->local_in.full_snapshot_hash, 0, FD_HASH_FOOTPRINT );
1527 0 : fd_memset( ctx->local_in.incremental_snapshot_hash, 0, FD_HASH_FOOTPRINT );
1528 0 : } else {
1529 0 : FD_TEST( full_slot!=ULONG_MAX );
1530 :
1531 0 : ctx->local_in.full_snapshot_slot = full_slot;
1532 0 : ctx->local_in.incremental_snapshot_slot = incremental_slot;
1533 0 : ctx->local_in.full_snapshot_zstd = full_is_zstd;
1534 0 : ctx->local_in.incremental_snapshot_zstd = incremental_is_zstd;
1535 :
1536 0 : fd_cstr_ncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
1537 0 : fd_memcpy( ctx->local_in.full_snapshot_hash, full_snapshot_hash, FD_HASH_FOOTPRINT );
1538 0 : struct stat full_stat;
1539 0 : if( FD_UNLIKELY( -1==stat( ctx->local_in.full_snapshot_path, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
1540 0 : if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
1541 0 : ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
1542 :
1543 0 : if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
1544 0 : fd_cstr_ncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
1545 0 : fd_memcpy( ctx->local_in.incremental_snapshot_hash, incremental_snapshot_hash, FD_HASH_FOOTPRINT );
1546 0 : struct stat incremental_stat;
1547 0 : if( FD_UNLIKELY( -1==stat( ctx->local_in.incremental_snapshot_path, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
1548 0 : if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
1549 0 : ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
1550 0 : } else {
1551 0 : ctx->local_in.incremental_snapshot_size = 0UL;
1552 0 : fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
1553 0 : }
1554 0 : }
1555 :
1556 0 : ctx->local_out.dir_fd = -1;
1557 0 : ctx->local_out.full_snapshot_fd = -1;
1558 0 : ctx->local_out.incremental_snapshot_fd = -1;
1559 0 : if( FD_LIKELY( download_enabled( tile ) ) ) {
1560 0 : ctx->local_out.dir_fd = open( tile->snapct.snapshots_path, O_DIRECTORY|O_CLOEXEC );
1561 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open(%s) failed (%i-%s)", tile->snapct.snapshots_path, errno, fd_io_strerror( errno ) ));
1562 :
1563 0 : ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1564 0 : if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_FULL_SNAP_NAME, errno, fd_io_strerror( errno ) ));
1565 :
1566 0 : if( FD_LIKELY( tile->snapct.incremental_snapshots ) ) {
1567 0 : ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1568 0 : if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_INCR_SNAP_NAME, errno, fd_io_strerror( errno ) ));
1569 0 : }
1570 0 : }
1571 :
1572 0 : FD_TEST( fd_rng_secure( &ctx->selector_seed, 8UL ) );
1573 0 : }
1574 :
1575 : static inline fd_snapct_out_link_t
1576 : out1( fd_topo_t const * topo,
1577 : fd_topo_tile_t const * tile,
1578 0 : char const * name ) {
1579 0 : ulong idx = ULONG_MAX;
1580 :
1581 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
1582 0 : fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
1583 0 : if( !strcmp( link->name, name ) ) {
1584 0 : if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
1585 0 : idx = i;
1586 0 : }
1587 0 : }
1588 :
1589 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapct_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
1590 :
1591 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
1592 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapct_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
1593 :
1594 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
1595 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
1596 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
1597 0 : return (fd_snapct_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
1598 0 : }
1599 :
1600 : static void
1601 : unprivileged_init( fd_topo_t * topo,
1602 0 : fd_topo_tile_t * tile ) {
1603 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1604 :
1605 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1606 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
1607 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
1608 0 : void * _ci_table = FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
1609 0 : void * _ci_map = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
1610 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
1611 0 : void * _selector = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
1612 :
1613 0 : ctx->config = tile->snapct;
1614 0 : ctx->gossip_enabled = gossip_enabled( tile );
1615 0 : ctx->download_enabled = download_enabled( tile );
1616 :
1617 0 : ctx->selector = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, TOTAL_PEERS_MAX, ctx->config.incremental_snapshots, ctx->selector_seed ) );
1618 :
1619 0 : if( ctx->config.sources.servers_cnt ) {
1620 0 : for( ulong i=0UL; i<tile->snapct.sources.servers_cnt; i++ ) {
1621 : /* The peers needs to be added to resolver and to selector.
1622 : Only if this succeeds, add the peer to ssping list. */
1623 0 : if( FD_LIKELY( !fd_http_resolver_add( ctx->ssresolver,
1624 0 : tile->snapct.sources.servers[ i ].addr,
1625 0 : tile->snapct.sources.servers[ i ].hostname,
1626 0 : tile->snapct.sources.servers[ i ].is_https,
1627 0 : ctx->selector ) ) ) {
1628 0 : fd_ssping_add( ctx->ssping, tile->snapct.sources.servers[ i ].addr );
1629 0 : }
1630 0 : }
1631 0 : }
1632 :
1633 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) {
1634 0 : FD_LOG_WARNING(( "incremental snapshots disabled via [snapshots.incremental_snapshots]." ));
1635 0 : }
1636 :
1637 0 : ctx->state = FD_SNAPCT_STATE_INIT;
1638 0 : ctx->malformed = 0;
1639 0 : ctx->deadline_nanos = fd_log_wallclock() + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
1640 0 : ctx->flush_ack = 0;
1641 0 : ctx->peer.addr.l = 0UL;
1642 :
1643 0 : fd_memset( ctx->http_full_snapshot_name, 0, PATH_MAX );
1644 0 : fd_memset( ctx->http_incr_snapshot_name, 0, PATH_MAX );
1645 :
1646 0 : ctx->gossip_in_mem = NULL;
1647 0 : int has_snapld_dc = 0, has_ack_loopback = 0;
1648 0 : FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
1649 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
1650 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
1651 0 : if( 0==strcmp( in_link->name, "gossip_out" ) ) {
1652 0 : ctx->in_kind[ i ] = IN_KIND_GOSSIP;
1653 0 : ctx->gossip_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1654 0 : } else if( 0==strcmp( in_link->name, "snapld_dc" ) ) {
1655 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLD;
1656 0 : ctx->snapld_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1657 0 : FD_TEST( !has_snapld_dc );
1658 0 : has_snapld_dc = 1;
1659 0 : } else if( 0==strcmp( in_link->name, "snapin_ct" ) || 0==strcmp( in_link->name, "snapls_ct" ) ||
1660 0 : 0==strcmp( in_link->name, "snapwm_ct" ) || 0==strcmp( in_link->name, "snaplv_ct" ) ) {
1661 0 : ctx->in_kind[ i ] = IN_KIND_ACK;
1662 0 : FD_TEST( !has_ack_loopback );
1663 0 : has_ack_loopback = 1;
1664 0 : }
1665 0 : }
1666 0 : FD_TEST( has_snapld_dc && has_ack_loopback );
1667 0 : FD_TEST( ctx->gossip_enabled==(ctx->gossip_in_mem!=NULL) );
1668 :
1669 0 : ctx->predicted_incremental.full_slot = FD_SSPEER_SLOT_UNKNOWN;
1670 0 : ctx->predicted_incremental.slot = FD_SSPEER_SLOT_UNKNOWN;
1671 0 : ctx->predicted_incremental.pending = 0;
1672 :
1673 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
1674 :
1675 0 : fd_memset( _ci_table, 0, sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
1676 0 : ctx->gossip.ci_table = _ci_table;
1677 0 : ctx->gossip.ci_map = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ), 0UL ) );
1678 0 : ctx->gossip.allowed_cnt = 0UL;
1679 0 : ctx->gossip.saturated = !ctx->gossip_enabled;
1680 :
1681 0 : if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
1682 0 : ctx->out_ld = out1( topo, tile, "snapct_ld" );
1683 0 : ctx->out_gui = out1( topo, tile, "snapct_gui" );
1684 0 : ctx->out_rp = out1( topo, tile, "snapct_repr" );
1685 0 : }
1686 :
/* Configuration for the fd_stem.c template included below.

   after_credit can result in as many as 5 stem publishes in some code
   paths, and returnable_frag can result in 1, hence a burst of 6. */
#define STEM_BURST 6UL

/* NOTE(review): presumably the stem housekeeping interval; the units
   are not visible in this file -- confirm against fd_stem.c. */
#define STEM_LAZY 1000L

/* Type and alignment of the context passed to every callback */
#define STEM_CALLBACK_CONTEXT_TYPE fd_snapct_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapct_tile_t)

/* Callbacks implemented by this tile */
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag

#include "../../disco/stem/fd_stem.c"
1702 :
/* fd_tile_snapct is the run descriptor for the snapct tile.  It binds
   the tile name to the sandbox, footprint and init callbacks defined in
   this file.  NOTE(review): stem_run is presumably generated by the
   fd_stem.c include above -- confirm. */
fd_topo_run_tile_t fd_tile_snapct = {
  .name = NAME,
  .rlimit_file_cnt_fn = rlimit_file_cnt,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds = populate_allowed_fds,
  .scratch_align = scratch_align,
  .scratch_footprint = scratch_footprint,
  .loose_footprint = loose_footprint,
  .privileged_init = privileged_init,
  .unprivileged_init = unprivileged_init,
  .run = stem_run,
  /* NOTE(review): the three flags below presumably relax the sandbox
     so the tile can make outbound connections and rename its temporary
     snapshot files (TEMP_FULL_SNAP_NAME / TEMP_INCR_SNAP_NAME) --
     confirm against the fd_topo_run_tile_t definition. */
  .keep_host_networking = 1,
  .allow_connect = 1,
  .allow_renameat = 1,
};

#undef NAME
|