Line data Source code
1 : #include "../../disco/topo/fd_topo.h"
2 : #include "../../disco/metrics/fd_metrics.h"
3 : #include "../../ballet/lthash/fd_lthash.h"
4 : #include "../../ballet/lthash/fd_lthash_adder.h"
5 : #include "../../vinyl/io/fd_vinyl_io.h"
6 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
7 : #include "generated/fd_snaplh_tile_seccomp.h"
8 :
9 : #include "utils/fd_ssctrl.h"
10 :
11 : #include <errno.h>
12 : #include <sys/stat.h> /* fstat */
13 : #include <fcntl.h> /* open */
14 : #include <unistd.h> /* close */
15 :
16 : #if FD_HAS_LIBURING
17 : #include "../../vinyl/io/fd_vinyl_io_ur.h"
18 : #include <liburing.h>
19 : typedef struct io_uring fd_io_uring_t;
20 : #else
21 : typedef char fd_io_uring_t;
22 : #endif
23 :
24 : #define NAME "snaplh"
25 :
26 : #define IN_CNT_MAX (2UL)
27 0 : #define IN_KIND_SNAPLV (0UL)
28 0 : #define IN_KIND_SNAPWH (1UL)
29 :
30 0 : #define VINYL_LTHASH_BLOCK_ALIGN (512UL) /* O_DIRECT would require 4096UL */
31 0 : #define VINYL_LTHASH_BLOCK_MAX_SZ (16UL<<20)
32 : FD_STATIC_ASSERT( VINYL_LTHASH_BLOCK_MAX_SZ>(sizeof(fd_snapshot_full_account_t)+FD_VINYL_BSTREAM_BLOCK_SZ+2*VINYL_LTHASH_BLOCK_ALIGN), "VINYL_LTHASH_BLOCK_MAX_SZ" );
33 :
34 0 : #define VINYL_LTHASH_RD_REQ_MAX (32UL)
35 :
36 : #define VINYL_LTHASH_IO_SPAD_MAX (2<<20UL)
37 :
38 0 : #define VINYL_LTHASH_RD_REQ_FREE (0UL)
39 0 : #define VINYL_LTHASH_RD_REQ_PEND (1UL)
40 0 : #define VINYL_LTHASH_RD_REQ_SENT (2UL)
41 :
42 : struct in_link_private {
43 : fd_wksp_t * wksp;
44 : ulong chunk0;
45 : ulong wmark;
46 : ulong mtu;
47 : void const * base;
48 : ulong * seq_sync; /* fseq->seq[0] */
49 : };
50 : typedef struct in_link_private in_link_t;
51 :
52 : struct out_link_private {
53 : fd_wksp_t * wksp;
54 : ulong chunk0;
55 : ulong wmark;
56 : ulong chunk;
57 : ulong mtu;
58 : };
59 : typedef struct out_link_private out_link_t;
60 :
61 : struct fd_snaplh_tile {
62 : uint state;
63 : int full;
64 :
65 : ulong seed;
66 : ulong lthash_tile_cnt;
67 : ulong lthash_tile_idx;
68 : ulong lthash_tile_add_cnt;
69 : ulong lthash_tile_sub_cnt;
70 : ulong lthash_tile_add_idx;
71 : ulong lthash_tile_sub_idx;
72 : ulong pairs_seen;
73 : ulong lthash_req_seen;
74 :
75 : /* Database params */
76 : ulong const * io_seed;
77 :
78 : fd_lthash_adder_t adder[1];
79 : fd_lthash_adder_t adder_sub[1];
80 : uchar data[FD_RUNTIME_ACC_SZ_MAX];
81 :
82 : fd_lthash_value_t running_lthash;
83 : fd_lthash_value_t running_lthash_sub;
84 :
85 : struct {
86 : int dev_fd;
87 : ulong dev_sz;
88 : ulong dev_base;
89 : void * pair_mem;
90 : void * pair_tmp;
91 :
92 : struct {
93 : fd_vinyl_bstream_phdr_t phdr [VINYL_LTHASH_RD_REQ_MAX];
94 : fd_vinyl_io_rd_t rd_req[VINYL_LTHASH_RD_REQ_MAX];
95 : } pending;
96 : ulong pending_rd_req_cnt;
97 :
98 : int io_uring_enabled;
99 : fd_vinyl_io_t * io;
100 : fd_io_uring_t * ring;
101 : } vinyl;
102 :
103 : struct {
104 : struct {
105 : ulong accounts_hashed;
106 : } full;
107 :
108 : struct {
109 : ulong accounts_hashed;
110 : } incremental;
111 : } metrics;
112 :
113 : ulong wh_finish_fseq;
114 : ulong wh_last_in_seq;
115 :
116 : in_link_t in[IN_CNT_MAX];
117 : uchar in_kind[IN_CNT_MAX];
118 : out_link_t out;
119 : };
120 :
121 : typedef struct fd_snaplh_tile fd_snaplh_t;
122 :
123 : static inline int
124 0 : should_shutdown( fd_snaplh_t * ctx ) {
125 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
126 0 : }
127 :
128 : static ulong
129 0 : scratch_align( void ) {
130 0 : return alignof(fd_snaplh_t);
131 0 : }
132 :
133 : static ulong
134 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
135 0 : (void)tile;
136 0 : ulong l = FD_LAYOUT_INIT;
137 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
138 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
139 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
140 0 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
141 : #if FD_HAS_LIBURING
142 : l = FD_LAYOUT_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
143 : l = FD_LAYOUT_APPEND( l, alignof(struct io_uring), sizeof(struct io_uring) );
144 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
145 : #endif
146 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaplh_t) );
147 0 : }
148 :
149 : static void
150 0 : metrics_write( fd_snaplh_t * ctx ) {
151 0 : FD_MGAUGE_SET( SNAPLH, FULL_ACCOUNTS_HASHED, ctx->metrics.full.accounts_hashed );
152 0 : FD_MGAUGE_SET( SNAPLH, INCREMENTAL_ACCOUNTS_HASHED, ctx->metrics.incremental.accounts_hashed );
153 0 : FD_MGAUGE_SET( SNAPLH, STATE, (ulong)(ctx->state) );
154 0 : }
155 :
156 : static inline int
157 0 : should_hash_account( fd_snaplh_t * ctx ) {
158 0 : return (ctx->pairs_seen % ctx->lthash_tile_add_cnt)==ctx->lthash_tile_add_idx;
159 0 : }
160 :
161 : static inline int
162 0 : should_process_lthash_request( fd_snaplh_t * ctx ) {
163 0 : return (ctx->lthash_req_seen % ctx->lthash_tile_sub_cnt)==ctx->lthash_tile_sub_idx;
164 0 : }
165 :
166 : FD_FN_UNUSED static void
167 : streamlined_hash( fd_snaplh_t * restrict ctx,
168 : fd_lthash_adder_t * restrict adder,
169 : fd_lthash_value_t * restrict running_lthash,
170 0 : uchar const * restrict _pair ) {
171 0 : uchar const * pair = _pair;
172 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
173 0 : pair += sizeof(fd_vinyl_bstream_phdr_t);
174 0 : fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
175 0 : pair += sizeof(fd_account_meta_t);
176 0 : uchar const * data = pair;
177 :
178 0 : ulong data_len = meta->dlen;
179 0 : const char * pubkey = phdr->key.c;
180 0 : ulong lamports = meta->lamports;
181 0 : const uchar * owner = meta->owner;
182 0 : uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
183 :
184 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
185 0 : if( FD_UNLIKELY( lamports==0UL ) ) return;
186 :
187 0 : fd_lthash_adder_push_solana_account( adder,
188 0 : running_lthash,
189 0 : pubkey,
190 0 : data,
191 0 : data_len,
192 0 : lamports,
193 0 : executable,
194 0 : owner );
195 :
196 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full.accounts_hashed++;
197 0 : else ctx->metrics.incremental.accounts_hashed++;
198 0 : }
199 :
200 : FD_FN_UNUSED static inline void
201 : bd_read( int fd,
202 : ulong off,
203 : void * buf,
204 0 : ulong sz ) {
205 0 : ssize_t ssz = pread( fd, buf, sz, (off_t)off );
206 0 : if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
207 0 : if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
208 0 : /**/ FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
209 0 : }
210 :
211 : FD_FN_UNUSED static void
212 : handle_vinyl_lthash_request_bd( fd_snaplh_t * ctx,
213 : ulong seq,
214 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
215 :
216 : /* The bd version is blocking, therefore ctx->pending is not used. */
217 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
218 :
219 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
220 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
221 :
222 : /* dev_seq shows where the seq is physically located in device. */
223 0 : ulong dev_seq = ( seq + ctx->vinyl.dev_base ) % ctx->vinyl.dev_sz;
224 0 : ulong rd_off = fd_ulong_align_dn( dev_seq, VINYL_LTHASH_BLOCK_ALIGN );
225 0 : ulong pair_off = (dev_seq - rd_off);
226 0 : ulong rd_sz = fd_ulong_align_up( pair_off + pair_sz, VINYL_LTHASH_BLOCK_ALIGN );
227 0 : FD_TEST( rd_sz < VINYL_LTHASH_BLOCK_MAX_SZ );
228 :
229 0 : uchar * pair = ((uchar*)ctx->vinyl.pair_mem) + pair_off;
230 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)pair;
231 :
232 0 : for(;;) {
233 0 : ulong sz = rd_sz;
234 0 : ulong rsz = fd_ulong_min( rd_sz, ctx->vinyl.dev_sz - rd_off );
235 0 : uchar * dst = ctx->vinyl.pair_mem;
236 0 : uchar * tmp = ctx->vinyl.pair_tmp;
237 :
238 0 : bd_read( ctx->vinyl.dev_fd, rd_off, dst, rsz );
239 0 : sz -= rsz;
240 0 : if( FD_UNLIKELY( sz ) ) {
241 : /* When the dev wraps around, the dev_base needs to be skipped.
242 : This means: increase the size multiple of the alignment,
243 : read into a temporary buffer, and memcpy into the dst at the
244 : correct offset. */
245 0 : bd_read( ctx->vinyl.dev_fd, 0, tmp, sz + VINYL_LTHASH_BLOCK_ALIGN );
246 0 : fd_memcpy( dst + rsz, tmp + ctx->vinyl.dev_base, sz );
247 0 : }
248 :
249 0 : if( FD_LIKELY( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) ) ) {
250 :
251 : /* test bstream pair integrity hashes */
252 0 : int test = !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz );
253 0 : if( FD_LIKELY( test ) ) break;
254 0 : }
255 0 : FD_LOG_WARNING(( "phdr mismatch! - this should not happen under bstream_seq" ));
256 0 : FD_SPIN_PAUSE();
257 0 : }
258 :
259 0 : streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
260 0 : }
261 :
262 : FD_FN_UNUSED static inline ulong
263 0 : rd_req_ctx_get_idx( ulong rd_req_ctx ) {
264 0 : return ( rd_req_ctx >> 0 ) & ((1UL<<32)-1UL);
265 0 : }
266 :
267 : FD_FN_UNUSED static inline ulong
268 0 : rd_req_ctx_get_status( ulong rd_req_ctx ) {
269 0 : return ( rd_req_ctx >> 32 ) & ((1UL<<32)-1UL);
270 0 : }
271 :
272 : FD_FN_UNUSED static inline void
273 : rd_req_ctx_into_parts( ulong rd_req_ctx,
274 : ulong * idx,
275 0 : ulong * status ) {
276 0 : *idx = rd_req_ctx_get_idx( rd_req_ctx );
277 0 : *status = rd_req_ctx_get_status( rd_req_ctx );
278 0 : }
279 :
280 : FD_FN_UNUSED static inline ulong
281 : rd_req_ctx_from_parts( ulong idx,
282 0 : ulong status ) {
283 0 : return ( idx & ((1UL<<32)-1UL) ) | ( status << 32 );
284 0 : }
285 :
286 : FD_FN_UNUSED static inline ulong
287 : rd_req_ctx_update_status( ulong rd_req_ctx,
288 0 : ulong status ) {
289 0 : return rd_req_ctx_from_parts( rd_req_ctx_get_idx( rd_req_ctx ), status );
290 0 : }
291 :
292 : FD_FN_UNUSED static void
293 : handle_vinyl_lthash_compute_from_rd_req( fd_snaplh_t * ctx,
294 0 : fd_vinyl_io_rd_t * rd_req ) {
295 0 : ulong idx = rd_req_ctx_get_idx( rd_req->ctx );
296 :
297 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)rd_req->dst;
298 0 : fd_vinyl_bstream_phdr_t * acc_hdr = &ctx->vinyl.pending.phdr[ idx ];
299 :
300 : /* test the retrieved header (it must mach the request) */
301 0 : FD_TEST( !memcmp( phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t)) );
302 :
303 0 : ulong const io_seed = FD_VOLATILE_CONST( *ctx->io_seed );
304 0 : ulong seq = rd_req->seq;
305 0 : uchar * pair = (uchar*)rd_req->dst;
306 0 : ulong pair_sz = rd_req->sz;
307 :
308 : /* test the bstream pair integrity hashes */
309 0 : FD_TEST( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)pair, pair_sz ) );
310 :
311 0 : streamlined_hash( ctx, ctx->adder_sub, &ctx->running_lthash_sub, pair );
312 0 : }
313 :
314 : static inline ulong
315 : consume_available_cqe( fd_snaplh_t * ctx,
316 0 : int peek_first ) {
317 0 : if( FD_LIKELY( !ctx->vinyl.pending_rd_req_cnt ) ) return 0UL;
318 0 : if( FD_UNLIKELY( !ctx->vinyl.io_uring_enabled ) ) return 0UL;
319 : /* Consume one at a time, so that incoming frags can be processed
320 : with minimum delay. */
321 : #if FD_HAS_LIBURING
322 : if( FD_UNLIKELY( !!peek_first ) ) {
323 : struct io_uring_cqe * cqe = NULL;
324 : io_uring_peek_cqe( ctx->vinyl.ring, &cqe ); if( !cqe ) return 0UL;
325 : }
326 : fd_vinyl_io_rd_t * rd_req = NULL;
327 : if( FD_LIKELY( fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, 0/*non blocking*/ )==FD_VINYL_SUCCESS ) ) {
328 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
329 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
330 : rd_req->seq = ULONG_MAX;
331 : rd_req->sz = 0UL;
332 : ctx->vinyl.pending_rd_req_cnt--;
333 : return 1UL;
334 : }
335 : #else
336 0 : (void)peek_first;
337 0 : #endif
338 0 : return 0UL;
339 0 : }
340 :
341 : FD_FN_UNUSED static void
342 : handle_vinyl_lthash_request_ur( fd_snaplh_t * ctx,
343 : ulong seq,
344 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
345 : /* Find a free slot */
346 0 : ulong free_i = ULONG_MAX;
347 0 : if( FD_LIKELY( ctx->vinyl.pending_rd_req_cnt<VINYL_LTHASH_RD_REQ_MAX ) ) {
348 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
349 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
350 0 : if( FD_UNLIKELY( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE ) ) {
351 0 : free_i = i;
352 0 : break;
353 0 : }
354 0 : }
355 0 : } else {
356 0 : fd_vinyl_io_rd_t * rd_req = NULL;
357 0 : fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
358 0 : FD_TEST( rd_req!=NULL );
359 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
360 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
361 0 : rd_req->seq = ULONG_MAX;
362 0 : rd_req->sz = 0UL;
363 0 : free_i = rd_req_ctx_get_idx( rd_req->ctx );
364 0 : ctx->vinyl.pending_rd_req_cnt--;
365 0 : }
366 0 : FD_CRIT( free_i<VINYL_LTHASH_RD_REQ_MAX, "read request free index exceeds max value" );
367 :
368 : /* Populate the empty slot and submit */
369 0 : fd_vinyl_bstream_phdr_t * in_phdr = &ctx->vinyl.pending.phdr[ free_i ];
370 0 : memcpy( in_phdr, acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
371 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( acc_hdr->ctl );
372 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
373 :
374 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ free_i ];
375 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_PEND );
376 0 : rd_req->seq = seq;
377 0 : rd_req->sz = pair_sz;
378 0 : fd_vinyl_io_read( ctx->vinyl.io, rd_req );
379 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_SENT );
380 0 : ctx->vinyl.pending_rd_req_cnt++;
381 0 : }
382 :
383 : FD_FN_UNUSED static void
384 0 : handle_vinyl_lthash_request_ur_consume_all( fd_snaplh_t * ctx ) {
385 0 : while( ctx->vinyl.pending_rd_req_cnt ) {
386 0 : fd_vinyl_io_rd_t * rd_req = NULL;
387 0 : fd_vinyl_io_poll( ctx->vinyl.io, &rd_req, FD_VINYL_IO_FLAG_BLOCKING );
388 0 : FD_TEST( rd_req!=NULL );
389 0 : handle_vinyl_lthash_compute_from_rd_req( ctx, rd_req );
390 0 : rd_req->ctx = rd_req_ctx_update_status( rd_req->ctx, VINYL_LTHASH_RD_REQ_FREE );
391 0 : rd_req->seq = ULONG_MAX;
392 0 : rd_req->sz = 0UL;
393 0 : ctx->vinyl.pending_rd_req_cnt--;
394 0 : }
395 0 : FD_CRIT( !ctx->vinyl.pending_rd_req_cnt, "pending read requests count not zero" );
396 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
397 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
398 0 : FD_CRIT( rd_req_ctx_get_status( rd_req->ctx )==VINYL_LTHASH_RD_REQ_FREE, "pending request status is not free" );
399 0 : }
400 0 : }
401 :
402 : FD_FN_UNUSED static uint
403 : handle_lthash_completion( fd_snaplh_t * ctx,
404 0 : fd_stem_context_t * stem ) {
405 0 : fd_lthash_adder_flush( ctx->adder, &ctx->running_lthash );
406 0 : fd_lthash_adder_flush( ctx->adder_sub, &ctx->running_lthash_sub );
407 0 : if( fd_seq_inc( ctx->wh_last_in_seq, 1UL )==ctx->wh_finish_fseq ) {
408 0 : fd_lthash_sub( &ctx->running_lthash, &ctx->running_lthash_sub );
409 0 : uchar * lthash_out = fd_chunk_to_laddr( ctx->out.wksp, ctx->out.chunk );
410 0 : fd_memcpy( lthash_out, &ctx->running_lthash, sizeof(fd_lthash_value_t) );
411 0 : fd_stem_publish( stem, 0UL, FD_SNAPSHOT_HASH_MSG_RESULT_ADD, ctx->out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
412 0 : ctx->out.chunk = fd_dcache_compact_next( ctx->out.chunk, FD_LTHASH_LEN_BYTES, ctx->out.chunk0, ctx->out.wmark );
413 0 : return FD_SNAPSHOT_STATE_IDLE;
414 0 : }
415 0 : return ctx->state;
416 0 : }
417 :
418 : static void
419 : before_credit( fd_snaplh_t * ctx,
420 : fd_stem_context_t * stem FD_PARAM_UNUSED,
421 0 : int * charge_busy ) {
422 0 : *charge_busy = !!consume_available_cqe( ctx, 1/*peek_first*/ );
423 0 : }
424 :
425 : static void
426 : handle_wh_data_frag( fd_snaplh_t * ctx,
427 : ulong in_idx,
428 : ulong chunk, /* compressed input pointer */
429 : ulong sz_comp, /* compressed input size */
430 0 : fd_stem_context_t * stem ) {
431 0 : FD_CRIT( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH, "incorrect in kind" );
432 :
433 0 : uchar const * rem = fd_chunk_to_laddr_const( ctx->in[ in_idx ].base, chunk );
434 0 : ulong rem_sz = sz_comp<<FD_VINYL_BSTREAM_BLOCK_LG_SZ;
435 0 : FD_CRIT( fd_ulong_is_aligned( (ulong)rem, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
436 0 : FD_CRIT( fd_ulong_is_aligned( rem_sz, FD_VINYL_BSTREAM_BLOCK_SZ ), "misaligned write request" );
437 :
438 0 : while( rem_sz ) {
439 0 : FD_CRIT( rem_sz>=FD_VINYL_BSTREAM_BLOCK_SZ, "corrupted bstream block" );
440 :
441 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t *)rem;
442 0 : ulong ctl = phdr->ctl;
443 0 : int ctl_type = fd_vinyl_bstream_ctl_type( ctl );
444 0 : switch( ctl_type ) {
445 :
446 0 : case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {
447 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz( ctl );
448 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
449 0 : if( FD_LIKELY( should_hash_account( ctx ) ) ) {
450 0 : uchar * pair = ctx->vinyl.pair_mem;
451 0 : fd_memcpy( pair, rem, pair_sz );
452 0 : streamlined_hash( ctx, ctx->adder, &ctx->running_lthash, pair );
453 0 : }
454 0 : rem += pair_sz;
455 0 : rem_sz -= pair_sz;
456 0 : ctx->pairs_seen++;
457 0 : break;
458 0 : }
459 :
460 0 : case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {
461 0 : rem += FD_VINYL_BSTREAM_BLOCK_SZ;
462 0 : rem_sz -= FD_VINYL_BSTREAM_BLOCK_SZ;
463 0 : break;
464 0 : }
465 :
466 0 : default:
467 0 : FD_LOG_CRIT(( "unexpected vinyl bstream block ctl=%016lx", ctl ));
468 0 : }
469 0 : }
470 :
471 0 : if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
472 0 : ctx->state = handle_lthash_completion( ctx, stem );
473 0 : }
474 0 : }
475 :
476 : static void
477 : handle_lv_data_frag( fd_snaplh_t * ctx,
478 : ulong in_idx,
479 : ulong chunk, /* compressed input pointer */
480 0 : ulong sz_comp ) { /* compressed input size */
481 0 : (void)sz_comp;
482 0 : if( FD_LIKELY( should_process_lthash_request( ctx ) ) ) {
483 0 : uchar const * indata = fd_chunk_to_laddr_const( ctx->in[ in_idx ].wksp, chunk );
484 0 : ulong seq;
485 0 : fd_vinyl_bstream_phdr_t acc_hdr[1];
486 0 : memcpy( &seq, indata, sizeof(ulong) );
487 0 : memcpy( acc_hdr, indata + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
488 0 : if( FD_LIKELY( ctx->vinyl.io_uring_enabled ) ) {
489 0 : handle_vinyl_lthash_request_ur( ctx, seq, acc_hdr );
490 0 : } else {
491 0 : handle_vinyl_lthash_request_bd( ctx, seq, acc_hdr );
492 0 : }
493 0 : }
494 0 : ctx->lthash_req_seen++;
495 0 : }
496 :
497 : static inline ulong
498 : tsorig_tspub_to_fseq( ulong tsorig,
499 0 : ulong tspub ) {
500 0 : return (tspub<<32 ) | tsorig;
501 0 : }
502 :
503 : static void
504 : handle_control_frag( fd_snaplh_t * ctx,
505 : ulong sig,
506 : ulong tsorig,
507 : ulong tspub,
508 0 : fd_stem_context_t * stem ) {
509 0 : switch( sig ) {
510 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
511 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
512 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
513 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
514 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
515 0 : fd_lthash_zero( &ctx->running_lthash );
516 0 : fd_lthash_zero( &ctx->running_lthash_sub );
517 0 : fd_lthash_adder_new( ctx->adder );
518 0 : fd_lthash_adder_new( ctx->adder_sub );
519 0 : break;
520 :
521 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
522 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
523 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
524 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
525 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
526 0 : fd_lthash_zero( &ctx->running_lthash );
527 0 : fd_lthash_zero( &ctx->running_lthash_sub );
528 0 : fd_lthash_adder_new( ctx->adder );
529 0 : fd_lthash_adder_new( ctx->adder_sub );
530 0 : break;
531 :
532 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
533 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:{
534 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
535 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
536 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
537 0 : ctx->wh_finish_fseq = tsorig_tspub_to_fseq( tsorig, tspub );
538 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
539 :
540 0 : if( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) {
541 0 : if( FD_LIKELY( ctx->vinyl.io_uring_enabled ) ) {
542 0 : handle_vinyl_lthash_request_ur_consume_all( ctx );
543 0 : }
544 0 : ctx->state = handle_lthash_completion( ctx, stem );
545 0 : }
546 0 : break;
547 0 : }
548 :
549 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
550 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
551 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
552 0 : break;
553 :
554 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
555 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
556 0 : break;
557 :
558 0 : default:
559 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
560 0 : return;
561 0 : }
562 0 : }
563 :
564 : static inline int
565 : returnable_frag( fd_snaplh_t * ctx,
566 : ulong in_idx,
567 : ulong seq,
568 : ulong sig,
569 : ulong chunk,
570 : ulong sz,
571 : ulong ctl FD_PARAM_UNUSED,
572 : ulong tsorig,
573 : ulong tspub,
574 0 : fd_stem_context_t * stem ) {
575 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
576 :
577 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) handle_wh_data_frag( ctx, in_idx, chunk, sz/*sz_comp*/, stem );
578 0 : else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_lv_data_frag( ctx, in_idx, chunk, sz );
579 0 : else handle_control_frag( ctx, sig, tsorig, tspub, stem );
580 :
581 : /* Because fd_stem may not return flow control credits fast enough,
582 : always update fseq (consumer progress) here. */
583 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWH ) ) {
584 0 : ctx->wh_last_in_seq = seq;
585 0 : fd_fseq_update( ctx->in[ in_idx ].seq_sync, fd_seq_inc( seq, 1UL ) );
586 0 : }
587 :
588 0 : return 0;
589 0 : }
590 :
591 : static ulong
592 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
593 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
594 : ulong out_fds_cnt,
595 0 : int * out_fds ) {
596 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
597 :
598 0 : ulong out_cnt = 0;
599 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
600 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
601 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
602 0 : }
603 :
604 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
605 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
606 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
607 :
608 0 : out_fds[ out_cnt++ ] = ctx->vinyl.dev_fd;
609 :
610 : #if FD_HAS_LIBURING
611 : if( FD_LIKELY( !!ctx->vinyl.ring ) ) out_fds[ out_cnt++ ] = ctx->vinyl.ring->ring_fd;
612 : #endif
613 :
614 0 : return out_cnt;
615 0 : }
616 :
617 : static ulong
618 : populate_allowed_seccomp( fd_topo_t const * topo,
619 : fd_topo_tile_t const * tile,
620 : ulong out_cnt,
621 0 : struct sock_filter * out ) {
622 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
623 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
624 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
625 :
626 0 : int ring_fd = INT_MAX;
627 : #if FD_HAS_LIBURING
628 : if( !!ctx->vinyl.ring ) ring_fd = ctx->vinyl.ring->ring_fd;
629 : #endif
630 0 : populate_sock_filter_policy_fd_snaplh_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->vinyl.dev_fd, (uint)ring_fd );
631 0 : return sock_filter_policy_fd_snaplh_tile_instr_cnt;
632 0 : }
633 :
634 : static void
635 : privileged_init( fd_topo_t * topo,
636 0 : fd_topo_tile_t * tile ) {
637 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
638 :
639 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
640 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
641 0 : void * pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ ); (void)pair_mem;
642 0 : void * pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ ); (void)pair_tmp;
643 : #if FD_HAS_LIBURING
644 : void * rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ ); (void)rd_req_mem;
645 : void * _ring_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(struct io_uring), sizeof(struct io_uring) );
646 : void * uring_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_ur_align(), fd_vinyl_io_ur_footprint(VINYL_LTHASH_IO_SPAD_MAX) );
647 : #endif
648 :
649 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
650 :
651 : /* Set up io_bd dependencies */
652 :
653 0 : char const * bstream_path = tile->snaplh.vinyl_path;
654 : /* Note: it would be possible to use O_DIRECT, but it would require
655 : VINYL_LTHASH_BLOCK_ALIGN to be 4096UL, which substantially
656 : increases the read overhead, making it slower (keep in mind that
657 : a rather large subset of mainnet accounts typically fits inside
658 : one FD_VINYL_BSTREAM_BLOCK_SZ. */
659 0 : int dev_fd = open( bstream_path, O_RDONLY|O_CLOEXEC, 0444 );
660 0 : if( FD_UNLIKELY( dev_fd<0 ) ) {
661 0 : FD_LOG_ERR(( "open(%s,O_RDONLY|O_CLOEXEC, 0444) failed (%i-%s)",
662 0 : bstream_path, errno, fd_io_strerror( errno ) ));
663 0 : }
664 :
665 0 : struct stat st;
666 0 : if( FD_UNLIKELY( 0!=fstat( dev_fd, &st ) ) ) FD_LOG_ERR(( "fstat(%s) failed (%i-%s)", bstream_path, errno, strerror( errno ) ));
667 :
668 0 : ctx->vinyl.dev_fd = dev_fd;
669 0 : ulong bstream_sz = (ulong)st.st_size;
670 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
671 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
672 0 : }
673 0 : ctx->vinyl.dev_sz = bstream_sz;
674 0 : ctx->vinyl.dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;
675 :
676 0 : if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
677 0 : #if !FD_HAS_LIBURING
678 0 : FD_LOG_ERR(( "config has enabled io_uring but liburing is not available" ));
679 0 : #endif
680 0 : }
681 0 : ctx->vinyl.io_uring_enabled = !!tile->snaplh.io_uring_enabled;
682 :
683 0 : ctx->vinyl.io = NULL;
684 0 : ctx->vinyl.ring = NULL;
685 : #if FD_HAS_LIBURING
686 : if( FD_LIKELY( tile->snaplh.io_uring_enabled ) ) {
687 : /* Join the bstream using io_ur */
688 : struct io_uring * ring = _ring_mem;
689 : uint depth = 2 * VINYL_LTHASH_RD_REQ_MAX;
690 : struct io_uring_params params = {
691 : .flags = IORING_SETUP_CQSIZE |
692 : IORING_SETUP_DEFER_TASKRUN |
693 : IORING_SETUP_SINGLE_ISSUER |
694 : IORING_SETUP_COOP_TASKRUN,
695 : .cq_entries = depth
696 : };
697 : int init_err = io_uring_queue_init_params( depth, ring, ¶ms );
698 : if( FD_UNLIKELY( init_err==-EPERM ) ) {
699 : FD_LOG_ERR(( "missing privileges to setup io_uring" ));
700 : } else if( init_err<0 ) {
701 : FD_LOG_ERR(( "io_uring_queue_init_params failed (%i-%s)", init_err, fd_io_strerror( -init_err ) ));
702 : }
703 : FD_TEST( 0==io_uring_register_files( ring, &dev_fd, 1 ) );
704 :
705 : ulong align = fd_vinyl_io_ur_align();
706 : FD_TEST( fd_ulong_is_pow2( align ) );
707 :
708 : ulong footprint = fd_vinyl_io_ur_footprint( VINYL_LTHASH_IO_SPAD_MAX );
709 : FD_TEST( fd_ulong_is_aligned( footprint, align ) );
710 :
711 : /* Before invoking fd_vinyl_io_ur_init, the sync block must be
712 : already available. Although in principle one could keep
713 : calling fd_vinyl_io_ur_init until it returns !=NULL, doing this
714 : would log uncessary (and misleading) warnings. */
715 : for(;;) {
716 : fd_vinyl_bstream_block_t block[1];
717 : ulong dev_sync = 0UL; /* Use the beginning of the file for the sync block */
718 : bd_read( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
719 : int type = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
720 : if( FD_UNLIKELY( type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC ) ) continue;
721 : ulong io_seed = block->sync.hash_trail;
722 : if( FD_LIKELY( !fd_vinyl_bstream_block_test( io_seed, block ) ) ) break;
723 : FD_SPIN_PAUSE();
724 : }
725 :
726 : fd_vinyl_io_t * io = fd_vinyl_io_ur_init( uring_mem, VINYL_LTHASH_IO_SPAD_MAX, dev_fd, ring );
727 : FD_TEST( io );
728 : ctx->vinyl.io = io;
729 :
730 : ctx->vinyl.ring = ring;
731 : }
732 : #endif
733 0 : }
734 :
735 : static void
736 : unprivileged_init( fd_topo_t * topo,
737 0 : fd_topo_tile_t * tile ) {
738 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
739 :
740 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
741 0 : fd_snaplh_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplh_t), sizeof(fd_snaplh_t) );
742 0 : void * pair_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
743 0 : void * pair_tmp = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_BLOCK_MAX_SZ );
744 0 : void * rd_req_mem = NULL;
745 : #if FD_HAS_LIBURING
746 : rd_req_mem = FD_SCRATCH_ALLOC_APPEND( l, VINYL_LTHASH_BLOCK_ALIGN, VINYL_LTHASH_RD_REQ_MAX*VINYL_LTHASH_BLOCK_MAX_SZ );
747 : #endif
748 :
749 0 : FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
750 :
751 0 : ctx->vinyl.pair_mem = pair_mem;
752 0 : ctx->vinyl.pair_tmp = pair_tmp;
753 :
754 0 : if( FD_UNLIKELY( tile->in_cnt!=IN_CNT_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, IN_CNT_MAX ));
755 0 : if( FD_UNLIKELY( tile->out_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 1", tile->out_cnt ));
756 :
757 0 : ctx->io_seed = NULL;
758 :
759 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
760 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
761 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
762 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snaplv_lh" ) ) ) {
763 0 : ctx->in[ i ].wksp = in_wksp->wksp;
764 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].wksp, in_link->dcache );
765 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark( ctx->in[ i ].wksp, in_link->dcache, in_link->mtu );
766 0 : ctx->in[ i ].mtu = in_link->mtu;
767 0 : ctx->in[ i ].base = NULL;
768 0 : ctx->in[ i ].seq_sync = NULL;
769 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLV;
770 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snapwh_wr" ) ) ) {
771 0 : ctx->in[ i ].wksp = in_wksp->wksp;
772 0 : ctx->in[ i ].chunk0 = 0;
773 0 : ctx->in[ i ].wmark = 0;
774 0 : ctx->in[ i ].mtu = 0;
775 0 : ctx->in[ i ].base = fd_dcache_join( fd_topo_obj_laddr( topo, tile->snaplh.dcache_obj_id ) );
776 0 : ctx->in[ i ].seq_sync = tile->in_link_fseq[ i ];
777 0 : ctx->wh_last_in_seq = fd_fseq_query( tile->in_link_fseq[ i ] );
778 0 : ctx->in_kind[ i ] = IN_KIND_SNAPWH;
779 0 : ctx->io_seed = (ulong const *)fd_dcache_app_laddr_const( ctx->in[ i ].base );
780 0 : FD_TEST( ctx->in[ i ].base );
781 0 : } else {
782 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
783 0 : }
784 0 : }
785 :
786 0 : FD_TEST( ctx->io_seed );
787 :
788 0 : fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ 0UL ] ];
789 0 : ctx->out.wksp = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
790 0 : ctx->out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( out_link->dcache ), out_link->dcache );
791 0 : ctx->out.wmark = fd_dcache_compact_wmark ( ctx->out.wksp, out_link->dcache, out_link->mtu );
792 0 : ctx->out.chunk = ctx->out.chunk0;
793 0 : ctx->out.mtu = out_link->mtu;
794 0 : FD_TEST( 0==strcmp( out_link->name, "snaplh_lv" ) );
795 :
796 0 : fd_lthash_adder_new( ctx->adder );
797 0 : fd_lthash_adder_new( ctx->adder_sub );
798 :
799 0 : ctx->metrics.full.accounts_hashed = 0UL;
800 0 : ctx->metrics.incremental.accounts_hashed = 0UL;
801 :
802 0 : memset( ctx->vinyl.pending.phdr, 0, sizeof(fd_vinyl_bstream_phdr_t) * VINYL_LTHASH_RD_REQ_MAX );
803 0 : memset( ctx->vinyl.pending.rd_req, 0, sizeof(fd_vinyl_io_rd_t) * VINYL_LTHASH_RD_REQ_MAX );
804 0 : for( ulong i=0UL; i<VINYL_LTHASH_RD_REQ_MAX; i++ ) {
805 0 : fd_vinyl_io_rd_t * rd_req = &ctx->vinyl.pending.rd_req[ i ];
806 0 : rd_req->ctx = rd_req_ctx_from_parts( i, VINYL_LTHASH_RD_REQ_FREE );
807 0 : rd_req->dst = NULL;
808 0 : if( rd_req_mem!=NULL ) {
809 0 : rd_req->dst = ((uchar*)rd_req_mem) + i*VINYL_LTHASH_BLOCK_MAX_SZ;
810 0 : }
811 0 : }
812 0 : ctx->vinyl.pending_rd_req_cnt = 0UL;
813 :
814 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
815 0 : ctx->full = 1;
816 0 : ctx->lthash_tile_cnt = fd_topo_tile_name_cnt( topo, "snaplh" );
817 0 : ctx->lthash_tile_idx = tile->kind_id;
818 : /* This may seem redundant, but it provides flexibility around which
819 : tiles and do addition and subtraction of lthash. */
820 0 : ctx->lthash_tile_add_cnt = ctx->lthash_tile_cnt;
821 0 : ctx->lthash_tile_sub_cnt = ctx->lthash_tile_cnt;
822 0 : ctx->lthash_tile_add_idx = ctx->lthash_tile_idx;
823 0 : ctx->lthash_tile_sub_idx = ctx->lthash_tile_idx;
824 0 : if( ctx->lthash_tile_add_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_add_idx < ctx->lthash_tile_add_cnt );
825 0 : if( ctx->lthash_tile_sub_idx != ULONG_MAX ) FD_TEST( ctx->lthash_tile_sub_idx < ctx->lthash_tile_sub_cnt );
826 0 : ctx->pairs_seen = 0UL;
827 0 : ctx->lthash_req_seen = 0UL;
828 0 : fd_lthash_zero( &ctx->running_lthash );
829 0 : fd_lthash_zero( &ctx->running_lthash_sub );
830 0 : }
831 :
/* Configuration consumed by the stem template included below. */

/* Context type handed to every callback, and its alignment */
#define STEM_CALLBACK_CONTEXT_TYPE  fd_snaplh_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplh_t)

/* Callbacks implemented by this tile */
#define STEM_CALLBACK_BEFORE_CREDIT   before_credit
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
#define STEM_CALLBACK_METRICS_WRITE   metrics_write
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown

/* Run loop parameters */
#define STEM_BURST 1UL
#define STEM_LAZY  1000L
842 :
843 : #include "../../disco/stem/fd_stem.c"
844 :
845 : fd_topo_run_tile_t fd_tile_snaplh = {
846 : .name = NAME,
847 : .populate_allowed_fds = populate_allowed_fds,
848 : .populate_allowed_seccomp = populate_allowed_seccomp,
849 : .scratch_align = scratch_align,
850 : .scratch_footprint = scratch_footprint,
851 : .privileged_init = privileged_init,
852 : .unprivileged_init = unprivileged_init,
853 : .run = stem_run,
854 : };
855 :
856 : #undef NAME
|