LCOV - code coverage report
Current view: top level - discof/replay - fd_replay_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 1883 0.0 %
Date: 2025-03-20 12:08:36 Functions: 0 39 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE
       2             : #include "../../disco/tiles.h"
       3             : #include "generated/fd_replay_tile_seccomp.h"
       4             : 
       5             : #include "../geyser/fd_replay_notif.h"
       6             : #include "../restart/fd_restart.h"
       7             : #include "../store/fd_epoch_forks.h"
       8             : 
       9             : #include "../../disco/keyguard/fd_keyload.h"
      10             : #include "../../disco/topo/fd_pod_format.h"
      11             : #include "../../flamenco/runtime/fd_txncache.h"
      12             : #include "../../flamenco/runtime/context/fd_capture_ctx.h"
      13             : #include "../../flamenco/runtime/context/fd_exec_epoch_ctx.h"
      14             : #include "../../flamenco/runtime/context/fd_exec_slot_ctx.h"
      15             : #include "../../flamenco/runtime/program/fd_bpf_program_util.h"
      16             : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
      17             : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
      18             : #include "../../flamenco/runtime/fd_runtime_init.h"
      19             : #include "../../flamenco/snapshot/fd_snapshot.h"
      20             : #include "../../flamenco/stakes/fd_stakes.h"
      21             : #include "../../flamenco/runtime/fd_runtime.h"
      22             : #include "../../flamenco/rewards/fd_rewards.h"
      23             : #include "../../disco/metrics/fd_metrics.h"
      24             : #include "../../choreo/fd_choreo.h"
      25             : #include "../../funk/fd_funk_filemap.h"
      26             : #include "../../flamenco/snapshot/fd_snapshot_create.h"
      27             : #include "../../disco/plugin/fd_plugin.h"
      28             : #include "fd_replay.h"
      29             : 
      30             : #include <arpa/inet.h>
      31             : #include <errno.h>
      32             : #include <fcntl.h>
      33             : #include <linux/unistd.h>
      34             : #include <netdb.h>
      35             : #include <netinet/in.h>
      36             : #include <sys/random.h>
      37             : #include <sys/socket.h>
      38             : #include <sys/stat.h>
      39             : #include <sys/types.h>
      40             : #include <unistd.h>
      41             : 
      42             : /* An estimate of the max number of transactions in a block.  If there are more
      43             :    transactions, they must be split into multiple sets. */
      44             : #define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_MAX_PER_SLOT * FD_SHRED_MAX_SZ) / FD_TXN_MIN_SERIALIZED_SZ )
      45             : 
      46             : #define PLUGIN_PUBLISH_TIME_NS ((long)60e9)
      47             : 
      48           0 : #define STORE_IN_IDX   (0UL)
      49           0 : #define PACK_IN_IDX    (1UL)
      50           0 : #define BATCH_IN_IDX   (2UL)
      51           0 : #define SHRED_IN_IDX   (3UL)
      52             : 
      53           0 : #define STAKE_OUT_IDX  (0UL)
      54           0 : #define NOTIF_OUT_IDX  (1UL)
      55           0 : #define SENDER_OUT_IDX (2UL)
      56           0 : #define POH_OUT_IDX    (3UL)
      57             : 
      58           0 : #define VOTE_ACC_MAX   (2000000UL)
      59             : 
      60             : #define BANK_HASH_CMP_LG_MAX 16
      61             : 
      62             : struct fd_shred_replay_in_ctx {
      63             :   fd_wksp_t * mem;
      64             :   ulong       chunk0;
      65             :   ulong       wmark;
      66             : };
      67             : typedef struct fd_shred_replay_in_ctx fd_shred_replay_in_ctx_t;
      68             : 
      69             : struct fd_replay_out_ctx {
      70             :   ulong            idx; /* TODO refactor the bank_out to use this */
      71             : 
      72             :   fd_frag_meta_t * mcache;
      73             :   ulong *          sync;
      74             :   ulong            depth;
      75             :   ulong            seq;
      76             : 
      77             :   fd_wksp_t * mem;
      78             :   ulong       chunk0;
      79             :   ulong       wmark;
      80             :   ulong       chunk;
      81             : 
      82             : };
      83             : typedef struct fd_replay_out_ctx fd_replay_out_ctx_t;
      84             : 
      85             : struct fd_replay_tile_metrics {
      86             :   ulong slot;
      87             :   ulong last_voted_slot;
      88             : };
      89             : typedef struct fd_replay_tile_metrics fd_replay_tile_metrics_t;
      90             : #define FD_REPLAY_TILE_METRICS_FOOTPRINT ( sizeof( fd_replay_tile_metrics_t ) )
      91             : 
      92             : struct fd_slice_exec_ctx {
      93             :   ulong wmark;     /* offset to start executing from. Will be on a transaction or microblock boundary. */
      94             :   ulong sz;        /* total bytes occupied in the mbatch memory. Queried slices should be placed at this offset */
      95             :   ulong mblks_rem; /* microblocks remaining in the current batch iteration. If 0, the next batch can be read. */
      96             :   ulong txns_rem;  /* txns remaining in current microblock iteration. If 0, the next microblock can be read. */
      97             : 
      98             :   ulong last_mblk_off; /* offset to the last microblock hdr seen */
      99             :   int   last_batch;    /* signifies last batch execution for stopping condition */
     100             : };
     101             : typedef struct fd_slice_exec_ctx fd_slice_exec_ctx_t;
     102             : 
     103             : struct fd_replay_tile_ctx {
     104             :   fd_wksp_t * wksp;
     105             :   fd_wksp_t * blockstore_wksp;
     106             :   fd_wksp_t * funk_wksp;
     107             :   fd_wksp_t * status_cache_wksp;
     108             : 
     109             :   fd_wksp_t  * replay_public_wksp;
     110             :   fd_runtime_public_t * replay_public;
     111             : 
     112             :   // Store tile input
     113             :   fd_wksp_t * store_in_mem;
     114             :   ulong       store_in_chunk0;
     115             :   ulong       store_in_wmark;
     116             : 
     117             :   // Pack tile input
     118             :   fd_wksp_t * pack_in_mem;
     119             :   ulong       pack_in_chunk0;
     120             :   ulong       pack_in_wmark;
     121             : 
     122             :   // Batch tile input for epoch account hash
     123             :   fd_wksp_t * batch_in_mem;
     124             :   ulong       batch_in_chunk0;
     125             :   ulong       batch_in_wmark;
     126             : 
     127             :   // Shred tile input
     128             :   ulong             shred_in_cnt;
     129             :   fd_shred_replay_in_ctx_t shred_in[ 32 ];
     130             : 
     131             :   // Notification output defs
     132             :   fd_frag_meta_t * notif_out_mcache;
     133             :   ulong *          notif_out_sync;
     134             :   ulong            notif_out_depth;
     135             :   ulong            notif_out_seq;
     136             : 
     137             :   fd_wksp_t * notif_out_mem;
     138             :   ulong       notif_out_chunk0;
     139             :   ulong       notif_out_wmark;
     140             :   ulong       notif_out_chunk;
     141             : 
     142             :   // Sender output defs
     143             :   fd_frag_meta_t * sender_out_mcache;
     144             :   ulong *          sender_out_sync;
     145             :   ulong            sender_out_depth;
     146             :   ulong            sender_out_seq;
     147             : 
     148             :   fd_wksp_t * sender_out_mem;
     149             :   ulong       sender_out_chunk0;
     150             :   ulong       sender_out_wmark;
     151             :   ulong       sender_out_chunk;
     152             : 
     153             :   // Stake weights output link defs
     154             :   fd_frag_meta_t * stake_weights_out_mcache;
     155             :   ulong *          stake_weights_out_sync;
     156             :   ulong            stake_weights_out_depth;
     157             :   ulong            stake_weights_out_seq;
     158             : 
     159             :   fd_wksp_t * stake_weights_out_mem;
     160             :   ulong       stake_weights_out_chunk0;
     161             :   ulong       stake_weights_out_wmark;
     162             :   ulong       stake_weights_out_chunk;
     163             : 
     164             :   // Inputs to plugin/gui
     165             :   ulong       replay_plug_out_idx;
     166             :   fd_wksp_t * replay_plugin_out_mem;
     167             :   ulong       replay_plugin_out_chunk0;
     168             :   ulong       replay_plugin_out_wmark;
     169             :   ulong       replay_plugin_out_chunk;
     170             : 
     171             :   ulong       votes_plug_out_idx;
     172             :   fd_wksp_t * votes_plugin_out_mem;
     173             :   ulong       votes_plugin_out_chunk0;
     174             :   ulong       votes_plugin_out_wmark;
     175             :   ulong       votes_plugin_out_chunk;
     176             :   long        last_plugin_push_time;
     177             : 
     178             :   char const * blockstore_checkpt;
     179             :   int          tx_metadata_storage;
     180             :   char const * funk_checkpt;
     181             :   char const * genesis;
     182             :   char const * incremental;
     183             :   char const * snapshot;
     184             : 
     185             :   /* Do not modify order! This is join-order in unprivileged_init. */
     186             : 
     187             :   fd_alloc_t *          alloc;
     188             :   fd_valloc_t           valloc;
     189             :   fd_funk_t *           funk;
     190             :   fd_acc_mgr_t *        acc_mgr;
     191             :   fd_exec_epoch_ctx_t * epoch_ctx;
     192             :   fd_epoch_t *          epoch;
     193             :   fd_forks_t *          forks;
     194             :   fd_ghost_t *          ghost;
     195             :   fd_tower_t *          tower;
     196             :   fd_replay_t *         replay;
     197             : 
     198             :   fd_pubkey_t validator_identity[1];
     199             :   fd_pubkey_t vote_authority[1];
     200             :   fd_pubkey_t vote_acc[1];
     201             : 
     202             :   /* Vote accounts in the current epoch. Lifetimes of the vote account
     203             :      addresses (pubkeys) are valid for the epoch (the pubkey memory is
     204             :      owned by the epoch bank). */
     205             : 
     206             :   fd_voter_t *          epoch_voters; /* map chain of slot->voter */
     207             :   fd_bank_hash_cmp_t *  bank_hash_cmp;
     208             : 
     209             :   /* Microblock (entry) batch buffer for replay. */
     210             : 
     211             :   uchar * mbatch;
     212             :   fd_slice_exec_ctx_t slice_exec_ctx;
     213             : 
     214             :   /* Tpool */
     215             : 
     216             :   uchar        tpool_mem[FD_TPOOL_FOOTPRINT( FD_TILE_MAX )] __attribute__( ( aligned( FD_TPOOL_ALIGN ) ) );
     217             :   fd_tpool_t * tpool;
     218             : 
     219             :   /* Depends on store_int and is polled in after_credit */
     220             : 
     221             :   fd_blockstore_t   blockstore_ljoin;
     222             :   int               blockstore_fd; /* file descriptor for archival file */
     223             :   fd_blockstore_t * blockstore;
     224             : 
     225             :   /* Updated during execution */
     226             : 
     227             :   fd_exec_slot_ctx_t *  slot_ctx;
     228             : 
     229             :   /* Metadata updated during execution */
     230             : 
     231             :   ulong     curr_slot;
     232             :   ulong     parent_slot;
     233             :   ulong     snapshot_slot;
     234             :   ulong     last_completed_slot; /* questionable variable used for making sure we do post-block execution steps only once,
     235             :                                     probably can remove this after we rip out ctx->curr_slot (received from STORE) */
     236             :   fd_hash_t blockhash;
     237             :   ulong     flags;
     238             :   ulong     txn_cnt;
     239             :   ulong     bank_idx;
     240             : 
     241             :   ulong     fecs_inserted;
     242             :   ulong     fecs_removed;
     243             :   /* Other metadata */
     244             : 
     245             :   ulong funk_seed;
     246             :   ulong status_cache_seed;
     247             :   fd_capture_ctx_t * capture_ctx;
     248             :   FILE *             capture_file;
     249             :   FILE *             slots_replayed_file;
     250             : 
     251             :   int skip_frag;
     252             : 
     253             :   ulong * first_turbine;
     254             : 
     255             :   ulong * bank_busy[ FD_PACK_MAX_BANK_TILES ];
     256             :   ulong   bank_cnt;
     257             :   fd_replay_out_ctx_t bank_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending to PoH finished txns + a couple more tasks ??? */
     258             : 
     259             :   ulong   exec_cnt;
     260             :   ulong   exec_out_idx;
     261             :   fd_replay_out_ctx_t exec_out[ FD_PACK_MAX_BANK_TILES ]; /* Sending to exec unexecuted txns */
     262             : 
     263             :   ulong root; /* the root slot is the most recent slot to have reached
     264             :                  max lockout in the tower  */
     265             : 
     266             :   ulong * published_wmark; /* publish watermark. The watermark is defined as the
     267             :                   minimum of the tower root (root above) and blockstore
     268             :                   smr (blockstore->smr). The watermark is used to
     269             :                   publish our fork-aware structures eg. blockstore,
     270             :                   forks, ghost. In general, publishing has the effect of
     271             :                   pruning minority forks in those structures,
     272             :                   indicating that it is ok to release the memory being
     273             :                   occupied by said forks.
     274             : 
     275             :                   The reason it has to be the minimum of the two, is the
     276             :                   tower root can lag the SMR and vice versa, but both
     277             :                   the fork-aware structures need to maintain information
     278             :                   through both of those slots. */
     279             : 
     280             :   ulong * poh;  /* proof-of-history slot */
     281             :   uint poh_init_done;
     282             :   int  snapshot_init_done;
     283             : 
     284             :   int         tower_checkpt_fileno;
     285             : 
     286             :   int         vote;
     287             :   fd_pubkey_t validator_identity_pubkey[ 1 ];
     288             :   fd_pubkey_t vote_acct_addr[ 1 ];
     289             : 
     290             :   fd_txncache_t * status_cache;
     291             :   void * bmtree[ FD_PACK_MAX_BANK_TILES ];
     292             : 
     293             :   fd_epoch_forks_t epoch_forks[1];
     294             : 
     295             :   /* The spad allocators used by the executor tiles are NOT the same as the
     296             :      spad used for general, longer-lasting spad allocations. The lifetime of
     297             :      the exec spad is just through an execution. The runtime spad is scoped
     298             :      to the runtime. The top-most frame will persist for the entire duration
     299             :      of the process. There will also be a potential second frame that persists
     300             :      across multiple slots that is created for rewards distribution. Every other
     301             :      spad frame should NOT exist beyond the scope of a block. */
     302             :   fd_spad_t * exec_spads[ 128UL ];
     303             :   ulong       exec_spad_cnt;
     304             :   fd_spad_t * runtime_spad;
     305             : 
     306             :   /* TODO: refactor this all into fd_replay_tile_snapshot_ctx_t. */
     307             :   ulong   snapshot_interval;        /* User defined parameter */
     308             :   ulong   incremental_interval;     /* User defined parameter */
     309             :   ulong   last_full_snap;           /* If a full snapshot has been produced */
     310             :   ulong * is_constipated;           /* Shared fseq to determine if funk should be constipated */
     311             :   ulong   prev_full_snapshot_dist;  /* Tracking for snapshot creation */
     312             :   ulong   prev_incr_snapshot_dist;  /* Tracking for incremental snapshot creation */
     313             :   ulong   double_constipation_slot; /* Tracking for double constipation if any */
     314             : 
     315             :   fd_funk_txn_t * false_root;
     316             :   fd_funk_txn_t * second_false_root;
     317             : 
     318             :   int     is_caught_up;
     319             : 
     320             :   /* Metrics */
     321             :   fd_replay_tile_metrics_t metrics;
     322             : };
     323             : typedef struct fd_replay_tile_ctx fd_replay_tile_ctx_t;
     324             : 
     325             : FD_FN_CONST static inline ulong
     326           0 : scratch_align( void ) {
     327           0 :   return 128UL;
     328           0 : }
     329             : 
     330             : FD_FN_PURE static inline ulong
     331           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     332           0 :   return 24UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
     333           0 : }
     334             : 
     335             : FD_FN_PURE static inline ulong
     336           0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     337             : 
     338             :   /* Do not modify order! This is join-order in unprivileged_init. */
     339             : 
     340           0 :   ulong l = FD_LAYOUT_INIT;
     341           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
     342           0 :   l = FD_LAYOUT_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
     343           0 :   l = FD_LAYOUT_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
     344           0 :   l = FD_LAYOUT_APPEND( l, fd_epoch_align(), fd_epoch_footprint( FD_VOTER_MAX ) );
     345           0 :   l = FD_LAYOUT_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
     346           0 :   l = FD_LAYOUT_APPEND( l, fd_ghost_align(), fd_ghost_footprint( FD_BLOCK_MAX ) );
     347           0 :   l = FD_LAYOUT_APPEND( l, fd_replay_align(), fd_replay_footprint( tile->replay.fec_max, FD_SHRED_MAX_PER_SLOT, FD_BLOCK_MAX ) );
     348           0 :   l = FD_LAYOUT_APPEND( l, fd_tower_align(), fd_tower_footprint() );
     349           0 :   l = FD_LAYOUT_APPEND( l, fd_bank_hash_cmp_align(), fd_bank_hash_cmp_footprint( ) );
     350           0 :   for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
     351           0 :     l = FD_LAYOUT_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
     352           0 :   }
     353           0 :   l = FD_LAYOUT_APPEND( l, 128UL, FD_SLICE_MAX );
     354           0 :   ulong  thread_spad_size  = fd_spad_footprint( FD_RUNTIME_TRANSACTION_EXECUTION_FOOTPRINT_DEFAULT );
     355           0 :   l = FD_LAYOUT_APPEND( l, fd_spad_align(), tile->replay.tpool_thread_count * fd_ulong_align_up( thread_spad_size, fd_spad_align() ) );
     356           0 :   l = FD_LAYOUT_APPEND( l, fd_spad_align(), FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT ); /* FIXME: make this configurable */
     357           0 :   l = FD_LAYOUT_FINI  ( l, scratch_align() );
     358           0 :   return l;
     359           0 : }
     360             : 
     361             : static void
     362             : hash_transactions( void *       mem,
     363             :                    fd_txn_p_t * txns,
     364             :                    ulong        txn_cnt,
     365           0 :                    uchar *      mixin ) {
     366           0 :   fd_bmtree_commit_t * bmtree = fd_bmtree_commit_init( mem, 32UL, 1UL, 0UL );
     367           0 :   for( ulong i=0; i<txn_cnt; i++ ) {
     368           0 :     fd_txn_p_t * _txn = txns + i;
     369           0 :     if( FD_UNLIKELY( !(_txn->flags & FD_TXN_P_FLAGS_EXECUTE_SUCCESS) ) ) continue;
     370             : 
     371           0 :     fd_txn_t * txn = TXN(_txn);
     372           0 :     for( ulong j=0; j<txn->signature_cnt; j++ ) {
     373           0 :       fd_bmtree_node_t node[1];
     374           0 :       fd_bmtree_hash_leaf( node, _txn->payload+txn->signature_off+64UL*j, 64UL, 1UL );
     375           0 :       fd_bmtree_commit_append( bmtree, node, 1UL );
     376           0 :     }
     377           0 :   }
     378           0 :   uchar * root = fd_bmtree_commit_fini( bmtree );
     379           0 :   fd_memcpy( mixin, root, 32UL );
     380           0 : }
     381             : 
     382             : void
     383             : publish_stake_weights( fd_replay_tile_ctx_t * ctx,
     384             :                        fd_stem_context_t *    stem,
     385           0 :                        fd_exec_slot_ctx_t *   slot_ctx ) {
     386           0 :   fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( slot_ctx->epoch_ctx );
     387           0 :   if( slot_ctx->slot_bank.epoch_stakes.vote_accounts_root!=NULL ) {
     388           0 :     ulong *             stake_weights_msg = fd_chunk_to_laddr( ctx->stake_weights_out_mem,
     389           0 :                                                                ctx->stake_weights_out_chunk );
     390           0 :     fd_stake_weight_t * stake_weights     = (fd_stake_weight_t *)&stake_weights_msg[5];
     391           0 :     ulong               stake_weight_idx  = fd_stake_weights_by_node( &ctx->slot_ctx->slot_bank.epoch_stakes,
     392           0 :                                                                       stake_weights,
     393           0 :                                                                       ctx->runtime_spad );
     394             : 
     395           0 :     stake_weights_msg[0] = fd_slot_to_leader_schedule_epoch( &epoch_bank->epoch_schedule, slot_ctx->slot_bank.slot ) - 1; /* epoch */
     396           0 :     stake_weights_msg[1] = stake_weight_idx; /* staked_cnt */
     397           0 :     stake_weights_msg[2] = fd_epoch_slot0( &epoch_bank->epoch_schedule, stake_weights_msg[0] ); /* start_slot */
     398           0 :     stake_weights_msg[3] = epoch_bank->epoch_schedule.slots_per_epoch; /* slot_cnt */
     399           0 :     stake_weights_msg[4] = 0UL; /* excluded stake */
     400           0 :     FD_LOG_NOTICE(("sending current epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
     401           0 :     ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     402             : 
     403           0 :     ulong stake_weights_sz  = 5*sizeof(ulong) + (stake_weight_idx * sizeof(fd_stake_weight_t));
     404           0 :     ulong stake_weights_sig = 4UL;
     405           0 :     fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_weights_out_chunk, stake_weights_sz, 0UL, 0UL, tspub );
     406           0 :     ctx->stake_weights_out_chunk = fd_dcache_compact_next( ctx->stake_weights_out_chunk, stake_weights_sz, ctx->stake_weights_out_chunk0, ctx->stake_weights_out_wmark );
     407           0 :   }
     408             : 
     409           0 :   if( epoch_bank->next_epoch_stakes.vote_accounts_root!=NULL ) {
     410           0 :     ulong * stake_weights_msg         = fd_chunk_to_laddr( ctx->stake_weights_out_mem, ctx->stake_weights_out_chunk );
     411           0 :     fd_stake_weight_t * stake_weights = (fd_stake_weight_t *)&stake_weights_msg[5];
     412           0 :     ulong stake_weight_idx            = fd_stake_weights_by_node( &epoch_bank->next_epoch_stakes, stake_weights, ctx->runtime_spad );
     413             : 
     414           0 :     stake_weights_msg[0] = fd_slot_to_leader_schedule_epoch( &epoch_bank->epoch_schedule,
     415           0 :                                                              slot_ctx->slot_bank.slot ); /* epoch */
     416           0 :     stake_weights_msg[1] = stake_weight_idx; /* staked_cnt */
     417           0 :     stake_weights_msg[2] = fd_epoch_slot0( &epoch_bank->epoch_schedule, stake_weights_msg[0] ); /* start_slot */
     418           0 :     stake_weights_msg[3] = epoch_bank->epoch_schedule.slots_per_epoch; /* slot_cnt */
     419           0 :     stake_weights_msg[4] = 0UL; /* excluded stake */
     420           0 :     FD_LOG_NOTICE(("sending next epoch stake weights - epoch: %lu, stake_weight_cnt: %lu, start_slot: %lu, slot_cnt: %lu", stake_weights_msg[0], stake_weights_msg[1], stake_weights_msg[2], stake_weights_msg[3]));
     421           0 :     ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     422             : 
     423           0 :     ulong stake_weights_sz = 5*sizeof(ulong) + (stake_weight_idx * sizeof(fd_stake_weight_t));
     424           0 :     ulong stake_weights_sig = 4UL;
     425           0 :     fd_stem_publish( stem, 0UL, stake_weights_sig, ctx->stake_weights_out_chunk, stake_weights_sz, 0UL, 0UL, tspub );
     426           0 :     ctx->stake_weights_out_chunk = fd_dcache_compact_next( ctx->stake_weights_out_chunk, stake_weights_sz, ctx->stake_weights_out_chunk0, ctx->stake_weights_out_wmark );
     427           0 :   }
     428           0 : }
     429             : 
     430             : /* Polls the blockstore block info object for newly completed slices of
     431             :    slot. Adds it to the tail of slice_deque (which should be the
     432             :    slice_deque object of the slot, slice_map[slot]) */
     433             : 
     434             : int
     435             : slice_poll( fd_replay_tile_ctx_t * ctx,
     436             :             fd_replay_slice_t    * slice_deque,
     437           0 :             ulong slot ) {
     438           0 :   uint consumed_idx, slices_added;
     439           0 :   for(;;) { /* speculative query */
     440           0 :     fd_block_map_query_t query[1] = { 0 };
     441           0 :     int err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, query, 0 );
     442           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     443             : 
     444           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY   ) ) return 0;
     445           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     446             : 
     447           0 :     consumed_idx = block_info->consumed_idx;
     448           0 :     slices_added = 0;
     449             : 
     450           0 :     if( FD_UNLIKELY( block_info->buffered_idx == UINT_MAX ) ) return 1;
     451             : 
     452           0 :     for( uint idx = consumed_idx + 1; idx <= block_info->buffered_idx; idx++ ) {
     453           0 :       if( FD_UNLIKELY( fd_block_set_test( block_info->data_complete_idxs, idx ) ) ) {
     454           0 :         slices_added++;
     455           0 :         fd_replay_slice_deque_push_tail( slice_deque->deque, ((ulong)(consumed_idx + 1) << 32) | ((ulong)idx) );
     456           0 :         FD_LOG_INFO(( "adding slice replay: slot %lu, slice start: %u, slice end: %u", slot, consumed_idx + 1, idx ));
     457           0 :         consumed_idx = idx;
     458           0 :       }
     459           0 :     }
     460           0 :     if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
     461             :     /* need to dequeue and try again speculatively */
     462           0 :     for( uint i = 0; i < slices_added; i++ ) {
     463           0 :       fd_replay_slice_deque_pop_tail( slice_deque->deque );
     464           0 :     }
     465           0 :   }
     466             : 
     467           0 :   if( slices_added ){
     468           0 :     fd_block_map_query_t query[1] = { 0 };
     469           0 :     fd_block_map_prepare( ctx->blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
     470           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
     471           0 :     block_info->consumed_idx = consumed_idx;
     472           0 :     fd_block_map_publish( query );
     473           0 :     return 1;
     474           0 :   }
     475           0 :   return 0;
     476           0 : }
     477             : 
     478             : static int
     479             : before_frag( fd_replay_tile_ctx_t * ctx,
     480             :              ulong                  in_idx,
     481             :              ulong                  seq,
     482           0 :              ulong                  sig ) {
     483           0 :   (void)ctx;
     484           0 :   (void)seq;
     485             : 
     486           0 :   if( in_idx == SHRED_IN_IDX ) {
     487             :     //FD_LOG_NOTICE(( "in_idx: %lu, seq: %lu, sig: %lu", in_idx, seq, sig ));
     488             : 
     489           0 :     ulong slot        = fd_disco_shred_replay_sig_slot       ( sig );
     490           0 :     uint  shred_idx   = fd_disco_shred_replay_sig_shred_idx  ( sig );
     491           0 :     uint  fec_set_idx = fd_disco_shred_replay_sig_fec_set_idx( sig );
     492           0 :     int   is_code     = fd_disco_shred_replay_sig_is_code    ( sig );
     493           0 :     int   completes   = fd_disco_shred_replay_sig_completes  ( sig );
     494             : 
     495           0 :     fd_replay_fec_t * fec = fd_replay_fec_query( ctx->replay, slot, fec_set_idx );
     496           0 :     if( FD_UNLIKELY( !fec ) ) { /* first time receiving a shred for this FEC set */
     497           0 :       fec = fd_replay_fec_insert( ctx->replay, slot, fec_set_idx );
     498           0 :       ctx->fecs_inserted++;
     499             :       /* TODO implement eviction */
     500           0 :     }
     501             : 
     502             :     /* If the FEC set is complete we don't need to track it anymore. */
     503             : 
     504           0 :     if( FD_UNLIKELY( completes ) ) {
     505           0 :       fd_replay_slice_t * slice_deque = fd_replay_slice_map_query( ctx->replay->slice_map, slot, NULL );
     506             : 
     507           0 :       if( FD_UNLIKELY( !slice_deque ) ) slice_deque = fd_replay_slice_map_insert( ctx->replay->slice_map, slot ); /* create new map entry for this slot */
     508             : 
     509           0 :       FD_LOG_INFO(( "removing FEC set %u from slot %lu", fec_set_idx, slot ));
     510           0 :       fd_replay_fec_remove( ctx->replay, slot, fec_set_idx );
     511           0 :       ctx->fecs_removed++;
     512           0 :       slice_poll( ctx, slice_deque, slot );
     513           0 :       return 1; /* skip frag */
     514           0 :     }
     515             : 
     516             :     /* If it is a coding shred, check if it is the first coding shred
     517             :        we're receiving. We know it's the first if data_cnt is 0 because
     518             :        that is not a valid cnt and means it's uninitialized. */
     519             : 
     520           0 :     if( FD_LIKELY( is_code ) ) { /* optimize for |code| >= |data| */
     521           0 :       return fec->data_cnt != 0; /* process frag (shred hdr) if it's the first coding shred */
     522           0 :     } else {
     523           0 :       uint i = shred_idx - fec_set_idx;
     524           0 :       fd_replay_fec_idxs_insert( fec->idxs, i ); /* mark ith data shred as received */
     525           0 :       return 1; /* skip frag */
     526           0 :     }
     527           0 :   }
     528             : 
     529           0 :   return 0; /* non-shred in - don't skip */
     530           0 : }
     531             : 
     532             : static void
     533             : during_frag( fd_replay_tile_ctx_t * ctx,
     534             :              ulong                  in_idx,
     535             :              ulong                  seq FD_PARAM_UNUSED,
     536             :              ulong                  sig,
     537             :              ulong                  chunk,
     538             :              ulong                  sz,
     539           0 :              ulong                  ctl FD_PARAM_UNUSED ) {
     540             : 
                         /* Per-fragment handler.  Clears ctx->skip_frag, then dispatches on
                            in_idx (branches are tested in this order):

                            - STORE_IN_IDX: range-checks chunk/sz, takes curr_slot from sig,
                              reads parent slot and blockhash from the payload and copies sz
                              fd_txn_p_t entries into the current chunk of bank_out[0].
                            - PACK_IN_IDX: range-checks chunk/sz, takes curr_slot from sig.
                              For POH_PKT_TYPE_MICROBLOCK it copies the txns into the bank_out
                              selected by sig and stores the trailer's bank field in
                              ctx->parent_slot.  Any other packet type sets skip_frag and
                              returns.
                            - BATCH_IN_IDX: copies sizeof(fd_hash_t) bytes from the frag into
                              slot_bank.epoch_account_hash.
                            - in_idx>=SHRED_IN_IDX: range-checks chunk/sz, sets skip_frag and
                              returns.

                            Every path that does not return early then looks up ctx->curr_slot
                            in the blockstore block map and sets skip_frag if the block is
                            already flagged processed or dead.

                            seq and ctl are unused. */

     541           0 :   ctx->skip_frag = 0;
     542             : 
     543           0 :   if( in_idx == STORE_IN_IDX ) {
     544           0 :     if( FD_UNLIKELY( chunk<ctx->store_in_chunk0 || chunk>ctx->store_in_wmark || sz>MAX_TXNS_PER_REPLAY ) ) {
     545           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->store_in_chunk0, ctx->store_in_wmark ));
     546           0 :     }
     547           0 :     uchar * src = (uchar *)fd_chunk_to_laddr( ctx->store_in_mem, chunk );
     548             :     /* Incoming packet from store tile. Format:
     549             :        Parent slot (ulong - 8 bytes)
     550             :        Updated block hash/PoH hash (fd_hash_t - 32 bytes)
     551             :        Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t)) */
     552             : 
     553           0 :     ctx->curr_slot = fd_disco_replay_old_sig_slot( sig );
     554             :     /* slot changes */
                           /* A slot below the published watermark is only logged; processing
                              of the frag continues. */
     555           0 :     if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
     556           0 :       FD_LOG_WARNING(( "store sent slot %lu before our root.", ctx->curr_slot ));
     557           0 :     }
     558           0 :     ctx->flags = 0; //fd_disco_replay_old_sig_flags( sig );
                           /* On this path sz is a transaction count, not a byte count. */
     559           0 :     ctx->txn_cnt = sz;
     560             : 
     561           0 :     ctx->parent_slot = FD_LOAD( ulong, src );
     562           0 :     src += sizeof(ulong);
     563           0 :     memcpy( ctx->blockhash.uc, src, sizeof(fd_hash_t) );
     564           0 :     src += sizeof(fd_hash_t);
     565           0 :     ctx->bank_idx = 0UL;
     566           0 :     fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ ctx->bank_idx ];
     567           0 :     uchar * dst_poh = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
                           /* NOTE(review): the copy length is bounded only by the
                              sz<=MAX_TXNS_PER_REPLAY check above; no check against the
                              destination chunk's capacity is visible here -- confirm. */
     568           0 :     fd_memcpy( dst_poh, src, sz * sizeof(fd_txn_p_t) );
     569             : 
     570           0 :     FD_LOG_INFO(( "other microblock - slot: %lu, parent_slot: %lu, txn_cnt: %lu", ctx->curr_slot, ctx->parent_slot, sz ));
     571           0 :   } else if( in_idx == PACK_IN_IDX ) {
     572           0 :     if( FD_UNLIKELY( chunk<ctx->pack_in_chunk0 || chunk>ctx->pack_in_wmark || sz>USHORT_MAX ) ) {
     573           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->pack_in_chunk0, ctx->pack_in_wmark ));
     574           0 :     }
     575           0 :     uchar * src = (uchar *)fd_chunk_to_laddr( ctx->pack_in_mem, chunk );
     576             :     /* Incoming packet from pack tile. Format:
     577             :        Microblock as a list of fd_txn_p_t (sz * sizeof(fd_txn_p_t))
     578             :        Microblock bank trailer
     579             :     */
     580           0 :     ctx->curr_slot = fd_disco_poh_sig_slot( sig );
     581           0 :     if( FD_UNLIKELY( ctx->curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
     582           0 :       FD_LOG_WARNING(( "pack sent slot %lu before our watermark %lu.", ctx->curr_slot, fd_fseq_query( ctx->published_wmark ) ));
     583           0 :     }
     584           0 :     if( fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK ) {
     585           0 :       ulong bank_idx = fd_disco_poh_sig_bank_tile( sig );
     586           0 :       fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
     587           0 :       uchar * dst_poh = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
     588           0 :       ctx->flags = REPLAY_FLAG_PACKED_MICROBLOCK;
                             /* On this path sz is a byte count: txns followed by the trailer.
                                NOTE(review): sz is unsigned and only checked against
                                USHORT_MAX above.  If sz is smaller than the trailer size the
                                subtraction below wraps and the fd_memcpy length becomes
                                huge -- confirm a lower bound is enforced upstream or add
                                one here. */
     589           0 :       ctx->txn_cnt = (sz - sizeof(fd_microblock_bank_trailer_t)) / sizeof(fd_txn_p_t);
     590           0 :       ctx->bank_idx = bank_idx;
     591           0 :       fd_memcpy( dst_poh, src, (sz - sizeof(fd_microblock_bank_trailer_t)) );
     592           0 :       src += (sz-sizeof(fd_microblock_bank_trailer_t));
                             /* NOTE(review): dst_poh is a local and is not read again after
                                this increment, so the store below looks dead. */
     593           0 :       dst_poh += (sz - sizeof(fd_microblock_bank_trailer_t));
                             /* src now points at the trailer that follows the txn list. */
     594           0 :       fd_microblock_bank_trailer_t * t = (fd_microblock_bank_trailer_t *)src;
     595           0 :       ctx->parent_slot = (ulong)t->bank;
     596           0 :     } else {
     597           0 :       FD_LOG_WARNING(("OTHER PACKET TYPE: %lu", fd_disco_poh_sig_pkt_type( sig )));
     598           0 :       ctx->skip_frag = 1;
     599           0 :       return;
     600           0 :     }
     601             : 
     602           0 :     FD_LOG_DEBUG(( "packed microblock - slot: %lu, parent_slot: %lu, txn_cnt: %lu", ctx->curr_slot, ctx->parent_slot, ctx->txn_cnt ));
     603           0 :   } else if( in_idx==BATCH_IN_IDX ) {
                           /* NOTE(review): unlike the other branches there is no chunk range
                              check here, and this branch does not set ctx->curr_slot, so
                              the block map lookup at the end of the function runs against
                              whatever slot a previous frag left there -- confirm that is
                              intended. */
     604           0 :     uchar * src = (uchar *)fd_chunk_to_laddr( ctx->batch_in_mem, chunk );
     605           0 :     fd_memcpy( ctx->slot_ctx->slot_bank.epoch_account_hash.uc, src, sizeof(fd_hash_t) );
     606           0 :     FD_LOG_NOTICE(( "Epoch account hash calculated to be %s", FD_BASE58_ENC_32_ALLOCA( ctx->slot_ctx->slot_bank.epoch_account_hash.uc ) ));
     607           0 :   } else if ( in_idx >= SHRED_IN_IDX ) {
     608             : 
                           /* Shred inputs are indexed relative to SHRED_IN_IDX.  The payload
                              is validated but not copied; the frag is always skipped. */
     609           0 :     fd_shred_replay_in_ctx_t * shred_in = &ctx->shred_in[ in_idx-SHRED_IN_IDX ];
     610           0 :     if( FD_UNLIKELY( chunk<shred_in->chunk0 || chunk>shred_in->wmark || sz > sizeof(fd_shred34_t) ) ) {
     611           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, shred_in->chunk0 , shred_in->wmark ));
     612           0 :     }
     613             :     // uchar * src = (uchar *)fd_chunk_to_laddr( shred_in->mem, chunk );
     614             :     // fd_memcpy( (uchar *)ctx->shred, src, sz ); /* copy the hdr to read the code_cnt & data_cnt */
     615             : 
     616           0 :     ctx->skip_frag = 1;
     617             : 
     618           0 :     return;
     619           0 :   }
     620             :   // if( ctx->flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
     621             :   //   /* We do not know the parent slot, pick one from fork selection */
     622             :   //   ulong max_slot = 0; /* FIXME: default to snapshot slot/smr */
     623             :   //   for( fd_fork_frontier_iter_t iter = fd_fork_frontier_iter_init( ctx->forks->frontier, ctx->forks->pool );
     624             :   //      !fd_fork_frontier_iter_done( iter, ctx->forks->frontier, ctx->forks->pool );
     625             :   //      iter = fd_fork_frontier_iter_next( iter, ctx->forks->frontier, ctx->forks->pool ) ) {
     626             :   //     fd_exec_slot_ctx_t * ele = &fd_fork_frontier_iter_ele( iter, ctx->forks->frontier, ctx->forks->pool )->slot_ctx;
     627             :   //     if ( max_slot < ele->slot_bank.slot ) {
     628             :   //       max_slot = ele->slot_bank.slot;
     629             :   //     }
     630             :   //   }
     631             :   //   ctx->parent_slot = max_slot;
     632             :   // }
     633             : 
                         /* Read the block flags for curr_slot from the block map.  The loop
                            retries while the query or the trailing query_test reports
                            FD_MAP_ERR_AGAIN.  On FD_MAP_ERR_KEY it stops with block_flags
                            left at 0, so neither check below fires.  block_info is fetched
                            before err is examined but only dereferenced once both error
                            checks have passed. */
     634           0 :   uchar block_flags = 0;
     635           0 :   int err = FD_MAP_ERR_AGAIN;
     636           0 :   while( err == FD_MAP_ERR_AGAIN ){
     637           0 :     fd_block_map_query_t quer[1] = { 0 };
     638           0 :     err = fd_block_map_query_try( ctx->blockstore->block_map, &ctx->curr_slot, NULL, quer, 0 );
     639           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( quer );
     640           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
     641           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY )) break;
     642           0 :     block_flags = block_info->flags;
     643           0 :     err = fd_block_map_query_test( quer );
     644           0 :   }
     645             : 
                         /* The two checks are independent; both warnings are logged if both
                            bits are set. */
     646           0 :   if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_PROCESSED ) ) ) {
     647           0 :     FD_LOG_WARNING(( "block already processed - slot: %lu", ctx->curr_slot ));
     648           0 :     ctx->skip_frag = 1;
     649           0 :   }
     650           0 :   if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_DEADBLOCK ) ) ) {
     651           0 :     FD_LOG_WARNING(( "block already dead - slot: %lu", ctx->curr_slot ));
     652           0 :     ctx->skip_frag = 1;
     653           0 :   }
     654           0 : }
     655             : 
     656             : static void
     657           0 : checkpt( fd_replay_tile_ctx_t * ctx ) {
                         /* Closes the slots-replayed file if one is open, then checkpoints
                            the blockstore workspace (only when blockstore_checkpt is a
                            non-empty string) and the funk workspace (unconditionally) to
                            their configured paths with mode 0666.  A non-zero return code
                            from either checkpoint is logged via FD_LOG_ERR.

                            NOTE(review): the fclose return value is ignored and the pointer
                            is not cleared afterwards.  ctx->funk_checkpt is not tested for
                            emptiness the way blockstore_checkpt is -- confirm it is always
                            set when this runs. */
     658           0 :   if( FD_UNLIKELY( ctx->slots_replayed_file ) ) fclose( ctx->slots_replayed_file );
                         /* strcmp is non-zero when the configured path differs from "". */
     659           0 :   if( FD_UNLIKELY( strcmp( ctx->blockstore_checkpt, "" ) ) ) {
     660           0 :     int rc = fd_wksp_checkpt( ctx->blockstore_wksp, ctx->blockstore_checkpt, 0666, 0, NULL );
     661           0 :     if( rc ) {
     662           0 :       FD_LOG_ERR( ( "blockstore checkpt failed: error %d", rc ) );
     663           0 :     }
     664           0 :   }
     665           0 :   int rc = fd_wksp_checkpt( ctx->funk_wksp, ctx->funk_checkpt, 0666, 0, NULL );
     666           0 :   if( rc ) {
     667           0 :     FD_LOG_ERR( ( "funk checkpt failed: error %d", rc ) );
     668           0 :   }
     669           0 : }
     670             : 
     671             : static void
     672           0 : funk_cancel( fd_replay_tile_ctx_t * ctx, ulong mismatch_slot ) {
                         /* Cancels the funk transaction whose xid has both words equal to
                            mismatch_slot.  The cancel runs inside a funk write section and
                            FD_TEST checks that fd_funk_txn_cancel returned non-zero.

                            NOTE(review): the result of fd_funk_txn_query is not checked for
                            NULL before being handed to fd_funk_txn_cancel, and the lookup
                            happens before fd_funk_start_write -- confirm both are safe. */
     673           0 :   fd_funk_txn_xid_t xid        = { .ul = { mismatch_slot, mismatch_slot } };
     674           0 :   fd_funk_txn_t * txn_map      = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
     675           0 :   fd_funk_txn_t * mismatch_txn = fd_funk_txn_query( &xid, txn_map );
     676           0 :   fd_funk_start_write( ctx->funk );
     677           0 :   FD_TEST( fd_funk_txn_cancel( ctx->funk, mismatch_txn, 1 ) );
     678           0 :   fd_funk_end_write( ctx->funk );
     679           0 : }
     680             : 
     681             : struct fd_status_check_ctx {
                         /* Bundles a slot history pointer, a txncache pointer and a slot
                            number.  NOTE(review): presumably the context handed to a
                            status-cache check callback; no use of this type is visible
                            near this definition -- confirm against the callers. */
     682             :   fd_slot_history_t * slot_history;
     683             :   fd_txncache_t * txncache;
     684             :   ulong current_slot;
     685             : };
     686             : typedef struct fd_status_check_ctx fd_status_check_ctx_t;
     687             : 
     688             : static void
     689             : txncache_publish( fd_replay_tile_ctx_t * ctx,
     690             :                   fd_funk_txn_t *        txn_map,
     691             :                   fd_funk_txn_t *        to_root_txn,
     692           0 :                   fd_funk_txn_t *        rooted_txn ) {
     693             : 
     694             : 
     695             :   /* For the status cache, we stop rooting until the status cache has been
     696             :      written out to the current snapshot. We also need to iterate up the
     697             :      funk transaction tree up until the current "root" to figure out what slots
     698             :      should be registered. This root can correspond to the latest false root if
     699             :      one exists.  */
                         /* Concretely: starting at to_root_txn, follow parent links until
                            rooted_txn is reached (rooted_txn itself is not registered).  For
                            each transaction visited, xid.ul[0] is treated as its slot and is
                            registered with the status cache, either as a root slot or, if
                            the cache reports it is constipated, as a constipated slot.

                            txn_map is only used to resolve parents.  The function does
                            nothing when ctx->slot_ctx->status_cache is NULL.

                            NOTE(review): termination relies on rooted_txn being either an
                            ancestor of to_root_txn or equal to whatever fd_funk_txn_parent
                            returns at the top of the chain (presumably NULL) -- confirm. */
     700             : 
     701             : 
     702           0 :   if( FD_UNLIKELY( !ctx->slot_ctx->status_cache ) ) {
     703           0 :     return;
     704           0 :   }
     705             : 
     706           0 :   fd_funk_txn_t * txn = to_root_txn;
     707           0 :   while( txn!=rooted_txn ) {
     708           0 :     ulong slot = txn->xid.ul[0];
                           /* The constipation state is re-queried on every iteration. */
     709           0 :     if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) {
     710           0 :       FD_LOG_INFO(( "Registering slot %lu", slot ));
     711           0 :       fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, slot );
     712           0 :     } else {
     713           0 :       FD_LOG_INFO(( "Registering constipated slot %lu", slot ));
     714           0 :       fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, slot );
     715           0 :     }
     716           0 :     txn = fd_funk_txn_parent( txn, txn_map );
     717           0 :   }
     718           0 : }
     719             : 
     720             : static void
     721           0 : snapshot_state_update( fd_replay_tile_ctx_t * ctx, ulong wmk ) {
     722             : 
     723             :   /* We are ready for a snapshot if either we are on or just passed a snapshot
     724             :      interval and no snapshot is currently in progress. This is to handle the
     725             :      case where the snapshot interval falls on a skipped slot.
     726             : 
     727             :      We are ready to create a snapshot if:
     728             :      1. The node is caught up to the network.
     729             :      2. There is currently no snapshot in progress
     730             :      3. The current slot is at the snapshot interval OR
     731             :         The current slot has passed a snapshot interval
     732             : 
     733             :     If a snapshot is ready to be created we will constipate funk and the
     734             :     status cache. This will also notify the status cache via the funk
     735             :     constipation fseq. */
                         /* wmk is the watermark slot being evaluated.  The function returns
                            without touching any state when snapshot_interval is ULONG_MAX,
                            when the node is not caught up, or when the is_constipated fseq
                            is non-zero. */
     736             : 
     737           0 :   if( ctx->snapshot_interval==ULONG_MAX ) {
     738           0 :     return;
     739           0 :   }
     740             : 
     741           0 :   uchar is_constipated = fd_fseq_query( ctx->is_constipated ) != 0UL;
     742             : 
     743           0 :   if( !ctx->is_caught_up ) {
     744           0 :     return;
     745           0 :   }
     746             : 
     747           0 :   if( is_constipated ) {
     748           0 :     return;
     749           0 :   }
     750             : 
     751           0 :   fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
     752             : 
     753             :   /* The distance from the last snapshot should only grow until we skip
     754             :      past the last full snapshot. If it has shrunk that means we skipped
     755             :      over the snapshot interval. */
                         /* NOTE(review): both modulo operations below assume
                            snapshot_interval and incremental_interval are non-zero --
                            confirm they are validated where they are configured. */
     756           0 :   ulong curr_full_snapshot_dist = wmk % ctx->snapshot_interval;
     757           0 :   uchar is_full_snapshot_ready  = curr_full_snapshot_dist < ctx->prev_full_snapshot_dist;
     758           0 :   ctx->prev_full_snapshot_dist  = curr_full_snapshot_dist;
     759             : 
     760             :   /* Do the same for incrementals, only try to create one if there has been
     761             :      a full snapshot. */
     762             : 
     763           0 :   ulong curr_incr_snapshot_dist = wmk % ctx->incremental_interval;
     764             : 
                         /* NOTE(review): the left operand recomputes the value already held
                            in curr_incr_snapshot_dist.  last_full_snap is used as a boolean
                            here, so a full snapshot taken at slot 0 would read as "none". */
     765           0 :   uchar is_inc_snapshot_ready   = wmk % ctx->incremental_interval < ctx->prev_incr_snapshot_dist && ctx->last_full_snap;
     766           0 :   ctx->prev_incr_snapshot_dist  = curr_incr_snapshot_dist;
     767             : 
     768           0 :   ulong updated_fseq = 0UL;
     769             : 
     770             :   /* TODO: We need a better check if the wmk fell on an epoch boundary due to
     771             :      skipped slots. We just don't want to make a snapshot on an epoch boundary */
                         /* NOTE(review): the prev_*_dist fields were already updated above,
                            so when the epoch boundary test suppresses a snapshot the
                            wrap-around is not seen again until the next interval.  Also,
                            wmk-1UL wraps if wmk is 0 -- confirm that cannot occur. */
     772           0 :   if( (is_full_snapshot_ready || is_inc_snapshot_ready) &&
     773           0 :       !fd_runtime_is_epoch_boundary( epoch_bank, wmk, wmk-1UL ) ) {
     774             :     /* Constipate the status cache when a snapshot is ready to be created. */
                           /* A full snapshot takes precedence when both kinds are ready.
                              The two branches differ only in the second argument given to
                              fd_batch_fseq_pack (0 for full, 1 for incremental). */
     775           0 :     if( is_full_snapshot_ready ) {
     776           0 :       ctx->last_full_snap = wmk;
     777           0 :       FD_LOG_NOTICE(( "Ready to create a full snapshot" ));
     778           0 :       updated_fseq = fd_batch_fseq_pack( 1, 0, wmk );
     779           0 :     } else {
     780           0 :       FD_LOG_NOTICE(( "Ready to create an incremental snapshot" ));
     781           0 :       updated_fseq = fd_batch_fseq_pack( 1, 1, wmk );
     782           0 :     }
     783           0 :     fd_txncache_set_is_constipated( ctx->slot_ctx->status_cache, 1 );
     784           0 :     fd_fseq_update( ctx->is_constipated, updated_fseq );
     785           0 :   }
     786           0 : }
     787             : 
     788             : static void
     789             : funk_publish( fd_replay_tile_ctx_t * ctx,
     790             :               fd_funk_txn_t *        to_root_txn,
     791             :               fd_funk_txn_t *        txn_map,
     792             :               ulong                  wmk,
     793           0 :               uchar                  is_constipated ) {
     794             : 
                         /* Publishes funk transactions for watermark slot wmk.  All work is
                            done between fd_funk_start_write and fd_funk_end_write.

                            to_root_txn     transaction for the slot being rooted
                            txn_map         used to resolve parent links
                            wmk             watermark slot (used in log messages and for the
                                            eah_start_slot comparison)
                            is_constipated  non-zero if the caller considers funk constipated

                            Three cases, tested in this order:
                            1. ctx->double_constipation_slot != 0: publish each txn from
                               to_root_txn up to (not including) second_false_root into its
                               parent.
                            2. is_constipated: if wmk>=eah_start_slot, record the slot that
                               will become the double-constipated root and publish nothing;
                               otherwise publish each txn up to (not including) false_root
                               into its parent.
                            3. otherwise: if wmk>=eah_start_slot, publish only up to the
                               first slot >= eah_start_slot and signal the batch tile through
                               the is_constipated fseq; else publish to_root_txn normally. */
     795           0 :   fd_funk_start_write( ctx->funk );
     796             : 
     797           0 :   fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
     798             : 
     799             :   /* Now try to publish into funk, this is handled differently based on if
     800             :      funk is constipated or if funk is double-constipated. Even if funk was
     801             :      double-constipated and now no-longer is we still want to preserve the
     802             :      root for the epoch account hash. */
     803           0 :   if( ctx->double_constipation_slot ) {
     804           0 :     FD_LOG_NOTICE(( "Double constipation publish for wmk=%lu", wmk ));
     805             : 
                           /* NOTE(review): txn is handed to fd_funk_txn_parent after it has
                              been published into its parent -- confirm the txn record is
                              still valid to read at that point. */
     806           0 :     fd_funk_txn_t * txn = to_root_txn;
     807           0 :     while( txn!=ctx->second_false_root ) {
     808           0 :       if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) {
     809           0 :         FD_LOG_ERR(( "Can't publish funk transaction" ));
     810           0 :       }
     811           0 :       txn = fd_funk_txn_parent( txn, txn_map );
     812           0 :     }
     813             : 
     814           0 :   } else if( is_constipated ) {
     815           0 :     FD_LOG_NOTICE(( "Publishing slot=%lu while constipated", wmk ));
     816             : 
     817             :     /* At this point, first collapse the current transaction that should be
     818             :        published into the oldest child transaction. */
     819             : 
     820           0 :     if( FD_UNLIKELY( wmk>=epoch_bank->eah_start_slot ) ) {
     821             :       /* We need to double-constipate at this point. */
     822             : 
     823             :       /* First, find the txn where the corresponding slot is the minimum
     824             :          pending transaction where >= eah_start_slot. */
     825             : 
     826           0 :       fd_funk_txn_t * txn        = to_root_txn;
     827           0 :       fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map );
     828             : 
                             /* Walk towards the root until txn is the first slot at or past
                                eah_start_slot whose parent is still before it. */
     829           0 :       while( parent_txn ) {
     830             : 
     831           0 :         int is_curr_gteq_eah_start = txn->xid.ul[0] >= epoch_bank->eah_start_slot;
     832           0 :         int is_prev_lt_eah_start   = parent_txn->xid.ul[0] < epoch_bank->eah_start_slot;
     833           0 :         if( is_curr_gteq_eah_start && is_prev_lt_eah_start ) {
     834           0 :           break;
     835           0 :         }
     836           0 :         txn        = parent_txn;
     837           0 :         parent_txn = fd_funk_txn_parent( txn, txn_map );
     838           0 :       }
     839             : 
     840             :       /* We should never get to this point because of the constipated root.
     841             :          The constipated root is guaranteed to have a slot that's < eah_start_slot. */
     842           0 :       if( FD_UNLIKELY( !parent_txn ) ) {
     843           0 :         FD_LOG_ERR(( "Not possible for the parent_txn to be the root" ));
     844           0 :       }
     845             : 
     846             :       /* This transaction will now become the double-constipated root. */
     847             : 
     848           0 :       FD_LOG_NOTICE(( "Entering a double constipated state eah_start=%lu eah_slot=%lu",
     849           0 :                       epoch_bank->eah_start_slot, txn->xid.ul[0] ));
     850             : 
                             /* Only the slot is recorded here; no funk publish call is made
                                on this path. */
     851           0 :       ctx->double_constipation_slot = txn->xid.ul[0];
     852             : 
     853             :       /* Other pending transactions will get published into the child during
     854             :          the next invocation of funk_publish. */
     855           0 :     } else {
     856             : 
     857           0 :       FD_LOG_NOTICE(( "Publishing into constipated root for wmk=%lu", wmk ));
     858             :       /* Standard constipated case where we aren't considering the eah. */
     859           0 :       fd_funk_txn_t * txn        = to_root_txn;
     860             : 
                             /* NOTE(review): as in the double-constipated loop, txn is read
                                by fd_funk_txn_parent after being published into its parent
                                -- confirm that is valid. */
     861           0 :       while( txn!=ctx->false_root ) {
     862           0 :         if( FD_UNLIKELY( fd_funk_txn_publish_into_parent( ctx->funk, txn, 0 ) ) ) {
     863           0 :           FD_LOG_ERR(( "Can't publish funk transaction" ));
     864           0 :         }
     865           0 :         txn = fd_funk_txn_parent( txn, txn_map );
     866           0 :       }
     867           0 :     }
     868           0 :   } else {
     869             : 
     870             :     /* This is the case where we are not in the constipated case. We only need
     871             :        to do special handling in the case where the epoch account hash is about
     872             :        to be calculated. */
     873             : 
     874           0 :     FD_LOG_NOTICE(( "Publishing slot=%lu", wmk ));
     875             : 
     876           0 :     if( FD_UNLIKELY( wmk>=epoch_bank->eah_start_slot ) ) {
     877             : 
     878           0 :       FD_LOG_NOTICE(( "EAH is ready to be calculated" ));
     879             : 
     880             :       /* This condition means that we want to start producing an epoch account
     881             :          hash at a slot that is in the set of transactions we are about to
     882             :          publish. We only want to publish all slots that are <= the slot that
     883             :          we will calculate the epoch account hash for. */
     884             : 
     885           0 :       fd_funk_txn_t * txn        = to_root_txn;
     886           0 :       fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map );
     887           0 :       while( parent_txn ) {
     888             :         /* We need to be careful here because the eah start slot may be skipped
     889             :            so the actual slot that we calculate the eah for may be greater than
     890             :            the eah start slot. The transaction must correspond to a slot greater
     891             :            than or equal to the eah start slot, but its parent transaction must
     892             :            either have been published already or must be less than the eah start
     893             :            slot. */
     894             : 
     895           0 :         int is_curr_gteq_eah_start = txn->xid.ul[0] >= epoch_bank->eah_start_slot;
     896           0 :         int is_prev_lt_eah_start   = parent_txn->xid.ul[0] < epoch_bank->eah_start_slot;
     897           0 :         if( is_curr_gteq_eah_start && is_prev_lt_eah_start ) {
     898           0 :           break;
     899           0 :         }
     900           0 :         txn        = parent_txn;
     901           0 :         parent_txn = fd_funk_txn_parent( txn, txn_map );
     902           0 :       }
                             /* Unlike the constipated branch, running out of parents is not
                                an error here: txn is then the oldest unpublished ancestor. */
     903             : 
     904             :       /* At this point, we know txn is the funk txn that we will want to
     905             :          calculate the eah for since it's the minimum slot that is >=
     906             :          eah_start_slot. */
     907             : 
     908           0 :       FD_LOG_NOTICE(( "The eah has an expected start slot of %lu and is being created for slot %lu", epoch_bank->eah_start_slot, txn->xid.ul[0] ));
     909             : 
     910           0 :       if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, txn, 1 ) ) ) {
     911           0 :         FD_LOG_ERR(( "failed to funk publish" ));
     912           0 :       }
     913             : 
     914             :       /* At this point, we have the root for which we want to calculate the
     915             :          epoch account hash for. The other children that are > eah_start_slot
     916             :          but <= wmk will be published into the constipated root during the next
     917             :          invocation of funk_and_txncache_publish.
     918             : 
     919             :          Notify the batch tile that an eah should be computed. */
     920             : 
                             /* NOTE(review): txn->xid is read after txn was passed to
                                fd_funk_txn_publish -- confirm the record is still readable.
                                eah_start_slot is reset to FD_SLOT_NULL so this branch is
                                not re-entered for the same EAH. */
     921           0 :       ulong updated_fseq = fd_batch_fseq_pack( 0UL, 0UL, txn->xid.ul[0] );
     922           0 :       fd_fseq_update( ctx->is_constipated, updated_fseq );
     923           0 :       epoch_bank->eah_start_slot = FD_SLOT_NULL;
     924             : 
     925           0 :     } else {
     926             :       /* This is the standard case. Publish all transactions up to and
     927             :          including the watermark. This will publish any in-prep ancestors
     928             :          of root_txn as well. */
     929             : 
     930           0 :       if( FD_UNLIKELY( !fd_funk_txn_publish( ctx->funk, to_root_txn, 1 ) ) ) {
     931           0 :         FD_LOG_ERR(( "failed to funk publish slot %lu", wmk ));
     932           0 :       }
     933           0 :     }
     934           0 :   }
     935             : 
     936           0 :   fd_funk_end_write( ctx->funk );
     937             : 
     938           0 : }
     939             : 
     940             : static fd_funk_txn_t*
     941             : get_rooted_txn( fd_replay_tile_ctx_t * ctx,
     942             :                 fd_funk_txn_t *        to_root_txn,
     943             :                 fd_funk_txn_t *        txn_map,
     944           0 :                 uchar                  is_constipated ) {
     945             : 
     946             :   /* We need to get the rooted transaction that we are publishing into. This
     947             :      needs to account for the three different cases: no constipation, single
     948             :      constipation, double constipation.
     949             : 
     950             :      Also, if it's the first time that we are setting the false root(s), then
     951             :      we must also register them into the status cache because we don't register
     952             :      the root in txncache_publish to avoid registering the same slot multiple times. */
                         /* Returns ctx->second_false_root when double_constipation_slot is
                            non-zero, ctx->false_root when is_constipated is non-zero, and
                            NULL otherwise.  Each false root is computed and cached in ctx
                            the first time it is needed.

                            NOTE(review): ctx->slot_ctx->status_cache is passed to the
                            fd_txncache_* calls without a NULL check in this function --
                            confirm callers guarantee it is set. */
     953             : 
     954           0 :   if( FD_UNLIKELY( ctx->double_constipation_slot ) ) {
     955             : 
     956           0 :     if( FD_UNLIKELY( !ctx->second_false_root ) ) {
     957             : 
     958             :       /* Set value of second false root, save it and publish to txncache. */
                             /* NOTE(review): this walk dereferences txn without checking
                                for NULL, so it relies on an ancestor with slot <=
                                double_constipation_slot existing -- confirm. */
     959           0 :       fd_funk_txn_t * txn = to_root_txn;
     960           0 :       while( txn->xid.ul[0]>ctx->double_constipation_slot ) {
     961           0 :         txn = fd_funk_txn_parent( txn, txn_map );
     962           0 :       }
     963             : 
     964           0 :       if( FD_LIKELY( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) ) {
     965           0 :         fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
     966           0 :       } else {
     967           0 :         fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
     968           0 :       }
     969             : 
                             /* NOTE(review): this sanity check runs after the slot has
                                already been registered with the status cache above. */
     970           0 :       if( txn->xid.ul[0] != ctx->double_constipation_slot ) {
     971           0 :         FD_LOG_ERR(( "txn->xid.ul[0] = %lu, ctx->double_constipation_slot = %lu", txn->xid.ul[0], ctx->double_constipation_slot ));
     972           0 :       }
     973           0 :       ctx->second_false_root = txn;
     974           0 :     }
     975           0 :     return ctx->second_false_root;
     976           0 :   } else if( is_constipated ) {
     977             : 
     978           0 :     if( FD_UNLIKELY( !ctx->false_root ) ) {
     979             : 
                             /* The false root is the oldest ancestor of to_root_txn: follow
                                parent links until there is no parent left. */
     980           0 :       fd_funk_txn_t * txn        = to_root_txn;
     981           0 :       fd_funk_txn_t * parent_txn = fd_funk_txn_parent( txn, txn_map );
     982           0 :       while( parent_txn ) {
     983           0 :         txn        = parent_txn;
     984           0 :         parent_txn = fd_funk_txn_parent( txn, txn_map );
     985           0 :       }
     986             : 
     987           0 :       ctx->false_root = txn;
     988           0 :       if( !fd_txncache_get_is_constipated( ctx->slot_ctx->status_cache ) ) {
     989           0 :         fd_txncache_register_root_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
     990           0 :       } else {
     991           0 :         fd_txncache_register_constipated_slot( ctx->slot_ctx->status_cache, txn->xid.ul[0] );
     992           0 :       }
     993           0 :     }
     994           0 :     return ctx->false_root;
     995           0 :   } else {
     996           0 :     return NULL;
     997           0 :   }
     998           0 : }
     999             : 
    1000             : static void
    1001           0 : funk_and_txncache_publish( fd_replay_tile_ctx_t * ctx, ulong wmk, fd_funk_txn_xid_t const * xid ) {
    1002             : 
    1003           0 :   FD_LOG_NOTICE(( "Entering funk_and_txncache_publish for wmk=%lu", wmk ));
    1004             : 
    1005             :   /* This function is responsible for publishing/registering all in-prep slots
    1006             :      up to and including the watermark slot into funk and the transaction cache.
    1007             : 
    1008             :      However, we need to modify this behavior to support snapshot creation and
    1009             :      epoch account hash generation (which is handled by the batch tile).
    1010             :      Specifically, we need to change the mechanism by introducing the concept of
    1011             :      a constipated root. We want to keep the root of funk/txncache constant
    1012             :      while the batch tile reads from the root of funk. At the same time, we
    1013             :      want to keep publishing into funk. We accomplish this by treating the
    1014             :      oldest in-prep ancestor of funk as the "constipated/false" root. While
    1015             :      the batch tile "works", we will only publish into the false root. Once the
    1016             :      batch tile is done producing a snapshot/eah, we will then flush the
    1017             :      constipated root into the real root of funk as we no longer need a frozen
    1018             :      funk transaction to read from. The batch tile will communicate with the
    1019             :      replay tile via the is_constipated fseq and a link.
    1020             : 
    1021             :      There is a pretty important edge case to consider here: what do we do if
    1022             :      we are currently in the middle of creating a snapshot, but we need to
    1023             :      record our state for the epoch account hash? The epoch account hash must
    1024             :      be created for a specific slot and we can't block execution to calculate
    1025             :      this hash. The solution will be to introduce a second constipation via a
    1026             :      second false root. This new false root will correspond to the oldest
    1027             :      child transaction of the transaction that corresponds to the eah
    1028             :      calculation slot. When the snapshot is done being produced, any further
    1029             :      snapshot creation will be blocked until the epoch account hash is created.
    1030             :      We will use the second false root to publish into while the batch tile
    1031             :      produces the epoch account hash. We do not modify any of the parents of
    1032             :      the second constipated root until we are done producing a snapshot.
    1033             : 
    1034             :      A similar mechanism for txncache constipation is needed only for snapshot
    1035             :      creation. This is simpler than for funk because txncache operations are
    1036             :      atomic and we can just register slots into a constipated set while the
    1037             :      txncache is getting copied out. This is a much faster operation and the
    1038             :      txncache will likely get unconstipated before funk.
    1039             : 
    1040             :      Single Funk Constipation Example:
    1041             : 
    1042             :      If we want to create a snapshot/eah for slot n, then we will publish
    1043             :      all transactions up to and including those that correspond to slot n.
    1044             :      We will then publish all transactions into the immediate child of n (lets
    1045             :      assume it's n+1) in this case. So every transaction will be published into
    1046             :      n+1 and NOT n. When the computation is done, we resume publishing as normal.
    1047             : 
    1048             :      Double Funk Constipation Example:
    1049             : 
    1050             :      Let's say we are creating a snapshot for slot n and we want
    1051             :      the epoch account hash for slot m. A snapshot will take x slots to produce
    1052             :      and we can assume that n + x > m. So at some slot y where n < y < m, the
    1053             :      state of funk will be: a root at slot n with a constipated root at
    1054             :      n+1 which gets published into. However, once it is time to publish slot m,
    1055             :      we will now have a root at slot n, a constipated root at slot m, and we will
    1056             :      then start publishing into the second constipated root at slot m + 1. */
    1057             : 
    1058             :   /* First wait for all tpool threads to finish. */
    1059             : 
    1060           0 :   for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) {
    1061           0 :     fd_tpool_wait( ctx->tpool, i+1 );
    1062           0 :   }
    1063             : 
    1064           0 :   fd_epoch_bank_t * epoch_bank     = fd_exec_epoch_ctx_epoch_bank( ctx->slot_ctx->epoch_ctx );
    1065           0 :   uchar             is_constipated = fd_fseq_query( ctx->is_constipated ) != 0;
    1066             : 
    1067             :   /* If the is_constipated fseq is set to 0 that means that the batch tile
    1068             :      is currently in an idle state. However, if there was a double constipation
    1069             :      active, that means that we need to kick off the pending epoch account hash
    1070             :      calculation. */
    1071           0 :   if( ctx->double_constipation_slot && !is_constipated ) {
    1072           0 :     FD_LOG_NOTICE(( "No longer double constipated, ready to start computing the epoch account hash" ));
    1073             : 
    1074             :     /* At this point, the snapshot has been completed, so we are now ready to
    1075             :        start the eah computation. */
    1076           0 :     ulong updated_fseq = fd_batch_fseq_pack( 0UL, 0UL, ctx->double_constipation_slot );
    1077           0 :     fd_fseq_update( ctx->is_constipated, updated_fseq );
    1078           0 :     epoch_bank->eah_start_slot = FD_SLOT_NULL;
    1079           0 :   }
    1080             : 
    1081             :   /* If the (second) false root is no longer needed, then we should stop
    1082             :      tracking it. */
    1083           0 :   if( FD_UNLIKELY( ctx->false_root && !is_constipated ) ) {
    1084           0 :     FD_LOG_NOTICE(( "Unsetting false root tracking" ));
    1085           0 :     ctx->false_root = NULL;
    1086           0 :   }
    1087           0 :   if( FD_UNLIKELY( ctx->second_false_root && !ctx->double_constipation_slot ) ) {
    1088           0 :     FD_LOG_NOTICE(( "Unsetting second false root tracking" ));
    1089           0 :     ctx->second_false_root = NULL;
    1090           0 :   }
    1091             : 
    1092             : 
    1093             :   /* Handle updates to funk and the status cache. */
    1094             : 
    1095           0 :   fd_funk_txn_t * txn_map     = fd_funk_txn_map( ctx->funk, fd_funk_wksp( ctx->funk ) );
    1096           0 :   fd_funk_txn_t * to_root_txn = fd_funk_txn_query( xid, txn_map );
    1097           0 :   fd_funk_txn_t * rooted_txn  = get_rooted_txn( ctx, to_root_txn, txn_map, is_constipated );
    1098             : 
    1099           0 :   txncache_publish( ctx, txn_map, to_root_txn, rooted_txn );
    1100             : 
    1101           0 :   funk_publish( ctx, to_root_txn, txn_map, wmk, is_constipated );
    1102             : 
    1103             :   /* Update the snapshot state and determine if one is ready to be created. */
    1104             : 
    1105           0 :   snapshot_state_update( ctx, wmk );
    1106             : 
    1107           0 :   if( FD_UNLIKELY( ctx->capture_ctx ) ) {
    1108           0 :     fd_runtime_checkpt( ctx->capture_ctx, ctx->slot_ctx, wmk );
    1109           0 :   }
    1110             : 
    1111           0 : }
    1112             : 
    1113             : static int
    1114           0 : suppress_notify( const fd_pubkey_t * prog ) {
    1115             :   /* Certain accounts are just noise and a waste of notification bandwidth */
    1116           0 :   if( !memcmp( prog, fd_solana_vote_program_id.key, sizeof(fd_pubkey_t) ) ) {
    1117           0 :     return 1;
    1118           0 :   } else if( !memcmp( prog, fd_solana_system_program_id.key, sizeof(fd_pubkey_t) ) ) {
    1119           0 :     return 1;
    1120           0 :   } else if( !memcmp( prog, fd_solana_compute_budget_program_id.key, sizeof(fd_pubkey_t) ) ) {
    1121           0 :     return 1;
    1122           0 :   } else {
    1123           0 :     return 0;
    1124           0 :   }
    1125           0 : }
    1126             : 
    1127             : static void
    1128             : publish_account_notifications( fd_replay_tile_ctx_t * ctx,
    1129             :                                fd_fork_t *            fork,
    1130             :                                ulong                  curr_slot,
    1131             :                                fd_txn_p_t const *     txns,
    1132           0 :                                ulong                  txn_cnt ) {
    1133           0 :   long notify_time_ns = -fd_log_wallclock();
    1134           0 : #define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out_mem, ctx->notif_out_chunk )
    1135           0 : #define NOTIFY_END                                                      \
    1136           0 :   fd_mcache_publish( ctx->notif_out_mcache, ctx->notif_out_depth, ctx->notif_out_seq, \
    1137           0 :                       0UL, ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
    1138           0 :   ctx->notif_out_seq   = fd_seq_inc( ctx->notif_out_seq, 1UL );     \
    1139           0 :   ctx->notif_out_chunk = fd_dcache_compact_next( ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), \
    1140           0 :                                                   ctx->notif_out_chunk0, ctx->notif_out_wmark ); \
    1141           0 :   msg = NULL
    1142             : 
    1143           0 :   ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
    1144           0 :   fd_replay_notif_msg_t * msg = NULL;
    1145             : 
    1146           0 :   for( ulong i = 0; i < txn_cnt; ++i ) {
    1147           0 :       uchar const * raw = txns[i].payload;
    1148           0 :       fd_txn_t const * txn = TXN(txns + i);
    1149           0 :       ushort acct_cnt = txn->acct_addr_cnt;
    1150           0 :       const fd_pubkey_t * accts = (const fd_pubkey_t *)(raw + txn->acct_addr_off);
    1151           0 :       FD_TEST((void*)(accts + acct_cnt) <= (void*)(raw + txns[i].payload_sz));
    1152           0 :       fd_ed25519_sig_t const * sigs = (fd_ed25519_sig_t const *)(raw + txn->signature_off);
    1153           0 :       FD_TEST((void*)(sigs + txn->signature_cnt) <= (void*)(raw + txns[i].payload_sz));
    1154           0 :       for( ushort j = 0; j < acct_cnt; ++j ) {
    1155           0 :         if( suppress_notify( accts + j ) ) continue;
    1156           0 :         if( msg == NULL ) {
    1157           0 :           NOTIFY_START;
    1158           0 :           msg->type = FD_REPLAY_ACCTS_TYPE;
    1159           0 :           msg->accts.funk_xid = fork->slot_ctx.funk_txn->xid;
    1160           0 :           fd_memcpy( msg->accts.sig, sigs, sizeof(fd_ed25519_sig_t) );
    1161           0 :           msg->accts.accts_cnt = 0;
    1162           0 :         }
    1163           0 :         struct fd_replay_notif_acct * out = &msg->accts.accts[ msg->accts.accts_cnt++ ];
    1164           0 :         fd_memcpy( out->id, accts + j, sizeof(out->id) );
    1165           0 :         int writable = ((j < txn->signature_cnt - txn->readonly_signed_cnt) ||
    1166           0 :                         ((j >= txn->signature_cnt) && (j < acct_cnt - txn->readonly_unsigned_cnt)));
    1167           0 :         out->flags = (writable ? FD_REPLAY_NOTIF_ACCT_WRITTEN : FD_REPLAY_NOTIF_ACCT_NO_FLAGS );
    1168             : 
    1169           0 :         if( msg->accts.accts_cnt == FD_REPLAY_NOTIF_ACCT_MAX ) {
    1170           0 :           NOTIFY_END;
    1171           0 :         }
    1172           0 :       }
    1173           0 :       if( msg ) {
    1174           0 :         NOTIFY_END;
    1175           0 :       }
    1176           0 :     }
    1177             : 
    1178           0 : #undef NOTIFY_START
    1179           0 : #undef NOTIFY_END
    1180           0 :   notify_time_ns += fd_log_wallclock();
    1181           0 :   FD_LOG_DEBUG(("TIMING: notify_account_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));
    1182           0 : }
    1183             : 
    1184             : static void
    1185             : replay_plugin_publish( fd_replay_tile_ctx_t * ctx,
    1186             :                        fd_stem_context_t * stem,
    1187             :                        ulong sig,
    1188             :                        uchar const * data,
    1189           0 :                        ulong data_sz ) {
    1190           0 :   uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->replay_plugin_out_mem, ctx->replay_plugin_out_chunk );
    1191           0 :   fd_memcpy( dst, data, data_sz );
    1192           0 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
    1193           0 :   fd_stem_publish( stem, ctx->replay_plug_out_idx, sig, ctx->replay_plugin_out_chunk, data_sz, 0UL, 0UL, tspub );
    1194           0 :   ctx->replay_plugin_out_chunk = fd_dcache_compact_next( ctx->replay_plugin_out_chunk, data_sz, ctx->replay_plugin_out_chunk0, ctx->replay_plugin_out_wmark );
    1195           0 : }
    1196             : 
    1197             : static void
    1198             : publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
    1199             :                             fd_stem_context_t *    stem,
    1200             :                             fd_fork_t *            fork,
    1201             :                             ulong                  block_entry_block_height,
    1202           0 :                             ulong                  curr_slot ) {
    1203           0 :   long notify_time_ns = -fd_log_wallclock();
    1204           0 : #define NOTIFY_START msg = fd_chunk_to_laddr( ctx->notif_out_mem, ctx->notif_out_chunk )
    1205           0 : #define NOTIFY_END                                                      \
    1206           0 :   fd_mcache_publish( ctx->notif_out_mcache, ctx->notif_out_depth, ctx->notif_out_seq, \
    1207           0 :                       0UL, ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), 0UL, tsorig, tsorig ); \
    1208           0 :   ctx->notif_out_seq   = fd_seq_inc( ctx->notif_out_seq, 1UL );     \
    1209           0 :   ctx->notif_out_chunk = fd_dcache_compact_next( ctx->notif_out_chunk, sizeof(fd_replay_notif_msg_t), \
    1210           0 :                                                   ctx->notif_out_chunk0, ctx->notif_out_wmark ); \
    1211           0 :   msg = NULL
    1212             : 
    1213           0 :   ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
    1214           0 :   fd_replay_notif_msg_t * msg = NULL;
    1215             : 
    1216           0 :   {
    1217           0 :     NOTIFY_START;
    1218           0 :     msg->type = FD_REPLAY_SLOT_TYPE;
    1219           0 :     msg->slot_exec.slot = curr_slot;
    1220           0 :     msg->slot_exec.parent = ctx->parent_slot;
    1221           0 :     msg->slot_exec.root = fd_fseq_query( ctx->published_wmark );
    1222           0 :     msg->slot_exec.height = block_entry_block_height;
    1223           0 :     msg->slot_exec.transaction_count = fork->slot_ctx.slot_bank.transaction_count;
    1224           0 :     memcpy( &msg->slot_exec.bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof( fd_hash_t ) );
    1225           0 :     memcpy( &msg->slot_exec.block_hash, &ctx->blockhash, sizeof( fd_hash_t ) );
    1226           0 :     memcpy( &msg->slot_exec.identity, ctx->validator_identity_pubkey, sizeof( fd_pubkey_t ) );
    1227           0 :     NOTIFY_END;
    1228           0 :   }
    1229             : 
    1230           0 : #undef NOTIFY_START
    1231           0 : #undef NOTIFY_END
    1232           0 :   notify_time_ns += fd_log_wallclock();
    1233           0 :   FD_LOG_DEBUG(("TIMING: notify_slot_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));
    1234             : 
    1235           0 :   if( ctx->replay_plugin_out_mem ) {
    1236           0 :     fd_replay_complete_msg_t msg2 = {
    1237           0 :       .slot = curr_slot,
    1238           0 :       .total_txn_count = fork->slot_ctx.txn_count,
    1239           0 :       .nonvote_txn_count = fork->slot_ctx.nonvote_txn_count,
    1240           0 :       .failed_txn_count = fork->slot_ctx.failed_txn_count,
    1241           0 :       .nonvote_failed_txn_count = fork->slot_ctx.nonvote_failed_txn_count,
    1242           0 :       .compute_units = fork->slot_ctx.total_compute_units_used,
    1243           0 :       .transaction_fee = fork->slot_ctx.slot_bank.collected_execution_fees,
    1244           0 :       .priority_fee = fork->slot_ctx.slot_bank.collected_priority_fees,
    1245           0 :       .parent_slot = ctx->parent_slot,
    1246           0 :     };
    1247           0 :     replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_COMPLETED, (uchar const *)&msg2, sizeof(msg2) );
    1248           0 :   }
    1249           0 : }
    1250             : 
    1251             : static void
    1252           0 : send_tower_sync( fd_replay_tile_ctx_t * ctx ) {
    1253           0 :   if( FD_UNLIKELY( !ctx->vote ) ) return;
    1254           0 :   FD_LOG_NOTICE( ( "sending tower sync" ) );
    1255           0 :   ulong vote_slot = fd_tower_votes_peek_tail_const( ctx->tower )->slot;
    1256           0 :   fd_hash_t vote_bank_hash[1]  = { 0 };
    1257           0 :   fd_hash_t vote_block_hash[1] = { 0 };
    1258           0 :   int err = fd_blockstore_bank_hash_query( ctx->blockstore, vote_slot, vote_bank_hash );
    1259           0 :   if( err ) FD_LOG_ERR(( "invariant violation: missing bank hash for tower vote" ));
    1260           0 :   err = fd_blockstore_block_hash_query( ctx->blockstore, vote_slot, vote_block_hash );
    1261           0 :   if( err ) FD_LOG_ERR(( "invariant violation: missing block hash for tower vote" ));
    1262             : 
    1263             :   /* Build a vote state update based on current tower votes. */
    1264             : 
    1265           0 :   fd_txn_p_t * txn = (fd_txn_p_t *)fd_chunk_to_laddr( ctx->sender_out_mem, ctx->sender_out_chunk );
    1266           0 :   fd_tower_to_vote_txn( ctx->tower,
    1267           0 :                         ctx->root,
    1268           0 :                         vote_bank_hash,
    1269           0 :                         vote_block_hash,
    1270           0 :                         ctx->validator_identity,
    1271           0 :                         ctx->vote_authority,
    1272           0 :                         ctx->vote_acc,
    1273           0 :                         txn,
    1274           0 :                         ctx->runtime_spad );
    1275             : 
    1276             :   /* TODO: Can use a smaller size, adjusted for payload length */
    1277           0 :   ulong msg_sz     = sizeof( fd_txn_p_t );
    1278           0 :   fd_mcache_publish( ctx->sender_out_mcache,
    1279           0 :                      ctx->sender_out_depth,
    1280           0 :                      ctx->sender_out_seq,
    1281           0 :                      1UL,
    1282           0 :                      ctx->sender_out_chunk,
    1283           0 :                      msg_sz,
    1284           0 :                      0UL,
    1285           0 :                      0,
    1286           0 :                      0 );
    1287           0 :   ctx->sender_out_seq   = fd_seq_inc( ctx->sender_out_seq, 1UL );
    1288           0 :   ctx->sender_out_chunk = fd_dcache_compact_next( ctx->sender_out_chunk,
    1289           0 :                                                   msg_sz,
    1290           0 :                                                   ctx->sender_out_chunk0,
    1291           0 :                                                   ctx->sender_out_wmark );
    1292             : 
    1293             :   /* Dump the latest sent tower into the tower checkpoint file */
    1294           0 :   if( FD_LIKELY( ctx->tower_checkpt_fileno > 0 ) ) fd_restart_tower_checkpt( vote_bank_hash, ctx->tower, ctx->ghost, ctx->root, ctx->tower_checkpt_fileno );
    1295           0 : }
    1296             : 
    1297             : static fd_fork_t *
    1298             : prepare_new_block_execution( fd_replay_tile_ctx_t * ctx,
    1299             :                              fd_stem_context_t *    stem,
    1300             :                              ulong                  curr_slot,
    1301           0 :                              ulong                  flags ) {
    1302           0 :   long prepare_time_ns = -fd_log_wallclock();
    1303             : 
    1304           0 :   int is_new_epoch_in_new_block = 0;
    1305           0 :   fd_fork_t * fork = fd_forks_prepare( ctx->forks,
    1306           0 :                                        ctx->parent_slot,
    1307           0 :                                        ctx->acc_mgr,
    1308           0 :                                        ctx->blockstore,
    1309           0 :                                        ctx->epoch_ctx,
    1310           0 :                                        ctx->funk,
    1311           0 :                                        ctx->runtime_spad );
    1312             :   // Remove slot ctx from frontier
    1313           0 :   fd_fork_t * child = fd_fork_frontier_ele_remove( ctx->forks->frontier, &fork->slot, NULL, ctx->forks->pool );
    1314           0 :   child->slot = curr_slot;
    1315           0 :   if( FD_UNLIKELY( fd_fork_frontier_ele_query(
    1316           0 :       ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool ) ) ) {
    1317           0 :     FD_LOG_ERR( ( "invariant violation: child slot %lu was already in the frontier", curr_slot ) );
    1318           0 :   }
    1319           0 :   fd_fork_frontier_ele_insert( ctx->forks->frontier, child, ctx->forks->pool );
    1320           0 :   fork->lock = 1;
    1321           0 :   FD_TEST( fork == child );
    1322             : 
    1323             :   // fork is advancing
    1324           0 :   FD_LOG_NOTICE(( "new block execution - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
    1325           0 :   fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( fork->slot_ctx.epoch_ctx );
    1326             : 
    1327             :   /* if it is an epoch boundary, push out stake weights */
    1328           0 :   if( fork->slot_ctx.slot_bank.slot != 0 ) {
    1329           0 :     is_new_epoch_in_new_block = (int)fd_runtime_is_epoch_boundary( epoch_bank, fork->slot_ctx.slot_bank.slot, fork->slot_ctx.slot_bank.prev_slot );
    1330           0 :   }
    1331             : 
    1332           0 :   fd_block_map_query_t query[1] = { 0 };
    1333           0 :   int err = fd_block_map_prepare( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    1334           0 :   fd_block_info_t * curr_block_info = fd_block_map_query_ele( query );
    1335           0 :   if( FD_UNLIKELY( err == FD_MAP_ERR_FULL ) ) FD_LOG_ERR(("Block map prepare failed, likely corrupt."));
    1336           0 :   if( FD_UNLIKELY( curr_slot != curr_block_info->slot ) ) FD_LOG_ERR(("Block map prepare failed, likely corrupt."));
    1337           0 :   curr_block_info->in_poh_hash = fork->slot_ctx.slot_bank.poh;
    1338           0 :   fd_block_map_publish( query );
    1339             : 
    1340           0 :   fork->slot_ctx.slot_bank.prev_slot   = fork->slot_ctx.slot_bank.slot;
    1341           0 :   fork->slot_ctx.slot_bank.slot        = curr_slot;
    1342           0 :   fork->slot_ctx.slot_bank.tick_height = fork->slot_ctx.slot_bank.max_tick_height;
    1343           0 :   if( FD_UNLIKELY( FD_RUNTIME_EXECUTE_SUCCESS != fd_runtime_compute_max_tick_height( epoch_bank->ticks_per_slot, curr_slot, &fork->slot_ctx.slot_bank.max_tick_height ) ) ) {
    1344           0 :     FD_LOG_ERR(( "couldn't compute tick height/max tick height slot %lu ticks_per_slot %lu", curr_slot, epoch_bank->ticks_per_slot ));
    1345           0 :   }
    1346           0 :   fork->slot_ctx.enable_exec_recording = ctx->tx_metadata_storage;
    1347           0 :   fork->slot_ctx.runtime_wksp          = fd_wksp_containing( ctx->runtime_spad );
    1348             : 
    1349             :   /* NOTE: By commenting this out, we don't support forking at the epoch boundary
    1350             :      but this code is buggy and leads to crashes. */
    1351             :   // if( fd_runtime_is_epoch_boundary( epoch_bank, fork->slot_ctx.slot_bank.slot, fork->slot_ctx.slot_bank.prev_slot ) ) {
    1352             :   //   FD_LOG_WARNING(("Epoch boundary"));
    1353             : 
    1354             :   //   fd_epoch_fork_elem_t * epoch_fork = NULL;
    1355             :   //   ulong new_epoch = fd_slot_to_epoch( &epoch_bank->epoch_schedule, fork->slot_ctx.slot_bank.slot, NULL );
    1356             :   //   uint found = fd_epoch_forks_prepare( ctx->epoch_forks, fork->slot_ctx.slot_bank.prev_slot, new_epoch, &epoch_fork );
    1357             : 
    1358             :   //   if( FD_UNLIKELY( found ) ) {
    1359             :   //     fd_exec_epoch_ctx_bank_mem_clear( epoch_fork->epoch_ctx );
    1360             :   //   }
    1361             :   //   fd_exec_epoch_ctx_t * prev_epoch_ctx = fork->slot_ctx.epoch_ctx;
    1362             : 
    1363             :   //   fd_exec_epoch_ctx_from_prev( epoch_fork->epoch_ctx, prev_epoch_ctx, ctx->runtime_spad );
    1364             :   //   fork->slot_ctx.epoch_ctx = epoch_fork->epoch_ctx;
    1365             :   // }
    1366             : 
    1367           0 :   fork->slot_ctx.status_cache = ctx->status_cache;
    1368             : 
    1369           0 :   fd_funk_txn_xid_t xid = { 0 };
    1370             : 
    1371           0 :   if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
    1372           0 :     memset( xid.uc, 0, sizeof(fd_funk_txn_xid_t) );
    1373           0 :   } else {
    1374           0 :     xid.ul[1] = fork->slot_ctx.slot_bank.slot;
    1375           0 :   }
    1376           0 :   xid.ul[0] = fork->slot_ctx.slot_bank.slot;
    1377             :   /* push a new transaction on the stack */
    1378           0 :   fd_funk_start_write( ctx->funk );
    1379           0 :   fork->slot_ctx.funk_txn = fd_funk_txn_prepare(ctx->funk, fork->slot_ctx.funk_txn, &xid, 1);
    1380           0 :   fd_funk_end_write( ctx->funk );
    1381             : 
    1382           0 :   fd_runtime_block_pre_execute_process_new_epoch( &fork->slot_ctx,
    1383           0 :                                                   ctx->tpool,
    1384           0 :                                                   ctx->exec_spads,
    1385           0 :                                                   ctx->exec_spad_cnt,
    1386           0 :                                                   ctx->runtime_spad );
    1387             : 
    1388             :   /* We want to push on a spad frame before we start executing a block.
    1389             :      Apart from allocations made at the epoch boundary, there should be no
    1390             :      allocations that persist beyond the scope of a block. Before this point,
    1391             :      there should only be 1 or 2 frames that are on the stack. The first frame
    1392             :      will hold memory for the slot/epoch context. The potential second frame
    1393             :      will only exist while rewards are being distributed (around the start of
    1394             :      an epoch). We pop a frame when rewards are done being distributed. */
    1395           0 :   fd_spad_push( ctx->runtime_spad );
    1396             : 
    1397           0 :   int res = fd_runtime_block_execute_prepare( &fork->slot_ctx, ctx->runtime_spad );
    1398           0 :   if( res != FD_RUNTIME_EXECUTE_SUCCESS ) {
    1399           0 :     FD_LOG_ERR(( "block prep execute failed" ));
    1400           0 :   }
    1401             : 
    1402             :   /* Read slot history into slot ctx */
    1403           0 :   fork->slot_ctx.slot_history = fd_sysvar_slot_history_read( fork->slot_ctx.acc_mgr, fork->slot_ctx.funk_txn, ctx->runtime_spad );
    1404             : 
    1405           0 :   if( is_new_epoch_in_new_block ) {
    1406           0 :     publish_stake_weights( ctx, stem, &fork->slot_ctx );
    1407           0 :   }
    1408             : 
    1409           0 :   prepare_time_ns += fd_log_wallclock();
    1410           0 :   FD_LOG_DEBUG(("TIMING: prepare_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)prepare_time_ns * 1e-6));
    1411             : 
    1412           0 :   return fork;
    1413           0 : }
    1414             : 
    1415             : void
    1416           0 : init_poh( fd_replay_tile_ctx_t * ctx ) {
    1417           0 :   FD_LOG_INFO(( "sending init msg" ));
    1418           0 :   fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ 0UL ];
    1419           0 :   fd_poh_init_msg_t * msg = fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
    1420           0 :   fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->epoch_ctx );
    1421           0 :   msg->hashcnt_per_tick = ctx->epoch_ctx->epoch_bank.hashes_per_tick;
    1422           0 :   msg->ticks_per_slot   = ctx->epoch_ctx->epoch_bank.ticks_per_slot;
    1423           0 :   msg->tick_duration_ns = (ulong)(epoch_bank->ns_per_slot / epoch_bank->ticks_per_slot);
    1424           0 :   if( ctx->slot_ctx->slot_bank.block_hash_queue.last_hash ) {
    1425           0 :     memcpy(msg->last_entry_hash, ctx->slot_ctx->slot_bank.block_hash_queue.last_hash->uc, sizeof(fd_hash_t));
    1426           0 :   } else {
    1427           0 :     memset(msg->last_entry_hash, 0UL, sizeof(fd_hash_t));
    1428           0 :   }
    1429           0 :   msg->tick_height = ctx->slot_ctx->slot_bank.slot * msg->ticks_per_slot;
    1430             : 
    1431           0 :   ulong sig = fd_disco_replay_old_sig( ctx->slot_ctx->slot_bank.slot, REPLAY_FLAG_INIT );
    1432           0 :   fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, sizeof(fd_poh_init_msg_t), 0UL, 0UL, 0UL );
    1433           0 :   bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, sizeof(fd_poh_init_msg_t), bank_out->chunk0, bank_out->wmark );
    1434           0 :   bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
    1435           0 :   ctx->poh_init_done = 1;
    1436           0 : }
    1437             : 
    1438             : /* Verifies the validity of a microblock batch. */
    1439             : 
    1440             : static int FD_FN_UNUSED
    1441             : process_and_exec_mbatch( fd_replay_tile_ctx_t * ctx,
    1442             :                          fd_stem_context_t *    stem FD_PARAM_UNUSED,
    1443             :                          ulong                  mbatch_sz,
    1444           0 :                          bool                   last_batch ) {
    1445           0 :   #define wait_and_check_success( worker_idx )         \
    1446           0 :     fd_tpool_wait( ctx->tpool, worker_idx );           \
    1447           0 :     if( poh_info[ worker_idx ].success ) {             \
    1448           0 :       FD_LOG_WARNING(( "Failed to verify tick poh" )); \
    1449           0 :       return -1; \
    1450           0 :     }
    1451           0 : 
    1452           0 :   fd_hash_t in_poh_hash;
    1453           0 :   fd_block_map_query_t query[1] = { 0 } ;
    1454           0 :   int err = FD_MAP_ERR_AGAIN;
    1455           0 :   while( err == FD_MAP_ERR_AGAIN ) {
    1456           0 :     err = fd_block_map_query_try( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, 0 );
    1457           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
    1458           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    1459           0 :     if( FD_UNLIKELY( err == FD_MAP_ERR_KEY )) { FD_LOG_ERR(( "Failed to query block map" )); }
    1460           0 :     in_poh_hash = block_info->in_poh_hash;
    1461           0 :     err = fd_block_map_query_test( query );
    1462           0 :   }
    1463           0 : 
    1464           0 :   ulong micro_cnt = FD_LOAD( ulong, ctx->mbatch );
    1465           0 : 
    1466           0 :   if( FD_UNLIKELY( !micro_cnt ) ) { /* in the case of zero padding */
    1467           0 :     FD_LOG_DEBUG(( "No microblocks in batch" ));
    1468           0 :     return 0;
    1469           0 :   }
    1470           0 : 
    1471           0 :   fd_poh_verifier_t     poh_info         = {0};
    1472           0 :   (void)poh_info;
    1473           0 : 
    1474           0 :   fd_microblock_hdr_t * hdr              = NULL;
    1475           0 :   ulong                 off              = sizeof(ulong);
    1476           0 :   for( ulong i=0UL; i<micro_cnt; i++ ){
    1477           0 :     hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->mbatch + off );
    1478           0 :     int res = fd_runtime_microblock_verify_ticks( ctx->slot_ctx,
    1479           0 :                                                   ctx->curr_slot,
    1480           0 :                                                   hdr,
    1481           0 :                                                   last_batch && i == micro_cnt - 1,
    1482           0 :                                                   ctx->slot_ctx->slot_bank.tick_height,
    1483           0 :                                                   ctx->slot_ctx->slot_bank.max_tick_height,
    1484           0 :                                                   ctx->slot_ctx->epoch_ctx->epoch_bank.hashes_per_tick );
    1485           0 : 
    1486           0 :     if( res != FD_BLOCK_OK ) {
    1487           0 :       FD_LOG_WARNING(( "Failed to verify tick metadata" ));
    1488           0 :       return -1;
    1489           0 :     }
    1490           0 : 
    1491           0 :     poh_info.success         = 0;
    1492           0 :     poh_info.in_poh_hash     = &in_poh_hash;
    1493           0 :     poh_info.microblock.hdr  = hdr;
    1494           0 :     poh_info.spad            = ctx->runtime_spad;
    1495           0 :     poh_info.microblk_max_sz = mbatch_sz - off;
    1496           0 : 
    1497           0 :     off += sizeof(fd_microblock_hdr_t);
    1498           0 : 
    1499           0 :     /* FIXME: This needs to be multithreaded. This will be reintroduced when
    1500           0 :        the execution model changes are made */
    1501           0 :     // fd_runtime_poh_verify( &poh_info );
    1502           0 :     // if( poh_info.success==-1 ) {
    1503           0 :     //   FD_LOG_WARNING(( "Failed to verify poh hash" ));
    1504           0 :     //   return -1;
    1505           0 :     // }
    1506           0 : 
    1507           0 :     in_poh_hash = *(fd_hash_t *)fd_type_pun( hdr->hash );
    1508           0 : 
    1509           0 :     /* seek past txns */
    1510           0 :     fd_txn_p_t * txn_p  = fd_spad_alloc( ctx->runtime_spad, alignof(fd_txn_p_t*), sizeof(fd_txn_p_t) * hdr->txn_cnt );
    1511           0 :     for( ulong t=0UL; t<hdr->txn_cnt; t++ ){
    1512           0 :       ulong pay_sz = 0UL;
    1513           0 :       ulong txn_sz = fd_txn_parse_core( ctx->mbatch + off,
    1514           0 :                                         fd_ulong_min( FD_TXN_MTU, mbatch_sz - off ),
    1515           0 :                                         TXN( &txn_p[t] ),
    1516           0 :                                         NULL,
    1517           0 :                                         &pay_sz );
    1518           0 : 
    1519           0 :       if( FD_UNLIKELY( !pay_sz || !txn_sz || txn_sz > FD_TXN_MTU ) ) {
    1520           0 :         FD_LOG_WARNING(( "failed to parse transaction %lu in replay", t ));
    1521           0 :         return -1;
    1522           0 :       }
    1523           0 :       fd_memcpy( txn_p[t].payload, ctx->mbatch + off, pay_sz );
    1524           0 :       txn_p[t].payload_sz = pay_sz;
    1525           0 :       off                += pay_sz;
    1526           0 : 
    1527           0 :       /* Execute Transaction  */
    1528           0 : 
    1529           0 :       /* dispatch into MCACHE / DCACHE */
    1530           0 :       // fd_replay_out_ctx_t * out = &ctx->exec_out[ 0 ];
    1531           0 :       // fd_stem_publish( stem, out->idx, 0, out->chunk, sizeof(fd_txn_p_t), 0UL, 0UL, 0UL );
    1532           0 :       // out->chunk = fd_dcache_compact_next( out->chunk,  sizeof(fd_txn_p_t), out->chunk0, out->wmark );
    1533           0 :     }
    1534           0 : 
    1535           0 :     /* Now that we have parsed the mblock, we are ready to execute the whole mblock */
    1536           0 :     fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
    1537           0 :                                                    &ctx->curr_slot,
    1538           0 :                                                    NULL,
    1539           0 :                                                    ctx->forks->pool );
    1540           0 :     if( FD_UNLIKELY( !fork ) ) {
    1541           0 :       FD_LOG_ERR(( "Unable to select a fork" ));
    1542           0 :     }
    1543           0 : 
    1544           0 :     err = fd_runtime_process_txns_in_microblock_stream( &fork->slot_ctx,
    1545           0 :                                                         ctx->capture_ctx,
    1546           0 :                                                         txn_p,
    1547           0 :                                                         hdr->txn_cnt,
    1548           0 :                                                         ctx->tpool,
    1549           0 :                                                         ctx->exec_spads,
    1550           0 :                                                         ctx->exec_spad_cnt,
    1551           0 :                                                         ctx->runtime_spad,
    1552           0 :                                                         NULL );
    1553           0 : 
    1554           0 :     fd_block_map_query_t query[1] = { 0 };
    1555           0 :     fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    1556           0 :     fd_block_info_t * block_info = fd_block_map_query_ele( query );
    1557           0 :     if( FD_UNLIKELY( !block_info || block_info->slot != ctx->curr_slot ) ) FD_LOG_ERR(( "[%s] invariant violation: missing block_info %lu", __func__, ctx->curr_slot ));
    1558           0 : 
    1559           0 :     if( err != FD_RUNTIME_EXECUTE_SUCCESS ) {
    1560           0 :       FD_LOG_WARNING(( "microblk process: block invalid - slot: %lu", ctx->curr_slot ));
    1561           0 :       block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_DEADBLOCK );
    1562           0 :       FD_COMPILER_MFENCE();
    1563           0 :       block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
    1564           0 :       fd_block_map_publish( query );
    1565           0 :       return -1;
    1566           0 :     }
    1567           0 : 
    1568           0 :     if( last_batch && i == micro_cnt - 1 ) {
    1569           0 : 
    1570           0 :       // Copy block hash to slot_bank poh for updating the sysvars
    1571           0 : 
    1572           0 :       memcpy( fork->slot_ctx.slot_bank.poh.uc, hdr->hash, sizeof(fd_hash_t) );
    1573           0 : 
    1574           0 :       block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_PROCESSED );
    1575           0 :       FD_COMPILER_MFENCE();
    1576           0 :       block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
    1577           0 :       memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) );
    1578           0 :       memcpy( &block_info->bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof(fd_hash_t) );
    1579           0 :     }
    1580           0 :     publish_account_notifications( ctx, fork, ctx->curr_slot, txn_p, hdr->txn_cnt );
    1581           0 :     fd_block_map_publish( query );
    1582           0 :   }
    1583           0 :   return 0;
    1584           0 : # undef wait_and_check_success
    1585           0 : }
    1586             : /* Checks that ctx->curr_slot may be replayed, then selects the epoch_ctx, fork/slot_ctx and solcap slot for it. */
    1587             : static void
    1588           0 : prepare_first_batch_execution( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ){
    1589           0 :   ulong curr_slot   = ctx->curr_slot;
    1590           0 :   ulong parent_slot = ctx->parent_slot;
    1591           0 :   ulong flags       = ctx->flags;
    1592           0 :   if( FD_UNLIKELY( curr_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
    1593           0 :     FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). earlier than our watermark %lu.", curr_slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ));
    1594           0 :     return;
    1595           0 :   }
    1596             : 
    1597           0 :   if( FD_UNLIKELY( parent_slot < fd_fseq_query( ctx->published_wmark ) ) ) {
    1598           0 :     FD_LOG_WARNING(( "ignoring replay of slot %lu (parent: %lu). parent slot is earlier than our watermark %lu.", curr_slot, parent_slot, fd_fseq_query( ctx->published_wmark ) ) );
    1599           0 :     return;
    1600           0 :   }
    1601             : 
    1602           0 :   if( FD_UNLIKELY( !fd_blockstore_block_info_test( ctx->blockstore, parent_slot ) ) ) {
    1603           0 :     FD_LOG_WARNING(( "[%s] unable to find slot %lu's parent block_info", __func__, curr_slot ));
    1604           0 :     return;
    1605           0 :   }
    1606             : 
    1607             :   /**********************************************************************/
    1608             :   /* Get the epoch_ctx for replaying curr_slot                          */
    1609             :   /**********************************************************************/
    1610             :   /* &ctx->parent_slot is handed to the callee (it may update it); code below reads ctx->parent_slot, not the local copy. */
    1611           0 :   ulong epoch_ctx_idx = fd_epoch_forks_get_epoch_ctx( ctx->epoch_forks, ctx->ghost, curr_slot, &ctx->parent_slot );
    1612           0 :   ctx->epoch_ctx = ctx->epoch_forks->forks[ epoch_ctx_idx ].epoch_ctx;
    1613             : 
    1614             :   /**********************************************************************/
    1615             :   /* Prepare the fork in ctx->forks for replaying curr_slot             */
    1616             :   /**********************************************************************/
    1617             : 
    1618           0 :   fd_fork_t * parent_fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->parent_slot, NULL, ctx->forks->pool );
    1619           0 :   if( FD_UNLIKELY( parent_fork && parent_fork->lock ) ) {
    1620             :     /* Edge case related to pack: the parent fork may already be in the
    1621             :        frontier with its lock flag set (presumably still executing --
    1622             :        NOTE(review): the log text says "frozen"; confirm the wording). */
    1623           0 :     FD_LOG_ERR(( "parent slot is frozen in frontier. cannot execute. slot: %lu, parent_slot: %lu",
    1624           0 :                  curr_slot,
    1625           0 :                  ctx->parent_slot ));
    1626           0 :   }
    1627             : 
    1628           0 :   fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &curr_slot, NULL, ctx->forks->pool );
    1629           0 :   if( fork == NULL ) { /* curr_slot has no frontier entry yet */
    1630           0 :     fork = prepare_new_block_execution( ctx, stem, curr_slot, flags );
    1631           0 :   }
    1632           0 :   ctx->slot_ctx = &fork->slot_ctx;
    1633             : 
    1634             :   /**********************************************************************/
    1635             :   /* Get the solcap context for replaying curr_slot                     */
    1636             :   /**********************************************************************/
    1637             : 
    1638           0 :   if( ctx->capture_ctx )
    1639           0 :     fd_solcap_writer_set_slot( ctx->capture_ctx->capture, fork->slot_ctx.slot_bank.slot );
    1640             : 
    1641           0 : }
    1642             : 
    1643             : static void
    1644             : exec_slices( fd_replay_tile_ctx_t * ctx,
    1645             :              fd_stem_context_t * stem FD_PARAM_UNUSED,
    1646           0 :              ulong slot ) {
    1647             :   /* exec_slices executes buffered slices (entry batches) of `slot`.
    1648             :      Slice data is appended to ctx->mbatch and ctx->slice_exec_ctx
    1649             :      tracks the cursor (wmark), remaining microblocks/txns and whether
    1650             :      the last batch was seen, so work resumes across calls.  Design
    1651             :      intent: round robin dispatch of a microblock's txns to the exec
    1652             :      tiles, synchronizing at every microblock boundary; for now each
    1653             :      txn is run inline via the tpool with a fixed per-call budget of
    1654             :      512.  When the final batch is fully executed the block is marked
    1655             :      processed and ctx->flags gets the REPLAY_FLAG_FINISHED_BLOCK sig. */
    1656             : 
    1657           0 :   fd_replay_slice_t * slice = fd_replay_slice_map_query( ctx->replay->slice_map, slot, NULL );
    1658           0 :   if( !slice ) {
    1659           0 :     slice = fd_replay_slice_map_insert( ctx->replay->slice_map, slot );
    1660           0 :   }
    1661             : 
    1662             :   /* Manual population of the slice deque occurs currently when we are:
    1663             :       1. Repairing and catching up. All shreds in this case come through
    1664             :          repair, and thus aren't processed in SHRED_IN_IDX in before_frag
    1665             :       2. Repairing shreds after first turbine. Some of the batches will
    1666             :          be added to the slice_deque through SHRED, but missing shreds
    1667             :          are still received through repair, and likewise aren't processed there */
    1668             : 
    1669           0 :   if( ctx->last_completed_slot != slot && fd_replay_slice_deque_cnt( slice->deque ) == 0 ) {
    1670           0 :     FD_LOG_INFO(( "Failed to query slice deque for slot %lu. Likely shreds were recieved through repair. Manually adding.", slot ));
    1671           0 :     slice_poll( ctx, slice, slot );
    1672           0 :   }
    1673             : 
    1674             :   //ulong free_exec_tiles = ctx->exec_cnt;
    1675           0 :   ulong free_exec_tiles = 512; /* fixed per-call txn budget; must match the ==512 checks below */
    1676             : 
    1677           0 :   while( free_exec_tiles > 0 ){
    1678             :     /* change to whatever condition handles if(exec free). */
    1679           0 :     if( ctx->slice_exec_ctx.txns_rem > 0 ){
    1680             :       //FD_LOG_WARNING(( "[%s] executing txn", __func__ ));
    1681           0 :       ulong pay_sz = 0UL;
    1682             :       /* No exec_out lookup here: dispatch currently goes through the tpool, and
    1683             :          ctx->exec_cnt - free_exec_tiles wraps around unless exec_cnt >= 512. */
    1684             :       //fd_txn_p_t * txn_p = (fd_txn_p_t *) fd_chunk_to_laddr( exec_out->mem, exec_out->chunk );
    1685           0 :       fd_txn_p_t txn_p[1];
    1686           0 :       ulong txn_sz = fd_txn_parse_core( ctx->mbatch + ctx->slice_exec_ctx.wmark,
    1687           0 :                                         fd_ulong_min( FD_TXN_MTU, ctx->slice_exec_ctx.sz - ctx->slice_exec_ctx.wmark ),
    1688           0 :                                         TXN( txn_p ),
    1689           0 :                                         NULL,
    1690           0 :                                         &pay_sz );
    1691             : 
    1692           0 :       if( FD_UNLIKELY( !pay_sz || !txn_sz || txn_sz > FD_TXN_MTU ) ) {
    1693             :         /* (x86-only debugger trap removed so the error below is actually logged) */
    1694           0 :         FD_LOG_ERR(( "failed to parse transaction in replay" ));
    1695           0 :       }
    1696           0 :       fd_memcpy( txn_p->payload, ctx->mbatch + ctx->slice_exec_ctx.wmark, pay_sz );
    1697           0 :       txn_p->payload_sz = pay_sz;
    1698           0 :       ctx->slice_exec_ctx.wmark += pay_sz;
    1699             : 
    1700             :       /* dispatch dcache */
    1701             :       //fd_stem_publish( stem, exec_out->idx, slot, exec_out->chunk, sizeof(fd_txn_p_t), 0UL, 0UL, 0UL );
    1702             :       //exec_out->chunk = fd_dcache_compact_next( exec_out->chunk, sizeof(fd_txn_p_t), exec_out->chunk0, exec_out->wmark );
    1703             : 
    1704             :       /* dispatch tpool */
    1705             : 
    1706           0 :       fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
    1707           0 :                                                      &slot,
    1708           0 :                                                      NULL,
    1709           0 :                                                      ctx->forks->pool );
    1710           0 :       if( FD_UNLIKELY( !fork ) ) {
    1711           0 :         FD_LOG_ERR(( "Unable to select a fork" ));
    1712           0 :       }
    1713             : 
    1714           0 :       int err = fd_runtime_process_txns_in_microblock_stream( &fork->slot_ctx,
    1715           0 :                   ctx->capture_ctx,
    1716           0 :                   txn_p,
    1717           0 :                   1,
    1718           0 :                   ctx->tpool,
    1719           0 :                   ctx->exec_spads,
    1720           0 :                   ctx->exec_spad_cnt,
    1721           0 :                   ctx->runtime_spad,
    1722           0 :                   NULL );
    1723             : 
    1724           0 :       if( err != FD_RUNTIME_EXECUTE_SUCCESS ) {
    1725           0 :         FD_LOG_WARNING(( "microblk process: block invalid - slot: %lu", ctx->curr_slot ));
    1726             : 
    1727           0 :         fd_block_map_query_t query[1] = { 0 };
    1728           0 :         fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    1729           0 :         fd_block_info_t * block_info = fd_block_map_query_ele( query );
    1730           0 :         if( FD_UNLIKELY( !block_info || block_info->slot != ctx->curr_slot ) ) FD_LOG_ERR(( "[%s] invariant violation: missing block_info %lu", __func__, ctx->curr_slot ));
    1731             : 
    1732           0 :         block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_DEADBLOCK );
    1733           0 :         FD_COMPILER_MFENCE();
    1734           0 :         block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
    1735             : 
    1736           0 :         fd_block_map_publish( query );
    1737           0 :       }
    1738             :       /* NOTE(review): execution carries on after the block is marked dead -- confirm this is intended. */
    1739           0 :       publish_account_notifications( ctx, fork, ctx->curr_slot, txn_p, 1 );
    1740             : 
    1741           0 :       ctx->slice_exec_ctx.txns_rem--;
    1742           0 :       free_exec_tiles--;
    1743           0 :       continue;
    1744           0 :     }
    1745             : 
    1746             :     /* If the current microblock is complete, and we still have mblks
    1747             :        to read, then advance to the next microblock */
    1748             : 
    1749           0 :     if( ctx->slice_exec_ctx.txns_rem == 0 && ctx->slice_exec_ctx.mblks_rem > 0 ){
    1750             :       //FD_LOG_WARNING(( "[%s] reading microblock", __func__ ));
    1751             : 
    1752           0 :       fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( ctx->mbatch + ctx->slice_exec_ctx.wmark );
    1753           0 :       ctx->slice_exec_ctx.txns_rem      = hdr->txn_cnt;
    1754           0 :       ctx->slice_exec_ctx.last_mblk_off = ctx->slice_exec_ctx.wmark;
    1755           0 :       ctx->slice_exec_ctx.wmark        += sizeof(fd_microblock_hdr_t);
    1756           0 :       ctx->slice_exec_ctx.mblks_rem--;
    1757           0 :       if( free_exec_tiles == 512 ){
    1758             :         /* no transactions were executed this credit, free to start executing new microblock txns */
    1759           0 :         continue;
    1760           0 :       }
    1761           0 :       break; /* have to synchronize & wait for exec tiles to finish the prev microblock */
    1762           0 :     }
    1763             : 
    1764             :     /* The prev batch is complete, but we have more batches to read. */
    1765             : 
    1766           0 :     if( ctx->slice_exec_ctx.mblks_rem == 0 && !ctx->slice_exec_ctx.last_batch ) {
    1767             : 
    1768             :       /* Waiting on batches to arrive from the shred tile */
    1769             : 
    1770           0 :       if( fd_replay_slice_deque_cnt( slice->deque ) == 0 ) break;
    1771             : 
    1772           0 :       if( FD_UNLIKELY( ctx->slice_exec_ctx.sz == 0 ) ) { /* I think maybe can move this out when */
    1773           0 :         FD_LOG_NOTICE(("Preparing first batch execution of slot %lu", slot ));
    1774           0 :         prepare_first_batch_execution( ctx, stem );
    1775           0 :       }
    1776             : 
    1777           0 :       ulong key       = fd_replay_slice_deque_pop_head( slice->deque );
    1778           0 :       uint  start_idx = fd_replay_slice_start_idx( key );
    1779           0 :       uint  end_idx   = fd_replay_slice_end_idx  ( key );
    1780             : 
    1781             :       /* populate last shred idx. Can also do this just once but... */
    1782           0 :       for(;;) { /* speculative query */
    1783           0 :         fd_block_map_query_t query[1] = { 0 };
    1784           0 :         int err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, query, 0 );
    1785           0 :         fd_block_info_t * block_info = fd_block_map_query_ele( query );
    1786             : 
    1787           0 :         if( FD_UNLIKELY( err == FD_MAP_ERR_KEY   ) ) FD_LOG_ERR(("Failed to query blockstore for slot %lu", slot ));
    1788           0 :         if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    1789             : 
    1790           0 :         ctx->slice_exec_ctx.last_batch = block_info->slot_complete_idx == end_idx;
    1791             :         //slot_complete_idx = block_info->slot_complete_idx;
    1792           0 :         if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
    1793           0 :       }
    1794             :       //FD_LOG_WARNING(( "[%s] Executing batch %u %u, last: %u", __func__, start_idx, end_idx, slot_complete_idx ));
    1795             : 
    1796           0 :       ulong slice_sz;
    1797           0 :       int err = fd_blockstore_slice_query( ctx->slot_ctx->blockstore,
    1798           0 :                                                        slot,
    1799           0 :                                                        start_idx,
    1800           0 :                                                        end_idx,
    1801           0 :                                                       FD_SLICE_MAX - ctx->slice_exec_ctx.sz,
    1802           0 :                                                       ctx->mbatch + ctx->slice_exec_ctx.sz,
    1803           0 :                                                       &slice_sz );
    1804             : 
    1805           0 :       if( err ) FD_LOG_ERR(( "Failed to query blockstore for slot %lu", slot ));
    1806           0 :       ctx->slice_exec_ctx.mblks_rem = FD_LOAD( ulong, ctx->mbatch + ctx->slice_exec_ctx.sz );
    1807           0 :       ctx->slice_exec_ctx.wmark = ctx->slice_exec_ctx.sz + sizeof(ulong);
    1808           0 :       ctx->slice_exec_ctx.sz += slice_sz;
    1809           0 :       if ( free_exec_tiles == 512 ) continue;
    1810           0 :       break;
    1811           0 :     }
    1812             : 
    1813           0 :     if( FD_UNLIKELY( ctx->slice_exec_ctx.last_batch &&
    1814           0 :                      ctx->slice_exec_ctx.mblks_rem == 0 &&
    1815           0 :                      ctx->slice_exec_ctx.txns_rem == 0 ) ) {
    1816             :       /* block done. */
    1817           0 :       break;
    1818           0 :     }
    1819           0 :   }
    1820             : 
    1821           0 :   if( ctx->slice_exec_ctx.last_batch && ctx->slice_exec_ctx.mblks_rem == 0 && ctx->slice_exec_ctx.txns_rem == 0 ){
    1822           0 :     FD_LOG_WARNING(( "[%s] BLOCK EXECUTION COMPLETE", __func__ ));
    1823             : 
    1824             :      /* At this point, the entire block has been executed. */
    1825           0 :      fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier,
    1826           0 :                                                     &slot,
    1827           0 :                                                     NULL,
    1828           0 :                                                     ctx->forks->pool );
    1829           0 :      if( FD_UNLIKELY( !fork ) ) {
    1830           0 :        FD_LOG_ERR(( "Unable to select a fork" ));
    1831           0 :      }
    1832             : 
    1833           0 :      fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t*)fd_type_pun( ctx->mbatch + ctx->slice_exec_ctx.last_mblk_off );
    1834             : 
    1835             :      // Copy block hash to slot_bank poh for updating the sysvars
    1836           0 :      fd_block_map_query_t query[1] = { 0 };
    1837           0 :      fd_block_map_prepare( ctx->blockstore->block_map, &ctx->curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    1838           0 :      fd_block_info_t * block_info = fd_block_map_query_ele( query );
    1839           0 :      if( FD_UNLIKELY( !block_info || block_info->slot != ctx->curr_slot ) ) FD_LOG_ERR(( "[%s] invariant violation: missing block_info %lu", __func__, ctx->curr_slot ));
    1840           0 :      memcpy( fork->slot_ctx.slot_bank.poh.uc, hdr->hash, sizeof(fd_hash_t) );
    1841           0 :      block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_PROCESSED );
    1842           0 :      FD_COMPILER_MFENCE();
    1843           0 :      block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING );
    1844           0 :      memcpy( &block_info->block_hash, hdr->hash, sizeof(fd_hash_t) );
    1845           0 :      memcpy( &block_info->bank_hash, &fork->slot_ctx.slot_bank.banks_hash, sizeof(fd_hash_t) );
    1846             : 
    1847           0 :      fd_block_map_publish( query );
    1848           0 :      ctx->flags = fd_disco_replay_old_sig( slot, REPLAY_FLAG_FINISHED_BLOCK );
    1849             : 
    1850           0 :      ctx->slice_exec_ctx.last_batch = 0;
    1851           0 :      ctx->slice_exec_ctx.txns_rem = 0;
    1852           0 :      ctx->slice_exec_ctx.mblks_rem = 0;
    1853           0 :      ctx->slice_exec_ctx.sz = 0;
    1854           0 :      ctx->slice_exec_ctx.wmark = 0;
    1855           0 :      ctx->slice_exec_ctx.last_mblk_off = 0;
    1856           0 :   }
    1857           0 : }
    1858             : 
    1859             : static void
    1860             : after_frag( fd_replay_tile_ctx_t * ctx,
    1861             :             ulong                  in_idx,
    1862             :             ulong                  seq,
    1863             :             ulong                  sig   FD_PARAM_UNUSED,
    1864             :             ulong                  sz    FD_PARAM_UNUSED,
    1865             :             ulong                  tsorig,
    1866             :             ulong                  tspub FD_PARAM_UNUSED,
    1867           0 :             fd_stem_context_t *    stem  FD_PARAM_UNUSED ) {
    1868           0 :   (void)sig;
    1869           0 :   (void)sz;
    1870           0 :   (void)seq;
    1871             : 
    1872             :   /*if( FD_LIKELY( in_idx == SHRED_IN_IDX ) ) {
    1873             : 
    1874             :      after_frag only called if it's the first code shred we're
    1875             :        receiving for the FEC set
    1876             : 
    1877             :     ulong slot        = fd_disco_shred_replay_sig_slot( sig );
    1878             :     uint  fec_set_idx = fd_disco_shred_replay_sig_fec_set_idx( sig );
    1879             : 
    1880             :     fd_replay_fec_t * fec = fd_replay_fec_query( ctx->replay, slot, fec_set_idx );
    1881             :     if( !fec ) return; // hack
    1882             :     fec->data_cnt         = ctx->shred->code.data_cnt;
    1883             : 
    1884             :     return;
    1885             :   }*/
    1886             : 
    1887           0 :   if( FD_UNLIKELY( ctx->skip_frag ) ) return;
    1888           0 :   if( FD_UNLIKELY( in_idx == STORE_IN_IDX ) ) {
    1889           0 :     FD_LOG_NOTICE(("Received store message, executing slot %lu", ctx->curr_slot ));
    1890             :     //exec_slices( ctx, stem, ctx->curr_slot );
    1891           0 :   }
    1892             : 
    1893             :   /**********************************************************************/
    1894             :   /* The rest of after_frag replays some microblocks in block curr_slot */
    1895             :   /**********************************************************************/
    1896             : 
    1897           0 :   ulong curr_slot   = ctx->curr_slot;
    1898           0 :   ulong flags       = ctx->flags;
    1899           0 :   ulong bank_idx    = ctx->bank_idx;
    1900             : 
    1901           0 :   fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
    1902             : 
    1903             :   /**********************************************************************/
    1904             :   /* Execute the transactions which were gathered                       */
    1905             :   /**********************************************************************/
    1906             : 
    1907           0 :   ulong                 txn_cnt  = ctx->txn_cnt;
    1908           0 :   fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
    1909           0 :   fd_txn_p_t *          txns     = (fd_txn_p_t *)fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
    1910             : 
    1911             :   //Execute all txns which were successfully prepared
    1912           0 :   ctx->metrics.slot = curr_slot;
    1913           0 :   if( flags & REPLAY_FLAG_PACKED_MICROBLOCK ) {
    1914             :     /* TODO: The leader pipeline execution needs to be optimized. This is
    1915             :        very hacky and suboptimal. First, wait for the tpool workers to be idle.
    1916             :        Then, execute the transactions, and notify the pack tile. We should be
    1917             :        taking advantage of bank_busy flags. */
    1918             : 
    1919           0 :     for( ulong i=1UL; i<ctx->exec_spad_cnt; i++ ) {
    1920           0 :       fd_tpool_wait( ctx->tpool, i );
    1921           0 :     }
    1922             : 
    1923           0 :     fd_runtime_process_txns_in_microblock_stream( ctx->slot_ctx,
    1924           0 :                                                   ctx->capture_ctx,
    1925           0 :                                                   txns,
    1926           0 :                                                   txn_cnt,
    1927           0 :                                                   ctx->tpool,
    1928           0 :                                                   ctx->exec_spads,
    1929           0 :                                                   ctx->exec_spad_cnt,
    1930           0 :                                                   ctx->runtime_spad,
    1931           0 :                                                   NULL );
    1932             : 
    1933           0 :     fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);
    1934             : 
    1935           0 :     hash_transactions( ctx->bmtree[ bank_idx ], txns, txn_cnt, microblock_trailer->hash );
    1936             : 
    1937           0 :     ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
    1938           0 :     fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, 0UL, 0UL );
    1939           0 :     bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
    1940           0 :     bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
    1941             : 
    1942             :     /* Indicate to pack tile we are done processing the transactions so it
    1943             :       can pack new microblocks using these accounts.  DO NOT USE THE
    1944             :       SANITIZED TRANSACTIONS AFTER THIS POINT, THEY ARE NO LONGER VALID. */
    1945           0 :     fd_fseq_update( ctx->bank_busy[ bank_idx ], seq );
    1946             : 
    1947           0 :     publish_account_notifications( ctx, fork, curr_slot, txns, txn_cnt );
    1948           0 :   }
    1949             : 
    1950             :   /**********************************************************************/
    1951             :   /* Init PoH if it is ready                                            */
    1952             :   /**********************************************************************/
    1953             : 
    1954           0 :   if( FD_UNLIKELY( !(flags & REPLAY_FLAG_CATCHING_UP) && ctx->poh_init_done == 0 && ctx->slot_ctx->blockstore ) ) {
    1955           0 :     init_poh( ctx );
    1956           0 :   }
    1957             : 
    1958             :   /**********************************************************************/
    1959             :   /* Publish mblk to POH                                                */
    1960             :   /**********************************************************************/
    1961             : 
    1962           0 :   if( ctx->poh_init_done == 1 && !( flags & REPLAY_FLAG_FINISHED_BLOCK )
    1963           0 :       && ( ( flags & REPLAY_FLAG_MICROBLOCK ) ) ) {
    1964             :     // FD_LOG_INFO(( "publishing mblk to poh - slot: %lu, parent_slot: %lu", curr_slot, ctx->parent_slot ));
    1965           0 :     ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    1966           0 :     ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
    1967           0 :     fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, tsorig, tspub );
    1968           0 :     bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
    1969           0 :     bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
    1970           0 :   } else {
    1971           0 :     FD_LOG_DEBUG(( "NOT publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, ctx->parent_slot, flags ));
    1972           0 :   }
    1973             : 
    1974             : #if STOP_SLOT
    1975             :   if( FD_UNLIKELY( curr_slot == STOP_SLOT ) ) {
    1976             : 
    1977             :     if( FD_UNLIKELY( ctx->capture_file ) ) fclose( ctx->slots_replayed_file );
    1978             : 
    1979             :     if( FD_UNLIKELY( strcmp( ctx->blockstore_checkpt, "" ) ) ) {
    1980             :       int rc = fd_wksp_checkpt( ctx->blockstore_wksp, ctx->blockstore_checkpt, 0666, 0, NULL );
    1981             :       if( rc ) {
    1982             :         FD_LOG_ERR( ( "blockstore checkpt failed: error %d", rc ) );
    1983             :       }
    1984             :     }
    1985             :     FD_LOG_ERR( ( "stopping at %lu (#define STOP_SLOT %lu). shutting down.", STOP_SLOT, STOP_SLOT ) );
    1986             :   }
    1987             : #endif
    1988           0 : }
    1989             : 
    1990             : void
    1991           0 : tpool_boot( fd_topo_t * topo, ulong total_thread_count ) {
                         /* Builds the thread-index -> cpu-index table that is handed to
                            fd_tile_private_map_boot.  Entry 0 is the cpu_idx of the tile
                            named "replay"; entries 1.. are the cpu_idx of each tile named
                            "rtpool", in the order they appear in topo->tiles.

                            topo               topology whose tiles[0..tile_cnt) are scanned
                            total_thread_count expected number of threads (rtpool tiles plus
                                               one for the replay tile, if one was seen)

                            A mismatch between the discovered count and total_thread_count is
                            reported through FD_LOG_ERR (presumably fatal - confirm). */
    1992           0 :   ushort tile_to_cpu[ FD_TILE_MAX ] = { 0 };
    1993           0 :   ulong thread_count = 0;
    1994           0 :   ulong main_thread_seen = 0;
    1995             : 
    1996           0 :   for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
    1997           0 :     if( strcmp( topo->tiles[i].name, "rtpool" ) == 0 ) {
                             /* Worker slots start at index 1; index 0 is reserved for the
                                replay tile below.
                                NOTE(review): 1+thread_count is not bounds-checked against
                                FD_TILE_MAX here; the only count check happens after the loop.
                                Confirm the topology cannot contain FD_TILE_MAX or more rtpool
                                tiles. */
    1998           0 :       tile_to_cpu[ 1+thread_count ] = (ushort)topo->tiles[i].cpu_idx;
    1999           0 :       thread_count++;
    2000           0 :     }
    2001           0 :     if( strcmp( topo->tiles[i].name, "replay" ) == 0 ) {
                             /* If several tiles are named "replay" the last one wins, and it
                                is still counted only once below. */
    2002           0 :       tile_to_cpu[ 0 ] = (ushort)topo->tiles[i].cpu_idx;
    2003           0 :       main_thread_seen = 1;
    2004           0 :     }
    2005           0 :   }
    2006             : 
                         /* Account for the replay tile itself (slot 0). */
    2007           0 :   if( main_thread_seen ) {
    2008           0 :     thread_count++;
    2009           0 :   }
    2010             : 
    2011           0 :   if( thread_count != total_thread_count )
    2012           0 :     FD_LOG_ERR(( "thread count mismatch thread_count=%lu total_thread_count=%lu main_thread_seen=%lu", thread_count, total_thread_count, main_thread_seen ));
    2013             : 
    2014           0 :   fd_tile_private_map_boot( tile_to_cpu, thread_count );
    2015           0 : }
    2016             : 
    2017             : static void
    2018           0 : kickoff_repair_orphans( fd_replay_tile_ctx_t * ctx, fd_stem_context_t * stem ) {
                         /* Called from read_snapshot once a snapshot manifest has populated
                            ctx->slot_ctx.  Does three things, in order:
                              1. initializes the blockstore from the current slot bank,
                              2. publishes the stake weights taken from ctx->slot_ctx,
                              3. writes the slot bank's slot into the published_wmark fseq.
                            Per the comments at the call sites, this is what lets repair start
                            while the remaining snapshot data is still loading. */
    2019             : 
    2020           0 :   fd_blockstore_init( ctx->slot_ctx->blockstore, ctx->blockstore_fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, &ctx->slot_ctx->slot_bank );
    2021             : 
    2022           0 :   publish_stake_weights( ctx, stem, ctx->slot_ctx );
    2023           0 :   fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot_bank.slot );
    2024             : 
    2025           0 : }
    2026             : 
    2027             : static void
    2028             : read_snapshot( void *              _ctx,
    2029             :                fd_stem_context_t * stem,
    2030             :                char const *        snapshotfile,
    2031           0 :                char const *        incremental ) {
                         /* Restores runtime state into ctx->slot_ctx from a snapshot source.

                            _ctx          replay tile context (cast to fd_replay_tile_ctx_t *)
                            stem          stem context used for plugin / stake-weight publishes
                            snapshotfile  "funk", a string starting with "wksp:", or any other
                                          string, which is handed to fd_snapshot_load_new as the
                                          full snapshot
                            incremental   incremental snapshot source; an empty string means
                                          there is none.  strlen() is applied to it, so it must
                                          not be NULL.

                            Sequence: optional plugin progress message, full snapshot (or bank
                            recovery when funk is already populated), optional incremental
                            snapshot, leader schedule update, BPF program cache scan, and a
                            final fd_blockstore_init. */
    2032           0 :   fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;
    2033             : 
    2034           0 :   if( ctx->replay_plugin_out_mem ) {
    2035             :     // ValidatorStartProgress::DownloadingSnapshot
                           /* NOTE(review): byte 0 carries the progress code; byte 1 is 1 here
                              and 0 in the otherwise identical message sent before the
                              incremental load below - presumably a full/incremental flag,
                              confirm against the plugin message definition. */
    2036           0 :     uchar msg[56];
    2037           0 :     fd_memset( msg, 0, sizeof(msg) );
    2038           0 :     msg[0] = 2;
    2039           0 :     msg[1] = 1;
    2040           0 :     replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
    2041           0 :   }
    2042             : 
    2043             :   /* Pass the slot_ctx to snapshot_load or recover_banks */
    2044             :   /* Base slot is the slot we will compare against the base slot of the incremental snapshot, to ensure that the
    2045             :      base slot of the incremental snapshot is the slot of the full snapshot.
    2046             : 
    2047             :      We pull this out of the full snapshot to use when verifying the incremental snapshot. */
    2048           0 :   ulong        base_slot = 0UL;
    2049           0 :   const char * snapshot  = snapshotfile;
    2050           0 :   if( strcmp( snapshot, "funk" )==0 || strncmp( snapshot, "wksp:", 5 )==0 ) {
    2051             :     /* Funk already has a snapshot loaded */
    2052           0 :     fd_runtime_recover_banks( ctx->slot_ctx, 1, 1, ctx->runtime_spad );
    2053           0 :     base_slot = ctx->slot_ctx->slot_bank.slot;
    2054           0 :     publish_stake_weights( ctx, stem, ctx->slot_ctx );
    2055           0 :     fd_fseq_update( ctx->published_wmark, ctx->slot_ctx->slot_bank.slot );
    2056           0 :   } else {
    2057             : 
    2058             :     /* If we have an incremental snapshot try to prefetch the snapshot slot
    2059             :        and manifest as soon as possible. In order to kick off repair effectively
    2060             :        we need the snapshot slot and the stake weights. These are both available
    2061             :        in the manifest. We will try to load in the manifest from the latest
    2062             :        snapshot that is available, then setup the blockstore and publish the
    2063             :        stake weights. After this, repair will kick off concurrently with loading
    2064             :        the rest of the snapshots. */
    2065             : 
    2066             :     /* TODO: enable snapshot verification for all 3 snapshot loads */
    2067             : 
    2068           0 :     if( strlen( incremental )>0UL ) {
    2069           0 :       uchar *                  tmp_mem      = fd_spad_alloc( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
    2070             :       /* TODO: enable snapshot verification */
                             /* NOTE(review): the incremental path is opened with
                                FD_SNAPSHOT_TYPE_FULL here, whereas the real incremental load
                                further down uses FD_SNAPSHOT_TYPE_INCREMENTAL.  Presumably
                                intentional for the manifest-only prefetch - confirm. */
    2071           0 :       fd_snapshot_load_ctx_t * tmp_snap_ctx = fd_snapshot_load_new( tmp_mem,
    2072           0 :                                                                     incremental,
    2073           0 :                                                                     ctx->slot_ctx,
    2074           0 :                                                                     ctx->tpool,
    2075           0 :                                                                     false,
    2076           0 :                                                                     false,
    2077           0 :                                                                     FD_SNAPSHOT_TYPE_FULL,
    2078           0 :                                                                     ctx->exec_spads,
    2079           0 :                                                                     ctx->exec_spad_cnt,
    2080           0 :                                                                     ctx->runtime_spad );
    2081             :       /* Load the prefetch manifest, and initialize the status cache and slot context,
    2082             :          so that we can use these to kick off repair. */
    2083           0 :       fd_snapshot_load_prefetch_manifest( tmp_snap_ctx );
    2084           0 :       kickoff_repair_orphans( ctx, stem );
    2085           0 :     }
    2086             : 
    2087             :     /* In order to kick off repair effectively we need the snapshot slot and
    2088             :        the stake weights. These are both available in the manifest. We will
    2089             :        try to load in the manifest from the latest snapshot that is available,
    2090             :        then setup the blockstore and publish the stake weights. After this,
    2091             :        repair will kick off concurrently with loading the rest of the snapshots. */
    2092             : 
    2093           0 :     uchar *                  mem      = fd_spad_alloc( ctx->runtime_spad, fd_snapshot_load_ctx_align(), fd_snapshot_load_ctx_footprint() );
    2094             :     /* TODO: enable snapshot verification */
    2095           0 :     fd_snapshot_load_ctx_t * snap_ctx = fd_snapshot_load_new( mem,
    2096           0 :                                                               snapshot,
    2097           0 :                                                               ctx->slot_ctx,
    2098           0 :                                                               ctx->tpool,
    2099           0 :                                                               false,
    2100           0 :                                                               false,
    2101           0 :                                                               FD_SNAPSHOT_TYPE_FULL,
    2102           0 :                                                               ctx->exec_spads,
    2103           0 :                                                               ctx->exec_spad_cnt,
    2104           0 :                                                               ctx->runtime_spad );
    2105             : 
    2106           0 :     fd_snapshot_load_init( snap_ctx );
    2107             : 
    2108             :     /* If we don't have an incremental snapshot, load the manifest and the status cache and initialize
    2109             :          the objects because we don't have these from the incremental snapshot. */
                           /* strlen() is unsigned, so "<=0UL" is simply the empty-string test,
                              the complement of the ">0UL" check above. */
    2110           0 :     if( strlen( incremental )<=0UL ) {
    2111           0 :       fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL,
    2112           0 :         FD_SNAPSHOT_RESTORE_MANIFEST | FD_SNAPSHOT_RESTORE_STATUS_CACHE );
    2113             : 
    2114             :       /* If we don't have an incremental snapshot, we can still kick off
    2115             :          sending the stake weights and snapshot slot to repair. */
    2116           0 :       kickoff_repair_orphans( ctx, stem );
    2117           0 :     } else {
    2118             :       /* If we have an incremental snapshot, load the manifest and the status cache,
    2119             :           and don't initialize the objects because we did this above from the incremental snapshot. */
    2120           0 :       fd_snapshot_load_manifest_and_status_cache( snap_ctx, NULL, FD_SNAPSHOT_RESTORE_NONE );
    2121           0 :     }
                           /* Remember the full snapshot's slot; it is passed by address to the
                              incremental load below. */
    2122           0 :     base_slot = fd_snapshot_get_slot( snap_ctx );
    2123             : 
    2124           0 :     fd_snapshot_load_accounts( snap_ctx );
    2125           0 :     fd_snapshot_load_fini( snap_ctx );
    2126           0 :   }
    2127             : 
    2128             :   /* Load incremental */
    2129             : 
    2130           0 :   if( ctx->replay_plugin_out_mem ) {
    2131             :     // ValidatorStartProgress::DownloadingSnapshot
    2132           0 :     uchar msg[56];
    2133           0 :     fd_memset( msg, 0, sizeof(msg) );
    2134           0 :     msg[0] = 2;
    2135           0 :     msg[1] = 0;
    2136           0 :     replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
    2137           0 :   }
    2138             : 
                         /* NOTE(review): only the literal "funk" is excluded here, while the
                            branch above also treats a "wksp:" prefix as "already loaded".  With
                            a "wksp:" source and a non-empty incremental, the incremental load
                            still runs - confirm that asymmetry is intended. */
    2139           0 :   if( strlen( incremental ) > 0 && strcmp( snapshot, "funk" ) != 0 ) {
    2140             : 
    2141             :     /* The slot of the full snapshot should be used as the base slot to verify the incremental snapshot,
    2142             :        not the slot context's slot - which is the slot of the incremental, not the full snapshot. */
    2143             :     /* TODO: enable snapshot verification */
    2144           0 :     fd_snapshot_load_all( incremental,
    2145           0 :                           ctx->slot_ctx,
    2146           0 :                           &base_slot,
    2147           0 :                           ctx->tpool,
    2148           0 :                           false,
    2149           0 :                           false,
    2150           0 :                           FD_SNAPSHOT_TYPE_INCREMENTAL,
    2151           0 :                           ctx->exec_spads,
    2152           0 :                           ctx->exec_spad_cnt,
    2153           0 :                           ctx->runtime_spad );
    2154           0 :   }
    2155             : 
    2156           0 :   if( ctx->replay_plugin_out_mem ) {
    2157             :     // ValidatorStartProgress::DownloadedFullSnapshot
    2158           0 :     uchar msg[56];
    2159           0 :     fd_memset( msg, 0, sizeof(msg) );
    2160           0 :     msg[0] = 3;
    2161           0 :     replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
    2162           0 :   }
    2163             : 
                         /* Recompute the leader schedule for the slot the bank now sits at. */
    2164           0 :   fd_runtime_update_leaders( ctx->slot_ctx,
    2165           0 :                              ctx->slot_ctx->slot_bank.slot,
    2166           0 :                              ctx->runtime_spad );
    2167           0 :   FD_LOG_NOTICE(( "starting fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
                         /* The program-cache scan is bracketed by funk start_write/end_write. */
    2168           0 :   fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );
    2169           0 :   fd_bpf_scan_and_create_bpf_program_cache_entry_tpool( ctx->slot_ctx,
    2170           0 :                                                         ctx->slot_ctx->funk_txn,
    2171           0 :                                                         ctx->tpool,
    2172           0 :                                                         ctx->runtime_spad );
    2173           0 :   fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
    2174           0 :   FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
    2175             : 
                         /* NOTE(review): on the non-funk path kickoff_repair_orphans has already
                            called fd_blockstore_init with the same arguments; this is a second
                            init against the now fully loaded slot bank.  Confirm that
                            re-initializing is safe once repair has been kicked off. */
    2176           0 :   fd_blockstore_init( ctx->slot_ctx->blockstore,
    2177           0 :                       ctx->blockstore_fd,
    2178           0 :                       FD_BLOCKSTORE_ARCHIVE_MIN_SIZE,
    2179           0 :                       &ctx->slot_ctx->slot_bank );
    2180           0 : }
    2181             : 
    2182             : static void
    2183           0 : init_after_snapshot( fd_replay_tile_ctx_t * ctx ) {
                         /* Finishes tile initialization once ctx->slot_ctx holds the restored
                            (or genesis) bank: loads the sysvar cache, recalculates partitioned
                            rewards, handles the slot-0 start, resets the per-slot replay fields
                            in ctx, and seeds forks, epoch, ghost, tower, bank_hash_cmp and
                            epoch_forks entry 0 from that bank. */
    2184             :   /* Do not modify order! */
    2185             : 
    2186             :   /* First, load in the sysvars into the sysvar cache. This is required to
    2187             :      make the StakeHistory sysvar available to the rewards calculation. */
    2188             : 
    2189           0 :   fd_runtime_sysvar_cache_load( ctx->slot_ctx, ctx->runtime_spad );
    2190             : 
    2191             :   /* After both snapshots have been loaded in, we can determine if we should
    2192             :      start distributing rewards. */
    2193             : 
    2194           0 :   fd_rewards_recalculate_partitioned_rewards( ctx->slot_ctx,
    2195           0 :                                               ctx->tpool,
    2196           0 :                                               ctx->exec_spads,
    2197           0 :                                               ctx->exec_spad_cnt,
    2198           0 :                                               ctx->runtime_spad );
    2199             : 
    2200           0 :   ulong snapshot_slot = ctx->slot_ctx->slot_bank.slot;
                         /* Bank is at slot 0 (presumably a boot from genesis rather than from a
                            snapshot - confirm): advance it to slot 1 by hand. */
    2201           0 :   if( FD_UNLIKELY( !snapshot_slot ) ) {
    2202           0 :     fd_runtime_update_leaders( ctx->slot_ctx,
    2203           0 :                                ctx->slot_ctx->slot_bank.slot,
    2204           0 :                                ctx->runtime_spad );
    2205             : 
    2206           0 :     ctx->slot_ctx->slot_bank.prev_slot = 0UL;
    2207           0 :     ctx->slot_ctx->slot_bank.slot      = 1UL;
    2208             : 
                           /* Re-hash the bank's 32-byte PoH value in place once per hash of a
                              full slot (hashes_per_tick * ticks_per_slot iterations). */
    2209           0 :     ulong hashcnt_per_slot = ctx->slot_ctx->epoch_ctx->epoch_bank.hashes_per_tick * ctx->slot_ctx->epoch_ctx->epoch_bank.ticks_per_slot;
    2210           0 :     while(hashcnt_per_slot--) {
    2211           0 :       fd_sha256_hash( ctx->slot_ctx->slot_bank.poh.uc, 32UL, ctx->slot_ctx->slot_bank.poh.uc );
    2212           0 :     }
    2213             : 
                           /* Run prepare + finalize with an info struct that declares zero
                              signatures; both must return 0. */
    2214           0 :     FD_TEST( fd_runtime_block_execute_prepare( ctx->slot_ctx, ctx->runtime_spad ) == 0 );
    2215           0 :     fd_runtime_block_info_t info = { .signature_cnt = 0 };
    2216           0 :     FD_TEST( fd_runtime_block_execute_finalize_tpool( ctx->slot_ctx,
    2217           0 :                                                       NULL,
    2218           0 :                                                       &info,
    2219           0 :                                                       ctx->tpool,
    2220           0 :                                                       ctx->runtime_spad ) == 0 );
    2221             : 
                           /* NOTE(review): same two assignments as before the PoH loop;
                              presumably re-asserted because prepare/finalize may touch
                              prev_slot/slot - confirm, otherwise redundant. */
    2222           0 :     ctx->slot_ctx->slot_bank.prev_slot = 0UL;
    2223           0 :     ctx->slot_ctx->slot_bank.slot      = 1UL;
    2224           0 :     snapshot_slot                      = 1UL;
    2225             : 
    2226           0 :     FD_LOG_NOTICE(( "starting fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
    2227           0 :     fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );
    2228           0 :     fd_bpf_scan_and_create_bpf_program_cache_entry_tpool( ctx->slot_ctx,
    2229           0 :                                                           ctx->slot_ctx->funk_txn,
    2230           0 :                                                           ctx->tpool,
    2231           0 :                                                           ctx->runtime_spad );
    2232           0 :     fd_funk_end_write( ctx->slot_ctx->acc_mgr->funk );
    2233           0 :     FD_LOG_NOTICE(( "finished fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
    2234             : 
    2235           0 :   }
    2236             : 
                         /* Reset the per-slot replay state to start from the snapshot slot. */
    2237           0 :   ctx->curr_slot     = snapshot_slot;
    2238           0 :   ctx->parent_slot   = ctx->slot_ctx->slot_bank.prev_slot;
    2239           0 :   ctx->snapshot_slot = snapshot_slot;
    2240           0 :   ctx->blockhash     = ( fd_hash_t ){ .hash = { 0 } };
    2241           0 :   ctx->flags         = 0UL;
    2242           0 :   ctx->txn_cnt       = 0UL;
    2243             : 
    2244             :   /* Initialize consensus structures post-snapshot */
    2245             : 
    2246           0 :   fd_fork_t * snapshot_fork = fd_forks_init( ctx->forks, ctx->slot_ctx );
    2247           0 :   FD_TEST( snapshot_fork );
    2248           0 :   fd_epoch_init( ctx->epoch, &snapshot_fork->slot_ctx.epoch_ctx->epoch_bank );
    2249           0 :   fd_ghost_init( ctx->ghost, snapshot_slot );
    2250             : 
                         /* Build the funk record key for our vote account: the pubkey bytes,
                            with the final key byte set to the account key type. */
    2251           0 :   fd_funk_rec_key_t key = { 0 };
    2252           0 :   fd_memcpy( key.c, ctx->vote_acc, sizeof(fd_pubkey_t) );
    2253           0 :   key.c[FD_FUNK_REC_KEY_FOOTPRINT - 1] = FD_FUNK_KEY_TYPE_ACC;
    2254           0 :   fd_tower_from_vote_acc( ctx->tower, ctx->funk, snapshot_fork->slot_ctx.funk_txn, &key );
    2255           0 :   FD_LOG_NOTICE(( "vote account: %s", FD_BASE58_ENC_32_ALLOCA( key.c ) ));
    2256           0 :   fd_tower_print( ctx->tower, ctx->root );
    2257             : 
    2258           0 :   fd_bank_hash_cmp_t * bank_hash_cmp = ctx->epoch_ctx->bank_hash_cmp;
    2259           0 :   bank_hash_cmp->total_stake         = ctx->epoch->total_stake;
    2260           0 :   bank_hash_cmp->watermark           = snapshot_slot;
    2261             : 
                         /* Seed entry 0 of epoch_forks and make it the current epoch index. */
    2262           0 :   fd_epoch_fork_elem_t * curr_entry = &ctx->epoch_forks->forks[ 0 ];
    2263             : 
    2264           0 :   if( strlen( ctx->genesis ) > 0 ) {
    2265           0 :     curr_entry->parent_slot = 0UL;
    2266           0 :     curr_entry->epoch       = 0UL;
    2267           0 :   } else {
    2268           0 :     fd_epoch_bank_t * epoch_bank = fd_exec_epoch_ctx_epoch_bank( ctx->epoch_ctx );
    2269             : 
                           /* NOTE(review): despite its name, last_slot_in_epoch is
                              fd_epoch_slot0( curr_epoch ) minus 1 (saturating at 0), i.e.
                              the slot just before the current epoch's slot0 - assuming
                              fd_epoch_slot0 returns the epoch's first slot, confirm.  The
                              parent slot is clamped to be no later than that. */
    2270           0 :     ulong curr_epoch = fd_slot_to_epoch( &epoch_bank->epoch_schedule, ctx->curr_slot, NULL );
    2271           0 :     ulong last_slot_in_epoch = fd_ulong_sat_sub( fd_epoch_slot0( &epoch_bank->epoch_schedule, curr_epoch), 1UL );
    2272             : 
    2273           0 :     curr_entry->parent_slot = fd_ulong_min( ctx->parent_slot, last_slot_in_epoch );
    2274           0 :     curr_entry->epoch = curr_epoch;
    2275           0 :   }
    2276             : 
    2277           0 :   curr_entry->epoch_ctx = ctx->epoch_ctx;
    2278           0 :   ctx->epoch_forks->curr_epoch_idx = 0UL;
    2279             : 
    2280           0 :   FD_LOG_NOTICE(( "snapshot slot %lu", snapshot_slot ));
    2281           0 :   FD_LOG_NOTICE(( "total stake %lu", bank_hash_cmp->total_stake ));
    2282           0 : }
    2283             : 
    2284             : void
    2285             : init_snapshot( fd_replay_tile_ctx_t * ctx,
    2286           0 :                fd_stem_context_t *    stem ) {
                         /* Top-level bank bring-up for the replay tile.  Creates a temporary
                            slot context, optionally restores it from ctx->snapshot /
                            ctx->incremental, runs fd_runtime_read_genesis, performs the
                            post-snapshot initialization, and finally repoints ctx->slot_ctx at
                            the slot context stored inside the fork for ctx->curr_slot. */
    2287           0 :   FD_LOG_NOTICE(( "init snapshot" ));
    2288             :   /* Init slot_ctx */
    2289             : 
                         /* NOTE(review): slot_ctx lives on this function's stack and
                            ctx->slot_ctx points at it until the redirect near the end.
                            Presumably fd_forks_init (called from init_after_snapshot) copies it
                            into fork memory - confirm nothing retains the stack address. */
    2290           0 :   fd_exec_slot_ctx_t slot_ctx = {0};
    2291           0 :   ctx->slot_ctx               = fd_exec_slot_ctx_join( fd_exec_slot_ctx_new( &slot_ctx, ctx->runtime_spad ) );
    2292           0 :   ctx->slot_ctx->acc_mgr      = ctx->acc_mgr;
    2293           0 :   ctx->slot_ctx->blockstore   = ctx->blockstore;
    2294           0 :   ctx->slot_ctx->epoch_ctx    = ctx->epoch_ctx;
    2295           0 :   ctx->slot_ctx->status_cache = ctx->status_cache;
    2296           0 :   fd_runtime_update_slots_per_epoch( ctx->slot_ctx, FD_DEFAULT_SLOTS_PER_EPOCH, ctx->runtime_spad );
    2297             : 
                         /* A non-empty ctx->snapshot string selects the snapshot path. */
    2298           0 :   uchar is_snapshot = strlen( ctx->snapshot ) > 0;
    2299           0 :   if( is_snapshot ) {
    2300           0 :     read_snapshot( ctx, stem, ctx->snapshot, ctx->incremental );
    2301           0 :   }
    2302             : 
                         /* Called on both paths; is_snapshot tells it whether a snapshot was
                            already loaded. */
    2303           0 :   fd_runtime_read_genesis( ctx->slot_ctx,
    2304           0 :                            ctx->genesis,
    2305           0 :                            is_snapshot,
    2306           0 :                            ctx->capture_ctx,
    2307           0 :                            ctx->tpool,
    2308           0 :                            ctx->runtime_spad );
                         /* init_after_snapshot reads ctx->epoch_ctx->bank_hash_cmp, so these
                            two assignments must precede it. */
    2309           0 :   ctx->epoch_ctx->bank_hash_cmp = ctx->bank_hash_cmp;
    2310           0 :   ctx->epoch_ctx->replay_public = ctx->replay_public;
    2311           0 :   init_after_snapshot( ctx );
    2312             : 
    2313             :   /* Redirect ctx->slot_ctx to point to the memory inside forks. */
    2314             : 
                         /* NOTE(review): fork is not checked for NULL before &fork->slot_ctx is
                            taken, and the memcpy below dereferences ctx->slot_ctx before the
                            FD_TEST at the end of the function runs, so that FD_TEST cannot
                            catch a failed query.  Consider testing fork right after the query. */
    2315           0 :   fd_fork_t * fork = fd_forks_query( ctx->forks, ctx->curr_slot );
    2316           0 :   ctx->slot_ctx = &fork->slot_ctx;
    2317             : 
    2318             :   // Tell the world about the currently activated features
    2319           0 :   fd_memcpy ( &ctx->replay_public->features,  &ctx->slot_ctx->epoch_ctx->features, sizeof(ctx->replay_public->features) );
    2320             : 
    2321           0 :   FD_TEST( ctx->slot_ctx );
    2322           0 : }
    2323             : 
    2324             : static void
    2325             : publish_votes_to_plugin( fd_replay_tile_ctx_t * ctx,
    2326           0 :                          fd_stem_context_t *    stem ) {
                         /* Serializes the vote accounts of the frontier fork at ctx->curr_slot
                            into the votes plugin output buffer and publishes them as a
                            FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE frag.

                            Buffer layout written here: a ulong record count at offset 0,
                            followed by that many 112-byte fd_vote_update_msg_t records.
                            Accounts with zero stake are skipped and at most
                            FD_CLUSTER_NODE_CNT records are emitted.  Returns without
                            publishing if no frontier fork exists for ctx->curr_slot. */
    2327           0 :   uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->votes_plugin_out_mem, ctx->votes_plugin_out_chunk );
    2328             : 
    2329           0 :   fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
    2330           0 :   if( FD_UNLIKELY ( !fork  ) ) return;
    2331           0 :   fd_vote_accounts_t * accts = &fork->slot_ctx.slot_bank.epoch_stakes;
    2332           0 :   fd_vote_accounts_pair_t_mapnode_t * root = accts->vote_accounts_root;
    2333           0 :   fd_vote_accounts_pair_t_mapnode_t * pool = accts->vote_accounts_pool;
    2334             : 
                         /* i counts emitted records, not visited map nodes. */
    2335           0 :   ulong i = 0;
    2336           0 :   for( fd_vote_accounts_pair_t_mapnode_t const * n = fd_vote_accounts_pair_t_map_minimum_const( pool, root );
    2337           0 :        n && i < FD_CLUSTER_NODE_CNT;
    2338           0 :        n = fd_vote_accounts_pair_t_map_successor_const( pool, n ) ) {
    2339           0 :     if( n->elem.stake == 0 ) continue;
    2340             : 
    2341             :     /* TODO: Define a helper that gets specific fields. */
    2342           0 :     fd_bincode_decode_ctx_t dec_ctx = {
    2343           0 :       .data    = n->elem.value.data,
    2344           0 :       .dataend = n->elem.value.data + n->elem.value.data_len,
    2345           0 :     };
    2346             : 
    2347           0 :     ulong total_sz = 0UL;
    2348           0 :     int err = fd_vote_state_versioned_decode_footprint( &dec_ctx, &total_sz );
    2349           0 :     if( FD_UNLIKELY( err ) ) {
    2350           0 :       FD_LOG_ERR(( "Unexpected failure in decoding vote state" ));
    2351           0 :     }
    2352             : 
                           /* NOTE(review): one allocation from runtime_spad per emitted
                              account, with no matching release visible in this function.
                              Presumably an enclosing spad frame reclaims it - confirm. */
    2353           0 :     uchar * mem = fd_spad_alloc( ctx->runtime_spad, fd_vote_state_versioned_align(), total_sz );
    2354           0 :     if( FD_UNLIKELY( !mem ) ) {
    2355           0 :       FD_LOG_ERR(( "Unable to allocate memory for memory" ));
    2356           0 :     }
    2357             : 
                           /* NOTE(review): the decode result is used without a NULL check. */
    2358           0 :     fd_vote_state_versioned_t * vsv = fd_vote_state_versioned_decode( mem, &dec_ctx );
    2359             : 
                           /* Pull the node identity and last-timestamp slot out of whichever
                              vote state version was decoded. */
    2360           0 :     fd_pubkey_t node_pubkey;
    2361           0 :     ulong       last_ts_slot;
    2362           0 :     switch( vsv->discriminant ) {
    2363           0 :       case fd_vote_state_versioned_enum_v0_23_5:
    2364           0 :         node_pubkey  = vsv->inner.v0_23_5.node_pubkey;
    2365           0 :         last_ts_slot = vsv->inner.v0_23_5.last_timestamp.slot;
    2366           0 :         break;
    2367           0 :       case fd_vote_state_versioned_enum_v1_14_11:
    2368           0 :         node_pubkey  = vsv->inner.v1_14_11.node_pubkey;
    2369           0 :         last_ts_slot = vsv->inner.v1_14_11.last_timestamp.slot;
    2370           0 :         break;
    2371           0 :       case fd_vote_state_versioned_enum_current:
    2372           0 :         node_pubkey  = vsv->inner.current.node_pubkey;
    2373           0 :         last_ts_slot = vsv->inner.current.last_timestamp.slot;
    2374           0 :         break;
    2375           0 :       default:
                               /* Any other discriminant is declared impossible to the
                                  compiler; reaching this at runtime is undefined behavior. */
    2376           0 :         __builtin_unreachable();
    2377           0 :     }
    2378             : 
                           /* Record i sits after the leading ulong count, at a fixed 112-byte
                              stride. */
    2379           0 :     fd_vote_update_msg_t * msg = (fd_vote_update_msg_t *)(dst + sizeof(ulong) + i*112U);
    2380           0 :     memset( msg, 0, 112U );
    2381           0 :     memcpy( msg->vote_pubkey, n->elem.key.uc, sizeof(fd_pubkey_t) );
    2382           0 :     memcpy( msg->node_pubkey, node_pubkey.uc, sizeof(fd_pubkey_t) );
    2383           0 :     msg->activated_stake = n->elem.stake;
    2384           0 :     msg->last_vote       = last_ts_slot;
                           /* Flagged delinquent only when the last-timestamp slot is 0. */
    2385           0 :     msg->is_delinquent   = (uchar)(msg->last_vote == 0);
    2386           0 :     ++i;
    2387           0 :   }
    2388             : 
                         /* Record count goes in the first 8 bytes of the buffer. */
    2389           0 :   *(ulong *)dst = i;
    2390             : 
    2391           0 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
    2392           0 :   fd_stem_publish( stem, ctx->votes_plug_out_idx, FD_PLUGIN_MSG_VOTE_ACCOUNT_UPDATE, ctx->votes_plugin_out_chunk, 0, 0UL, 0UL, tspub );
                         /* NOTE(review): the chunk is advanced by a fixed
                            8 + 40200*(58+12*34) bytes regardless of i, and that per-entry size
                            does not obviously correspond to the 112-byte stride used above.
                            The frag is also published with size 0.  Confirm both against the
                            plugin consumer and the dcache sizing. */
    2393           0 :   ctx->votes_plugin_out_chunk = fd_dcache_compact_next( ctx->votes_plugin_out_chunk, 8UL + 40200UL*(58UL+12UL*34UL), ctx->votes_plugin_out_chunk0, ctx->votes_plugin_out_wmark );
    2394           0 : }
    2395             : 
    2396             : /* after_credit runs on every iteration of the replay tile loop except
    2397             :    when backpressured.
    2398             : 
    2399             :    Each call (1) runs exec_slices for ctx->curr_slot, (2) if the block
    2400             :    was flagged finished and not yet completed, finalizes it, publishes
    2401             :    notifications, updates consensus state, votes and compares bank
    2402             :    hashes, (3) performs the one-time init_snapshot call, setting
    2403             :    *charge_busy, and (4) periodically publishes votes to the plugin.
    2404             : 
    2405             :    opt_poll_in is unused.  The blockstore itself is joined earlier, in
    2406             :    unprivileged_init. */
    2407             : static void
    2408             : after_credit( fd_replay_tile_ctx_t * ctx,
    2409             :               fd_stem_context_t *    stem,
    2410             :               int *                  opt_poll_in FD_PARAM_UNUSED,
    2411           0 :               int *                  charge_busy ) {
    2412           0 :   (void)opt_poll_in;
    2413             :   /* Run exec_slices on the current slot before sampling ctx state below. */
    2414           0 :   exec_slices( ctx, stem, ctx->curr_slot );
    2415             : 
    2416           0 :   ulong curr_slot   = ctx->curr_slot;
    2417           0 :   ulong parent_slot = ctx->parent_slot;
    2418           0 :   ulong flags       = ctx->flags;
    2419           0 :   ulong bank_idx    = ctx->bank_idx;
    2420             :   /* fork is only dereferenced inside the finished-block branch below. */
    2421           0 :   fd_fork_t * fork = fd_fork_frontier_ele_query( ctx->forks->frontier, &ctx->curr_slot, NULL, ctx->forks->pool );
    2422             :   /* txns points at the current chunk of the selected bank output link. */
    2423           0 :   ulong                 txn_cnt  = ctx->txn_cnt;
    2424           0 :   fd_replay_out_ctx_t * bank_out = &ctx->bank_out[ bank_idx ];
    2425           0 :   fd_txn_p_t *          txns     = (fd_txn_p_t *)fd_chunk_to_laddr( bank_out->mem, bank_out->chunk );
    2426             :   /**********************************************************************/
    2427             :   /* Cleanup and handle consensus after replaying the whole block       */
    2428             :   /**********************************************************************/
    2429             : 
    2430           0 :   if( FD_UNLIKELY( (flags & REPLAY_FLAG_FINISHED_BLOCK) && ( ctx->last_completed_slot != curr_slot )) ) {
    2431           0 :     fork->slot_ctx.txn_count = fork->slot_ctx.slot_bank.transaction_count-fork->slot_ctx.parent_transaction_count;
    2432           0 :     FD_LOG_WARNING(( "finished block - slot: %lu, parent_slot: %lu, txn_cnt: %lu, blockhash: %s",
    2433           0 :                   curr_slot,
    2434           0 :                   ctx->parent_slot,
    2435           0 :                   fork->slot_ctx.txn_count,
    2436           0 :                   FD_BASE58_ENC_32_ALLOCA( ctx->blockhash.uc ) ));
    2437           0 :     ctx->last_completed_slot = curr_slot;
    2438             :     /* Setting last_completed_slot above keeps this branch from re-running for the same slot. */
    2439             :     /**************************************************************************************************/
    2440             :     /* Call fd_runtime_block_execute_finalize_tpool which updates sysvar and cleanup some other stuff */
    2441             :     /**************************************************************************************************/
    2442             : 
    2443           0 :     fd_runtime_block_info_t runtime_block_info[1];
    2444           0 :     runtime_block_info->signature_cnt = fork->slot_ctx.signature_cnt;
    2445             : 
    2446             :     /* Destroy the slot history, then wait on tpool workers 1..bank_cnt */
    2447           0 :     fd_slot_history_destroy( fork->slot_ctx.slot_history );
    2448           0 :     for( ulong i = 0UL; i<ctx->bank_cnt; i++ ) {
    2449           0 :       fd_tpool_wait( ctx->tpool, i+1 );
    2450           0 :     }
    2451             : 
    2452           0 :     int res = fd_runtime_block_execute_finalize_tpool( &fork->slot_ctx, ctx->capture_ctx, runtime_block_info, ctx->tpool, ctx->runtime_spad );
    2453           0 :     if( res != FD_RUNTIME_EXECUTE_SUCCESS ) {
    2454           0 :       FD_LOG_ERR(( "block finished failed" ));
    2455           0 :     }
    2456             :     /* Pop a runtime spad frame; no matching push is visible in this function. */
    2457           0 :     fd_spad_pop( ctx->runtime_spad );
    2458           0 :     FD_LOG_NOTICE(( "Spad memory after executing block %lu", ctx->runtime_spad->mem_used ));
    2459             :     /**********************************************************************/
    2460             :     /* Push notifications for slot updates and reset block_info flag */
    2461             :     /**********************************************************************/
    2462             :     /* Read block_height from the block map, retrying until the query test succeeds. */
    2463           0 :     ulong block_entry_height = 0;
    2464           0 :     for(;;){
    2465           0 :       fd_block_map_query_t query[1] = { 0 };
    2466           0 :       int err = fd_block_map_query_try( ctx->blockstore->block_map, &curr_slot, NULL, query, FD_MAP_FLAG_BLOCKING );
    2467           0 :       fd_block_info_t * block_info = fd_block_map_query_ele( query );
    2468           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_KEY   ) ) FD_LOG_ERR(( "Failed to query blockstore for slot %lu", curr_slot ));
    2469           0 :       if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
    2470           0 :       block_entry_height = block_info->block_height;
    2471           0 :       if( FD_UNLIKELY( fd_block_map_query_test( query ) == FD_MAP_SUCCESS ) ) break;
    2472           0 :     }
    2473             : 
    2474           0 :     publish_slot_notifications( ctx, stem, fork, block_entry_height, curr_slot );
    2475             : 
    2476           0 :     ctx->blockstore->shmem->lps = curr_slot;
    2477             : 
    2478             :     /**********************************************************************/
    2479             :     /* Unlock the fork meaning that execution of the fork is now complete */
    2480             :     /**********************************************************************/
    2481           0 :     FD_TEST(fork->slot == curr_slot);
    2482           0 :     fork->lock = 0;
    2483             : 
    2484             :     /**********************************************************************/
    2485             :     /* Consensus: update ghost and forks                                  */
    2486             :     /**********************************************************************/
    2487             : 
    2488           0 :     FD_PARAM_UNUSED long tic_ = fd_log_wallclock(); /* not read again in this function */
    2489           0 :     fd_ghost_node_t const * ghost_node = fd_ghost_insert( ctx->ghost, parent_slot, curr_slot ); /* only checked under FD_GHOST_USE_HANDHOLDING */
    2490           0 : #if FD_GHOST_USE_HANDHOLDING
    2491           0 :     if( FD_UNLIKELY( !ghost_node ) ) {
    2492           0 :       FD_LOG_ERR(( "failed to insert ghost node %lu", fork->slot ));
    2493           0 :     }
    2494           0 : #endif
    2495           0 :     fd_forks_update( ctx->forks, ctx->epoch, ctx->funk, ctx->ghost, fork->slot );
    2496             : 
    2497             :     /**********************************************************************/
    2498             :     /* Consensus: decide (1) the fork for pack; (2) the fork to vote on   */
    2499             :     /**********************************************************************/
    2500             : 
    2501           0 :     ulong reset_slot = fd_tower_reset_slot( ctx->tower, ctx->epoch, ctx->ghost );
    2502           0 :     fd_fork_t const * reset_fork = fd_forks_query_const( ctx->forks, reset_slot );
    2503           0 :     if( FD_UNLIKELY( !reset_fork ) ) {
    2504           0 :       FD_LOG_ERR( ( "failed to find reset fork %lu", reset_slot ) );
    2505           0 :     }
    2506           0 :     if( reset_fork->lock ) { /* locked: switch to the fork returned by fd_forks_prepare for its prev_slot */
    2507           0 :       FD_LOG_WARNING(("RESET FORK FROZEN: %lu", reset_fork->slot ));
    2508           0 :       fd_fork_t * new_reset_fork = fd_forks_prepare( ctx->forks, reset_fork->slot_ctx.slot_bank.prev_slot, ctx->acc_mgr,
    2509           0 :                                                      ctx->blockstore, ctx->epoch_ctx, ctx->funk, ctx->runtime_spad );
    2510           0 :       new_reset_fork->lock = 0;
    2511           0 :       reset_fork = new_reset_fork;
    2512           0 :     }
    2513             : 
    2514             :     /* Update the gui: publish the reset slot and its ancestor chain */
    2515           0 :     if( ctx->replay_plugin_out_mem ) {
    2516             :       /* FIXME. We need a more efficient way to compute the ancestor chain. */
    2517           0 :       uchar msg[4098*8] __attribute__( ( aligned( 8U ) ) );
    2518           0 :       fd_memset( msg, 0, sizeof(msg) );
    2519           0 :       ulong s = reset_fork->slot_ctx.slot_bank.slot;
    2520           0 :       *(ulong*)(msg + 16U) = s; /* reset slot at offset 16 */
    2521           0 :       ulong i = 0;
    2522           0 :       do {
    2523           0 :         if( !fd_blockstore_block_info_test( ctx->blockstore, s ) ) {
    2524           0 :           break;
    2525           0 :         }
    2526           0 :         s = fd_blockstore_parent_slot_query( ctx->blockstore, s );
    2527           0 :         if( s < ctx->blockstore->shmem->wmk ) {
    2528           0 :           break;
    2529           0 :         }
    2530             :         /* Ancestors are stored as 8-byte slots starting at offset 24. */
    2531           0 :         *(ulong*)(msg + 24U + i*8U) = s;
    2532           0 :         if( ++i == 4095U ) { /* 24 + 4095*8 == sizeof(msg) */
    2533           0 :           break;
    2534           0 :         }
    2535           0 :       } while( 1 );
    2536           0 :       *(ulong*)(msg + 8U) = i; /* ancestor count at offset 8 */
    2537           0 :       replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_RESET, msg, sizeof(msg) );
    2538           0 :     }
    2539             :     /* The trailer sits right after the txn_cnt txns in the bank out chunk. */
    2540           0 :     fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);
    2541           0 :     memcpy( microblock_trailer->hash, reset_fork->slot_ctx.slot_bank.block_hash_queue.last_hash->uc, sizeof(fd_hash_t) );
    2542           0 :     if( ctx->poh_init_done == 1 ) {
    2543           0 :       ulong parent_slot = reset_fork->slot_ctx.slot_bank.prev_slot; /* shadows the outer parent_slot */
    2544           0 :       ulong curr_slot = reset_fork->slot_ctx.slot_bank.slot; /* shadows the outer curr_slot */
    2545           0 :       FD_LOG_DEBUG(( "publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, parent_slot, flags ));
    2546           0 :       ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    2547           0 :       ulong sig = fd_disco_replay_old_sig( curr_slot, flags );
    2548           0 :       fd_mcache_publish( bank_out->mcache, bank_out->depth, bank_out->seq, sig, bank_out->chunk, txn_cnt, 0UL, 0, tspub );
    2549           0 :       bank_out->chunk = fd_dcache_compact_next( bank_out->chunk, (txn_cnt * sizeof(fd_txn_p_t)) + sizeof(fd_microblock_trailer_t), bank_out->chunk0, bank_out->wmark );
    2550           0 :       bank_out->seq = fd_seq_inc( bank_out->seq, 1UL );
    2551           0 :     } else {
    2552           0 :       FD_LOG_DEBUG(( "NOT publishing mblk to poh - slot: %lu, parent_slot: %lu, flags: %lx", curr_slot, ctx->parent_slot, flags ));
    2553           0 :     }
    2554             : 
    2555           0 :     fd_forks_print( ctx->forks );
    2556           0 :     fd_ghost_print( ctx->ghost, ctx->epoch, fd_ghost_root( ctx->ghost ) );
    2557           0 :     fd_tower_print( ctx->tower, ctx->root );
    2558             :     /* NOTE(review): child is queried with fork->slot and is dereferenced below without a NULL check. */
    2559           0 :     fd_fork_t * child = fd_fork_frontier_ele_query( ctx->forks->frontier, &fork->slot, NULL, ctx->forks->pool );
    2560           0 :     ulong vote_slot   = fd_tower_vote_slot( ctx->tower, ctx->epoch, ctx->funk, child->slot_ctx.funk_txn, ctx->ghost, ctx->runtime_spad );
    2561             : 
    2562           0 :     FD_LOG_NOTICE( ( "\n\n[Fork Selection]\n"
    2563           0 :                      "# of vote accounts: %lu\n"
    2564           0 :                      "best fork:          %lu\n",
    2565           0 :                      fd_epoch_voters_key_cnt( fd_epoch_voters( ctx->epoch ) ),
    2566           0 :                      fd_ghost_head( ctx->ghost, fd_ghost_root( ctx->ghost ) )->slot ) );
    2567             : 
    2568             :     /**********************************************************************/
    2569             :     /* Consensus: send out a new vote by calling send_tower_sync          */
    2570             :     /**********************************************************************/
    2571             : 
    2572           0 :     if( FD_UNLIKELY( ctx->vote && fd_fseq_query( ctx->poh ) == ULONG_MAX ) ) {
    2573             :       /* Only proceed with voting if we're caught up. */
    2574             :       /* NOTE(review): the else branch below also runs when ctx->vote is zero. */
    2575           0 :       FD_LOG_WARNING(( "still catching up. not voting." ));
    2576           0 :     } else {
    2577           0 :       if( FD_UNLIKELY( !ctx->is_caught_up ) ) {
    2578           0 :         ctx->is_caught_up = 1;
    2579           0 :       }
    2580             : 
    2581             :       /* Proceed according to how local and cluster are synchronized. */
    2582             : 
    2583           0 :       if( FD_LIKELY( vote_slot != FD_SLOT_NULL ) ) {
    2584             : 
    2585             :         /* Invariant check: the vote_slot must be in the frontier */
    2586             : 
    2587           0 :         FD_TEST( fd_forks_query_const( ctx->forks, vote_slot ) );
    2588             : 
    2589             :         /* Vote locally */
    2590             : 
    2591           0 :         ulong root = fd_tower_vote( ctx->tower, vote_slot );
    2592           0 :         ctx->metrics.last_voted_slot = vote_slot;
    2593             : 
    2594             :         /* Update to a new root, if there is one. */
    2595             : 
    2596           0 :         if ( FD_LIKELY ( root != FD_SLOT_NULL ) ) ctx->root = root; /* optimize for full tower (replay is keeping up) */
    2597           0 :       }
    2598             : 
    2599             :       /* Send our updated tower to the cluster. */
    2600             : 
    2601           0 :       send_tower_sync( ctx );
    2602           0 :     }
    2603             : 
    2604             :     /**********************************************************************/
    2605             :     /* Prepare bank for the next execution and write to debugging files   */
    2606             :     /**********************************************************************/
    2607             :     /* prev_slot is the slot held by child before it is overwritten with curr_slot. */
    2608           0 :     ulong prev_slot = child->slot_ctx.slot_bank.slot;
    2609           0 :     child->slot_ctx.slot_bank.slot           = curr_slot;
    2610           0 :     child->slot_ctx.slot_bank.collected_execution_fees = 0;
    2611           0 :     child->slot_ctx.slot_bank.collected_priority_fees = 0;
    2612           0 :     child->slot_ctx.slot_bank.collected_rent = 0;
    2613             : 
    2614           0 :     if( FD_UNLIKELY( ctx->slots_replayed_file ) ) {
    2615           0 :       FD_LOG_DEBUG(( "writing %lu to slots file", prev_slot ));
    2616           0 :       fprintf( ctx->slots_replayed_file, "%lu\n", prev_slot );
    2617           0 :       fflush( ctx->slots_replayed_file );
    2618           0 :     }
    2619             : 
    2620           0 :     if (NULL != ctx->capture_ctx) {
    2621           0 :       fd_solcap_writer_flush( ctx->capture_ctx->capture );
    2622           0 :     }
    2623             : 
    2624             :     /**********************************************************************/
    2625             :     /* Bank hash comparison, and halt if there's a mismatch after replay  */
    2626             :     /**********************************************************************/
    2627             : 
    2628           0 :     fd_hash_t const * bank_hash = &child->slot_ctx.slot_bank.banks_hash;
    2629           0 :     fd_bank_hash_cmp_t * bank_hash_cmp = child->slot_ctx.epoch_ctx->bank_hash_cmp;
    2630           0 :     fd_bank_hash_cmp_lock( bank_hash_cmp );
    2631           0 :     fd_bank_hash_cmp_insert( bank_hash_cmp, curr_slot, bank_hash, 1, 0 );
    2632             : 
    2633             :     /* Try to move the bank hash comparison watermark forward (slots below curr_slot only) */
    2634           0 :     for( ulong cmp_slot = bank_hash_cmp->watermark + 1; cmp_slot < curr_slot; cmp_slot++ ) {
    2635           0 :       int rc = fd_bank_hash_cmp_check( bank_hash_cmp, cmp_slot );
    2636           0 :       switch ( rc ) {
    2637           0 :         case -1:
    2638             : 
    2639             :           /* Mismatch */
    2640             : 
    2641           0 :           funk_cancel( ctx, cmp_slot );
    2642           0 :           checkpt( ctx );
    2643           0 :           FD_LOG_ERR(( "Bank hash mismatch on slot: %lu. Halting.", cmp_slot ));
    2644             : 
    2645           0 :           break;
    2646             : 
    2647           0 :         case 0:
    2648             : 
    2649             :           /* Not ready */
    2650             : 
    2651           0 :           break;
    2652             : 
    2653           0 :         case 1:
    2654             : 
    2655             :           /* Match */
    2656             : 
    2657           0 :           bank_hash_cmp->watermark = cmp_slot;
    2658           0 :           break;
    2659             : 
    2660           0 :         default:;
    2661           0 :       }
    2662           0 :     }
    2663             : 
    2664           0 :     fd_bank_hash_cmp_unlock( bank_hash_cmp );
    2665           0 :   } // end of finished-block handling (flag set and slot not yet completed)
    2666             :   /* One-time snapshot initialization, done on the first call that reaches here. */
    2667           0 :   if( FD_UNLIKELY( ctx->snapshot_init_done==0 ) ) {
    2668           0 :     init_snapshot( ctx, stem );
    2669           0 :     ctx->snapshot_init_done = 1;
    2670           0 :     *charge_busy = 1;
    2671           0 :     if( ctx->replay_plugin_out_mem ) {
    2672             :       // ValidatorStartProgress::Running
    2673           0 :       uchar msg[56];
    2674           0 :       fd_memset( msg, 0, sizeof(msg) );
    2675           0 :       msg[0] = 11;
    2676           0 :       replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
    2677           0 :     }
    2678           0 :   }
    2679             :   /* Rate-limit vote publishing to the plugin by wallclock time. */
    2680           0 :   long now = fd_log_wallclock();
    2681           0 :   if( ctx->votes_plugin_out_mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) {
    2682           0 :     ctx->last_plugin_push_time = now;
    2683           0 :     publish_votes_to_plugin( ctx, stem );
    2684           0 :   }
    2685             : 
    2686           0 : }
    2687             : 
    2688             : static void
    2689           0 : during_housekeeping( void * _ctx ) {
    2690             :   /* Housekeeping callback: publish a new watermark to the tile state if it advanced. */
    2691           0 :   fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;
    2692             : 
    2693             :   /* Update watermark. The publish watermark is the minimum of the tower
    2694             :      root and supermajority root. */
    2695             : 
    2696           0 :   ulong wmark = fd_ulong_min( ctx->root, ctx->forks->finalized );
    2697             :   /* Nothing to do unless wmark exceeds the value held in published_wmark. */
    2698           0 :   if ( FD_LIKELY( wmark <= fd_fseq_query( ctx->published_wmark ) ) ) return;
    2699           0 :   FD_LOG_NOTICE(( "wmk %lu => %lu", fd_fseq_query( ctx->published_wmark ), wmark ));
    2700             :   /* NOTE(review): ctx->forks is dereferenced above, before its NULL check below. */
    2701           0 :   fd_funk_txn_xid_t xid = { .ul = { wmark, wmark } }; /* both xid words are set to wmark */
    2702           0 :   if( FD_LIKELY( ctx->blockstore ) ) fd_blockstore_publish( ctx->blockstore, ctx->blockstore_fd, wmark );
    2703           0 :   if( FD_LIKELY( ctx->forks ) ) fd_forks_publish( ctx->forks, wmark, ctx->ghost );
    2704           0 :   if( FD_LIKELY( ctx->funk ) ) funk_and_txncache_publish( ctx, wmark, &xid );
    2705           0 :   if( FD_LIKELY( ctx->ghost ) ) {
    2706           0 :     fd_epoch_forks_publish( ctx->epoch_forks, ctx->ghost, wmark );
    2707           0 :     fd_ghost_publish( ctx->ghost, wmark );
    2708           0 :   }
    2709             :   /* Record the new watermark last, after each present component has published. */
    2710           0 :   fd_fseq_update( ctx->published_wmark, wmark );
    2711             : 
    2712             : 
    2713             :   // fd_mcache_seq_update( ctx->store_out_sync, ctx->store_out_seq );
    2714           0 : }
    2715             : 
    2716             : static void
    2717             : privileged_init( fd_topo_t *      topo,
    2718           0 :                  fd_topo_tile_t * tile ) {
    2719           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    2720             :   /* Only the ctx struct is carved out of the tile scratch here; it is zeroed below. */
    2721           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    2722           0 :   fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
    2723           0 :   FD_SCRATCH_ALLOC_FINI  ( l, scratch_align() );
    2724           0 :   memset( ctx, 0, sizeof(fd_replay_tile_ctx_t) );
    2725             :   /* Fill funk_seed and status_cache_seed from getrandom; FD_TEST requires full-size reads. */
    2726           0 :   FD_TEST( sizeof(ulong) == getrandom( &ctx->funk_seed, sizeof(ulong), 0 ) );
    2727           0 :   FD_TEST( sizeof(ulong) == getrandom( &ctx->status_cache_seed, sizeof(ulong), 0 ) );
    2728             :   /* Open (creating if needed) the blockstore archival file read/write and keep the fd in ctx. */
    2729           0 :   ctx->blockstore_fd = open( tile->replay.blockstore_file, O_RDWR | O_CREAT, 0666 );
    2730           0 :   if ( FD_UNLIKELY( ctx->blockstore_fd == -1 ) ) {
    2731           0 :     FD_LOG_ERR(( "failed to open or create blockstore archival file %s %d %d %s", tile->replay.blockstore_file, ctx->blockstore_fd, errno, strerror(errno) ));
    2732           0 :   }
    2733           0 : }
    2734             : 
    2735             : static void
    2736             : unprivileged_init( fd_topo_t *      topo,
    2737           0 :                    fd_topo_tile_t * tile ) {
    2738             : 
    2739           0 :   FD_LOG_NOTICE(("finished unprivileged init"));
    2740           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    2741             : 
    2742           0 :   if( FD_UNLIKELY( tile->in_cnt < 4 ||
    2743           0 :                    strcmp( topo->links[ tile->in_link_id[ STORE_IN_IDX  ] ].name, "store_replay" ) ||
    2744           0 :                    strcmp( topo->links[ tile->in_link_id[ PACK_IN_IDX ] ].name, "pack_replay")   ||
    2745           0 :                    strcmp( topo->links[ tile->in_link_id[ BATCH_IN_IDX  ] ].name, "batch_replay" ) ||
    2746           0 :                    strcmp( topo->links[ tile->in_link_id[ SHRED_IN_IDX  ] ].name, "shred_replay" ) ) ) {
    2747           0 :     FD_LOG_ERR(( "replay tile has none or unexpected input links %lu %s %s",
    2748           0 :                  tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
    2749           0 :   }
    2750             : 
    2751             :   /**********************************************************************/
    2752             :   /* scratch (bump)-allocate memory owned by the replay tile            */
    2753             :   /**********************************************************************/
    2754             : 
    2755             :   /* Do not modify order! This is join-order in unprivileged_init. */
    2756             : 
    2757           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    2758           0 :   fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
    2759           0 :   void * alloc_shmem         = FD_SCRATCH_ALLOC_APPEND( l, fd_alloc_align(), fd_alloc_footprint() );
    2760           0 :   void * capture_ctx_mem     = FD_SCRATCH_ALLOC_APPEND( l, FD_CAPTURE_CTX_ALIGN, FD_CAPTURE_CTX_FOOTPRINT );
    2761           0 :   void * epoch_mem           = FD_SCRATCH_ALLOC_APPEND( l, fd_epoch_align(), fd_epoch_footprint( FD_VOTER_MAX ) );
    2762           0 :   void * forks_mem           = FD_SCRATCH_ALLOC_APPEND( l, fd_forks_align(), fd_forks_footprint( FD_BLOCK_MAX ) );
    2763           0 :   void * ghost_mem           = FD_SCRATCH_ALLOC_APPEND( l, fd_ghost_align(), fd_ghost_footprint( FD_BLOCK_MAX ) );
    2764           0 :   void * tower_mem           = FD_SCRATCH_ALLOC_APPEND( l, fd_tower_align(), fd_tower_footprint() );
    2765           0 :   void * replay_mem          = FD_SCRATCH_ALLOC_APPEND( l, fd_replay_align(), fd_replay_footprint( tile->replay.fec_max, FD_SHRED_MAX_PER_SLOT, FD_BLOCK_MAX ) );
    2766           0 :   void * bank_hash_cmp_mem   = FD_SCRATCH_ALLOC_APPEND( l, fd_bank_hash_cmp_align(), fd_bank_hash_cmp_footprint( ) );
    2767           0 :   for( ulong i = 0UL; i<FD_PACK_MAX_BANK_TILES; i++ ) {
    2768           0 :     ctx->bmtree[i]           = FD_SCRATCH_ALLOC_APPEND( l, FD_BMTREE_COMMIT_ALIGN, FD_BMTREE_COMMIT_FOOTPRINT(0) );
    2769           0 :   }
    2770           0 :   void * mbatch_mem          = FD_SCRATCH_ALLOC_APPEND( l, 128UL, FD_SLICE_MAX );
    2771           0 :   ulong  thread_spad_size    = fd_spad_footprint( FD_RUNTIME_TRANSACTION_EXECUTION_FOOTPRINT_DEFAULT );
    2772           0 :   void * spad_mem            = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), tile->replay.tpool_thread_count * fd_ulong_align_up( thread_spad_size, fd_spad_align() ) + FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT );
    2773           0 :   ulong  scratch_alloc_mem   = FD_SCRATCH_ALLOC_FINI  ( l, scratch_align() );
    2774             : 
    2775           0 :   if( FD_UNLIKELY( scratch_alloc_mem != ( (ulong)scratch + scratch_footprint( tile ) ) ) ) {
    2776           0 :     FD_LOG_ERR( ( "scratch_alloc_mem did not match scratch_footprint diff: %lu alloc: %lu footprint: %lu",
    2777           0 :           scratch_alloc_mem - (ulong)scratch - scratch_footprint( tile ),
    2778           0 :           scratch_alloc_mem,
    2779           0 :           (ulong)scratch + scratch_footprint( tile ) ) );
    2780           0 :   }
    2781             : 
    2782             :   /**********************************************************************/
    2783             :   /* wksp                                                               */
    2784             :   /**********************************************************************/
    2785             : 
    2786           0 :   ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
    2787             : 
    2788           0 :   ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
    2789           0 :   FD_TEST( blockstore_obj_id!=ULONG_MAX );
    2790           0 :   ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
    2791           0 :   if( ctx->blockstore_wksp==NULL ) {
    2792           0 :     FD_LOG_ERR(( "no blockstore wksp" ));
    2793           0 :   }
    2794             : 
    2795           0 :   ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
    2796           0 :   fd_buf_shred_pool_reset( ctx->blockstore->shred_pool, 0 );
    2797           0 :   FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
    2798             : 
    2799           0 :   ulong status_cache_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "txncache" );
    2800           0 :   FD_TEST( status_cache_obj_id != ULONG_MAX );
    2801           0 :   ctx->status_cache_wksp = topo->workspaces[topo->objs[status_cache_obj_id].wksp_id].wksp;
    2802           0 :   if( ctx->status_cache_wksp == NULL ) {
    2803           0 :     FD_LOG_ERR(( "no status cache wksp" ));
    2804           0 :   }
    2805             : 
    2806             :   /**********************************************************************/
    2807             :   /* snapshot                                                           */
    2808             :   /**********************************************************************/
    2809             : 
    2810           0 :   ctx->snapshot_interval    = tile->replay.full_interval ? tile->replay.full_interval : ULONG_MAX;
    2811           0 :   ctx->incremental_interval = tile->replay.incremental_interval ? tile->replay.incremental_interval : ULONG_MAX;
    2812           0 :   ctx->last_full_snap       = 0UL;
    2813             : 
    2814           0 :   FD_LOG_NOTICE(( "Snapshot intervals full=%lu incremental=%lu", ctx->snapshot_interval, ctx->incremental_interval ));
    2815             : 
    2816             :   /**********************************************************************/
    2817             :   /* funk                                                               */
    2818             :   /**********************************************************************/
    2819             : 
    2820             :   /* TODO: This below code needs to be shared as a topology object. This
    2821             :      will involve adding support to create a funk-based file here. */
    2822           0 :   fd_funk_t * funk;
    2823           0 :   const char * snapshot = tile->replay.snapshot;
    2824           0 :   if( strcmp( snapshot, "funk" ) == 0 ) {
    2825             :     /* Funk database already exists. The parameters are actually mostly ignored. */
    2826           0 :     funk = fd_funk_open_file(
    2827           0 :       tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
    2828           0 :         tile->replay.funk_rec_max, tile->replay.funk_sz_gb * (1UL<<30),
    2829           0 :         FD_FUNK_READ_WRITE, NULL );
    2830           0 :   } else if( strncmp( snapshot, "wksp:", 5 ) == 0) {
    2831             :     /* Recover funk database from a checkpoint. */
    2832           0 :     funk = fd_funk_recover_checkpoint( tile->replay.funk_file, 1, snapshot+5, NULL );
    2833           0 :   } else {
    2834             :     /* Create new funk database */
    2835           0 :     funk = fd_funk_open_file(
    2836           0 :       tile->replay.funk_file, 1, ctx->funk_seed, tile->replay.funk_txn_max,
    2837           0 :         tile->replay.funk_rec_max, tile->replay.funk_sz_gb * (1UL<<30),
    2838           0 :         FD_FUNK_OVERWRITE, NULL );
    2839           0 :     FD_LOG_NOTICE(( "Opened funk file at %s", tile->replay.funk_file ));
    2840           0 :   }
    2841           0 :   if( FD_UNLIKELY( funk == NULL ) ) {
    2842           0 :     FD_LOG_ERR(( "no funk loaded" ));
    2843           0 :   }
    2844           0 :   ctx->funk = funk;
    2845           0 :   ctx->funk_wksp = fd_funk_wksp( funk );
    2846           0 :   if( FD_UNLIKELY( ctx->funk_wksp == NULL ) ) {
    2847           0 :     FD_LOG_ERR(( "no funk wksp" ));
    2848           0 :   }
    2849             : 
    2850           0 :   ctx->is_caught_up = 0;
    2851             : 
    2852             :   /**********************************************************************/
    2853             :   /* root_slot fseq                                                     */
    2854             :   /**********************************************************************/
    2855             : 
    2856           0 :   ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
    2857           0 :   FD_TEST( root_slot_obj_id!=ULONG_MAX );
    2858           0 :   ctx->published_wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
    2859           0 :   if( FD_UNLIKELY( !ctx->published_wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
    2860           0 :   FD_TEST( ULONG_MAX==fd_fseq_query( ctx->published_wmark ) );
    2861             : 
    2862             :   /**********************************************************************/
    2863             :   /* constipated fseq                                                   */
    2864             :   /**********************************************************************/
    2865             : 
    2866             :   /* When the replay tile boots, funk should not be constipated */
    2867             : 
    2868           0 :   ulong constipated_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "constipate" );
    2869           0 :   FD_TEST( constipated_obj_id!=ULONG_MAX );
    2870           0 :   ctx->is_constipated = fd_fseq_join( fd_topo_obj_laddr( topo, constipated_obj_id ) );
    2871           0 :   if( FD_UNLIKELY( !ctx->is_constipated ) ) FD_LOG_ERR(( "replay tile has no constipated fseq" ));
    2872           0 :   fd_fseq_update( ctx->is_constipated, 0UL );
    2873           0 :   FD_TEST( 0UL==fd_fseq_query( ctx->is_constipated ) );
    2874             : 
    2875             :   /**********************************************************************/
    2876             :   /* poh_slot fseq                                                     */
    2877             :   /**********************************************************************/
    2878             : 
    2879           0 :   ulong poh_slot_obj_id = fd_pod_query_ulong( topo->props, "poh_slot", ULONG_MAX );
    2880           0 :   FD_TEST( poh_slot_obj_id!=ULONG_MAX );
    2881           0 :   ctx->poh = fd_fseq_join( fd_topo_obj_laddr( topo, poh_slot_obj_id ) );
    2882             : 
    2883             :   /**********************************************************************/
    2884             :   /* TOML paths                                                         */
    2885             :   /**********************************************************************/
    2886             : 
    2887           0 :   ctx->blockstore_checkpt  = tile->replay.blockstore_checkpt;
    2888           0 :   ctx->tx_metadata_storage = tile->replay.tx_metadata_storage;
    2889           0 :   ctx->funk_checkpt        = tile->replay.funk_checkpt;
    2890           0 :   ctx->genesis             = tile->replay.genesis;
    2891           0 :   ctx->incremental         = tile->replay.incremental;
    2892           0 :   ctx->snapshot            = tile->replay.snapshot;
    2893             : 
    2894             :   /**********************************************************************/
    2895             :   /* alloc                                                              */
    2896             :   /**********************************************************************/
    2897             : 
    2898           0 :   void * alloc_shalloc = fd_alloc_new( alloc_shmem, 3UL );
    2899           0 :   if( FD_UNLIKELY( !alloc_shalloc ) ) {
    2900           0 :     FD_LOG_ERR( ( "fd_alloc_new failed" ) ); }
    2901           0 :   ctx->alloc = fd_alloc_join( alloc_shalloc, 3UL );
    2902           0 :   if( FD_UNLIKELY( !ctx->alloc ) ) {
    2903           0 :     FD_LOG_ERR( ( "fd_alloc_join failed" ) );
    2904           0 :   }
    2905             : 
    2906             :   /**********************************************************************/
    2907             :   /* status cache                                                       */
    2908             :   /**********************************************************************/
    2909             : 
    2910           0 :   char const * status_cache_path = tile->replay.status_cache;
    2911           0 :   if ( strlen( status_cache_path ) > 0 ) {
    2912           0 :     FD_LOG_NOTICE(("starting status cache restore..."));
    2913           0 :     int err = fd_wksp_restore( ctx->status_cache_wksp, status_cache_path, (uint)ctx->status_cache_seed );
    2914           0 :     FD_LOG_NOTICE(("finished status cache restore..."));
    2915           0 :     if (err) {
    2916           0 :       FD_LOG_ERR(( "failed to restore %s: error %d", status_cache_path, err ));
    2917           0 :     }
    2918           0 :     fd_wksp_tag_query_info_t info;
    2919           0 :     ulong tag = FD_TXNCACHE_MAGIC;
    2920           0 :     if( fd_wksp_tag_query( ctx->status_cache_wksp, &tag, 1, &info, 1 ) > 0 ) {
    2921           0 :       void * status_cache_mem = fd_wksp_laddr_fast( ctx->status_cache_wksp, info.gaddr_lo );
    2922             :       /* Set up status cache. */
    2923           0 :       ctx->status_cache = fd_txncache_join( status_cache_mem );
    2924           0 :       if( ctx->status_cache == NULL ) {
    2925           0 :         FD_LOG_ERR(( "failed to join status cache in %s", status_cache_path ));
    2926           0 :       }
    2927           0 :     } else {
    2928           0 :       FD_LOG_ERR(( "failed to tag query status cache in %s", status_cache_path ));
    2929           0 :     }
    2930           0 :   } else {
    2931           0 :     void * status_cache_mem = fd_topo_obj_laddr( topo, status_cache_obj_id );
    2932           0 :     if (status_cache_mem == NULL) {
    2933           0 :       FD_LOG_ERR(( "failed to allocate status cache" ));
    2934           0 :     }
    2935           0 :     ctx->status_cache = fd_txncache_join( fd_txncache_new( status_cache_mem, FD_TXNCACHE_DEFAULT_MAX_ROOTED_SLOTS,
    2936           0 :                                                            FD_TXNCACHE_DEFAULT_MAX_LIVE_SLOTS, MAX_CACHE_TXNS_PER_SLOT,
    2937           0 :                                                            FD_TXNCACHE_DEFAULT_MAX_CONSTIPATED_SLOTS ) );
    2938           0 :     if (ctx->status_cache == NULL) {
    2939           0 :       fd_wksp_free_laddr(status_cache_mem);
    2940           0 :       FD_LOG_ERR(( "failed to join + new status cache" ));
    2941           0 :     }
    2942           0 :   }
    2943             : 
    2944             :   /**********************************************************************/
    2945             :   /* spad                                                               */
    2946             :   /**********************************************************************/
    2947             : 
    2948             :   /* TODO: The spad should probably have its own workspace. Eventually each
    2949             :      spad allocator should be bound to a transaction executor tile and should
    2950             :      be bounded out for the maximum amount of allocations used in the runtime. */
    2951             : 
    2952           0 :   uchar * spad_mem_cur = spad_mem;
    2953           0 :   for( ulong i=0UL; i<tile->replay.tpool_thread_count; i++ ) {
    2954           0 :     fd_spad_t * spad = fd_spad_join( fd_spad_new( spad_mem_cur, thread_spad_size ) );
    2955           0 :     ctx->exec_spads[ ctx->exec_spad_cnt++ ] = spad;
    2956           0 :     spad_mem_cur += fd_ulong_align_up( thread_spad_size, fd_spad_align() );
    2957           0 :   }
    2958             : 
    2959           0 :   ctx->runtime_spad = fd_spad_join( fd_spad_new( spad_mem_cur, FD_RUNTIME_BLOCK_EXECUTION_FOOTPRINT ) );
    2960           0 :   fd_spad_push( ctx->runtime_spad );
    2961             : 
    2962             :   /**********************************************************************/
    2963             :   /* epoch forks                                                        */
    2964             :   /**********************************************************************/
    2965             : 
    2966           0 :   void * epoch_ctx_mem = fd_spad_alloc( ctx->runtime_spad,
    2967           0 :                                         fd_exec_epoch_ctx_align(),
    2968           0 :                                         MAX_EPOCH_FORKS * fd_exec_epoch_ctx_footprint( VOTE_ACC_MAX ) );
    2969             : 
    2970             : 
    2971           0 :   fd_epoch_forks_new( ctx->epoch_forks, epoch_ctx_mem );
    2972             : 
    2973             :   /**********************************************************************/
    2974             :   /* joins                                                              */
    2975             :   /**********************************************************************/
    2976             : 
    2977           0 :   uchar * acc_mgr_shmem = fd_spad_alloc( ctx->runtime_spad, FD_ACC_MGR_ALIGN, FD_ACC_MGR_FOOTPRINT );
    2978           0 :   ctx->acc_mgr       = fd_acc_mgr_new( acc_mgr_shmem, ctx->funk );
    2979           0 :   ctx->bank_hash_cmp = fd_bank_hash_cmp_join( fd_bank_hash_cmp_new( bank_hash_cmp_mem ) );
    2980           0 :   ctx->epoch_ctx     = fd_exec_epoch_ctx_join( fd_exec_epoch_ctx_new( epoch_ctx_mem, VOTE_ACC_MAX ) );
    2981             : 
    2982           0 :   if( FD_UNLIKELY( sscanf( tile->replay.cluster_version, "%u.%u.%u", &ctx->epoch_ctx->epoch_bank.cluster_version[0], &ctx->epoch_ctx->epoch_bank.cluster_version[1], &ctx->epoch_ctx->epoch_bank.cluster_version[2] )!=3 ) ) {
    2983           0 :     FD_LOG_ERR(( "failed to decode cluster version, configured as \"%s\"", tile->replay.cluster_version ));
    2984           0 :   }
    2985           0 :   fd_features_enable_cleaned_up( &ctx->epoch_ctx->features, ctx->epoch_ctx->epoch_bank.cluster_version );
    2986             : 
    2987           0 :   ctx->epoch = fd_epoch_join( fd_epoch_new( epoch_mem, FD_VOTER_MAX ) );
    2988           0 :   ctx->forks = fd_forks_join( fd_forks_new( forks_mem, FD_BLOCK_MAX, 42UL ) );
    2989           0 :   ctx->ghost = fd_ghost_join( fd_ghost_new( ghost_mem, 42UL, FD_BLOCK_MAX ) );
    2990           0 :   ctx->tower = fd_tower_join( fd_tower_new( tower_mem ) );
    2991             : 
    2992           0 :   ctx->replay = fd_replay_join( fd_replay_new( replay_mem, tile->replay.fec_max, FD_SHRED_MAX_PER_SLOT, FD_BLOCK_MAX ) );
    2993             : 
    2994             :   /**********************************************************************/
    2995             :   /* voter                                                              */
    2996             :   /**********************************************************************/
    2997             : 
    2998           0 :   memcpy( ctx->validator_identity, fd_keyload_load( tile->replay.identity_key_path, 1 ), sizeof(fd_pubkey_t) );
    2999           0 :   *ctx->vote_authority = *ctx->validator_identity; /* FIXME */
    3000           0 :   memcpy( ctx->vote_acc, fd_keyload_load( tile->replay.vote_account_path, 1 ), sizeof(fd_pubkey_t) );
    3001             : 
    3002             :   /**********************************************************************/
    3003             :   /* entry batch                                                        */
    3004             :   /**********************************************************************/
    3005             : 
    3006           0 :   ctx->mbatch = mbatch_mem;
    3007           0 :   memset( &ctx->slice_exec_ctx, 0, sizeof(fd_slice_exec_ctx_t) );
    3008             : 
    3009             :   /**********************************************************************/
    3010             :   /* tpool                                                              */
    3011             :   /**********************************************************************/
    3012             : 
    3013           0 :   if( FD_LIKELY( tile->replay.tpool_thread_count > 1 ) ) {
    3014           0 :     tpool_boot( topo, tile->replay.tpool_thread_count );
    3015           0 :   }
    3016           0 :   ctx->tpool = fd_tpool_init( ctx->tpool_mem, tile->replay.tpool_thread_count );
    3017             : 
    3018           0 :   if( FD_LIKELY( tile->replay.tpool_thread_count > 1 ) ) {
    3019             :     /* Start the tpool workers */
    3020           0 :     for( ulong i=1UL; i<tile->replay.tpool_thread_count; i++ ) {
    3021           0 :       if( fd_tpool_worker_push( ctx->tpool, i, NULL, 0UL ) == NULL ) {
    3022           0 :         FD_LOG_ERR(( "failed to launch worker" ));
    3023           0 :       }
    3024           0 :     }
    3025           0 :   }
    3026             : 
    3027           0 :   if( ctx->tpool == NULL ) {
    3028           0 :     FD_LOG_ERR(("failed to create thread pool"));
    3029           0 :   }
    3030             : 
    3031             :   /**********************************************************************/
    3032             :   /* capture                                                            */
    3033             :   /**********************************************************************/
    3034             : 
    3035           0 :   if( strlen(tile->replay.capture) > 0 ) {
    3036           0 :     ctx->capture_ctx = fd_capture_ctx_new( capture_ctx_mem );
    3037           0 :     ctx->capture_ctx->checkpt_freq = ULONG_MAX;
    3038           0 :     ctx->capture_file = fopen( tile->replay.capture, "w+" );
    3039           0 :     if( FD_UNLIKELY( !ctx->capture_file ) ) {
    3040           0 :       FD_LOG_ERR(( "fopen(%s) failed (%d-%s)", tile->replay.capture, errno, strerror( errno ) ));
    3041           0 :     }
    3042           0 :     ctx->capture_ctx->capture_txns = 0;
    3043           0 :     fd_solcap_writer_init( ctx->capture_ctx->capture, ctx->capture_file );
    3044           0 :   }
    3045             : 
    3046             :   /**********************************************************************/
    3047             :   /* bank                                                               */
    3048             :   /**********************************************************************/
    3049             : 
    3050           0 :   ctx->bank_cnt         = tile->replay.bank_tile_count;
    3051           0 :   for( ulong i=0UL; i<tile->replay.bank_tile_count; i++ ) {
    3052           0 :     ulong busy_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "bank_busy.%lu", i );
    3053           0 :     FD_TEST( busy_obj_id!=ULONG_MAX );
    3054           0 :     ctx->bank_busy[ i ] = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
    3055           0 :     if( FD_UNLIKELY( !ctx->bank_busy[ i ] ) ) FD_LOG_ERR(( "banking tile %lu has no busy flag", i ));
    3056             : 
    3057           0 :     fd_topo_link_t * poh_out_link = &topo->links[ tile->out_link_id[ POH_OUT_IDX+i ] ];
    3058           0 :     fd_replay_out_ctx_t * poh_out = &ctx->bank_out[ i ];
    3059           0 :     poh_out->mcache           = poh_out_link->mcache;
    3060           0 :     poh_out->sync             = fd_mcache_seq_laddr( poh_out->mcache );
    3061           0 :     poh_out->depth            = fd_mcache_depth( poh_out->mcache );
    3062           0 :     poh_out->seq              = fd_mcache_seq_query( poh_out->sync );
    3063           0 :     poh_out->mem              = topo->workspaces[ topo->objs[ poh_out_link->dcache_obj_id ].wksp_id ].wksp;
    3064           0 :     poh_out->chunk0           = fd_dcache_compact_chunk0( poh_out->mem, poh_out_link->dcache );
    3065           0 :     poh_out->wmark            = fd_dcache_compact_wmark( poh_out->mem, poh_out_link->dcache, poh_out_link->mtu );
    3066           0 :     poh_out->chunk            = poh_out->chunk0;
    3067           0 :   }
    3068             : 
    3069           0 :   ctx->poh_init_done = 0U;
    3070           0 :   ctx->snapshot_init_done = 0;
    3071             : 
    3072             :   /**********************************************************************/
    3073             :   /* exec                                                               */
    3074             :   /**********************************************************************/
    3075           0 :   ctx->exec_cnt = tile->replay.exec_tile_count;
    3076           0 :   for( ulong i = 0UL; i < ctx->exec_cnt; i++ ) {
    3077           0 :     ulong idx = fd_topo_find_tile_out_link( topo, tile, "replay_exec", i );
    3078           0 :     fd_topo_link_t * exec_out_link = &topo->links[ tile->out_link_id[ idx ] ];
    3079             : 
    3080           0 :     if( strcmp( exec_out_link->name, "replay_exec" ) ) {
    3081           0 :       FD_LOG_ERR(("output link confusion for output %lu", idx ));
    3082           0 :     }
    3083             : 
    3084           0 :     fd_replay_out_ctx_t * exec_out = &ctx->exec_out[ i ];
    3085           0 :     exec_out->idx              = idx;
    3086           0 :     exec_out->mem              = topo->workspaces[ topo->objs[ exec_out_link->dcache_obj_id ].wksp_id ].wksp;
    3087           0 :     exec_out->chunk0           = fd_dcache_compact_chunk0( exec_out->mem, exec_out_link->dcache );
    3088           0 :     exec_out->wmark            = fd_dcache_compact_wmark( exec_out->mem, exec_out_link->dcache, exec_out_link->mtu );
    3089           0 :     exec_out->chunk            = exec_out->chunk0;
    3090           0 :   }
    3091             : 
    3092             :   /* set up vote related items */
    3093           0 :   ctx->vote                           = tile->replay.vote;
    3094           0 :   ctx->validator_identity_pubkey[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->replay.identity_key_path, 1 ) );
    3095           0 :   ctx->vote_acct_addr[ 0 ]            = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->replay.vote_account_path, 1 ) );
    3096             : 
    3097             :   /**********************************************************************/
    3098             :   /* tower checkpointing for wen-restart                                */
    3099             :   /**********************************************************************/
    3100           0 :   ctx->tower_checkpt_fileno = -1;
    3101           0 :   if( FD_LIKELY( strlen( tile->replay.tower_checkpt )>0 ) ) {
    3102           0 :     ctx->tower_checkpt_fileno = open( tile->replay.tower_checkpt,
    3103           0 :                                       O_RDWR | O_CREAT,
    3104           0 :                                       S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH );
    3105           0 :     if( ctx->tower_checkpt_fileno<0 ) FD_LOG_ERR(( "Failed at opening the tower checkpoint file" ));
    3106           0 :   }
    3107             : 
    3108             :   /**********************************************************************/
    3109             :   /* links                                                              */
    3110             :   /**********************************************************************/
    3111             : 
    3112             :   /* Setup store tile input */
    3113           0 :   fd_topo_link_t * store_in_link = &topo->links[ tile->in_link_id[ STORE_IN_IDX ] ];
    3114           0 :   ctx->store_in_mem              = topo->workspaces[ topo->objs[ store_in_link->dcache_obj_id ].wksp_id ].wksp;
    3115           0 :   ctx->store_in_chunk0           = fd_dcache_compact_chunk0( ctx->store_in_mem, store_in_link->dcache );
    3116           0 :   ctx->store_in_wmark            = fd_dcache_compact_wmark( ctx->store_in_mem, store_in_link->dcache, store_in_link->mtu );
    3117             : 
    3118             :   /* Setup pack tile input */
    3119           0 :   fd_topo_link_t * pack_in_link = &topo->links[ tile->in_link_id[ PACK_IN_IDX ] ];
    3120           0 :   ctx->pack_in_mem              = topo->workspaces[ topo->objs[ pack_in_link->dcache_obj_id ].wksp_id ].wksp;
    3121           0 :   ctx->pack_in_chunk0           = fd_dcache_compact_chunk0( ctx->pack_in_mem, pack_in_link->dcache );
    3122           0 :   ctx->pack_in_wmark            = fd_dcache_compact_wmark( ctx->pack_in_mem, pack_in_link->dcache, pack_in_link->mtu );
    3123             : 
    3124             :   /* Setup batch tile input for epoch account hash */
    3125           0 :   fd_topo_link_t * batch_in_link = &topo->links[ tile->in_link_id[ BATCH_IN_IDX ] ];
    3126           0 :   ctx->batch_in_mem              = topo->workspaces[ topo->objs[ batch_in_link->dcache_obj_id ].wksp_id ].wksp;
    3127           0 :   ctx->batch_in_chunk0           = fd_dcache_compact_chunk0( ctx->batch_in_mem, batch_in_link->dcache );
    3128           0 :   ctx->batch_in_wmark            = fd_dcache_compact_wmark( ctx->batch_in_mem, batch_in_link->dcache, batch_in_link->mtu );
    3129             : 
    3130           0 :   ctx->shred_in_cnt = tile->in_cnt-SHRED_IN_IDX;
    3131           0 :   for( ulong i = 0; i<ctx->shred_in_cnt; i++ ) {
    3132           0 :     fd_topo_link_t * shred_in_link = &topo->links[ tile->in_link_id[ i+SHRED_IN_IDX ] ];
    3133           0 :     ctx->shred_in[ i ].mem    = topo->workspaces[ topo->objs[ shred_in_link->dcache_obj_id ].wksp_id ].wksp;
    3134           0 :     ctx->shred_in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->shred_in[ i ].mem, shred_in_link->dcache );
    3135           0 :     ctx->shred_in[ i ].wmark  = fd_dcache_compact_wmark( ctx->shred_in[ i ].mem, shred_in_link->dcache, shred_in_link->mtu );
    3136           0 :   }
    3137             : 
    3138           0 :   fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ NOTIF_OUT_IDX ] ];
    3139           0 :   ctx->notif_out_mcache      = notif_out->mcache;
    3140           0 :   ctx->notif_out_sync        = fd_mcache_seq_laddr( ctx->notif_out_mcache );
    3141           0 :   ctx->notif_out_depth       = fd_mcache_depth( ctx->notif_out_mcache );
    3142           0 :   ctx->notif_out_seq         = fd_mcache_seq_query( ctx->notif_out_sync );
    3143           0 :   ctx->notif_out_mem         = topo->workspaces[ topo->objs[ notif_out->dcache_obj_id ].wksp_id ].wksp;
    3144           0 :   ctx->notif_out_chunk0      = fd_dcache_compact_chunk0( ctx->notif_out_mem, notif_out->dcache );
    3145           0 :   ctx->notif_out_wmark       = fd_dcache_compact_wmark ( ctx->notif_out_mem, notif_out->dcache, notif_out->mtu );
    3146           0 :   ctx->notif_out_chunk       = ctx->notif_out_chunk0;
    3147             : 
    3148           0 :   fd_topo_link_t * sender_out = &topo->links[ tile->out_link_id[ SENDER_OUT_IDX ] ];
    3149           0 :   ctx->sender_out_mcache      = sender_out->mcache;
    3150           0 :   ctx->sender_out_sync        = fd_mcache_seq_laddr( ctx->sender_out_mcache );
    3151           0 :   ctx->sender_out_depth       = fd_mcache_depth( ctx->sender_out_mcache );
    3152           0 :   ctx->sender_out_seq         = fd_mcache_seq_query( ctx->sender_out_sync );
    3153           0 :   ctx->sender_out_mem         = topo->workspaces[ topo->objs[ sender_out->dcache_obj_id ].wksp_id ].wksp;
    3154           0 :   ctx->sender_out_chunk0      = fd_dcache_compact_chunk0( ctx->sender_out_mem, sender_out->dcache );
    3155           0 :   ctx->sender_out_wmark       = fd_dcache_compact_wmark ( ctx->sender_out_mem, sender_out->dcache, sender_out->mtu );
    3156           0 :   ctx->sender_out_chunk       = ctx->sender_out_chunk0;
    3157             : 
    3158             :   /* Set up stake weights tile output */
    3159           0 :   fd_topo_link_t * stake_weights_out = &topo->links[ tile->out_link_id[ STAKE_OUT_IDX] ];
    3160           0 :   ctx->stake_weights_out_mcache      = stake_weights_out->mcache;
    3161           0 :   ctx->stake_weights_out_sync   = fd_mcache_seq_laddr( ctx->stake_weights_out_mcache );
    3162           0 :   ctx->stake_weights_out_depth  = fd_mcache_depth( ctx->stake_weights_out_mcache );
    3163           0 :   ctx->stake_weights_out_seq    = fd_mcache_seq_query( ctx->stake_weights_out_sync );
    3164           0 :   ctx->stake_weights_out_mem    = topo->workspaces[ topo->objs[ stake_weights_out->dcache_obj_id ].wksp_id ].wksp;
    3165           0 :   ctx->stake_weights_out_chunk0 = fd_dcache_compact_chunk0( ctx->stake_weights_out_mem, stake_weights_out->dcache );
    3166           0 :   ctx->stake_weights_out_wmark  = fd_dcache_compact_wmark ( ctx->stake_weights_out_mem, stake_weights_out->dcache, stake_weights_out->mtu );
    3167           0 :   ctx->stake_weights_out_chunk  = ctx->stake_weights_out_chunk0;
    3168             : 
    3169           0 :   if( FD_LIKELY( tile->replay.plugins_enabled ) ) {
    3170           0 :     ctx->replay_plug_out_idx = fd_topo_find_tile_out_link( topo, tile, "replay_plugi", 0 );
    3171           0 :     fd_topo_link_t const * replay_plugin_out = &topo->links[ tile->out_link_id[ ctx->replay_plug_out_idx] ];
    3172           0 :     if( strcmp( replay_plugin_out->name, "replay_plugi" ) ) {
    3173           0 :       FD_LOG_ERR(("output link confusion for output %lu", ctx->replay_plug_out_idx));
    3174           0 :     }
    3175           0 :     ctx->replay_plugin_out_mem    = topo->workspaces[ topo->objs[ replay_plugin_out->dcache_obj_id ].wksp_id ].wksp;
    3176           0 :     ctx->replay_plugin_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_plugin_out_mem, replay_plugin_out->dcache );
    3177           0 :     ctx->replay_plugin_out_wmark  = fd_dcache_compact_wmark ( ctx->replay_plugin_out_mem, replay_plugin_out->dcache, replay_plugin_out->mtu );
    3178           0 :     ctx->replay_plugin_out_chunk  = ctx->replay_plugin_out_chunk0;
    3179             : 
    3180           0 :     ctx->votes_plug_out_idx = fd_topo_find_tile_out_link( topo, tile, "votes_plugin", 0 );
    3181           0 :     fd_topo_link_t const * votes_plugin_out = &topo->links[ tile->out_link_id[ ctx->votes_plug_out_idx] ];
    3182           0 :     if( strcmp( votes_plugin_out->name, "votes_plugin" ) ) {
    3183           0 :       FD_LOG_ERR(("output link confusion for output %lu", ctx->votes_plug_out_idx));
    3184           0 :     }
    3185           0 :     ctx->votes_plugin_out_mem    = topo->workspaces[ topo->objs[ votes_plugin_out->dcache_obj_id ].wksp_id ].wksp;
    3186           0 :     ctx->votes_plugin_out_chunk0 = fd_dcache_compact_chunk0( ctx->votes_plugin_out_mem, votes_plugin_out->dcache );
    3187           0 :     ctx->votes_plugin_out_wmark  = fd_dcache_compact_wmark ( ctx->votes_plugin_out_mem, votes_plugin_out->dcache, votes_plugin_out->mtu );
    3188           0 :     ctx->votes_plugin_out_chunk  = ctx->votes_plugin_out_chunk0;
    3189           0 :   }
    3190             : 
    3191           0 :   if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) {
    3192           0 :     ctx->slots_replayed_file = fopen( tile->replay.slots_replayed, "w" );
    3193           0 :     FD_TEST( ctx->slots_replayed_file );
    3194           0 :   }
    3195             : 
    3196             :   /* replay public setup */
    3197           0 :   ulong replay_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "replay_pub" );
    3198           0 :   FD_TEST( replay_obj_id!=ULONG_MAX );
    3199           0 :   ctx->replay_public_wksp = topo->workspaces[ topo->objs[ replay_obj_id ].wksp_id ].wksp;
    3200             : 
    3201           0 :   if( ctx->replay_public_wksp==NULL ) {
    3202           0 :     FD_LOG_ERR(( "no replay_public workspace" ));
    3203           0 :   }
    3204             : 
    3205           0 :   ctx->replay_public = fd_runtime_public_join( fd_topo_obj_laddr( topo, replay_obj_id ) );
    3206           0 :   ctx->fecs_inserted = 0UL;
    3207           0 :   ctx->fecs_removed  = 0UL;
    3208           0 :   FD_TEST( ctx->replay_public!=NULL );
    3209           0 : }
    3210             : 
    3211             : static ulong
    3212             : populate_allowed_seccomp( fd_topo_t const *      topo,
    3213             :                           fd_topo_tile_t const * tile,
    3214             :                           ulong                  out_cnt,
    3215           0 :                           struct sock_filter *   out ) {
    3216           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    3217             : 
    3218           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    3219           0 :   fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
    3220           0 :   FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
    3221             : 
    3222           0 :   populate_sock_filter_policy_fd_replay_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)ctx->blockstore_fd );
    3223           0 :   return sock_filter_policy_fd_replay_tile_instr_cnt;
    3224           0 : }
    3225             : 
    3226             : static ulong
    3227             : populate_allowed_fds( fd_topo_t const *      topo,
    3228             :                       fd_topo_tile_t const * tile,
    3229             :                       ulong                  out_fds_cnt,
    3230           0 :                       int *                  out_fds ) {
    3231           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    3232             : 
    3233           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    3234           0 :   fd_replay_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_replay_tile_ctx_t), sizeof(fd_replay_tile_ctx_t) );
    3235           0 :   FD_SCRATCH_ALLOC_FINI( l, sizeof(fd_replay_tile_ctx_t) );
    3236             : 
    3237           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
    3238             : 
    3239           0 :   ulong out_cnt = 0UL;
    3240           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
    3241           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
    3242           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
    3243           0 :   out_fds[ out_cnt++ ] = ctx->blockstore_fd;
    3244           0 :   return out_cnt;
    3245           0 : }
    3246             : 
    3247             : static inline void
    3248           0 : metrics_write( fd_replay_tile_ctx_t * ctx ) {
    3249           0 :   FD_MGAUGE_SET( REPLAY, LAST_VOTED_SLOT, ctx->metrics.last_voted_slot );
    3250           0 :   FD_MGAUGE_SET( REPLAY, SLOT, ctx->metrics.slot );
    3251           0 : }
    3252             : 
    3253             : /* TODO: This is definitely not correct */
    3254           0 : #define STEM_BURST (1UL)
    3255             : 
    3256           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_replay_tile_ctx_t
    3257           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_replay_tile_ctx_t)
    3258             : 
    3259           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
    3260           0 : #define STEM_CALLBACK_BEFORE_FRAG         before_frag
    3261           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
    3262           0 : #define STEM_CALLBACK_DURING_FRAG         during_frag
    3263           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
    3264           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
    3265             : 
    3266             : #include "../../disco/stem/fd_stem.c"
    3267             : 
    3268             : fd_topo_run_tile_t fd_tile_replay = {
    3269             :     .name                     = "replay",
    3270             :     .loose_footprint          = loose_footprint,
    3271             :     .populate_allowed_seccomp = populate_allowed_seccomp,
    3272             :     .populate_allowed_fds     = populate_allowed_fds,
    3273             :     .scratch_align            = scratch_align,
    3274             :     .scratch_footprint        = scratch_footprint,
    3275             :     .privileged_init          = privileged_init,
    3276             :     .unprivileged_init        = unprivileged_init,
    3277             :     .run                      = stem_run,
    3278             : };

Generated by: LCOV version 1.14