Line data Source code
1 : #include "utils/fd_ssping.h"
2 : #include "utils/fd_sshttp.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_ssarchive.h"
5 :
6 : #include "../../disco/topo/fd_topo.h"
7 : #include "../../disco/metrics/fd_metrics.h"
8 : #include "../../flamenco/gossip/fd_gossip_types.h"
9 :
10 : #include <errno.h>
11 : #include <fcntl.h>
12 : #include <string.h>
13 : #include <unistd.h>
14 : #include <stdio.h>
15 : #include <sys/stat.h>
16 :
17 : #define NAME "snaprd"
18 :
19 : /* The snaprd tile at a high level is a state machine that downloads
20 : snapshots from the network or reads snapshots from disk and produces
21 : a byte stream that is parsed by downstream snapshot consumer tiles.
22 : The snaprd tile gathers the latest SnapshotHashes information from
23 : gossip to decide whether to download snapshots or read local
24 : snapshots from disk. If the snaprd tile needs to download a snapshot,
25 : it goes through the process of discovering and selecting eligible
26 : peers from gossip to download from. */
27 :
/* Tile state machine values (ctx->state). */

/* Peer discovery */
#define FD_SNAPRD_STATE_WAITING_FOR_PEERS          (0)  /* No peer yet; waiting for the first one to arrive from gossip */
#define FD_SNAPRD_STATE_COLLECTING_PEERS           (1)  /* Have a peer; linger briefly in case a better one shows up */

/* Loading from local files */
#define FD_SNAPRD_STATE_READING_FULL_FILE          (2)  /* Local full snapshot preferred over peers; reading it from disk */
#define FD_SNAPRD_STATE_FLUSHING_FULL_FILE         (3)  /* Full file read completed; waiting for decompress/insert to confirm */
#define FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET   (4)  /* Restarting the full file load; waiting for downstream to confirm reset */
#define FD_SNAPRD_STATE_READING_INCREMENTAL_FILE   (5)  /* Local incremental snapshot preferred over peers; reading it from disk */
#define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE  (6)  /* Incremental file read completed; waiting for decompress/insert to confirm */

/* Loading over HTTP */
#define FD_SNAPRD_STATE_READING_FULL_HTTP          (7)  /* Peer chosen; downloading the full snapshot */
#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP         (8)  /* Full download completed; waiting for decompress/insert to confirm */
#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET   (9)  /* Restarting the full download; waiting for downstream to confirm reset */
#define FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP   (10) /* Peer chosen; downloading the incremental snapshot */
#define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP  (11) /* Incremental download completed; waiting for decompress/insert to confirm */

/* Terminal */
#define FD_SNAPRD_STATE_SHUTDOWN                   (12) /* Tile is finished and has likely already exited */

/* Capacity of the staging buffer used when writing downloaded
   snapshot bytes back to disk. */
#define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */

/* Kinds of input link and the maximum number of input links. */
#define IN_KIND_SNAPCTL (0)
#define IN_KIND_GOSSIP  (1)
#define MAX_IN_LINKS    (3)
47 :
48 : struct fd_snaprd_tile {
49 : fd_ssping_t * ssping;
50 : fd_sshttp_t * sshttp;
51 :
52 : int state;
53 : int malformed;
54 : long deadline_nanos;
55 : ulong ack_cnt;
56 : int peer_selection;
57 :
58 : long diagnostic_deadline_nanos;
59 :
60 : fd_ip4_port_t addr;
61 :
62 : struct {
63 : ulong write_buffer_pos;
64 : ulong write_buffer_len;
65 : uchar write_buffer[ SNAPRD_FILE_BUF_SZ ];
66 :
67 : char full_snapshot_path[ PATH_MAX ];
68 : char incremental_snapshot_path[ PATH_MAX ];
69 :
70 : int dir_fd;
71 : int full_snapshot_fd;
72 : int incremental_snapshot_fd;
73 : } local_out;
74 :
75 : uchar in_kind[ MAX_IN_LINKS ];
76 :
77 : struct {
78 : ulong full_snapshot_slot;
79 : int full_snapshot_fd;
80 : char full_snapshot_path[ PATH_MAX ];
81 : ulong full_snapshot_size;
82 :
83 : ulong incremental_snapshot_slot;
84 : int incremental_snapshot_fd;
85 : char incremental_snapshot_path[ PATH_MAX ];
86 : ulong incremental_snapshot_size;
87 : } local_in;
88 :
89 : struct {
90 : char path[ PATH_MAX ];
91 : int do_download;
92 : int incremental_snapshot_fetch;
93 : uint maximum_local_snapshot_age;
94 : uint minimum_download_speed_mib;
95 : uint maximum_download_retry_abort;
96 : uint max_full_snapshots_to_keep;
97 : uint max_incremental_snapshots_to_keep;
98 : } config;
99 :
100 : struct {
101 : struct {
102 : ulong bytes_read;
103 : ulong bytes_written;
104 : ulong bytes_total;
105 : uint num_retries;
106 : } full;
107 :
108 : struct {
109 : ulong bytes_read;
110 : ulong bytes_written;
111 : ulong bytes_total;
112 : uint num_retries;
113 : } incremental;
114 : } metrics;
115 :
116 : /* TODO: Don't do this ... should be in the monitor instead */
117 : struct {
118 : ulong prev_bytes_read;
119 : ulong prev_accounts_inserted; volatile ulong * cur_accounts_inserted;
120 :
121 : ulong prev_snaprd_backp_prefrag; volatile ulong * cur_snaprd_backp_prefrag;
122 : ulong prev_snaprd_wait; volatile ulong * cur_snaprd_caughtup_postfrag;
123 : ulong prev_snapdc_backp_prefrag; volatile ulong * cur_snapdc_backp_prefrag;
124 : ulong prev_snapdc_wait; volatile ulong * cur_snapdc_caughtup_postfrag;
125 : ulong prev_snapin_backp_prefrag; volatile ulong * cur_snapin_backp_prefrag;
126 : ulong prev_snapin_wait; volatile ulong * cur_snapin_caughtup_postfrag;
127 : } diagnostics;
128 :
129 : struct {
130 : fd_wksp_t * mem;
131 : ulong chunk0;
132 : ulong wmark;
133 : ulong mtu;
134 : } gossip_in;
135 :
136 : struct {
137 : fd_gossip_update_message_t tmp_upd_buf;
138 : fd_contact_info_t * ci_table;
139 : } gossip;
140 :
141 : struct {
142 : fd_wksp_t * wksp;
143 : ulong chunk0;
144 : ulong wmark;
145 : ulong chunk;
146 : ulong mtu;
147 : } out;
148 : };
149 :
150 : typedef struct fd_snaprd_tile fd_snaprd_tile_t;
151 :
152 : static ulong
153 0 : scratch_align( void ) {
154 0 : return alignof(fd_snaprd_tile_t);
155 0 : }
156 :
157 : static ulong
158 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
159 0 : (void)tile;
160 0 : ulong l = FD_LAYOUT_INIT;
161 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
162 0 : l = FD_LAYOUT_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
163 0 : l = FD_LAYOUT_APPEND( l, fd_ssping_align(), fd_ssping_footprint( 65536UL ) );
164 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_contact_info_t), sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
165 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaprd_tile_t) );
166 0 : }
167 :
168 : static inline int
169 0 : should_shutdown( fd_snaprd_tile_t * ctx ) {
170 0 : return ctx->state==FD_SNAPRD_STATE_SHUTDOWN;
171 0 : }
172 :
173 : static void
174 0 : metrics_write( fd_snaprd_tile_t * ctx ) {
175 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_READ, ctx->metrics.full.bytes_read );
176 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_WRITTEN, ctx->metrics.full.bytes_written );
177 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_TOTAL, ctx->metrics.full.bytes_total );
178 0 : FD_MGAUGE_SET( SNAPRD, FULL_DOWNLOAD_RETRIES, ctx->metrics.full.num_retries );
179 :
180 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_READ, ctx->metrics.incremental.bytes_read );
181 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_WRITTEN, ctx->metrics.incremental.bytes_written );
182 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_TOTAL, ctx->metrics.incremental.bytes_total );
183 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_DOWNLOAD_RETRIES, ctx->metrics.incremental.num_retries );
184 :
185 0 : FD_MGAUGE_SET( SNAPRD, STATE, (ulong)ctx->state );
186 0 : }
187 :
188 : static void
189 : read_file_data( fd_snaprd_tile_t * ctx,
190 0 : fd_stem_context_t * stem ) {
191 0 : uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
192 :
193 0 : FD_TEST( ctx->state==FD_SNAPRD_STATE_READING_INCREMENTAL_FILE || ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE );
194 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE;
195 0 : long result = read( full ? ctx->local_in.full_snapshot_fd : ctx->local_in.incremental_snapshot_fd , out, ctx->out.mtu );
196 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) return;
197 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "read() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
198 :
199 0 : switch( ctx->state ) {
200 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
201 0 : ctx->metrics.incremental.bytes_read += (ulong)result;
202 0 : break;
203 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
204 0 : ctx->metrics.full.bytes_read += (ulong)result;
205 0 : break;
206 0 : default:
207 0 : break;
208 0 : }
209 :
210 0 : if( FD_UNLIKELY( !result ) ) {
211 0 : switch( ctx->state ) {
212 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
213 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
214 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE;
215 0 : break;
216 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
217 0 : if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
218 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
219 0 : } else {
220 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
221 0 : }
222 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_FILE;
223 0 : break;
224 0 : default:
225 0 : break;
226 0 : }
227 0 : return;
228 0 : }
229 :
230 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, (ulong)result, 0UL, 0UL, 0UL );
231 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, (ulong)result, ctx->out.chunk0, ctx->out.wmark );
232 0 : }
233 :
234 : static void
235 : read_http_data( fd_snaprd_tile_t * ctx,
236 : fd_stem_context_t * stem,
237 0 : long now ) {
238 0 : uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
239 :
240 0 : ulong buffer_avail = fd_ulong_if( -1!=ctx->local_out.dir_fd, SNAPRD_FILE_BUF_SZ-ctx->local_out.write_buffer_len, ULONG_MAX );
241 0 : ulong data_len = fd_ulong_min( buffer_avail, ctx->out.mtu );
242 0 : int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, now );
243 :
244 0 : switch( result ) {
245 0 : case FD_SSHTTP_ADVANCE_AGAIN: break;
246 0 : case FD_SSHTTP_ADVANCE_ERROR: {
247 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
248 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
249 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
250 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
251 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
252 0 : ctx->deadline_nanos = now;
253 0 : break;
254 0 : }
255 0 : case FD_SSHTTP_ADVANCE_DONE: {
256 0 : switch( ctx->state ) {
257 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
258 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
259 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP;
260 0 : break;
261 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
262 0 : if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
263 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
264 0 : } else {
265 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
266 0 : }
267 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
268 0 : break;
269 0 : default:
270 0 : break;
271 0 : }
272 0 : break;
273 0 : }
274 0 : case FD_SSHTTP_ADVANCE_DATA: {
275 0 : if( FD_LIKELY( ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP ) ) ctx->metrics.full.bytes_total = fd_sshttp_content_len( ctx->sshttp );
276 0 : else ctx->metrics.incremental.bytes_total = fd_sshttp_content_len( ctx->sshttp );
277 :
278 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, data_len, 0UL, 0UL, 0UL );
279 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, data_len, ctx->out.chunk0, ctx->out.wmark );
280 :
281 0 : ulong written_sz = 0UL;
282 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd && !ctx->local_out.write_buffer_len ) ) {
283 0 : while( written_sz<data_len ) {
284 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP;
285 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
286 0 : long result = write( fd, out+written_sz, data_len-written_sz );
287 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
288 0 : else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
289 0 : char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
290 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
291 0 : } else if( FD_UNLIKELY( -1==result ) ) {
292 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
293 0 : break;
294 0 : }
295 0 : written_sz += (ulong)result;
296 0 : }
297 0 : }
298 :
299 0 : if( FD_UNLIKELY( written_sz<data_len ) ) {
300 0 : fd_memcpy( ctx->local_out.write_buffer+ctx->local_out.write_buffer_len, out+written_sz, data_len-written_sz );
301 0 : }
302 0 : ctx->local_out.write_buffer_len += data_len-written_sz;
303 :
304 0 : switch( ctx->state ) {
305 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
306 0 : ctx->metrics.incremental.bytes_read += data_len;
307 0 : ctx->metrics.incremental.bytes_written += written_sz;
308 0 : break;
309 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
310 0 : ctx->metrics.full.bytes_read += data_len;
311 0 : ctx->metrics.full.bytes_written += written_sz;
312 0 : break;
313 0 : default:
314 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
315 0 : break;
316 0 : }
317 :
318 0 : break;
319 0 : }
320 0 : default:
321 0 : FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d", result ));
322 0 : break;
323 0 : }
324 0 : }
325 :
326 : static void
327 0 : drain_buffer( fd_snaprd_tile_t * ctx ) {
328 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPRD_STATE_READING_FULL_HTTP &&
329 0 : ctx->state!=FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP &&
330 0 : ctx->state!=FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP &&
331 0 : ctx->state!=FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ) ) return;
332 :
333 0 : if( FD_LIKELY( -1==ctx->local_out.dir_fd || !ctx->local_out.write_buffer_len ) ) return;
334 :
335 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP || ctx->state==FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
336 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
337 :
338 0 : ulong written_sz = 0UL;
339 0 : while( ctx->local_out.write_buffer_pos+written_sz<ctx->local_out.write_buffer_len ) {
340 0 : long result = write( fd, ctx->local_out.write_buffer+ctx->local_out.write_buffer_pos+written_sz, ctx->local_out.write_buffer_len-written_sz );
341 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
342 0 : else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
343 0 : char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
344 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
345 0 : } else if( FD_UNLIKELY( -1==result ) ) {
346 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
347 0 : break;
348 0 : }
349 0 : written_sz += (ulong)result;
350 0 : }
351 :
352 0 : ctx->local_out.write_buffer_pos += written_sz;
353 :
354 0 : if( FD_LIKELY( ctx->local_out.write_buffer_pos==ctx->local_out.write_buffer_len ) ) {
355 0 : ctx->local_out.write_buffer_pos = 0UL;
356 0 : ctx->local_out.write_buffer_len = 0UL;
357 0 : }
358 :
359 0 : if( FD_LIKELY( full ) ) ctx->metrics.full.bytes_written += written_sz;
360 0 : else ctx->metrics.incremental.bytes_written += written_sz;
361 0 : }
362 :
363 : static void
364 0 : rename_snapshots( fd_snaprd_tile_t * ctx ) {
365 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
366 0 : char const * full_snapshot_name;
367 0 : char const * incremental_snapshot_name;
368 0 : fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );
369 :
370 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
371 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) )
372 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
373 0 : }
374 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
375 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) )
376 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
377 0 : }
378 0 : }
379 :
380 : static void
381 0 : print_diagnostics( fd_snaprd_tile_t * ctx ) {
382 0 : double bandwidth = (double)((ctx->metrics.full.bytes_read+ctx->metrics.incremental.bytes_read)-ctx->diagnostics.prev_bytes_read)/1e6;
383 :
384 0 : ulong snaprd_backp = *ctx->diagnostics.cur_snaprd_backp_prefrag;
385 0 : ulong snaprd_wait = *ctx->diagnostics.cur_snaprd_caughtup_postfrag + snaprd_backp;
386 0 : ulong snapdc_backp = *ctx->diagnostics.cur_snapdc_backp_prefrag;
387 0 : ulong snapdc_wait = *ctx->diagnostics.cur_snapdc_caughtup_postfrag + snapdc_backp;
388 0 : ulong snapin_backp = *ctx->diagnostics.cur_snapin_backp_prefrag;
389 0 : ulong snapin_wait = *ctx->diagnostics.cur_snapin_caughtup_postfrag + snapin_backp;
390 :
391 0 : ulong accounts_inserted = *ctx->diagnostics.cur_accounts_inserted;
392 :
393 0 : double ns_per_tick = 1.0/fd_tempo_tick_per_ns( NULL );
394 :
395 0 : switch( ctx->state ) {
396 0 : case FD_SNAPRD_STATE_WAITING_FOR_PEERS:
397 0 : FD_LOG_NOTICE(( "waiting for peers from gossip" ));
398 0 : break;
399 0 : case FD_SNAPRD_STATE_COLLECTING_PEERS:
400 0 : FD_LOG_NOTICE(( "collecting peers from gossip" ));
401 0 : break;
402 0 : case FD_SNAPRD_STATE_READING_FULL_FILE: {
403 0 : double progress = 0.0;
404 0 : if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total;
405 0 : FD_LOG_NOTICE(( "restoring full from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
406 0 : progress,
407 0 : bandwidth,
408 0 : ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
409 0 : ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
410 0 : ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
411 0 : 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
412 0 : 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
413 0 : 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
414 0 : (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
415 0 : break;
416 0 : }
417 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE: {
418 0 : FD_LOG_NOTICE(( "flushing full from file ... 100.0 %% bw= 0 MB/s" ));
419 0 : break;
420 0 : }
421 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
422 0 : FD_LOG_NOTICE(( "resetting full from file ... 100.0 %% bw= 0 MB/s" ));
423 0 : break;
424 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE: {
425 0 : double progress = 0.0;
426 0 : if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total;
427 0 : FD_LOG_NOTICE(( "restoring incremental from file ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
428 0 : progress,
429 0 : bandwidth,
430 0 : ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
431 0 : ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
432 0 : ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
433 0 : 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
434 0 : 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
435 0 : 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
436 0 : (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
437 0 : break;
438 0 : }
439 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE: {
440 0 : FD_LOG_NOTICE(( "flushing incremental from file ... 100.0 %% bw= 0 MB/s" ));
441 0 : break;
442 0 : }
443 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP: {
444 0 : double progress = 0.0;
445 0 : if( FD_LIKELY( ctx->metrics.full.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.full.bytes_read / (double)ctx->metrics.full.bytes_total;
446 0 : FD_LOG_NOTICE(( "restoring full from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
447 0 : progress,
448 0 : bandwidth,
449 0 : ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
450 0 : ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
451 0 : ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
452 0 : 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
453 0 : 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
454 0 : 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
455 0 : (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
456 0 : break;
457 0 : }
458 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP: {
459 0 : FD_LOG_NOTICE(( "flushing full from http ... 100.0 %% bw= 0 MB/s" ));
460 0 : break;
461 0 : }
462 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET: {
463 0 : FD_LOG_NOTICE(( "resetting full from http ... 100.0 %% bw= 0 MB/s" ));
464 0 : break;
465 0 : }
466 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
467 0 : double progress = 0.0;
468 0 : if( FD_LIKELY( ctx->metrics.incremental.bytes_total ) ) progress = 100.0 * (double)ctx->metrics.incremental.bytes_read / (double)ctx->metrics.incremental.bytes_total;
469 0 : FD_LOG_NOTICE(( "restoring incremental from http ... (%.1f %%) bw=%3.f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s",
470 0 : progress,
471 0 : bandwidth,
472 0 : ((double)(snaprd_backp-ctx->diagnostics.prev_snaprd_backp_prefrag)*ns_per_tick )/1e7,
473 0 : ((double)(snapdc_backp-ctx->diagnostics.prev_snapdc_backp_prefrag)*ns_per_tick )/1e7,
474 0 : ((double)(snapin_backp-ctx->diagnostics.prev_snapin_backp_prefrag)*ns_per_tick )/1e7,
475 0 : 100-(((double)(snaprd_wait-ctx->diagnostics.prev_snaprd_wait)*ns_per_tick )/1e7 ),
476 0 : 100-(((double)(snapdc_wait-ctx->diagnostics.prev_snapdc_wait)*ns_per_tick )/1e7 ),
477 0 : 100-(((double)(snapin_wait-ctx->diagnostics.prev_snapin_wait)*ns_per_tick )/1e7 ),
478 0 : (double)( accounts_inserted-ctx->diagnostics.prev_accounts_inserted )/1e6 ));
479 0 : break;
480 0 : }
481 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP: {
482 0 : FD_LOG_NOTICE(( "flushing incremental from http ... 100.0 %% bw= 0 MB/s" ));
483 0 : break;
484 0 : }
485 0 : case FD_SNAPRD_STATE_SHUTDOWN: {
486 0 : break;
487 0 : }
488 0 : default:
489 0 : break;
490 0 : }
491 :
492 0 : ctx->diagnostics.prev_bytes_read = ctx->metrics.full.bytes_read+ctx->metrics.incremental.bytes_read;
493 :
494 0 : ctx->diagnostics.prev_snaprd_backp_prefrag = snaprd_backp;
495 0 : ctx->diagnostics.prev_snaprd_wait = snaprd_wait;
496 0 : ctx->diagnostics.prev_snapdc_backp_prefrag = snapdc_backp;
497 0 : ctx->diagnostics.prev_snapdc_wait = snapdc_wait;
498 0 : ctx->diagnostics.prev_snapin_backp_prefrag = snapin_backp;
499 0 : ctx->diagnostics.prev_snapin_wait = snapin_wait;
500 :
501 0 : ctx->diagnostics.prev_accounts_inserted = accounts_inserted;
502 0 : }
503 :
504 : static void
505 : after_credit( fd_snaprd_tile_t * ctx,
506 : fd_stem_context_t * stem,
507 : int * opt_poll_in,
508 0 : int * charge_busy ) {
509 0 : (void)opt_poll_in;
510 0 : (void)charge_busy;
511 :
512 0 : long now = fd_log_wallclock();
513 0 : if( FD_LIKELY( ctx->peer_selection ) ) {
514 0 : fd_ssping_advance( ctx->ssping, now );
515 0 : }
516 :
517 0 : drain_buffer( ctx );
518 :
519 : /* All control fragments sent by the snaprd tile must be fully
520 : acknowledged by all downstream consumers before processing can
521 : proceed, to prevent tile state machines from getting out of sync
522 : (see fd_ssctrl.h for more details). Currently there are two
523 : downstream consumers, snapdc and snapin. */
524 0 : #define NUM_SNAP_CONSUMERS (2UL)
525 :
526 0 : if( FD_UNLIKELY( now>ctx->diagnostic_deadline_nanos ) ) {
527 0 : ctx->diagnostic_deadline_nanos = now+(long)1e9;
528 0 : print_diagnostics( ctx );
529 0 : }
530 :
531 0 : switch ( ctx->state ) {
532 0 : case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
533 0 : fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
534 0 : if( FD_LIKELY( best.l ) ) {
535 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
536 0 : ctx->deadline_nanos = now+500L*1000L*1000L;
537 0 : }
538 0 : break;
539 0 : }
540 0 : case FD_SNAPRD_STATE_COLLECTING_PEERS: {
541 0 : if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
542 :
543 0 : fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
544 0 : if( FD_UNLIKELY( !best.l ) ) {
545 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
546 0 : break;
547 0 : }
548 :
549 0 : ulong highest_cluster_slot = 0UL; /* TODO: Implement, using incremental snapshot slot for age */
550 0 : if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX && ctx->local_in.full_snapshot_slot>=fd_ulong_sat_sub( highest_cluster_slot, ctx->config.maximum_local_snapshot_age ) ) ) {
551 0 : FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
552 0 : ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
553 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
554 0 : } else {
555 0 : FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), fd_ushort_bswap( best.port ) ));
556 0 : ctx->addr = best;
557 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
558 0 : fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now );
559 0 : }
560 0 : break;
561 0 : }
562 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
563 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
564 0 : read_file_data( ctx, stem );
565 0 : break;
566 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
567 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
568 0 : read_http_data( ctx, stem, now );
569 0 : break;
570 0 : }
571 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
572 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
573 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
574 0 : ctx->ack_cnt = 0UL;
575 :
576 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
577 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
578 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
579 0 : ctx->malformed = 0;
580 0 : break;
581 0 : }
582 :
583 0 : if( FD_UNLIKELY( ctx->local_out.write_buffer_len ) ) break;
584 :
585 0 : rename_snapshots( ctx );
586 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
587 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
588 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
589 0 : break;
590 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
591 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
592 0 : ctx->ack_cnt = 0UL;
593 :
594 0 : if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
595 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
596 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
597 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
598 0 : break;
599 0 : }
600 :
601 0 : FD_LOG_NOTICE(( "reading incremental snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
602 0 : ctx->metrics.incremental.bytes_total = ctx->local_in.incremental_snapshot_size;
603 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_FILE;
604 0 : break;
605 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
606 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
607 0 : ctx->ack_cnt = 0UL;
608 :
609 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
610 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
611 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
612 0 : ctx->malformed = 0;
613 0 : break;
614 0 : }
615 :
616 0 : if( FD_UNLIKELY( ctx->local_out.write_buffer_len ) ) break;
617 :
618 0 : if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
619 0 : rename_snapshots( ctx );
620 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
621 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
622 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
623 0 : break;
624 0 : }
625 :
626 0 : FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), fd_ushort_bswap( ctx->addr.port ) ));
627 0 : fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
628 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
629 0 : break;
630 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
631 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
632 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
633 0 : ctx->ack_cnt = 0UL;
634 :
635 0 : ctx->metrics.full.bytes_read = 0UL;
636 0 : ctx->metrics.full.bytes_written = 0UL;
637 0 : ctx->metrics.full.bytes_total = 0UL;
638 :
639 0 : ctx->metrics.incremental.bytes_read = 0UL;
640 0 : ctx->metrics.incremental.bytes_written = 0UL;
641 0 : ctx->metrics.incremental.bytes_total = 0UL;
642 :
643 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
644 0 : ctx->deadline_nanos = 0L;
645 0 : break;
646 0 : case FD_SNAPRD_STATE_SHUTDOWN:
647 0 : break;
648 0 : default: {
649 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
650 0 : break;
651 0 : }
652 0 : }
653 0 : }
654 :
655 : static int
656 : before_frag( fd_snaprd_tile_t * ctx FD_PARAM_UNUSED,
657 : ulong in_idx,
658 : ulong seq FD_PARAM_UNUSED,
659 0 : ulong sig ) {
660 0 : if( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ){
661 0 : (void)sig;
662 : // return !( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ||
663 : // sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ||
664 : // sig==FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES );
665 0 : return 1;
666 0 : }
667 0 : return 0;
668 0 : }
669 :
670 : static void
671 : during_frag( fd_snaprd_tile_t * ctx,
672 : ulong in_idx,
673 : ulong seq FD_PARAM_UNUSED,
674 : ulong sig FD_PARAM_UNUSED,
675 : ulong chunk,
676 : ulong sz,
677 0 : ulong ctl FD_PARAM_UNUSED) {
678 0 : if( ctx->in_kind[ in_idx ]!= IN_KIND_GOSSIP ) return;
679 :
680 0 : if( FD_UNLIKELY( chunk<ctx->gossip_in.chunk0 ||
681 0 : chunk>ctx->gossip_in.wmark ) ) {
682 0 : FD_LOG_ERR(( "snaprd: unexpected chunk %lu", chunk ));
683 0 : }
684 : /* TODO: Size checks */
685 0 : fd_memcpy( &ctx->gossip.tmp_upd_buf, fd_chunk_to_laddr( ctx->gossip_in.mem, chunk ), sz );
686 0 : }
687 :
688 : static void
689 : after_frag( fd_snaprd_tile_t * ctx,
690 : ulong in_idx,
691 : ulong seq,
692 : ulong sig,
693 : ulong sz,
694 : ulong tsorig,
695 : ulong tspub,
696 0 : fd_stem_context_t * stem ) {
697 0 : (void)in_idx;
698 :
699 0 : (void)seq;
700 0 : (void)tsorig;
701 0 : (void)tspub;
702 0 : (void)sz;
703 :
704 0 : if( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) {
705 0 : fd_gossip_update_message_t * msg = &ctx->gossip.tmp_upd_buf;
706 0 : switch( msg->tag ) {
707 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
708 0 : fd_contact_info_t * cur = &ctx->gossip.ci_table[ msg->contact_info.idx ];
709 0 : fd_ip4_port_t cur_addr = ctx->gossip.ci_table[ msg->contact_info.idx ].sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
710 0 : if( cur_addr.l ){
711 0 : fd_ssping_remove( ctx->ssping, cur_addr );
712 0 : }
713 0 : fd_contact_info_t * new = msg->contact_info.contact_info;
714 0 : fd_ip4_port_t new_addr = new->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
715 0 : if( new_addr.l ) {
716 0 : fd_ssping_add( ctx->ssping, new_addr );
717 0 : }
718 0 : *cur = *new;
719 0 : }
720 0 : break;
721 0 : case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
722 0 : fd_contact_info_t * cur = &ctx->gossip.ci_table[ msg->contact_info_remove.idx ];
723 0 : fd_ip4_port_t addr = cur->sockets[ FD_CONTACT_INFO_SOCKET_RPC ];
724 0 : if( addr.l ) {
725 0 : fd_ssping_remove( ctx->ssping, addr );
726 0 : }
727 0 : }
728 0 : break;
729 0 : case FD_GOSSIP_UPDATE_TAG_SNAPSHOT_HASHES:
730 : /* TODO */
731 0 : break;
732 0 : }
733 :
734 0 : } else {
735 0 : FD_TEST( sig==FD_SNAPSHOT_MSG_CTRL_ACK || sig==FD_SNAPSHOT_MSG_CTRL_MALFORMED );
736 :
737 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_CTRL_ACK ) ) ctx->ack_cnt++;
738 0 : else {
739 0 : FD_TEST( ctx->state!=FD_SNAPRD_STATE_SHUTDOWN &&
740 0 : ctx->state!=FD_SNAPRD_STATE_COLLECTING_PEERS &&
741 0 : ctx->state!=FD_SNAPRD_STATE_WAITING_FOR_PEERS );
742 :
743 0 : switch( ctx->state) {
744 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
745 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
746 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
747 0 : FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
748 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
749 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
750 0 : FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
751 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
752 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
753 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
754 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
755 0 : fd_sshttp_cancel( ctx->sshttp );
756 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
757 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
758 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
759 0 : break;
760 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
761 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
762 0 : if( FD_UNLIKELY( ctx->malformed ) ) break;
763 :
764 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2",
765 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
766 0 : fd_sshttp_cancel( ctx->sshttp );
767 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
768 : /* We would like to transition to FULL_HTTP_RESET, but we
769 : can't do it just yet, because we have already sent a DONE
770 : control fragment, and need to wait for acknowledges to come
771 : back first, to ensure there's only one control message
772 : outstanding at a time. */
773 0 : ctx->malformed = 1;
774 0 : break;
775 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
776 0 : break;
777 0 : default:
778 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
779 0 : break;
780 0 : }
781 0 : }
782 0 : }
783 0 : }
784 :
785 : static void
786 : privileged_init( fd_topo_t * topo,
787 0 : fd_topo_tile_t * tile ) {
788 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
789 :
790 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
791 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
792 :
793 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
794 :
795 : /* By default, the snaprd tile selects peers and its initial state is
796 : WAITING_FOR_PEERS. */
797 0 : ctx->peer_selection = 1;
798 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
799 :
800 0 : fd_ssarchive_remove_old_snapshots( tile->snaprd.snapshots_path,
801 0 : tile->snaprd.max_full_snapshots_to_keep,
802 0 : tile->snaprd.max_incremental_snapshots_to_keep );
803 :
804 0 : ulong full_slot = ULONG_MAX;
805 0 : ulong incremental_slot = ULONG_MAX;
806 0 : char full_path[ PATH_MAX ] = {0};
807 0 : char incremental_path[ PATH_MAX ] = {0};
808 0 : if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snaprd.snapshots_path,
809 0 : tile->snaprd.incremental_snapshot_fetch,
810 0 : &full_slot,
811 0 : &incremental_slot,
812 0 : full_path,
813 0 : incremental_path ) ) ) {
814 0 : if( FD_UNLIKELY( !tile->snaprd.do_download ) ) {
815 0 : FD_LOG_ERR(( "No snapshots found in `%s` and downloading is disabled. "
816 0 : "Please enable downloading via [snapshots.download] and restart.", tile->snaprd.snapshots_path ));
817 0 : }
818 :
819 0 : ctx->local_in.full_snapshot_slot = ULONG_MAX;
820 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
821 :
822 0 : ctx->local_out.dir_fd = open( tile->snaprd.snapshots_path, O_DIRECTORY|O_CLOEXEC );
823 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", tile->snaprd.snapshots_path, errno, fd_io_strerror( errno ) ));
824 :
825 0 : FD_TEST( fd_cstr_printf_check( ctx->local_out.full_snapshot_path, PATH_MAX, NULL, "%s/snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
826 0 : ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
827 0 : if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.full_snapshot_path, errno, fd_io_strerror( errno ) ));
828 :
829 0 : if( FD_LIKELY( tile->snaprd.incremental_snapshot_fetch ) ) {
830 0 : FD_TEST( fd_cstr_printf_check( ctx->local_out.incremental_snapshot_path, PATH_MAX, NULL, "%s/incremental-snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
831 0 : ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
832 0 : if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
833 0 : } else {
834 0 : ctx->local_out.incremental_snapshot_fd = -1;
835 0 : }
836 0 : } else {
837 0 : FD_TEST( full_slot!=ULONG_MAX );
838 :
839 0 : ctx->local_in.full_snapshot_slot = full_slot;
840 0 : ctx->local_in.incremental_snapshot_slot = incremental_slot;
841 :
842 0 : strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
843 0 : ctx->local_in.full_snapshot_fd = open( ctx->local_in.full_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
844 0 : if( FD_UNLIKELY( -1==ctx->local_in.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.full_snapshot_path, errno, fd_io_strerror( errno ) ));
845 :
846 0 : struct stat full_stat;
847 0 : if( FD_UNLIKELY( -1==fstat( ctx->local_in.full_snapshot_fd, &full_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
848 0 : if( FD_UNLIKELY( !S_ISREG( full_stat.st_mode ) ) ) FD_LOG_ERR(( "full snapshot path `%s` is not a regular file", full_path ));
849 0 : ctx->local_in.full_snapshot_size = (ulong)full_stat.st_size;
850 :
851 0 : if( FD_LIKELY( tile->snaprd.incremental_snapshot_fetch ) ) FD_TEST( incremental_slot!=ULONG_MAX );
852 :
853 0 : if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
854 0 : strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
855 0 : ctx->local_in.incremental_snapshot_fd = open( ctx->local_in.incremental_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
856 0 : if( FD_UNLIKELY( -1==ctx->local_in.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
857 :
858 0 : struct stat incremental_stat;
859 0 : if( FD_UNLIKELY( -1==fstat( ctx->local_in.incremental_snapshot_fd, &incremental_stat ) ) ) FD_LOG_ERR(( "stat() failed `%s` (%i-%s)", incremental_path, errno, fd_io_strerror( errno ) ));
860 0 : if( FD_UNLIKELY( !S_ISREG( incremental_stat.st_mode ) ) ) FD_LOG_ERR(( "incremental snapshot path `%s` is not a regular file", incremental_path ));
861 0 : ctx->local_in.incremental_snapshot_size = (ulong)incremental_stat.st_size;
862 0 : }
863 :
864 0 : ctx->local_out.dir_fd = -1;
865 0 : ctx->local_out.full_snapshot_fd = -1;
866 0 : ctx->local_out.incremental_snapshot_fd = -1;
867 :
868 0 : if( FD_UNLIKELY( tile->snaprd.maximum_local_snapshot_age==0U ) ) {
869 : /* Disable peer selection if we are reading snapshots from disk
870 : and there is no maximum local snapshot age set. Set the
871 : initial state to READING_FULL_FILE to avoid peer selection
872 : logic.
873 :
874 : TODO: Why? Document in TOML. */
875 0 : ctx->peer_selection = 0;
876 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
877 0 : ctx->metrics.full.bytes_total = ctx->local_in.full_snapshot_size;
878 0 : FD_LOG_NOTICE(( "reading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
879 0 : }
880 0 : }
881 0 : }
882 :
883 : static void
884 : unprivileged_init( fd_topo_t * topo,
885 0 : fd_topo_tile_t * tile ) {
886 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
887 :
888 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
889 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
890 0 : void * _sshttp = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
891 0 : void * _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( 65536UL ) );
892 : // void * _gossip_peers_rx = FD_SCRATCH_ALLOC_APPEND( l, gossip_peers_rx_align(), gossip_peers_rx_footprint() );
893 0 : void * _ci_table = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_contact_info_t), sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
894 :
895 0 : ctx->ack_cnt = 0UL;
896 0 : ctx->malformed = 0;
897 :
898 0 : if( FD_UNLIKELY( tile->snaprd.diagnostics ) ) ctx->diagnostic_deadline_nanos = fd_log_wallclock()+(long)1e9;
899 0 : else ctx->diagnostic_deadline_nanos = LONG_MAX;
900 :
901 0 : ctx->local_out.write_buffer_pos = 0UL;
902 0 : ctx->local_out.write_buffer_len = 0UL;
903 :
904 0 : fd_memcpy( ctx->config.path, tile->snaprd.snapshots_path, PATH_MAX );
905 0 : ctx->config.incremental_snapshot_fetch = tile->snaprd.incremental_snapshot_fetch;
906 0 : ctx->config.do_download = tile->snaprd.do_download;
907 0 : ctx->config.maximum_local_snapshot_age = tile->snaprd.maximum_local_snapshot_age;
908 0 : ctx->config.minimum_download_speed_mib = tile->snaprd.minimum_download_speed_mib;
909 0 : ctx->config.max_full_snapshots_to_keep = tile->snaprd.max_full_snapshots_to_keep;
910 0 : ctx->config.max_incremental_snapshots_to_keep = tile->snaprd.max_incremental_snapshots_to_keep;
911 :
912 0 : if( FD_UNLIKELY( !tile->snaprd.maximum_download_retry_abort ) ) ctx->config.maximum_download_retry_abort = UINT_MAX;
913 0 : else ctx->config.maximum_download_retry_abort = tile->snaprd.maximum_download_retry_abort;
914 :
915 0 : ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, 65536UL, 1UL ) );
916 0 : FD_TEST( ctx->ssping );
917 :
918 0 : ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
919 0 : FD_TEST( ctx->sshttp );
920 :
921 0 : memset( &ctx->diagnostics, 0, sizeof(ctx->diagnostics) );
922 :
923 0 : fd_topo_tile_t * snaprd_tile = &topo->tiles[ fd_topo_find_tile( topo, "snaprd", 0UL ) ];
924 0 : fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
925 0 : fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
926 0 : ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
927 0 : ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
928 0 : ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
929 0 : ctx->diagnostics.cur_snaprd_backp_prefrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
930 0 : ctx->diagnostics.cur_snaprd_caughtup_postfrag = snaprd_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
931 0 : ctx->diagnostics.cur_snapdc_backp_prefrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
932 0 : ctx->diagnostics.cur_snapdc_caughtup_postfrag = snapdc_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
933 0 : ctx->diagnostics.cur_snapin_backp_prefrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG );
934 0 : ctx->diagnostics.cur_snapin_caughtup_postfrag = snapin_metrics+MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG );
935 0 : ctx->diagnostics.cur_accounts_inserted = snapin_metrics+MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED );
936 :
937 0 : ctx->gossip.ci_table = _ci_table;
938 : /* zero-out memory so that we can perform null checks in after_frag */
939 0 : fd_memset( ctx->gossip.ci_table, 0, sizeof(fd_contact_info_t) * FD_CONTACT_INFO_TABLE_SIZE );
940 :
941 0 : FD_TEST( tile->in_cnt<=MAX_IN_LINKS );
942 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ){
943 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
944 0 : if( 0==strcmp( in_link->name, "gossip_out" ) ) {
945 : // has_gossip_in = 1;
946 0 : ctx->in_kind[ i ] = IN_KIND_GOSSIP;
947 : // ctx->gossip_in.mem = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
948 : // ctx->gossip_in.chunk0 = fd_dcache_compact_chunk0( ctx->gossip_in.mem, in_link->dcache );
949 : // ctx->gossip_in.wmark = fd_dcache_compact_wmark ( ctx->gossip_in.mem, in_link->dcache, in_link->mtu );
950 : // ctx->gossip_in.mtu = in_link->mtu;
951 0 : } else if( 0==strcmp( in_link->name, "snapdc_rd" ) ||
952 0 : 0==strcmp( in_link->name, "snapin_rd" ) ) {
953 0 : ctx->in_kind[ i ] = IN_KIND_SNAPCTL;
954 0 : }
955 0 : }
956 :
957 0 : for( ulong i=0UL; i<tile->snaprd.http.peers_cnt; i++ ) fd_ssping_add( ctx->ssping, tile->snaprd.http.peers[ i ] );
958 :
959 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
960 :
961 0 : ctx->out.wksp = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp;
962 0 : ctx->out.chunk0 = fd_dcache_compact_chunk0( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache );
963 0 : ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu );
964 0 : ctx->out.chunk = ctx->out.chunk0;
965 0 : ctx->out.mtu = topo->links[ tile->out_link_id[ 0 ] ].mtu;
966 0 : }
967 :
968 0 : #define STEM_BURST 2UL /* One control message, and one data message */
969 0 : #define STEM_LAZY 1000L
970 :
971 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snaprd_tile_t
972 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaprd_tile_t)
973 :
974 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
975 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
976 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
977 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
978 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
979 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
980 :
981 : #include "../../disco/stem/fd_stem.c"
982 :
983 : fd_topo_run_tile_t fd_tile_snaprd = {
984 : .name = NAME,
985 : .scratch_align = scratch_align,
986 : .scratch_footprint = scratch_footprint,
987 : .privileged_init = privileged_init,
988 : .unprivileged_init = unprivileged_init,
989 : .run = stem_run,
990 : .keep_host_networking = 1,
991 : .allow_connect = 1
992 : };
993 :
994 : #undef NAME
|