Line data Source code
1 : #define _GNU_SOURCE
2 : #include "utils/fd_ssarchive.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_sshttp.h"
5 :
6 : #include "../../disco/topo/fd_topo.h"
7 : #include "../../disco/metrics/fd_metrics.h"
8 : #include "../../waltz/openssl/fd_openssl_tile.h"
9 :
10 : #include <sys/mman.h> /* memfd_create */
11 : #include <errno.h>
12 : #include <fcntl.h>
13 : #include <unistd.h>
14 : #include <sys/socket.h>
15 :
16 : #include "generated/fd_snapld_tile_seccomp.h"
17 :
18 : #define NAME "snapld"
19 :
/* The snapld tile is responsible for loading snapshot data, either from
   a local file or over an HTTP(S) connection, and sending it to the
   snapdc tile for later decompression. */
23 :
24 : typedef struct fd_snapld_tile {
25 :
26 : struct {
27 : char path[ PATH_MAX ];
28 : } config;
29 :
30 : int state;
31 : int load_full;
32 : int load_file;
33 : int sent_meta;
34 :
35 : int awaiting_ack;
36 : ulong awaiting_ack_sig;
37 :
38 : int local_full_fd;
39 : int local_incr_fd;
40 : int sockfd;
41 :
42 : int is_https;
43 :
44 : fd_sshttp_t * sshttp;
45 :
46 : struct {
47 : void const * base;
48 : } in_rd;
49 :
50 : struct {
51 : fd_wksp_t * mem;
52 : ulong chunk0;
53 : ulong wmark;
54 : ulong chunk;
55 : ulong mtu;
56 : } out_dc;
57 :
58 : } fd_snapld_tile_t;
59 :
60 : static ulong
61 0 : scratch_align( void ) {
62 0 : return fd_ulong_max( alignof(fd_snapld_tile_t), fd_sshttp_align() );
63 0 : }
64 :
65 : static ulong
66 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
67 0 : ulong l = FD_LAYOUT_INIT;
68 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
69 0 : l = FD_LAYOUT_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
70 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
71 0 : return FD_LAYOUT_FINI( l, scratch_align() );
72 0 : }
73 :
74 : FD_FN_CONST static inline ulong
75 0 : loose_footprint( fd_topo_tile_t const * tile ) {
76 0 : (void)tile;
77 : /* Leftover space for OpenSSL allocations */
78 0 : return 1UL<<26UL; /* 64 MiB */
79 0 : }
80 :
81 : static void
82 : privileged_init( fd_topo_t * topo,
83 0 : fd_topo_tile_t * tile ) {
84 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
85 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
86 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
87 0 : void * _sshttp = FD_SCRATCH_ALLOC_APPEND( l, fd_sshttp_align(), fd_sshttp_footprint() );
88 :
89 0 : #if FD_HAS_OPENSSL
90 0 : void * _alloc = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
91 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( _alloc, 1UL ), tile->kind_id );
92 0 : fd_ossl_tile_init( alloc );
93 0 : #endif
94 :
95 0 : ctx->sshttp = fd_sshttp_join( fd_sshttp_new( _sshttp ) );
96 0 : FD_TEST( ctx->sshttp );
97 :
98 : /* FIXME: Allow incremental_snapshots=0 config */
99 0 : ulong full_slot = ULONG_MAX;
100 0 : ulong incr_slot = ULONG_MAX;
101 0 : int full_is_zstd = 0;
102 0 : int incr_is_zstd = 0;
103 0 : char full_path[ PATH_MAX ] = { 0 };
104 0 : char incr_path[ PATH_MAX ] = { 0 };
105 0 : ctx->local_full_fd = -1;
106 0 : ctx->local_incr_fd = -1;
107 0 : if( FD_LIKELY( -1!=fd_ssarchive_latest_pair( tile->snapld.snapshots_path, 1,
108 0 : &full_slot, &incr_slot,
109 0 : full_path, incr_path,
110 0 : &full_is_zstd, &incr_is_zstd ) ) ) {
111 0 : FD_TEST( full_slot!=ULONG_MAX );
112 :
113 0 : ctx->local_full_fd = open( full_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
114 0 : if( FD_UNLIKELY( -1==ctx->local_full_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", full_path, errno, fd_io_strerror( errno ) ));
115 :
116 0 : if( FD_LIKELY( incr_slot!=ULONG_MAX ) ) {
117 0 : ctx->local_incr_fd = open( incr_path, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
118 0 : if( FD_UNLIKELY( -1==ctx->local_incr_fd ) ) FD_LOG_ERR(( "open() failed `%s` (%i-%s)", incr_path, errno, fd_io_strerror( errno ) ));
119 0 : }
120 0 : }
121 :
122 : /* Create a temporary file descriptor for our socket file descriptor.
123 : It is closed later in unprivileged init so that the sandbox sees
124 : an existent file descriptor. */
125 0 : ctx->sockfd = memfd_create( "snapld.sockfd", 0 );
126 0 : if( FD_UNLIKELY( -1==ctx->sockfd ) ) FD_LOG_ERR(( "memfd_create() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
127 0 : }
128 :
129 : static ulong
130 : populate_allowed_fds( fd_topo_t const * topo,
131 : fd_topo_tile_t const * tile,
132 : ulong out_fds_cnt,
133 0 : int * out_fds ) {
134 0 : if( FD_UNLIKELY( out_fds_cnt<4UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
135 :
136 0 : ulong out_cnt = 0;
137 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
138 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
139 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd();
140 0 : }
141 :
142 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
143 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
144 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
145 0 : if( FD_LIKELY( -1!=ctx->local_full_fd ) ) out_fds[ out_cnt++ ] = ctx->local_full_fd;
146 0 : if( FD_LIKELY( -1!=ctx->local_incr_fd ) ) out_fds[ out_cnt++ ] = ctx->local_incr_fd;
147 0 : out_fds[ out_cnt++ ] = ctx->sockfd;
148 :
149 0 : return out_cnt;
150 0 : }
151 :
152 : static ulong
153 : populate_allowed_seccomp( fd_topo_t const * topo,
154 : fd_topo_tile_t const * tile,
155 : ulong out_cnt,
156 0 : struct sock_filter * out ) {
157 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
158 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
159 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
160 :
161 0 : populate_sock_filter_policy_fd_snapld_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->local_full_fd, (uint)ctx->local_incr_fd, (uint)ctx->sockfd );
162 0 : return sock_filter_policy_fd_snapld_tile_instr_cnt;
163 0 : }
164 :
165 : static void
166 : unprivileged_init( fd_topo_t * topo,
167 0 : fd_topo_tile_t * tile ) {
168 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
169 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
170 0 : fd_snapld_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapld_tile_t), sizeof(fd_snapld_tile_t) );
171 :
172 0 : fd_memcpy( ctx->config.path, tile->snapld.snapshots_path, PATH_MAX );
173 :
174 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
175 :
176 0 : FD_TEST( tile->in_cnt==1UL );
177 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0 ] ];
178 0 : FD_TEST( 0==strcmp( in_link->name, "snapct_ld" ) );
179 0 : ctx->in_rd.base = fd_topo_obj_wksp_base( topo, in_link->dcache_obj_id );
180 :
181 0 : FD_TEST( tile->out_cnt==1UL );
182 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ 0 ] ];
183 0 : FD_TEST( 0==strcmp( out_link->name, "snapld_dc" ) );
184 0 : ctx->out_dc.mem = fd_topo_obj_wksp_base( topo, out_link->dcache_obj_id );
185 0 : ctx->out_dc.chunk0 = fd_dcache_compact_chunk0( ctx->out_dc.mem, out_link->dcache );
186 0 : ctx->out_dc.wmark = fd_dcache_compact_wmark ( ctx->out_dc.mem, out_link->dcache, out_link->mtu );
187 0 : ctx->out_dc.chunk = ctx->out_dc.chunk0;
188 0 : ctx->out_dc.mtu = out_link->mtu;
189 :
190 0 : ctx->awaiting_ack = 0;
191 0 : ctx->awaiting_ack_sig = 0;
192 0 : ctx->is_https = 0;
193 :
194 : /* We can only close the temporary socket file descriptor after
195 : entering the sandbox because the sandbox checks all file
196 : descriptors are existent. */
197 0 : if( -1==close( ctx->sockfd ) ) FD_LOG_ERR((" close() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
198 0 : }
199 :
200 : static int
201 0 : should_shutdown( fd_snapld_tile_t * ctx ) {
202 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
203 0 : }
204 :
205 : static void
206 0 : metrics_write( fd_snapld_tile_t * ctx ) {
207 0 : #if FD_HAS_OPENSSL
208 0 : FD_MCNT_SET( SNAPLD, SSL_ALLOC_ERRORS, fd_ossl_alloc_errors );
209 0 : #endif
210 0 : FD_MGAUGE_SET( SNAPLD, STATE, (ulong)(ctx->state) );
211 0 : }
212 :
213 : static void
214 : after_credit( fd_snapld_tile_t * ctx,
215 : fd_stem_context_t * stem,
216 : int * opt_poll_in FD_PARAM_UNUSED,
217 0 : int * charge_busy ) {
218 0 : if( FD_UNLIKELY( ctx->awaiting_ack && ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) {
219 0 : ctx->awaiting_ack = 0;
220 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
221 0 : fd_stem_publish( stem, 0UL, ctx->awaiting_ack_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
222 0 : return;
223 0 : }
224 :
225 0 : if( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) {
226 0 : fd_log_sleep( (long)1e6 );
227 0 : return;
228 0 : }
229 :
230 0 : uchar * out = fd_chunk_to_laddr( ctx->out_dc.mem, ctx->out_dc.chunk );
231 :
232 0 : if( ctx->load_file ) {
233 0 : long result = read( ctx->load_full ? ctx->local_full_fd : ctx->local_incr_fd, out, ctx->out_dc.mtu );
234 0 : if( FD_UNLIKELY( result<=0L ) ) {
235 0 : if( result==0L ) ctx->state = FD_SNAPSHOT_STATE_FINISHING;
236 0 : else if( FD_UNLIKELY( errno!=EAGAIN && errno!=EINTR ) ) {
237 0 : FD_LOG_WARNING(( "read() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
238 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
239 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
240 0 : }
241 0 : } else {
242 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out_dc.chunk, (ulong)result, 0UL, 0UL, 0UL );
243 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, (ulong)result, ctx->out_dc.chunk0, ctx->out_dc.wmark );
244 0 : *charge_busy = 1;
245 0 : }
246 0 : } else {
247 0 : ulong data_len = ctx->out_dc.mtu;
248 0 : int result = fd_sshttp_advance( ctx->sshttp, &data_len, out, fd_log_wallclock() );
249 0 : switch( result ) {
250 0 : case FD_SSHTTP_ADVANCE_AGAIN:
251 0 : break;
252 0 : case FD_SSHTTP_ADVANCE_DATA: {
253 0 : if( FD_UNLIKELY( !ctx->sent_meta ) ) {
254 : /* On the first DATA return, the HTTP headers are available
255 : for use. We need to send this metadata downstream, but
256 : need to do so before any data frags. So, we copy any data
257 : we received with the headers (if any) to the next dcache
258 : chunk and then publish both in order. */
259 0 : ctx->sent_meta = 1;
260 0 : fd_ssctrl_meta_t * meta = (fd_ssctrl_meta_t *)out;
261 0 : ulong next_chunk = fd_dcache_compact_next( ctx->out_dc.chunk, sizeof(fd_ssctrl_meta_t), ctx->out_dc.chunk0, ctx->out_dc.wmark );
262 0 : memmove( fd_chunk_to_laddr( ctx->out_dc.mem, next_chunk ), out, data_len );
263 0 : meta->total_sz = fd_sshttp_content_len( ctx->sshttp );
264 0 : FD_TEST( meta->total_sz!=ULONG_MAX );
265 0 : fd_memcpy( meta->name, fd_sshttp_snapshot_name( ctx->sshttp ), PATH_MAX );
266 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_META, ctx->out_dc.chunk, sizeof(fd_ssctrl_meta_t), 0UL, 0UL, 0UL );
267 0 : ctx->out_dc.chunk = next_chunk;
268 0 : }
269 0 : if( FD_LIKELY( data_len!=0UL ) ) {
270 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_DATA, ctx->out_dc.chunk, data_len, 0UL, 0UL, 0UL );
271 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, data_len, ctx->out_dc.chunk0, ctx->out_dc.wmark );
272 0 : }
273 0 : *charge_busy = 1;
274 0 : break;
275 0 : }
276 0 : case FD_SSHTTP_ADVANCE_DONE:
277 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
278 0 : break;
279 0 : case FD_SSHTTP_ADVANCE_ERROR:
280 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
281 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
282 0 : break;
283 0 : default: FD_LOG_ERR(( "unexpected fd_sshttp_advance result %d", result ));
284 0 : }
285 0 : }
286 0 : }
287 :
288 : static int
289 : returnable_frag( fd_snapld_tile_t * ctx,
290 : ulong in_idx FD_PARAM_UNUSED,
291 : ulong seq FD_PARAM_UNUSED,
292 : ulong sig,
293 : ulong chunk,
294 : ulong sz,
295 : ulong ctl FD_PARAM_UNUSED,
296 : ulong tsorig FD_PARAM_UNUSED,
297 : ulong tspub FD_PARAM_UNUSED,
298 0 : fd_stem_context_t * stem ) {
299 0 : switch( sig ) {
300 :
301 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
302 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
303 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
304 0 : FD_TEST( sz==sizeof(fd_ssctrl_init_t) && sz<=ctx->out_dc.mtu );
305 0 : fd_ssctrl_init_t const * msg_in = fd_chunk_to_laddr_const( ctx->in_rd.base, chunk );
306 0 : ctx->load_full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
307 0 : ctx->load_file = msg_in->file;
308 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
309 0 : ctx->sent_meta = 0;
310 0 : ctx->is_https = msg_in->is_https;
311 0 : if( ctx->load_file ) {
312 0 : if( FD_UNLIKELY( 0!=lseek( ctx->load_full ? ctx->local_full_fd : ctx->local_incr_fd, 0, SEEK_SET ) ) )
313 0 : FD_LOG_ERR(( "lseek(0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
314 0 : } else {
315 0 : if( ctx->load_full ) fd_sshttp_init( ctx->sshttp, msg_in->addr, msg_in->hostname, msg_in->is_https, "/snapshot.tar.bz2", 17UL, fd_log_wallclock() );
316 0 : else fd_sshttp_init( ctx->sshttp, msg_in->addr, msg_in->hostname, msg_in->is_https, "/incremental-snapshot.tar.bz2", 29UL, fd_log_wallclock() );
317 0 : }
318 0 : fd_ssctrl_init_t * msg_out = fd_chunk_to_laddr( ctx->out_dc.mem, ctx->out_dc.chunk );
319 0 : fd_memcpy( msg_out, msg_in, sz );
320 0 : fd_stem_publish( stem, 0UL, sig, ctx->out_dc.chunk, sz, 0UL, 0UL, 0UL );
321 0 : ctx->out_dc.chunk = fd_dcache_compact_next( ctx->out_dc.chunk, ctx->out_dc.mtu, ctx->out_dc.chunk0, ctx->out_dc.wmark );
322 0 : return 0;
323 0 : }
324 :
325 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
326 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
327 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
328 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
329 0 : fd_sshttp_cancel( ctx->sshttp );
330 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
331 0 : break;
332 :
333 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
334 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:
335 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
336 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
337 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
338 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
339 : /* snapld should be in the finishing state when reading from a
340 : file or downloading from http. It is only allowed to still
341 : be in progress for shutting down an https connection. */
342 0 : FD_TEST( ctx->is_https );
343 : /* snapld might not be done with shutting down an https
344 : connection. Save the sig here and send the message when
345 : snapld is in the finishing state. */
346 0 : ctx->awaiting_ack = 1;
347 0 : ctx->awaiting_ack_sig = sig;
348 0 : return 0; /* return directly to avoid fowarding the message */
349 0 : }
350 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
351 0 : break;
352 :
353 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
354 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
355 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
356 0 : break;
357 :
358 : /* FD_SNAPSHOT_MSG_CTRL_ERROR and FD_SNAPSHOT_MSG_DATA are not possible */
359 0 : default: FD_LOG_ERR(( "invalid sig %lu", sig ));
360 0 : }
361 :
362 : /* Forward the control message down the pipeline */
363 0 : fd_stem_publish( stem, 0UL, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
364 :
365 0 : return 0;
366 0 : }
367 :
/* Worst case number of frags published in a single stem iteration:
   after_credit can publish two (the META frag immediately followed by
   the first DATA frag of an HTTP download) and returnable_frag can
   publish one. */
#define STEM_BURST 3UL

#define STEM_LAZY 1000L

/* Context type handed to every stem callback below. */
#define STEM_CALLBACK_CONTEXT_TYPE  fd_snapld_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapld_tile_t)

#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE   metrics_write
#define STEM_CALLBACK_AFTER_CREDIT    after_credit
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
380 :
381 : #include "../../disco/stem/fd_stem.c"
382 :
383 : fd_topo_run_tile_t fd_tile_snapld = {
384 : .name = NAME,
385 : .populate_allowed_seccomp = populate_allowed_seccomp,
386 : .populate_allowed_fds = populate_allowed_fds,
387 : .scratch_align = scratch_align,
388 : .scratch_footprint = scratch_footprint,
389 : .loose_footprint = loose_footprint,
390 : .privileged_init = privileged_init,
391 : .unprivileged_init = unprivileged_init,
392 : .run = stem_run,
393 : .keep_host_networking = 1,
394 : .allow_connect = 1,
395 : .rlimit_file_cnt = 5UL, /* stderr, log, http, full/incr local files */
396 : };
397 :
398 : #undef NAME
|