Line data Source code
1 : #include "../../disco/topo/fd_topo.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 : #include "../../ballet/lthash/fd_lthash.h"
4 : #include "../../ballet/lthash/fd_lthash_adder.h"
5 : #include "../../util/pod/fd_pod.h"
6 : #include "../../vinyl/io/fd_vinyl_io.h"
7 : #include "../../vinyl/io/ur/fd_vinyl_io_ur_private.h"
8 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
9 : #include "../../util/io_uring/fd_io_uring_setup.h"
10 : #include "../../util/io_uring/fd_io_uring_register.h"
11 : #include "../../util/io_uring/fd_io_uring.h"
12 : #include "generated/fd_snaplh_tile_seccomp.h"
13 :
14 : #include "utils/fd_ssctrl.h"
15 : #include "utils/fd_vinyl_admin.h"
16 :
17 : #include <errno.h>
18 : #include <sys/stat.h> /* fstat */
19 : #include <fcntl.h> /* open */
20 : #include <unistd.h> /* close */
21 :
22 : #include "../../vinyl/io/ur/fd_vinyl_io_ur.h"
23 :
24 : #define NAME "snaplh"
25 :
26 : #define IN_CNT_MAX (2UL)
27 0 : #define IN_KIND_SNAPLV (0UL)
28 0 : #define IN_KIND_SNAPWH (1UL)
29 :
30 : #define VINYL_LTHASH_BLOCK_ALIGN FD_VINYL_BSTREAM_BLOCK_SZ
31 0 : #define VINYL_LTHASH_BLOCK_MAX_SZ (16UL<<20)
32 : FD_STATIC_ASSERT( VINYL_LTHASH_BLOCK_MAX_SZ>(sizeof(fd_snapshot_full_account_t)+FD_VINYL_BSTREAM_BLOCK_SZ+2*VINYL_LTHASH_BLOCK_ALIGN), "VINYL_LTHASH_BLOCK_MAX_SZ" );
33 :
34 0 : #define VINYL_LTHASH_RD_REQ_MAX (32UL)
35 0 : #define VINYL_LTHASH_IORING_DEPTH (2*VINYL_LTHASH_RD_REQ_MAX)
36 :
37 0 : #define VINYL_LTHASH_IO_SPAD_MAX (2<<20UL)
38 :
39 0 : #define VINYL_LTHASH_RD_REQ_FREE (0UL)
40 0 : #define VINYL_LTHASH_RD_REQ_PEND (1UL)
41 0 : #define VINYL_LTHASH_RD_REQ_SENT (2UL)
42 :
43 : struct in_link_private {
44 : fd_wksp_t * wksp;
45 : ulong chunk0;
46 : ulong wmark;
47 : ulong mtu;
48 : void const * base;
49 : ulong * seq_sync; /* fseq->seq[0] */
50 : };
51 : typedef struct in_link_private in_link_t;
52 :
53 : struct out_link_private {
54 : fd_wksp_t * wksp;
55 : ulong chunk0;
56 : ulong wmark;
57 : ulong chunk;
58 : ulong mtu;
59 : };
60 : typedef struct out_link_private out_link_t;
61 :
62 : struct fd_snaplh_tile {
63 : uint state;
64 : int full;
65 :
66 : ulong seed;
67 : ulong lthash_tile_cnt;
68 : ulong lthash_tile_idx;
69 : ulong lthash_tile_add_cnt;
70 : ulong lthash_tile_sub_cnt;
71 : ulong lthash_tile_add_idx;
72 : ulong lthash_tile_sub_idx;
73 : ulong pairs_seen;
74 : ulong lthash_req_seen;
75 :
76 : /* Database params */
77 : ulong const * io_seed;
78 :
79 : fd_lthash_adder_t adder[1];
80 : fd_lthash_adder_t adder_sub[1];
81 : uchar data[FD_RUNTIME_ACC_SZ_MAX];
82 :
83 : fd_lthash_value_t running_lthash;
84 : fd_lthash_value_t running_lthash_sub;
85 :
86 : struct {
87 : int dev_fd;
88 : ulong dev_sz;
89 : ulong dev_base;
90 : void * pair_mem;
91 : void * pair_tmp;
92 :
93 : struct {
94 : fd_vinyl_bstream_phdr_t phdr [VINYL_LTHASH_RD_REQ_MAX];
95 : fd_vinyl_io_rd_t rd_req[VINYL_LTHASH_RD_REQ_MAX];
96 : } pending;
97 : ulong pending_rd_req_cnt;
98 :
99 : fd_vinyl_io_t * io;
100 : fd_vinyl_admin_t * admin;
101 : } vinyl;
102 :
103 : struct {
104 : struct {
105 : ulong accounts_hashed;
106 : } full;
107 :
108 : struct {
109 : ulong accounts_hashed;
110 : } incremental;
111 : } metrics;
112 :
113 : ulong wh_finish_fseq;
114 : ulong wh_last_in_seq;
115 :
116 : in_link_t in[IN_CNT_MAX];
117 : uchar in_kind[IN_CNT_MAX];
118 : out_link_t out;
119 :
120 : int lthash_completion_pending;
121 : int fail_completion_pending;
122 :
123 : /* io_uring setup */
124 :
125 : fd_io_uring_t ioring[1];
126 : int io_uring_enabled;
127 : };
128 :
129 : typedef struct fd_snaplh_tile fd_snaplh_t;
130 :
131 : static inline int
132 0 : should_shutdown( fd_snaplh_t * ctx ) {
133 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
134 0 : }
135 :
136 : static ulong
137 0 : scratch_align( void ) {
138 0 : return alignof(fd_snaplh_t);
139 0 : }
140 :
141 : static ulong
142 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
143 0 : (void)tile;
144 0 : ulong l = FD_LAYOUT_INIT;
145 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
146 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
147 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
148 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
149 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
150 0 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
151 0 : l = FD_LAYOUT_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
152 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaplh_t) );
153 0 : }
154 :
155 : static void
156 0 : metrics_write( fd_snaplh_t * ctx ) {
157 0 : FD_MGAUGE_SET( SNAPLH, FULL_ACCOUNTS_HASHED, ctx->metrics.full.accounts_hashed );
158 0 : FD_MGAUGE_SET( SNAPLH, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
159 0 : FD_MGAUGE_SET( SNAPLH, STATE, (ulong)(ctx->state) );
160 0 : }
161 :
162 : static inline int
163 0 : should_hash_account( fd_snaplh_t * ctx ) {
164 0 : return (ctx->pairs_seen % ctx->lthash_tile_add_cnt)==ctx->lthash_tile_add_idx;
165 0 : }
166 :
167 : static inline int
168 0 : should_process_lthash_request( fd_snaplh_t * ctx ) {
169 0 : return (ctx->lthash_req_seen % ctx->lthash_tile_sub_cnt)==ctx->lthash_tile_sub_idx;
170 0 : }
171 :
172 : static void
173 : streamlined_hash( fd_snaplh_t * restrict ctx,
174 : fd_lthash_adder_t * restrict adder,
175 : fd_lthash_value_t * restrict running_lthash,
176 0 : uchar const * restrict _pair ) {
177 0 : uchar const * pair = _pair;
178 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
179 0 : pair += sizeof(fd_vinyl_bstream_phdr_t);
180 0 : fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
181 0 : pair += sizeof(fd_account_meta_t);
182 0 : uchar const * data = pair;
183 :
184 0 : ulong data_len = meta->dlen;
185 0 : const char * pubkey = phdr->key.c;
186 0 : ulong lamports = meta->lamports;
187 0 : const uchar * owner = meta->owner;
188 0 : uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
189 :
190 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
191 0 : if( FD_UNLIKELY( lamports==0UL ) ) return;
192 :
193 0 : fd_lthash_adder_push_solana_account( adder,
194 0 : running_lthash,
195 0 : pubkey,
196 0 : data,
197 0 : data_len,
198 0 : lamports,
199 0 : executable,
200 0 : owner );
201 :
202 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
203 0 : else ctx->metrics.incremental.accounts_hashed++;
204 0 : }
205 :
206 : static void
207 : handle_vinyl_lthash_request_bd( fd_snaplh_t * ctx,
208 : ulong seq,
209 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
210 :
211 : /* The bd version is blocking, therefore ctx->pending is not used. */
212 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
213 :
214 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
215 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
216 :
217 : /* dev_seq shows where the seq is physically located in device. */
218 0 : ulong dev_seq = ( seq + ctx->vinyl.dev_base ) % ctx->vinyl.dev_sz;
219 0 : ulong rd_off = fd_ulong_align_dn( dev_seq, FD_VINYL_BSTREAM_BLOCK_SZ );
220 0 : ulong pair_off = (dev_seq - rd_off);
221 0 : ulong rd_sz = fd_ulong_align_up( pair_off + pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
222 0 : FD_TEST( rd_sz < VINYL_LTHASH_BLOCK_MAX_SZ );
223 :
224 0 : uchar * pair = ((uchar*)ctx->vinyl.pair_mem) + pair_off;
225 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)pair;
226 :
227 0 : for(;;) {
228 0 : ulong sz = rd_sz;
229 0 : ulong rsz = fd_ulong_min( rd_sz, ctx->vinyl.dev_sz - rd_off );
230 0 : uchar * dst = ctx->vinyl.pair_mem;
231 0 : uchar * tmp = ctx->vinyl.pair_tmp;
232 :
233 0 : bd_read( ctx->vinyl.dev_fd, rd_off, dst, rsz );
234 0 : sz -= rsz;
235 0 : if( FD_UNLIKELY( sz ) ) {
236 : /* When the dev wraps around, the dev_base needs to be skipped.
237 : This means: increase the size multiple of the alignment,
238 : read into a temporary buffer, and memcpy into the dst at the
239 : correct offset. */
240 0 : bd_read( ctx->vinyl.dev_fd, 0, tmp, sz + FD_VINYL_BSTREAM_BLOCK_SZ );
241 0 : fd_memcpy( dst + rsz, tmp + ctx->vinyl.dev_base, sz );
242 0 : }
243 :
244 0 : if( FD_LIKELY( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) ) ) {
245 :
246 : /* test bstream pair integrity hashes */
247 0 : int test = !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz );
248 0 : if( FD_LIKELY( test ) ) break;
249 0 : }
250 0 : FD_LOG_WARNING(( "phdr mismatch! - this should not happen under bstream_seq" ));
251 0 : FD_SPIN_PAUSE();
252 0 : }
253 :
254 0 : streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
255 0 : }
256 :
257 : FD_FN_UNUSED static inline ulong
258 0 : rd_req_ctx_get_idx( ulong rd_req_ctx ) {
259 0 : return ( rd_req_ctx >> 0 ) & ((1UL<<32)-1UL);
260 0 : }
261 :
262 : FD_FN_UNUSED static inline ulong
263 0 : rd_req_ctx_get_status( ulong rd_req_ctx ) {
264 0 : return ( rd_req_ctx >> 32 ) & ((1UL<<32)-1UL);
265 0 : }
266 :
267 : FD_FN_UNUSED static inline void
268 : rd_req_ctx_into_parts( ulong rd_req_ctx,
269 : ulong * idx,
270 0 : ulong * status ) {
271 0 : *idx = rd_req_ctx_get_idx( rd_req_ctx );
272 0 : *status = rd_req_ctx_get_status( rd_req_ctx );
273 0 : }
274 :
275 : FD_FN_UNUSED static inline ulong
276 : rd_req_ctx_from_parts( ulong idx,
277 0 : ulong status ) {
278 0 : return ( idx & ((1UL<<32)-1UL) ) | ( status << 32 );
279 0 : }
280 :
281 : FD_FN_UNUSED static inline ulong
282 : rd_req_ctx_update_status( ulong rd_req_ctx,
283 0 : ulong status ) {
284 0 : return rd_req_ctx_from_parts( rd_req_ctx_get_idx( rd_req_ctx ), status );
285 0 : }
286 :
287 : static void
288 : handle_vinyl_lthash_compute_from_rd_req( fd_snaplh_t * ctx,
289 0 : fd_vinyl_io_rd_t * rd_req ) {
290 0 : ulong idx = rd_req_ctx_get_idx( rd_req->ctx );
291 :
292 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rd_req->dst;
293 0 : fd_vinyl_bstream_phdr_t * acc_hdr = &ctx->vinyl.pending.phdr[ idx ];
294 :
295 : /* test the retrieved header (it must mach the request) */
296 0 : FD_TEST( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) );
297 :
298 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
299 0 : ulong seq = rd_req->seq;
300 0 : uchar * pair = (uchar*)rd_req->dst;
301 0 : ulong pair_sz = rd_req->sz;
302 :
303 : /* test the bstream pair integrity hashes */
304 0 : FD_TEST( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz ) );
305 :
306 0 : streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
307 0 : }
308 :
309 : /* Process next read completion */
310 :
311 : static inline ulong
312 0 : consume_available_cqe( fd_snaplh_t * ctx ) {
313 0 : if( FD_LIKELY( !ctx->vinyl.pending_rd_req_cnt ) ) return 0UL;
314 0 : if( FD_UNLIKELY( !ctx->io_uring_enabled ) ) return 0UL;
315 0 : if( !fd_io_uring_cq_ready( ctx->ioring->cq ) ) return 0UL;
316 :
317 : /* At this point, there is at least one unconsumed CQE */
318 :
319 0 : fd_vinyl_io_rd_t * rd_req = NULL;
320 0 : if( FD_LIKELY( fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, 0/*non blocking*/ )==FD_VINYL_SUCCESS ) ) {
321 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
322 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
323 0 : rd_req->seq = ULONG_MAX;
324 0 : rd_req->sz = 0UL;
325 0 : ctx->vinyl.pending_rd_req_cnt--;
326 0 : return 1UL;
327 0 : }
328 0 : return 0UL;
329 0 : }
330 :
331 : static void
332 : handle_vinyl_lthash_request_ur( fd_snaplh_t * ctx,
333 : ulong seq,
334 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
335 : /* Find a free slot */
336 0 : ulong free_i = ULONG_MAX;
337 0 : if( FD_LIKELY( ctx->vinyl.pending_rd_req_cnt<VINYL_LTHASH_RD_REQ_MAX ) ) {
338 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
339 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
340 0 : if( FD_UNLIKELY( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE ) ) {
341 0 : free_i = i;
342 0 : break;
343 0 : }
344 0 : }
345 0 : } else {
346 0 : fd_vinyl_io_rd_t * rd_req = NULL;
347 0 : fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
348 0 : FD_TEST( rd_req!=NULL );
349 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
350 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
351 0 : rd_req->seq = ULONG_MAX;
352 0 : rd_req->sz = 0UL;
353 0 : free_i = rd_req_ctx_get_idx( rd_req->ctx );
354 0 : ctx->vinyl.pending_rd_req_cnt--;
355 0 : }
356 0 : FD_CRIT( free_i<VINYL_LTHASH_RD_REQ_MAX, "read request free index exceeds max value" );
357 :
358 : /* Populate the empty slot and submit */
359 0 : fd_vinyl_bstream_phdr_t * in_phdr = &ctx->vinyl.pending.phdr[ free_i ];
360 0 : *in_phdr = *acc_hdr;
361 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
362 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
363 :
364 : /* Fixup io addressable range */
365 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
366 0 : io->seq_past = fd_ulong_align_dn( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
367 0 : io->seq_present = fd_ulong_align_up( seq+pair_sz, FD_VINYL_BSTREAM_BLOCK_SZ );
368 0 : if( io->type==FD_VINYL_IO_TYPE_UR ) {
369 0 : fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io;
370 0 : ur->seq_clean = ur->seq_cache = ur->seq_write = io->seq_present;
371 0 : }
372 :
373 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ free_i ];
374 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_PEND );
375 0 : rd_req->seq = seq;
376 0 : rd_req->sz = pair_sz;
377 0 : fd_vinyl_io_read( ctx->vinyl.io, rd_req );
378 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_SENT );
379 0 : ctx->vinyl.pending_rd_req_cnt++;
380 0 : }
381 :
382 : static void
383 0 : handle_vinyl_lthash_request_ur_consume_all( fd_snaplh_t * ctx ) {
384 0 : while( ctx->vinyl.pending_rd_req_cnt ) {
385 0 : fd_vinyl_io_rd_t * rd_req = NULL;
386 0 : fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
387 0 : FD_TEST( rd_req!=NULL );
388 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
389 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
390 0 : rd_req->seq = ULONG_MAX;
391 0 : rd_req->sz = 0UL;
392 0 : ctx->vinyl.pending_rd_req_cnt--;
393 0 : }
394 0 : FD_CRIT( !ctx->vinyl.pending_rd_req_cnt, "pending read requests count not zero" );
395 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
396 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
397 0 : FD_CRIT( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE, "pending request status is not free" );
398 0 : }
399 0 : }
400 :
401 : static void
402 : handle_lthash_completion( fd_snaplh_t * ctx,
403 0 : fd_stem_context_t * stem ) {
404 0 : if( FD_LIKELY( !ctx->lthash_completion_pending ) ) return;
405 :
406 0 : if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
407 0 : fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
408 0 : fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
409 0 : fd_lthash_sub( &ctx->running_lthash, &ctx->running_lthash_sub );
410 0 : uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
411 0 : fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
412 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT_ADD, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
413 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_LTHASH_LEN_BYTES, ctx->out.chunk0, ctx->out.wmark );
414 0 : ctx->lthash_completion_pending = 0;
415 0 : }
416 0 : }
417 :
418 : static void
419 : handle_fail_completion( fd_snaplh_t * ctx,
420 0 : fd_stem_context_t * stem ) {
421 0 : if( FD_LIKELY( !ctx->fail_completion_pending ) ) return;
422 :
423 0 : if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )>=ctx->wh_finish_fseq ) {
424 0 : fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
425 0 : fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
426 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_MSG_CTRL_FAIL, 0UL, 0UL, 0UL, 0UL, 0UL );
427 0 : ctx->fail_completion_pending = 0;
428 0 : }
429 0 : }
430 :
431 : static void
432 : before_credit( fd_snaplh_t * ctx,
433 : fd_stem_context_t * stem FD_PARAM_UNUSED,
434 0 : int * charge_busy ) {
435 0 : *charge_busy = !!consume_available_cqe( ctx );
436 0 : }
437 :
438 : static void
439 : handle_wh_data_frag( fd_snaplh_t * ctx,
440 : ulong in_idx,
441 : ulong chunk, /* compressed input pointer */
442 : ulong sz_comp, /* compressed input size */
443 0 : fd_stem_context_t * stem ) {
444 0 : FD_CRIT( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH, "incorrect in kind" );
445 :
446 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
447 : /* skip all wh data frags when in error state. */
448 0 : return;
449 0 : }
450 0 : if( FD_UNLIKELY( ctx->fail_completion_pending ) ) {
451 : /* handle_fail_completion may succeed (complete) either when the
452 : control frag that triggers it is received (conditional upon
453 : having no pending wh data frags) or after all wh data frags have
454 : been received and processed. Once the fail control message
455 : is received, the state transitions into idle. */
456 0 : handle_fail_completion( ctx, stem );
457 0 : return;
458 0 : }
459 :
460 0 : uchar const * rem = fd_chunk_to_laddr_const( ctx->in[ in_idx ].base, chunk );
461 0 : ulong rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
462 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
463 0 : FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
464 :
465 0 : while( rem_sz ) {
466 0 : FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
467 :
468 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t *)rem;
469 0 : ulong ctl = phdr->ctl;
470 0 : int ctl_type = fd_vinyl_bstream_ctl_type( ctl );
471 0 : switch( ctl_type ) {
472 :
473 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
474 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
475 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
476 0 : if( FD_LIKELY( should_hash_account( ctx ) ) ) {
477 0 : uchar * pair = ctx->vinyl.pair_mem;
478 0 : fd_memcpy( pair, rem, pair_sz );
479 0 : streamlined_hash( ctx, ctx->adder, &ctx->running_lthash, pair );
480 0 : }
481 0 : rem += pair_sz;
482 0 : rem_sz -= pair_sz;
483 0 : ctx->pairs_seen++;
484 0 : break;
485 0 : }
486 :
487 0 : case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
488 0 : rem += FD_VINYL_BSTREAM_BLOCK_SZ;
489 0 : rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
490 0 : break;
491 0 : }
492 :
493 0 : default:
494 0 : FD_LOG_CRIT(( "unexpected vinyl bstream block ctl=%016lx", ctl ));
495 0 : }
496 0 : }
497 :
498 0 : if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
499 : /* handle_lthash_completion may succeed (complete) either when the
500 : control frag that triggers it is received (conditional upon
501 : having no pending wh data frags) or after all wh data frags have
502 : been received and processed. */
503 0 : handle_lthash_completion( ctx, stem );
504 0 : }
505 0 : }
506 :
507 : static void
508 : handle_lv_data_frag( fd_snaplh_t * ctx,
509 : ulong in_idx,
510 0 : ulong chunk ) { /* compressed input pointer */
511 :
512 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
513 : /* skip all lv data frags when in error state. */
514 0 : return;
515 0 : }
516 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
517 0 : FD_LOG_ERR(( "invalid state for lv data frag %u", ctx->state ));
518 0 : return;
519 0 : }
520 :
521 0 : if( FD_LIKELY( should_process_lthash_request( ctx ) ) ) {
522 0 : uchar const * indata = fd_chunk_to_laddr_const( ctx->in[ in_idx ].wksp, chunk );
523 0 : ulong seq;
524 0 : fd_vinyl_bstream_phdr_t acc_hdr[1];
525 0 : memcpy( &seq, indata, sizeof(ulong) );
526 0 : memcpy( acc_hdr, indata + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
527 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
528 0 : handle_vinyl_lthash_request_ur( ctx, seq, acc_hdr );
529 0 : } else {
530 0 : handle_vinyl_lthash_request_bd( ctx, seq, acc_hdr );
531 0 : }
532 0 : }
533 0 : ctx->lthash_req_seen++;
534 0 : }
535 :
536 : static inline ulong
537 : tsorig_tspub_to_fseq( ulong tsorig,
538 0 : ulong tspub ) {
539 0 : return (tspub<<32 ) | tsorig;
540 0 : }
541 :
542 : static void
543 : handle_control_frag( fd_snaplh_t * ctx,
544 : ulong sig,
545 : ulong tsorig,
546 : ulong tspub,
547 0 : fd_stem_context_t * stem ) {
548 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
549 : /* Control messages move along the snapshot load pipeline. Since
550 : error conditions can be triggered by any tile in the pipeline,
551 : it is possible to be in error state and still receive otherwise
552 : valid messages. Only a fail message can revert this. */
553 0 : return;
554 0 : };
555 :
556 0 : switch( sig ) {
557 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
558 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
559 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
560 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
561 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
562 0 : fd_lthash_zero( &ctx->running_lthash );
563 0 : fd_lthash_zero( &ctx->running_lthash_sub );
564 0 : fd_lthash_adder_new( ctx->adder );
565 0 : fd_lthash_adder_new( ctx->adder_sub );
566 0 : break;
567 0 : }
568 :
569 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
570 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
571 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
572 0 : ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
573 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
574 0 : handle_vinyl_lthash_request_ur_consume_all( ctx );
575 0 : }
576 0 : ctx->lthash_completion_pending = 1;
577 : /* handle_lthash_completion may succeed (complete) either here
578 : (if there are no pending wh data frags) or after all wh data
579 : frags have been received and processed. */
580 0 : handle_lthash_completion( ctx, stem );
581 0 : break;
582 0 : }
583 :
584 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
585 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
586 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
587 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
588 0 : break;
589 0 : }
590 :
591 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
592 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
593 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
594 0 : break;
595 0 : }
596 :
597 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL: {
598 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
599 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
600 0 : ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
601 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
602 0 : handle_vinyl_lthash_request_ur_consume_all( ctx );
603 0 : }
604 0 : ctx->fail_completion_pending = 1;
605 : /* handle_fail_completion may succeed (complete) either here (if
606 : there are no pending wh data frags) or after all wh data frags
607 : have been received and processed. */
608 0 : handle_fail_completion( ctx, stem );
609 0 : break;
610 0 : }
611 :
612 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
613 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
614 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
615 0 : break;
616 :
617 0 : default: {
618 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
619 0 : break;
620 0 : }
621 0 : }
622 0 : }
623 :
624 : static inline int
625 : returnable_frag( fd_snaplh_t * ctx,
626 : ulong in_idx,
627 : ulong seq,
628 : ulong sig,
629 : ulong chunk,
630 : ulong sz,
631 : ulong ctl,
632 : ulong tsorig,
633 : ulong tspub,
634 0 : fd_stem_context_t * stem ) {
635 0 : (void)sz; (void)ctl;
636 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
637 :
638 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) handle_wh_data_frag( ctx, in_idx, chunk, tsorig, stem );
639 0 : else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_lv_data_frag( ctx, in_idx, chunk );
640 0 : else handle_control_frag( ctx, sig, tsorig, tspub, stem );
641 :
642 : /* Because fd_stem may not return flow control credits fast enough,
643 : always update fseq (consumer progress) here. */
644 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) {
645 0 : ctx->wh_last_in_seq = seq;
646 0 : fd_fseq_update( ctx->in[ in_idx ].seq_sync, fd_seq_inc( seq, 1UL ) );
647 0 : }
648 :
649 0 : return 0;
650 0 : }
651 :
652 : static ulong
653 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
654 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
655 : ulong out_fds_cnt,
656 0 : int * out_fds ) {
657 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
658 :
659 0 : ulong out_cnt = 0;
660 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
661 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
662 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
663 0 : }
664 :
665 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
666 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
667 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
668 :
669 0 : out_fds[ out_cnt++ ] = ctx->vinyl.dev_fd;
670 :
671 0 : if( FD_LIKELY( ctx->ioring->ioring_fd>=0 ) ) {
672 0 : out_fds[ out_cnt++ ] = ctx->ioring->ioring_fd;
673 0 : }
674 :
675 0 : return out_cnt;
676 0 : }
677 :
678 : static void
679 0 : during_housekeeping( fd_snaplh_t * ctx ) {
680 :
681 : /* Service io_uring instance */
682 :
683 0 : if( FD_LIKELY( ctx->io_uring_enabled ) ) {
684 0 : uint sq_drops = fd_io_uring_sq_dropped( ctx->ioring->sq );
685 0 : if( FD_UNLIKELY( sq_drops ) ) {
686 0 : FD_LOG_CRIT(( "kernel io_uring dropped I/O requests, cannot continue (sq_dropped=%u)", sq_drops ));
687 0 : }
688 :
689 0 : uint cq_drops = fd_io_uring_cq_overflow( ctx->ioring->cq );
690 0 : if( FD_UNLIKELY( cq_drops ) ) {
691 0 : FD_LOG_CRIT(( "kernel io_uring dropped I/O completions, cannot continue (cq_overflow=%u)", cq_drops ));
692 0 : }
693 0 : }
694 :
695 0 : }
696 :
697 : static ulong
698 : populate_allowed_seccomp( fd_topo_t const * topo,
699 : fd_topo_tile_t const * tile,
700 : ulong out_cnt,
701 0 : struct sock_filter * out ) {
702 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
703 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
704 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
705 :
706 0 : populate_sock_filter_policy_fd_snaplh_tile( out_cnt, out,
707 0 : (uint)fd_log_private_logfile_fd(),
708 0 : (uint)ctx->vinyl.dev_fd,
709 0 : (uint)ctx->ioring->ioring_fd /* possibly -1 */ );
710 0 : return sock_filter_policy_fd_snaplh_tile_instr_cnt;
711 0 : }
712 :
713 : static fd_vinyl_io_t *
714 : snaplh_io_uring_init( fd_snaplh_t * ctx,
715 : void * uring_shmem,
716 : void * vinyl_io_ur_mem,
717 0 : int dev_fd ) {
718 0 : ulong const uring_depth = VINYL_LTHASH_IORING_DEPTH;
719 0 : fd_io_uring_params_t params[1];
720 0 : fd_io_uring_params_init( params, uring_depth );
721 :
722 0 : if( FD_UNLIKELY( !fd_io_uring_init_shmem( ctx->ioring, params, uring_shmem, uring_depth, uring_depth ) ) ) {
723 0 : FD_LOG_ERR(( "fd_io_uring_init_shmem failed (%i-%s)", errno, fd_io_strerror( errno ) ));
724 0 : }
725 0 : fd_io_uring_t * ioring = ctx->ioring;
726 :
727 0 : if( FD_UNLIKELY( fd_io_uring_register_files( ioring->ioring_fd, &dev_fd, 1 )<0 ) ) {
728 0 : FD_LOG_ERR(( "io_uring_register_files failed (%i-%s)", errno, fd_io_strerror( errno ) ));
729 0 : }
730 :
731 0 : fd_io_uring_restriction_t res[3] = {
732 0 : { .opcode = FD_IORING_RESTRICTION_SQE_OP,
733 0 : .sqe_op = IORING_OP_READ },
734 0 : { .opcode = FD_IORING_RESTRICTION_SQE_FLAGS_REQUIRED,
735 0 : .sqe_flags = IOSQE_FIXED_FILE },
736 0 : { .opcode = FD_IORING_RESTRICTION_SQE_FLAGS_ALLOWED,
737 0 : .sqe_flags = 0 }
738 0 : };
739 0 : if( FD_UNLIKELY( fd_io_uring_register_restrictions( ioring->ioring_fd, res, 3U )<0 ) ) {
740 0 : FD_LOG_ERR(( "io_uring_register_restrictions failed (%i-%s)", errno, fd_io_strerror( errno ) ));
741 0 : }
742 :
743 0 : if( FD_UNLIKELY( fd_io_uring_enable_rings( ioring->ioring_fd )<0 ) ) {
744 0 : FD_LOG_ERR(( "io_uring_enable_rings failed (%i-%s)", errno, fd_io_strerror( errno ) ));
745 0 : }
746 :
747 0 : ulong align = fd_vinyl_io_ur_align();
748 0 : FD_TEST( fd_ulong_is_pow2( align ) );
749 :
750 0 : ulong footprint = fd_vinyl_io_ur_footprint( VINYL_LTHASH_IO_SPAD_MAX );
751 0 : FD_TEST( fd_ulong_is_aligned( footprint, align ) );
752 :
753 : /* Before invoking fd_vinyl_io_ur_init, the sync block must be
754 : already available. Although in principle one could keep
755 : calling fd_vinyl_io_ur_init until it returns !=NULL, doing this
756 : would log uncessary (and misleading) warnings. */
757 0 : FD_LOG_INFO(( "waiting for account database creation" ));
758 0 : for(;;) {
759 0 : fd_vinyl_bstream_block_t block[1];
760 0 : ulong dev_sync = 0UL; /* Use the beginning of the file for the sync block */
761 0 : bd_read( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
762 0 : int type = fd_vinyl_bstream_ctl_type( block->sync.ctl );
763 0 : if( FD_UNLIKELY( type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC ) ) continue;
764 0 : ulong io_seed = block->sync.hash_trail;
765 0 : if( FD_LIKELY( !fd_vinyl_bstream_block_test( io_seed, block ) ) ) break;
766 0 : fd_log_sleep( 1e6 ); /* 1ms */
767 0 : }
768 0 : FD_LOG_INFO(( "found valid account database sync block, attaching ..." ));
769 :
770 0 : fd_vinyl_io_t * io = fd_vinyl_io_ur_init( vinyl_io_ur_mem, VINYL_LTHASH_IO_SPAD_MAX, dev_fd, ioring );
771 0 : if( FD_UNLIKELY( !io ) ) FD_LOG_ERR(( "vinyl_io_ur_init failed" ));
772 0 : return io;
773 0 : }
774 :
775 : static void
776 : privileged_init( fd_topo_t * topo,
777 0 : fd_topo_tile_t * tile ) {
778 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
779 :
780 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
781 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
782 0 : void * pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ ); (void)pair_mem;
783 0 : void * pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ ); (void)pair_tmp;
784 0 : void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ ); (void)rd_req_mem;
785 0 : void * uring_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
786 0 : void * uring_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_io_uring_shmem_align(), fd_io_uring_shmem_footprint( VINYL_LTHASH_IORING_DEPTH, VINYL_LTHASH_IORING_DEPTH ) );
787 :
788 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
789 :
790 : /* Set up io_bd dependencies */
791 :
792 0 : char const * bstream_path = tile->snaplh.vinyl_path;
793 : /* Note: it would be possible to use O_DIRECT, but it would require
794 : VINYL_LTHASH_BLOCK_ALIGN to be 4096UL, which substantially
795 : increases the read overhead, making it slower (keep in mind that
796 : a rather large subset of mainnet accounts typically fits inside
797 : one FD_VINYL_BSTREAM_BLOCK_SZ. */
798 0 : int dev_fd = open( bstream_path, O_RDONLY|O_CLOEXEC, 0444 );
799 0 : if( FD_UNLIKELY( dev_fd<0 ) ) {
800 0 : FD_LOG_ERR(( "open(%s,O_RDONLY|O_CLOEXEC, 0444) failed (%i-%s)",
801 0 : bstream_path, errno, fd_io_strerror( errno ) ));
802 0 : }
803 :
804 0 : struct stat st;
805 0 : if( FD_UNLIKELY( 0!=fstat( dev_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", bstream_path, errno, strerror( errno ) ));
806 :
807 0 : ctx->vinyl.dev_fd = dev_fd;
808 0 : ulong bstream_sz = (ulong)st.st_size;
809 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
810 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
811 0 : }
812 0 : ctx->vinyl.dev_sz = bstream_sz;
813 0 : ctx->vinyl.dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
814 :
815 0 : ctx->vinyl.io = NULL;
816 0 : ctx->ioring->ioring_fd = -1;
817 :
818 0 : if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
819 0 : ctx->vinyl.io = snaplh_io_uring_init( ctx, uring_shmem, uring_mem, dev_fd );
820 0 : }
821 0 : ctx->io_uring_enabled = tile->snaplh.io_uring_enabled;
822 0 : }
823 :
824 : static void
825 : unprivileged_init( fd_topo_t * topo,
826 0 : fd_topo_tile_t * tile ) {
827 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
828 :
829 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
830 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
831 0 : void * pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
832 0 : void * pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
833 0 : void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
834 :
835 0 : FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
836 :
837 0 : ctx->vinyl.pair_mem = pair_mem;
838 0 : ctx->vinyl.pair_tmp = pair_tmp;
839 :
840 0 : if( FD_UNLIKELY( tile->in_cnt!=IN_CNT_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, IN_CNT_MAX ));
841 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
842 :
843 0 : ctx->io_seed = NULL;
844 :
845 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
846 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
847 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
848 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_lh" ) ) ) {
849 0 : ctx->in[ i ].wksp = in_wksp->wksp;
850 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache );
851 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu );
852 0 : ctx->in[ i ].mtu = in_link->mtu;
853 0 : ctx->in[ i ].base = NULL;
854 0 : ctx->in[ i ].seq_sync = NULL;
855 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLV;
856 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
857 0 : ctx->in[ i ].wksp = in_wksp->wksp;
858 0 : ctx->in[ i ].chunk0 = 0;
859 0 : ctx->in[ i ].wmark = 0;
860 0 : ctx->in[ i ].mtu = 0;
861 0 : ctx->in[ i ].base = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snaplh.dcache_obj_id ) );
862 0 : ctx->in[ i ].seq_sync = tile->in_link_fseq[ i ];
863 0 : ctx->wh_last_in_seq = fd_fseq_query( tile->in_link_fseq[ i ] );
864 0 : ctx->in_kind[ i ] = IN_KIND_SNAPWH;
865 0 : ctx->io_seed = (ulong const *)fd_dcache_app_laddr_const( ctx->in[ i ].base );
866 0 : FD_TEST( ctx->in[ i ].base );
867 0 : } else {
868 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
869 0 : }
870 0 : }
871 :
872 0 : FD_TEST( ctx->io_seed );
873 :
874 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ 0UL ] ];
875 0 : ctx->out.wksp = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
876 0 : ctx->out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( out_link->dcache ), out_link->dcache );
877 0 : ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, out_link->dcache, out_link->mtu );
878 0 : ctx->out.chunk = ctx->out.chunk0;
879 0 : ctx->out.mtu = out_link->mtu;
880 0 : FD_TEST( 0==strcmp( out_link->name, "snaplh_lv" ) );
881 :
882 0 : fd_lthash_adder_new( ctx->adder );
883 0 : fd_lthash_adder_new( ctx->adder_sub );
884 :
885 0 : ctx->metrics.full.accounts_hashed = 0UL;
886 0 : ctx->metrics.incremental.accounts_hashed = 0UL;
887 :
888 0 : memset( ctx->vinyl.pending.phdr, 0, sizeof(fd_vinyl_bstream_phdr_t) * VINYL_LTHASH_RD_REQ_MAX );
889 0 : memset( ctx->vinyl.pending.rd_req, 0, sizeof(fd_vinyl_io_rd_t) * VINYL_LTHASH_RD_REQ_MAX );
890 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
891 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
892 0 : rd_req->ctx = rd_req_ctx_from_parts( i, VINYL_LTHASH_RD_REQ_FREE );
893 0 : rd_req->dst = NULL;
894 0 : if( rd_req_mem!=NULL ) {
895 0 : rd_req->dst = ((uchar*)rd_req_mem) + i*VINYL_LTHASH_BLOCK_MAX_SZ;
896 0 : }
897 0 : }
898 0 : ctx->vinyl.pending_rd_req_cnt = 0UL;
899 :
900 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
901 0 : ctx->full = 1;
902 0 : ctx->lthash_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplh" );
903 0 : ctx->lthash_tile_idx = tile->kind_id;
904 : /* This may seem redundant, but it provides flexibility around which
905 : tiles and do addition and subtraction of lthash. */
906 0 : ctx->lthash_tile_add_cnt = ctx->lthash_tile_cnt;
907 0 : ctx->lthash_tile_sub_cnt = ctx->lthash_tile_cnt;
908 0 : ctx->lthash_tile_add_idx = ctx->lthash_tile_idx;
909 0 : ctx->lthash_tile_sub_idx = ctx->lthash_tile_idx;
910 0 : if( ctx->lthash_tile_add_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_add_idx < ctx->lthash_tile_add_cnt );
911 0 : if( ctx->lthash_tile_sub_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_sub_idx < ctx->lthash_tile_sub_cnt );
912 0 : ctx->pairs_seen = 0UL;
913 0 : ctx->lthash_req_seen = 0UL;
914 0 : fd_lthash_zero( &ctx->running_lthash );
915 0 : fd_lthash_zero( &ctx->running_lthash_sub );
916 :
917 0 : ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
918 0 : FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
919 0 : fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
920 0 : FD_TEST( vinyl_admin );
921 0 : ctx->vinyl.admin = vinyl_admin;
922 0 : for(;;) {
923 : /* This query can be done without the need of an rwlock. */
924 0 : ulong vinyl_admin_status = fd_vinyl_admin_ulong_query( &vinyl_admin->status );
925 0 : if( FD_LIKELY( vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING &&
926 0 : vinyl_admin_status!=FD_VINYL_ADMIN_STATUS_ERROR ) ) break;
927 0 : fd_log_sleep( (long)1e6 /*1ms*/ );
928 0 : FD_SPIN_PAUSE();
929 0 : }
930 :
931 0 : ctx->lthash_completion_pending = 0;
932 0 : ctx->fail_completion_pending = 0;
933 0 : }
934 :
935 0 : #define STEM_BURST 1UL
936 0 : #define STEM_LAZY 1000L
937 :
938 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snaplh_t
939 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplh_t)
940 :
941 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
942 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
943 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
944 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
945 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
946 :
947 : #include "../../disco/stem/fd_stem.c"
948 :
949 : fd_topo_run_tile_t fd_tile_snaplh = {
950 : .name = NAME,
951 : .populate_allowed_fds = populate_allowed_fds,
952 : .populate_allowed_seccomp = populate_allowed_seccomp,
953 : .scratch_align = scratch_align,
954 : .scratch_footprint = scratch_footprint,
955 : .privileged_init = privileged_init,
956 : .unprivileged_init = unprivileged_init,
957 : .run = stem_run,
958 : };
959 :
960 : #undef NAME
|