Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_snapct_tile.h"
3 : #include "utils/fd_sspeer.h"
4 : #include "utils/fd_ssping.h"
5 : #include "utils/fd_ssctrl.h"
6 : #include "utils/fd_ssarchive.h"
7 : #include "utils/fd_http_resolver.h"
8 : #include "utils/fd_ssmsg.h"
9 :
10 : #include "../../disco/topo/fd_topo.h"
11 : #include "../../disco/metrics/fd_metrics.h"
12 : #include "../../flamenco/gossip/fd_gossip_message.h"
13 : #include "../../waltz/openssl/fd_openssl_tile.h"
14 :
15 : #include <errno.h>
16 : #include <stdio.h>
17 : #include <fcntl.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 : #include <sys/syscall.h>
21 : #include <netinet/tcp.h>
22 : #include <netinet/in.h>
23 :
24 : #include "generated/fd_snapct_tile_seccomp.h"
25 :
#define NAME "snapct"

/* FIXME: Do a finishing pass over the default.toml config options / comments */

/* Peer capacity limits.  GOSSIP_PEERS_MAX sizes the gossip contact-info
   table and map, SERVER_PEERS_MAX sizes the http resolver for the
   configured (resolved) snapshot servers, and TOTAL_PEERS_MAX, their
   sum, sizes ssping, the peer selector and the blacklist pool/map
   (see scratch_footprint). */
#define GOSSIP_PEERS_MAX (FD_CONTACT_INFO_TABLE_SIZE)
#define SERVER_PEERS_MAX (FD_TOPO_SNAPSHOTS_SERVERS_MAX_RESOLVED)
#define TOTAL_PEERS_MAX (GOSSIP_PEERS_MAX + SERVER_PEERS_MAX)

/* Input link kinds, recorded per input link index in ctx->in_kind.
   MAX_IN_LINKS is the capacity of that array. */
#define IN_KIND_ACK (0)
#define IN_KIND_SNAPLD (1)
#define IN_KIND_GOSSIP (2)
#define MAX_IN_LINKS (4)

/* Names, relative to ctx->local_out.dir_fd, of the temporary files that
   downloads are written to.  rename_full_snapshot / rename_incr_snapshot
   rename them to their final names with renameat(). */
#define TEMP_FULL_SNAP_NAME ".snapshot.tar.bz2-partial"
#define TEMP_INCR_SNAP_NAME ".incremental-snapshot.tar.bz2-partial"
40 0 : #define TEMP_INCR_SNAP_NAME ".incremental-snapshot.tar.bz2-partial"
41 :
42 : struct fd_snapct_out_link {
43 : ulong idx;
44 : fd_wksp_t * mem;
45 : ulong chunk0;
46 : ulong wmark;
47 : ulong chunk;
48 : ulong mtu;
49 : };
50 : typedef struct fd_snapct_out_link fd_snapct_out_link_t;
51 :
/* Upper bound on the time spent in FD_SNAPCT_STATE_COLLECTING_PEERS.
   Collection ends at this deadline, or earlier once gossip reports
   saturation (see after_credit). */
#define FD_SNAPCT_COLLECTING_PEERS_TIMEOUT (90L*1000L*1000L*1000L) /* 1.5 minutes */

/* One gossip contact-info record.  Every record lives in the flat
   ctx->gossip.ci_table array.  Only records with allowed set are also
   linked (through map_next) into ctx->gossip.ci_map, keyed by pubkey. */
struct gossip_ci_entry {
  fd_pubkey_t pubkey;     /* peer identity; the map key */
  int allowed;            /* non-zero if permitted as a snapshot source (presumably per the gossip allow config -- confirm) */
  fd_ip4_port_t rpc_addr; /* advertised RPC address; log_download matches download addresses against it */
  ulong map_next;         /* gossip_ci_map chain link */
};
typedef struct gossip_ci_entry gossip_ci_entry_t;

/* gossip_ci_map: chained hash map from pubkey to gossip_ci_entry_t,
   whose elements are stored in ctx->gossip.ci_table. */
#define MAP_NAME gossip_ci_map
#define MAP_KEY pubkey
#define MAP_ELE_T gossip_ci_entry_t
#define MAP_KEY_T fd_pubkey_t
#define MAP_NEXT map_next
#define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
#define MAP_KEY_HASH(key,seed) fd_hash( seed, key, sizeof(fd_pubkey_t) )
#include "../../util/tmpl/fd_map_chain.c"
70 :
/* Standalone blacklist keyed on fd_sspeer_key_t (peer identity).
   Keying on identity rather than network address is intentional:
   a peer's address can change freely, but its identity cannot.
   For gossip peers the identity is the pubkey; for URL peers it is
   the (hostname, resolved_addr) pair. Blacklisted peers are
   permanently banned for the bootstrap lifetime. The blacklist uses
   its own dedicated pool so entries are never evicted by pool
   pressure, unlike the ssping peer pool. */

struct fd_sspeer_blacklist_entry {
  fd_sspeer_key_t key; /* blacklisted identity; the map key */
  ulong pool_next;     /* blacklist_pool free-list link */
  ulong map_next;      /* blacklist_map chain link */
};
typedef struct fd_sspeer_blacklist_entry fd_sspeer_blacklist_entry_t;

/* blacklist_pool: fixed-capacity element storage for the blacklist
   (sized for TOTAL_PEERS_MAX entries in scratch_footprint).  Entries
   are only acquired, never released, in the code shown here. */
#define POOL_NAME blacklist_pool
#define POOL_T fd_sspeer_blacklist_entry_t
#define POOL_IDX_T ulong
#define POOL_NEXT pool_next
#include "../../util/tmpl/fd_pool.c"

/* blacklist_map: chained hash map from fd_sspeer_key_t to the
   blacklist entries held in blacklist_pool. */
#define MAP_NAME blacklist_map
#define MAP_KEY key
#define MAP_ELE_T fd_sspeer_blacklist_entry_t
#define MAP_KEY_T fd_sspeer_key_t
#define MAP_NEXT map_next
#define MAP_KEY_EQ(k0,k1) (fd_sspeer_key_eq(k0,k1))
#define MAP_KEY_HASH(key,seed) (fd_sspeer_key_hash(key,seed))
#include "../../util/tmpl/fd_map_chain.c"
101 :
/* Per-tile state of the snapct tile.  It is the first object in the
   tile's scratch region (see scratch_footprint). */
struct fd_snapct_tile {
  struct fd_topo_tile_snapct config; /* snapct tile configuration (sources, incremental_snapshots, timeouts, ...) */
  int gossip_enabled;                /* presumably gossip_enabled( tile ) captured at init -- confirm */
  int download_enabled;              /* zero means load only from local files (see FD_SNAPCT_STATE_INIT) */

  fd_ssping_t * ssping;              /* peer pinger; after_credit tolerates NULL */
  fd_http_resolver_t * ssresolver;   /* resolver for configured servers; after_credit tolerates NULL */
  fd_sspeer_selector_t * selector;   /* scores peers and answers best() queries */
  ulong selector_seed;

  fd_sspeer_blacklist_entry_t * blacklist_pool; /* permanent identity blacklist storage */
  blacklist_map_t * blacklist_map;              /* identity -> blacklist entry */

  int state;                         /* FD_SNAPCT_STATE_* */
  int malformed;
  int load_complete;                 /* reset to 0 by init_load */
  long deadline_nanos;               /* wallclock deadline for the current waiting/collecting state */
  int flush_ack;                     /* reset to 0 by init_load */
  int flush_ack_cnt;
  fd_sspeer_t peer;                  /* peer used by init_load for downloads (address, slots, hashes) */

  /* Download output files.  -1 marks an fd that is not open. */
  struct {
    int dir_fd;                      /* directory the snapshot files live in */
    int full_snapshot_fd;            /* TEMP_FULL_SNAP_NAME */
    int incremental_snapshot_fd;     /* TEMP_INCR_SNAP_NAME */
  } local_out;

  /* Final names given to downloaded snapshots by rename_*_snapshot.
     An empty string means there is nothing to rename. */
  char http_full_snapshot_name[ PATH_MAX ];
  char http_incr_snapshot_name[ PATH_MAX ];

  void const * gossip_in_mem;
  void const * snapld_in_mem;
  uchar in_kind[ MAX_IN_LINKS ];     /* IN_KIND_* per input link index */

  /* Incremental snapshot slot we expect to load.  pending is set when
     slot changes and cleared by after_credit once the new value has
     been published via send_expected_slot. */
  struct {
    ulong full_slot;                 /* base full slot; FD_SSPEER_SLOT_UNKNOWN until known */
    ulong slot;
    int pending;
  } predicted_incremental;

  /* Snapshot files available on local disk, used when loading from file. */
  struct {
    ulong full_snapshot_slot;
    char full_snapshot_path[ PATH_MAX ];
    ulong full_snapshot_size;
    int full_snapshot_zstd;

    uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ];
    uchar incremental_snapshot_hash[ FD_HASH_FOOTPRINT ];

    ulong incremental_snapshot_slot;
    char incremental_snapshot_path[ PATH_MAX ];
    ulong incremental_snapshot_size;
    int incremental_snapshot_zstd;
  } local_in;

  /* Progress counters exported by metrics_write. */
  struct {
    struct {
      ulong bytes_read;
      ulong bytes_written;
      ulong bytes_total;
      uint num_retries;
    } full;

    struct {
      ulong bytes_read;
      ulong bytes_written;
      ulong bytes_total;
      uint num_retries;
    } incremental;
  } metrics;

  struct {
    gossip_ci_entry_t * ci_table; /* flat array of all gossip entries, allowed or not */
    gossip_ci_map_t * ci_map; /* map from pubkey to only allowed gossip entries */
    ulong allowed_cnt; /* number of allowed entries in ci_map */
    int saturated; /* when set, COLLECTING_PEERS does not wait for its deadline */
  } gossip;

  long snapshot_start_timestamp_ns; /* wallclock at the latest init_load; used by log_completion */

  fd_snapct_out_link_t out_ld;   /* control messages (fd_ssctrl_init_t) to the loader */
  fd_snapct_out_link_t out_gui;  /* snapshot path updates; mem==NULL when there is no gui link */
  fd_snapct_out_link_t out_rp;   /* expected-slot notifications (send_expected_slot) */
};
typedef struct fd_snapct_tile fd_snapct_tile_t;
187 :
188 : static int
189 0 : gossip_enabled( fd_topo_tile_t const * tile ) {
190 0 : return tile->snapct.sources.gossip.allow_any || tile->snapct.sources.gossip.allow_list_cnt>0UL;
191 0 : }
192 :
193 : static int
194 0 : download_enabled( fd_topo_tile_t const * tile ) {
195 0 : return gossip_enabled( tile ) || tile->snapct.sources.servers_cnt>0UL;
196 0 : }
197 :
198 : FD_FN_CONST static inline ulong
199 0 : loose_footprint( fd_topo_tile_t const * tile ) {
200 0 : (void)tile;
201 : /* Leftover space for OpenSSL allocations */
202 0 : return 1<<26UL; /* 64 MiB */
203 0 : }
204 :
205 : static ulong
206 90 : scratch_align( void ) {
207 90 : return fd_ulong_max( alignof(fd_snapct_tile_t),
208 90 : fd_ulong_max( fd_ssping_align(),
209 90 : fd_ulong_max( alignof(gossip_ci_entry_t),
210 90 : fd_ulong_max( gossip_ci_map_align(),
211 90 : fd_ulong_max( fd_http_resolver_align(),
212 90 : fd_ulong_max( fd_sspeer_selector_align(),
213 90 : fd_ulong_max( blacklist_pool_align(),
214 90 : blacklist_map_align() ) ) ) ) ) ) );
215 90 : }
216 :
/* scratch_footprint returns the size of this tile's scratch region.
   The region holds, in order: the tile context, ssping (TOTAL_PEERS_MAX
   peers), the gossip contact-info table and its map (GOSSIP_PEERS_MAX),
   the http resolver (SERVER_PEERS_MAX), the peer selector, the
   blacklist pool and map (TOTAL_PEERS_MAX), and an fd_alloc.  The
   append order presumably mirrors the FD_SCRATCH_ALLOC_APPEND sequence
   in the tile's init code (not shown here) and must be kept in sync
   with it.  The tile configuration is not used. */
static ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
  l = FD_LAYOUT_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
  l = FD_LAYOUT_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
  l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
  l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
  l = FD_LAYOUT_APPEND( l, blacklist_pool_align(), blacklist_pool_footprint( TOTAL_PEERS_MAX ) );
  l = FD_LAYOUT_APPEND( l, blacklist_map_align(), blacklist_map_footprint( blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ) ) );
  l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
  /* Round the total up to the region alignment. */
  return FD_LAYOUT_FINI( l, scratch_align() );
}
231 :
232 : static inline int
233 0 : should_shutdown( fd_snapct_tile_t * ctx ) {
234 0 : return ctx->state==FD_SNAPCT_STATE_SHUTDOWN;
235 0 : }
236 :
237 : static void
238 0 : metrics_write( fd_snapct_tile_t * ctx ) {
239 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_READ, ctx->metrics.full.bytes_read );
240 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_WRITTEN, ctx->metrics.full.bytes_written );
241 0 : FD_MGAUGE_SET( SNAPCT, FULL_SIZE_BYTES, ctx->metrics.full.bytes_total );
242 0 : FD_MGAUGE_SET( SNAPCT, FULL_RETRY, ctx->metrics.full.num_retries );
243 :
244 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_READ, ctx->metrics.incremental.bytes_read );
245 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_WRITTEN, ctx->metrics.incremental.bytes_written );
246 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_SIZE_BYTES, ctx->metrics.incremental.bytes_total );
247 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_RETRY, ctx->metrics.incremental.num_retries );
248 :
249 0 : FD_MGAUGE_SET( SNAPCT, PREDICTED_SLOT, ctx->predicted_incremental.slot );
250 :
251 0 : #if FD_HAS_OPENSSL
252 0 : FD_MCNT_SET( SNAPCT, SSL_ALLOC_FAILED, fd_ossl_alloc_errors );
253 0 : #endif
254 :
255 0 : FD_MGAUGE_SET( SNAPCT, STATE, (ulong)ctx->state );
256 0 : }
257 :
258 : static void
259 : snapshot_path_gui_publish( fd_snapct_tile_t * ctx,
260 : fd_stem_context_t * stem,
261 : char const * path,
262 0 : int is_full ) {
263 : /* The messages below cannot be obtained directly from metrics. */
264 0 : fd_snapct_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
265 0 : FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
266 0 : out->is_download = 0;
267 0 : out->type = fd_int_if( is_full, FD_SNAPCT_SNAPSHOT_TYPE_FULL, FD_SNAPCT_SNAPSHOT_TYPE_INCREMENTAL );
268 0 : fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snapct_update_t) , 0UL, 0UL, 0UL );
269 0 : ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snapct_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
270 0 : }
271 :
272 : static void
273 0 : predict_incremental( fd_snapct_tile_t * ctx ) {
274 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) return;
275 0 : if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==FD_SSPEER_SLOT_UNKNOWN ) ) return;
276 :
277 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
278 :
279 0 : if( FD_LIKELY( best.addr.l ) ) {
280 0 : if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.incr_slot ) ) {
281 0 : ctx->predicted_incremental.slot = best.incr_slot;
282 0 : ctx->predicted_incremental.pending = 1;
283 0 : }
284 0 : }
285 0 : }
286 :
287 : static void
288 : on_resolve( void * _ctx,
289 : fd_sspeer_key_t const * key,
290 : fd_ip4_port_t addr,
291 : ulong full_slot,
292 : ulong incr_slot,
293 : uchar full_hash[ FD_HASH_FOOTPRINT ],
294 0 : uchar incr_hash[ FD_HASH_FOOTPRINT ] ) {
295 0 : fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
296 :
297 0 : if( FD_UNLIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN && full_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
298 0 : if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
299 :
300 : /* Do not update peers that have been permanently blacklisted. */
301 0 : if( FD_UNLIKELY( key && blacklist_map_ele_query( ctx->blacklist_map, key, NULL, ctx->blacklist_pool ) ) ) return;
302 : /* Do not re-add peers whose addr is temporarily banned by ssping. */
303 0 : if( FD_UNLIKELY( fd_ssping_is_invalidated( ctx->ssping, addr ) ) ) return;
304 :
305 : /* add() handles both new and existing peers, so peers removed from
306 : the selector during a previous blacklist, timeout, or failed
307 : re-resolve are re-added with the freshly resolved data. */
308 0 : ulong score = fd_sspeer_selector_add( ctx->selector, key, addr, FD_SSPEER_LATENCY_UNKNOWN,
309 0 : full_slot, incr_slot, full_hash, incr_hash );
310 0 : if( FD_UNLIKELY( score==FD_SSPEER_SCORE_INVALID ) ) {
311 0 : if( FD_UNLIKELY( key==NULL ) ) {
312 0 : FD_LOG_DEBUG(( "selector add on resolve failed for NULL peer key" ));
313 0 : } else {
314 0 : if( FD_UNLIKELY( !key->is_url ) ) {
315 0 : FD_BASE58_ENCODE_32_BYTES( key->pubkey->key, pubkey_b58 );
316 0 : FD_LOG_DEBUG(( "selector add on resolve failed for peer with pubkey %s", pubkey_b58 ));
317 0 : } else {
318 0 : FD_LOG_DEBUG(( "selector add on resolve failed for peer %s with addr " FD_IP4_ADDR_FMT ":%hu", key->url.hostname,
319 0 : FD_IP4_ADDR_FMT_ARGS( key->url.resolved_addr.addr ), fd_ushort_bswap( key->url.resolved_addr.port ) ));
320 0 : }
321 0 : }
322 0 : }
323 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector );
324 0 : predict_incremental( ctx );
325 0 : }
326 :
327 : static void
328 : on_ping( void * _ctx,
329 : fd_ip4_port_t addr,
330 0 : ulong latency ) {
331 0 : fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
332 :
333 0 : ulong cnt = fd_sspeer_selector_update_on_ping( ctx->selector, addr, latency );
334 0 : if( FD_UNLIKELY( !cnt ) ) {
335 : /* The update may fail in normal operation, e.g. after a peer has
336 : been removed from the selector. The log level is set to a
337 : minimum accordingly. */
338 0 : FD_LOG_DEBUG(( "selector update on ping did not find address " FD_IP4_ADDR_FMT ":%hu",
339 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
340 0 : }
341 0 : predict_incremental( ctx );
342 0 : }
343 :
344 : static void
345 : on_snapshot_hash( fd_snapct_tile_t * ctx,
346 : fd_sspeer_key_t const * key,
347 : fd_ip4_port_t addr,
348 0 : fd_gossip_update_message_t const * msg ) {
349 0 : ulong full_slot = msg->snapshot_hashes->full_slot;
350 0 : ulong incr_slot = FD_SSPEER_SLOT_UNKNOWN;
351 0 : uchar const * incr_hash = NULL;
352 :
353 0 : for( ulong i=0UL; i<msg->snapshot_hashes->incremental_len; i++ ) {
354 0 : if( FD_LIKELY( !incr_hash || msg->snapshot_hashes->incremental[ i ].slot>incr_slot ) ) {
355 0 : incr_slot = msg->snapshot_hashes->incremental[ i ].slot;
356 0 : incr_hash = msg->snapshot_hashes->incremental[ i ].hash;
357 0 : }
358 0 : }
359 :
360 0 : if( FD_UNLIKELY( full_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
361 0 : if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return;
362 :
363 0 : if( FD_UNLIKELY( !addr.l ) ) {
364 : /* A peer that does not advertise an rpc_addr cannot be added to
365 : the selector: if previously added, remove it. The remove
366 : operation becomes a no-op if the peer is not found. */
367 0 : fd_sspeer_selector_remove( ctx->selector, key );
368 0 : return;
369 0 : }
370 : /* Do not re-add peers that have been permanently blacklisted. */
371 0 : if( FD_UNLIKELY( blacklist_map_ele_query( ctx->blacklist_map, key, NULL, ctx->blacklist_pool ) ) ) return;
372 : /* Do not re-add peers whose addr is temporarily banned by ssping. */
373 0 : if( FD_UNLIKELY( fd_ssping_is_invalidated( ctx->ssping, addr ) ) ) return;
374 : /* The add may fail due to capacity/pool exhaustion. The cluster
375 : slot is recomputed from tracked peers only, so a failed add will
376 : not influence the cluster slot. */
377 0 : fd_sspeer_selector_add( ctx->selector, key, addr, FD_SSPEER_LATENCY_UNKNOWN,
378 0 : full_slot, incr_slot,
379 0 : msg->snapshot_hashes->full_hash, incr_hash );
380 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector );
381 0 : predict_incremental( ctx );
382 0 : }
383 :
384 : static void
385 : send_expected_slot( fd_snapct_tile_t * ctx,
386 : fd_stem_context_t * stem,
387 0 : ulong slot ) {
388 0 : uint tsorig; uint tspub;
389 0 : fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
390 0 : fd_stem_publish( stem, ctx->out_rp.idx, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
391 0 : }
392 :
393 : static void
394 0 : rename_full_snapshot( fd_snapct_tile_t * ctx ) {
395 0 : FD_TEST( -1!=ctx->local_out.dir_fd );
396 :
397 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && ctx->http_full_snapshot_name[ 0 ]!='\0' ) ) {
398 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_full_snapshot_name ) ) )
399 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
400 0 : }
401 0 : }
402 :
403 : static void
404 0 : rename_incr_snapshot( fd_snapct_tile_t * ctx ) {
405 0 : FD_TEST( -1!=ctx->local_out.dir_fd );
406 :
407 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && ctx->http_incr_snapshot_name[ 0 ]!='\0' ) ) {
408 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_incr_snapshot_name ) ) )
409 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
410 0 : }
411 0 : }
412 :
413 : static ulong
414 : rlimit_file_cnt( fd_topo_t const * topo FD_PARAM_UNUSED,
415 0 : fd_topo_tile_t const * tile ) {
416 0 : ulong cnt = 1UL + /* stderr */
417 0 : 1UL + /* logfile */
418 0 : 1UL; /* boot control pipe */
419 0 : if( download_enabled( tile ) ) {
420 0 : cnt += FD_SSPING_FD_CNT + /* ssping sockets */
421 0 : 2UL + /* dirfd + full snapshot download temp fd */
422 0 : tile->snapct.sources.servers_cnt; /* http resolver peer full sockets */
423 0 : if( tile->snapct.incremental_snapshots ) {
424 0 : cnt += 1UL + /* incr snapshot download temp fd */
425 0 : tile->snapct.sources.servers_cnt; /* http resolver peer incr sockets */
426 0 : }
427 0 : }
428 0 : return cnt;
429 0 : }
430 :
431 : static ulong
432 : populate_allowed_seccomp( fd_topo_t const * topo,
433 : fd_topo_tile_t const * tile,
434 : ulong out_cnt,
435 0 : struct sock_filter * out ) {
436 :
437 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
438 :
439 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
440 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
441 :
442 0 : int min_ping_fd = INT_MAX;
443 0 : int max_ping_fd = 0;
444 0 : if( download_enabled( tile ) ) {
445 0 : min_ping_fd = FD_SSPING_FD_MIN;
446 0 : max_ping_fd = FD_SSPING_FD_MIN + (int)FD_SSPING_FD_CNT - 1;
447 0 : }
448 :
449 0 : populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)min_ping_fd, (uint)max_ping_fd );
450 0 : return sock_filter_policy_fd_snapct_tile_instr_cnt;
451 0 : }
452 :
453 : static ulong
454 : populate_allowed_fds( fd_topo_t const * topo,
455 : fd_topo_tile_t const * tile,
456 : ulong out_fds_cnt,
457 0 : int * out_fds ) {
458 0 : if( FD_UNLIKELY( out_fds_cnt<5UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu is too small", out_fds_cnt ));
459 :
460 0 : ulong out_cnt = 0;
461 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
462 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
463 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
464 0 : }
465 :
466 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
467 :
468 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
469 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
470 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
471 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
472 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
473 0 : if( FD_LIKELY( download_enabled( tile ) ) ) {
474 0 : if( FD_UNLIKELY( out_cnt+FD_SSPING_FD_CNT > out_fds_cnt ) ) {
475 0 : FD_LOG_ERR(( "out_fds_cnt %lu must be at least %lu", out_fds_cnt, out_cnt + FD_SSPING_FD_CNT ));
476 0 : }
477 0 : for( int i=FD_SSPING_FD_MIN; i<FD_SSPING_FD_MIN+(int)FD_SSPING_FD_CNT; i++ ) {
478 0 : out_fds[ out_cnt++ ] = i;
479 0 : }
480 0 : }
481 :
482 0 : return out_cnt;
483 0 : }
484 :
485 : static void
486 : init_load( fd_snapct_tile_t * ctx,
487 : fd_stem_context_t * stem,
488 : int full,
489 0 : int file ) {
490 0 : ctx->snapshot_start_timestamp_ns = fd_log_wallclock();
491 0 : fd_ssctrl_init_t * out = fd_chunk_to_laddr( ctx->out_ld.mem, ctx->out_ld.chunk );
492 0 : out->file = file;
493 0 : out->zstd = !file || (full ? ctx->local_in.full_snapshot_zstd : ctx->local_in.incremental_snapshot_zstd);
494 0 : if( file ) {
495 0 : out->slot = full ? ctx->local_in.full_snapshot_slot : ctx->local_in.incremental_snapshot_slot;
496 0 : if( full ) fd_memcpy( out->snapshot_hash, ctx->local_in.full_snapshot_hash, FD_HASH_FOOTPRINT );
497 0 : else fd_memcpy( out->snapshot_hash, ctx->local_in.incremental_snapshot_hash, FD_HASH_FOOTPRINT );
498 0 : } else {
499 0 : out->slot = full ? ctx->predicted_incremental.full_slot : ctx->predicted_incremental.slot;
500 0 : if( full ) fd_memcpy( out->snapshot_hash, ctx->peer.full_hash, FD_HASH_FOOTPRINT );
501 0 : else fd_memcpy( out->snapshot_hash, ctx->peer.incr_hash, FD_HASH_FOOTPRINT );
502 0 : }
503 :
504 0 : if( !file ) {
505 0 : out->addr = ctx->peer.addr;
506 0 : char encoded_hash[ FD_BASE58_ENCODED_32_SZ ];
507 0 : if( full ) {
508 0 : fd_base58_encode_32( ctx->peer.full_hash, NULL, encoded_hash );
509 0 : FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
510 0 : FD_TEST( fd_cstr_printf_check( ctx->http_full_snapshot_name, PATH_MAX, NULL, "snapshot-%lu-%s.tar.zst", ctx->peer.full_slot, encoded_hash ) );
511 0 : } else {
512 0 : fd_base58_encode_32( ctx->peer.incr_hash, NULL, encoded_hash );
513 0 : FD_TEST( fd_cstr_printf_check( out->path, PATH_MAX, &out->path_len, "/incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
514 0 : FD_TEST( fd_cstr_printf_check( ctx->http_incr_snapshot_name, PATH_MAX, NULL, "incremental-snapshot-%lu-%lu-%s.tar.zst", ctx->peer.full_slot, ctx->peer.incr_slot, encoded_hash ) );
515 0 : }
516 :
517 0 : out->is_https = 0; /* if not found in the config list, it's not https */
518 0 : out->hostname[0] = '\0'; /* .. and it doesn't have a hostname either. */
519 0 : for( ulong i=0UL; i<SERVER_PEERS_MAX; i++ ) {
520 0 : if( FD_UNLIKELY( ctx->peer.addr.l==ctx->config.sources.servers[ i ].addr.l ) ) {
521 0 : fd_cstr_ncpy( out->hostname, ctx->config.sources.servers[ i ].hostname, sizeof(out->hostname) );
522 0 : out->is_https = ctx->config.sources.servers[ i ].is_https;
523 0 : break;
524 0 : }
525 0 : }
526 0 : }
527 0 : fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
528 0 : ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
529 0 : ctx->flush_ack = 0;
530 0 : ctx->load_complete = 0;
531 :
532 : /* If we are downloading the snapshot, we will get the snapshot size
533 : in bytes from a metadata message sent from snapld. */
534 0 : if( file ) {
535 0 : if( full ) ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
536 0 : else ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
537 0 : }
538 :
539 0 : if( !file ) {
540 0 : if( full ) {
541 : /* reset any written content in the full output snapshot */
542 0 : if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.full_snapshot_fd, 0UL ) ) ) {
543 0 : FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
544 0 : }
545 0 : if( FD_UNLIKELY( -1==lseek( ctx->local_out.full_snapshot_fd, 0L, SEEK_SET ) ) ) {
546 0 : FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_full_snapshot_name, errno, fd_io_strerror( errno ) ));
547 0 : }
548 0 : } else {
549 : /* reset any written content in the incremental snapshot output
550 : file */
551 0 : if( FD_UNLIKELY( -1==ftruncate( ctx->local_out.incremental_snapshot_fd, 0UL ) ) ) {
552 0 : FD_LOG_ERR(( "ftruncate(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
553 0 : }
554 0 : if( FD_UNLIKELY( -1==lseek( ctx->local_out.incremental_snapshot_fd, 0L, SEEK_SET ) ) ) {
555 0 : FD_LOG_ERR(( "lseek(%s) failed (%i-%s)", ctx->http_incr_snapshot_name, errno, fd_io_strerror( errno ) ));
556 0 : }
557 0 : }
558 0 : }
559 :
560 : /* Regardless of whether we load the snapshot from a file or download
561 : it, we know the name of the snapshot and can publish it to the gui
562 : here. */
563 0 : if( full ) {
564 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
565 0 : if( file ) {
566 0 : fd_cstr_fini( ctx->http_full_snapshot_name );
567 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, 1 );
568 0 : }
569 0 : else {
570 0 : char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
571 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ) );
572 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, 1 );
573 0 : }
574 0 : }
575 0 : } else {
576 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
577 0 : if( file ) {
578 0 : fd_cstr_fini( ctx->http_incr_snapshot_name );
579 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, 0 );
580 0 : } else {
581 0 : char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
582 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ) );
583 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, 0 );
584 0 : }
585 0 : }
586 0 : }
587 0 : }
588 :
589 : static void
590 : log_download( fd_snapct_tile_t * ctx,
591 : int full,
592 : fd_ip4_port_t addr,
593 0 : ulong slot ) {
594 0 : for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
595 0 : !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
596 0 : iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
597 0 : gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
598 0 : if( ci_entry->rpc_addr.l==addr.l ) {
599 0 : FD_TEST( ci_entry->allowed );
600 0 : FD_BASE58_ENCODE_32_BYTES( ci_entry->pubkey.uc, pubkey_b58 );
601 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from allowed gossip peer %s at http://" FD_IP4_ADDR_FMT ":%hu/%s",
602 0 : full ? "full" : "incremental", slot, pubkey_b58,
603 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
604 0 : full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
605 0 : return;
606 0 : }
607 0 : }
608 :
609 0 : for( ulong i=0UL; i<ctx->config.sources.servers_cnt; i++ ) {
610 0 : if( addr.l==ctx->config.sources.servers[ i ].addr.l ) {
611 0 : if( ctx->config.sources.servers[ i ].is_https ) {
612 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at https://%s:%hu/%s",
613 0 : full ? "full" : "incremental", slot, i,
614 0 : ctx->config.sources.servers[ i ].hostname, fd_ushort_bswap( addr.port ),
615 0 : full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
616 0 : } else {
617 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at http://" FD_IP4_ADDR_FMT ":%hu/%s",
618 0 : full ? "full" : "incremental", slot, i,
619 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
620 0 : full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name ));
621 0 : }
622 0 : return;
623 0 : }
624 0 : }
625 :
626 0 : FD_TEST( 0 ); /* should not be possible */
627 0 : }
628 :
629 : static void
630 : log_completion( fd_snapct_tile_t * ctx,
631 0 : int full ) {
632 0 : double elapsed = (double)(fd_log_wallclock() - ctx->snapshot_start_timestamp_ns) / 1e9;
633 0 : FD_LOG_INFO(( "%s snapshot load completed in %.3f seconds", full ? "full" : "incremental", elapsed ));
634 0 : }
635 :
636 : /* Blacklist the current peer: invalidate in ssping, remove from the
637 : selector, and add to the dedicated blacklist map. Skips the insert
638 : if the peer is already blacklisted (dedup). Warns if the pool is
639 : full (the ssping ban still provides temporary protection). */
640 : static void
641 24 : blacklist_peer( fd_snapct_tile_t * ctx ) {
642 24 : fd_ssping_invalidate( ctx->ssping, ctx->peer.addr, fd_log_wallclock() );
643 24 : fd_sspeer_selector_remove_by_addr( ctx->selector, ctx->peer.addr );
644 24 : fd_sspeer_selector_process_cluster_slot( ctx->selector );
645 24 : if( FD_UNLIKELY( blacklist_map_ele_query( ctx->blacklist_map, &ctx->peer.key, NULL, ctx->blacklist_pool ) ) ) return;
646 21 : if( FD_LIKELY( blacklist_pool_free( ctx->blacklist_pool ) ) ) {
647 18 : fd_sspeer_blacklist_entry_t * bl = blacklist_pool_ele_acquire( ctx->blacklist_pool );
648 18 : bl->key = ctx->peer.key;
649 18 : blacklist_map_ele_insert( ctx->blacklist_map, bl, ctx->blacklist_pool );
650 18 : if( FD_UNLIKELY( !ctx->peer.key.is_url ) ) {
651 18 : FD_BASE58_ENCODE_32_BYTES( ctx->peer.key.pubkey->uc, pubkey_b58 );
652 18 : FD_LOG_WARNING(( "permanently blacklisted peer identity %s", pubkey_b58 ));
653 18 : } else {
654 0 : FD_LOG_WARNING(( "permanently blacklisted peer identity %s",
655 0 : ctx->peer.key.url.hostname[0] ? ctx->peer.key.url.hostname : "(none)" ));
656 0 : }
657 18 : } else {
658 3 : FD_LOG_WARNING(( "blacklist pool full, peer banned via ssping only" ));
659 3 : }
660 21 : }
661 :
662 : static void
663 : after_credit( fd_snapct_tile_t * ctx,
664 : fd_stem_context_t * stem,
665 : int * opt_poll_in FD_PARAM_UNUSED,
666 0 : int * charge_busy FD_PARAM_UNUSED ) {
667 0 : long now = fd_log_wallclock();
668 :
669 0 : if( FD_LIKELY( ctx->ssping ) ) fd_ssping_advance( ctx->ssping, now, ctx->selector );
670 0 : if( FD_LIKELY( ctx->ssresolver ) ) fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
671 :
672 : /* Advances above may remove peers, making cluster_slot dirty.
673 : Recompute so best() calls below use up-to-date scores.
674 : No-op when the dirty flag is not set internally. */
675 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector );
676 :
677 : /* send an expected slot message as the predicted incremental
678 : could have changed as a result of the pinger, resolver, or from
679 : processing gossip frags in gossip_frag. */
680 0 : if( FD_LIKELY( ctx->predicted_incremental.pending ) ) {
681 0 : send_expected_slot( ctx, stem, ctx->predicted_incremental.slot );
682 0 : ctx->predicted_incremental.pending = 0;
683 0 : }
684 :
685 : /* Note: All state transitions should occur within this switch
686 : statement to make it easier to reason about the state management. */
687 :
688 0 : switch ( ctx->state ) {
689 :
690 : /* ============================================================== */
691 0 : case FD_SNAPCT_STATE_INIT: {
692 0 : if( FD_UNLIKELY( !ctx->download_enabled ) ) {
693 0 : ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
694 0 : send_expected_slot( ctx, stem, local_slot );
695 0 : FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
696 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
697 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
698 0 : init_load( ctx, stem, 1, 1 );
699 0 : break;
700 0 : }
701 0 : ctx->deadline_nanos = now+ctx->config.wait_for_peers_timeout_nanos;
702 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
703 0 : break;
704 0 : }
705 :
706 : /* ============================================================== */
707 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS: {
708 0 : if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
709 :
710 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, FD_SSPEER_SLOT_UNKNOWN );
711 0 : if( FD_LIKELY( best.addr.l ) ) {
712 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
713 0 : ctx->deadline_nanos = now+FD_SNAPCT_COLLECTING_PEERS_TIMEOUT;
714 0 : }
715 0 : break;
716 0 : }
717 :
718 : /* ============================================================== */
719 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
720 0 : if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for incremental snapshot peers." ));
721 :
722 0 : FD_TEST( ctx->predicted_incremental.full_slot!=FD_SSPEER_SLOT_UNKNOWN );
723 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
724 0 : if( FD_LIKELY( best.addr.l ) ) {
725 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
726 0 : ctx->deadline_nanos = now;
727 0 : }
728 0 : break;
729 0 : }
730 :
731 : /* ============================================================== */
732 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS: {
733 0 : if( FD_UNLIKELY( !ctx->gossip.saturated && now<ctx->deadline_nanos ) ) break;
734 :
735 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, FD_SSPEER_SLOT_UNKNOWN );
736 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
737 0 : if( !ctx->gossip_enabled ) {
738 0 : FD_LOG_ERR(( "no peers are available and discovery of new peers via gossip is disabled. aborting." ));
739 0 : }
740 0 : ctx->deadline_nanos = now + ctx->config.wait_for_peers_timeout_nanos;
741 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
742 0 : break;
743 0 : }
744 :
745 0 : fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
746 0 : if( FD_UNLIKELY( cluster.incremental==FD_SSPEER_SLOT_UNKNOWN && ctx->config.incremental_snapshots ) ) {
747 : /* We must have a cluster full slot to be in this state. */
748 0 : FD_TEST( cluster.full!=FD_SSPEER_SLOT_UNKNOWN );
749 : /* fall back to full snapshot only if the highest cluster slot
750 : is a full snapshot only */
751 0 : FD_LOG_WARNING(( "incremental snapshots were enabled via [snapshots.incremental_snapshots], but no incremental snapshot is available in the cluster. "
752 0 : "falling back to full snapshots only." ));
753 0 : ctx->config.incremental_snapshots = 0;
754 0 : }
755 :
756 0 : ulong cluster_slot = ctx->config.incremental_snapshots ? cluster.incremental : cluster.full;
757 :
758 : /* Determine the best effective slot achievable using the local
759 : full snapshot. When incrementals are disabled, the effective
760 : slot is the full snapshot slot itself. When enabled, it is the
761 : best incremental we can pair with the local full (either from
762 : a local file or downloaded from a peer). */
763 :
764 0 : ulong local_effective_slot = ULONG_MAX;
765 0 : if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX ) ) {
766 0 : if( FD_LIKELY( ctx->config.incremental_snapshots ) ) {
767 0 : ulong local_incr = ctx->local_in.incremental_snapshot_slot;
768 0 : if( local_incr!=ULONG_MAX && local_incr>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age ) ) {
769 0 : local_effective_slot = local_incr;
770 0 : } else {
771 0 : fd_sspeer_t best_incr = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
772 0 : if( FD_LIKELY( best_incr.addr.l ) ) {
773 0 : ctx->predicted_incremental.slot = best_incr.incr_slot;
774 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental */
775 0 : local_effective_slot = best_incr.incr_slot;
776 0 : }
777 0 : }
778 0 : } else {
779 0 : local_effective_slot = ctx->local_in.full_snapshot_slot;
780 0 : }
781 0 : }
782 :
783 0 : int can_use_local_full = local_effective_slot!=ULONG_MAX &&
784 0 : local_effective_slot>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_full_effective_age );
785 0 : if( FD_LIKELY( can_use_local_full ) ) {
786 0 : send_expected_slot( ctx, stem, local_effective_slot );
787 :
788 0 : FD_LOG_NOTICE(( "reading full snapshot at slot %lu with cluster slot %lu from local file `%s`",
789 0 : ctx->local_in.full_snapshot_slot, cluster_slot, ctx->local_in.full_snapshot_path ));
790 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
791 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
792 0 : init_load( ctx, stem, 1, 1 );
793 0 : } else {
794 0 : if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX ) ) {
795 0 : if( local_effective_slot==ULONG_MAX ) {
796 0 : if( ctx->local_in.incremental_snapshot_slot!=ULONG_MAX ) {
797 0 : FD_LOG_NOTICE(( "local full snapshot at slot %lu cannot be used because local incremental snapshot at slot %lu "
798 0 : "is too old and no downloadable incremental could be found (cluster slot %lu), downloading instead",
799 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.incremental_snapshot_slot, cluster_slot ));
800 0 : } else {
801 0 : FD_LOG_NOTICE(( "local full snapshot at slot %lu cannot be used because no matching incremental snapshot "
802 0 : "could be found (cluster slot %lu), downloading instead",
803 0 : ctx->local_in.full_snapshot_slot, cluster_slot ));
804 0 : }
805 0 : } else {
806 0 : FD_LOG_NOTICE(( "local full snapshot at slot %lu (effective slot %lu) is too old for cluster slot %lu max age %u, downloading instead",
807 0 : ctx->local_in.full_snapshot_slot, local_effective_slot, cluster_slot, ctx->config.sources.max_local_full_effective_age ));
808 0 : }
809 0 : } else {
810 0 : FD_LOG_INFO(( "no local snapshot available, downloading from peer" ));
811 0 : }
812 :
813 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) {
814 0 : send_expected_slot( ctx, stem, best.full_slot );
815 0 : } else {
816 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.full_slot );
817 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
818 0 : ctx->predicted_incremental.slot = best_incremental.incr_slot;
819 0 : send_expected_slot( ctx, stem, best_incremental.incr_slot );
820 0 : }
821 0 : }
822 :
823 0 : ctx->peer = best;
824 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_HTTP;
825 0 : ctx->predicted_incremental.full_slot = best.full_slot;
826 0 : init_load( ctx, stem, 1, 0 );
827 0 : log_download( ctx, 1, best.addr, best.full_slot );
828 0 : }
829 0 : break;
830 0 : }
831 :
832 : /* ============================================================== */
833 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL: {
834 0 : if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
835 :
836 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
837 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
838 0 : if( !ctx->gossip_enabled ) {
839 0 : FD_LOG_ERR(( "no incremental snapshot peers are available and discovery of new peers via gossip is disabled. aborting." ));
840 0 : }
841 0 : ctx->deadline_nanos = now + ctx->config.wait_for_peers_timeout_nanos;
842 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL;
843 0 : break;
844 0 : }
845 :
846 : /* decide whether to use the local incremental snapshot if one
847 : exists and is not too old, otherwise download a new incremental
848 : snapshot. */
849 0 : ulong cluster_slot = fd_sspeer_selector_cluster_slot( ctx->selector ).incremental;
850 0 : ulong local_slot = ctx->local_in.incremental_snapshot_slot;
851 0 : int local_too_old = local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
852 0 : if( FD_LIKELY( local_slot!=ULONG_MAX && !local_too_old ) ) {
853 0 : ctx->predicted_incremental.slot = local_slot;
854 0 : send_expected_slot( ctx, stem, local_slot );
855 :
856 0 : FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
857 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
858 0 : init_load( ctx, stem, 0, 1 );
859 0 : } else {
860 0 : ctx->predicted_incremental.slot = best.incr_slot;
861 0 : send_expected_slot( ctx, stem, best.incr_slot );
862 :
863 0 : ctx->peer = best;
864 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
865 0 : init_load( ctx, stem, 0, 0 );
866 0 : log_download( ctx, 0, best.addr, best.incr_slot );
867 0 : }
868 0 : break;
869 0 : }
870 :
871 : /* ============================================================== */
872 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
873 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
874 0 : ctx->malformed = 0;
875 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
876 0 : ctx->flush_ack = 0;
877 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
878 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
879 0 : ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
880 0 : break;
881 0 : }
882 :
883 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
884 :
885 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE;
886 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
887 0 : ctx->flush_ack = 0;
888 0 : break;
889 :
890 : /* ============================================================== */
891 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
892 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
893 0 : ctx->malformed = 0;
894 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
895 0 : ctx->flush_ack = 0;
896 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
897 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
898 0 : ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
899 0 : break;
900 0 : }
901 :
902 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
903 :
904 0 : log_completion( ctx, 0/*incr*/ );
905 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
906 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
907 0 : break;
908 :
909 : /* ============================================================== */
910 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
911 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
912 0 : ctx->malformed = 0;
913 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
914 0 : ctx->flush_ack = 0;
915 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
916 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
917 0 : "blacklisting peer due to download failure.",
918 0 : ctx->predicted_incremental.slot,
919 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
920 0 : blacklist_peer( ctx );
921 0 : break;
922 0 : }
923 :
924 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
925 :
926 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE;
927 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
928 0 : ctx->flush_ack = 0;
929 0 : break;
930 :
931 : /* ============================================================== */
932 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
933 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
934 0 : ctx->malformed = 0;
935 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
936 0 : ctx->flush_ack = 0;
937 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
938 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
939 0 : "blacklisting peer due to download failure.",
940 0 : ctx->predicted_incremental.slot,
941 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
942 0 : blacklist_peer( ctx );
943 0 : break;
944 0 : }
945 :
946 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
947 :
948 0 : log_completion( ctx, 0/*incr*/ );
949 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
950 0 : rename_incr_snapshot( ctx );
951 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
952 0 : break;
953 :
954 : /* ============================================================== */
955 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
956 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
957 0 : ctx->malformed = 0;
958 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
959 0 : ctx->flush_ack = 0;
960 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
961 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
962 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
963 0 : break;
964 0 : }
965 :
966 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
967 :
968 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE;
969 0 : ulong sig = ctx->config.incremental_snapshots &&
970 0 : (ctx->local_in.incremental_snapshot_slot!=ULONG_MAX || ctx->download_enabled) ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
971 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_DONE && ctx->config.incremental_snapshots ) {
972 : /* set incremental snapshots to 0 if there is no local
973 : incremental snapshot and download is not enabled. */
974 0 : FD_LOG_INFO(( "incremental snapshots were enabled via [snapshots.incremental_snapshots] "
975 0 : "but no incremental snapshot exists on disk and no snapshot peers are configured. "
976 0 : "skipping incremental snapshot load." ));
977 0 : ctx->config.incremental_snapshots = 0;
978 0 : }
979 0 : fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
980 0 : ctx->flush_ack = 0;
981 0 : break;
982 :
983 : /* ============================================================== */
984 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
985 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
986 0 : ctx->malformed = 0;
987 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
988 0 : ctx->flush_ack = 0;
989 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
990 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
991 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
992 0 : break;
993 0 : }
994 :
995 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
996 :
997 0 : log_completion( ctx, 1/*full*/ );
998 0 : if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
999 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
1000 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
1001 0 : break;
1002 0 : }
1003 :
1004 0 : if( FD_LIKELY( ctx->download_enabled ) ) {
1005 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
1006 0 : ctx->deadline_nanos = 0L;
1007 0 : } else {
1008 0 : FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
1009 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
1010 0 : init_load( ctx, stem, 0, 1 );
1011 0 : }
1012 0 : break;
1013 :
1014 : /* ============================================================== */
1015 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
1016 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1017 0 : ctx->malformed = 0;
1018 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1019 0 : ctx->flush_ack = 0;
1020 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
1021 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
1022 0 : "blacklisting peer due to download failure.",
1023 0 : ctx->predicted_incremental.full_slot,
1024 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
1025 0 : blacklist_peer( ctx );
1026 0 : break;
1027 0 : }
1028 :
1029 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
1030 :
1031 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE;
1032 0 : fd_stem_publish( stem, ctx->out_ld.idx, ctx->config.incremental_snapshots ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
1033 0 : ctx->flush_ack = 0;
1034 0 : break;
1035 :
1036 : /* ============================================================== */
1037 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
1038 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1039 0 : ctx->malformed = 0;
1040 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1041 0 : ctx->flush_ack = 0;
1042 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
1043 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
1044 0 : "blacklisting peer due to download failure.",
1045 0 : ctx->predicted_incremental.full_slot,
1046 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
1047 0 : blacklist_peer( ctx );
1048 0 : break;
1049 0 : }
1050 :
1051 0 : if( ctx->flush_ack < ctx->flush_ack_cnt ) break;
1052 :
1053 0 : rename_full_snapshot( ctx );
1054 :
1055 0 : log_completion( ctx, 1/*full*/ );
1056 0 : if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
1057 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
1058 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
1059 0 : break;
1060 0 : }
1061 :
1062 : /* Get the best incremental peer to download from */
1063 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
1064 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
1065 0 : ctx->deadline_nanos = now;
1066 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
1067 0 : break;
1068 0 : }
1069 :
1070 0 : ctx->predicted_incremental.slot = best.incr_slot;
1071 0 : send_expected_slot( ctx, stem, best.incr_slot );
1072 :
1073 0 : ctx->peer = best;
1074 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
1075 0 : init_load( ctx, stem, 0, 0 );
1076 0 : log_download( ctx, 0, best.addr, best.incr_slot );
1077 0 : break;
1078 :
1079 : /* ============================================================== */
1080 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1081 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
1082 0 : if( FD_UNLIKELY( ctx->flush_ack<ctx->flush_ack_cnt ) ) break;
1083 :
1084 0 : if( ctx->metrics.full.num_retries==ctx->config.max_retry_abort ) {
1085 0 : FD_LOG_ERR(( "hit retry limit of %u for full snapshot, aborting", ctx->config.max_retry_abort ));
1086 0 : }
1087 :
1088 0 : ctx->metrics.full.num_retries++;
1089 0 : FD_LOG_NOTICE(( "retrying full snapshot download (attempt %u/%u)",
1090 0 : ctx->metrics.full.num_retries, ctx->config.max_retry_abort ));
1091 :
1092 0 : ctx->metrics.full.bytes_read = 0UL;
1093 0 : ctx->metrics.full.bytes_written = 0UL;
1094 0 : ctx->metrics.full.bytes_total = 0UL;
1095 :
1096 0 : ctx->metrics.incremental.bytes_read = 0UL;
1097 0 : ctx->metrics.incremental.bytes_written = 0UL;
1098 0 : ctx->metrics.incremental.bytes_total = 0UL;
1099 :
1100 0 : if( !ctx->download_enabled ) {
1101 : /* if we are unable to download new snapshots and unable to load
1102 : our local snapshot, we must shutdown the validator. */
1103 0 : FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.full_snapshot_path ));
1104 0 : } else {
1105 0 : if( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ctx->local_in.full_snapshot_slot = ULONG_MAX;
1106 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
1107 0 : ctx->deadline_nanos = 0L;
1108 0 : }
1109 0 : break;
1110 :
1111 : /* ============================================================== */
1112 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
1113 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1114 0 : if( FD_UNLIKELY( ctx->flush_ack<ctx->flush_ack_cnt ) ) break;
1115 :
1116 0 : if( ctx->metrics.incremental.num_retries==ctx->config.max_retry_abort ) {
1117 0 : FD_LOG_ERR(("hit retry limit of %u for incremental snapshot. aborting", ctx->config.max_retry_abort ));
1118 0 : }
1119 :
1120 0 : ctx->metrics.incremental.num_retries++;
1121 0 : FD_LOG_NOTICE(( "retrying incremental snapshot download (attempt %u/%u)",
1122 0 : ctx->metrics.incremental.num_retries, ctx->config.max_retry_abort ));
1123 :
1124 0 : ctx->metrics.incremental.bytes_read = 0UL;
1125 0 : ctx->metrics.incremental.bytes_written = 0UL;
1126 0 : ctx->metrics.incremental.bytes_total = 0UL;
1127 :
1128 0 : if( !ctx->download_enabled ) {
1129 : /* if we are unable to download new snapshots and unable to load
1130 : our local snapshot, we must shutdown the validator. */
1131 0 : FD_LOG_ERR(( "unable to load local snapshot %s and no snapshot peers were configured. aborting.", ctx->local_in.incremental_snapshot_path ));
1132 0 : } else {
1133 0 : if( ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
1134 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
1135 0 : ctx->deadline_nanos = 0L;
1136 0 : }
1137 0 : break;
1138 :
1139 : /* ============================================================== */
1140 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1141 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1142 0 : ctx->malformed = 0;
1143 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1144 0 : ctx->flush_ack = 0;
1145 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
1146 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from local file `%s`",
1147 0 : ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
1148 0 : break;
1149 0 : }
1150 0 : if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
1151 0 : if( FD_UNLIKELY( ctx->load_complete ) ) {
1152 0 : ctx->load_complete = 0;
1153 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1154 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI;
1155 0 : ctx->flush_ack = 0;
1156 0 : }
1157 0 : break;
1158 :
1159 : /* ============================================================== */
1160 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1161 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1162 0 : ctx->malformed = 0;
1163 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1164 0 : ctx->flush_ack = 0;
1165 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
1166 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from local file `%s`",
1167 0 : ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
1168 0 : break;
1169 0 : }
1170 0 : if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
1171 0 : if( FD_UNLIKELY( ctx->load_complete ) ) {
1172 0 : ctx->load_complete = 0;
1173 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1174 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI;
1175 0 : ctx->flush_ack = 0;
1176 0 : }
1177 0 : break;
1178 :
1179 : /* ============================================================== */
1180 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
1181 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1182 0 : ctx->malformed = 0;
1183 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1184 0 : ctx->flush_ack = 0;
1185 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
1186 0 : FD_LOG_WARNING(( "failed to load full snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
1187 0 : "blacklisting peer due to download failure",
1188 0 : ctx->predicted_incremental.full_slot,
1189 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_full_snapshot_name ));
1190 0 : blacklist_peer( ctx );
1191 0 : break;
1192 0 : }
1193 0 : if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
1194 0 : if( FD_UNLIKELY( ctx->load_complete ) ) {
1195 0 : ctx->load_complete = 0;
1196 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1197 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI;
1198 0 : ctx->flush_ack = 0;
1199 0 : }
1200 0 : break;
1201 :
1202 : /* ============================================================== */
1203 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
1204 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
1205 0 : ctx->malformed = 0;
1206 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
1207 0 : ctx->flush_ack = 0;
1208 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
1209 0 : FD_LOG_WARNING(( "failed to load incremental snapshot at slot %lu from http://" FD_IP4_ADDR_FMT ":%hu/%s. "
1210 0 : "blacklisting peer due to download failure",
1211 0 : ctx->predicted_incremental.slot,
1212 0 : FD_IP4_ADDR_FMT_ARGS( ctx->peer.addr.addr ), fd_ushort_bswap( ctx->peer.addr.port ), ctx->http_incr_snapshot_name ));
1213 0 : blacklist_peer( ctx );
1214 0 : break;
1215 0 : }
1216 0 : if( FD_UNLIKELY( ctx->flush_ack < ctx->flush_ack_cnt ) ) break;
1217 0 : if( FD_UNLIKELY( ctx->load_complete ) ) {
1218 0 : ctx->load_complete = 0;
1219 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FINI, 0UL, 0UL, 0UL, 0UL, 0UL );
1220 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI;
1221 0 : ctx->flush_ack = 0;
1222 0 : }
1223 0 : break;
1224 :
1225 : /* ============================================================== */
1226 0 : case FD_SNAPCT_STATE_SHUTDOWN:
1227 : /* Transitioning to the shutdown state indicates snapshot load is
1228 : completed without errors. Otherwise, snapct would have aborted
1229 : earlier. */
1230 0 : break;
1231 :
1232 : /* ============================================================== */
1233 0 : default: FD_LOG_ERR(( "unexpected state %s", fd_snapct_state_str( ctx->state ) ));
1234 0 : }
1235 0 : }
1236 :
1237 : static void
1238 : gossip_frag( fd_snapct_tile_t * ctx,
1239 : ulong sig,
1240 : ulong sz FD_PARAM_UNUSED,
1241 27 : ulong chunk ) {
1242 27 : FD_TEST( ctx->gossip_enabled );
1243 :
1244 27 : if( FD_UNLIKELY( sig==FD_GOSSIP_UPDATE_TAG_PEER_SATURATED ) ) {
1245 0 : ctx->gossip.saturated = 1;
1246 0 : return;
1247 0 : }
1248 :
1249 27 : if( !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
1250 27 : sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
1251 27 : sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) ) return;
1252 :
1253 27 : fd_gossip_update_message_t const * msg = fd_chunk_to_laddr_const( ctx->gossip_in_mem, chunk );
1254 27 : switch( msg->tag ) {
1255 21 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
1256 21 : FD_TEST( msg->contact_info->idx<GOSSIP_PEERS_MAX );
1257 21 : fd_pubkey_t const * pubkey = (fd_pubkey_t const *)msg->origin;
1258 21 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info->idx;
1259 21 : if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
1260 : /* Initialize the new gossip entry, which may or may not be allowed */
1261 18 : FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
1262 18 : entry->pubkey = *pubkey;
1263 18 : entry->rpc_addr.l = 0UL;
1264 18 : if( ctx->config.sources.gossip.allow_any ) {
1265 9 : entry->allowed = 1;
1266 9 : for( ulong i=0UL; i<ctx->config.sources.gossip.block_list_cnt; i++ ) {
1267 3 : if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.block_list[ i ] ) ) {
1268 3 : entry->allowed = 0;
1269 3 : break;
1270 3 : }
1271 3 : }
1272 9 : } else {
1273 9 : entry->allowed = 0;
1274 9 : for( ulong i=0UL; i<ctx->config.sources.gossip.allow_list_cnt; i++ ) {
1275 3 : if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.allow_list[ i ] ) ) {
1276 3 : entry->allowed = 1;
1277 3 : break;
1278 3 : }
1279 3 : }
1280 9 : }
1281 18 : FD_TEST( ULONG_MAX==gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table ) );
1282 18 : if( entry->allowed ) {
1283 9 : gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info->idx, ctx->gossip.ci_table );
1284 9 : ctx->gossip.allowed_cnt++;
1285 : /* Allow-list shortcut: if an explicit allow list is
1286 : configured and all expected peers have arrived, declare
1287 : saturation immediately without waiting for the gossip
1288 : tile's general saturation signal. */
1289 9 : if( FD_UNLIKELY( !ctx->config.sources.gossip.allow_any &&
1290 9 : ctx->config.sources.gossip.allow_list_cnt>0UL &&
1291 9 : ctx->gossip.allowed_cnt==ctx->config.sources.gossip.allow_list_cnt ) ) {
1292 3 : FD_LOG_NOTICE(( "all %lu allowed gossip peers discovered", ctx->config.sources.gossip.allow_list_cnt ));
1293 3 : ctx->gossip.saturated = 1;
1294 3 : }
1295 9 : }
1296 18 : }
1297 21 : if( !entry->allowed ) break;
1298 : /* Maybe update the RPC address of a new or existing allowed gossip peer */
1299 12 : fd_ip4_port_t cur_addr = entry->rpc_addr;
1300 12 : fd_ip4_port_t new_addr;
1301 12 : new_addr.addr = msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].is_ipv6 ? 0 : msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].ip4;
1302 12 : new_addr.port = msg->contact_info->value->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_RPC ].port;
1303 :
1304 12 : if( FD_UNLIKELY( new_addr.l!=cur_addr.l ) ) {
1305 0 : fd_sspeer_key_t entry_key = {0};
1306 0 : *entry_key.pubkey = entry->pubkey;
1307 0 : entry_key.is_url = 0;
1308 0 : if( FD_LIKELY( !!new_addr.l ) ) {
1309 : /* Do not re-add peers that have been permanently blacklisted
1310 : or whose new address is temporarily banned by ssping.
1311 : Bookkeeping (ssping cleanup, rpc_addr update) still runs
1312 : so the CI table and ssping pool stay consistent. */
1313 0 : if( FD_LIKELY( !blacklist_map_ele_query( ctx->blacklist_map, &entry_key, NULL, ctx->blacklist_pool ) &&
1314 0 : !fd_ssping_is_invalidated( ctx->ssping, new_addr ) ) ) {
1315 0 : if( FD_LIKELY( FD_SSPEER_SCORE_INVALID!=fd_sspeer_selector_add( ctx->selector, &entry_key, new_addr,
1316 0 : FD_SSPEER_LATENCY_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
1317 0 : FD_SSPEER_SLOT_UNKNOWN, NULL, NULL ) ) ) {
1318 0 : fd_ssping_add( ctx->ssping, new_addr );
1319 0 : }
1320 0 : }
1321 0 : } else {
1322 0 : fd_sspeer_selector_remove( ctx->selector, &entry_key );
1323 0 : }
1324 0 : if( FD_LIKELY( !!cur_addr.l ) ) {
1325 0 : fd_ssping_remove( ctx->ssping, cur_addr );
1326 0 : }
1327 0 : entry->rpc_addr = new_addr;
1328 0 : if( !ctx->config.sources.gossip.allow_any ) {
1329 0 : FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
1330 0 : if( FD_LIKELY( !!new_addr.l ) ) {
1331 0 : FD_LOG_NOTICE(( "allowed gossip peer added with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
1332 0 : pubkey_b58, FD_IP4_ADDR_FMT_ARGS( new_addr.addr ), fd_ushort_bswap( new_addr.port ) ));
1333 0 : } else {
1334 0 : FD_LOG_WARNING(( "allowed gossip peer with public key `%s` does not advertise an RPC address", pubkey_b58 ));
1335 0 : }
1336 0 : }
1337 0 : }
1338 12 : break;
1339 21 : }
1340 6 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
1341 6 : FD_TEST( msg->contact_info_remove->idx<GOSSIP_PEERS_MAX );
1342 6 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info_remove->idx;
1343 6 : ulong rem_idx = gossip_ci_map_idx_remove( ctx->gossip.ci_map, &entry->pubkey, ULONG_MAX, ctx->gossip.ci_table );
1344 6 : if( rem_idx!=ULONG_MAX ) {
1345 3 : FD_TEST( entry->allowed && rem_idx==msg->contact_info_remove->idx );
1346 3 : ctx->gossip.allowed_cnt--;
1347 3 : fd_ip4_port_t addr = entry->rpc_addr;
1348 3 : if( FD_LIKELY( !!addr.l ) ) {
1349 0 : fd_ssping_remove( ctx->ssping, addr );
1350 0 : fd_sspeer_key_t entry_key = {0};
1351 0 : *entry_key.pubkey = entry->pubkey;
1352 0 : entry_key.is_url = 0;
1353 0 : fd_sspeer_selector_remove( ctx->selector, &entry_key );
1354 0 : }
1355 3 : if( !ctx->config.sources.gossip.allow_any ) {
1356 0 : FD_BASE58_ENCODE_32_BYTES( entry->pubkey.uc, pubkey_b58 );
1357 0 : FD_LOG_WARNING(( "allowed gossip peer removed with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
1358 0 : pubkey_b58, FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
1359 0 : }
1360 3 : }
1361 6 : fd_memset( entry, 0, sizeof(*entry) );
1362 6 : break;
1363 6 : }
1364 0 : case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
1365 0 : ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin, ULONG_MAX, ctx->gossip.ci_table );
1366 0 : if( FD_LIKELY( idx!=ULONG_MAX ) ) {
1367 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + idx;
1368 0 : FD_TEST( entry->allowed );
1369 0 : fd_sspeer_key_t entry_key = {0};
1370 0 : *entry_key.pubkey = entry->pubkey;
1371 0 : entry_key.is_url = 0;
1372 0 : on_snapshot_hash( ctx, &entry_key, entry->rpc_addr, msg );
1373 0 : }
1374 0 : break;
1375 0 : }
1376 0 : default:
1377 0 : FD_LOG_ERR(( "snapct: unexpected gossip tag %u", (uint)msg->tag ));
1378 0 : break;
1379 27 : }
1380 27 : }
1381 :
1382 : /* Validate and handle a pipeline control ack for INIT_{FULL,INCR},
1383 : NEXT, DONE, and FINI. Returns 0 on success, and -1 if the sig was
1384 : unrecognized or the state was unexpected. */
1385 : static int
1386 : process_ctrl_ack( fd_snapct_tile_t * ctx,
1387 0 : ulong sig ) {
1388 0 : switch( sig ) {
1389 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
1390 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_FULL_HTTP ||
1391 0 : ctx->state==FD_SNAPCT_STATE_READING_FULL_FILE ) ) {
1392 0 : ctx->flush_ack++;
1393 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1394 0 : } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1395 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ) {
1396 : /* Safe to ignore -- stale ack from before RESET. */
1397 0 : } else return -1;
1398 0 : return 0;
1399 :
1400 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
1401 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP ||
1402 0 : ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_FILE ) ) {
1403 0 : ctx->flush_ack++;
1404 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1405 0 : } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1406 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1407 : /* Safe to ignore -- stale ack from before RESET. */
1408 0 : } else return -1;
1409 0 : return 0;
1410 :
1411 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
1412 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
1413 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ) ) {
1414 0 : ctx->flush_ack++;
1415 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1416 0 : } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1417 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ) ) {
1418 : /* Safe to ignore -- stale ack from before RESET. */
1419 0 : } else return -1;
1420 0 : return 0;
1421 :
1422 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:
1423 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE ||
1424 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE ||
1425 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE ||
1426 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE ) ) {
1427 0 : ctx->flush_ack++;
1428 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1429 0 : } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1430 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
1431 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1432 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1433 : /* Safe to ignore -- stale ack from before RESET. */
1434 0 : } else return -1;
1435 0 : return 0;
1436 :
1437 0 : case FD_SNAPSHOT_MSG_CTRL_FINI:
1438 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI ||
1439 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI ||
1440 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI ||
1441 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI ) ) {
1442 0 : ctx->flush_ack++;
1443 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1444 0 : } else if( FD_UNLIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1445 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
1446 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1447 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1448 : /* Safe to ignore -- stale ack from before RESET. */
1449 0 : } else return -1;
1450 0 : return 0;
1451 :
1452 0 : default:
1453 0 : return -1;
1454 0 : }
1455 0 : }
1456 :
1457 : static void
1458 : snapld_frag( fd_snapct_tile_t * ctx,
1459 : ulong sig,
1460 : ulong sz,
1461 : ulong chunk,
1462 15 : fd_stem_context_t * stem ) {
1463 15 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_META ) ) {
1464 : /* Before snapld starts sending down data fragments, it first sends
1465 : a metadata message containing the total size of the snapshot as
1466 : well as the filename. This is only done for HTTP loading. */
1467 0 : int full;
1468 0 : switch( ctx->state ) {
1469 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; break;
1470 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
1471 :
1472 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1473 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1474 0 : return; /* Ignore */
1475 0 : default: FD_LOG_ERR(( "invalid meta frag in state %s", fd_snapct_state_str( ctx->state ) ));
1476 0 : }
1477 :
1478 0 : FD_TEST( sz==sizeof(fd_ssctrl_meta_t) );
1479 0 : fd_ssctrl_meta_t const * meta = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
1480 :
1481 0 : if( FD_UNLIKELY( meta->total_sz==0UL ) ) {
1482 0 : if( FD_UNLIKELY( !ctx->malformed ) ) {
1483 0 : FD_LOG_WARNING(( "received zero Content-Length metadata for %s snapshot, marking malformed", full ? "full" : "incremental" ));
1484 0 : ctx->malformed = 1;
1485 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
1486 0 : }
1487 0 : return;
1488 0 : }
1489 :
1490 0 : if( full ) ctx->metrics.full.bytes_total = meta->total_sz;
1491 0 : else ctx->metrics.incremental.bytes_total = meta->total_sz;
1492 :
1493 0 : return;
1494 0 : }
1495 15 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_CTRL_FAIL ) ) {
1496 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1497 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
1498 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1499 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1500 0 : ctx->flush_ack++;
1501 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1502 0 : } else {
1503 0 : FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
1504 0 : }
1505 0 : return;
1506 0 : }
1507 15 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_CTRL_ERROR ) ) {
1508 : /* CTRL_ERROR message directly from snapld can be snapld-generated
1509 : or snapct-generated-and-forwarded. */
1510 0 : switch( ctx->state ) {
1511 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1512 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
1513 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
1514 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1515 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
1516 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
1517 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
1518 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
1519 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
1520 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
1521 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
1522 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
1523 0 : FD_LOG_WARNING(( "received error from snapld in state %d (%s)",
1524 0 : ctx->state, fd_snapct_state_str( ctx->state ) ));
1525 0 : ctx->malformed = 1;
1526 0 : break;
1527 0 : default:
1528 0 : break;
1529 0 : }
1530 0 : return;
1531 0 : }
1532 15 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_LOAD_COMPLETE ) ) {
1533 15 : int full = 0;
1534 15 : switch( ctx->state ) {
1535 3 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1536 6 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; break;
1537 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1538 3 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
1539 3 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
1540 3 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1541 3 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
1542 6 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1543 6 : return; /* Ignore during reset. */
1544 0 : default:
1545 0 : FD_LOG_ERR(( "invalid load_complete in state %s", fd_snapct_state_str( ctx->state ) ));
1546 0 : return;
1547 15 : }
1548 9 : if( FD_UNLIKELY( ctx->malformed ) ) return;
1549 : /* Validate that all expected bytes were received. */
1550 6 : ulong bytes_read = full ? ctx->metrics.full.bytes_read : ctx->metrics.incremental.bytes_read;
1551 6 : ulong bytes_total = full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total;
1552 6 : if( FD_UNLIKELY( !bytes_total || bytes_read!=bytes_total ) ) {
1553 0 : ctx->malformed = 1;
1554 0 : FD_LOG_WARNING(( "load_complete but bytes_read %lu != bytes_total %lu for %s snapshot",
1555 0 : bytes_read, bytes_total, full ? "full" : "incremental" ));
1556 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
1557 0 : return;
1558 0 : }
1559 6 : ctx->load_complete = 1;
1560 6 : return;
1561 6 : }
1562 0 : if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_DATA ) ) {
1563 0 : if( process_ctrl_ack( ctx, sig ) ) {
1564 0 : FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
1565 0 : }
1566 0 : return;
1567 0 : }
1568 :
1569 0 : int full, file;
1570 0 : switch( ctx->state ) {
1571 : /* Expected cases, fall through below */
1572 0 : case FD_SNAPCT_STATE_READING_FULL_FILE: full = 1; file = 1; break;
1573 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE: full = 0; file = 1; break;
1574 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; file = 0; break;
1575 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; file = 0; break;
1576 :
1577 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
1578 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
1579 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1580 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1581 : /* We are waiting for a reset to fully propagate through the
1582 : pipeline, just throw away any trailing data frags. */
1583 0 : return;
1584 :
1585 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
1586 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
1587 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
1588 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
1589 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
1590 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
1591 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
1592 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
1593 : /* Based on previously received data frags, we expected that the
1594 : current full / incremental snapshot was finished, but then we
1595 : received additional data frags. Unsafe to continue so throw
1596 : away the whole snapshot. Do not publish MSG_CTRL_ERROR here:
1597 : data forwarding is already complete, and the state machine
1598 : will publish MSG_CTRL_FAIL on the next tick. */
1599 0 : if( !ctx->malformed ) {
1600 0 : ctx->malformed = 1;
1601 0 : FD_LOG_WARNING(( "complete snapshot loaded but read %lu extra bytes", sz ));
1602 0 : }
1603 0 : return;
1604 :
1605 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS:
1606 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL:
1607 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS:
1608 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL:
1609 0 : case FD_SNAPCT_STATE_SHUTDOWN:
1610 0 : default:
1611 0 : FD_LOG_ERR(( "invalid data frag in state %s", fd_snapct_state_str( ctx->state ) ));
1612 0 : return;
1613 0 : }
1614 :
1615 0 : if( FD_UNLIKELY( full && ctx->metrics.full.bytes_total==0UL ) ) {
1616 0 : if( !ctx->malformed ) {
1617 0 : ctx->malformed = 1;
1618 0 : FD_LOG_WARNING(( "received data frag for full snapshot with zero bytes_total" ));
1619 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
1620 0 : }
1621 0 : return;
1622 0 : }
1623 0 : if( FD_UNLIKELY( !full && ctx->metrics.incremental.bytes_total==0UL ) ) {
1624 0 : if( !ctx->malformed ) {
1625 0 : ctx->malformed = 1;
1626 0 : FD_LOG_WARNING(( "received data frag for incremental snapshot with zero bytes_total" ));
1627 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
1628 0 : }
1629 0 : return;
1630 0 : }
1631 :
1632 0 : if( full ) ctx->metrics.full.bytes_read += sz;
1633 0 : else ctx->metrics.incremental.bytes_read += sz;
1634 :
1635 0 : if( !file && -1!=ctx->local_out.dir_fd ) {
1636 0 : uchar const * data = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
1637 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
1638 0 : ulong written_sz = 0;
1639 0 : while( written_sz<sz ) {
1640 0 : long result = write( fd, data+written_sz, sz-written_sz );
1641 0 : if( FD_UNLIKELY( -1==result && errno==EINTR ) ) continue;
1642 0 : if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
1643 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", ctx->config.snapshots_path ));
1644 0 : } else if( FD_UNLIKELY( 0L>result ) ) {
1645 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1646 0 : } else if( FD_UNLIKELY( 0L==result ) ) {
1647 0 : FD_LOG_ERR(( "write() returned 0 for non-zero count" ));
1648 0 : }
1649 :
1650 0 : written_sz += (ulong)result;
1651 0 : }
1652 0 : if( full ) ctx->metrics.full.bytes_written += sz;
1653 0 : else ctx->metrics.incremental.bytes_written += sz;
1654 0 : }
1655 :
1656 0 : if( FD_UNLIKELY( ( full && ctx->metrics.full.bytes_read > ctx->metrics.full.bytes_total ) ||
1657 0 : (!full && ctx->metrics.incremental.bytes_read > ctx->metrics.incremental.bytes_total ) ) ) {
1658 0 : if( !ctx->malformed ) {
1659 0 : ctx->malformed = 1;
1660 0 : FD_LOG_WARNING(( "expected %s snapshot size of %lu bytes but read %lu bytes",
1661 0 : full ? "full" : "incremental",
1662 0 : full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total,
1663 0 : full ? ctx->metrics.full.bytes_read : ctx->metrics.incremental.bytes_read ));
1664 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
1665 0 : }
1666 0 : return;
1667 0 : }
1668 0 : }
1669 :
1670 : static void
1671 : ctrl_ack_frag( fd_snapct_tile_t * ctx,
1672 0 : ulong sig ) {
1673 0 : switch( sig ) {
1674 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
1675 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1676 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
1677 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1678 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1679 0 : ctx->flush_ack++;
1680 0 : FD_TEST( ctx->flush_ack <= ctx->flush_ack_cnt );
1681 0 : } else {
1682 0 : FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
1683 0 : }
1684 0 : return;
1685 :
1686 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
1687 0 : return;
1688 :
1689 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
1690 0 : switch( ctx->state ) {
1691 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1692 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_FINI:
1693 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_DONE:
1694 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1695 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_FINI:
1696 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_DONE:
1697 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
1698 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_FINI:
1699 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_DONE:
1700 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
1701 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_FINI:
1702 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_DONE:
1703 : /* Do not publish MSG_CTRL_ERROR: the error originated
1704 : downstream, so re-publishing would be redundant.
1705 : MSG_CTRL_FAIL follows on the next state machine tick. */
1706 0 : FD_LOG_WARNING(( "received error from downstream tile while in state %s",
1707 0 : fd_snapct_state_str( ctx->state ) ));
1708 0 : ctx->malformed = 1;
1709 0 : break;
1710 0 : default:
1711 0 : break;
1712 0 : }
1713 0 : return;
1714 :
1715 0 : default:
1716 0 : break;
1717 0 : }
1718 0 : if( process_ctrl_ack( ctx, sig ) ) {
1719 0 : FD_LOG_ERR(( "unexpected control frag %lu (%s) in state %d (%s)", sig, fd_ssctrl_msg_ctrl_str( sig ), ctx->state, fd_snapct_state_str( ctx->state ) ));
1720 0 : }
1721 0 : }
1722 :
1723 : static int
1724 : returnable_frag( fd_snapct_tile_t * ctx,
1725 : ulong in_idx,
1726 : ulong seq FD_PARAM_UNUSED,
1727 : ulong sig,
1728 : ulong chunk,
1729 : ulong sz,
1730 : ulong ctl FD_PARAM_UNUSED,
1731 : ulong tsorig FD_PARAM_UNUSED,
1732 : ulong tspub FD_PARAM_UNUSED,
1733 0 : fd_stem_context_t * stem ) {
1734 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
1735 0 : gossip_frag( ctx, sig, sz, chunk );
1736 0 : } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLD ) {
1737 0 : snapld_frag( ctx, sig, sz, chunk, stem );
1738 0 : } else if( ctx->in_kind[ in_idx ]==IN_KIND_ACK ) {
1739 0 : ctrl_ack_frag( ctx, sig );
1740 0 : } else FD_LOG_ERR(( "invalid in_kind %lu %u", in_idx, (uint)ctx->in_kind[ in_idx ] ));
1741 0 : return 0;
1742 0 : }
1743 :
1744 : static void
1745 : privileged_init( fd_topo_t const * topo,
1746 0 : fd_topo_tile_t const * tile ) {
1747 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1748 :
1749 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1750 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
1751 0 : void * _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
1752 0 : FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t)*GOSSIP_PEERS_MAX );
1753 0 : FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
1754 0 : void * _ssresolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
1755 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
1756 0 : FD_SCRATCH_ALLOC_APPEND( l, blacklist_pool_align(), blacklist_pool_footprint( TOTAL_PEERS_MAX ) );
1757 0 : FD_SCRATCH_ALLOC_APPEND( l, blacklist_map_align(), blacklist_map_footprint( blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ) ) );
1758 :
1759 0 : #if FD_HAS_OPENSSL
1760 0 : void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
1761 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
1762 0 : fd_ossl_tile_init( alloc );
1763 0 : #endif
1764 :
1765 0 : ctx->ssping = NULL;
1766 0 : if( FD_LIKELY( download_enabled( tile ) ) ) ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, TOTAL_PEERS_MAX, 1UL, on_ping, ctx ) );
1767 0 : if( FD_LIKELY( tile->snapct.sources.servers_cnt ) ) ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, SERVER_PEERS_MAX, tile->snapct.incremental_snapshots, on_resolve, ctx ) );
1768 0 : else ctx->ssresolver = NULL;
1769 :
1770 0 : fd_ssarchive_remove_old_snapshots( tile->snapct.snapshots_path,
1771 0 : tile->snapct.max_full_snapshots_to_keep,
1772 0 : tile->snapct.max_incremental_snapshots_to_keep );
1773 :
1774 0 : ulong full_slot = ULONG_MAX;
1775 0 : ulong incremental_slot = ULONG_MAX;
1776 0 : int full_is_zstd = 0;
1777 0 : int incremental_is_zstd = 0;
1778 0 : char full_path[ PATH_MAX ] = {0};
1779 0 : char incremental_path[ PATH_MAX ] = {0};
1780 0 : uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ] = {0};
1781 0 : uchar incremental_snapshot_hash[ FD_HASH_FOOTPRINT ] = {0};
1782 0 : if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snapct.snapshots_path,
1783 0 : tile->snapct.incremental_snapshots,
1784 0 : &full_slot,
1785 0 : &incremental_slot,
1786 0 : full_path,
1787 0 : incremental_path,
1788 0 : &full_is_zstd,
1789 0 : &incremental_is_zstd,
1790 0 : full_snapshot_hash,
1791 0 : incremental_snapshot_hash ) ) ) {
1792 0 : if( FD_UNLIKELY( !download_enabled( tile ) ) ) {
1793 0 : FD_LOG_ERR(( "No snapshots found in `%s` and no download sources are enabled. "
1794 0 : "Please enable downloading via [snapshots.sources] and restart.", tile->snapct.snapshots_path ));
1795 0 : }
1796 0 : ctx->local_in.full_snapshot_slot = ULONG_MAX;
1797 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
1798 0 : ctx->local_in.full_snapshot_size = 0UL;
1799 0 : ctx->local_in.incremental_snapshot_size = 0UL;
1800 0 : ctx->local_in.full_snapshot_zstd = 0;
1801 0 : ctx->local_in.incremental_snapshot_zstd = 0;
1802 0 : fd_cstr_fini( ctx->local_in.full_snapshot_path );
1803 0 : fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
1804 0 : fd_memset( ctx->local_in.full_snapshot_hash, 0, FD_HASH_FOOTPRINT );
1805 0 : fd_memset( ctx->local_in.incremental_snapshot_hash, 0, FD_HASH_FOOTPRINT );
1806 0 : } else {
1807 0 : FD_TEST( full_slot!=ULONG_MAX );
1808 :
1809 0 : ctx->local_in.full_snapshot_slot = full_slot;
1810 0 : ctx->local_in.incremental_snapshot_slot = incremental_slot;
1811 0 : ctx->local_in.full_snapshot_zstd = full_is_zstd;
1812 0 : ctx->local_in.incremental_snapshot_zstd = incremental_is_zstd;
1813 :
1814 0 : fd_cstr_ncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
1815 0 : fd_memcpy( ctx->local_in.full_snapshot_hash, full_snapshot_hash, FD_HASH_FOOTPRINT );
1816 0 : struct stat full_stat;
1817 0 : if( FD_UNLIKELY( -1==stat( ctx->local_in.full_snapshot_path, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
1818 0 : if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
1819 0 : ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
1820 :
1821 0 : if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
1822 0 : fd_cstr_ncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
1823 0 : fd_memcpy( ctx->local_in.incremental_snapshot_hash, incremental_snapshot_hash, FD_HASH_FOOTPRINT );
1824 0 : struct stat incremental_stat;
1825 0 : if( FD_UNLIKELY( -1==stat( ctx->local_in.incremental_snapshot_path, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
1826 0 : if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
1827 0 : ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
1828 0 : } else {
1829 0 : ctx->local_in.incremental_snapshot_size = 0UL;
1830 0 : fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
1831 0 : }
1832 0 : }
1833 :
1834 0 : ctx->local_out.dir_fd = -1;
1835 0 : ctx->local_out.full_snapshot_fd = -1;
1836 0 : ctx->local_out.incremental_snapshot_fd = -1;
1837 0 : if( FD_LIKELY( download_enabled( tile ) ) ) {
1838 : /* Switch to non-root uid/gid for file creation so snapshot files
1839 : are owned by the target user, not root. */
1840 0 : gid_t gid = getgid();
1841 0 : uid_t uid = getuid();
1842 0 : if( FD_LIKELY( !gid && -1==syscall( __NR_setresgid, -1, tile->snapct.target_gid, -1 ) ) )
1843 0 : FD_LOG_ERR(( "setresgid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1844 0 : if( FD_LIKELY( !uid && -1==syscall( __NR_setresuid, -1, tile->snapct.target_uid, -1 ) ) )
1845 0 : FD_LOG_ERR(( "setresuid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1846 :
1847 0 : ctx->local_out.dir_fd = open( tile->snapct.snapshots_path, O_DIRECTORY|O_CLOEXEC );
1848 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open(%s) failed (%i-%s)", tile->snapct.snapshots_path, errno, fd_io_strerror( errno ) ));
1849 :
1850 0 : ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1851 0 : if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_FULL_SNAP_NAME, errno, fd_io_strerror( errno ) ));
1852 :
1853 0 : if( FD_LIKELY( tile->snapct.incremental_snapshots ) ) {
1854 0 : ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1855 0 : if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_INCR_SNAP_NAME, errno, fd_io_strerror( errno ) ));
1856 0 : }
1857 :
1858 0 : if( FD_UNLIKELY( -1==syscall( __NR_setresuid, -1, uid, -1 ) ) )
1859 0 : FD_LOG_ERR(( "setresuid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1860 0 : if( FD_UNLIKELY( -1==syscall( __NR_setresgid, -1, gid, -1 ) ) )
1861 0 : FD_LOG_ERR(( "setresgid() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1862 0 : }
1863 :
1864 0 : FD_TEST( fd_rng_secure( &ctx->selector_seed, 8UL ) );
1865 0 : }
1866 :
1867 : static inline fd_snapct_out_link_t
1868 : out1( fd_topo_t const * topo,
1869 : fd_topo_tile_t const * tile,
1870 0 : char const * name ) {
1871 0 : ulong idx = ULONG_MAX;
1872 :
1873 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
1874 0 : fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
1875 0 : if( !strcmp( link->name, name ) ) {
1876 0 : if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
1877 0 : idx = i;
1878 0 : }
1879 0 : }
1880 :
1881 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapct_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
1882 :
1883 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
1884 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapct_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
1885 :
1886 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
1887 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
1888 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
1889 0 : return (fd_snapct_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
1890 0 : }
1891 :
1892 : static void
1893 : unprivileged_init( fd_topo_t const * topo,
1894 0 : fd_topo_tile_t const * tile ) {
1895 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1896 :
1897 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1898 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
1899 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
1900 0 : void * _ci_table = FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
1901 0 : void * _ci_map = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
1902 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
1903 0 : void * _selector = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
1904 0 : void * _bl_pool = FD_SCRATCH_ALLOC_APPEND( l, blacklist_pool_align(), blacklist_pool_footprint( TOTAL_PEERS_MAX ) );
1905 0 : void * _bl_map = FD_SCRATCH_ALLOC_APPEND( l, blacklist_map_align(), blacklist_map_footprint( blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ) ) );
1906 :
1907 0 : ctx->config = tile->snapct;
1908 0 : ctx->gossip_enabled = gossip_enabled( tile );
1909 0 : ctx->download_enabled = download_enabled( tile );
1910 :
1911 0 : ctx->selector = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, TOTAL_PEERS_MAX, ctx->selector_seed ) );
1912 0 : ctx->blacklist_pool = blacklist_pool_join( blacklist_pool_new( _bl_pool, TOTAL_PEERS_MAX ) );
1913 0 : ctx->blacklist_map = blacklist_map_join( blacklist_map_new( _bl_map, blacklist_map_chain_cnt_est( TOTAL_PEERS_MAX ), ctx->selector_seed ) );
1914 :
1915 0 : if( ctx->config.sources.servers_cnt ) {
1916 0 : for( ulong i=0UL; i<tile->snapct.sources.servers_cnt; i++ ) {
1917 : /* The peers needs to be added to resolver and to selector.
1918 : Only if this succeeds, add the peer to ssping list. */
1919 0 : if( FD_LIKELY( !fd_http_resolver_add( ctx->ssresolver,
1920 0 : tile->snapct.sources.servers[ i ].addr,
1921 0 : tile->snapct.sources.servers[ i ].hostname,
1922 0 : tile->snapct.sources.servers[ i ].is_https,
1923 0 : ctx->selector ) ) ) {
1924 0 : fd_ssping_add( ctx->ssping, tile->snapct.sources.servers[ i ].addr );
1925 0 : }
1926 0 : }
1927 0 : }
1928 :
1929 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) {
1930 0 : FD_LOG_WARNING(( "incremental snapshots disabled via [snapshots.incremental_snapshots]." ));
1931 0 : }
1932 :
1933 0 : ctx->state = FD_SNAPCT_STATE_INIT;
1934 0 : ctx->malformed = 0;
1935 0 : ctx->load_complete = 0;
1936 0 : FD_CHECK_ERR( ctx->config.wait_for_peers_timeout_nanos>0L, "snapct wait_for_peers_timeout_nanos must be positive" );
1937 0 : ctx->deadline_nanos = fd_log_wallclock() + ctx->config.wait_for_peers_timeout_nanos;
1938 0 : ctx->flush_ack = 0;
1939 0 : ctx->flush_ack_cnt = 0;
1940 0 : ctx->peer.addr.l = 0UL;
1941 :
1942 0 : fd_memset( ctx->http_full_snapshot_name, 0, PATH_MAX );
1943 0 : fd_memset( ctx->http_incr_snapshot_name, 0, PATH_MAX );
1944 :
1945 0 : ctx->gossip_in_mem = NULL;
1946 0 : int has_snapld_dc = 0, ack_cnt = 0;
1947 0 : FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
1948 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
1949 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ i ] ];
1950 0 : if( 0==strcmp( in_link->name, "gossip_out" ) ) {
1951 0 : ctx->in_kind[ i ] = IN_KIND_GOSSIP;
1952 0 : ctx->gossip_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1953 0 : } else if( 0==strcmp( in_link->name, "snapld_dc" ) ) {
1954 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLD;
1955 0 : ctx->snapld_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1956 0 : FD_TEST( !has_snapld_dc );
1957 0 : has_snapld_dc = 1;
1958 0 : } else if( 0==strcmp( in_link->name, "snapin_ct" ) || 0==strcmp( in_link->name, "snapwr_ct" ) ){
1959 0 : ctx->in_kind[ i ] = IN_KIND_ACK;
1960 0 : ack_cnt++;
1961 0 : }
1962 0 : }
1963 0 : FD_TEST( has_snapld_dc && ack_cnt>0 );
1964 0 : ctx->flush_ack_cnt = ack_cnt + 1; /* +1 for snapld (acks via snapld_dc) */
1965 0 : FD_TEST( ctx->gossip_enabled==(ctx->gossip_in_mem!=NULL) );
1966 :
1967 0 : ctx->predicted_incremental.full_slot = FD_SSPEER_SLOT_UNKNOWN;
1968 0 : ctx->predicted_incremental.slot = FD_SSPEER_SLOT_UNKNOWN;
1969 0 : ctx->predicted_incremental.pending = 0;
1970 :
1971 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
1972 :
1973 0 : fd_memset( _ci_table, 0, sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
1974 0 : ctx->gossip.ci_table = _ci_table;
1975 0 : ctx->gossip.ci_map = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ), 0UL ) );
1976 0 : ctx->gossip.allowed_cnt = 0UL;
1977 0 : ctx->gossip.saturated = !ctx->gossip_enabled;
1978 :
1979 0 : if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
1980 0 : ctx->out_ld = out1( topo, tile, "snapct_ld" );
1981 0 : ctx->out_gui = out1( topo, tile, "snapct_gui" );
1982 0 : ctx->out_rp = out1( topo, tile, "snapct_repr" );
1983 0 : }
1984 :
1985 : /* after_credit can result in as many as 5 stem publishes in some code
1986 : paths, and returnable_frag can result in 1. */
1987 0 : #define STEM_BURST 6UL
1988 :
1989 0 : #define STEM_LAZY 1000L
1990 :
1991 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapct_tile_t
1992 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapct_tile_t)
1993 :
1994 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
1995 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1996 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
1997 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
1998 :
1999 : #include "../../disco/stem/fd_stem.c"
2000 :
2001 : #ifndef FD_TILE_TEST
2002 : fd_topo_run_tile_t fd_tile_snapct = {
2003 : .name = NAME,
2004 : .rlimit_file_cnt_fn = rlimit_file_cnt,
2005 : .populate_allowed_seccomp = populate_allowed_seccomp,
2006 : .populate_allowed_fds = populate_allowed_fds,
2007 : .scratch_align = scratch_align,
2008 : .scratch_footprint = scratch_footprint,
2009 : .loose_footprint = loose_footprint,
2010 : .privileged_init = privileged_init,
2011 : .unprivileged_init = unprivileged_init,
2012 : .run = stem_run,
2013 : .keep_host_networking = 1,
2014 : .allow_connect = 1,
2015 : .allow_renameat = 1,
2016 : };
2017 : #endif
2018 :
2019 : #undef NAME
|