Line data Source code
1 : #include "utils/fd_ssping.h"
2 : #include "utils/fd_sshttp.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_ssarchive.h"
5 :
6 : #include "../../disco/topo/fd_topo.h"
7 : #include "../../disco/metrics/fd_metrics.h"
8 :
9 : #include <errno.h>
10 : #include <fcntl.h>
11 : #include <string.h>
12 : #include <unistd.h>
13 : #include <stdio.h>
14 :
#define NAME "snaprd"

/* The snaprd tile at a high level is a state machine that downloads
   snapshots from the network or reads snapshots from disk and produces
   a byte stream that is parsed by downstream snapshot consumer tiles.
   The snaprd tile gathers the latest SnapshotHashes information from
   gossip to decide whether to download snapshots or read local
   snapshots from disk. If the snaprd tile needs to download a snapshot,
   it goes through the process of discovering and selecting eligible
   peers from gossip to download from.

   NOTE(review): in this file the candidate peers currently come from a
   hard-coded per-cluster list added in unprivileged_init, not from
   gossip. */

/* Values of fd_snaprd_tile_t::state.  The current value is also
   exported through the SNAPRD STATE gauge in metrics_write. */
#define FD_SNAPRD_STATE_WAITING_FOR_PEERS ( 0) /* Waiting for first peer to arrive from gossip to download from */
#define FD_SNAPRD_STATE_COLLECTING_PEERS ( 1) /* First peer arrived, wait a little longer to see if a better one arrives */
#define FD_SNAPRD_STATE_READING_FULL_FILE ( 2) /* Full file looks better than peer, reading it from disk */
#define FD_SNAPRD_STATE_FLUSHING_FULL_FILE ( 3) /* Full file was read ok, confirm it decompressed and inserted ok */
#define FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET ( 4) /* Resetting to load full snapshot from file again, confirm decompress and inserter are reset too */
#define FD_SNAPRD_STATE_READING_INCREMENTAL_FILE ( 5) /* Incremental file looks better than peer, reading it from disk */
#define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE ( 6) /* Incremental file was read ok, confirm it decompressed and inserted ok */
#define FD_SNAPRD_STATE_READING_FULL_HTTP ( 7) /* Peer was selected, reading full snapshot from HTTP */
#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ( 8) /* Full snapshot was downloaded ok, confirm it decompressed and inserted ok */
#define FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET ( 9) /* Resetting to load full snapshot from HTTP again, confirm decompress and inserter are reset too */
#define FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP (10) /* Peer was selected, reading incremental snapshot from HTTP */
#define FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP (11) /* Incremental snapshot was downloaded ok, confirm it decompressed and inserted ok */
#define FD_SNAPRD_STATE_SHUTDOWN (12) /* The tile is done, and has likely already exited */

/* Capacity of local_out.write_buffer: downloaded bytes that could not
   yet be written to the local -partial snapshot file are queued here. */
#define SNAPRD_FILE_BUF_SZ (1024UL*1024UL) /* 1 MiB */
41 :
/* Per-tile context of the snaprd tile.  It is the first object carved
   out of the tile's scratch region (see scratch_footprint and
   unprivileged_init). */
struct fd_snaprd_tile {
  fd_ssping_t * ssping; /* Peer pinger; fd_ssping_best() picks the download peer */
  fd_sshttp_t * sshttp; /* HTTP client that streams the snapshot being downloaded */

  int state;            /* Current FD_SNAPRD_STATE_ value */
  int malformed;        /* Set when a consumer reports MALFORMED after a DONE or EOF_FULL control frag went out; acted on once all acks are in */
  long deadline_nanos;  /* Wallclock time (ns) at which COLLECTING_PEERS stops waiting for a better peer */
  ulong ack_cnt;        /* Number of ACK control frags received for the outstanding control frag */
  int peer_selection;   /* Nonzero if fd_ssping_advance() is called on every after_credit */

  fd_ip4_port_t addr;   /* Peer the current HTTP download comes from */

  /* Persisting HTTP downloads into the snapshot directory.  Only active
     when dir_fd!=-1, which privileged_init arranges when no usable local
     snapshot was found. */
  struct {
    ulong write_buffer_pos;                   /* Offset of the first queued byte not yet written */
    ulong write_buffer_len;                   /* End offset of the queued bytes */
    uchar write_buffer[ SNAPRD_FILE_BUF_SZ ]; /* Downloaded bytes the non-blocking write() has not accepted yet */

    char full_snapshot_path[ PATH_MAX ];        /* Full path of snapshot.tar.bz2-partial, used in log messages */
    char incremental_snapshot_path[ PATH_MAX ]; /* Full path of incremental-snapshot.tar.bz2-partial, used in log messages */

    int dir_fd;                  /* Snapshot directory, or -1 when not persisting */
    int full_snapshot_fd;        /* Output fd for the full snapshot, or -1 */
    int incremental_snapshot_fd; /* Output fd for the incremental snapshot, or -1 */
  } local_out;

  /* Existing snapshot files found by fd_ssarchive_latest_pair().  A slot
     of ULONG_MAX means there is no such local file. */
  struct {
    ulong full_snapshot_slot;
    int full_snapshot_fd;
    char full_snapshot_path[ PATH_MAX ];
    ulong incremental_snapshot_slot;
    int incremental_snapshot_fd;
    char incremental_snapshot_path[ PATH_MAX ];
  } local_in;

  /* Copy of the tile configuration made in unprivileged_init.
     NOTE(review): do_download, minimum_download_speed_mib and
     maximum_download_retry_abort are set but not read in the code shown. */
  struct {
    char path[ PATH_MAX ];
    int do_download;
    int incremental_snapshot_fetch;
    uint maximum_local_snapshot_age;
    uint minimum_download_speed_mib;
    uint maximum_download_retry_abort; /* 0 in the tile config is stored as UINT_MAX */
  } config;

  /* Progress counters exported as gauges by metrics_write().
     NOTE(review): bytes_total and num_retries are only ever zeroed in
     the code shown, never incremented. */
  struct {
    struct {
      ulong bytes_read;
      ulong bytes_written;
      ulong bytes_total;
      uint num_retries;
    } full;

    struct {
      ulong bytes_read;
      ulong bytes_written;
      ulong bytes_total;
      uint num_retries;
    } incremental;
  } metrics;

  /* Output link (dcache) that data and control frags are published on. */
  struct {
    fd_wksp_t * wksp;
    ulong chunk0;
    ulong wmark;
    ulong chunk;
    ulong mtu;
  } out;
};

typedef struct fd_snaprd_tile fd_snaprd_tile_t;
111 :
112 : static ulong
113 0 : scratch_align( void ) {
114 0 : return alignof(fd_snaprd_tile_t);
115 0 : }
116 :
117 : static ulong
118 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
119 0 : (void)tile;
120 0 : ulong l = FD_LAYOUT_INIT;
121 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
122 0 : l = FD_LAYOUT_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
123 0 : l = FD_LAYOUT_APPEND( l, fd_ssping_align(), fd_ssping_footprint( 65536UL ) );
124 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaprd_tile_t) );
125 0 : }
126 :
127 : static inline int
128 0 : should_shutdown( fd_snaprd_tile_t * ctx ) {
129 0 : return ctx->state==FD_SNAPRD_STATE_SHUTDOWN;
130 0 : }
131 :
132 : static void
133 0 : metrics_write( fd_snaprd_tile_t * ctx ) {
134 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_READ, ctx->metrics.full.bytes_read );
135 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_WRITTEN, ctx->metrics.full.bytes_written );
136 0 : FD_MGAUGE_SET( SNAPRD, FULL_BYTES_TOTAL, ctx->metrics.full.bytes_total );
137 0 : FD_MGAUGE_SET( SNAPRD, FULL_DOWNLOAD_RETRIES, ctx->metrics.full.num_retries );
138 :
139 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_READ, ctx->metrics.incremental.bytes_read );
140 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_WRITTEN, ctx->metrics.incremental.bytes_written );
141 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_BYTES_TOTAL, ctx->metrics.incremental.bytes_total );
142 0 : FD_MGAUGE_SET( SNAPRD, INCREMENTAL_DOWNLOAD_RETRIES, ctx->metrics.incremental.num_retries );
143 :
144 0 : FD_MGAUGE_SET( SNAPRD, STATE, (ulong)ctx->state );
145 0 : }
146 :
147 : static void
148 : read_file_data( fd_snaprd_tile_t * ctx,
149 0 : fd_stem_context_t * stem ) {
150 0 : uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
151 :
152 0 : FD_TEST( ctx->state==FD_SNAPRD_STATE_READING_INCREMENTAL_FILE || ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE );
153 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_FILE;
154 0 : long result = read( full ? ctx->local_in.full_snapshot_fd : ctx->local_in.incremental_snapshot_fd , out, ctx->out.mtu );
155 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) return;
156 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "read() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
157 :
158 0 : switch( ctx->state ) {
159 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
160 0 : ctx->metrics.incremental.bytes_read += (ulong)result;
161 0 : break;
162 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
163 0 : ctx->metrics.full.bytes_read += (ulong)result;
164 0 : break;
165 0 : default:
166 0 : break;
167 0 : }
168 :
169 0 : if( FD_UNLIKELY( !result ) ) {
170 0 : switch( ctx->state ) {
171 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
172 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
173 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE;
174 0 : break;
175 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
176 0 : if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
177 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
178 0 : } else {
179 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
180 0 : }
181 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_FILE;
182 0 : break;
183 0 : default:
184 0 : break;
185 0 : }
186 0 : return;
187 0 : }
188 :
189 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, (ulong)result, 0UL, 0UL, 0UL );
190 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, (ulong)result, ctx->out.chunk0, ctx->out.wmark );
191 0 : }
192 :
193 : static void
194 : read_http_data( fd_snaprd_tile_t * ctx,
195 : fd_stem_context_t * stem,
196 0 : long now ) {
197 0 : uchar * out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
198 :
199 0 : ulong buffer_avail = fd_ulong_if( -1!=ctx->local_out.dir_fd, SNAPRD_FILE_BUF_SZ-ctx->local_out.write_buffer_len, ULONG_MAX );
200 0 : ulong data_len = fd_ulong_min( buffer_avail, ctx->out.mtu );
201 0 : int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, now );
202 :
203 0 : switch( result ) {
204 0 : case FD_SSHTTP_ADVANCE_AGAIN: break;
205 0 : case FD_SSHTTP_ADVANCE_ERROR: {
206 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
207 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
208 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, now );
209 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
210 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
211 0 : ctx->deadline_nanos = now;
212 0 : break;
213 0 : }
214 0 : case FD_SSHTTP_ADVANCE_DONE: {
215 0 : switch( ctx->state ) {
216 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
217 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
218 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP;
219 0 : break;
220 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
221 0 : if( FD_LIKELY( ctx->config.incremental_snapshot_fetch ) ) {
222 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_EOF_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
223 0 : } else {
224 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_DONE, 0UL, 0UL, 0UL, 0UL, 0UL );
225 0 : }
226 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
227 0 : break;
228 0 : default:
229 0 : break;
230 0 : }
231 0 : break;
232 0 : }
233 0 : case FD_SSHTTP_ADVANCE_DATA: {
234 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out.chunk, data_len, 0UL, 0UL, 0UL );
235 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, data_len, ctx->out.chunk0, ctx->out.wmark );
236 :
237 0 : ulong written_sz = 0UL;
238 0 : if( FD_LIKELY( -1!=ctx->local_out.dir_fd && !ctx->local_out.write_buffer_len ) ) {
239 0 : while( written_sz<data_len ) {
240 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP;
241 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
242 0 : long result = write( fd, out+written_sz, data_len-written_sz );
243 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
244 0 : else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
245 0 : char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
246 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
247 0 : } else if( FD_UNLIKELY( -1==result ) ) {
248 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
249 0 : break;
250 0 : }
251 0 : written_sz += (ulong)result;
252 0 : }
253 0 : }
254 :
255 0 : if( FD_UNLIKELY( written_sz<data_len ) ) {
256 0 : fd_memcpy( ctx->local_out.write_buffer+ctx->local_out.write_buffer_len, out+written_sz, data_len-written_sz );
257 0 : }
258 0 : ctx->local_out.write_buffer_len += data_len-written_sz;
259 :
260 0 : switch( ctx->state ) {
261 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
262 0 : ctx->metrics.incremental.bytes_read += data_len;
263 0 : ctx->metrics.incremental.bytes_written += written_sz;
264 0 : break;
265 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
266 0 : ctx->metrics.full.bytes_read += data_len;
267 0 : ctx->metrics.full.bytes_written += written_sz;
268 0 : break;
269 0 : default:
270 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
271 0 : break;
272 0 : }
273 :
274 0 : break;
275 0 : }
276 0 : default:
277 0 : FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d", result ));
278 0 : break;
279 0 : }
280 0 : }
281 :
282 : static void
283 0 : drain_buffer( fd_snaprd_tile_t * ctx ) {
284 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPRD_STATE_READING_FULL_HTTP &&
285 0 : ctx->state!=FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP &&
286 0 : ctx->state!=FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP &&
287 0 : ctx->state!=FD_SNAPRD_STATE_FLUSHING_FULL_HTTP ) ) return;
288 :
289 0 : if( FD_LIKELY( -1==ctx->local_out.dir_fd || !ctx->local_out.write_buffer_len ) ) return;
290 :
291 0 : int full = ctx->state==FD_SNAPRD_STATE_READING_FULL_HTTP || ctx->state==FD_SNAPRD_STATE_FLUSHING_FULL_HTTP;
292 0 : int fd = full ? ctx->local_out.full_snapshot_fd : ctx->local_out.incremental_snapshot_fd;
293 :
294 0 : ulong written_sz = 0UL;
295 0 : while( ctx->local_out.write_buffer_pos+written_sz<ctx->local_out.write_buffer_len ) {
296 0 : long result = write( fd, ctx->local_out.write_buffer+ctx->local_out.write_buffer_pos+written_sz, ctx->local_out.write_buffer_len-written_sz );
297 0 : if( FD_UNLIKELY( -1==result && errno==EAGAIN ) ) break;
298 0 : else if( FD_UNLIKELY( -1==result && errno==ENOSPC ) ) {
299 0 : char const * snapshot_path = full ? ctx->local_out.full_snapshot_path : ctx->local_out.incremental_snapshot_path;
300 0 : FD_LOG_ERR(( "Out of disk space when writing out snapshot data to `%s`", snapshot_path ));
301 0 : } else if( FD_UNLIKELY( -1==result ) ) {
302 0 : FD_LOG_ERR(( "write() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
303 0 : break;
304 0 : }
305 0 : written_sz += (ulong)result;
306 0 : }
307 :
308 0 : ctx->local_out.write_buffer_pos += written_sz;
309 :
310 0 : if( FD_LIKELY( ctx->local_out.write_buffer_pos==ctx->local_out.write_buffer_len ) ) {
311 0 : ctx->local_out.write_buffer_pos = 0UL;
312 0 : ctx->local_out.write_buffer_len = 0UL;
313 0 : }
314 :
315 0 : if( FD_LIKELY( full ) ) ctx->metrics.full.bytes_written += written_sz;
316 0 : else ctx->metrics.incremental.bytes_written += written_sz;
317 0 : }
318 :
319 : static void
320 0 : rename_snapshots( fd_snaprd_tile_t * ctx ) {
321 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) return;
322 0 : char const * full_snapshot_name;
323 0 : char const * incremental_snapshot_name;
324 0 : fd_sshttp_snapshot_names( ctx->sshttp, &full_snapshot_name, &incremental_snapshot_name );
325 :
326 0 : if( FD_LIKELY( -1!=ctx->local_out.full_snapshot_fd ) ) {
327 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", ctx->local_out.dir_fd, full_snapshot_name ) ) )
328 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
329 0 : }
330 0 : if( FD_LIKELY( -1!=ctx->local_out.incremental_snapshot_fd ) ) {
331 0 : if( FD_UNLIKELY( -1==renameat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", ctx->local_out.dir_fd, incremental_snapshot_name ) ) )
332 0 : FD_LOG_ERR(( "renameat() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
333 0 : }
334 0 : }
335 :
336 : static void
337 : after_credit( fd_snaprd_tile_t * ctx,
338 : fd_stem_context_t * stem,
339 : int * opt_poll_in,
340 0 : int * charge_busy ) {
341 0 : (void)stem;
342 0 : (void)opt_poll_in;
343 0 : (void)charge_busy;
344 :
345 0 : long now = fd_log_wallclock();
346 0 : if( FD_LIKELY( ctx->peer_selection ) ) {
347 0 : fd_ssping_advance( ctx->ssping, now );
348 0 : }
349 :
350 0 : drain_buffer( ctx );
351 :
352 : /* All control fragments sent by the snaprd tile must be fully
353 : acknowledged by all downstream consumers before processing can
354 : proceed, to prevent tile state machines from getting out of sync
355 : (see fd_ssctrl.h for more details). Currently there are two
356 : downstream consumers, snapdc and snapin. */
357 0 : #define NUM_SNAP_CONSUMERS (2UL)
358 :
359 0 : switch ( ctx->state ) {
360 0 : case FD_SNAPRD_STATE_WAITING_FOR_PEERS: {
361 0 : fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
362 0 : if( FD_LIKELY( best.l ) ) {
363 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
364 0 : ctx->deadline_nanos = now + 500L*1000L*1000L;
365 0 : }
366 0 : break;
367 0 : }
368 0 : case FD_SNAPRD_STATE_COLLECTING_PEERS: {
369 0 : if( FD_UNLIKELY( now<ctx->deadline_nanos ) ) break;
370 :
371 0 : fd_ip4_port_t best = fd_ssping_best( ctx->ssping );
372 0 : if( FD_UNLIKELY( !best.l ) ) {
373 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
374 0 : break;
375 0 : }
376 :
377 0 : ulong highest_cluster_slot = 0UL; /* TODO: Implement, using incremental snapshot slot for age */
378 0 : if( FD_LIKELY( ctx->local_in.full_snapshot_slot!=ULONG_MAX && ctx->local_in.full_snapshot_slot>=fd_ulong_sat_sub( highest_cluster_slot, ctx->config.maximum_local_snapshot_age ) ) ) {
379 0 : FD_LOG_NOTICE(( "loading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
380 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
381 0 : } else {
382 0 : FD_LOG_NOTICE(( "downloading full snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( best.addr ), best.port ));
383 0 : ctx->addr = best;
384 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_HTTP;
385 0 : fd_sshttp_init( ctx->sshttp, best, "/snapshot.tar.bz2", 17UL, now );
386 0 : }
387 0 : break;
388 0 : }
389 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
390 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
391 0 : read_file_data( ctx, stem );
392 0 : break;
393 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
394 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP: {
395 0 : read_http_data( ctx, stem, now );
396 0 : break;
397 0 : }
398 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
399 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
400 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
401 0 : ctx->ack_cnt = 0UL;
402 :
403 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
404 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
405 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
406 0 : ctx->malformed = 0;
407 0 : break;
408 0 : }
409 :
410 0 : if( FD_UNLIKELY( ctx->local_out.write_buffer_len ) ) break;
411 :
412 0 : rename_snapshots( ctx );
413 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
414 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
415 0 : break;
416 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
417 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
418 0 : ctx->ack_cnt = 0UL;
419 :
420 0 : if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
421 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
422 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
423 0 : break;
424 0 : }
425 :
426 0 : FD_LOG_NOTICE(( "reading incremental snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
427 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_FILE;
428 0 : break;
429 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
430 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
431 0 : ctx->ack_cnt = 0UL;
432 :
433 0 : if( FD_UNLIKELY( ctx->malformed ) ) {
434 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
435 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
436 0 : ctx->malformed = 0;
437 0 : break;
438 0 : }
439 :
440 0 : if( FD_UNLIKELY( ctx->local_out.write_buffer_len ) ) break;
441 :
442 0 : if( FD_LIKELY( !ctx->config.incremental_snapshot_fetch ) ) {
443 0 : rename_snapshots( ctx );
444 0 : ctx->state = FD_SNAPRD_STATE_SHUTDOWN;
445 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL, 0UL, 0UL, 0UL, 0UL );
446 0 : break;
447 0 : }
448 :
449 0 : FD_LOG_NOTICE(( "downloading incremental snapshot from http://" FD_IP4_ADDR_FMT ":%hu/incremental-snapshot.tar.bz2", FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
450 0 : fd_sshttp_init( ctx->sshttp, ctx->addr, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
451 0 : ctx->state = FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP;
452 0 : break;
453 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
454 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
455 0 : if( FD_UNLIKELY( ctx->ack_cnt<NUM_SNAP_CONSUMERS ) ) break;
456 0 : ctx->ack_cnt = 0UL;
457 :
458 0 : ctx->metrics.full.bytes_read = 0UL;
459 0 : ctx->metrics.full.bytes_written = 0UL;
460 0 : ctx->metrics.full.bytes_total = 0UL;
461 :
462 0 : ctx->metrics.incremental.bytes_read = 0UL;
463 0 : ctx->metrics.incremental.bytes_written = 0UL;
464 0 : ctx->metrics.incremental.bytes_total = 0UL;
465 :
466 0 : ctx->state = FD_SNAPRD_STATE_COLLECTING_PEERS;
467 0 : ctx->deadline_nanos = 0L;
468 0 : break;
469 0 : case FD_SNAPRD_STATE_SHUTDOWN:
470 0 : break;
471 0 : default: {
472 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
473 0 : break;
474 0 : }
475 0 : }
476 0 : }
477 :
478 : static void
479 : after_frag( fd_snaprd_tile_t * ctx,
480 : ulong in_idx,
481 : ulong seq,
482 : ulong sig,
483 : ulong sz,
484 : ulong tsorig,
485 : ulong tspub,
486 0 : fd_stem_context_t * stem ) {
487 0 : (void)in_idx;
488 0 : (void)seq;
489 0 : (void)tsorig;
490 0 : (void)tspub;
491 0 : (void)sz;
492 :
493 0 : FD_TEST( sig==FD_SNAPSHOT_MSG_CTRL_ACK || sig==FD_SNAPSHOT_MSG_CTRL_MALFORMED );
494 :
495 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_CTRL_ACK ) ) ctx->ack_cnt++;
496 0 : else {
497 0 : FD_TEST( ctx->state!=FD_SNAPRD_STATE_SHUTDOWN &&
498 0 : ctx->state!=FD_SNAPRD_STATE_COLLECTING_PEERS &&
499 0 : ctx->state!=FD_SNAPRD_STATE_WAITING_FOR_PEERS );
500 :
501 0 : switch( ctx->state) {
502 0 : case FD_SNAPRD_STATE_READING_FULL_FILE:
503 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE:
504 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_FILE_RESET:
505 0 : FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
506 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_FILE:
507 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_FILE:
508 0 : FD_LOG_ERR(( "error reading snapshot from local file `%s`", ctx->local_in.incremental_snapshot_path ));
509 0 : case FD_SNAPRD_STATE_READING_FULL_HTTP:
510 0 : case FD_SNAPRD_STATE_READING_INCREMENTAL_HTTP:
511 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
512 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
513 0 : fd_sshttp_cancel( ctx->sshttp );
514 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
515 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_RESET_FULL, 0UL, 0UL, 0UL, 0UL, 0UL );
516 0 : ctx->state = FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET;
517 0 : break;
518 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP:
519 0 : case FD_SNAPRD_STATE_FLUSHING_INCREMENTAL_HTTP:
520 0 : FD_LOG_NOTICE(( "error downloading snapshot from http://" FD_IP4_ADDR_FMT ":%hu/snapshot.tar.bz2",
521 0 : FD_IP4_ADDR_FMT_ARGS( ctx->addr.addr ), ctx->addr.port ));
522 0 : fd_sshttp_cancel( ctx->sshttp );
523 0 : fd_ssping_invalidate( ctx->ssping, ctx->addr, fd_log_wallclock() );
524 : /* We would like to transition to FULL_HTTP_RESET, but we can't
525 : do it just yet, because we have already sent a DONE control
526 : fragment, and need to wait for acknowledges to come back
527 : first, to ensure there's only one control message outstanding
528 : at a time. */
529 0 : ctx->malformed = 1;
530 0 : break;
531 0 : case FD_SNAPRD_STATE_FLUSHING_FULL_HTTP_RESET:
532 0 : break;
533 0 : default:
534 0 : FD_LOG_ERR(( "unexpected state %d", ctx->state ));
535 0 : break;
536 0 : }
537 0 : }
538 0 : }
539 :
540 : static void
541 : privileged_init( fd_topo_t * topo,
542 0 : fd_topo_tile_t * tile ) {
543 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
544 :
545 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
546 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
547 :
548 : /* By default, the snaprd tile selects peers and its initial state is
549 : WAITING_FOR_PEERS. */
550 0 : ctx->peer_selection = 1;
551 0 : ctx->state = FD_SNAPRD_STATE_WAITING_FOR_PEERS;
552 :
553 0 : ulong full_slot = ULONG_MAX;
554 0 : ulong incremental_slot = ULONG_MAX;
555 0 : char full_path[ PATH_MAX ] = {0};
556 0 : char incremental_path[ PATH_MAX ] = {0};
557 0 : if( FD_UNLIKELY( -1==fd_ssarchive_latest_pair( tile->snaprd.snapshots_path,
558 0 : tile->snaprd.incremental_snapshot_fetch,
559 0 : &full_slot,
560 0 : &incremental_slot,
561 0 : full_path,
562 0 : incremental_path ) ) ) {
563 0 : ctx->local_in.full_snapshot_slot = ULONG_MAX;
564 0 : ctx->local_in.incremental_snapshot_slot = ULONG_MAX;
565 :
566 0 : ctx->local_out.dir_fd = open( tile->snaprd.snapshots_path, O_DIRECTORY|O_CLOEXEC );
567 0 : if( FD_UNLIKELY( -1==ctx->local_out.dir_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", tile->snaprd.snapshots_path, errno, fd_io_strerror( errno ) ));
568 :
569 0 : FD_TEST( fd_cstr_printf_check( ctx->local_out.full_snapshot_path, PATH_MAX, NULL, "%s/snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
570 0 : ctx->local_out.full_snapshot_fd = openat( ctx->local_out.dir_fd, "snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
571 0 : if( FD_UNLIKELY( -1==ctx->local_out.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.full_snapshot_path, errno, fd_io_strerror( errno ) ));
572 :
573 0 : if( FD_LIKELY( tile->snaprd.incremental_snapshot_fetch ) ) {
574 0 : FD_TEST( fd_cstr_printf_check( ctx->local_out.incremental_snapshot_path, PATH_MAX, NULL, "%s/incremental-snapshot.tar.bz2-partial", tile->snaprd.snapshots_path ) );
575 0 : ctx->local_out.incremental_snapshot_fd = openat( ctx->local_out.dir_fd, "incremental-snapshot.tar.bz2-partial", O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
576 0 : if( FD_UNLIKELY( -1==ctx->local_out.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_out.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
577 0 : } else {
578 0 : ctx->local_out.incremental_snapshot_fd = -1;
579 0 : }
580 :
581 0 : } else {
582 0 : FD_TEST( full_slot!=ULONG_MAX );
583 :
584 0 : ctx->local_in.full_snapshot_slot = full_slot;
585 0 : ctx->local_in.incremental_snapshot_slot = incremental_slot;
586 :
587 0 : strncpy( ctx->local_in.full_snapshot_path, full_path, PATH_MAX );
588 0 : ctx->local_in.full_snapshot_fd = open( ctx->local_in.full_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
589 0 : if( FD_UNLIKELY( -1==ctx->local_in.full_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.full_snapshot_path, errno, fd_io_strerror( errno ) ));
590 :
591 0 : if( tile->snaprd.incremental_snapshot_fetch ) {
592 0 : FD_TEST( incremental_slot!=ULONG_MAX );
593 0 : }
594 :
595 0 : if( FD_LIKELY( incremental_slot!=ULONG_MAX ) ) {
596 0 : strncpy( ctx->local_in.incremental_snapshot_path, incremental_path, PATH_MAX );
597 0 : ctx->local_in.incremental_snapshot_fd = open( ctx->local_in.incremental_snapshot_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
598 0 : if( FD_UNLIKELY( -1==ctx->local_in.incremental_snapshot_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", ctx->local_in.incremental_snapshot_path, errno, fd_io_strerror( errno ) ));
599 0 : }
600 :
601 0 : ctx->local_out.dir_fd = -1;
602 0 : ctx->local_out.full_snapshot_fd = -1;
603 0 : ctx->local_out.incremental_snapshot_fd = -1;
604 :
605 0 : if( FD_UNLIKELY( tile->snaprd.maximum_local_snapshot_age==0 ) ) {
606 : /* Disable peer selection if we are reading snapshots from disk
607 : and there is no maximum local snapshot age set.
608 : Set the initial state to READING_FULL_FILE to avoid peer
609 : selection logic. */
610 0 : ctx->peer_selection = 0;
611 0 : ctx->state = FD_SNAPRD_STATE_READING_FULL_FILE;
612 0 : FD_LOG_NOTICE(( "loading full snapshot from local file `%s`", ctx->local_in.full_snapshot_path ));
613 0 : }
614 0 : }
615 0 : }
616 :
617 : static void
618 : unprivileged_init( fd_topo_t * topo,
619 0 : fd_topo_tile_t * tile ) {
620 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
621 :
622 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
623 0 : fd_snaprd_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaprd_tile_t), sizeof(fd_snaprd_tile_t) );
624 0 : void * _sshttp = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
625 0 : void * _ssping = FD_SCRATCH_ALLOC_APPEND( l, fd_ssping_align(), fd_ssping_footprint( 65536UL ) );
626 :
627 0 : ctx->ack_cnt = 0UL;
628 0 : ctx->malformed = 0;
629 :
630 0 : ctx->local_out.write_buffer_pos = 0UL;
631 0 : ctx->local_out.write_buffer_len = 0UL;
632 :
633 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
634 :
635 0 : fd_memcpy( ctx->config.path, tile->snaprd.snapshots_path, PATH_MAX );
636 0 : ctx->config.incremental_snapshot_fetch = tile->snaprd.incremental_snapshot_fetch;
637 0 : ctx->config.do_download = tile->snaprd.do_download;
638 0 : ctx->config.maximum_local_snapshot_age = tile->snaprd.maximum_local_snapshot_age;
639 0 : ctx->config.minimum_download_speed_mib = tile->snaprd.minimum_download_speed_mib;
640 :
641 0 : if( FD_UNLIKELY( !tile->snaprd.maximum_download_retry_abort ) ) ctx->config.maximum_download_retry_abort = UINT_MAX;
642 0 : else ctx->config.maximum_download_retry_abort = tile->snaprd.maximum_download_retry_abort;
643 :
644 0 : ctx->ssping = fd_ssping_join( fd_ssping_new( _ssping, 65536UL, 1UL ) );
645 0 : FD_TEST( ctx->ssping );
646 :
647 0 : ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
648 0 : FD_TEST( ctx->sshttp );
649 :
650 0 : if( FD_LIKELY( !strcmp( tile->snaprd.cluster, "testnet" ) ) ) {
651 0 : fd_ip4_port_t initial_peers[ 2UL ] = {
652 0 : { .addr = FD_IP4_ADDR( 35 , 214, 172, 227 ), .port = 8899 },
653 0 : { .addr = FD_IP4_ADDR( 145, 40 , 95 , 69 ), .port = 8899 }, /* Solana testnet peer */
654 0 : };
655 0 : for( ulong i=0UL; i<2UL; i++ ) fd_ssping_add( ctx->ssping, initial_peers[ i ] );
656 0 : } else if( FD_LIKELY( !strcmp( tile->snaprd.cluster, "private" ) ) ) {
657 0 : fd_ip4_port_t initial_peers[ 1UL ] = {
658 0 : { .addr = FD_IP4_ADDR( 147, 28, 185, 47 ), .port = 8899 } /* A private cluster peer */
659 0 : };
660 0 : for( ulong i=0UL; i<1UL; i++ ) fd_ssping_add( ctx->ssping, initial_peers[ i ] );
661 0 : } else if (FD_LIKELY( !strcmp( tile->snaprd.cluster, "mainnet" ) ) ) {
662 0 : fd_ip4_port_t initial_peers[ 3UL ] = {
663 0 : { .addr = FD_IP4_ADDR( 149, 255, 37 , 130 ), .port = 8899 },
664 0 : { .addr = FD_IP4_ADDR( 34 , 1 , 238, 227 ), .port = 8899 },
665 0 : { .addr = FD_IP4_ADDR( 34 , 1 , 139, 131 ), .port = 8899 }
666 0 : };
667 0 : for( ulong i=0UL; i<3UL; i++ ) fd_ssping_add( ctx->ssping, initial_peers[ i ] );
668 0 : }
669 0 : else {
670 0 : FD_LOG_ERR(( "unexpected cluster %s", tile->snaprd.cluster ));
671 0 : }
672 :
673 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
674 :
675 0 : ctx->out.wksp = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ 0 ] ].dcache_obj_id ].wksp_id ].wksp;
676 0 : ctx->out.chunk0 = fd_dcache_compact_chunk0( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache );
677 0 : ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, topo->links[ tile->out_link_id[ 0 ] ].dcache, topo->links[ tile->out_link_id[ 0 ] ].mtu );
678 0 : ctx->out.chunk = ctx->out.chunk0;
679 0 : ctx->out.mtu = topo->links[ tile->out_link_id[ 0 ] ].mtu;
680 0 : }
681 :
/* Stem run-loop configuration consumed by the fd_stem.c include below
   (see that file for the exact semantics of each knob). */
#define STEM_BURST 2UL /* One control message, and one data message */
#define STEM_LAZY 1000L

#define STEM_CALLBACK_CONTEXT_TYPE fd_snaprd_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaprd_tile_t)

/* Callbacks implemented in this file. */
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_AFTER_FRAG after_frag

/* Instantiates stem_run() for this tile from the macros above. */
#include "../../disco/stem/fd_stem.c"

/* Tile descriptor.  keep_host_networking and allow_connect are set,
   presumably because snapshots are fetched over HTTP from this tile. */
fd_topo_run_tile_t fd_tile_snaprd = {
  .name = NAME,
  .scratch_align = scratch_align,
  .scratch_footprint = scratch_footprint,
  .privileged_init = privileged_init,
  .unprivileged_init = unprivileged_init,
  .run = stem_run,
  .keep_host_networking = 1,
  .allow_connect = 1
};

#undef NAME
|