Line data Source code
1 : #define _GNU_SOURCE
2 : #include "utils/fd_ssarchive.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_sshttp.h"
5 :
6 : #include "../../disco/topo/fd_topo.h"
7 : #include "../../disco/metrics/fd_metrics.h"
8 : #include "../../waltz/openssl/fd_openssl_tile.h"
9 :
10 : #include <sys/mman.h> /* memfd_create */
11 : #include <errno.h>
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 : #include <sys/socket.h>
15 :
16 : #include "generated/fd_snapld_tile_seccomp.h"
17 :
18 : #define NAME "snapld"
19 :
/* FD_SNAPLD_DOWNLOAD_WINDOW_NS is the length, in nanoseconds, of the
   window over which download progress is checked.  In every window
   at least min_download_speed_mibs MiB/s times the window length in
   seconds must have been downloaded.  This catches extremely slow
   downloads that might take a very long time to accumulate the
   100 MiB needed for the batch speed measurement. */
#define FD_SNAPLD_DOWNLOAD_WINDOW_NS (10L*1000L*1000L*1000L) /* 10 seconds */
25 :
26 : /* The snapld tile is responsible for loading data from the local file
27 : or from an HTTP/TCP connection and sending it to the snapdc tile
28 : for later decompression. */
29 :
30 : typedef struct fd_snapld_tile {
31 :
32 : struct {
33 : char path[ PATH_MAX ];
34 : uint min_download_speed_mibs;
35 : } config;
36 :
37 : int state;
38 : int load_full;
39 : int load_file;
40 : int sent_meta;
41 :
42 : ulong bytes_in_batch;
43 : double download_speed_mibs;
44 : long start_batch;
45 : long end_batch;
46 :
47 : ulong bytes_in_window;
48 : ulong min_bytes_in_window;
49 : long window_deadline;
50 :
51 : int local_full_fd;
52 : int local_incr_fd;
53 : int sockfd;
54 :
55 : fd_sshttp_t * sshttp;
56 :
57 : struct {
58 : void const * base;
59 : } in_rd;
60 :
61 : struct {
62 : fd_wksp_t * mem;
63 : ulong chunk0;
64 : ulong wmark;
65 : ulong chunk;
66 : ulong mtu;
67 : } out_dc;
68 :
69 : } fd_snapld_tile_t;
70 :
71 : static ulong
72 0 : scratch_align( void ) {
73 0 : ulong a = alignof(fd_snapld_tile_t);
74 0 : a = fd_ulong_max( a, fd_sshttp_align() );
75 0 : a = fd_ulong_max( a, fd_alloc_align() );
76 0 : return a;
77 0 : }
78 :
79 : static ulong
80 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
81 0 : ulong l = FD_LAYOUT_INIT;
82 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
83 0 : l = FD_LAYOUT_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
84 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
85 0 : return FD_LAYOUT_FINI( l, scratch_align() );
86 0 : }
87 :
88 : FD_FN_CONST static inline ulong
89 0 : loose_footprint( fd_topo_tile_t const * tile ) {
90 0 : (void)tile;
91 : /* Leftover space for OpenSSL allocations */
92 0 : return 1UL<<26UL; /* 64 MiB */
93 0 : }
94 :
95 : static void
96 : privileged_init( fd_topo_t const * topo,
97 0 : fd_topo_tile_t const * tile ) {
98 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
99 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
100 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
101 0 : void * _sshttp = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
102 :
103 0 : #if FD_HAS_OPENSSL
104 0 : void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
105 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
106 0 : fd_ossl_tile_init( alloc );
107 0 : #endif
108 :
109 0 : ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
110 0 : FD_TEST( ctx->sshttp );
111 :
112 0 : ulong full_slot = ULONG_MAX;
113 0 : ulong incr_slot = ULONG_MAX;
114 0 : int full_is_zstd = 0;
115 0 : int incr_is_zstd = 0;
116 0 : char full_path[ PATH_MAX ] = { 0 };
117 0 : char incr_path[ PATH_MAX ] = { 0 };
118 0 : uchar full_snapshot_hash[ FD_HASH_FOOTPRINT ] = { 0 };
119 0 : uchar incr_snapshot_hash[ FD_HASH_FOOTPRINT ] = { 0 };
120 0 : ctx->local_full_fd = -1;
121 0 : ctx->local_incr_fd = -1;
122 : /* fd_ssarchive_latest_pair needs to be invoked here, irrespective
123 : of whether snapct may do the same, because this information is
124 : needed here during privileged_init. */
125 0 : if( FD_LIKELY( -1!=fd_ssarchive_latest_pair( tile->snapld.snapshots_path,
126 0 : tile->snapld.incremental_snapshots,
127 0 : &full_slot, &incr_slot,
128 0 : full_path, incr_path,
129 0 : &full_is_zstd, &incr_is_zstd,
130 0 : full_snapshot_hash, incr_snapshot_hash ) ) ) {
131 0 : FD_TEST( full_slot!=ULONG_MAX );
132 :
133 0 : ctx->local_full_fd = open( full_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
134 0 : if( FD_UNLIKELY( -1==ctx->local_full_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
135 :
136 0 : if( FD_LIKELY( incr_slot!=ULONG_MAX ) ) {
137 0 : ctx->local_incr_fd = open( incr_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
138 0 : if( FD_UNLIKELY( -1==ctx->local_incr_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", incr_path, errno, fd_io_strerror( errno ) ));
139 0 : }
140 0 : }
141 :
142 : /* Create a temporary file descriptor for our socket file descriptor.
143 : It is closed later in unprivileged init so that the sandbox sees
144 : an existent file descriptor. */
145 0 : ctx->sockfd = memfd_create( "snapld.sockfd", 0 );
146 0 : if( FD_UNLIKELY( -1==ctx->sockfd ) ) FD_LOG_ERR(( "memfd_create() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
147 0 : }
148 :
149 : static ulong
150 : populate_allowed_fds( fd_topo_t const * topo,
151 : fd_topo_tile_t const * tile,
152 : ulong out_fds_cnt,
153 0 : int * out_fds ) {
154 0 : if( FD_UNLIKELY( out_fds_cnt<4UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
155 :
156 0 : ulong out_cnt = 0;
157 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
158 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
159 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd();
160 0 : }
161 :
162 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
163 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
164 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
165 0 : if( FD_LIKELY( -1!=ctx->local_full_fd ) ) out_fds[ out_cnt++ ] = ctx->local_full_fd;
166 0 : if( FD_LIKELY( -1!=ctx->local_incr_fd ) ) out_fds[ out_cnt++ ] = ctx->local_incr_fd;
167 0 : out_fds[ out_cnt++ ] = ctx->sockfd;
168 :
169 0 : return out_cnt;
170 0 : }
171 :
172 : static ulong
173 : populate_allowed_seccomp( fd_topo_t const * topo,
174 : fd_topo_tile_t const * tile,
175 : ulong out_cnt,
176 0 : struct sock_filter * out ) {
177 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
178 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
179 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
180 :
181 0 : populate_sock_filter_policy_fd_snapld_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_full_fd, (uint)ctx->local_incr_fd, (uint)ctx->sockfd );
182 0 : return sock_filter_policy_fd_snapld_tile_instr_cnt;
183 0 : }
184 :
185 : static void
186 : unprivileged_init( fd_topo_t const * topo,
187 0 : fd_topo_tile_t const * tile ) {
188 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
189 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
190 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
191 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
192 0 : FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
193 :
194 0 : fd_memcpy( ctx->config.path, tile->snapld.snapshots_path, PATH_MAX );
195 0 : ctx->config.min_download_speed_mibs = tile->snapld.min_download_speed_mibs;
196 :
197 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
198 :
199 0 : ctx->download_speed_mibs = 0.0;
200 0 : ctx->bytes_in_batch = 0UL;
201 0 : ctx->start_batch = 0L;
202 0 : ctx->end_batch = 0L;
203 0 : ctx->bytes_in_window = 0UL;
204 0 : ctx->window_deadline = LONG_MAX;
205 0 : ctx->min_bytes_in_window = ((ulong)ctx->config.min_download_speed_mibs * (FD_SNAPLD_DOWNLOAD_WINDOW_NS / (ulong)1e9))<<20UL;
206 :
207 0 : FD_TEST( tile->in_cnt==1UL );
208 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ];
209 0 : FD_TEST( 0==strcmp( in_link->name, "snapct_ld" ) );
210 0 : ctx->in_rd.base = fd_topo_obj_wksp_base( topo, in_link->dcache_obj_id );
211 :
212 0 : FD_TEST( tile->out_cnt==1UL );
213 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ 0 ] ];
214 0 : FD_TEST( 0==strcmp( out_link->name, "snapld_dc" ) );
215 0 : ctx->out_dc.mem = fd_topo_obj_wksp_base( topo, out_link->dcache_obj_id );
216 0 : ctx->out_dc.chunk0 = fd_dcache_compact_chunk0( ctx->out_dc.mem, out_link->dcache );
217 0 : ctx->out_dc.wmark = fd_dcache_compact_wmark ( ctx->out_dc.mem, out_link->dcache, out_link->mtu );
218 0 : ctx->out_dc.chunk = ctx->out_dc.chunk0;
219 0 : ctx->out_dc.mtu = out_link->mtu;
220 :
221 : /* We can only close the temporary socket file descriptor after
222 : entering the sandbox because the sandbox checks all file
223 : descriptors are existent. */
224 0 : if( -1==close( ctx->sockfd ) ) FD_LOG_ERR((" close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
225 :
226 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
227 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
228 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
229 0 : }
230 :
231 : static int
232 0 : should_shutdown( fd_snapld_tile_t * ctx ) {
233 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
234 0 : }
235 :
236 : static void
237 0 : metrics_write( fd_snapld_tile_t * ctx ) {
238 0 : #if FD_HAS_OPENSSL
239 0 : FD_MCNT_SET( SNAPLD, SSL_ALLOC_FAILED, fd_ossl_alloc_errors );
240 0 : #endif
241 0 : FD_MGAUGE_SET( SNAPLD, STATE, (ulong)(ctx->state) );
242 0 : }
243 :
244 : static void
245 : transition_malformed( fd_snapld_tile_t * ctx,
246 0 : fd_stem_context_t * stem ) {
247 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
248 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
249 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
250 0 : }
251 :
252 : static int
253 : check_download_progress( fd_snapld_tile_t * ctx,
254 : fd_stem_context_t * stem,
255 : int downloading,
256 0 : long now ) {
257 0 : if( FD_UNLIKELY( ctx->window_deadline==LONG_MAX && downloading ) ) {
258 0 : ctx->window_deadline = now + FD_SNAPLD_DOWNLOAD_WINDOW_NS;
259 0 : ctx->bytes_in_window = 0UL;
260 0 : }
261 :
262 0 : if( FD_UNLIKELY( now>ctx->window_deadline ) ) {
263 0 : if( FD_UNLIKELY( ctx->bytes_in_window<ctx->min_bytes_in_window ) ) {
264 : /* cancel the download if the download progress speed in the last
265 : window is less than the minimum download speed. */
266 0 : double download_speed_mibs = (double)ctx->bytes_in_window / (double)(FD_SNAPLD_DOWNLOAD_WINDOW_NS / 1e9) / (double)(1<<20UL);
267 0 : FD_LOG_WARNING(( "download progress of %.2f MiB/s in the last %lu seconds for %s snapshot "
268 0 : "is below the minimum download speed %u MiB/s, cancelling download",
269 0 : download_speed_mibs, FD_SNAPLD_DOWNLOAD_WINDOW_NS / (ulong)1e9,
270 0 : ctx->load_full ? "full" : "incremental", ctx->config.min_download_speed_mibs ));
271 0 : transition_malformed( ctx, stem );
272 0 : fd_sshttp_cancel( ctx->sshttp );
273 0 : return -1;
274 0 : }
275 0 : ctx->window_deadline = now + FD_SNAPLD_DOWNLOAD_WINDOW_NS;
276 0 : ctx->bytes_in_window = 0UL;
277 0 : }
278 0 : return 0;
279 0 : }
280 :
281 : static void
282 : after_credit( fd_snapld_tile_t * ctx,
283 : fd_stem_context_t * stem,
284 : int * opt_poll_in FD_PARAM_UNUSED,
285 0 : int * charge_busy ) {
286 0 : if( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) {
287 0 : fd_log_sleep( (long)1e6 );
288 0 : return;
289 0 : }
290 :
291 0 : uchar * out = fd_chunk_to_laddr( ctx->out_dc.mem, ctx->out_dc.chunk );
292 :
293 0 : if( ctx->load_file ) {
294 0 : long result = read( ctx->load_full ? ctx->local_full_fd : ctx->local_incr_fd, out, ctx->out_dc.mtu );
295 0 : if( FD_UNLIKELY( result<=0L ) ) {
296 0 : if( result==0L ) {
297 0 : FD_LOG_INFO(( "finished reading %s snapshot from local file", ctx->load_full ? "full" : "incremental" ));
298 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
299 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_LOAD_COMPLETE, 0UL, 0UL, 0UL, 0UL, 0UL );
300 0 : } else if( FD_UNLIKELY( errno!=EAGAIN && errno!=EINTR ) ) {
301 0 : FD_LOG_WARNING(( "read() failed on %s snapshot file (%i-%s)", ctx->load_full ? "full" : "incremental", errno, fd_io_strerror( errno ) ));
302 0 : transition_malformed( ctx, stem );
303 0 : return; /* verbose return */
304 0 : }
305 0 : } else {
306 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out_dc.chunk, (ulong)result, 0UL, 0UL, 0UL );
307 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, (ulong)result, ctx->out_dc.chunk0, ctx->out_dc.wmark );
308 0 : *charge_busy = 1;
309 0 : return; /* verbose return */
310 0 : }
311 0 : } else {
312 0 : int downloading = 0;
313 0 : ulong data_len = ctx->out_dc.mtu;
314 0 : long now = fd_log_wallclock();
315 0 : int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, &downloading, now );
316 0 : switch( result ) {
317 0 : case FD_SSHTTP_ADVANCE_AGAIN:
318 : /* Return value ignored: on failure, check_download_progress
319 : already calls transition_malformed and fd_sshttp_cancel. */
320 0 : check_download_progress( ctx, stem, downloading, now );
321 0 : break;
322 0 : case FD_SSHTTP_ADVANCE_DATA: {
323 0 : ctx->bytes_in_window += data_len;
324 0 : if( FD_UNLIKELY( -1==check_download_progress( ctx, stem, downloading, now ) ) ) break;
325 0 : if( FD_UNLIKELY( !ctx->sent_meta ) ) {
326 : /* On the first DATA return, the HTTP headers are available
327 : for use. We need to send this metadata downstream, but
328 : need to do so before any data frags. So, we copy any data
329 : we received with the headers (if any) to the next dcache
330 : chunk and then publish both in order. */
331 0 : ctx->start_batch = fd_log_wallclock();
332 0 : fd_ssctrl_meta_t * meta = (fd_ssctrl_meta_t *)out;
333 0 : ulong next_chunk = fd_dcache_compact_next( ctx->out_dc.chunk, sizeof(fd_ssctrl_meta_t), ctx->out_dc.chunk0, ctx->out_dc.wmark );
334 0 : memmove( fd_chunk_to_laddr( ctx->out_dc.mem, next_chunk ), out, data_len );
335 0 : meta->total_sz = fd_sshttp_content_len( ctx->sshttp );
336 0 : if( FD_UNLIKELY( meta->total_sz==ULONG_MAX ) ) {
337 0 : FD_LOG_WARNING(( "HTTP response for %s snapshot is missing Content-Length header", ctx->load_full ? "full" : "incremental" ));
338 0 : transition_malformed( ctx, stem );
339 0 : fd_sshttp_cancel( ctx->sshttp );
340 0 : break;
341 0 : }
342 0 : ctx->sent_meta = 1;
343 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_META, ctx->out_dc.chunk, sizeof(fd_ssctrl_meta_t), 0UL, 0UL, 0UL );
344 0 : ctx->out_dc.chunk = next_chunk;
345 0 : }
346 0 : if( FD_LIKELY( data_len!=0UL ) ) {
347 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out_dc.chunk, data_len, 0UL, 0UL, 0UL );
348 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, data_len, ctx->out_dc.chunk0, ctx->out_dc.wmark );
349 0 : ctx->bytes_in_batch += data_len;
350 :
351 : /* measure download speed every 100 MiB */
352 0 : if(ctx->bytes_in_batch>=100<<20UL) {
353 0 : ctx->end_batch = fd_log_wallclock();
354 : /* as a precaution, make sure elapsed_batch is positive
355 : and larger than zero (to avoid division by zero). */
356 0 : long elapsed_batch = fd_long_if( ctx->end_batch > ctx->start_batch, ctx->end_batch - ctx->start_batch, 1L );
357 : /* download speed in MiB/s = bytes/nanoseconds * 1e9/(1 second) * 1/(1MiB = 1<<20UL) = 1e9/(1024*1024) ~= 954 */
358 0 : ctx->download_speed_mibs = (double)(ctx->bytes_in_batch*954) / (double)elapsed_batch;
359 0 : if( FD_UNLIKELY( ctx->download_speed_mibs<ctx->config.min_download_speed_mibs ) ) {
360 : /* cancel the snapshot load if the download speed is less
361 : than the minimum download speed. */
362 0 : FD_LOG_WARNING(( "download speed %.2f MiB/s on a batch of %lu MiB for %s snapshot is below the minimum threshold %.2f MiB/s. "
363 0 : "cancelling snapshot download",
364 0 : ctx->download_speed_mibs, ctx->bytes_in_batch>>20UL, ctx->load_full ? "full" : "incremental",
365 0 : (double)(ctx->config.min_download_speed_mibs) ));
366 0 : transition_malformed( ctx, stem );
367 0 : fd_sshttp_cancel( ctx->sshttp );
368 0 : break;
369 0 : }
370 0 : ctx->start_batch = ctx->end_batch;
371 0 : ctx->bytes_in_batch = 0UL;
372 0 : }
373 0 : }
374 0 : *charge_busy = 1;
375 0 : break;
376 0 : }
377 0 : case FD_SSHTTP_ADVANCE_DONE:
378 0 : if( FD_UNLIKELY( !ctx->sent_meta ) ) {
379 0 : FD_LOG_WARNING(( "zero-length HTTP response for %s snapshot", ctx->load_full ? "full" : "incremental" ));
380 0 : transition_malformed( ctx, stem );
381 0 : fd_sshttp_cancel( ctx->sshttp );
382 0 : break;
383 0 : }
384 0 : FD_LOG_NOTICE(( "finished downloading %s snapshot", ctx->load_full ? "full" : "incremental" ));
385 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
386 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_LOAD_COMPLETE, 0UL, 0UL, 0UL, 0UL, 0UL );
387 0 : break;
388 0 : case FD_SSHTTP_ADVANCE_ERROR:
389 0 : FD_LOG_WARNING(( "HTTP advance error during %s snapshot download, entering error state",
390 0 : ctx->load_full ? "full" : "incremental" ));
391 0 : transition_malformed( ctx, stem );
392 0 : fd_sshttp_cancel( ctx->sshttp );
393 0 : break;
394 0 : default: FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d for %s snapshot",
395 0 : result, ctx->load_full ? "full" : "incremental" ));
396 0 : }
397 0 : }
398 0 : }
399 :
400 : static int
401 : returnable_frag( fd_snapld_tile_t * ctx,
402 : ulong in_idx FD_PARAM_UNUSED,
403 : ulong seq FD_PARAM_UNUSED,
404 : ulong sig,
405 : ulong chunk,
406 : ulong sz,
407 : ulong ctl FD_PARAM_UNUSED,
408 : ulong tsorig FD_PARAM_UNUSED,
409 : ulong tspub FD_PARAM_UNUSED,
410 0 : fd_stem_context_t * stem ) {
411 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
412 : /* Control messages move along the snapshot load pipeline. Since
413 : error conditions can be triggered by any tile in the pipeline,
414 : it is possible to be in error state and still receive otherwise
415 : valid messages. Only a fail message can revert this. */
416 0 : return 0;
417 0 : };
418 :
419 0 : int forward_msg = 1;
420 :
421 0 : switch( sig ) {
422 :
423 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
424 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
425 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
426 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
427 0 : FD_TEST( sz==sizeof(fd_ssctrl_init_t) && sz<=ctx->out_dc.mtu );
428 0 : fd_ssctrl_init_t const * msg_in = fd_chunk_to_laddr_const( ctx->in_rd.base, chunk );
429 0 : ctx->load_full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
430 0 : ctx->load_file = msg_in->file;
431 0 : ctx->sent_meta = 0;
432 :
433 0 : ctx->window_deadline = LONG_MAX;
434 0 : ctx->bytes_in_window = 0UL;
435 0 : long now = fd_log_wallclock();
436 0 : if( ctx->load_file ) {
437 0 : if( FD_UNLIKELY( 0!=lseek( ctx->load_full ? ctx->local_full_fd : ctx->local_incr_fd, 0, SEEK_SET ) ) )
438 0 : FD_LOG_ERR(( "lseek(0) failed on %s snapshot file (%i-%s)",
439 0 : ctx->load_full ? "full" : "incremental", errno, fd_io_strerror( errno ) ));
440 0 : } else {
441 0 : if( FD_UNLIKELY( fd_sshttp_init( ctx->sshttp, msg_in->addr, msg_in->hostname, msg_in->is_https, msg_in->path, msg_in->path_len, 4UL, now ) ) ) {
442 0 : transition_malformed( ctx, stem );
443 0 : forward_msg = 0;
444 0 : break;
445 0 : }
446 0 : }
447 0 : fd_ssctrl_init_t * msg_out = fd_chunk_to_laddr( ctx->out_dc.mem, ctx->out_dc.chunk );
448 0 : fd_memcpy( msg_out, msg_in, sz );
449 0 : fd_stem_publish( stem, 0UL, sig, ctx->out_dc.chunk, sz, 0UL, 0UL, 0UL );
450 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, ctx->out_dc.mtu, ctx->out_dc.chunk0, ctx->out_dc.wmark );
451 0 : forward_msg = 0; // we are forwarding the control message in the `fd_sstrl_init_t` message
452 0 : break;
453 0 : }
454 :
455 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
456 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
457 0 : break;
458 0 : }
459 :
460 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
461 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
462 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
463 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
464 0 : break;
465 0 : }
466 :
467 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
468 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
469 0 : fd_sshttp_cancel( ctx->sshttp );
470 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
471 0 : break;
472 0 : }
473 :
474 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
475 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
476 0 : fd_sshttp_cancel( ctx->sshttp );
477 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
478 0 : break;
479 :
480 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
481 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
482 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
483 0 : break;
484 0 : }
485 :
486 : /* FD_SNAPSHOT_MSG_DATA is not possible */
487 0 : default: {
488 0 : FD_LOG_ERR(( "unexpected control frag %s (%lu) in state %s (%lu)",
489 0 : fd_ssctrl_msg_ctrl_str( sig ), sig,
490 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
491 0 : break;
492 0 : }
493 0 : }
494 :
495 : /* Forward the control message down the pipeline */
496 0 : if( FD_LIKELY( forward_msg ) ) {
497 0 : fd_stem_publish( stem, 0UL, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
498 0 : }
499 :
500 0 : return 0;
501 0 : }
502 :
503 : /* Up to two frags from after_credit plus one from returnable_frag */
504 0 : #define STEM_BURST 3UL
505 :
506 0 : #define STEM_LAZY (128L*3000L)
507 :
508 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapld_tile_t
509 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapld_tile_t)
510 :
511 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
512 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
513 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
514 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
515 :
516 : #include "../../disco/stem/fd_stem.c"
517 :
518 : fd_topo_run_tile_t fd_tile_snapld = {
519 : .name = NAME,
520 : .populate_allowed_seccomp = populate_allowed_seccomp,
521 : .populate_allowed_fds = populate_allowed_fds,
522 : .scratch_align = scratch_align,
523 : .scratch_footprint = scratch_footprint,
524 : .loose_footprint = loose_footprint,
525 : .privileged_init = privileged_init,
526 : .unprivileged_init = unprivileged_init,
527 : .run = stem_run,
528 : .keep_host_networking = 1,
529 : .allow_connect = 1,
530 : .rlimit_file_cnt = 5UL, /* stderr, log, http, full/incr local files */
531 : };
532 :
533 : #undef NAME
|