Line data Source code
1 : #include "fd_snapwm_tile_private.h"
2 : #include "utils/fd_ssctrl.h"
3 : #include "utils/fd_vinyl_io_wd.h"
4 :
5 : #include "../../disco/topo/fd_topo.h"
6 : #include "../../disco/metrics/fd_metrics.h"
7 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
8 : #include "../../flamenco/runtime/fd_system_ids.h"
9 :
10 : #define NAME "snapwm"
11 :
12 : /* The snapwm tile is a state machine responsible for loading accounts
13 : into the vinyl database. It processes pre-assembled bstream pairs
14 : and handles the actual allocation of vinyl's meta_map and bstream. */
15 :
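/* should_shutdown is polled by the stem run loop.  Once the tile has
   reached the SHUTDOWN state it logs a one-line summary of the load
   (unique accounts, duplicates, wall-clock time) and returns non-zero
   so that stem exits. */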
16 : static inline int
17 0 : should_shutdown( fd_snapwm_tile_t * ctx ) {
18 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN ) ) {
19 0 : ulong accounts_dup = ctx->metrics.accounts_ignored + ctx->metrics.accounts_replaced;
20 0 : ulong accounts = ctx->metrics.accounts_loaded - accounts_dup;
21 0 : long elapsed_ns = fd_log_wallclock() - ctx->boot_timestamp;
22 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts (%.1fM dups) from snapshot in %.3f seconds",
23 0 : (double)accounts/1e6,
24 0 : (double)accounts_dup/1e6,
25 0 : (double)elapsed_ns/1e9 ));
26 0 : }
27 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
28 0 : }
29 :
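/* The tile's scratch region holds the tile context followed by the
   vinyl write-behind (wd) and meta-map (mm) I/O buffers; the same
   layout is carved out again in unprivileged_init below. */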
30 : static ulong
31 0 : scratch_align( void ) {
32 0 : return 512UL;
33 0 : }
34 :
35 : static ulong
36 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
37 0 : (void)tile;
38 0 : ulong l = FD_LAYOUT_INIT;
39 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapwm_tile_t), sizeof(fd_snapwm_tile_t) );
40 0 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_wd_align(), fd_vinyl_io_wd_footprint( tile->snapwm.snapwr_depth ) );
41 0 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_mm_align(), fd_vinyl_io_mm_footprint( FD_SNAPWM_IO_SPAD_MAX ) );
42 0 : return FD_LAYOUT_FINI( l, scratch_align() );
43 0 : }
44 :
45 : static void
46 0 : metrics_write( fd_snapwm_tile_t * ctx ) {
47 0 : FD_MGAUGE_SET( SNAPWM, STATE, (ulong)ctx->state );
48 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_LOADED, ctx->metrics.accounts_loaded );
49 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_REPLACED, ctx->metrics.accounts_replaced );
50 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_IGNORED, ctx->metrics.accounts_ignored );
51 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_ACTIVE, ctx->vinyl.pair_cnt );
52 0 : }
53 :
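/* seq_to_tsorig_tspub splits a 64-bit sequence number into two 32-bit
   halves so it can be carried in the tsorig/tspub fields of a
   published frag: tsorig gets the low 32 bits and tspub the high 32
   bits.  For example, seq 0x0000000A00000003UL yields tsorig==3 and
   tspub==10. */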
54 : static inline void
55 : seq_to_tsorig_tspub( ulong * tsorig,
56 : ulong * tspub,
57 0 : ulong seq ) {
58 0 : *tsorig = ( seq >> 0 ) & ((1UL<<32)-1UL);
59 0 : *tspub = ( seq >> 32 ) & ((1UL<<32)-1UL);
60 0 : }
61 :
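/* transition_malformed moves the tile into the ERROR state (if it is
   not already there) and publishes a CTRL_ERROR message on the control
   out link so the rest of the snapshot load pipeline can abort. */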
62 : static void
63 : transition_malformed( fd_snapwm_tile_t * ctx,
64 0 : fd_stem_context_t * stem ) {
65 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
66 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
67 0 : fd_stem_publish( stem, ctx->out_ct_idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
68 0 : }
69 :
70 : /* verify_slot_deltas_with_slot_history verifies the 'SlotHistory'
71 : sysvar account after loading a snapshot. The full database
72 : architecture is only instantiated after snapshot loading, so this
73 : function uses a primitive/cache-free mechanism to query the parts of
74 : the account database that are available.
75 :
76 : Returns 0 if verification passed, -1 if not. */
77 :
78 : static int
79 0 : verify_slot_deltas_with_slot_history( fd_snapwm_tile_t * ctx ) {
80 : /* Do a raw read of the slot history sysvar account from the database.
81 : Requires approx 500kB stack space. */
82 :
83 0 : fd_account_meta_t meta;
84 0 : uchar data[ FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ];
85 0 : union {
86 0 : uchar buf[ FD_SYSVAR_SLOT_HISTORY_FOOTPRINT ];
87 0 : fd_slot_history_global_t o;
88 0 : } decoded;
89 0 : FD_STATIC_ASSERT( offsetof( __typeof__(decoded), buf)==offsetof( __typeof__(decoded), o ), memory_layout );
90 0 : fd_snapwm_vinyl_read_account( ctx, &fd_sysvar_slot_history_id, &meta, data, sizeof(data) );
91 :
92 0 : if( FD_UNLIKELY( !meta.lamports || !meta.dlen ) ) {
93 0 : FD_LOG_WARNING(( "SlotHistory sysvar account missing or empty" ));
94 0 : return -1;
95 0 : }
96 0 : if( FD_UNLIKELY( meta.dlen > FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ) ) {
97 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data too large: %u bytes", meta.dlen ));
98 0 : return -1;
99 0 : }
100 0 : if( FD_UNLIKELY( !fd_memeq( meta.owner, fd_sysvar_owner_id.uc, sizeof(fd_pubkey_t) ) ) ) {
101 0 : FD_BASE58_ENCODE_32_BYTES( meta.owner, owner_b58 );
102 0 : FD_LOG_WARNING(( "SlotHistory sysvar owner is invalid: %s != sysvar_owner_id", owner_b58 ));
103 0 : return -1;
104 0 : }
105 :
106 0 : if( FD_UNLIKELY(
107 0 : !fd_bincode_decode_static_global(
108 0 : slot_history,
109 0 : &decoded.o,
110 0 : data,
111 0 : meta.dlen,
112 0 : NULL )
113 0 : ) ) {
114 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data is corrupt" ));
115 0 : return -1;
116 0 : }
117 :
118 0 : ulong txncache_entries_len = fd_ulong_load_8( ctx->txncache_entries_len_ptr );
119 0 : if( FD_UNLIKELY( !txncache_entries_len ) ) FD_LOG_WARNING(( "no txncache entries to verify (txncache_entries_len %lu)", txncache_entries_len ));
120 :
121 0 : for( ulong i=0UL; i<txncache_entries_len; i++ ) {
122 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
123 0 : if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( &decoded.o, entry->slot )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) {
124 0 : FD_LOG_WARNING(( "slot %lu missing from SlotHistory sysvar account", entry->slot ));
125 0 : return -1;
126 0 : }
127 0 : }
128 0 : return 0;
129 0 : }
130 :
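/* handle_data_frag loads one batch of pre-assembled account pairs
   referenced by chunk.  Data frags are only valid in the PROCESSING
   state: one arriving while FINISHING marks the stream malformed, and
   frags received in the ERROR state are silently dropped. */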
131 : static int
132 : handle_data_frag( fd_snapwm_tile_t * ctx,
133 : ulong chunk,
134 : ulong acc_cnt,
135 0 : fd_stem_context_t * stem ) {
136 :
137 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) {
138 0 : transition_malformed( ctx, stem );
139 0 : return 0;
140 0 : }
141 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
142 : /* Ignore all data frags after observing an error in the stream until
143 : we receive fail & init control messages to restart processing. */
144 0 : return 0;
145 0 : }
146 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
147 0 : FD_LOG_ERR(( "invalid state for data frag %d", ctx->state ));
148 0 : }
149 :
150 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && acc_cnt<=FD_SNAPWM_PAIR_BATCH_CNT_MAX );
151 :
152 0 : fd_snapwm_vinyl_process_account( ctx, chunk, acc_cnt, stem );
153 :
154 0 : return 0;
155 0 : }
156 :
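/* handle_control_frag drives the snapshot load state machine:
   INIT_FULL/INIT_INCR (IDLE->PROCESSING), FINI (PROCESSING->FINISHING),
   NEXT/DONE (FINISHING->IDLE), ERROR, FAIL (revert and return to IDLE)
   and SHUTDOWN (IDLE->SHUTDOWN).  Unless verification fails, the
   control message is forwarded down the pipeline at the end. */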
157 : static void
158 : handle_control_frag( fd_snapwm_tile_t * ctx,
159 : ulong sig,
160 0 : fd_stem_context_t * stem ) {
161 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
162 : /* Control messages move along the snapshot load pipeline. Since
163 : error conditions can be triggered by any tile in the pipeline,
164 : it is possible to be in error state and still receive otherwise
165 : valid messages. Only a fail message can revert this. */
166 0 : return;
167 0 : }
168 :
169 0 : int forward_msg = 1;
170 :
171 0 : switch( sig ) {
172 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
173 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
174 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
175 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
176 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
177 : /* When lthash verification is disabled, a vinyl txn operation can
178 : be used, providing a fast revert mechanism when the incr
179 : snapshot is cancelled. However, when lthash verification is
180 : enabled, the meta map needs to be updated as the accounts are
181 : processed, in order to determine which duplicates are new and
182 : which are old. This prevents us from using txn_commit when
183 : lthash is enabled. (In more detail: needing recovery_seq in
184 : the pair's phdr info makes updating the bstream during
185 : txn_commit infeasible, since that would require recomputing
186 : the pair's integrity hashes.) For the case where lthash
187 : verification is enabled, a recovery mechanism is in place
188 : instead. */
189 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_INCR && ctx->lthash_disabled ) {
190 0 : fd_snapwm_vinyl_txn_begin( ctx );
191 0 : }
192 0 : fd_snapwm_vinyl_wd_init( ctx );
193 : /* Back up the bstream seq, regardless of whether this is a full or incr load. */
194 0 : fd_snapwm_vinyl_recovery_seq_backup( ctx );
195 :
196 : /* It is safe to skip the lock here: every other writer, in
197 : particular the write tiles that update wr_seq, has already
198 : finished processing all outstanding writes and is idle. And
199 : even though any read tile may see partial updates as they
200 : occur, this all happens while those tiles are idle waiting
201 : for the init full/incr command. */
202 0 : if( !!ctx->vinyl.admin ) fd_snapwm_vinyl_update_admin( ctx, 0/*do_rwlock*/ );
203 :
204 : /* Rewind metric counters (no-op unless recovering from a fail) */
205 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
206 0 : ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded = 0UL;
207 0 : ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced = 0UL;
208 0 : ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored = 0UL;
209 0 : ctx->vinyl.pair_cnt = ctx->vinyl.full_pair_cnt = 0UL;
210 0 : } else {
211 0 : ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded;
212 0 : ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced;
213 0 : ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored;
214 0 : ctx->vinyl.pair_cnt = ctx->vinyl.full_pair_cnt;
215 0 : }
216 0 : break;
217 0 : }
218 :
219 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
220 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING );
221 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
222 0 : fd_snapwm_vinyl_wd_fini( ctx );
223 0 : if( ctx->vinyl.txn_active ) {
224 0 : fd_snapwm_vinyl_txn_commit( ctx, stem );
225 0 : }
226 0 : break;
227 0 : }
228 :
229 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT: {
230 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
231 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
232 :
233 : /* Backup metric counters */
234 0 : ctx->metrics.full_accounts_loaded = ctx->metrics.accounts_loaded;
235 0 : ctx->metrics.full_accounts_replaced = ctx->metrics.accounts_replaced;
236 0 : ctx->metrics.full_accounts_ignored = ctx->metrics.accounts_ignored;
237 0 : ctx->vinyl.full_pair_cnt = ctx->vinyl.pair_cnt;
238 :
239 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
240 0 : FD_LOG_WARNING(( "slot deltas verification failed for full snapshot" ));
241 0 : transition_malformed( ctx, stem );
242 0 : forward_msg = 0;
243 0 : break;
244 0 : }
245 :
246 : /* FIXME re-enable fd_snapwm_vinyl_txn_commit here once recovery
247 : is fully implemented. */
248 0 : break;
249 0 : }
250 :
251 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
252 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
253 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
254 :
255 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
256 0 : if( ctx->full ) FD_LOG_WARNING(( "slot deltas verification failed for full snapshot" ));
257 0 : else FD_LOG_WARNING(( "slot deltas verification failed for incremental snapshot" ));
258 0 : transition_malformed( ctx, stem );
259 0 : forward_msg = 0;
260 0 : break;
261 0 : }
262 0 : break;
263 0 : }
264 :
265 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
266 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
267 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
268 0 : break;
269 0 : }
270 :
271 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL: {
272 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
273 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
274 0 : fd_snapwm_vinyl_wd_fini( ctx );
275 0 : if( ctx->full ) {
276 0 : fd_snapwm_vinyl_revert_full( ctx );
277 0 : } else {
278 0 : if( ctx->vinyl.txn_active ) {
279 0 : fd_snapwm_vinyl_txn_cancel( ctx );
280 0 : } else {
281 0 : fd_snapwm_vinyl_revert_incr( ctx );
282 0 : }
283 0 : }
284 0 : break;
285 0 : }
286 :
287 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
288 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
289 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
290 0 : fd_snapwm_vinyl_shutdown( ctx );
291 0 : break;
292 0 : }
293 :
294 0 : default: {
295 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
296 0 : break;
297 0 : }
298 0 : }
299 :
300 0 : if( FD_LIKELY( forward_msg ) ) {
301 : /* Encode wr_seq into tsorig and tspub. Downstream will make use
302 : of wr_seq when needed. */
303 0 : ulong tsorig = 0UL;
304 0 : ulong tspub = 0UL;
305 0 : if( !ctx->lthash_disabled ) {
306 0 : seq_to_tsorig_tspub( &tsorig, &tspub, ((fd_vinyl_io_wd_t const *)ctx->vinyl.io_wd)->wr_seq );
307 0 : }
308 : /* Forward the control message down the pipeline */
309 0 : fd_stem_publish( stem, ctx->out_ct_idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
310 0 : }
311 0 : }
312 :
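/* handle_expected_hash_message copies the expected snapshot hash
   payload into this tile's hash out dcache and republishes it
   downstream for lthash verification.  It is a no-op when lthash
   verification is disabled. */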
313 : static inline void
314 : handle_expected_hash_message( fd_snapwm_tile_t * ctx,
315 : ulong chunk,
316 : ulong sz,
317 0 : fd_stem_context_t * stem ) {
318 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return;
319 0 : uchar * src = fd_chunk_to_laddr( ctx->in.wksp, chunk );
320 0 : uchar * dst = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
321 0 : memcpy( dst, src, sz );
322 0 : fd_stem_publish( stem, ctx->out_ct_idx, FD_SNAPSHOT_HASH_MSG_EXPECTED, ctx->hash_out.chunk, sz, 0UL, 0UL, 0UL );
323 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sz, ctx->hash_out.chunk0, ctx->hash_out.wmark );
324 0 : }
325 :
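/* returnable_frag dispatches an incoming frag by sig: DATA frags carry
   a batch of accounts (sz is the account count), EXPECTED hash
   messages are forwarded to the hash consumer, and everything else is
   treated as a control message. */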
326 : static inline int
327 : returnable_frag( fd_snapwm_tile_t * ctx,
328 : ulong in_idx FD_PARAM_UNUSED,
329 : ulong seq FD_PARAM_UNUSED,
330 : ulong sig,
331 : ulong chunk,
332 : ulong sz,
333 : ulong ctl FD_PARAM_UNUSED,
334 : ulong tsorig FD_PARAM_UNUSED,
335 : ulong tspub FD_PARAM_UNUSED,
336 0 : fd_stem_context_t * stem ) {
337 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
338 :
339 0 : int ret = 0;
340 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) ret = handle_data_frag( ctx, chunk, sz/*acc_cnt*/, stem );
341 0 : else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_EXPECTED ) ) handle_expected_hash_message( ctx, chunk, sz, stem );
342 0 : else handle_control_frag( ctx, sig, stem );
343 :
344 0 : return ret;
345 0 : }
346 :
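/* Sandbox helpers: the tile only needs stderr and the log file as open
   fds, and the seccomp allow list is provided by the vinyl backend. */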
347 : static ulong
348 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
349 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
350 : ulong out_fds_cnt,
351 0 : int * out_fds ) {
352 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
353 :
354 0 : ulong out_cnt = 0;
355 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
356 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
357 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
358 0 : }
359 :
360 0 : return out_cnt;
361 0 : }
362 :
363 : static ulong
364 : populate_allowed_seccomp( fd_topo_t const * topo,
365 : fd_topo_tile_t const * tile,
366 : ulong out_cnt,
367 0 : struct sock_filter * out ) {
368 0 : (void)topo; (void)tile;
369 0 : return fd_snapwm_vinyl_seccomp( out_cnt, out );
370 0 : }
371 :
372 : static void
373 : privileged_init( fd_topo_t * topo,
374 0 : fd_topo_tile_t * tile ) {
375 0 : fd_snapwm_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
376 0 : memset( ctx, 0, sizeof(fd_snapwm_tile_t) );
377 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
378 :
379 0 : fd_snapwm_vinyl_privileged_init( ctx, topo, tile );
380 0 : }
381 :
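/* out1 looks up the named out link and captures its dcache layout
   (chunk0/wmark/mtu/depth) together with the fseq of its single
   consumer, presumably so the tile can do its own flow control on
   that link. */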
382 : static inline fd_snapwm_out_link_t
383 : out1( fd_topo_t const * topo,
384 : fd_topo_tile_t const * tile,
385 0 : char const * name ) {
386 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, name, 0UL );
387 :
388 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapwm_out_link_t){0};
389 :
390 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ idx ] ];
391 :
392 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
393 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapwm_out_link_t){0};
394 :
395 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
396 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
397 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
398 0 : ulong depth = out_link->depth;
399 :
400 : /* This only works when there is a single consumer for the given
401 : output link. */
402 0 : ulong const * consumer_fseq = NULL;
403 0 : ulong consumer_cnt = 0UL;
404 :
405 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
406 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
407 0 : for( ulong i=0UL; i<consumer_tile->in_cnt; i++ ) {
408 0 : if( consumer_tile->in_link_id[ i ]==out_link->id ) {
409 0 : consumer_fseq = consumer_tile->in_link_fseq[ i ];
410 0 : consumer_cnt++;
411 0 : }
412 0 : }
413 0 : }
414 0 : FD_TEST( consumer_cnt==1UL );
415 0 : FD_TEST( !!consumer_fseq );
416 :
417 0 : return (fd_snapwm_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0,
418 0 : .mtu = mtu, .depth = depth, .consumer_fseq = consumer_fseq };
419 0 : }
420 :
421 : FD_FN_UNUSED static void
422 : unprivileged_init( fd_topo_t * topo,
423 0 : fd_topo_tile_t * tile ) {
424 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
425 :
426 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
427 0 : fd_snapwm_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapwm_tile_t), sizeof(fd_snapwm_tile_t) );
428 0 : void * _io_wd = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_wd_align(), fd_vinyl_io_wd_footprint( tile->snapwm.snapwr_depth ) );
429 0 : void * _io_mm = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_mm_align(), fd_vinyl_io_mm_footprint( FD_SNAPWM_IO_SPAD_MAX ) );
430 :
431 0 : ctx->full = 1;
432 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
433 0 : ctx->lthash_disabled = tile->snapwm.lthash_disabled;
434 :
435 0 : ctx->boot_timestamp = fd_log_wallclock();
436 :
437 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
438 :
439 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
440 0 : if( FD_UNLIKELY( tile->in_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 2", tile->in_cnt ));
441 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
442 :
443 0 : ulong out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapwm_ct", 0UL );
444 0 : if( out_link_ct_idx==ULONG_MAX ) out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapwm_lv", 0UL );
445 0 : if( FD_UNLIKELY( out_link_ct_idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapwm_ct` or `snapwm_lv`" ));
446 0 : fd_topo_link_t * snapwm_out_link = &topo->links[ tile->out_link_id[ out_link_ct_idx ] ];
447 0 : ctx->out_ct_idx = out_link_ct_idx;
448 :
449 0 : if( 0==strcmp( snapwm_out_link->name, "snapwm_lv" ) ) {
450 0 : ctx->hash_out = out1( topo, tile, "snapwm_lv" );
451 0 : FD_TEST( ctx->hash_out.mtu==FD_SNAPWM_DUP_META_BATCH_SZ );
452 0 : }
453 :
454 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
455 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ i ] ];
456 0 : if( 0==strcmp( in_link->name, "snapin_wm" ) ) {
457 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
458 0 : ctx->in.wksp = in_wksp->wksp;
459 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
460 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
461 0 : ctx->in.mtu = in_link->mtu;
462 0 : ctx->in.pos = 0UL;
463 0 : } else if( 0==strcmp( in_link->name, "snapin_txn" ) ) {
464 : /* snapwm needs all txncache data in order to verify the slot
465 : deltas with the slot history. To make this possible, snapin
466 : uses the dcache of the snapin_txn link as scratch memory.
467 : The app field of the dcache is used to communicate the
468 : txncache_entries_len value. */
469 0 : fd_topo_wksp_t * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
470 0 : ulong chunk0 = fd_dcache_compact_chunk0( in_wksp->wksp, in_link->dcache );
471 0 : ctx->txncache_entries = fd_chunk_to_laddr( in_wksp->wksp, chunk0 );
472 0 : ctx->txncache_entries_len_ptr = (ulong const *)fd_dcache_app_laddr_const( in_link->dcache );
473 0 : } else {
474 0 : FD_LOG_ERR(( "tile `" NAME "` unrecognized in link %s", in_link->name ));
475 0 : }
476 0 : }
477 0 : FD_TEST( !!ctx->in.wksp );
478 0 : FD_TEST( !!ctx->txncache_entries );
479 :
480 0 : fd_snapwm_vinyl_unprivileged_init( ctx, topo, tile, _io_mm, _io_wd );
481 0 : }
482 :
483 : /* Control fragments can result in one extra publish to forward the
484 : message down the pipeline, in addition to the result / malformed
485 : message. They can send one duplicate account message as well.
486 : When fd_snapwm_vinyl_txn_commit is invoked, it handles fseq
487 : checks internally, since the number of messages it needs to
488 : send far exceeds STEM_BURST. */
489 0 : #define STEM_BURST 3UL
490 :
491 0 : #define STEM_LAZY 1000L
492 :
493 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapwm_tile_t
494 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapwm_tile_t)
495 :
496 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
497 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
498 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
499 :
500 : #include "../../disco/stem/fd_stem.c"
501 :
502 : fd_topo_run_tile_t fd_tile_snapwm = {
503 : .name = NAME,
504 : .populate_allowed_fds = populate_allowed_fds,
505 : .populate_allowed_seccomp = populate_allowed_seccomp,
506 : .scratch_align = scratch_align,
507 : .scratch_footprint = scratch_footprint,
508 : .privileged_init = privileged_init,
509 : .unprivileged_init = unprivileged_init,
510 : .run = stem_run,
511 : };
512 :
513 : #undef NAME
|