Line data Source code
1 : #include "fd_snaplv_tile_private.h"
2 : #include "../../disco/topo/fd_topo.h"
3 : #include "../../disco/metrics/fd_metrics.h"
4 : #include "../../ballet/lthash/fd_lthash.h"
5 : #include "../../vinyl/bstream/fd_vinyl_bstream.h"
6 : #include "generated/fd_snaplv_tile_seccomp.h"
7 :
8 : #include "utils/fd_ssctrl.h"
9 :
10 : #define NAME "snaplv"
11 :
12 0 : #define IN_KIND_SNAPWM (0)
13 0 : #define IN_KIND_SNAPLH (1)
14 : #define MAX_IN_LINKS (16UL) /* at least 1 snapwm and FD_SNAPSHOT_MAX_SNAPLH_TILES */
15 :
16 : #define OUT_LINK_CNT (3UL)
17 0 : #define OUT_LINK_LH (0)
18 0 : #define OUT_LINK_CT (1)
19 0 : #define OUT_LINK_WR (2)
20 :
21 : struct out_link {
22 : ulong idx;
23 : fd_wksp_t * mem;
24 : ulong chunk0;
25 : ulong wmark;
26 : ulong chunk;
27 : };
28 : typedef struct out_link out_link_t;
29 :
30 : struct fd_snaplv_tile {
31 : uint state;
32 : int full;
33 :
34 : ulong num_hash_tiles;
35 :
36 : uchar in_kind[MAX_IN_LINKS];
37 : ulong adder_in_offset;
38 :
39 : out_link_t out_link[OUT_LINK_CNT];
40 :
41 : struct {
42 : ulong const * bstream_seq;
43 : ulong bstream_seq_last;
44 : ulong bstream_seq_cnt;
45 : struct {
46 : int active[FD_SNAPLV_DUP_PENDING_CNT_MAX];
47 : ulong seq [FD_SNAPLV_DUP_PENDING_CNT_MAX];
48 : fd_vinyl_bstream_phdr_t phdr [FD_SNAPLV_DUP_PENDING_CNT_MAX];
49 : } pending;
50 : ulong pending_cnt;
51 : } vinyl;
52 :
53 : struct {
54 : fd_lthash_value_t expected_lthash;
55 : fd_lthash_value_t calculated_lthash;
56 : ulong received_lthashes;
57 : ulong ack_sig;
58 : int awaiting_results;
59 : int hash_check_done;
60 : } hash_accum;
61 :
62 : fd_lthash_value_t running_lthash;
63 :
64 : struct {
65 : struct {
66 : ulong duplicate_accounts_hashed;
67 : } full;
68 :
69 : struct {
70 : ulong duplicate_accounts_hashed;
71 : } incremental;
72 : } metrics;
73 :
74 : struct {
75 : fd_wksp_t * wksp;
76 : ulong chunk0;
77 : ulong wmark;
78 : ulong mtu;
79 : ulong pos;
80 : } in;
81 :
82 : struct {
83 : fd_wksp_t * wksp;
84 : ulong chunk0;
85 : ulong wmark;
86 : ulong mtu;
87 : } adder_in[FD_SNAPSHOT_MAX_SNAPLH_TILES];
88 : };
89 :
90 : typedef struct fd_snaplv_tile fd_snaplv_t;
91 :
92 : static inline int
93 0 : should_shutdown( fd_snaplv_t * ctx ) {
94 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
95 0 : }
96 :
97 : static ulong
98 0 : scratch_align( void ) {
99 0 : return alignof(fd_snaplv_t);
100 0 : }
101 :
102 : static ulong
103 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
104 0 : (void)tile;
105 0 : ulong l = FD_LAYOUT_INIT;
106 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
107 0 : return FD_LAYOUT_FINI( l, alignof(fd_snaplv_t) );
108 0 : }
109 :
110 : static void
111 0 : metrics_write( fd_snaplv_t * ctx ) {
112 0 : (void)ctx;
113 0 : FD_MGAUGE_SET( SNAPLV, FULL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.full.duplicate_accounts_hashed );
114 0 : FD_MGAUGE_SET( SNAPLV, INCREMENTAL_DUPLICATE_ACCOUNTS_HASHED, ctx->metrics.incremental.duplicate_accounts_hashed );
115 0 : FD_MGAUGE_SET( SNAPLV, STATE, (ulong)(ctx->state) );
116 0 : }
117 :
118 : static void
119 : transition_malformed( fd_snaplv_t * ctx,
120 0 : fd_stem_context_t * stem ) {
121 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
122 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
123 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
124 0 : }
125 :
126 : static void
127 : handle_vinyl_lthash_request( fd_snaplv_t * ctx,
128 : fd_stem_context_t * stem,
129 : ulong seq,
130 0 : fd_vinyl_bstream_phdr_t * acc_hdr ) {
131 :
132 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
133 0 : uchar * data = fd_chunk_to_laddr( o_link->mem, o_link->chunk );
134 0 : memcpy( data, &seq, sizeof(ulong) );
135 0 : memcpy( data + sizeof(ulong), acc_hdr, sizeof(fd_vinyl_bstream_phdr_t) );
136 0 : ulong data_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
137 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, o_link->chunk, data_sz, 0UL, 0UL, 0UL );
138 0 : o_link->chunk = fd_dcache_compact_next( o_link->chunk, data_sz, o_link->chunk0, o_link->wmark );
139 :
140 0 : if( ctx->full ) ctx->metrics.full.duplicate_accounts_hashed++;
141 0 : else ctx->metrics.incremental.duplicate_accounts_hashed++;
142 0 : }
143 :
144 : static inline void
145 0 : handle_vinyl_lthash_seq_sync( fd_snaplv_t * ctx ) {
146 0 : ulong bstream_seq_min = ULONG_MAX;
147 0 : for( ulong i=0; i<ctx->vinyl.bstream_seq_cnt; i++ ){
148 0 : ulong bstream_seq = fd_mcache_seq_query( ctx->vinyl.bstream_seq + i );
149 0 : bstream_seq_min = fd_ulong_min( bstream_seq_min, bstream_seq );
150 0 : }
151 0 : ctx->vinyl.bstream_seq_last = bstream_seq_min;
152 0 : }
153 :
154 : static inline int
155 : handle_vinyl_lthash_seq_check_fast( fd_snaplv_t * ctx,
156 0 : ulong seq ) {
157 0 : return seq < ctx->vinyl.bstream_seq_last;
158 0 : }
159 :
160 : static inline void
161 : handle_vinyl_lthash_seq_check_until_match( fd_snaplv_t * ctx,
162 : ulong seq,
163 0 : int do_sleep ) {
164 0 : for(;;) {
165 0 : if( handle_vinyl_lthash_seq_check_fast( ctx, seq ) ) break;
166 0 : FD_SPIN_PAUSE();
167 0 : if( do_sleep ) fd_log_sleep( (long)1e3 ); /* 1 microsecond */
168 0 : handle_vinyl_lthash_seq_sync( ctx );
169 0 : }
170 0 : }
171 :
172 : static inline void
173 : handle_vinyl_lthash_request_drain_all( fd_snaplv_t * ctx,
174 0 : fd_stem_context_t * stem ) {
175 0 : for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
176 0 : if( !ctx->vinyl.pending.active[ i ] ) continue;
177 0 : handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ i ], 1/*do_sleep*/ );
178 0 : handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
179 0 : ctx->vinyl.pending.active[ i ] = 0;
180 0 : ctx->vinyl.pending_cnt--;
181 0 : }
182 0 : FD_TEST( !ctx->vinyl.pending_cnt );
183 0 : }
184 :
185 : static inline void
186 : handle_vinyl_lthash_pending_list( fd_snaplv_t * ctx,
187 0 : fd_stem_context_t * stem ) {
188 : /* Try to consume as many pending requests as possible. */
189 0 : for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
190 0 : if( FD_LIKELY( !ctx->vinyl.pending.active[ i ] ) ) continue;
191 0 : if( handle_vinyl_lthash_seq_check_fast( ctx, ctx->vinyl.pending.seq[ i ] ) ) {
192 0 : handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ i ], &ctx->vinyl.pending.phdr[ i ] );
193 0 : ctx->vinyl.pending.active[ i ] = 0;
194 0 : ctx->vinyl.pending_cnt--;
195 0 : }
196 0 : }
197 0 : }
198 :
199 : static void
200 : handle_data_frag( fd_snaplv_t * ctx,
201 : fd_stem_context_t * stem,
202 : ulong sig,
203 : ulong chunk,
204 : ulong sz,
205 0 : ulong tspub ) {
206 0 : (void)chunk; (void)sz;
207 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
208 :
209 0 : if( sig!=FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) {
210 0 : FD_LOG_ERR(( "unexpected sig %lu in handle_data_frag", sig ));
211 0 : return;
212 0 : }
213 :
214 0 : uchar const * in_data = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
215 :
216 0 : ulong const acc_sz = sizeof(ulong)+sizeof(fd_vinyl_bstream_phdr_t);
217 0 : ulong const batch_sz = sz;
218 0 : ulong const batch_cnt = tspub;
219 0 : if( FD_UNLIKELY( batch_cnt>FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ) ) {
220 0 : FD_LOG_CRIT(( "batch count %lu exceeds FD_SNAPLV_DUP_BATCH_IN_CNT_MAX %lu", batch_cnt, FD_SNAPLV_DUP_BATCH_IN_CNT_MAX ));
221 0 : }
222 0 : if( FD_UNLIKELY( (batch_cnt*acc_sz)!=batch_sz ) ) {
223 0 : FD_LOG_CRIT(( "batch count %lu with account size %lu does not match batch size %lu", batch_cnt, acc_sz, batch_sz ));
224 0 : }
225 :
226 0 : for( ulong acc_i=0UL; acc_i<batch_cnt; acc_i++ ) {
227 :
228 : /* move in_data pointer forward */
229 0 : uchar const * acc_data = in_data;
230 0 : in_data += acc_sz;
231 :
232 0 : ulong acc_data_seq = fd_ulong_load_8( acc_data );
233 :
234 0 : if( FD_LIKELY( handle_vinyl_lthash_seq_check_fast( ctx, acc_data_seq ) ) ) {
235 : /* The request can be processed immediately, skipping the pending list. */
236 0 : fd_vinyl_bstream_phdr_t acc_data_phdr[1];
237 0 : memcpy( acc_data_phdr, acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
238 0 : handle_vinyl_lthash_request( ctx, stem, acc_data_seq, acc_data_phdr );
239 0 : continue;
240 0 : }
241 :
242 : /* Find an empty slot in the pending list. */
243 0 : ulong seq_min_i = ULONG_MAX;
244 0 : ulong seq_min = ULONG_MAX;
245 0 : ulong free_i = ULONG_MAX;
246 0 : if( FD_UNLIKELY( ctx->vinyl.pending_cnt==FD_SNAPLV_DUP_PENDING_CNT_MAX ) ) {
247 : /* an entry must be consumed to free a slot */
248 0 : for( ulong i=0; i<FD_SNAPLV_DUP_PENDING_CNT_MAX; i++ ) {
249 0 : ulong seq = ctx->vinyl.pending.seq[ i ];
250 0 : seq_min_i = fd_ulong_if( seq_min > seq, i, seq_min_i );
251 0 : seq_min = fd_ulong_min( seq_min, seq );
252 0 : }
253 0 : handle_vinyl_lthash_seq_check_until_match( ctx, ctx->vinyl.pending.seq[ seq_min_i ], 1/*do_sleep*/ );
254 0 : handle_vinyl_lthash_request( ctx, stem, ctx->vinyl.pending.seq[ seq_min_i ], &ctx->vinyl.pending.phdr[ seq_min_i ] );
255 0 : ctx->vinyl.pending.active[ seq_min_i ] = 0;
256 0 : ctx->vinyl.pending_cnt--;
257 0 : free_i = seq_min_i;
258 0 : } else {
259 : /* Pick a free slot. */
260 0 : free_i = 0UL;
261 0 : for( ; free_i<FD_SNAPLV_DUP_PENDING_CNT_MAX; free_i++ ) {
262 0 : if( !ctx->vinyl.pending.active[ free_i ] ) break;
263 0 : }
264 0 : }
265 :
266 : /* Populate the free slot. */
267 0 : ctx->vinyl.pending.seq[ free_i ] = acc_data_seq;
268 0 : memcpy( &ctx->vinyl.pending.phdr[ free_i ], acc_data + sizeof(ulong), sizeof(fd_vinyl_bstream_phdr_t) );
269 0 : ctx->vinyl.pending.active[ free_i ] = 1;
270 0 : ctx->vinyl.pending_cnt++;
271 0 : }
272 0 : }
273 :
274 : static void
275 : handle_control_frag( fd_snaplv_t * ctx,
276 : fd_stem_context_t * stem,
277 : ulong sig,
278 : ulong in_idx,
279 : ulong tsorig,
280 0 : ulong tspub ) {
281 0 : (void)in_idx;
282 :
283 0 : int forward_to_ct = 1UL;
284 :
285 0 : switch( sig ) {
286 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
287 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
288 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
289 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
290 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
291 0 : fd_lthash_zero( &ctx->running_lthash );
292 0 : break;
293 0 : }
294 :
295 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL: {
296 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
297 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
298 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
299 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
300 0 : fd_lthash_zero( &ctx->running_lthash );
301 0 : break;
302 0 : }
303 :
304 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT:
305 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
306 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
307 0 : transition_malformed( ctx, stem );
308 0 : break;
309 0 : }
310 0 : ctx->hash_accum.ack_sig = sig;
311 0 : ctx->hash_accum.awaiting_results = 1;
312 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
313 0 : forward_to_ct = 0UL;
314 0 : handle_vinyl_lthash_request_drain_all( ctx, stem );
315 0 : break; /* the ack is sent when all hashes are received */
316 0 : }
317 :
318 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
319 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
320 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
321 0 : break;
322 0 : }
323 :
324 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
325 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
326 0 : break;
327 :
328 0 : default:
329 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
330 0 : break;
331 0 : }
332 :
333 : /* Forward the control message down the pipeline */
334 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_LH ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
335 0 : if( !forward_to_ct ) return;
336 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
337 0 : }
338 :
339 : static void
340 : handle_hash_frag( fd_snaplv_t * ctx,
341 : ulong in_idx,
342 : ulong sig,
343 : ulong chunk,
344 0 : ulong sz ) {
345 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING || ctx->state==FD_SNAPSHOT_STATE_IDLE );
346 0 : switch( sig ) {
347 0 : case FD_SNAPSHOT_HASH_MSG_RESULT_ADD: {
348 0 : FD_TEST( sz==sizeof(fd_lthash_value_t) );
349 0 : FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPLH );
350 0 : fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->adder_in[ in_idx-ctx->adder_in_offset ].wksp, chunk );
351 0 : fd_lthash_add( &ctx->hash_accum.calculated_lthash, result );
352 0 : ctx->hash_accum.received_lthashes++;
353 0 : break;
354 0 : }
355 0 : case FD_SNAPSHOT_HASH_MSG_RESULT_SUB: {
356 0 : FD_TEST( sz==sizeof(fd_lthash_value_t) );
357 0 : FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
358 0 : fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
359 0 : fd_lthash_sub( &ctx->hash_accum.calculated_lthash, result );
360 0 : break;
361 0 : }
362 0 : case FD_SNAPSHOT_HASH_MSG_EXPECTED: {
363 0 : FD_TEST( sz==sizeof(fd_lthash_value_t) );
364 0 : FD_TEST( ctx->in_kind[ in_idx ]==IN_KIND_SNAPWM );
365 0 : fd_lthash_value_t const * result = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
366 0 : fd_memcpy( &ctx->hash_accum.expected_lthash, result, sizeof(fd_lthash_value_t) );
367 0 : break;
368 0 : }
369 0 : default:
370 0 : FD_LOG_ERR(( "unexpected hash sig %lu", sig ));
371 0 : break;
372 0 : }
373 :
374 0 : }
375 :
376 : static inline int
377 : returnable_frag( fd_snaplv_t * ctx,
378 : ulong in_idx FD_PARAM_UNUSED,
379 : ulong seq FD_PARAM_UNUSED,
380 : ulong sig,
381 : ulong chunk,
382 : ulong sz,
383 : ulong ctl FD_PARAM_UNUSED,
384 : ulong tsorig,
385 : ulong tspub,
386 0 : fd_stem_context_t * stem ) {
387 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
388 :
389 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH ) ) handle_data_frag( ctx, stem, sig, chunk, sz, tspub );
390 0 : else if( FD_LIKELY( sig==FD_SNAPSHOT_HASH_MSG_RESULT_ADD ||
391 0 : sig==FD_SNAPSHOT_HASH_MSG_RESULT_SUB ||
392 0 : sig==FD_SNAPSHOT_HASH_MSG_EXPECTED ) ) handle_hash_frag( ctx, in_idx, sig, chunk, sz );
393 0 : else handle_control_frag( ctx, stem, sig, in_idx, tsorig, tspub );
394 :
395 0 : return 0;
396 0 : }
397 :
398 : static void
399 : before_credit( fd_snaplv_t * ctx,
400 : fd_stem_context_t * stem FD_PARAM_UNUSED,
401 0 : int * charge_busy ) {
402 0 : *charge_busy = 0;
403 0 : handle_vinyl_lthash_seq_sync( ctx );
404 0 : }
405 :
406 : static void
407 : after_credit( fd_snaplv_t * ctx,
408 : fd_stem_context_t * stem,
409 : int * opt_poll_in FD_PARAM_UNUSED,
410 0 : int * charge_busy FD_PARAM_UNUSED ) {
411 :
412 0 : handle_vinyl_lthash_pending_list( ctx, stem );
413 :
414 0 : if( FD_UNLIKELY( ctx->hash_accum.received_lthashes==ctx->num_hash_tiles && ctx->hash_accum.awaiting_results ) ) {
415 0 : fd_lthash_sub( &ctx->hash_accum.calculated_lthash, &ctx->running_lthash );
416 :
417 0 : int test = memcmp( &ctx->hash_accum.expected_lthash, &ctx->hash_accum.calculated_lthash, sizeof(fd_lthash_value_t) );
418 :
419 0 : if( FD_UNLIKELY( test ) ) {
420 0 : FD_LOG_WARNING(( "calculated accounts lthash %s does not match accounts lthash %s in snapshot manifest",
421 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
422 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ) ));
423 0 : transition_malformed( ctx, stem );
424 0 : } else {
425 0 : FD_LOG_NOTICE(( "calculated accounts lthash %s matches accounts lthash %s in snapshot manifest",
426 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.calculated_lthash ),
427 0 : FD_LTHASH_ENC_32_ALLOCA( &ctx->hash_accum.expected_lthash ) ));
428 0 : }
429 0 : ctx->hash_accum.received_lthashes = 0UL;
430 0 : ctx->hash_accum.hash_check_done = 1;
431 0 : }
432 :
433 0 : if( FD_UNLIKELY( ctx->hash_accum.awaiting_results && ctx->hash_accum.hash_check_done ) ) {
434 0 : fd_stem_publish( stem, ctx->out_link[ OUT_LINK_CT ].idx, ctx->hash_accum.ack_sig, 0UL, 0UL, 0UL, 0UL, 0UL );
435 0 : ctx->hash_accum.awaiting_results = 0;
436 0 : ctx->hash_accum.hash_check_done = 0;
437 0 : }
438 0 : }
439 :
440 : static ulong
441 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
442 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
443 : ulong out_fds_cnt,
444 0 : int * out_fds ) {
445 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
446 :
447 0 : ulong out_cnt = 0;
448 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
449 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
450 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
451 0 : }
452 0 : return out_cnt;
453 0 : }
454 :
455 : static ulong
456 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
457 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
458 : ulong out_cnt,
459 0 : struct sock_filter * out ) {
460 0 : populate_sock_filter_policy_fd_snaplv_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
461 0 : return sock_filter_policy_fd_snaplv_tile_instr_cnt;
462 0 : }
463 :
464 : static void
465 : unprivileged_init( fd_topo_t * topo,
466 0 : fd_topo_tile_t * tile ) {
467 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
468 :
469 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
470 0 : fd_snaplv_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snaplv_t), sizeof(fd_snaplv_t) );
471 :
472 0 : FD_TEST( fd_topo_tile_name_cnt( topo, "snaplh" )<=FD_SNAPSHOT_MAX_SNAPLH_TILES );
473 :
474 0 : ulong expected_in_cnt = 1UL + fd_topo_tile_name_cnt( topo, "snaplh" );
475 0 : if( FD_UNLIKELY( tile->in_cnt!=expected_in_cnt ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected %lu", tile->in_cnt, expected_in_cnt ));
476 0 : if( FD_UNLIKELY( tile->out_cnt!=3UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 3", tile->out_cnt ));
477 :
478 0 : ulong adder_idx = 0UL;
479 0 : for( ulong i=0UL; i<(tile->in_cnt); i++ ) {
480 0 : fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ i ] ];
481 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
482 :
483 0 : if( FD_LIKELY( 0==strcmp( in_link->name, "snapwm_lv" ) ) ) {
484 0 : ctx->in.wksp = in_wksp->wksp;
485 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
486 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
487 0 : ctx->in.mtu = in_link->mtu;
488 0 : ctx->in.pos = 0UL;
489 0 : ctx->in_kind[ i ] = IN_KIND_SNAPWM;
490 :
491 0 : } else if( FD_LIKELY( 0==strcmp( in_link->name, "snaplh_lv" ) ) ) {
492 0 : ctx->adder_in[ adder_idx ].wksp = in_wksp->wksp;
493 0 : ctx->adder_in[ adder_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->adder_in[ adder_idx ].wksp, in_link->dcache );
494 0 : ctx->adder_in[ adder_idx ].wmark = fd_dcache_compact_wmark ( ctx->adder_in[ adder_idx ].wksp, in_link->dcache, in_link->mtu );
495 0 : ctx->adder_in[ adder_idx ].mtu = in_link->mtu;
496 0 : ctx->in_kind[ i ] = IN_KIND_SNAPLH;
497 0 : if( FD_LIKELY( adder_idx==0UL ) ) ctx->adder_in_offset = i;
498 0 : adder_idx++;
499 :
500 0 : } else {
501 0 : FD_LOG_ERR(( "tile `" NAME "` has unexpected in link name `%s`", in_link->name ));
502 0 : }
503 0 : }
504 :
505 0 : ctx->vinyl.bstream_seq = NULL; /* set to NULL by default, before checking output links. */
506 0 : ctx->vinyl.bstream_seq_last = 0UL;
507 0 : ctx->vinyl.bstream_seq_cnt = fd_topo_tile_name_cnt( topo, "snapwr" );
508 :
509 0 : for( uint i=0U; i<(tile->out_cnt); i++ ) {
510 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ i ] ];
511 :
512 0 : if( 0==strcmp( link->name, "snaplv_ct" ) ) {
513 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_CT ];
514 0 : o_link->idx = i;
515 0 : o_link->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
516 0 : o_link->chunk0 = 0UL;
517 0 : o_link->wmark = 0UL;
518 0 : o_link->chunk = 0UL;
519 :
520 0 : } else if( 0==strcmp( link->name, "snaplv_lh" ) ) {
521 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_LH ];
522 0 : o_link->idx = i;
523 0 : o_link->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
524 0 : o_link->chunk0 = fd_dcache_compact_chunk0( o_link->mem, link->dcache );
525 0 : o_link->wmark = fd_dcache_compact_wmark( o_link->mem, link->dcache, link->mtu );
526 0 : o_link->chunk = o_link->chunk0;
527 :
528 0 : } else if( 0==strcmp( link->name, "snaplv_wr" ) ) {
529 0 : out_link_t * o_link = &ctx->out_link[ OUT_LINK_WR ];
530 0 : o_link->idx = i;
531 0 : o_link->mem = NULL;
532 0 : o_link->chunk0 = 0UL;
533 0 : o_link->wmark = 0UL;
534 0 : o_link->chunk = 0UL;
535 0 : ctx->vinyl.bstream_seq = fd_mcache_seq_laddr( fd_mcache_join( fd_topo_obj_laddr( topo, link->mcache_obj_id ) ) );
536 0 : } else {
537 0 : FD_LOG_ERR(( "unexpected output link %s", link->name ));
538 0 : }
539 0 : }
540 :
541 0 : FD_TEST( !!ctx->vinyl.bstream_seq );
542 0 : memset( ctx->vinyl.pending.active, 0, FD_SNAPLV_DUP_PENDING_CNT_MAX*sizeof(int) );
543 0 : ctx->vinyl.pending_cnt = 0;
544 :
545 0 : ctx->metrics.full.duplicate_accounts_hashed = 0UL;
546 0 : ctx->metrics.incremental.duplicate_accounts_hashed = 0UL;
547 :
548 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
549 0 : ctx->full = 1;
550 :
551 0 : ctx->num_hash_tiles = fd_topo_tile_name_cnt( topo, "snaplh" );
552 :
553 0 : ctx->hash_accum.received_lthashes = 0UL;
554 0 : ctx->hash_accum.awaiting_results = 0;
555 0 : ctx->hash_accum.hash_check_done = 0;
556 :
557 0 : fd_lthash_zero( &ctx->hash_accum.calculated_lthash );
558 0 : fd_lthash_zero( &ctx->running_lthash );
559 0 : }
560 :
561 0 : #define STEM_BURST (FD_SNAPLV_STEM_BURST)
562 0 : #define STEM_LAZY 1000L
563 :
564 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snaplv_t
565 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snaplv_t)
566 :
567 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
568 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
569 0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
570 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
571 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
572 :
573 :
574 : #include "../../disco/stem/fd_stem.c"
575 :
576 : fd_topo_run_tile_t fd_tile_snaplv = {
577 : .name = NAME,
578 : .populate_allowed_fds = populate_allowed_fds,
579 : .populate_allowed_seccomp = populate_allowed_seccomp,
580 : .scratch_align = scratch_align,
581 : .scratch_footprint = scratch_footprint,
582 : .unprivileged_init = unprivileged_init,
583 : .run = stem_run,
584 : };
585 :
586 : #undef NAME
|