Line data Source code
1 : #define _GNU_SOURCE
2 : #include "../../disco/tiles.h"
3 : #include "generated/fd_replay_tile_seccomp.h"
4 :
5 : #include "../geyser/fd_replay_notif.h"
6 : #include "../restart/fd_restart.h"
7 : #include "../store/fd_epoch_forks.h"
8 :
9 : #include "../../disco/keyguard/fd_keyload.h"
10 : #include "../../disco/topo/fd_pod_format.h"
11 : #include "../../flamenco/runtime/fd_txncache.h"
12 : #include "../../flamenco/runtime/context/fd_capture_ctx.h"
13 : #include "../../flamenco/runtime/context/fd_exec_epoch_ctx.h"
14 : #include "../../flamenco/runtime/context/fd_exec_slot_ctx.h"
15 : #include "../../flamenco/runtime/program/fd_bpf_program_util.h"
16 : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
17 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
18 : #include "../../flamenco/runtime/fd_runtime_init.h"
19 : #include "../../flamenco/snapshot/fd_snapshot.h"
20 : #include "../../flamenco/stakes/fd_stakes.h"
21 : #include "../../flamenco/runtime/fd_runtime.h"
22 : #include "../../flamenco/rewards/fd_rewards.h"
23 : #include "../../disco/metrics/fd_metrics.h"
24 : #include "../../choreo/fd_choreo.h"
25 : #include "../../funk/fd_funk_filemap.h"
26 : #include "../../flamenco/snapshot/fd_snapshot_create.h"
27 : #include "../../disco/plugin/fd_plugin.h"
28 : #include "fd_replay.h"
29 :
30 : #include <arpa/inet.h>
31 : #include <errno.h>
32 : #include <fcntl.h>
33 : #include <linux/unistd.h>
34 : #include <netdb.h>
35 : #include <netinet/in.h>
36 : #include <sys/random.h>
37 : #include <sys/socket.h>
38 : #include <sys/stat.h>
39 : #include <sys/types.h>
40 : #include <unistd.h>
41 :
42 : /* An estimate of the max number of transactions in a block. If there are more
43 : transactions, they must be split into multiple sets. */
44 : #define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_MAX_PER_SLOT * FD_SHRED_MAX_SZ) / FD_TXN_MIN_SERIALIZED_SZ )
45 :
46 : #define PLUGIN_PUBLISH_TIME_NS ((long)60e9)
47 :
48 0 : #define STORE_IN_IDX (0UL)
49 0 : #define PACK_IN_IDX (1UL)
50 0 : #define BATCH_IN_IDX (2UL)
51 0 : #define SHRED_IN_IDX (3UL)
52 :
53 0 : #define STAKE_OUT_IDX (0UL)
54 0 : #define NOTIF_OUT_IDX (1UL)
55 0 : #define SENDER_OUT_IDX (2UL)
56 0 : #define POH_OUT_IDX (3UL)
57 :
58 0 : #define VOTE_ACC_MAX (2000000UL)
59 :
60 : #define BANK_HASH_CMP_LG_MAX 16
61 :
/* fd_shred_replay_in_ctx_t describes one shred-tile input link: the
   workspace backing the link's dcache and the valid chunk range used
   to bounds-check incoming frags in during_frag. */
struct fd_shred_replay_in_ctx {
  fd_wksp_t * mem;    /* workspace holding the link's dcache */
  ulong       chunk0; /* first valid chunk index */
  ulong       wmark;  /* last valid chunk index (watermark) */
};
typedef struct fd_shred_replay_in_ctx fd_shred_replay_in_ctx_t;
68 :
/* fd_replay_out_ctx_t describes one output link owned by the replay
   tile: the mcache/dcache publishing state plus the chunk bounds of
   the dcache region frags are written into. */
struct fd_replay_out_ctx {
  ulong idx; /* TODO refactor the bank_out to use this */

  fd_frag_meta_t * mcache; /* frag metadata ring */
  ulong *          sync;   /* sequence sync fseq */
  ulong            depth;  /* mcache depth */
  ulong            seq;    /* next publish sequence number */

  fd_wksp_t * mem;    /* workspace holding the link's dcache */
  ulong       chunk0; /* first valid chunk index */
  ulong       wmark;  /* last valid chunk index (watermark) */
  ulong       chunk;  /* next chunk to write to */

};
typedef struct fd_replay_out_ctx fd_replay_out_ctx_t;
84 :
/* fd_replay_tile_metrics_t holds the metrics exported by the replay
   tile. */
struct fd_replay_tile_metrics {
  ulong slot;            /* most recent slot processed by replay */
  ulong last_voted_slot; /* most recent slot this validator voted on */
};
typedef struct fd_replay_tile_metrics fd_replay_tile_metrics_t;
#define FD_REPLAY_TILE_METRICS_FOOTPRINT ( sizeof( fd_replay_tile_metrics_t ) )
91 :
/* fd_slice_exec_ctx_t tracks incremental execution progress through
   the microblock (entry) batch buffer (ctx->mbatch): where we are in
   the buffer, and how many microblocks/txns remain in the current
   iteration. */
struct fd_slice_exec_ctx {
  ulong wmark;     /* offset to start executing from. Will be on a transaction or microblock boundary. */
  ulong sz;        /* total bytes occupied in the mbatch memory. Queried slices should be placed at this offset */
  ulong mblks_rem; /* microblocks remaining in the current batch iteration. If 0, the next batch can be read. */
  ulong txns_rem;  /* txns remaining in current microblock iteration. If 0, the next microblock can be read. */

  ulong last_mblk_off; /* offset to the last microblock hdr seen */
  int   last_batch;    /* signifies last batch execution for stopping condition */
};
typedef struct fd_slice_exec_ctx fd_slice_exec_ctx_t;
102 :
/* fd_replay_tile_ctx_t is the complete state of the replay tile:
   workspace joins, input/output link descriptors, joined runtime and
   consensus objects (funk, blockstore, forks, ghost, tower, ...), and
   bookkeeping for the slot currently being replayed. */
struct fd_replay_tile_ctx {
  fd_wksp_t * wksp;              /* this tile's scratch workspace */
  fd_wksp_t * blockstore_wksp;   /* workspace holding the blockstore */
  fd_wksp_t * funk_wksp;         /* workspace holding funk */
  fd_wksp_t * status_cache_wksp; /* workspace holding the status cache */

  fd_wksp_t * replay_public_wksp;
  fd_runtime_public_t * replay_public;

  // Store tile input
  fd_wksp_t * store_in_mem;
  ulong       store_in_chunk0;
  ulong       store_in_wmark;

  // Pack tile input
  fd_wksp_t * pack_in_mem;
  ulong       pack_in_chunk0;
  ulong       pack_in_wmark;

  // Batch tile input for epoch account hash
  fd_wksp_t * batch_in_mem;
  ulong       batch_in_chunk0;
  ulong       batch_in_wmark;

  // Shred tile input (one entry per shred tile, up to 32)
  ulong                    shred_in_cnt;
  fd_shred_replay_in_ctx_t shred_in[ 32 ];

  // Notification output defs
  fd_frag_meta_t * notif_out_mcache;
  ulong *          notif_out_sync;
  ulong            notif_out_depth;
  ulong            notif_out_seq;

  fd_wksp_t * notif_out_mem;
  ulong       notif_out_chunk0;
  ulong       notif_out_wmark;
  ulong       notif_out_chunk;

  // Sender output defs
  fd_frag_meta_t * sender_out_mcache;
  ulong *          sender_out_sync;
  ulong            sender_out_depth;
  ulong            sender_out_seq;

  fd_wksp_t * sender_out_mem;
  ulong       sender_out_chunk0;
  ulong       sender_out_wmark;
  ulong       sender_out_chunk;

  // Stake weights output link defs
  fd_frag_meta_t * stake_weights_out_mcache;
  ulong *          stake_weights_out_sync;
  ulong            stake_weights_out_depth;
  ulong            stake_weights_out_seq;

  fd_wksp_t * stake_weights_out_mem;
  ulong       stake_weights_out_chunk0;
  ulong       stake_weights_out_wmark;
  ulong       stake_weights_out_chunk;

  // Inputs to plugin/gui
  ulong       replay_plug_out_idx;
  fd_wksp_t * replay_plugin_out_mem;
  ulong       replay_plugin_out_chunk0;
  ulong       replay_plugin_out_wmark;
  ulong       replay_plugin_out_chunk;

  ulong       votes_plug_out_idx;
  fd_wksp_t * votes_plugin_out_mem;
  ulong       votes_plugin_out_chunk0;
  ulong       votes_plugin_out_wmark;
  ulong       votes_plugin_out_chunk;
  long        last_plugin_push_time; /* last wallclock time plugin data was pushed */

  char const * blockstore_checkpt; /* path for blockstore checkpoint ("" disables) */
  int          tx_metadata_storage;
  char const * funk_checkpt;       /* path for funk checkpoint */
  char const * genesis;            /* path to genesis file */
  char const * incremental;        /* incremental snapshot source */
  char const * snapshot;           /* full snapshot source */

  /* Do not modify order! This is join-order in unprivileged_init. */

  fd_alloc_t *          alloc;
  fd_valloc_t           valloc;
  fd_funk_t *           funk;
  fd_acc_mgr_t *        acc_mgr;
  fd_exec_epoch_ctx_t * epoch_ctx;
  fd_epoch_t *          epoch;
  fd_forks_t *          forks;
  fd_ghost_t *          ghost;
  fd_tower_t *          tower;
  fd_replay_t *         replay;

  fd_pubkey_t validator_identity[1];
  fd_pubkey_t vote_authority[1];
  fd_pubkey_t vote_acc[1];

  /* Vote accounts in the current epoch. Lifetimes of the vote account
     addresses (pubkeys) are valid for the epoch (the pubkey memory is
     owned by the epoch bank). */

  fd_voter_t *         epoch_voters; /* map chain of slot->voter */
  fd_bank_hash_cmp_t * bank_hash_cmp;

  /* Microblock (entry) batch buffer for replay. */

  uchar *             mbatch;
  fd_slice_exec_ctx_t slice_exec_ctx; /* execution progress through mbatch */

  /* Tpool */

  uchar        tpool_mem[FD_TPOOL_FOOTPRINT( FD_TILE_MAX )] __attribute__( ( aligned( FD_TPOOL_ALIGN ) ) );
  fd_tpool_t * tpool;

  /* Depends on store_int and is polled in after_credit */

  fd_blockstore_t   blockstore_ljoin;
  int               blockstore_fd; /* file descriptor for archival file */
  fd_blockstore_t * blockstore;

  /* Updated during execution */

  fd_exec_slot_ctx_t * slot_ctx;

  /* Metadata updated during execution */

  ulong     curr_slot;
  ulong     parent_slot;
  ulong     snapshot_slot;
  ulong     last_completed_slot; /* questionable variable used for making sure we do post-block execution steps only once,
                                    probably can remove this if after we rip out ctx->curr_slot (received from STORE) */
  fd_hash_t blockhash;
  ulong     flags;
  ulong     txn_cnt;
  ulong     bank_idx;

  ulong fecs_inserted; /* count of FEC sets inserted into the replay tracker */
  ulong fecs_removed;  /* count of completed FEC sets removed */
  /* Other metadata */

  ulong              funk_seed;
  ulong              status_cache_seed;
  fd_capture_ctx_t * capture_ctx;
  FILE *             capture_file;
  FILE *             slots_replayed_file;

  int skip_frag; /* set by before/during_frag to drop the frag in after_frag */

  ulong * first_turbine;

  ulong *             bank_busy[ FD_PACK_MAX_BANK_TILES ];
  ulong               bank_cnt;
  fd_replay_out_ctx_t bank_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending to PoH finished txns + a couple more tasks ??? */

  ulong               exec_cnt;
  ulong               exec_out_idx;
  fd_replay_out_ctx_t exec_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending to exec unexecuted txns */

  ulong root; /* the root slot is the most recent slot to have reached
                 max lockout in the tower */

  ulong * published_wmark; /* publish watermark. The watermark is defined as the
                              minimum of the tower root (root above) and blockstore
                              smr (blockstore->smr). The watermark is used to
                              publish our fork-aware structures eg. blockstore,
                              forks, ghost. In general, publishing has the effect of
                              pruning minority forks in those structures,
                              indicating that is ok to release the memory being
                              occupied by said forks.

                              The reason it has to be the minimum of the two, is the
                              tower root can lag the SMR and vice versa, but both
                              the fork-aware structures need to maintain information
                              through both of those slots. */

  ulong * poh;                /* proof-of-history slot */
  uint    poh_init_done;
  int     snapshot_init_done;

  int tower_checkpt_fileno;

  int         vote;
  fd_pubkey_t validator_identity_pubkey[ 1 ];
  fd_pubkey_t vote_acct_addr[ 1 ];

  fd_txncache_t * status_cache;
  void *          bmtree[ FD_PACK_MAX_BANK_TILES ];

  fd_epoch_forks_t epoch_forks[1];

  /* The spad allocators used by the executor tiles are NOT the same as the
     spad used for general, longer-lasting spad allocations. The lifetime of
     the exec spad is just through an execution. The runtime spad is scoped
     to the runtime. The top-most frame will persist for the entire duration
     of the process. There will also be a potential second frame that persists
     across multiple slots that is created for rewards distribution. Every other
     spad frame should NOT exist beyond the scope of a block. */
  fd_spad_t * exec_spads[ 128UL ];
  ulong       exec_spad_cnt;
  fd_spad_t * runtime_spad;

  /* TODO: refactor this all into fd_replay_tile_snapshot_ctx_t. */
  ulong   snapshot_interval;        /* User defined parameter */
  ulong   incremental_interval;     /* User defined parameter */
  ulong   last_full_snap;           /* If a full snapshot has been produced */
  ulong * is_constipated;           /* Shared fseq to determine if funk should be constipated */
  ulong   prev_full_snapshot_dist;  /* Tracking for snapshot creation */
  ulong   prev_incr_snapshot_dist;  /* Tracking for incremental snapshot creation */
  ulong   double_constipation_slot; /* Tracking for double constipation if any */

  fd_funk_txn_t * false_root;
  fd_funk_txn_t * second_false_root;

  int is_caught_up; /* nonzero once the node has caught up to the network */

  /* Metrics */
  fd_replay_tile_metrics_t metrics;
};
typedef struct fd_replay_tile_ctx fd_replay_tile_ctx_t;
324 :
325 : FD_FN_CONST static inline ulong
326 0 : scratch_align( void ) {
327 0 : return 128UL;
328 0 : }
329 :
330 : FD_FN_PURE static inline ulong
331 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
332 0 : return 24UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
333 0 : }
334 :
335 : FD_FN_PURE static inline ulong
336 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
337 :
338 : /* Do not modify order! This is join-order in unprivileged_init. */
339 :
340 0 : ulong l = FD_LAYOUT_INIT;
341 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
342 0 : l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
343 0 : l = FD_LAYOUT_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
344 0 : l = FD_LAYOUT_APPEND( l, fd_epoch_align(), fd_epoch_footprint( FD_VOTER_MAX ) );
345 0 : l = FD_LAYOUT_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
346 0 : l = FD_LAYOUT_APPEND( l, fd_ghost_align(), fd_ghost_footprint( FD_BLOCK_MAX ) );
347 0 : l = FD_LAYOUT_APPEND( l, fd_replay_align(), fd_replay_footprint( tile->replay.fec_max, FD_SHRED_MAX_PER_SLOT, FD_BLOCK_MAX ) );
348 0 : l = FD_LAYOUT_APPEND( l, fd_tower_align(), fd_tower_footprint() );
349 0 : l = FD_LAYOUT_APPEND( l, fd_bank_hash_cmp_align(), fd_bank_hash_cmp_footprint( ) );
350 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
351 0 : l = FD_LAYOUT_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
352 0 : }
353 0 : l = FD_LAYOUT_APPEND( l, 128UL, FD_SLICE_MAX );
354 0 : ulong thread_spad_size = fd_spad_footprint( FD_RUNTIME_TRANSACTION_EXECUTION_FOOTPRINT_DEFAULT );
355 0 : l = FD_LAYOUT_APPEND( l, fd_spad_align(), tile->replay.tpool_thread_count * fd_ulong_align_up( thread_spad_size, fd_spad_align() ) );
356 0 : l = FD_LAYOUT_APPEND( l, fd_spad_align(), FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT ); /* FIXME: make this configurable */
357 0 : l = FD_LAYOUT_FINI ( l, scratch_align() );
358 0 : return l;
359 0 : }
360 :
361 : static void
362 : hash_transactions( void * mem,
363 : fd_txn_p_t * txns,
364 : ulong txn_cnt,
365 0 : uchar * mixin ) {
366 0 : fd_bmtree_commit_t * bmtree = fd_bmtree_commit_init( mem, 32UL, 1UL, 0UL );
367 0 : for( ulong i=0; i<txn_cnt; i++ ) {
368 0 : fd_txn_p_t * _txn = txns + i;
369 0 : if( FD_UNLIKELY( !(_txn->flags & FD_TXN_P_FLAGS_EXECUTE_SUCCESS) ) ) continue;
370 :
371 0 : fd_txn_t * txn = TXN(_txn);
372 0 : for( ulong j=0; j<txn->signature_cnt; j++ ) {
373 0 : fd_bmtree_node_t node[1];
374 0 : fd_bmtree_hash_leaf( node, _txn->payload+txn->signature_off+64UL*j, 64UL, 1UL );
375 0 : fd_bmtree_commit_append( bmtree, node, 1UL );
376 0 : }
377 0 : }
378 0 : uchar * root = fd_bmtree_commit_fini( bmtree );
379 0 : fd_memcpy( mixin, root, 32UL );
380 0 : }
381 :
382 : void
383 : publish_stake_weights( fd_replay_tile_ctx_t * ctx,
384 : fd_stem_context_t * stem,
385 0 : fd_exec_slot_ctx_t * slot_ctx ) {
386 0 : fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( slot_ctx->epoch_ctx );
387 0 : if( slot_ctx->slot_bank.epoch_stakes.vote_accounts_root!=NULL ) {
388 0 : ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_weights_out_mem,
389 0 : ctx->stake_weights_out_chunk );
390 0 : fd_stake_weight_t * stake_weights = (fd_stake_weight_t *)&stake_weights_msg[5];
391 0 : ulong stake_weight_idx = fd_stake_weights_by_node( &ctx->slot_ctx->slot_bank.epoch_stakes,
392 0 : stake_weights,
393 0 : ctx->runtime_spad );
394 :
395 0 : stake_weights_msg[0] = fd_slot_to_leader_schedule_epoch( &epoch_bank->epoch_schedule, slot_ctx->slot_bank.slot ) - 1; /* epoch */
396 0 : stake_weights_msg[1] = stake_weight_idx; /* staked_cnt */
397 0 : stake_weights_msg[2] = fd_epoch_slot0( &epoch_bank->epoch_schedule, stake_weights_msg[0] ); /* start_slot */
398 0 : stake_weights_msg[3] = epoch_bank->epoch_schedule.slots_per_epoch; /* slot_cnt */
399 0 : stake_weights_msg[4] = 0UL; /* excluded stake */
400 0 : FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
401 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
402 :
403 0 : ulong stake_weights_sz = 5*sizeof(ulong) + (stake_weight_idx * sizeof(fd_stake_weight_t));
404 0 : ulong stake_weights_sig = 4UL;
405 0 : fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_weights_out_chunk, stake_weights_sz, 0UL, 0UL, tspub );
406 0 : ctx->stake_weights_out_chunk = fd_dcache_compact_next( ctx->stake_weights_out_chunk, stake_weights_sz, ctx->stake_weights_out_chunk0, ctx->stake_weights_out_wmark );
407 0 : }
408 :
409 0 : if( epoch_bank->next_epoch_stakes.vote_accounts_root!=NULL ) {
410 0 : ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_weights_out_mem, ctx->stake_weights_out_chunk );
411 0 : fd_stake_weight_t * stake_weights = (fd_stake_weight_t *)&stake_weights_msg[5];
412 0 : ulong stake_weight_idx = fd_stake_weights_by_node( &epoch_bank->next_epoch_stakes, stake_weights, ctx->runtime_spad );
413 :
414 0 : stake_weights_msg[0] = fd_slot_to_leader_schedule_epoch( &epoch_bank->epoch_schedule,
415 0 : slot_ctx->slot_bank.slot ); /* epoch */
416 0 : stake_weights_msg[1] = stake_weight_idx; /* staked_cnt */
417 0 : stake_weights_msg[2] = fd_epoch_slot0( &epoch_bank->epoch_schedule, stake_weights_msg[0] ); /* start_slot */
418 0 : stake_weights_msg[3] = epoch_bank->epoch_schedule.slots_per_epoch; /* slot_cnt */
419 0 : stake_weights_msg[4] = 0UL; /* excluded stake */
420 0 : FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
421 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
422 :
423 0 : ulong stake_weights_sz = 5*sizeof(ulong) + (stake_weight_idx * sizeof(fd_stake_weight_t));
424 0 : ulong stake_weights_sig = 4UL;
425 0 : fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_weights_out_chunk, stake_weights_sz, 0UL, 0UL, tspub );
426 0 : ctx->stake_weights_out_chunk = fd_dcache_compact_next( ctx->stake_weights_out_chunk, stake_weights_sz, ctx->stake_weights_out_chunk0, ctx->stake_weights_out_wmark );
427 0 : }
428 0 : }
429 :
430 : /* Polls the blockstore block info object for newly completed slices of
431 : slot. Adds it to the tail of slice_deque (which should be the
432 : slice_deque object of the slot, slice_map[slot]) */
433 :
/* Returns 1 if the slot exists in the block map (even if no new slices
   were found), 0 if it does not.  Uses the blockstore's speculative
   (lock-free) query protocol: read, then test; retry on conflict.
   Slices are pushed as (start_idx << 32 | end_idx) encoded ulongs. */
int
slice_poll( fd_replay_tile_ctx_t * ctx,
            fd_replay_slice_t *    slice_deque,
            ulong                  slot ) {
  uint consumed_idx, slices_added;
  for(;;) { /* speculative query */
    fd_block_map_query_t query[1] = { 0 };
    int err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, query, 0 );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );

    if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return 0;     /* slot not in block map */
    if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;   /* concurrent writer; retry */

    consumed_idx = block_info->consumed_idx;
    slices_added = 0;

    /* No shreds buffered for this slot yet. */
    if( FD_UNLIKELY( block_info->buffered_idx == UINT_MAX ) ) return 1;

    /* Scan (consumed_idx, buffered_idx] for data-complete shred idxs;
       each one closes out a slice [consumed_idx+1, idx].
       NOTE(review): consumed_idx+1 relies on uint wraparound when
       consumed_idx==UINT_MAX (fresh slot) to start at 0 — presumably
       intentional; confirm against blockstore init. */
    for( uint idx = consumed_idx + 1; idx <= block_info->buffered_idx; idx++ ) {
      if( FD_UNLIKELY( fd_block_set_test( block_info->data_complete_idxs, idx ) ) ) {
        slices_added++;
        fd_replay_slice_deque_push_tail( slice_deque->deque, ((ulong)(consumed_idx + 1) << 32) | ((ulong)idx) );
        FD_LOG_INFO(( "adding slice replay: slot %lu, slice start: %u, slice end: %u", slot, consumed_idx + 1, idx ));
        consumed_idx = idx;
      }
    }
    /* If the speculative read was not clobbered, the pushed slices are
       valid and we can commit.  Otherwise undo and retry. */
    if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
    /* need to dequeue and try again speculatively */
    for( uint i = 0; i < slices_added; i++ ) {
      fd_replay_slice_deque_pop_tail( slice_deque->deque );
    }
  }

  /* Persist the advanced consumed_idx back to the block map (blocking
     prepare/publish since this is a write). */
  if( slices_added ){
    fd_block_map_query_t query[1] = { 0 };
    fd_block_map_prepare( ctx->blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    fd_block_info_t * block_info = fd_block_map_query_ele( query );
    block_info->consumed_idx = consumed_idx;
    fd_block_map_publish( query );
    return 1;
  }
  return 0;
}
477 :
/* before_frag filters incoming frags before during_frag copies them.
   Returns nonzero to skip the frag.  For shred-tile frags it maintains
   the replay FEC-set tracker from the frag signature alone: most
   shreds are absorbed here and skipped; only the first coding shred of
   a FEC set (which carries the header with code_cnt/data_cnt) is
   passed through for processing. */
static int
before_frag( fd_replay_tile_ctx_t * ctx,
             ulong                  in_idx,
             ulong                  seq,
             ulong                  sig ) {
  (void)ctx;
  (void)seq;

  if( in_idx == SHRED_IN_IDX ) {
    //FD_LOG_NOTICE(( "in_idx: %lu, seq: %lu, sig: %lu", in_idx, seq, sig ));

    /* All FEC bookkeeping is derived from fields packed in sig. */
    ulong slot        = fd_disco_shred_replay_sig_slot       ( sig );
    uint  shred_idx   = fd_disco_shred_replay_sig_shred_idx  ( sig );
    uint  fec_set_idx = fd_disco_shred_replay_sig_fec_set_idx( sig );
    int   is_code     = fd_disco_shred_replay_sig_is_code    ( sig );
    int   completes   = fd_disco_shred_replay_sig_completes  ( sig );

    fd_replay_fec_t * fec = fd_replay_fec_query( ctx->replay, slot, fec_set_idx );
    if( FD_UNLIKELY( !fec ) ) { /* first time receiving a shred for this FEC set */
      fec = fd_replay_fec_insert( ctx->replay, slot, fec_set_idx );
      ctx->fecs_inserted++;
      /* TODO implement eviction */
    }

    /* If the FEC set is complete we don't need to track it anymore. */

    if( FD_UNLIKELY( completes ) ) {
      fd_replay_slice_t * slice_deque = fd_replay_slice_map_query( ctx->replay->slice_map, slot, NULL );

      if( FD_UNLIKELY( !slice_deque ) ) slice_deque = fd_replay_slice_map_insert( ctx->replay->slice_map, slot ); /* create new map entry for this slot */

      FD_LOG_INFO(( "removing FEC set %u from slot %lu", fec_set_idx, slot ));
      fd_replay_fec_remove( ctx->replay, slot, fec_set_idx );
      ctx->fecs_removed++;
      /* Completion may have finished one or more slices; enqueue them. */
      slice_poll( ctx, slice_deque, slot );
      return 1; /* skip frag */
    }

    /* If it is a coding shred, check if it is the first coding shred
       we're receiving. We know it's the first if data_cnt is 0 because
       that is not a valid cnt and means it's uninitialized. */

    if( FD_LIKELY( is_code ) ) { /* optimize for |code| >= |data| */
      return fec->data_cnt != 0; /* process frag (shred hdr) if it's the first coding shred */
    } else {
      uint i = shred_idx - fec_set_idx;
      fd_replay_fec_idxs_insert( fec->idxs, i ); /* mark ith data shred as received */
      return 1; /* skip frag */
    }
  }

  return 0; /* non-shred in - don't skip */
}
531 :
/* during_frag copies/parses an incoming frag into tile state while the
   producer may still overwrite it.  Per-input behavior:
     STORE: parse parent slot + blockhash, copy txns into bank_out[0].
     PACK:  copy a packed microblock into the signalled bank_out and
            read the trailing fd_microblock_bank_trailer_t.
     BATCH: copy the computed epoch account hash into the slot bank.
     SHRED: header-only frags; always skipped here (state was already
            updated in before_frag).
   Finally, the target slot's block-map flags are checked and the frag
   is skipped if the block is already processed or dead. */
static void
during_frag( fd_replay_tile_ctx_t * ctx,
             ulong                  in_idx,
             ulong                  seq FD_PARAM_UNUSED,
             ulong                  sig,
             ulong                  chunk,
             ulong                  sz,
             ulong                  ctl FD_PARAM_UNUSED ) {

  ctx->skip_frag = 0;

  if( in_idx == STORE_IN_IDX ) {
    /* For store frags, sz is the txn count, bounded by MAX_TXNS_PER_REPLAY. */
    if( FD_UNLIKELY( chunk<ctx->store_in_chunk0 || chunk>ctx->store_in_wmark || sz>MAX_TXNS_PER_REPLAY ) ) {
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->store_in_chunk0, ctx->store_in_wmark ));
    }
    uchar * src = (uchar *)fd_chunk_to_laddr( ctx->store_in_mem, chunk );
    /* Incoming packet from store tile. Format:
       Parent slot (ulong - 8 bytes)
       Updated block hash/PoH hash (fd_hash_t - 32 bytes)
       Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t)) */

    ctx->curr_slot = fd_disco_replay_old_sig_slot( sig );
    /* slot changes */
    if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
      FD_LOG_WARNING(( "store sent slot %lu before our root.", ctx->curr_slot ));
    }
    ctx->flags = 0; //fd_disco_replay_old_sig_flags( sig );
    ctx->txn_cnt = sz;

    ctx->parent_slot = FD_LOAD( ulong, src );
    src += sizeof(ulong);
    memcpy( ctx->blockhash.uc, src, sizeof(fd_hash_t) );
    src += sizeof(fd_hash_t);
    /* Store-sourced microblocks always go to bank 0. */
    ctx->bank_idx = 0UL;
    fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ ctx->bank_idx ];
    uchar * dst_poh = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
    fd_memcpy( dst_poh, src, sz * sizeof(fd_txn_p_t) );

    FD_LOG_INFO(( "other microblock - slot: %lu, parent_slot: %lu, txn_cnt: %lu", ctx->curr_slot, ctx->parent_slot, sz ));
  } else if( in_idx == PACK_IN_IDX ) {
    if( FD_UNLIKELY( chunk<ctx->pack_in_chunk0 || chunk>ctx->pack_in_wmark || sz>USHORT_MAX ) ) {
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->pack_in_chunk0, ctx->pack_in_wmark ));
    }
    uchar * src = (uchar *)fd_chunk_to_laddr( ctx->pack_in_mem, chunk );
    /* Incoming packet from pack tile. Format:
       Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t))
       Microblock bank trailer
    */
    ctx->curr_slot = fd_disco_poh_sig_slot( sig );
    if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
      FD_LOG_WARNING(( "pack sent slot %lu before our watermark %lu.", ctx->curr_slot, fd_fseq_query( ctx->published_wmark ) ));
    }
    if( fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK ) {
      /* Route to the bank tile encoded in the signature. */
      ulong bank_idx = fd_disco_poh_sig_bank_tile( sig );
      fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
      uchar * dst_poh = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
      ctx->flags = REPLAY_FLAG_PACKED_MICROBLOCK;
      ctx->txn_cnt = (sz - sizeof(fd_microblock_bank_trailer_t)) / sizeof(fd_txn_p_t);
      ctx->bank_idx = bank_idx;
      fd_memcpy( dst_poh, src, (sz - sizeof(fd_microblock_bank_trailer_t)) );
      src += (sz-sizeof(fd_microblock_bank_trailer_t));
      dst_poh += (sz - sizeof(fd_microblock_bank_trailer_t));
      fd_microblock_bank_trailer_t * t = (fd_microblock_bank_trailer_t *)src;
      /* NOTE(review): the trailer's bank pointer is reinterpreted as
         the parent slot here — confirm against the pack tile's
         trailer encoding. */
      ctx->parent_slot = (ulong)t->bank;
    } else {
      FD_LOG_WARNING(("OTHER PACKET TYPE: %lu", fd_disco_poh_sig_pkt_type( sig )));
      ctx->skip_frag = 1;
      return;
    }

    FD_LOG_DEBUG(( "packed microblock - slot: %lu, parent_slot: %lu, txn_cnt: %lu", ctx->curr_slot, ctx->parent_slot, ctx->txn_cnt ));
  } else if( in_idx==BATCH_IN_IDX ) {
    uchar * src = (uchar *)fd_chunk_to_laddr( ctx->batch_in_mem, chunk );
    fd_memcpy( ctx->slot_ctx->slot_bank.epoch_account_hash.uc, src, sizeof(fd_hash_t) );
    FD_LOG_NOTICE(( "Epoch account hash calculated to be %s", FD_BASE58_ENC_32_ALLOCA( ctx->slot_ctx->slot_bank.epoch_account_hash.uc ) ));
    /* NOTE(review): this branch falls through to the block-map flag
       check below using a ctx->curr_slot left over from a previous
       frag — confirm that is intended. */
  } else if ( in_idx >= SHRED_IN_IDX ) {

    fd_shred_replay_in_ctx_t * shred_in = &ctx->shred_in[ in_idx-SHRED_IN_IDX ];
    if( FD_UNLIKELY( chunk<shred_in->chunk0 || chunk>shred_in->wmark || sz > sizeof(fd_shred34_t) ) ) {
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, shred_in->chunk0 , shred_in->wmark ));
    }
    // uchar * src = (uchar *)fd_chunk_to_laddr( shred_in->mem, chunk );
    // fd_memcpy( (uchar *)ctx->shred, src, sz ); /* copy the hdr to read the code_cnt & data_cnt */

    ctx->skip_frag = 1;

    return;
  }
  // if( ctx->flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
  //   /* We do not know the parent slot, pick one from fork selection */
  //   ulong max_slot = 0; /* FIXME: default to snapshot slot/smr */
  //   for( fd_fork_frontier_iter_t iter = fd_fork_frontier_iter_init( ctx->forks->frontier, ctx->forks->pool );
  //        !fd_fork_frontier_iter_done( iter, ctx->forks->frontier, ctx->forks->pool );
  //        iter = fd_fork_frontier_iter_next( iter, ctx->forks->frontier, ctx->forks->pool ) ) {
  //     fd_exec_slot_ctx_t * ele = &fd_fork_frontier_iter_ele( iter, ctx->forks->frontier, ctx->forks->pool )->slot_ctx;
  //     if ( max_slot < ele->slot_bank.slot ) {
  //       max_slot = ele->slot_bank.slot;
  //     }
  //   }
  //   ctx->parent_slot = max_slot;
  // }

  /* Speculatively read the block flags for curr_slot (retry on
     concurrent writer; give up silently if the slot is unknown). */
  uchar block_flags = 0;
  int err = FD_MAP_ERR_AGAIN;
  while( err == FD_MAP_ERR_AGAIN ){
    fd_block_map_query_t quer[1] = { 0 };
    err = fd_block_map_query_try( ctx->blockstore->block_map, &ctx->curr_slot, NULL, quer, 0 );
    fd_block_info_t * block_info = fd_block_map_query_ele( quer );
    if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    if( FD_UNLIKELY( err == FD_MAP_ERR_KEY )) break;
    block_flags = block_info->flags;
    err = fd_block_map_query_test( quer );
  }

  if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_PROCESSED ) ) ) {
    FD_LOG_WARNING(( "block already processed - slot: %lu", ctx->curr_slot ));
    ctx->skip_frag = 1;
  }
  if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_DEADBLOCK ) ) ) {
    FD_LOG_WARNING(( "block already dead - slot: %lu", ctx->curr_slot ));
    ctx->skip_frag = 1;
  }
}
655 :
656 : static void
657 0 : checkpt( fd_replay_tile_ctx_t * ctx ) {
658 0 : if( FD_UNLIKELY( ctx->slots_replayed_file ) ) fclose( ctx->slots_replayed_file );
659 0 : if( FD_UNLIKELY( strcmp( ctx->blockstore_checkpt, "" ) ) ) {
660 0 : int rc = fd_wksp_checkpt( ctx->blockstore_wksp, ctx->blockstore_checkpt, 0666, 0, NULL );
661 0 : if( rc ) {
662 0 : FD_LOG_ERR( ( "blockstore checkpt failed: error %d", rc ) );
663 0 : }
664 0 : }
665 0 : int rc = fd_wksp_checkpt( ctx->funk_wksp, ctx->funk_checkpt, 0666, 0, NULL );
666 0 : if( rc ) {
667 0 : FD_LOG_ERR( ( "funk checkpt failed: error %d", rc ) );
668 0 : }
669 0 : }
670 :
671 : static void
672 0 : funk_cancel( fd_replay_tile_ctx_t * ctx, ulong mismatch_slot ) {
673 0 : fd_funk_txn_xid_t xid = { .ul = { mismatch_slot, mismatch_slot } };
674 0 : fd_funk_txn_t * txn_map = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
675 0 : fd_funk_txn_t * mismatch_txn = fd_funk_txn_query( &xid, txn_map );
676 0 : fd_funk_start_write( ctx->funk );
677 0 : FD_TEST( fd_funk_txn_cancel( ctx->funk, mismatch_txn, 1 ) );
678 0 : fd_funk_end_write( ctx->funk );
679 0 : }
680 :
/* fd_status_check_ctx_t bundles the state needed for a transaction
   status check: the slot history sysvar, the status cache, and the
   slot being checked against. */
struct fd_status_check_ctx {
  fd_slot_history_t * slot_history; /* slot history sysvar */
  fd_txncache_t *     txncache;     /* transaction status cache */
  ulong               current_slot; /* slot the check is performed for */
};
typedef struct fd_status_check_ctx fd_status_check_ctx_t;
687 :
688 : static void
689 : txncache_publish( fd_replay_tile_ctx_t * ctx,
690 : fd_funk_txn_t * txn_map,
691 : fd_funk_txn_t * to_root_txn,
692 0 : fd_funk_txn_t * rooted_txn ) {
693 :
694 :
695 : /* For the status cache, we stop rooting until the status cache has been
696 : written out to the current snapshot. We also need to iterate up the
697 : funk transaction tree up until the current "root" to figure out what slots
698 : should be registered. This root can correspond to the latest false root if
699 : one exists. */
700 :
701 :
702 0 : if( FD_UNLIKELY( !ctx->slot_ctx->status_cache ) ) {
703 0 : return;
704 0 : }
705 :
706 0 : fd_funk_txn_t * txn = to_root_txn;
707 0 : while( txn!=rooted_txn ) {
708 0 : ulong slot = txn->xid.ul[0];
709 0 : if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) {
710 0 : FD_LOG_INFO(( "Registering slot %lu", slot ));
711 0 : fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, slot );
712 0 : } else {
713 0 : FD_LOG_INFO(( "Registering constipated slot %lu", slot ));
714 0 : fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, slot );
715 0 : }
716 0 : txn = fd_funk_txn_parent( txn, txn_map );
717 0 : }
718 0 : }
719 :
/* snapshot_state_update decides, at publish watermark wmk, whether a
   full or incremental snapshot should be kicked off, and if so
   constipates funk (via the shared fseq) and the status cache to
   freeze state for the batch tile. */
static void
snapshot_state_update( fd_replay_tile_ctx_t * ctx, ulong wmk ) {

  /* We are ready for a snapshot if either we are on or just passed a snapshot
     interval and no snapshot is currently in progress. This is to handle the
     case where the snapshot interval falls on a skipped slot.

     We are ready to create a snapshot if:
     1. The node is caught up to the network.
     2. There is currently no snapshot in progress
     3. The current slot is at the snapshot interval OR
        The current slot has passed a snapshot interval

     If a snapshot is ready to be created we will constipate funk and the
     status cache. This will also notify the status cache via the funk
     constipation fseq. */

  /* ULONG_MAX interval means snapshot creation is disabled. */
  if( ctx->snapshot_interval==ULONG_MAX ) {
    return;
  }

  uchar is_constipated = fd_fseq_query( ctx->is_constipated ) != 0UL;

  if( !ctx->is_caught_up ) {
    return;
  }

  if( is_constipated ) {
    return;
  }

  fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );

  /* The distance from the last snapshot should only grow until we skip
     past the last full snapshot. If it has shrunk that means we skipped
     over the snapshot interval. */
  ulong curr_full_snapshot_dist = wmk % ctx->snapshot_interval;
  uchar is_full_snapshot_ready  = curr_full_snapshot_dist < ctx->prev_full_snapshot_dist;
  ctx->prev_full_snapshot_dist  = curr_full_snapshot_dist;

  /* Do the same for incrementals, only try to create one if there has been
     a full snapshot. */

  ulong curr_incr_snapshot_dist = wmk % ctx->incremental_interval;

  uchar is_inc_snapshot_ready  = wmk % ctx->incremental_interval < ctx->prev_incr_snapshot_dist && ctx->last_full_snap;
  ctx->prev_incr_snapshot_dist = curr_incr_snapshot_dist;

  ulong updated_fseq = 0UL;

  /* TODO: We need a better check if the wmk fell on an epoch boundary due to
     skipped slots. We just don't want to make a snapshot on an epoch boundary */
  if( (is_full_snapshot_ready || is_inc_snapshot_ready) &&
      !fd_runtime_is_epoch_boundary( epoch_bank, wmk, wmk-1UL ) ) {
    /* Constipate the status cache when a snapshot is ready to be created. */
    if( is_full_snapshot_ready ) {
      ctx->last_full_snap = wmk;
      FD_LOG_NOTICE(( "Ready to create a full snapshot" ));
      updated_fseq = fd_batch_fseq_pack( 1, 0, wmk );
    } else {
      FD_LOG_NOTICE(( "Ready to create an incremental snapshot" ));
      updated_fseq = fd_batch_fseq_pack( 1, 1, wmk );
    }
    /* Signal both the status cache and the batch tile (via the fseq). */
    fd_txncache_set_is_constipated( ctx->slot_ctx->status_cache, 1 );
    fd_fseq_update( ctx->is_constipated, updated_fseq );
  }
}
787 :
/* funk_publish pushes funk transactions up to and including the one for
   watermark slot wmk (to_root_txn) toward the root.  The destination
   depends on the constipation state:

   - double constipated: collapse pending txns into the second false
     root, leaving everything at and below it frozen for the batch tile.
   - constipated: collapse into the (first) false root; but if wmk has
     reached the epoch account hash start slot, enter the
     double-constipated state instead of publishing.
   - unconstipated: publish to the real root; crossing the eah start
     slot instead publishes only up to the eah txn and notifies the
     batch tile to compute the epoch account hash.

   Caller must pass the same txn_map/constipation state used to derive
   to_root_txn (see funk_and_txncache_publish). */
static void
funk_publish( fd_replay_tile_ctx_t * ctx,
              fd_funk_txn_t * to_root_txn,
              fd_funk_txn_t * txn_map,
              ulong wmk,
              uchar is_constipated ) {

  fd_funk_start_write( ctx->funk );

  fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );

  /* Now try to publish into funk, this is handled differently based on if
     funk is constipated or if funk is double-constipated. Even if funk was
     double-constipated and now no-longer is we still want to preserve the
     root for the epoch account hash. */
  if( ctx->double_constipation_slot ) {
    FD_LOG_NOTICE(( "Double constipation publish for wmk=%lu", wmk ));

    /* Walk from to_root_txn up to (but not including) the second false
       root, collapsing each txn into its parent. */
    fd_funk_txn_t * txn = to_root_txn;
    while( txn!=ctx->second_false_root ) {
      if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) {
        FD_LOG_ERR(( "Can't publish funk transaction" ));
      }
      txn = fd_funk_txn_parent( txn, txn_map );
    }

  } else if( is_constipated ) {
    FD_LOG_NOTICE(( "Publishing slot=%lu while constipated", wmk ));

    /* At this point, first collapse the current transaction that should be
       published into the oldest child transaction. */

    if( FD_UNLIKELY( wmk>=epoch_bank->eah_start_slot ) ) {
      /* We need to double-constipate at this point. */

      /* First, find the txn where the corresponding slot is the minimum
         pending transaction where >= eah_start_slot. */

      fd_funk_txn_t * txn        = to_root_txn;
      fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map );

      while( parent_txn ) {

        /* xid.ul[0] encodes the slot of a funk transaction. */
        int is_curr_gteq_eah_start = txn->xid.ul[0] >= epoch_bank->eah_start_slot;
        int is_prev_lt_eah_start   = parent_txn->xid.ul[0] < epoch_bank->eah_start_slot;
        if( is_curr_gteq_eah_start && is_prev_lt_eah_start ) {
          break;
        }
        txn        = parent_txn;
        parent_txn = fd_funk_txn_parent( txn, txn_map );
      }

      /* We should never get to this point because of the constipated root.
         The constipated root is guaranteed to have a slot that's < eah_start_slot. */
      if( FD_UNLIKELY( !parent_txn ) ) {
        FD_LOG_ERR(( "Not possible for the parent_txn to be the root" ));
      }

      /* This transaction will now become the double-constipated root. */

      FD_LOG_NOTICE(( "Entering a double constipated state eah_start=%lu eah_slot=%lu",
                      epoch_bank->eah_start_slot, txn->xid.ul[0] ));

      ctx->double_constipation_slot = txn->xid.ul[0];

      /* Other pending transactions will get published into the child during
         the next invocation of funk_publish. */
    } else {

      FD_LOG_NOTICE(( "Publishing into constipated root for wmk=%lu", wmk ));
      /* Standard constipated case where we aren't considering the eah. */
      fd_funk_txn_t * txn = to_root_txn;

      while( txn!=ctx->false_root ) {
        if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) {
          FD_LOG_ERR(( "Can't publish funk transaction" ));
        }
        txn = fd_funk_txn_parent( txn, txn_map );
      }
    }
  } else {

    /* This is the case where we are not in the constipated case. We only need
       to do special handling in the case where the epoch account hash is about
       to be calculated. */

    FD_LOG_NOTICE(( "Publishing slot=%lu", wmk ));

    if( FD_UNLIKELY( wmk>=epoch_bank->eah_start_slot ) ) {

      FD_LOG_NOTICE(( "EAH is ready to be calculated" ));

      /* This condition means that we want to start producing an epoch account
         hash at a slot that is in the set of transactions we are about to
         publish. We only want to publish all slots that are <= the slot that
         we will calculate the epoch account hash for. */

      fd_funk_txn_t * txn        = to_root_txn;
      fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map );
      while( parent_txn ) {
        /* We need to be careful here because the eah start slot may be skipped
           so the actual slot that we calculate the eah for may be greater than
           the eah start slot. The transaction must correspond to a slot greater
           than or equal to the eah start slot, but its parent transaction must
           either have been published already or must be less than the eah start
           slot. */

        int is_curr_gteq_eah_start = txn->xid.ul[0] >= epoch_bank->eah_start_slot;
        int is_prev_lt_eah_start   = parent_txn->xid.ul[0] < epoch_bank->eah_start_slot;
        if( is_curr_gteq_eah_start && is_prev_lt_eah_start ) {
          break;
        }
        txn        = parent_txn;
        parent_txn = fd_funk_txn_parent( txn, txn_map );
      }

      /* At this point, we know txn is the funk txn that we will want to
         calculate the eah for since it's the minimum slot that is >=
         eah_start_slot. */

      FD_LOG_NOTICE(( "The eah has an expected start slot of %lu and is being created for slot %lu", epoch_bank->eah_start_slot, txn->xid.ul[0] ));

      if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, txn, 1 ) ) ) {
        FD_LOG_ERR(( "failed to funk publish" ));
      }

      /* At this point, we have the root for which we want to calculate the
         epoch account hash for. The other children that are > eah_start_slot
         but <= wmk will be published into the constipated root during the next
         invocation of funk_and_txncache_publish.

         Notify the batch tile that an eah should be computed. */

      ulong updated_fseq = fd_batch_fseq_pack( 0UL, 0UL, txn->xid.ul[0] );
      fd_fseq_update( ctx->is_constipated, updated_fseq );
      epoch_bank->eah_start_slot = FD_SLOT_NULL;

    } else {
      /* This is the standard case. Publish all transactions up to and
         including the watermark. This will publish any in-prep ancestors
         of root_txn as well. */

      if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, to_root_txn, 1 ) ) ) {
        FD_LOG_ERR(( "failed to funk publish slot %lu", wmk ));
      }
    }
  }

  fd_funk_end_write( ctx->funk );

}
939 :
/* get_rooted_txn returns the funk transaction that acts as the
   effective root we publish into: the second false root when double
   constipated, the first false root when (singly) constipated, and NULL
   (the real funk root) otherwise.  On the first call after entering a
   constipation state it also locates, caches, and registers the
   corresponding false root slot with the status cache. */
static fd_funk_txn_t*
get_rooted_txn( fd_replay_tile_ctx_t * ctx,
                fd_funk_txn_t * to_root_txn,
                fd_funk_txn_t * txn_map,
                uchar is_constipated ) {

  /* We need to get the rooted transaction that we are publishing into. This
     needs to account for the three different cases: no constipation, single
     constipation, double constipation.

     Also, if it's the first time that we are setting the false root(s), then
     we must also register them into the status cache because we don't register
     the root in txncache_publish to avoid registering the same slot multiple times. */

  if( FD_UNLIKELY( ctx->double_constipation_slot ) ) {

    if( FD_UNLIKELY( !ctx->second_false_root ) ) {

      /* Set value of second false root, save it and publish to txncache. */
      /* Walk up the tree until we reach the txn whose slot is the
         double-constipation slot. */
      fd_funk_txn_t * txn = to_root_txn;
      while( txn->xid.ul[0]>ctx->double_constipation_slot ) {
        txn = fd_funk_txn_parent( txn, txn_map );
      }

      if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) {
        fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
      } else {
        fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
      }

      /* Invariant: the walk above must have landed exactly on the
         double-constipation slot. */
      if( txn->xid.ul[0] != ctx->double_constipation_slot ) {
        FD_LOG_ERR(( "txn->xid.ul[0] = %lu, ctx->double_constipation_slot = %lu", txn->xid.ul[0], ctx->double_constipation_slot ));
      }
      ctx->second_false_root = txn;
    }
    return ctx->second_false_root;
  } else if( is_constipated ) {

    if( FD_UNLIKELY( !ctx->false_root ) ) {

      /* The false root is the oldest in-prep ancestor: walk up until the
         parent is the real funk root. */
      fd_funk_txn_t * txn        = to_root_txn;
      fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map );
      while( parent_txn ) {
        txn        = parent_txn;
        parent_txn = fd_funk_txn_parent( txn, txn_map );
      }

      ctx->false_root = txn;
      if( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) {
        fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
      } else {
        fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
      }
    }
    return ctx->false_root;
  } else {
    /* Not constipated: publish straight into the real funk root. */
    return NULL;
  }
}
999 :
/* funk_and_txncache_publish advances the root: it registers and
   publishes all in-prep slots up to and including the watermark slot
   wmk (whose funk txn is identified by xid) into funk and the txncache,
   honoring any active (double) constipation, then checks whether a new
   snapshot should be started.  Must only be called after all bank tpool
   workers are idle (it waits on them first). */
static void
funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) {

  FD_LOG_NOTICE(( "Entering funk_and_txncache_publish for wmk=%lu", wmk ));

  /* This function is responsible for publishing/registering all in-prep slots
     up to and including the watermark slot into funk and the transaction cache.

     However, we need to modify this behavior to support snapshot creation and
     epoch account hash generation (which is handled by the batch tile).
     Specifically, we need to change the mechanism by introducing the concept of
     a constipated root. We want to keep the root of funk/txncache constant
     while the batch tile reads from the root of funk. At the same time, we
     want to keep publishing into funk. We accomplish this by treating the
     oldest in-prep ancestor of funk as the "constipated/false" root. While
     the batch tile "works", we will only publish into the false root. Once the
     batch tile is done producing a snapshot/eah, we will then flush the
     constipated root into the real root of funk as we no longer need a frozen
     funk transaction to read from. The batch tile will communicate with the
     replay tile via the is_constipated fseq and a link.

     There is a pretty important edge case to consider here: what do we do if
     we are currently in the middle of creating a snapshot, but we need to
     record our state for the epoch account hash? The epoch account hash must
     be created for a specific slot and we can't block execution to calculate
     this hash. The solution will be to introduce a second constipation via a
     second false root. This new false root will correspond to the oldest
     child transaction of the transaction that corresponds to the eah
     calculation slot. When the snapshot is done being produced, any further
     snapshot creation will be blocked until the epoch account hash is created.
     We will use the second false root to publish into while the batch tile
     produces the epoch account hash. We do not modify any of the parents of
     the second constipated root until we are done producing a snapshot.

     A similar mechanism for txncache constipation is needed only for snapshot
     creation. This is simpler than for funk because txncache operations are
     atomic and we can just register slots into a constipated set while the
     txncache is getting copied out. This is a much faster operation and the
     txncache will likely get unconstipated before funk.

     Single Funk Constipation Example:

     If we want to create a snapshot/eah for slot n, then we will publish
     all transactions up to and including those that correspond to slot n.
     We will then publish all transactions into the immediate child of n (lets
     assume it's n+1) in this case. So every transaction will be published into
     n+1 and NOT n. When the computation is done, we resume publishing as normal.

     Double Funk Constipation Example:

     Let's say we are creating a snapshot for slot n and we want
     the epoch account hash for slot m. A snapshot will take x slots to produce
     and we can assume that n + x > m. So at some slot y where n < y < m, the
     state of funk will be: a root at slot n with a constipated root at
     n+1 which gets published into. However, once it is time to publish slot m,
     we will now have a root at slot n, a constipated root at slot m, and we will
     then start publishing into the second constipated root at slot m + 1. */

  /* First wait for all tpool threads to finish. */

  for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) {
    fd_tpool_wait( ctx->tpool, i+1 );
  }

  fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
  uchar is_constipated = fd_fseq_query( ctx->is_constipated ) != 0;

  /* If the is_constipated fseq is set to 0 that means that the batch tile
     is currently in an idle state. However, if there was a double constipation
     active, that means that we need to kick off the pending epoch account hash
     calculation. */
  if( ctx->double_constipation_slot && !is_constipated ) {
    FD_LOG_NOTICE(( "No longer double constipated, ready to start computing the epoch account hash" ));

    /* At this point, the snapshot has been completed, so we are now ready to
       start the eah computation. */
    ulong updated_fseq = fd_batch_fseq_pack( 0UL, 0UL, ctx->double_constipation_slot );
    fd_fseq_update( ctx->is_constipated, updated_fseq );
    epoch_bank->eah_start_slot = FD_SLOT_NULL;
  }

  /* If the (second) false root is no longer needed, then we should stop
     tracking it. */
  if( FD_UNLIKELY( ctx->false_root && !is_constipated ) ) {
    FD_LOG_NOTICE(( "Unsetting false root tracking" ));
    ctx->false_root = NULL;
  }
  if( FD_UNLIKELY( ctx->second_false_root && !ctx->double_constipation_slot ) ) {
    FD_LOG_NOTICE(( "Unsetting second false root tracking" ));
    ctx->second_false_root = NULL;
  }


  /* Handle updates to funk and the status cache. */

  fd_funk_txn_t * txn_map     = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
  fd_funk_txn_t * to_root_txn = fd_funk_txn_query( xid, txn_map );
  fd_funk_txn_t * rooted_txn  = get_rooted_txn( ctx, to_root_txn, txn_map, is_constipated );

  txncache_publish( ctx, txn_map, to_root_txn, rooted_txn );

  funk_publish( ctx, to_root_txn, txn_map, wmk, is_constipated );

  /* Update the snapshot state and determine if one is ready to be created. */

  snapshot_state_update( ctx, wmk );

  /* Optionally checkpoint runtime state at the new watermark. */
  if( FD_UNLIKELY( ctx->capture_ctx ) ) {
    fd_runtime_checkpt( ctx->capture_ctx, ctx->slot_ctx, wmk );
  }

}
1112 :
1113 : static int
1114 0 : suppress_notify( const fd_pubkey_t * prog ) {
1115 : /* Certain accounts are just noise and a waste of notification bandwidth */
1116 0 : if( !memcmp( prog, fd_solana_vote_program_id.key, sizeof(fd_pubkey_t) ) ) {
1117 0 : return 1;
1118 0 : } else if( !memcmp( prog, fd_solana_system_program_id.key, sizeof(fd_pubkey_t) ) ) {
1119 0 : return 1;
1120 0 : } else if( !memcmp( prog, fd_solana_compute_budget_program_id.key, sizeof(fd_pubkey_t) ) ) {
1121 0 : return 1;
1122 0 : } else {
1123 0 : return 0;
1124 0 : }
1125 0 : }
1126 :
/* publish_account_notifications emits FD_REPLAY_ACCTS_TYPE messages on
   the notif out mcache, one entry per (non-suppressed) account address
   referenced by the executed transactions txns[0..txn_cnt).  Messages
   are batched up to FD_REPLAY_NOTIF_ACCT_MAX accounts and flushed per
   transaction. */
static void
publish_account_notifications( fd_replay_tile_ctx_t * ctx,
                               fd_fork_t * fork,
                               ulong curr_slot,
                               fd_txn_p_t const * txns,
                               ulong txn_cnt ) {
  long notify_time_ns = -fd_log_wallclock();
  /* NOTIFY_START points msg at a fresh dcache chunk; NOTIFY_END
     publishes the chunk, advances seq/chunk, and resets msg. */
#define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out_mem, ctx->notif_out_chunk )
#define NOTIFY_END                                                      \
  fd_mcache_publish( ctx->notif_out_mcache, ctx->notif_out_depth, ctx->notif_out_seq, \
                     0UL, ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
  ctx->notif_out_seq   = fd_seq_inc( ctx->notif_out_seq, 1UL );         \
  ctx->notif_out_chunk = fd_dcache_compact_next( ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), \
                                                 ctx->notif_out_chunk0, ctx->notif_out_wmark ); \
  msg = NULL

  ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
  fd_replay_notif_msg_t * msg = NULL;

  for( ulong i = 0; i < txn_cnt; ++i ) {
    uchar const * raw = txns[i].payload;
    fd_txn_t const * txn = TXN(txns + i);
    ushort acct_cnt = txn->acct_addr_cnt;
    const fd_pubkey_t * accts = (const fd_pubkey_t *)(raw + txn->acct_addr_off);
    /* Sanity check that the account table and signatures lie within the
       serialized payload. */
    FD_TEST((void*)(accts + acct_cnt) <= (void*)(raw + txns[i].payload_sz));
    fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off);
    FD_TEST((void*)(sigs + txn->signature_cnt) <= (void*)(raw + txns[i].payload_sz));
    for( ushort j = 0; j < acct_cnt; ++j ) {
      if( suppress_notify( accts + j ) ) continue;
      if( msg == NULL ) {
        /* Lazily open a message the first time a notifiable account is
           seen; tag it with the fork's funk xid and the txn's first
           signature. */
        NOTIFY_START;
        msg->type = FD_REPLAY_ACCTS_TYPE;
        msg->accts.funk_xid = fork->slot_ctx.funk_txn->xid;
        fd_memcpy( msg->accts.sig, sigs, sizeof(fd_ed25519_sig_t) );
        msg->accts.accts_cnt = 0;
      }
      struct fd_replay_notif_acct * out = &msg->accts.accts[ msg->accts.accts_cnt++ ];
      fd_memcpy( out->id, accts + j, sizeof(out->id) );
      /* Writable accounts per the Solana message layout: writable
         signers come first, then read-only signers, then writable
         non-signers, then read-only non-signers. */
      int writable = ((j < txn->signature_cnt - txn->readonly_signed_cnt) ||
                      ((j >= txn->signature_cnt) && (j < acct_cnt - txn->readonly_unsigned_cnt)));
      out->flags = (writable ? FD_REPLAY_NOTIF_ACCT_WRITTEN : FD_REPLAY_NOTIF_ACCT_NO_FLAGS );

      /* Flush when the message is full. */
      if( msg->accts.accts_cnt == FD_REPLAY_NOTIF_ACCT_MAX ) {
        NOTIFY_END;
      }
    }
    /* Flush any partially-filled message at the end of each txn. */
    if( msg ) {
      NOTIFY_END;
    }
  }

#undef NOTIFY_START
#undef NOTIFY_END
  notify_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(("TIMING: notify_account_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));
}
1183 :
1184 : static void
1185 : replay_plugin_publish( fd_replay_tile_ctx_t * ctx,
1186 : fd_stem_context_t * stem,
1187 : ulong sig,
1188 : uchar const * data,
1189 0 : ulong data_sz ) {
1190 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->replay_plugin_out_mem, ctx->replay_plugin_out_chunk );
1191 0 : fd_memcpy( dst, data, data_sz );
1192 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
1193 0 : fd_stem_publish( stem, ctx->replay_plug_out_idx, sig, ctx->replay_plugin_out_chunk, data_sz, 0UL, 0UL, tspub );
1194 0 : ctx->replay_plugin_out_chunk = fd_dcache_compact_next( ctx->replay_plugin_out_chunk, data_sz, ctx->replay_plugin_out_chunk0, ctx->replay_plugin_out_wmark );
1195 0 : }
1196 :
/* publish_slot_notifications emits an FD_REPLAY_SLOT_TYPE message on
   the notif out mcache summarizing the just-executed slot curr_slot
   (bank hash, block hash, height, txn count, ...), and, if the plugin
   out link is configured, also publishes an
   FD_PLUGIN_MSG_SLOT_COMPLETED message with execution statistics. */
static void
publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
                            fd_stem_context_t *    stem,
                            fd_fork_t * fork,
                            ulong block_entry_block_height,
                            ulong curr_slot ) {
  long notify_time_ns = -fd_log_wallclock();
  /* Same publish/advance helpers as in publish_account_notifications. */
#define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out_mem, ctx->notif_out_chunk )
#define NOTIFY_END                                                      \
  fd_mcache_publish( ctx->notif_out_mcache, ctx->notif_out_depth, ctx->notif_out_seq, \
                     0UL, ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
  ctx->notif_out_seq   = fd_seq_inc( ctx->notif_out_seq, 1UL );         \
  ctx->notif_out_chunk = fd_dcache_compact_next( ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), \
                                                 ctx->notif_out_chunk0, ctx->notif_out_wmark ); \
  msg = NULL

  ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
  fd_replay_notif_msg_t * msg = NULL;

  {
    NOTIFY_START;
    msg->type = FD_REPLAY_SLOT_TYPE;
    msg->slot_exec.slot = curr_slot;
    msg->slot_exec.parent = ctx->parent_slot;
    /* The published watermark fseq holds the current root slot. */
    msg->slot_exec.root = fd_fseq_query( ctx->published_wmark );
    msg->slot_exec.height = block_entry_block_height;
    msg->slot_exec.transaction_count = fork->slot_ctx.slot_bank.transaction_count;
    memcpy( &msg->slot_exec.bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof( fd_hash_t ) );
    memcpy( &msg->slot_exec.block_hash, &ctx->blockhash, sizeof( fd_hash_t ) );
    memcpy( &msg->slot_exec.identity, ctx->validator_identity_pubkey, sizeof( fd_pubkey_t ) );
    NOTIFY_END;
  }

#undef NOTIFY_START
#undef NOTIFY_END
  notify_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(("TIMING: notify_slot_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));

  /* Mirror the slot completion to the plugin link when present. */
  if( ctx->replay_plugin_out_mem ) {
    fd_replay_complete_msg_t msg2 = {
      .slot = curr_slot,
      .total_txn_count = fork->slot_ctx.txn_count,
      .nonvote_txn_count = fork->slot_ctx.nonvote_txn_count,
      .failed_txn_count = fork->slot_ctx.failed_txn_count,
      .nonvote_failed_txn_count = fork->slot_ctx.nonvote_failed_txn_count,
      .compute_units = fork->slot_ctx.total_compute_units_used,
      .transaction_fee = fork->slot_ctx.slot_bank.collected_execution_fees,
      .priority_fee = fork->slot_ctx.slot_bank.collected_priority_fees,
      .parent_slot = ctx->parent_slot,
    };
    replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_COMPLETED, (uchar const *)&msg2, sizeof(msg2) );
  }
}
1250 :
/* send_tower_sync builds a vote transaction from the current tower and
   publishes it to the sender tile, then checkpoints the tower to disk.
   No-op when voting is disabled.  Aborts via FD_LOG_ERR if the bank or
   block hash for the vote slot is missing from the blockstore (an
   invariant violation: we only vote on slots we have replayed). */
static void
send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
  if( FD_UNLIKELY( !ctx->vote ) ) return;
  FD_LOG_NOTICE( ( "sending tower sync" ) );
  /* The vote slot is the most recent vote on the tower. */
  ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower )->slot;
  fd_hash_t vote_bank_hash[1]  = { 0 };
  fd_hash_t vote_block_hash[1] = { 0 };
  int err = fd_blockstore_bank_hash_query( ctx->blockstore, vote_slot, vote_bank_hash );
  if( err ) FD_LOG_ERR(( "invariant violation: missing bank hash for tower vote" ));
  err = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot, vote_block_hash );
  if( err ) FD_LOG_ERR(( "invariant violation: missing block hash for tower vote" ));

  /* Build a vote state update based on current tower votes. */

  fd_txn_p_t * txn = (fd_txn_p_t *)fd_chunk_to_laddr( ctx->sender_out_mem, ctx->sender_out_chunk );
  fd_tower_to_vote_txn( ctx->tower,
                        ctx->root,
                        vote_bank_hash,
                        vote_block_hash,
                        ctx->validator_identity,
                        ctx->vote_authority,
                        ctx->vote_acc,
                        txn,
                        ctx->runtime_spad );

  /* TODO: Can use a smaller size, adjusted for payload length */
  ulong msg_sz = sizeof( fd_txn_p_t );
  fd_mcache_publish( ctx->sender_out_mcache,
                     ctx->sender_out_depth,
                     ctx->sender_out_seq,
                     1UL,
                     ctx->sender_out_chunk,
                     msg_sz,
                     0UL,
                     0,
                     0 );
  ctx->sender_out_seq   = fd_seq_inc( ctx->sender_out_seq, 1UL );
  ctx->sender_out_chunk = fd_dcache_compact_next( ctx->sender_out_chunk,
                                                  msg_sz,
                                                  ctx->sender_out_chunk0,
                                                  ctx->sender_out_wmark );

  /* Dump the latest sent tower into the tower checkpoint file */
  if( FD_LIKELY( ctx->tower_checkpt_fileno > 0 ) ) fd_restart_tower_checkpt( vote_bank_hash, ctx->tower, ctx->ghost, ctx->root, ctx->tower_checkpt_fileno );
}
1296 :
/* prepare_new_block_execution sets up a fork's slot context to begin
   replaying block curr_slot on top of ctx->parent_slot: it re-keys the
   frontier fork to the new slot, records the incoming PoH hash in the
   blockstore block map, rolls the slot bank forward, prepares a new
   funk transaction, runs new-epoch processing, and prepares block
   execution.  Returns the locked fork.  Any inconsistency aborts via
   FD_LOG_ERR. */
static fd_fork_t *
prepare_new_block_execution( fd_replay_tile_ctx_t * ctx,
                             fd_stem_context_t *    stem,
                             ulong                  curr_slot,
                             ulong                  flags ) {
  long prepare_time_ns = -fd_log_wallclock();

  int is_new_epoch_in_new_block = 0;
  /* Look up (or restore) the parent fork on the frontier. */
  fd_fork_t * fork = fd_forks_prepare( ctx->forks,
                                       ctx->parent_slot,
                                       ctx->acc_mgr,
                                       ctx->blockstore,
                                       ctx->epoch_ctx,
                                       ctx->funk,
                                       ctx->runtime_spad );
  // Remove slot ctx from frontier
  /* Re-key the fork from parent_slot to curr_slot: the frontier map is
     keyed by slot, so the element must be removed and re-inserted. */
  fd_fork_t * child = fd_fork_frontier_ele_remove( ctx->forks->frontier, &fork->slot, NULL, ctx->forks->pool );
  child->slot = curr_slot;
  if( FD_UNLIKELY( fd_fork_frontier_ele_query(
      ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool ) ) ) {
      FD_LOG_ERR( ( "invariant violation: child slot %lu was already in the frontier", curr_slot ) );
  }
  fd_fork_frontier_ele_insert( ctx->forks->frontier, child, ctx->forks->pool );
  /* Mark the fork as in-use while the block executes. */
  fork->lock = 1;
  FD_TEST( fork == child );

  // fork is advancing
  FD_LOG_NOTICE(( "new block execution - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
  fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( fork->slot_ctx.epoch_ctx );

  /* if it is an epoch boundary, push out stake weights */
  if( fork->slot_ctx.slot_bank.slot != 0 ) {
    is_new_epoch_in_new_block = (int)fd_runtime_is_epoch_boundary( epoch_bank, fork->slot_ctx.slot_bank.slot, fork->slot_ctx.slot_bank.prev_slot );
  }

  /* Record the parent's PoH hash as this block's incoming PoH hash in
     the blockstore block map. */
  fd_block_map_query_t query[1] = { 0 };
  int err = fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
  fd_block_info_t * curr_block_info = fd_block_map_query_ele( query );
  if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ) FD_LOG_ERR(("Block map prepare failed, likely corrupt."));
  if( FD_UNLIKELY( curr_slot != curr_block_info->slot ) ) FD_LOG_ERR(("Block map prepare failed, likely corrupt."));
  curr_block_info->in_poh_hash = fork->slot_ctx.slot_bank.poh;
  fd_block_map_publish( query );

  /* Roll the slot bank forward to the new slot. */
  fork->slot_ctx.slot_bank.prev_slot   = fork->slot_ctx.slot_bank.slot;
  fork->slot_ctx.slot_bank.slot        = curr_slot;
  fork->slot_ctx.slot_bank.tick_height = fork->slot_ctx.slot_bank.max_tick_height;
  if( FD_UNLIKELY( FD_RUNTIME_EXECUTE_SUCCESS != fd_runtime_compute_max_tick_height( epoch_bank->ticks_per_slot, curr_slot, &fork->slot_ctx.slot_bank.max_tick_height ) ) ) {
    FD_LOG_ERR(( "couldn't compute tick height/max tick height slot %lu ticks_per_slot %lu", curr_slot, epoch_bank->ticks_per_slot ));
  }
  fork->slot_ctx.enable_exec_recording = ctx->tx_metadata_storage;
  fork->slot_ctx.runtime_wksp          = fd_wksp_containing( ctx->runtime_spad );

  /* NOTE: By commenting this out, we don't support forking at the epoch boundary
     but this code is buggy and leads to crashes. */
  // if( fd_runtime_is_epoch_boundary( epoch_bank, fork->slot_ctx.slot_bank.slot, fork->slot_ctx.slot_bank.prev_slot ) ) {
  //   FD_LOG_WARNING(("Epoch boundary"));

  //   fd_epoch_fork_elem_t * epoch_fork = NULL;
  //   ulong new_epoch = fd_slot_to_epoch( &epoch_bank->epoch_schedule, fork->slot_ctx.slot_bank.slot, NULL );
  //   uint found = fd_epoch_forks_prepare( ctx->epoch_forks, fork->slot_ctx.slot_bank.prev_slot, new_epoch, &epoch_fork );

  //   if( FD_UNLIKELY( found ) ) {
  //     fd_exec_epoch_ctx_bank_mem_clear( epoch_fork->epoch_ctx );
  //   }
  //   fd_exec_epoch_ctx_t * prev_epoch_ctx = fork->slot_ctx.epoch_ctx;

  //   fd_exec_epoch_ctx_from_prev( epoch_fork->epoch_ctx, prev_epoch_ctx, ctx->runtime_spad );
  //   fork->slot_ctx.epoch_ctx = epoch_fork->epoch_ctx;
  // }

  fork->slot_ctx.status_cache = ctx->status_cache;

  /* Derive the funk transaction xid for the new slot.  Packed
     microblocks use an all-zero xid; otherwise both words carry the
     slot number. */
  fd_funk_txn_xid_t xid = { 0 };

  if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
    memset( xid.uc, 0, sizeof(fd_funk_txn_xid_t) );
  } else {
    xid.ul[1] = fork->slot_ctx.slot_bank.slot;
  }
  xid.ul[0] = fork->slot_ctx.slot_bank.slot;
  /* push a new transaction on the stack */
  fd_funk_start_write( ctx->funk );
  fork->slot_ctx.funk_txn = fd_funk_txn_prepare(ctx->funk, fork->slot_ctx.funk_txn, &xid, 1);
  fd_funk_end_write( ctx->funk );

  /* Run epoch-boundary processing (rewards, stake updates, ...) if the
     new slot crosses into a new epoch. */
  fd_runtime_block_pre_execute_process_new_epoch( &fork->slot_ctx,
                                                  ctx->tpool,
                                                  ctx->exec_spads,
                                                  ctx->exec_spad_cnt,
                                                  ctx->runtime_spad );

  /* We want to push on a spad frame before we start executing a block.
     Apart from allocations made at the epoch boundary, there should be no
     allocations that persist beyond the scope of a block. Before this point,
     there should only be 1 or 2 frames that are on the stack. The first frame
     will hold memory for the slot/epoch context. The potential second frame
     will only exist while rewards are being distributed (around the start of
     an epoch). We pop a frame when rewards are done being distributed. */
  fd_spad_push( ctx->runtime_spad );

  int res = fd_runtime_block_execute_prepare( &fork->slot_ctx, ctx->runtime_spad );
  if( res != FD_RUNTIME_EXECUTE_SUCCESS ) {
    FD_LOG_ERR(( "block prep execute failed" ));
  }

  /* Read slot history into slot ctx */
  fork->slot_ctx.slot_history = fd_sysvar_slot_history_read( fork->slot_ctx.acc_mgr, fork->slot_ctx.funk_txn, ctx->runtime_spad );

  /* Publish the new epoch's stake weights after the boundary
     processing above has produced them. */
  if( is_new_epoch_in_new_block ) {
    publish_stake_weights( ctx, stem, &fork->slot_ctx );
  }

  prepare_time_ns += fd_log_wallclock();
  FD_LOG_DEBUG(("TIMING: prepare_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)prepare_time_ns * 1e-6));

  return fork;
}
1414 :
1415 : void
1416 0 : init_poh( fd_replay_tile_ctx_t * ctx ) {
1417 0 : FD_LOG_INFO(( "sending init msg" ));
1418 0 : fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ 0UL ];
1419 0 : fd_poh_init_msg_t * msg = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
1420 0 : fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->epoch_ctx );
1421 0 : msg->hashcnt_per_tick = ctx->epoch_ctx->epoch_bank.hashes_per_tick;
1422 0 : msg->ticks_per_slot = ctx->epoch_ctx->epoch_bank.ticks_per_slot;
1423 0 : msg->tick_duration_ns = (ulong)(epoch_bank->ns_per_slot / epoch_bank->ticks_per_slot);
1424 0 : if( ctx->slot_ctx->slot_bank.block_hash_queue.last_hash ) {
1425 0 : memcpy(msg->last_entry_hash, ctx->slot_ctx->slot_bank.block_hash_queue.last_hash->uc, sizeof(fd_hash_t));
1426 0 : } else {
1427 0 : memset(msg->last_entry_hash, 0UL, sizeof(fd_hash_t));
1428 0 : }
1429 0 : msg->tick_height = ctx->slot_ctx->slot_bank.slot * msg->ticks_per_slot;
1430 :
1431 0 : ulong sig = fd_disco_replay_old_sig( ctx->slot_ctx->slot_bank.slot, REPLAY_FLAG_INIT );
1432 0 : fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, sizeof(fd_poh_init_msg_t), 0UL, 0UL, 0UL );
1433 0 : bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, sizeof(fd_poh_init_msg_t), bank_out->chunk0, bank_out->wmark );
1434 0 : bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
1435 0 : ctx->poh_init_done = 1;
1436 0 : }
1437 :
/* Verifies the validity of a microblock batch. */

1440 : static int FD_FN_UNUSED
1441 : process_and_exec_mbatch( fd_replay_tile_ctx_t * ctx,
1442 : fd_stem_context_t * stem FD_PARAM_UNUSED,
1443 : ulong mbatch_sz,
1444 0 : bool last_batch ) {
1445 0 : #define wait_and_check_success( worker_idx ) \
1446 0 : fd_tpool_wait( ctx->tpool, worker_idx ); \
1447 0 : if( poh_info[ worker_idx ].success ) { \
1448 0 : FD_LOG_WARNING(( "Failed to verify tick poh" )); \
1449 0 : return -1; \
1450 0 : }
1451 0 :
1452 0 : fd_hash_t in_poh_hash;
1453 0 : fd_block_map_query_t query[1] = { 0 } ;
1454 0 : int err = FD_MAP_ERR_AGAIN;
1455 0 : while( err == FD_MAP_ERR_AGAIN ) {
1456 0 : err = fd_block_map_query_try( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, 0 );
1457 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1458 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
1459 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY )) { FD_LOG_ERR(( "Failed to query block map" )); }
1460 0 : in_poh_hash = block_info->in_poh_hash;
1461 0 : err = fd_block_map_query_test( query );
1462 0 : }
1463 0 :
1464 0 : ulong micro_cnt = FD_LOAD( ulong, ctx->mbatch );
1465 0 :
1466 0 : if( FD_UNLIKELY( !micro_cnt ) ) { /* in the case of zero padding */
1467 0 : FD_LOG_DEBUG(( "No microblocks in batch" ));
1468 0 : return 0;
1469 0 : }
1470 0 :
1471 0 : fd_poh_verifier_t poh_info = {0};
1472 0 : (void)poh_info;
1473 0 :
1474 0 : fd_microblock_hdr_t * hdr = NULL;
1475 0 : ulong off = sizeof(ulong);
1476 0 : for( ulong i=0UL; i<micro_cnt; i++ ){
1477 0 : hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->mbatch + off );
1478 0 : int res = fd_runtime_microblock_verify_ticks( ctx->slot_ctx,
1479 0 : ctx->curr_slot,
1480 0 : hdr,
1481 0 : last_batch && i == micro_cnt - 1,
1482 0 : ctx->slot_ctx->slot_bank.tick_height,
1483 0 : ctx->slot_ctx->slot_bank.max_tick_height,
1484 0 : ctx->slot_ctx->epoch_ctx->epoch_bank.hashes_per_tick );
1485 0 :
1486 0 : if( res != FD_BLOCK_OK ) {
1487 0 : FD_LOG_WARNING(( "Failed to verify tick metadata" ));
1488 0 : return -1;
1489 0 : }
1490 0 :
1491 0 : poh_info.success = 0;
1492 0 : poh_info.in_poh_hash = &in_poh_hash;
1493 0 : poh_info.microblock.hdr = hdr;
1494 0 : poh_info.spad = ctx->runtime_spad;
1495 0 : poh_info.microblk_max_sz = mbatch_sz - off;
1496 0 :
1497 0 : off += sizeof(fd_microblock_hdr_t);
1498 0 :
1499 0 : /* FIXME: This needs to be multithreaded. This will be reintroduced when
1500 0 : the execution model changes are made */
1501 0 : // fd_runtime_poh_verify( &poh_info );
1502 0 : // if( poh_info.success==-1 ) {
1503 0 : // FD_LOG_WARNING(( "Failed to verify poh hash" ));
1504 0 : // return -1;
1505 0 : // }
1506 0 :
1507 0 : in_poh_hash = *(fd_hash_t *)fd_type_pun( hdr->hash );
1508 0 :
1509 0 : /* seek past txns */
1510 0 : fd_txn_p_t * txn_p = fd_spad_alloc( ctx->runtime_spad, alignof(fd_txn_p_t*), sizeof(fd_txn_p_t) * hdr->txn_cnt );
1511 0 : for( ulong t=0UL; t<hdr->txn_cnt; t++ ){
1512 0 : ulong pay_sz = 0UL;
1513 0 : ulong txn_sz = fd_txn_parse_core( ctx->mbatch + off,
1514 0 : fd_ulong_min( FD_TXN_MTU, mbatch_sz - off ),
1515 0 : TXN( &txn_p[t] ),
1516 0 : NULL,
1517 0 : &pay_sz );
1518 0 :
1519 0 : if( FD_UNLIKELY( !pay_sz || !txn_sz || txn_sz > FD_TXN_MTU ) ) {
1520 0 : FD_LOG_WARNING(( "failed to parse transaction %lu in replay", t ));
1521 0 : return -1;
1522 0 : }
1523 0 : fd_memcpy( txn_p[t].payload, ctx->mbatch + off, pay_sz );
1524 0 : txn_p[t].payload_sz = pay_sz;
1525 0 : off += pay_sz;
1526 0 :
1527 0 : /* Execute Transaction */
1528 0 :
1529 0 : /* dispatch into MCACHE / DCACHE */
1530 0 : // fd_replay_out_ctx_t * out = &ctx->exec_out[ 0 ];
1531 0 : // fd_stem_publish( stem, out->idx, 0, out->chunk, sizeof(fd_txn_p_t), 0UL, 0UL, 0UL );
1532 0 : // out->chunk = fd_dcache_compact_next( out->chunk, sizeof(fd_txn_p_t), out->chunk0, out->wmark );
1533 0 : }
1534 0 :
1535 0 : /* Now that we have parsed the mblock, we are ready to execute the whole mblock */
1536 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
1537 0 : &ctx->curr_slot,
1538 0 : NULL,
1539 0 : ctx->forks->pool );
1540 0 : if( FD_UNLIKELY( !fork ) ) {
1541 0 : FD_LOG_ERR(( "Unable to select a fork" ));
1542 0 : }
1543 0 :
1544 0 : err = fd_runtime_process_txns_in_microblock_stream( &fork->slot_ctx,
1545 0 : ctx->capture_ctx,
1546 0 : txn_p,
1547 0 : hdr->txn_cnt,
1548 0 : ctx->tpool,
1549 0 : ctx->exec_spads,
1550 0 : ctx->exec_spad_cnt,
1551 0 : ctx->runtime_spad,
1552 0 : NULL );
1553 0 :
1554 0 : fd_block_map_query_t query[1] = { 0 };
1555 0 : fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
1556 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1557 0 : if( FD_UNLIKELY( !block_info || block_info->slot != ctx->curr_slot ) ) FD_LOG_ERR(( "[%s] invariant violation: missing block_info %lu", __func__, ctx->curr_slot ));
1558 0 :
1559 0 : if( err != FD_RUNTIME_EXECUTE_SUCCESS ) {
1560 0 : FD_LOG_WARNING(( "microblk process: block invalid - slot: %lu", ctx->curr_slot ));
1561 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_DEADBLOCK );
1562 0 : FD_COMPILER_MFENCE();
1563 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
1564 0 : fd_block_map_publish( query );
1565 0 : return -1;
1566 0 : }
1567 0 :
1568 0 : if( last_batch && i == micro_cnt - 1 ) {
1569 0 :
1570 0 : // Copy block hash to slot_bank poh for updating the sysvars
1571 0 :
1572 0 : memcpy( fork->slot_ctx.slot_bank.poh.uc, hdr->hash, sizeof(fd_hash_t) );
1573 0 :
1574 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_PROCESSED );
1575 0 : FD_COMPILER_MFENCE();
1576 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
1577 0 : memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) );
1578 0 : memcpy( &block_info->bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof(fd_hash_t) );
1579 0 : }
1580 0 : publish_account_notifications( ctx, fork, ctx->curr_slot, txn_p, hdr->txn_cnt );
1581 0 : fd_block_map_publish( query );
1582 0 : }
1583 0 : return 0;
1584 0 : # undef wait_and_check_success
1585 0 : }
1586 :
1587 : static void
1588 0 : prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ){
1589 0 : ulong curr_slot = ctx->curr_slot;
1590 0 : ulong parent_slot = ctx->parent_slot;
1591 0 : ulong flags = ctx->flags;
1592 0 : if( FD_UNLIKELY( curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
1593 0 : FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our watermark %lu.", curr_slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ));
1594 0 : return;
1595 0 : }
1596 :
1597 0 : if( FD_UNLIKELY( parent_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
1598 0 : FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our watermark %lu.", curr_slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ) );
1599 0 : return;
1600 0 : }
1601 :
1602 0 : if( FD_UNLIKELY( !fd_blockstore_block_info_test( ctx->blockstore, parent_slot ) ) ) {
1603 0 : FD_LOG_WARNING(( "[%s] unable to find slot %lu's parent block_info", __func__, curr_slot ));
1604 0 : return;
1605 0 : }
1606 :
1607 : /**********************************************************************/
1608 : /* Get the epoch_ctx for replaying curr_slot */
1609 : /**********************************************************************/
1610 :
1611 0 : ulong epoch_ctx_idx = fd_epoch_forks_get_epoch_ctx( ctx->epoch_forks, ctx->ghost, curr_slot, &ctx->parent_slot );
1612 0 : ctx->epoch_ctx = ctx->epoch_forks->forks[ epoch_ctx_idx ].epoch_ctx;
1613 :
1614 : /**********************************************************************/
1615 : /* Prepare the fork in ctx->forks for replaying curr_slot */
1616 : /**********************************************************************/
1617 :
1618 0 : fd_fork_t * parent_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->parent_slot, NULL, ctx->forks->pool );
1619 0 : if( FD_UNLIKELY( parent_fork && parent_fork->lock ) ) {
1620 : /* This is an edge case related to pack. The parent fork might
1621 : already be in the frontier and currently executing (ie.
1622 : fork->frozen = 0). */
1623 0 : FD_LOG_ERR(( "parent slot is frozen in frontier. cannot execute. slot: %lu, parent_slot: %lu",
1624 0 : curr_slot,
1625 0 : ctx->parent_slot ));
1626 0 : }
1627 :
1628 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool );
1629 0 : if( fork == NULL ) {
1630 0 : fork = prepare_new_block_execution( ctx, stem, curr_slot, flags );
1631 0 : }
1632 0 : ctx->slot_ctx = &fork->slot_ctx;
1633 :
1634 : /**********************************************************************/
1635 : /* Get the solcap context for replaying curr_slot */
1636 : /**********************************************************************/
1637 :
1638 0 : if( ctx->capture_ctx )
1639 0 : fd_solcap_writer_set_slot( ctx->capture_ctx->capture, fork->slot_ctx.slot_bank.slot );
1640 :
1641 0 : }
1642 :
1643 : static void
1644 : exec_slices( fd_replay_tile_ctx_t * ctx,
1645 : fd_stem_context_t * stem FD_PARAM_UNUSED,
1646 0 : ulong slot ) {
1647 : /* Buffer up to a certain number of slices (configurable?). Then, for
1648 : each microblock, round robin dispatch the transactions in that
1649 : microblock to the exec tile. Once exec tile signifies with a
1650 : retcode, we can continue dispatching transactions. Have to
1651 : synchronize at the boundary of every microblock. After we dispatch
1652 : one to each exec tile, we watermark (ctx->mbatch_wmark) where we
1653 : are, and then continue on the following after_credit. If we still
1654 : have txns to execute, start from wmark, pausing everytime we hit
1655 : the microblock boundaries. */
1656 :
1657 0 : fd_replay_slice_t * slice = fd_replay_slice_map_query( ctx->replay->slice_map, slot, NULL );
1658 0 : if( !slice ) {
1659 0 : slice = fd_replay_slice_map_insert( ctx->replay->slice_map, slot );
1660 0 : }
1661 :
1662 : /* Manual population of the slice deque occurs currently when we are:
1663 : 1. Repairing and catching up. All shreds in this case come through
1664 : repair, and thus aren't processed in SHRED_IN_IDX in before_frag
1665 : 2. Repairing shreds after first turbine. Some of the batches will
1666 : be added to the slice_deque through SHRED, but missing shreds
1667 : are still recieved through repair, and aren't processed in */
1668 :
1669 0 : if( ctx->last_completed_slot != slot && fd_replay_slice_deque_cnt( slice->deque ) == 0 ) {
1670 0 : FD_LOG_INFO(( "Failed to query slice deque for slot %lu. Likely shreds were recieved through repair. Manually adding.", slot ));
1671 0 : slice_poll( ctx, slice, slot );
1672 0 : }
1673 :
1674 : //ulong free_exec_tiles = ctx->exec_cnt;
1675 0 : ulong free_exec_tiles = 512;
1676 :
1677 0 : while( free_exec_tiles > 0 ){
1678 : /* change to whatever condition handles if(exec free). */
1679 0 : if( ctx->slice_exec_ctx.txns_rem > 0 ){
1680 : //FD_LOG_WARNING(( "[%s] executing txn", __func__ ));
1681 0 : ulong pay_sz = 0UL;
1682 0 : fd_replay_out_ctx_t * exec_out = &ctx->exec_out[ ctx->exec_cnt - free_exec_tiles ];
1683 0 : (void)exec_out;
1684 : //fd_txn_p_t * txn_p = (fd_txn_p_t *) fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
1685 0 : fd_txn_p_t txn_p[1];
1686 0 : ulong txn_sz = fd_txn_parse_core( ctx->mbatch + ctx->slice_exec_ctx.wmark,
1687 0 : fd_ulong_min( FD_TXN_MTU, ctx->slice_exec_ctx.sz - ctx->slice_exec_ctx.wmark ),
1688 0 : TXN( txn_p ),
1689 0 : NULL,
1690 0 : &pay_sz );
1691 :
1692 0 : if( FD_UNLIKELY( !pay_sz || !txn_sz || txn_sz > FD_TXN_MTU ) ) {
1693 0 : __asm__("int $3");
1694 0 : FD_LOG_ERR(( "failed to parse transaction in replay" ));
1695 0 : }
1696 0 : fd_memcpy( txn_p->payload, ctx->mbatch + ctx->slice_exec_ctx.wmark, pay_sz );
1697 0 : txn_p->payload_sz = pay_sz;
1698 0 : ctx->slice_exec_ctx.wmark += pay_sz;
1699 :
1700 : /* dispatch dcache */
1701 : //fd_stem_publish( stem, exec_out->idx, slot, exec_out->chunk, sizeof(fd_txn_p_t), 0UL, 0UL, 0UL );
1702 : //exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_txn_p_t), exec_out->chunk0, exec_out->wmark );
1703 :
1704 : /* dispatch tpool */
1705 :
1706 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
1707 0 : &slot,
1708 0 : NULL,
1709 0 : ctx->forks->pool );
1710 0 : if( FD_UNLIKELY( !fork ) ) {
1711 0 : FD_LOG_ERR(( "Unable to select a fork" ));
1712 0 : }
1713 :
1714 0 : int err = fd_runtime_process_txns_in_microblock_stream( &fork->slot_ctx,
1715 0 : ctx->capture_ctx,
1716 0 : txn_p,
1717 0 : 1,
1718 0 : ctx->tpool,
1719 0 : ctx->exec_spads,
1720 0 : ctx->exec_spad_cnt,
1721 0 : ctx->runtime_spad,
1722 0 : NULL );
1723 :
1724 0 : if( err != FD_RUNTIME_EXECUTE_SUCCESS ) {
1725 0 : FD_LOG_WARNING(( "microblk process: block invalid - slot: %lu", ctx->curr_slot ));
1726 :
1727 0 : fd_block_map_query_t query[1] = { 0 };
1728 0 : fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
1729 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1730 0 : if( FD_UNLIKELY( !block_info || block_info->slot != ctx->curr_slot ) ) FD_LOG_ERR(( "[%s] invariant violation: missing block_info %lu", __func__, ctx->curr_slot ));
1731 :
1732 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_DEADBLOCK );
1733 0 : FD_COMPILER_MFENCE();
1734 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
1735 :
1736 0 : fd_block_map_publish( query );
1737 0 : }
1738 :
1739 0 : publish_account_notifications( ctx, fork, ctx->curr_slot, txn_p, 1 );
1740 :
1741 0 : ctx->slice_exec_ctx.txns_rem--;
1742 0 : free_exec_tiles--;
1743 0 : continue;
1744 0 : }
1745 :
1746 : /* If the current microblock is complete, and we still have mblks
1747 : to read, then advance to the next microblock */
1748 :
1749 0 : if( ctx->slice_exec_ctx.txns_rem == 0 && ctx->slice_exec_ctx.mblks_rem > 0 ){
1750 : //FD_LOG_WARNING(( "[%s] reading microblock", __func__ ));
1751 :
1752 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->mbatch + ctx->slice_exec_ctx.wmark );
1753 0 : ctx->slice_exec_ctx.txns_rem = hdr->txn_cnt;
1754 0 : ctx->slice_exec_ctx.last_mblk_off = ctx->slice_exec_ctx.wmark;
1755 0 : ctx->slice_exec_ctx.wmark += sizeof(fd_microblock_hdr_t);
1756 0 : ctx->slice_exec_ctx.mblks_rem--;
1757 0 : if( free_exec_tiles == 512 ){
1758 : /* no transactions were executed this credit, free to start executing new microblock txns */
1759 0 : continue;
1760 0 : }
1761 0 : break; /* have to synchronize & wait for exec tiles to finish the prev microblock */
1762 0 : }
1763 :
1764 : /* The prev batch is complete, but we have more batches to read. */
1765 :
1766 0 : if( ctx->slice_exec_ctx.mblks_rem == 0 && !ctx->slice_exec_ctx.last_batch ) {
1767 :
1768 : /* Waiting on batches to arrive from the shred tile */
1769 :
1770 0 : if( fd_replay_slice_deque_cnt( slice->deque ) == 0 ) break;
1771 :
1772 0 : if( FD_UNLIKELY( ctx->slice_exec_ctx.sz == 0 ) ) { /* I think maybe can move this out when */
1773 0 : FD_LOG_NOTICE(("Preparing first batch execution of slot %lu", slot ));
1774 0 : prepare_first_batch_execution( ctx, stem );
1775 0 : }
1776 :
1777 0 : ulong key = fd_replay_slice_deque_pop_head( slice->deque );
1778 0 : uint start_idx = fd_replay_slice_start_idx( key );
1779 0 : uint end_idx = fd_replay_slice_end_idx ( key );
1780 :
1781 : /* populate last shred idx. Can also do this just once but... */
1782 0 : for(;;) { /* speculative query */
1783 0 : fd_block_map_query_t query[1] = { 0 };
1784 0 : int err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, query, 0 );
1785 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1786 :
1787 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(("Failed to query blockstore for slot %lu", slot ));
1788 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
1789 :
1790 0 : ctx->slice_exec_ctx.last_batch = block_info->slot_complete_idx == end_idx;
1791 : //slot_complete_idx = block_info->slot_complete_idx;
1792 0 : if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
1793 0 : }
1794 : //FD_LOG_WARNING(( "[%s] Executing batch %u %u, last: %u", __func__, start_idx, end_idx, slot_complete_idx ));
1795 :
1796 0 : ulong slice_sz;
1797 0 : int err = fd_blockstore_slice_query( ctx->slot_ctx->blockstore,
1798 0 : slot,
1799 0 : start_idx,
1800 0 : end_idx,
1801 0 : FD_SLICE_MAX - ctx->slice_exec_ctx.sz,
1802 0 : ctx->mbatch + ctx->slice_exec_ctx.sz,
1803 0 : &slice_sz );
1804 :
1805 0 : if( err ) FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
1806 0 : ctx->slice_exec_ctx.mblks_rem = FD_LOAD( ulong, ctx->mbatch + ctx->slice_exec_ctx.sz );
1807 0 : ctx->slice_exec_ctx.wmark = ctx->slice_exec_ctx.sz + sizeof(ulong);
1808 0 : ctx->slice_exec_ctx.sz += slice_sz;
1809 0 : if ( free_exec_tiles == 512 ) continue;
1810 0 : break;
1811 0 : }
1812 :
1813 0 : if( FD_UNLIKELY( ctx->slice_exec_ctx.last_batch &&
1814 0 : ctx->slice_exec_ctx.mblks_rem == 0 &&
1815 0 : ctx->slice_exec_ctx.txns_rem == 0 ) ) {
1816 : /* block done. */
1817 0 : break;
1818 0 : }
1819 0 : }
1820 :
1821 0 : if( ctx->slice_exec_ctx.last_batch && ctx->slice_exec_ctx.mblks_rem == 0 && ctx->slice_exec_ctx.txns_rem == 0 ){
1822 0 : FD_LOG_WARNING(( "[%s] BLOCK EXECUTION COMPLETE", __func__ ));
1823 :
1824 : /* At this point, the entire block has been executed. */
1825 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
1826 0 : &slot,
1827 0 : NULL,
1828 0 : ctx->forks->pool );
1829 0 : if( FD_UNLIKELY( !fork ) ) {
1830 0 : FD_LOG_ERR(( "Unable to select a fork" ));
1831 0 : }
1832 :
1833 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t*)fd_type_pun( ctx->mbatch + ctx->slice_exec_ctx.last_mblk_off );
1834 :
1835 : // Copy block hash to slot_bank poh for updating the sysvars
1836 0 : fd_block_map_query_t query[1] = { 0 };
1837 0 : fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
1838 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
1839 :
1840 0 : memcpy( fork->slot_ctx.slot_bank.poh.uc, hdr->hash, sizeof(fd_hash_t) );
1841 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_PROCESSED );
1842 0 : FD_COMPILER_MFENCE();
1843 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
1844 0 : memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) );
1845 0 : memcpy( &block_info->bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof(fd_hash_t) );
1846 :
1847 0 : fd_block_map_publish( query );
1848 0 : ctx->flags = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK );
1849 :
1850 0 : ctx->slice_exec_ctx.last_batch = 0;
1851 0 : ctx->slice_exec_ctx.txns_rem = 0;
1852 0 : ctx->slice_exec_ctx.mblks_rem = 0;
1853 0 : ctx->slice_exec_ctx.sz = 0;
1854 0 : ctx->slice_exec_ctx.wmark = 0;
1855 0 : ctx->slice_exec_ctx.last_mblk_off = 0;
1856 0 : }
1857 0 : }
1858 :
/* after_frag: post-fragment callback for the replay tile.  Executes
   transactions gathered for ctx->curr_slot when replaying a packed
   (leader) microblock, initializes PoH once the blockstore is
   available, and republishes microblocks to the PoH tile.  State such
   as curr_slot / flags / bank_idx / txn_cnt is staged on ctx by the
   earlier before_frag/during_frag callbacks. */
static void
after_frag( fd_replay_tile_ctx_t * ctx,
            ulong in_idx,
            ulong seq,
            ulong sig FD_PARAM_UNUSED,
            ulong sz FD_PARAM_UNUSED,
            ulong tsorig,
            ulong tspub FD_PARAM_UNUSED,
            fd_stem_context_t * stem FD_PARAM_UNUSED ) {
  /* NOTE(review): seq is still read below for the bank_busy fseq
     update, so the (void)seq cast is redundant (but harmless). */
  (void)sig;
  (void)sz;
  (void)seq;

  /*if( FD_LIKELY( in_idx == SHRED_IN_IDX ) ) {

    after_frag only called if it's the first code shred we're
    receiving for the FEC set

    ulong slot = fd_disco_shred_replay_sig_slot( sig );
    uint fec_set_idx = fd_disco_shred_replay_sig_fec_set_idx( sig );

    fd_replay_fec_t * fec = fd_replay_fec_query( ctx->replay, slot, fec_set_idx );
    if( !fec ) return; // hack
    fec->data_cnt = ctx->shred->code.data_cnt;

    return;
  }*/

  /* Frag was rejected earlier in the pipeline; nothing to do. */
  if( FD_UNLIKELY( ctx->skip_frag ) ) return;
  if( FD_UNLIKELY( in_idx == STORE_IN_IDX ) ) {
    FD_LOG_NOTICE(("Received store message, executing slot %lu", ctx->curr_slot ));
    //exec_slices( ctx, stem, ctx->curr_slot );
  }

  /**********************************************************************/
  /* The rest of after_frag replays some microblocks in block curr_slot */
  /**********************************************************************/

  ulong curr_slot = ctx->curr_slot;
  ulong flags     = ctx->flags;
  ulong bank_idx  = ctx->bank_idx;

  /* NOTE(review): fork may be NULL if curr_slot is not in the frontier;
     the packed-microblock path below passes it to
     publish_account_notifications without a NULL check — confirm that
     callers guarantee the fork exists on that path. */
  fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );

  /**********************************************************************/
  /* Execute the transactions which were gathered                       */
  /**********************************************************************/

  ulong                 txn_cnt  = ctx->txn_cnt;
  fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
  /* Transactions were staged into the bank out dcache by during_frag. */
  fd_txn_p_t *          txns     = (fd_txn_p_t *)fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );

  //Execute all txns which were successfully prepared
  ctx->metrics.slot = curr_slot;
  if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
    /* TODO: The leader pipeline execution needs to be optimized. This is
       very hacky and suboptimal. First, wait for the tpool workers to be idle.
       Then, execute the transactions, and notify the pack tile. We should be
       taking advantage of bank_busy flags. */

    /* Drain all tpool workers before executing (worker 0 is the main
       thread, hence starting at 1). */
    for( ulong i=1UL; i<ctx->exec_spad_cnt; i++ ) {
      fd_tpool_wait( ctx->tpool, i );
    }

    fd_runtime_process_txns_in_microblock_stream( ctx->slot_ctx,
                                                  ctx->capture_ctx,
                                                  txns,
                                                  txn_cnt,
                                                  ctx->tpool,
                                                  ctx->exec_spads,
                                                  ctx->exec_spad_cnt,
                                                  ctx->runtime_spad,
                                                  NULL );

    /* The trailer (microblock hash) sits immediately after the txn
       array in the dcache chunk. */
    fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);

    hash_transactions( ctx->bmtree[ bank_idx ], txns, txn_cnt, microblock_trailer->hash );

    ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
    fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, 0UL, 0UL );
    bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
    bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );

    /* Indicate to pack tile we are done processing the transactions so it
       can pack new microblocks using these accounts. DO NOT USE THE
       SANITIZED TRANSACTIONS AFTER THIS POINT, THEY ARE NO LONGER VALID. */
    fd_fseq_update( ctx->bank_busy[ bank_idx ], seq );

    publish_account_notifications( ctx, fork, curr_slot, txns, txn_cnt );
  }

  /**********************************************************************/
  /* Init PoH if it is ready                                            */
  /**********************************************************************/

  if( FD_UNLIKELY( !(flags & REPLAY_FLAG_CATCHING_UP) && ctx->poh_init_done == 0 && ctx->slot_ctx->blockstore ) ) {
    init_poh( ctx );
  }

  /**********************************************************************/
  /* Publish mblk to POH                                                */
  /**********************************************************************/

  if( ctx->poh_init_done == 1 && !( flags & REPLAY_FLAG_FINISHED_BLOCK )
      && ( ( flags & REPLAY_FLAG_MICROBLOCK ) ) ) {
    // FD_LOG_INFO(( "publishing mblk to poh - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
    ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
    fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, tsorig, tspub );
    bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
    bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
  } else {
    FD_LOG_DEBUG(( "NOT publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, ctx->parent_slot, flags ));
  }

  /* Optional compile-time debug stop: checkpoint state and abort when
     the configured STOP_SLOT is reached. */
#if STOP_SLOT
  if( FD_UNLIKELY( curr_slot == STOP_SLOT ) ) {

    if( FD_UNLIKELY( ctx->capture_file ) ) fclose( ctx->slots_replayed_file );

    if( FD_UNLIKELY( strcmp( ctx->blockstore_checkpt, "" ) ) ) {
      int rc = fd_wksp_checkpt( ctx->blockstore_wksp, ctx->blockstore_checkpt, 0666, 0, NULL );
      if( rc ) {
        FD_LOG_ERR( ( "blockstore checkpt failed: error %d", rc ) );
      }
    }
    FD_LOG_ERR( ( "stopping at %lu (#define STOP_SLOT %lu). shutting down.", STOP_SLOT, STOP_SLOT ) );
  }
#endif
}
1989 :
1990 : void
1991 0 : tpool_boot( fd_topo_t * topo, ulong total_thread_count ) {
1992 0 : ushort tile_to_cpu[ FD_TILE_MAX ] = { 0 };
1993 0 : ulong thread_count = 0;
1994 0 : ulong main_thread_seen = 0;
1995 :
1996 0 : for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
1997 0 : if( strcmp( topo->tiles[i].name, "rtpool" ) == 0 ) {
1998 0 : tile_to_cpu[ 1+thread_count ] = (ushort)topo->tiles[i].cpu_idx;
1999 0 : thread_count++;
2000 0 : }
2001 0 : if( strcmp( topo->tiles[i].name, "replay" ) == 0 ) {
2002 0 : tile_to_cpu[ 0 ] = (ushort)topo->tiles[i].cpu_idx;
2003 0 : main_thread_seen = 1;
2004 0 : }
2005 0 : }
2006 :
2007 0 : if( main_thread_seen ) {
2008 0 : thread_count++;
2009 0 : }
2010 :
2011 0 : if( thread_count != total_thread_count )
2012 0 : FD_LOG_ERR(( "thread count mismatch thread_count=%lu total_thread_count=%lu main_thread_seen=%lu", thread_count, total_thread_count, main_thread_seen ));
2013 :
2014 0 : fd_tile_private_map_boot( tile_to_cpu, thread_count );
2015 0 : }
2016 :
2017 : static void
2018 0 : kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
2019 :
2020 0 : fd_blockstore_init( ctx->slot_ctx->blockstore, ctx->blockstore_fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &ctx->slot_ctx->slot_bank );
2021 :
2022 0 : publish_stake_weights( ctx, stem, ctx->slot_ctx );
2023 0 : fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot_bank.slot );
2024 :
2025 0 : }
2026 :
2027 : static void
2028 : read_snapshot( void * _ctx,
2029 : fd_stem_context_t * stem,
2030 : char const * snapshotfile,
2031 0 : char const * incremental ) {
2032 0 : fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;
2033 :
2034 0 : if( ctx->replay_plugin_out_mem ) {
2035 : // ValidatorStartProgress::DownloadingSnapshot
2036 0 : uchar msg[56];
2037 0 : fd_memset( msg, 0, sizeof(msg) );
2038 0 : msg[0] = 2;
2039 0 : msg[1] = 1;
2040 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
2041 0 : }
2042 :
2043 : /* Pass the slot_ctx to snapshot_load or recover_banks */
2044 : /* Base slot is the slot we will compare against the base slot of the incremental snapshot, to ensure that the
2045 : base slot of the incremental snapshot is the slot of the full snapshot.
2046 :
2047 : We pull this out of the full snapshot to use when verifying the incremental snapshot. */
2048 0 : ulong base_slot = 0UL;
2049 0 : const char * snapshot = snapshotfile;
2050 0 : if( strcmp( snapshot, "funk" )==0 || strncmp( snapshot, "wksp:", 5 )==0 ) {
2051 : /* Funk already has a snapshot loaded */
2052 0 : fd_runtime_recover_banks( ctx->slot_ctx, 1, 1, ctx->runtime_spad );
2053 0 : base_slot = ctx->slot_ctx->slot_bank.slot;
2054 0 : publish_stake_weights( ctx, stem, ctx->slot_ctx );
2055 0 : fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot_bank.slot );
2056 0 : } else {
2057 :
2058 : /* If we have an incremental snapshot try to prefetch the snapshot slot
2059 : and manifest as soon as possible. In order to kick off repair effectively
2060 : we need the snapshot slot and the stake weights. These are both available
2061 : in the manifest. We will try to load in the manifest from the latest
2062 : snapshot that is availble, then setup the blockstore and publish the
2063 : stake weights. After this, repair will kick off concurrently with loading
2064 : the rest of the snapshots. */
2065 :
2066 : /* TODO: enable snapshot verification for all 3 snapshot loads */
2067 :
2068 0 : if( strlen( incremental )>0UL ) {
2069 0 : uchar * tmp_mem = fd_spad_alloc( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
2070 : /* TODO: enable snapshot verification */
2071 0 : fd_snapshot_load_ctx_t * tmp_snap_ctx = fd_snapshot_load_new( tmp_mem,
2072 0 : incremental,
2073 0 : ctx->slot_ctx,
2074 0 : ctx->tpool,
2075 0 : false,
2076 0 : false,
2077 0 : FD_SNAPSHOT_TYPE_FULL,
2078 0 : ctx->exec_spads,
2079 0 : ctx->exec_spad_cnt,
2080 0 : ctx->runtime_spad );
2081 : /* Load the prefetch manifest, and initialize the status cache and slot context,
2082 : so that we can use these to kick off repair. */
2083 0 : fd_snapshot_load_prefetch_manifest( tmp_snap_ctx );
2084 0 : kickoff_repair_orphans( ctx, stem );
2085 0 : }
2086 :
2087 : /* In order to kick off repair effectively we need the snapshot slot and
2088 : the stake weights. These are both available in the manifest. We will
2089 : try to load in the manifest from the latest snapshot that is available,
2090 : then setup the blockstore and publish the stake weights. After this,
2091 : repair will kick off concurrently with loading the rest of the snapshots. */
2092 :
2093 0 : uchar * mem = fd_spad_alloc( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
2094 : /* TODO: enable snapshot verification */
2095 0 : fd_snapshot_load_ctx_t * snap_ctx = fd_snapshot_load_new( mem,
2096 0 : snapshot,
2097 0 : ctx->slot_ctx,
2098 0 : ctx->tpool,
2099 0 : false,
2100 0 : false,
2101 0 : FD_SNAPSHOT_TYPE_FULL,
2102 0 : ctx->exec_spads,
2103 0 : ctx->exec_spad_cnt,
2104 0 : ctx->runtime_spad );
2105 :
2106 0 : fd_snapshot_load_init( snap_ctx );
2107 :
2108 : /* If we don't have an incremental snapshot, load the manifest and the status cache and initialize
2109 : the objects because we don't have these from the incremental snapshot. */
2110 0 : if( strlen( incremental )<=0UL ) {
2111 0 : fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL,
2112 0 : FD_SNAPSHOT_RESTORE_MANIFEST | FD_SNAPSHOT_RESTORE_STATUS_CACHE );
2113 :
2114 : /* If we don't have an incremental snapshot, we can still kick off
2115 : sending the stake weights and snapshot slot to repair. */
2116 0 : kickoff_repair_orphans( ctx, stem );
2117 0 : } else {
2118 : /* If we have an incremental snapshot, load the manifest and the status cache,
2119 : and don't initialize the objects because we did this above from the incremental snapshot. */
2120 0 : fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL, FD_SNAPSHOT_RESTORE_NONE );
2121 0 : }
2122 0 : base_slot = fd_snapshot_get_slot( snap_ctx );
2123 :
2124 0 : fd_snapshot_load_accounts( snap_ctx );
2125 0 : fd_snapshot_load_fini( snap_ctx );
2126 0 : }
2127 :
2128 : /* Load incremental */
2129 :
2130 0 : if( ctx->replay_plugin_out_mem ) {
2131 : // ValidatorStartProgress::DownloadingSnapshot
2132 0 : uchar msg[56];
2133 0 : fd_memset( msg, 0, sizeof(msg) );
2134 0 : msg[0] = 2;
2135 0 : msg[1] = 0;
2136 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
2137 0 : }
2138 :
2139 0 : if( strlen( incremental ) > 0 && strcmp( snapshot, "funk" ) != 0 ) {
2140 :
2141 : /* The slot of the full snapshot should be used as the base slot to verify the incremental snapshot,
2142 : not the slot context's slot - which is the slot of the incremental, not the full snapshot. */
2143 : /* TODO: enable snapshot verification */
2144 0 : fd_snapshot_load_all( incremental,
2145 0 : ctx->slot_ctx,
2146 0 : &base_slot,
2147 0 : ctx->tpool,
2148 0 : false,
2149 0 : false,
2150 0 : FD_SNAPSHOT_TYPE_INCREMENTAL,
2151 0 : ctx->exec_spads,
2152 0 : ctx->exec_spad_cnt,
2153 0 : ctx->runtime_spad );
2154 0 : }
2155 :
2156 0 : if( ctx->replay_plugin_out_mem ) {
2157 : // ValidatorStartProgress::DownloadedFullSnapshot
2158 0 : uchar msg[56];
2159 0 : fd_memset( msg, 0, sizeof(msg) );
2160 0 : msg[0] = 3;
2161 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
2162 0 : }
2163 :
2164 0 : fd_runtime_update_leaders( ctx->slot_ctx,
2165 0 : ctx->slot_ctx->slot_bank.slot,
2166 0 : ctx->runtime_spad );
2167 0 : FD_LOG_NOTICE(( "starting fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
2168 0 : fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );
2169 0 : fd_bpf_scan_and_create_bpf_program_cache_entry_tpool( ctx->slot_ctx,
2170 0 : ctx->slot_ctx->funk_txn,
2171 0 : ctx->tpool,
2172 0 : ctx->runtime_spad );
2173 0 : fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
2174 0 : FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
2175 :
2176 0 : fd_blockstore_init( ctx->slot_ctx->blockstore,
2177 0 : ctx->blockstore_fd,
2178 0 : FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
2179 0 : &ctx->slot_ctx->slot_bank );
2180 0 : }
2181 :
/* init_after_snapshot performs the one-time runtime initialization that
   must happen after snapshot (or genesis) state has been loaded into
   ctx->slot_ctx: populates the sysvar cache, recalculates partitioned
   rewards, bootstraps genesis if no snapshot was loaded, and seeds the
   consensus structures (forks / epoch / ghost / tower).  Sequencing is
   load-bearing — do not reorder the steps below. */
static void
init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
  /* Do not modify order! */

  /* First, load in the sysvars into the sysvar cache. This is required to
     make the StakeHistory sysvar available to the rewards calculation. */

  fd_runtime_sysvar_cache_load( ctx->slot_ctx, ctx->runtime_spad );

  /* After both snapshots have been loaded in, we can determine if we should
     start distributing rewards. */

  fd_rewards_recalculate_partitioned_rewards( ctx->slot_ctx,
                                              ctx->tpool,
                                              ctx->exec_spads,
                                              ctx->exec_spad_cnt,
                                              ctx->runtime_spad );

  ulong snapshot_slot = ctx->slot_ctx->slot_bank.slot;
  if( FD_UNLIKELY( !snapshot_slot ) ) {
    /* Genesis bootstrap path: slot_bank.slot==0 means no snapshot was
       loaded, so manufacture slot 1 by hand before replay can start. */
    fd_runtime_update_leaders( ctx->slot_ctx,
                               ctx->slot_ctx->slot_bank.slot,
                               ctx->runtime_spad );

    ctx->slot_ctx->slot_bank.prev_slot = 0UL;
    ctx->slot_ctx->slot_bank.slot = 1UL;

    /* Advance PoH by one full slot's worth of hashes
       (hashes_per_tick * ticks_per_slot), hashing in place. */
    ulong hashcnt_per_slot = ctx->slot_ctx->epoch_ctx->epoch_bank.hashes_per_tick * ctx->slot_ctx->epoch_ctx->epoch_bank.ticks_per_slot;
    while(hashcnt_per_slot--) {
      fd_sha256_hash( ctx->slot_ctx->slot_bank.poh.uc, 32UL, ctx->slot_ctx->slot_bank.poh.uc );
    }

    /* Execute an empty block (0 signatures) for slot 1 so the bank is in
       a finalized state before replay begins. */
    FD_TEST( fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->runtime_spad ) == 0 );
    fd_runtime_block_info_t info = { .signature_cnt = 0 };
    FD_TEST( fd_runtime_block_execute_finalize_tpool( ctx->slot_ctx,
                                                      NULL,
                                                      &info,
                                                      ctx->tpool,
                                                      ctx->runtime_spad ) == 0 );

    ctx->slot_ctx->slot_bank.prev_slot = 0UL;
    ctx->slot_ctx->slot_bank.slot = 1UL;
    snapshot_slot = 1UL;

    /* Prime the BPF program cache so deployed programs are usable from
       the first replayed slot.  Funk write lock held for the scan. */
    FD_LOG_NOTICE(( "starting fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
    fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );
    fd_bpf_scan_and_create_bpf_program_cache_entry_tpool( ctx->slot_ctx,
                                                          ctx->slot_ctx->funk_txn,
                                                          ctx->tpool,
                                                          ctx->runtime_spad );
    fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
    FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." ));

  }

  /* Replay begins at the snapshot slot (or the synthetic slot 1 above). */
  ctx->curr_slot = snapshot_slot;
  ctx->parent_slot = ctx->slot_ctx->slot_bank.prev_slot;
  ctx->snapshot_slot = snapshot_slot;
  ctx->blockhash = ( fd_hash_t ){ .hash = { 0 } };
  ctx->flags = 0UL;
  ctx->txn_cnt = 0UL;

  /* Initialize consensus structures post-snapshot */

  fd_fork_t * snapshot_fork = fd_forks_init( ctx->forks, ctx->slot_ctx );
  FD_TEST( snapshot_fork );
  fd_epoch_init( ctx->epoch, &snapshot_fork->slot_ctx.epoch_ctx->epoch_bank );
  fd_ghost_init( ctx->ghost, snapshot_slot );

  /* Rebuild the local tower from our vote account's on-chain state,
     looked up in funk by the account-typed record key. */
  fd_funk_rec_key_t key = { 0 };
  fd_memcpy( key.c, ctx->vote_acc, sizeof(fd_pubkey_t) );
  key.c[FD_FUNK_REC_KEY_FOOTPRINT - 1] = FD_FUNK_KEY_TYPE_ACC;
  fd_tower_from_vote_acc( ctx->tower, ctx->funk, snapshot_fork->slot_ctx.funk_txn, &key );
  FD_LOG_NOTICE(( "vote account: %s", FD_BASE58_ENC_32_ALLOCA( key.c ) ));
  fd_tower_print( ctx->tower, ctx->root );

  /* Seed the bank-hash comparison state; matching starts at the
     snapshot slot watermark. */
  fd_bank_hash_cmp_t * bank_hash_cmp = ctx->epoch_ctx->bank_hash_cmp;
  bank_hash_cmp->total_stake = ctx->epoch->total_stake;
  bank_hash_cmp->watermark = snapshot_slot;

  /* Populate the first epoch-forks entry for the current epoch. */
  fd_epoch_fork_elem_t * curr_entry = &ctx->epoch_forks->forks[ 0 ];

  if( strlen( ctx->genesis ) > 0 ) {
    curr_entry->parent_slot = 0UL;
    curr_entry->epoch = 0UL;
  } else {
    fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->epoch_ctx );

    ulong curr_epoch = fd_slot_to_epoch( &epoch_bank->epoch_schedule, ctx->curr_slot, NULL );
    /* Last slot of the previous epoch: slot0(curr_epoch) - 1, saturating. */
    ulong last_slot_in_epoch = fd_ulong_sat_sub( fd_epoch_slot0( &epoch_bank->epoch_schedule, curr_epoch), 1UL );

    /* Clamp the recorded parent into the previous epoch's range. */
    curr_entry->parent_slot = fd_ulong_min( ctx->parent_slot, last_slot_in_epoch );
    curr_entry->epoch = curr_epoch;
  }

  curr_entry->epoch_ctx = ctx->epoch_ctx;
  ctx->epoch_forks->curr_epoch_idx = 0UL;

  FD_LOG_NOTICE(( "snapshot slot %lu", snapshot_slot ));
  FD_LOG_NOTICE(( "total stake %lu", bank_hash_cmp->total_stake ));
}
2283 :
/* init_snapshot builds the initial execution state for the replay tile:
   creates the slot context, loads the snapshot (if configured) and/or
   genesis, runs post-snapshot initialization, and finally repoints
   ctx->slot_ctx at the slot context owned by the snapshot fork. */
void
init_snapshot( fd_replay_tile_ctx_t * ctx,
               fd_stem_context_t * stem ) {
  FD_LOG_NOTICE(( "init snapshot" ));
  /* Init slot_ctx */

  /* NOTE(review): slot_ctx is a stack local, and ctx->slot_ctx points
     into it until it is redirected to the fork-owned copy at the bottom
     of this function.  It must not escape this function un-redirected —
     confirm fd_exec_slot_ctx_new does not retain the stack address. */
  fd_exec_slot_ctx_t slot_ctx = {0};
  ctx->slot_ctx = fd_exec_slot_ctx_join( fd_exec_slot_ctx_new( &slot_ctx, ctx->runtime_spad ) );
  ctx->slot_ctx->acc_mgr = ctx->acc_mgr;
  ctx->slot_ctx->blockstore = ctx->blockstore;
  ctx->slot_ctx->epoch_ctx = ctx->epoch_ctx;
  ctx->slot_ctx->status_cache = ctx->status_cache;
  fd_runtime_update_slots_per_epoch( ctx->slot_ctx, FD_DEFAULT_SLOTS_PER_EPOCH, ctx->runtime_spad );

  /* A non-empty snapshot path means we restore state from snapshot
     files; otherwise state comes purely from genesis below. */
  uchar is_snapshot = strlen( ctx->snapshot ) > 0;
  if( is_snapshot ) {
    read_snapshot( ctx, stem, ctx->snapshot, ctx->incremental );
  }

  /* Genesis read runs in both cases; is_snapshot tells it whether the
     bank state was already restored. */
  fd_runtime_read_genesis( ctx->slot_ctx,
                           ctx->genesis,
                           is_snapshot,
                           ctx->capture_ctx,
                           ctx->tpool,
                           ctx->runtime_spad );
  ctx->epoch_ctx->bank_hash_cmp = ctx->bank_hash_cmp;
  ctx->epoch_ctx->replay_public = ctx->replay_public;
  init_after_snapshot( ctx );

  /* Redirect ctx->slot_ctx to point to the memory inside forks. */

  fd_fork_t * fork = fd_forks_query( ctx->forks, ctx->curr_slot );
  ctx->slot_ctx = &fork->slot_ctx;

  // Tell the world about the current activate features
  fd_memcpy ( &ctx->replay_public->features, &ctx->slot_ctx->epoch_ctx->features, sizeof(ctx->replay_public->features) );

  FD_TEST( ctx->slot_ctx );
}
2323 :
2324 : static void
2325 : publish_votes_to_plugin( fd_replay_tile_ctx_t * ctx,
2326 0 : fd_stem_context_t * stem ) {
2327 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->votes_plugin_out_mem, ctx->votes_plugin_out_chunk );
2328 :
2329 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
2330 0 : if( FD_UNLIKELY ( !fork ) ) return;
2331 0 : fd_vote_accounts_t * accts = &fork->slot_ctx.slot_bank.epoch_stakes;
2332 0 : fd_vote_accounts_pair_t_mapnode_t * root = accts->vote_accounts_root;
2333 0 : fd_vote_accounts_pair_t_mapnode_t * pool = accts->vote_accounts_pool;
2334 :
2335 0 : ulong i = 0;
2336 0 : for( fd_vote_accounts_pair_t_mapnode_t const * n = fd_vote_accounts_pair_t_map_minimum_const( pool, root );
2337 0 : n && i < FD_CLUSTER_NODE_CNT;
2338 0 : n = fd_vote_accounts_pair_t_map_successor_const( pool, n ) ) {
2339 0 : if( n->elem.stake == 0 ) continue;
2340 :
2341 : /* TODO: Define a helper that gets specific fields. */
2342 0 : fd_bincode_decode_ctx_t dec_ctx = {
2343 0 : .data = n->elem.value.data,
2344 0 : .dataend = n->elem.value.data + n->elem.value.data_len,
2345 0 : };
2346 :
2347 0 : ulong total_sz = 0UL;
2348 0 : int err = fd_vote_state_versioned_decode_footprint( &dec_ctx, &total_sz );
2349 0 : if( FD_UNLIKELY( err ) ) {
2350 0 : FD_LOG_ERR(( "Unexpected failure in decoding vote state" ));
2351 0 : }
2352 :
2353 0 : uchar * mem = fd_spad_alloc( ctx->runtime_spad, fd_vote_state_versioned_align(), total_sz );
2354 0 : if( FD_UNLIKELY( !mem ) ) {
2355 0 : FD_LOG_ERR(( "Unable to allocate memory for memory" ));
2356 0 : }
2357 :
2358 0 : fd_vote_state_versioned_t * vsv = fd_vote_state_versioned_decode( mem, &dec_ctx );
2359 :
2360 0 : fd_pubkey_t node_pubkey;
2361 0 : ulong last_ts_slot;
2362 0 : switch( vsv->discriminant ) {
2363 0 : case fd_vote_state_versioned_enum_v0_23_5:
2364 0 : node_pubkey = vsv->inner.v0_23_5.node_pubkey;
2365 0 : last_ts_slot = vsv->inner.v0_23_5.last_timestamp.slot;
2366 0 : break;
2367 0 : case fd_vote_state_versioned_enum_v1_14_11:
2368 0 : node_pubkey = vsv->inner.v1_14_11.node_pubkey;
2369 0 : last_ts_slot = vsv->inner.v1_14_11.last_timestamp.slot;
2370 0 : break;
2371 0 : case fd_vote_state_versioned_enum_current:
2372 0 : node_pubkey = vsv->inner.current.node_pubkey;
2373 0 : last_ts_slot = vsv->inner.current.last_timestamp.slot;
2374 0 : break;
2375 0 : default:
2376 0 : __builtin_unreachable();
2377 0 : }
2378 :
2379 0 : fd_vote_update_msg_t * msg = (fd_vote_update_msg_t *)(dst + sizeof(ulong) + i*112U);
2380 0 : memset( msg, 0, 112U );
2381 0 : memcpy( msg->vote_pubkey, n->elem.key.uc, sizeof(fd_pubkey_t) );
2382 0 : memcpy( msg->node_pubkey, node_pubkey.uc, sizeof(fd_pubkey_t) );
2383 0 : msg->activated_stake = n->elem.stake;
2384 0 : msg->last_vote = last_ts_slot;
2385 0 : msg->is_delinquent = (uchar)(msg->last_vote == 0);
2386 0 : ++i;
2387 0 : }
2388 :
2389 0 : *(ulong *)dst = i;
2390 :
2391 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
2392 0 : fd_stem_publish( stem, ctx->votes_plug_out_idx, FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE, ctx->votes_plugin_out_chunk, 0, 0UL, 0UL, tspub );
2393 0 : ctx->votes_plugin_out_chunk = fd_dcache_compact_next( ctx->votes_plugin_out_chunk, 8UL + 40200UL*(58UL+12UL*34UL), ctx->votes_plugin_out_chunk0, ctx->votes_plugin_out_wmark );
2394 0 : }
2395 :
/* after_credit runs on every iteration of the replay tile loop except
   when backpressured.

   This callback effectively spin-loops until the blockstore is ready to
   join.  We need to join a blockstore and load a snapshot before we can
   begin replaying.

   store_int is responsible for initializing the blockstore (either by
   calling new or restoring an existing one).  Once the blockstore is
   available in the wksp (discovered via tag_query), we join the
   blockstore and load the snapshot. */
/* after_credit: main per-iteration work for the replay tile.  Drives
   slice execution, and when a block finishes: finalizes the bank,
   publishes slot notifications, updates ghost/forks, picks the reset
   and vote forks, optionally votes, and advances the bank-hash-compare
   watermark.  Also performs one-time snapshot init and periodic vote
   publication to the plugin. */
static void
after_credit( fd_replay_tile_ctx_t * ctx,
              fd_stem_context_t * stem,
              int * opt_poll_in FD_PARAM_UNUSED,
              int * charge_busy ) {
  (void)opt_poll_in;

  /* Make progress on executing the current slot's slices. */
  exec_slices( ctx, stem, ctx->curr_slot );

  /* Snapshot the mutable ctx fields we use repeatedly below. */
  ulong curr_slot = ctx->curr_slot;
  ulong parent_slot = ctx->parent_slot;
  ulong flags = ctx->flags;
  ulong bank_idx = ctx->bank_idx;

  fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );

  ulong txn_cnt = ctx->txn_cnt;
  fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
  fd_txn_p_t * txns = (fd_txn_p_t *)fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
  /**********************************************************************/
  /* Cleanup and handle consensus after replaying the whole block       */
  /**********************************************************************/

  /* Guard on last_completed_slot so a finished block is only finalized
     once even if this callback runs again with the same flags. */
  if( FD_UNLIKELY( (flags & REPLAY_FLAG_FINISHED_BLOCK) && ( ctx->last_completed_slot != curr_slot )) ) {
    /* Per-block txn count = cumulative count minus the parent's. */
    fork->slot_ctx.txn_count = fork->slot_ctx.slot_bank.transaction_count-fork->slot_ctx.parent_transaction_count;
    FD_LOG_WARNING(( "finished block - slot: %lu, parent_slot: %lu, txn_cnt: %lu, blockhash: %s",
                     curr_slot,
                     ctx->parent_slot,
                     fork->slot_ctx.txn_count,
                     FD_BASE58_ENC_32_ALLOCA( ctx->blockhash.uc ) ));
    ctx->last_completed_slot = curr_slot;

    /**************************************************************************************************/
    /* Call fd_runtime_block_execute_finalize_tpool which updates sysvar and cleanup some other stuff */
    /**************************************************************************************************/

    fd_runtime_block_info_t runtime_block_info[1];
    runtime_block_info->signature_cnt = fork->slot_ctx.signature_cnt;

    /* Destroy the slot history */
    fd_slot_history_destroy( fork->slot_ctx.slot_history );
    /* Wait for all bank tpool workers (worker 0 is the caller). */
    for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) {
      fd_tpool_wait( ctx->tpool, i+1 );
    }

    int res = fd_runtime_block_execute_finalize_tpool( &fork->slot_ctx, ctx->capture_ctx, runtime_block_info, ctx->tpool, ctx->runtime_spad );
    if( res != FD_RUNTIME_EXECUTE_SUCCESS ) {
      FD_LOG_ERR(( "block finished failed" ));
    }

    /* Pop the spad frame pushed for this block's execution. */
    fd_spad_pop( ctx->runtime_spad );
    FD_LOG_NOTICE(( "Spad memory after executing block %lu", ctx->runtime_spad->mem_used ));
    /**********************************************************************/
    /* Push notifications for slot updates and reset block_info flag      */
    /**********************************************************************/

    /* Retry the block-map query until a consistent (un-torn) read of
       block_height succeeds.  NOTE(review): block_entry_height is read
       before query_test; presumably this is the intended try/test
       optimistic-read protocol of fd_block_map — verify. */
    ulong block_entry_height = 0;
    for(;;){
      fd_block_map_query_t query[1] = { 0 };
      int err = fd_block_map_query_try( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
      fd_block_info_t * block_info = fd_block_map_query_ele( query );
      if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "Failed to query blockstore for slot %lu", curr_slot ));
      if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
      block_entry_height = block_info->block_height;
      if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
    }

    publish_slot_notifications( ctx, stem, fork, block_entry_height, curr_slot );

    /* Advance the blockstore's latest-processed-slot marker. */
    ctx->blockstore->shmem->lps = curr_slot;

    /**********************************************************************/
    /* Unlock the fork meaning that execution of the fork is now complete */
    /**********************************************************************/
    FD_TEST(fork->slot == curr_slot);
    fork->lock = 0;

    /**********************************************************************/
    /* Consensus: update ghost and forks                                  */
    /**********************************************************************/

    FD_PARAM_UNUSED long tic_ = fd_log_wallclock();
    fd_ghost_node_t const * ghost_node = fd_ghost_insert( ctx->ghost, parent_slot, curr_slot );
#if FD_GHOST_USE_HANDHOLDING
    if( FD_UNLIKELY( !ghost_node ) ) {
      FD_LOG_ERR(( "failed to insert ghost node %lu", fork->slot ));
    }
#endif
    fd_forks_update( ctx->forks, ctx->epoch, ctx->funk, ctx->ghost, fork->slot );

    /**********************************************************************/
    /* Consensus: decide (1) the fork for pack; (2) the fork to vote on   */
    /**********************************************************************/

    ulong reset_slot = fd_tower_reset_slot( ctx->tower, ctx->epoch, ctx->ghost );
    fd_fork_t const * reset_fork = fd_forks_query_const( ctx->forks, reset_slot );
    if( FD_UNLIKELY( !reset_fork ) ) {
      FD_LOG_ERR( ( "failed to find reset fork %lu", reset_slot ) );
    }
    /* If the chosen reset fork is still locked (mid-execution), fall
       back to preparing a fork at its parent slot instead. */
    if( reset_fork->lock ) {
      FD_LOG_WARNING(("RESET FORK FROZEN: %lu", reset_fork->slot ));
      fd_fork_t * new_reset_fork = fd_forks_prepare( ctx->forks, reset_fork->slot_ctx.slot_bank.prev_slot, ctx->acc_mgr,
                                                     ctx->blockstore, ctx->epoch_ctx, ctx->funk, ctx->runtime_spad );
      new_reset_fork->lock = 0;
      reset_fork = new_reset_fork;
    }

    /* Update the gui */
    if( ctx->replay_plugin_out_mem ) {
      /* FIXME. We need a more efficient way to compute the ancestor chain. */
      /* Message layout: [16..24) = reset slot, [8..16) = ancestor count,
         [24..) = up to 4095 ancestor slots walked via parent links. */
      uchar msg[4098*8] __attribute__( ( aligned( 8U ) ) );
      fd_memset( msg, 0, sizeof(msg) );
      ulong s = reset_fork->slot_ctx.slot_bank.slot;
      *(ulong*)(msg + 16U) = s;
      ulong i = 0;
      do {
        if( !fd_blockstore_block_info_test( ctx->blockstore, s ) ) {
          break;
        }
        s = fd_blockstore_parent_slot_query( ctx->blockstore, s );
        if( s < ctx->blockstore->shmem->wmk ) {
          break;
        }

        *(ulong*)(msg + 24U + i*8U) = s;
        if( ++i == 4095U ) {
          break;
        }
      } while( 1 );
      *(ulong*)(msg + 8U) = i;
      replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_RESET, msg, sizeof(msg) );
    }

    /* Hand the reset fork's last blockhash to PoH via the microblock
       trailer appended after the txns in the bank_out dcache frame. */
    fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);
    memcpy( microblock_trailer->hash, reset_fork->slot_ctx.slot_bank.block_hash_queue.last_hash->uc, sizeof(fd_hash_t) );
    if( ctx->poh_init_done == 1 ) {
      /* NOTE: these locals intentionally shadow the outer curr_slot /
         parent_slot with the reset fork's values. */
      ulong parent_slot = reset_fork->slot_ctx.slot_bank.prev_slot;
      ulong curr_slot = reset_fork->slot_ctx.slot_bank.slot;
      FD_LOG_DEBUG(( "publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, parent_slot, flags ));
      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
      fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, 0, tspub );
      bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
      bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
    } else {
      FD_LOG_DEBUG(( "NOT publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, ctx->parent_slot, flags ));
    }

    fd_forks_print( ctx->forks );
    fd_ghost_print( ctx->ghost, ctx->epoch, fd_ghost_root( ctx->ghost ) );
    fd_tower_print( ctx->tower, ctx->root );

    fd_fork_t * child = fd_fork_frontier_ele_query( ctx->forks->frontier, &fork->slot, NULL, ctx->forks->pool );
    ulong vote_slot = fd_tower_vote_slot( ctx->tower, ctx->epoch, ctx->funk, child->slot_ctx.funk_txn, ctx->ghost, ctx->runtime_spad );

    FD_LOG_NOTICE( ( "\n\n[Fork Selection]\n"
                     "# of vote accounts: %lu\n"
                     "best fork: %lu\n",
                     fd_epoch_voters_key_cnt( fd_epoch_voters( ctx->epoch ) ),
                     fd_ghost_head( ctx->ghost, fd_ghost_root( ctx->ghost ) )->slot ) );

    /**********************************************************************/
    /* Consensus: send out a new vote by calling send_tower_sync          */
    /**********************************************************************/

    /* poh fseq == ULONG_MAX is treated as "not caught up yet". */
    if( FD_UNLIKELY( ctx->vote && fd_fseq_query( ctx->poh ) == ULONG_MAX ) ) {
      /* Only proceed with voting if we're caught up. */

      FD_LOG_WARNING(( "still catching up. not voting." ));
    } else {
      if( FD_UNLIKELY( !ctx->is_caught_up ) ) {
        ctx->is_caught_up = 1;
      }

      /* Proceed according to how local and cluster are synchronized. */

      if( FD_LIKELY( vote_slot != FD_SLOT_NULL ) ) {

        /* Invariant check: the vote_slot must be in the frontier */

        FD_TEST( fd_forks_query_const( ctx->forks, vote_slot ) );

        /* Vote locally */

        ulong root = fd_tower_vote( ctx->tower, vote_slot );
        ctx->metrics.last_voted_slot = vote_slot;

        /* Update to a new root, if there is one. */

        if ( FD_LIKELY ( root != FD_SLOT_NULL ) ) ctx->root = root; /* optimize for full tower (replay is keeping up) */
      }

      /* Send our updated tower to the cluster. */

      send_tower_sync( ctx );
    }

    /**********************************************************************/
    /* Prepare bank for the next execution and write to debugging files   */
    /**********************************************************************/

    ulong prev_slot = child->slot_ctx.slot_bank.slot;
    child->slot_ctx.slot_bank.slot = curr_slot;
    child->slot_ctx.slot_bank.collected_execution_fees = 0;
    child->slot_ctx.slot_bank.collected_priority_fees = 0;
    child->slot_ctx.slot_bank.collected_rent = 0;

    /* Optional debug artifact: append each replayed slot to a file. */
    if( FD_UNLIKELY( ctx->slots_replayed_file ) ) {
      FD_LOG_DEBUG(( "writing %lu to slots file", prev_slot ));
      fprintf( ctx->slots_replayed_file, "%lu\n", prev_slot );
      fflush( ctx->slots_replayed_file );
    }

    if (NULL != ctx->capture_ctx) {
      fd_solcap_writer_flush( ctx->capture_ctx->capture );
    }

    /**********************************************************************/
    /* Bank hash comparison, and halt if there's a mismatch after replay  */
    /**********************************************************************/

    fd_hash_t const * bank_hash = &child->slot_ctx.slot_bank.banks_hash;
    fd_bank_hash_cmp_t * bank_hash_cmp = child->slot_ctx.epoch_ctx->bank_hash_cmp;
    fd_bank_hash_cmp_lock( bank_hash_cmp );
    fd_bank_hash_cmp_insert( bank_hash_cmp, curr_slot, bank_hash, 1, 0 );

    /* Try to move the bank hash comparison watermark forward */
    for( ulong cmp_slot = bank_hash_cmp->watermark + 1; cmp_slot < curr_slot; cmp_slot++ ) {
      int rc = fd_bank_hash_cmp_check( bank_hash_cmp, cmp_slot );
      switch ( rc ) {
        case -1:

          /* Mismatch: snapshot state for debugging, then halt. */

          funk_cancel( ctx, cmp_slot );
          checkpt( ctx );
          FD_LOG_ERR(( "Bank hash mismatch on slot: %lu. Halting.", cmp_slot ));

          break;

        case 0:

          /* Not ready */

          break;

        case 1:

          /* Match*/

          bank_hash_cmp->watermark = cmp_slot;
          break;

        default:;
      }
    }

    fd_bank_hash_cmp_unlock( bank_hash_cmp );
  } // end of if( FD_UNLIKELY( ( flags & REPLAY_FLAG_FINISHED_BLOCK ) ) )

  /* One-time snapshot / genesis initialization on the first pass. */
  if( FD_UNLIKELY( ctx->snapshot_init_done==0 ) ) {
    init_snapshot( ctx, stem );
    ctx->snapshot_init_done = 1;
    *charge_busy = 1;
    if( ctx->replay_plugin_out_mem ) {
      // ValidatorStartProgress::Running
      uchar msg[56];
      fd_memset( msg, 0, sizeof(msg) );
      msg[0] = 11;
      replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
    }
  }

  /* Rate-limited vote-account publication to the plugin. */
  long now = fd_log_wallclock();
  if( ctx->votes_plugin_out_mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) {
    ctx->last_plugin_push_time = now;
    publish_votes_to_plugin( ctx, stem );
  }

}
2687 :
2688 : static void
2689 0 : during_housekeeping( void * _ctx ) {
2690 :
2691 0 : fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;
2692 :
2693 : /* Update watermark. The publish watermark is the minimum of the tower
2694 : root and supermajority root. */
2695 :
2696 0 : ulong wmark = fd_ulong_min( ctx->root, ctx->forks->finalized );
2697 :
2698 0 : if ( FD_LIKELY( wmark <= fd_fseq_query( ctx->published_wmark ) ) ) return;
2699 0 : FD_LOG_NOTICE(( "wmk %lu => %lu", fd_fseq_query( ctx->published_wmark ), wmark ));
2700 :
2701 0 : fd_funk_txn_xid_t xid = { .ul = { wmark, wmark } };
2702 0 : if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, wmark );
2703 0 : if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, wmark, ctx->ghost );
2704 0 : if( FD_LIKELY( ctx->funk ) ) funk_and_txncache_publish( ctx, wmark, &xid );
2705 0 : if( FD_LIKELY( ctx->ghost ) ) {
2706 0 : fd_epoch_forks_publish( ctx->epoch_forks, ctx->ghost, wmark );
2707 0 : fd_ghost_publish( ctx->ghost, wmark );
2708 0 : }
2709 :
2710 0 : fd_fseq_update( ctx->published_wmark, wmark );
2711 :
2712 :
2713 : // fd_mcache_seq_update( ctx->store_out_sync, ctx->store_out_seq );
2714 0 : }
2715 :
/* privileged_init: tile setup that needs elevated access — presumably
   run before the seccomp sandbox is applied (cf. the generated
   fd_replay_tile_seccomp.h include); confirm against the tile runner.
   Acquires entropy and the blockstore archive fd so the sandboxed tile
   never needs those privileges. */
static void
privileged_init( fd_topo_t * topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  /* Carve the replay tile ctx out of the tile's scratch region and
     zero-initialize it. */
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
  FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
  memset( ctx, 0, sizeof(fd_replay_tile_ctx_t) );

  /* Seed the funk and status-cache hash maps from the kernel CSPRNG;
     abort if getrandom returns fewer than sizeof(ulong) bytes. */
  FD_TEST( sizeof(ulong) == getrandom( &ctx->funk_seed, sizeof(ulong), 0 ) );
  FD_TEST( sizeof(ulong) == getrandom( &ctx->status_cache_seed, sizeof(ulong), 0 ) );

  /* Open (or create, mode 0666) the blockstore archival file; the fd is
     kept in ctx for later blockstore init/publish calls. */
  ctx->blockstore_fd = open( tile->replay.blockstore_file, O_RDWR | O_CREAT, 0666 );
  if ( FD_UNLIKELY( ctx->blockstore_fd == -1 ) ) {
    FD_LOG_ERR(( "failed to open or create blockstore archival file %s %d %d %s", tile->replay.blockstore_file, ctx->blockstore_fd, errno, strerror(errno) ));
  }
}
2734 :
2735 : static void
2736 : unprivileged_init( fd_topo_t * topo,
2737 0 : fd_topo_tile_t * tile ) {
2738 :
2739 0 : FD_LOG_NOTICE(("finished unprivileged init"));
2740 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
2741 :
2742 0 : if( FD_UNLIKELY( tile->in_cnt < 4 ||
2743 0 : strcmp( topo->links[ tile->in_link_id[ STORE_IN_IDX ] ].name, "store_replay" ) ||
2744 0 : strcmp( topo->links[ tile->in_link_id[ PACK_IN_IDX ] ].name, "pack_replay") ||
2745 0 : strcmp( topo->links[ tile->in_link_id[ BATCH_IN_IDX ] ].name, "batch_replay" ) ||
2746 0 : strcmp( topo->links[ tile->in_link_id[ SHRED_IN_IDX ] ].name, "shred_replay" ) ) ) {
2747 0 : FD_LOG_ERR(( "replay tile has none or unexpected input links %lu %s %s",
2748 0 : tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
2749 0 : }
2750 :
2751 : /**********************************************************************/
2752 : /* scratch (bump)-allocate memory owned by the replay tile */
2753 : /**********************************************************************/
2754 :
2755 : /* Do not modify order! This is join-order in unprivileged_init. */
2756 :
2757 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
2758 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
2759 0 : void * alloc_shmem = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
2760 0 : void * capture_ctx_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
2761 0 : void * epoch_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_epoch_align(), fd_epoch_footprint( FD_VOTER_MAX ) );
2762 0 : void * forks_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
2763 0 : void * ghost_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_ghost_align(), fd_ghost_footprint( FD_BLOCK_MAX ) );
2764 0 : void * tower_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_tower_align(), fd_tower_footprint() );
2765 0 : void * replay_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_replay_align(), fd_replay_footprint( tile->replay.fec_max, FD_SHRED_MAX_PER_SLOT, FD_BLOCK_MAX ) );
2766 0 : void * bank_hash_cmp_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_bank_hash_cmp_align(), fd_bank_hash_cmp_footprint( ) );
2767 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
2768 0 : ctx->bmtree[i] = FD_SCRATCH_ALLOC_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
2769 0 : }
2770 0 : void * mbatch_mem = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_SLICE_MAX );
2771 0 : ulong thread_spad_size = fd_spad_footprint( FD_RUNTIME_TRANSACTION_EXECUTION_FOOTPRINT_DEFAULT );
2772 0 : void * spad_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), tile->replay.tpool_thread_count * fd_ulong_align_up( thread_spad_size, fd_spad_align() ) + FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT );
2773 0 : ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
2774 :
2775 0 : if( FD_UNLIKELY( scratch_alloc_mem != ( (ulong)scratch + scratch_footprint( tile ) ) ) ) {
2776 0 : FD_LOG_ERR( ( "scratch_alloc_mem did not match scratch_footprint diff: %lu alloc: %lu footprint: %lu",
2777 0 : scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ),
2778 0 : scratch_alloc_mem,
2779 0 : (ulong)scratch + scratch_footprint( tile ) ) );
2780 0 : }
2781 :
2782 : /**********************************************************************/
2783 : /* wksp */
2784 : /**********************************************************************/
2785 :
2786 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
2787 :
2788 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
2789 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
2790 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
2791 0 : if( ctx->blockstore_wksp==NULL ) {
2792 0 : FD_LOG_ERR(( "no blockstore wksp" ));
2793 0 : }
2794 :
2795 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
2796 0 : fd_buf_shred_pool_reset( ctx->blockstore->shred_pool, 0 );
2797 0 : FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
2798 :
2799 0 : ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
2800 0 : FD_TEST( status_cache_obj_id != ULONG_MAX );
2801 0 : ctx->status_cache_wksp = topo->workspaces[topo->objs[status_cache_obj_id].wksp_id].wksp;
2802 0 : if( ctx->status_cache_wksp == NULL ) {
2803 0 : FD_LOG_ERR(( "no status cache wksp" ));
2804 0 : }
2805 :
2806 : /**********************************************************************/
2807 : /* snapshot */
2808 : /**********************************************************************/
2809 :
2810 0 : ctx->snapshot_interval = tile->replay.full_interval ? tile->replay.full_interval : ULONG_MAX;
2811 0 : ctx->incremental_interval = tile->replay.incremental_interval ? tile->replay.incremental_interval : ULONG_MAX;
2812 0 : ctx->last_full_snap = 0UL;
2813 :
2814 0 : FD_LOG_NOTICE(( "Snapshot intervals full=%lu incremental=%lu", ctx->snapshot_interval, ctx->incremental_interval ));
2815 :
2816 : /**********************************************************************/
2817 : /* funk */
2818 : /**********************************************************************/
2819 :
2820 : /* TODO: This below code needs to be shared as a topology object. This
2821 : will involve adding support to create a funk-based file here. */
2822 0 : fd_funk_t * funk;
2823 0 : const char * snapshot = tile->replay.snapshot;
2824 0 : if( strcmp( snapshot, "funk" ) == 0 ) {
2825 : /* Funk database already exists. The parameters are actually mostly ignored. */
2826 0 : funk = fd_funk_open_file(
2827 0 : tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
2828 0 : tile->replay.funk_rec_max, tile->replay.funk_sz_gb * (1UL<<30),
2829 0 : FD_FUNK_READ_WRITE, NULL );
2830 0 : } else if( strncmp( snapshot, "wksp:", 5 ) == 0) {
2831 : /* Recover funk database from a checkpoint. */
2832 0 : funk = fd_funk_recover_checkpoint( tile->replay.funk_file, 1, snapshot+5, NULL );
2833 0 : } else {
2834 : /* Create new funk database */
2835 0 : funk = fd_funk_open_file(
2836 0 : tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
2837 0 : tile->replay.funk_rec_max, tile->replay.funk_sz_gb * (1UL<<30),
2838 0 : FD_FUNK_OVERWRITE, NULL );
2839 0 : FD_LOG_NOTICE(( "Opened funk file at %s", tile->replay.funk_file ));
2840 0 : }
2841 0 : if( FD_UNLIKELY( funk == NULL ) ) {
2842 0 : FD_LOG_ERR(( "no funk loaded" ));
2843 0 : }
2844 0 : ctx->funk = funk;
2845 0 : ctx->funk_wksp = fd_funk_wksp( funk );
2846 0 : if( FD_UNLIKELY( ctx->funk_wksp == NULL ) ) {
2847 0 : FD_LOG_ERR(( "no funk wksp" ));
2848 0 : }
2849 :
2850 0 : ctx->is_caught_up = 0;
2851 :
2852 : /**********************************************************************/
2853 : /* root_slot fseq */
2854 : /**********************************************************************/
2855 :
2856 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
2857 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
2858 0 : ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
2859 0 : if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
2860 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
2861 :
2862 : /**********************************************************************/
2863 : /* constipated fseq */
2864 : /**********************************************************************/
2865 :
2866 : /* When the replay tile boots, funk should not be constipated */
2867 :
2868 0 : ulong constipated_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "constipate" );
2869 0 : FD_TEST( constipated_obj_id!=ULONG_MAX );
2870 0 : ctx->is_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) );
2871 0 : if( FD_UNLIKELY( !ctx->is_constipated ) ) FD_LOG_ERR(( "replay tile has no constipated fseq" ));
2872 0 : fd_fseq_update( ctx->is_constipated, 0UL );
2873 0 : FD_TEST( 0UL==fd_fseq_query( ctx->is_constipated ) );
2874 :
2875 : /**********************************************************************/
2876 : /* poh_slot fseq */
2877 : /**********************************************************************/
2878 :
2879 0 : ulong poh_slot_obj_id = fd_pod_query_ulong( topo->props, "poh_slot", ULONG_MAX );
2880 0 : FD_TEST( poh_slot_obj_id!=ULONG_MAX );
2881 0 : ctx->poh = fd_fseq_join( fd_topo_obj_laddr( topo, poh_slot_obj_id ) );
2882 :
2883 : /**********************************************************************/
2884 : /* TOML paths */
2885 : /**********************************************************************/
2886 :
2887 0 : ctx->blockstore_checkpt = tile->replay.blockstore_checkpt;
2888 0 : ctx->tx_metadata_storage = tile->replay.tx_metadata_storage;
2889 0 : ctx->funk_checkpt = tile->replay.funk_checkpt;
2890 0 : ctx->genesis = tile->replay.genesis;
2891 0 : ctx->incremental = tile->replay.incremental;
2892 0 : ctx->snapshot = tile->replay.snapshot;
2893 :
2894 : /**********************************************************************/
2895 : /* alloc */
2896 : /**********************************************************************/
2897 :
2898 0 : void * alloc_shalloc = fd_alloc_new( alloc_shmem, 3UL );
2899 0 : if( FD_UNLIKELY( !alloc_shalloc ) ) {
2900 0 : FD_LOG_ERR( ( "fd_alloc_new failed" ) ); }
2901 0 : ctx->alloc = fd_alloc_join( alloc_shalloc, 3UL );
2902 0 : if( FD_UNLIKELY( !ctx->alloc ) ) {
2903 0 : FD_LOG_ERR( ( "fd_alloc_join failed" ) );
2904 0 : }
2905 :
2906 : /**********************************************************************/
2907 : /* status cache */
2908 : /**********************************************************************/
2909 :
2910 0 : char const * status_cache_path = tile->replay.status_cache;
2911 0 : if ( strlen( status_cache_path ) > 0 ) {
2912 0 : FD_LOG_NOTICE(("starting status cache restore..."));
2913 0 : int err = fd_wksp_restore( ctx->status_cache_wksp, status_cache_path, (uint)ctx->status_cache_seed );
2914 0 : FD_LOG_NOTICE(("finished status cache restore..."));
2915 0 : if (err) {
2916 0 : FD_LOG_ERR(( "failed to restore %s: error %d", status_cache_path, err ));
2917 0 : }
2918 0 : fd_wksp_tag_query_info_t info;
2919 0 : ulong tag = FD_TXNCACHE_MAGIC;
2920 0 : if( fd_wksp_tag_query( ctx->status_cache_wksp, &tag, 1, &info, 1 ) > 0 ) {
2921 0 : void * status_cache_mem = fd_wksp_laddr_fast( ctx->status_cache_wksp, info.gaddr_lo );
2922 : /* Set up status cache. */
2923 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
2924 0 : if( ctx->status_cache == NULL ) {
2925 0 : FD_LOG_ERR(( "failed to join status cache in %s", status_cache_path ));
2926 0 : }
2927 0 : } else {
2928 0 : FD_LOG_ERR(( "failed to tag query status cache in %s", status_cache_path ));
2929 0 : }
2930 0 : } else {
2931 0 : void * status_cache_mem = fd_topo_obj_laddr( topo, status_cache_obj_id );
2932 0 : if (status_cache_mem == NULL) {
2933 0 : FD_LOG_ERR(( "failed to allocate status cache" ));
2934 0 : }
2935 0 : ctx->status_cache = fd_txncache_join( fd_txncache_new( status_cache_mem, FD_TXNCACHE_DEFAULT_MAX_ROOTED_SLOTS,
2936 0 : FD_TXNCACHE_DEFAULT_MAX_LIVE_SLOTS, MAX_CACHE_TXNS_PER_SLOT,
2937 0 : FD_TXNCACHE_DEFAULT_MAX_CONSTIPATED_SLOTS ) );
2938 0 : if (ctx->status_cache == NULL) {
2939 0 : fd_wksp_free_laddr(status_cache_mem);
2940 0 : FD_LOG_ERR(( "failed to join + new status cache" ));
2941 0 : }
2942 0 : }
2943 :
2944 : /**********************************************************************/
2945 : /* spad */
2946 : /**********************************************************************/
2947 :
2948 : /* TODO: The spad should probably have its own workspace. Eventually each
2949 : spad allocator should be bound to a transaction executor tile and should
2950 : be bounded out for the maximum amount of allocations used in the runtime. */
2951 :
2952 0 : uchar * spad_mem_cur = spad_mem;
2953 0 : for( ulong i=0UL; i<tile->replay.tpool_thread_count; i++ ) {
2954 0 : fd_spad_t * spad = fd_spad_join( fd_spad_new( spad_mem_cur, thread_spad_size ) );
2955 0 : ctx->exec_spads[ ctx->exec_spad_cnt++ ] = spad;
2956 0 : spad_mem_cur += fd_ulong_align_up( thread_spad_size, fd_spad_align() );
2957 0 : }
2958 :
2959 0 : ctx->runtime_spad = fd_spad_join( fd_spad_new( spad_mem_cur, FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT ) );
2960 0 : fd_spad_push( ctx->runtime_spad );
2961 :
2962 : /**********************************************************************/
2963 : /* epoch forks */
2964 : /**********************************************************************/
2965 :
2966 0 : void * epoch_ctx_mem = fd_spad_alloc( ctx->runtime_spad,
2967 0 : fd_exec_epoch_ctx_align(),
2968 0 : MAX_EPOCH_FORKS * fd_exec_epoch_ctx_footprint( VOTE_ACC_MAX ) );
2969 :
2970 :
2971 0 : fd_epoch_forks_new( ctx->epoch_forks, epoch_ctx_mem );
2972 :
2973 : /**********************************************************************/
2974 : /* joins */
2975 : /**********************************************************************/
2976 :
2977 0 : uchar * acc_mgr_shmem = fd_spad_alloc( ctx->runtime_spad, FD_ACC_MGR_ALIGN, FD_ACC_MGR_FOOTPRINT );
2978 0 : ctx->acc_mgr = fd_acc_mgr_new( acc_mgr_shmem, ctx->funk );
2979 0 : ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( bank_hash_cmp_mem ) );
2980 0 : ctx->epoch_ctx = fd_exec_epoch_ctx_join( fd_exec_epoch_ctx_new( epoch_ctx_mem, VOTE_ACC_MAX ) );
2981 :
2982 0 : if( FD_UNLIKELY( sscanf( tile->replay.cluster_version, "%u.%u.%u", &ctx->epoch_ctx->epoch_bank.cluster_version[0], &ctx->epoch_ctx->epoch_bank.cluster_version[1], &ctx->epoch_ctx->epoch_bank.cluster_version[2] )!=3 ) ) {
2983 0 : FD_LOG_ERR(( "failed to decode cluster version, configured as \"%s\"", tile->replay.cluster_version ));
2984 0 : }
2985 0 : fd_features_enable_cleaned_up( &ctx->epoch_ctx->features, ctx->epoch_ctx->epoch_bank.cluster_version );
2986 :
2987 0 : ctx->epoch = fd_epoch_join( fd_epoch_new( epoch_mem, FD_VOTER_MAX ) );
2988 0 : ctx->forks = fd_forks_join( fd_forks_new( forks_mem, FD_BLOCK_MAX, 42UL ) );
2989 0 : ctx->ghost = fd_ghost_join( fd_ghost_new( ghost_mem, 42UL, FD_BLOCK_MAX ) );
2990 0 : ctx->tower = fd_tower_join( fd_tower_new( tower_mem ) );
2991 :
2992 0 : ctx->replay = fd_replay_join( fd_replay_new( replay_mem, tile->replay.fec_max, FD_SHRED_MAX_PER_SLOT, FD_BLOCK_MAX ) );
2993 :
2994 : /**********************************************************************/
2995 : /* voter */
2996 : /**********************************************************************/
2997 :
2998 0 : memcpy( ctx->validator_identity, fd_keyload_load( tile->replay.identity_key_path, 1 ), sizeof(fd_pubkey_t) );
2999 0 : *ctx->vote_authority = *ctx->validator_identity; /* FIXME */
3000 0 : memcpy( ctx->vote_acc, fd_keyload_load( tile->replay.vote_account_path, 1 ), sizeof(fd_pubkey_t) );
3001 :
3002 : /**********************************************************************/
3003 : /* entry batch */
3004 : /**********************************************************************/
3005 :
3006 0 : ctx->mbatch = mbatch_mem;
3007 0 : memset( &ctx->slice_exec_ctx, 0, sizeof(fd_slice_exec_ctx_t) );
3008 :
3009 : /**********************************************************************/
3010 : /* tpool */
3011 : /**********************************************************************/
3012 :
3013 0 : if( FD_LIKELY( tile->replay.tpool_thread_count > 1 ) ) {
3014 0 : tpool_boot( topo, tile->replay.tpool_thread_count );
3015 0 : }
3016 0 : ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->replay.tpool_thread_count );
3017 :
3018 0 : if( FD_LIKELY( tile->replay.tpool_thread_count > 1 ) ) {
3019 : /* Start the tpool workers */
3020 0 : for( ulong i=1UL; i<tile->replay.tpool_thread_count; i++ ) {
3021 0 : if( fd_tpool_worker_push( ctx->tpool, i, NULL, 0UL ) == NULL ) {
3022 0 : FD_LOG_ERR(( "failed to launch worker" ));
3023 0 : }
3024 0 : }
3025 0 : }
3026 :
3027 0 : if( ctx->tpool == NULL ) {
3028 0 : FD_LOG_ERR(("failed to create thread pool"));
3029 0 : }
3030 :
3031 : /**********************************************************************/
3032 : /* capture */
3033 : /**********************************************************************/
3034 :
3035 0 : if( strlen(tile->replay.capture) > 0 ) {
3036 0 : ctx->capture_ctx = fd_capture_ctx_new( capture_ctx_mem );
3037 0 : ctx->capture_ctx->checkpt_freq = ULONG_MAX;
3038 0 : ctx->capture_file = fopen( tile->replay.capture, "w+" );
3039 0 : if( FD_UNLIKELY( !ctx->capture_file ) ) {
3040 0 : FD_LOG_ERR(( "fopen(%s) failed (%d-%s)", tile->replay.capture, errno, strerror( errno ) ));
3041 0 : }
3042 0 : ctx->capture_ctx->capture_txns = 0;
3043 0 : fd_solcap_writer_init( ctx->capture_ctx->capture, ctx->capture_file );
3044 0 : }
3045 :
3046 : /**********************************************************************/
3047 : /* bank */
3048 : /**********************************************************************/
3049 :
3050 0 : ctx->bank_cnt = tile->replay.bank_tile_count;
3051 0 : for( ulong i=0UL; i<tile->replay.bank_tile_count; i++ ) {
3052 0 : ulong busy_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bank_busy.%lu", i );
3053 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
3054 0 : ctx->bank_busy[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
3055 0 : if( FD_UNLIKELY( !ctx->bank_busy[ i ] ) ) FD_LOG_ERR(( "banking tile %lu has no busy flag", i ));
3056 :
3057 0 : fd_topo_link_t * poh_out_link = &topo->links[ tile->out_link_id[ POH_OUT_IDX+i ] ];
3058 0 : fd_replay_out_ctx_t * poh_out = &ctx->bank_out[ i ];
3059 0 : poh_out->mcache = poh_out_link->mcache;
3060 0 : poh_out->sync = fd_mcache_seq_laddr( poh_out->mcache );
3061 0 : poh_out->depth = fd_mcache_depth( poh_out->mcache );
3062 0 : poh_out->seq = fd_mcache_seq_query( poh_out->sync );
3063 0 : poh_out->mem = topo->workspaces[ topo->objs[ poh_out_link->dcache_obj_id ].wksp_id ].wksp;
3064 0 : poh_out->chunk0 = fd_dcache_compact_chunk0( poh_out->mem, poh_out_link->dcache );
3065 0 : poh_out->wmark = fd_dcache_compact_wmark( poh_out->mem, poh_out_link->dcache, poh_out_link->mtu );
3066 0 : poh_out->chunk = poh_out->chunk0;
3067 0 : }
3068 :
3069 0 : ctx->poh_init_done = 0U;
3070 0 : ctx->snapshot_init_done = 0;
3071 :
3072 : /**********************************************************************/
3073 : /* exec */
3074 : /**********************************************************************/
3075 0 : ctx->exec_cnt = tile->replay.exec_tile_count;
3076 0 : for( ulong i = 0UL; i < ctx->exec_cnt; i++ ) {
3077 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, "replay_exec", i );
3078 0 : fd_topo_link_t * exec_out_link = &topo->links[ tile->out_link_id[ idx ] ];
3079 :
3080 0 : if( strcmp( exec_out_link->name, "replay_exec" ) ) {
3081 0 : FD_LOG_ERR(("output link confusion for output %lu", idx ));
3082 0 : }
3083 :
3084 0 : fd_replay_out_ctx_t * exec_out = &ctx->exec_out[ i ];
3085 0 : exec_out->idx = idx;
3086 0 : exec_out->mem = topo->workspaces[ topo->objs[ exec_out_link->dcache_obj_id ].wksp_id ].wksp;
3087 0 : exec_out->chunk0 = fd_dcache_compact_chunk0( exec_out->mem, exec_out_link->dcache );
3088 0 : exec_out->wmark = fd_dcache_compact_wmark( exec_out->mem, exec_out_link->dcache, exec_out_link->mtu );
3089 0 : exec_out->chunk = exec_out->chunk0;
3090 0 : }
3091 :
3092 : /* set up vote related items */
3093 0 : ctx->vote = tile->replay.vote;
3094 0 : ctx->validator_identity_pubkey[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->replay.identity_key_path, 1 ) );
3095 0 : ctx->vote_acct_addr[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->replay.vote_account_path, 1 ) );
3096 :
3097 : /**********************************************************************/
3098 : /* tower checkpointing for wen-restart */
3099 : /**********************************************************************/
3100 0 : ctx->tower_checkpt_fileno = -1;
3101 0 : if( FD_LIKELY( strlen( tile->replay.tower_checkpt )>0 ) ) {
3102 0 : ctx->tower_checkpt_fileno = open( tile->replay.tower_checkpt,
3103 0 : O_RDWR | O_CREAT,
3104 0 : S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
3105 0 : if( ctx->tower_checkpt_fileno<0 ) FD_LOG_ERR(( "Failed at opening the tower checkpoint file" ));
3106 0 : }
3107 :
3108 : /**********************************************************************/
3109 : /* links */
3110 : /**********************************************************************/
3111 :
3112 : /* Setup store tile input */
3113 0 : fd_topo_link_t * store_in_link = &topo->links[ tile->in_link_id[ STORE_IN_IDX ] ];
3114 0 : ctx->store_in_mem = topo->workspaces[ topo->objs[ store_in_link->dcache_obj_id ].wksp_id ].wksp;
3115 0 : ctx->store_in_chunk0 = fd_dcache_compact_chunk0( ctx->store_in_mem, store_in_link->dcache );
3116 0 : ctx->store_in_wmark = fd_dcache_compact_wmark( ctx->store_in_mem, store_in_link->dcache, store_in_link->mtu );
3117 :
3118 : /* Setup pack tile input */
3119 0 : fd_topo_link_t * pack_in_link = &topo->links[ tile->in_link_id[ PACK_IN_IDX ] ];
3120 0 : ctx->pack_in_mem = topo->workspaces[ topo->objs[ pack_in_link->dcache_obj_id ].wksp_id ].wksp;
3121 0 : ctx->pack_in_chunk0 = fd_dcache_compact_chunk0( ctx->pack_in_mem, pack_in_link->dcache );
3122 0 : ctx->pack_in_wmark = fd_dcache_compact_wmark( ctx->pack_in_mem, pack_in_link->dcache, pack_in_link->mtu );
3123 :
3124 : /* Setup batch tile input for epoch account hash */
3125 0 : fd_topo_link_t * batch_in_link = &topo->links[ tile->in_link_id[ BATCH_IN_IDX ] ];
3126 0 : ctx->batch_in_mem = topo->workspaces[ topo->objs[ batch_in_link->dcache_obj_id ].wksp_id ].wksp;
3127 0 : ctx->batch_in_chunk0 = fd_dcache_compact_chunk0( ctx->batch_in_mem, batch_in_link->dcache );
3128 0 : ctx->batch_in_wmark = fd_dcache_compact_wmark( ctx->batch_in_mem, batch_in_link->dcache, batch_in_link->mtu );
3129 :
3130 0 : ctx->shred_in_cnt = tile->in_cnt-SHRED_IN_IDX;
3131 0 : for( ulong i = 0; i<ctx->shred_in_cnt; i++ ) {
3132 0 : fd_topo_link_t * shred_in_link = &topo->links[ tile->in_link_id[ i+SHRED_IN_IDX ] ];
3133 0 : ctx->shred_in[ i ].mem = topo->workspaces[ topo->objs[ shred_in_link->dcache_obj_id ].wksp_id ].wksp;
3134 0 : ctx->shred_in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->shred_in[ i ].mem, shred_in_link->dcache );
3135 0 : ctx->shred_in[ i ].wmark = fd_dcache_compact_wmark( ctx->shred_in[ i ].mem, shred_in_link->dcache, shred_in_link->mtu );
3136 0 : }
3137 :
3138 0 : fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ NOTIF_OUT_IDX ] ];
3139 0 : ctx->notif_out_mcache = notif_out->mcache;
3140 0 : ctx->notif_out_sync = fd_mcache_seq_laddr( ctx->notif_out_mcache );
3141 0 : ctx->notif_out_depth = fd_mcache_depth( ctx->notif_out_mcache );
3142 0 : ctx->notif_out_seq = fd_mcache_seq_query( ctx->notif_out_sync );
3143 0 : ctx->notif_out_mem = topo->workspaces[ topo->objs[ notif_out->dcache_obj_id ].wksp_id ].wksp;
3144 0 : ctx->notif_out_chunk0 = fd_dcache_compact_chunk0( ctx->notif_out_mem, notif_out->dcache );
3145 0 : ctx->notif_out_wmark = fd_dcache_compact_wmark ( ctx->notif_out_mem, notif_out->dcache, notif_out->mtu );
3146 0 : ctx->notif_out_chunk = ctx->notif_out_chunk0;
3147 :
3148 0 : fd_topo_link_t * sender_out = &topo->links[ tile->out_link_id[ SENDER_OUT_IDX ] ];
3149 0 : ctx->sender_out_mcache = sender_out->mcache;
3150 0 : ctx->sender_out_sync = fd_mcache_seq_laddr( ctx->sender_out_mcache );
3151 0 : ctx->sender_out_depth = fd_mcache_depth( ctx->sender_out_mcache );
3152 0 : ctx->sender_out_seq = fd_mcache_seq_query( ctx->sender_out_sync );
3153 0 : ctx->sender_out_mem = topo->workspaces[ topo->objs[ sender_out->dcache_obj_id ].wksp_id ].wksp;
3154 0 : ctx->sender_out_chunk0 = fd_dcache_compact_chunk0( ctx->sender_out_mem, sender_out->dcache );
3155 0 : ctx->sender_out_wmark = fd_dcache_compact_wmark ( ctx->sender_out_mem, sender_out->dcache, sender_out->mtu );
3156 0 : ctx->sender_out_chunk = ctx->sender_out_chunk0;
3157 :
3158 : /* Set up stake weights tile output */
3159 0 : fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ STAKE_OUT_IDX] ];
3160 0 : ctx->stake_weights_out_mcache = stake_weights_out->mcache;
3161 0 : ctx->stake_weights_out_sync = fd_mcache_seq_laddr( ctx->stake_weights_out_mcache );
3162 0 : ctx->stake_weights_out_depth = fd_mcache_depth( ctx->stake_weights_out_mcache );
3163 0 : ctx->stake_weights_out_seq = fd_mcache_seq_query( ctx->stake_weights_out_sync );
3164 0 : ctx->stake_weights_out_mem = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
3165 0 : ctx->stake_weights_out_chunk0 = fd_dcache_compact_chunk0( ctx->stake_weights_out_mem, stake_weights_out->dcache );
3166 0 : ctx->stake_weights_out_wmark = fd_dcache_compact_wmark ( ctx->stake_weights_out_mem, stake_weights_out->dcache, stake_weights_out->mtu );
3167 0 : ctx->stake_weights_out_chunk = ctx->stake_weights_out_chunk0;
3168 :
3169 0 : if( FD_LIKELY( tile->replay.plugins_enabled ) ) {
3170 0 : ctx->replay_plug_out_idx = fd_topo_find_tile_out_link( topo, tile, "replay_plugi", 0 );
3171 0 : fd_topo_link_t const * replay_plugin_out = &topo->links[ tile->out_link_id[ ctx->replay_plug_out_idx] ];
3172 0 : if( strcmp( replay_plugin_out->name, "replay_plugi" ) ) {
3173 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->replay_plug_out_idx));
3174 0 : }
3175 0 : ctx->replay_plugin_out_mem = topo->workspaces[ topo->objs[ replay_plugin_out->dcache_obj_id ].wksp_id ].wksp;
3176 0 : ctx->replay_plugin_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_plugin_out_mem, replay_plugin_out->dcache );
3177 0 : ctx->replay_plugin_out_wmark = fd_dcache_compact_wmark ( ctx->replay_plugin_out_mem, replay_plugin_out->dcache, replay_plugin_out->mtu );
3178 0 : ctx->replay_plugin_out_chunk = ctx->replay_plugin_out_chunk0;
3179 :
3180 0 : ctx->votes_plug_out_idx = fd_topo_find_tile_out_link( topo, tile, "votes_plugin", 0 );
3181 0 : fd_topo_link_t const * votes_plugin_out = &topo->links[ tile->out_link_id[ ctx->votes_plug_out_idx] ];
3182 0 : if( strcmp( votes_plugin_out->name, "votes_plugin" ) ) {
3183 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->votes_plug_out_idx));
3184 0 : }
3185 0 : ctx->votes_plugin_out_mem = topo->workspaces[ topo->objs[ votes_plugin_out->dcache_obj_id ].wksp_id ].wksp;
3186 0 : ctx->votes_plugin_out_chunk0 = fd_dcache_compact_chunk0( ctx->votes_plugin_out_mem, votes_plugin_out->dcache );
3187 0 : ctx->votes_plugin_out_wmark = fd_dcache_compact_wmark ( ctx->votes_plugin_out_mem, votes_plugin_out->dcache, votes_plugin_out->mtu );
3188 0 : ctx->votes_plugin_out_chunk = ctx->votes_plugin_out_chunk0;
3189 0 : }
3190 :
3191 0 : if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) {
3192 0 : ctx->slots_replayed_file = fopen( tile->replay.slots_replayed, "w" );
3193 0 : FD_TEST( ctx->slots_replayed_file );
3194 0 : }
3195 :
3196 : /* replay public setup */
3197 0 : ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "replay_pub" );
3198 0 : FD_TEST( replay_obj_id!=ULONG_MAX );
3199 0 : ctx->replay_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
3200 :
3201 0 : if( ctx->replay_public_wksp==NULL ) {
3202 0 : FD_LOG_ERR(( "no replay_public workspace" ));
3203 0 : }
3204 :
3205 0 : ctx->replay_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
3206 0 : ctx->fecs_inserted = 0UL;
3207 0 : ctx->fecs_removed = 0UL;
3208 0 : FD_TEST( ctx->replay_public!=NULL );
3209 0 : }
3210 :
3211 : static ulong
3212 : populate_allowed_seccomp( fd_topo_t const * topo,
3213 : fd_topo_tile_t const * tile,
3214 : ulong out_cnt,
3215 0 : struct sock_filter * out ) {
3216 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
3217 :
3218 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
3219 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
3220 0 : FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
3221 :
3222 0 : populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd );
3223 0 : return sock_filter_policy_fd_replay_tile_instr_cnt;
3224 0 : }
3225 :
3226 : static ulong
3227 : populate_allowed_fds( fd_topo_t const * topo,
3228 : fd_topo_tile_t const * tile,
3229 : ulong out_fds_cnt,
3230 0 : int * out_fds ) {
3231 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
3232 :
3233 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
3234 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
3235 0 : FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
3236 :
3237 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
3238 :
3239 0 : ulong out_cnt = 0UL;
3240 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
3241 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
3242 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
3243 0 : out_fds[ out_cnt++ ] = ctx->blockstore_fd;
3244 0 : return out_cnt;
3245 0 : }
3246 :
3247 : static inline void
3248 0 : metrics_write( fd_replay_tile_ctx_t * ctx ) {
3249 0 : FD_MGAUGE_SET( REPLAY, LAST_VOTED_SLOT, ctx->metrics.last_voted_slot );
3250 0 : FD_MGAUGE_SET( REPLAY, SLOT, ctx->metrics.slot );
3251 0 : }
3252 :
3253 : /* TODO: This is definitely not correct */
3254 0 : #define STEM_BURST (1UL)
3255 :
3256 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_replay_tile_ctx_t
3257 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_replay_tile_ctx_t)
3258 :
3259 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
3260 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
3261 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
3262 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
3263 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
3264 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
3265 :
3266 : #include "../../disco/stem/fd_stem.c"
3267 :
/* fd_tile_replay wires the replay tile's lifecycle callbacks (scratch
   sizing, privileged/unprivileged init, sandbox fd/seccomp policies and
   the stem run loop) into the topology runner. */
fd_topo_run_tile_t fd_tile_replay = {
    .name                     = "replay",
    .loose_footprint          = loose_footprint,
    .populate_allowed_seccomp = populate_allowed_seccomp,
    .populate_allowed_fds     = populate_allowed_fds,
    .scratch_align            = scratch_align,
    .scratch_footprint        = scratch_footprint,
    .privileged_init          = privileged_init,
    .unprivileged_init        = unprivileged_init,
    .run                      = stem_run,
};
|