Line data Source code
1 : #include "fd_snapct_tile.h"
2 : #include "utils/fd_ssping.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_ssarchive.h"
5 : #include "utils/fd_http_resolver.h"
6 : #include "utils/fd_ssmsg.h"
7 :
8 : #include "../../disco/topo/fd_topo.h"
9 : #include "../../disco/metrics/fd_metrics.h"
10 : #include "../../flamenco/gossip/fd_gossip_types.h"
11 : #include "../../waltz/openssl/fd_openssl_tile.h"
12 :
13 : #include <errno.h>
14 : #include <stdio.h>
15 : #include <fcntl.h>
16 : #include <unistd.h>
17 : #include <sys/stat.h>
18 : #include <netinet/tcp.h>
19 : #include <netinet/in.h>
20 :
21 : #include "generated/fd_snapct_tile_seccomp.h"
22 :
23 : #define NAME "snapct"
24 :
25 : /* FIXME: Implement full_effective_age_cancel_threshold */
26 : /* FIXME: Add more timeout config options and have consistent behavior */
27 : /* FIXME: Do a finishing pass over the default.toml config options / comments */
28 : /* FIXME: Improve behavior when using incremental_snapshots = false */
29 : /* FIXME: Handle cases where no explicitly allowed peers advertise RPC */
30 : /* FIXME: Make the code more strict about duplicate IP:port's */
31 : /* FIXME: Handle cases where the slot number we start downloading differs from advertised */
32 : /* FIXME: Ensure local files are not selected again if they fail the first time. */
33 :
/* Peer capacities.  The total is the sum of the gossip contact info
   table size and the resolved configured server limit; it is used to
   size the pinger and the peer selector below. */

#define GOSSIP_PEERS_MAX (FD_CONTACT_INFO_TABLE_SIZE)
#define SERVER_PEERS_MAX (FD_TOPO_SNAPSHOTS_SERVERS_MAX_RESOLVED)
#define TOTAL_PEERS_MAX  (GOSSIP_PEERS_MAX + SERVER_PEERS_MAX)

/* Kinds of input link.  MAX_IN_LINKS sizes the in_kind array in the
   tile context. */

#define IN_KIND_ACK    (0)
#define IN_KIND_SNAPLD (1)
#define IN_KIND_GOSSIP (2)
#define MAX_IN_LINKS   (3)

/* Names that in-progress downloads are renamed from (relative to the
   snapshot directory fd) in rename_snapshots. */

#define TEMP_FULL_SNAP_NAME ".snapshot.tar.bz2-partial"
#define TEMP_INCR_SNAP_NAME ".incremental-snapshot.tar.bz2-partial"
45 :
46 : struct fd_snapct_out_link {
47 : ulong idx;
48 : fd_wksp_t * mem;
49 : ulong chunk0;
50 : ulong wmark;
51 : ulong chunk;
52 : ulong mtu;
53 : };
54 : typedef struct fd_snapct_out_link fd_snapct_out_link_t;
55 :
/* Gossip saturation tuning.  An entry counts as fresh when it was added
   less than FRESH_DEADLINE_NANOS before the check (gossip contact info
   is pushed every ~7.5 seconds).  The check is re-run at most once per
   CHECK_INTERVAL (10 ms), and gossip is treated as saturated once the
   fraction of fresh entries falls below THRESHOLD (5%). */

#define FD_SNAPCT_GOSSIP_FRESH_DEADLINE_NANOS      (10L*1000L*1000L*1000L)
#define FD_SNAPCT_GOSSIP_SATURATION_CHECK_INTERVAL (      10L*1000L*1000L)
#define FD_SNAPCT_GOSSIP_SATURATION_THRESHOLD      (0.05)

/* State timeouts, both 2 minutes in nanoseconds. */

#define FD_SNAPCT_COLLECTING_PEERS_TIMEOUT  (2L*60L*1000L*1000L*1000L)
#define FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT (2L*60L*1000L*1000L*1000L)
62 :
63 : struct gossip_ci_entry {
64 : fd_pubkey_t pubkey;
65 : int allowed;
66 : fd_ip4_port_t rpc_addr;
67 : long added_nanos;
68 : ulong map_next;
69 : };
70 : typedef struct gossip_ci_entry gossip_ci_entry_t;
71 :
72 : #define MAP_NAME gossip_ci_map
73 0 : #define MAP_KEY pubkey
74 : #define MAP_ELE_T gossip_ci_entry_t
75 : #define MAP_KEY_T fd_pubkey_t
76 0 : #define MAP_NEXT map_next
77 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
78 0 : #define MAP_KEY_HASH(key,seed) fd_hash( seed, key, sizeof(fd_pubkey_t) )
79 : #include "../../util/tmpl/fd_map_chain.c"
80 :
81 : struct fd_snapct_tile {
82 : struct fd_topo_tile_snapct config;
83 : int gossip_enabled;
84 : int download_enabled;
85 :
86 : fd_ssping_t * ssping;
87 : fd_http_resolver_t * ssresolver;
88 : fd_sspeer_selector_t * selector;
89 :
90 : int state;
91 : int malformed;
92 : long deadline_nanos;
93 : int flush_ack;
94 : fd_ip4_port_t addr;
95 :
96 : struct {
97 : int dir_fd;
98 : int full_snapshot_fd;
99 : int incremental_snapshot_fd;
100 : } local_out;
101 :
102 : char http_full_snapshot_name[ PATH_MAX ];
103 : char http_incr_snapshot_name[ PATH_MAX ];
104 :
105 : fd_wksp_t const * gossip_in_mem;
106 : fd_wksp_t const * snapld_in_mem;
107 : uchar in_kind[ MAX_IN_LINKS ];
108 :
109 : struct {
110 : ulong full_slot;
111 : ulong slot;
112 : int dirty;
113 : } predicted_incremental;
114 :
115 : struct {
116 : ulong full_snapshot_slot;
117 : char full_snapshot_path[ PATH_MAX ];
118 : ulong full_snapshot_size;
119 : int full_snapshot_zstd;
120 :
121 : ulong incremental_snapshot_slot;
122 : char incremental_snapshot_path[ PATH_MAX ];
123 : ulong incremental_snapshot_size;
124 : int incremental_snapshot_zstd;
125 : } local_in;
126 :
127 : struct {
128 : struct {
129 : ulong bytes_read;
130 : ulong bytes_written;
131 : ulong bytes_total;
132 : uint num_retries;
133 : } full;
134 :
135 : struct {
136 : ulong bytes_read;
137 : ulong bytes_written;
138 : ulong bytes_total;
139 : uint num_retries;
140 : } incremental;
141 : } metrics;
142 :
143 : struct {
144 : gossip_ci_entry_t * ci_table; /* flat array of all gossip entries, allowed or not */
145 : gossip_ci_map_t * ci_map; /* map from pubkey to only allowed gossip entries */
146 : ulong fresh_cnt;
147 : ulong total_cnt;
148 : int saturated;
149 : long next_saturated_check;
150 : } gossip;
151 :
152 : fd_snapct_out_link_t out_ld;
153 : fd_snapct_out_link_t out_gui;
154 : fd_snapct_out_link_t out_rp;
155 : };
156 : typedef struct fd_snapct_tile fd_snapct_tile_t;
157 :
158 : static int
159 0 : gossip_enabled( fd_topo_tile_t const * tile ) {
160 0 : return tile->snapct.sources.gossip.allow_any || tile->snapct.sources.gossip.allow_list_cnt>0UL;
161 0 : }
162 :
163 : static int
164 0 : download_enabled( fd_topo_tile_t const * tile ) {
165 0 : return gossip_enabled( tile ) || tile->snapct.sources.servers_cnt>0UL;
166 0 : }
167 :
168 : FD_FN_CONST static inline ulong
169 0 : loose_footprint( fd_topo_tile_t const * tile ) {
170 0 : (void)tile;
171 : /* Leftover space for OpenSSL allocations */
172 0 : return 1<<26UL; /* 64 MiB */
173 0 : }
174 :
175 : static ulong
176 0 : scratch_align( void ) {
177 0 : return fd_ulong_max( alignof(fd_snapct_tile_t),
178 0 : fd_ulong_max( fd_ssping_align(),
179 0 : fd_ulong_max( alignof(gossip_ci_entry_t),
180 0 : fd_ulong_max( gossip_ci_map_align(),
181 0 : fd_ulong_max( fd_http_resolver_align(),
182 0 : fd_sspeer_selector_align() ) ) ) ) );
183 0 : }
184 :
185 : static ulong
186 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
187 0 : ulong l = FD_LAYOUT_INIT;
188 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
189 0 : l = FD_LAYOUT_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
190 0 : l = FD_LAYOUT_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
191 0 : l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
192 0 : l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
193 0 : l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
194 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
195 0 : return FD_LAYOUT_FINI( l, scratch_align() );
196 0 : }
197 :
198 : static inline int
199 0 : should_shutdown( fd_snapct_tile_t * ctx ) {
200 0 : return ctx->state==FD_SNAPCT_STATE_SHUTDOWN;
201 0 : }
202 :
203 : static void
204 0 : during_housekeeping( fd_snapct_tile_t * ctx ) {
205 0 : long now = fd_log_wallclock();
206 :
207 0 : if( FD_UNLIKELY( !ctx->gossip.saturated && now>ctx->gossip.next_saturated_check ) ) {
208 0 : ctx->gossip.next_saturated_check = now + FD_SNAPCT_GOSSIP_SATURATION_CHECK_INTERVAL;
209 :
210 0 : ulong fresh_cnt = 0UL;
211 0 : ulong total_cnt = 0UL;
212 0 : for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
213 0 : !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
214 0 : iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
215 0 : gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
216 0 : if( FD_UNLIKELY( ci_entry->added_nanos>(now-FD_SNAPCT_GOSSIP_FRESH_DEADLINE_NANOS) ) ) fresh_cnt++;
217 0 : total_cnt++;
218 0 : }
219 0 : ctx->gossip.fresh_cnt = fresh_cnt;
220 0 : ctx->gossip.total_cnt = total_cnt;
221 :
222 0 : if( total_cnt!=0UL && total_cnt==ctx->config.sources.gossip.allow_list_cnt ) ctx->gossip.saturated = 1;
223 0 : else {
224 0 : double fresh = total_cnt ? (double)fresh_cnt/(double)total_cnt : 1.0;
225 0 : ctx->gossip.saturated = fresh<FD_SNAPCT_GOSSIP_SATURATION_THRESHOLD;
226 0 : }
227 0 : }
228 0 : }
229 :
230 : static void
231 0 : metrics_write( fd_snapct_tile_t * ctx ) {
232 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_READ, ctx->metrics.full.bytes_read );
233 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_WRITTEN, ctx->metrics.full.bytes_written );
234 0 : FD_MGAUGE_SET( SNAPCT, FULL_BYTES_TOTAL, ctx->metrics.full.bytes_total );
235 0 : FD_MGAUGE_SET( SNAPCT, FULL_DOWNLOAD_RETRIES, ctx->metrics.full.num_retries );
236 :
237 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_READ, ctx->metrics.incremental.bytes_read );
238 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_WRITTEN, ctx->metrics.incremental.bytes_written );
239 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_BYTES_TOTAL, ctx->metrics.incremental.bytes_total );
240 0 : FD_MGAUGE_SET( SNAPCT, INCREMENTAL_DOWNLOAD_RETRIES, ctx->metrics.incremental.num_retries );
241 :
242 0 : FD_MGAUGE_SET( SNAPCT, GOSSIP_FRESH_COUNT, ctx->gossip.fresh_cnt );
243 0 : FD_MGAUGE_SET( SNAPCT, GOSSIP_TOTAL_COUNT, ctx->gossip.total_cnt );
244 :
245 0 : FD_MGAUGE_SET( SNAPCT, PREDICTED_SLOT, ctx->predicted_incremental.slot );
246 :
247 0 : #if FD_HAS_OPENSSL
248 0 : FD_MCNT_SET( SNAPCT, SSL_ALLOC_ERRORS, fd_ossl_alloc_errors );
249 0 : #endif
250 :
251 0 : FD_MGAUGE_SET( SNAPCT, STATE, (ulong)ctx->state );
252 0 : }
253 :
254 : static void
255 : snapshot_path_gui_publish( fd_snapct_tile_t * ctx,
256 : fd_stem_context_t * stem,
257 : char const * path,
258 0 : int is_full ) {
259 : /* FIXME: Consider whether we can get everything we need from metrics
260 : rather than creating an entire link for this rare message */
261 0 : fd_snapct_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
262 0 : FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
263 0 : out->is_download = 0;
264 0 : out->type = fd_int_if( is_full, FD_SNAPCT_SNAPSHOT_TYPE_FULL, FD_SNAPCT_SNAPSHOT_TYPE_INCREMENTAL );
265 0 : fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snapct_update_t) , 0UL, 0UL, 0UL );
266 0 : ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snapct_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
267 0 : }
268 :
269 : static void
270 0 : predict_incremental( fd_snapct_tile_t * ctx ) {
271 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) return;
272 0 : if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==ULONG_MAX ) ) return;
273 :
274 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
275 :
276 0 : if( FD_LIKELY( best.addr.l ) ) {
277 0 : if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.incr_slot ) ) {
278 0 : ctx->predicted_incremental.slot = best.incr_slot;
279 0 : ctx->predicted_incremental.dirty = 1;
280 0 : }
281 0 : }
282 0 : }
283 :
284 : static void
285 : on_resolve( void * _ctx,
286 : fd_ip4_port_t addr,
287 : ulong full_slot,
288 0 : ulong incr_slot ) {
289 0 : fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
290 :
291 0 : fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, full_slot, incr_slot );
292 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
293 0 : predict_incremental( ctx );
294 0 : }
295 :
296 : static void
297 : on_ping( void * _ctx,
298 : fd_ip4_port_t addr,
299 0 : ulong latency ) {
300 0 : fd_snapct_tile_t * ctx = (fd_snapct_tile_t *)_ctx;
301 :
302 0 : fd_sspeer_selector_add( ctx->selector, addr, latency, ULONG_MAX, ULONG_MAX);
303 0 : predict_incremental( ctx );
304 0 : }
305 :
306 : static void
307 : on_snapshot_hash( fd_snapct_tile_t * ctx,
308 : fd_ip4_port_t addr,
309 0 : fd_gossip_update_message_t const * msg ) {
310 0 : ulong full_slot = msg->snapshot_hashes.full->slot;
311 0 : ulong incr_slot = 0UL;
312 :
313 0 : for( ulong i=0UL; i<msg->snapshot_hashes.incremental_len; i++ ) {
314 0 : if( FD_LIKELY( msg->snapshot_hashes.incremental[ i ].slot>incr_slot ) ) {
315 0 : incr_slot = msg->snapshot_hashes.incremental[ i ].slot;
316 0 : }
317 0 : }
318 :
319 0 : fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, full_slot, incr_slot );
320 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
321 0 : predict_incremental( ctx );
322 0 : }
323 :
324 : static void
325 : send_expected_slot( fd_snapct_tile_t * ctx,
326 : fd_stem_context_t * stem,
327 0 : ulong slot ) {
328 0 : uint tsorig; uint tspub;
329 0 : fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
330 0 : fd_stem_publish( stem, ctx->out_rp.idx, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
331 0 : }
332 :
333 : static void
334 0 : rename_snapshots( fd_snapct_tile_t * ctx ) {
335 0 : FD_TEST( -1!=ctx->local_out.dir_fd );
336 :
337 : /* FIXME: We should rename the full snapshot earlier as soon as the
338 : download is complete. That way, if the validator crashes during the
339 : incremental load, we can still use the snapshot on the next run. */
340 :
341 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && ctx->http_full_snapshot_name[ 0 ]!='\0' ) ) {
342 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_full_snapshot_name ) ) )
343 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
344 0 : }
345 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && ctx->http_incr_snapshot_name[ 0 ]!='\0' ) ) {
346 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, ctx->local_out.dir_fd, ctx->http_incr_snapshot_name ) ) )
347 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
348 0 : }
349 0 : }
350 :
351 : static ulong
352 : rlimit_file_cnt( fd_topo_t const * topo FD_PARAM_UNUSED,
353 0 : fd_topo_tile_t const * tile ) {
354 0 : ulong cnt = 1UL + /* stderr */
355 0 : 1UL; /* logfile */
356 0 : if( download_enabled( tile ) ) {
357 0 : cnt += 1UL + /* ssping socket */
358 0 : 2UL + /* dirfd + full snapshot download temp fd */
359 0 : tile->snapct.sources.servers_cnt; /* http resolver peer full sockets */
360 0 : if( tile->snapct.incremental_snapshots ) {
361 0 : cnt += 1UL + /* incr snapshot download temp fd */
362 0 : tile->snapct.sources.servers_cnt; /* http resolver peer incr sockets */
363 0 : }
364 0 : }
365 0 : return cnt;
366 0 : }
367 :
368 : static ulong
369 : populate_allowed_seccomp( fd_topo_t const * topo,
370 : fd_topo_tile_t const * tile,
371 : ulong out_cnt,
372 0 : struct sock_filter * out ) {
373 :
374 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
375 :
376 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
377 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
378 :
379 0 : int ping_fd = download_enabled( tile ) ? fd_ssping_get_sockfd( ctx->ssping ) : -1;
380 0 : populate_sock_filter_policy_fd_snapct_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)ping_fd );
381 0 : return sock_filter_policy_fd_snapct_tile_instr_cnt;
382 0 : }
383 :
384 : static ulong
385 : populate_allowed_fds( fd_topo_t const * topo,
386 : fd_topo_tile_t const * tile,
387 : ulong out_fds_cnt,
388 0 : int * out_fds ) {
389 0 : if( FD_UNLIKELY( out_fds_cnt<6UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
390 :
391 0 : ulong out_cnt = 0;
392 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
393 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
394 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
395 0 : }
396 :
397 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
398 :
399 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
400 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
401 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
402 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
403 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
404 0 : if( FD_LIKELY( download_enabled( tile ) ) ) out_fds[ out_cnt++ ] = fd_ssping_get_sockfd( ctx->ssping );
405 :
406 0 : return out_cnt;
407 0 : }
408 :
409 : static void
410 : init_load( fd_snapct_tile_t * ctx,
411 : fd_stem_context_t * stem,
412 : int full,
413 0 : int file ) {
414 0 : fd_ssctrl_init_t * out = fd_chunk_to_laddr( ctx->out_ld.mem, ctx->out_ld.chunk );
415 0 : out->file = file;
416 0 : out->zstd = !file || (full ? ctx->local_in.full_snapshot_zstd : ctx->local_in.incremental_snapshot_zstd);
417 0 : if( !file ) {
418 0 : out->addr = ctx->addr;
419 0 : for( ulong i=0UL; i<SERVER_PEERS_MAX; i++ ) {
420 0 : if( FD_UNLIKELY( ctx->addr.l==ctx->config.sources.servers[ i ].addr.l ) ) {
421 0 : fd_cstr_ncpy( out->hostname, ctx->config.sources.servers[ i ].hostname, sizeof(out->hostname) );
422 0 : out->is_https = ctx->config.sources.servers[ i ].is_https;
423 0 : break;
424 0 : }
425 0 : }
426 0 : }
427 0 : fd_stem_publish( stem, ctx->out_ld.idx, full ? FD_SNAPSHOT_MSG_CTRL_INIT_FULL : FD_SNAPSHOT_MSG_CTRL_INIT_INCR, ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), 0UL, 0UL, 0UL );
428 0 : ctx->out_ld.chunk = fd_dcache_compact_next( ctx->out_ld.chunk, sizeof(fd_ssctrl_init_t), ctx->out_ld.chunk0, ctx->out_ld.wmark );
429 0 : ctx->flush_ack = 0;
430 :
431 0 : if( file ) {
432 : /* When loading from a local file and not from HTTP, there is no
433 : future metadata message to initialize total size / filename, as
434 : these are already known immediately. */
435 0 : if( full ) {
436 0 : ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
437 0 : fd_cstr_fini( ctx->http_full_snapshot_name );
438 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
439 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, 1 );
440 0 : }
441 0 : } else {
442 0 : ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
443 0 : fd_cstr_fini( ctx->http_incr_snapshot_name );
444 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
445 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, 0 );
446 0 : }
447 0 : }
448 0 : }
449 0 : }
450 :
451 : static void
452 : log_download( fd_snapct_tile_t * ctx,
453 : int full,
454 : fd_ip4_port_t addr,
455 0 : ulong slot ) {
456 0 : for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_table );
457 0 : !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
458 0 : iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_table ) ) {
459 0 : gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_table );
460 0 : if( ci_entry->rpc_addr.l==addr.l ) {
461 0 : FD_TEST( ci_entry->allowed );
462 0 : FD_BASE58_ENCODE_32_BYTES( ci_entry->pubkey.uc, pubkey_b58 );
463 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from allowed gossip peer %s at http://" FD_IP4_ADDR_FMT ":%hu/%s",
464 0 : full ? "full" : "incremental", slot, pubkey_b58,
465 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
466 0 : full ? "snapshot.tar.bz2" : "incremental-snapshot.tar.bz2" ));
467 0 : return;
468 0 : }
469 0 : }
470 :
471 0 : for( ulong i=0UL; i<ctx->config.sources.servers_cnt; i++ ) {
472 0 : if( addr.l==ctx->config.sources.servers[ i ].addr.l ) {
473 0 : if( ctx->config.sources.servers[ i ].is_https ) {
474 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at https://%s:%hu/%s",
475 0 : full ? "full" : "incremental", slot, i,
476 0 : ctx->config.sources.servers[ i ].hostname, fd_ushort_bswap( addr.port ),
477 0 : full ? "snapshot.tar.bz2" : "incremental-snapshot.tar.bz2" ));
478 0 : } else {
479 0 : FD_LOG_NOTICE(( "downloading %s snapshot at slot %lu from configured server with index %lu at http://" FD_IP4_ADDR_FMT ":%hu/%s",
480 0 : full ? "full" : "incremental", slot, i,
481 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ),
482 0 : full ? "snapshot.tar.bz2" : "incremental-snapshot.tar.bz2" ));
483 0 : }
484 0 : return;
485 0 : }
486 0 : }
487 :
488 0 : FD_TEST( 0 ); /* should not be possible */
489 0 : }
490 :
491 : static void
492 : after_credit( fd_snapct_tile_t * ctx,
493 : fd_stem_context_t * stem,
494 : int * opt_poll_in FD_PARAM_UNUSED,
495 0 : int * charge_busy FD_PARAM_UNUSED ) {
496 0 : long now = fd_log_wallclock();
497 :
498 0 : if( FD_LIKELY( ctx->ssping ) ) fd_ssping_advance( ctx->ssping, now, ctx->selector );
499 0 : if( FD_LIKELY( ctx->ssresolver ) ) fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
500 :
501 : /* send an expected slot message as the predicted incremental
502 : could have changed as a result of the pinger, resolver, or from
503 : processing gossip frags in gossip_frag. */
504 0 : if( FD_LIKELY( ctx->predicted_incremental.dirty ) ) {
505 0 : send_expected_slot( ctx, stem, ctx->predicted_incremental.slot );
506 0 : ctx->predicted_incremental.dirty = 0;
507 0 : }
508 :
509 : /* Note: All state transitions should occur within this switch
510 : statement to make it easier to reason about the state management. */
511 :
512 : /* FIXME: Collapse WAITING_FOR_PEERS and COLLECTING_PEERS states for
513 : both full and incremental variants? */
514 : /* FIXME: Add INIT state so that we don't put the !download_enabled
515 : logic in waiting_for_peers, which is weird. */
516 :
517 0 : switch ( ctx->state ) {
518 :
519 : /* ============================================================== */
520 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS: {
521 0 : if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
522 :
523 0 : if( FD_UNLIKELY( !ctx->download_enabled ) ) {
524 0 : ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
525 0 : send_expected_slot( ctx, stem, local_slot );
526 0 : FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
527 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
528 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
529 0 : init_load( ctx, stem, 1, 1 );
530 0 : break;
531 0 : }
532 :
533 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
534 0 : if( FD_LIKELY( best.addr.l ) ) {
535 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
536 0 : ctx->deadline_nanos = now+FD_SNAPCT_COLLECTING_PEERS_TIMEOUT;
537 0 : }
538 0 : break;
539 0 : }
540 :
541 : /* ============================================================== */
542 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
543 : /* FIXME: Handle the case where we have no download peers enabled,
544 : boot off the local full snapshot but do not have a local incr. */
545 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
546 0 : if( FD_LIKELY( best.addr.l ) ) {
547 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
548 0 : ctx->deadline_nanos = now;
549 0 : }
550 0 : break;
551 0 : }
552 :
553 : /* ============================================================== */
554 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS: {
555 0 : if( FD_UNLIKELY( !ctx->gossip.saturated && now<ctx->deadline_nanos ) ) break;
556 :
557 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
558 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
559 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
560 0 : break;
561 0 : }
562 :
563 0 : fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
564 0 : if( FD_UNLIKELY( cluster.incremental==ULONG_MAX && ctx->config.incremental_snapshots ) ) {
565 : /* We must have a cluster full slot to be in this state. */
566 0 : FD_TEST( cluster.full!=ULONG_MAX );
567 : /* fall back to full snapshot only if the highest cluster slot
568 : is a full snapshot only */
569 0 : ctx->config.incremental_snapshots = 0;
570 0 : }
571 :
572 : /* FIXME: Revisit the local age logic with new effective age
573 : concept. Measure cluster slot based on snapshots we can
574 : download / trust. Reevaluate incremental age after the full
575 : snapshot download is completed. etc. etc. */
576 :
577 0 : ulong cluster_slot = ctx->config.incremental_snapshots ? cluster.incremental : cluster.full;
578 0 : ulong local_slot = ctx->config.incremental_snapshots ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
579 0 : ulong local_slot_with_download = local_slot;
580 0 : int local_too_old = local_slot!=ULONG_MAX && local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_incremental_age );
581 0 : int local_full_only = ctx->local_in.incremental_snapshot_slot==ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX;
582 0 : if( FD_LIKELY( (ctx->config.incremental_snapshots && local_full_only) || local_too_old ) ) {
583 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
584 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
585 0 : ctx->predicted_incremental.slot = best_incremental.incr_slot;
586 0 : local_slot_with_download = best_incremental.incr_slot;
587 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental snapshot */
588 0 : }
589 0 : }
590 :
591 0 : int can_use_local_full = local_slot_with_download!=ULONG_MAX && local_slot_with_download>=fd_ulong_sat_sub( cluster_slot, ctx->config.sources.max_local_full_effective_age );
592 0 : if( FD_LIKELY( can_use_local_full ) ) {
593 0 : send_expected_slot( ctx, stem, local_slot );
594 :
595 0 : FD_LOG_NOTICE(( "reading full snapshot at slot %lu from local file `%s`", ctx->local_in.full_snapshot_slot, ctx->local_in.full_snapshot_path ));
596 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
597 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_FILE;
598 0 : init_load( ctx, stem, 1, 1 );
599 0 : } else {
600 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshots ) ) send_expected_slot( ctx, stem, best.full_slot );
601 :
602 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.full_slot );
603 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
604 0 : ctx->predicted_incremental.slot = best_incremental.incr_slot;
605 0 : send_expected_slot( ctx, stem, best_incremental.incr_slot );
606 0 : }
607 :
608 0 : ctx->addr = best.addr;
609 0 : ctx->state = FD_SNAPCT_STATE_READING_FULL_HTTP;
610 0 : ctx->predicted_incremental.full_slot = best.full_slot;
611 0 : init_load( ctx, stem, 1, 0 );
612 0 : log_download( ctx, 1, best.addr, best.full_slot );
613 0 : }
614 0 : break;
615 0 : }
616 :
617 : /* ============================================================== */
618 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL: {
619 0 : if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
620 :
621 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
622 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
623 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL;
624 0 : break;
625 0 : }
626 :
627 : /* FIXME: predicted_incremental? */
628 :
629 0 : ctx->addr = best.addr;
630 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
631 0 : init_load( ctx, stem, 0, 0 );
632 0 : log_download( ctx, 0, best.addr, best.incr_slot );
633 0 : break;
634 0 : }
635 :
636 : /* ============================================================== */
637 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE:
638 0 : if( !ctx->flush_ack ) break;
639 :
640 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
641 0 : ctx->malformed = 0;
642 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
643 0 : ctx->flush_ack = 0;
644 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
645 0 : FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
646 0 : break;
647 0 : }
648 :
649 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
650 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
651 0 : break;
652 :
653 : /* ============================================================== */
654 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP:
655 0 : if( !ctx->flush_ack ) break;
656 :
657 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
658 0 : ctx->malformed = 0;
659 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
660 0 : ctx->flush_ack = 0;
661 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
662 0 : FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
663 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
664 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
665 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
666 0 : break;
667 0 : }
668 :
669 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
670 0 : rename_snapshots( ctx );
671 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
672 0 : break;
673 :
674 : /* ============================================================== */
675 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE:
676 0 : if( !ctx->flush_ack ) break;
677 :
678 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
679 0 : ctx->malformed = 0;
680 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
681 0 : ctx->flush_ack = 0;
682 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
683 0 : FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
684 0 : break;
685 0 : }
686 :
687 0 : if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
688 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
689 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
690 0 : break;
691 0 : }
692 :
693 0 : if( FD_LIKELY( ctx->local_in.incremental_snapshot_slot==ULONG_MAX ) ) {
694 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
695 0 : ctx->deadline_nanos = 0L;
696 0 : } else {
697 0 : FD_LOG_NOTICE(( "reading incremental snapshot at slot %lu from local file `%s`", ctx->local_in.incremental_snapshot_slot, ctx->local_in.incremental_snapshot_path ));
698 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_FILE;
699 0 : init_load( ctx, stem, 0, 1 );
700 0 : }
701 0 : break;
702 :
703 : /* ============================================================== */
704 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP:
705 0 : if( !ctx->flush_ack ) break;
706 :
707 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
708 0 : ctx->malformed = 0;
709 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
710 0 : ctx->flush_ack = 0;
711 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
712 0 : FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
713 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
714 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
715 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
716 0 : break;
717 0 : }
718 :
719 0 : if( FD_LIKELY( !ctx->config.incremental_snapshots ) ) {
720 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
721 0 : rename_snapshots( ctx );
722 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
723 0 : break;
724 0 : }
725 :
726 : /* Get the best incremental peer to download from */
727 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
728 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
729 : /* FIXME: We should just transition to collecting_peers_incremental
730 : here rather than failing the full snapshot? */
731 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
732 0 : ctx->flush_ack = 0;
733 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
734 0 : break;
735 0 : }
736 :
737 0 : if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.incr_slot ) ) {
738 0 : ctx->predicted_incremental.slot = best.incr_slot;
739 0 : send_expected_slot( ctx, stem, best.incr_slot );
740 0 : }
741 :
742 0 : ctx->addr = best.addr;
743 0 : ctx->state = FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP;
744 0 : init_load( ctx, stem, 0, 0 );
745 0 : log_download( ctx, 0, best.addr, best.incr_slot );
746 0 : break;
747 :
748 : /* ============================================================== */
749 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
750 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
751 0 : if( !ctx->flush_ack ) break;
752 :
753 0 : if( ctx->metrics.full.num_retries==ctx->config.max_retry_abort ) {
754 0 : FD_LOG_WARNING(( "hit retry limit of %u for full snapshot, aborting", ctx->config.max_retry_abort ));
755 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
756 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
757 0 : break;
758 0 : }
759 :
760 0 : ctx->metrics.full.num_retries++;
761 :
762 0 : ctx->metrics.full.bytes_read = 0UL;
763 0 : ctx->metrics.full.bytes_written = 0UL;
764 0 : ctx->metrics.full.bytes_total = 0UL;
765 :
766 0 : ctx->metrics.incremental.bytes_read = 0UL;
767 0 : ctx->metrics.incremental.bytes_written = 0UL;
768 0 : ctx->metrics.incremental.bytes_total = 0UL;
769 :
770 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS;
771 0 : ctx->deadline_nanos = 0L;
772 0 : break;
773 :
774 : /* ============================================================== */
775 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
776 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
777 0 : if( !ctx->flush_ack ) break;
778 :
779 0 : if( ctx->metrics.incremental.num_retries==ctx->config.max_retry_abort ) {
780 0 : FD_LOG_WARNING(("hit retry limit of %u for incremental snapshot, aborting", ctx->config.max_retry_abort ));
781 0 : ctx->state = FD_SNAPCT_STATE_SHUTDOWN;
782 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
783 0 : break;
784 0 : }
785 :
786 0 : ctx->metrics.incremental.num_retries++;
787 :
788 0 : ctx->metrics.incremental.bytes_read = 0UL;
789 0 : ctx->metrics.incremental.bytes_written = 0UL;
790 0 : ctx->metrics.incremental.bytes_total = 0UL;
791 :
792 0 : ctx->state = FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL;
793 0 : ctx->deadline_nanos = 0L;
794 0 : break;
795 :
796 : /* ============================================================== */
797 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
798 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
799 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
800 0 : ctx->malformed = 0;
801 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
802 0 : ctx->flush_ack = 0;
803 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET;
804 0 : FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
805 0 : break;
806 0 : }
807 0 : FD_TEST( ctx->metrics.full.bytes_total!=0UL );
808 0 : if( FD_UNLIKELY( ctx->metrics.full.bytes_read == ctx->metrics.full.bytes_total ) ) {
809 0 : ulong sig = ctx->config.incremental_snapshots ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
810 0 : fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
811 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_FILE;
812 0 : ctx->flush_ack = 0;
813 0 : }
814 0 : break;
815 :
816 : /* ============================================================== */
817 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
818 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
819 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
820 0 : ctx->malformed = 0;
821 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
822 0 : ctx->flush_ack = 0;
823 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET;
824 0 : FD_LOG_WARNING(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
825 0 : break;
826 0 : }
827 0 : FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
828 0 : if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_read == ctx->metrics.incremental.bytes_total ) ) {
829 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
830 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE;
831 0 : ctx->flush_ack = 0;
832 0 : }
833 0 : break;
834 :
835 : /* ============================================================== */
836 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
837 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
838 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
839 0 : ctx->malformed = 0;
840 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
841 0 : ctx->flush_ack = 0;
842 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET;
843 0 : FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
844 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
845 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
846 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
847 0 : break;
848 0 : }
849 0 : if( FD_UNLIKELY( ctx->metrics.full.bytes_total!=0UL && ctx->metrics.full.bytes_read==ctx->metrics.full.bytes_total ) ) {
850 0 : ulong sig = ctx->config.incremental_snapshots ? FD_SNAPSHOT_MSG_CTRL_NEXT : FD_SNAPSHOT_MSG_CTRL_DONE;
851 0 : fd_stem_publish( stem, ctx->out_ld.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
852 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_FULL_HTTP;
853 0 : ctx->flush_ack = 0;
854 0 : }
855 0 : break;
856 :
857 : /* ============================================================== */
858 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
859 0 : if( FD_UNLIKELY( !ctx->flush_ack ) ) break;
860 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
861 0 : ctx->malformed = 0;
862 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
863 0 : ctx->flush_ack = 0;
864 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
865 0 : FD_LOG_WARNING(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
866 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
867 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
868 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
869 0 : break;
870 0 : }
871 0 : if ( FD_UNLIKELY( ctx->metrics.incremental.bytes_total!=0UL && ctx->metrics.incremental.bytes_read==ctx->metrics.incremental.bytes_total ) ) {
872 0 : fd_stem_publish( stem, ctx->out_ld.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
873 0 : ctx->state = FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP;
874 0 : ctx->flush_ack = 0;
875 0 : }
876 0 : break;
877 :
878 : /* ============================================================== */
879 0 : case FD_SNAPCT_STATE_SHUTDOWN:
880 0 : break;
881 :
882 : /* ============================================================== */
883 0 : default: FD_LOG_ERR(( "unexpected state %d", ctx->state ));
884 0 : }
885 0 : }
886 :
887 : static void
888 : gossip_frag( fd_snapct_tile_t * ctx,
889 : ulong sig,
890 : ulong sz FD_PARAM_UNUSED,
891 0 : ulong chunk ) {
892 0 : FD_TEST( ctx->gossip_enabled );
893 :
894 0 : if( !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
895 0 : sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
896 0 : sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) ) return;
897 :
898 0 : fd_gossip_update_message_t const * msg = fd_chunk_to_laddr_const( ctx->gossip_in_mem, chunk );
899 0 : fd_pubkey_t const * pubkey = (fd_pubkey_t const *)msg->origin_pubkey;
900 0 : switch( msg->tag ) {
901 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
902 0 : FD_TEST( msg->contact_info.idx<GOSSIP_PEERS_MAX );
903 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info.idx;
904 0 : if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
905 : /* Initialize the new gossip entry, which may or may not be allowed */
906 0 : FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
907 0 : entry->pubkey = *pubkey;
908 0 : entry->rpc_addr.l = 0UL;
909 0 : entry->added_nanos = fd_log_wallclock();
910 0 : if( ctx->config.sources.gossip.allow_any ) {
911 0 : entry->allowed = 1;
912 0 : for( ulong i=0UL; i<ctx->config.sources.gossip.block_list_cnt; i++ ) {
913 0 : if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.block_list[ i ] ) ) {
914 0 : entry->allowed = 0;
915 0 : break;
916 0 : }
917 0 : }
918 0 : } else {
919 0 : entry->allowed = 0;
920 0 : for( ulong i=0UL; i<ctx->config.sources.gossip.allow_list_cnt; i++ ) {
921 0 : if( fd_pubkey_eq( pubkey, &ctx->config.sources.gossip.allow_list[ i ] ) ) {
922 0 : entry->allowed = 1;
923 0 : break;
924 0 : }
925 0 : }
926 0 : }
927 0 : FD_TEST( ULONG_MAX==gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table ) );
928 0 : if( entry->allowed ) gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info.idx, ctx->gossip.ci_table );
929 0 : }
930 0 : if( !entry->allowed ) break;
931 : /* Maybe update the RPC address of a new or existing allowed gossip peer */
932 0 : fd_ip4_port_t cur_addr = entry->rpc_addr;
933 0 : fd_ip4_port_t new_addr = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
934 0 : if( FD_UNLIKELY( new_addr.l!=cur_addr.l ) ) {
935 0 : entry->rpc_addr = new_addr;
936 0 : if( FD_LIKELY( !!cur_addr.l ) ) {
937 0 : int removed = fd_ssping_remove( ctx->ssping, cur_addr );
938 0 : if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, cur_addr );
939 0 : }
940 0 : if( FD_LIKELY( !!new_addr.l ) ) fd_ssping_add( ctx->ssping, new_addr );
941 0 : if( !ctx->config.sources.gossip.allow_any ) {
942 0 : FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
943 0 : if( FD_LIKELY( !!new_addr.l ) ) {
944 0 : FD_LOG_NOTICE(( "allowed gossip peer added with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
945 0 : pubkey_b58, FD_IP4_ADDR_FMT_ARGS( new_addr.addr ), fd_ushort_bswap( new_addr.port ) ));
946 0 : } else {
947 0 : FD_LOG_WARNING(( "allowed gossip peer with public key `%s` does not advertise an RPC address", pubkey_b58 ));
948 0 : }
949 0 : }
950 0 : }
951 0 : break;
952 0 : }
953 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
954 0 : FD_TEST( msg->contact_info_remove.idx<GOSSIP_PEERS_MAX );
955 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + msg->contact_info_remove.idx;
956 0 : if( FD_UNLIKELY( !fd_pubkey_eq( &entry->pubkey, pubkey ) ) ) {
957 0 : FD_TEST( fd_pubkey_check_zero( &entry->pubkey ) );
958 0 : break;
959 0 : }
960 0 : ulong rem_idx = gossip_ci_map_idx_remove( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table );
961 0 : if( rem_idx==ULONG_MAX ) break;
962 0 : FD_TEST( entry->allowed && rem_idx==msg->contact_info_remove.idx );
963 0 : fd_ip4_port_t addr = entry->rpc_addr;
964 0 : if( FD_LIKELY( !!addr.l ) ) {
965 0 : int removed = fd_ssping_remove( ctx->ssping, addr );
966 0 : if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, addr );
967 0 : }
968 0 : if( !ctx->config.sources.gossip.allow_any ) {
969 0 : FD_BASE58_ENCODE_32_BYTES( pubkey->uc, pubkey_b58 );
970 0 : FD_LOG_WARNING(( "allowed gossip peer removed with public key `%s` and RPC address `" FD_IP4_ADDR_FMT ":%hu`",
971 0 : pubkey_b58, FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
972 0 : }
973 0 : fd_memset( entry, 0, sizeof(*entry) );
974 0 : break;
975 0 : }
976 0 : case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
977 0 : ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, pubkey, ULONG_MAX, ctx->gossip.ci_table );
978 0 : if( FD_LIKELY( idx!=ULONG_MAX ) ) {
979 0 : gossip_ci_entry_t * entry = ctx->gossip.ci_table + idx;
980 0 : FD_TEST( entry->allowed );
981 0 : on_snapshot_hash( ctx, entry->rpc_addr, msg );
982 0 : }
983 0 : break;
984 0 : }
985 0 : default:
986 0 : FD_LOG_ERR(( "snapct: unexpected gossip tag %u", (uint)msg->tag ));
987 0 : break;
988 0 : }
989 0 : }
990 :
991 : static void
992 : snapld_frag( fd_snapct_tile_t * ctx,
993 : ulong sig,
994 : ulong sz,
995 : ulong chunk,
996 0 : fd_stem_context_t * stem ) {
997 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_META ) ) {
998 : /* Before snapld starts sending down data fragments, it first sends
999 : a metadata message containing the total size of the snapshot as
1000 : well as the filename. This is only done for HTTP loading. */
1001 0 : int full;
1002 0 : switch( ctx->state ) {
1003 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; break;
1004 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; break;
1005 :
1006 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1007 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1008 0 : return; /* Ignore */
1009 0 : default: FD_LOG_ERR(( "invalid meta frag in state %d", ctx->state ));
1010 0 : }
1011 :
1012 0 : FD_TEST( sz==sizeof(fd_ssctrl_meta_t) );
1013 0 : fd_ssctrl_meta_t const * meta = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
1014 :
1015 0 : fd_memcpy( full ? ctx->http_full_snapshot_name : ctx->http_incr_snapshot_name, meta->name, PATH_MAX );
1016 :
1017 0 : if( FD_LIKELY( !!ctx->out_gui.mem ) ) {
1018 0 : char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
1019 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ), meta->name ) );
1020 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, full );
1021 0 : }
1022 :
1023 0 : if( full ) ctx->metrics.full.bytes_total = meta->total_sz;
1024 0 : else ctx->metrics.incremental.bytes_total = meta->total_sz;
1025 :
1026 0 : return;
1027 0 : }
1028 0 : if( FD_UNLIKELY( sig!=FD_SNAPSHOT_MSG_DATA ) ) return;
1029 :
1030 0 : int full, file;
1031 0 : switch( ctx->state ) {
1032 : /* Expected cases, fall through below */
1033 0 : case FD_SNAPCT_STATE_READING_FULL_FILE: full = 1; file = 1; break;
1034 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE: full = 0; file = 1; break;
1035 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP: full = 1; file = 0; break;
1036 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP: full = 0; file = 0; break;
1037 :
1038 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET:
1039 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET:
1040 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET:
1041 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1042 : /* We are waiting for a reset to fully propagate through the
1043 : pipeline, just throw away any trailing data frags. */
1044 0 : return;
1045 :
1046 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE:
1047 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE:
1048 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP:
1049 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP:
1050 : /* Based on previously received data frags, we expected that the
1051 : current full / incremental snapshot was finished, but then we
1052 : received additional data frags. Unsafe to continue so throw
1053 : away the whole snapshot. */
1054 0 : if( !ctx->malformed ) {
1055 0 : ctx->malformed = 1;
1056 0 : FD_LOG_WARNING(( "complete snapshot loaded but read %lu extra bytes", sz ));
1057 0 : }
1058 0 : return;
1059 :
1060 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS:
1061 0 : case FD_SNAPCT_STATE_WAITING_FOR_PEERS_INCREMENTAL:
1062 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS:
1063 0 : case FD_SNAPCT_STATE_COLLECTING_PEERS_INCREMENTAL:
1064 0 : case FD_SNAPCT_STATE_SHUTDOWN:
1065 0 : default:
1066 0 : FD_LOG_ERR(( "invalid data frag in state %d", ctx->state ));
1067 0 : return;
1068 0 : }
1069 :
1070 0 : if( full ) FD_TEST( ctx->metrics.full.bytes_total !=0UL );
1071 0 : else FD_TEST( ctx->metrics.incremental.bytes_total!=0UL );
1072 :
1073 0 : if( full ) ctx->metrics.full.bytes_read += sz;
1074 0 : else ctx->metrics.incremental.bytes_read += sz;
1075 :
1076 0 : if( !file && -1!=ctx->local_out.dir_fd ) {
1077 0 : uchar const * data = fd_chunk_to_laddr_const( ctx->snapld_in_mem, chunk );
1078 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
1079 0 : long result = write( fd, data, sz );
1080 0 : if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
1081 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", ctx->config.snapshots_path ));
1082 0 : } else if( FD_UNLIKELY( 0L>result ) ) {
1083 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
1084 0 : } else if( FD_UNLIKELY( sz!=(ulong)result ) ) {
1085 0 : FD_LOG_ERR(( "paritial write(%lu)=%ld", sz, result ));
1086 0 : }
1087 0 : if( full ) ctx->metrics.full.bytes_written += sz;
1088 0 : else ctx->metrics.incremental.bytes_written += sz;
1089 0 : }
1090 :
1091 0 : if( FD_UNLIKELY( ( full && ctx->metrics.full.bytes_read > ctx->metrics.full.bytes_total ) ||
1092 0 : (!full && ctx->metrics.incremental.bytes_read > ctx->metrics.incremental.bytes_total ) ) ) {
1093 0 : if( !ctx->malformed ) {
1094 0 : ctx->malformed = 1;
1095 0 : FD_LOG_WARNING(( "expected %s snapshot size of %lu bytes but read %lu bytes",
1096 0 : full ? "full" : "incremental",
1097 0 : full ? ctx->metrics.full.bytes_total : ctx->metrics.incremental.bytes_total,
1098 0 : full ? ctx->metrics.full.bytes_read : ctx->metrics.incremental.bytes_read ));
1099 :
1100 0 : }
1101 0 : }
1102 0 : }
1103 :
1104 : static void
1105 : snapin_frag( fd_snapct_tile_t * ctx,
1106 0 : ulong sig ) {
1107 0 : switch( sig ) {
1108 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
1109 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_FULL_HTTP ||
1110 0 : ctx->state==FD_SNAPCT_STATE_READING_FULL_FILE ) ) {
1111 0 : FD_TEST( !ctx->flush_ack );
1112 0 : ctx->flush_ack = 1;
1113 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1114 0 : break;
1115 :
1116 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
1117 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP ||
1118 0 : ctx->state==FD_SNAPCT_STATE_READING_INCREMENTAL_FILE ) ) {
1119 0 : FD_TEST( !ctx->flush_ack );
1120 0 : ctx->flush_ack = 1;
1121 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1122 0 : break;
1123 :
1124 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
1125 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP ||
1126 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE ) ) {
1127 0 : FD_TEST( !ctx->flush_ack );
1128 0 : ctx->flush_ack = 1;
1129 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1130 0 : break;
1131 :
1132 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:
1133 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP ||
1134 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE ||
1135 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP ||
1136 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE ) ) {
1137 0 : FD_TEST( !ctx->flush_ack );
1138 0 : ctx->flush_ack = 1;
1139 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1140 0 : break;
1141 :
1142 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
1143 0 : if( FD_LIKELY( ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_HTTP_RESET ||
1144 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_FULL_FILE_RESET ||
1145 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP_RESET ||
1146 0 : ctx->state==FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE_RESET ) ) {
1147 0 : FD_TEST( !ctx->flush_ack );
1148 0 : ctx->flush_ack = 1;
1149 0 : } else FD_LOG_ERR(( "invalid control frag %lu in state %d", sig, ctx->state ));
1150 0 : break;
1151 :
1152 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
1153 0 : break;
1154 :
1155 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
1156 0 : switch( ctx->state ) {
1157 0 : case FD_SNAPCT_STATE_READING_FULL_FILE:
1158 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_FILE:
1159 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_FILE:
1160 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_FILE:
1161 0 : case FD_SNAPCT_STATE_READING_FULL_HTTP:
1162 0 : case FD_SNAPCT_STATE_FLUSHING_FULL_HTTP:
1163 0 : case FD_SNAPCT_STATE_READING_INCREMENTAL_HTTP:
1164 0 : case FD_SNAPCT_STATE_FLUSHING_INCREMENTAL_HTTP:
1165 0 : ctx->malformed = 1;
1166 0 : break;
1167 0 : default:
1168 0 : break;
1169 0 : }
1170 0 : break;
1171 0 : }
1172 0 : }
1173 :
1174 : static int
1175 : returnable_frag( fd_snapct_tile_t * ctx,
1176 : ulong in_idx,
1177 : ulong seq FD_PARAM_UNUSED,
1178 : ulong sig,
1179 : ulong chunk,
1180 : ulong sz,
1181 : ulong ctl FD_PARAM_UNUSED,
1182 : ulong tsorig FD_PARAM_UNUSED,
1183 : ulong tspub FD_PARAM_UNUSED,
1184 0 : fd_stem_context_t * stem ) {
1185 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
1186 0 : gossip_frag( ctx, sig, sz, chunk );
1187 0 : } else if( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLD ) {
1188 0 : snapld_frag( ctx, sig, sz, chunk, stem );
1189 0 : } else if( ctx->in_kind[ in_idx ]==IN_KIND_ACK ) {
1190 0 : snapin_frag( ctx, sig );
1191 0 : } else FD_LOG_ERR(( "invalid in_kind %lu %u", in_idx, (uint)ctx->in_kind[ in_idx ] ));
1192 0 : return 0;
1193 0 : }
1194 :
1195 : static void
1196 : privileged_init( fd_topo_t * topo,
1197 0 : fd_topo_tile_t * tile ) {
1198 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1199 :
1200 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1201 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
1202 0 : void * _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
1203 0 : FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t)*GOSSIP_PEERS_MAX );
1204 0 : FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
1205 0 : void * _ssresolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
1206 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
1207 :
1208 0 : #if FD_HAS_OPENSSL
1209 0 : void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
1210 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
1211 0 : fd_ossl_tile_init( alloc );
1212 0 : #endif
1213 :
1214 0 : ctx->ssping = NULL;
1215 0 : if( FD_LIKELY( download_enabled( tile ) ) ) ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, TOTAL_PEERS_MAX, 1UL, on_ping, ctx ) );
1216 0 : if( FD_LIKELY( tile->snapct.sources.servers_cnt ) ) ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, SERVER_PEERS_MAX, tile->snapct.incremental_snapshots, on_resolve, ctx ) );
1217 0 : else ctx->ssresolver = NULL;
1218 :
1219 : /* FIXME: We will keep too many snapshots if we have local snapshots
1220 : but elect not to use them due to their age. */
1221 0 : fd_ssarchive_remove_old_snapshots( tile->snapct.snapshots_path,
1222 0 : tile->snapct.max_full_snapshots_to_keep,
1223 0 : tile->snapct.max_incremental_snapshots_to_keep );
1224 :
1225 0 : ulong full_slot = ULONG_MAX;
1226 0 : ulong incremental_slot = ULONG_MAX;
1227 0 : int full_is_zstd = 0;
1228 0 : int incremental_is_zstd = 0;
1229 0 : char full_path[ PATH_MAX ] = {0};
1230 0 : char incremental_path[ PATH_MAX ] = {0};
1231 0 : if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snapct.snapshots_path,
1232 0 : tile->snapct.incremental_snapshots,
1233 0 : &full_slot,
1234 0 : &incremental_slot,
1235 0 : full_path,
1236 0 : incremental_path,
1237 0 : &full_is_zstd,
1238 0 : &incremental_is_zstd ) ) ) {
1239 0 : if( FD_UNLIKELY( !download_enabled( tile ) ) ) {
1240 0 : FD_LOG_ERR(( "No snapshots found in `%s` and no download sources are enabled. "
1241 0 : "Please enable downloading via [snapshots.sources] and restart.", tile->snapct.snapshots_path ));
1242 0 : }
1243 0 : ctx->local_in.full_snapshot_slot = ULONG_MAX;
1244 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
1245 0 : ctx->local_in.full_snapshot_size = 0UL;
1246 0 : ctx->local_in.incremental_snapshot_size = 0UL;
1247 0 : ctx->local_in.full_snapshot_zstd = 0;
1248 0 : ctx->local_in.incremental_snapshot_zstd = 0;
1249 0 : fd_cstr_fini( ctx->local_in.full_snapshot_path );
1250 0 : fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
1251 0 : } else {
1252 0 : FD_TEST( full_slot!=ULONG_MAX );
1253 :
1254 0 : ctx->local_in.full_snapshot_slot = full_slot;
1255 0 : ctx->local_in.incremental_snapshot_slot = incremental_slot;
1256 0 : ctx->local_in.full_snapshot_zstd = full_is_zstd;
1257 0 : ctx->local_in.incremental_snapshot_zstd = incremental_is_zstd;
1258 :
1259 0 : strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
1260 0 : struct stat full_stat;
1261 0 : if( FD_UNLIKELY( -1==stat( ctx->local_in.full_snapshot_path, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
1262 0 : if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
1263 0 : ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
1264 :
1265 0 : if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
1266 0 : strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
1267 0 : struct stat incremental_stat;
1268 0 : if( FD_UNLIKELY( -1==stat( ctx->local_in.incremental_snapshot_path, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
1269 0 : if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
1270 0 : ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
1271 0 : } else {
1272 0 : ctx->local_in.incremental_snapshot_size = 0UL;
1273 0 : fd_cstr_fini( ctx->local_in.incremental_snapshot_path );
1274 0 : }
1275 0 : }
1276 :
1277 0 : ctx->local_out.dir_fd = -1;
1278 0 : ctx->local_out.full_snapshot_fd = -1;
1279 0 : ctx->local_out.incremental_snapshot_fd = -1;
1280 0 : if( FD_LIKELY( download_enabled( tile ) ) ) {
1281 0 : ctx->local_out.dir_fd = open( tile->snapct.snapshots_path, O_DIRECTORY|O_CLOEXEC );
1282 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open(%s) failed (%i-%s)", tile->snapct.snapshots_path, errno, fd_io_strerror( errno ) ));
1283 :
1284 0 : ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_FULL_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1285 0 : if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_FULL_SNAP_NAME, errno, fd_io_strerror( errno ) ));
1286 :
1287 0 : if( FD_LIKELY( tile->snapct.incremental_snapshots ) ) {
1288 0 : ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, TEMP_INCR_SNAP_NAME, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1289 0 : if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open(%s/%s) failed (%i-%s)", tile->snapct.snapshots_path, TEMP_INCR_SNAP_NAME, errno, fd_io_strerror( errno ) ));
1290 0 : }
1291 0 : }
1292 0 : }
1293 :
1294 : static inline fd_snapct_out_link_t
1295 : out1( fd_topo_t const * topo,
1296 : fd_topo_tile_t const * tile,
1297 0 : char const * name ) {
1298 0 : ulong idx = ULONG_MAX;
1299 :
1300 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
1301 0 : fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
1302 0 : if( !strcmp( link->name, name ) ) {
1303 0 : if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
1304 0 : idx = i;
1305 0 : }
1306 0 : }
1307 :
1308 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapct_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
1309 :
1310 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
1311 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapct_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
1312 :
1313 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
1314 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
1315 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
1316 0 : return (fd_snapct_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
1317 0 : }
1318 :
1319 : static void
1320 : unprivileged_init( fd_topo_t * topo,
1321 0 : fd_topo_tile_t * tile ) {
1322 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1323 :
1324 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1325 0 : fd_snapct_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapct_tile_t), sizeof(fd_snapct_tile_t) );
1326 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( TOTAL_PEERS_MAX ) );
1327 0 : void * _ci_table = FD_SCRATCH_ALLOC_APPEND( l, alignof(gossip_ci_entry_t), sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
1328 0 : void * _ci_map = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ) ) );
1329 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( SERVER_PEERS_MAX ) );
1330 0 : void * _selector = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( TOTAL_PEERS_MAX ) );
1331 :
1332 0 : ctx->config = tile->snapct;
1333 0 : ctx->gossip_enabled = gossip_enabled( tile );
1334 0 : ctx->download_enabled = download_enabled( tile );
1335 :
1336 0 : if( ctx->config.sources.servers_cnt ) {
1337 0 : for( ulong i=0UL; i<tile->snapct.sources.servers_cnt; i++ ) {
1338 0 : fd_ssping_add ( ctx->ssping, tile->snapct.sources.servers[ i ].addr );
1339 0 : fd_http_resolver_add( ctx->ssresolver,
1340 0 : tile->snapct.sources.servers[ i ].addr,
1341 0 : tile->snapct.sources.servers[ i ].hostname,
1342 0 : tile->snapct.sources.servers[ i ].is_https );
1343 0 : }
1344 0 : }
1345 :
1346 0 : ctx->selector = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, TOTAL_PEERS_MAX, ctx->config.incremental_snapshots, 1UL ) );
1347 :
1348 0 : ctx->state = FD_SNAPCT_STATE_WAITING_FOR_PEERS;
1349 0 : ctx->malformed = 0;
1350 0 : ctx->deadline_nanos = fd_log_wallclock() + FD_SNAPCT_WAITING_FOR_PEERS_TIMEOUT;
1351 0 : ctx->flush_ack = 0;
1352 0 : ctx->addr.l = 0UL;
1353 :
1354 0 : fd_memset( ctx->http_full_snapshot_name, 0, PATH_MAX );
1355 0 : fd_memset( ctx->http_incr_snapshot_name, 0, PATH_MAX );
1356 :
1357 0 : ctx->gossip_in_mem = NULL;
1358 0 : int has_snapld_dc = 0, has_ack_loopback = 0;
1359 0 : FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
1360 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
1361 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
1362 0 : if( 0==strcmp( in_link->name, "gossip_out" ) ) {
1363 0 : ctx->in_kind[ i ] = IN_KIND_GOSSIP;
1364 0 : ctx->gossip_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1365 0 : } else if( 0==strcmp( in_link->name, "snapld_dc" ) ) {
1366 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLD;
1367 0 : ctx->snapld_in_mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1368 0 : FD_TEST( !has_snapld_dc );
1369 0 : has_snapld_dc = 1;
1370 0 : } else if( 0==strcmp( in_link->name, "snapin_ct" ) || 0==strcmp( in_link->name, "snapls_ct" ) ||
1371 0 : 0==strcmp( in_link->name, "snapwm_ct" ) || 0==strcmp( in_link->name, "snaplv_ct" ) ) {
1372 0 : ctx->in_kind[ i ] = IN_KIND_ACK;
1373 0 : FD_TEST( !has_ack_loopback );
1374 0 : has_ack_loopback = 1;
1375 0 : }
1376 0 : }
1377 0 : FD_TEST( has_snapld_dc && has_ack_loopback );
1378 0 : FD_TEST( ctx->gossip_enabled==(ctx->gossip_in_mem!=NULL) );
1379 :
1380 0 : ctx->predicted_incremental.full_slot = ULONG_MAX;
1381 0 : ctx->predicted_incremental.slot = ULONG_MAX;
1382 0 : ctx->predicted_incremental.dirty = 0;
1383 :
1384 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
1385 :
1386 0 : fd_memset( _ci_table, 0, sizeof(gossip_ci_entry_t) * GOSSIP_PEERS_MAX );
1387 0 : ctx->gossip.ci_table = _ci_table;
1388 0 : ctx->gossip.ci_map = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( GOSSIP_PEERS_MAX ), 0UL ) );
1389 0 : ctx->gossip.fresh_cnt = 0UL;
1390 0 : ctx->gossip.total_cnt = 0UL;
1391 0 : ctx->gossip.saturated = !ctx->gossip_enabled;
1392 0 : ctx->gossip.next_saturated_check = 0;
1393 :
1394 0 : if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
1395 0 : ctx->out_ld = out1( topo, tile, "snapct_ld" );
1396 0 : ctx->out_gui = out1( topo, tile, "snapct_gui" );
1397 0 : ctx->out_rp = out1( topo, tile, "snapct_repr" );
1398 0 : }
1399 :
1400 : /* after_credit can result in as many as 5 stem publishes in some code
1401 : paths, and returnable_frag can result in 1. */
1402 0 : #define STEM_BURST 6UL /* 5 (after_credit) + 1 (returnable_frag), per the counts in the comment above */
1403 :
1404 0 : #define STEM_LAZY 1000L /* NOTE(review): units and exact meaning are defined by fd_stem.c, not visible here -- confirm */
1405 :
1406 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapct_tile_t /* tile context type named for the stem template */
1407 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapct_tile_t) /* alignment of that same context type */
1408 :
1409 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown /* each STEM_CALLBACK_* below names a function expected to be defined earlier in this file */
1410 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
1411 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1412 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit /* counted in the STEM_BURST budget above */
1413 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag /* counted in the STEM_BURST budget above */
1414 :
1415 : #include "../../disco/stem/fd_stem.c" /* included after the STEM_* defines so the included source sees them; presumably provides stem_run -- confirm */
1416 :
1417 : fd_topo_run_tile_t fd_tile_snapct = { /* run-tile descriptor for this tile; members are set by designated initializer */
1418 : .name = NAME, /* NAME is "snapct" (defined near the top of this file) */
1419 : .rlimit_file_cnt_fn = rlimit_file_cnt, /* function members refer to same-named functions; only unprivileged_init is partly visible in this chunk */
1420 : .populate_allowed_seccomp = populate_allowed_seccomp,
1421 : .populate_allowed_fds = populate_allowed_fds,
1422 : .scratch_align = scratch_align,
1423 : .scratch_footprint = scratch_footprint,
1424 : .loose_footprint = loose_footprint,
1425 : .privileged_init = privileged_init,
1426 : .unprivileged_init = unprivileged_init, /* the function whose body ends just above the STEM_* defines */
1427 : .run = stem_run, /* not defined in the visible chunk; presumably comes from the fd_stem.c include -- confirm */
1428 : .keep_host_networking = 1, /* NOTE(review): presumably needed because this tile pings and downloads from snapshot peers -- confirm */
1429 : .allow_connect = 1, /* NOTE(review): presumably permits outbound connect() for peer/HTTP access -- confirm against the sandbox code */
1430 : .allow_renameat = 1, /* NOTE(review): presumably lets the "-partial" temp snapshot files be renamed into place -- confirm */
1431 : };
1432 :
1433 : #undef NAME
|