Line data Source code
1 : #define _GNU_SOURCE
2 : #include "../../disco/tiles.h"
3 : #include "generated/fd_replay_tile_seccomp.h"
4 :
5 : #include "fd_replay_notif.h"
6 :
7 : #include "../../disco/keyguard/fd_keyload.h"
8 : #include "../../util/pod/fd_pod_format.h"
9 : #include "../../flamenco/runtime/fd_txncache.h"
10 : #include "../../flamenco/runtime/context/fd_capture_ctx.h"
11 : #include "../../flamenco/runtime/context/fd_exec_slot_ctx.h"
12 : #include "../../flamenco/runtime/program/fd_bpf_program_util.h"
13 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
14 : #include "../../flamenco/runtime/fd_hashes.h"
15 : #include "../../flamenco/runtime/fd_runtime_init.h"
16 : #include "../../flamenco/snapshot/fd_snapshot.h"
17 : #include "../../flamenco/stakes/fd_stakes.h"
18 : #include "../../flamenco/runtime/fd_runtime.h"
19 : #include "../../flamenco/runtime/fd_runtime_public.h"
20 : #include "../../flamenco/rewards/fd_rewards.h"
21 : #include "../../disco/metrics/fd_metrics.h"
22 : #include "../../choreo/fd_choreo.h"
23 : #include "../../disco/plugin/fd_plugin.h"
24 : #include "fd_exec.h"
25 :
26 : #include <arpa/inet.h>
27 : #include <errno.h>
28 : #include <fcntl.h>
29 : #include <linux/unistd.h>
30 : #include <netdb.h>
31 : #include <netinet/in.h>
32 : #include <sys/random.h>
33 : #include <sys/socket.h>
34 : #include <sys/stat.h>
35 : #include <sys/types.h>
36 : #include <unistd.h>
37 :
38 : #define DEQUE_NAME fd_exec_slice
39 0 : #define DEQUE_T ulong
40 0 : #define DEQUE_MAX USHORT_MAX + 1
41 : #include "../../util/tmpl/fd_deque.c"
42 :
43 : /* An estimate of the max number of transactions in a block. If there are more
44 : transactions, they must be split into multiple sets. */
45 : #define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_BLK_MAX * FD_SHRED_MAX_SZ) / FD_TXN_MIN_SERIALIZED_SZ )
46 :
47 : #define PLUGIN_PUBLISH_TIME_NS ((long)60e9)
48 :
49 0 : #define REPAIR_IN_IDX (0UL)
50 0 : #define PACK_IN_IDX (1UL)
51 : #define SHRED_IN_IDX (3UL)
52 :
53 0 : #define EXEC_TXN_BUSY (0xA)
54 0 : #define EXEC_TXN_READY (0xB)
55 :
56 : #define BANK_HASH_CMP_LG_MAX (16UL)
57 :
/* fd_replay_out_link_t describes one outgoing link (mcache/dcache pair)
   from the replay tile to a downstream consumer.  One instance is kept
   per out link (notif, stake, plugin, exec, bank, ...). */
struct fd_replay_out_link {
  ulong idx;               /* index of this link among the tile's out links */

  fd_frag_meta_t * mcache; /* frag metadata ring */
  ulong *          sync;   /* sequence sync fseq */
  ulong            depth;  /* mcache depth (frags) */
  ulong            seq;    /* next sequence number to publish */

  fd_wksp_t * mem;         /* workspace backing the dcache */
  ulong       chunk0;      /* first dcache chunk */
  ulong       wmark;       /* last chunk that can hold a max-size frag */
  ulong       chunk;       /* next chunk to write */

};
typedef struct fd_replay_out_link fd_replay_out_link_t;
73 :
/* Counters exported by the replay tile via the metrics system. */
struct fd_replay_tile_metrics {
  ulong slot;            /* current replay slot */
  ulong last_voted_slot; /* most recent slot this validator voted on */
};
typedef struct fd_replay_tile_metrics fd_replay_tile_metrics_t;
#define FD_REPLAY_TILE_METRICS_FOOTPRINT ( sizeof( fd_replay_tile_metrics_t ) )
80 :
/* fd_replay_tile_ctx_t is the replay tile's private state.  It lives in
   the tile's scratch region (sized by scratch_footprint) and holds the
   joins to all shared objects (funk, blockstore, banks, spads, ...) as
   well as per-link publish state. */
struct fd_replay_tile_ctx {
  fd_wksp_t  * wksp;              /* the tile's own workspace */
  fd_wksp_t  * blockstore_wksp;
  fd_wksp_t  * status_cache_wksp;

  fd_wksp_t * runtime_public_wksp;
  fd_runtime_public_t * runtime_public;

  /* Store (repair) tile input */
  fd_wksp_t * repair_in_mem;
  ulong       repair_in_chunk0;
  ulong       repair_in_wmark;

  /* Pack tile input */
  fd_wksp_t * pack_in_mem;
  ulong       pack_in_chunk0;
  ulong       pack_in_wmark;

  /* Tower tile input */
  ulong       tower_in_idx;

  /* Notification output link */
  fd_replay_out_link_t notif_out[1];

  /* Stake weights output link */
  fd_replay_out_link_t stake_out[1];

  ulong       tower_out_idx;
  fd_wksp_t * tower_out_mem;
  ulong       tower_out_chunk0;
  ulong       tower_out_wmark;
  ulong       tower_out_chunk;

  /* Outputs to plugin/gui */
  fd_replay_out_link_t plugin_out[1];
  fd_replay_out_link_t votes_plugin_out[1];
  long                 last_plugin_push_time; /* wallclock of last periodic plugin push */

  /* Configuration strings/flags (owned elsewhere; read-only here) */
  char const * blockstore_checkpt; /* "" => no blockstore checkpt (see checkpt()) */
  int          tx_metadata_storage;
  char const * funk_checkpt;
  char const * genesis;
  char const * incremental;
  char const * snapshot;
  char const * snapshot_dir;
  int          incremental_src_type;
  int          snapshot_src_type;

  /* Do not modify order! This is join-order in unprivileged_init. */

  fd_funk_t    funk[1];
  fd_forks_t * forks;

  fd_pubkey_t validator_identity[1];
  fd_pubkey_t vote_authority[1];
  fd_pubkey_t vote_acc[1];

  /* Vote accounts in the current epoch. Lifetimes of the vote account
     addresses (pubkeys) are valid for the epoch (the pubkey memory is
     owned by the epoch bank). */

  fd_voter_t         * epoch_voters;  /* Map chain of slot->voter */
  fd_bank_hash_cmp_t * bank_hash_cmp; /* Maintains bank hashes seen from votes */

  /* Blockstore local join */

  fd_blockstore_t   blockstore_ljoin;
  int               blockstore_fd;    /* file descriptor for archival file */
  fd_blockstore_t * blockstore;

  /* Updated during execution */

  fd_exec_slot_ctx_t * slot_ctx;
  fd_slice_exec_t      slice_exec_ctx;

  /* TODO: Some of these arrays should be bitvecs that get masked into. */
  ulong                exec_cnt;
  fd_replay_out_link_t exec_out  [ FD_PACK_MAX_BANK_TILES ]; /* Sending to exec unexecuted txns */
  uchar                exec_ready[ FD_PACK_MAX_BANK_TILES ]; /* Is tile ready */
  uint                 prev_ids  [ FD_PACK_MAX_BANK_TILES ]; /* Previous txn id if any */
  ulong *              exec_fseq [ FD_PACK_MAX_BANK_TILES ]; /* fseq of the last executed txn */
  int                  block_finalizing;

  ulong   writer_cnt;
  ulong * writer_fseq[ FD_PACK_MAX_BANK_TILES ];

  /* Metadata updated during execution */

  ulong   curr_slot;
  ulong   parent_slot;
  ulong   snapshot_slot;
  ulong * turbine_slot0;
  ulong * turbine_slot;
  ulong   root; /* the root slot is the most recent slot to have reached
                   max lockout in the tower */
  ulong   flags;
  ulong   bank_idx;

  /* Other metadata */

  ulong              funk_seed;
  ulong              status_cache_seed;
  fd_capture_ctx_t * capture_ctx;
  FILE *             capture_file;
  FILE *             slots_replayed_file;

  ulong * bank_busy[ FD_PACK_MAX_BANK_TILES ];
  ulong   bank_cnt;
  fd_replay_out_link_t bank_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending to PoH finished txns + a couple more tasks ??? */


  ulong * published_wmark; /* publish watermark. The watermark is defined as the
                              minimum of the tower root (root above) and blockstore
                              smr (blockstore->smr). The watermark is used to
                              publish our fork-aware structures eg. blockstore,
                              forks, ghost. In general, publishing has the effect of
                              pruning minority forks in those structures,
                              indicating that it is ok to release the memory being
                              occupied by said forks.

                              The reason it has to be the minimum of the two, is the
                              tower root can lag the SMR and vice versa, but both
                              the fork-aware structures need to maintain information
                              through both of those slots. */

  ulong * poh; /* proof-of-history slot */
  uint    poh_init_done;
  int     snapshot_init_done;

  int tower_checkpt_fileno;

  int         vote;
  fd_pubkey_t validator_identity_pubkey[ 1 ];

  fd_txncache_t * status_cache;
  void *          bmtree[ FD_PACK_MAX_BANK_TILES ];

  /* The spad allocators used by the executor tiles are NOT the same as the
     spad used for general, longer-lasting spad allocations. The lifetime of
     the exec spad is just through an execution. The runtime spad is scoped
     to the runtime. The top-most frame will persist for the entire duration
     of the process. There will also be a potential second frame that persists
     across multiple slots that is created for rewards distribution. Every other
     spad frame should NOT exist beyond the scope of a block. */

  fd_spad_t * exec_spads[ FD_PACK_MAX_BANK_TILES ];
  fd_wksp_t * exec_spads_wksp[ FD_PACK_MAX_BANK_TILES ];
  ulong       exec_spad_cnt;

  fd_spad_t * runtime_spad;

  fd_funk_txn_t * false_root;

  int read_only; /* The read-only slot is the slot the validator needs
                    to replay through before it can proceed with any
                    write operations such as voting or building blocks.

                    This restriction is for safety reasons: the
                    validator could otherwise equivocate a previous vote
                    or block. */

  int blocked_on_mblock; /* Flag used for synchronizing on mblock boundaries. */

  /* Metrics */
  fd_replay_tile_metrics_t metrics;

  ulong * exec_slice_deque; /* Deque to buffer exec slices - lives in spad */

  ulong enable_bank_hash_cmp;

  fd_banks_t * banks;
  int          is_booted;
};
typedef struct fd_replay_tile_ctx fd_replay_tile_ctx_t;
255 :
/* scratch_align returns the alignment (in bytes) required for the
   replay tile's scratch region. */
FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 128UL;
}
260 :
/* loose_footprint returns the amount of additional (non-scratch)
   workspace, in bytes, that the replay tile requests from the topology. */
FD_FN_PURE static inline ulong
loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  return 24UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
}
265 :
266 : FD_FN_PURE static inline ulong
267 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
268 :
269 : /* Do not modify order! This is join-order in unprivileged_init. */
270 :
271 0 : ulong l = FD_LAYOUT_INIT;
272 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
273 0 : l = FD_LAYOUT_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
274 0 : l = FD_LAYOUT_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
275 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
276 0 : l = FD_LAYOUT_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
277 0 : }
278 0 : l = FD_LAYOUT_APPEND( l, 128UL, FD_SLICE_MAX );
279 0 : l = FD_LAYOUT_FINI ( l, scratch_align() );
280 0 : return l;
281 0 : }
282 :
283 : /* Receives from repair newly completed slices of executable slots on
284 : the frontier. Guaranteed good properties, like happiness, in order,
285 : executable immediately as long as the mcache wasn't overrun. */
286 : static int
287 : before_frag( fd_replay_tile_ctx_t * ctx,
288 : ulong in_idx,
289 : ulong seq,
290 0 : ulong sig ) {
291 0 : (void)seq;
292 :
293 0 : if( in_idx==REPAIR_IN_IDX ) {
294 0 : FD_LOG_DEBUG(( "rx slice from repair tile %lu %u", fd_disco_repair_replay_sig_slot( sig ), fd_disco_repair_replay_sig_data_cnt( sig ) ));
295 0 : fd_exec_slice_push_tail( ctx->exec_slice_deque, sig );
296 0 : return 1;
297 0 : }
298 0 : return 0;
299 0 : }
300 :
301 : /* Large number of helpers for after_credit begin here */
302 :
/* publish_stake_weights publishes two stake-weight messages on the
   stake out link (hard-coded out idx 0 — presumably stake_out's idx;
   TODO confirm): one for the current leader-schedule epoch (built from
   epoch-1 stakes) and one for the next epoch.  Message layout comes
   from generate_stake_weight_msg; the first four ulongs are epoch,
   stake_weight_cnt, start_slot, slot_cnt. */
static void
publish_stake_weights( fd_replay_tile_ctx_t * ctx,
                       fd_stem_context_t *    stem,
                       fd_exec_slot_ctx_t *   slot_ctx ) {
  fd_epoch_schedule_t const * epoch_schedule = fd_bank_epoch_schedule_query( slot_ctx->bank );

  /* Current-epoch stakes: held locked only while we read them. */
  fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( slot_ctx->bank );
  fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes );

  if( epoch_stakes_root!=NULL ) {
    ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem,
                                                   ctx->stake_out->chunk );
    ulong epoch = fd_slot_to_leader_schedule_epoch( epoch_schedule, slot_ctx->slot );
    /* epoch-1: the stakes that determined the current schedule. */
    ulong stake_weights_sz = generate_stake_weight_msg( slot_ctx, ctx->runtime_spad, epoch - 1, stake_weights_msg );
    ulong stake_weights_sig = 4UL;
    fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
    ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
    FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
  }

  fd_bank_epoch_stakes_end_locking_query( slot_ctx->bank );

  /* Next-epoch stakes: same pattern with the next_epoch_stakes bank
     field. */
  fd_vote_accounts_global_t const * next_epoch_stakes = fd_bank_next_epoch_stakes_locking_query( slot_ctx->bank );
  fd_vote_accounts_pair_global_t_mapnode_t * next_epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( next_epoch_stakes );

  if( next_epoch_stakes_root!=NULL ) {
    ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
    ulong epoch = fd_slot_to_leader_schedule_epoch( epoch_schedule,
                                                    slot_ctx->slot ); /* epoch */
    ulong stake_weights_sz = generate_stake_weight_msg( slot_ctx, ctx->runtime_spad, epoch, stake_weights_msg );
    ulong stake_weights_sig = 4UL;
    fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
    ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
    FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
  }
  fd_bank_next_epoch_stakes_end_locking_query( slot_ctx->bank );
}
340 :
/* snapshot_hash_tiles_cb fans snapshot account-hashing work out to the
   exec tiles and blocks until done.  Protocol:
     1. publish EXEC_SNAP_HASH_ACCS_CNT_SIG to every exec tile;
     2. busy-spin on each tile's fseq until it reports SNAP_CNT_DONE,
        using the reported pair count to size that tile's pair buffer;
     3. publish EXEC_SNAP_HASH_ACCS_GATHER_SIG with the gaddrs of each
        tile's output buffers (lthash accumulator, pair count, pairs);
     4. busy-spin until every tile reports SNAP_GATHER_DONE.
   Results land in *task_info (fn_arg_1).  All per-tile buffers are
   allocated from the runtime spad.  NOTE: both waits are pure spins on
   fseqs with no pause/backoff. */
static void
snapshot_hash_tiles_cb( void * para_arg_1,
                        void * para_arg_2,
                        void * fn_arg_1,
                        void * fn_arg_2 FD_PARAM_UNUSED,
                        void * fn_arg_3 FD_PARAM_UNUSED,
                        void * fn_arg_4 FD_PARAM_UNUSED ) {

  fd_replay_tile_ctx_t *    ctx       = (fd_replay_tile_ctx_t *)para_arg_1;
  fd_stem_context_t *       stem      = (fd_stem_context_t *)para_arg_2;
  fd_subrange_task_info_t * task_info = (fd_subrange_task_info_t *)fn_arg_1;

  /* One pair list and one lthash accumulator per exec tile. */
  ulong num_lists = ctx->exec_cnt;
  FD_LOG_NOTICE(( "launching %lu hash tasks", num_lists ));
  fd_pubkey_hash_pair_list_t * lists         = fd_spad_alloc( ctx->runtime_spad, alignof(fd_pubkey_hash_pair_list_t), num_lists * sizeof(fd_pubkey_hash_pair_list_t) );
  fd_lthash_value_t *          lthash_values = fd_spad_alloc( ctx->runtime_spad, FD_LTHASH_VALUE_ALIGN, num_lists * FD_LTHASH_VALUE_FOOTPRINT );
  for( ulong i = 0; i < num_lists; i++ ) {
    fd_lthash_zero( &lthash_values[i] );
  }

  task_info->num_lists     = num_lists;
  task_info->lists         = lists;
  task_info->lthash_values = lthash_values;

  /* Phase 1: ask every exec tile to count its pairs (zero-sz frag). */
  for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
    fd_stem_publish( stem, ctx->exec_out[i].idx, EXEC_SNAP_HASH_ACCS_CNT_SIG, ctx->exec_out[i].chunk, 0UL, 0UL, 0UL, 0UL );
    ctx->exec_out[i].chunk = fd_dcache_compact_next( ctx->exec_out[i].chunk, 0UL, ctx->exec_out[i].chunk0, ctx->exec_out[i].wmark );
  }

  /* Phase 2: wait for all counts; allocate a tile's pair buffer as
     soon as its count comes back. */
  uchar cnt_done[ FD_PACK_MAX_BANK_TILES ] = {0};
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( !cnt_done[ i ] ) {
        ulong res   = fd_fseq_query( ctx->exec_fseq[ i ] );
        uint  state = fd_exec_fseq_get_state( res );
        if( state==FD_EXEC_STATE_SNAP_CNT_DONE ) {
          FD_LOG_DEBUG(( "Acked hash cnt msg" ));
          cnt_done[ i ] = 1;
          task_info->lists[ i ].pairs = fd_spad_alloc( ctx->runtime_spad,
                                                       FD_PUBKEY_HASH_PAIR_ALIGN,
                                                       fd_exec_fseq_get_pairs_len( res ) * sizeof(fd_pubkey_hash_pair_t) );
        } else {
          wait_cnt++;
        }
      }
    }
    if( !wait_cnt ) {
      break;
    }
  }

  /* Phase 3: send each tile the gaddrs of its output buffers and ask
     it to gather. */
  for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {

    fd_replay_out_link_t * exec_out = &ctx->exec_out[ i ];

    fd_runtime_public_snap_hash_msg_t * gather_msg = (fd_runtime_public_snap_hash_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );

    gather_msg->lt_hash_value_out_gaddr = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, &lthash_values[i] );
    gather_msg->num_pairs_out_gaddr     = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, &task_info->lists[i].pairs_len );
    gather_msg->pairs_gaddr             = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, task_info->lists[i].pairs );

    fd_stem_publish( stem, ctx->exec_out[i].idx, EXEC_SNAP_HASH_ACCS_GATHER_SIG, ctx->exec_out[i].chunk, sizeof(fd_runtime_public_snap_hash_msg_t), 0UL, 0UL, 0UL );
    ctx->exec_out[i].chunk = fd_dcache_compact_next( ctx->exec_out[i].chunk, sizeof(fd_runtime_public_snap_hash_msg_t), ctx->exec_out[i].chunk0, ctx->exec_out[i].wmark );
  }


  /* Phase 4: wait for every tile to finish gathering (cnt_done reused
     as the per-tile completion flag). */
  memset( cnt_done, 0, sizeof(cnt_done) );
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( !cnt_done[ i ] ) {
        ulong res   = fd_fseq_query( ctx->exec_fseq[ i ] );
        uint  state = fd_exec_fseq_get_state( res );
        if( state==FD_EXEC_STATE_SNAP_GATHER_DONE ) {
          FD_LOG_DEBUG(( "Acked hash gather msg" ));
          cnt_done[ i ] = 1;
        } else {
          wait_cnt++;
        }
      }
    }
    if( !wait_cnt ) {
      break;
    }
  }
}
428 :
/* block_finalize_tiles_cb splits the block-finalization account-hash
   workload (task_data, fn_arg_1) into contiguous index ranges, sends
   one EXEC_HASH_ACCS_SIG message per exec tile, then busy-spins on the
   exec fseqs until every tile reports HASH_DONE for the current slot.
   Each worker's lthash accumulator lives in task_data->lthash_values. */
static void
block_finalize_tiles_cb( void * para_arg_1,
                         void * para_arg_2,
                         void * fn_arg_1,
                         void * fn_arg_2 FD_PARAM_UNUSED,
                         void * fn_arg_3 FD_PARAM_UNUSED,
                         void * fn_arg_4 FD_PARAM_UNUSED ) {

  fd_replay_tile_ctx_t *         ctx       = (fd_replay_tile_ctx_t *)para_arg_1;
  fd_stem_context_t *            stem      = (fd_stem_context_t *)para_arg_2;
  fd_accounts_hash_task_data_t * task_data = (fd_accounts_hash_task_data_t *)fn_arg_1;

  /* Work per worker; the exec_cnt-1 divisor looks intentional but is
     unexplained upstream (see the original "???"). */
  ulong cnt_per_worker;
  if( ctx->exec_cnt>1 ) cnt_per_worker = (task_data->info_sz / (ctx->exec_cnt-1UL)) + 1UL; /* ??? */
  else cnt_per_worker = task_data->info_sz;
  ulong task_infos_gaddr = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, task_data->info );

  uchar hash_done[ FD_PACK_MAX_BANK_TILES ] = {0};
  for( ulong worker_idx=0UL; worker_idx<ctx->exec_cnt; worker_idx++ ) {

    ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );

    ulong lt_hash_gaddr = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, &task_data->lthash_values[ worker_idx ] );
    if( FD_UNLIKELY( !lt_hash_gaddr ) ) {
      FD_LOG_ERR(( "lt_hash_gaddr is NULL" ));
      return; /* unreachable: FD_LOG_ERR aborts */
    }

    ulong start_idx = worker_idx * cnt_per_worker;
    if( start_idx >= task_data->info_sz ) {
      /* If there is no work for this worker to do, skip it. */
      hash_done[ worker_idx ] = 1;
      continue;
    }
    /* end_idx is inclusive; clamp it to the last task info. */
    ulong end_idx = fd_ulong_sat_sub( start_idx + cnt_per_worker, 1UL );
    if( end_idx >= task_data->info_sz ) {
      end_idx = fd_ulong_sat_sub( task_data->info_sz, 1UL );
    }

    fd_replay_out_link_t * exec_out = &ctx->exec_out[ worker_idx ];

    fd_runtime_public_hash_bank_msg_t * hash_msg = (fd_runtime_public_hash_bank_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
    generate_hash_bank_msg( task_infos_gaddr, lt_hash_gaddr, start_idx, end_idx, ctx->curr_slot, hash_msg );

    ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    fd_stem_publish( stem,
                     exec_out->idx,
                     EXEC_HASH_ACCS_SIG,
                     exec_out->chunk,
                     sizeof(fd_runtime_public_hash_bank_msg_t),
                     0UL,
                     tsorig,
                     tspub );
    exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_runtime_public_hash_bank_msg_t), exec_out->chunk0, exec_out->wmark );
  }

  /* Spins and blocks until all exec tiles are done hashing. */
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( !hash_done[ i ] ) {
        ulong res   = fd_fseq_query( ctx->exec_fseq[ i ] );
        uint  state = fd_exec_fseq_get_state( res );
        uint  slot  = fd_exec_fseq_get_slot( res );
        /* We need to compare the state and a unique identifier (slot)
           in the case where the last thing the exec tile did is to hash
           accounts. */
        if( state==FD_EXEC_STATE_HASH_DONE && slot==ctx->curr_slot ) {
          hash_done[ i ] = 1;
        } else {
          wait_cnt++;
        }
      }
    }
    if( !wait_cnt ) {
      break;
    }
  }


}
510 :
511 :
512 : FD_FN_UNUSED static void
513 0 : checkpt( fd_replay_tile_ctx_t * ctx ) {
514 0 : if( FD_UNLIKELY( ctx->slots_replayed_file ) ) fclose( ctx->slots_replayed_file );
515 0 : if( FD_UNLIKELY( strcmp( ctx->blockstore_checkpt, "" ) ) ) {
516 0 : int rc = fd_wksp_checkpt( ctx->blockstore_wksp, ctx->blockstore_checkpt, 0666, 0, NULL );
517 0 : if( rc ) {
518 0 : FD_LOG_ERR( ( "blockstore checkpt failed: error %d", rc ) );
519 0 : }
520 0 : }
521 0 : int rc = fd_wksp_checkpt( ctx->funk->wksp, ctx->funk_checkpt, 0666, 0, NULL );
522 0 : if( rc ) {
523 0 : FD_LOG_ERR( ( "funk checkpt failed: error %d", rc ) );
524 0 : }
525 0 : }
526 :
527 : static void FD_FN_UNUSED
528 0 : funk_cancel( fd_replay_tile_ctx_t * ctx, ulong mismatch_slot ) {
529 0 : fd_funk_txn_start_write( ctx->funk );
530 0 : fd_funk_txn_xid_t xid = { .ul = { mismatch_slot, mismatch_slot } };
531 0 : fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
532 0 : fd_funk_txn_t * mismatch_txn = fd_funk_txn_query( &xid, txn_map );
533 0 : FD_TEST( fd_funk_txn_cancel( ctx->funk, mismatch_txn, 1 ) );
534 0 : fd_funk_txn_end_write( ctx->funk );
535 0 : }
536 :
/* txncache_publish walks the funk transaction chain from to_root_txn
   up toward rooted_txn (exclusive), registering each slot on the way
   as rooted in the status cache.  rooted_txn may be NULL, in which
   case the walk continues to the true funk root.  No-op when no status
   cache is attached. */
static void
txncache_publish( fd_replay_tile_ctx_t * ctx,
                  fd_funk_txn_t *        to_root_txn,
                  fd_funk_txn_t *        rooted_txn ) {


  /* For the status cache, we stop rooting until the status cache has been
     written out to the current snapshot. We also need to iterate up the
     funk transaction tree up until the current "root" to figure out what slots
     should be registered. This root can correspond to the latest false root if
     one exists. */


  if( FD_UNLIKELY( !ctx->slot_ctx->status_cache ) ) {
    return;
  }

  fd_funk_txn_start_read( ctx->funk );

  fd_funk_txn_t *      txn      = to_root_txn;
  fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );
  while( txn!=rooted_txn ) {
    ulong slot = txn->xid.ul[0]; /* xid word 0 encodes the slot */

    FD_LOG_INFO(( "Registering slot %lu", slot ));
    fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, slot );

    txn = fd_funk_txn_parent( txn, txn_pool );
  }

  fd_funk_txn_end_read( ctx->funk );
}
569 :
/* funk_publish roots all funk transactions up to and including
   to_root_txn (whose xid corresponds to the watermark wmk).  When the
   legacy epoch_accounts_hash feature is active (and accounts_lt_hash
   is not) and wmk has reached the scheduled eah start slot, it also
   computes the epoch account hash over the freshly rooted state. */
static void
funk_publish( fd_replay_tile_ctx_t * ctx,
              fd_funk_txn_t *        to_root_txn,
              ulong                  wmk ) {

  fd_funk_txn_start_write( ctx->funk );
  FD_LOG_DEBUG(( "Publishing slot=%lu xid=%lu", wmk, to_root_txn->xid.ul[0] ));

  /* This is the standard case. Publish all transactions up to and
     including the watermark. This will publish any in-prep ancestors
     of root_txn as well. */
  if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, to_root_txn, 1 ) ) ) {
    FD_LOG_ERR(( "failed to funk publish slot %lu", wmk ));
  }
  fd_funk_txn_end_write( ctx->funk );

  /* Epoch account hash only applies pre accounts_lt_hash. */
  if( FD_LIKELY( FD_FEATURE_ACTIVE_BANK( ctx->slot_ctx->bank, epoch_accounts_hash ) &&
                 !FD_FEATURE_ACTIVE_BANK( ctx->slot_ctx->bank, accounts_lt_hash ) ) ) {

    if( wmk>=fd_bank_eah_start_slot_get( ctx->slot_ctx->bank ) ) {
      /* No parallel args: the hash runs in-line on this tile. */
      fd_exec_para_cb_ctx_t exec_para_ctx = {
        .func       = fd_accounts_hash_counter_and_gather_tpool_cb,
        .para_arg_1 = NULL,
        .para_arg_2 = NULL
      };

      fd_hash_t out_hash = {0};
      fd_accounts_hash( ctx->slot_ctx->funk,
                        ctx->slot_ctx->slot,
                        &out_hash,
                        ctx->runtime_spad,
                        fd_bank_features_query( ctx->slot_ctx->bank ),
                        &exec_para_ctx,
                        NULL );
      FD_LOG_NOTICE(( "Done computing epoch account hash (%s)", FD_BASE58_ENC_32_ALLOCA( &out_hash ) ));

      fd_bank_epoch_account_hash_set( ctx->slot_ctx->bank, out_hash );

      /* Disarm until the next eah start slot is scheduled. */
      fd_bank_eah_start_slot_set( ctx->slot_ctx->bank, FD_SLOT_NULL );
    }
  }

}
613 :
/* funk_and_txncache_publish advances rooted state to wmk: registers the
   newly rooted slots in the status cache, publishes the corresponding
   funk transactions, and (when capturing) takes a runtime checkpoint.
   xid must be the funk txn xid for slot wmk; a mismatch is a fatal
   invariant violation. */
static void
funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) {

  FD_LOG_DEBUG(( "Entering funk_and_txncache_publish for wmk=%lu", wmk ));

  if( xid->ul[0] != wmk ) {
    FD_LOG_CRIT(( "Invariant violation: xid->ul[0] != wmk %lu %lu", xid->ul[0], wmk ));
  }

  /* Handle updates to funk and the status cache. */

  fd_funk_txn_start_read( ctx->funk );
  fd_funk_txn_map_t * txn_map     = fd_funk_txn_map( ctx->funk );
  fd_funk_txn_t *     to_root_txn = fd_funk_txn_query( xid, txn_map );
  if( FD_UNLIKELY( !to_root_txn ) ) {
    FD_LOG_ERR(( "Unable to find funk transaction for xid %lu", xid->ul[0] ));
  }
  fd_funk_txn_t * rooted_txn = NULL; /* NULL => register every slot up to the true funk root */
  fd_funk_txn_end_read( ctx->funk );

  txncache_publish( ctx, to_root_txn, rooted_txn );

  funk_publish( ctx, to_root_txn, wmk );

  if( FD_UNLIKELY( ctx->capture_ctx ) ) {
    fd_runtime_checkpt( ctx->capture_ctx, ctx->slot_ctx, wmk );
  }

}
643 :
/* after_frag processes frags that passed before_frag.  Only tower
   frags are handled here (repair slices were already consumed in
   before_frag): the sig carries the new root slot, and the fork-aware
   structures (blockstore, forks, funk+txncache, banks) are published
   up to it before the published watermark fseq is advanced. */
static void
after_frag( fd_replay_tile_ctx_t * ctx,
            ulong                  in_idx,
            ulong                  seq FD_PARAM_UNUSED,
            ulong                  sig,
            ulong                  sz FD_PARAM_UNUSED,
            ulong                  tsorig FD_PARAM_UNUSED,
            ulong                  tspub FD_PARAM_UNUSED,
            fd_stem_context_t *    stem FD_PARAM_UNUSED ) {
  if( FD_LIKELY( in_idx==ctx->tower_in_idx ) ) {
    ulong root = sig; /* tower encodes the root slot in the sig */

    /* Ignore roots at or below what has already been published. */
    if( FD_LIKELY( root <= fd_fseq_query( ctx->published_wmark ) ) ) return;
    FD_LOG_NOTICE(( "advancing root %lu => %lu", fd_fseq_query( ctx->published_wmark ), root ));

    ctx->root = root;
    if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, root );
    if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, root );
    if( FD_LIKELY( ctx->funk ) ) { fd_funk_txn_xid_t xid = { .ul = { root, root } }; funk_and_txncache_publish( ctx, root, &xid ); }
    if( FD_LIKELY( ctx->banks ) ) fd_banks_publish( ctx->banks, root );

    fd_fseq_update( ctx->published_wmark, root );
  }
}
668 :
669 : static void
670 : replay_plugin_publish( fd_replay_tile_ctx_t * ctx,
671 : fd_stem_context_t * stem,
672 : ulong sig,
673 : uchar const * data,
674 0 : ulong data_sz ) {
675 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->plugin_out->mem, ctx->plugin_out->chunk );
676 0 : fd_memcpy( dst, data, data_sz );
677 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
678 0 : fd_stem_publish( stem, ctx->plugin_out->idx, sig, ctx->plugin_out->chunk, data_sz, 0UL, 0UL, tspub );
679 0 : ctx->plugin_out->chunk = fd_dcache_compact_next( ctx->plugin_out->chunk, data_sz, ctx->plugin_out->chunk0, ctx->plugin_out->wmark );
680 0 : }
681 :
/* publish_slot_notifications emits end-of-slot notifications: a
   fd_replay_notif_msg_t on the notif link (no-op when that link is not
   configured) and, if a plugin link exists, a FD_PLUGIN_MSG_SLOT_COMPLETED
   summary of per-slot transaction/fee counters. */
static void
publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
                            fd_stem_context_t *    stem,
                            ulong                  block_entry_block_height,
                            ulong                  curr_slot ) {
  if( FD_LIKELY( !ctx->notif_out->mcache ) ) return;

  long notify_time_ns = -fd_log_wallclock();
  /* NOTIFY_START points msg at the next dcache chunk; NOTIFY_END
     publishes it and advances seq/chunk. */
#define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out->mem, ctx->notif_out->chunk )
#define NOTIFY_END                                                      \
  fd_mcache_publish( ctx->notif_out->mcache, ctx->notif_out->depth, ctx->notif_out->seq, \
                     0UL, ctx->notif_out->chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
  ctx->notif_out->seq   = fd_seq_inc( ctx->notif_out->seq, 1UL );       \
  ctx->notif_out->chunk = fd_dcache_compact_next( ctx->notif_out->chunk, sizeof(fd_replay_notif_msg_t), \
                                                  ctx->notif_out->chunk0, ctx->notif_out->wmark ); \
  msg = NULL

  ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
  fd_replay_notif_msg_t * msg = NULL;

  {
    NOTIFY_START;
    msg->type                        = FD_REPLAY_SLOT_TYPE;
    msg->slot_exec.slot              = curr_slot;
    msg->slot_exec.parent            = ctx->parent_slot;
    msg->slot_exec.root              = fd_fseq_query( ctx->published_wmark );
    msg->slot_exec.height            = block_entry_block_height;
    msg->slot_exec.transaction_count = fd_bank_txn_count_get( ctx->slot_ctx->bank );
    msg->slot_exec.shred_cnt         = fd_bank_shred_cnt_get( ctx->slot_ctx->bank );

    msg->slot_exec.bank_hash = fd_bank_bank_hash_get( ctx->slot_ctx->bank );

    /* Block hash is the last entry of the bank's block hash queue. */
    fd_block_hash_queue_global_t const * block_hash_queue = fd_bank_block_hash_queue_query( ctx->slot_ctx->bank );
    fd_hash_t * last_hash = fd_block_hash_queue_last_hash_join( block_hash_queue );
    msg->slot_exec.block_hash = *last_hash;

    memcpy( &msg->slot_exec.identity, ctx->validator_identity_pubkey, sizeof( fd_pubkey_t ) );
    msg->slot_exec.ts = tsorig;
    NOTIFY_END;
  }
  /* Reset the per-slot shred count for the next slot. */
  fd_bank_shred_cnt_set( ctx->slot_ctx->bank, 0UL );

  FD_TEST( curr_slot == ctx->slot_ctx->bank->slot );

#undef NOTIFY_START
#undef NOTIFY_END
  notify_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(("TIMING: notify_slot_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));

  if( ctx->plugin_out->mem ) {
    /*
    fd_replay_complete_msg_t msg2 = {
      .slot = curr_slot,
      .total_txn_count = ctx->slot_ctx->txn_count,
      .nonvote_txn_count = ctx->slot_ctx->nonvote_txn_count,
      .failed_txn_count = ctx->slot_ctx->failed_txn_count,
      .nonvote_failed_txn_count = ctx->slot_ctx->nonvote_failed_txn_count,
      .compute_units = ctx->slot_ctx->total_compute_units_used,
      .transaction_fee = ctx->slot_ctx->slot_bank.collected_execution_fees,
      .priority_fee = ctx->slot_ctx->slot_bank.collected_priority_fees,
      .parent_slot = ctx->parent_slot,
    };
    */

    /* Flat ulong layout; must match the plugin-side decoder for
       FD_PLUGIN_MSG_SLOT_COMPLETED. */
    ulong msg[11];
    msg[ 0 ] = ctx->curr_slot;
    msg[ 1 ] = fd_bank_txn_count_get( ctx->slot_ctx->bank );
    msg[ 2 ] = fd_bank_nonvote_txn_count_get( ctx->slot_ctx->bank );
    msg[ 3 ] = fd_bank_failed_txn_count_get( ctx->slot_ctx->bank );
    msg[ 4 ] = fd_bank_nonvote_failed_txn_count_get( ctx->slot_ctx->bank );
    msg[ 5 ] = fd_bank_total_compute_units_used_get( ctx->slot_ctx->bank );
    msg[ 6 ] = fd_bank_execution_fees_get( ctx->slot_ctx->bank );
    msg[ 7 ] = fd_bank_priority_fees_get( ctx->slot_ctx->bank );
    msg[ 8 ] = 0UL; /* todo ... track tips */
    msg[ 9 ] = ctx->parent_slot;
    msg[ 10 ] = 0UL; /* todo ... max compute units */
    replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_COMPLETED, (uchar const *)msg, sizeof(msg) );
  }
}
761 :
762 : // static void
763 : // send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
764 : // if( FD_UNLIKELY( !ctx->vote ) ) return;
765 : // FD_LOG_NOTICE( ( "sending tower sync" ) );
766 : // ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower )->slot;
767 : // fd_hash_t vote_bank_hash[1] = { 0 };
768 : // fd_hash_t vote_block_hash[1] = { 0 };
769 :
770 : // /* guaranteed to be on frontier from caller check */
771 : // fd_fork_t const * fork = fd_forks_query_const( ctx->forks, vote_slot );
772 : // fd_memcpy( vote_bank_hash, &ctx->slot_ctx->slot_bank.banks_hash, sizeof(fd_hash_t) );
773 :
774 : // int err = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot, vote_block_hash );
775 : // if( err ) FD_LOG_ERR(( "invariant violation: missing block hash for tower vote" ));
776 :
777 : // /* Build a vote state update based on current tower votes. */
778 :
779 : // fd_txn_p_t * txn = (fd_txn_p_t *)fd_chunk_to_laddr( ctx->send_out->mem, ctx->send_out->chunk );
780 : // fd_tower_to_vote_txn( ctx->tower,
781 : // ctx->root,
782 : // vote_bank_hash,
783 : // vote_block_hash,
784 : // ctx->validator_identity,
785 : // ctx->vote_authority,
786 : // ctx->vote_acc,
787 : // txn,
788 : // ctx->runtime_spad );
789 :
790 : // /* TODO: Can use a smaller size, adjusted for payload length */
791 : // ulong msg_sz = sizeof( fd_txn_p_t );
792 : // ulong sig = vote_slot;
793 : // fd_mcache_publish( ctx->send_out->mcache,
794 : // ctx->send_out->depth,
795 : // ctx->send_out->seq,
796 : // sig,
797 : // ctx->send_out->chunk,
798 : // msg_sz,
799 : // 0UL,
800 : // 0,
801 : // 0 );
802 : // ctx->send_out->seq = fd_seq_inc( ctx->send_out->seq, 1UL );
803 : // ctx->send_out->chunk = fd_dcache_compact_next( ctx->send_out->chunk,
804 : // msg_sz,
805 : // ctx->send_out->chunk0,
806 : // ctx->send_out->wmark );
807 :
808 : // /* Dump the latest sent tower into the tower checkpoint file */
809 : // if( FD_LIKELY( ctx->tower_checkpt_fileno > 0 ) ) fd_restart_tower_checkpt( vote_bank_hash, ctx->tower, ctx->ghost, ctx->root, ctx->tower_checkpt_fileno );
810 : // }
811 :
/* prepare_new_block_execution sets up all per-slot state needed before
   the first batch of curr_slot can be replayed: it advances the fork on
   the frontier from ctx->parent_slot to curr_slot, clones a child bank
   from the parent bank, prepares a funk transaction keyed by the slot,
   runs the (single-threaded) epoch-boundary processing, readies the
   exec tiles, and pushes a spad frame for block-scoped allocations.

   Returns the frontier fork element now keyed by curr_slot.  Publishes
   stake weights via stem if the new block crosses an epoch boundary.
   flags selects the funk xid layout (see REPLAY_FLAG_PACKED_MICROBLOCK
   below).  Aborts the process (FD_LOG_ERR/FD_LOG_CRIT) on any
   invariant violation. */
static fd_fork_t *
prepare_new_block_execution( fd_replay_tile_ctx_t * ctx,
                             fd_stem_context_t *    stem,
                             ulong                  curr_slot,
                             ulong                  flags ) {

  long prepare_time_ns = -fd_log_wallclock();

  int is_new_epoch_in_new_block = 0;
  /* NOTE(review): fd_forks_prepare's return is not NULL-checked before
     &fork->slot is taken below — presumably it aborts internally on
     failure; confirm. */
  fd_fork_t * fork = fd_forks_prepare( ctx->forks,
                                       ctx->parent_slot,
                                       ctx->funk,
                                       ctx->blockstore,
                                       ctx->runtime_spad );

  /* Remove previous slot ctx from frontier.  The element is re-keyed
     to curr_slot and re-inserted below, so the parent's frontier slot
     is replaced by the child rather than allocating a new element. */
  fd_fork_t * child = fd_fork_frontier_ele_remove( ctx->forks->frontier, &fork->slot, NULL, ctx->forks->pool );
  child->slot = curr_slot;
  child->end_idx = UINT_MAX; // reset end_idx from whatever was previously executed on this fork

  /* Insert new slot onto fork frontier */
  if( FD_UNLIKELY( fd_fork_frontier_ele_query(
      ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool ) ) ) {
    FD_LOG_ERR( ( "invariant violation: child slot %lu was already in the frontier", curr_slot ) );
  }
  fd_fork_frontier_ele_insert( ctx->forks->frontier, child, ctx->forks->pool );
  fork->lock = 1;  /* mark the fork as actively executing */
  FD_TEST( fork == child );

  FD_LOG_NOTICE(( "new block execution - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
  ctx->slot_ctx->banks = ctx->banks;
  if( FD_UNLIKELY( !ctx->banks ) ) {
    FD_LOG_CRIT(( "invariant violation: banks is NULL" ));
  }

  /* Create the child bank for curr_slot as a copy-from-parent of the
     parent slot's bank. */
  ctx->slot_ctx->bank = fd_banks_clone_from_parent( ctx->banks, curr_slot, ctx->parent_slot );
  if( FD_UNLIKELY( !ctx->slot_ctx->bank ) ) {
    FD_LOG_CRIT(( "invariant violation: bank is NULL" ));
  }

  /* if it is an epoch boundary, push out stake weights */

  if( ctx->slot_ctx->slot != 0 ) {
    is_new_epoch_in_new_block = (int)fd_runtime_is_epoch_boundary( ctx->slot_ctx, ctx->slot_ctx->slot, fd_bank_prev_slot_get( ctx->slot_ctx->bank ) );
  }

  /* Update starting PoH hash for the new slot for tick verification later.
     This prepare/publish round-trip only validates that the block map
     entry for curr_slot exists and is consistent; nothing is written. */
  fd_block_map_query_t query[1] = { 0 };
  int err = fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * curr_block_info = fd_block_map_query_ele( query );
  if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ) FD_LOG_ERR(("Block map prepare failed, likely corrupt."));
  if( FD_UNLIKELY( curr_slot != curr_block_info->slot ) ) FD_LOG_ERR(("Block map prepare failed, likely corrupt."));
  fd_block_map_publish( query );

  /* Record the parent slot, then advance the slot ctx to curr_slot.
     Order matters: prev_slot must capture the pre-advance slot. */
  fd_bank_prev_slot_set( ctx->slot_ctx->bank, ctx->slot_ctx->slot );

  ctx->slot_ctx->slot = curr_slot;

  fd_bank_tick_height_set( ctx->slot_ctx->bank, fd_bank_max_tick_height_get( ctx->slot_ctx->bank ) );

  ulong * max_tick_height = fd_bank_max_tick_height_modify( ctx->slot_ctx->bank );
  ulong ticks_per_slot = fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
  if( FD_UNLIKELY( FD_RUNTIME_EXECUTE_SUCCESS != fd_runtime_compute_max_tick_height( ticks_per_slot,
                                                                                     curr_slot,
                                                                                     max_tick_height ) ) ) {
    FD_LOG_ERR(( "couldn't compute tick height/max tick height slot %lu ticks_per_slot %lu", curr_slot, ticks_per_slot ));
  }

  fd_bank_enable_exec_recording_set( ctx->slot_ctx->bank, ctx->tx_metadata_storage );

  ctx->slot_ctx->status_cache = ctx->status_cache;

  /* Build the funk transaction id for this slot.  For packed
     microblocks only ul[0] carries the slot (ul[1] stays zero);
     otherwise the slot is duplicated into both words. */
  fd_funk_txn_xid_t xid = { 0 };

  if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
    memset( xid.uc, 0, sizeof(fd_funk_txn_xid_t) );
  } else {
    xid.ul[1] = ctx->slot_ctx->slot;
  }
  xid.ul[0] = ctx->slot_ctx->slot;
  /* push a new transaction on the stack */
  fd_funk_txn_start_write( ctx->funk );

  fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
  if( FD_UNLIKELY( !txn_map->map ) ) {
    FD_LOG_ERR(( "Could not find valid funk transaction map" ));
  }
  /* Chain the new txn off the parent slot's funk txn (NULL parent_txn
     means it is prepared off the root). */
  fd_funk_txn_xid_t parent_xid = { .ul = { ctx->parent_slot, ctx->parent_slot } };
  fd_funk_txn_t * parent_txn = fd_funk_txn_query( &parent_xid, txn_map );

  ctx->slot_ctx->funk_txn = fd_funk_txn_prepare(ctx->funk, parent_txn, &xid, 1 );

  fd_funk_txn_end_write( ctx->funk );

  int is_epoch_boundary = 0;
  /* TODO: Currently all of the epoch boundary/rewards logic is not
     multhreaded at the epoch boundary. */
  fd_runtime_block_pre_execute_process_new_epoch( ctx->slot_ctx,
                                                  NULL,
                                                  ctx->exec_spads,
                                                  ctx->exec_spad_cnt,
                                                  ctx->runtime_spad,
                                                  &is_epoch_boundary );

  /* FIXME: This breaks the concurrency model where we can change forks
     in the middle of a block. */
  /* At this point we need to notify all of the exec tiles and tell them
     that a new slot is ready to be published. At this point, we should
     also mark the tile as not being ready. */
  for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
    ctx->exec_ready[ i ] = EXEC_TXN_READY;
  }

  /* We want to push on a spad frame before we start executing a block.
     Apart from allocations made at the epoch boundary, there should be no
     allocations that persist beyond the scope of a block. Before this point,
     there should only be 1 or 2 frames that are on the stack. The first frame
     will hold memory for the slot/epoch context. The potential second frame
     will only exist while rewards are being distributed (around the start of
     an epoch). We pop a frame when rewards are done being distributed. */
  fd_spad_push( ctx->runtime_spad );

  int res = fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->blockstore, ctx->runtime_spad );
  if( res != FD_RUNTIME_EXECUTE_SUCCESS ) {
    FD_LOG_ERR(( "block prep execute failed" ));
  }

  if( is_new_epoch_in_new_block ) {
    publish_stake_weights( ctx, stem, ctx->slot_ctx );
  }

  prepare_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(("TIMING: prepare_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)prepare_time_ns * 1e-6));

  return fork;
}
948 :
949 : static void
950 0 : init_poh( fd_replay_tile_ctx_t * ctx ) {
951 0 : FD_LOG_INFO(( "sending init msg" ));
952 :
953 0 : FD_LOG_WARNING(( "hashes_per_tick: %lu, ticks_per_slot: %lu",
954 0 : fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank ),
955 0 : fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank ) ));
956 :
957 0 : fd_replay_out_link_t * bank_out = &ctx->bank_out[ 0UL ];
958 0 : fd_poh_init_msg_t * msg = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk ); // FIXME: msg is NULL
959 0 : msg->hashcnt_per_tick = fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank );
960 0 : msg->ticks_per_slot = fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
961 0 : msg->tick_duration_ns = (ulong)(fd_bank_ns_per_slot_get( ctx->slot_ctx->bank )) / fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
962 :
963 0 : fd_block_hash_queue_global_t * bhq = (fd_block_hash_queue_global_t *)&ctx->slot_ctx->bank->block_hash_queue[0];
964 0 : fd_hash_t * last_hash = fd_block_hash_queue_last_hash_join( bhq );
965 0 : if( last_hash ) {
966 0 : memcpy(msg->last_entry_hash, last_hash, sizeof(fd_hash_t));
967 0 : } else {
968 0 : memset(msg->last_entry_hash, 0UL, sizeof(fd_hash_t));
969 0 : }
970 0 : msg->tick_height = ctx->slot_ctx->slot * msg->ticks_per_slot;
971 :
972 0 : ulong sig = fd_disco_replay_old_sig( ctx->slot_ctx->slot, REPLAY_FLAG_INIT );
973 0 : fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, sizeof(fd_poh_init_msg_t), 0UL, 0UL, 0UL );
974 0 : bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, sizeof(fd_poh_init_msg_t), bank_out->chunk0, bank_out->wmark );
975 0 : bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
976 0 : ctx->poh_init_done = 1;
977 0 : }
978 :
979 : static void
980 0 : prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
981 :
982 0 : ulong curr_slot = ctx->curr_slot;
983 0 : ulong flags = ctx->flags;
984 :
985 : /**********************************************************************/
986 : /* Prepare the fork in ctx->forks for replaying curr_slot */
987 : /**********************************************************************/
988 :
989 0 : fd_fork_t * parent_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->parent_slot, NULL, ctx->forks->pool );
990 0 : if( FD_UNLIKELY( parent_fork && parent_fork->lock ) ) {
991 : /* This is an edge case related to pack. The parent fork might
992 : already be in the frontier and currently executing (ie.
993 : fork->frozen = 0). */
994 0 : FD_LOG_ERR(( "parent slot is frozen in frontier. cannot execute. slot: %lu, parent_slot: %lu",
995 0 : curr_slot,
996 0 : ctx->parent_slot ));
997 0 : }
998 :
999 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool );
1000 0 : if( fork==NULL ) {
1001 0 : fork = prepare_new_block_execution( ctx, stem, curr_slot, flags );
1002 0 : } else {
1003 0 : FD_LOG_WARNING(("Fork for slot %lu already exists, so we don't make a new one. Restarting execution from batch %u", curr_slot, fork->end_idx ));
1004 0 : ctx->slot_ctx->bank = fd_banks_get_bank( ctx->banks, curr_slot );
1005 0 : if( FD_UNLIKELY( !ctx->slot_ctx->bank ) ) {
1006 0 : FD_LOG_CRIT(( "Unable to get bank for slot %lu", curr_slot ));
1007 0 : }
1008 :
1009 0 : fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
1010 0 : fd_funk_txn_xid_t xid = { .ul = { curr_slot, curr_slot } };
1011 0 : ctx->slot_ctx->funk_txn = fd_funk_txn_query( &xid, txn_map );
1012 0 : if( FD_UNLIKELY( !ctx->slot_ctx->funk_txn ) ) {
1013 0 : FD_LOG_CRIT(( "Unable to get funk transaction for slot %lu", curr_slot ));
1014 0 : }
1015 :
1016 0 : ctx->slot_ctx->slot = curr_slot;
1017 0 : }
1018 :
1019 : /**********************************************************************/
1020 : /* Get the solcap context for replaying curr_slot */
1021 : /**********************************************************************/
1022 :
1023 0 : if( ctx->capture_ctx ) {
1024 0 : fd_solcap_writer_set_slot( ctx->capture_ctx->capture, ctx->slot_ctx->slot );
1025 0 : }
1026 :
1027 0 : }
1028 :
/* exec_slice drives transaction dispatch for the currently-buffered
   slice of `slot`: it round-robins parsed transactions to idle exec
   tiles, synchronizing at microblock boundaries, and finalizes the
   block (PoH hash, blockstore flags) once the whole slot has been
   executed and all exec tiles have drained. */
static void
exec_slice( fd_replay_tile_ctx_t * ctx,
            fd_stem_context_t *    stem,
            ulong                  slot ) {

  /* Assumes that the slice exec ctx has buffered at least one slice.
     Then, for each microblock, round robin dispatch the transactions in
     that microblock to the exec tile. Once exec tile signifies with a
     retcode, we can continue dispatching transactions. Replay has to
     synchronize at the boundary of every microblock. After we dispatch
     one to each exec tile, we watermark where we are, and then continue
     on the following after_credit. If we still have txns to execute,
     start from wmark, pausing everytime we hit the microblock
     boundaries. */

  /* Collect the indices of exec tiles currently idle. */
  uchar to_exec[ FD_PACK_MAX_BANK_TILES ] = {0};
  uchar num_free_exec_tiles = 0UL;
  for( uchar i=0; i<ctx->exec_cnt; i++ ) {
    if( ctx->exec_ready[ i ]==EXEC_TXN_READY ) {
      to_exec[ num_free_exec_tiles++ ] = i;
    }
  }

  /* Microblock barrier: when blocked, wait until every exec tile has
     finished its in-flight txn before starting the next microblock. */
  if( ctx->blocked_on_mblock ) {
    if( num_free_exec_tiles==ctx->exec_cnt ) {
      ctx->blocked_on_mblock = 0;
    } else {
      return;
    }
  }

  uchar start_num_free_exec_tiles = (uchar)ctx->exec_cnt;
  while( num_free_exec_tiles>0 ) {

    if( fd_slice_exec_txn_ready( &ctx->slice_exec_ctx ) ) {
      ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );

      /* Take the next idle exec tile off the back of the list. */
      uchar exec_idx = to_exec[ num_free_exec_tiles-1 ];
      fd_replay_out_link_t * exec_out = &ctx->exec_out[ exec_idx ];
      num_free_exec_tiles--;

      fd_txn_p_t txn_p;
      fd_slice_exec_txn_parse( &ctx->slice_exec_ctx, &txn_p );

      /* Insert or reverify invoked programs for this epoch, if needed
         FIXME: this should be done during txn parsing so that we don't have to loop
         over all accounts a second time. */
      fd_runtime_update_program_cache( ctx->slot_ctx, &txn_p, ctx->runtime_spad );

      fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
                                                     &slot,
                                                     NULL,
                                                     ctx->forks->pool );

      if( FD_UNLIKELY( !fork ) ) FD_LOG_ERR(( "Unable to select a fork" ));

      fd_bank_txn_count_set( ctx->slot_ctx->bank, fd_bank_txn_count_get( ctx->slot_ctx->bank ) + 1 );

      /* dispatch dcache: copy the txn into the exec tile's out link and
         publish, marking the tile busy until it reports back. */
      fd_runtime_public_txn_msg_t * exec_msg = (fd_runtime_public_txn_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
      memcpy( &exec_msg->txn, &txn_p, sizeof(fd_txn_p_t) );
      exec_msg->slot = ctx->slot_ctx->slot;

      ctx->exec_ready[ exec_idx ] = EXEC_TXN_BUSY;
      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      fd_stem_publish( stem, exec_out->idx, EXEC_NEW_TXN_SIG, exec_out->chunk, sizeof(fd_runtime_public_txn_msg_t), 0UL, tsorig, tspub );
      exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_runtime_public_txn_msg_t), exec_out->chunk0, exec_out->wmark );

      continue;
    }

    /* If the current microblock is complete, and we still have mblks
       to read, then advance to the next microblock */

    if( fd_slice_exec_microblock_ready( &ctx->slice_exec_ctx ) ) {
      ctx->blocked_on_mblock = 1;
      fd_slice_exec_microblock_parse( &ctx->slice_exec_ctx );
    }

    /* Under this condition, we have finished executing all the
       microblocks in the slice, and are ready to load another slice.
       However, if just completed the last batch in the slot, we want
       to be sure to finalize block execution (below). */

    if( fd_slice_exec_slice_ready( &ctx->slice_exec_ctx )
        && !ctx->slice_exec_ctx.last_batch ){
      ctx->flags = EXEC_FLAG_READY_NEW;
    }
    break; /* block on microblock / batch */
  }

  if( fd_slice_exec_slot_complete( &ctx->slice_exec_ctx ) ) {

    /* Don't finalize until every txn dispatched this call has an idle
       exec tile again, i.e. nothing is still in flight. */
    if( num_free_exec_tiles != start_num_free_exec_tiles ) {
      FD_LOG_DEBUG(( "blocked on exec tiles completing" ));
      return;
    }

    FD_LOG_DEBUG(( "[%s] BLOCK EXECUTION COMPLETE", __func__ ));

    /* At this point, the entire block has been executed. */
    fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
                                                   &slot,
                                                   NULL,
                                                   ctx->forks->pool );
    if( FD_UNLIKELY( !fork ) ) {
      FD_LOG_ERR(( "Unable to select a fork" ));
    }

    /* Final PoH hash of the block lives in the last microblock header. */
    fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->slice_exec_ctx.buf + ctx->slice_exec_ctx.last_mblk_off );

    // Copy block hash to slot_bank poh for updating the sysvars
    fd_block_map_query_t query[1] = { 0 };
    fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    fd_hash_t * poh = fd_bank_poh_modify( ctx->slot_ctx->bank );
    memcpy( poh, hdr->hash, sizeof(fd_hash_t) );

    /* Flip PROCESSED on before clearing REPLAYING; the fence orders the
       two flag updates for concurrent readers of block_info. */
    block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_PROCESSED );
    FD_COMPILER_MFENCE();
    block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
    memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) );
    fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
    memcpy( &block_info->bank_hash, bank_hash, sizeof(fd_hash_t) );

    fd_block_map_publish( query );
    ctx->flags = EXEC_FLAG_FINISHED_SLOT;

    fd_slice_exec_reset( &ctx->slice_exec_ctx ); /* Reset ctx for next slot */
  }

}
1162 :
1163 : /* handle_slice polls for a new slice off the slices stored in the
1164 : deque, and prepares it for execution, including call
1165 : prepare_first_batch_execution if this is the first slice of a new
1166 : slot. Also queries blockstore for the corresponding shreds and stores
1167 : them into the slice_exec_ctx. Assumes that replay is ready for a new
1168 : slice (i.e., finished executing the previous slice). */
1169 : static void
1170 : handle_slice( fd_replay_tile_ctx_t * ctx,
1171 0 : fd_stem_context_t * stem ) {
1172 :
1173 0 : if( fd_exec_slice_cnt( ctx->exec_slice_deque )==0UL ) {
1174 0 : FD_LOG_DEBUG(( "No slices to execute" ));
1175 0 : return;
1176 0 : }
1177 :
1178 0 : ulong sig = fd_exec_slice_pop_head( ctx->exec_slice_deque );
1179 :
1180 0 : if( FD_UNLIKELY( ctx->flags!=EXEC_FLAG_READY_NEW ) ) {
1181 0 : FD_LOG_ERR(( "Replay is in unexpected state" ));
1182 0 : }
1183 :
1184 0 : ulong slot = fd_disco_repair_replay_sig_slot( sig );
1185 0 : ushort parent_off = fd_disco_repair_replay_sig_parent_off( sig );
1186 0 : uint data_cnt = fd_disco_repair_replay_sig_data_cnt( sig );
1187 0 : int slot_complete = fd_disco_repair_replay_sig_slot_complete( sig );
1188 0 : ulong parent_slot = slot - parent_off;
1189 :
1190 0 : if( FD_UNLIKELY( slot < fd_fseq_query( ctx->published_wmark ) ) ) {
1191 0 : FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our watermark %lu.", slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ));
1192 0 : return;
1193 0 : }
1194 :
1195 0 : if( FD_UNLIKELY( parent_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
1196 0 : FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our watermark %lu.", slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ) );
1197 0 : return;
1198 0 : }
1199 :
1200 0 : if( FD_UNLIKELY( !fd_blockstore_block_info_test( ctx->blockstore, parent_slot ) ) ) {
1201 0 : FD_LOG_WARNING(( "unable to find slot %lu's parent slot %lu block_info", slot, parent_slot ));
1202 0 : return;
1203 0 : }
1204 :
1205 0 : if( FD_UNLIKELY( slot != ctx->curr_slot ) ) {
1206 :
1207 : /* We need to switch forks and execution contexts. Either we
1208 : completed execution of the previous slot and are now executing
1209 : a new slot or we are interleaving batches from different slots
1210 : - all executable at the fork frontier.
1211 :
1212 : Going to need to query the frontier for the fork, or create it
1213 : if its not on the frontier. I think
1214 : prepare_first_batch_execution already handles this logic. */
1215 :
1216 0 : ctx->curr_slot = slot;
1217 0 : ctx->parent_slot = parent_slot;
1218 0 : prepare_first_batch_execution( ctx, stem );
1219 :
1220 0 : ulong turbine_slot = fd_fseq_query( ctx->turbine_slot );
1221 :
1222 0 : FD_LOG_NOTICE( ( "\n\n[Replay]\n"
1223 0 : "slot: %lu\n"
1224 0 : "current turbine: %lu\n"
1225 0 : "slots behind: %lu\n"
1226 0 : "live: %d\n",
1227 0 : slot,
1228 0 : turbine_slot,
1229 0 : turbine_slot - slot,
1230 0 : ( turbine_slot - slot ) < 5 ) );
1231 0 : } else {
1232 : /* continuing execution of the slot we have been doing */
1233 0 : }
1234 :
1235 : /* Prepare batch for execution on following after_credit iteration */
1236 0 : ctx->flags = EXEC_FLAG_EXECUTING_SLICE;
1237 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
1238 0 : ulong slice_sz;
1239 0 : uint start_idx = fork->end_idx + 1;
1240 0 : int err = fd_blockstore_slice_query( ctx->blockstore,
1241 0 : slot,
1242 0 : start_idx,
1243 0 : start_idx + data_cnt - 1,
1244 0 : FD_SLICE_MAX,
1245 0 : ctx->slice_exec_ctx.buf,
1246 0 : &slice_sz );
1247 0 : fork->end_idx += data_cnt;
1248 0 : fd_slice_exec_begin( &ctx->slice_exec_ctx, slice_sz, slot_complete );
1249 0 : fd_bank_shred_cnt_set( ctx->slot_ctx->bank, fd_bank_shred_cnt_get( ctx->slot_ctx->bank ) + data_cnt );
1250 :
1251 0 : if( FD_UNLIKELY( err ) ) {
1252 0 : FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
1253 0 : }
1254 0 : }
1255 :
1256 : static void
1257 0 : kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
1258 0 : ctx->slot_ctx->slot = ctx->curr_slot;
1259 0 : fd_blockstore_init( ctx->blockstore,
1260 0 : ctx->blockstore_fd,
1261 0 : FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
1262 0 : ctx->slot_ctx->slot );
1263 :
1264 0 : fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot );
1265 0 : publish_stake_weights( ctx, stem, ctx->slot_ctx );
1266 :
1267 0 : }
1268 :
/* read_snapshot loads the validator's starting state into funk and the
   slot ctx.  Three sources are supported: a pre-loaded funk workspace
   ("funk"/"wksp:" prefix), a full snapshot file/URL, and an optional
   incremental snapshot on top of the full one.  When an incremental
   snapshot is present its manifest is prefetched first so repair can
   be kicked off (blockstore init + stake weights) before the bulk of
   account loading finishes.  Finishes by recomputing the leader
   schedule for the restored slot.

   _ctx is the fd_replay_tile_ctx_t (void* to match the caller's
   callback shape).  snapshot/incremental are source strings (empty
   incremental means "none"); snapshot_dir is the local directory for
   the full snapshot. */
static void
read_snapshot( void * _ctx,
               fd_stem_context_t * stem,
               char const * snapshot,
               char const * incremental,
               char const * snapshot_dir ) {
  fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;

  /* Callback ctx so snapshot hashing can be parallelized across tiles. */
  fd_exec_para_cb_ctx_t exec_para_ctx_snap = {
    .func       = snapshot_hash_tiles_cb,
    .para_arg_1 = ctx,
    .para_arg_2 = stem,
  };

  /* Pass the slot_ctx to snapshot_load or recover_banks */
  /* Base slot is the slot we will compare against the base slot of the incremental snapshot, to ensure that the
     base slot of the incremental snapshot is the slot of the full snapshot.

     We pull this out of the full snapshot to use when verifying the incremental snapshot. */
  ulong base_slot = 0UL;
  if( strcmp( snapshot, "funk" )==0 || strncmp( snapshot, "wksp:", 5 )==0 ) {
    /* Funk already has a snapshot loaded */
    base_slot = ctx->slot_ctx->slot;
    kickoff_repair_orphans( ctx, stem );
  } else {

    /* If we have an incremental snapshot try to prefetch the snapshot slot
       and manifest as soon as possible. In order to kick off repair effectively
       we need the snapshot slot and the stake weights. These are both available
       in the manifest. We will try to load in the manifest from the latest
       snapshot that is availble, then setup the blockstore and publish the
       stake weights. After this, repair will kick off concurrently with loading
       the rest of the snapshots. */

    /* TODO: Verify account hashes for all 3 snapshot loads. */
    /* TODO: If prefetching the manifest is enabled it leads to
       incorrect snapshot loads. This needs to be looked into. */
    if( strlen( incremental )>0UL ) {

      uchar * tmp_mem = fd_spad_alloc_check( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );

      /* Temporary load ctx used only to prefetch the incremental
         snapshot's manifest; the accounts themselves are loaded later
         via fd_snapshot_load_all below. */
      fd_snapshot_load_ctx_t * tmp_snap_ctx = fd_snapshot_load_new( tmp_mem,
                                                                    incremental,
                                                                    ctx->incremental_src_type,
                                                                    NULL,
                                                                    ctx->slot_ctx,
                                                                    false,
                                                                    false,
                                                                    FD_SNAPSHOT_TYPE_INCREMENTAL,
                                                                    ctx->exec_spads,
                                                                    ctx->exec_spad_cnt,
                                                                    ctx->runtime_spad,
                                                                    &exec_para_ctx_snap );
      /* Load the prefetch manifest, and initialize the status cache and slot context,
         so that we can use these to kick off repair. */
      fd_snapshot_load_prefetch_manifest( tmp_snap_ctx );
      ctx->curr_slot = ctx->slot_ctx->slot;
      kickoff_repair_orphans( ctx, stem );

    }

    FD_LOG_WARNING(("Loading full snapshot"));
    uchar * mem = fd_spad_alloc( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
    fd_snapshot_load_ctx_t * snap_ctx = fd_snapshot_load_new( mem,
                                                              snapshot,
                                                              ctx->snapshot_src_type,
                                                              snapshot_dir,
                                                              ctx->slot_ctx,
                                                              false,
                                                              false,
                                                              FD_SNAPSHOT_TYPE_FULL,
                                                              ctx->exec_spads,
                                                              ctx->exec_spad_cnt,
                                                              ctx->runtime_spad,
                                                              &exec_para_ctx_snap );

    fd_snapshot_load_init( snap_ctx );

    /* If we don't have an incremental snapshot, load the manifest and the status cache and initialize
       the objects because we don't have these from the incremental snapshot. */
    if( strlen( incremental )<=0UL ) {
      fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL,
          FD_SNAPSHOT_RESTORE_MANIFEST | FD_SNAPSHOT_RESTORE_STATUS_CACHE );
      ctx->curr_slot = ctx->slot_ctx->slot;
      kickoff_repair_orphans( ctx, stem );
      /* If we don't have an incremental snapshot, we can still kick off
         sending the stake weights and snapshot slot to repair. */
    } else {
      /* If we have an incremental snapshot, load the manifest and the status cache,
         and don't initialize the objects because we did this above from the incremental snapshot. */
      fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL, FD_SNAPSHOT_RESTORE_NONE );
    }
    base_slot = fd_snapshot_get_slot( snap_ctx );
    fd_snapshot_load_accounts( snap_ctx );
    fd_snapshot_load_fini( snap_ctx );
  }

  if( strlen( incremental ) > 0 && strcmp( snapshot, "funk" ) != 0 ) {

    /* The slot of the full snapshot should be used as the base slot to verify the incremental snapshot,
       not the slot context's slot - which is the slot of the incremental, not the full snapshot. */
    fd_snapshot_load_all( incremental,
                          ctx->incremental_src_type,
                          NULL,
                          ctx->slot_ctx,
                          &base_slot,
                          NULL,
                          false,
                          false,
                          FD_SNAPSHOT_TYPE_INCREMENTAL,
                          ctx->exec_spads,
                          ctx->exec_spad_cnt,
                          ctx->runtime_spad );
    ctx->curr_slot = ctx->slot_ctx->slot;
    kickoff_repair_orphans( ctx, stem );
  }

  /* Recompute the leader schedule for the slot we restored to. */
  fd_runtime_update_leaders( ctx->slot_ctx->bank,
                             ctx->slot_ctx->slot,
                             ctx->runtime_spad );
}
1390 :
/* init_after_snapshot finishes replay initialization once snapshot
   loading is done: recalculates partitioned rewards, performs
   genesis-only bootstrap when there is no snapshot (slot 0), seeds the
   fork frontier at the snapshot slot, sends the epoch's staked vote
   accounts to the tower tile, accumulates total stake for bank hash
   comparison, and advances the published watermark if the snapshot
   moved it forward.  Order of these steps is load-bearing — see the
   "Do not modify order!" note below. */
static void
init_after_snapshot( fd_replay_tile_ctx_t * ctx,
                     fd_stem_context_t *    stem ) {
  /* Do not modify order! */

  /* After both snapshots have been loaded in, we can determine if we should
     start distributing rewards. */

  fd_rewards_recalculate_partitioned_rewards( ctx->slot_ctx,
                                              NULL,
                                              ctx->exec_spads,
                                              ctx->exec_spad_cnt,
                                              ctx->runtime_spad );

  ulong snapshot_slot = ctx->slot_ctx->slot;
  if( FD_UNLIKELY( !snapshot_slot ) ) {
    /* Genesis-specific setup. */
    /* FIXME: This branch does not set up a new block exec ctx
       properly. Needs to do whatever prepare_new_block_execution
       does, but just hacking that in breaks stuff. */
    fd_runtime_update_leaders( ctx->slot_ctx->bank,
                               ctx->slot_ctx->slot,
                               ctx->runtime_spad );

    fd_bank_prev_slot_set( ctx->slot_ctx->bank, 0UL );
    ctx->slot_ctx->slot  = 1UL;

    /* Spin PoH forward one full slot from the genesis hash. */
    ulong hashcnt_per_slot = fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank ) * fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
    fd_hash_t * poh = fd_bank_poh_modify( ctx->slot_ctx->bank );
    while(hashcnt_per_slot--) {
      fd_sha256_hash( poh->hash, 32UL, poh->hash );
    }

    /* Execute and finalize an empty slot 1 so downstream state is
       consistent with having replayed a block. */
    FD_TEST( fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->blockstore, ctx->runtime_spad ) == 0 );
    fd_runtime_block_info_t info = { .signature_cnt = 0 };

    fd_exec_para_cb_ctx_t exec_para_ctx_block_finalize = {
      .func       = block_finalize_tiles_cb,
      .para_arg_1 = ctx,
      .para_arg_2 = stem,
    };

    fd_runtime_block_execute_finalize_para( ctx->slot_ctx,
                                            ctx->capture_ctx,
                                            &info,
                                            ctx->exec_cnt,
                                            ctx->runtime_spad,
                                            &exec_para_ctx_block_finalize );

    ctx->slot_ctx->slot  = 1UL;
    snapshot_slot        = 1UL;

    /* Now setup exec tiles for execution */
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      ctx->exec_ready[ i ] = EXEC_TXN_READY;
    }
  }

  /* Replay resumes from the snapshot slot (or slot 1 after genesis). */
  ctx->curr_slot     = snapshot_slot;
  ctx->parent_slot   = fd_bank_prev_slot_get( ctx->slot_ctx->bank );
  ctx->snapshot_slot = snapshot_slot;
  ctx->flags         = EXEC_FLAG_READY_NEW;

  /* Initialize consensus structures post-snapshot */

  fd_fork_t * snapshot_fork = fd_forks_init( ctx->forks, ctx->slot_ctx->slot );
  if( FD_UNLIKELY( !snapshot_fork ) ) {
    FD_LOG_CRIT(( "Failed to initialize snapshot fork" ));
  }

  /* Walk the epoch stakes' vote accounts (locked query; released after
     the total-stake accumulation below). */
  fd_stakes_global_t const *        stakes        = fd_bank_stakes_locking_query( ctx->slot_ctx->bank );
  fd_vote_accounts_global_t const * vote_accounts = &stakes->vote_accounts;

  fd_vote_accounts_pair_global_t_mapnode_t * vote_accounts_pool = fd_vote_accounts_vote_accounts_pool_join( vote_accounts );
  fd_vote_accounts_pair_global_t_mapnode_t * vote_accounts_root = fd_vote_accounts_vote_accounts_root_join( vote_accounts );

  /* Send to tower tile */

  if( FD_LIKELY( ctx->tower_out_idx!=ULONG_MAX ) ) {
    uchar * chunk_laddr = fd_chunk_to_laddr( ctx->tower_out_mem, ctx->tower_out_chunk );
    ulong off = 0;
    /* Serialize (pubkey, stake) pairs for every vote account with
       nonzero stake into the tower out link. */
    for( fd_vote_accounts_pair_global_t_mapnode_t * curr = fd_vote_accounts_pair_global_t_map_minimum( vote_accounts_pool, vote_accounts_root );
         curr;
         curr = fd_vote_accounts_pair_global_t_map_successor( vote_accounts_pool, curr ) ) {

      if( FD_UNLIKELY( curr->elem.stake > 0UL ) ) {
        memcpy( chunk_laddr + off, &curr->elem.key, sizeof(fd_pubkey_t) );
        off += sizeof(fd_pubkey_t);

        memcpy( chunk_laddr + off, &curr->elem.stake, sizeof(ulong) );
        off += sizeof(ulong);
      }
    }
    /* Sig packs the snapshot slot in the high 32 bits. */
    fd_stem_publish( stem, ctx->tower_out_idx, snapshot_slot << 32UL | UINT_MAX, ctx->tower_out_chunk, off, 0UL, (ulong)fd_log_wallclock(), (ulong)fd_log_wallclock() );
  }

  /* Accumulate total stake for bank hash comparison thresholds. */
  fd_bank_hash_cmp_t * bank_hash_cmp = ctx->bank_hash_cmp;
  for( fd_vote_accounts_pair_global_t_mapnode_t * curr = fd_vote_accounts_pair_global_t_map_minimum( vote_accounts_pool, vote_accounts_root );
       curr;
       curr = fd_vote_accounts_pair_global_t_map_successor( vote_accounts_pool, curr ) ) {
    bank_hash_cmp->total_stake += curr->elem.stake;
  }
  bank_hash_cmp->watermark = snapshot_slot;

  fd_bank_stakes_end_locking_query( ctx->slot_ctx->bank );

  ulong root = snapshot_slot;
  if( FD_LIKELY( root > fd_fseq_query( ctx->published_wmark ) ) ) {

    /* The watermark has advanced likely because we loaded an
       incremental snapshot that was downloaded just-in-time. We had
       kicked off repair with an older incremental snapshot, and so now
       we have to prune the relevant data structures, so replay can
       start from the latest frontier.

       No funk_and_txncache_publish( ctx, wmark, &xid ); because there
       are no funk txns to publish, and all rooted slots have already
       been registered in the txncache when we loaded the snapshot. */

    if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, root );
    if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, root );

    fd_fseq_update( ctx->published_wmark, root );
  }

  FD_LOG_NOTICE(( "snapshot slot %lu", snapshot_slot ));
}
1518 :
/* init_snapshot bootstraps the replay tile's runtime state: it carves
   the slot context out of the runtime spad, loads the snapshot (if one
   is configured), reads genesis, initializes the blockstore, publishes
   plugin progress / genesis-hash notifications, and finally emits the
   initial slot notification.  Called once from after_credit on the
   first iteration (guarded by ctx->snapshot_init_done). */
void
init_snapshot( fd_replay_tile_ctx_t * ctx,
               fd_stem_context_t * stem ) {
  /* Init slot_ctx */

  uchar * slot_ctx_mem = fd_spad_alloc_check( ctx->runtime_spad, FD_EXEC_SLOT_CTX_ALIGN, FD_EXEC_SLOT_CTX_FOOTPRINT );
  ctx->slot_ctx = fd_exec_slot_ctx_join( fd_exec_slot_ctx_new( slot_ctx_mem ) );
  ctx->slot_ctx->banks = ctx->banks;
  /* Bank 0 is the initial bank created in unprivileged_init. */
  ctx->slot_ctx->bank = fd_banks_get_bank( ctx->banks, 0UL );

  ctx->slot_ctx->funk = ctx->funk;
  ctx->slot_ctx->status_cache = ctx->status_cache;
  fd_runtime_update_slots_per_epoch( ctx->slot_ctx->bank, FD_DEFAULT_SLOTS_PER_EPOCH );

  /* A non-empty snapshot path means we boot from a snapshot rather
     than from genesis alone. */
  uchar is_snapshot = strlen( ctx->snapshot ) > 0;
  if( is_snapshot ) {
    read_snapshot( ctx, stem, ctx->snapshot, ctx->incremental, ctx->snapshot_dir );
  }

  if( ctx->plugin_out->mem ) {
    uchar msg[56];
    fd_memset( msg, 0, sizeof(msg) );
    msg[ 0 ] = 6; /* presumably a ValidatorStartProgress stage discriminant
                     (0 is used elsewhere for Initializing) — TODO confirm
                     against the plugin protocol */
    replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
  }

  fd_runtime_read_genesis( ctx->slot_ctx,
                           ctx->genesis,
                           is_snapshot,
                           ctx->capture_ctx,
                           ctx->runtime_spad );
  /* We call this after fd_runtime_read_genesis, which sets up the
     slot_bank needed in blockstore_init. */
  /* FIXME: We should really only call this once. */
  fd_blockstore_init( ctx->blockstore,
                      ctx->blockstore_fd,
                      FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
                      ctx->curr_slot );
  init_after_snapshot( ctx, stem );

  if( ctx->plugin_out->mem && strlen( ctx->genesis ) > 0 ) {
    replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_GENESIS_HASH_KNOWN, fd_bank_genesis_hash_query( ctx->slot_ctx->bank )->hash, sizeof(fd_hash_t) );
  }

  // Tell the world about the current activate features
  fd_features_t const * features = fd_bank_features_query( ctx->slot_ctx->bank );
  fd_memcpy( &ctx->runtime_public->features, features, sizeof(ctx->runtime_public->features) );

  /* Publish slot notifs */
  ulong curr_slot = ctx->curr_slot;
  ulong block_entry_height = 0;

  if( is_snapshot ) {
    /* Busy-poll the block map until the snapshot slot's block height
       can be read without a concurrent-modification retry.  NOTE(review):
       FD_MAP_ERR_KEY also retries forever today (see FIXME below), so a
       missing entry would spin this loop indefinitely. */
    for(;;){
      fd_block_map_query_t query[1] = { 0 };
      int err = fd_block_map_query_try( ctx->blockstore->block_map, &curr_slot, NULL, query, 0 );
      fd_block_info_t * block_info = fd_block_map_query_ele( query );
      if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) continue; /* FIXME: eventually need an error condition here.*/
      if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) {
        FD_LOG_WARNING(( "Waiting for block map query for slot %lu", curr_slot ));
        continue;
      };
      block_entry_height = block_info->block_height;
      /* Only trust the read if the speculative query still tests clean. */
      if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
    }
  } else {
    /* Block after genesis has a height of 1.
       TODO: We should be able to query slot 1 block_map entry to get this
       (using the above for loop), but blockstore/fork setup on genesis is
       broken for now. */
    block_entry_height = 1UL;
    init_poh( ctx );
  }

  publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );


  FD_TEST( ctx->slot_ctx );
}
1598 :
1599 : static void
1600 : publish_votes_to_plugin( fd_replay_tile_ctx_t * ctx,
1601 0 : fd_stem_context_t * stem ) {
1602 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->votes_plugin_out->mem, ctx->votes_plugin_out->chunk );
1603 :
1604 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
1605 0 : if( FD_UNLIKELY ( !fork ) ) return;
1606 :
1607 0 : fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( ctx->slot_ctx->bank );
1608 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_pool = fd_vote_accounts_vote_accounts_pool_join( epoch_stakes );
1609 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes );
1610 :
1611 0 : ulong i = 0;
1612 0 : FD_SPAD_FRAME_BEGIN( ctx->runtime_spad ) {
1613 0 : for( fd_vote_accounts_pair_global_t_mapnode_t const * n = fd_vote_accounts_pair_global_t_map_minimum_const( epoch_stakes_pool, epoch_stakes_root );
1614 0 : n && i < FD_CLUSTER_NODE_CNT;
1615 0 : n = fd_vote_accounts_pair_global_t_map_successor_const( epoch_stakes_pool, n ) ) {
1616 0 : if( n->elem.stake == 0 ) continue;
1617 :
1618 0 : uchar * data = (uchar *)&n->elem.value + n->elem.value.data_offset;
1619 0 : ulong data_len = n->elem.value.data_len;
1620 :
1621 0 : int err;
1622 0 : fd_vote_state_versioned_t * vsv = fd_bincode_decode_spad(
1623 0 : vote_state_versioned, ctx->runtime_spad,
1624 0 : data,
1625 0 : data_len,
1626 0 : &err );
1627 0 : if( FD_UNLIKELY( err ) ) {
1628 0 : FD_LOG_ERR(( "Unexpected failure in decoding vote state %d", err ));
1629 0 : }
1630 :
1631 0 : fd_pubkey_t node_pubkey;
1632 0 : ulong commission;
1633 0 : ulong epoch_credits;
1634 0 : fd_vote_epoch_credits_t const * _epoch_credits;
1635 0 : ulong root_slot;
1636 :
1637 0 : switch( vsv->discriminant ) {
1638 0 : case fd_vote_state_versioned_enum_v0_23_5:
1639 0 : node_pubkey = vsv->inner.v0_23_5.node_pubkey;
1640 0 : commission = vsv->inner.v0_23_5.commission;
1641 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v0_23_5.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v0_23_5.epoch_credits );
1642 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1643 0 : root_slot = vsv->inner.v0_23_5.root_slot;
1644 0 : break;
1645 0 : case fd_vote_state_versioned_enum_v1_14_11:
1646 0 : node_pubkey = vsv->inner.v1_14_11.node_pubkey;
1647 0 : commission = vsv->inner.v1_14_11.commission;
1648 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v1_14_11.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v1_14_11.epoch_credits );
1649 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1650 0 : root_slot = vsv->inner.v1_14_11.root_slot;
1651 0 : break;
1652 0 : case fd_vote_state_versioned_enum_current:
1653 0 : node_pubkey = vsv->inner.current.node_pubkey;
1654 0 : commission = vsv->inner.current.commission;
1655 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.current.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.current.epoch_credits );
1656 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1657 0 : root_slot = vsv->inner.v0_23_5.root_slot;
1658 0 : break;
1659 0 : default:
1660 0 : __builtin_unreachable();
1661 0 : }
1662 :
1663 0 : fd_clock_timestamp_vote_t_mapnode_t query;
1664 0 : memcpy( query.elem.pubkey.uc, n->elem.key.uc, 32UL );
1665 0 : fd_clock_timestamp_votes_global_t const * clock_timestamp_votes = fd_bank_clock_timestamp_votes_locking_query( ctx->slot_ctx->bank );
1666 0 : fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_root = fd_clock_timestamp_votes_votes_root_join( clock_timestamp_votes );
1667 0 : fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_pool = fd_clock_timestamp_votes_votes_pool_join( clock_timestamp_votes );
1668 :
1669 0 : fd_clock_timestamp_vote_t_mapnode_t * res = fd_clock_timestamp_vote_t_map_find( timestamp_votes_pool, timestamp_votes_root, &query );
1670 :
1671 0 : fd_vote_update_msg_t * msg = (fd_vote_update_msg_t *)(dst + sizeof(ulong) + i*112U);
1672 0 : memset( msg, 0, 112U );
1673 0 : memcpy( msg->vote_pubkey, n->elem.key.uc, sizeof(fd_pubkey_t) );
1674 0 : memcpy( msg->node_pubkey, node_pubkey.uc, sizeof(fd_pubkey_t) );
1675 0 : msg->activated_stake = n->elem.stake;
1676 0 : msg->last_vote = res == NULL ? 0UL : res->elem.slot;
1677 0 : msg->root_slot = root_slot;
1678 0 : msg->epoch_credits = epoch_credits;
1679 0 : msg->commission = (uchar)commission;
1680 0 : msg->is_delinquent = (uchar)fd_int_if(ctx->curr_slot >= 128UL, msg->last_vote <= ctx->curr_slot - 128UL, msg->last_vote == 0);
1681 0 : ++i;
1682 0 : fd_bank_clock_timestamp_votes_end_locking_query( ctx->slot_ctx->bank );
1683 0 : }
1684 0 : } FD_SPAD_FRAME_END;
1685 :
1686 0 : fd_bank_epoch_stakes_end_locking_query( ctx->slot_ctx->bank );
1687 :
1688 0 : *(ulong *)dst = i;
1689 :
1690 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
1691 0 : fd_stem_publish( stem, ctx->votes_plugin_out->idx, FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE, ctx->votes_plugin_out->chunk, 0, 0UL, 0UL, tspub );
1692 0 : ctx->votes_plugin_out->chunk = fd_dcache_compact_next( ctx->votes_plugin_out->chunk, 8UL + 40200UL*(58UL+12UL*34UL), ctx->votes_plugin_out->chunk0, ctx->votes_plugin_out->wmark );
1693 0 : }
1694 :
1695 : static void
1696 0 : handle_writer_state_updates( fd_replay_tile_ctx_t * ctx ) {
1697 :
1698 0 : for( ulong i=0UL; i<ctx->writer_cnt; i++ ) {
1699 0 : ulong res = fd_fseq_query( ctx->writer_fseq[ i ] );
1700 0 : if( FD_UNLIKELY( fd_writer_fseq_is_not_joined( res ) ) ) {
1701 0 : FD_LOG_WARNING(( "writer tile fseq idx=%lu has not been joined by the corresponding writer tile", i ));
1702 0 : continue;
1703 0 : }
1704 :
1705 0 : uint state = fd_writer_fseq_get_state( res );
1706 0 : switch( state ) {
1707 0 : case FD_WRITER_STATE_NOT_BOOTED:
1708 0 : FD_LOG_WARNING(( "writer tile idx=%lu is still booting", i ));
1709 0 : break;
1710 0 : case FD_WRITER_STATE_READY:
1711 : /* No-op. */
1712 0 : break;
1713 0 : case FD_WRITER_STATE_TXN_DONE: {
1714 0 : uint txn_id = fd_writer_fseq_get_txn_id( res );
1715 0 : ulong exec_tile_id = fd_writer_fseq_get_exec_tile_id( res );
1716 0 : if( ctx->exec_ready[ exec_tile_id ]==EXEC_TXN_BUSY && ctx->prev_ids[ exec_tile_id ]!=txn_id ) {
1717 0 : FD_LOG_DEBUG(( "Ack that exec tile idx=%lu txn id=%u has been finalized by writer tile %lu", exec_tile_id, txn_id, i ));
1718 0 : ctx->exec_ready[ exec_tile_id ] = EXEC_TXN_READY;
1719 0 : ctx->prev_ids[ exec_tile_id ] = txn_id;
1720 0 : fd_fseq_update( ctx->writer_fseq[ i ], FD_WRITER_STATE_READY );
1721 0 : }
1722 0 : break;
1723 0 : }
1724 0 : default:
1725 0 : FD_LOG_CRIT(( "Unexpected fseq state from writer tile idx=%lu state=%u", i, state ));
1726 0 : break;
1727 0 : }
1728 0 : }
1729 :
1730 0 : }
1731 :
/* after_credit is the replay tile's main per-iteration work function.
   Flow: one-time snapshot init, ack writer-tile completions, poll for /
   execute slices, and — when a slot finishes — finalize the block,
   update the blockstore with the bank hash, publish notifications and
   tower frags, reset per-slot bank fees, and advance the bank hash
   comparison watermark.  Finally, vote account state is pushed to the
   plugin at most once per PLUGIN_PUBLISH_TIME_NS. */
static void
after_credit( fd_replay_tile_ctx_t * ctx,
              fd_stem_context_t * stem,
              int * opt_poll_in FD_PARAM_UNUSED,
              int * charge_busy FD_PARAM_UNUSED ) {

  /* One-time boot: load the snapshot/genesis and announce progress to
     the plugin before doing any replay work. */
  if( FD_UNLIKELY( !ctx->snapshot_init_done ) ) {
    if( ctx->plugin_out->mem ) {
      uchar msg[56];
      fd_memset( msg, 0, sizeof(msg) );
      msg[ 0 ] = 0; // ValidatorStartProgress::Initializing
      replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
    }
    init_snapshot( ctx, stem );
    ctx->snapshot_init_done = 1;
    //*charge_busy = 0;
  }

  /* TODO: Consider moving state management to during_housekeeping */

  /* Check all the writer link fseqs. */
  handle_writer_state_updates( ctx );

  /* If we are ready to process a new slice, we will poll for it and try
     to setup execution for it. */
  if( ctx->flags & EXEC_FLAG_READY_NEW ) {
    handle_slice( ctx, stem );
  }

  /* If we are currently executing a slice, proceed. */
  if( ctx->flags & EXEC_FLAG_EXECUTING_SLICE ) {
    exec_slice( ctx, stem, ctx->curr_slot );
  }

  /* Snapshot flags AFTER exec_slice so a slot finished this iteration
     is handled below without waiting for the next call. */
  ulong curr_slot = ctx->curr_slot;
  ulong flags = ctx->flags;

  /* Finished replaying a slot in this after_credit iteration. */
  if( FD_UNLIKELY( flags & EXEC_FLAG_FINISHED_SLOT ) ){

    /* Check if the validator is caught up, and can safely be unmarked
       as read-only. This happens when it has replayed through
       turbine_slot0. */

    if( FD_UNLIKELY( ctx->read_only && ctx->curr_slot >= fd_fseq_query( ctx->turbine_slot0 ) ) ) {
      ctx->read_only = 0;
    }

    /* NOTE(review): fork is dereferenced below (FD_TEST(fork->slot..))
       without a NULL check; presumably a finished slot is always on the
       frontier — TODO confirm. */
    fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );

    FD_LOG_NOTICE(( "finished block - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));

    /**************************************************************************************************/
    /* Call fd_runtime_block_execute_finalize_tpool which updates sysvar and cleanup some other stuff */
    /**************************************************************************************************/

    /* Only signature_cnt is populated; presumably finalize reads just
       that field — TODO confirm. */
    fd_runtime_block_info_t runtime_block_info[1];
    runtime_block_info->signature_cnt = fd_bank_signature_count_get( ctx->slot_ctx->bank );

    ctx->block_finalizing = 0;

    fd_exec_para_cb_ctx_t exec_para_ctx_block_finalize = {
      .func = block_finalize_tiles_cb,
      .para_arg_1 = ctx,
      .para_arg_2 = stem,
    };

    fd_runtime_block_execute_finalize_para( ctx->slot_ctx,
                                            ctx->capture_ctx,
                                            runtime_block_info,
                                            ctx->exec_cnt,
                                            ctx->runtime_spad,
                                            &exec_para_ctx_block_finalize );

    /* Update blockstore with the freshly computed bank hash */
    fd_block_map_query_t query[1] = { 0 };
    fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );
    block_info->bank_hash = fd_bank_bank_hash_get( ctx->slot_ctx->bank );
    fd_block_map_publish( query );

    /* Pop the per-block spad frame pushed when the block started. */
    fd_spad_pop( ctx->runtime_spad );
    FD_LOG_NOTICE(( "Spad memory after executing block %lu", ctx->runtime_spad->mem_used ));

    /**********************************************************************/
    /* Push notifications for slot updates and reset block_info flag */
    /**********************************************************************/

    ulong block_entry_height = fd_blockstore_block_height_query( ctx->blockstore, curr_slot );
    publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );

    /* lps = latest processed slot. */
    ctx->blockstore->shmem->lps = curr_slot;

    FD_TEST(fork->slot == curr_slot);
    fork->lock = 0;

    // FD_LOG_NOTICE(( "ulong_max? %d", ctx->tower_out_idx==ULONG_MAX ));
    /* Publish bank hash + last blockhash to the tower tile; sig packs
       (slot << 32) | parent_slot. */
    if( FD_LIKELY( ctx->tower_out_idx!=ULONG_MAX && !ctx->read_only ) ) {
      uchar * chunk_laddr = fd_chunk_to_laddr( ctx->tower_out_mem, ctx->tower_out_chunk );
      fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
      fd_block_hash_queue_global_t * block_hash_queue = (fd_block_hash_queue_global_t *)&ctx->slot_ctx->bank->block_hash_queue[0];
      fd_hash_t * last_hash = fd_block_hash_queue_last_hash_join( block_hash_queue );

      memcpy( chunk_laddr, bank_hash, sizeof(fd_hash_t) );
      memcpy( chunk_laddr+sizeof(fd_hash_t), last_hash, sizeof(fd_hash_t) );
      fd_stem_publish( stem, ctx->tower_out_idx, ctx->curr_slot << 32UL | ctx->parent_slot, ctx->tower_out_chunk, sizeof(fd_hash_t) * 2, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ), fd_frag_meta_ts_comp( fd_tickcount() ) );
    }

    // if (FD_UNLIKELY( prev_confirmed!=ctx->forks->confirmed && ctx->plugin_out->mem ) ) {
    //   ulong msg[ 1 ] = { ctx->forks->confirmed };
    //   replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_OPTIMISTICALLY_CONFIRMED, (uchar const *)msg, sizeof(msg) );
    // }

    // if (FD_UNLIKELY( prev_finalized!=ctx->forks->finalized && ctx->plugin_out->mem ) ) {
    //   ulong msg[ 1 ] = { ctx->forks->finalized };
    //   replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_ROOTED, (uchar const *)msg, sizeof(msg) );
    // }

    /**********************************************************************/
    /* Prepare bank for the next execution and write to debugging files */
    /**********************************************************************/

    ulong prev_slot = ctx->slot_ctx->slot;

    /* Reset per-slot fee accumulators for the next slot. */
    fd_bank_execution_fees_set( ctx->slot_ctx->bank, 0UL );

    fd_bank_priority_fees_set( ctx->slot_ctx->bank, 0UL );

    if( FD_UNLIKELY( ctx->slots_replayed_file ) ) {
      FD_LOG_DEBUG(( "writing %lu to slots file", prev_slot ));
      fprintf( ctx->slots_replayed_file, "%lu\n", prev_slot );
      fflush( ctx->slots_replayed_file );
    }

    if (NULL != ctx->capture_ctx) {
      fd_solcap_writer_flush( ctx->capture_ctx->capture );
    }

    /**********************************************************************/
    /* Bank hash comparison, and halt if there's a mismatch after replay */
    /**********************************************************************/

    fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
    fd_bank_hash_cmp_t * bank_hash_cmp = ctx->bank_hash_cmp;
    fd_bank_hash_cmp_lock( bank_hash_cmp );
    fd_bank_hash_cmp_insert( bank_hash_cmp, curr_slot, bank_hash, 1, 0 );

    /* Try to move the bank hash comparison watermark forward */
    for( ulong cmp_slot = bank_hash_cmp->watermark + 1; cmp_slot < curr_slot; cmp_slot++ ) {
      /* With comparison disabled, advance the watermark one slot per
         finished block without checking. */
      if( FD_UNLIKELY( !ctx->enable_bank_hash_cmp ) ) {
        bank_hash_cmp->watermark = cmp_slot;
        break;
      }
      int rc = fd_bank_hash_cmp_check( bank_hash_cmp, cmp_slot );
      switch ( rc ) {
      case -1:

        /* Mismatch */

        /* NOTE(review): the message says "Halting." but this only warns
           and continues; the watermark simply stops advancing — confirm
           this is the intended (non-fatal) behavior. */
        //funk_cancel( ctx, cmp_slot );
        //checkpt( ctx );
        FD_LOG_WARNING(( "Bank hash mismatch on slot: %lu. Halting.", cmp_slot ));

        break;

      case 0:

        /* Not ready */

        break;

      case 1:

        /* Match */

        bank_hash_cmp->watermark = cmp_slot;
        break;

      default:;
      }
    }

    fd_bank_hash_cmp_unlock( bank_hash_cmp );
    ctx->flags = EXEC_FLAG_READY_NEW;
  } // end of if( FD_UNLIKELY( ( flags & EXEC_FLAG_FINISHED_SLOT ) ) )

  /* Rate-limited vote account push to the plugin. */
  long now = fd_log_wallclock();
  if( ctx->votes_plugin_out->mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) {
    ctx->last_plugin_push_time = now;
    publish_votes_to_plugin( ctx, stem );
  }

}
1925 :
1926 : static void
1927 : privileged_init( fd_topo_t * topo,
1928 0 : fd_topo_tile_t * tile ) {
1929 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1930 :
1931 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1932 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
1933 0 : FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
1934 0 : memset( ctx, 0, sizeof(fd_replay_tile_ctx_t) );
1935 :
1936 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->funk_seed, sizeof(ulong), 0 ) );
1937 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->status_cache_seed, sizeof(ulong), 0 ) );
1938 :
1939 0 : ctx->blockstore_fd = open( tile->replay.blockstore_file, O_RDWR | O_CREAT, 0666 );
1940 0 : if( FD_UNLIKELY( ctx->blockstore_fd == -1 ) ) {
1941 0 : FD_LOG_ERR(( "failed to open or create blockstore archival file %s %d %d %s", tile->replay.blockstore_file, ctx->blockstore_fd, errno, strerror(errno) ));
1942 0 : }
1943 :
1944 : /**********************************************************************/
1945 : /* runtime public */
1946 : /**********************************************************************/
1947 :
1948 0 : ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "runtime_pub" );
1949 0 : if( FD_UNLIKELY( replay_obj_id==ULONG_MAX ) ) {
1950 0 : FD_LOG_ERR(( "no runtime_public" ));
1951 0 : }
1952 :
1953 0 : ctx->runtime_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
1954 0 : if( ctx->runtime_public_wksp==NULL ) {
1955 0 : FD_LOG_ERR(( "no runtime_public workspace" ));
1956 0 : }
1957 :
1958 0 : ctx->runtime_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
1959 0 : if( FD_UNLIKELY( !ctx->runtime_public ) ) {
1960 0 : FD_LOG_ERR(( "no runtime_public" ));
1961 0 : }
1962 0 : }
1963 :
1964 : static void
1965 : unprivileged_init( fd_topo_t * topo,
1966 0 : fd_topo_tile_t * tile ) {
1967 :
1968 0 : FD_LOG_NOTICE(("Starting unprivileged init"));
1969 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1970 :
1971 0 : if( FD_UNLIKELY( tile->in_cnt < 3 ||
1972 0 : strcmp( topo->links[ tile->in_link_id[ PACK_IN_IDX ] ].name, "pack_replay") ||
1973 0 : strcmp( topo->links[ tile->in_link_id[ REPAIR_IN_IDX ] ].name, "repair_repla" ) ) ) {
1974 0 : FD_LOG_ERR(( "replay tile has none or unexpected input links %lu %s %s",
1975 0 : tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
1976 0 : }
1977 :
1978 : /**********************************************************************/
1979 : /* scratch (bump)-allocate memory owned by the replay tile */
1980 : /**********************************************************************/
1981 :
1982 : /* Do not modify order! This is join-order in unprivileged_init. */
1983 :
1984 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1985 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
1986 0 : void * capture_ctx_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
1987 0 : void * forks_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
1988 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
1989 0 : ctx->bmtree[i] = FD_SCRATCH_ALLOC_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
1990 0 : }
1991 0 : void * slice_buf = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_SLICE_MAX );
1992 0 : ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
1993 :
1994 0 : if( FD_UNLIKELY( scratch_alloc_mem != ( (ulong)scratch + scratch_footprint( tile ) ) ) ) {
1995 0 : FD_LOG_ERR( ( "scratch_alloc_mem did not match scratch_footprint diff: %lu alloc: %lu footprint: %lu",
1996 0 : scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ),
1997 0 : scratch_alloc_mem,
1998 0 : (ulong)scratch + scratch_footprint( tile ) ) );
1999 0 : }
2000 :
2001 : /**********************************************************************/
2002 : /* wksp */
2003 : /**********************************************************************/
2004 :
2005 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
2006 :
2007 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
2008 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
2009 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
2010 0 : if( ctx->blockstore_wksp==NULL ) {
2011 0 : FD_LOG_ERR(( "no blockstore wksp" ));
2012 0 : }
2013 :
2014 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
2015 0 : fd_buf_shred_pool_reset( ctx->blockstore->shred_pool, 0 );
2016 0 : FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
2017 :
2018 0 : ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
2019 0 : FD_TEST( status_cache_obj_id != ULONG_MAX );
2020 0 : ctx->status_cache_wksp = topo->workspaces[topo->objs[status_cache_obj_id].wksp_id].wksp;
2021 0 : if( ctx->status_cache_wksp == NULL ) {
2022 0 : FD_LOG_ERR(( "no status cache wksp" ));
2023 0 : }
2024 :
2025 : /**********************************************************************/
2026 : /* banks */
2027 : /**********************************************************************/
2028 :
2029 0 : ulong banks_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "banks" );
2030 0 : if( FD_UNLIKELY( banks_obj_id==ULONG_MAX ) ) {
2031 0 : FD_LOG_ERR(( "no banks" ));
2032 0 : }
2033 :
2034 0 : ctx->banks = fd_banks_join( fd_topo_obj_laddr( topo, banks_obj_id ) );
2035 0 : if( FD_UNLIKELY( !ctx->banks ) ) {
2036 0 : FD_LOG_ERR(( "failed to join banks" ));
2037 0 : }
2038 0 : fd_bank_t * bank = fd_banks_init_bank( ctx->banks, 0UL );
2039 0 : if( FD_UNLIKELY( !bank ) ) {
2040 0 : FD_LOG_ERR(( "failed to init bank" ));
2041 0 : }
2042 :
2043 : /**********************************************************************/
2044 : /* funk */
2045 : /**********************************************************************/
2046 :
2047 0 : if( FD_UNLIKELY( !fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ) ) ) ) {
2048 0 : FD_LOG_ERR(( "Failed to join database cache" ));
2049 0 : }
2050 :
2051 : /**********************************************************************/
2052 : /* root_slot fseq */
2053 : /**********************************************************************/
2054 :
2055 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
2056 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
2057 0 : ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
2058 0 : if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
2059 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
2060 :
2061 : /**********************************************************************/
2062 : /* turbine_slot fseq */
2063 : /**********************************************************************/
2064 :
2065 0 : ulong turbine_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot" );
2066 0 : FD_TEST( turbine_slot_obj_id!=ULONG_MAX );
2067 0 : ctx->turbine_slot = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot_obj_id ) );
2068 0 : if( FD_UNLIKELY( !ctx->turbine_slot ) ) FD_LOG_ERR(( "replay tile has no turb_slot fseq" ));
2069 :
2070 : /**********************************************************************/
2071 : /* TOML paths */
2072 : /**********************************************************************/
2073 :
2074 0 : ctx->blockstore_checkpt = tile->replay.blockstore_checkpt;
2075 0 : ctx->tx_metadata_storage = tile->replay.tx_metadata_storage;
2076 0 : ctx->funk_checkpt = tile->replay.funk_checkpt;
2077 0 : ctx->genesis = tile->replay.genesis;
2078 0 : ctx->incremental = tile->replay.incremental;
2079 0 : ctx->snapshot = tile->replay.snapshot;
2080 0 : ctx->snapshot_dir = tile->replay.snapshot_dir;
2081 :
2082 0 : ctx->incremental_src_type = tile->replay.incremental_src_type;
2083 0 : ctx->snapshot_src_type = tile->replay.snapshot_src_type;
2084 :
2085 : /**********************************************************************/
2086 : /* status cache */
2087 : /**********************************************************************/
2088 :
2089 0 : char const * status_cache_path = tile->replay.status_cache;
2090 0 : if ( strlen( status_cache_path ) > 0 ) {
2091 0 : FD_LOG_NOTICE(("starting status cache restore..."));
2092 0 : int err = fd_wksp_restore( ctx->status_cache_wksp, status_cache_path, (uint)ctx->status_cache_seed );
2093 0 : FD_LOG_NOTICE(("finished status cache restore..."));
2094 0 : if (err) {
2095 0 : FD_LOG_ERR(( "failed to restore %s: error %d", status_cache_path, err ));
2096 0 : }
2097 0 : fd_wksp_tag_query_info_t info;
2098 0 : ulong tag = FD_TXNCACHE_MAGIC;
2099 0 : if( fd_wksp_tag_query( ctx->status_cache_wksp, &tag, 1, &info, 1 ) > 0 ) {
2100 0 : void * status_cache_mem = fd_wksp_laddr_fast( ctx->status_cache_wksp, info.gaddr_lo );
2101 : /* Set up status cache. */
2102 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
2103 0 : if( ctx->status_cache == NULL ) {
2104 0 : FD_LOG_ERR(( "failed to join status cache in %s", status_cache_path ));
2105 0 : }
2106 0 : } else {
2107 0 : FD_LOG_ERR(( "failed to tag query status cache in %s", status_cache_path ));
2108 0 : }
2109 0 : } else {
2110 0 : void * status_cache_mem = fd_topo_obj_laddr( topo, status_cache_obj_id );
2111 0 : if (status_cache_mem == NULL) {
2112 0 : FD_LOG_ERR(( "failed to allocate status cache" ));
2113 0 : }
2114 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
2115 0 : if (ctx->status_cache == NULL) {
2116 0 : FD_LOG_ERR(( "failed to join + new status cache" ));
2117 0 : }
2118 0 : }
2119 :
2120 : /**********************************************************************/
2121 : /* spad */
2122 : /**********************************************************************/
2123 :
2124 : /* Join each of the exec spads. */
2125 0 : ctx->exec_cnt = fd_topo_tile_name_cnt( topo, "exec" );
2126 0 : for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
2127 0 : ulong exec_spad_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_spad.%lu", i );
2128 0 : fd_spad_t * spad = fd_spad_join( fd_topo_obj_laddr( topo, exec_spad_id ) );
2129 0 : ctx->exec_spads[ ctx->exec_spad_cnt ] = spad;
2130 0 : if( FD_UNLIKELY( !ctx->exec_spads[ ctx->exec_spad_cnt ] ) ) {
2131 0 : FD_LOG_ERR(( "failed to join exec spad %lu", i ));
2132 0 : }
2133 0 : ctx->exec_spads_wksp[ ctx->exec_spad_cnt ] = fd_wksp_containing( spad );
2134 0 : if( FD_UNLIKELY( !ctx->exec_spads_wksp[ ctx->exec_spad_cnt ] ) ) {
2135 0 : FD_LOG_ERR(( "failed to join exec spad wksp %lu", i ));
2136 0 : }
2137 :
2138 0 : ctx->exec_spad_cnt++;
2139 0 : }
2140 :
2141 : /* Now join the spad that was setup in the runtime public topo obj. */
2142 :
2143 0 : ctx->runtime_spad = fd_runtime_public_spad( ctx->runtime_public );
2144 0 : if( FD_UNLIKELY( !ctx->runtime_spad ) ) {
2145 0 : FD_LOG_ERR(( "Unable to join the runtime_spad" ));
2146 0 : }
2147 0 : fd_spad_push( ctx->runtime_spad );
2148 :
2149 : /**********************************************************************/
2150 : /* joins */
2151 : /**********************************************************************/
2152 :
2153 0 : uchar * bank_hash_cmp_shmem = fd_spad_alloc_check( ctx->runtime_spad, fd_bank_hash_cmp_align(), fd_bank_hash_cmp_footprint() );
2154 0 : ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( bank_hash_cmp_shmem ) );
2155 :
2156 0 : fd_cluster_version_t * cluster_version = fd_bank_cluster_version_modify( bank );
2157 :
2158 0 : if( FD_UNLIKELY( sscanf( tile->replay.cluster_version, "%u.%u.%u", &cluster_version->major, &cluster_version->minor, &cluster_version->patch )!=3 ) ) {
2159 0 : FD_LOG_ERR(( "failed to decode cluster version, configured as \"%s\"", tile->replay.cluster_version ));
2160 0 : }
2161 :
2162 0 : fd_features_t * features = fd_bank_features_modify( bank );
2163 0 : fd_features_enable_cleaned_up( features, cluster_version );
2164 :
2165 0 : char const * one_off_features[16];
2166 0 : for (ulong i = 0; i < tile->replay.enable_features_cnt; i++) {
2167 0 : one_off_features[i] = tile->replay.enable_features[i];
2168 0 : }
2169 0 : fd_features_enable_one_offs( features, one_off_features, (uint)tile->replay.enable_features_cnt, 0UL );
2170 :
2171 0 : ctx->forks = fd_forks_join( fd_forks_new( forks_mem, FD_BLOCK_MAX, 42UL ) );
2172 :
2173 : /**********************************************************************/
2174 : /* bank_hash_cmp */
2175 : /**********************************************************************/
2176 :
2177 0 : ulong bank_hash_cmp_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bh_cmp" );
2178 0 : FD_TEST( bank_hash_cmp_obj_id!=ULONG_MAX );
2179 0 : ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( fd_topo_obj_laddr( topo, bank_hash_cmp_obj_id ) ) );
2180 0 : if( FD_UNLIKELY( !ctx->bank_hash_cmp ) ) {
2181 0 : FD_LOG_ERR(( "failed to join bank_hash_cmp" ));
2182 0 : }
2183 :
2184 : /**********************************************************************/
2185 : /* voter */
2186 : /**********************************************************************/
2187 :
2188 0 : memcpy( ctx->validator_identity, fd_keyload_load( tile->replay.identity_key_path, 1 ), sizeof(fd_pubkey_t) );
2189 0 : *ctx->vote_authority = *ctx->validator_identity; /* FIXME */
2190 0 : memcpy( ctx->vote_acc, fd_keyload_load( tile->replay.vote_account_path, 1 ), sizeof(fd_pubkey_t) );
2191 :
2192 0 : ctx->validator_identity_pubkey[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->replay.identity_key_path, 1 ) );
2193 :
2194 : /**********************************************************************/
2195 : /* entry batch */
2196 : /**********************************************************************/
2197 :
2198 0 : fd_slice_exec_join( &ctx->slice_exec_ctx );
2199 0 : ctx->slice_exec_ctx.buf = slice_buf;
2200 :
2201 : /**********************************************************************/
2202 : /* capture */
2203 : /**********************************************************************/
2204 :
2205 0 : if ( strlen(tile->replay.solcap_capture) > 0 || strlen(tile->replay.dump_proto_dir) > 0 ) {
2206 0 : ctx->capture_ctx = fd_capture_ctx_new( capture_ctx_mem );
2207 0 : }
2208 :
2209 0 : if( strlen(tile->replay.solcap_capture) > 0 ) {
2210 0 : ctx->capture_ctx->checkpt_freq = ULONG_MAX;
2211 0 : ctx->capture_file = fopen( tile->replay.solcap_capture, "w+" );
2212 0 : if( FD_UNLIKELY( !ctx->capture_file ) ) {
2213 0 : FD_LOG_ERR(( "fopen(%s) failed (%d-%s)", tile->replay.solcap_capture, errno, strerror( errno ) ));
2214 0 : }
2215 0 : ctx->capture_ctx->capture_txns = 0;
2216 0 : ctx->capture_ctx->solcap_start_slot = tile->replay.capture_start_slot;
2217 0 : fd_solcap_writer_init( ctx->capture_ctx->capture, ctx->capture_file );
2218 0 : }
2219 :
2220 0 : if ( strlen(tile->replay.dump_proto_dir) > 0) {
2221 0 : ctx->capture_ctx->dump_proto_output_dir = tile->replay.dump_proto_dir;
2222 0 : if (tile->replay.dump_block_to_pb) {
2223 0 : ctx->capture_ctx->dump_block_to_pb = tile->replay.dump_block_to_pb;
2224 0 : }
2225 0 : }
2226 :
2227 : /**********************************************************************/
2228 : /* bank */
2229 : /**********************************************************************/
2230 :
2231 0 : ctx->bank_cnt = fd_topo_tile_name_cnt( topo, "bank" );
2232 0 : for( ulong i=0UL; i<(ctx->bank_cnt); i++ ) {
2233 0 : ulong busy_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bank_busy.%lu", i );
2234 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
2235 0 : ctx->bank_busy[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
2236 0 : if( FD_UNLIKELY( !ctx->bank_busy[ i ] ) ) FD_LOG_ERR(( "banking tile %lu has no busy flag", i ));
2237 :
2238 0 : fd_replay_out_link_t * poh_out = &ctx->bank_out[ i ];
2239 0 : fd_topo_link_t * poh_out_link = &topo->links[ tile->out_link_id[ poh_out->idx+i ] ];
2240 0 : poh_out->mcache = poh_out_link->mcache;
2241 0 : poh_out->sync = fd_mcache_seq_laddr( poh_out->mcache );
2242 0 : poh_out->depth = fd_mcache_depth( poh_out->mcache );
2243 0 : poh_out->seq = fd_mcache_seq_query( poh_out->sync );
2244 0 : poh_out->mem = topo->workspaces[ topo->objs[ poh_out_link->dcache_obj_id ].wksp_id ].wksp;
2245 0 : poh_out->chunk0 = fd_dcache_compact_chunk0( poh_out->mem, poh_out_link->dcache );
2246 0 : poh_out->wmark = fd_dcache_compact_wmark( poh_out->mem, poh_out_link->dcache, poh_out_link->mtu );
2247 0 : poh_out->chunk = poh_out->chunk0;
2248 0 : }
2249 :
2250 0 : ctx->poh_init_done = 0U;
2251 0 : ctx->snapshot_init_done = 0;
2252 :
2253 : /**********************************************************************/
2254 : /* exec */
2255 : /**********************************************************************/
2256 0 : ctx->exec_cnt = fd_topo_tile_name_cnt( topo, "exec" );
2257 0 : if( FD_UNLIKELY( ctx->exec_cnt>FD_PACK_MAX_BANK_TILES ) ) {
2258 0 : FD_LOG_ERR(( "replay tile has too many exec tiles %lu", ctx->exec_cnt ));
2259 0 : }
2260 0 : if( FD_UNLIKELY( ctx->exec_cnt>UCHAR_MAX ) ) {
2261 : /* Exec tile id needs to fit in a uchar for the writer tile txn done
2262 : message. */
2263 0 : FD_LOG_CRIT(( "too many exec tiles %lu", ctx->exec_cnt ));
2264 0 : }
2265 :
2266 0 : for( ulong i = 0UL; i < ctx->exec_cnt; i++ ) {
2267 : /* Mark all initial state as not being ready. */
2268 0 : ctx->exec_ready[ i ] = EXEC_TXN_BUSY;
2269 0 : ctx->prev_ids[ i ] = FD_EXEC_ID_SENTINEL;
2270 :
2271 0 : ulong exec_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_fseq.%lu", i );
2272 0 : if( FD_UNLIKELY( exec_fseq_id==ULONG_MAX ) ) {
2273 0 : FD_LOG_ERR(( "exec tile %lu has no fseq", i ));
2274 0 : }
2275 0 : ctx->exec_fseq[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, exec_fseq_id ) );
2276 0 : if( FD_UNLIKELY( !ctx->exec_fseq[ i ] ) ) {
2277 0 : FD_LOG_ERR(( "exec tile %lu has no fseq", i ));
2278 0 : }
2279 :
2280 : /* Setup out links. */
2281 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, "replay_exec", i );
2282 0 : fd_topo_link_t * exec_out_link = &topo->links[ tile->out_link_id[ idx ] ];
2283 :
2284 0 : if( strcmp( exec_out_link->name, "replay_exec" ) ) {
2285 0 : FD_LOG_ERR(("output link confusion for output %lu", idx ));
2286 0 : }
2287 :
2288 0 : fd_replay_out_link_t * exec_out = &ctx->exec_out[ i ];
2289 0 : exec_out->idx = idx;
2290 0 : exec_out->mem = topo->workspaces[ topo->objs[ exec_out_link->dcache_obj_id ].wksp_id ].wksp;
2291 0 : exec_out->chunk0 = fd_dcache_compact_chunk0( exec_out->mem, exec_out_link->dcache );
2292 0 : exec_out->wmark = fd_dcache_compact_wmark( exec_out->mem, exec_out_link->dcache, exec_out_link->mtu );
2293 0 : exec_out->chunk = exec_out->chunk0;
2294 0 : }
2295 :
2296 : /**********************************************************************/
2297 : /* writer */
2298 : /**********************************************************************/
2299 0 : ctx->writer_cnt = fd_topo_tile_name_cnt( topo, "writer" );
2300 0 : if( FD_UNLIKELY( ctx->writer_cnt>FD_PACK_MAX_BANK_TILES ) ) {
2301 0 : FD_LOG_CRIT(( "replay tile has too many writer tiles %lu", ctx->writer_cnt ));
2302 0 : }
2303 :
2304 0 : for( ulong i = 0UL; i < ctx->writer_cnt; i++ ) {
2305 :
2306 0 : ulong writer_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "writer_fseq.%lu", i );
2307 0 : if( FD_UNLIKELY( writer_fseq_id==ULONG_MAX ) ) {
2308 0 : FD_LOG_CRIT(( "writer tile %lu has no fseq", i ));
2309 0 : }
2310 0 : ctx->writer_fseq[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, writer_fseq_id ) );
2311 0 : if( FD_UNLIKELY( !ctx->writer_fseq[ i ] ) ) {
2312 0 : FD_LOG_CRIT(( "writer tile %lu has no fseq", i ));
2313 0 : }
2314 0 : }
2315 :
2316 : /**********************************************************************/
2317 : /* tower checkpointing for wen-restart */
2318 : /**********************************************************************/
2319 0 : ctx->tower_checkpt_fileno = -1;
2320 0 : if( FD_LIKELY( strlen( tile->replay.tower_checkpt )>0 ) ) {
2321 0 : ctx->tower_checkpt_fileno = open( tile->replay.tower_checkpt,
2322 0 : O_RDWR | O_CREAT,
2323 0 : S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
2324 0 : if( ctx->tower_checkpt_fileno<0 ) FD_LOG_ERR(( "Failed at opening the tower checkpoint file" ));
2325 0 : }
2326 :
2327 : /**********************************************************************/
2328 : /* links */
2329 : /**********************************************************************/
2330 :
2331 : /* Setup store tile input */
2332 0 : fd_topo_link_t * repair_in_link = &topo->links[ tile->in_link_id[ REPAIR_IN_IDX ] ];
2333 0 : ctx->repair_in_mem = topo->workspaces[ topo->objs[ repair_in_link->dcache_obj_id ].wksp_id ].wksp;
2334 0 : ctx->repair_in_chunk0 = fd_dcache_compact_chunk0( ctx->repair_in_mem, repair_in_link->dcache );
2335 0 : ctx->repair_in_wmark = fd_dcache_compact_wmark( ctx->repair_in_mem, repair_in_link->dcache, repair_in_link->mtu );
2336 :
2337 : /* Setup pack tile input */
2338 0 : fd_topo_link_t * pack_in_link = &topo->links[ tile->in_link_id[ PACK_IN_IDX ] ];
2339 0 : ctx->pack_in_mem = topo->workspaces[ topo->objs[ pack_in_link->dcache_obj_id ].wksp_id ].wksp;
2340 0 : ctx->pack_in_chunk0 = fd_dcache_compact_chunk0( ctx->pack_in_mem, pack_in_link->dcache );
2341 0 : ctx->pack_in_wmark = fd_dcache_compact_wmark( ctx->pack_in_mem, pack_in_link->dcache, pack_in_link->mtu );
2342 :
2343 : /* Setup tower tile input */
2344 0 : ctx->tower_in_idx = fd_topo_find_tile_in_link( topo, tile, "tower_replay", 0 );
2345 0 : if( FD_UNLIKELY( ctx->tower_in_idx==ULONG_MAX ) ) FD_LOG_WARNING(( "replay tile is missing tower input link %lu", ctx->tower_in_idx ));
2346 :
2347 0 : ulong replay_notif_idx = fd_topo_find_tile_out_link( topo, tile, "replay_notif", 0 );
2348 0 : if( FD_UNLIKELY( replay_notif_idx!=ULONG_MAX ) ) {
2349 0 : fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ replay_notif_idx ] ];
2350 0 : FD_TEST( notif_out );
2351 0 : ctx->notif_out->idx = replay_notif_idx;
2352 0 : ctx->notif_out->mcache = notif_out->mcache;
2353 0 : ctx->notif_out->sync = fd_mcache_seq_laddr( ctx->notif_out->mcache );
2354 0 : ctx->notif_out->depth = fd_mcache_depth( ctx->notif_out->mcache );
2355 0 : ctx->notif_out->seq = fd_mcache_seq_query( ctx->notif_out->sync );
2356 0 : ctx->notif_out->mem = topo->workspaces[ topo->objs[ notif_out->dcache_obj_id ].wksp_id ].wksp;
2357 0 : ctx->notif_out->chunk0 = fd_dcache_compact_chunk0( ctx->notif_out->mem, notif_out->dcache );
2358 0 : ctx->notif_out->wmark = fd_dcache_compact_wmark ( ctx->notif_out->mem, notif_out->dcache, notif_out->mtu );
2359 0 : ctx->notif_out->chunk = ctx->notif_out->chunk0;
2360 0 : } else {
2361 0 : ctx->notif_out->mcache = NULL;
2362 0 : }
2363 :
2364 : /* Set up stake weights tile output */
2365 0 : ctx->stake_out->idx = fd_topo_find_tile_out_link( topo, tile, "stake_out", 0 );
2366 0 : FD_TEST( ctx->stake_out->idx!=ULONG_MAX );
2367 0 : fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ ctx->stake_out->idx] ];
2368 0 : ctx->stake_out->mcache = stake_weights_out->mcache;
2369 0 : ctx->stake_out->mem = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
2370 0 : ctx->stake_out->sync = fd_mcache_seq_laddr ( ctx->stake_out->mcache );
2371 0 : ctx->stake_out->depth = fd_mcache_depth ( ctx->stake_out->mcache );
2372 0 : ctx->stake_out->seq = fd_mcache_seq_query ( ctx->stake_out->sync );
2373 0 : ctx->stake_out->chunk0 = fd_dcache_compact_chunk0( ctx->stake_out->mem, stake_weights_out->dcache );
2374 0 : ctx->stake_out->wmark = fd_dcache_compact_wmark ( ctx->stake_out->mem, stake_weights_out->dcache, stake_weights_out->mtu );
2375 0 : ctx->stake_out->chunk = ctx->stake_out->chunk0;
2376 :
2377 0 : ctx->tower_out_idx = fd_topo_find_tile_out_link( topo, tile, "replay_tower", 0 );
2378 0 : if( FD_LIKELY( ctx->tower_out_idx!=ULONG_MAX ) ) {
2379 0 : fd_topo_link_t * tower_out = &topo->links[ tile->out_link_id[ ctx->tower_out_idx ] ];
2380 0 : ctx->tower_out_mem = topo->workspaces[ topo->objs[ tower_out->dcache_obj_id ].wksp_id ].wksp;
2381 0 : ctx->tower_out_chunk0 = fd_dcache_compact_chunk0( ctx->tower_out_mem, tower_out->dcache );
2382 0 : ctx->tower_out_wmark = fd_dcache_compact_wmark ( ctx->tower_out_mem, tower_out->dcache, tower_out->mtu );
2383 0 : ctx->tower_out_chunk = ctx->tower_out_chunk0;
2384 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->tower_out_mem, tower_out->dcache, tower_out->mtu, tower_out->depth ) );
2385 0 : }
2386 :
2387 0 : if( FD_LIKELY( tile->replay.plugins_enabled ) ) {
2388 0 : ctx->plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "replay_plugi", 0 );
2389 0 : fd_topo_link_t const * replay_plugin_out = &topo->links[ tile->out_link_id[ ctx->plugin_out->idx] ];
2390 0 : if( strcmp( replay_plugin_out->name, "replay_plugi" ) ) {
2391 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->plugin_out->idx));
2392 0 : }
2393 0 : ctx->plugin_out->mem = topo->workspaces[ topo->objs[ replay_plugin_out->dcache_obj_id ].wksp_id ].wksp;
2394 0 : ctx->plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->plugin_out->mem, replay_plugin_out->dcache );
2395 0 : ctx->plugin_out->wmark = fd_dcache_compact_wmark ( ctx->plugin_out->mem, replay_plugin_out->dcache, replay_plugin_out->mtu );
2396 0 : ctx->plugin_out->chunk = ctx->plugin_out->chunk0;
2397 :
2398 0 : ctx->votes_plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "votes_plugin", 0 );
2399 0 : fd_topo_link_t const * votes_plugin_out = &topo->links[ tile->out_link_id[ ctx->votes_plugin_out->idx] ];
2400 0 : if( strcmp( votes_plugin_out->name, "votes_plugin" ) ) {
2401 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->votes_plugin_out->idx));
2402 0 : }
2403 0 : ctx->votes_plugin_out->mem = topo->workspaces[ topo->objs[ votes_plugin_out->dcache_obj_id ].wksp_id ].wksp;
2404 0 : ctx->votes_plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->votes_plugin_out->mem, votes_plugin_out->dcache );
2405 0 : ctx->votes_plugin_out->wmark = fd_dcache_compact_wmark ( ctx->votes_plugin_out->mem, votes_plugin_out->dcache, votes_plugin_out->mtu );
2406 0 : ctx->votes_plugin_out->chunk = ctx->votes_plugin_out->chunk0;
2407 0 : }
2408 :
2409 0 : if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) {
2410 0 : ctx->slots_replayed_file = fopen( tile->replay.slots_replayed, "w" );
2411 0 : FD_TEST( ctx->slots_replayed_file );
2412 0 : }
2413 :
2414 0 : FD_TEST( ctx->runtime_public!=NULL );
2415 :
2416 0 : uchar * deque_mem = fd_spad_alloc_check( ctx->runtime_spad, fd_exec_slice_align(), fd_exec_slice_footprint() );
2417 0 : ctx->exec_slice_deque = fd_exec_slice_join( fd_exec_slice_new( deque_mem ) );
2418 0 : if( FD_UNLIKELY( !ctx->exec_slice_deque ) ) {
2419 0 : FD_LOG_ERR(( "failed to join and create exec slice deque" ));
2420 0 : }
2421 :
2422 0 : FD_LOG_NOTICE(("Finished unprivileged init"));
2423 :
2424 0 : ctx->enable_bank_hash_cmp = tile->replay.enable_bank_hash_cmp;
2425 0 : }
2426 :
2426 : /* populate_allowed_seccomp installs the generated seccomp-bpf allowlist
2426 :    for the replay tile.  The policy is parameterized on the private log
2426 :    file fd and on the blockstore fd recovered from the tile's scratch
2426 :    region (laid out identically to unprivileged_init).  Returns the
2426 :    instruction count of the emitted filter. */
2427 : static ulong
2428 : populate_allowed_seccomp( fd_topo_t const * topo,
2429 :                           fd_topo_tile_t const * tile,
2430 :                           ulong out_cnt,
2431 0 :                           struct sock_filter * out ) {
2432 0 :   void * tile_mem = fd_topo_obj_laddr( topo, tile->tile_obj_id );
2433 :
2434 0 :   /* Recover the tile context from the front of the scratch region. */
2435 0 :   FD_SCRATCH_ALLOC_INIT( layout, tile_mem );
2436 0 :   fd_replay_tile_ctx_t * replay_ctx = FD_SCRATCH_ALLOC_APPEND( layout, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
2437 0 :   FD_SCRATCH_ALLOC_FINI( layout, sizeof(fd_replay_tile_ctx_t) );
2438 :
2439 0 :   populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)replay_ctx->blockstore_fd );
2440 0 :   return sock_filter_policy_fd_replay_tile_instr_cnt;
2441 0 : }
2441 :
2441 : /* populate_allowed_fds writes (into out_fds) the file descriptors the
2441 :    replay tile is allowed to keep open after sandboxing: stderr, the
2441 :    private log file (when one is configured), and the blockstore fd
2441 :    stashed in the tile's scratch region.  Returns the number of fds
2441 :    written. */
2442 : static ulong
2443 : populate_allowed_fds( fd_topo_t const * topo,
2444 :                       fd_topo_tile_t const * tile,
2445 :                       ulong out_fds_cnt,
2446 0 :                       int * out_fds ) {
2447 0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
2448 :
2449 0 :   /* Recover the tile context to read the blockstore fd. */
2450 0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
2451 0 :   fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
2452 0 :   FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
2453 :
2453 :   /* Up to 3 fds are written below (stderr, logfile, blockstore), so
2453 :      require capacity for all of them.  The previous <2UL check could
2453 :      overflow out_fds[2] when a logfile is configured. */
2453 0 :   if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
2454 :
2455 0 :   ulong out_cnt = 0UL;
2456 0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
2457 0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
2458 0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
2459 0 :   out_fds[ out_cnt++ ] = ctx->blockstore_fd;
2460 0 :   return out_cnt;
2461 0 : }
2462 :
2462 : /* metrics_write publishes the replay tile's gauges (currently replayed
2462 :    slot and last voted slot) to the shared metrics workspace.  Invoked
2462 :    periodically by the stem run loop via STEM_CALLBACK_METRICS_WRITE. */
2463 : static inline void
2464 0 : metrics_write( fd_replay_tile_ctx_t * ctx ) {
2465 0 :   FD_MGAUGE_SET( REPLAY, SLOT,            ctx->metrics.slot            );
2466 0 :   FD_MGAUGE_SET( REPLAY, LAST_VOTED_SLOT, ctx->metrics.last_voted_slot );
2467 0 : }
2468 :
2468 : /* Template parameters for the generic stem run loop, consumed by the
2468 :    fd_stem.c include below. */
2469 : /* TODO: This needs to get sized out correctly. */
2470 0 : #define STEM_BURST (64UL)
2471 :
2471 : /* The stem passes a pointer to this tile's context to every callback. */
2472 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_replay_tile_ctx_t
2473 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_replay_tile_ctx_t)
2474 :
2474 : /* Callback hooks implemented by this file (after_credit / before_frag /
2474 :    after_frag are defined earlier in the file; metrics_write just above). */
2475 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
2476 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
2477 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
2478 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
2479 :
2479 : /* Instantiates stem_run (and friends) specialized with the macros above. */
2480 : #include "../../disco/stem/fd_stem.c"
2481 :
2481 : /* Tile descriptor registered with the topology runner: wires the replay
2481 :    tile's lifecycle callbacks into the generic tile framework.  The
2481 :    footprint/init callbacks referenced here are defined earlier in this
2481 :    file (outside this view); stem_run is instantiated by the fd_stem.c
2481 :    include above. */
2482 : fd_topo_run_tile_t fd_tile_replay = {
2483 :     .name                     = "replay",
2484 :     .loose_footprint          = loose_footprint,
2485 :     .populate_allowed_seccomp = populate_allowed_seccomp, /* sandbox syscall allowlist */
2486 :     .populate_allowed_fds     = populate_allowed_fds,     /* fds kept open post-sandbox */
2487 :     .scratch_align            = scratch_align,
2488 :     .scratch_footprint        = scratch_footprint,
2489 :     .privileged_init          = privileged_init,          /* runs before sandboxing */
2490 :     .unprivileged_init        = unprivileged_init,        /* runs after privileges dropped */
2491 :     .run                      = stem_run,                 /* main frag-processing loop */
2492 : };
|