Line data Source code
1 : #define _GNU_SOURCE /* SOL_TCP (seccomp) */
2 :
3 : #include "fd_snaprd_tile.h"
4 : #include "utils/fd_ssping.h"
5 : #include "utils/fd_sshttp.h"
6 : #include "utils/fd_ssctrl.h"
7 : #include "utils/fd_ssarchive.h"
8 : #include "utils/fd_http_resolver.h"
9 : #include "utils/fd_ssmsg.h"
10 :
11 : #include "fd_snaprd_tile.h"
12 :
13 : #include "../../disco/topo/fd_topo.h"
14 : #include "../../disco/metrics/fd_metrics.h"
15 : #include "../../flamenco/gossip/fd_gossip_types.h"
16 : #include "../../app/shared/fd_config.h"
17 :
18 : #include <errno.h>
19 : #include <stdio.h>
20 : #include <fcntl.h>
21 : #include <unistd.h>
22 : #include <sys/stat.h>
23 : #include <netinet/tcp.h>
24 : #include <netinet/in.h>
25 :
26 : #include "generated/fd_snaprd_tile_seccomp.h"
27 :
/* Capacity of the peer pinger (ssping) and the peer selector; also the
   number of ping sockets budgeted for in rlimit_file_cnt. */
#define FD_SSPING_MAX_PEERS (65536UL)

#define NAME "snaprd"

#define FD_SNAPRD_MAX_HTTP_PEERS (16UL) /* Maximum number of configured http peers */
/* Size of local_out.write_buffer, which stages downloaded bytes whose
   write() to the local snapshot file would block. */
#define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */

/* Input link kinds stored in ctx->in_kind (presumably indexed by input
   link index — confirm against unprivileged_init). */
#define IN_KIND_SNAPCTL (0)
#define IN_KIND_GOSSIP (1)
#define MAX_IN_LINKS (3)
37 : #define MAX_IN_LINKS (3)
38 :
39 : struct fd_restore_out_link {
40 : ulong idx;
41 : fd_wksp_t * mem;
42 : ulong chunk0;
43 : ulong wmark;
44 : ulong chunk;
45 : ulong mtu;
46 : };
47 :
48 : typedef struct fd_restore_out_link fd_restore_out_link_t;
49 :
50 : #define FD_SNAPRD_GOSSIP_FRESH_DEADLINE_NANOS (7.5L*1000L*1000L*1000L) /* gossip contact info is pushed every 7.5 seconds */
51 0 : #define FD_SNAPRD_GOSSIP_SATURATION_THRESHOLD (0.05) /* 5% fresh peers */
52 0 : #define FD_SNAPRD_GOSSIP_TIMEOUT_DEADLINE_NANOS (2L*60L*1000L*1000L*1000L) /* 2 minutes */
53 0 : #define FD_SNAPRD_WAITING_FOR_PEERS_TIMEOUT_DEADLINE_NANOS (2L*60L*1000L*1000L*1000L) /* 2 minutes */
54 :
55 : struct fd_snaprd_gossip_ci_entry {
56 : fd_ip4_port_t gossip_addr;
57 : fd_ip4_port_t rpc_addr;
58 : fd_pubkey_t pubkey;
59 : long wallclock_nanos;
60 :
61 : struct {
62 : ulong prev;
63 : ulong next;
64 : } map;
65 :
66 : struct {
67 : ulong next;
68 : } pool;
69 : };
70 :
71 : typedef struct fd_snaprd_gossip_ci_entry fd_snaprd_gossip_ci_entry_t;
72 :
73 : #define POOL_NAME gossip_ci_pool
74 0 : #define POOL_T fd_snaprd_gossip_ci_entry_t
75 0 : #define POOL_NEXT pool.next
76 : #include "../../util/tmpl/fd_pool.c"
77 :
78 : #define MAP_NAME gossip_ci_map
79 0 : #define MAP_KEY pubkey
80 0 : #define MAP_ELE_T fd_snaprd_gossip_ci_entry_t
81 : #define MAP_KEY_T fd_pubkey_t
82 0 : #define MAP_PREV map.prev
83 0 : #define MAP_NEXT map.next
84 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
85 0 : #define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
86 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
87 : #include "../../util/tmpl/fd_map_chain.c"
88 :
89 : struct fd_snaprd_tile {
90 : fd_ssping_t * ssping;
91 : fd_sshttp_t * sshttp;
92 : fd_http_resolver_t * ssresolver;
93 : fd_sspeer_selector_t * selector;
94 :
95 : int state;
96 : int malformed;
97 : long deadline_nanos;
98 : ulong ack_cnt;
99 : int peer_selection;
100 :
101 : fd_ip4_port_t addr;
102 :
103 : struct {
104 : ulong write_buffer_pos;
105 : ulong write_buffer_len;
106 : uchar write_buffer[ SNAPRD_FILE_BUF_SZ ];
107 :
108 : char full_snapshot_path[ PATH_MAX ];
109 : char incremental_snapshot_path[ PATH_MAX ];
110 :
111 : int dir_fd;
112 : int full_snapshot_fd;
113 : int incremental_snapshot_fd;
114 : } local_out;
115 :
116 : uchar in_kind[ MAX_IN_LINKS ];
117 :
118 : struct {
119 : ulong full_slot;
120 : ulong slot;
121 : int dirty;
122 : } predicted_incremental;
123 :
124 : struct {
125 : ulong full_snapshot_slot;
126 : int full_snapshot_fd;
127 : char full_snapshot_path[ PATH_MAX ];
128 : ulong full_snapshot_size;
129 :
130 : ulong incremental_snapshot_slot;
131 : int incremental_snapshot_fd;
132 : char incremental_snapshot_path[ PATH_MAX ];
133 : ulong incremental_snapshot_size;
134 : } local_in;
135 :
136 : struct {
137 : char path[ PATH_MAX ];
138 : int do_download;
139 : int incremental_snapshot_fetch;
140 : uint maximum_local_snapshot_age;
141 : uint minimum_download_speed_mib;
142 : uint maximum_download_retry_abort;
143 : uint max_full_snapshots_to_keep;
144 : uint max_incremental_snapshots_to_keep;
145 : int entrypoints_enabled;
146 : int gossip_peers_enabled;
147 : } config;
148 :
149 : struct {
150 : struct {
151 : ulong bytes_read;
152 : ulong bytes_written;
153 : ulong bytes_total;
154 : uint num_retries;
155 : } full;
156 :
157 : struct {
158 : ulong bytes_read;
159 : ulong bytes_written;
160 : ulong bytes_total;
161 : uint num_retries;
162 : } incremental;
163 : } metrics;
164 :
165 : struct {
166 : fd_wksp_t * mem;
167 : ulong chunk0;
168 : ulong wmark;
169 : ulong mtu;
170 : } gossip_in;
171 :
172 : struct {
173 : fd_snaprd_gossip_ci_entry_t * ci_pool;
174 : gossip_ci_map_t * ci_map;
175 : fd_gossip_update_message_t tmp_upd_buf;
176 : fd_ip4_port_t entrypoints[ GOSSIP_TILE_ENTRYPOINTS_MAX ];
177 : ulong entrypoints_cnt;
178 : ulong entrypoints_received;
179 : double fresh;
180 : ulong fresh_cnt;
181 : ulong total_cnt;
182 : int saturated;
183 : } gossip;
184 :
185 : fd_restore_out_link_t out_snapctl;
186 : fd_restore_out_link_t out_gui;
187 : fd_restore_out_link_t out_rp;
188 :
189 : /* Ensure snapshot path is only published to the gui tile once */
190 : int gui_full_path_published;
191 : int gui_incremental_path_published;
192 : };
193 :
194 : typedef struct fd_snaprd_tile fd_snaprd_tile_t;
195 :
196 : static ulong
197 0 : scratch_align( void ) {
198 0 : return alignof(fd_snaprd_tile_t);
199 0 : }
200 :
201 : static ulong
202 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
203 0 : ulong l = FD_LAYOUT_INIT;
204 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
205 0 : l = FD_LAYOUT_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
206 0 : l = FD_LAYOUT_APPEND( l, fd_ssping_align(), fd_ssping_footprint( FD_SSPING_MAX_PEERS ) );
207 0 : l = FD_LAYOUT_APPEND( l, gossip_ci_pool_align(), gossip_ci_pool_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
208 0 : l = FD_LAYOUT_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( FD_CONTACT_INFO_TABLE_SIZE ) ) );
209 0 : l = FD_LAYOUT_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( FD_SNAPRD_MAX_HTTP_PEERS ) );
210 0 : l = FD_LAYOUT_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( FD_SSPING_MAX_PEERS ) );
211 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaprd_tile_t) );
212 0 : }
213 :
214 : static inline int
215 0 : should_shutdown( fd_snaprd_tile_t * ctx ) {
216 0 : return ctx->state==FD_SNAPRD_STATE_SHUTDOWN;
217 0 : }
218 :
219 : static inline int
220 0 : is_entrypoint( fd_snaprd_tile_t * ctx, fd_ip4_port_t addr ) {
221 0 : for( ulong i=0UL; i<ctx->gossip.entrypoints_cnt; i++ ) {
222 0 : if( FD_UNLIKELY( ctx->gossip.entrypoints[ i ].l==addr.l ) ) return 1;
223 0 : }
224 0 : return 0;
225 0 : }
226 :
227 : static int
228 0 : all_entrypoints_received( fd_snaprd_tile_t * ctx ) {
229 0 : if( FD_UNLIKELY( !ctx->config.entrypoints_enabled ) ) return 1;
230 0 : if( FD_UNLIKELY( ctx->gossip.entrypoints_received==ctx->gossip.entrypoints_cnt ) ) return 1;
231 :
232 0 : ulong received_gossip_entrypoints = 0UL;
233 0 : for( ulong i=0UL; i<ctx->gossip.entrypoints_cnt; i++ ) {
234 0 : for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_pool );
235 0 : !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_pool );
236 0 : iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_pool ) ) {
237 0 : fd_snaprd_gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_pool );
238 0 : if( FD_LIKELY( ci_entry->gossip_addr.l==ctx->gossip.entrypoints[ i ].l ) ) {
239 0 : received_gossip_entrypoints++;
240 0 : break;
241 0 : }
242 0 : }
243 0 : }
244 :
245 0 : ctx->gossip.entrypoints_received = received_gossip_entrypoints;
246 0 : return received_gossip_entrypoints==ctx->gossip.entrypoints_cnt;
247 0 : }
248 :
249 : static int
250 : gossip_saturated( fd_snaprd_tile_t * ctx,
251 0 : long now ) {
252 0 : if( FD_UNLIKELY( !ctx->config.gossip_peers_enabled ) ) return 1;
253 0 : if( FD_UNLIKELY( ctx->gossip.saturated ) ) return 1;
254 :
255 0 : ulong fresh_cnt = 0UL;
256 0 : ulong total_cnt = 0UL;
257 0 : for( gossip_ci_map_iter_t iter = gossip_ci_map_iter_init( ctx->gossip.ci_map, ctx->gossip.ci_pool );
258 0 : !gossip_ci_map_iter_done( iter, ctx->gossip.ci_map, ctx->gossip.ci_pool );
259 0 : iter = gossip_ci_map_iter_next( iter, ctx->gossip.ci_map, ctx->gossip.ci_pool ) ) {
260 0 : fd_snaprd_gossip_ci_entry_t const * ci_entry = gossip_ci_map_iter_ele_const( iter, ctx->gossip.ci_map, ctx->gossip.ci_pool );
261 0 : if( FD_UNLIKELY( ci_entry->wallclock_nanos>(now-FD_SNAPRD_GOSSIP_FRESH_DEADLINE_NANOS) ) ) fresh_cnt++;
262 0 : total_cnt++;
263 0 : }
264 :
265 0 : double fresh = total_cnt ? (double)fresh_cnt/(double)total_cnt : 1.0;
266 0 : ctx->gossip.fresh_cnt = fresh_cnt;
267 0 : ctx->gossip.total_cnt = total_cnt;
268 0 : ctx->gossip.fresh = fresh;
269 0 : ctx->gossip.saturated = fresh<FD_SNAPRD_GOSSIP_SATURATION_THRESHOLD;
270 0 : return ctx->gossip.saturated;
271 0 : }
272 :
273 : static void
274 0 : metrics_write( fd_snaprd_tile_t * ctx ) {
275 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_READ, ctx->metrics.full.bytes_read );
276 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_WRITTEN, ctx->metrics.full.bytes_written );
277 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_TOTAL, ctx->metrics.full.bytes_total );
278 0 : FD_MGAUGE_SET( SNAPRD, FULL_DOWNLOAD_RETRIES, ctx->metrics.full.num_retries );
279 :
280 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_READ, ctx->metrics.incremental.bytes_read );
281 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_WRITTEN, ctx->metrics.incremental.bytes_written );
282 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_TOTAL, ctx->metrics.incremental.bytes_total );
283 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_DOWNLOAD_RETRIES, ctx->metrics.incremental.num_retries );
284 :
285 0 : FD_MGAUGE_SET( SNAPRD, GOSSIP_FRESH_COUNT, ctx->gossip.fresh_cnt );
286 0 : FD_MGAUGE_SET( SNAPRD, GOSSIP_TOTAL_COUNT, ctx->gossip.total_cnt );
287 :
288 0 : FD_MGAUGE_SET( SNAPRD, PREDICTED_SLOT, ctx->predicted_incremental.slot );
289 :
290 0 : FD_MGAUGE_SET( SNAPRD, STATE, (ulong)ctx->state );
291 0 : }
292 :
293 : static void
294 : snapshot_path_gui_publish( fd_snaprd_tile_t * ctx,
295 : fd_stem_context_t * stem,
296 : char const * path,
297 0 : int is_full ) {
298 0 : fd_snaprd_update_t * out = fd_chunk_to_laddr( ctx->out_gui.mem, ctx->out_gui.chunk );
299 0 : FD_TEST( fd_cstr_printf_check( out->read_path, PATH_MAX, NULL, "%s", path ) );
300 0 : out->is_download = 0;
301 0 : out->type = fd_int_if( is_full, FD_SNAPRD_SNAPSHOT_TYPE_FULL, FD_SNAPRD_SNAPSHOT_TYPE_INCREMENTAL );
302 0 : fd_stem_publish( stem, ctx->out_gui.idx, 0UL, ctx->out_gui.chunk, sizeof(fd_snaprd_update_t) , 0UL, 0UL, 0UL );
303 0 : ctx->out_gui.chunk = fd_dcache_compact_next( ctx->out_gui.chunk, sizeof(fd_snaprd_update_t), ctx->out_gui.chunk0, ctx->out_gui.wmark );
304 0 : if( is_full ) ctx->gui_full_path_published = 1;
305 0 : else ctx->gui_incremental_path_published = 1;
306 0 : }
307 :
308 : static void
309 0 : predict_incremental( fd_snaprd_tile_t * ctx ) {
310 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshot_fetch ) ) return;
311 0 : if( FD_UNLIKELY( ctx->predicted_incremental.full_slot==ULONG_MAX ) ) return;
312 :
313 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
314 :
315 0 : if( FD_LIKELY( best.addr.l ) ) {
316 0 : if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.ssinfo.incremental.slot ) ) {
317 0 : ctx->predicted_incremental.slot = best.ssinfo.incremental.slot;
318 0 : ctx->predicted_incremental.dirty = 1;
319 0 : }
320 0 : }
321 0 : }
322 :
323 : static void
324 : on_resolve( void * _ctx,
325 : fd_ip4_port_t addr,
326 0 : fd_ssinfo_t const * ssinfo ) {
327 0 : fd_snaprd_tile_t * ctx = (fd_snaprd_tile_t *)_ctx;
328 :
329 0 : fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, ssinfo );
330 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector, ssinfo->full.slot, ssinfo->incremental.slot );
331 0 : predict_incremental( ctx );
332 0 : }
333 :
334 : static void
335 : on_ping( void * _ctx,
336 : fd_ip4_port_t addr,
337 0 : ulong latency ) {
338 0 : fd_snaprd_tile_t * ctx = (fd_snaprd_tile_t *)_ctx;
339 :
340 0 : fd_sspeer_selector_add( ctx->selector, addr, latency, NULL );
341 0 : predict_incremental( ctx );
342 0 : }
343 :
344 : static void
345 : on_snapshot_hash( fd_snaprd_tile_t * ctx,
346 : fd_ip4_port_t addr,
347 0 : fd_gossip_update_message_t const * msg ) {
348 0 : ulong full_slot = msg->snapshot_hashes.full->slot;
349 0 : ulong incr_slot = 0UL;
350 :
351 0 : for( ulong i=0UL; i<msg->snapshot_hashes.incremental_len; i++ ) {
352 0 : if( FD_LIKELY( msg->snapshot_hashes.incremental[ i ].slot>incr_slot ) ) {
353 0 : incr_slot = msg->snapshot_hashes.incremental[ i ].slot;
354 0 : }
355 0 : }
356 :
357 0 : fd_ssinfo_t ssinfo = { .full = { .slot = msg->snapshot_hashes.full->slot },
358 0 : .incremental = { .slot = incr_slot, .base_slot = full_slot } };
359 :
360 0 : fd_sspeer_selector_add( ctx->selector, addr, ULONG_MAX, &ssinfo );
361 0 : fd_sspeer_selector_process_cluster_slot( ctx->selector, full_slot, incr_slot );
362 0 : predict_incremental( ctx );
363 0 : }
364 :
365 : static void
366 : send_expected_slot( fd_stem_context_t * stem,
367 0 : ulong slot ) {
368 0 : uint tsorig; uint tspub;
369 0 : fd_ssmsg_slot_to_frag( slot, &tsorig, &tspub );
370 0 : fd_stem_publish( stem, 1UL, FD_SSMSG_EXPECTED_SLOT, 0UL, 0UL, 0UL, tsorig, tspub );
371 0 : }
372 :
373 : static void
374 : read_file_data( fd_snaprd_tile_t * ctx,
375 0 : fd_stem_context_t * stem ) {
376 0 : uchar * out = fd_chunk_to_laddr( ctx->out_snapctl.mem, ctx->out_snapctl.chunk );
377 :
378 0 : FD_TEST( ctx->state==FD_SNAPRD_STATE_READING_INCREMENTAL_FILE || ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE );
379 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE;
380 0 : long result = read( full ? ctx->local_in.full_snapshot_fd : ctx->local_in.incremental_snapshot_fd , out, ctx->out_snapctl.mtu );
381 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) return;
382 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "read() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
383 :
384 0 : switch( ctx->state ) {
385 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
386 0 : ctx->metrics.incremental.bytes_read += (ulong)result;
387 :
388 0 : break;
389 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
390 0 : ctx->metrics.full.bytes_read += (ulong)result;
391 :
392 0 : break;
393 0 : default:
394 0 : break;
395 0 : }
396 :
397 0 : if( FD_UNLIKELY( !result ) ) {
398 0 : switch( ctx->state ) {
399 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
400 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
401 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE;
402 0 : break;
403 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
404 0 : if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
405 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
406 0 : } else {
407 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
408 0 : }
409 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_FILE;
410 0 : break;
411 0 : default:
412 0 : break;
413 0 : }
414 0 : return;
415 0 : }
416 :
417 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_DATA, ctx->out_snapctl.chunk, (ulong)result, 0UL, 0UL, 0UL );
418 0 : ctx->out_snapctl.chunk = fd_dcache_compact_next( ctx->out_snapctl.chunk, (ulong)result, ctx->out_snapctl.chunk0, ctx->out_snapctl.wmark );
419 0 : }
420 :
421 : static void
422 : read_http_data( fd_snaprd_tile_t * ctx,
423 : fd_stem_context_t * stem,
424 0 : long now ) {
425 0 : uchar * out = fd_chunk_to_laddr( ctx->out_snapctl.mem, ctx->out_snapctl.chunk );
426 :
427 0 : ulong buffer_avail = fd_ulong_if( -1!=ctx->local_out.dir_fd, SNAPRD_FILE_BUF_SZ-ctx->local_out.write_buffer_len, ULONG_MAX );
428 0 : ulong data_len = fd_ulong_min( buffer_avail, ctx->out_snapctl.mtu );
429 0 : int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, now );
430 :
431 0 : char const * full_snapshot_name;
432 0 : char const * incremental_snapshot_name;
433 0 : fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );
434 0 : char snapshot_path[ PATH_MAX+30UL ]; /* 30 is fd_cstr_nlen( "https://255.255.255.255:65536/", ULONG_MAX ) */
435 0 : if( FD_LIKELY( !ctx->gui_full_path_published && strcmp( full_snapshot_name, "" ) ) ) {
436 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ), full_snapshot_name ) );
437 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, /* is_full */ 1 );
438 0 : }
439 0 : if( FD_LIKELY( !ctx->gui_incremental_path_published && strcmp( incremental_snapshot_name, "" ) ) ) {
440 0 : FD_TEST( fd_cstr_printf_check( snapshot_path, sizeof(snapshot_path), NULL, "http://" FD_IP4_ADDR_FMT ":%hu/%s", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ), incremental_snapshot_name ) );
441 0 : snapshot_path_gui_publish( ctx, stem, snapshot_path, /* is_full */ 0 );
442 0 : }
443 :
444 0 : switch( result ) {
445 0 : case FD_SSHTTP_ADVANCE_AGAIN: break;
446 0 : case FD_SSHTTP_ADVANCE_ERROR: {
447 :
448 0 : switch( ctx->state ) {
449 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
450 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
451 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
452 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
453 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
454 0 : break;
455 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
456 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
457 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
458 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL, 0UL, 0UL, 0UL, 0UL, 0UL );
459 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP_RESET;
460 0 : break;
461 0 : default:
462 0 : break;
463 0 : }
464 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
465 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
466 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
467 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
468 0 : ctx->deadline_nanos = now;
469 0 : break;
470 0 : }
471 0 : case FD_SSHTTP_ADVANCE_DONE: {
472 0 : switch( ctx->state ) {
473 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
474 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
475 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP;
476 0 : break;
477 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
478 0 : if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
479 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
480 0 : } else {
481 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
482 0 : }
483 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
484 0 : break;
485 0 : default:
486 0 : break;
487 0 : }
488 0 : break;
489 0 : }
490 0 : case FD_SSHTTP_ADVANCE_DATA: {
491 0 : if( FD_LIKELY( ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP ) ) ctx->metrics.full.bytes_total = fd_sshttp_content_len( ctx->sshttp );
492 0 : else ctx->metrics.incremental.bytes_total = fd_sshttp_content_len( ctx->sshttp );
493 :
494 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_DATA, ctx->out_snapctl.chunk, data_len, 0UL, 0UL, 0UL );
495 0 : ctx->out_snapctl.chunk = fd_dcache_compact_next( ctx->out_snapctl.chunk, data_len, ctx->out_snapctl.chunk0, ctx->out_snapctl.wmark );
496 :
497 0 : ulong written_sz = 0UL;
498 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd && !ctx->local_out.write_buffer_len ) ) {
499 0 : while( written_sz<data_len ) {
500 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP;
501 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
502 0 : long result = write( fd, out+written_sz, data_len-written_sz );
503 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
504 0 : else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
505 0 : char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
506 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
507 0 : } else if( FD_UNLIKELY( -1==result ) ) {
508 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
509 0 : break;
510 0 : }
511 0 : written_sz += (ulong)result;
512 0 : }
513 0 : }
514 :
515 0 : if( FD_UNLIKELY( written_sz<data_len ) ) {
516 0 : fd_memcpy( ctx->local_out.write_buffer+ctx->local_out.write_buffer_len, out+written_sz, data_len-written_sz );
517 0 : }
518 0 : ctx->local_out.write_buffer_len += data_len-written_sz;
519 :
520 0 : switch( ctx->state ) {
521 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
522 0 : ctx->metrics.incremental.bytes_read += data_len;
523 0 : ctx->metrics.incremental.bytes_written += written_sz;
524 0 : break;
525 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
526 0 : ctx->metrics.full.bytes_read += data_len;
527 0 : ctx->metrics.full.bytes_written += written_sz;
528 0 : break;
529 0 : default:
530 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
531 0 : break;
532 0 : }
533 :
534 0 : break;
535 0 : }
536 0 : default:
537 0 : FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d", result ));
538 0 : break;
539 0 : }
540 0 : }
541 :
542 : static void
543 0 : drain_buffer( fd_snaprd_tile_t * ctx ) {
544 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPRD_STATE_READING_FULL_HTTP &&
545 0 : ctx->state!=FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP &&
546 0 : ctx->state!=FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP &&
547 0 : ctx->state!=FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ) ) return;
548 :
549 0 : if( FD_LIKELY( -1==ctx->local_out.dir_fd || !ctx->local_out.write_buffer_len ) ) return;
550 :
551 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP || ctx->state==FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
552 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
553 :
554 0 : ulong written_sz = 0UL;
555 0 : while( ctx->local_out.write_buffer_pos+written_sz<ctx->local_out.write_buffer_len ) {
556 0 : long result = write( fd, ctx->local_out.write_buffer+ctx->local_out.write_buffer_pos+written_sz, ctx->local_out.write_buffer_len-written_sz );
557 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
558 0 : else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
559 0 : char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
560 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
561 0 : } else if( FD_UNLIKELY( -1==result ) ) {
562 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
563 0 : break;
564 0 : }
565 0 : written_sz += (ulong)result;
566 0 : }
567 :
568 0 : ctx->local_out.write_buffer_pos += written_sz;
569 :
570 0 : if( FD_LIKELY( ctx->local_out.write_buffer_pos==ctx->local_out.write_buffer_len ) ) {
571 0 : ctx->local_out.write_buffer_pos = 0UL;
572 0 : ctx->local_out.write_buffer_len = 0UL;
573 0 : }
574 :
575 0 : if( FD_LIKELY( full ) ) ctx->metrics.full.bytes_written += written_sz;
576 0 : else ctx->metrics.incremental.bytes_written += written_sz;
577 0 : }
578 :
579 : static void
580 0 : rename_snapshots( fd_snaprd_tile_t * ctx ) {
581 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
582 0 : char const * full_snapshot_name;
583 0 : char const * incremental_snapshot_name;
584 0 : fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );
585 :
586 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd && full_snapshot_name[ 0 ]!='\0' ) ) {
587 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) )
588 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
589 0 : }
590 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd && incremental_snapshot_name[ 0 ]!='\0' ) ) {
591 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) )
592 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
593 0 : }
594 0 : }
595 :
596 : static void
597 0 : remove_temp_files( fd_snaprd_tile_t * ctx ) {
598 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
599 :
600 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
601 0 : if( FD_UNLIKELY( -1==unlinkat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", 0 ) ) )
602 0 : FD_LOG_ERR(( "unlinkat(snapshot.tar.bz2-partial) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
603 0 : }
604 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
605 0 : if( FD_UNLIKELY( -1==unlinkat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", 0 ) ) )
606 0 : FD_LOG_ERR(( "unlinkat(incremental-snapshot.tar.bz2-partial) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
607 0 : }
608 0 : }
609 :
610 : static ulong
611 : rlimit_file_cnt( fd_topo_t const * topo FD_PARAM_UNUSED,
612 0 : fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
613 : /* stderr, logfile, dirfd, local out full fd, local out incremental
614 : fd, local in full fd, local in incremental fd, and one spare for a
615 : socket(). */
616 :
617 0 : return 1UL + /* stderr */
618 0 : 1UL + /* logfile */
619 0 : FD_SSPING_MAX_PEERS + /* ssping max peers sockets */
620 0 : FD_SNAPRD_MAX_HTTP_PEERS + /* http resolver max peers sockets */
621 0 : 3UL + /* dirfd + 2 snapshot file fds in the worst case */
622 0 : 1UL; /* sshttp socket */
623 0 : }
624 :
625 : static ulong
626 : populate_allowed_seccomp( fd_topo_t const * topo,
627 : fd_topo_tile_t const * tile,
628 : ulong out_cnt,
629 0 : struct sock_filter * out ) {
630 :
631 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
632 :
633 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
634 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
635 :
636 0 : populate_sock_filter_policy_fd_snaprd_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_out.dir_fd, (uint)ctx->local_out.full_snapshot_fd, (uint)ctx->local_out.incremental_snapshot_fd, (uint)ctx->local_in.full_snapshot_fd, (uint)ctx->local_in.incremental_snapshot_fd );
637 0 : return sock_filter_policy_fd_snaprd_tile_instr_cnt;
638 0 : }
639 :
640 : static ulong
641 : populate_allowed_fds( fd_topo_t const * topo,
642 : fd_topo_tile_t const * tile,
643 : ulong out_fds_cnt,
644 0 : int * out_fds ) {
645 : /* In the worst case we expect these file descriptors to be open:
646 : - stderr
647 : - logfile
648 : - 5 file descriptors for the directory fd, 2 snapshot file fds for
649 : http downloads, and 2 snapshot file fds for snapshot files on
650 : disk. */
651 :
652 0 : if( FD_UNLIKELY( out_fds_cnt<7UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
653 :
654 0 : ulong out_cnt = 0;
655 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
656 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
657 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
658 0 : }
659 :
660 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
661 :
662 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
663 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
664 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.dir_fd;
665 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.full_snapshot_fd;
666 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_out.incremental_snapshot_fd;
667 0 : if( FD_LIKELY( -1!=ctx->local_in.full_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_in.full_snapshot_fd;
668 0 : if( FD_LIKELY( -1!=ctx->local_in.incremental_snapshot_fd ) ) out_fds[ out_cnt++ ] = ctx->local_in.incremental_snapshot_fd;
669 :
670 0 : return out_cnt;
671 0 : }
672 :
673 : static void
674 : after_credit( fd_snaprd_tile_t * ctx,
675 : fd_stem_context_t * stem,
676 : int * opt_poll_in,
677 0 : int * charge_busy ) {
678 0 : (void)opt_poll_in;
679 0 : (void)charge_busy;
680 :
681 0 : long now = fd_log_wallclock();
682 :
683 : /* If snapshots are read from disk, we can immediatley publish an
684 : update notificaiton with the snapshot slot and load path */
685 0 : if( FD_UNLIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX && !ctx->gui_full_path_published ) ) {
686 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.full_snapshot_path, /* is_full */ 1 );
687 0 : *opt_poll_in = 0;
688 0 : return;
689 0 : }
690 0 : if( FD_UNLIKELY( ctx->local_in.incremental_snapshot_slot!=ULONG_MAX && !ctx->gui_incremental_path_published ) ) {
691 0 : snapshot_path_gui_publish( ctx, stem, ctx->local_in.incremental_snapshot_path, /* is_full */ 0 );
692 0 : *opt_poll_in = 0;
693 0 : return;
694 0 : }
695 :
696 0 : if( FD_LIKELY( ctx->peer_selection ) ) {
697 0 : fd_ssping_advance( ctx->ssping, now, ctx->selector );
698 0 : fd_http_resolver_advance( ctx->ssresolver, now, ctx->selector );
699 :
700 : /* send an expected slot message as the predicted incremental
701 : could have changed as a result of the pinger, resolver, or from
702 : processing gossip frags in after_frag. */
703 0 : if( FD_LIKELY( ctx->predicted_incremental.dirty ) ) {
704 0 : send_expected_slot( stem, ctx->predicted_incremental.slot );
705 0 : ctx->predicted_incremental.dirty = 0;
706 0 : }
707 0 : }
708 :
709 0 : drain_buffer( ctx );
710 :
711 : /* All control fragments sent by the snaprd tile must be fully
712 : acknowledged by all downstream consumers before processing can
713 : proceed, to prevent tile state machines from getting out of sync
714 : (see fd_ssctrl.h for more details). Currently there are two
715 : downstream consumers, snapdc and snapin. */
716 0 : #define NUM_SNAP_CONSUMERS (2UL)
717 :
718 0 : switch ( ctx->state ) {
719 0 : case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
720 0 : if( FD_UNLIKELY( now>ctx->deadline_nanos ) ) FD_LOG_ERR(( "timed out waiting for peers." ));
721 :
722 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
723 0 : if( FD_LIKELY( best.addr.l ) ) {
724 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
725 0 : ctx->deadline_nanos = now+FD_SNAPRD_GOSSIP_TIMEOUT_DEADLINE_NANOS;
726 0 : }
727 0 : break;
728 0 : }
729 0 : case FD_SNAPRD_STATE_WAITING_FOR_PEERS_INCREMENTAL: {
730 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
731 0 : if( FD_LIKELY( best.addr.l ) ) {
732 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS_INCREMENTAL;
733 0 : ctx->deadline_nanos = now;
734 0 : }
735 0 : break;
736 0 : }
737 0 : case FD_SNAPRD_STATE_COLLECTING_PEERS: {
738 0 : if( FD_UNLIKELY( (!gossip_saturated( ctx, now ) || !all_entrypoints_received( ctx )) && now<ctx->deadline_nanos ) ) break;
739 :
740 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 0, ULONG_MAX );
741 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
742 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
743 0 : break;
744 0 : }
745 :
746 0 : fd_sscluster_slot_t cluster = fd_sspeer_selector_cluster_slot( ctx->selector );
747 0 : if( FD_UNLIKELY( cluster.incremental==ULONG_MAX && ctx->config.incremental_snapshot_fetch ) ) {
748 : /* We must have a cluster full slot to be in this state. */
749 0 : FD_TEST( cluster.full!=ULONG_MAX );
750 : /* fall back to full snapshot only if the highest cluster slot
751 : is a full snapshot only */
752 0 : ctx->config.incremental_snapshot_fetch = 0;
753 0 : }
754 :
755 0 : ulong cluster_slot = ctx->config.incremental_snapshot_fetch ? cluster.incremental : cluster.full;
756 0 : ulong local_slot = ctx->config.incremental_snapshot_fetch ? ctx->local_in.incremental_snapshot_slot : ctx->local_in.full_snapshot_slot;
757 0 : ulong local_slot_with_download = local_slot;
758 0 : int local_too_old = local_slot!=ULONG_MAX && local_slot<fd_ulong_sat_sub( cluster_slot, ctx->config.maximum_local_snapshot_age );
759 0 : int local_full_only = ctx->local_in.incremental_snapshot_slot==ULONG_MAX && ctx->local_in.full_snapshot_slot!=ULONG_MAX;
760 0 : if( FD_LIKELY( (ctx->config.incremental_snapshot_fetch && local_full_only) || local_too_old ) ) {
761 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, ctx->local_in.full_snapshot_slot );
762 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
763 0 : ctx->predicted_incremental.slot = best_incremental.ssinfo.incremental.slot;
764 0 : local_slot_with_download = best_incremental.ssinfo.incremental.slot;
765 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX; /* don't use the local incremental snapshot */
766 0 : }
767 0 : }
768 :
769 0 : int can_use_local_full = local_slot_with_download!=ULONG_MAX && local_slot_with_download>=fd_ulong_sat_sub( cluster_slot, ctx->config.maximum_local_snapshot_age );
770 0 : if( FD_LIKELY( can_use_local_full ) ) {
771 0 : send_expected_slot( stem, local_slot );
772 :
773 0 : FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
774 0 : ctx->predicted_incremental.full_slot = ctx->local_in.full_snapshot_slot;
775 0 : ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
776 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
777 0 : } else {
778 0 : if( FD_UNLIKELY( !ctx->config.incremental_snapshot_fetch ) ) send_expected_slot( stem, best.ssinfo.full.slot );
779 :
780 0 : fd_sspeer_t best_incremental = fd_sspeer_selector_best( ctx->selector, 1, best.ssinfo.full.slot );
781 0 : if( FD_LIKELY( best_incremental.addr.l ) ) {
782 0 : ctx->predicted_incremental.slot = best_incremental.ssinfo.incremental.slot;
783 0 : send_expected_slot( stem, best_incremental.ssinfo.incremental.slot );
784 0 : }
785 :
786 0 : FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port ));
787 0 : ctx->addr = best.addr;
788 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
789 0 : ctx->predicted_incremental.full_slot = best.ssinfo.full.slot;
790 0 : fd_sshttp_init( ctx->sshttp, best.addr, "/snapshot.tar.bz2", 17UL, now );
791 0 : }
792 0 : break;
793 0 : }
794 0 : case FD_SNAPRD_STATE_COLLECTING_PEERS_INCREMENTAL: {
795 0 : if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
796 :
797 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
798 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
799 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS_INCREMENTAL;
800 0 : break;
801 0 : }
802 :
803 0 : ctx->addr = best.addr;
804 0 : FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr.addr ), best.addr.port ));
805 0 : fd_sshttp_init( ctx->sshttp, best.addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
806 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
807 0 : break;
808 0 : }
809 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
810 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
811 0 : read_file_data( ctx, stem );
812 0 : break;
813 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
814 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
815 0 : read_http_data( ctx, stem, now );
816 0 : break;
817 0 : }
818 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
819 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
820 0 : ctx->ack_cnt = 0UL;
821 :
822 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
823 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
824 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET;
825 0 : ctx->malformed = 0;
826 0 : break;
827 0 : }
828 :
829 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
830 0 : remove_temp_files( ctx );
831 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
832 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
833 0 : break;
834 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
835 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
836 0 : ctx->ack_cnt = 0UL;
837 :
838 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
839 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
840 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
841 0 : ctx->malformed = 0;
842 0 : break;
843 0 : }
844 :
845 0 : if( FD_UNLIKELY( ctx->local_out.write_buffer_len ) ) break;
846 :
847 0 : rename_snapshots( ctx );
848 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
849 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
850 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
851 0 : break;
852 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
853 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
854 0 : ctx->ack_cnt = 0UL;
855 :
856 0 : if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
857 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
858 0 : remove_temp_files( ctx );
859 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
860 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
861 0 : break;
862 0 : }
863 :
864 0 : if( FD_LIKELY( ctx->local_in.incremental_snapshot_slot==ULONG_MAX ) ) {
865 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS_INCREMENTAL;
866 0 : ctx->deadline_nanos = 0L;
867 0 : } else {
868 0 : FD_LOG_NOTICE(( "reading incremental snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
869 0 : ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
870 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_FILE;
871 0 : }
872 0 : break;
873 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
874 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
875 0 : ctx->ack_cnt = 0UL;
876 :
877 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
878 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
879 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
880 0 : ctx->malformed = 0;
881 0 : break;
882 0 : }
883 :
884 0 : if( FD_UNLIKELY( ctx->local_out.write_buffer_len ) ) break;
885 :
886 0 : if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
887 0 : rename_snapshots( ctx );
888 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
889 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
890 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
891 0 : break;
892 0 : }
893 :
894 : /* Get the best incremental peer to download from */
895 0 : fd_sspeer_t best = fd_sspeer_selector_best( ctx->selector, 1, ctx->predicted_incremental.full_slot );
896 0 : if( FD_UNLIKELY( !best.addr.l ) ) {
897 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
898 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
899 0 : break;
900 0 : }
901 :
902 0 : if( FD_UNLIKELY( ctx->predicted_incremental.slot!=best.ssinfo.incremental.slot ) ) {
903 0 : ctx->predicted_incremental.slot = best.ssinfo.incremental.slot;
904 0 : send_expected_slot( stem, best.ssinfo.incremental.slot );
905 0 : }
906 :
907 0 : ctx->addr = best.addr;
908 0 : FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
909 0 : fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
910 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
911 0 : break;
912 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
913 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
914 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
915 0 : ctx->ack_cnt = 0UL;
916 :
917 0 : ctx->gui_full_path_published = 0;
918 0 : ctx->metrics.full.bytes_read = 0UL;
919 0 : ctx->metrics.full.bytes_written = 0UL;
920 0 : ctx->metrics.full.bytes_total = 0UL;
921 :
922 0 : ctx->gui_incremental_path_published = 0;
923 0 : ctx->metrics.incremental.bytes_read = 0UL;
924 0 : ctx->metrics.incremental.bytes_written = 0UL;
925 0 : ctx->metrics.incremental.bytes_total = 0UL;
926 :
927 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
928 0 : ctx->deadline_nanos = 0L;
929 0 : break;
930 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
931 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
932 0 : ctx->ack_cnt = 0UL;
933 :
934 0 : ctx->metrics.incremental.bytes_read = 0UL;
935 0 : ctx->metrics.incremental.bytes_written = 0UL;
936 0 : ctx->metrics.incremental.bytes_total = 0UL;
937 :
938 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS_INCREMENTAL;
939 0 : ctx->deadline_nanos = 0L;
940 0 : break;
941 0 : case FD_SNAPRD_STATE_SHUTDOWN:
942 0 : break;
943 0 : default: {
944 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
945 0 : break;
946 0 : }
947 0 : }
948 0 : }
949 :
950 : static int
951 : before_frag( fd_snaprd_tile_t * ctx FD_PARAM_UNUSED,
952 : ulong in_idx,
953 : ulong seq FD_PARAM_UNUSED,
954 0 : ulong sig ) {
955 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
956 0 : return !( ( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
957 0 : sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
958 0 : sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES ) &&
959 0 : ( ctx->config.entrypoints_enabled || ctx->config.gossip_peers_enabled ) && ctx->peer_selection );
960 0 : }
961 0 : return 0;
962 0 : }
963 :
964 : static void
965 : during_frag( fd_snaprd_tile_t * ctx,
966 : ulong in_idx,
967 : ulong seq FD_PARAM_UNUSED,
968 : ulong sig FD_PARAM_UNUSED,
969 : ulong chunk,
970 : ulong sz,
971 0 : ulong ctl FD_PARAM_UNUSED) {
972 0 : if( ctx->in_kind[ in_idx ]!=IN_KIND_GOSSIP ) return;
973 :
974 0 : if( FD_UNLIKELY( chunk<ctx->gossip_in.chunk0 ||
975 0 : chunk>ctx->gossip_in.wmark ||
976 0 : sz>sizeof(fd_gossip_update_message_t) ) ) {
977 0 : FD_LOG_ERR(( "snaprd: unexpected chunk %lu", chunk ));
978 0 : }
979 0 : fd_memcpy( &ctx->gossip.tmp_upd_buf, fd_chunk_to_laddr( ctx->gossip_in.mem, chunk ), sz );
980 0 : }
981 :
982 : static void
983 : after_frag( fd_snaprd_tile_t * ctx,
984 : ulong in_idx,
985 : ulong seq,
986 : ulong sig,
987 : ulong sz,
988 : ulong tsorig,
989 : ulong tspub,
990 0 : fd_stem_context_t * stem ) {
991 0 : (void)seq;
992 0 : (void)tsorig;
993 0 : (void)tspub;
994 0 : (void)sz;
995 :
996 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
997 0 : fd_gossip_update_message_t * msg = &ctx->gossip.tmp_upd_buf;
998 0 : switch( msg->tag ) {
999 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
1000 0 : fd_ip4_port_t cur_addr = { .l = 0 };
1001 0 : fd_snaprd_gossip_ci_entry_t * entry = NULL;
1002 0 : ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin_pubkey, ULONG_MAX, ctx->gossip.ci_pool );
1003 0 : if( FD_LIKELY( idx!=ULONG_MAX ) ) {
1004 0 : entry = gossip_ci_pool_ele( ctx->gossip.ci_pool, idx );
1005 0 : cur_addr = entry->rpc_addr;
1006 0 : }
1007 :
1008 0 : fd_ip4_port_t new_addr = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
1009 0 : new_addr.port = fd_ushort_bswap( new_addr.port );
1010 :
1011 0 : fd_ip4_port_t gossip_addr = msg->contact_info.contact_info->sockets[ FD_CONTACT_INFO_SOCKET_GOSSIP ];
1012 0 : gossip_addr.port = fd_ushort_bswap( gossip_addr.port );
1013 0 : int addr_is_entrypoint = is_entrypoint( ctx, gossip_addr );
1014 0 : if( FD_LIKELY( (ctx->config.entrypoints_enabled && addr_is_entrypoint) || ctx->config.gossip_peers_enabled ) ) {
1015 0 : if( FD_UNLIKELY( cur_addr.l!=new_addr.l ) ) {
1016 0 : if( FD_LIKELY( !!cur_addr.l ) ) {
1017 0 : int removed = fd_ssping_remove( ctx->ssping, cur_addr );
1018 0 : if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, cur_addr );
1019 0 : }
1020 0 : if( FD_LIKELY( !!new_addr.l ) ) fd_ssping_add( ctx->ssping, new_addr );
1021 :
1022 0 : if( FD_LIKELY( entry ) ) {
1023 0 : entry->rpc_addr = new_addr;
1024 0 : entry->wallclock_nanos = msg->wallclock_nanos;
1025 0 : }
1026 0 : }
1027 :
1028 0 : if( FD_UNLIKELY( entry==NULL ) ) {
1029 0 : entry = gossip_ci_pool_ele( ctx->gossip.ci_pool, msg->contact_info.idx );
1030 0 : entry->pubkey = *(fd_pubkey_t const *)msg->origin_pubkey;
1031 0 : gossip_ci_map_idx_insert( ctx->gossip.ci_map, msg->contact_info.idx, ctx->gossip.ci_pool );
1032 0 : entry->gossip_addr = gossip_addr;
1033 0 : entry->rpc_addr = new_addr;
1034 0 : entry->wallclock_nanos = msg->wallclock_nanos;
1035 0 : }
1036 0 : }
1037 0 : break;
1038 0 : }
1039 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
1040 0 : ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin_pubkey, ULONG_MAX, ctx->gossip.ci_pool );
1041 0 : fd_ip4_port_t addr = gossip_ci_pool_ele_const( ctx->gossip.ci_pool, idx )->rpc_addr;
1042 0 : if( FD_LIKELY( !!addr.l ) ) {
1043 0 : int removed = fd_ssping_remove( ctx->ssping, addr );
1044 0 : if( FD_LIKELY( removed ) ) fd_sspeer_selector_remove( ctx->selector, addr );
1045 0 : }
1046 0 : gossip_ci_map_idx_remove_fast( ctx->gossip.ci_map, idx, ctx->gossip.ci_pool );
1047 0 : break;
1048 0 : }
1049 0 : case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES: {
1050 0 : ulong idx = gossip_ci_map_idx_query_const( ctx->gossip.ci_map, (fd_pubkey_t const *)msg->origin_pubkey, ULONG_MAX, ctx->gossip.ci_pool );
1051 :
1052 0 : if( FD_LIKELY( idx!=ULONG_MAX ) ) {
1053 0 : fd_snaprd_gossip_ci_entry_t * entry = gossip_ci_pool_ele( ctx->gossip.ci_pool, idx );
1054 0 : int addr_is_entrypoint = is_entrypoint( ctx, entry->gossip_addr );
1055 0 : if( FD_LIKELY( (ctx->config.entrypoints_enabled && addr_is_entrypoint) || ctx->config.gossip_peers_enabled ) ) {
1056 0 : on_snapshot_hash( ctx, entry->rpc_addr, msg );
1057 0 : }
1058 0 : }
1059 0 : break;
1060 0 : }
1061 0 : default:
1062 0 : FD_LOG_ERR(( "snaprd: unexpected gossip tag %u", (uint)msg->tag ));
1063 0 : break;
1064 0 : }
1065 :
1066 0 : } else {
1067 0 : FD_TEST( sig==FD_SNAPSHOT_MSG_CTRL_ACK || sig==FD_SNAPSHOT_MSG_CTRL_MALFORMED );
1068 :
1069 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_CTRL_ACK ) ) ctx->ack_cnt++;
1070 0 : else {
1071 0 : FD_TEST( ctx->state!=FD_SNAPRD_STATE_SHUTDOWN &&
1072 0 : ctx->state!=FD_SNAPRD_STATE_COLLECTING_PEERS &&
1073 0 : ctx->state!=FD_SNAPRD_STATE_WAITING_FOR_PEERS );
1074 :
1075 0 : switch( ctx->state ) {
1076 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
1077 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
1078 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
1079 0 : FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
1080 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
1081 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
1082 0 : FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
1083 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
1084 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
1085 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
1086 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
1087 0 : fd_sshttp_cancel( ctx->sshttp );
1088 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
1089 0 : fd_stem_publish( stem, ctx->out_snapctl.idx, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
1090 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
1091 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
1092 0 : break;
1093 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
1094 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
1095 0 : if( FD_UNLIKELY( ctx->malformed ) ) break;
1096 :
1097 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
1098 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
1099 0 : fd_sshttp_cancel( ctx->sshttp );
1100 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
1101 0 : fd_sspeer_selector_remove( ctx->selector, ctx->addr );
1102 : /* We would like to transition to FULL_HTTP_RESET, but we
1103 : can't do it just yet, because we have already sent a DONE
1104 : control fragment, and need to wait for acknowledges to come
1105 : back first, to ensure there's only one control message
1106 : outstanding at a time. */
1107 0 : ctx->malformed = 1;
1108 0 : break;
1109 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
1110 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP_RESET:
1111 0 : break;
1112 0 : default:
1113 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
1114 0 : break;
1115 0 : }
1116 0 : }
1117 0 : }
1118 0 : }
1119 :
1120 : static void
1121 : privileged_init( fd_topo_t * topo,
1122 0 : fd_topo_tile_t * tile ) {
1123 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1124 :
1125 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1126 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
1127 :
1128 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
1129 :
1130 : /* By default, the snaprd tile selects peers and its initial state is
1131 : WAITING_FOR_PEERS. */
1132 0 : ctx->peer_selection = 1;
1133 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
1134 0 : ctx->deadline_nanos = fd_log_wallclock() + FD_SNAPRD_WAITING_FOR_PEERS_TIMEOUT_DEADLINE_NANOS;
1135 :
1136 0 : ctx->local_in.full_snapshot_fd = -1;
1137 0 : ctx->local_in.incremental_snapshot_fd = -1;
1138 0 : ctx->local_out.dir_fd = -1;
1139 0 : ctx->local_out.full_snapshot_fd = -1;
1140 0 : ctx->local_out.incremental_snapshot_fd = -1;
1141 :
1142 0 : fd_ssarchive_remove_old_snapshots( tile->snaprd.snapshots_path,
1143 0 : tile->snaprd.max_full_snapshots_to_keep,
1144 0 : tile->snaprd.max_incremental_snapshots_to_keep );
1145 :
1146 0 : ulong full_slot = ULONG_MAX;
1147 0 : ulong incremental_slot = ULONG_MAX;
1148 0 : char full_path[ PATH_MAX ] = {0};
1149 0 : char incremental_path[ PATH_MAX ] = {0};
1150 0 : if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snaprd.snapshots_path,
1151 0 : tile->snaprd.incremental_snapshot_fetch,
1152 0 : &full_slot,
1153 0 : &incremental_slot,
1154 0 : full_path,
1155 0 : incremental_path ) ) ) {
1156 0 : if( FD_UNLIKELY( !tile->snaprd.do_download ) ) {
1157 0 : FD_LOG_ERR(( "No snapshots found in `%s` and downloading is disabled. "
1158 0 : "Please enable downloading via [snapshots.download] and restart.", tile->snaprd.snapshots_path ));
1159 0 : }
1160 0 : ctx->local_in.full_snapshot_slot = ULONG_MAX;
1161 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
1162 0 : } else {
1163 0 : FD_TEST( full_slot!=ULONG_MAX );
1164 :
1165 0 : ctx->local_in.full_snapshot_slot = full_slot;
1166 0 : ctx->local_in.incremental_snapshot_slot = incremental_slot;
1167 :
1168 0 : strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
1169 0 : ctx->local_in.full_snapshot_fd = open( ctx->local_in.full_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
1170 0 : if( FD_UNLIKELY( -1==ctx->local_in.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.full_snapshot_path, errno, fd_io_strerror( errno ) ));
1171 :
1172 0 : struct stat full_stat;
1173 0 : if( FD_UNLIKELY( -1==fstat( ctx->local_in.full_snapshot_fd, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
1174 0 : if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
1175 0 : ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
1176 :
1177 0 : if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
1178 0 : strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
1179 0 : ctx->local_in.incremental_snapshot_fd = open( ctx->local_in.incremental_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
1180 0 : if( FD_UNLIKELY( -1==ctx->local_in.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
1181 :
1182 0 : struct stat incremental_stat;
1183 0 : if( FD_UNLIKELY( -1==fstat( ctx->local_in.incremental_snapshot_fd, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
1184 0 : if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
1185 0 : ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
1186 0 : }
1187 :
1188 0 : ctx->local_out.dir_fd = -1;
1189 0 : ctx->local_out.full_snapshot_fd = -1;
1190 0 : ctx->local_out.incremental_snapshot_fd = -1;
1191 :
1192 0 : if( FD_UNLIKELY( tile->snaprd.maximum_local_snapshot_age==0U ) ) {
1193 : /* Disable peer selection if we are reading snapshots from disk
1194 : and there is no maximum local snapshot age set. Set the
1195 : initial state to READING_FULL_FILE to avoid peer selection
1196 : logic.
1197 :
1198 : TODO: Why? Document in TOML. */
1199 0 : ctx->peer_selection = 0;
1200 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
1201 0 : ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
1202 0 : FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
1203 0 : }
1204 0 : }
1205 :
1206 : /* Set up download descriptors because even if we have local
1207 : snapshots, we may need to download new snapshots if the local
1208 : snapshots are too old. */
1209 0 : ctx->local_out.dir_fd = open( tile->snaprd.snapshots_path, O_DIRECTORY|O_CLOEXEC );
1210 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", tile->snaprd.snapshots_path, errno, fd_io_strerror( errno ) ));
1211 :
1212 0 : FD_TEST( fd_cstr_printf_check( ctx->local_out.full_snapshot_path, PATH_MAX, NULL, "%s/snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
1213 0 : ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1214 0 : if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.full_snapshot_path, errno, fd_io_strerror( errno ) ));
1215 :
1216 0 : if( FD_LIKELY( tile->snaprd.incremental_snapshot_fetch ) ) {
1217 0 : FD_TEST( fd_cstr_printf_check( ctx->local_out.incremental_snapshot_path, PATH_MAX, NULL, "%s/incremental-snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
1218 0 : ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
1219 0 : if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
1220 0 : } else {
1221 0 : ctx->local_out.incremental_snapshot_fd = -1;
1222 0 : }
1223 0 : }
1224 :
1225 : static inline fd_restore_out_link_t
1226 : out1( fd_topo_t const * topo,
1227 : fd_topo_tile_t const * tile,
1228 0 : char const * name ) {
1229 0 : ulong idx = ULONG_MAX;
1230 :
1231 0 : for( ulong i=0UL; i<tile->out_cnt; i++ ) {
1232 0 : fd_topo_link_t const * link = &topo->links[ tile->out_link_id[ i ] ];
1233 0 : if( !strcmp( link->name, name ) ) {
1234 0 : if( FD_UNLIKELY( idx!=ULONG_MAX ) ) FD_LOG_ERR(( "tile %s:%lu had multiple output links named %s but expected one", tile->name, tile->kind_id, name ));
1235 0 : idx = i;
1236 0 : }
1237 0 : }
1238 :
1239 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_restore_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
1240 :
1241 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
1242 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_restore_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
1243 :
1244 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
1245 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
1246 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
1247 0 : return (fd_restore_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
1248 0 : }
1249 :
1250 : static void
1251 : unprivileged_init( fd_topo_t * topo,
1252 0 : fd_topo_tile_t * tile ) {
1253 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1254 :
1255 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1256 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
1257 0 : void * _sshttp = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
1258 0 : void * _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( FD_SSPING_MAX_PEERS ) );
1259 0 : void * _ci_pool = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_pool_align(), gossip_ci_pool_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
1260 0 : void * _ci_map = FD_SCRATCH_ALLOC_APPEND( l, gossip_ci_map_align(), gossip_ci_map_footprint( gossip_ci_map_chain_cnt_est( FD_CONTACT_INFO_TABLE_SIZE ) ) );
1261 0 : void * _ssresolver = FD_SCRATCH_ALLOC_APPEND( l, fd_http_resolver_align(), fd_http_resolver_footprint( FD_SNAPRD_MAX_HTTP_PEERS ) );
1262 0 : void * _selector = FD_SCRATCH_ALLOC_APPEND( l, fd_sspeer_selector_align(), fd_sspeer_selector_footprint( FD_SSPING_MAX_PEERS ) );
1263 :
1264 0 : ctx->ack_cnt = 0UL;
1265 0 : ctx->malformed = 0;
1266 :
1267 0 : ctx->local_out.write_buffer_pos = 0UL;
1268 0 : ctx->local_out.write_buffer_len = 0UL;
1269 :
1270 0 : fd_memcpy( ctx->config.path, tile->snaprd.snapshots_path, PATH_MAX );
1271 0 : ctx->config.incremental_snapshot_fetch = tile->snaprd.incremental_snapshot_fetch;
1272 0 : ctx->config.do_download = tile->snaprd.do_download;
1273 0 : ctx->config.maximum_local_snapshot_age = tile->snaprd.maximum_local_snapshot_age;
1274 0 : ctx->config.minimum_download_speed_mib = tile->snaprd.minimum_download_speed_mib;
1275 0 : ctx->config.max_full_snapshots_to_keep = tile->snaprd.max_full_snapshots_to_keep;
1276 0 : ctx->config.max_incremental_snapshots_to_keep = tile->snaprd.max_incremental_snapshots_to_keep;
1277 0 : ctx->config.entrypoints_enabled = tile->snaprd.entrypoints_enabled;
1278 0 : ctx->config.gossip_peers_enabled = tile->snaprd.gossip_peers_enabled;
1279 :
1280 0 : if( FD_UNLIKELY( !tile->snaprd.maximum_download_retry_abort ) ) ctx->config.maximum_download_retry_abort = UINT_MAX;
1281 0 : else ctx->config.maximum_download_retry_abort = tile->snaprd.maximum_download_retry_abort;
1282 :
1283 0 : ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, FD_SSPING_MAX_PEERS, 1UL, on_ping, ctx ) );
1284 0 : FD_TEST( ctx->ssping );
1285 :
1286 0 : ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
1287 0 : FD_TEST( ctx->sshttp );
1288 :
1289 0 : ctx->selector = fd_sspeer_selector_join( fd_sspeer_selector_new( _selector, FD_SSPING_MAX_PEERS, ctx->config.incremental_snapshot_fetch, 1UL ) );
1290 :
1291 0 : ctx->gossip.ci_pool = gossip_ci_pool_join( gossip_ci_pool_new( _ci_pool, FD_CONTACT_INFO_TABLE_SIZE ) );
1292 0 : FD_TEST( ctx->gossip.ci_pool );
1293 0 : ctx->gossip.ci_map = gossip_ci_map_join( gossip_ci_map_new( _ci_map, gossip_ci_map_chain_cnt_est( FD_CONTACT_INFO_TABLE_SIZE ), 0UL ) );
1294 :
1295 0 : ctx->gossip.entrypoints_cnt = tile->snaprd.gossip_entrypoints_cnt;
1296 0 : for( ulong i=0UL; i<tile->snaprd.gossip_entrypoints_cnt; i++ ) {
1297 0 : ctx->gossip.entrypoints[ i ].l = tile->snaprd.gossip_entrypoints[ i ].l;
1298 0 : ctx->gossip.entrypoints[ i ].port = fd_ushort_bswap( tile->snaprd.gossip_entrypoints[ i ].port ); /* TODO: should be fixed in a future PR */
1299 0 : }
1300 :
1301 0 : FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
1302 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
1303 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
1304 0 : if( 0==strcmp( in_link->name, "gossip_out" ) ) {
1305 0 : ctx->in_kind[ i ] = IN_KIND_GOSSIP;
1306 0 : ctx->gossip_in.mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
1307 0 : ctx->gossip_in.chunk0 = fd_dcache_compact_chunk0( ctx->gossip_in.mem, in_link->dcache );
1308 0 : ctx->gossip_in.wmark = fd_dcache_compact_wmark ( ctx->gossip_in.mem, in_link->dcache, in_link->mtu );
1309 0 : ctx->gossip_in.mtu = in_link->mtu;
1310 0 : } else if( 0==strcmp( in_link->name, "snapdc_rd" ) ||
1311 0 : 0==strcmp( in_link->name, "snapin_rd" ) ) {
1312 0 : ctx->in_kind[ i ] = IN_KIND_SNAPCTL;
1313 0 : }
1314 0 : }
1315 :
1316 0 : ctx->ssresolver = fd_http_resolver_join( fd_http_resolver_new( _ssresolver, FD_SNAPRD_MAX_HTTP_PEERS, ctx->config.incremental_snapshot_fetch, on_resolve, ctx ) );
1317 0 : FD_TEST( ctx->ssresolver );
1318 :
1319 0 : if( FD_UNLIKELY( tile->out_cnt<2UL || tile->out_cnt>3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2-3", tile->out_cnt ));
1320 0 : ctx->out_snapctl = out1( topo, tile, "snap_zstd" );
1321 0 : ctx->out_gui = out1( topo, tile, "snaprd_out" );
1322 0 : ctx->out_rp = out1( topo, tile, "snaprd_rp" );
1323 :
1324 0 : ctx->gui_incremental_path_published = fd_int_if( !!ctx->out_gui.mem, 0, 1 );
1325 0 : ctx->gui_full_path_published = fd_int_if( !!ctx->out_gui.mem, 0, 1 );
1326 :
1327 0 : for( ulong i=0UL; i<tile->snaprd.http.peers_cnt; i++ ) {
1328 0 : tile->snaprd.http.peers[ i ].port = fd_ushort_bswap( tile->snaprd.http.peers[ i ].port ); /* TODO: should be fixed in a future PR */
1329 0 : fd_ssping_add( ctx->ssping, tile->snaprd.http.peers[ i ] );
1330 0 : fd_http_resolver_add( ctx->ssresolver, tile->snaprd.http.peers[ i ] );
1331 0 : }
1332 :
1333 0 : ctx->predicted_incremental.full_slot = ULONG_MAX;
1334 0 : ctx->predicted_incremental.slot = ULONG_MAX;
1335 0 : ctx->predicted_incremental.dirty = 0;
1336 :
1337 0 : ctx->gossip.entrypoints_received = 0UL;
1338 0 : ctx->gossip.saturated = 0;
1339 0 : }
1340 :
/* Stem run-loop configuration.  These macros must all be defined before
   fd_stem.c is included below; that include instantiates the generic
   stem loop for this tile (stem_run, referenced by the tile
   descriptor, is presumably defined there).

   STEM_BURST bounds how many fragments this tile may publish from a
   single stem callback. NOTE(review): confirm the exact per-callback
   semantics in fd_stem.c. */
#define STEM_BURST 3UL /* One control message, and one data message, and one expected slot message */
/* NOTE(review): consumed by fd_stem.c; presumably the housekeeping
   interval in nanoseconds -- confirm there. */
#define STEM_LAZY 1000L

#define STEM_CALLBACK_CONTEXT_TYPE fd_snaprd_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaprd_tile_t)

/* Map the stem hooks onto this tile's callbacks. */
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_BEFORE_FRAG before_frag
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag

#include "../../disco/stem/fd_stem.c"
1355 :
1356 : fd_topo_run_tile_t fd_tile_snaprd = {
1357 : .name = NAME,
1358 : .rlimit_file_cnt_fn = rlimit_file_cnt,
1359 : .populate_allowed_seccomp = populate_allowed_seccomp,
1360 : .populate_allowed_fds = populate_allowed_fds,
1361 : .scratch_align = scratch_align,
1362 : .scratch_footprint = scratch_footprint,
1363 : .privileged_init = privileged_init,
1364 : .unprivileged_init = unprivileged_init,
1365 : .run = stem_run,
1366 : .keep_host_networking = 1,
1367 : .allow_connect = 1,
1368 : .allow_renameat = 1,
1369 : };
1370 :
1371 : #undef NAME
|