Line data Source code
1 : #define _GNU_SOURCE
2 : #include "../../disco/tiles.h"
3 : #include "generated/fd_replay_tile_seccomp.h"
4 :
5 : #include "fd_replay_notif.h"
6 : #include "../restore/utils/fd_ssload.h"
7 :
8 : #include "../../disco/keyguard/fd_keyload.h"
9 : #include "../../disco/store/fd_store.h"
10 : #include "../../discof/reasm/fd_reasm.h"
11 : #include "../../util/pod/fd_pod_format.h"
12 : #include "../../flamenco/runtime/fd_txncache.h"
13 : #include "../../flamenco/runtime/context/fd_capture_ctx.h"
14 : #include "../../flamenco/runtime/context/fd_exec_slot_ctx.h"
15 : #include "../../flamenco/runtime/fd_hashes.h"
16 : #include "../../flamenco/runtime/fd_runtime_init.h"
17 : #include "../../flamenco/runtime/fd_runtime.h"
18 : #include "../../flamenco/runtime/fd_runtime_public.h"
19 : #include "../../flamenco/rewards/fd_rewards.h"
20 : #include "../../disco/metrics/fd_metrics.h"
21 : #include "../../choreo/fd_choreo.h"
22 : #include "../../disco/plugin/fd_plugin.h"
23 : #include "fd_exec.h"
24 : #include "../../discof/restore/utils/fd_ssmsg.h"
25 :
26 : #include <arpa/inet.h>
27 : #include <errno.h>
28 : #include <fcntl.h>
29 : #include <linux/unistd.h>
30 : #include <netdb.h>
31 : #include <netinet/in.h>
32 : #include <sys/random.h>
33 : #include <sys/socket.h>
34 : #include <sys/stat.h>
35 : #include <sys/types.h>
36 : #include <unistd.h>
37 :
38 : /* Replay concepts:
39 :
40 : - Blocks are aggregations of entries, a.k.a. microblocks, which are
41 : groupings of txns and are constructed by the block producer (see
42 : fd_pack).
43 :
44 : - Entries are grouped into entry batches by the block producer (see
45 : fd_pack / fd_shredder).
46 :
47 : - Entry batches are divided into chunks known as shreds by the block
48 : producer (see fd_shredder).
49 :
50 : - Shreds are grouped into forward-error-correction sets (FEC sets) by
51 : the block producer (see fd_shredder).
52 :
53 : - Shreds are transmitted to the rest of the cluster via the Turbine
54 : protocol (see fd_shredder / fd_shred).
55 :
56 : - Once enough shreds within a FEC set are received to recover the
57 : entirety of the shred data encoded by that FEC set, the receiver
58 : can "complete" the FEC set (see fd_fec_resolver).
59 :
60 : - If shreds in the FEC set are missing such that it can't complete,
61 : the receiver can use the Repair protocol to request missing shreds
62 : in the FEC set (see fd_repair).
63 :
64 : - The current Repair protocol does not support requesting coding
65 : shreds. As a result, some FEC sets might actually be complete
66 : (contain all data shreds). Repair currently hacks around this by
67 : forcing completion, but the long-term solution is to add support
68 : for repairing coding shreds via Repair.
69 :
70 : - FEC sets are delivered in partial-order to the Replay tile by the
71 : Repair tile. Currently Replay only supports replaying entry batches
72 : so FEC sets need to be reassembled into an entry batch before they can
73 : be replayed. The new Dispatcher will change this by taking a FEC
74 : set as input instead. */
75 :
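/* Illustrative sketch of the FEC-set recovery rule described above:
   with Reed-Solomon erasure coding, a FEC set that encodes
   data_shred_cnt data shreds (plus some coding shreds) is recoverable
   once any data_shred_cnt distinct shreds, data or coding, have been
   received. This toy helper is only a reading aid; it is not part of
   fd_fec_resolver's API. */

__attribute__((unused)) static inline int
toy_fec_set_can_complete( ulong data_shred_cnt, /* data shreds encoded by the FEC set */
                          ulong rx_data_cnt,    /* data shreds received so far */
                          ulong rx_code_cnt ) { /* coding shreds received so far */
  return (rx_data_cnt+rx_code_cnt)>=data_shred_cnt;
}
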
76 : /* An estimate of the max number of transactions in a block. If there are more
77 : transactions, they must be split into multiple sets. */
78 : #define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_BLK_MAX * FD_SHRED_MAX_SZ) / FD_TXN_MIN_SERIALIZED_SZ )
79 :
80 : #define PLUGIN_PUBLISH_TIME_NS ((long)60e9)
81 :
82 0 : #define IN_KIND_REPAIR (0)
83 0 : #define IN_KIND_ROOT (2)
84 0 : #define IN_KIND_SNAP (3)
85 0 : #define IN_KIND_WRITER (4)
86 :
87 0 : #define EXEC_TXN_BUSY (0xA)
88 0 : #define EXEC_TXN_READY (0xB)
89 :
90 : #define BANK_HASH_CMP_LG_MAX (16UL)
91 :
92 : struct fd_replay_in_link {
93 : fd_wksp_t * mem;
94 : ulong chunk0;
95 : ulong wmark;
96 : };
97 :
98 : typedef struct fd_replay_in_link fd_replay_in_link_t;
99 :
100 : struct fd_replay_out_link {
101 : ulong idx;
102 :
103 : fd_frag_meta_t * mcache;
104 : ulong * sync;
105 : ulong depth;
106 : ulong seq;
107 :
108 : fd_wksp_t * mem;
109 : ulong chunk0;
110 : ulong wmark;
111 : ulong chunk;
112 :
113 : };
114 : typedef struct fd_replay_out_link fd_replay_out_link_t;
115 :
116 : struct fd_replay_tile_metrics {
117 : ulong slot;
118 : ulong last_voted_slot;
119 : fd_histf_t store_read_wait[ 1 ];
120 : fd_histf_t store_read_work[ 1 ];
121 : fd_histf_t store_publish_wait[ 1 ];
122 : fd_histf_t store_publish_work[ 1 ];
123 : };
124 : typedef struct fd_replay_tile_metrics fd_replay_tile_metrics_t;
125 : #define FD_REPLAY_TILE_METRICS_FOOTPRINT ( sizeof( fd_replay_tile_metrics_t ) )
126 :
127 : /* FIXME this is a temporary workaround because our bank is missing an
128 : important field block_id. This map can be removed once that's fixed, and
129 : the slot->block_id is bank_mgr_query_bank(slot)->block_id. */
130 :
131 : typedef struct {
132 : ulong slot;
133 : fd_hash_t block_id;
134 : } block_id_map_t;
135 :
136 : #define MAP_NAME block_id_map
137 0 : #define MAP_T block_id_map_t
138 0 : #define MAP_KEY slot
139 : #define MAP_MEMOIZE 0
140 : #include "../../util/tmpl/fd_map_dynamic.c"
141 :
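/* Usage sketch for the slot->block_id map generated above by
   fd_map_dynamic.c. The template emits block_id_map_insert / _query /
   _remove, which are the calls used in after_frag and
   init_after_snapshot further down; `map`, `slot` and `merkle_root`
   here are hypothetical locals shown only to document the generated
   API shape. */

__attribute__((unused)) static void
block_id_map_usage_sketch( block_id_map_t *  map,
                           ulong             slot,
                           fd_hash_t const * merkle_root ) {
  block_id_map_t * entry = block_id_map_insert( map, slot ); /* keyed by slot */
  entry->block_id = *merkle_root;                            /* last FEC set's merkle root */

  block_id_map_t * found = block_id_map_query( map, slot, NULL );
  if( found ) block_id_map_remove( map, found );             /* drop once the slot is rooted */
}
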
142 : #define MERKLES_MAX 1024 /* FIXME hack for bounding # of merkle roots.
143 : FEC sets are accumulated into entry batches.
144 : 1024 * 32 shreds = 32768 (max per slot).
145 : Remove with new dispatcher. */
146 :
147 : struct fd_exec_slice { /* FIXME to be deleted with the new dispatcher */
148 : ulong slot;
149 : ushort parent_off;
150 : int slot_complete;
151 : uint data_cnt;
152 : fd_hash_t merkles[MERKLES_MAX];
153 : ulong merkles_cnt;
154 : };
155 : typedef struct fd_exec_slice fd_exec_slice_t;
156 :
157 : #define MAP_NAME fd_exec_slice_map
158 0 : #define MAP_T fd_exec_slice_t
159 0 : #define MAP_KEY slot
160 : #define MAP_MEMOIZE 0
161 : #include "../../util/tmpl/fd_map_dynamic.c"
162 :
163 : #define DEQUE_NAME fd_exec_slice_deque
164 0 : #define DEQUE_T fd_exec_slice_t
165 : #define DEQUE_MAX USHORT_MAX
166 : #include "../../util/tmpl/fd_deque_dynamic.c"
167 :
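/* Sketch of how the map and deque generated above cooperate (this
   mirrors the buffering logic in after_frag below): FEC sets for a
   slot accumulate in fd_exec_slice_map, and when a FEC set closes an
   entry batch (data_complete) the finished slice is copied into
   fd_exec_slice_deque for later replay. `map`, `deque` and `fec` are
   hypothetical parameters; merkle-root bookkeeping and the deque-full
   invariant check are elided. */

__attribute__((unused)) static void
exec_slice_buffer_sketch( fd_exec_slice_t *      map,
                          fd_exec_slice_t *      deque,
                          fd_reasm_fec_t const * fec ) {
  fd_exec_slice_t * slice = fd_exec_slice_map_query( map, fec->slot, NULL );
  if( !slice ) slice = fd_exec_slice_map_insert( map, fec->slot );

  slice->parent_off     = fec->parent_off;
  slice->slot_complete  = fec->slot_complete;
  slice->data_cnt      += fec->data_cnt;

  if( fec->data_complete ) {
    fd_exec_slice_deque_push_tail( deque, *slice ); /* push a copy */
    fd_exec_slice_map_remove( map, slice );         /* reset for the next batch */
  }
}
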
168 : struct fd_replay_tile_ctx {
169 : fd_wksp_t * wksp;
170 : fd_wksp_t * status_cache_wksp;
171 :
172 : fd_wksp_t * runtime_public_wksp;
173 : fd_runtime_public_t * runtime_public;
174 :
175 : int in_kind[ 64 ];
176 : fd_replay_in_link_t in[ 64 ];
177 :
178 : // Notification output defs
179 : fd_replay_out_link_t notif_out[1];
180 :
181 : // Stake weights output link defs
182 : fd_replay_out_link_t stake_out[1];
183 :
184 : // Shredcap output link defs
185 : fd_replay_out_link_t shredcap_out[1];
186 :
187 : ulong replay_out_idx;
188 : fd_wksp_t * replay_out_mem;
189 : ulong replay_out_chunk0;
190 : ulong replay_out_wmark;
191 : ulong replay_out_chunk;
192 :
193 : // Inputs to plugin/gui
194 : fd_replay_out_link_t plugin_out[1];
195 : fd_replay_out_link_t votes_plugin_out[1];
196 : long last_plugin_push_time;
197 :
198 : int tx_metadata_storage;
199 : char const * funk_checkpt;
200 : char const * genesis;
201 :
202 : /* Do not modify order! This is join-order in unprivileged_init. */
203 :
204 : fd_funk_t funk[1];
205 : fd_forks_t * forks;
206 : fd_store_t * store;
207 :
208 : /* Vote accounts in the current epoch. The vote account
209 : addresses (pubkeys) remain valid for the epoch (the pubkey memory is
210 : owned by the epoch bank). */
211 :
212 : fd_voter_t * epoch_voters; /* Map chain of slot->voter */
213 : fd_bank_hash_cmp_t * bank_hash_cmp; /* Maintains bank hashes seen from votes */
214 :
215 : block_id_map_t * block_id_map; /* maps slot to block id */
216 :
217 : /* Updated during execution */
218 :
219 : fd_exec_slot_ctx_t * slot_ctx;
220 : fd_slice_exec_t slice_exec_ctx;
221 :
222 : /* TODO: Some of these arrays should be bitvecs that get masked into. */
223 : ulong exec_cnt;
224 : fd_replay_out_link_t exec_out [ FD_PACK_MAX_BANK_TILES ]; /* Sending to exec unexecuted txns */
225 : uchar exec_ready[ FD_PACK_MAX_BANK_TILES ]; /* Is tile ready */
226 : uint prev_ids [ FD_PACK_MAX_BANK_TILES ]; /* Previous txn id if any */
227 : ulong * exec_fseq [ FD_PACK_MAX_BANK_TILES ]; /* fseq of the last executed txn */
228 :
229 : ulong writer_cnt;
230 : ulong * writer_fseq[ FD_PACK_MAX_BANK_TILES ];
231 :
232 : /* Metadata updated during execution */
233 :
234 : ulong snapshot_slot;
235 : ulong * turbine_slot0;
236 : ulong * turbine_slot;
237 : ulong root; /* the root slot is the most recent slot to have reached
238 : max lockout in the tower */
239 : ulong bank_idx;
240 :
241 : /* Other metadata */
242 :
243 : ulong funk_seed;
244 : ulong status_cache_seed;
245 : fd_capture_ctx_t * capture_ctx;
246 : FILE * capture_file;
247 : FILE * slots_replayed_file;
248 :
249 : ulong * bank_busy[ FD_PACK_MAX_BANK_TILES ];
250 : ulong bank_cnt;
251 : fd_replay_out_link_t bank_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending finished txns to PoH + a couple more tasks ??? */
252 :
253 :
254 : ulong * published_wmark; /* publish watermark. The watermark is defined as the
255 : minimum of the tower root (root above) and blockstore
256 : smr (blockstore->smr). The watermark is used to
257 : publish our fork-aware structures, e.g. blockstore,
258 : forks, ghost. In general, publishing has the effect of
259 : pruning minority forks in those structures,
260 : indicating that it is ok to release the memory being
261 : occupied by said forks.
262 :
263 : The reason it has to be the minimum of the two is that
264 : the tower root can lag the SMR and vice versa, yet the
265 : fork-aware structures need to maintain information
266 : through both of those slots (see sketch after the struct). */
267 :
268 : ulong * poh; /* proof-of-history slot */
269 : uint poh_init_done;
270 : int snapshot_init_done;
271 :
272 : int tower_checkpt_fileno;
273 :
274 : int vote;
275 :
276 : fd_txncache_t * status_cache;
277 : void * bmtree[ FD_PACK_MAX_BANK_TILES ];
278 :
279 : /* The spad allocators used by the executor tiles are NOT the same as the
280 : spad used for general, longer-lasting spad allocations. The lifetime of
281 : the exec spad is just through an execution. The runtime spad is scoped
282 : to the runtime. The top-most frame will persist for the entire duration
283 : of the process. There will also be a potential second frame that persists
284 : across multiple slots that is created for rewards distribution. Every other
285 : spad frame should NOT exist beyond the scope of a block. */
286 :
287 : fd_spad_t * exec_spads[ FD_PACK_MAX_BANK_TILES ];
288 : fd_wksp_t * exec_spads_wksp[ FD_PACK_MAX_BANK_TILES ];
289 : ulong exec_spad_cnt;
290 :
291 : fd_spad_t * runtime_spad;
292 :
293 : /* TODO: Remove this and use the parsed manifest generated by snapin
294 : tiles. */
295 : uchar manifest_scratch[ (1UL<<31UL)+(1UL<<28UL) ] __attribute__((aligned(FD_SOLANA_MANIFEST_GLOBAL_ALIGN)));
296 : fd_snapshot_manifest_t * manifest;
297 :
298 : int read_only; /* The read-only slot is the slot the validator needs
299 : to replay through before it can proceed with any
300 : write operations such as voting or building blocks.
301 :
302 : This restriction is for safety reasons: the
303 : validator could otherwise equivocate a previous vote
304 : or block. */
305 :
306 : /* Metrics */
307 : fd_replay_tile_metrics_t metrics;
308 :
309 :
310 : ulong enable_bank_hash_cmp;
311 :
312 : fd_banks_t * banks;
313 : int is_booted;
314 :
315 : /* A hack to get the chunk in after_frag. Revisit as needed. */
316 : ulong _snap_out_chunk;
317 : uchar const * manifest_dcache; /* Dcache to receive decoded solana manifest */
318 :
319 : fd_reasm_fec_t fec_out;
320 : fd_exec_slice_t * exec_slice_map;
321 : fd_exec_slice_t * exec_slice_deque; /* Deque to buffer exec slices - lives in spad */
322 : };
323 : typedef struct fd_replay_tile_ctx fd_replay_tile_ctx_t;
324 :
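/* Minimal sketch of the published watermark rule described in the
   published_wmark comment above: the publish point is the minimum of
   the tower root and the blockstore SMR, because the fork-aware
   structures must retain information through both slots. `tower_root`
   and `blockstore_smr` are hypothetical inputs; the tile tracks the
   real values via fseqs. */

__attribute__((unused)) static inline ulong
published_wmark_sketch( ulong tower_root,
                        ulong blockstore_smr ) {
  return fd_ulong_min( tower_root, blockstore_smr );
}
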
325 : FD_FN_CONST static inline ulong
326 0 : scratch_align( void ) {
327 0 : return 128UL;
328 0 : }
329 :
330 : FD_FN_PURE static inline ulong
331 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
332 0 : return 24UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
333 0 : }
334 :
335 : FD_FN_PURE static inline ulong
336 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
337 :
338 : /* Do not modify order! This is join-order in unprivileged_init. */
339 :
340 0 : ulong l = FD_LAYOUT_INIT;
341 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
342 0 : l = FD_LAYOUT_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
343 0 : l = FD_LAYOUT_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
344 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
345 0 : l = FD_LAYOUT_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
346 0 : }
347 0 : l = FD_LAYOUT_APPEND( l, 128UL, FD_SLICE_MAX );
348 0 : l = FD_LAYOUT_APPEND( l, block_id_map_align(), block_id_map_footprint( fd_ulong_find_msb( fd_ulong_pow2_up( FD_BLOCK_MAX ) ) ) );
349 0 : l = FD_LAYOUT_APPEND( l, fd_exec_slice_map_align(), fd_exec_slice_map_footprint( 20 ) );
350 0 : l = FD_LAYOUT_FINI ( l, scratch_align() );
351 0 : return l;
352 0 : }
353 :
354 : /* Large number of helpers for after_credit begin here */
355 :
356 : static void
357 : publish_stake_weights( fd_replay_tile_ctx_t * ctx,
358 : fd_stem_context_t * stem,
359 0 : fd_exec_slot_ctx_t * slot_ctx ) {
360 0 : fd_epoch_schedule_t const * epoch_schedule = fd_bank_epoch_schedule_query( slot_ctx->bank );
361 :
362 0 : fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( slot_ctx->bank );
363 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes );
364 :
365 0 : if( epoch_stakes_root!=NULL ) {
366 0 : ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
367 0 : ulong epoch = fd_slot_to_leader_schedule_epoch( epoch_schedule, fd_bank_slot_get( slot_ctx->bank ) );
368 0 : ulong stake_weights_sz = generate_stake_weight_msg( slot_ctx, epoch - 1, epoch_stakes, stake_weights_msg );
369 0 : ulong stake_weights_sig = 4UL;
370 0 : fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
371 0 : ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
372 0 : FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
373 0 : }
374 :
375 0 : fd_bank_epoch_stakes_end_locking_query( slot_ctx->bank );
376 0 : fd_vote_accounts_global_t const * next_epoch_stakes = fd_bank_next_epoch_stakes_locking_query( slot_ctx->bank );
377 0 : fd_vote_accounts_pair_global_t_mapnode_t * next_epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( next_epoch_stakes );
378 :
379 0 : if( next_epoch_stakes_root!=NULL ) {
380 0 : ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
381 0 : ulong epoch = fd_slot_to_leader_schedule_epoch( epoch_schedule, fd_bank_slot_get( slot_ctx->bank ) ); /* epoch */
382 0 : ulong stake_weights_sz = generate_stake_weight_msg( slot_ctx, epoch, next_epoch_stakes, stake_weights_msg );
383 0 : ulong stake_weights_sig = 4UL;
384 0 : fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
385 0 : ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
386 0 : FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
387 0 : }
388 0 : fd_bank_next_epoch_stakes_end_locking_query( slot_ctx->bank );
389 0 : }
390 :
391 : static void
392 : publish_stake_weights_manifest( fd_replay_tile_ctx_t * ctx,
393 : fd_stem_context_t * stem,
394 0 : fd_snapshot_manifest_t const * manifest ) {
395 0 : fd_epoch_schedule_t const * schedule = fd_type_pun_const( &manifest->epoch_schedule_params );
396 0 : ulong epoch = fd_slot_to_epoch( schedule, manifest->slot, NULL );
397 :
398 : /* current epoch */
399 0 : ulong * stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
400 0 : ulong stake_weights_sz = generate_stake_weight_msg_manifest( epoch, schedule, &manifest->epoch_stakes[0], stake_weights_msg );
401 0 : ulong stake_weights_sig = 4UL;
402 0 : fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
403 0 : ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
404 0 : FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
405 :
406 : /* next epoch */
407 0 : stake_weights_msg = fd_chunk_to_laddr( ctx->stake_out->mem, ctx->stake_out->chunk );
408 0 : stake_weights_sz = generate_stake_weight_msg_manifest( epoch + 1, schedule, &manifest->epoch_stakes[1], stake_weights_msg );
409 0 : stake_weights_sig = 4UL;
410 0 : fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_out->chunk, stake_weights_sz, 0UL, 0UL, fd_frag_meta_ts_comp( fd_tickcount() ) );
411 0 : ctx->stake_out->chunk = fd_dcache_compact_next( ctx->stake_out->chunk, stake_weights_sz, ctx->stake_out->chunk0, ctx->stake_out->wmark );
412 0 : FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
413 0 : }
414 :
415 : static void
416 : txncache_publish( fd_replay_tile_ctx_t * ctx,
417 : fd_funk_txn_t * to_root_txn,
418 0 : fd_funk_txn_t * rooted_txn ) {
419 :
420 :
421 : /* For the status cache, we stop rooting until the status cache has been
422 : written out to the current snapshot. We also need to iterate up the
423 : funk transaction tree until the current "root" to figure out what slots
424 : should be registered. This root can correspond to the latest false root if
425 : one exists. */
426 :
427 :
428 0 : if( FD_UNLIKELY( !ctx->slot_ctx->status_cache ) ) {
429 0 : return;
430 0 : }
431 :
432 0 : fd_funk_txn_start_read( ctx->funk );
433 :
434 0 : fd_funk_txn_t * txn = to_root_txn;
435 0 : fd_funk_txn_pool_t * txn_pool = fd_funk_txn_pool( ctx->funk );
436 0 : while( txn!=rooted_txn ) {
437 0 : ulong slot = txn->xid.ul[0];
438 :
439 0 : FD_LOG_INFO(( "Registering slot %lu", slot ));
440 0 : fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, slot );
441 :
442 0 : txn = fd_funk_txn_parent( txn, txn_pool );
443 0 : }
444 :
445 0 : fd_funk_txn_end_read( ctx->funk );
446 0 : }
447 :
448 : static void
449 : funk_publish( fd_replay_tile_ctx_t * ctx,
450 : fd_funk_txn_t * to_root_txn,
451 0 : ulong wmk ) {
452 :
453 0 : fd_funk_txn_start_write( ctx->funk );
454 0 : FD_LOG_DEBUG(( "Publishing slot=%lu xid=%lu", wmk, to_root_txn->xid.ul[0] ));
455 :
456 : /* This is the standard case. Publish all transactions up to and
457 : including the watermark. This will publish any in-prep ancestors
458 : of root_txn as well. */
459 0 : if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, to_root_txn, 1 ) ) ) {
460 0 : FD_LOG_ERR(( "failed to funk publish slot %lu", wmk ));
461 0 : }
462 0 : fd_funk_txn_end_write( ctx->funk );
463 0 : }
464 :
465 : static void
466 0 : funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) {
467 :
468 0 : FD_LOG_DEBUG(( "Entering funk_and_txncache_publish for wmk=%lu", wmk ));
469 :
470 0 : if( xid->ul[0] != wmk ) {
471 0 : FD_LOG_CRIT(( "Invariant violation: xid->ul[0] != wmk %lu %lu", xid->ul[0], wmk ));
472 0 : }
473 :
474 : /* Handle updates to funk and the status cache. */
475 :
476 0 : fd_funk_txn_start_read( ctx->funk );
477 0 : fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
478 0 : fd_funk_txn_t * to_root_txn = fd_funk_txn_query( xid, txn_map );
479 0 : if( FD_UNLIKELY( !to_root_txn ) ) {
480 0 : FD_LOG_ERR(( "Unable to find funk transaction for xid %lu", xid->ul[0] ));
481 0 : }
482 0 : fd_funk_txn_t * rooted_txn = NULL;
483 0 : fd_funk_txn_end_read( ctx->funk );
484 :
485 0 : txncache_publish( ctx, to_root_txn, rooted_txn );
486 :
487 0 : funk_publish( ctx, to_root_txn, wmk );
488 :
489 0 : if( FD_UNLIKELY( ctx->capture_ctx ) ) {
490 0 : fd_runtime_checkpt( ctx->capture_ctx, ctx->slot_ctx, wmk );
491 0 : }
492 :
493 0 : }
494 :
495 : static void
496 : restore_slot_ctx( fd_replay_tile_ctx_t * ctx,
497 : fd_wksp_t * mem,
498 : ulong chunk,
499 0 : ulong sig ) {
500 : /* Use the full snapshot manifest to initialize the slot context */
501 0 : uchar * slot_ctx_mem = fd_spad_alloc_check( ctx->runtime_spad, FD_EXEC_SLOT_CTX_ALIGN, FD_EXEC_SLOT_CTX_FOOTPRINT );
502 0 : ctx->slot_ctx = fd_exec_slot_ctx_join( fd_exec_slot_ctx_new( slot_ctx_mem ) );
503 0 : ctx->slot_ctx->banks = ctx->banks;
504 0 : ctx->slot_ctx->bank = fd_banks_get_bank( ctx->banks, 0UL );
505 :
506 0 : ctx->slot_ctx->funk = ctx->funk;
507 0 : ctx->slot_ctx->status_cache = ctx->status_cache;
508 :
509 0 : ctx->slot_ctx->capture_ctx = ctx->capture_ctx;
510 :
511 0 : uchar const * data = fd_chunk_to_laddr( mem, chunk );
512 0 : ctx->manifest = (fd_snapshot_manifest_t*)data;
513 :
514 0 : uchar const * manifest_bytes = data+sizeof(fd_snapshot_manifest_t);
515 :
516 0 : fd_bincode_decode_ctx_t decode = {
517 0 : .data = manifest_bytes,
518 0 : .dataend = manifest_bytes+fd_ssmsg_sig_manifest_size( sig ),
519 0 : };
520 :
521 0 : ulong total_sz = 0UL;
522 0 : int err = fd_solana_manifest_decode_footprint( &decode, &total_sz );
523 0 : if( FD_UNLIKELY( err ) ) FD_LOG_ERR(( "fd_solana_manifest_decode_footprint failed (%d)", err ));
524 :
525 0 : if( FD_UNLIKELY( total_sz>sizeof(ctx->manifest_scratch ) ) ) FD_LOG_ERR(( "manifest size %lu is larger than scratch size %lu", total_sz, sizeof(ctx->manifest_scratch) ));
526 :
527 0 : fd_solana_manifest_global_t * manifest_global = fd_solana_manifest_decode_global( ctx->manifest_scratch, &decode );
528 :
529 0 : fd_ssload_recover( (fd_snapshot_manifest_t*)data, ctx->slot_ctx );
530 :
531 0 : fd_exec_slot_ctx_t * recovered_slot_ctx = fd_exec_slot_ctx_recover( ctx->slot_ctx,
532 0 : manifest_global );
533 :
534 0 : if( !recovered_slot_ctx ) {
535 0 : FD_LOG_ERR(( "Failed to restore slot context from snapshot manifest!" ));
536 0 : }
537 0 : }
538 :
539 : static void
540 0 : kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
541 0 : fd_fseq_update( ctx->published_wmark, ctx->manifest->slot );
542 0 : publish_stake_weights_manifest( ctx, stem, ctx->manifest );
543 0 : }
544 :
545 : static void
546 : replay_plugin_publish( fd_replay_tile_ctx_t * ctx,
547 : fd_stem_context_t * stem,
548 : ulong sig,
549 : uchar const * data,
550 0 : ulong data_sz ) {
551 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->plugin_out->mem, ctx->plugin_out->chunk );
552 0 : fd_memcpy( dst, data, data_sz );
553 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
554 0 : fd_stem_publish( stem, ctx->plugin_out->idx, sig, ctx->plugin_out->chunk, data_sz, 0UL, 0UL, tspub );
555 0 : ctx->plugin_out->chunk = fd_dcache_compact_next( ctx->plugin_out->chunk, data_sz, ctx->plugin_out->chunk0, ctx->plugin_out->wmark );
556 0 : }
557 :
558 : static void
559 : publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
560 : fd_stem_context_t * stem,
561 : ulong block_entry_block_height,
562 0 : ulong curr_slot ) {
563 0 : if( FD_LIKELY( !ctx->notif_out->mcache ) ) return;
564 :
565 0 : long notify_time_ns = -fd_log_wallclock();
566 0 : #define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out->mem, ctx->notif_out->chunk )
567 0 : #define NOTIFY_END \
568 0 : fd_mcache_publish( ctx->notif_out->mcache, ctx->notif_out->depth, ctx->notif_out->seq, \
569 0 : 0UL, ctx->notif_out->chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
570 0 : ctx->notif_out->seq = fd_seq_inc( ctx->notif_out->seq, 1UL ); \
571 0 : ctx->notif_out->chunk = fd_dcache_compact_next( ctx->notif_out->chunk, sizeof(fd_replay_notif_msg_t), \
572 0 : ctx->notif_out->chunk0, ctx->notif_out->wmark ); \
573 0 : msg = NULL
574 :
575 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
576 0 : fd_replay_notif_msg_t * msg = NULL;
577 :
578 0 : {
579 0 : NOTIFY_START;
580 0 : msg->type = FD_REPLAY_SLOT_TYPE;
581 0 : msg->slot_exec.slot = curr_slot;
582 0 : msg->slot_exec.parent = fd_bank_parent_slot_get( ctx->slot_ctx->bank );
583 0 : msg->slot_exec.root = fd_fseq_query( ctx->published_wmark );
584 0 : msg->slot_exec.height = block_entry_block_height;
585 0 : msg->slot_exec.transaction_count = fd_bank_txn_count_get( ctx->slot_ctx->bank );
586 0 : msg->slot_exec.shred_cnt = fd_bank_shred_cnt_get( ctx->slot_ctx->bank );
587 :
588 0 : msg->slot_exec.bank_hash = fd_bank_bank_hash_get( ctx->slot_ctx->bank );
589 :
590 0 : fd_blockhashes_t const * block_hash_queue = fd_bank_block_hash_queue_query( ctx->slot_ctx->bank );
591 0 : fd_hash_t const * last_hash = fd_blockhashes_peek_last( block_hash_queue );
592 0 : FD_TEST( last_hash );
593 0 : msg->slot_exec.block_hash = *last_hash;
594 :
595 0 : msg->slot_exec.ts = tsorig;
596 0 : NOTIFY_END;
597 0 : }
598 0 : fd_bank_shred_cnt_set( ctx->slot_ctx->bank, 0UL );
599 :
600 0 : FD_TEST( curr_slot == fd_bank_slot_get( ctx->slot_ctx->bank ) );
601 :
602 0 : #undef NOTIFY_START
603 0 : #undef NOTIFY_END
604 0 : notify_time_ns += fd_log_wallclock();
605 0 : FD_LOG_DEBUG(("TIMING: notify_slot_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));
606 :
607 0 : if( ctx->plugin_out->mem ) {
608 : /*
609 : fd_replay_complete_msg_t msg2 = {
610 : .slot = curr_slot,
611 : .total_txn_count = ctx->slot_ctx->txn_count,
612 : .nonvote_txn_count = ctx->slot_ctx->nonvote_txn_count,
613 : .failed_txn_count = ctx->slot_ctx->failed_txn_count,
614 : .nonvote_failed_txn_count = ctx->slot_ctx->nonvote_failed_txn_count,
615 : .compute_units = ctx->slot_ctx->total_compute_units_used,
616 : .transaction_fee = ctx->slot_ctx->slot_bank.collected_execution_fees,
617 : .priority_fee = ctx->slot_ctx->slot_bank.collected_priority_fees,
618 : .parent_slot = fd_bank_parent_slot_get( ctx->slot_ctx->bank ),
619 : };
620 : */
621 :
622 0 : ulong msg[11];
623 0 : msg[ 0 ] = fd_bank_slot_get( ctx->slot_ctx->bank );
624 0 : msg[ 1 ] = fd_bank_txn_count_get( ctx->slot_ctx->bank );
625 0 : msg[ 2 ] = fd_bank_nonvote_txn_count_get( ctx->slot_ctx->bank );
626 0 : msg[ 3 ] = fd_bank_failed_txn_count_get( ctx->slot_ctx->bank );
627 0 : msg[ 4 ] = fd_bank_nonvote_failed_txn_count_get( ctx->slot_ctx->bank );
628 0 : msg[ 5 ] = fd_bank_total_compute_units_used_get( ctx->slot_ctx->bank );
629 0 : msg[ 6 ] = fd_bank_execution_fees_get( ctx->slot_ctx->bank );
630 0 : msg[ 7 ] = fd_bank_priority_fees_get( ctx->slot_ctx->bank );
631 0 : msg[ 8 ] = 0UL; /* todo ... track tips */
632 0 : msg[ 9 ] = fd_bank_parent_slot_get( ctx->slot_ctx->bank );
633 0 : msg[ 10 ] = 0UL; /* todo ... max compute units */
634 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_COMPLETED, (uchar const *)msg, sizeof(msg) );
635 0 : }
636 0 : }
637 :
638 : static void
639 0 : init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
640 : /* Do not modify order! */
641 :
642 : /* Now that the snapshot has been loaded in, we have to refresh the
643 : stake delegations since the manifest does not contain the full set
644 : of data required for the stake delegations. See
645 : fd_stake_delegations.h for why this is required. */
646 :
647 0 : fd_refresh_stake_delegations( ctx->slot_ctx );
648 :
649 : /* After both snapshots have been loaded in, we can determine if we should
650 : start distributing rewards. */
651 :
652 0 : fd_rewards_recalculate_partitioned_rewards( ctx->slot_ctx, ctx->capture_ctx, ctx->runtime_spad );
653 :
654 0 : ulong snapshot_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
655 0 : if( FD_UNLIKELY( !snapshot_slot ) ) {
656 : /* Genesis-specific setup. */
657 : /* FIXME: This branch does not set up a new block exec ctx
658 : properly. Needs to do whatever prepare_new_block_execution
659 : does, but just hacking that in breaks stuff. */
660 0 : fd_runtime_update_leaders( ctx->slot_ctx->bank,
661 0 : fd_bank_slot_get( ctx->slot_ctx->bank ),
662 0 : ctx->runtime_spad );
663 :
664 0 : fd_bank_parent_slot_set( ctx->slot_ctx->bank, 0UL );
665 :
666 0 : ulong hashcnt_per_slot = fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank ) * fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
667 0 : fd_hash_t * poh = fd_bank_poh_modify( ctx->slot_ctx->bank );
668 0 : while(hashcnt_per_slot--) {
669 0 : fd_sha256_hash( poh->hash, 32UL, poh->hash );
670 0 : }
671 :
672 0 : FD_TEST( fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->runtime_spad ) == 0 );
673 0 : fd_runtime_block_info_t info = { .signature_cnt = 0 };
674 :
675 0 : fd_runtime_block_execute_finalize( ctx->slot_ctx, &info, ctx->runtime_spad );
676 :
677 0 : snapshot_slot = 1UL;
678 :
679 : /* Now setup exec tiles for execution */
680 0 : for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
681 0 : ctx->exec_ready[ i ] = EXEC_TXN_READY;
682 0 : }
683 0 : }
684 :
685 0 : ctx->snapshot_slot = snapshot_slot;
686 :
687 : /* Initialize consensus structures post-snapshot */
688 :
689 0 : fd_fork_t * snapshot_fork = fd_forks_init( ctx->forks, fd_bank_slot_get( ctx->slot_ctx->bank ) );
690 0 : if( FD_UNLIKELY( !snapshot_fork ) ) {
691 0 : FD_LOG_CRIT(( "Failed to initialize snapshot fork" ));
692 0 : }
693 :
694 0 : fd_hash_t * block_id = fd_bank_block_id_modify( ctx->slot_ctx->bank );
695 0 : memset( block_id, 0, sizeof(fd_hash_t) );
696 0 : block_id->key[0] = UCHAR_MAX; /* TODO: would be good to have the actual block id of the snapshot slot */
697 :
698 0 : fd_vote_accounts_global_t const * vote_accounts = fd_bank_curr_epoch_stakes_locking_query( ctx->slot_ctx->bank );
699 :
700 0 : fd_vote_accounts_pair_global_t_mapnode_t * vote_accounts_pool = fd_vote_accounts_vote_accounts_pool_join( vote_accounts );
701 0 : fd_vote_accounts_pair_global_t_mapnode_t * vote_accounts_root = fd_vote_accounts_vote_accounts_root_join( vote_accounts );
702 :
703 0 : fd_bank_hash_cmp_t * bank_hash_cmp = ctx->bank_hash_cmp;
704 0 : for( fd_vote_accounts_pair_global_t_mapnode_t * curr = fd_vote_accounts_pair_global_t_map_minimum( vote_accounts_pool, vote_accounts_root );
705 0 : curr;
706 0 : curr = fd_vote_accounts_pair_global_t_map_successor( vote_accounts_pool, curr ) ) {
707 0 : bank_hash_cmp->total_stake += curr->elem.stake;
708 0 : }
709 0 : bank_hash_cmp->watermark = snapshot_slot;
710 :
711 0 : fd_bank_curr_epoch_stakes_end_locking_query( ctx->slot_ctx->bank );
712 :
713 0 : ulong root = snapshot_slot;
714 0 : if( FD_LIKELY( root > fd_fseq_query( ctx->published_wmark ) ) ) {
715 :
716 : /* The watermark has advanced likely because we loaded an
717 : incremental snapshot that was downloaded just-in-time. We had
718 : kicked off repair with an older incremental snapshot, and so now
719 : we have to prune the relevant data structures, so replay can
720 : start from the latest frontier.
721 :
722 : No funk_and_txncache_publish( ctx, wmark, &xid ); because there
723 : are no funk txns to publish, and all rooted slots have already
724 : been registered in the txncache when we loaded the snapshot. */
725 :
726 0 : if( FD_LIKELY( ctx->store ) ) {
727 0 : block_id_map_t * bid = block_id_map_query( ctx->block_id_map, root, NULL );
728 0 : fd_store_exacq ( ctx->store );
729 0 : fd_store_publish( ctx->store, &bid->block_id );
730 0 : fd_store_exrel ( ctx->store );
731 0 : }
732 0 : if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, root );
733 :
734 0 : fd_fseq_update( ctx->published_wmark, root );
735 0 : }
736 :
737 : /* Now that the snapshot(s) are done loading, we can mark all of the
738 : exec tiles as ready. */
739 0 : for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
740 0 : if( ctx->exec_ready[ i ] == EXEC_TXN_BUSY ) {
741 0 : ctx->exec_ready[ i ] = EXEC_TXN_READY;
742 0 : }
743 0 : }
744 :
745 :
746 0 : FD_LOG_NOTICE(( "snapshot slot %lu", snapshot_slot ));
747 0 : }
748 :
749 : static void
750 : init_from_snapshot( fd_replay_tile_ctx_t * ctx,
751 0 : fd_stem_context_t * stem ) {
752 0 : fd_features_restore( ctx->slot_ctx, ctx->runtime_spad );
753 :
754 0 : fd_slot_lthash_t const * lthash = fd_bank_lthash_locking_query( ctx->slot_ctx->bank );
755 0 : if( fd_lthash_is_zero( (fd_lthash_value_t *)lthash ) ) {
756 0 : FD_LOG_ERR(( "snapshot manifest does not contain lthash!" ));
757 0 : }
758 0 : fd_bank_lthash_end_locking_query( ctx->slot_ctx->bank );
759 :
760 0 : fd_runtime_update_leaders( ctx->slot_ctx->bank,
761 0 : fd_bank_slot_get( ctx->slot_ctx->bank ),
762 0 : ctx->runtime_spad );
763 :
764 0 : fd_runtime_read_genesis( ctx->slot_ctx,
765 0 : ctx->genesis,
766 0 : 1,
767 0 : ctx->runtime_spad );
768 : /* We call this after fd_runtime_read_genesis, which sets up the
769 : slot_bank needed in blockstore_init. */
770 : /* FIXME: We should really only call this once. */
771 0 : init_after_snapshot( ctx );
772 :
773 0 : if( ctx->plugin_out->mem && strlen( ctx->genesis ) > 0 ) {
774 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_GENESIS_HASH_KNOWN, fd_bank_genesis_hash_query( ctx->slot_ctx->bank )->hash, sizeof(fd_hash_t) );
775 0 : }
776 :
777 : // Tell the world about the currently active features
778 0 : fd_features_t const * features = fd_bank_features_query( ctx->slot_ctx->bank );
779 0 : fd_memcpy( &ctx->runtime_public->features, features, sizeof(ctx->runtime_public->features) );
780 :
781 : /* Publish slot notifs */
782 0 : ulong curr_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
783 0 : ulong block_entry_height = fd_bank_block_height_get( ctx->slot_ctx->bank );
784 :
785 0 : publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );
786 :
787 0 : FD_TEST( ctx->slot_ctx );
788 0 : }
789 :
790 : static void
791 : on_snapshot_message( fd_replay_tile_ctx_t * ctx,
792 : fd_stem_context_t * stem,
793 : ulong in_idx,
794 : ulong chunk,
795 0 : ulong sig ) {
796 0 : ulong msg = fd_ssmsg_sig_message( sig );
797 0 : if( FD_LIKELY( msg==FD_SSMSG_DONE ) ) {
798 : /* An end of message notification indicates the snapshot is loaded.
799 : Replay is able to start executing from this point onwards. */
800 : /* TODO: replay should finish booting. Could make replay a
801 : state machine and set the state here accordingly. */
802 0 : FD_LOG_INFO(("Snapshot loaded, replay can start executing"));
803 0 : ctx->snapshot_init_done = 1;
804 : /* Kick off repair orphans after the snapshots are done loading. If
805 : we kick off repair after we receive a full manifest, we might try
806 : to repair a slot that is potentially a huge number of slots behind
807 : turbine, causing our repair buffers to fill up. Instead, we should
808 : wait until we are done receiving all the snapshots.
809 :
810 : TODO: Eventually, this logic should be cased out more:
811 : 1. If we just have a full snapshot, load in the slot_ctx and
812 : kick off repair as soon as the manifest is
813 : received.
814 : 2. If we are loading a full and incremental snapshot, we should
815 : only load in the slot_ctx and kick off repair for the
816 : incremental snapshot. */
817 0 : kickoff_repair_orphans( ctx, stem );
818 0 : init_from_snapshot( ctx, stem );
819 0 : ulong curr_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
820 0 : block_id_map_t * entry = block_id_map_insert( ctx->block_id_map, curr_slot );
821 0 : fd_hash_t null = { 0 };
822 0 : entry->block_id = null;
823 0 : return;
824 0 : }
825 :
826 0 : switch( msg ) {
827 0 : case FD_SSMSG_MANIFEST_FULL:
828 0 : case FD_SSMSG_MANIFEST_INCREMENTAL: {
829 : /* We may either receive a full snapshot manifest or an
830 : incremental snapshot manifest. Note that this external message
831 : id is only used temporarily because replay cannot yet receive
832 : the firedancer-internal snapshot manifest message. */
833 0 : restore_slot_ctx( ctx, ctx->in[ in_idx ].mem, chunk, sig );
834 :
835 : /* The below handles the annoying case in which multiple manifests
836 : can be received. A new manifest is essentially a new root, and
837 : while the Store has a "publish" operation for new roots, this
838 : scenario is a little different, because the manifest overwrites
839 : the previous one entirely. For example, the manifest slot can
840 : go backwards, which would violate the Store assumption that the
841 : root only publishes forwards (and is a descendant of the prior
842 : root). Instead, on new manifest, the Store is cleaned out
843 : entirely. This could be done more intelligently and retain any
844 : store FEC sets that we suspect would chain off the new manifest
845 : slot, but given this is a startup-only operation it is probably
846 : unnecessary. */
847 :
848 0 : fd_store_exacq( ctx->store );
849 0 : fd_hash_t null = { 0 }; /* FIXME sentinel value for missing block_id in manifest */
850 0 : if( FD_LIKELY( fd_store_root( ctx->store ) ) ) fd_store_clear( ctx->store );
851 0 : fd_store_insert( ctx->store, 0, &null );
852 0 : ctx->store->slot0 = fd_bank_slot_get( ctx->slot_ctx->bank ); /* FIXME special slot to link to sentinel value */
853 0 : fd_store_exrel( ctx->store );
854 0 : break;
855 0 : }
856 0 : default: {
857 0 : FD_LOG_ERR(( "Received unknown snapshot message with msg %lu", msg ));
858 0 : return;
859 0 : }
860 0 : }
861 :
862 0 : return;
863 0 : }
864 :
865 : /* Receives from repair newly completed slices of executable slots on
866 : the frontier. Guaranteed good properties, like happiness, in order,
867 : executable immediately as long as the mcache wasn't overrun. */
868 : static int
869 : before_frag( fd_replay_tile_ctx_t * ctx,
870 : ulong in_idx,
871 : ulong seq FD_PARAM_UNUSED,
872 0 : ulong sig FD_PARAM_UNUSED ) {
873 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
874 : /* If the internal slice buffer is full, there is nowhere for the
875 : fragment to go and we cannot pull it off the incoming queue yet.
876 : This will eventually cause backpressure to the repair system. */
877 :
878 : /* FIXME this isn't quite right anymore, because the slice queue
879 : no longer corresponds 1-1 with the input frag type. FEC sets are
880 : delivered, not slices. This could result in us backpressuring
881 : too early (in the worst case an entire block, if there is a
882 : single slice for the block). */
883 :
884 0 : if( FD_UNLIKELY( fd_exec_slice_deque_full( ctx->exec_slice_deque ) ) ) return -1;
885 0 : }
886 0 : return 0;
887 0 : }
888 :
889 : static void
890 : during_frag( fd_replay_tile_ctx_t * ctx,
891 : ulong in_idx,
892 : ulong seq,
893 : ulong sig,
894 : ulong chunk,
895 : ulong sz,
896 0 : ulong ctl ) {
897 0 : (void)seq;
898 0 : (void)ctl;
899 :
900 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAP ) ) ctx->_snap_out_chunk = chunk;
901 0 : else if( FD_LIKELY( ctx->in_kind[in_idx] == IN_KIND_REPAIR ) ) {
902 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) ) {
903 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
904 0 : }
905 0 : FD_TEST( sz==sizeof(fd_reasm_fec_t) );
906 0 : memcpy( &ctx->fec_out, fd_chunk_to_laddr( ctx->in[ in_idx ].mem, chunk ), sizeof(fd_reasm_fec_t) );
907 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_WRITER ) ) {
908 :
909 : /* Currently we need to serialize all solcap writes in a single tile, so
910 : we are notified by the writer tile when an account update has occurred.
911 :
912 : TODO: remove this message and link when solcap v2 is here */
913 0 : if( FD_UNLIKELY( sig!=FD_WRITER_ACCOUNT_UPDATE_SIG ) ) {
914 0 : FD_LOG_WARNING(( "Unexpected sig %lu from writer_replay", sig ));
915 0 : return;
916 0 : }
917 :
918 0 : if( FD_UNLIKELY( !ctx->capture_ctx || !ctx->capture_ctx->capture ||
919 0 : fd_bank_slot_get( ctx->slot_ctx->bank )<ctx->capture_ctx->solcap_start_slot ) ) {
920 : /* No solcap capture configured or slot not reached yet, ignore the message */
921 0 : return;
922 0 : }
923 :
924 0 : fd_replay_in_link_t * in = &ctx->in[ in_idx ];
925 0 : uchar const * msg_data = fd_chunk_to_laddr( in->mem, chunk );
926 0 : fd_runtime_public_account_update_msg_t const * msg = (fd_runtime_public_account_update_msg_t const *)msg_data;
927 :
928 : /* Account data follows immediately after the message header */
929 0 : void const * account_data = msg_data + sizeof(fd_runtime_public_account_update_msg_t);
930 :
931 : /* Write the account to the solcap file */
932 0 : fd_solcap_write_account( ctx->capture_ctx->capture,
933 0 : &msg->pubkey,
934 0 : &msg->info,
935 0 : account_data,
936 0 : msg->data_sz,
937 0 : &msg->hash );
938 0 : }
939 0 : }
940 :
941 : static void
942 : after_frag( fd_replay_tile_ctx_t * ctx,
943 : ulong in_idx,
944 : ulong seq FD_PARAM_UNUSED,
945 : ulong sig,
946 : ulong sz FD_PARAM_UNUSED,
947 : ulong tsorig FD_PARAM_UNUSED,
948 : ulong tspub FD_PARAM_UNUSED,
949 0 : fd_stem_context_t * stem FD_PARAM_UNUSED ) {
950 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOT ) ) {
951 0 : ulong root = sig;
952 :
953 0 : if( FD_LIKELY( root <= fd_fseq_query( ctx->published_wmark ) ) ) {
954 0 : return;
955 0 : }
956 :
957 0 : ctx->root = root;
958 0 : block_id_map_t * block_id = block_id_map_query( ctx->block_id_map, root, NULL );
959 0 : FD_TEST( block_id ); /* invariant violation. replay must have replayed the full block (and therefore have the block id) if it's trying to root it. */
960 0 : if( FD_LIKELY( ctx->store ) ) {
961 0 : long exacq_start, exacq_end, exrel_end;
962 0 : FD_STORE_EXACQ_TIMED( ctx->store, exacq_start, exacq_end );
963 0 : fd_store_publish( ctx->store, &block_id->block_id );
964 0 : FD_STORE_EXREL_TIMED( ctx->store, exrel_end );
965 :
966 0 : fd_histf_sample( ctx->metrics.store_publish_wait, (ulong)fd_long_max(exacq_end - exacq_start, 0) );
967 0 : fd_histf_sample( ctx->metrics.store_publish_work, (ulong)fd_long_max(exrel_end - exacq_end, 0) );
968 :
969 0 : block_id_map_remove( ctx->block_id_map, block_id );
970 0 : }
971 0 : if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, root );
972 0 : if( FD_LIKELY( ctx->funk ) ) { fd_funk_txn_xid_t xid = { .ul = { root, root } }; funk_and_txncache_publish( ctx, root, &xid ); }
973 0 : if( FD_LIKELY( ctx->banks ) ) fd_banks_publish( ctx->banks, root );
974 :
975 0 : fd_fseq_update( ctx->published_wmark, root );
976 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_SNAP ) ) {
977 0 : on_snapshot_message( ctx, stem, in_idx, ctx->_snap_out_chunk, sig );
978 0 : } else if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
979 :
980 : /* Forks form a partial ordering over FEC sets. The Repair tile
981 : delivers FEC sets in-order per fork, but FEC set ordering across
982 : forks is arbitrary.
983 :
984 : The existing Replay interface can only replay on entry batch
985 : boundaries but the new Dispatcher interface will support
986 : processing individual FEC sets (i.e. the repair_replay frag). So
987 : the following code is a temporary workaround to internally buffer
988 : and reassemble FEC sets into entry batches. */
989 :
990 : // FD_LOG_NOTICE(( "replay tile %lu received FEC set for slot %lu, fec_set_idx %u, parent_off %u, slot_complete %d, data_cnt %u, data_complete %d",
991 : // in_idx, out->slot, out->fec_set_idx, out->parent_off, out->slot_complete, out->data_cnt, out->data_complete ));
992 0 : fd_reasm_fec_t * fec = &ctx->fec_out;
993 0 : fd_exec_slice_t * slice = fd_exec_slice_map_query( ctx->exec_slice_map, fec->slot, NULL );
994 0 : if( FD_UNLIKELY( !slice ) ) slice = fd_exec_slice_map_insert( ctx->exec_slice_map, fec->slot );
995 0 : slice->parent_off = fec->parent_off;
996 0 : slice->slot_complete = fec->slot_complete;
997 0 : slice->data_cnt += fec->data_cnt;
998 0 : FD_TEST( slice->merkles_cnt < MERKLES_MAX );
999 0 : memcpy( &slice->merkles[ slice->merkles_cnt++ ], &fec->key, sizeof(fd_hash_t) );
1000 :
1001 0 : if( FD_UNLIKELY( fec->data_complete ) ) {
1002 :
1003 : /* If the internal slice buffer is full, there is nowhere for the
1004 : fragment to go and we cannot pull it off the incoming queue yet.
1005 : This will eventually cause backpressure to the repair system.
1006 :
1007 : @chali: this comment reads like a bug. probably shouldn't have
1008 : pulled it off the mcache / dcache at all? making it FD_LOG_ERR to
1009 : be rewritten later. */
1010 :
1011 0 : if( FD_UNLIKELY( fd_exec_slice_deque_full( ctx->exec_slice_deque ) ) ) FD_LOG_ERR(( "invariant violation" ));
1012 :
1013 0 : FD_TEST( !fd_exec_slice_deque_full( ctx->exec_slice_deque ) );
1014 0 : fd_exec_slice_deque_push_tail( ctx->exec_slice_deque, *slice ); /* push a copy */
1015 :
1016 0 : memset( slice, 0, sizeof(fd_exec_slice_t) );
1017 0 : fd_exec_slice_map_remove( ctx->exec_slice_map, slice );
1018 0 : }
1019 :
1020 0 : if( FD_UNLIKELY( fec->slot_complete ) ) {
1021 0 : block_id_map_t * entry = block_id_map_insert( ctx->block_id_map, fec->slot );
1022 0 : entry->block_id = fec->key; /* the "block_id" is the last FEC set's merkle root */
1023 0 : }
1024 0 : }
1025 0 : }
1026 :
1027 : __attribute__((unused)) static void
1028 0 : init_poh( fd_replay_tile_ctx_t * ctx ) {
1029 0 : FD_LOG_INFO(( "sending init msg" ));
1030 :
1031 0 : FD_LOG_WARNING(( "hashes_per_tick: %lu, ticks_per_slot: %lu",
1032 0 : fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank ),
1033 0 : fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank ) ));
1034 :
1035 0 : fd_replay_out_link_t * bank_out = &ctx->bank_out[ 0UL ];
1036 0 : fd_poh_init_msg_t * msg = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk ); // FIXME: msg is NULL
1037 0 : msg->hashcnt_per_tick = fd_bank_hashes_per_tick_get( ctx->slot_ctx->bank );
1038 0 : msg->ticks_per_slot = fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
1039 0 : msg->tick_duration_ns = (ulong)(fd_bank_ns_per_slot_get( ctx->slot_ctx->bank )) / fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
1040 :
1041 0 : fd_blockhashes_t const * bhq = fd_bank_block_hash_queue_query( ctx->slot_ctx->bank );
1042 0 : fd_hash_t const * last_hash = fd_blockhashes_peek_last( bhq );
1043 0 : if( last_hash ) {
1044 0 : memcpy( msg->last_entry_hash, last_hash, sizeof(fd_hash_t) );
1045 0 : } else {
1046 0 : memset( msg->last_entry_hash, 0UL, sizeof(fd_hash_t) );
1047 0 : }
1048 0 : msg->tick_height = fd_bank_slot_get( ctx->slot_ctx->bank ) * msg->ticks_per_slot;
1049 :
1050 0 : ulong sig = fd_disco_replay_old_sig( fd_bank_slot_get( ctx->slot_ctx->bank ), REPLAY_FLAG_INIT );
1051 0 : fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, sizeof(fd_poh_init_msg_t), 0UL, 0UL, 0UL );
1052 0 : bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, sizeof(fd_poh_init_msg_t), bank_out->chunk0, bank_out->wmark );
1053 0 : bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
1054 0 : ctx->poh_init_done = 1;
1055 0 : }
1056 :
1057 : static void
1058 : publish_votes_to_plugin( fd_replay_tile_ctx_t * ctx,
1059 0 : fd_stem_context_t * stem ) {
1060 0 : uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->votes_plugin_out->mem, ctx->votes_plugin_out->chunk );
1061 :
1062 0 : ulong bank_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
1063 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &bank_slot, NULL, ctx->forks->pool );
1064 0 : if( FD_UNLIKELY ( !fork ) ) return;
1065 :
1066 0 : fd_vote_accounts_global_t const * epoch_stakes = fd_bank_epoch_stakes_locking_query( ctx->slot_ctx->bank );
1067 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_pool = fd_vote_accounts_vote_accounts_pool_join( epoch_stakes );
1068 0 : fd_vote_accounts_pair_global_t_mapnode_t * epoch_stakes_root = fd_vote_accounts_vote_accounts_root_join( epoch_stakes );
1069 :
1070 0 : ulong i = 0;
1071 0 : FD_SPAD_FRAME_BEGIN( ctx->runtime_spad ) {
1072 0 : for( fd_vote_accounts_pair_global_t_mapnode_t const * n = fd_vote_accounts_pair_global_t_map_minimum_const( epoch_stakes_pool, epoch_stakes_root );
1073 0 : n && i < FD_CLUSTER_NODE_CNT;
1074 0 : n = fd_vote_accounts_pair_global_t_map_successor_const( epoch_stakes_pool, n ) ) {
1075 0 : if( n->elem.stake == 0 ) continue;
1076 :
1077 0 : uchar * data = (uchar *)&n->elem.value + n->elem.value.data_offset;
1078 0 : ulong data_len = n->elem.value.data_len;
1079 :
1080 0 : int err;
1081 0 : fd_vote_state_versioned_t * vsv = fd_bincode_decode_spad(
1082 0 : vote_state_versioned, ctx->runtime_spad,
1083 0 : data,
1084 0 : data_len,
1085 0 : &err );
1086 0 : if( FD_UNLIKELY( err ) ) {
1087 0 : FD_LOG_ERR(( "Unexpected failure in decoding vote state %d", err ));
1088 0 : }
1089 :
1090 0 : fd_pubkey_t node_pubkey;
1091 0 : ulong commission;
1092 0 : ulong epoch_credits;
1093 0 : fd_vote_epoch_credits_t const * _epoch_credits;
1094 0 : ulong root_slot;
1095 :
1096 0 : switch( vsv->discriminant ) {
1097 0 : case fd_vote_state_versioned_enum_v0_23_5:
1098 0 : node_pubkey = vsv->inner.v0_23_5.node_pubkey;
1099 0 : commission = vsv->inner.v0_23_5.commission;
1100 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v0_23_5.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v0_23_5.epoch_credits );
1101 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1102 0 : root_slot = vsv->inner.v0_23_5.root_slot;
1103 0 : break;
1104 0 : case fd_vote_state_versioned_enum_v1_14_11:
1105 0 : node_pubkey = vsv->inner.v1_14_11.node_pubkey;
1106 0 : commission = vsv->inner.v1_14_11.commission;
1107 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.v1_14_11.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.v1_14_11.epoch_credits );
1108 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1109 0 : root_slot = vsv->inner.v1_14_11.root_slot;
1110 0 : break;
1111 0 : case fd_vote_state_versioned_enum_current:
1112 0 : node_pubkey = vsv->inner.current.node_pubkey;
1113 0 : commission = vsv->inner.current.commission;
1114 0 : _epoch_credits = deq_fd_vote_epoch_credits_t_cnt( vsv->inner.current.epoch_credits ) == 0 ? NULL : deq_fd_vote_epoch_credits_t_peek_tail_const( vsv->inner.current.epoch_credits );
1115 0 : epoch_credits = _epoch_credits==NULL ? 0UL : _epoch_credits->credits - _epoch_credits->prev_credits;
1116 0 : root_slot = vsv->inner.current.root_slot;
1117 0 : break;
1118 0 : default:
1119 0 : __builtin_unreachable();
1120 0 : }
1121 :
1122 0 : fd_clock_timestamp_vote_t_mapnode_t query;
1123 0 : memcpy( query.elem.pubkey.uc, n->elem.key.uc, 32UL );
1124 0 : fd_clock_timestamp_votes_global_t const * clock_timestamp_votes = fd_bank_clock_timestamp_votes_locking_query( ctx->slot_ctx->bank );
1125 0 : fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_root = fd_clock_timestamp_votes_votes_root_join( clock_timestamp_votes );
1126 0 : fd_clock_timestamp_vote_t_mapnode_t * timestamp_votes_pool = fd_clock_timestamp_votes_votes_pool_join( clock_timestamp_votes );
1127 :
1128 0 : fd_clock_timestamp_vote_t_mapnode_t * res = fd_clock_timestamp_vote_t_map_find( timestamp_votes_pool, timestamp_votes_root, &query );
1129 :
1130 0 : fd_vote_update_msg_t * msg = (fd_vote_update_msg_t *)(dst + sizeof(ulong) + i*112U);
1131 0 : memset( msg, 0, 112U );
1132 0 : memcpy( msg->vote_pubkey, n->elem.key.uc, sizeof(fd_pubkey_t) );
1133 0 : memcpy( msg->node_pubkey, node_pubkey.uc, sizeof(fd_pubkey_t) );
1134 0 : msg->activated_stake = n->elem.stake;
1135 0 : msg->last_vote = res == NULL ? 0UL : res->elem.slot;
1136 0 : msg->root_slot = root_slot;
1137 0 : msg->epoch_credits = epoch_credits;
1138 0 : msg->commission = (uchar)commission;
1139 0 : msg->is_delinquent = (uchar)fd_int_if(fd_bank_slot_get( ctx->slot_ctx->bank ) >= 128UL, msg->last_vote <= fd_bank_slot_get( ctx->slot_ctx->bank ) - 128UL, msg->last_vote == 0);
1140 0 : ++i;
1141 0 : fd_bank_clock_timestamp_votes_end_locking_query( ctx->slot_ctx->bank );
1142 0 : }
1143 0 : } FD_SPAD_FRAME_END;
1144 :
1145 0 : fd_bank_epoch_stakes_end_locking_query( ctx->slot_ctx->bank );
1146 :
1147 0 : *(ulong *)dst = i;
1148 :
1149 0 : ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
1150 0 : fd_stem_publish( stem, ctx->votes_plugin_out->idx, FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE, ctx->votes_plugin_out->chunk, 0, 0UL, 0UL, tspub );
1151 0 : ctx->votes_plugin_out->chunk = fd_dcache_compact_next( ctx->votes_plugin_out->chunk, 8UL + 40200UL*(58UL+12UL*34UL), ctx->votes_plugin_out->chunk0, ctx->votes_plugin_out->wmark );
1152 0 : }
1153 :
1154 : static void
1155 0 : handle_writer_state_updates( fd_replay_tile_ctx_t * ctx ) {
1156 :
1157 0 : for( ulong i=0UL; i<ctx->writer_cnt; i++ ) {
1158 0 : ulong res = fd_fseq_query( ctx->writer_fseq[ i ] );
1159 0 : if( FD_UNLIKELY( fd_writer_fseq_is_not_joined( res ) ) ) {
1160 0 : FD_LOG_WARNING(( "writer tile fseq idx=%lu has not been joined by the corresponding writer tile", i ));
1161 0 : continue;
1162 0 : }
1163 :
1164 0 : uint state = fd_writer_fseq_get_state( res );
1165 0 : switch( state ) {
1166 0 : case FD_WRITER_STATE_NOT_BOOTED:
1167 0 : FD_LOG_WARNING(( "writer tile idx=%lu is still booting", i ));
1168 0 : break;
1169 0 : case FD_WRITER_STATE_READY:
1170 : /* No-op. */
1171 0 : break;
1172 0 : case FD_WRITER_STATE_TXN_DONE: {
1173 0 : uint txn_id = fd_writer_fseq_get_txn_id( res );
1174 0 : ulong exec_tile_id = fd_writer_fseq_get_exec_tile_id( res );
1175 0 : if( ctx->exec_ready[ exec_tile_id ]==EXEC_TXN_BUSY && ctx->prev_ids[ exec_tile_id ]!=txn_id ) {
1176 : //FD_LOG_DEBUG(( "Ack that exec tile idx=%lu txn id=%u has been finalized by writer tile %lu", exec_tile_id, txn_id, i ));
1177 0 : ctx->exec_ready[ exec_tile_id ] = EXEC_TXN_READY;
1178 0 : ctx->prev_ids[ exec_tile_id ] = txn_id;
1179 0 : fd_fseq_update( ctx->writer_fseq[ i ], FD_WRITER_STATE_READY );
1180 0 : }
1181 0 : break;
1182 0 : }
1183 0 : default:
1184 0 : FD_LOG_CRIT(( "Unexpected fseq state from writer tile idx=%lu state=%u", i, state ));
1185 0 : break;
1186 0 : }
1187 0 : }
1188 :
1189 0 : }
1190 :
1191 : static void
1192 : init_from_genesis( fd_replay_tile_ctx_t * ctx,
1193 0 : fd_stem_context_t * stem ) {
1194 0 : fd_runtime_read_genesis( ctx->slot_ctx,
1195 0 : ctx->genesis,
1196 0 : 0,
1197 0 : ctx->runtime_spad );
1198 :
1199 : /* We call this after fd_runtime_read_genesis, which sets up the
1200 : slot_bank needed in blockstore_init. */
1201 : /* FIXME: We should really only call this once. */
1202 0 : init_after_snapshot( ctx );
1203 :
1204 0 : if( ctx->plugin_out->mem && strlen( ctx->genesis ) > 0 ) {
1205 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_GENESIS_HASH_KNOWN, fd_bank_genesis_hash_query( ctx->slot_ctx->bank )->hash, sizeof(fd_hash_t) );
1206 0 : }
1207 :
1208 : // Tell the world about the currently active features
1209 0 : fd_features_t const * features = fd_bank_features_query( ctx->slot_ctx->bank );
1210 0 : fd_memcpy( &ctx->runtime_public->features, features, sizeof(ctx->runtime_public->features) );
1211 :
1212 : /* Publish slot notifs */
1213 0 : ulong curr_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
1214 0 : ulong block_entry_height = 0;
1215 :
1216 : /* Block after genesis has a height of 1.
1217 : TODO: We should be able to query slot 1 block_map entry to get this
1218 : (using the above for loop), but blockstore/fork setup on genesis is
1219 : broken for now. */
1220 0 : block_entry_height = 1UL;
1221 0 : init_poh( ctx );
1222 :
1223 0 : publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );
1224 :
1225 0 : FD_TEST( ctx->slot_ctx );
1226 0 : }
1227 :
1228 : static void
1229 : handle_new_slot( fd_replay_tile_ctx_t * ctx,
1230 : fd_stem_context_t * stem,
1231 : ulong slot,
1232 0 : ulong parent_slot ) {
1233 :
1234 : /* We need to handle logic that creates a bank and funk txn since
1235 : we are starting to execute a new slot. We must also manage the
1236 : forks data structure to reflect that this slot is now being
1237 : executed. */
1238 :
1239 : /* First, update fd_forks_t */
1240 :
1241 : /* Make sure that the slot is not already in the frontier. */
1242 0 : if( FD_UNLIKELY( fd_fork_frontier_ele_query(
1243 0 : ctx->forks->frontier,
1244 0 : &slot,
1245 0 : NULL,
1246 0 : ctx->forks->pool ) ) ) {
1247 0 : FD_LOG_CRIT(( "invariant violation: child slot %lu was already in the frontier", slot ) );
1248 0 : }
1249 :
1250 : /* This means we want to execute a slice on a new slot, so we
1251 : have to update our forks and create a new bank/funk_txn. */
1252 0 : fd_fork_t * fork = fd_forks_prepare( ctx->forks, parent_slot );
1253 0 : if( FD_UNLIKELY( !fork ) ) {
1254 0 : FD_LOG_CRIT(( "invariant violation: failed to prepare fork for slot: %lu", slot ));
1255 0 : }
1256 :
1257 :   /* We need to update the fork's position in the map: remove it
1258 :      from the map, update its key, and reinsert it into the
1259 :      frontier map. */
1260 0 : fd_fork_t * fork_map_ele = fd_fork_frontier_ele_remove(
1261 0 : ctx->forks->frontier,
1262 0 : &fork->slot,
1263 0 : NULL,
1264 0 : ctx->forks->pool );
1265 0 : if( FD_UNLIKELY( !fork_map_ele ) ) {
1266 0 : FD_LOG_CRIT(( "invariant violation: failed to remove fork for slot: %lu", slot ));
1267 0 : }
1268 :
1269 : /* Update the values for the child */
1270 0 : fork_map_ele->slot = slot;
1271 :
1272 0 : fd_fork_frontier_ele_insert( ctx->forks->frontier, fork_map_ele, ctx->forks->pool );
1273 :
1274 0 : if( FD_UNLIKELY( fork!=fork_map_ele ) ) {
1275 0 : FD_LOG_CRIT(( "invariant violation: fork != new_fork for slot: %lu", slot ));
1276 0 : }
1277 :
1278 : /* Second, clone the bank from the parent. */
1279 :
1280 0 : ctx->slot_ctx->bank = fd_banks_clone_from_parent( ctx->banks, slot, parent_slot );
1281 0 : if( FD_UNLIKELY( !ctx->slot_ctx->bank ) ) {
1282 0 : FD_LOG_CRIT(( "invariant violation: bank is NULL curr_slot: %lu, parent_slot: %lu", slot, parent_slot ));
1283 0 : }
1284 :
1285 : /* Third, create a new funk txn for the slot. */
1286 :
1287 0 : fd_funk_txn_start_write( ctx->funk );
1288 :
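       :   /* Funk txns are keyed by a two-word xid; this tile duplicates the
       :      slot number into both words, so the parent txn prepared for
       :      parent_slot can be found the same way below. */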
1289 0 : fd_funk_txn_xid_t xid = { .ul = { slot, slot } };
1290 0 : fd_funk_txn_xid_t parent_xid = { .ul = { parent_slot, parent_slot } };
1291 :
1292 0 : fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
1293 0 : if( FD_UNLIKELY( !txn_map ) ) {
1294 0 : FD_LOG_CRIT(( "invariant violation: funk_txn_map is NULL for slot %lu", slot ));
1295 0 : }
1296 :
1297 0 : fd_funk_txn_t * parent_txn = fd_funk_txn_query( &parent_xid, txn_map );
1298 :
1299 0 : fd_funk_txn_t * funk_txn = fd_funk_txn_prepare( ctx->funk, parent_txn, &xid, 1 );
1300 0 : if( FD_UNLIKELY( !funk_txn ) ) {
1301 0 : FD_LOG_CRIT(( "invariant violation: funk_txn is NULL for slot %lu", slot ));
1302 0 : }
1303 :
1304 0 : ctx->slot_ctx->funk_txn = funk_txn;
1305 :
1306 0 : fd_funk_txn_end_write( ctx->funk );
1307 :
1308 : /* Now update any required runtime state and handle an epoch boundary
1309 : change. */
1310 :
1311 0 : fd_bank_parent_slot_set( ctx->slot_ctx->bank, parent_slot );
1312 :
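       :   /* The new slot starts ticking from the parent's max tick height;
       :      then recompute max_tick_height for this slot from
       :      ticks_per_slot. */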
1313 0 : fd_bank_tick_height_set( ctx->slot_ctx->bank, fd_bank_max_tick_height_get( ctx->slot_ctx->bank ) );
1314 :
1315 0 : ulong * max_tick_height = fd_bank_max_tick_height_modify( ctx->slot_ctx->bank );
1316 0 : ulong ticks_per_slot = fd_bank_ticks_per_slot_get( ctx->slot_ctx->bank );
1317 0 : if( FD_UNLIKELY( FD_RUNTIME_EXECUTE_SUCCESS != fd_runtime_compute_max_tick_height(ticks_per_slot, slot, max_tick_height ) ) ) {
1318 0 : FD_LOG_CRIT(( "couldn't compute tick height/max tick height slot %lu ticks_per_slot %lu", slot, ticks_per_slot ));
1319 0 : }
1320 :
1321 0 : fd_bank_enable_exec_recording_set( ctx->slot_ctx->bank, ctx->tx_metadata_storage );
1322 :
1323 0 : int is_epoch_boundary = 0;
1324 0 : fd_runtime_block_pre_execute_process_new_epoch(
1325 0 : ctx->slot_ctx,
1326 0 : ctx->capture_ctx,
1327 0 : ctx->runtime_spad,
1328 0 : &is_epoch_boundary );
1329 0 : if( FD_UNLIKELY( is_epoch_boundary ) ) {
1330 0 : publish_stake_weights( ctx, stem, ctx->slot_ctx );
1331 0 : }
1332 :
1333 0 : int res = fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->runtime_spad );
1334 0 : if( FD_UNLIKELY( res!=FD_RUNTIME_EXECUTE_SUCCESS ) ) {
1335 0 : FD_LOG_CRIT(( "block prep execute failed" ));
1336 0 : }
1337 0 : }
1338 :
1339 : static void
1340 : handle_prev_slot( fd_replay_tile_ctx_t * ctx,
1341 : ulong slot,
1342 0 : ulong parent_slot ) {
1343 :   /* Because a fork already exists for the slot we are attempting to
1344 :      execute, we just need to update the slot ctx's handles to
1345 :      the bank and funk txn. */
1346 :
1347 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
1348 0 : if( FD_UNLIKELY( !fork ) ) {
1349 0 : FD_LOG_CRIT(( "invariant violation: fork is NULL for slot %lu", slot ));
1350 0 : }
1351 :
1352 0 : FD_LOG_NOTICE(( "switching to executing slot: %lu (parent: %lu)", slot, parent_slot ));
1353 :
1354 0 : ctx->slot_ctx->bank = fd_banks_get_bank( ctx->banks, slot );
1355 0 : if( FD_UNLIKELY( !ctx->slot_ctx->bank ) ) {
1356 0 : FD_LOG_CRIT(( "invariant violation: fork is non-NULL and bank is NULL for slot %lu", slot ));
1357 0 : }
1358 :
1359 0 : fd_funk_txn_map_t * txn_map = fd_funk_txn_map( ctx->funk );
1360 0 : fd_funk_txn_xid_t xid = { .ul = { slot, slot } };
1361 0 : ctx->slot_ctx->funk_txn = fd_funk_txn_query( &xid, txn_map );
1362 0 : if( FD_UNLIKELY( !ctx->slot_ctx->funk_txn ) ) {
1363 0 : FD_LOG_CRIT(( "invariant violation: fork is non-NULL and funk_txn is NULL for slot %lu", slot ));
1364 0 : }
1365 0 : }
1366 :
1367 : static void
1368 : handle_slot_change( fd_replay_tile_ctx_t * ctx,
1369 : fd_stem_context_t * stem,
1370 : ulong slot,
1371 0 : ulong parent_slot ) {
1372 :   /* This is an edge case related to pack. The parent fork might
1373 :      already be in the frontier and currently executing (i.e.
1374 :      fork->lock is still set). */
1375 0 : fd_fork_t * parent_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &parent_slot, NULL, ctx->forks->pool );
1376 0 : if( FD_UNLIKELY( parent_fork && !!parent_fork->lock ) ) {
1377 0 : FD_LOG_CRIT(( "invariant violation: parent fork is locked for slot %lu", slot ));
1378 0 : }
1379 :
1380 0 : ulong turbine_slot = fd_fseq_query( ctx->turbine_slot );
1381 0 : FD_LOG_NOTICE(( "\n\n[Distance]\n"
1382 0 : "slot: %lu\n"
1383 0 : "current turbine: %lu\n"
1384 0 : "slots behind: %lu\n"
1385 0 : "live: %d\n",
1386 0 : slot,
1387 0 : turbine_slot,
1388 0 : turbine_slot - slot,
1389 0 : (turbine_slot-slot)<5UL ));
1390 :
1391 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
1392 0 : if( !!fork ) {
1393 0 : FD_LOG_NOTICE(( "switching from slot: %lu to executing on a different slot: %lu", fd_bank_slot_get( ctx->slot_ctx->bank ), slot ));
1394 : /* This means we are switching back to a slot we have already
1395 : started executing (we have executed at least 1 slice from the
1396 : slot we are switching to). */
1397 0 : handle_prev_slot( ctx, slot, parent_slot );
1398 0 : } else {
1399 : /* This means we are switching to a new slot. */
1400 0 : handle_new_slot( ctx, stem, slot, parent_slot );
1401 0 : }
1402 :
1403 0 : if( ctx->capture_ctx ) {
1404 0 : fd_solcap_writer_set_slot( ctx->capture_ctx->capture, slot );
1405 0 : }
1406 0 : }
1407 :
1408 : static void
1409 0 : handle_new_slice( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
1410 :   /* If there are no slices in the slice deque, then there is nothing
1411 :      to execute. */
1412 0 : if( FD_UNLIKELY( fd_exec_slice_deque_cnt( ctx->exec_slice_deque )==0UL ) ) {
1413 0 : return;
1414 0 : }
1415 :
1416 0 : fd_exec_slice_t slice = fd_exec_slice_deque_pop_head( ctx->exec_slice_deque );
1417 :
1418 :   /* Unpack the popped slice's metadata and do some basic sanity checks. */
1419 0 : ulong slot = slice.slot;
1420 0 : ushort parent_off = slice.parent_off;
1421 0 : uint data_cnt = slice.data_cnt;
1422 0 : int slot_complete = slice.slot_complete;
1423 0 : ulong parent_slot = slot - parent_off;
1424 :
1425 0 : if( FD_UNLIKELY( slot<fd_fseq_query( ctx->published_wmark ) ) ) {
1426 0 : FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our watermark %lu.", slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ));
1427 0 : return;
1428 0 : }
1429 :
1430 0 : if( FD_UNLIKELY( parent_slot<fd_fseq_query( ctx->published_wmark ) ) ) {
1431 0 : FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our watermark %lu.", slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ) );
1432 0 : return;
1433 0 : }
1434 :
1435 : /* If the slot of the slice we are about to execute is different than
1436 : the current slot, then we need to handle it. There are two cases:
1437 : 1. We have already executed at least one slice from the slot.
1438 : Then we just need to query for the correct database handle,
1439 : fork, and bank.
1440 : 2. We need to create a database txn, initialize forks, and clone
1441 : a bank. */
1442 0 : if( FD_UNLIKELY( slot!=fd_bank_slot_get( ctx->slot_ctx->bank ) ) ) {
1443 0 : handle_slot_change( ctx, stem, slot, parent_slot );
1444 0 : }
1445 :
1446 : /* At this point, our runtime state has been updated correctly. We
1447 : need to update the current fork with the range of shred indices
1448 : that we are about to execute. We also need to populate the slice's
1449 : metadata into the slice_exec_ctx. */
1450 0 : fd_fork_t * current_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &slot, NULL, ctx->forks->pool );
1451 0 : if( FD_UNLIKELY( !current_fork ) ) FD_LOG_CRIT(( "invariant violation: current_fork is NULL for slot %lu", slot ));
1452 :
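       :   /* Reassemble the slice: while holding the store's shared lock,
       :      concatenate the data payload of each FEC set (looked up by its
       :      merkle root) into the slice execution buffer. */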
1453 0 : long shacq_start, shacq_end, shrel_end;
1454 0 : FD_STORE_SHACQ_TIMED( ctx->store, shacq_start, shacq_end );
1455 0 : ulong slice_sz = 0;
1456 0 : for( ulong i = 0; i < slice.merkles_cnt; i++ ) {
1457 0 : fd_store_fec_t * fec = fd_store_query( ctx->store, &slice.merkles[i] );
1458 0 : FD_TEST( fec );
1459 0 : memcpy( ctx->slice_exec_ctx.buf + slice_sz, fec->data, fec->data_sz );
1460 0 : slice_sz += fec->data_sz;
1461 0 : }
1462 0 : FD_STORE_SHREL_TIMED( ctx->store, shrel_end );
1463 :
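       :   /* Record how long we waited to acquire the store lock (wait) vs.
       :      how long we held it while copying (work). */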
1464 0 : fd_histf_sample( ctx->metrics.store_read_wait, (ulong)fd_long_max(shacq_end - shacq_start, 0) );
1465 0 : fd_histf_sample( ctx->metrics.store_read_work, (ulong)fd_long_max(shrel_end - shacq_end, 0) );
1466 :
1467 0 : fd_slice_exec_begin( &ctx->slice_exec_ctx, slice_sz, slot_complete );
1468 0 : fd_bank_shred_cnt_set( ctx->slot_ctx->bank, fd_bank_shred_cnt_get( ctx->slot_ctx->bank ) + data_cnt );
1469 0 : }
1470 :
1471 : static ulong
1472 0 : get_free_exec_tiles( fd_replay_tile_ctx_t * ctx, uchar * exec_free_idx ) {
1473 0 : ulong cnt=0UL;
1474 0 : for( uchar i=0; i<ctx->exec_cnt; i++ ) {
1475 0 :     if( ctx->exec_ready[ i ]==EXEC_TXN_READY ) {
1476 0 : exec_free_idx[ cnt ] = i;
1477 0 : cnt++;
1478 0 : }
1479 0 : }
1480 0 : return cnt;
1481 0 : }
1482 :
1483 : static void
1484 0 : exec_slice_fini_slot( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
1485 :
1486 0 : ulong curr_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
1487 :
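       :   /* The last microblock header in the slice carries the final PoH
       :      hash for the block; record it in the bank. */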
1488 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->slice_exec_ctx.buf + ctx->slice_exec_ctx.last_mblk_off );
1489 0 : fd_hash_t * poh = fd_bank_poh_modify( ctx->slot_ctx->bank );
1490 0 : memcpy( poh, hdr->hash, sizeof(fd_hash_t) );
1491 :
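       :   /* Record the block id for this slot (previously stored in
       :      block_id_map) in the bank. */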
1492 0 : block_id_map_t * bid = block_id_map_query( ctx->block_id_map, curr_slot, NULL );
1493 0 : FD_TEST( bid ); /* must exist */
1494 0 : fd_hash_t * block_id = fd_bank_block_id_modify( ctx->slot_ctx->bank );
1495 0 : memcpy( block_id, &bid->block_id, sizeof(fd_hash_t) );
1496 :
1497 : /* Reset ctx for next slot */
1498 0 : fd_slice_exec_reset( &ctx->slice_exec_ctx );
1499 :
1500 :   /* Finalize block execution for this slot (this computes the bank hash). */
1501 :
1502 0 : fd_runtime_block_info_t runtime_block_info[1];
1503 0 : runtime_block_info->signature_cnt = fd_bank_signature_count_get( ctx->slot_ctx->bank );
1504 :
1505 0 : fd_runtime_block_execute_finalize( ctx->slot_ctx, runtime_block_info, ctx->runtime_spad );
1506 :
1507 :
1508 0 : ulong block_entry_height = fd_bank_block_height_get( ctx->slot_ctx->bank );
1509 0 : publish_slot_notifications( ctx, stem, block_entry_height, curr_slot );
1510 :
1511 0 : fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool );
1512 0 : if( FD_UNLIKELY( !fork ) ) {
1513 0 : FD_LOG_CRIT(( "invariant violation: fork is NULL for slot %lu", curr_slot ));
1514 0 : }
1515 0 : fork->lock = 0;
1516 :
1517 0 : if( FD_LIKELY( ctx->replay_out_idx != ULONG_MAX && !ctx->read_only ) ) {
1518 0 : fd_hash_t const * block_id = &block_id_map_query( ctx->block_id_map, curr_slot, NULL )->block_id;
1519 0 : fd_hash_t const * parent_block_id = &block_id_map_query( ctx->block_id_map, fd_bank_parent_slot_get( ctx->slot_ctx->bank ), NULL )->block_id;
1520 0 : fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
1521 0 : fd_hash_t const * block_hash = fd_blockhashes_peek_last( fd_bank_block_hash_queue_query( ctx->slot_ctx->bank ) );
1522 0 : FD_TEST( block_id );
1523 0 : FD_TEST( parent_block_id );
1524 0 : FD_TEST( bank_hash );
1525 0 : FD_TEST( block_hash );
1526 0 : fd_replay_out_t out = {
1527 0 : .block_id = *block_id,
1528 0 : .parent_block_id = *parent_block_id,
1529 0 : .bank_hash = *bank_hash,
1530 0 : .block_hash = *block_hash,
1531 0 : };
1532 0 : uchar * chunk_laddr = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
1533 0 : memcpy( chunk_laddr, &out, sizeof(fd_replay_out_t) );
1534 0 : fd_stem_publish( stem, ctx->replay_out_idx, curr_slot, ctx->replay_out_chunk, sizeof(fd_replay_out_t), 0UL, fd_frag_meta_ts_comp( fd_tickcount() ), fd_frag_meta_ts_comp( fd_tickcount() ) );
1535 0 : ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sizeof(fd_replay_out_t), ctx->replay_out_chunk0, ctx->replay_out_wmark );
1536 0 : }
1537 :
1538 : /**********************************************************************/
1539 : /* Prepare bank for the next execution and write to debugging files */
1540 : /**********************************************************************/
1541 :
1542 0 : ulong prev_slot = fd_bank_slot_get( ctx->slot_ctx->bank );
1543 :
1544 0 : fd_bank_execution_fees_set( ctx->slot_ctx->bank, 0UL );
1545 :
1546 0 : fd_bank_priority_fees_set( ctx->slot_ctx->bank, 0UL );
1547 :
1548 0 : if( FD_UNLIKELY( ctx->slots_replayed_file ) ) {
1549 0 : FD_LOG_DEBUG(( "writing %lu to slots file", prev_slot ));
1550 0 : fprintf( ctx->slots_replayed_file, "%lu\n", prev_slot );
1551 0 : fflush( ctx->slots_replayed_file );
1552 0 : }
1553 :
1554 0 :   if( ctx->capture_ctx ) {
1555 0 : fd_solcap_writer_flush( ctx->capture_ctx->capture );
1556 0 : }
1557 :
1558 : /**********************************************************************/
1559 : /* Bank hash comparison, and halt if there's a mismatch after replay */
1560 : /**********************************************************************/
1561 :
1562 0 : fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
1563 0 : fd_bank_hash_cmp_t * bank_hash_cmp = ctx->bank_hash_cmp;
1564 0 : fd_bank_hash_cmp_lock( bank_hash_cmp );
1565 0 : fd_bank_hash_cmp_insert( bank_hash_cmp, curr_slot, bank_hash, 1, 0 );
1566 :
1567 0 : if( ctx->shredcap_out->idx!=ULONG_MAX ) {
1568 : /* TODO: We need some way to define common headers. */
1569 0 : uchar * chunk_laddr = fd_chunk_to_laddr( ctx->shredcap_out->mem, ctx->shredcap_out->chunk );
1570 0 : fd_hash_t const * bank_hash = fd_bank_bank_hash_query( ctx->slot_ctx->bank );
1571 0 : ulong slot = fd_bank_slot_get( ctx->slot_ctx->bank );
1572 0 : memcpy( chunk_laddr, bank_hash, sizeof(fd_hash_t) );
1573 0 : memcpy( chunk_laddr+sizeof(fd_hash_t), &slot, sizeof(ulong) );
1574 0 : fd_stem_publish( stem, ctx->shredcap_out->idx, 0UL, ctx->shredcap_out->chunk, sizeof(fd_hash_t) + sizeof(ulong), 0UL, fd_frag_meta_ts_comp( fd_tickcount() ), fd_frag_meta_ts_comp( fd_tickcount() ) );
1575 0 : ctx->shredcap_out->chunk = fd_dcache_compact_next( ctx->shredcap_out->chunk, sizeof(fd_hash_t) + sizeof(ulong), ctx->shredcap_out->chunk0, ctx->shredcap_out->wmark );
1576 0 : }
1577 :
1578 : /* Try to move the bank hash comparison watermark forward */
1579 0 : for( ulong cmp_slot = bank_hash_cmp->watermark + 1; cmp_slot < curr_slot; cmp_slot++ ) {
1580 0 : if( FD_UNLIKELY( !ctx->enable_bank_hash_cmp ) ) {
1581 0 : bank_hash_cmp->watermark = cmp_slot;
1582 0 : break;
1583 0 : }
1584 0 : int rc = fd_bank_hash_cmp_check( bank_hash_cmp, cmp_slot );
1585 0 : switch ( rc ) {
1586 0 : case -1:
1587 : /* Mismatch */
1588 0 : FD_LOG_WARNING(( "Bank hash mismatch on slot: %lu. Halting.", cmp_slot ));
1589 0 : break;
1590 0 : case 0:
1591 : /* Not ready */
1592 0 : break;
1593 0 : case 1:
1594 :       /* Match */
1595 0 : bank_hash_cmp->watermark = cmp_slot;
1596 0 : break;
1597 0 : default:;
1598 0 : }
1599 0 : }
1600 :
1601 0 : fd_bank_hash_cmp_unlock( bank_hash_cmp );
1602 0 : }
1603 :
1604 : static void
1605 0 : exec_and_handle_slice( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
1606 0 : uchar exec_free_idx[ FD_PACK_MAX_BANK_TILES ];
1607 0 : ulong free_exec_cnt = get_free_exec_tiles( ctx, exec_free_idx );
1608 :
1609 :   /* If there are no txns left to execute in the microblock and the
1610 :      exec tiles are not busy, then we are ready to start executing the
1611 :      next microblock/slice/slot.
1612 :
1613 :      We have to synchronize on the microblock boundary because we
1614 :      only have the guarantee that all transactions within the same
1615 :      microblock can be executed in parallel. */
1616 0 : if( !fd_slice_exec_txn_ready( &ctx->slice_exec_ctx ) && free_exec_cnt==ctx->exec_cnt ) {
1617 0 : if( fd_slice_exec_microblock_ready( &ctx->slice_exec_ctx ) ) {
1618 0 : fd_slice_exec_microblock_parse( &ctx->slice_exec_ctx );
1619 0 : } else if( fd_slice_exec_slice_ready( &ctx->slice_exec_ctx ) ) {
1620 : /* If the current slice was the last one for the slot we need to
1621 : finalize the slot (update bank members/compare bank hash). */
1622 0 : if( fd_slice_exec_slot_complete( &ctx->slice_exec_ctx ) ) {
1623 0 : exec_slice_fini_slot( ctx, stem );
1624 0 : }
1625 :
1626 : /* Now, we are ready to start executing the next buffered slice. */
1627 0 : handle_new_slice( ctx, stem );
1628 0 : }
1629 0 : }
1630 :
1631 : /* At this point, we know that we have some quantity of transactions
1632 : in a microblock that we are ready to execute. */
1633 0 : for( ulong i=0UL; i<free_exec_cnt; i++ ) {
1634 :
1635 0 : if( !fd_slice_exec_txn_ready( &ctx->slice_exec_ctx ) ) {
1636 0 : return;
1637 0 : }
1638 :
1639 0 : ulong exec_idx = exec_free_idx[ i ];
1640 :
1641 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
1642 :
1643 : /* Parse the transaction from the current slice */
1644 0 : fd_txn_p_t txn_p;
1645 0 : fd_slice_exec_txn_parse( &ctx->slice_exec_ctx, &txn_p );
1646 :
1647 :     /* Insert or reverify invoked programs for this epoch, if needed.
1648 :        FIXME: this should be done during txn parsing so that we don't
1649 :        have to loop over all accounts a second time. */
1650 0 : fd_runtime_update_program_cache( ctx->slot_ctx, &txn_p, ctx->runtime_spad );
1651 :
1652 : /* Mark the exec tile as busy */
1653 0 : ctx->exec_ready[ exec_idx ] = EXEC_TXN_BUSY;
1654 :
1655 : /* Dispatch dcache to exec tile */
1656 0 : fd_replay_out_link_t * exec_out = &ctx->exec_out[ exec_idx ];
1657 0 : fd_runtime_public_txn_msg_t * exec_msg = (fd_runtime_public_txn_msg_t *)fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
1658 :
1659 0 : memcpy( &exec_msg->txn, &txn_p, sizeof(fd_txn_p_t) );
1660 0 : exec_msg->slot = fd_bank_slot_get( ctx->slot_ctx->bank );
1661 :
1662 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
1663 0 : fd_stem_publish( stem, exec_out->idx, EXEC_NEW_TXN_SIG, exec_out->chunk, sizeof(fd_runtime_public_txn_msg_t), 0UL, tsorig, tspub );
1664 0 : exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_runtime_public_txn_msg_t), exec_out->chunk0, exec_out->wmark );
1665 0 : }
1666 0 : }
1667 :
1668 : static void
1669 : after_credit( fd_replay_tile_ctx_t * ctx,
1670 : fd_stem_context_t * stem,
1671 : int * opt_poll_in FD_PARAM_UNUSED,
1672 0 : int * charge_busy FD_PARAM_UNUSED ) {
1673 :
1674 0 : if( !ctx->snapshot_init_done ) {
1675 0 : if( ctx->plugin_out->mem ) {
1676 0 : uchar msg[56];
1677 0 : fd_memset( msg, 0, sizeof(msg) );
1678 0 : msg[ 0 ] = 0; // ValidatorStartProgress::Initializing
1679 0 : replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
1680 0 : }
1681 :
1682 0 : if( strlen( ctx->genesis )>0 ) {
1683 0 : init_from_genesis( ctx, stem );
1684 0 : ctx->snapshot_init_done = 1;
1685 0 : }
1686 :
1687 0 : return;
1688 0 : }
1689 :
1690 : /* TODO: Consider moving state management to during_housekeeping */
1691 :
1692 : /* Check all the writer link fseqs. */
1693 0 : handle_writer_state_updates( ctx );
1694 :
1695 0 : exec_and_handle_slice( ctx, stem );
1696 :
1697 0 : long now = fd_log_wallclock();
1698 0 : if( ctx->votes_plugin_out->mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) {
1699 0 : ctx->last_plugin_push_time = now;
1700 0 : publish_votes_to_plugin( ctx, stem );
1701 0 : }
1702 :
1703 0 : }
1704 :
1705 : static void
1706 : privileged_init( fd_topo_t * topo,
1707 0 : fd_topo_tile_t * tile ) {
1708 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1709 :
1710 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1711 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
1712 0 : FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
1713 0 : memset( ctx, 0, sizeof(fd_replay_tile_ctx_t) );
1714 :
1715 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->funk_seed, sizeof(ulong), 0 ) );
1716 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->status_cache_seed, sizeof(ulong), 0 ) );
1717 :
1718 : /**********************************************************************/
1719 : /* runtime public */
1720 : /**********************************************************************/
1721 :
1722 0 : ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "runtime_pub" );
1723 0 : if( FD_UNLIKELY( replay_obj_id==ULONG_MAX ) ) {
1724 0 : FD_LOG_ERR(( "no runtime_public" ));
1725 0 : }
1726 :
1727 0 : ctx->runtime_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
1728 0 : if( ctx->runtime_public_wksp==NULL ) {
1729 0 : FD_LOG_ERR(( "no runtime_public workspace" ));
1730 0 : }
1731 :
1732 0 : ctx->runtime_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
1733 0 : if( FD_UNLIKELY( !ctx->runtime_public ) ) {
1734 0 : FD_LOG_ERR(( "no runtime_public" ));
1735 0 : }
1736 0 : }
1737 :
1738 : static void
1739 : unprivileged_init( fd_topo_t * topo,
1740 0 : fd_topo_tile_t * tile ) {
1741 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1742 :
1743 : /**********************************************************************/
1744 : /* scratch (bump)-allocate memory owned by the replay tile */
1745 : /**********************************************************************/
1746 :
1747 : /* Do not modify order! This is join-order in unprivileged_init. */
1748 :
1749 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1750 0 : fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
1751 0 : void * capture_ctx_mem = FD_SCRATCH_ALLOC_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
1752 0 : void * forks_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
1753 0 : for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
1754 0 : ctx->bmtree[i] = FD_SCRATCH_ALLOC_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
1755 0 : }
1756 0 : void * slice_buf = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_SLICE_MAX );
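       :   /* block_id_map takes a lg count parameter:
       :      fd_ulong_find_msb( fd_ulong_pow2_up( FD_BLOCK_MAX ) ) is
       :      ceil( log2( FD_BLOCK_MAX ) ). */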
1757 0 : void * block_id_map_mem = FD_SCRATCH_ALLOC_APPEND( l, block_id_map_align(), block_id_map_footprint( fd_ulong_find_msb( fd_ulong_pow2_up( FD_BLOCK_MAX ) ) ) );
1758 0 : void * exec_slice_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_exec_slice_map_align(), fd_exec_slice_map_footprint( 20 ) );
1759 0 : ulong scratch_alloc_mem = FD_SCRATCH_ALLOC_FINI ( l, scratch_align() );
1760 :
1761 0 : if( FD_UNLIKELY( scratch_alloc_mem != ( (ulong)scratch + scratch_footprint( tile ) ) ) ) {
1762 0 : FD_LOG_ERR( ( "scratch_alloc_mem did not match scratch_footprint diff: %lu alloc: %lu footprint: %lu",
1763 0 : scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ),
1764 0 : scratch_alloc_mem,
1765 0 : (ulong)scratch + scratch_footprint( tile ) ) );
1766 0 : }
1767 :
1768 : /**********************************************************************/
1769 : /* wksp */
1770 : /**********************************************************************/
1771 :
1772 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
1773 :
1774 0 : ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" );
1775 0 : FD_TEST( store_obj_id!=ULONG_MAX );
1776 0 : ctx->store = fd_store_join( fd_topo_obj_laddr( topo, store_obj_id ) );
1777 0 : FD_TEST( ctx->store->magic == FD_STORE_MAGIC );
1778 :
1779 0 : ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
1780 0 : FD_TEST( status_cache_obj_id != ULONG_MAX );
1781 0 : ctx->status_cache_wksp = topo->workspaces[topo->objs[status_cache_obj_id].wksp_id].wksp;
1782 0 : if( ctx->status_cache_wksp == NULL ) {
1783 0 : FD_LOG_ERR(( "no status cache wksp" ));
1784 0 : }
1785 :
1786 : /**********************************************************************/
1787 : /* banks */
1788 : /**********************************************************************/
1789 :
1790 0 : ulong banks_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "banks" );
1791 0 : if( FD_UNLIKELY( banks_obj_id==ULONG_MAX ) ) {
1792 0 : FD_LOG_ERR(( "no banks" ));
1793 0 : }
1794 :
1795 0 : ctx->banks = fd_banks_join( fd_topo_obj_laddr( topo, banks_obj_id ) );
1796 0 : if( FD_UNLIKELY( !ctx->banks ) ) {
1797 0 : FD_LOG_ERR(( "failed to join banks" ));
1798 0 : }
1799 :
1800 0 : fd_bank_t * bank = fd_banks_init_bank( ctx->banks, 0UL );
1801 0 : if( FD_UNLIKELY( !bank ) ) {
1802 0 : FD_LOG_ERR(( "failed to init bank" ));
1803 0 : }
1804 :
1805 : /**********************************************************************/
1806 : /* funk */
1807 : /**********************************************************************/
1808 :
1809 0 : if( FD_UNLIKELY( !fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->replay.funk_obj_id ) ) ) ) {
1810 0 : FD_LOG_ERR(( "Failed to join database cache" ));
1811 0 : }
1812 :
1813 : /**********************************************************************/
1814 : /* root_slot fseq */
1815 : /**********************************************************************/
1816 :
1817 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
1818 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
1819 0 : ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
1820 0 : if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
1821 0 : FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
1822 :
1823 : /**********************************************************************/
1824 : /* turbine_slot fseq */
1825 : /**********************************************************************/
1826 :
1827 0 : ulong turbine_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot" );
1828 0 : FD_TEST( turbine_slot_obj_id!=ULONG_MAX );
1829 0 : ctx->turbine_slot = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot_obj_id ) );
1830 0 :   if( FD_UNLIKELY( !ctx->turbine_slot ) ) FD_LOG_ERR(( "replay tile has no turbine_slot fseq" ));
1831 :
1832 : /**********************************************************************/
1833 : /* TOML paths */
1834 : /**********************************************************************/
1835 :
1836 0 : ctx->tx_metadata_storage = tile->replay.tx_metadata_storage;
1837 0 : ctx->funk_checkpt = tile->replay.funk_checkpt;
1838 0 : ctx->genesis = tile->replay.genesis;
1839 :
1840 : /**********************************************************************/
1841 : /* status cache */
1842 : /**********************************************************************/
1843 :
1844 0 : char const * status_cache_path = tile->replay.status_cache;
1845 0 :   if( strlen( status_cache_path )>0 ) {
1846 0 :     FD_LOG_NOTICE(( "starting status cache restore..." ));
1847 0 :     int err = fd_wksp_restore( ctx->status_cache_wksp, status_cache_path, (uint)ctx->status_cache_seed );
1848 0 :     FD_LOG_NOTICE(( "finished status cache restore..." ));
1849 0 :     if( err ) {
1850 0 : FD_LOG_ERR(( "failed to restore %s: error %d", status_cache_path, err ));
1851 0 : }
1852 0 : fd_wksp_tag_query_info_t info;
1853 0 : ulong tag = FD_TXNCACHE_MAGIC;
1854 0 : if( fd_wksp_tag_query( ctx->status_cache_wksp, &tag, 1, &info, 1 ) > 0 ) {
1855 0 : void * status_cache_mem = fd_wksp_laddr_fast( ctx->status_cache_wksp, info.gaddr_lo );
1856 : /* Set up status cache. */
1857 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
1858 0 : if( ctx->status_cache == NULL ) {
1859 0 : FD_LOG_ERR(( "failed to join status cache in %s", status_cache_path ));
1860 0 : }
1861 0 : } else {
1862 0 : FD_LOG_ERR(( "failed to tag query status cache in %s", status_cache_path ));
1863 0 : }
1864 0 : } else {
1865 0 : void * status_cache_mem = fd_topo_obj_laddr( topo, status_cache_obj_id );
1866 0 :     if( status_cache_mem==NULL ) {
1867 0 : FD_LOG_ERR(( "failed to allocate status cache" ));
1868 0 : }
1869 0 : ctx->status_cache = fd_txncache_join( status_cache_mem );
1870 0 :     if( ctx->status_cache==NULL ) {
1871 0 : FD_LOG_ERR(( "failed to join + new status cache" ));
1872 0 : }
1873 0 : }
1874 :
1875 : /**********************************************************************/
1876 : /* spad */
1877 : /**********************************************************************/
1878 :
1879 : /* Join each of the exec spads. */
1880 0 : ctx->exec_cnt = fd_topo_tile_name_cnt( topo, "exec" );
1881 0 : for( ulong i=0UL; i<ctx->exec_cnt; i++ ) {
1882 0 : ulong exec_spad_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_spad.%lu", i );
1883 0 : fd_spad_t * spad = fd_spad_join( fd_topo_obj_laddr( topo, exec_spad_id ) );
1884 0 : ctx->exec_spads[ ctx->exec_spad_cnt ] = spad;
1885 0 : if( FD_UNLIKELY( !ctx->exec_spads[ ctx->exec_spad_cnt ] ) ) {
1886 0 : FD_LOG_ERR(( "failed to join exec spad %lu", i ));
1887 0 : }
1888 0 : ctx->exec_spads_wksp[ ctx->exec_spad_cnt ] = fd_wksp_containing( spad );
1889 0 : if( FD_UNLIKELY( !ctx->exec_spads_wksp[ ctx->exec_spad_cnt ] ) ) {
1890 0 : FD_LOG_ERR(( "failed to join exec spad wksp %lu", i ));
1891 0 : }
1892 :
1893 0 : ctx->exec_spad_cnt++;
1894 0 : }
1895 :
1896 : /* Now join the spad that was setup in the runtime public topo obj. */
1897 :
1898 0 : ctx->runtime_spad = fd_runtime_public_spad( ctx->runtime_public );
1899 0 : if( FD_UNLIKELY( !ctx->runtime_spad ) ) {
1900 0 : FD_LOG_ERR(( "Unable to join the runtime_spad" ));
1901 0 : }
1902 0 : fd_spad_push( ctx->runtime_spad );
1903 :
1904 : /**********************************************************************/
1905 : /* joins */
1906 : /**********************************************************************/
1907 :
1908 0 : uchar * bank_hash_cmp_shmem = fd_spad_alloc_check( ctx->runtime_spad, fd_bank_hash_cmp_align(), fd_bank_hash_cmp_footprint() );
1909 0 : ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( bank_hash_cmp_shmem ) );
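       :   /* NOTE: ctx->bank_hash_cmp is re-joined from the shared topology
       :      object in the bank_hash_cmp section below, superseding this
       :      spad-backed join. */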
1910 :
1911 0 : ctx->block_id_map = block_id_map_join( block_id_map_new( block_id_map_mem, fd_ulong_find_msb( fd_ulong_pow2_up( FD_BLOCK_MAX ) ) ) );
1912 :
1913 0 : ctx->exec_slice_map = fd_exec_slice_map_join( fd_exec_slice_map_new( exec_slice_map_mem, 20 ) );
1914 0 : FD_TEST( fd_exec_slice_map_key_max( ctx->exec_slice_map ) );
1915 0 : FD_TEST( fd_exec_slice_map_key_cnt( ctx->exec_slice_map ) == 0 );
1916 :
1917 :
1918 0 : fd_cluster_version_t * cluster_version = fd_bank_cluster_version_modify( bank );
1919 :
1920 0 : if( FD_UNLIKELY( sscanf( tile->replay.cluster_version, "%u.%u.%u", &cluster_version->major, &cluster_version->minor, &cluster_version->patch )!=3 ) ) {
1921 0 : FD_LOG_ERR(( "failed to decode cluster version, configured as \"%s\"", tile->replay.cluster_version ));
1922 0 : }
1923 :
1924 0 : fd_features_t * features = fd_bank_features_modify( bank );
1925 0 : fd_features_enable_cleaned_up( features, cluster_version );
1926 :
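       :   /* One-off feature activations from the config.  NOTE: this assumes
       :      the config layer bounds enable_features_cnt to the size of this
       :      array (16 entries). */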
1927 0 : char const * one_off_features[16];
1928 0 :   for( ulong i=0UL; i<tile->replay.enable_features_cnt; i++ ) {
1929 0 : one_off_features[i] = tile->replay.enable_features[i];
1930 0 : }
1931 0 : fd_features_enable_one_offs( features, one_off_features, (uint)tile->replay.enable_features_cnt, 0UL );
1932 :
1933 0 : ctx->forks = fd_forks_join( fd_forks_new( forks_mem, FD_BLOCK_MAX, 42UL ) );
1934 :
1935 : /**********************************************************************/
1936 : /* bank_hash_cmp */
1937 : /**********************************************************************/
1938 :
1939 0 : ulong bank_hash_cmp_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bh_cmp" );
1940 0 : FD_TEST( bank_hash_cmp_obj_id!=ULONG_MAX );
1941 0 : ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( fd_topo_obj_laddr( topo, bank_hash_cmp_obj_id ) ) );
1942 0 : if( FD_UNLIKELY( !ctx->bank_hash_cmp ) ) {
1943 0 : FD_LOG_ERR(( "failed to join bank_hash_cmp" ));
1944 0 : }
1945 :
1946 : /**********************************************************************/
1947 : /* entry batch */
1948 : /**********************************************************************/
1949 :
1950 0 : fd_slice_exec_join( &ctx->slice_exec_ctx );
1951 0 : ctx->slice_exec_ctx.buf = slice_buf;
1952 :
1953 : /**********************************************************************/
1954 : /* capture */
1955 : /**********************************************************************/
1956 :
1957 0 :   if( strlen( tile->replay.solcap_capture )>0 || strlen( tile->replay.dump_proto_dir )>0 ) {
1958 0 : ctx->capture_ctx = fd_capture_ctx_new( capture_ctx_mem );
1959 0 : } else {
1960 0 : ctx->capture_ctx = NULL;
1961 0 : }
1962 :
1963 0 : if( strlen(tile->replay.solcap_capture) > 0 ) {
1964 0 : ctx->capture_ctx->checkpt_freq = ULONG_MAX;
1965 0 : ctx->capture_file = fopen( tile->replay.solcap_capture, "w+" );
1966 0 : if( FD_UNLIKELY( !ctx->capture_file ) ) {
1967 0 : FD_LOG_ERR(( "fopen(%s) failed (%d-%s)", tile->replay.solcap_capture, errno, strerror( errno ) ));
1968 0 : }
1969 0 : ctx->capture_ctx->capture_txns = 0;
1970 0 : ctx->capture_ctx->solcap_start_slot = tile->replay.capture_start_slot;
1971 0 : fd_solcap_writer_init( ctx->capture_ctx->capture, ctx->capture_file );
1972 0 : }
1973 :
1974 0 :   if( strlen( tile->replay.dump_proto_dir )>0 ) {
1975 0 :     ctx->capture_ctx->dump_proto_output_dir = tile->replay.dump_proto_dir;
1976 0 :     if( tile->replay.dump_block_to_pb ) {
1977 0 : ctx->capture_ctx->dump_block_to_pb = tile->replay.dump_block_to_pb;
1978 0 : }
1979 0 : }
1980 :
1981 : /**********************************************************************/
1982 : /* bank */
1983 : /**********************************************************************/
1984 :
1985 0 : ctx->bank_cnt = fd_topo_tile_name_cnt( topo, "bank" );
1986 0 : for( ulong i=0UL; i<(ctx->bank_cnt); i++ ) {
1987 0 : ulong busy_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bank_busy.%lu", i );
1988 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
1989 0 : ctx->bank_busy[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
1990 0 : if( FD_UNLIKELY( !ctx->bank_busy[ i ] ) ) FD_LOG_ERR(( "banking tile %lu has no busy flag", i ));
1991 :
1992 0 : fd_replay_out_link_t * poh_out = &ctx->bank_out[ i ];
1993 0 : fd_topo_link_t * poh_out_link = &topo->links[ tile->out_link_id[ poh_out->idx+i ] ];
1994 0 : poh_out->mcache = poh_out_link->mcache;
1995 0 : poh_out->sync = fd_mcache_seq_laddr( poh_out->mcache );
1996 0 : poh_out->depth = fd_mcache_depth( poh_out->mcache );
1997 0 : poh_out->seq = fd_mcache_seq_query( poh_out->sync );
1998 0 : poh_out->mem = topo->workspaces[ topo->objs[ poh_out_link->dcache_obj_id ].wksp_id ].wksp;
1999 0 : poh_out->chunk0 = fd_dcache_compact_chunk0( poh_out->mem, poh_out_link->dcache );
2000 0 : poh_out->wmark = fd_dcache_compact_wmark( poh_out->mem, poh_out_link->dcache, poh_out_link->mtu );
2001 0 : poh_out->chunk = poh_out->chunk0;
2002 0 : }
2003 :
2004 0 : ctx->poh_init_done = 0U;
2005 0 : ctx->snapshot_init_done = 0;
2006 :
2007 : /**********************************************************************/
2008 : /* exec */
2009 : /**********************************************************************/
2010 0 : ctx->exec_cnt = fd_topo_tile_name_cnt( topo, "exec" );
2011 0 : if( FD_UNLIKELY( ctx->exec_cnt>FD_PACK_MAX_BANK_TILES ) ) {
2012 0 : FD_LOG_ERR(( "replay tile has too many exec tiles %lu", ctx->exec_cnt ));
2013 0 : }
2014 0 : if( FD_UNLIKELY( ctx->exec_cnt>UCHAR_MAX ) ) {
2015 : /* Exec tile id needs to fit in a uchar for the writer tile txn done
2016 : message. */
2017 0 : FD_LOG_CRIT(( "too many exec tiles %lu", ctx->exec_cnt ));
2018 0 : }
2019 :
2020 0 : for( ulong i = 0UL; i < ctx->exec_cnt; i++ ) {
2021 : /* Mark all initial state as not being ready. */
2022 0 : ctx->exec_ready[ i ] = EXEC_TXN_BUSY;
2023 0 : ctx->prev_ids[ i ] = FD_EXEC_ID_SENTINEL;
2024 :
2025 0 : ulong exec_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "exec_fseq.%lu", i );
2026 0 : if( FD_UNLIKELY( exec_fseq_id==ULONG_MAX ) ) {
2027 0 : FD_LOG_ERR(( "exec tile %lu has no fseq", i ));
2028 0 : }
2029 0 : ctx->exec_fseq[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, exec_fseq_id ) );
2030 0 : if( FD_UNLIKELY( !ctx->exec_fseq[ i ] ) ) {
2031 0 :       FD_LOG_ERR(( "failed to join fseq for exec tile %lu", i ));
2032 0 : }
2033 :
2034 : /* Setup out links. */
2035 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, "replay_exec", i );
2036 0 : fd_topo_link_t * exec_out_link = &topo->links[ tile->out_link_id[ idx ] ];
2037 :
2038 0 : if( strcmp( exec_out_link->name, "replay_exec" ) ) {
2039 0 : FD_LOG_ERR(("output link confusion for output %lu", idx ));
2040 0 : }
2041 :
2042 0 : fd_replay_out_link_t * exec_out = &ctx->exec_out[ i ];
2043 0 : exec_out->idx = idx;
2044 0 : exec_out->mem = topo->workspaces[ topo->objs[ exec_out_link->dcache_obj_id ].wksp_id ].wksp;
2045 0 : exec_out->chunk0 = fd_dcache_compact_chunk0( exec_out->mem, exec_out_link->dcache );
2046 0 : exec_out->wmark = fd_dcache_compact_wmark( exec_out->mem, exec_out_link->dcache, exec_out_link->mtu );
2047 0 : exec_out->chunk = exec_out->chunk0;
2048 0 : }
2049 :
2050 : /**********************************************************************/
2051 : /* writer */
2052 : /**********************************************************************/
2053 0 : ctx->writer_cnt = fd_topo_tile_name_cnt( topo, "writer" );
2054 0 : if( FD_UNLIKELY( ctx->writer_cnt>FD_PACK_MAX_BANK_TILES ) ) {
2055 0 : FD_LOG_CRIT(( "replay tile has too many writer tiles %lu", ctx->writer_cnt ));
2056 0 : }
2057 :
2058 0 : for( ulong i = 0UL; i < ctx->writer_cnt; i++ ) {
2059 :
2060 0 : ulong writer_fseq_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "writer_fseq.%lu", i );
2061 0 : if( FD_UNLIKELY( writer_fseq_id==ULONG_MAX ) ) {
2062 0 : FD_LOG_CRIT(( "writer tile %lu has no fseq", i ));
2063 0 : }
2064 0 : ctx->writer_fseq[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, writer_fseq_id ) );
2065 0 : if( FD_UNLIKELY( !ctx->writer_fseq[ i ] ) ) {
2066 0 : FD_LOG_CRIT(( "writer tile %lu has no fseq", i ));
2067 0 :       FD_LOG_CRIT(( "failed to join fseq for writer tile %lu", i ));
2068 0 : }
2069 :
2070 : /**********************************************************************/
2071 : /* tower checkpointing for wen-restart */
2072 : /**********************************************************************/
2073 0 : ctx->tower_checkpt_fileno = -1;
2074 0 : if( FD_LIKELY( strlen( tile->replay.tower_checkpt )>0 ) ) {
2075 0 : ctx->tower_checkpt_fileno = open( tile->replay.tower_checkpt,
2076 0 : O_RDWR | O_CREAT,
2077 0 : S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
2078 0 : if( ctx->tower_checkpt_fileno<0 ) FD_LOG_ERR(( "Failed at opening the tower checkpoint file" ));
2079 0 : }
2080 :
2081 : /**********************************************************************/
2082 : /* links */
2083 : /**********************************************************************/
2084 :
2085 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
2086 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ i ] ];
2087 0 : fd_topo_wksp_t * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
2088 :
2089 0 : if( FD_LIKELY( link->dcache ) ) {
2090 0 : ctx->in[ i ].mem = link_wksp->wksp;
2091 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
2092 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
2093 0 : }
2094 :
2095 0 : if( !strcmp( link->name, "repair_repla" ) ) {
2096 0 : ctx->in_kind[ i ] = IN_KIND_REPAIR;
2097 0 : } else if( !strcmp( link->name, "snap_out" ) ) {
2098 0 : ctx->in_kind[ i ] = IN_KIND_SNAP;
2099 0 : } else if( !strcmp( link->name, "root_out" ) ) {
2100 0 : ctx->in_kind[ i ] = IN_KIND_ROOT;
2101 0 : } else if( !strcmp( link->name, "writ_repl" ) ) {
2102 0 : ctx->in_kind[ i ] = IN_KIND_WRITER;
2103 0 : } else {
2104 0 : FD_LOG_ERR(( "unexpected input link name %s", link->name ));
2105 0 : }
2106 0 : }
2107 :
2108 0 : ulong replay_notif_idx = fd_topo_find_tile_out_link( topo, tile, "replay_notif", 0 );
2109 0 : if( FD_UNLIKELY( replay_notif_idx!=ULONG_MAX ) ) {
2110 0 : fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ replay_notif_idx ] ];
2111 0 : FD_TEST( notif_out );
2112 0 : ctx->notif_out->idx = replay_notif_idx;
2113 0 : ctx->notif_out->mcache = notif_out->mcache;
2114 0 : ctx->notif_out->sync = fd_mcache_seq_laddr( ctx->notif_out->mcache );
2115 0 : ctx->notif_out->depth = fd_mcache_depth( ctx->notif_out->mcache );
2116 0 : ctx->notif_out->seq = fd_mcache_seq_query( ctx->notif_out->sync );
2117 0 : ctx->notif_out->mem = topo->workspaces[ topo->objs[ notif_out->dcache_obj_id ].wksp_id ].wksp;
2118 0 : ctx->notif_out->chunk0 = fd_dcache_compact_chunk0( ctx->notif_out->mem, notif_out->dcache );
2119 0 : ctx->notif_out->wmark = fd_dcache_compact_wmark ( ctx->notif_out->mem, notif_out->dcache, notif_out->mtu );
2120 0 : ctx->notif_out->chunk = ctx->notif_out->chunk0;
2121 0 : } else {
2122 0 : ctx->notif_out->mcache = NULL;
2123 0 : }
2124 :
2125 : /* Setup shredcap tile output. This link should only exist if the
2126 : shredcap tile has been enabled. */
2127 0 : ulong replay_shredcap_idx = fd_topo_find_tile_out_link( topo, tile, "replay_scap", 0 );
2128 0 : if( FD_UNLIKELY( replay_shredcap_idx!=ULONG_MAX ) ) {
2129 0 : fd_topo_link_t * shredcap_out = &topo->links[ tile->out_link_id[ replay_shredcap_idx ] ];
2130 0 : FD_TEST( shredcap_out );
2131 0 : ctx->shredcap_out->idx = replay_shredcap_idx;
2132 0 : ctx->shredcap_out->mem = topo->workspaces[ topo->objs[ shredcap_out->dcache_obj_id ].wksp_id ].wksp;
2133 0 : ctx->shredcap_out->chunk0 = fd_dcache_compact_chunk0( ctx->shredcap_out->mem, shredcap_out->dcache );
2134 0 : ctx->shredcap_out->wmark = fd_dcache_compact_wmark ( ctx->shredcap_out->mem, shredcap_out->dcache, shredcap_out->mtu );
2135 0 : ctx->shredcap_out->chunk = ctx->shredcap_out->chunk0;
2136 0 : } else {
2137 0 : ctx->shredcap_out->idx = ULONG_MAX;
2138 0 : }
2139 :
2140 : /* Set up stake weights tile output */
2141 0 : ctx->stake_out->idx = fd_topo_find_tile_out_link( topo, tile, "stake_out", 0 );
2142 0 : FD_TEST( ctx->stake_out->idx!=ULONG_MAX );
2143 0 : fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ ctx->stake_out->idx] ];
2144 0 : ctx->stake_out->mcache = stake_weights_out->mcache;
2145 0 : ctx->stake_out->mem = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
2146 0 : ctx->stake_out->sync = fd_mcache_seq_laddr ( ctx->stake_out->mcache );
2147 0 : ctx->stake_out->depth = fd_mcache_depth ( ctx->stake_out->mcache );
2148 0 : ctx->stake_out->seq = fd_mcache_seq_query ( ctx->stake_out->sync );
2149 0 : ctx->stake_out->chunk0 = fd_dcache_compact_chunk0( ctx->stake_out->mem, stake_weights_out->dcache );
2150 0 : ctx->stake_out->wmark = fd_dcache_compact_wmark ( ctx->stake_out->mem, stake_weights_out->dcache, stake_weights_out->mtu );
2151 0 : ctx->stake_out->chunk = ctx->stake_out->chunk0;
2152 :
2153 0 : ctx->replay_out_idx = fd_topo_find_tile_out_link( topo, tile, "replay_out", 0 );
2154 0 : if( FD_LIKELY( ctx->replay_out_idx!=ULONG_MAX ) ) {
2155 0 : fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ ctx->replay_out_idx ] ];
2156 0 : ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
2157 0 : ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );
2158 0 : ctx->replay_out_wmark = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
2159 0 : ctx->replay_out_chunk = ctx->replay_out_chunk0;
2160 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu, replay_out->depth ) );
2161 0 : }
2162 :
2163 0 : if( FD_LIKELY( tile->replay.plugins_enabled ) ) {
2164 0 : ctx->plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "replay_plugi", 0 );
2165 0 : fd_topo_link_t const * replay_plugin_out = &topo->links[ tile->out_link_id[ ctx->plugin_out->idx] ];
2166 0 : if( strcmp( replay_plugin_out->name, "replay_plugi" ) ) {
2167 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->plugin_out->idx));
2168 0 : }
2169 0 : ctx->plugin_out->mem = topo->workspaces[ topo->objs[ replay_plugin_out->dcache_obj_id ].wksp_id ].wksp;
2170 0 : ctx->plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->plugin_out->mem, replay_plugin_out->dcache );
2171 0 : ctx->plugin_out->wmark = fd_dcache_compact_wmark ( ctx->plugin_out->mem, replay_plugin_out->dcache, replay_plugin_out->mtu );
2172 0 : ctx->plugin_out->chunk = ctx->plugin_out->chunk0;
2173 :
2174 0 : ctx->votes_plugin_out->idx = fd_topo_find_tile_out_link( topo, tile, "votes_plugin", 0 );
2175 0 : fd_topo_link_t const * votes_plugin_out = &topo->links[ tile->out_link_id[ ctx->votes_plugin_out->idx] ];
2176 0 : if( strcmp( votes_plugin_out->name, "votes_plugin" ) ) {
2177 0 : FD_LOG_ERR(("output link confusion for output %lu", ctx->votes_plugin_out->idx));
2178 0 : }
2179 0 : ctx->votes_plugin_out->mem = topo->workspaces[ topo->objs[ votes_plugin_out->dcache_obj_id ].wksp_id ].wksp;
2180 0 : ctx->votes_plugin_out->chunk0 = fd_dcache_compact_chunk0( ctx->votes_plugin_out->mem, votes_plugin_out->dcache );
2181 0 : ctx->votes_plugin_out->wmark = fd_dcache_compact_wmark ( ctx->votes_plugin_out->mem, votes_plugin_out->dcache, votes_plugin_out->mtu );
2182 0 : ctx->votes_plugin_out->chunk = ctx->votes_plugin_out->chunk0;
2183 0 : }
2184 :
2185 0 : if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) {
2186 0 : ctx->slots_replayed_file = fopen( tile->replay.slots_replayed, "w" );
2187 0 : FD_TEST( ctx->slots_replayed_file );
2188 0 : }
2189 :
2190 0 : FD_TEST( ctx->runtime_public!=NULL );
2191 :
2192 0 : ulong max_exec_slices = tile->replay.max_exec_slices ? tile->replay.max_exec_slices : 65536UL;
2193 0 : uchar * deque_mem = fd_spad_alloc_check( ctx->runtime_spad, fd_exec_slice_deque_align(), fd_exec_slice_deque_footprint( max_exec_slices ) );
2194 0 : ctx->exec_slice_deque = fd_exec_slice_deque_join( fd_exec_slice_deque_new( deque_mem, max_exec_slices ) );
2195 0 : if( FD_UNLIKELY( !ctx->exec_slice_deque ) ) {
2196 0 : FD_LOG_ERR(( "failed to join and create exec slice deque" ));
2197 0 : }
2198 :
2199 0 : ctx->enable_bank_hash_cmp = tile->replay.enable_bank_hash_cmp;
2200 :
2201 : /**********************************************************************/
2202 : /* metrics */
2203 : /**********************************************************************/
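       :   /* Histograms tracking time spent waiting on vs. holding the store
       :      lock for reads and publishes. */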
2204 0 : fd_histf_join( fd_histf_new( ctx->metrics.store_read_wait, FD_MHIST_SECONDS_MIN( REPLAY, STORE_READ_WAIT ),
2205 0 : FD_MHIST_SECONDS_MAX( REPLAY, STORE_READ_WAIT ) ) );
2206 0 : fd_histf_join( fd_histf_new( ctx->metrics.store_read_work, FD_MHIST_SECONDS_MIN( REPLAY, STORE_READ_WORK ),
2207 0 : FD_MHIST_SECONDS_MAX( REPLAY, STORE_READ_WORK ) ) );
2208 0 : fd_histf_join( fd_histf_new( ctx->metrics.store_publish_wait, FD_MHIST_SECONDS_MIN( REPLAY, STORE_PUBLISH_WAIT ),
2209 0 : FD_MHIST_SECONDS_MAX( REPLAY, STORE_PUBLISH_WAIT ) ) );
2210 0 : fd_histf_join( fd_histf_new( ctx->metrics.store_publish_work, FD_MHIST_SECONDS_MIN( REPLAY, STORE_PUBLISH_WORK ),
2211 0 : FD_MHIST_SECONDS_MAX( REPLAY, STORE_PUBLISH_WORK ) ) );
2212 :
2213 0 :   FD_LOG_NOTICE(( "Finished unprivileged init" ));
2214 0 : }
2215 :
2216 : static ulong
2217 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
2218 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
2219 : ulong out_cnt,
2220 0 : struct sock_filter * out ) {
2221 0 : populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
2222 0 : return sock_filter_policy_fd_replay_tile_instr_cnt;
2223 0 : }
2224 :
2225 : static ulong
2226 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
2227 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
2228 : ulong out_fds_cnt,
2229 0 : int * out_fds ) {
2230 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
2231 :
2232 0 : ulong out_cnt = 0UL;
2233 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
2234 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
2235 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
2236 0 : return out_cnt;
2237 0 : }
2238 :
2239 : static inline void
2240 0 : metrics_write( fd_replay_tile_ctx_t * ctx ) {
2241 0 : FD_MGAUGE_SET( REPLAY, LAST_VOTED_SLOT, ctx->metrics.last_voted_slot );
2242 0 : FD_MGAUGE_SET( REPLAY, SLOT, ctx->metrics.slot );
2243 0 : FD_MHIST_COPY( REPLAY, STORE_READ_WAIT, ctx->metrics.store_read_wait );
2244 0 : FD_MHIST_COPY( REPLAY, STORE_READ_WORK, ctx->metrics.store_read_work );
2245 0 : FD_MHIST_COPY( REPLAY, STORE_PUBLISH_WAIT, ctx->metrics.store_publish_wait );
2246 0 : FD_MHIST_COPY( REPLAY, STORE_PUBLISH_WORK, ctx->metrics.store_publish_work );
2247 0 : }
2248 :
2249 : /* TODO: This needs to get sized out correctly. */
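       : /* STEM_BURST bounds how many frags this tile may publish per stem
       :    iteration; it must cover the per-exec-tile txn dispatches plus any
       :    notification / plugin frags published from the same callback. */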
2250 0 : #define STEM_BURST (64UL)
2251 :
2252 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_replay_tile_ctx_t
2253 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_replay_tile_ctx_t)
2254 :
2255 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
2256 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
2257 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
2258 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
2259 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
2260 :
2261 : #include "../../disco/stem/fd_stem.c"
2262 :
2263 : fd_topo_run_tile_t fd_tile_replay = {
2264 : .name = "replay",
2265 : .loose_footprint = loose_footprint,
2266 : .populate_allowed_seccomp = populate_allowed_seccomp,
2267 : .populate_allowed_fds = populate_allowed_fds,
2268 : .scratch_align = scratch_align,
2269 : .scratch_footprint = scratch_footprint,
2270 : .privileged_init = privileged_init,
2271 : .unprivileged_init = unprivileged_init,
2272 : .run = stem_run,
2273 : };
|