Line data Source code
1 : #include "fd_snapwm_tile_private.h"
2 : #include "utils/fd_ssctrl.h"
3 : #include "utils/fd_vinyl_io_wd.h"
4 :
5 : #include "../../disco/topo/fd_topo.h"
6 : #include "../../disco/metrics/fd_metrics.h"
7 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
8 : #include "../../flamenco/runtime/fd_system_ids.h"
9 :
10 : #define NAME "snapwm"
11 :
12 : /* The snapwm tile is a state machine responsible for loading accounts
13 : into vinyl database. It processes pre-assembled bstream pairs
14 : and handles vinyl's meta_map and bstream actual allocation. */
15 :
16 : static inline int
17 0 : should_shutdown( fd_snapwm_tile_t * ctx ) {
18 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN ) ) {
19 0 : ulong accounts_dup = ctx->metrics.accounts_ignored + ctx->metrics.accounts_replaced;
20 0 : ulong accounts = ctx->metrics.accounts_loaded - accounts_dup;
21 0 : long elapsed_ns = fd_log_wallclock() - ctx->boot_timestamp;
22 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts (%.1fM dups) from snapshot in %.3f seconds",
23 0 : (double)accounts/1e6,
24 0 : (double)accounts_dup/1e6,
25 0 : (double)elapsed_ns/1e9 ));
26 0 : }
27 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
28 0 : }
29 :
30 : static ulong
31 0 : scratch_align( void ) {
32 0 : return 512UL;
33 0 : }
34 :
35 : static ulong
36 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
37 0 : (void)tile;
38 0 : ulong l = FD_LAYOUT_INIT;
39 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapwm_tile_t), sizeof(fd_snapwm_tile_t) );
40 0 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_wd_align(), fd_vinyl_io_wd_footprint( tile->snapwm.snapwr_depth ) );
41 0 : l = FD_LAYOUT_APPEND( l, fd_vinyl_io_mm_align(), fd_vinyl_io_mm_footprint( FD_SNAPWM_IO_SPAD_MAX ) );
42 0 : return FD_LAYOUT_FINI( l, scratch_align() );
43 0 : }
44 :
45 : static void
46 0 : metrics_write( fd_snapwm_tile_t * ctx ) {
47 0 : FD_MGAUGE_SET( SNAPWM, STATE, (ulong)ctx->state );
48 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_LOADED, ctx->metrics.accounts_loaded );
49 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_REPLACED, ctx->metrics.accounts_replaced );
50 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_IGNORED, ctx->metrics.accounts_ignored );
51 0 : FD_MGAUGE_SET( SNAPWM, ACCOUNTS_ACTIVE, ctx->vinyl.pair_cnt );
52 0 : }
53 :
54 : static inline void
55 : seq_to_tsorig_tspub( ulong * tsorig,
56 : ulong * tspub,
57 0 : ulong seq ) {
58 0 : *tsorig = ( seq >> 0 ) & ((1UL<<32)-1UL);
59 0 : *tspub = ( seq >> 32 ) & ((1UL<<32)-1UL);
60 0 : }
61 :
62 : static void
63 : transition_malformed( fd_snapwm_tile_t * ctx,
64 0 : fd_stem_context_t * stem ) {
65 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
66 0 : fd_stem_publish( stem, ctx->out_ct_idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
67 0 : }
68 :
69 : /* verify_slot_deltas_with_slot_history verifies the 'SlotHistory'
70 : sysvar account after loading a snapshot. The full database
71 : architecture is only instantiated after snapshot loading, so this
72 : function uses a primitive/cache-free mechanism to query the parts of
73 : the account database that are available.
74 :
75 : Returns 0 if verification passed, -1 if not. */
76 :
77 : static int
78 0 : verify_slot_deltas_with_slot_history( fd_snapwm_tile_t * ctx ) {
79 : /* Do a raw read of the slot history sysvar account from the database.
80 : Requires approx 500kB stack space. */
81 :
82 0 : fd_account_meta_t meta;
83 0 : uchar data[ FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ];
84 0 : union {
85 0 : uchar buf[ FD_SYSVAR_SLOT_HISTORY_FOOTPRINT ];
86 0 : fd_slot_history_global_t o;
87 0 : } decoded;
88 0 : FD_STATIC_ASSERT( offsetof( __typeof__(decoded), buf)==offsetof( __typeof__(decoded), o ), memory_layout );
89 0 : fd_snapwm_vinyl_read_account( ctx, &fd_sysvar_slot_history_id, &meta, data, sizeof(data) );
90 :
91 0 : if( FD_UNLIKELY( !meta.lamports || !meta.dlen ) ) {
92 0 : FD_LOG_WARNING(( "SlotHistory sysvar account missing or empty" ));
93 0 : return -1;
94 0 : }
95 0 : if( FD_UNLIKELY( meta.dlen > FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ) ) {
96 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data too large: %u bytes", meta.dlen ));
97 0 : return -1;
98 0 : }
99 0 : if( FD_UNLIKELY( !fd_memeq( meta.owner, fd_sysvar_owner_id.uc, sizeof(fd_pubkey_t) ) ) ) {
100 0 : FD_BASE58_ENCODE_32_BYTES( meta.owner, owner_b58 );
101 0 : FD_LOG_WARNING(( "SlotHistory sysvar owner is invalid: %s != sysvar_owner_id", owner_b58 ));
102 0 : return -1;
103 0 : }
104 :
105 0 : if( FD_UNLIKELY(
106 0 : !fd_bincode_decode_static_global(
107 0 : slot_history,
108 0 : &decoded.o,
109 0 : data,
110 0 : meta.dlen,
111 0 : NULL )
112 0 : ) ) {
113 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data is corrupt" ));
114 0 : return -1;
115 0 : }
116 :
117 0 : ulong txncache_entries_len = fd_ulong_load_8( ctx->txncache_entries_len_ptr );
118 0 : if( FD_UNLIKELY( !txncache_entries_len ) ) FD_LOG_WARNING(( "txncache_entries_len %lu", txncache_entries_len ));
119 :
120 0 : for( ulong i=0UL; i<txncache_entries_len; i++ ) {
121 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
122 0 : if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( &decoded.o, entry->slot )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) {
123 0 : FD_LOG_WARNING(( "slot %lu missing from SlotHistory sysvar account", entry->slot ));
124 0 : return -1;
125 0 : }
126 0 : }
127 0 : return 0;
128 0 : }
129 :
130 : static int
131 : handle_data_frag( fd_snapwm_tile_t * ctx,
132 : ulong chunk,
133 : ulong acc_cnt,
134 0 : fd_stem_context_t * stem ) {
135 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) {
136 0 : transition_malformed( ctx, stem );
137 0 : return 0;
138 0 : }
139 0 : else if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
140 : /* Ignore all data frags after observing an error in the stream until
141 : we receive fail & init control messages to restart processing. */
142 0 : return 0;
143 0 : }
144 0 : else if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
145 0 : FD_LOG_ERR(( "invalid state for data frag %d", ctx->state ));
146 0 : }
147 :
148 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && acc_cnt<=FD_SNAPWM_PAIR_BATCH_CNT_MAX );
149 :
150 0 : fd_snapwm_vinyl_process_account( ctx, chunk, acc_cnt, stem );
151 :
152 0 : return 0;
153 0 : }
154 :
155 : static void
156 : handle_control_frag( fd_snapwm_tile_t * ctx,
157 : ulong sig,
158 0 : fd_stem_context_t * stem ) {
159 0 : ulong tsorig = 0UL;
160 0 : ulong tspub = 0UL;
161 :
162 0 : switch( sig ) {
163 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
164 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
165 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
166 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
167 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
168 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_INCR ) {
169 0 : fd_snapwm_vinyl_txn_begin( ctx );
170 0 : }
171 0 : fd_snapwm_vinyl_wd_init( ctx );
172 :
173 : /* Rewind metric counters (no-op unless recovering from a fail) */
174 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
175 0 : ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded = 0;
176 0 : ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced = 0;
177 0 : ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored = 0;
178 0 : ctx->vinyl.pair_cnt = 0UL;
179 0 : } else {
180 0 : ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded;
181 0 : ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced;
182 0 : ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored;
183 0 : }
184 0 : break;
185 :
186 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
187 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
188 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
189 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
190 :
191 0 : fd_snapwm_vinyl_wd_fini( ctx );
192 0 : if( ctx->vinyl.txn_active ) {
193 0 : fd_snapwm_vinyl_txn_cancel( ctx );
194 0 : }
195 0 : if( !ctx->lthash_disabled ) {
196 0 : seq_to_tsorig_tspub( &tsorig, &tspub, ((fd_vinyl_io_wd_t const *)ctx->vinyl.io_wd)->wr_seq );
197 0 : }
198 0 : break;
199 :
200 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT: {
201 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
202 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
203 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
204 :
205 : /* Backup metric counters */
206 0 : ctx->metrics.full_accounts_loaded = ctx->metrics.accounts_loaded;
207 0 : ctx->metrics.full_accounts_replaced = ctx->metrics.accounts_replaced;
208 0 : ctx->metrics.full_accounts_ignored = ctx->metrics.accounts_ignored;
209 :
210 0 : fd_snapwm_vinyl_wd_fini( ctx );
211 0 : if( ctx->vinyl.txn_active ) {
212 0 : fd_snapwm_vinyl_txn_commit( ctx, stem );
213 0 : }
214 0 : if( !ctx->lthash_disabled ) {
215 0 : seq_to_tsorig_tspub( &tsorig, &tspub, ((fd_vinyl_io_wd_t const *)ctx->vinyl.io_wd)->wr_seq );
216 0 : }
217 0 : break;
218 0 : }
219 :
220 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
221 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
222 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
223 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
224 :
225 0 : fd_snapwm_vinyl_wd_fini( ctx );
226 0 : if( ctx->vinyl.txn_active ) {
227 0 : fd_snapwm_vinyl_txn_commit( ctx, stem );
228 0 : }
229 0 : if( !ctx->lthash_disabled ) {
230 0 : seq_to_tsorig_tspub( &tsorig, &tspub, ((fd_vinyl_io_wd_t const *)ctx->vinyl.io_wd)->wr_seq );
231 0 : }
232 :
233 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
234 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
235 0 : transition_malformed( ctx, stem );
236 0 : break;
237 0 : }
238 0 : break;
239 0 : }
240 :
241 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
242 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
243 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
244 0 : fd_snapwm_vinyl_shutdown( ctx );
245 0 : break;
246 :
247 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
248 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
249 0 : fd_snapwm_vinyl_wd_fini( ctx );
250 0 : if( ctx->vinyl.txn_active ) {
251 0 : fd_snapwm_vinyl_txn_cancel( ctx );
252 0 : }
253 0 : if( !ctx->lthash_disabled ) {
254 0 : seq_to_tsorig_tspub( &tsorig, &tspub, ((fd_vinyl_io_wd_t const *)ctx->vinyl.io_wd)->wr_seq );
255 0 : }
256 0 : break;
257 :
258 0 : default:
259 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
260 0 : return;
261 0 : }
262 :
263 : /* Forward the control message down the pipeline */
264 0 : fd_stem_publish( stem, ctx->out_ct_idx, sig, 0UL, 0UL, 0UL, tsorig, tspub );
265 0 : }
266 :
267 : static inline void
268 : handle_expected_hash_message( fd_snapwm_tile_t * ctx,
269 : ulong chunk,
270 : ulong sz,
271 0 : fd_stem_context_t * stem ) {
272 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return;
273 0 : uchar * src = fd_chunk_to_laddr( ctx->in.wksp, chunk );
274 0 : uchar * dst = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
275 0 : memcpy( dst, src, sz );
276 0 : fd_stem_publish( stem, ctx->out_ct_idx, FD_SNAPSHOT_HASH_MSG_EXPECTED, ctx->hash_out.chunk, sz, 0UL, 0UL, 0UL );
277 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sz, ctx->hash_out.chunk0, ctx->hash_out.wmark );
278 0 : }
279 :
280 : static inline int
281 : returnable_frag( fd_snapwm_tile_t * ctx,
282 : ulong in_idx FD_PARAM_UNUSED,
283 : ulong seq FD_PARAM_UNUSED,
284 : ulong sig,
285 : ulong chunk,
286 : ulong sz,
287 : ulong ctl FD_PARAM_UNUSED,
288 : ulong tsorig FD_PARAM_UNUSED,
289 : ulong tspub FD_PARAM_UNUSED,
290 0 : fd_stem_context_t * stem ) {
291 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
292 :
293 0 : int ret = 0;
294 0 : if( FD_LIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) ret = handle_data_frag( ctx, chunk, sz/*acc_cnt*/, stem );
295 0 : else if( FD_UNLIKELY( sig==FD_SNAPSHOT_HASH_MSG_EXPECTED ) ) handle_expected_hash_message( ctx, chunk, sz, stem );
296 0 : else handle_control_frag( ctx, sig, stem );
297 :
298 0 : return ret;
299 0 : }
300 :
301 : static ulong
302 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
303 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
304 : ulong out_fds_cnt,
305 0 : int * out_fds ) {
306 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
307 :
308 0 : ulong out_cnt = 0;
309 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
310 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
311 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
312 0 : }
313 :
314 0 : return out_cnt;
315 0 : }
316 :
317 : static ulong
318 : populate_allowed_seccomp( fd_topo_t const * topo,
319 : fd_topo_tile_t const * tile,
320 : ulong out_cnt,
321 0 : struct sock_filter * out ) {
322 0 : (void)topo; (void)tile;
323 0 : return fd_snapwm_vinyl_seccomp( out_cnt, out );
324 0 : }
325 :
326 : static void
327 : privileged_init( fd_topo_t * topo,
328 0 : fd_topo_tile_t * tile ) {
329 0 : fd_snapwm_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
330 0 : memset( ctx, 0, sizeof(fd_snapwm_tile_t) );
331 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
332 :
333 0 : fd_snapwm_vinyl_privileged_init( ctx, topo, tile );
334 0 : }
335 :
336 : static inline fd_snapwm_out_link_t
337 : out1( fd_topo_t const * topo,
338 : fd_topo_tile_t const * tile,
339 0 : char const * name ) {
340 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, name, 0UL );
341 :
342 0 : fd_topo_link_t const * out_link = &topo->links[ tile->out_link_id[ idx ] ];
343 :
344 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapwm_out_link_t){0};
345 :
346 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
347 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapwm_out_link_t){0};
348 :
349 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
350 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
351 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
352 0 : ulong depth = out_link->depth;
353 :
354 : /* This only works when there is a single consumer for the given
355 : output link. */
356 0 : ulong const * consumer_fseq = NULL;
357 0 : ulong consumer_cnt = 0UL;
358 :
359 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
360 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
361 0 : for( ulong i=0UL; i<consumer_tile->in_cnt; i++ ) {
362 0 : if( consumer_tile->in_link_id[ i ]==out_link->id ) {
363 0 : consumer_fseq = consumer_tile->in_link_fseq[ i ];
364 0 : consumer_cnt++;
365 0 : }
366 0 : }
367 0 : }
368 0 : FD_TEST( consumer_cnt==1UL );
369 0 : FD_TEST( !!consumer_fseq );
370 :
371 0 : return (fd_snapwm_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0,
372 0 : .mtu = mtu, .depth = depth, .consumer_fseq = consumer_fseq };
373 0 : }
374 :
375 : FD_FN_UNUSED static void
376 : unprivileged_init( fd_topo_t * topo,
377 0 : fd_topo_tile_t * tile ) {
378 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
379 :
380 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
381 0 : fd_snapwm_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapwm_tile_t), sizeof(fd_snapwm_tile_t) );
382 0 : void * _io_wd = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_wd_align(), fd_vinyl_io_wd_footprint( tile->snapwm.snapwr_depth ) );
383 0 : void * _io_mm = FD_SCRATCH_ALLOC_APPEND( l, fd_vinyl_io_mm_align(), fd_vinyl_io_mm_footprint( FD_SNAPWM_IO_SPAD_MAX ) );
384 :
385 0 : ctx->full = 1;
386 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
387 0 : ctx->lthash_disabled = tile->snapwm.lthash_disabled;
388 :
389 0 : ctx->boot_timestamp = fd_log_wallclock();
390 :
391 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
392 :
393 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
394 0 : if( FD_UNLIKELY( tile->in_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 2", tile->in_cnt ));
395 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
396 :
397 0 : ulong out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapwm_ct", 0UL );
398 0 : if( out_link_ct_idx==ULONG_MAX ) out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapwm_lv", 0UL );
399 0 : if( FD_UNLIKELY( out_link_ct_idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapwm_ct` or `snapwm_lv`" ));
400 0 : fd_topo_link_t * snapwm_out_link = &topo->links[ tile->out_link_id[ out_link_ct_idx ] ];
401 0 : ctx->out_ct_idx = out_link_ct_idx;
402 :
403 0 : if( 0==strcmp( snapwm_out_link->name, "snapwm_lv" ) ) {
404 0 : ctx->hash_out = out1( topo, tile, "snapwm_lv" );
405 0 : FD_TEST( ctx->hash_out.mtu==FD_SNAPWM_DUP_META_BATCH_SZ );
406 0 : }
407 :
408 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
409 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ i ] ];
410 0 : if( 0==strcmp( in_link->name, "snapin_wm" ) ) {
411 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
412 0 : ctx->in.wksp = in_wksp->wksp;
413 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
414 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
415 0 : ctx->in.mtu = in_link->mtu;
416 0 : ctx->in.pos = 0UL;
417 0 : } else if( 0==strcmp( in_link->name, "snapin_txn" ) ) {
418 : /* snapwm needs all txn_cache data in order to verify the slot
419 : deltas with the slot history. To make this possible, snapin
420 : uses the dcache of the snapin_txn link as the scratch memory.
421 : The app field of the dcache is used to communicate the
422 : txncache_entries_len value. */
423 0 : fd_topo_wksp_t * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
424 0 : ulong chunk0 = fd_dcache_compact_chunk0( in_wksp->wksp, in_link->dcache );
425 0 : ctx->txncache_entries = fd_chunk_to_laddr( in_wksp->wksp, chunk0 );
426 0 : ctx->txncache_entries_len_ptr = (ulong const *)fd_dcache_app_laddr_const( in_link->dcache );
427 0 : } else {
428 0 : FD_LOG_ERR(( "tile `" NAME "` unrecognized in link %s", in_link->name ));
429 0 : }
430 0 : }
431 0 : FD_TEST( !!ctx->in.wksp );
432 0 : FD_TEST( !!ctx->txncache_entries );
433 :
434 0 : fd_snapwm_vinyl_unprivileged_init( ctx, topo, tile, _io_mm, _io_wd );
435 0 : }
436 :
437 : /* Control fragments can result in one extra publish to forward the
438 : message down the pipeline, in addition to the result / malformed
439 : message. It can send one duplicate account message as well.
440 : When fd_snapwm_vinyl_txn_commit is invoked, the latter will handle
441 : fseq checks internally, since the amount of messages it needs to
442 : send far exceed the STEM_BURST. */
443 0 : #define STEM_BURST 3UL
444 :
445 0 : #define STEM_LAZY 1000L
446 :
447 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapwm_tile_t
448 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapwm_tile_t)
449 :
450 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
451 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
452 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
453 :
454 : #include "../../disco/stem/fd_stem.c"
455 :
456 : fd_topo_run_tile_t fd_tile_snapwm = {
457 : .name = NAME,
458 : .populate_allowed_fds = populate_allowed_fds,
459 : .populate_allowed_seccomp = populate_allowed_seccomp,
460 : .scratch_align = scratch_align,
461 : .scratch_footprint = scratch_footprint,
462 : .privileged_init = privileged_init,
463 : .unprivileged_init = unprivileged_init,
464 : .run = stem_run,
465 : };
466 :
467 : #undef NAME
|