#define _GNU_SOURCE
#include "../../disco/tiles.h"
#include "generated/fd_replay_tile_seccomp.h"

#include "fd_replay_notif.h"

#include "../../disco/keyguard/fd_keyload.h"
#include "../../util/pod/fd_pod_format.h"
#include "../../flamenco/runtime/fd_txncache.h"
#include "../../flamenco/runtime/context/fd_capture_ctx.h"
#include "../../flamenco/runtime/context/fd_exec_slot_ctx.h"
#include "../../flamenco/runtime/program/fd_bpf_program_util.h"
#include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
#include "../../flamenco/runtime/fd_hashes.h"
#include "../../flamenco/runtime/fd_runtime_init.h"
#include "../../flamenco/snapshot/fd_snapshot.h"
#include "../../flamenco/stakes/fd_stakes.h"
#include "../../flamenco/runtime/fd_runtime.h"
#include "../../flamenco/runtime/fd_runtime_public.h"
#include "../../flamenco/rewards/fd_rewards.h"
#include "../../disco/metrics/fd_metrics.h"
#include "../../choreo/fd_choreo.h"
#include "../../disco/plugin/fd_plugin.h"
#include "fd_exec.h"

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <linux/unistd.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/random.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#define DEQUE_NAME fd_exec_slice
#define DEQUE_T    ulong
#define DEQUE_MAX  (USHORT_MAX+1) /* parenthesized so the macro expands safely in expressions */
#include "../../util/tmpl/fd_deque.c"

/* An estimate of the max number of transactions in a block.  If there
   are more transactions, they must be split into multiple sets. */
#define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_BLK_MAX * FD_SHRED_MAX_SZ ) / FD_TXN_MIN_SERIALIZED_SZ )

#define PLUGIN_PUBLISH_TIME_NS ((long)60e9)

#define REPAIR_IN_IDX (0UL)
#define PACK_IN_IDX   (1UL)
#define SHRED_IN_IDX  (3UL)

#define EXEC_TXN_BUSY  (0xA)
#define EXEC_TXN_READY (0xB)

#define BANK_HASH_CMP_LG_MAX (16UL)

struct fd_replay_out_link {
  ulong            idx;

  fd_frag_meta_t * mcache;
  ulong *          sync;
  ulong            depth;
  ulong            seq;

  fd_wksp_t *      mem;
  ulong            chunk0;
  ulong            wmark;
  ulong            chunk;

};
typedef struct fd_replay_out_link fd_replay_out_link_t;

struct fd_replay_tile_metrics {
  ulong slot;
  ulong last_voted_slot;
};
typedef struct fd_replay_tile_metrics fd_replay_tile_metrics_t;
#define FD_REPLAY_TILE_METRICS_FOOTPRINT ( sizeof( fd_replay_tile_metrics_t ) )

struct fd_replay_tile_ctx {
  fd_wksp_t * wksp;
  fd_wksp_t * blockstore_wksp;
  fd_wksp_t * status_cache_wksp;

  fd_wksp_t *           runtime_public_wksp;
  fd_runtime_public_t * runtime_public;

  // Repair tile input
  fd_wksp_t * repair_in_mem;
  ulong       repair_in_chunk0;
  ulong       repair_in_wmark;

  // Pack tile input
  fd_wksp_t * pack_in_mem;
  ulong       pack_in_chunk0;
  ulong       pack_in_wmark;

  /* Tower tile input */
  ulong tower_in_idx;

  // Notification output defs
  fd_replay_out_link_t notif_out[1];

  // Stake weights output link defs
  fd_replay_out_link_t stake_out[1];

  ulong       tower_out_idx;
  fd_wksp_t * tower_out_mem;
  ulong       tower_out_chunk0;
  ulong       tower_out_wmark;
  ulong       tower_out_chunk;

  // Inputs to plugin/gui
  fd_replay_out_link_t plugin_out[1];
  fd_replay_out_link_t votes_plugin_out[1];
  long                 last_plugin_push_time;

  char const * blockstore_checkpt;
  int          tx_metadata_storage;
  char const * funk_checkpt;
  char const * genesis;
  char const * incremental;
  char const * snapshot;
  char const * snapshot_dir;
  int          incremental_src_type;
  int          snapshot_src_type;

  /* Do not modify order! This is join-order in unprivileged_init. */

  fd_funk_t    funk[1];
  fd_forks_t * forks;

  fd_pubkey_t validator_identity[1];
  fd_pubkey_t vote_authority[1];
  fd_pubkey_t vote_acc[1];

  /* Vote accounts in the current epoch.  Lifetimes of the vote account
     addresses (pubkeys) are valid for the epoch (the pubkey memory is
     owned by the epoch bank). */

  fd_voter_t *         epoch_voters;  /* Map chain of slot->voter */
  fd_bank_hash_cmp_t * bank_hash_cmp; /* Maintains bank hashes seen from votes */

  /* Blockstore local join */

  fd_blockstore_t   blockstore_ljoin;
  int               blockstore_fd; /* file descriptor for archival file */
  fd_blockstore_t * blockstore;

  /* Updated during execution */

  fd_exec_slot_ctx_t * slot_ctx;
  fd_slice_exec_t      slice_exec_ctx;

  /* TODO: Some of these arrays should be bitvecs that get masked into. */
  ulong                exec_cnt;
  fd_replay_out_link_t exec_out  [ FD_PACK_MAX_BANK_TILES ]; /* Sending to exec unexecuted txns */
  uchar                exec_ready[ FD_PACK_MAX_BANK_TILES ]; /* Is tile ready */
  uint                 prev_ids  [ FD_PACK_MAX_BANK_TILES ]; /* Previous txn id if any */
  ulong *              exec_fseq [ FD_PACK_MAX_BANK_TILES ]; /* fseq of the last executed txn */
  int                  block_finalizing;

  ulong   writer_cnt;
  ulong * writer_fseq[ FD_PACK_MAX_BANK_TILES ];

  /* Metadata updated during execution */

  ulong   curr_slot;
  ulong   parent_slot;
  ulong   snapshot_slot;
  ulong * turbine_slot0;
  ulong * turbine_slot;
  ulong   root; /* the root slot is the most recent slot to have reached
                   max lockout in the tower */
  ulong   flags;
  ulong   bank_idx;

  /* Other metadata */

  ulong              funk_seed;
  ulong              status_cache_seed;
  fd_capture_ctx_t * capture_ctx;
  FILE *             capture_file;
  FILE *             slots_replayed_file;

  ulong * bank_busy[ FD_PACK_MAX_BANK_TILES ];
  ulong   bank_cnt;
  fd_replay_out_link_t bank_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending to PoH finished txns + a couple more tasks ??? */

  ulong * published_wmark; /* publish watermark.  The watermark is defined
                              as the minimum of the tower root (root above)
                              and the blockstore smr (blockstore->smr).  The
                              watermark is used to publish our fork-aware
                              structures, eg. blockstore, forks, ghost.  In
                              general, publishing has the effect of pruning
                              minority forks in those structures, indicating
                              that it is ok to release the memory being
                              occupied by said forks.

                              The reason it has to be the minimum of the two
                              is that the tower root can lag the SMR and vice
                              versa, but both of the fork-aware structures
                              need to maintain information through both of
                              those slots. */

  ulong * poh; /* proof-of-history slot */
  uint    poh_init_done;
  int     snapshot_init_done;

  int tower_checkpt_fileno;

  int         vote;
  fd_pubkey_t validator_identity_pubkey[ 1 ];

  fd_txncache_t * status_cache;
  void *          bmtree[ FD_PACK_MAX_BANK_TILES ];

  /* The spad allocators used by the executor tiles are NOT the same as
     the spad used for general, longer-lasting spad allocations.  The
     lifetime of the exec spad is just through an execution.  The runtime
     spad is scoped to the runtime.  The top-most frame will persist for
     the entire duration of the process.  There will also be a potential
     second frame that persists across multiple slots that is created for
     rewards distribution.  Every other spad frame should NOT exist
     beyond the scope of a block. */

  fd_spad_t * exec_spads[ FD_PACK_MAX_BANK_TILES ];
  fd_wksp_t * exec_spads_wksp[ FD_PACK_MAX_BANK_TILES ];
  ulong       exec_spad_cnt;

  fd_spad_t * runtime_spad;
  ulong *     is_constipated; /* Shared fseq to determine if funk should be constipated */

  fd_funk_txn_t * false_root;

  int read_only; /* The read-only slot is the slot the validator needs
                    to replay through before it can proceed with any
                    write operations such as voting or building blocks.

                    This restriction is for safety reasons: the
                    validator could otherwise equivocate a previous vote
                    or block. */

  int blocked_on_mblock; /* Flag used for synchronizing on mblock boundaries. */

  /* Metrics */
  fd_replay_tile_metrics_t metrics;

  ulong * exec_slice_deque; /* Deque to buffer exec slices - lives in spad */

  ulong enable_bank_hash_cmp;

  fd_banks_t * banks;
  int          is_booted;
};
typedef struct fd_replay_tile_ctx fd_replay_tile_ctx_t;
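
/* A minimal sketch (not part of the tile) of the publish watermark rule
   described in the struct comment above: the watermark is the minimum of
   the tower root and the blockstore SMR.  The parameter names here are
   hypothetical; the tile itself derives these values from ctx->root and
   blockstore->smr. */

static inline ulong
publish_wmark_sketch( ulong tower_root,
                      ulong blockstore_smr ) {
  return fd_ulong_min( tower_root, blockstore_smr );
}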

FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 128UL;
}

FD_FN_PURE static inline ulong
loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  return 24UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
}

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {

  /* Do not modify order! This is join-order in unprivileged_init. */

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
  l = FD_LAYOUT_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
  l = FD_LAYOUT_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
  for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
    l = FD_LAYOUT_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
  }
  l = FD_LAYOUT_APPEND( l, 128UL, FD_SLICE_MAX );
  l = FD_LAYOUT_FINI ( l, scratch_align() );
  return l;
}

/* Receives from repair newly completed slices of executable slots on
   the frontier.  Slices arrive with guaranteed good properties: they
   are in order and executable immediately, as long as the mcache
   wasn't overrun. */
static int
before_frag( fd_replay_tile_ctx_t * ctx,
             ulong                  in_idx,
             ulong                  seq,
             ulong                  sig ) {
  (void)seq;

  if( in_idx==REPAIR_IN_IDX ) {
    FD_LOG_DEBUG(( "rx slice from repair tile %lu %u", fd_disco_repair_replay_sig_slot( sig ), fd_disco_repair_replay_sig_data_cnt( sig ) ));
    fd_exec_slice_push_tail( ctx->exec_slice_deque, sig );
    return 1;
  }
  return 0;
}
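
/* Hedged sketch of the kind of bit-packing a repair->replay sig could
   use, consistent with the four accessors this file calls (slot,
   parent_off, data_cnt, slot_complete).  The real layout is owned by
   fd_disco's fd_disco_repair_replay_sig_* helpers; the field widths
   below are illustrative assumptions only, not the actual ABI. */

static inline ulong
repair_replay_sig_sketch( ulong  slot,          /* assumed low 32 bits used */
                          ushort parent_off,    /* 16 bits */
                          uint   data_cnt,      /* assumed 15 bits */
                          int    slot_complete  /* 1 bit */ ) {
  return ( (slot & 0xFFFFFFFFUL)<<32 ) |
         ( (ulong)parent_off<<16 ) |
         ( ((ulong)data_cnt & 0x7FFFUL)<<1 ) |
         ( (ulong)!!slot_complete );
}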

/* Large number of helpers for after_credit begin here */

static void
publish_stake_weights( fd_replay_tile_ctx_t * ctx,
                       fd_stem_context_t *    stem,
                       fd_exec_slot_ctx_t *   slot_ctx ) {
  fd_epoch_schedule_t const * epoch_schedule = fd_bank_epoch_schedule_query( slot_ctx->bank );

  fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( slot_ctx->bank );
  fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes );

  if( epoch_stakes_root!=NULL ) {
    ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
    ulong epoch             = fd_slot_to_leader_schedule_epoch( epoch_schedule, slot_ctx->slot );
    ulong stake_weights_sz  = generate_stake_weight_msg( slot_ctx, ctx->runtime_spad, epoch - 1, stake_weights_msg );
    ulong stake_weights_sig = 4UL;
    fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
    ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
    FD_LOG_NOTICE(( "sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3] ));
  }

  fd_bank_epoch_stakes_end_locking_query( slot_ctx->bank );

  fd_vote_accounts_global_t const * next_epoch_stakes = fd_bank_next_epoch_stakes_locking_query( slot_ctx->bank );
  fd_vote_accounts_pair_global_t_mapnode_t * next_epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( next_epoch_stakes );

  if( next_epoch_stakes_root!=NULL ) {
    ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
    ulong epoch             = fd_slot_to_leader_schedule_epoch( epoch_schedule, slot_ctx->slot ); /* epoch */
    ulong stake_weights_sz  = generate_stake_weight_msg( slot_ctx, ctx->runtime_spad, epoch, stake_weights_msg );
    ulong stake_weights_sig = 4UL;
    fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
    ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
    FD_LOG_NOTICE(( "sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3] ));
  }
  fd_bank_next_epoch_stakes_end_locking_query( slot_ctx->bank );
}
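
/* Consumer-side sketch of the stake-weights message header published
   above.  This is grounded only in the log lines in
   publish_stake_weights: word 0 = epoch, word 1 = stake_weight_cnt,
   word 2 = start_slot, word 3 = slot_cnt.  The layout of the weight
   entries that follow the header is owned by generate_stake_weight_msg
   and is not assumed here. */

static inline void
stake_weights_hdr_log_sketch( ulong const * stake_weights_msg ) {
  FD_LOG_NOTICE(( "stake weights hdr - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu",
                  stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3] ));
}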

static void
snapshot_hash_tiles_cb( void * para_arg_1,
                        void * para_arg_2,
                        void * fn_arg_1,
                        void * fn_arg_2 FD_PARAM_UNUSED,
                        void * fn_arg_3 FD_PARAM_UNUSED,
                        void * fn_arg_4 FD_PARAM_UNUSED ) {

  fd_replay_tile_ctx_t *    ctx       = (fd_replay_tile_ctx_t *)para_arg_1;
  fd_stem_context_t *       stem      = (fd_stem_context_t *)para_arg_2;
  fd_subrange_task_info_t * task_info = (fd_subrange_task_info_t *)fn_arg_1;

  ulong num_lists = ctx->exec_cnt;
  FD_LOG_NOTICE(( "launching %lu hash tasks", num_lists ));
  fd_pubkey_hash_pair_list_t * lists = fd_spad_alloc( ctx->runtime_spad, alignof(fd_pubkey_hash_pair_list_t), num_lists * sizeof(fd_pubkey_hash_pair_list_t) );
  fd_lthash_value_t * lthash_values = fd_spad_alloc( ctx->runtime_spad, FD_LTHASH_VALUE_ALIGN, num_lists * FD_LTHASH_VALUE_FOOTPRINT );
  for( ulong i = 0; i < num_lists; i++ ) {
    fd_lthash_zero( &lthash_values[i] );
  }

  task_info->num_lists     = num_lists;
  task_info->lists         = lists;
  task_info->lthash_values = lthash_values;

  for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
    fd_stem_publish( stem, ctx->exec_out[i].idx, EXEC_SNAP_HASH_ACCS_CNT_SIG, ctx->exec_out[i].chunk, 0UL, 0UL, 0UL, 0UL );
    ctx->exec_out[i].chunk = fd_dcache_compact_next( ctx->exec_out[i].chunk, 0UL, ctx->exec_out[i].chunk0, ctx->exec_out[i].wmark );
  }

  uchar cnt_done[ FD_PACK_MAX_BANK_TILES ] = {0};
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( !cnt_done[ i ] ) {
        ulong res   = fd_fseq_query( ctx->exec_fseq[ i ] );
        uint  state = fd_exec_fseq_get_state( res );
        if( state==FD_EXEC_STATE_SNAP_CNT_DONE ) {
          FD_LOG_DEBUG(( "Acked hash cnt msg" ));
          cnt_done[ i ] = 1;
          task_info->lists[ i ].pairs = fd_spad_alloc( ctx->runtime_spad,
                                                       FD_PUBKEY_HASH_PAIR_ALIGN,
                                                       fd_exec_fseq_get_pairs_len( res ) * sizeof(fd_pubkey_hash_pair_t) );
        } else {
          wait_cnt++;
        }
      }
    }
    if( !wait_cnt ) {
      break;
    }
  }

  for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {

    fd_replay_out_link_t * exec_out = &ctx->exec_out[ i ];

    fd_runtime_public_snap_hash_msg_t * gather_msg = (fd_runtime_public_snap_hash_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );

    gather_msg->lt_hash_value_out_gaddr = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, &lthash_values[i] );
    gather_msg->num_pairs_out_gaddr     = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, &task_info->lists[i].pairs_len );
    gather_msg->pairs_gaddr             = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, task_info->lists[i].pairs );

    fd_stem_publish( stem, ctx->exec_out[i].idx, EXEC_SNAP_HASH_ACCS_GATHER_SIG, ctx->exec_out[i].chunk, sizeof(fd_runtime_public_snap_hash_msg_t), 0UL, 0UL, 0UL );
    ctx->exec_out[i].chunk = fd_dcache_compact_next( ctx->exec_out[i].chunk, sizeof(fd_runtime_public_snap_hash_msg_t), ctx->exec_out[i].chunk0, ctx->exec_out[i].wmark );
  }

  memset( cnt_done, 0, sizeof(cnt_done) );
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( !cnt_done[ i ] ) {
        ulong res   = fd_fseq_query( ctx->exec_fseq[ i ] );
        uint  state = fd_exec_fseq_get_state( res );
        if( state==FD_EXEC_STATE_SNAP_GATHER_DONE ) {
          FD_LOG_DEBUG(( "Acked hash gather msg" ));
          cnt_done[ i ] = 1;
        } else {
          wait_cnt++;
        }
      }
    }
    if( !wait_cnt ) {
      break;
    }
  }
}
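
/* The two wait loops above share one pattern: spin until every exec
   tile's fseq reports a target state.  A hedged refactor sketch
   (hypothetical helper, not used by the tile), built only from the
   fd_fseq_query / fd_exec_fseq_get_state calls already used above: */

FD_FN_UNUSED static void
wait_all_exec_state_sketch( fd_replay_tile_ctx_t * ctx,
                            uint                   want_state ) {
  uchar done[ FD_PACK_MAX_BANK_TILES ] = {0};
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( done[ i ] ) continue;
      if( fd_exec_fseq_get_state( fd_fseq_query( ctx->exec_fseq[ i ] ) )==want_state ) done[ i ] = 1;
      else wait_cnt++;
    }
    if( !wait_cnt ) break;
    FD_SPIN_PAUSE(); /* friendly to hyperthread siblings while spinning */
  }
}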

static void
block_finalize_tiles_cb( void * para_arg_1,
                         void * para_arg_2,
                         void * fn_arg_1,
                         void * fn_arg_2 FD_PARAM_UNUSED,
                         void * fn_arg_3 FD_PARAM_UNUSED,
                         void * fn_arg_4 FD_PARAM_UNUSED ) {

  fd_replay_tile_ctx_t *         ctx       = (fd_replay_tile_ctx_t *)para_arg_1;
  fd_stem_context_t *            stem      = (fd_stem_context_t *)para_arg_2;
  fd_accounts_hash_task_data_t * task_data = (fd_accounts_hash_task_data_t *)fn_arg_1;

  ulong cnt_per_worker;
  if( ctx->exec_cnt>1 ) cnt_per_worker = (task_data->info_sz / (ctx->exec_cnt-1UL)) + 1UL; /* ??? */
  else                  cnt_per_worker = task_data->info_sz;
  ulong task_infos_gaddr = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, task_data->info );

  uchar hash_done[ FD_PACK_MAX_BANK_TILES ] = {0};
  for( ulong worker_idx=0UL; worker_idx<ctx->exec_cnt; worker_idx++ ) {

    ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );

    ulong lt_hash_gaddr = fd_wksp_gaddr_fast( ctx->runtime_public_wksp, &task_data->lthash_values[ worker_idx ] );
    if( FD_UNLIKELY( !lt_hash_gaddr ) ) {
      FD_LOG_ERR(( "lt_hash_gaddr is NULL" ));
      return;
    }

    ulong start_idx = worker_idx * cnt_per_worker;
    if( start_idx >= task_data->info_sz ) {
      /* If we do not have any work for this worker to do, skip it. */
      hash_done[ worker_idx ] = 1;
      continue;
    }
    ulong end_idx = fd_ulong_sat_sub( start_idx + cnt_per_worker, 1UL );
    if( end_idx >= task_data->info_sz ) {
      end_idx = fd_ulong_sat_sub( task_data->info_sz, 1UL );
    }

    fd_replay_out_link_t * exec_out = &ctx->exec_out[ worker_idx ];

    fd_runtime_public_hash_bank_msg_t * hash_msg = (fd_runtime_public_hash_bank_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
    generate_hash_bank_msg( task_infos_gaddr, lt_hash_gaddr, start_idx, end_idx, ctx->curr_slot, hash_msg );

    ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    fd_stem_publish( stem,
                     exec_out->idx,
                     EXEC_HASH_ACCS_SIG,
                     exec_out->chunk,
                     sizeof(fd_runtime_public_hash_bank_msg_t),
                     0UL,
                     tsorig,
                     tspub );
    exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_runtime_public_hash_bank_msg_t), exec_out->chunk0, exec_out->wmark );
  }

  /* Spins and blocks until all exec tiles are done hashing. */
  for( ;; ) {
    uchar wait_cnt = 0;
    for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
      if( !hash_done[ i ] ) {
        ulong res   = fd_fseq_query( ctx->exec_fseq[ i ] );
        uint  state = fd_exec_fseq_get_state( res );
        uint  slot  = fd_exec_fseq_get_slot( res );
        /* We need to compare the state and a unique identifier (slot)
           in the case where the last thing the exec tile did is to hash
           accounts. */
        if( state==FD_EXEC_STATE_HASH_DONE && slot==ctx->curr_slot ) {
          hash_done[ i ] = 1;
        } else {
          wait_cnt++;
        }
      }
    }
    if( !wait_cnt ) {
      break;
    }
  }
}
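
/* Worked example of the partitioning above (values assumed purely for
   illustration): with info_sz = 10 and exec_cnt = 4, cnt_per_worker =
   10/3 + 1 = 4, so worker 0 hashes [0,3], worker 1 hashes [4,7],
   worker 2 hashes [8,9] (end clamped from 11 down to info_sz-1), and
   worker 3 is skipped because its start_idx of 12 is past the end of
   the task array. */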

FD_FN_UNUSED static void
checkpt( fd_replay_tile_ctx_t * ctx ) {
  if( FD_UNLIKELY( ctx->slots_replayed_file ) ) fclose( ctx->slots_replayed_file );
  if( FD_UNLIKELY( strcmp( ctx->blockstore_checkpt, "" ) ) ) {
    int rc = fd_wksp_checkpt( ctx->blockstore_wksp, ctx->blockstore_checkpt, 0666, 0, NULL );
    if( rc ) {
      FD_LOG_ERR(( "blockstore checkpt failed: error %d", rc ));
    }
  }
  int rc = fd_wksp_checkpt( ctx->funk->wksp, ctx->funk_checkpt, 0666, 0, NULL );
  if( rc ) {
    FD_LOG_ERR(( "funk checkpt failed: error %d", rc ));
  }
}

static void FD_FN_UNUSED
funk_cancel( fd_replay_tile_ctx_t * ctx, ulong mismatch_slot ) {
  fd_funk_txn_start_write( ctx->funk );
  fd_funk_txn_xid_t   xid          = { .ul = { mismatch_slot, mismatch_slot } };
  fd_funk_txn_map_t * txn_map      = fd_funk_txn_map( ctx->funk );
  fd_funk_txn_t *     mismatch_txn = fd_funk_txn_query( &xid, txn_map );
  FD_TEST( fd_funk_txn_cancel( ctx->funk, mismatch_txn, 1 ) );
  fd_funk_txn_end_write( ctx->funk );
}

static void
txncache_publish( fd_replay_tile_ctx_t * ctx,
                  fd_funk_txn_t *        to_root_txn,
                  fd_funk_txn_t *        rooted_txn ) {

  /* For the status cache, we stop rooting until the status cache has
     been written out to the current snapshot.  We also need to iterate
     up the funk transaction tree until the current "root" to figure out
     which slots should be registered.  This root can correspond to the
     latest false root if one exists. */

  if( FD_UNLIKELY( !ctx->slot_ctx->status_cache ) ) {
    return;
  }

  fd_funk_txn_start_read( ctx->funk );

  fd_funk_txn_t *      txn      = to_root_txn;
  fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );
  while( txn!=rooted_txn ) {
    ulong slot = txn->xid.ul[0];
    if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) {
      FD_LOG_INFO(( "Registering slot %lu", slot ));
      fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, slot );
    } else {
      FD_LOG_INFO(( "Registering constipated slot %lu", slot ));
      fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, slot );
    }
    txn = fd_funk_txn_parent( txn, txn_pool );
  }

  fd_funk_txn_end_read( ctx->funk );
}

static void
funk_publish( fd_replay_tile_ctx_t * ctx,
              fd_funk_txn_t *        to_root_txn,
              ulong                  wmk,
              uchar                  is_constipated ) {

  fd_funk_txn_start_write( ctx->funk );

  fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );

  /* Try to publish into Funk */
  if( is_constipated ) {
    FD_LOG_NOTICE(( "Publishing slot=%lu while constipated", wmk ));

    /* At this point, first collapse the current transaction that should
       be published into the oldest child transaction. */
    FD_LOG_NOTICE(( "Publishing into constipated root for wmk=%lu", wmk ));
    fd_funk_txn_t * txn = to_root_txn;

    while( txn!=ctx->false_root ) {
      if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) {
        FD_LOG_ERR(( "Can't publish funk transaction" ));
      }
      txn = fd_funk_txn_parent( txn, txn_pool );
    }

  } else {
    /* This is the case where we are not constipated.  We only need to
       do special handling in the case where the epoch account hash is
       about to be calculated. */
    FD_LOG_DEBUG(( "Publishing slot=%lu xid=%lu", wmk, to_root_txn->xid.ul[0] ));

    /* This is the standard case.  Publish all transactions up to and
       including the watermark.  This will publish any in-prep ancestors
       of root_txn as well. */
    if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, to_root_txn, 1 ) ) ) {
      FD_LOG_ERR(( "failed to funk publish slot %lu", wmk ));
    }
  }
  fd_funk_txn_end_write( ctx->funk );

  if( FD_LIKELY( FD_FEATURE_ACTIVE_BANK( ctx->slot_ctx->bank, epoch_accounts_hash ) &&
                 !FD_FEATURE_ACTIVE_BANK( ctx->slot_ctx->bank, accounts_lt_hash ) ) ) {

    if( wmk>=fd_bank_eah_start_slot_get( ctx->slot_ctx->bank ) ) {
      fd_exec_para_cb_ctx_t exec_para_ctx = {
        .func       = fd_accounts_hash_counter_and_gather_tpool_cb,
        .para_arg_1 = NULL,
        .para_arg_2 = NULL
      };

      fd_hash_t out_hash = {0};
      fd_accounts_hash( ctx->slot_ctx->funk,
                        ctx->slot_ctx->slot,
                        &out_hash,
                        ctx->runtime_spad,
                        fd_bank_features_query( ctx->slot_ctx->bank ),
                        &exec_para_ctx,
                        NULL );
      FD_LOG_NOTICE(( "Done computing epoch account hash (%s)", FD_BASE58_ENC_32_ALLOCA( &out_hash ) ));

      fd_bank_epoch_account_hash_set( ctx->slot_ctx->bank, out_hash );

      fd_bank_eah_start_slot_set( ctx->slot_ctx->bank, FD_SLOT_NULL );
    }
  }

}

static fd_funk_txn_t *
get_rooted_txn( fd_replay_tile_ctx_t * ctx,
                fd_funk_txn_t *        to_root_txn,
                uchar                  is_constipated ) {

  /* We need to get the rooted transaction that we are publishing into.
     This needs to account for the two different cases: no constipation
     and single constipation.

     Also, if it's the first time that we are setting the false root,
     then we must also register it into the status cache because we
     don't register the root in txncache_publish, to avoid registering
     the same slot multiple times. */

  fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );

  if( is_constipated ) {

    if( FD_UNLIKELY( !ctx->false_root ) ) {

      fd_funk_txn_t * txn        = to_root_txn;
      fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_pool );
      while( parent_txn ) {
        txn        = parent_txn;
        parent_txn = fd_funk_txn_parent( txn, txn_pool );
      }

      ctx->false_root = txn;
      if( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) {
        fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
      } else {
        fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
      }
    }
    return ctx->false_root;
  } else {
    return NULL;
  }
}

static void
funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) {

  FD_LOG_DEBUG(( "Entering funk_and_txncache_publish for wmk=%lu", wmk ));

  if( xid->ul[0] != wmk ) {
    FD_LOG_CRIT(( "Invariant violation: xid->ul[0] != wmk %lu %lu", xid->ul[0], wmk ));
  }

  /* Handle updates to funk and the status cache. */

  fd_funk_txn_start_read( ctx->funk );
  fd_funk_txn_map_t * txn_map     = fd_funk_txn_map( ctx->funk );
  fd_funk_txn_t *     to_root_txn = fd_funk_txn_query( xid, txn_map );
  if( FD_UNLIKELY( !to_root_txn ) ) {
    FD_LOG_ERR(( "Unable to find funk transaction for xid %lu", xid->ul[0] ));
  }
  fd_funk_txn_t * rooted_txn = get_rooted_txn( ctx, to_root_txn, 0 );
  fd_funk_txn_end_read( ctx->funk );

  txncache_publish( ctx, to_root_txn, rooted_txn );

  funk_publish( ctx, to_root_txn, wmk, 0 );

  if( FD_UNLIKELY( ctx->capture_ctx ) ) {
    fd_runtime_checkpt( ctx->capture_ctx, ctx->slot_ctx, wmk );
  }

}
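
/* The funk transaction id convention used throughout this tile: for a
   normally replayed slot, both 64-bit words of the xid hold the slot
   number (packed microblocks zero the upper word instead; see
   prepare_new_block_execution below).  A hedged helper sketch, not used
   by the tile itself: */

static inline fd_funk_txn_xid_t
slot_xid_sketch( ulong slot ) {
  fd_funk_txn_xid_t xid = { .ul = { slot, slot } };
  return xid;
}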

static void
after_frag( fd_replay_tile_ctx_t * ctx,
            ulong                  in_idx,
            ulong                  seq    FD_PARAM_UNUSED,
            ulong                  sig,
            ulong                  sz     FD_PARAM_UNUSED,
            ulong                  tsorig FD_PARAM_UNUSED,
            ulong                  tspub  FD_PARAM_UNUSED,
            fd_stem_context_t *    stem   FD_PARAM_UNUSED ) {
  if( FD_LIKELY( in_idx==ctx->tower_in_idx ) ) {
    ulong root = sig;

    if( FD_LIKELY( root <= fd_fseq_query( ctx->published_wmark ) ) ) return;
    FD_LOG_NOTICE(( "advancing root %lu => %lu", fd_fseq_query( ctx->published_wmark ), root ));

    ctx->root = root;
    if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, root );
    if( FD_LIKELY( ctx->forks      ) ) fd_forks_publish( ctx->forks, root );
    if( FD_LIKELY( ctx->funk       ) ) { fd_funk_txn_xid_t xid = { .ul = { root, root } }; funk_and_txncache_publish( ctx, root, &xid ); }
    if( FD_LIKELY( ctx->banks      ) ) fd_banks_publish( ctx->banks, root );

    fd_fseq_update( ctx->published_wmark, root );
  }
}

static void
replay_plugin_publish( fd_replay_tile_ctx_t * ctx,
                       fd_stem_context_t *    stem,
                       ulong                  sig,
                       uchar const *          data,
                       ulong                  data_sz ) {
  uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->plugin_out->mem, ctx->plugin_out->chunk );
  fd_memcpy( dst, data, data_sz );
  ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
  fd_stem_publish( stem, ctx->plugin_out->idx, sig, ctx->plugin_out->chunk, data_sz, 0UL, 0UL, tspub );
  ctx->plugin_out->chunk = fd_dcache_compact_next( ctx->plugin_out->chunk, data_sz, ctx->plugin_out->chunk0, ctx->plugin_out->wmark );
}

static void
publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
                            fd_stem_context_t *    stem,
                            ulong                  block_entry_block_height,
                            ulong                  curr_slot ) {
  if( FD_LIKELY( !ctx->notif_out->mcache ) ) return;

  long notify_time_ns = -fd_log_wallclock();
#define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out->mem, ctx->notif_out->chunk )
#define NOTIFY_END                                                                                     \
  fd_mcache_publish( ctx->notif_out->mcache, ctx->notif_out->depth, ctx->notif_out->seq,               \
                     0UL, ctx->notif_out->chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
  ctx->notif_out->seq   = fd_seq_inc( ctx->notif_out->seq, 1UL );                                      \
  ctx->notif_out->chunk = fd_dcache_compact_next( ctx->notif_out->chunk, sizeof(fd_replay_notif_msg_t),\
                                                  ctx->notif_out->chunk0, ctx->notif_out->wmark );     \
  msg = NULL

  ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
  fd_replay_notif_msg_t * msg = NULL;

  {
    NOTIFY_START;
    msg->type                        = FD_REPLAY_SLOT_TYPE;
    msg->slot_exec.slot              = curr_slot;
    msg->slot_exec.parent            = ctx->parent_slot;
    msg->slot_exec.root              = fd_fseq_query( ctx->published_wmark );
    msg->slot_exec.height            = block_entry_block_height;
    msg->slot_exec.transaction_count = fd_bank_txn_count_get( ctx->slot_ctx->bank );
    msg->slot_exec.shred_cnt         = fd_bank_shred_cnt_get( ctx->slot_ctx->bank );

    msg->slot_exec.bank_hash = fd_bank_bank_hash_get( ctx->slot_ctx->bank );

    fd_block_hash_queue_global_t const * block_hash_queue = fd_bank_block_hash_queue_query( ctx->slot_ctx->bank );
    fd_hash_t * last_hash = fd_block_hash_queue_last_hash_join( block_hash_queue );
    msg->slot_exec.block_hash = *last_hash;

    memcpy( &msg->slot_exec.identity, ctx->validator_identity_pubkey, sizeof( fd_pubkey_t ) );
    msg->slot_exec.ts = tsorig;
    NOTIFY_END;
  }
  fd_bank_shred_cnt_set( ctx->slot_ctx->bank, 0UL );

  FD_TEST( curr_slot == ctx->slot_ctx->bank->slot );

#undef NOTIFY_START
#undef NOTIFY_END
  notify_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(( "TIMING: notify_slot_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6 ));

  if( ctx->plugin_out->mem ) {
    /*
    fd_replay_complete_msg_t msg2 = {
      .slot                     = curr_slot,
      .total_txn_count          = ctx->slot_ctx->txn_count,
      .nonvote_txn_count        = ctx->slot_ctx->nonvote_txn_count,
      .failed_txn_count         = ctx->slot_ctx->failed_txn_count,
      .nonvote_failed_txn_count = ctx->slot_ctx->nonvote_failed_txn_count,
      .compute_units            = ctx->slot_ctx->total_compute_units_used,
      .transaction_fee          = ctx->slot_ctx->slot_bank.collected_execution_fees,
      .priority_fee             = ctx->slot_ctx->slot_bank.collected_priority_fees,
      .parent_slot              = ctx->parent_slot,
    };
    */

    ulong msg[11];
    msg[ 0 ]  = ctx->curr_slot;
    msg[ 1 ]  = fd_bank_txn_count_get( ctx->slot_ctx->bank );
    msg[ 2 ]  = fd_bank_nonvote_txn_count_get( ctx->slot_ctx->bank );
    msg[ 3 ]  = fd_bank_failed_txn_count_get( ctx->slot_ctx->bank );
    msg[ 4 ]  = fd_bank_nonvote_failed_txn_count_get( ctx->slot_ctx->bank );
    msg[ 5 ]  = fd_bank_total_compute_units_used_get( ctx->slot_ctx->bank );
    msg[ 6 ]  = fd_bank_execution_fees_get( ctx->slot_ctx->bank );
    msg[ 7 ]  = fd_bank_priority_fees_get( ctx->slot_ctx->bank );
    msg[ 8 ]  = 0UL; /* todo ... track tips */
    msg[ 9 ]  = ctx->parent_slot;
    msg[ 10 ] = 0UL; /* todo ... max compute units */
    replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_COMPLETED, (uchar const *)msg, sizeof(msg) );
  }
}
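
/* Consumer-side view of the 11-word FD_PLUGIN_MSG_SLOT_COMPLETED
   payload built above, taken directly from the assignments: [0] slot,
   [1] total txn cnt, [2] nonvote txn cnt, [3] failed txn cnt,
   [4] nonvote failed txn cnt, [5] compute units, [6] execution fees,
   [7] priority fees, [8] tips (todo), [9] parent slot, [10] max
   compute units (todo). */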

// static void
// send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
//   if( FD_UNLIKELY( !ctx->vote ) ) return;
//   FD_LOG_NOTICE(( "sending tower sync" ));
//   ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower )->slot;
//   fd_hash_t vote_bank_hash[1]  = { 0 };
//   fd_hash_t vote_block_hash[1] = { 0 };

//   /* guaranteed to be on frontier from caller check */
//   fd_fork_t const * fork = fd_forks_query_const( ctx->forks, vote_slot );
//   fd_memcpy( vote_bank_hash, &ctx->slot_ctx->slot_bank.banks_hash, sizeof(fd_hash_t) );

//   int err = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot, vote_block_hash );
//   if( err ) FD_LOG_ERR(( "invariant violation: missing block hash for tower vote" ));

//   /* Build a vote state update based on current tower votes. */

//   fd_txn_p_t * txn = (fd_txn_p_t *)fd_chunk_to_laddr( ctx->send_out->mem, ctx->send_out->chunk );
//   fd_tower_to_vote_txn( ctx->tower,
//                         ctx->root,
//                         vote_bank_hash,
//                         vote_block_hash,
//                         ctx->validator_identity,
//                         ctx->vote_authority,
//                         ctx->vote_acc,
//                         txn,
//                         ctx->runtime_spad );

//   /* TODO: Can use a smaller size, adjusted for payload length */
//   ulong msg_sz = sizeof( fd_txn_p_t );
//   ulong sig    = vote_slot;
//   fd_mcache_publish( ctx->send_out->mcache,
//                      ctx->send_out->depth,
//                      ctx->send_out->seq,
//                      sig,
//                      ctx->send_out->chunk,
//                      msg_sz,
//                      0UL,
//                      0,
//                      0 );
//   ctx->send_out->seq   = fd_seq_inc( ctx->send_out->seq, 1UL );
//   ctx->send_out->chunk = fd_dcache_compact_next( ctx->send_out->chunk,
//                                                  msg_sz,
//                                                  ctx->send_out->chunk0,
//                                                  ctx->send_out->wmark );

//   /* Dump the latest sent tower into the tower checkpoint file */
//   if( FD_LIKELY( ctx->tower_checkpt_fileno > 0 ) ) fd_restart_tower_checkpt( vote_bank_hash, ctx->tower, ctx->ghost, ctx->root, ctx->tower_checkpt_fileno );
// }

static fd_fork_t *
prepare_new_block_execution( fd_replay_tile_ctx_t * ctx,
                             fd_stem_context_t *    stem,
                             ulong                  curr_slot,
                             ulong                  flags ) {

  long prepare_time_ns = -fd_log_wallclock();

  int is_new_epoch_in_new_block = 0;
  fd_fork_t * fork = fd_forks_prepare( ctx->forks,
                                       ctx->parent_slot,
                                       ctx->funk,
                                       ctx->blockstore,
                                       ctx->runtime_spad );

  /* Remove previous slot ctx from frontier */
  fd_fork_t * child = fd_fork_frontier_ele_remove( ctx->forks->frontier, &fork->slot, NULL, ctx->forks->pool );
  child->slot    = curr_slot;
  child->end_idx = UINT_MAX; /* reset end_idx from whatever was previously executed on this fork */

  /* Insert new slot onto fork frontier */
  if( FD_UNLIKELY( fd_fork_frontier_ele_query(
      ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool ) ) ) {
    FD_LOG_ERR(( "invariant violation: child slot %lu was already in the frontier", curr_slot ));
  }
  fd_fork_frontier_ele_insert( ctx->forks->frontier, child, ctx->forks->pool );
  fork->lock = 1;
  FD_TEST( fork == child );

  FD_LOG_NOTICE(( "new block execution - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
  ctx->slot_ctx->banks = ctx->banks;
  if( FD_UNLIKELY( !ctx->banks ) ) {
    FD_LOG_CRIT(( "invariant violation: banks is NULL" ));
  }

  ctx->slot_ctx->bank = fd_banks_clone_from_parent( ctx->banks, curr_slot, ctx->parent_slot );
  if( FD_UNLIKELY( !ctx->slot_ctx->bank ) ) {
    FD_LOG_CRIT(( "invariant violation: bank is NULL" ));
  }

  /* if it is an epoch boundary, push out stake weights */

  if( ctx->slot_ctx->slot != 0 ) {
    is_new_epoch_in_new_block = (int)fd_runtime_is_epoch_boundary( ctx->slot_ctx, ctx->slot_ctx->slot, fd_bank_prev_slot_get( ctx->slot_ctx->bank ) );
  }

  /* Update starting PoH hash for the new slot for tick verification later */
  fd_block_map_query_t query[1] = { 0 };
  int err = fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * curr_block_info = fd_block_map_query_ele( query );
  if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ) FD_LOG_ERR(( "Block map prepare failed, likely corrupt." ));
  if( FD_UNLIKELY( curr_slot != curr_block_info->slot ) ) FD_LOG_ERR(( "Block map prepare failed, likely corrupt." ));
  fd_block_map_publish( query );

  fd_bank_prev_slot_set( ctx->slot_ctx->bank, ctx->slot_ctx->slot );

  ctx->slot_ctx->slot = curr_slot;

  fd_bank_tick_height_set( ctx->slot_ctx->bank, fd_bank_max_tick_height_get( ctx->slot_ctx->bank ) );

  ulong * max_tick_height = fd_bank_max_tick_height_modify( ctx->slot_ctx->bank );
  ulong   ticks_per_slot  = fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
  if( FD_UNLIKELY( FD_RUNTIME_EXECUTE_SUCCESS != fd_runtime_compute_max_tick_height( ticks_per_slot,
                                                                                     curr_slot,
                                                                                     max_tick_height ) ) ) {
    FD_LOG_ERR(( "couldn't compute tick height/max tick height slot %lu ticks_per_slot %lu", curr_slot, ticks_per_slot ));
  }

  fd_bank_enable_exec_recording_set( ctx->slot_ctx->bank, ctx->tx_metadata_storage );

  ctx->slot_ctx->status_cache = ctx->status_cache;

  fd_funk_txn_xid_t xid = { 0 };

  if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
    memset( xid.uc, 0, sizeof(fd_funk_txn_xid_t) );
  } else {
    xid.ul[1] = ctx->slot_ctx->slot;
  }
  xid.ul[0] = ctx->slot_ctx->slot;
  /* push a new transaction on the stack */
  fd_funk_txn_start_write( ctx->funk );

  fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
  if( FD_UNLIKELY( !txn_map->map ) ) {
    FD_LOG_ERR(( "Could not find valid funk transaction map" ));
  }
  fd_funk_txn_xid_t parent_xid = { .ul = { ctx->parent_slot, ctx->parent_slot } };
  fd_funk_txn_t *   parent_txn = fd_funk_txn_query( &parent_xid, txn_map );

  ctx->slot_ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, parent_txn, &xid, 1 );

  fd_funk_txn_end_write( ctx->funk );

  int is_epoch_boundary = 0;
  /* TODO: Currently all of the epoch boundary/rewards logic is not
     multithreaded at the epoch boundary. */
  fd_runtime_block_pre_execute_process_new_epoch( ctx->slot_ctx,
                                                  NULL,
                                                  ctx->exec_spads,
                                                  ctx->exec_spad_cnt,
                                                  ctx->runtime_spad,
                                                  &is_epoch_boundary );

  /* FIXME: This breaks the concurrency model where we can change forks
     in the middle of a block. */
  /* At this point we need to notify all of the exec tiles and tell them
     that a new slot is ready to be published.  At this point, we should
     also mark the tile as not being ready. */
  for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
    ctx->exec_ready[ i ] = EXEC_TXN_READY;
  }

  /* We want to push on a spad frame before we start executing a block.
     Apart from allocations made at the epoch boundary, there should be
     no allocations that persist beyond the scope of a block.  Before
     this point, there should only be 1 or 2 frames that are on the
     stack.  The first frame will hold memory for the slot/epoch
     context.  The potential second frame will only exist while rewards
     are being distributed (around the start of an epoch).  We pop a
     frame when rewards are done being distributed. */
  fd_spad_push( ctx->runtime_spad );

  int res = fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->blockstore, ctx->runtime_spad );
  if( res != FD_RUNTIME_EXECUTE_SUCCESS ) {
    FD_LOG_ERR(( "block prep execute failed" ));
  }

  if( is_new_epoch_in_new_block ) {
    publish_stake_weights( ctx, stem, ctx->slot_ctx );
  }

  prepare_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(( "TIMING: prepare_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)prepare_time_ns * 1e-6 ));

  return fork;
}

static void
init_poh( fd_replay_tile_ctx_t * ctx ) {
  FD_LOG_INFO(( "sending init msg" ));
  fd_replay_out_link_t * bank_out = &ctx->bank_out[ 0UL ];
  fd_poh_init_msg_t *    msg      = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
  msg->hashcnt_per_tick = fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank );
  msg->ticks_per_slot   = fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
  msg->tick_duration_ns = (ulong)(fd_bank_ns_per_slot_get( ctx->slot_ctx->bank )) / fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );

  fd_block_hash_queue_global_t * bhq = (fd_block_hash_queue_global_t *)&ctx->slot_ctx->bank->block_hash_queue[0];
  fd_hash_t * last_hash = fd_block_hash_queue_last_hash_join( bhq );
  if( last_hash ) {
    memcpy( msg->last_entry_hash, last_hash, sizeof(fd_hash_t) );
  } else {
    memset( msg->last_entry_hash, 0, sizeof(fd_hash_t) );
  }
  msg->tick_height = ctx->slot_ctx->slot * msg->ticks_per_slot;

  ulong sig = fd_disco_replay_old_sig( ctx->slot_ctx->slot, REPLAY_FLAG_INIT );
  fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, sizeof(fd_poh_init_msg_t), 0UL, 0UL, 0UL );
  bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, sizeof(fd_poh_init_msg_t), bank_out->chunk0, bank_out->wmark );
  bank_out->seq   = fd_seq_inc( bank_out->seq, 1UL );
  ctx->poh_init_done = 1;
}
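
/* Worked example of the PoH init arithmetic above, with typical
   mainnet-like values (assumed purely for illustration): ns_per_slot =
   400e6 and ticks_per_slot = 64 give tick_duration_ns = 6250000 (6.25
   ms per tick), and a starting slot of 250000000 gives tick_height =
   250000000 * 64 = 16000000000 ticks. */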

static void
prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {

  ulong curr_slot = ctx->curr_slot;
  ulong flags     = ctx->flags;

  /**********************************************************************/
  /* Prepare the fork in ctx->forks for replaying curr_slot             */
  /**********************************************************************/

  fd_fork_t * parent_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->parent_slot, NULL, ctx->forks->pool );
  if( FD_UNLIKELY( parent_fork && parent_fork->lock ) ) {
    /* This is an edge case related to pack.  The parent fork might
       already be in the frontier and currently executing (i.e.
       fork->frozen = 0). */
    FD_LOG_ERR(( "parent slot is frozen in frontier. cannot execute. slot: %lu, parent_slot: %lu",
                 curr_slot,
                 ctx->parent_slot ));
  }

  fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool );
  if( fork==NULL ) {
    fork = prepare_new_block_execution( ctx, stem, curr_slot, flags );
  } else {
    FD_LOG_WARNING(( "Fork for slot %lu already exists, so we don't make a new one. Restarting execution from batch %u", curr_slot, fork->end_idx ));
    ctx->slot_ctx->bank = fd_banks_get_bank( ctx->banks, curr_slot );
    if( FD_UNLIKELY( !ctx->slot_ctx->bank ) ) {
      FD_LOG_CRIT(( "Unable to get bank for slot %lu", curr_slot ));
    }

    fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
    fd_funk_txn_xid_t   xid     = { .ul = { curr_slot, curr_slot } };
    ctx->slot_ctx->funk_txn = fd_funk_txn_query( &xid, txn_map );
    if( FD_UNLIKELY( !ctx->slot_ctx->funk_txn ) ) {
      FD_LOG_CRIT(( "Unable to get funk transaction for slot %lu", curr_slot ));
    }

    ctx->slot_ctx->slot = curr_slot;
  }

  /**********************************************************************/
  /* Get the solcap context for replaying curr_slot                     */
  /**********************************************************************/

  if( ctx->capture_ctx ) {
    fd_solcap_writer_set_slot( ctx->capture_ctx->capture, ctx->slot_ctx->slot );
  }

}

static void
exec_slice( fd_replay_tile_ctx_t * ctx,
            fd_stem_context_t *    stem,
            ulong                  slot ) {

  /* Assumes that the slice exec ctx has buffered at least one slice.
     Then, for each microblock, round robin dispatch the transactions in
     that microblock to the exec tiles.  Once an exec tile signifies
     with a retcode, we can continue dispatching transactions.  Replay
     has to synchronize at the boundary of every microblock.  After we
     dispatch one to each exec tile, we watermark where we are, and then
     continue on the following after_credit.  If we still have txns to
     execute,
     start from wmark, pausing every time we hit the microblock
     boundaries. */

  uchar to_exec[ FD_PACK_MAX_BANK_TILES ] = {0};
  uchar num_free_exec_tiles = 0UL;
  for( uchar i=0; i<ctx->exec_cnt; i++ ) {
    if( ctx->exec_ready[ i ]==EXEC_TXN_READY ) {
      to_exec[ num_free_exec_tiles++ ] = i;
    }
  }

  if( ctx->blocked_on_mblock ) {
    if( num_free_exec_tiles==ctx->exec_cnt ) {
      ctx->blocked_on_mblock = 0;
    } else {
      return;
    }
  }

  uchar start_num_free_exec_tiles = (uchar)ctx->exec_cnt;
  while( num_free_exec_tiles>0 ) {

    if( fd_slice_exec_txn_ready( &ctx->slice_exec_ctx ) ) {
      ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );

      uchar exec_idx = to_exec[ num_free_exec_tiles-1 ];
      fd_replay_out_link_t * exec_out = &ctx->exec_out[ exec_idx ];
      num_free_exec_tiles--;

      fd_txn_p_t txn_p;
      fd_slice_exec_txn_parse( &ctx->slice_exec_ctx, &txn_p );

      /* Insert or reverify invoked programs for this epoch, if needed.
         FIXME: this should be done during txn parsing so that we don't
         have to loop over all accounts a second time. */
      fd_runtime_update_program_cache( ctx->slot_ctx, &txn_p, ctx->runtime_spad );

      fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
                                                     &slot,
                                                     NULL,
                                                     ctx->forks->pool );

      if( FD_UNLIKELY( !fork ) ) FD_LOG_ERR(( "Unable to select a fork" ));

      fd_bank_txn_count_set( ctx->slot_ctx->bank, fd_bank_txn_count_get( ctx->slot_ctx->bank ) + 1 );

      /* dispatch dcache */
      fd_runtime_public_txn_msg_t * exec_msg = (fd_runtime_public_txn_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
      memcpy( &exec_msg->txn, &txn_p, sizeof(fd_txn_p_t) );
      exec_msg->slot = ctx->slot_ctx->slot;

      ctx->exec_ready[ exec_idx ] = EXEC_TXN_BUSY;
      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      fd_stem_publish( stem, exec_out->idx, EXEC_NEW_TXN_SIG, exec_out->chunk, sizeof(fd_runtime_public_txn_msg_t), 0UL, tsorig, tspub );
      exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_runtime_public_txn_msg_t), exec_out->chunk0, exec_out->wmark );

      continue;
    }

    /* If the current microblock is complete, and we still have mblks
       to read, then advance to the next microblock */

    if( fd_slice_exec_microblock_ready( &ctx->slice_exec_ctx ) ) {
      ctx->blocked_on_mblock = 1;
      fd_slice_exec_microblock_parse( &ctx->slice_exec_ctx );
    }

    /* Under this condition, we have finished executing all the
       microblocks in the slice, and are ready to load another slice.
       However, if we just completed the last batch in the slot, we want
       to be sure to finalize block execution (below). */

    if( fd_slice_exec_slice_ready( &ctx->slice_exec_ctx )
        && !ctx->slice_exec_ctx.last_batch ) {
      ctx->flags = EXEC_FLAG_READY_NEW;
    }
    break; /* block on microblock / batch */
  }

  if( fd_slice_exec_slot_complete( &ctx->slice_exec_ctx ) ) {

    if( num_free_exec_tiles != start_num_free_exec_tiles ) {
      FD_LOG_DEBUG(( "blocked on exec tiles completing" ));
      return;
    }

    FD_LOG_DEBUG(( "[%s] BLOCK EXECUTION COMPLETE", __func__ ));

    /* At this point, the entire block has been executed. */
    fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
                                                   &slot,
                                                   NULL,
                                                   ctx->forks->pool );
    if( FD_UNLIKELY( !fork ) ) {
      FD_LOG_ERR(( "Unable to select a fork" ));
    }

    fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->slice_exec_ctx.buf + ctx->slice_exec_ctx.last_mblk_off );

    // Copy block hash to slot_bank poh for updating the sysvars
    fd_block_map_query_t query[1] = { 0 };
    fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    fd_hash_t * poh = fd_bank_poh_modify( ctx->slot_ctx->bank );
    memcpy( poh, hdr->hash, sizeof(fd_hash_t) );

    block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_PROCESSED );
    FD_COMPILER_MFENCE();
    block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
    memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) );
    fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
    memcpy( &block_info->bank_hash, bank_hash, sizeof(fd_hash_t) );

    fd_block_map_publish( query );
    ctx->flags = EXEC_FLAG_FINISHED_SLOT;

    fd_slice_exec_reset( &ctx->slice_exec_ctx ); /* Reset ctx for next slot */
  }

}

/* handle_slice polls for a new slice off the slices stored in the
   deque and prepares it for execution, including calling
   prepare_first_batch_execution if this is the first slice of a new
   slot.  It also queries the blockstore for the corresponding shreds
   and stores them into the slice_exec_ctx.  Assumes that replay is
   ready for a new slice (i.e., finished executing the previous
   slice). */
static void
handle_slice( fd_replay_tile_ctx_t * ctx,
              fd_stem_context_t *    stem ) {

  if( fd_exec_slice_cnt( ctx->exec_slice_deque )==0UL ) {
    FD_LOG_DEBUG(( "No slices to execute" ));
    return;
  }

  ulong sig = fd_exec_slice_pop_head( ctx->exec_slice_deque );

  if( FD_UNLIKELY( ctx->flags!=EXEC_FLAG_READY_NEW ) ) {
    FD_LOG_ERR(( "Replay is in unexpected state" ));
  }

  ulong  slot          = fd_disco_repair_replay_sig_slot( sig );
  ushort parent_off    = fd_disco_repair_replay_sig_parent_off( sig );
  uint   data_cnt      = fd_disco_repair_replay_sig_data_cnt( sig );
  int    slot_complete = fd_disco_repair_replay_sig_slot_complete( sig );
  ulong  parent_slot   = slot - parent_off;

  if( FD_UNLIKELY( slot < fd_fseq_query( ctx->published_wmark ) ) ) {
    FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our watermark %lu.", slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ));
    return;
  }

  if( FD_UNLIKELY( parent_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
    FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our watermark %lu.", slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ));
    return;
  }

  if( FD_UNLIKELY( !fd_blockstore_block_info_test( ctx->blockstore, parent_slot ) ) ) {
    FD_LOG_WARNING(( "unable to find slot %lu's parent slot %lu block_info", slot, parent_slot ));
    return;
  }

  if( FD_UNLIKELY( slot != ctx->curr_slot ) ) {

    /* We need to switch forks and execution contexts.  Either we
       completed execution of the previous slot and are now executing a
       new slot, or we are interleaving batches from different slots -
       all executable at the fork frontier.

       Going to need to query the frontier for the fork, or create it
       if it's not on the frontier.  I think
       prepare_first_batch_execution already handles this logic. */

    ctx->curr_slot   = slot;
    ctx->parent_slot = parent_slot;
    prepare_first_batch_execution( ctx, stem );

    ulong turbine_slot = fd_fseq_query( ctx->turbine_slot );

    FD_LOG_NOTICE(( "\n\n[Replay]\n"
                    "slot:            %lu\n"
                    "current turbine: %lu\n"
                    "slots behind:    %lu\n"
                    "live:            %d\n",
                    slot,
                    turbine_slot,
                    turbine_slot - slot,
                    ( turbine_slot - slot ) < 5 ));
  } else {
    /* continuing execution of the slot we have been doing */
  }

  /* Prepare batch for execution on following after_credit iteration */
  ctx->flags = EXEC_FLAG_EXECUTING_SLICE;
  fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
  ulong slice_sz;
  uint  start_idx = fork->end_idx + 1;
  int err = fd_blockstore_slice_query( ctx->blockstore,
                                       slot,
                                       start_idx,
                                       start_idx + data_cnt - 1,
                                       FD_SLICE_MAX,
                                       ctx->slice_exec_ctx.buf,
                                       &slice_sz );
  fork->end_idx += data_cnt;
  fd_slice_exec_begin( &ctx->slice_exec_ctx, slice_sz, slot_complete );
  fd_bank_shred_cnt_set( ctx->slot_ctx->bank, fd_bank_shred_cnt_get( ctx->slot_ctx->bank ) + data_cnt );

  if( FD_UNLIKELY( err ) ) {
    FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
  }
}
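
/* Note on the shred-index window above: for a freshly prepared fork,
   end_idx was reset to UINT_MAX in prepare_new_block_execution, so the
   uint arithmetic of end_idx + 1 deliberately wraps to 0 and the first
   slice covers shred indices [0, data_cnt-1].  Each subsequent slice
   advances the window by data_cnt. */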

static void
kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
  ctx->slot_ctx->slot = ctx->curr_slot;
  fd_blockstore_init( ctx->blockstore,
                      ctx->blockstore_fd,
                      FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
                      ctx->slot_ctx->slot );

  fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot );
  publish_stake_weights( ctx, stem, ctx->slot_ctx );

}

static void
read_snapshot( void *              _ctx,
               fd_stem_context_t * stem,
               char const *        snapshot,
               char const *        incremental,
               char const *        snapshot_dir ) {
  fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;

  fd_exec_para_cb_ctx_t exec_para_ctx_snap = {
    .func       = snapshot_hash_tiles_cb,
    .para_arg_1 = ctx,
    .para_arg_2 = stem,
  };

  /* Pass the slot_ctx to snapshot_load or recover_banks */
  /* Base slot is the slot we will compare against the base slot of the
     incremental snapshot, to ensure that the base slot of the
     incremental snapshot is the slot of the full snapshot.

     We pull this out of the full snapshot to use when verifying the
     incremental snapshot. */
  ulong base_slot = 0UL;
  if( strcmp( snapshot, "funk" )==0 || strncmp( snapshot, "wksp:", 5 )==0 ) {
    /* Funk already has a snapshot loaded */
    base_slot = ctx->slot_ctx->slot;
    kickoff_repair_orphans( ctx, stem );
  } else {
1355 :
1356 : /* If we have an incremental snapshot, try to prefetch the snapshot slot
1357 : and manifest as soon as possible. In order to kick off repair effectively
1358 : we need the snapshot slot and the stake weights. These are both available
1359 : in the manifest. We will try to load in the manifest from the latest
1360 : snapshot that is available, then set up the blockstore and publish the
1361 : stake weights. After this, repair will kick off concurrently with loading
1362 : the rest of the snapshots. */
1363 :
1364 : /* TODO: Verify account hashes for all 3 snapshot loads. */
1365 : /* TODO: If prefetching the manifest is enabled it leads to
1366 : incorrect snapshot loads. This needs to be looked into. */
1367 0 : if( strlen( incremental )>0UL ) {
1368 :
1369 0 : uchar * tmp_mem = fd_spad_alloc_check( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
1370 :
1371 0 : fd_snapshot_load_ctx_t * tmp_snap_ctx = fd_snapshot_load_new( tmp_mem,
1372 0 : incremental,
1373 0 : ctx->incremental_src_type,
1374 0 : NULL,
1375 0 : ctx->slot_ctx,
1376 0 : false,
1377 0 : false,
1378 0 : FD_SNAPSHOT_TYPE_INCREMENTAL,
1379 0 : ctx->exec_spads,
1380 0 : ctx->exec_spad_cnt,
1381 0 : ctx->runtime_spad,
1382 0 : &exec_para_ctx_snap );
1383 : /* Load the prefetch manifest, and initialize the status cache and slot context,
1384 : so that we can use these to kick off repair. */
1385 0 : fd_snapshot_load_prefetch_manifest( tmp_snap_ctx );
1386 0 : ctx->curr_slot = ctx->slot_ctx->slot;
1387 0 : kickoff_repair_orphans( ctx, stem );
1388 :
1389 0 : }
1390 :
1391 0 : FD_LOG_WARNING(("Loading full snapshot"));
1392 0 : uchar * mem = fd_spad_alloc( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
1393 0 : fd_snapshot_load_ctx_t * snap_ctx = fd_snapshot_load_new( mem,
1394 0 : snapshot,
1395 0 : ctx->snapshot_src_type,
1396 0 : snapshot_dir,
1397 0 : ctx->slot_ctx,
1398 0 : false,
1399 0 : false,
1400 0 : FD_SNAPSHOT_TYPE_FULL,
1401 0 : ctx->exec_spads,
1402 0 : ctx->exec_spad_cnt,
1403 0 : ctx->runtime_spad,
1404 0 : &exec_para_ctx_snap );
1405 :
1406 0 : fd_snapshot_load_init( snap_ctx );
1407 :
1408 : /* Without an incremental snapshot, restore the manifest and status
1409 : cache from the full snapshot and initialize the objects. This also
1410 : kicks off sending the stake weights and snapshot slot to repair. */
1411 0 : if( strlen( incremental )==0UL ) {
1412 0 : fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL,
1413 0 : FD_SNAPSHOT_RESTORE_MANIFEST | FD_SNAPSHOT_RESTORE_STATUS_CACHE );
1414 0 : ctx->curr_slot = ctx->slot_ctx->slot;
1415 0 : kickoff_repair_orphans( ctx, stem );
1416 0 : } else {
1417 : /* With an incremental snapshot, read through the manifest and
1418 : status cache sections without restoring them; these objects were
1419 : already initialized from the incremental snapshot above. */
1420 0 : fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL, FD_SNAPSHOT_RESTORE_NONE );
1421 0 : }
1422 0 : base_slot = fd_snapshot_get_slot( snap_ctx );
1423 0 : fd_snapshot_load_accounts( snap_ctx );
1424 0 : fd_snapshot_load_fini( snap_ctx );
1425 0 : }
1426 :
1427 0 : if( strlen( incremental ) > 0 && strcmp( snapshot, "funk" ) != 0 ) {
1428 :
1429 : /* The slot of the full snapshot should be used as the base slot to verify the incremental snapshot,
1430 : not the slot context's slot - which is the slot of the incremental, not the full snapshot. */
1431 0 : fd_snapshot_load_all( incremental,
1432 0 : ctx->incremental_src_type,
1433 0 : NULL,
1434 0 : ctx->slot_ctx,
1435 0 : &base_slot,
1436 0 : NULL,
1437 0 : false,
1438 0 : false,
1439 0 : FD_SNAPSHOT_TYPE_INCREMENTAL,
1440 0 : ctx->exec_spads,
1441 0 : ctx->exec_spad_cnt,
1442 0 : ctx->runtime_spad );
1443 0 : ctx->curr_slot = ctx->slot_ctx->slot;
1444 0 : kickoff_repair_orphans( ctx, stem );
1445 0 : }
1446 :
1447 0 : fd_runtime_update_leaders( ctx->slot_ctx->bank,
1448 0 : ctx->slot_ctx->slot,
1449 0 : ctx->runtime_spad );
1450 0 : }
1451 :
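 : /* init_after_snapshot finishes runtime setup once snapshot (or
 : genesis) state is in place: it recalculates partitioned rewards,
 : initializes the fork frontier and bank hash comparison state, and
 : publishes the initial epoch stakes to the tower tile. */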
1452 : static void
1453 : init_after_snapshot( fd_replay_tile_ctx_t * ctx,
1454 0 : fd_stem_context_t * stem ) {
1455 : /* Do not modify order! */
1456 :
1457 : /* After both snapshots have been loaded in, we can determine if we should
1458 : start distributing rewards. */
1459 :
1460 0 : fd_rewards_recalculate_partitioned_rewards( ctx->slot_ctx,
1461 0 : NULL,
1462 0 : ctx->exec_spads,
1463 0 : ctx->exec_spad_cnt,
1464 0 : ctx->runtime_spad );
1465 :
1466 0 : ulong snapshot_slot = ctx->slot_ctx->slot;
1467 0 : if( FD_UNLIKELY( !snapshot_slot ) ) {
1468 : /* Genesis-specific setup. */
1469 : /* FIXME: This branch does not set up a new block exec ctx
1470 : properly. Needs to do whatever prepare_new_block_execution
1471 : does, but just hacking that in breaks stuff. */
1472 0 : fd_runtime_update_leaders( ctx->slot_ctx->bank,
1473 0 : ctx->slot_ctx->slot,
1474 0 : ctx->runtime_spad );
1475 :
1476 0 : fd_bank_prev_slot_set( ctx->slot_ctx->bank, 0UL );
1477 0 : ctx->slot_ctx->slot = 1UL;
1478 :
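 : /* Fast-forward the PoH hash chain by one slot's worth of hashes,
 : i.e. hashes_per_tick * ticks_per_slot sequential sha256 iterations
 : over the running hash. */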
1479 0 : ulong hashcnt_per_slot = fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank ) * fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
1480 0 : fd_hash_t * poh = fd_bank_poh_modify( ctx->slot_ctx->bank );
1481 0 : while( hashcnt_per_slot-- ) {
1482 0 : fd_sha256_hash( poh->hash, 32UL, poh->hash );
1483 0 : }
1484 :
1485 0 : FD_TEST( fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->blockstore, ctx->runtime_spad ) == 0 );
1486 0 : fd_runtime_block_info_t info = { .signature_cnt = 0 };
1487 :
1488 0 : fd_exec_para_cb_ctx_t exec_para_ctx_block_finalize = {
1489 0 : .func = block_finalize_tiles_cb,
1490 0 : .para_arg_1 = ctx,
1491 0 : .para_arg_2 = stem,
1492 0 : };
1493 :
1494 0 : fd_runtime_block_execute_finalize_para( ctx->slot_ctx,
1495 0 : ctx->capture_ctx,
1496 0 : &info,
1497 0 : ctx->exec_cnt,
1498 0 : ctx->runtime_spad,
1499 0 : &exec_para_ctx_block_finalize );
1500 :
1501 0 : ctx->slot_ctx->slot = 1UL;
1502 0 : snapshot_slot = 1UL;
1503 :
1504 : /* Now setup exec tiles for execution */
1505 0 : for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
1506 0 : ctx->exec_ready[ i ] = EXEC_TXN_READY;
1507 0 : }
1508 0 : }
1509 :
1510 0 : ctx->curr_slot = snapshot_slot;
1511 0 : ctx->parent_slot = fd_bank_prev_slot_get( ctx->slot_ctx->bank );
1512 0 : ctx->snapshot_slot = snapshot_slot;
1513 0 : ctx->flags = EXEC_FLAG_READY_NEW;
1514 :
1515 : /* Initialize consensus structures post-snapshot */
1516 :
1517 0 : fd_fork_t * snapshot_fork = fd_forks_init( ctx->forks, ctx->slot_ctx->slot );
1518 0 : if( FD_UNLIKELY( !snapshot_fork ) ) {
1519 0 : FD_LOG_CRIT(( "Failed to initialize snapshot fork" ));
1520 0 : }
1521 :
1522 0 : fd_stakes_global_t const * stakes = fd_bank_stakes_locking_query( ctx->slot_ctx->bank );
1523 0 : fd_vote_accounts_global_t const * vote_accounts = &stakes->vote_accounts;
1524 :
1525 0 : fd_vote_accounts_pair_global_t_mapnode_t * vote_accounts_pool = fd_vote_accounts_vote_accounts_pool_join( vote_accounts );
1526 0 : fd_vote_accounts_pair_global_t_mapnode_t * vote_accounts_root = fd_vote_accounts_vote_accounts_root_join( vote_accounts );
1527 :
1528 : /* Send to tower tile */
1529 :
1530 0 : if( FD_LIKELY( ctx->tower_out_idx!=ULONG_MAX ) ) {
1531 0 : uchar * chunk_laddr = fd_chunk_to_laddr( ctx->tower_out_mem, ctx->tower_out_chunk );
1532 0 : ulong off = 0;
1533 0 : for( fd_vote_accounts_pair_global_t_mapnode_t * curr = fd_vote_accounts_pair_global_t_map_minimum( vote_accounts_pool, vote_accounts_root );
1534 0 : curr;
1535 0 : curr = fd_vote_accounts_pair_global_t_map_successor( vote_accounts_pool, curr ) ) {
1536 :
1537 0 : if( FD_UNLIKELY( curr->elem.stake > 0UL ) ) {
1538 0 : memcpy( chunk_laddr + off, &curr->elem.key, sizeof(fd_pubkey_t) );
1539 0 : off += sizeof(fd_pubkey_t);
1540 :
1541 0 : memcpy( chunk_laddr + off, &curr->elem.stake, sizeof(ulong) );
1542 0 : off += sizeof(ulong);
1543 0 : }
1544 0 : }
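 : /* sig packs the slot into the high 32 bits. The low 32 bits are
 : UINT_MAX here which, mirroring the slot/parent_slot encoding used
 : after each replayed slot, presumably marks this as the snapshot
 : message with no parent. */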
1545 0 : fd_stem_publish( stem, ctx->tower_out_idx, snapshot_slot << 32UL | UINT_MAX, ctx->tower_out_chunk, off, 0UL, (ulong)fd_log_wallclock(), (ulong)fd_log_wallclock() );
1546 0 : }
1547 :
1548 0 : fd_bank_hash_cmp_t * bank_hash_cmp = ctx->bank_hash_cmp;
1549 0 : for( fd_vote_accounts_pair_global_t_mapnode_t * curr = fd_vote_accounts_pair_global_t_map_minimum( vote_accounts_pool, vote_accounts_root );
1550 0 : curr;
1551 0 : curr = fd_vote_accounts_pair_global_t_map_successor( vote_accounts_pool, curr ) ) {
1552 0 : bank_hash_cmp->total_stake += curr->elem.stake;
1553 0 : }
1554 0 : bank_hash_cmp->watermark = snapshot_slot;
1555 :
1556 0 : fd_bank_stakes_end_locking_query( ctx->slot_ctx->bank );
1557 :
1558 0 : ulong root = snapshot_slot;
1559 0 : if( FD_LIKELY( root > fd_fseq_query( ctx->published_wmark ) ) ) {
1560 :
1561 : /* The watermark has advanced likely because we loaded an
1562 : incremental snapshot that was downloaded just-in-time. We had
1563 : kicked off repair with an older incremental snapshot, and so now
1564 : we have to prune the relevant data structures, so replay can
1565 : start from the latest frontier.
1566 :
1567 : No funk_and_txncache_publish( ctx, wmark, &xid ); because there
1568 : are no funk txns to publish, and all rooted slots have already
1569 : been registered in the txncache when we loaded the snapshot. */
1570 :
1571 0 : if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, root );
1572 0 : if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, root );
1573 :
1574 0 : fd_fseq_update( ctx->published_wmark, root );
1575 0 : }
1576 :
1577 0 : FD_LOG_NOTICE(( "snapshot slot %lu", snapshot_slot ));
1578 0 : }
1579 :
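 : /* init_snapshot sets up the slot context, loads the configured
 : snapshots (if any), reads genesis, and leaves the tile ready to
 : replay its first slot. */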
1580 : void
1581 : init_snapshot( fd_replay_tile_ctx_t * ctx,
1582 0 : fd_stem_context_t * stem ) {
1583 : /* Init slot_ctx */
1584 :
1585 0 : uchar * slot_ctx_mem = fd_spad_alloc_check( ctx->runtime_spad, FD_EXEC_SLOT_CTX_ALIGN, FD_EXEC_SLOT_CTX_FOOTPRINT );
1586 0 : ctx->slot_ctx = fd_exec_slot_ctx_join( fd_exec_slot_ctx_new( slot_ctx_mem ) );
1587 0 : ctx->slot_ctx->banks = ctx->banks;
1588 0 : ctx->slot_ctx->bank = fd_banks_get_bank( ctx->banks, 0UL );
1589 :
1590 0 : ctx->slot_ctx->funk = ctx->funk;
1591 0 : ctx->slot_ctx->status_cache = ctx->status_cache;
1592 0 : fd_runtime_update_slots_per_epoch( ctx->slot_ctx->bank, FD_DEFAULT_SLOTS_PER_EPOCH );
1593 :
1594 0 : uchar is_snapshot = strlen( ctx->snapshot ) > 0;
1595 0 : if( is_snapshot ) {
1596 0 : read_snapshot( ctx, stem, ctx->snapshot, ctx->incremental, ctx->snapshot_dir );
1597 0 : }
1598 :
1599 0 : if( ctx->plugin_out->mem ) {
1600 0 : uchar msg[56];
1601 0 : fd_memset( msg, 0, sizeof(msg) );
1602 0 : msg[ 0 ] = 6;
1603 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
1604 0 : }
1605 :
1606 0 : fd_runtime_read_genesis( ctx->slot_ctx,
1607 0 : ctx->genesis,
1608 0 : is_snapshot,
1609 0 : ctx->capture_ctx,
1610 0 : ctx->runtime_spad );
1611 : /* We call this after fd_runtime_read_genesis, which sets up the
1612 : slot_bank needed in blockstore_init. */
1613 : /* FIXME: We should really only call this once. */
1614 0 : fd_blockstore_init( ctx->blockstore,
1615 0 : ctx->blockstore_fd,
1616 0 : FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
1617 0 : ctx->curr_slot );
1618 0 : init_after_snapshot( ctx, stem );
1619 :
1620 0 : if( ctx->plugin_out->mem && strlen( ctx->genesis ) > 0 ) {
1621 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_GENESIS_HASH_KNOWN, fd_bank_genesis_hash_query( ctx->slot_ctx->bank )->hash, sizeof(fd_hash_t) );
1622 0 : }
1623 :
1624 : // Tell the world about the currently active features
1625 0 : fd_features_t const * features = fd_bank_features_query( ctx->slot_ctx->bank );
1626 0 : fd_memcpy( &ctx->runtime_public->features, features, sizeof(ctx->runtime_public->features) );
1627 :
1628 : /* Publish slot notifs */
1629 0 : ulong curr_slot = ctx->curr_slot;
1630 0 : ulong block_entry_height = 0;
1631 :
1632 0 : if( is_snapshot ) {
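 : /* Spin until a consistent read of the block height succeeds:
 : query_try / query_test appears to be the blockstore's
 : speculative-read protocol for the block map. */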
1633 0 : for(;;){
1634 0 : fd_block_map_query_t query[1] = { 0 };
1635 0 : int err = fd_block_map_query_try( ctx->blockstore->block_map, &curr_slot, NULL, query, 0 );
1636 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1637 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) continue; /* FIXME: eventually need an error condition here. */
1638 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) {
1639 0 : FD_LOG_WARNING(( "Waiting for block map query for slot %lu", curr_slot ));
1640 0 : continue;
1641 0 : }
1642 0 : block_entry_height = block_info->block_height;
1643 0 : if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
1644 0 : }
1645 0 : } else {
1646 : /* Block after genesis has a height of 1.
1647 : TODO: We should be able to query slot 1 block_map entry to get this
1648 : (using the above for loop), but blockstore/fork setup on genesis is
1649 : broken for now. */
1650 0 : block_entry_height = 1UL;
1651 0 : init_poh( ctx );
1652 0 : }
1653 :
1654 0 : publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );
1655 :
1656 :
1657 0 : FD_TEST( ctx->slot_ctx );
1658 0 : }
1659 :
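 : /* publish_votes_to_plugin serializes one fd_vote_update_msg_t per
 : staked vote account in the current epoch stakes (a leading ulong
 : count followed by fixed 112-byte records) and publishes the buffer
 : to the votes plugin consumer. */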
1660 : static void
1661 : publish_votes_to_plugin( fd_replay_tile_ctx_t * ctx,
1662 0 : fd_stem_context_t * stem ) {
1663 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->votes_plugin_out->mem, ctx->votes_plugin_out->chunk );
1664 :
1665 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
1666 0 : if( FD_UNLIKELY ( !fork ) ) return;
1667 :
1668 0 : fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( ctx->slot_ctx->bank );
1669 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_pool = fd_vote_accounts_vote_accounts_pool_join( epoch_stakes );
1670 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes );
1671 :
1672 0 : ulong i = 0;
1673 0 : FD_SPAD_FRAME_BEGIN( ctx->runtime_spad ) {
1674 0 : for( fd_vote_accounts_pair_global_t_mapnode_t const * n = fd_vote_accounts_pair_global_t_map_minimum_const( epoch_stakes_pool, epoch_stakes_root );
1675 0 : n && i < FD_CLUSTER_NODE_CNT;
1676 0 : n = fd_vote_accounts_pair_global_t_map_successor_const( epoch_stakes_pool, n ) ) {
1677 0 : if( n->elem.stake == 0 ) continue;
1678 :
1679 0 : uchar * data = (uchar *)&n->elem.value + n->elem.value.data_offset;
1680 0 : ulong data_len = n->elem.value.data_len;
1681 :
1682 0 : int err;
1683 0 : fd_vote_state_versioned_t * vsv = fd_bincode_decode_spad(
1684 0 : vote_state_versioned, ctx->runtime_spad,
1685 0 : data,
1686 0 : data_len,
1687 0 : &err );
1688 0 : if( FD_UNLIKELY( err ) ) {
1689 0 : FD_LOG_ERR(( "Unexpected failure in decoding vote state %d", err ));
1690 0 : }
1691 :
1692 0 : fd_pubkey_t node_pubkey;
1693 0 : ulong commission;
1694 0 : ulong epoch_credits;
1695 0 : fd_vote_epoch_credits_t const * _epoch_credits;
1696 0 : ulong root_slot;
1697 :
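 : /* The vote account state is versioned; pull the fields we need out
 : of whichever version this account holds. epoch_credits is the
 : credits earned in the most recent epoch (tail credits minus
 : prev_credits). */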
1698 0 : switch( vsv->discriminant ) {
1699 0 : case fd_vote_state_versioned_enum_v0_23_5:
1700 0 : node_pubkey = vsv->inner.v0_23_5.node_pubkey;
1701 0 : commission = vsv->inner.v0_23_5.commission;
1702 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v0_23_5.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v0_23_5.epoch_credits );
1703 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1704 0 : root_slot = vsv->inner.v0_23_5.root_slot;
1705 0 : break;
1706 0 : case fd_vote_state_versioned_enum_v1_14_11:
1707 0 : node_pubkey = vsv->inner.v1_14_11.node_pubkey;
1708 0 : commission = vsv->inner.v1_14_11.commission;
1709 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v1_14_11.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v1_14_11.epoch_credits );
1710 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1711 0 : root_slot = vsv->inner.v1_14_11.root_slot;
1712 0 : break;
1713 0 : case fd_vote_state_versioned_enum_current:
1714 0 : node_pubkey = vsv->inner.current.node_pubkey;
1715 0 : commission = vsv->inner.current.commission;
1716 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.current.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.current.epoch_credits );
1717 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1718 0 : root_slot = vsv->inner.current.root_slot;
1719 0 : break;
1720 0 : default:
1721 0 : __builtin_unreachable();
1722 0 : }
1723 :
1724 0 : fd_clock_timestamp_vote_t_mapnode_t query;
1725 0 : memcpy( query.elem.pubkey.uc, n->elem.key.uc, 32UL );
1726 0 : fd_clock_timestamp_votes_global_t const * clock_timestamp_votes = fd_bank_clock_timestamp_votes_locking_query( ctx->slot_ctx->bank );
1727 0 : fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_root = fd_clock_timestamp_votes_votes_root_join( clock_timestamp_votes );
1728 0 : fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_pool = fd_clock_timestamp_votes_votes_pool_join( clock_timestamp_votes );
1729 :
1730 0 : fd_clock_timestamp_vote_t_mapnode_t * res = fd_clock_timestamp_vote_t_map_find( timestamp_votes_pool, timestamp_votes_root, &query );
1731 :
1732 0 : fd_vote_update_msg_t * msg = (fd_vote_update_msg_t *)(dst + sizeof(ulong) + i*112U);
1733 0 : memset( msg, 0, 112U );
1734 0 : memcpy( msg->vote_pubkey, n->elem.key.uc, sizeof(fd_pubkey_t) );
1735 0 : memcpy( msg->node_pubkey, node_pubkey.uc, sizeof(fd_pubkey_t) );
1736 0 : msg->activated_stake = n->elem.stake;
1737 0 : msg->last_vote = res == NULL ? 0UL : res->elem.slot;
1738 0 : msg->root_slot = root_slot;
1739 0 : msg->epoch_credits = epoch_credits;
1740 0 : msg->commission = (uchar)commission;
1741 0 : msg->is_delinquent = (uchar)fd_int_if( ctx->curr_slot>=128UL, msg->last_vote<=ctx->curr_slot-128UL, msg->last_vote==0 );
1742 0 : ++i;
1743 0 : fd_bank_clock_timestamp_votes_end_locking_query( ctx->slot_ctx->bank );
1744 0 : }
1745 0 : } FD_SPAD_FRAME_END;
1746 :
1747 0 : fd_bank_epoch_stakes_end_locking_query( ctx->slot_ctx->bank );
1748 :
1749 0 : *(ulong *)dst = i;
1750 :
1751 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
1752 0 : fd_stem_publish( stem, ctx->votes_plugin_out->idx, FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE, ctx->votes_plugin_out->chunk, 0, 0UL, 0UL, tspub );
1753 0 : ctx->votes_plugin_out->chunk = fd_dcache_compact_next( ctx->votes_plugin_out->chunk, 8UL + 40200UL*(58UL+12UL*34UL), ctx->votes_plugin_out->chunk0, ctx->votes_plugin_out->wmark );
1754 0 : }
1755 :
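 : /* handle_writer_state_updates drains the writer tile fseqs: when a
 : writer reports TXN_DONE for an exec tile's in-flight transaction,
 : the exec tile is marked ready again and the writer fseq is reset
 : to READY as the acknowledgement. */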
1756 : static void
1757 0 : handle_writer_state_updates( fd_replay_tile_ctx_t * ctx ) {
1758 :
1759 0 : for( ulong i=0UL; i<ctx->writer_cnt; i++ ) {
1760 0 : ulong res = fd_fseq_query( ctx->writer_fseq[ i ] );
1761 0 : if( FD_UNLIKELY( fd_writer_fseq_is_not_joined( res ) ) ) {
1762 0 : FD_LOG_WARNING(( "writer tile fseq idx=%lu has not been joined by the corresponding writer tile", i ));
1763 0 : continue;
1764 0 : }
1765 :
1766 0 : uint state = fd_writer_fseq_get_state( res );
1767 0 : switch( state ) {
1768 0 : case FD_WRITER_STATE_NOT_BOOTED:
1769 0 : FD_LOG_WARNING(( "writer tile idx=%lu is still booting", i ));
1770 0 : break;
1771 0 : case FD_WRITER_STATE_READY:
1772 : /* No-op. */
1773 0 : break;
1774 0 : case FD_WRITER_STATE_TXN_DONE: {
1775 0 : uint txn_id = fd_writer_fseq_get_txn_id( res );
1776 0 : ulong exec_tile_id = fd_writer_fseq_get_exec_tile_id( res );
1777 0 : if( ctx->exec_ready[ exec_tile_id ]==EXEC_TXN_BUSY && ctx->prev_ids[ exec_tile_id ]!=txn_id ) {
1778 0 : FD_LOG_DEBUG(( "Ack that exec tile idx=%lu txn id=%u has been finalized by writer tile %lu", exec_tile_id, txn_id, i ));
1779 0 : ctx->exec_ready[ exec_tile_id ] = EXEC_TXN_READY;
1780 0 : ctx->prev_ids[ exec_tile_id ] = txn_id;
1781 0 : fd_fseq_update( ctx->writer_fseq[ i ], FD_WRITER_STATE_READY );
1782 0 : }
1783 0 : break;
1784 0 : }
1785 0 : default:
1786 0 : FD_LOG_CRIT(( "Unexpected fseq state from writer tile idx=%lu state=%u", i, state ));
1787 0 : break;
1788 0 : }
1789 0 : }
1790 :
1791 0 : }
1792 :
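 : /* after_credit drives the replay state machine once per stem
 : iteration: finish snapshot init on the first call, ack writer
 : completions, poll for a new slice or continue executing the
 : current one, and finalize the slot once execution completes. */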
1793 : static void
1794 : after_credit( fd_replay_tile_ctx_t * ctx,
1795 : fd_stem_context_t * stem,
1796 : int * opt_poll_in FD_PARAM_UNUSED,
1797 0 : int * charge_busy FD_PARAM_UNUSED ) {
1798 :
1799 0 : if( FD_UNLIKELY( !ctx->snapshot_init_done ) ) {
1800 0 : if( ctx->plugin_out->mem ) {
1801 0 : uchar msg[56];
1802 0 : fd_memset( msg, 0, sizeof(msg) );
1803 0 : msg[ 0 ] = 0; // ValidatorStartProgress::Initializing
1804 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
1805 0 : }
1806 0 : init_snapshot( ctx, stem );
1807 0 : ctx->snapshot_init_done = 1;
1808 : //*charge_busy = 0;
1809 0 : }
1810 :
1811 : /* TODO: Consider moving state management to during_housekeeping */
1812 :
1813 : /* Check all the writer link fseqs. */
1814 0 : handle_writer_state_updates( ctx );
1815 :
1816 : /* If we are ready to process a new slice, we will poll for it and try
1817 : to setup execution for it. */
1818 0 : if( ctx->flags & EXEC_FLAG_READY_NEW ) {
1819 0 : handle_slice( ctx, stem );
1820 0 : }
1821 :
1822 : /* If we are currently executing a slice, proceed. */
1823 0 : if( ctx->flags & EXEC_FLAG_EXECUTING_SLICE ) {
1824 0 : exec_slice( ctx, stem, ctx->curr_slot );
1825 0 : }
1826 :
1827 0 : ulong curr_slot = ctx->curr_slot;
1828 0 : ulong flags = ctx->flags;
1829 :
1830 : /* Finished replaying a slot in this after_credit iteration. */
1831 0 : if( FD_UNLIKELY( flags & EXEC_FLAG_FINISHED_SLOT ) ){
1832 :
1833 : /* Check if the validator is caught up, and can safely be unmarked
1834 : as read-only. This happens when it has replayed through
1835 : turbine_slot0. */
1836 :
1837 0 : if( FD_UNLIKELY( ctx->read_only && ctx->curr_slot >= fd_fseq_query( ctx->turbine_slot0 ) ) ) {
1838 0 : ctx->read_only = 0;
1839 0 : }
1840 :
1841 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
1842 :
1843 0 : FD_LOG_NOTICE(( "finished block - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
1844 :
1845 : /**************************************************************************************************/
1846 : /* Call fd_runtime_block_execute_finalize_tpool which updates sysvar and cleanup some other stuff */
1847 : /**************************************************************************************************/
1848 :
1849 0 : fd_runtime_block_info_t runtime_block_info[1];
1850 0 : runtime_block_info->signature_cnt = fd_bank_signature_count_get( ctx->slot_ctx->bank );
1851 :
1852 0 : ctx->block_finalizing = 0;
1853 :
1854 0 : fd_exec_para_cb_ctx_t exec_para_ctx_block_finalize = {
1855 0 : .func = block_finalize_tiles_cb,
1856 0 : .para_arg_1 = ctx,
1857 0 : .para_arg_2 = stem,
1858 0 : };
1859 :
1860 0 : fd_runtime_block_execute_finalize_para( ctx->slot_ctx,
1861 0 : ctx->capture_ctx,
1862 0 : runtime_block_info,
1863 0 : ctx->exec_cnt,
1864 0 : ctx->runtime_spad,
1865 0 : &exec_para_ctx_block_finalize );
1866 :
1867 : /* Update blockstore with the freshly computed bank hash */
1868 0 : fd_block_map_query_t query[1] = { 0 };
1869 0 : fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
1870 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1871 0 : block_info->bank_hash = fd_bank_bank_hash_get( ctx->slot_ctx->bank );
1872 0 : fd_block_map_publish( query );
1873 :
1874 0 : fd_spad_pop( ctx->runtime_spad );
1875 0 : FD_LOG_NOTICE(( "Spad memory after executing block %lu", ctx->runtime_spad->mem_used ));
1876 :
1877 : /**********************************************************************/
1878 : /* Push notifications for slot updates and reset block_info flag */
1879 : /**********************************************************************/
1880 :
1881 0 : ulong block_entry_height = fd_blockstore_block_height_query( ctx->blockstore, curr_slot );
1882 0 : publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );
1883 :
1884 0 : ctx->blockstore->shmem->lps = curr_slot;
1885 :
1886 0 : FD_TEST( fork->slot==curr_slot );
1887 0 : fork->lock = 0;
1888 :
1889 : // FD_LOG_NOTICE(( "ulong_max? %d", ctx->tower_out_idx==ULONG_MAX ));
1890 0 : if( FD_LIKELY( ctx->tower_out_idx!=ULONG_MAX && !ctx->read_only ) ) {
1891 0 : uchar * chunk_laddr = fd_chunk_to_laddr( ctx->tower_out_mem, ctx->tower_out_chunk );
1892 0 : fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
1893 0 : fd_block_hash_queue_global_t * block_hash_queue = (fd_block_hash_queue_global_t *)&ctx->slot_ctx->bank->block_hash_queue[0];
1894 0 : fd_hash_t * last_hash = fd_block_hash_queue_last_hash_join( block_hash_queue );
1895 :
1896 0 : memcpy( chunk_laddr, bank_hash, sizeof(fd_hash_t) );
1897 0 : memcpy( chunk_laddr+sizeof(fd_hash_t), last_hash, sizeof(fd_hash_t) );
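 : /* sig packs (slot, parent_slot) into the high/low 32 bits; the
 : payload is the bank hash followed by the slot's last PoH hash. */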
1898 0 : fd_stem_publish( stem, ctx->tower_out_idx, ctx->curr_slot << 32UL | ctx->parent_slot, ctx->tower_out_chunk, sizeof(fd_hash_t) * 2, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ), fd_frag_meta_ts_comp( fd_tickcount() ) );
1899 0 : }
1900 :
1901 : // if (FD_UNLIKELY( prev_confirmed!=ctx->forks->confirmed && ctx->plugin_out->mem ) ) {
1902 : // ulong msg[ 1 ] = { ctx->forks->confirmed };
1903 : // replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_OPTIMISTICALLY_CONFIRMED, (uchar const *)msg, sizeof(msg) );
1904 : // }
1905 :
1906 : // if (FD_UNLIKELY( prev_finalized!=ctx->forks->finalized && ctx->plugin_out->mem ) ) {
1907 : // ulong msg[ 1 ] = { ctx->forks->finalized };
1908 : // replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_ROOTED, (uchar const *)msg, sizeof(msg) );
1909 : // }
1910 :
1911 : /**********************************************************************/
1912 : /* Prepare bank for the next execution and write to debugging files */
1913 : /**********************************************************************/
1914 :
1915 0 : ulong prev_slot = ctx->slot_ctx->slot;
1916 :
1917 0 : fd_bank_execution_fees_set( ctx->slot_ctx->bank, 0UL );
1918 :
1919 0 : fd_bank_priority_fees_set( ctx->slot_ctx->bank, 0UL );
1920 :
1921 0 : if( FD_UNLIKELY( ctx->slots_replayed_file ) ) {
1922 0 : FD_LOG_DEBUG(( "writing %lu to slots file", prev_slot ));
1923 0 : fprintf( ctx->slots_replayed_file, "%lu\n", prev_slot );
1924 0 : fflush( ctx->slots_replayed_file );
1925 0 : }
1926 :
1927 0 : if( ctx->capture_ctx!=NULL ) {
1928 0 : fd_solcap_writer_flush( ctx->capture_ctx->capture );
1929 0 : }
1930 :
1931 : /**********************************************************************/
1932 : /* Bank hash comparison, and halt if there's a mismatch after replay */
1933 : /**********************************************************************/
1934 :
1935 0 : fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
1936 0 : fd_bank_hash_cmp_t * bank_hash_cmp = ctx->bank_hash_cmp;
1937 0 : fd_bank_hash_cmp_lock( bank_hash_cmp );
1938 0 : fd_bank_hash_cmp_insert( bank_hash_cmp, curr_slot, bank_hash, 1, 0 );
1939 :
1940 : /* Try to move the bank hash comparison watermark forward */
1941 0 : for( ulong cmp_slot = bank_hash_cmp->watermark + 1; cmp_slot < curr_slot; cmp_slot++ ) {
1942 0 : if( FD_UNLIKELY( !ctx->enable_bank_hash_cmp ) ) {
1943 0 : bank_hash_cmp->watermark = cmp_slot;
1944 0 : break;
1945 0 : }
1946 0 : int rc = fd_bank_hash_cmp_check( bank_hash_cmp, cmp_slot );
1947 0 : switch ( rc ) {
1948 0 : case -1:
1949 :
1950 : /* Mismatch. The watermark stops advancing here, effectively
1951 : halting bank hash comparison at this slot; the funk cancel /
1952 : checkpt recovery below is currently disabled. */
1951 :
1952 : //funk_cancel( ctx, cmp_slot );
1953 : //checkpt( ctx );
1954 0 : FD_LOG_WARNING(( "Bank hash mismatch on slot: %lu. Halting.", cmp_slot ));
1955 :
1956 0 : break;
1957 :
1958 0 : case 0:
1959 :
1960 : /* Not ready */
1961 :
1962 0 : break;
1963 :
1964 0 : case 1:
1965 :
1966 : /* Match*/
1967 :
1968 0 : bank_hash_cmp->watermark = cmp_slot;
1969 0 : break;
1970 :
1971 0 : default:;
1972 0 : }
1973 0 : }
1974 :
1975 0 : fd_bank_hash_cmp_unlock( bank_hash_cmp );
1976 0 : ctx->flags = EXEC_FLAG_READY_NEW;
1977 0 : } // end of if( FD_UNLIKELY( ( flags & EXEC_FLAG_FINISHED_SLOT ) ) )
1978 :
1979 0 : long now = fd_log_wallclock();
1980 0 : if( ctx->votes_plugin_out->mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) {
1981 0 : ctx->last_plugin_push_time = now;
1982 0 : publish_votes_to_plugin( ctx, stem );
1983 0 : }
1984 :
1985 0 : }
1986 :
1987 : static void
1988 : privileged_init( fd_topo_t * topo,
1989 0 : fd_topo_tile_t * tile ) {
1990 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1991 :
1992 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1993 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
1994 0 : FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
1995 0 : memset( ctx, 0, sizeof(fd_replay_tile_ctx_t) );
1996 :
1997 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->funk_seed, sizeof(ulong), 0 ) );
1998 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->status_cache_seed, sizeof(ulong), 0 ) );
1999 :
2000 0 : ctx->blockstore_fd = open( tile->replay.blockstore_file, O_RDWR | O_CREAT, 0666 );
2001 0 : if( FD_UNLIKELY( ctx->blockstore_fd == -1 ) ) {
2002 0 : FD_LOG_ERR(( "failed to open or create blockstore archival file %s %d %d %s", tile->replay.blockstore_file, ctx->blockstore_fd, errno, strerror(errno) ));
2003 0 : }
2004 :
2005 : /**********************************************************************/
2006 : /* runtime public */
2007 : /**********************************************************************/
2008 :
2009 0 : ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "runtime_pub" );
2010 0 : if( FD_UNLIKELY( replay_obj_id==ULONG_MAX ) ) {
2011 0 : FD_LOG_ERR(( "no runtime_public" ));
2012 0 : }
2013 :
2014 0 : ctx->runtime_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
2015 0 : if( ctx->runtime_public_wksp==NULL ) {
2016 0 : FD_LOG_ERR(( "no runtime_public workspace" ));
2017 0 : }
2018 :
2019 0 : ctx->runtime_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
2020 0 : if( FD_UNLIKELY( !ctx->runtime_public ) ) {
2021 0 : FD_LOG_ERR(( "no runtime_public" ));
2022 0 : }
2023 0 : }
2024 :
2025 : static void
2026 : unprivileged_init( fd_topo_t * topo,
2027 0 : fd_topo_tile_t * tile ) {
2028 :
2029 0 : FD_LOG_NOTICE(("Starting unprivileged init"));
2030 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
2031 :
2032 0 : if( FD_UNLIKELY( tile->in_cnt < 3 ||
2033 0 : strcmp( topo->links[ tile->in_link_id[ PACK_IN_IDX ] ].name, "pack_replay" ) ||
2034 0 : strcmp( topo->links[ tile->in_link_id[ REPAIR_IN_IDX ] ].name, "repair_repla" ) ) ) {
2035 0 : FD_LOG_ERR(( "replay tile has missing or unexpected input links %lu %s %s",
2036 0 : tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
2037 0 : }
2038 :
2039 : /**********************************************************************/
2040 : /* scratch (bump)-allocate memory owned by the replay tile */
2041 : /**********************************************************************/
2042 :
2043 : /* Do not modify order! This is join-order in unprivileged_init. */
2044 :
2045 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
2046 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
2047 0 : void * capture_ctx_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
2048 0 : void * forks_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
2049 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
2050 0 : ctx->bmtree[i] = FD_SCRATCH_ALLOC_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
2051 0 : }
2052 0 : void * slice_buf = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_SLICE_MAX );
2053 0 : ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
2054 :
2055 0 : if( FD_UNLIKELY( scratch_alloc_mem != ( (ulong)scratch + scratch_footprint( tile ) ) ) ) {
2056 0 : FD_LOG_ERR( ( "scratch_alloc_mem did not match scratch_footprint diff: %lu alloc: %lu footprint: %lu",
2057 0 : scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ),
2058 0 : scratch_alloc_mem,
2059 0 : (ulong)scratch + scratch_footprint( tile ) ) );
2060 0 : }
2061 :
2062 : /**********************************************************************/
2063 : /* wksp */
2064 : /**********************************************************************/
2065 :
2066 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
2067 :
2068 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
2069 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
2070 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
2071 0 : if( ctx->blockstore_wksp==NULL ) {
2072 0 : FD_LOG_ERR(( "no blockstore wksp" ));
2073 0 : }
2074 :
2075 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
2076 0 : fd_buf_shred_pool_reset( ctx->blockstore->shred_pool, 0 );
2077 0 : FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
2078 :
2079 0 : ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
2080 0 : FD_TEST( status_cache_obj_id != ULONG_MAX );
2081 0 : ctx->status_cache_wksp = topo->workspaces[topo->objs[status_cache_obj_id].wksp_id].wksp;
2082 0 : if( ctx->status_cache_wksp == NULL ) {
2083 0 : FD_LOG_ERR(( "no status cache wksp" ));
2084 0 : }
2085 :
2086 : /**********************************************************************/
2087 : /* banks */
2088 : /**********************************************************************/
2089 :
2090 0 : ulong banks_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "banks" );
2091 0 : if( FD_UNLIKELY( banks_obj_id==ULONG_MAX ) ) {
2092 0 : FD_LOG_ERR(( "no banks" ));
2093 0 : }
2094 :
2095 0 : ctx->banks = fd_banks_join( fd_topo_obj_laddr( topo, banks_obj_id ) );
2096 0 : if( FD_UNLIKELY( !ctx->banks ) ) {
2097 0 : FD_LOG_ERR(( "failed to join banks" ));
2098 0 : }
2099 0 : fd_bank_t * bank = fd_banks_init_bank( ctx->banks, 0UL );
2100 0 : if( FD_UNLIKELY( !bank ) ) {
2101 0 : FD_LOG_ERR(( "failed to init bank" ));
2102 0 : }
2103 :
2104 : /**********************************************************************/
2105 : /* funk */
2106 : /**********************************************************************/
2107 :
2108 0 : if( FD_UNLIKELY( !fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ) ) ) ) {
2109 0 : FD_LOG_ERR(( "Failed to join database cache" ));
2110 0 : }
2111 :
2112 : /**********************************************************************/
2113 : /* root_slot fseq */
2114 : /**********************************************************************/
2115 :
2116 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
2117 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
2118 0 : ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
2119 0 : if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
2120 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
2121 :
2122 : /**********************************************************************/
2123 : /* constipated fseq */
2124 : /**********************************************************************/
2125 :
2126 : /* When the replay tile boots, funk should not be constipated */
2127 :
2128 0 : ulong constipated_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "constipate" );
2129 0 : FD_TEST( constipated_obj_id!=ULONG_MAX );
2130 0 : ctx->is_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) );
2131 0 : if( FD_UNLIKELY( !ctx->is_constipated ) ) FD_LOG_ERR(( "replay tile has no constipated fseq" ));
2132 0 : fd_fseq_update( ctx->is_constipated, 0UL );
2133 0 : FD_TEST( 0UL==fd_fseq_query( ctx->is_constipated ) );
2134 :
2135 : /**********************************************************************/
2136 : /* turbine_slot fseq */
2137 : /**********************************************************************/
2138 :
2139 0 : ulong turbine_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot" );
2140 0 : FD_TEST( turbine_slot_obj_id!=ULONG_MAX );
2141 0 : ctx->turbine_slot = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot_obj_id ) );
2142 0 : if( FD_UNLIKELY( !ctx->turbine_slot ) ) FD_LOG_ERR(( "replay tile has no turb_slot fseq" ));
2143 :
2144 : /**********************************************************************/
2145 : /* TOML paths */
2146 : /**********************************************************************/
2147 :
2148 0 : ctx->blockstore_checkpt = tile->replay.blockstore_checkpt;
2149 0 : ctx->tx_metadata_storage = tile->replay.tx_metadata_storage;
2150 0 : ctx->funk_checkpt = tile->replay.funk_checkpt;
2151 0 : ctx->genesis = tile->replay.genesis;
2152 0 : ctx->incremental = tile->replay.incremental;
2153 0 : ctx->snapshot = tile->replay.snapshot;
2154 0 : ctx->snapshot_dir = tile->replay.snapshot_dir;
2155 :
2156 0 : ctx->incremental_src_type = tile->replay.incremental_src_type;
2157 0 : ctx->snapshot_src_type = tile->replay.snapshot_src_type;
2158 :
2159 : /**********************************************************************/
2160 : /* status cache */
2161 : /**********************************************************************/
2162 :
2163 0 : char const * status_cache_path = tile->replay.status_cache;
2164 0 : if( strlen( status_cache_path )>0 ) {
2165 0 : FD_LOG_NOTICE(( "starting status cache restore..." ));
2166 0 : int err = fd_wksp_restore( ctx->status_cache_wksp, status_cache_path, (uint)ctx->status_cache_seed );
2167 0 : FD_LOG_NOTICE(( "finished status cache restore..." ));
2168 0 : if( err ) {
2169 0 : FD_LOG_ERR(( "failed to restore %s: error %d", status_cache_path, err ));
2170 0 : }
2171 0 : fd_wksp_tag_query_info_t info;
2172 0 : ulong tag = FD_TXNCACHE_MAGIC;
2173 0 : if( fd_wksp_tag_query( ctx->status_cache_wksp, &tag, 1, &info, 1 ) > 0 ) {
2174 0 : void * status_cache_mem = fd_wksp_laddr_fast( ctx->status_cache_wksp, info.gaddr_lo );
2175 : /* Set up status cache. */
2176 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
2177 0 : if( ctx->status_cache == NULL ) {
2178 0 : FD_LOG_ERR(( "failed to join status cache in %s", status_cache_path ));
2179 0 : }
2180 0 : } else {
2181 0 : FD_LOG_ERR(( "failed to tag query status cache in %s", status_cache_path ));
2182 0 : }
2183 0 : } else {
2184 0 : void * status_cache_mem = fd_topo_obj_laddr( topo, status_cache_obj_id );
2185 0 : if( status_cache_mem==NULL ) {
2186 0 : FD_LOG_ERR(( "failed to allocate status cache" ));
2187 0 : }
2188 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
2189 0 : if( ctx->status_cache==NULL ) {
2190 0 : FD_LOG_ERR(( "failed to join status cache" ));
2191 0 : }
2192 0 : }
2193 :
2194 : /**********************************************************************/
2195 : /* spad */
2196 : /**********************************************************************/
2197 :
2198 : /* Join each of the exec spads. */
2199 0 : ctx->exec_cnt = fd_topo_tile_name_cnt( topo, "exec" );
2200 0 : for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
2201 0 : ulong exec_spad_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_spad.%lu", i );
2202 0 : fd_spad_t * spad = fd_spad_join( fd_topo_obj_laddr( topo, exec_spad_id ) );
2203 0 : ctx->exec_spads[ ctx->exec_spad_cnt ] = spad;
2204 0 : if( FD_UNLIKELY( !ctx->exec_spads[ ctx->exec_spad_cnt ] ) ) {
2205 0 : FD_LOG_ERR(( "failed to join exec spad %lu", i ));
2206 0 : }
2207 0 : ctx->exec_spads_wksp[ ctx->exec_spad_cnt ] = fd_wksp_containing( spad );
2208 0 : if( FD_UNLIKELY( !ctx->exec_spads_wksp[ ctx->exec_spad_cnt ] ) ) {
2209 0 : FD_LOG_ERR(( "failed to join exec spad wksp %lu", i ));
2210 0 : }
2211 :
2212 0 : ctx->exec_spad_cnt++;
2213 0 : }
2214 :
2215 : /* Now join the spad that was setup in the runtime public topo obj. */
2216 :
2217 0 : ctx->runtime_spad = fd_runtime_public_spad( ctx->runtime_public );
2218 0 : if( FD_UNLIKELY( !ctx->runtime_spad ) ) {
2219 0 : FD_LOG_ERR(( "Unable to join the runtime_spad" ));
2220 0 : }
2221 0 : fd_spad_push( ctx->runtime_spad );
2222 :
2223 : /**********************************************************************/
2224 : /* joins */
2225 : /**********************************************************************/
2226 :
2229 :
2230 0 : fd_cluster_version_t * cluster_version = fd_bank_cluster_version_modify( bank );
2231 :
2232 0 : if( FD_UNLIKELY( sscanf( tile->replay.cluster_version, "%u.%u.%u", &cluster_version->major, &cluster_version->minor, &cluster_version->patch )!=3 ) ) {
2233 0 : FD_LOG_ERR(( "failed to decode cluster version, configured as \"%s\"", tile->replay.cluster_version ));
2234 0 : }
2235 :
2236 0 : fd_features_t * features = fd_bank_features_modify( bank );
2237 0 : fd_features_enable_cleaned_up( features, cluster_version );
2238 :
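 : /* Stage the one-off feature pubkeys in a fixed-size array. This
 : assumes enable_features_cnt<=16, presumably enforced by config
 : parsing upstream. */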
2239 0 : char const * one_off_features[16];
2240 0 : for( ulong i=0UL; i<tile->replay.enable_features_cnt; i++ ) {
2241 0 : one_off_features[ i ] = tile->replay.enable_features[ i ];
2242 0 : }
2243 0 : fd_features_enable_one_offs( features, one_off_features, (uint)tile->replay.enable_features_cnt, 0UL );
2244 :
2245 0 : ctx->forks = fd_forks_join( fd_forks_new( forks_mem, FD_BLOCK_MAX, 42UL ) );
2246 :
2247 : /**********************************************************************/
2248 : /* bank_hash_cmp */
2249 : /**********************************************************************/
2250 :
2251 0 : ulong bank_hash_cmp_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bh_cmp" );
2252 0 : FD_TEST( bank_hash_cmp_obj_id!=ULONG_MAX );
2253 0 : ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( fd_topo_obj_laddr( topo, bank_hash_cmp_obj_id ) ) );
2254 0 : if( FD_UNLIKELY( !ctx->bank_hash_cmp ) ) {
2255 0 : FD_LOG_ERR(( "failed to join bank_hash_cmp" ));
2256 0 : }
2257 :
2258 : /**********************************************************************/
2259 : /* voter */
2260 : /**********************************************************************/
2261 :
2262 0 : memcpy( ctx->validator_identity, fd_keyload_load( tile->replay.identity_key_path, 1 ), sizeof(fd_pubkey_t) );
2263 0 : *ctx->vote_authority = *ctx->validator_identity; /* FIXME */
2264 0 : memcpy( ctx->vote_acc, fd_keyload_load( tile->replay.vote_account_path, 1 ), sizeof(fd_pubkey_t) );
2265 :
2266 0 : ctx->validator_identity_pubkey[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->replay.identity_key_path, 1 ) );
2267 :
2268 : /**********************************************************************/
2269 : /* entry batch */
2270 : /**********************************************************************/
2271 :
2272 0 : fd_slice_exec_join( &ctx->slice_exec_ctx );
2273 0 : ctx->slice_exec_ctx.buf = slice_buf;
2274 :
2275 : /**********************************************************************/
2276 : /* capture */
2277 : /**********************************************************************/
2278 :
2279 0 : if( strlen( tile->replay.solcap_capture )>0 || strlen( tile->replay.dump_proto_dir )>0 ) {
2280 0 : ctx->capture_ctx = fd_capture_ctx_new( capture_ctx_mem );
2281 0 : }
2282 :
2283 0 : if( strlen( tile->replay.solcap_capture )>0 ) {
2284 0 : ctx->capture_ctx->checkpt_freq = ULONG_MAX;
2285 0 : ctx->capture_file = fopen( tile->replay.solcap_capture, "w+" );
2286 0 : if( FD_UNLIKELY( !ctx->capture_file ) ) {
2287 0 : FD_LOG_ERR(( "fopen(%s) failed (%d-%s)", tile->replay.solcap_capture, errno, strerror( errno ) ));
2288 0 : }
2289 0 : ctx->capture_ctx->capture_txns = 0;
2290 0 : ctx->capture_ctx->solcap_start_slot = tile->replay.capture_start_slot;
2291 0 : fd_solcap_writer_init( ctx->capture_ctx->capture, ctx->capture_file );
2292 0 : }
2293 :
2294 0 : if( strlen( tile->replay.dump_proto_dir )>0 ) {
2295 0 : ctx->capture_ctx->dump_proto_output_dir = tile->replay.dump_proto_dir;
2296 0 : if( tile->replay.dump_block_to_pb ) {
2297 0 : ctx->capture_ctx->dump_block_to_pb = tile->replay.dump_block_to_pb;
2298 0 : }
2299 0 : }
2300 :
2301 : /**********************************************************************/
2302 : /* bank */
2303 : /**********************************************************************/
2304 :
2305 0 : ctx->bank_cnt = fd_topo_tile_name_cnt( topo, "bank" );
2306 0 : for( ulong i=0UL; i<(ctx->bank_cnt); i++ ) {
2307 0 : ulong busy_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bank_busy.%lu", i );
2308 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
2309 0 : ctx->bank_busy[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
2310 0 : if( FD_UNLIKELY( !ctx->bank_busy[ i ] ) ) FD_LOG_ERR(( "banking tile %lu has no busy flag", i ));
2311 :
2312 0 : fd_replay_out_link_t * poh_out = &ctx->bank_out[ i ];
2313 0 : fd_topo_link_t * poh_out_link = &topo->links[ tile->out_link_id[ poh_out->idx+i ] ];
2314 0 : poh_out->mcache = poh_out_link->mcache;
2315 0 : poh_out->sync = fd_mcache_seq_laddr( poh_out->mcache );
2316 0 : poh_out->depth = fd_mcache_depth( poh_out->mcache );
2317 0 : poh_out->seq = fd_mcache_seq_query( poh_out->sync );
2318 0 : poh_out->mem = topo->workspaces[ topo->objs[ poh_out_link->dcache_obj_id ].wksp_id ].wksp;
2319 0 : poh_out->chunk0 = fd_dcache_compact_chunk0( poh_out->mem, poh_out_link->dcache );
2320 0 : poh_out->wmark = fd_dcache_compact_wmark( poh_out->mem, poh_out_link->dcache, poh_out_link->mtu );
2321 0 : poh_out->chunk = poh_out->chunk0;
2322 0 : }
2323 :
2324 0 : ctx->poh_init_done = 0U;
2325 0 : ctx->snapshot_init_done = 0;
2326 :
2327 : /**********************************************************************/
2328 : /* exec */
2329 : /**********************************************************************/
2330 0 : ctx->exec_cnt = fd_topo_tile_name_cnt( topo, "exec" );
2331 0 : if( FD_UNLIKELY( ctx->exec_cnt>FD_PACK_MAX_BANK_TILES ) ) {
2332 0 : FD_LOG_ERR(( "replay tile has too many exec tiles %lu", ctx->exec_cnt ));
2333 0 : }
2334 0 : if( FD_UNLIKELY( ctx->exec_cnt>UCHAR_MAX ) ) {
2335 : /* Exec tile id needs to fit in a uchar for the writer tile txn done
2336 : message. */
2337 0 : FD_LOG_CRIT(( "too many exec tiles %lu", ctx->exec_cnt ));
2338 0 : }
2339 :
2340 0 : for( ulong i = 0UL; i < ctx->exec_cnt; i++ ) {
2341 : /* Mark all initial state as not being ready. */
2342 0 : ctx->exec_ready[ i ] = EXEC_TXN_BUSY;
2343 0 : ctx->prev_ids[ i ] = FD_EXEC_ID_SENTINEL;
2344 :
2345 0 : ulong exec_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_fseq.%lu", i );
2346 0 : if( FD_UNLIKELY( exec_fseq_id==ULONG_MAX ) ) {
2347 0 : FD_LOG_ERR(( "exec tile %lu has no fseq", i ));
2348 0 : }
2349 0 : ctx->exec_fseq[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, exec_fseq_id ) );
2350 0 : if( FD_UNLIKELY( !ctx->exec_fseq[ i ] ) ) {
2351 0 : FD_LOG_ERR(( "exec tile %lu has no fseq", i ));
2352 0 : }
2353 :
2354 : /* Setup out links. */
2355 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, "replay_exec", i );
2356 0 : fd_topo_link_t * exec_out_link = &topo->links[ tile->out_link_id[ idx ] ];
2357 :
2358 0 : if( strcmp( exec_out_link->name, "replay_exec" ) ) {
2359 0 : FD_LOG_ERR(("output link confusion for output %lu", idx ));
2360 0 : }
2361 :
2362 0 : fd_replay_out_link_t * exec_out = &ctx->exec_out[ i ];
2363 0 : exec_out->idx = idx;
2364 0 : exec_out->mem = topo->workspaces[ topo->objs[ exec_out_link->dcache_obj_id ].wksp_id ].wksp;
2365 0 : exec_out->chunk0 = fd_dcache_compact_chunk0( exec_out->mem, exec_out_link->dcache );
2366 0 : exec_out->wmark = fd_dcache_compact_wmark( exec_out->mem, exec_out_link->dcache, exec_out_link->mtu );
2367 0 : exec_out->chunk = exec_out->chunk0;
2368 0 : }
2369 :
2370 : /**********************************************************************/
2371 : /* writer */
2372 : /**********************************************************************/
2373 0 : ctx->writer_cnt = fd_topo_tile_name_cnt( topo, "writer" );
2374 0 : if( FD_UNLIKELY( ctx->writer_cnt>FD_PACK_MAX_BANK_TILES ) ) {
2375 0 : FD_LOG_CRIT(( "replay tile has too many writer tiles %lu", ctx->writer_cnt ));
2376 0 : }
2377 :
2378 0 : for( ulong i = 0UL; i < ctx->writer_cnt; i++ ) {
2379 :
2380 0 : ulong writer_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "writer_fseq.%lu", i );
2381 0 : if( FD_UNLIKELY( writer_fseq_id==ULONG_MAX ) ) {
2382 0 : FD_LOG_CRIT(( "writer tile %lu has no fseq", i ));
2383 0 : }
2384 0 : ctx->writer_fseq[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, writer_fseq_id ) );
2385 0 : if( FD_UNLIKELY( !ctx->writer_fseq[ i ] ) ) {
2386 0 : FD_LOG_CRIT(( "writer tile %lu has no fseq", i ));
2387 0 : }
2388 0 : }
2389 :
2390 : /**********************************************************************/
2391 : /* tower checkpointing for wen-restart */
2392 : /**********************************************************************/
2393 0 : ctx->tower_checkpt_fileno = -1;
2394 0 : if( FD_LIKELY( strlen( tile->replay.tower_checkpt )>0 ) ) {
2395 0 : ctx->tower_checkpt_fileno = open( tile->replay.tower_checkpt,
2396 0 : O_RDWR | O_CREAT,
2397 0 : S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
2398 0 : if( ctx->tower_checkpt_fileno<0 ) FD_LOG_ERR(( "Failed at opening the tower checkpoint file" ));
2399 0 : }
2400 :
2401 : /**********************************************************************/
2402 : /* links */
2403 : /**********************************************************************/
2404 :
2405 : /* Setup store tile input */
2406 0 : fd_topo_link_t * repair_in_link = &topo->links[ tile->in_link_id[ REPAIR_IN_IDX ] ];
2407 0 : ctx->repair_in_mem = topo->workspaces[ topo->objs[ repair_in_link->dcache_obj_id ].wksp_id ].wksp;
2408 0 : ctx->repair_in_chunk0 = fd_dcache_compact_chunk0( ctx->repair_in_mem, repair_in_link->dcache );
2409 0 : ctx->repair_in_wmark = fd_dcache_compact_wmark( ctx->repair_in_mem, repair_in_link->dcache, repair_in_link->mtu );
2410 :
2411 : /* Setup pack tile input */
2412 0 : fd_topo_link_t * pack_in_link = &topo->links[ tile->in_link_id[ PACK_IN_IDX ] ];
2413 0 : ctx->pack_in_mem = topo->workspaces[ topo->objs[ pack_in_link->dcache_obj_id ].wksp_id ].wksp;
2414 0 : ctx->pack_in_chunk0 = fd_dcache_compact_chunk0( ctx->pack_in_mem, pack_in_link->dcache );
2415 0 : ctx->pack_in_wmark = fd_dcache_compact_wmark( ctx->pack_in_mem, pack_in_link->dcache, pack_in_link->mtu );
2416 :
2417 : /* Setup tower tile input */
2418 0 : ctx->tower_in_idx = fd_topo_find_tile_in_link( topo, tile, "tower_replay", 0 );
2419 0 : if( FD_UNLIKELY( ctx->tower_in_idx==ULONG_MAX ) ) FD_LOG_WARNING(( "replay tile is missing the tower input link" ));
2420 :
2421 0 : ulong replay_notif_idx = fd_topo_find_tile_out_link( topo, tile, "replay_notif", 0 );
2422 0 : if( FD_UNLIKELY( replay_notif_idx!=ULONG_MAX ) ) {
2423 0 : fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ replay_notif_idx ] ];
2424 0 : FD_TEST( notif_out );
2425 0 : ctx->notif_out->idx = replay_notif_idx;
2426 0 : ctx->notif_out->mcache = notif_out->mcache;
2427 0 : ctx->notif_out->sync = fd_mcache_seq_laddr( ctx->notif_out->mcache );
2428 0 : ctx->notif_out->depth = fd_mcache_depth( ctx->notif_out->mcache );
2429 0 : ctx->notif_out->seq = fd_mcache_seq_query( ctx->notif_out->sync );
2430 0 : ctx->notif_out->mem = topo->workspaces[ topo->objs[ notif_out->dcache_obj_id ].wksp_id ].wksp;
2431 0 : ctx->notif_out->chunk0 = fd_dcache_compact_chunk0( ctx->notif_out->mem, notif_out->dcache );
2432 0 : ctx->notif_out->wmark = fd_dcache_compact_wmark ( ctx->notif_out->mem, notif_out->dcache, notif_out->mtu );
2433 0 : ctx->notif_out->chunk = ctx->notif_out->chunk0;
2434 0 : } else {
2435 0 : ctx->notif_out->mcache = NULL;
2436 0 : }
2437 :
2438 : /* Set up stake weights tile output */
2439 0 : ctx->stake_out->idx = fd_topo_find_tile_out_link( topo, tile, "stake_out", 0 );
2440 0 : FD_TEST( ctx->stake_out->idx!=ULONG_MAX );
2441 0 : fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ ctx->stake_out->idx] ];
2442 0 : ctx->stake_out->mcache = stake_weights_out->mcache;
2443 0 : ctx->stake_out->mem = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
2444 0 : ctx->stake_out->sync = fd_mcache_seq_laddr ( ctx->stake_out->mcache );
2445 0 : ctx->stake_out->depth = fd_mcache_depth ( ctx->stake_out->mcache );
2446 0 : ctx->stake_out->seq = fd_mcache_seq_query ( ctx->stake_out->sync );
2447 0 : ctx->stake_out->chunk0 = fd_dcache_compact_chunk0( ctx->stake_out->mem, stake_weights_out->dcache );
2448 0 : ctx->stake_out->wmark = fd_dcache_compact_wmark ( ctx->stake_out->mem, stake_weights_out->dcache, stake_weights_out->mtu );
2449 0 : ctx->stake_out->chunk = ctx->stake_out->chunk0;
2450 :
2451 0 : ctx->tower_out_idx = fd_topo_find_tile_out_link( topo, tile, "replay_tower", 0 );
2452 0 : if( FD_LIKELY( ctx->tower_out_idx!=ULONG_MAX ) ) {
2453 0 : fd_topo_link_t * tower_out = &topo->links[ tile->out_link_id[ ctx->tower_out_idx ] ];
2454 0 : ctx->tower_out_mem = topo->workspaces[ topo->objs[ tower_out->dcache_obj_id ].wksp_id ].wksp;
2455 0 : ctx->tower_out_chunk0 = fd_dcache_compact_chunk0( ctx->tower_out_mem, tower_out->dcache );
2456 0 : ctx->tower_out_wmark = fd_dcache_compact_wmark ( ctx->tower_out_mem, tower_out->dcache, tower_out->mtu );
2457 0 : ctx->tower_out_chunk = ctx->tower_out_chunk0;
2458 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->tower_out_mem, tower_out->dcache, tower_out->mtu, tower_out->depth ) );
2459 0 : }
2460 :
2461 0 : if( FD_LIKELY( tile->replay.plugins_enabled ) ) {
2462 0 : ctx->plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "replay_plugi", 0 );
2463 0 : fd_topo_link_t const * replay_plugin_out = &topo->links[ tile->out_link_id[ ctx->plugin_out->idx] ];
2464 0 : if( strcmp( replay_plugin_out->name, "replay_plugi" ) ) {
2465 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->plugin_out->idx));
2466 0 : }
2467 0 : ctx->plugin_out->mem = topo->workspaces[ topo->objs[ replay_plugin_out->dcache_obj_id ].wksp_id ].wksp;
2468 0 : ctx->plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->plugin_out->mem, replay_plugin_out->dcache );
2469 0 : ctx->plugin_out->wmark = fd_dcache_compact_wmark ( ctx->plugin_out->mem, replay_plugin_out->dcache, replay_plugin_out->mtu );
2470 0 : ctx->plugin_out->chunk = ctx->plugin_out->chunk0;
2471 :
2472 0 : ctx->votes_plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "votes_plugin", 0 );
2473 0 : fd_topo_link_t const * votes_plugin_out = &topo->links[ tile->out_link_id[ ctx->votes_plugin_out->idx] ];
2474 0 : if( strcmp( votes_plugin_out->name, "votes_plugin" ) ) {
2475 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->votes_plugin_out->idx));
2476 0 : }
2477 0 : ctx->votes_plugin_out->mem = topo->workspaces[ topo->objs[ votes_plugin_out->dcache_obj_id ].wksp_id ].wksp;
2478 0 : ctx->votes_plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->votes_plugin_out->mem, votes_plugin_out->dcache );
2479 0 : ctx->votes_plugin_out->wmark = fd_dcache_compact_wmark ( ctx->votes_plugin_out->mem, votes_plugin_out->dcache, votes_plugin_out->mtu );
2480 0 : ctx->votes_plugin_out->chunk = ctx->votes_plugin_out->chunk0;
2481 0 : }
2482 :
2483 0 : if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) {
2484 0 : ctx->slots_replayed_file = fopen( tile->replay.slots_replayed, "w" );
2485 0 : FD_TEST( ctx->slots_replayed_file );
2486 0 : }
2487 :
2488 0 : FD_TEST( ctx->runtime_public!=NULL );
2489 :
2490 0 : uchar * deque_mem = fd_spad_alloc_check( ctx->runtime_spad, fd_exec_slice_align(), fd_exec_slice_footprint() );
2491 0 : ctx->exec_slice_deque = fd_exec_slice_join( fd_exec_slice_new( deque_mem ) );
2492 0 : if( FD_UNLIKELY( !ctx->exec_slice_deque ) ) {
2493 0 : FD_LOG_ERR(( "failed to join and create exec slice deque" ));
2494 0 : }
2495 :
2496 0 : FD_LOG_NOTICE(("Finished unprivileged init"));
2497 :
2498 0 : ctx->enable_bank_hash_cmp = tile->replay.enable_bank_hash_cmp;
2499 0 : }
2500 :
2501 : static ulong
2502 : populate_allowed_seccomp( fd_topo_t const * topo,
2503 : fd_topo_tile_t const * tile,
2504 : ulong out_cnt,
2505 0 : struct sock_filter * out ) {
2506 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
2507 :
2508 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
2509 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
2510 0 : FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
2511 :
2512 0 : populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd );
2513 0 : return sock_filter_policy_fd_replay_tile_instr_cnt;
2514 0 : }
2515 :
2516 : static ulong
2517 : populate_allowed_fds( fd_topo_t const * topo,
2518 : fd_topo_tile_t const * tile,
2519 : ulong out_fds_cnt,
2520 0 : int * out_fds ) {
2521 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
2522 :
2523 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
2524 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
2525 0 : FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
2526 :
2527 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
2528 :
2529 0 : ulong out_cnt = 0UL;
2530 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
2531 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
2532 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
2533 0 : out_fds[ out_cnt++ ] = ctx->blockstore_fd;
2534 0 : return out_cnt;
2535 0 : }
2536 :
2537 : static inline void
2538 0 : metrics_write( fd_replay_tile_ctx_t * ctx ) {
2539 0 : FD_MGAUGE_SET( REPLAY, LAST_VOTED_SLOT, ctx->metrics.last_voted_slot );
2540 0 : FD_MGAUGE_SET( REPLAY, SLOT, ctx->metrics.slot );
2541 0 : }
2542 :
2543 : /* TODO: This needs to get sized out correctly. */
2544 0 : #define STEM_BURST (64UL)
2545 :
2546 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_replay_tile_ctx_t
2547 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_replay_tile_ctx_t)
2548 :
2549 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
2550 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
2551 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
2552 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
2553 :
2554 : #include "../../disco/stem/fd_stem.c"
2555 :
2556 : fd_topo_run_tile_t fd_tile_replay = {
2557 : .name = "replay",
2558 : .loose_footprint = loose_footprint,
2559 : .populate_allowed_seccomp = populate_allowed_seccomp,
2560 : .populate_allowed_fds = populate_allowed_fds,
2561 : .scratch_align = scratch_align,
2562 : .scratch_footprint = scratch_footprint,
2563 : .privileged_init = privileged_init,
2564 : .unprivileged_init = unprivileged_init,
2565 : .run = stem_run,
2566 : };
|