LCOV - code coverage report
Current view: top level - disco/shred - fd_shred_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 865 0.0 %
Date: 2026-07-02 05:42:57 Functions: 0 16 0.0 %

          Line data    Source code
       1             : #include "../tiles.h"
       2             : 
       3             : #include "generated/fd_shred_tile_seccomp.h"
       4             : #include "../../util/pod/fd_pod_format.h"
       5             : #include "fd_shredder.h"
       6             : #include "fd_shred_batch.h"
       7             : #include "fd_shred_dest.h"
       8             : #include "fd_fec_resolver.h"
       9             : #include "fd_stake_ci.h"
      10             : #include "fd_rnonce_ss.h"
      11             : #include "fd_shred_tile.h"
      12             : #include "../store/fd_store.h"
      13             : #include "../keyguard/fd_keyload.h"
      14             : #include "../keyguard/fd_keyguard.h"
      15             : #include "../keyguard/fd_keyguard_client.h"
      16             : #include "../keyguard/fd_keyswitch.h"
      17             : #include "../fd_disco.h"
      18             : #include "../net/fd_net_tile.h"
      19             : #include "../../flamenco/leaders/fd_leaders.h"
      20             : #include "../../util/net/fd_net_headers.h"
      21             : #include "../../flamenco/gossip/fd_gossip_message.h"
      22             : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
      23             : #include "../../discof/tower/fd_tower_slot_rooted.h"
      24             : 
      25             : /* The shred tile handles shreds from two data sources: shreds generated
      26             :    from microblocks from the leader pipeline, and shreds retransmitted
      27             :    from the network.
      28             : 
      29             :    They have rather different semantics, but at the end of the day, they
      30             :    both result in a bunch of shreds and FEC sets that need to be sent to
      31             :    the blockstore and on the network, which is why one tile handles
      32             :    both.
      33             : 
      34             :    We segment the memory for the two types of shreds into two halves of
      35             :    a dcache because they follow somewhat different flow control
      36             :    patterns. For flow control, the normal guarantee we want to provide
      37             :    is that the dcache entry is not overwritten unless the mcache entry
      38             :    has also been overwritten.  The normal way to do this when using both
      39             :    cyclically and with a 1-to-1 mapping is to make the dcache at least
      40             :    `burst` entries bigger than the mcache.
      41             : 
      42             :    In this tile, we use one output mcache (of depth d) with one output
      43             :    dcache (which is logically partitioned into two) for the two sources
      44             :    of data.  The worst case for flow control is when we're only sending
      45             :    with one of the dcache partitions at a time though, so we can
      46             :    consider them separately.
      47             : 
      48             :    Leader pipeline: Every entry triggers s FEC sets to be created, where
      49             :    s is in [0, FD_SHRED_BATCH_FEC_SETS_MAX].  Each FEC set corresponds
      50             :    to 1 dcache entry and 1 mcache entry.  This means we can have d FEC
      51             :    sets exposed while producing FD_SHRED_BATCH_FEC_SETS_MAX more FEC
      52             :    sets, so the leader pipeline section of the dcache needs at least
      53             :    d+FD_SHRED_BATCH_FEC_SETS_MAX entries.
      54             : 
      55             :    From the network: The FEC resolver doesn't use a cyclic order, but it
      56             :    does promise that once it returns an FEC set, it will return at least
      57             :    complete_depth FEC sets before returning it again.  This means we
      58             :    want at most complete_depth-1 FEC sets exposed, so
      59             :    complete_depth=d+1 FEC sets.  The FEC resolver has the
      60             :    ability to keep individual shreds for partial_depth calls, but
      61             :    because in this version of the shred tile, we send each shred to all
      62             :    its destinations as soon as we get it, we don't need that
      63             :    functionality, so we set partial_depth=1.
      64             : 
      65             :    Adding these up and plugging in the current value of
      66             :    BATCH_FEC_SETS_MAX, we get 2*d+6+fec_resolver_depth FEC sets.  The
      67             :    topology code doesn't allow specifying mcache depth and dcache depth
      68             :    independently.  That means we have to lie about the MTU and burst.
      69             :    We say the MTU is double what it actually is, and then the burst is
      70             :    4+fec_resolver_depth/2.  That means we get
      71             :    2*d+2*(4+fec_resolver_depth/2) >= 2*d+6+fec_resolver_depth FEC sets.
      72             : 
      73             :    A note on parallelization.  From the network, shreds are distributed
      74             :    to tiles based on a validator-specific seeded hash of (slot, FEC set
      75             :    index) so all the shreds for a given FEC set (and any equivocating
      76             :    FEC set) are processed by the same tile.  From the leader pipeline,
      77             :    the original implementation used to parallelize by batch of
      78             :    microblocks (so within a block, batches were distributed to different
      79             :    tiles).  To support chained merkle shreds, the current implementation
      80             :    processes all the batches on tile 0 -- this should be a temporary
      81             :    state while Solana moves to a newer shred format that support better
      82             :    parallelization. */
      83             : 
      84             : #define FD_SHRED_TILE_SCRATCH_ALIGN 128UL
      85             : 
      86           0 : #define IN_KIND_CONTACT ( 0UL)
      87           0 : #define IN_KIND_EPOCH   ( 1UL) /* Firedancer */
      88           0 : #define IN_KIND_STAKE   ( 2UL) /* Frankendancer */
      89           0 : #define IN_KIND_POH     ( 3UL)
      90           0 : #define IN_KIND_NET     ( 4UL)
      91           0 : #define IN_KIND_SIGN    ( 5UL)
      92             : #define IN_KIND_REPAIR  ( 6UL)
      93           0 : #define IN_KIND_IPECHO  ( 7UL)
      94           0 : #define IN_KIND_GOSSIP  ( 8UL)
      95           0 : #define IN_KIND_ROOTED  ( 9UL)
      96           0 : #define IN_KIND_ROOTEDH (10UL)
      97             : 
      98           0 : #define NET_OUT_IDX     1
      99           0 : #define SIGN_OUT_IDX    2
     100             : 
     101             : FD_STATIC_ASSERT( sizeof(fd_entry_batch_meta_t)==56UL,      poh_shred_mtu   );
     102             : FD_STATIC_ASSERT( sizeof(fd_fec_set_t)==FD_SHRED_STORE_MTU, shred_store_mtu );
     103             : 
     104           0 : #define FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT 2
     105             : 
     106             : /* Number of entries in the block_ids table. Each entry is 32 byte.
     107             :    This table is used to keep track of block ids that we create
     108             :    when we're leader, so that we can access them whenever we need
     109             :    a *parent* block id for a new block. Larger table allows to
     110             :    retrieve older parent block ids. Currently it's set for worst
     111             :    case parent offset of USHORT_MAX (max allowed in a shred),
     112             :    making the total table 2MiB.
     113             :    See also comment on chained_merkle_root. */
     114           0 : #define BLOCK_IDS_TABLE_CNT USHORT_MAX
     115             : 
     116             : /* See note on parallelization above. Currently we process all batches in tile 0. */
     117             : #if 1
     118             : #define SHOULD_PROCESS_THESE_SHREDS ( ctx->round_robin_id==0 )
     119             : #else
     120             : #define SHOULD_PROCESS_THESE_SHREDS ( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id )
     121             : #endif
     122             : 
     123             : /* The behavior of the shred tile is slightly different for
     124             :    Frankendancer vs Firedancer.  For example, Frankendancer produces
     125             :    chained merkle shreds, while Firedancer doesn't yet.  We can check
     126             :    at runtime the difference by inspecting the topology. The simplest
     127             :    way is to test if ctx->store is initialized.
     128             : 
     129             :    FIXME don't assume only frank vs. fire */
     130             : #define IS_FIREDANCER ( ctx->store!=NULL )
     131             : 
     132             : typedef union {
     133             :   struct {
     134             :     fd_wksp_t * mem;
     135             :     ulong       chunk0;
     136             :     ulong       wmark;
     137             :   };
     138             :   fd_net_rx_bounds_t net_rx;
     139             : } fd_shred_in_ctx_t;
     140             : 
     141             : typedef struct {
     142             :   fd_shredder_t      * shredder;
     143             :   fd_fec_resolver_t  * resolver;
     144             :   ulong                shred_limit;
     145             :   fd_pubkey_t          identity_key[1]; /* Just the public key */
     146             : 
     147             :   ulong                round_robin_id;
     148             :   ulong                round_robin_cnt;
     149             :   /* Number of batches shredded from PoH during the current slot.
     150             :      This should be the same for all the shred tiles. */
     151             :   ulong                batch_cnt;
     152             :   /* Slot of the most recent microblock we've seen from PoH,
     153             :      or 0 if we haven't seen one yet */
     154             :   ulong                slot;
     155             : 
     156             :   fd_rnonce_ss_t       repair_nonce_ss[1];
     157             : 
     158             :   fd_keyswitch_t *     keyswitch;
     159             :   fd_keyguard_client_t keyguard_client[1];
     160             : 
     161             :   fd_fec_set_t       * fec_sets;
     162             : 
     163             :   fd_stake_ci_t      * stake_ci;
     164             :   /* These are used in between during_frag and after_frag */
     165             :   fd_shred_dest_weighted_t * new_dest_ptr;
     166             :   ulong                      new_dest_cnt;
     167             :   ulong                      shredded_txn_cnt;
     168             :   ulong                      new_root;
     169             : 
     170             :   ulong poh_in_expect_seq;
     171             : 
     172             :   ushort net_id;
     173             : 
     174             :   int skip_frag;
     175             : 
     176             :   ulong                    adtl_dests_leader_cnt;
     177             :   fd_shred_dest_weighted_t adtl_dests_leader    [ FD_TOPO_ADTL_DESTS_MAX ];
     178             :   ulong                    adtl_dests_retransmit_cnt;
     179             :   fd_shred_dest_weighted_t adtl_dests_retransmit[ FD_TOPO_ADTL_DESTS_MAX ];
     180             : 
     181             :   fd_ip4_udp_hdrs_t data_shred_net_hdr  [1];
     182             :   fd_ip4_udp_hdrs_t parity_shred_net_hdr[1];
     183             : 
     184             :   ulong shredder_fec_set_idx;     /* In [0, shredder_max_fec_set_idx) */
     185             :   ulong shredder_max_fec_set_idx; /* exclusive */
     186             : 
     187             :   uchar shredder_merkle_root[32];
     188             : 
     189             :   ulong send_fec_set_idx[ FD_SHRED_BATCH_FEC_SETS_MAX ];
     190             :   ulong send_fec_set_cnt;
     191             :   ulong tsorig;  /* timestamp of the last packet in compressed form */
     192             : 
     193             :   /* Includes Ethernet, IP, UDP headers */
     194             :   ulong shred_buffer_sz;
     195             :   uchar shred_buffer[ FD_NET_MTU ];
     196             : 
     197             :   /* resolver_seed gets generated in privileged_init but used in
     198             :      unprivileged_init, so we store it here in between. */
     199             :   ulong resolver_seed;
     200             : 
     201             :   fd_shred_in_ctx_t in[ 32 ];
     202             :   int               in_kind[ 32 ];
     203             : 
     204             :   fd_wksp_t * net_out_mem;
     205             :   ulong       net_out_chunk0;
     206             :   ulong       net_out_wmark;
     207             :   ulong       net_out_chunk;
     208             : 
     209             :   ulong       store_out_idx;
     210             :   fd_wksp_t * store_out_mem;
     211             :   ulong       store_out_chunk0;
     212             :   ulong       store_out_wmark;
     213             :   ulong       store_out_chunk;
     214             : 
     215             :   /* This is the output link for shreds that is currently consumed by
     216             :      the repair and replay tile. */
     217             :   ulong       shred_out_idx;
     218             :   fd_wksp_t * shred_out_mem;
     219             :   ulong       shred_out_chunk0;
     220             :   ulong       shred_out_wmark;
     221             :   ulong       shred_out_chunk;
     222             : 
     223             :   fd_store_t * store;
     224             : 
     225             :   fd_gossip_update_message_t gossip_upd_buf[1];
     226             : 
     227             :   struct {
     228             :     fd_histf_t contact_info_cnt[ 1 ];
     229             :     fd_histf_t batch_sz[ 1 ];
     230             :     fd_histf_t batch_microblock_cnt[ 1 ];
     231             :     fd_histf_t shredding_timing[ 1 ];
     232             :     fd_histf_t add_shred_timing[ 1 ];
     233             :     ulong shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ];
     234             :     ulong invalid_block_id_cnt;
     235             :     ulong shred_rejected_unchained_cnt;
     236             :     ulong repair_rcv_cnt;
     237             :     ulong repair_rcv_bytes;
     238             :     ulong turbine_rcv_cnt;
     239             :     ulong turbine_rcv_bytes;
     240             :     ulong bad_nonce;
     241             :   } metrics[ 1 ];
     242             : 
     243             :   struct {
     244             :     ulong txn_cnt;
     245             :     ulong pos; /* in payload, range [0, FD_SHRED_BATCH_RAW_BUF_SZ-8UL) */
     246             :     ulong slot; /* set to 0 when pos==0 */
     247             :     union {
     248             :       struct {
     249             :         ulong microblock_cnt;
     250             :         uchar payload[ FD_SHRED_BATCH_RAW_BUF_SZ - 8UL ];
     251             :       };
     252             :       uchar raw[ FD_SHRED_BATCH_RAW_BUF_SZ ];
     253             :     };
     254             :   } pending_batch;
     255             : 
     256             :   fd_epoch_schedule_t            epoch_schedule[1];
     257             :   fd_shred_features_activation_t features_activation[1];
     258             :   /* too large to be left in the stack */
     259             :   fd_shred_dest_idx_t scratchpad_dests[ FD_SHRED_DEST_MAX_FANOUT*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];
     260             : 
     261             :   uchar * chained_merkle_root;
     262             :   fd_bmtree_node_t out_merkle_roots[ FD_SHRED_BATCH_FEC_SETS_MAX ];
     263             :   uchar block_ids[ BLOCK_IDS_TABLE_CNT ][ FD_SHRED_MERKLE_ROOT_SZ ];
     264             : 
     265             :   /* Bank object that we receive from the PoH tile and pass on to
     266             :      the store tile for setting the block_id of a slot. */
     267             :   void const * leader_bank;
     268             : } fd_shred_ctx_t;
     269             : 
     270             : /* shred features are generally considered active at the epoch *following*
     271             :    the epoch in which the feature gate is activated.
     272             : 
     273             :    As an optimization, when the activation slot is received, it is converted
     274             :    into the first slot of the subsequent epoch.  This allows for a more
     275             :    efficient check (shred_slot >= feature_slot) and avoids the overhead of
     276             :    repeatedly converting slots into epochs for comparison.
     277             : 
     278             :    This function is only for Firedancer, while Frankendancer already receives
     279             :    the final activation slot from POH tile.
     280             : 
     281             :    In Agave, this is done with check_feature_activation():
     282             :    https://github.com/anza-xyz/agave/blob/v3.1.4/turbine/src/cluster_nodes.rs#L771
     283             :    https://github.com/anza-xyz/agave/blob/v3.1.4/core/src/shred_fetch_stage.rs#L456 */
     284             : static inline ulong
     285           0 : fd_shred_get_feature_activation_slot0( ulong feature_slot, fd_shred_ctx_t * ctx ) {
     286             :   /* if the feature does not have an activation slot yet, return ULONG_MAX */
     287           0 :   if( FD_UNLIKELY( feature_slot==ULONG_MAX ) ) {
     288           0 :     return ULONG_MAX;
     289           0 :   }
     290             :   /* if we don't have an epoch schedule yet, return ULONG_MAX */
     291           0 :   if( FD_UNLIKELY( ctx->epoch_schedule->slots_per_epoch==0 ) ) {
     292           0 :     return ULONG_MAX;
     293           0 :   }
     294             :   /* compute the activation epoch, add one, return the first slot. */
     295           0 :   ulong feature_epoch = 1 + fd_slot_to_epoch( ctx->epoch_schedule, feature_slot, NULL );
     296           0 :   return fd_epoch_slot0( ctx->epoch_schedule, feature_epoch );
     297           0 : }
     298             : 
     299             : FD_FN_CONST static inline ulong
     300           0 : scratch_align( void ) {
     301           0 :   return 128UL;
     302           0 : }
     303             : 
     304             : FD_FN_PURE static inline ulong
     305           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     306             : 
     307           0 :   ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, tile->shred.depth+1UL,
     308           0 :                                                             128UL * tile->shred.fec_resolver_depth );
     309           0 :   ulong l = FD_LAYOUT_INIT;
     310           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_shred_ctx_t),          sizeof(fd_shred_ctx_t)                  );
     311           0 :   l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(),              fd_stake_ci_footprint()                 );
     312           0 :   l = FD_LAYOUT_APPEND( l, fd_fec_resolver_align(),          fec_resolver_footprint                  );
     313           0 :   l = FD_LAYOUT_APPEND( l, fd_shredder_align(),              fd_shredder_footprint()                 );
     314           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     315           0 : }
     316             : 
     317             : static inline void
     318           0 : during_housekeeping( fd_shred_ctx_t * ctx ) {
     319           0 :   if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
     320           0 :     ulong seq_must_complete = ctx->keyswitch->param;
     321             : 
     322           0 :     if( FD_UNLIKELY( fd_seq_lt( ctx->poh_in_expect_seq, seq_must_complete ) ) ) {
     323             :       /* See fd_keyswitch.h, we need to flush any in-flight shreds from
     324             :          the leader pipeline before switching key. */
     325           0 :       FD_LOG_WARNING(( "Flushing in-flight unpublished shreds, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->poh_in_expect_seq ));
     326           0 :       return;
     327           0 :     }
     328             : 
     329           0 :     memcpy( ctx->identity_key->uc, ctx->keyswitch->bytes, 32UL );
     330           0 :     fd_stake_ci_set_identity( ctx->stake_ci, ctx->identity_key );
     331           0 :     fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
     332           0 :   }
     333           0 : }
     334             : 
     335             : static inline void
     336           0 : metrics_write( fd_shred_ctx_t * ctx ) {
     337           0 :   FD_MHIST_COPY( SHRED, CONTACT_INFO_PER_MESSAGE,   ctx->metrics->contact_info_cnt             );
     338           0 :   FD_MHIST_COPY( SHRED, BATCH_SIZE_BYTES,           ctx->metrics->batch_sz                     );
     339           0 :   FD_MHIST_COPY( SHRED, MICROBLOCK_PER_BATCH,       ctx->metrics->batch_microblock_cnt         );
     340           0 :   FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing             );
     341           0 :   FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing             );
     342           0 :   FD_MCNT_SET  ( SHRED, SHRED_REPAIR_RX,            ctx->metrics->repair_rcv_cnt               );
     343           0 :   FD_MCNT_SET  ( SHRED, SHRED_REPAIR_RX_BYTES,      ctx->metrics->repair_rcv_bytes             );
     344           0 :   FD_MCNT_SET  ( SHRED, SHRED_TURBINE_RX,           ctx->metrics->turbine_rcv_cnt              );
     345           0 :   FD_MCNT_SET  ( SHRED, SHRED_TURBINE_RX_BYTES,     ctx->metrics->turbine_rcv_bytes            );
     346           0 :   FD_MCNT_SET  ( SHRED, NONCE_INVALID,              ctx->metrics->bad_nonce                    );
     347             : 
     348           0 :   FD_MCNT_SET  ( SHRED, BLOCK_ID_INVALID,           ctx->metrics->invalid_block_id_cnt         );
     349           0 :   FD_MCNT_SET  ( SHRED, SHRED_UNCHAINED_REJECTED,   ctx->metrics->shred_rejected_unchained_cnt );
     350             : 
     351           0 :   FD_MCNT_ENUM_COPY( SHRED, SHRED_PROCESSED, ctx->metrics->shred_processing_result             );
     352           0 : }
     353             : 
     354             : static inline void
     355             : handle_new_cluster_contact_info( fd_shred_ctx_t * ctx,
     356           0 :                                  uchar const    * buf ) {
     357           0 :   ulong const * header = (ulong const *)fd_type_pun_const( buf );
     358             : 
     359           0 :   ulong dest_cnt = header[ 0 ];
     360           0 :   fd_histf_sample( ctx->metrics->contact_info_cnt, dest_cnt );
     361             : 
     362           0 :   if( dest_cnt >= MAX_SHRED_DESTS )
     363           0 :     FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS ));
     364             : 
     365           0 :   fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
     366           0 :   fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci );
     367             : 
     368           0 :   ctx->new_dest_ptr = dests;
     369           0 :   ctx->new_dest_cnt = dest_cnt;
     370             : 
     371           0 :   for( ulong i=0UL; i<dest_cnt; i++ ) {
     372           0 :     memcpy( dests[i].pubkey.uc, in_dests[i].pubkey, 32UL );
     373           0 :     dests[i].ip4  = in_dests[i].ip4_addr;
     374           0 :     dests[i].port = in_dests[i].udp_port;
     375           0 :   }
     376           0 : }
     377             : 
     378             : static inline void
     379           0 : finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
     380           0 :   fd_stake_ci_dest_add_fini( ctx->stake_ci, ctx->new_dest_cnt );
     381           0 : }
     382             : 
     383             : static inline int
     384             : before_frag( fd_shred_ctx_t * ctx,
     385             :              ulong            in_idx,
     386             :              ulong            seq,
     387           0 :              ulong            sig ) {
     388           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_IPECHO ) ) {
     389           0 :     FD_TEST( sig!=0UL && sig<=USHORT_MAX );
     390           0 :     fd_shredder_set_shred_version    ( ctx->shredder, (ushort)sig );
     391           0 :     fd_fec_resolver_set_shred_version( ctx->resolver, (ushort)sig );
     392           0 :     return 1;
     393           0 :   }
     394             : 
     395           0 :   if( FD_UNLIKELY( !ctx->shredder->shred_version ) ) return -1;
     396             : 
     397           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
     398           0 :     ctx->poh_in_expect_seq = seq+1UL;
     399           0 :     return (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK) &
     400           0 :            (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_FEAT_ACT_SLOT) &
     401           0 :            (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_LEADER_BANK);
     402           0 :   }
     403           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     404           0 :     return (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
     405           0 :   }
     406           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ){
     407           0 :     return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
     408           0 :            sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
     409           0 :   }
     410           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTEDH ) ) {
     411           0 :     return sig!=0UL; /* only care about rooted banks, not completed blockhash */
     412           0 :   }
     413           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTED ) ) {
     414           0 :     return sig!=FD_TOWER_SIG_SLOT_ROOTED; /* only care about slot_confirmed messages */
     415           0 :   }
     416           0 :   return 0;
     417           0 : }
     418             : 
     419             : static void
     420             : during_frag( fd_shred_ctx_t * ctx,
     421             :              ulong            in_idx,
     422             :              ulong            seq FD_PARAM_UNUSED,
     423             :              ulong            sig,
     424             :              ulong            chunk,
     425             :              ulong            sz,
     426           0 :              ulong            ctl ) {
     427             : 
     428           0 :   ctx->skip_frag = 0;
     429             : 
     430           0 :   ctx->tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
     431             : 
     432           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
     433           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_NET_MTU ) )
     434           0 :     FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     435           0 :                 ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     436             : 
     437           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     438           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
     439           0 :     return;
     440           0 :   }
     441             : 
     442           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
     443           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     444           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     445           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     446             : 
     447           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     448           0 :     handle_new_cluster_contact_info( ctx, dcache_entry );
     449           0 :     return;
     450           0 :   }
     451             : 
     452           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
     453           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>sizeof(fd_gossip_update_message_t) ) )
     454           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     455           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     456           0 :     uchar const * gossip_upd_msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     457           0 :     fd_memcpy( ctx->gossip_upd_buf, gossip_upd_msg, sz );
     458           0 :     return;
     459           0 :   }
     460             : 
     461             :   /* Firedancer only */
     462           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTED ) ) {
     463           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz<sizeof(fd_tower_slot_rooted_t) ) )
     464           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     465           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     466           0 :     fd_tower_slot_rooted_t const * rooted_msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     467           0 :     ctx->new_root = rooted_msg->slot;
     468           0 :     return;
     469           0 :   }
     470             : 
     471             :   /* Frankendancer only */
     472           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTEDH ) ) {
     473           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz<sizeof(fd_rooted_bank_t) ) )
     474           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     475           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     476             :     /* The message format is a pointer to the bank (which is in the
     477             :        agave address space, so we couldn't access it even if we wanted
     478             :        to) followed by the rooted slot. */
     479           0 :     ulong const * replay_msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     480           0 :     ctx->new_root = replay_msg[ 1 ];
     481           0 :     return;
     482           0 :   }
     483             : 
     484             :   /* Firedancer only */
     485           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EPOCH ) ) {
     486           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     487           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     488           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     489             : 
     490           0 :     uchar const *               dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     491           0 :     fd_epoch_info_msg_t const * epoch_msg    = fd_type_pun_const( dcache_entry );
     492             : 
     493           0 :     FD_TEST( epoch_msg->staked_vote_cnt<=MAX_COMPRESSED_STAKE_WEIGHTS );
     494           0 :     FD_TEST( epoch_msg->staked_id_cnt<=MAX_SHRED_DESTS );
     495             : 
     496           0 :     fd_stake_ci_epoch_msg_init( ctx->stake_ci, epoch_msg );
     497             : 
     498           0 :     *ctx->epoch_schedule                                = epoch_msg->epoch_schedule;
     499           0 :     ctx->features_activation->enforce_fixed_fec_set     = fd_shred_get_feature_activation_slot0(
     500           0 :       epoch_msg->features.enforce_fixed_fec_set, ctx );
     501           0 :     ctx->features_activation->discard_unexpected_data_complete_shreds = fd_shred_get_feature_activation_slot0(
     502           0 :       epoch_msg->features.discard_unexpected_data_complete_shreds, ctx );
     503             : 
     504           0 :     fd_fec_resolver_set_discard_unexpected_data_complete_shreds( ctx->resolver,
     505           0 :       ctx->features_activation->discard_unexpected_data_complete_shreds );
     506             : 
     507           0 :     return;
     508           0 :   }
     509             : 
     510           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
     511           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     512           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     513           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     514             : 
     515           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     516           0 :     fd_stake_ci_stake_msg_init( ctx->stake_ci, fd_type_pun_const( dcache_entry ) );
     517           0 :     return;
     518           0 :   }
     519             : 
     520           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
     521           0 :     ctx->send_fec_set_cnt = 0UL;
     522             : 
     523           0 :     if( FD_UNLIKELY( fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_LEADER_BANK ) ) {
     524             :       /* Only one tile needs to act on this. Other tiles see the frag but
     525             :          ignore it. */
     526           0 :       if( ctx->round_robin_id!=0UL ) return;
     527             :       /* poh is handing off a leader bank pointer for a slot we may
     528             :          be leader for.  Copy the pointer out of the dcache for
     529             :          after_frag to attach to the FEC sets for this slot. */
     530           0 :       if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=sizeof(void const *) ) )
     531           0 :         FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     532           0 :               ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     533             : 
     534           0 :       void const * const * src = (void const * const *)fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     535           0 :       ctx->leader_bank = *src;
     536           0 :       return;
     537           0 :     }
     538             : 
     539           0 :     if( FD_UNLIKELY( (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_FEAT_ACT_SLOT) ) ) {
     540             :       /* There is a subset of FD_SHRED_FEATURES_ACTIVATION_... slots that
     541             :           the shred tile needs to be aware of.  Since this requires the
     542             :           bank, we are forced (so far) to receive them from the poh tile
     543             :           (as a POH_PKT_TYPE_FEAT_ACT_SLOT). */
     544           0 :       uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     545           0 :       if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=(sizeof(fd_shred_features_activation_t)) ) )
     546           0 :         FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     547           0 :               ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     548             : 
     549           0 :       fd_shred_features_activation_t const * act_data = (fd_shred_features_activation_t const *)dcache_entry;
     550           0 :       memcpy( ctx->features_activation, act_data, sizeof(fd_shred_features_activation_t) );
     551             : 
     552           0 :       fd_fec_resolver_set_discard_unexpected_data_complete_shreds( ctx->resolver,
     553           0 :         ctx->features_activation->discard_unexpected_data_complete_shreds );
     554           0 :     }
     555           0 :     else { /* (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK) */
     556             :       /* This is a frag from the PoH tile.  We'll copy it to our pending
     557             :         microblock batch and shred it if necessary (last in block or
     558             :         above watermark).  We just go ahead and shred it here, even
     559             :         though we may get overrun.  If we do end up getting overrun, we
     560             :         just won't send these shreds out and we'll reuse the FEC set for
     561             :         the next one.  From a higher level though, if we do get overrun,
     562             :         a bunch of shreds will never be transmitted, and we'll end up
     563             :         producing a block that never lands on chain. */
     564             : 
     565           0 :       uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     566           0 :       if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_POH_SHRED_MTU ||
     567           0 :           sz<(sizeof(fd_entry_batch_meta_t)+sizeof(fd_entry_batch_header_t)) ) )
     568           0 :         FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     569           0 :               ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     570             : 
     571           0 :       fd_entry_batch_meta_t const * entry_meta = (fd_entry_batch_meta_t const *)dcache_entry;
     572           0 :       uchar const *                 entry      = dcache_entry + sizeof(fd_entry_batch_meta_t);
     573           0 :       ulong                         entry_sz   = sz           - sizeof(fd_entry_batch_meta_t);
     574             : 
     575           0 :       fd_entry_batch_header_t const * microblock = (fd_entry_batch_header_t const *)entry;
     576             : 
     577             :       /* It should never be possible for this to fail, but we check it
     578             :         anyway. */
     579           0 :       FD_TEST( entry_sz + ctx->pending_batch.pos <= sizeof(ctx->pending_batch.payload) );
     580             : 
     581           0 :       ulong target_slot = fd_disco_poh_sig_slot( sig );
     582           0 :       if( FD_UNLIKELY( (ctx->pending_batch.microblock_cnt>0) & (ctx->pending_batch.slot!=target_slot) ) ) {
     583             :         /* TODO: The Agave client sends a dummy entry batch with only 1
     584             :           byte and the block-complete bit set.  This helps other
     585             :           validators know that the block is dead and they should not try
     586             :           to continue building a fork on it.  We probably want a similar
     587             :           approach eventually. */
     588           0 :         FD_LOG_WARNING(( "Abandoning %lu microblocks for slot %lu and switching to slot %lu",
     589           0 :               ctx->pending_batch.microblock_cnt, ctx->pending_batch.slot, target_slot ));
     590           0 :         ctx->pending_batch.slot           = 0UL;
     591           0 :         ctx->pending_batch.pos            = 0UL;
     592           0 :         ctx->pending_batch.microblock_cnt = 0UL;
     593           0 :         ctx->pending_batch.txn_cnt        = 0UL;
     594           0 :         ctx->batch_cnt                    = 0UL;
     595             : 
     596           0 :         FD_MCNT_INC( SHRED, MICROBLOCK_ABANDONED, 1UL );
     597           0 :       }
     598             : 
     599           0 :       ctx->pending_batch.slot = target_slot;
     600             :       /* We want to send out some shreds immediately when we start a new
     601             :          slot to help with leader targeting. */
     602           0 :       int new_slot = 0;
     603           0 :       if( FD_UNLIKELY( target_slot!=ctx->slot )) {
     604             :         /* Reset batch count if we are in a new slot */
     605           0 :         ctx->batch_cnt = 0UL;
     606           0 :         ctx->slot      = target_slot;
     607           0 :         new_slot       = 1;
     608             : 
     609             :         /* At the beginning of a new slot, prepare chained_merkle_root.
     610             :            chained_merkle_root is initialized at the block_id of the parent
     611             :            block, there's two cases:
     612             : 
     613             :            1. block_id is passed in by the poh tile:
     614             :               - it's always passed when parent block had a different leader
     615             :               - it may be passed when we were leader for parent block (there
     616             :                 are race conditions when it's not passed)
     617             : 
     618             :            2. block_id is taken from block_ids table if we were the leader
     619             :               for the parent block (when we were NOT the leader, because of
     620             :               equivocation, we can't store block_id in the table)
     621             : 
     622             :            chained_merkle_root is stored in block_ids table at target_slot
     623             :            and it's progressively updated as more microblocks are received.
     624             :            As a result, when we move to a new slot, the block_ids table at
     625             :            the old slot will contain the block_id.
     626             : 
     627             :            The block_ids table is designed to protect against the race condition
     628             :            case in 1., therefore the table may not be set in some cases, e.g. if
     629             :            a validator (re)starts, but in those cases we don't expect the race
     630             :            condition to apply. */
     631           0 :         ctx->chained_merkle_root = ctx->block_ids[ target_slot % BLOCK_IDS_TABLE_CNT ];
     632           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     633           0 :           if( FD_LIKELY( entry_meta->parent_block_id_valid ) ) {
     634             :             /* 1. Initialize chained_merkle_root sent from poh tile */
     635           0 :             memcpy( ctx->chained_merkle_root, entry_meta->parent_block_id, FD_SHRED_MERKLE_ROOT_SZ );
     636           0 :           } else {
     637           0 :             ulong parent_slot = target_slot - entry_meta->parent_offset;
     638           0 :             fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, parent_slot );
     639           0 :             fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, parent_slot );
     640             : 
     641           0 :             if( lsched && slot_leader && fd_memeq( slot_leader, ctx->identity_key, sizeof(fd_pubkey_t) ) ) {
     642             :               /* 2. Initialize chained_merkle_root from block_ids table, if we were the leader */
     643           0 :               memcpy( ctx->chained_merkle_root, ctx->block_ids[ parent_slot % BLOCK_IDS_TABLE_CNT ], FD_SHRED_MERKLE_ROOT_SZ );
     644           0 :             } else {
     645             :               /* This should never happen, log a metric and set chained_merkle_root to 0 */
     646           0 :               ctx->metrics->invalid_block_id_cnt++;
     647           0 :               memset( ctx->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ );
     648           0 :             }
     649           0 :           }
     650           0 :         }
     651           0 :       }
     652             : 
     653           0 :       if( FD_LIKELY( !SHOULD_PROCESS_THESE_SHREDS ) ) {
     654             :         /* If we are not processing this batch, filter in after_frag. */
     655           0 :         ctx->skip_frag = 1;
     656           0 :       }
     657             : 
     658           0 :       ulong   pending_batch_wmark = FD_SHRED_BATCH_WMARK_CHAINED;
     659           0 :       uchar * chained_merkle_root = ctx->chained_merkle_root;
     660           0 :       ulong   load_for_32_shreds  = FD_SHREDDER_CHAINED_FEC_SET_PAYLOAD_SZ;
     661             :       /* All fec sets in the last batch of a block need to be resigned.
     662             :          This needs to match Agave's behavior - as a reference, see:
     663             :          https://github.com/anza-xyz/agave/blob/v2.3/ledger/src/shred/merkle.rs#L1040 */
     664           0 :       if( FD_UNLIKELY( entry_meta->block_complete ) ) {
     665           0 :         pending_batch_wmark = FD_SHRED_BATCH_WMARK_RESIGNED;
     666             :         /* chained_merkle_root also applies to resigned FEC sets. */
     667           0 :         load_for_32_shreds = FD_SHREDDER_RESIGNED_FEC_SET_PAYLOAD_SZ;
     668           0 :       }
     669             : 
     670             :       /* If this microblock completes the block, the batch is then
     671             :          finalized here.  Otherwise, we check whether the new entry
     672             :          would exceed the pending_batch_wmark.  If true, then the
     673             :          batch is closed now, shredded, and a new batch is started
     674             :          with the incoming microblock.  If false, no shredding takes
     675             :          place, and the microblock is added to the current batch. */
     676           0 :       int forced_end_batch         = entry_meta->block_complete | new_slot;
     677           0 :       int batch_would_exceed_wmark = ( ctx->pending_batch.pos + entry_sz ) > pending_batch_wmark;
     678           0 :       int include_in_current_batch = forced_end_batch | ( !batch_would_exceed_wmark );
     679           0 :       int process_current_batch    = forced_end_batch | batch_would_exceed_wmark;
     680           0 :       int init_new_batch           = !include_in_current_batch;
     681             : 
     682           0 :       if( FD_LIKELY( include_in_current_batch ) ) {
     683           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     684             :           /* Ugh, yet another memcpy */
     685           0 :           fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
     686           0 :         }
     687           0 :         ctx->pending_batch.pos            += entry_sz;
     688           0 :         ctx->pending_batch.microblock_cnt += 1UL;
     689           0 :         ctx->pending_batch.txn_cnt        += microblock->txn_cnt;
     690           0 :       }
     691             : 
     692           0 :       if( FD_LIKELY( process_current_batch )) {
     693             :         /* Batch and padding size calculation. */
     694           0 :         ulong batch_sz        = sizeof(ulong) + ctx->pending_batch.pos; /* without padding */
     695           0 :         ulong batch_sz_padded = load_for_32_shreds * ( ( batch_sz + load_for_32_shreds - 1UL ) / load_for_32_shreds );
     696           0 :         ulong padding_sz      = batch_sz_padded - batch_sz;
     697             : 
     698           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     699             :           /* If it's our turn, shred this batch. FD_UNLIKELY because shred
     700             :              tile cnt generally >= 2 */
     701             : 
     702           0 :           long shredding_timing = -fd_tickcount();
     703             : 
     704           0 :           fd_memset( ctx->pending_batch.payload + ctx->pending_batch.pos, 0, padding_sz );
     705             : 
     706           0 :           ctx->send_fec_set_cnt = 0UL; /* verbose */
     707           0 :           ctx->shredded_txn_cnt = ctx->pending_batch.txn_cnt;
     708             : 
     709           0 :           fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, batch_sz_padded, target_slot, entry_meta );
     710             : 
     711           0 :           ulong pend_sz  = batch_sz_padded;
     712           0 :           ulong pend_idx = 0;
     713           0 :           while( pend_sz > 0UL ) {
     714             : 
     715           0 :             fd_fec_set_t * out = ctx->fec_sets + ctx->shredder_fec_set_idx;
     716             : 
     717           0 :             FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out, chained_merkle_root ) );
     718           0 :             memcpy( ctx->out_merkle_roots[pend_idx].hash, chained_merkle_root, 32UL );
     719             : 
     720           0 :             out->data_shred_rcvd   = 0U;
     721           0 :             out->parity_shred_rcvd = 0U;
     722             : 
     723           0 :             ctx->send_fec_set_idx[ ctx->send_fec_set_cnt ] = ctx->shredder_fec_set_idx;
     724           0 :             ctx->send_fec_set_cnt += 1UL;
     725           0 :             ctx->shredder_fec_set_idx = (ctx->shredder_fec_set_idx+1UL)%ctx->shredder_max_fec_set_idx;
     726             : 
     727           0 :             pend_sz -= load_for_32_shreds;
     728           0 :             pend_idx++;
     729           0 :           }
     730             : 
     731           0 :           fd_shredder_fini_batch( ctx->shredder );
     732           0 :           shredding_timing += fd_tickcount();
     733             : 
     734             :           /* Update metrics */
     735           0 :           fd_histf_sample( ctx->metrics->batch_sz,             batch_sz /* without padding */    );
     736           0 :           fd_histf_sample( ctx->metrics->batch_microblock_cnt, ctx->pending_batch.microblock_cnt );
     737           0 :           fd_histf_sample( ctx->metrics->shredding_timing,     (ulong)shredding_timing           );
     738           0 :         } else {
     739           0 :           ctx->send_fec_set_cnt = 0UL; /* verbose */
     740             : 
     741           0 :           fd_shredder_skip_batch( ctx->shredder, batch_sz_padded, target_slot, entry_meta->block_complete );
     742           0 :         }
     743             : 
     744           0 :         ctx->pending_batch.slot           = 0UL;
     745           0 :         ctx->pending_batch.pos            = 0UL;
     746           0 :         ctx->pending_batch.microblock_cnt = 0UL;
     747           0 :         ctx->pending_batch.txn_cnt        = 0UL;
     748           0 :         ctx->batch_cnt++;
     749           0 :       }
     750             : 
     751           0 :       if( FD_UNLIKELY( init_new_batch ) ) {
     752             :         /* TODO: this assumes that SHOULD_PROCESS_THESE_SHREDS is
     753             :            constant across batches.  Otherwise, the condition may
     754             :            need to be removed (or adjusted). */
     755           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     756             :           /* Ugh, yet another memcpy */
     757           0 :           fd_memcpy( ctx->pending_batch.payload + 0UL /* verbose */, entry, entry_sz );
     758           0 :         }
     759           0 :         ctx->pending_batch.slot           = target_slot;
     760           0 :         ctx->pending_batch.pos            = entry_sz;
     761           0 :         ctx->pending_batch.microblock_cnt = 1UL;
     762           0 :         ctx->pending_batch.txn_cnt        = microblock->txn_cnt;
     763           0 :       }
     764           0 :     }
     765           0 :   } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     766             :     /* The common case, from the net tile.  The FEC resolver API does
     767             :        not present a prepare/commit model. If we get overrun between
     768             :        when the FEC resolver verifies the signature and when it stores
     769             :        the local copy, we could end up storing and retransmitting
     770             :        garbage.  Instead we copy it locally, sadly, and only give it to
     771             :        the FEC resolver when we know it won't be overrun anymore. */
     772           0 :     uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in[ in_idx ].net_rx, chunk, ctl, sz );
     773           0 :     ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
     774           0 :     FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
     775           0 :     fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz, ctx->shred_limit );
     776           0 :     if( FD_UNLIKELY( !shred ) ) {
     777           0 :       ctx->skip_frag = 1;
     778           0 :       return;
     779           0 :     };
     780             : 
     781           0 :     if( FD_UNLIKELY( fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR ) ) {
     782           0 :       ctx->metrics->repair_rcv_cnt++;
     783           0 :       ctx->metrics->repair_rcv_bytes += sz;
     784           0 :     } else {
     785           0 :       ctx->metrics->turbine_rcv_cnt++;
     786           0 :       ctx->metrics->turbine_rcv_bytes += sz;
     787           0 :     }
     788             : 
     789             :     /* Drop unchained merkle shreds */
     790           0 :     int is_unchained = !fd_shred_is_chained( fd_shred_type( shred->variant ) );
     791           0 :     if( FD_UNLIKELY( is_unchained ) ) {
     792           0 :       ctx->metrics->shred_rejected_unchained_cnt++;
     793           0 :       ctx->skip_frag = 1;
     794           0 :       return;
     795           0 :     };
     796             : 
     797             :     /* all shreds in the same FEC set will have the same signature
     798             :        so we can round-robin shreds between the shred tiles based on
     799             :        just the signature without splitting individual FEC sets. */
     800           0 :     ulong sig = fd_ulong_load_8( shred->signature );
     801           0 :     if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
     802           0 :       ctx->skip_frag = 1;
     803           0 :       return;
     804           0 :     }
     805           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
     806           0 :     ctx->shred_buffer_sz = sz-hdr_sz;
     807           0 :   }
     808           0 : }
     809             : 
     810             : static inline void
     811             : send_shred( fd_shred_ctx_t                 * ctx,
     812             :             fd_stem_context_t              * stem,
     813             :             fd_shred_t const               * shred,
     814             :             fd_shred_dest_weighted_t const * dest,
     815           0 :             ulong                            tsorig ) {
     816             : 
     817           0 :   if( FD_UNLIKELY( !dest->ip4 ) ) return;
     818             : 
     819           0 :   uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
     820             : 
     821           0 :   int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
     822           0 :   fd_ip4_udp_hdrs_t * hdr  = (fd_ip4_udp_hdrs_t *)packet;
     823           0 :   *hdr = *( is_data ? ctx->data_shred_net_hdr : ctx->parity_shred_net_hdr );
     824             : 
     825           0 :   fd_ip4_hdr_t * ip4 = hdr->ip4;
     826           0 :   ip4->daddr  = dest->ip4;
     827           0 :   ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
     828           0 :   ip4->check  = 0U;
     829           0 :   ip4->check  = fd_ip4_hdr_check_fast( ip4 );
     830             : 
     831           0 :   hdr->udp->net_dport = fd_ushort_bswap( dest->port );
     832             : 
     833           0 :   ulong shred_sz = fd_ulong_if( is_data, FD_SHRED_MIN_SZ, FD_SHRED_MAX_SZ );
     834           0 : #if FD_HAS_AVX
     835             :   /* We're going to copy this shred potentially a bunch of times without
     836             :      reading it again, and we'd rather not thrash our cache, so we want
     837             :      to use non-temporal writes here.  We need to make sure we don't
     838             :      touch the cache line containing the network headers that we just
     839             :      wrote to though.  We know the destination is 64 byte aligned.  */
     840           0 :   FD_STATIC_ASSERT( sizeof(*hdr)<64UL, non_temporal );
     841             :   /* src[0:sizeof(hdrs)] is invalid, but now we want to copy
     842             :      dest[i]=src[i] for i>=sizeof(hdrs), so it simplifies the code. */
     843           0 :   uchar const * src = (uchar const *)((ulong)shred - sizeof(fd_ip4_udp_hdrs_t));
     844           0 :   memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), src+sizeof(fd_ip4_udp_hdrs_t), 64UL-sizeof(fd_ip4_udp_hdrs_t) );
     845             : 
     846           0 :   ulong end_offset = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
     847           0 :   ulong i;
     848           0 :   for( i=64UL; end_offset-i<64UL; i+=64UL ) {
     849           0 : #  if FD_HAS_AVX512
     850           0 :     _mm512_stream_si512( (void *)(packet+i     ), _mm512_loadu_si512( (void const *)(src+i     ) ) );
     851             : #  else
     852           0 :     _mm256_stream_si256( (void *)(packet+i     ), _mm256_loadu_si256( (void const *)(src+i     ) ) );
     853           0 :     _mm256_stream_si256( (void *)(packet+i+32UL), _mm256_loadu_si256( (void const *)(src+i+32UL) ) );
     854           0 : #  endif
     855           0 :   }
     856           0 :   _mm_sfence();
     857           0 :   fd_memcpy( packet+i, src+i, end_offset-i ); /* Copy the last partial cache line */
     858             : 
     859             : #else
     860             :   fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), shred, shred_sz );
     861             : #endif
     862             : 
     863           0 :   ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
     864           0 :   ulong tspub  = fd_frag_meta_ts_comp( fd_tickcount() );
     865           0 :   ulong sig    = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
     866           0 :   ulong const chunk = ctx->net_out_chunk;
     867           0 :   fd_stem_publish( stem, NET_OUT_IDX, sig, chunk, pkt_sz, 0UL, tsorig, tspub );
     868           0 :   ctx->net_out_chunk = fd_dcache_compact_next( chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
     869           0 : }
     870             : 
     871             : static void
     872             : after_frag( fd_shred_ctx_t *    ctx,
     873             :             ulong               in_idx,
     874             :             ulong               seq,
     875             :             ulong               sig,
     876             :             ulong               sz,
     877             :             ulong               tsorig,
     878             :             ulong               _tspub,
     879           0 :             fd_stem_context_t * stem ) {
     880           0 :   (void)seq;
     881           0 :   (void)sz;
     882           0 :   (void)tsorig;
     883           0 :   (void)_tspub;
     884             : 
     885           0 :   if( FD_UNLIKELY( ctx->skip_frag ) ) return;
     886             : 
     887           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
     888           0 :     finalize_new_cluster_contact_info( ctx );
     889           0 :     return;
     890           0 :   }
     891             : 
     892           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EPOCH ) ) {
     893           0 :     fd_stake_ci_epoch_msg_fini( ctx->stake_ci );
     894           0 :     return;
     895           0 :   }
     896             : 
     897           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
     898           0 :     fd_stake_ci_stake_msg_fini( ctx->stake_ci );
     899           0 :     return;
     900           0 :   }
     901             : 
     902           0 :   if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_ROOTED) | (ctx->in_kind[ in_idx ]==IN_KIND_ROOTEDH) ) ) {
     903           0 :     if( FD_LIKELY( (ctx->new_root > 0UL) & (ctx->new_root<ULONG_MAX) ) ) fd_fec_resolver_advance_slot_old( ctx->resolver, ctx->new_root );
     904           0 :     return;
     905           0 :   }
     906             : 
     907           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
     908           0 :     if( ctx->gossip_upd_buf->tag==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) {
     909           0 :       fd_gossip_contact_info_t const * ci = ctx->gossip_upd_buf->contact_info->value;
     910           0 :       fd_ip4_port_t tvu_addr;
     911           0 :       tvu_addr.addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_TVU ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_TVU ].ip4;
     912           0 :       tvu_addr.port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_TVU ].port;
     913           0 :       if( !tvu_addr.l ){
     914           0 :         fd_stake_ci_dest_remove( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ) );
     915           0 :       } else {
     916           0 :         fd_stake_ci_dest_update( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ), tvu_addr.addr, fd_ushort_bswap( tvu_addr.port ) );
     917           0 :       }
     918           0 :     } else if( ctx->gossip_upd_buf->tag==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ) {
     919           0 :       if( FD_UNLIKELY( !memcmp( ctx->identity_key->uc, ctx->gossip_upd_buf->origin, 32UL ) ) ) {
     920             :         /* If our own contact info was dropped, we update with dummy IP
     921             :            instead of removing since stake_ci expects our contact info
     922             :            in the sdests table all the time. fd_stake_ci_new initializes
     923             :            both ei->sdests with our contact info so this should always
     924             :            update (and not append). */
     925           0 :         fd_stake_ci_dest_update( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ), 1U, 0U );
     926           0 :       } else {
     927           0 :         fd_stake_ci_dest_remove( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ) );
     928           0 :       }
     929           0 :     }
     930           0 :     return;
     931           0 :   }
     932             : 
     933           0 :   if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_POH) & (ctx->send_fec_set_cnt==0UL) ) ) {
     934             :     /* Entry from PoH that didn't trigger a new FEC set to be made */
     935           0 :     return;
     936           0 :   }
     937           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
     938           0 :     return;
     939           0 :   }
     940             : 
     941           0 :   ulong fanout = 200UL; /* Default Agave's DATA_PLANE_FANOUT = 200UL */
     942             : 
     943           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     944           0 :     uchar * shred_buffer    = ctx->shred_buffer;
     945           0 :     ulong   shred_buffer_sz = ctx->shred_buffer_sz;
     946             : 
     947           0 :     fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz, ctx->shred_limit );
     948             : 
     949           0 :     if( FD_UNLIKELY( !shred       ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }
     950             : 
     951           0 :     fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
     952           0 :     if( FD_UNLIKELY( !lsched      ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; }
     953             : 
     954           0 :     fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, shred->slot );
     955           0 :     if( FD_UNLIKELY( !slot_leader ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; } /* Count this as bad slot too */
     956             : 
     957           0 :     fd_fec_set_t const * out_fec_set[1];
     958           0 :     fd_shred_t const   * out_shred[1];
     959           0 :     fd_fec_resolver_spilled_t spilled_fec = { 0 };
     960           0 :     int from_repair = 0;
     961             : 
     962           0 :     uint nonce = UINT_MAX;
     963           0 :     ulong shred_sz = fd_shred_sz( shred );
     964           0 :     if( FD_UNLIKELY( (fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR) & (shred_buffer_sz>=shred_sz+sizeof(uint)) ) ) {
     965           0 :       nonce = FD_LOAD(uint, shred_buffer + shred_sz );
     966           0 :       long est_now_ns = fd_log_wallclock(); /* TODO: switch to fd_clock for performance */
     967           0 :       int  slot_complete = fd_shred_is_data( fd_shred_type( shred->variant ) ) && (shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
     968           0 :       int  nonce_okay = fd_rnonce_ss_verify( ctx->repair_nonce_ss, nonce, shred->slot, shred->idx, slot_complete, est_now_ns );
     969           0 :       ctx->metrics->bad_nonce += (ulong)(!nonce_okay);
     970           0 :       from_repair = nonce_okay;
     971           0 :     }
     972             : 
     973           0 :     long add_shred_timing  = -fd_tickcount();
     974           0 :     int rv = fd_fec_resolver_add_shred( ctx->resolver, shred, shred_buffer_sz, from_repair, slot_leader->uc, out_fec_set, out_shred, &ctx->out_merkle_roots[0], &spilled_fec );
     975           0 :     add_shred_timing      +=  fd_tickcount();
     976             : 
     977           0 :     fd_histf_sample( ctx->metrics->add_shred_timing, (ulong)add_shred_timing );
     978           0 :     ctx->metrics->shred_processing_result[ rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]++;
     979             : 
     980           0 :     if( FD_UNLIKELY( ctx->shred_out_idx!=ULONG_MAX &&  /* Only send to repair in full Firedancer */
     981           0 :                      spilled_fec.slot!=0 ) ) {
     982             :       /* We've spilled an in-progress FEC set in the fec_resolver. We
     983             :          need to let repair know to clear out it's cached info for that
     984             :          fec set and re-repair those shreds. */
     985           0 :       fd_fec_evicted_t * evicted_msg = (fd_fec_evicted_t *)fd_type_pun( fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk ) );
     986           0 :       evicted_msg->slot        = spilled_fec.slot;
     987           0 :       evicted_msg->fec_set_idx = spilled_fec.fec_set_idx;
     988             : 
     989           0 :       fd_stem_publish( stem, ctx->shred_out_idx, SHRED_SIG_FEC_EVICTED, ctx->shred_out_chunk, sizeof(fd_fec_evicted_t), 0, ctx->tsorig, fd_frag_meta_ts_comp( fd_tickcount() ) );
     990           0 :       ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_fec_evicted_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
     991           0 :     }
     992             : 
     993           0 :     if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX  /* Only send to repair/replay in full Firedancer */
     994           0 :                    && ( ( rv==FD_FEC_RESOLVER_SHRED_OKAY )
     995           0 :                       | ( rv==FD_FEC_RESOLVER_SHRED_COMPLETES )
     996           0 :                       | ( rv==FD_FEC_RESOLVER_SHRED_DUPLICATE )
     997           0 :                       | ( rv==FD_FEC_RESOLVER_SHRED_EQUIVOC   ) ) ) ) {
     998             : 
     999             :       /* Construct the sig from fec_resolver result and shred source. */
    1000             : 
    1001           0 :       ulong _sig = fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR
    1002           0 :                      ? ( from_repair /*nonce_okay*/ ? SHRED_SIG_SRC_REPAIR : SHRED_SIG_SRC_BAD_REPAIR )
    1003           0 :                      : ( SHRED_SIG_SRC_TURBINE );
    1004           0 :       _sig = ((ulong)rv << 32UL) | _sig;
    1005             : 
    1006             :       /* Copy the full shred into the frag and publish. */
    1007             : 
    1008           0 :       fd_shred_base_t * shred_msg = (fd_shred_base_t *)fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk );
    1009           0 :       memcpy(  shred_msg->shred_, shred, fd_shred_sz( shred ) );
    1010           0 :       memcpy( &shred_msg->merkle_root, ctx->out_merkle_roots[0].hash, sizeof(fd_hash_t) );
    1011           0 :       if( FD_UNLIKELY( fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR ) ) { shred_msg->rnonce = nonce; }
    1012             : 
    1013           0 :       ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    1014           0 :       fd_stem_publish( stem, ctx->shred_out_idx, _sig, ctx->shred_out_chunk, sizeof(fd_shred_base_t), 0UL, ctx->tsorig, tspub );
    1015           0 :       ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_shred_base_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
    1016           0 :     }
    1017             : 
    1018           0 :     if( FD_LIKELY( fd_disco_netmux_sig_proto( sig ) != DST_PROTO_REPAIR &&
    1019           0 :                  ( (rv==FD_FEC_RESOLVER_SHRED_OKAY) | (rv==FD_FEC_RESOLVER_SHRED_COMPLETES) ) ) ) {
    1020             :       /* Relay this shred */
    1021           0 :       ulong max_dest_cnt[1];
    1022           0 :       do {
    1023             :         /* If we've validated the shred and it COMPLETES but we can't
    1024             :           compute the destination for whatever reason, don't forward
    1025             :           the shred, but still send it to the blockstore. */
    1026           0 :         fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, shred->slot );
    1027           0 :         if( FD_UNLIKELY( !sdest ) ) break;
    1028           0 :         fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, ctx->scratchpad_dests, 1UL, fanout, fanout, max_dest_cnt );
    1029           0 :         if( FD_UNLIKELY( !dests ) ) break;
    1030             : 
    1031           0 :         for( ulong i=0UL; i<ctx->adtl_dests_retransmit_cnt; i++ ) send_shred( ctx, stem, *out_shred, ctx->adtl_dests_retransmit+i, ctx->tsorig );
    1032           0 :         for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ] ), ctx->tsorig );
    1033           0 :       } while( 0 );
    1034           0 :     }
    1035             : 
    1036           0 :     if( FD_LIKELY( rv!=FD_FEC_RESOLVER_SHRED_COMPLETES ) ) return;
    1037             : 
    1038           0 :     FD_TEST( ctx->fec_sets <= *out_fec_set );
    1039           0 :     ctx->send_fec_set_idx[ 0UL ] = (ulong)(*out_fec_set - ctx->fec_sets);
    1040           0 :     ctx->send_fec_set_cnt = 1UL;
    1041           0 :     ctx->shredded_txn_cnt = 0UL;
    1042           0 :   }
    1043             : 
    1044           0 :   if( FD_UNLIKELY( ctx->send_fec_set_cnt==0UL ) ) return;
    1045             : 
    1046             :   /* Try to distribute shredded txn count across the fec sets.
    1047             :      This is an approximation, but it is acceptable. */
    1048           0 :   ulong shredded_txn_cnt_per_fec_set  = ctx->shredded_txn_cnt / ctx->send_fec_set_cnt;
    1049           0 :   ulong shredded_txn_cnt_remain       = ctx->shredded_txn_cnt - shredded_txn_cnt_per_fec_set * ctx->send_fec_set_cnt;
    1050           0 :   ulong shredded_txn_cnt_last_fec_set = shredded_txn_cnt_per_fec_set + shredded_txn_cnt_remain;
    1051             : 
    1052             :   /* If this shred completes a FEC set or is part of a microblock from
    1053             :     pack (ie. we're leader), we now have a full FEC set: so we notify
    1054             :     repair and insert into the blockstore, as well as retransmit. */
    1055             : 
    1056           0 :   for( ulong fset_k=0; fset_k<ctx->send_fec_set_cnt; fset_k++ ) {
    1057             : 
    1058           0 :     fd_fec_set_t * set = ctx->fec_sets + ctx->send_fec_set_idx[ fset_k ];
    1059             : 
    1060           0 :     fd_shred_t const * last = set->data_shreds[ FD_FEC_SHRED_CNT - 1 ].s;
    1061             : 
    1062             :     /* Compute merkle root and chained merkle root. */
    1063             : 
    1064           0 :     int replay_fwd = 1;
    1065           0 :     if( FD_LIKELY( ctx->store ) ) { /* firedancer-only */
    1066             : 
    1067           0 :       set->leader_bank = NULL; /* un-used by firedancer */
    1068             : 
    1069             :       /* Insert shreds into the store. We do this regardless of whether
    1070             :          we are leader. */
    1071             : 
    1072           0 :       fd_store_fec_t * fec = fd_store_insert( ctx->store, ctx->round_robin_id, (fd_hash_t *)fd_type_pun( &ctx->out_merkle_roots[fset_k] ) );
    1073             : 
    1074             :       /* Firedancer is configured such that the store never fills up, as
    1075             :          the reasm is responsible for also evicting from store (based on
    1076             :          its eviction policy, see fd_reasm.h). fec is only NULL when the
    1077             :          store is full, so this is either a bug or misconfiguration. */
    1078             : 
    1079           0 :       if( FD_UNLIKELY( !fec ) ) FD_LOG_CRIT(( "store full" ));
    1080             : 
    1081             :       /* It's safe to memcpy the FEC payload outside of the shared lock,
    1082             :          because the store ele is guaranteed to remain valid here.  It
    1083             :          is not possible for a fd_store_remove to interleave, because
    1084             :          remove is only called by replay_tile, which (crucially) is only
    1085             :          sent this FEC via stem publish after we have finished copying.
    1086             : 
    1087             :          Copying outside the shared lock scope also means that we can
    1088             :          lower the duration for which the shared lock is held, and
    1089             :          enables replay to acquire the exclusive lock for removes
    1090             :          without getting starved. */
    1091             : 
    1092             :       /* if data_sz is non-zero, we've already inserted this FEC set into the store */
    1093           0 :       if( FD_UNLIKELY( fec->data_sz ) ) replay_fwd = 0;
    1094           0 :       else {
    1095           0 :         for( ulong i=0UL; i<FD_FEC_SHRED_CNT; i++ ) {
    1096           0 :           fd_shred_t * data_shred = set->data_shreds[i].s;
    1097           0 :           ulong        payload_sz = fd_shred_payload_sz( data_shred );
    1098           0 :           if( FD_UNLIKELY( fec->data_sz + payload_sz > ctx->store->fec_data_max ) ) {
    1099             : 
    1100             :             /* This code is only reachable if shred tile has completed the
    1101             :                FEC set, which implies it was able to validate it, yet
    1102             :                somehow the total payload sz of this FEC set exceeds the
    1103             :                maximum payload sz.  This indicates either a serious bug or
    1104             :                shred tile is compromised so FD_LOG_CRIT. */
    1105             : 
    1106           0 :             FD_LOG_CRIT(( "Shred tile %lu: completed FEC set %lu %u data_sz: %lu exceeds data_max: %lu. Ignoring FEC set.", ctx->round_robin_id, data_shred->slot, data_shred->fec_set_idx, fec->data_sz + payload_sz, ctx->store->fec_data_max ));
    1107           0 :           }
    1108           0 :           fd_memcpy( fd_store_fec_data( ctx->store, fec ) + fec->data_sz, fd_shred_data_payload( data_shred ), payload_sz );
    1109           0 :           fec->data_sz += payload_sz;
    1110           0 :           if( FD_LIKELY( i<32UL ) ) fec->shred_offs[ i ] = (uint)payload_sz +  (i==0UL ? 0U : fec->shred_offs[ i-1UL ]);
    1111           0 :         }
    1112           0 :       }
    1113           0 :     }
    1114             : 
    1115           0 :     if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX && replay_fwd ) ) { /* firedancer-only */
    1116             : 
    1117             :       /* Send all of the data shred headers we recovered (weren't received) */
    1118           0 :       for( int i=0; i<32; i++ ) {
    1119           0 :         if( fd_uint_extract_bit( set->data_shred_rcvd, i )==0 ) {
    1120           0 :           fd_shred_t * const missing = &set->data_shreds[ i ].s[0];
    1121             : 
    1122           0 :           ulong sig = ((ulong)FD_FEC_RESOLVER_SHRED_COMPLETES << 32UL) | SHRED_SIG_SRC_RECONSTRUCTED;
    1123             : 
    1124           0 :           fd_shred_base_t * shred_msg = (fd_shred_base_t *)fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk );
    1125           0 :           memcpy(  shred_msg->shred_, missing, fd_shred_sz( missing ) );
    1126           0 :           memcpy( &shred_msg->merkle_root, ctx->out_merkle_roots[fset_k].hash, sizeof(fd_hash_t) );
    1127             : 
    1128           0 :           ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    1129           0 :           fd_stem_publish( stem, ctx->shred_out_idx, sig, ctx->shred_out_chunk, sizeof(fd_shred_base_t), 0UL, ctx->tsorig, tspub );
    1130           0 :           ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_shred_base_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
    1131           0 :         }
    1132           0 :       }
    1133             : 
    1134             :       /* Additionally, publish a frag to notify repair and replay that
    1135             :          the FEC set is complete.  Note the ordering wrt store shred
    1136             :          insertion above is intentional: shreds are inserted into the
    1137             :          store before notifying repair and replay.  This is because the
    1138             :          replay tile assumes the shreds are already in the store when
    1139             :          replay gets a notification from the shred tile that the FEC is
    1140             :          complete.  We we don't know whether shred will finish inserting
    1141             :          into store first or repair will finish validating the FEC set
    1142             :          first.  The header and merkle root of the last shred in the FEC
    1143             :          set are sent as part of this frag.
    1144             : 
    1145             :          This message, the shred msg, and the FEC evict msg constitute
    1146             :          the max 3 possible messages to repair/replay per after_frag.
    1147             :          In reality, it is only possible to publish all 3 in the case
    1148             :          where we receive a coding shred first for a FEC set where
    1149             :          (N=1,K=18), which allows for the FEC set to be instantly
    1150             :          completed by the singular coding shred, and that also happens
    1151             :          to evict a FEC set from the curr_map.  When fix-32 arrives, the
    1152             :          link burst value can be lowered to 2. */
    1153           0 :       ulong sig = ctx->in_kind[ in_idx ]==IN_KIND_POH ? SHRED_SIG_FEC_COMPLETE_LEADER : SHRED_SIG_FEC_COMPLETE;
    1154             : 
    1155           0 :       fd_fec_complete_t * complete_msg = fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk );
    1156           0 :       complete_msg->last_shred_hdr = *last;
    1157           0 :       memcpy( &complete_msg->merkle_root, ctx->out_merkle_roots[fset_k].hash, sizeof(fd_hash_t) );
    1158           0 :       complete_msg->chained_merkle_root = *(fd_hash_t *)fd_type_pun((uchar *)last + fd_shred_chain_off( last->variant ));
    1159             : 
    1160           0 :       fd_stem_publish( stem, ctx->shred_out_idx, sig, ctx->shred_out_chunk, sizeof(fd_fec_complete_t), 0UL, ctx->tsorig, fd_frag_meta_ts_comp( fd_tickcount() ) );
    1161           0 :       ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_fec_complete_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
    1162             : 
    1163           0 :     } else if( FD_UNLIKELY( ctx->store_out_idx != ULONG_MAX ) ) { /* frankendancer-only */
    1164             : 
    1165             :       /* Send to the blockstore */
    1166             : 
    1167           0 :       ulong txn_cnt = fd_ulong_if( fset_k==ctx->send_fec_set_cnt-1UL, shredded_txn_cnt_last_fec_set, shredded_txn_cnt_per_fec_set );
    1168             :       /* If the low 32 bits of sig are 0, the store tile will do extra
    1169             :          checks */
    1170           0 :       ulong new_sig = txn_cnt<<32 | (ulong)(ctx->in_kind[ in_idx ]!=IN_KIND_NET);
    1171             : 
    1172             :       /* Attach the leader bank pointer and merkle root so that the
    1173             :          store tile can set the block_id for the slot.  Network
    1174             :          FEC sets have no leader bank. */
    1175           0 :       if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
    1176           0 :         set->leader_bank = ctx->leader_bank;
    1177           0 :         memcpy( set->merkle_root, ctx->out_merkle_roots[fset_k].hash, 32UL );
    1178           0 :       } else {
    1179           0 :         set->leader_bank = NULL;
    1180           0 :       }
    1181             : 
    1182           0 :       ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
    1183             :       /* The size is actually slightly larger than USHORT_MAX, but the store tile
    1184             :          knows to use sizeof(fd_fec_set_t) instead of the sz field.  Put
    1185             :          USHORT_MAX so that monitoring tools are at least close. */
    1186           0 :       ulong sz = fd_ulong_min( sizeof(fd_fec_set_t), USHORT_MAX );
    1187           0 :       fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, set ), sz, 0UL, ctx->tsorig, tspub );
    1188             : 
    1189             :       /* Store tile will release the bank pointer when it sees
    1190             :          SLOT_COMPLETE FEC set.  So we reset our tracking. */
    1191           0 :       if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH &&
    1192           0 :                        (set->data_shreds[ FD_FEC_SHRED_CNT-1UL ].s->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) ) ) {
    1193           0 :         ctx->leader_bank  = NULL;
    1194           0 :       }
    1195           0 :     }
    1196             : 
    1197             :     /* Compute all the destinations for all the new shreds */
    1198             : 
    1199           0 :     fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
    1200           0 :     ulong k=0UL;
    1201           0 :     for( ulong i=0UL; i<FD_FEC_SHRED_CNT; i++ )
    1202           0 :       if( !(set->data_shred_rcvd   & (1U<<i)) ) new_shreds[ k++ ] = set->data_shreds  [ i ].s;
    1203           0 :     for( ulong i=0UL; i<FD_FEC_SHRED_CNT; i++ )
    1204           0 :       if( !(set->parity_shred_rcvd & (1U<<i)) ) new_shreds[ k++ ] = set->parity_shreds[ i ].s;
    1205             : 
    1206           0 :     if( FD_UNLIKELY( !k ) ) return;
    1207           0 :     fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, new_shreds[ 0 ]->slot );
    1208           0 :     if( FD_UNLIKELY( !sdest ) ) return;
    1209             : 
    1210           0 :     ulong out_stride;
    1211           0 :     ulong max_dest_cnt[1];
    1212           0 :     fd_shred_dest_idx_t * dests;
    1213           0 :     if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
    1214           0 :       for( ulong i=0UL; i<k; i++ ) {
    1215           0 :         for( ulong j=0UL; j<ctx->adtl_dests_retransmit_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dests_retransmit+j, ctx->tsorig );
    1216           0 :       }
    1217           0 :       out_stride = k;
    1218             :       /* In the case of feature activation, the fanout used below is
    1219             :           the same as the one calculated/modified previously at the
    1220             :           beginning of after_frag() for IN_KIND_NET in this slot. */
    1221           0 :       dests = fd_shred_dest_compute_children( sdest, new_shreds, k, ctx->scratchpad_dests, k, fanout, fanout, max_dest_cnt );
    1222           0 :     } else {
    1223           0 :       for( ulong i=0UL; i<k; i++ ) {
    1224           0 :         for( ulong j=0UL; j<ctx->adtl_dests_leader_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dests_leader+j, ctx->tsorig );
    1225           0 :       }
    1226           0 :       out_stride = 1UL;
    1227           0 :       *max_dest_cnt = 1UL;
    1228           0 :       dests = fd_shred_dest_compute_first   ( sdest, new_shreds, k, ctx->scratchpad_dests );
    1229           0 :     }
    1230           0 :     if( FD_UNLIKELY( !dests ) ) return;
    1231             : 
    1232             :     /* Send only the ones we didn't receive. */
    1233           0 :     for( ulong i=0UL; i<k; i++ ) {
    1234           0 :       for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
    1235           0 :     }
    1236           0 :   }
    1237           0 : }
    1238             : 
    1239             : static void
    1240             : privileged_init( fd_topo_t const *      topo,
    1241           0 :                  fd_topo_tile_t const * tile ) {
    1242           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1243           0 :   FD_TEST( scratch!=NULL );
    1244             : 
    1245           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1246           0 :   fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
    1247             : 
    1248           0 :   if( FD_UNLIKELY( !strcmp( tile->shred.identity_key_path, "" ) ) )
    1249           0 :     FD_LOG_ERR(( "identity_key_path not set" ));
    1250             : 
    1251           0 :   ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->shred.identity_key_path, /* pubkey only: */ 1 ) );
    1252             : 
    1253           0 :   if( FD_UNLIKELY( !fd_rng_secure( &(ctx->resolver_seed), sizeof(ulong) ) ) ) {
    1254           0 :     FD_LOG_CRIT(( "fd_rng_secure failed" ));
    1255           0 :   }
    1256             :   /* This is only needed in frankendancer, but we'll overwrite it with
    1257             :      the value the repair tile generated in full firedancer. */
    1258           0 :   if( FD_UNLIKELY( !fd_rng_secure( ctx->repair_nonce_ss->bytes, sizeof(fd_rnonce_ss_t) ) ) ) {
    1259           0 :     FD_LOG_CRIT(( "fd_rng_secure failed" ));
    1260           0 :   }
    1261           0 : }
    1262             : 
    1263             : static void
    1264             : fd_shred_signer( void *        signer_ctx,
    1265             :                  uchar         signature[ static 64 ],
    1266           0 :                  uchar const   merkle_root[ static 32 ] ) {
    1267           0 :   fd_keyguard_client_sign( signer_ctx, signature, merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
    1268           0 : }
    1269             : 
    1270             : static void
    1271             : unprivileged_init( fd_topo_t const *      topo,
    1272           0 :                    fd_topo_tile_t const * tile ) {
    1273             : 
    1274           0 :   FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ NET_OUT_IDX   ]].name, "shred_net"   ) );
    1275           0 :   FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ SIGN_OUT_IDX  ]].name, "shred_sign"  ) );
    1276             : 
    1277           0 :   if( FD_UNLIKELY( !tile->out_cnt ) )
    1278           0 :     FD_LOG_ERR(( "shred tile has no primary output link" ));
    1279             : 
    1280           0 :   ulong shred_store_mcache_depth = tile->shred.depth;
    1281           0 :   if( topo->links[ tile->out_link_id[ 0 ] ].depth != shred_store_mcache_depth )
    1282           0 :     FD_LOG_ERR(( "shred tile out depths are not equal %lu %lu",
    1283           0 :                  topo->links[ tile->out_link_id[ 0 ] ].depth, shred_store_mcache_depth ));
    1284             : 
    1285           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1286           0 :   FD_TEST( scratch!=NULL );
    1287             : 
    1288           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1289           0 :   fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
    1290             : 
    1291           0 :   ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
    1292           0 :   ctx->round_robin_id  = tile->kind_id;
    1293           0 :   ctx->batch_cnt       = 0UL;
    1294           0 :   ctx->slot            = ULONG_MAX;
    1295             : 
    1296             :   /* If the default partial_depth is ever changed, correspondingly
    1297             :      change the size of the fd_fec_intra_pool in fd_fec_repair. */
    1298           0 :   ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth + 1UL,
    1299           0 :                                                             128UL * tile->shred.fec_resolver_depth );
    1300             :   /* See long comment at the top of this file for the computation of
    1301             :      fec_set_cnt. */
    1302           0 :   ulong fec_set_cnt            = 2UL*shred_store_mcache_depth + tile->shred.fec_resolver_depth + FD_SHRED_BATCH_FEC_SETS_MAX + 2UL;
    1303           0 :   ulong fec_sets_required_sz   = fec_set_cnt*sizeof(fd_fec_set_t);
    1304             : 
    1305           0 :   void * fec_sets_shmem = NULL;
    1306           0 :   ctx->shred_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_out", ctx->round_robin_id );
    1307           0 :   ctx->store_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_store",  ctx->round_robin_id );
    1308           0 :   if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
    1309           0 :     fd_topo_link_t const * shred_out = &topo->links[ tile->out_link_id[ ctx->shred_out_idx ] ];
    1310           0 :     ctx->shred_out_mem    = topo->workspaces[ topo->objs[ shred_out->dcache_obj_id ].wksp_id ].wksp;
    1311           0 :     ctx->shred_out_chunk0 = fd_dcache_compact_chunk0( ctx->shred_out_mem, shred_out->dcache );
    1312           0 :     ctx->shred_out_wmark  = fd_dcache_compact_wmark ( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu );
    1313           0 :     ctx->shred_out_chunk  = ctx->shred_out_chunk0;
    1314           0 :     FD_TEST( fd_dcache_compact_is_safe( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu, shred_out->depth ) );
    1315           0 :     ulong fec_sets_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "fec_sets" );
    1316           0 :     if( FD_UNLIKELY( fec_sets_obj_id == ULONG_MAX ) ) FD_LOG_ERR(( "invalid firedancer topo" ));
    1317           0 :     fd_topo_obj_t const * obj = &topo->objs[ fec_sets_obj_id ];
    1318           0 :     if( FD_UNLIKELY( obj->footprint<(fec_sets_required_sz*ctx->round_robin_cnt) ) ) {
    1319           0 :       FD_LOG_ERR(( "fec_sets wksp obj too small. It is %lu bytes but must be at least %lu bytes. ",
    1320           0 :                    obj->footprint,
    1321           0 :                    fec_sets_required_sz ));
    1322           0 :     }
    1323           0 :     fec_sets_shmem = (uchar *)fd_topo_obj_laddr( topo, fec_sets_obj_id ) + (ctx->round_robin_id * fec_sets_required_sz);
    1324             : 
    1325           0 :     ulong rnonce_ss_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "rnonce_ss" );
    1326           0 :     FD_TEST( rnonce_ss_id!=ULONG_MAX );
    1327           0 :     memcpy( ctx->repair_nonce_ss, fd_topo_obj_laddr( topo, rnonce_ss_id ), sizeof(fd_rnonce_ss_t) );
    1328             : 
    1329           0 :   } else if ( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
    1330           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ ctx->store_out_idx ]].name, "shred_store" ) );
    1331           0 :     fec_sets_shmem = topo->links[ tile->out_link_id[ ctx->store_out_idx ] ].dcache;
    1332           0 :     if( FD_UNLIKELY( fd_dcache_data_sz( fec_sets_shmem )<fec_sets_required_sz ) ) {
    1333           0 :       FD_LOG_ERR(( "shred_store dcache too small. It is %lu bytes but must be at least %lu bytes. ",
    1334           0 :                   fd_dcache_data_sz( fec_sets_shmem ),
    1335           0 :                   fec_sets_required_sz ));
    1336           0 :     }
    1337           0 :   }
    1338             : 
    1339           0 :   if( FD_UNLIKELY( !tile->shred.fec_resolver_depth ) ) FD_LOG_ERR(( "fec_resolver_depth not set" ));
    1340           0 :   if( FD_UNLIKELY( !tile->shred.shred_listen_port  ) ) FD_LOG_ERR(( "shred_listen_port not set" ));
    1341             : 
    1342           0 :   void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(),              fd_stake_ci_footprint()            );
    1343           0 :   void * _resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_resolver_align(),          fec_resolver_footprint             );
    1344           0 :   void * _shredder = FD_SCRATCH_ALLOC_APPEND( l, fd_shredder_align(),              fd_shredder_footprint()            );
    1345             : 
    1346           0 :   fd_fec_set_t * fec_sets  = (fd_fec_set_t *)fec_sets_shmem;
    1347             : 
    1348           0 : #define NONNULL( x ) (__extension__({                                        \
    1349           0 :       __typeof__((x)) __x = (x);                                             \
    1350           0 :       if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
    1351           0 :       __x; }))
    1352             : 
    1353           0 :   int has_ipecho_in = fd_topo_find_tile_in_link( topo, tile, "ipecho_out", 0UL )!=ULONG_MAX;
    1354           0 :   ushort expected_shred_version = tile->shred.expected_shred_version;
    1355           0 :   if( FD_UNLIKELY( !has_ipecho_in && !expected_shred_version ) ) {
    1356           0 :     ulong busy_obj_id = fd_pod_query_ulong( topo->props, "pohh_shred", ULONG_MAX );
    1357           0 :     FD_TEST( busy_obj_id!=ULONG_MAX );
    1358           0 :     ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
    1359           0 :     FD_LOG_INFO(( "Waiting for shred version to be determined via gossip." ));
    1360           0 :     ulong _expected_shred_version = ULONG_MAX;
    1361           0 :     do {
    1362           0 :       _expected_shred_version = FD_VOLATILE_CONST( *gossip_shred_version );
    1363           0 :     } while( _expected_shred_version==ULONG_MAX );
    1364             : 
    1365           0 :     if( FD_UNLIKELY( _expected_shred_version>USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", _expected_shred_version ));
    1366           0 :     FD_LOG_INFO(( "Using shred version %hu", (ushort)_expected_shred_version ));
    1367           0 :     expected_shred_version = (ushort)_expected_shred_version;
    1368           0 :   }
    1369             : 
    1370           0 :   ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->id_keyswitch_obj_id ) );
    1371           0 :   FD_TEST( ctx->keyswitch );
    1372             : 
    1373             :   /* populate ctx */
    1374           0 :   ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_shred", tile->kind_id );
    1375           0 :   FD_TEST( sign_in_idx!=ULONG_MAX );
    1376           0 :   fd_topo_link_t const * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
    1377           0 :   fd_topo_link_t const * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
    1378           0 :   NONNULL( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
    1379           0 :                                                             sign_out->mcache,
    1380           0 :                                                             sign_out->dcache,
    1381           0 :                                                             sign_in->mcache,
    1382           0 :                                                             sign_in->dcache,
    1383           0 :                                                             sign_out->mtu ) ) );
    1384             : 
    1385           0 :   ulong shred_limit = fd_ulong_if( tile->shred.larger_shred_limits_per_block, 32UL*32UL*1024UL, 32UL*1024UL );
    1386           0 :   ctx->shred_limit  = shred_limit;
    1387           0 :   fd_fec_set_t * resolver_sets = fec_sets + shred_store_mcache_depth + FD_SHRED_BATCH_FEC_SETS_MAX;
    1388           0 :   ctx->shredder = NONNULL( fd_shredder_join     ( fd_shredder_new     ( _shredder, fd_shred_signer, ctx->keyguard_client ) ) );
    1389           0 :   ctx->resolver = NONNULL( fd_fec_resolver_join ( fd_fec_resolver_new ( _resolver,
    1390           0 :                                                                         fd_shred_signer, ctx->keyguard_client,
    1391           0 :                                                                         tile->shred.fec_resolver_depth, 1UL,
    1392           0 :                                                                         shred_store_mcache_depth+1UL,
    1393           0 :                                                                         128UL * tile->shred.fec_resolver_depth, resolver_sets,
    1394           0 :                                                                         shred_limit,
    1395           0 :                                                                         ctx->resolver_seed ) ) );
    1396             : 
    1397           0 :   if( FD_LIKELY( !!expected_shred_version ) ) {
    1398           0 :     fd_shredder_set_shred_version    ( ctx->shredder, expected_shred_version );
    1399           0 :     fd_fec_resolver_set_shred_version( ctx->resolver, expected_shred_version );
    1400           0 :   }
    1401             : 
    1402           0 :   ctx->fec_sets = fec_sets;
    1403             : 
    1404           0 :   ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci, ctx->identity_key ) );
    1405             : 
    1406           0 :   ctx->net_id   = (ushort)0;
    1407             : 
    1408           0 :   fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr,   FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
    1409           0 :   fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
    1410             : 
    1411           0 :   ctx->adtl_dests_retransmit_cnt = tile->shred.adtl_dests_retransmit_cnt;
    1412           0 :   for( ulong i=0UL; i<ctx->adtl_dests_retransmit_cnt; i++) {
    1413           0 :     ctx->adtl_dests_retransmit[ i ].ip4 = tile->shred.adtl_dests_retransmit[ i ].ip;
    1414           0 :     ctx->adtl_dests_retransmit[ i ].port = tile->shred.adtl_dests_retransmit[ i ].port;
    1415           0 :   }
    1416           0 :   ctx->adtl_dests_leader_cnt = tile->shred.adtl_dests_leader_cnt;
    1417           0 :   for( ulong i=0UL; i<ctx->adtl_dests_leader_cnt; i++) {
    1418           0 :     ctx->adtl_dests_leader[i].ip4  = tile->shred.adtl_dests_leader[i].ip;
    1419           0 :     ctx->adtl_dests_leader[i].port = tile->shred.adtl_dests_leader[i].port;
    1420           0 :   }
    1421             : 
    1422           0 :   uchar has_contact_info_in = 0;
    1423           0 :   for( ulong i=0UL; i<tile->in_cnt; i++ ) {
    1424           0 :     fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
    1425           0 :     fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
    1426             : 
    1427           0 :     if( FD_LIKELY(      !strcmp( link->name, "net_shred"    ) ) ) {
    1428           0 :       ctx->in_kind[ i ] = IN_KIND_NET;
    1429           0 :       fd_net_rx_bounds_init( &ctx->in[ i ].net_rx, link->dcache );
    1430           0 :       continue; /* only net_rx needs to be set in this case. */
    1431           0 :     }
    1432           0 :     else if( FD_LIKELY( !strcmp( link->name, "poh_shred"    ) ) )   ctx->in_kind[ i ] = IN_KIND_POH;   /* Firedancer */
    1433           0 :     else if( FD_LIKELY( !strcmp( link->name, "pohh_shred"   ) ) )   ctx->in_kind[ i ] = IN_KIND_POH;   /* Frankendancer */
    1434           0 :     else if( FD_LIKELY( !strcmp( link->name, "stake_out"    ) ) )   ctx->in_kind[ i ] = IN_KIND_STAKE; /* Frankendancer */
    1435           0 :     else if( FD_LIKELY( !strcmp( link->name, "replay_epoch" ) ) )   ctx->in_kind[ i ] = IN_KIND_EPOCH; /* Firedancer */
    1436           0 :     else if( FD_LIKELY( !strcmp( link->name, "sign_shred"   ) ) )   ctx->in_kind[ i ] = IN_KIND_SIGN;
    1437           0 :     else if( FD_LIKELY( !strcmp( link->name, "ipecho_out"   ) ) )   ctx->in_kind[ i ] = IN_KIND_IPECHO;
    1438           0 :     else if( FD_LIKELY( !strcmp( link->name, "tower_out"    ) ) )   ctx->in_kind[ i ] = IN_KIND_ROOTED;
    1439           0 :     else if( FD_LIKELY( !strcmp( link->name, "replay_resol" ) ) )   ctx->in_kind[ i ] = IN_KIND_ROOTEDH;
    1440           0 :     else if( FD_LIKELY( !strcmp( link->name, "crds_shred"   ) ) ) { ctx->in_kind[ i ] = IN_KIND_CONTACT;
    1441           0 :       if( FD_UNLIKELY( has_contact_info_in ) ) FD_LOG_ERR(( "shred tile has multiple contact info in link types, can only be either gossip_out or crds_shred" ));
    1442           0 :       has_contact_info_in = 1;
    1443           0 :     }
    1444           0 :     else if( FD_LIKELY( !strcmp( link->name, "gossip_out"   ) ) ) { ctx->in_kind[ i ] = IN_KIND_GOSSIP;
    1445           0 :       if( FD_UNLIKELY( has_contact_info_in ) ) FD_LOG_ERR(( "shred tile has multiple contact info in link types, can only be either gossip_out or crds_shred" ));
    1446           0 :       has_contact_info_in = 1;
    1447           0 :     }
    1448             : 
    1449           0 :     else FD_LOG_ERR(( "shred tile has unexpected input link %lu %s", i, link->name ));
    1450             : 
    1451           0 :     if( FD_LIKELY( !!link->mtu ) ) {
    1452           0 :       ctx->in[ i ].mem    = link_wksp->wksp;
    1453           0 :       ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
    1454           0 :       ctx->in[ i ].wmark  = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
    1455           0 :     }
    1456           0 :   }
    1457             : 
    1458           0 :   fd_topo_link_t const * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
    1459             : 
    1460           0 :   ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
    1461           0 :   ctx->net_out_mem    = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
    1462           0 :   ctx->net_out_wmark  = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
    1463           0 :   ctx->net_out_chunk  = ctx->net_out_chunk0;
    1464             : 
    1465           0 :   ctx->store = NULL;
    1466           0 :   ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" );
    1467           0 :   if( FD_LIKELY( store_obj_id!=ULONG_MAX ) ) { /* firedancer-only */
    1468           0 :     ctx->store = fd_store_join( fd_topo_obj_laddr( topo, store_obj_id ) );
    1469           0 :     FD_TEST( ctx->store->magic==FD_STORE_MAGIC );
    1470           0 :     FD_TEST( ctx->store->part_cnt==ctx->round_robin_cnt ); /* single-writer (shred tile) per store part */
    1471           0 :     FD_TEST( !fd_store_verify( ctx->store ) );
    1472           0 :   }
    1473             : 
    1474           0 :   if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
    1475           0 :     fd_topo_link_t const * shred_out = &topo->links[ tile->out_link_id[ ctx->shred_out_idx ] ];
    1476           0 :     ctx->shred_out_mem         = topo->workspaces[ topo->objs[ shred_out->dcache_obj_id ].wksp_id ].wksp;
    1477           0 :     ctx->shred_out_chunk0      = fd_dcache_compact_chunk0( ctx->shred_out_mem, shred_out->dcache );
    1478           0 :     ctx->shred_out_wmark       = fd_dcache_compact_wmark ( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu );
    1479           0 :     ctx->shred_out_chunk       = ctx->shred_out_chunk0;
    1480           0 :     FD_TEST( fd_dcache_compact_is_safe( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu, shred_out->depth ) );
    1481           0 :   }
    1482             : 
    1483           0 :   if( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
    1484           0 :     fd_topo_link_t const * store_out = &topo->links[ tile->out_link_id[ ctx->store_out_idx ] ];
    1485           0 :     ctx->store_out_mem         = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
    1486           0 :     ctx->store_out_chunk0      = fd_dcache_compact_chunk0( ctx->store_out_mem, store_out->dcache );
    1487           0 :     ctx->store_out_wmark       = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
    1488           0 :     ctx->store_out_chunk       = ctx->store_out_chunk0;
    1489           0 :     FD_TEST( fd_dcache_compact_is_safe( ctx->store_out_mem, store_out->dcache, store_out->mtu, store_out->depth ) );
    1490           0 :   }
    1491             : 
    1492           0 :   ctx->poh_in_expect_seq = 0UL;
    1493             : 
    1494           0 :   ctx->shredder_fec_set_idx = 0UL;
    1495           0 :   ctx->shredder_max_fec_set_idx = shred_store_mcache_depth + FD_SHRED_BATCH_FEC_SETS_MAX;
    1496             : 
    1497           0 :   ctx->chained_merkle_root = NULL;
    1498           0 :   memset( ctx->out_merkle_roots, 0, sizeof(ctx->out_merkle_roots) );
    1499             : 
    1500           0 :   for( ulong i=0UL; i<FD_SHRED_BATCH_FEC_SETS_MAX; i++ ) { ctx->send_fec_set_idx[ i ] = ULONG_MAX; }
    1501           0 :   ctx->send_fec_set_cnt = 0UL;
    1502             : 
    1503           0 :   ctx->shred_buffer_sz  = 0UL;
    1504           0 :   memset( ctx->shred_buffer, 0xFF, FD_NET_MTU );
    1505             : 
    1506           0 :   ctx->leader_bank  = NULL;
    1507             : 
    1508           0 :   fd_histf_join( fd_histf_new( ctx->metrics->contact_info_cnt,     FD_MHIST_MIN(         SHRED, CONTACT_INFO_PER_MESSAGE   ),
    1509           0 :                                                                    FD_MHIST_MAX(         SHRED, CONTACT_INFO_PER_MESSAGE   ) ) );
    1510           0 :   fd_histf_join( fd_histf_new( ctx->metrics->batch_sz,             FD_MHIST_MIN(         SHRED, BATCH_SIZE_BYTES           ),
    1511           0 :                                                                    FD_MHIST_MAX(         SHRED, BATCH_SIZE_BYTES           ) ) );
    1512           0 :   fd_histf_join( fd_histf_new( ctx->metrics->batch_microblock_cnt, FD_MHIST_MIN(         SHRED, MICROBLOCK_PER_BATCH       ),
    1513           0 :                                                                    FD_MHIST_MAX(         SHRED, MICROBLOCK_PER_BATCH       ) ) );
    1514           0 :   fd_histf_join( fd_histf_new( ctx->metrics->shredding_timing,     FD_MHIST_SECONDS_MIN( SHRED, SHREDDING_DURATION_SECONDS ),
    1515           0 :                                                                    FD_MHIST_SECONDS_MAX( SHRED, SHREDDING_DURATION_SECONDS ) ) );
    1516           0 :   fd_histf_join( fd_histf_new( ctx->metrics->add_shred_timing,     FD_MHIST_SECONDS_MIN( SHRED, ADD_SHRED_DURATION_SECONDS ),
    1517           0 :                                                                    FD_MHIST_SECONDS_MAX( SHRED, ADD_SHRED_DURATION_SECONDS ) ) );
    1518           0 :   memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) );
    1519           0 :   ctx->metrics->invalid_block_id_cnt         = 0UL;
    1520           0 :   ctx->metrics->shred_rejected_unchained_cnt = 0UL;
    1521           0 :   ctx->metrics->repair_rcv_cnt               = 0UL;
    1522           0 :   ctx->metrics->repair_rcv_bytes             = 0UL;
    1523           0 :   ctx->metrics->turbine_rcv_cnt              = 0UL;
    1524           0 :   ctx->metrics->turbine_rcv_bytes            = 0UL;
    1525           0 :   ctx->metrics->bad_nonce                    = 0UL;
    1526             : 
    1527           0 :   ctx->pending_batch.microblock_cnt = 0UL;
    1528           0 :   ctx->pending_batch.txn_cnt        = 0UL;
    1529           0 :   ctx->pending_batch.pos            = 0UL;
    1530           0 :   ctx->pending_batch.slot           = 0UL;
    1531           0 :   memset( ctx->pending_batch.payload, 0, sizeof(ctx->pending_batch.payload) );
    1532             : 
    1533           0 :   memset( ctx->epoch_schedule, 0, sizeof(ctx->epoch_schedule) );
    1534           0 :   for( ulong i=0UL; i<FD_SHRED_FEATURES_ACTIVATION_SLOT_CNT; i++ ) {
    1535           0 :     ctx->features_activation->slots[i] = FD_SHRED_FEATURES_ACTIVATION_SLOT_DISABLED;
    1536           0 :   }
    1537             : 
    1538           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
    1539           0 :   if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
    1540           0 :     FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
    1541             : 
    1542           0 :   memset( ctx->block_ids, 0, sizeof(ctx->block_ids) );
    1543           0 : }
    1544             : 
    1545             : static ulong
    1546             : populate_allowed_seccomp( fd_topo_t const *      topo,
    1547             :                           fd_topo_tile_t const * tile,
    1548             :                           ulong                  out_cnt,
    1549           0 :                           struct sock_filter *   out ) {
    1550           0 :   (void)topo;
    1551           0 :   (void)tile;
    1552             : 
    1553           0 :   populate_sock_filter_policy_fd_shred_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
    1554           0 :   return sock_filter_policy_fd_shred_tile_instr_cnt;
    1555           0 : }
    1556             : 
    1557             : static ulong
    1558             : populate_allowed_fds( fd_topo_t const *      topo,
    1559             :                       fd_topo_tile_t const * tile,
    1560             :                       ulong                  out_fds_cnt,
    1561           0 :                       int *                  out_fds ) {
    1562           0 :   (void)topo;
    1563           0 :   (void)tile;
    1564             : 
    1565           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
    1566             : 
    1567           0 :   ulong out_cnt = 0UL;
    1568           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
    1569           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
    1570           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
    1571           0 :   return out_cnt;
    1572           0 : }
    1573             : 
    1574             : /* Excluding net_out (where the link is unreliable), STEM_BURST needs
    1575             :    to guarantee enough credits for the worst case. There are 4 cases
    1576             :    to consider: (IN_KIND_NET/IN_KIND_POH) x (Frankendancer/Firedancer)
    1577             :    In the IN_KIND_NET case:  (Frankendancer) sends 1 frag to
    1578             :    store;  (Firedancer) that is one frag for the shred to repair, and
    1579             :    then another frag to repair for the FEC set.
    1580             :    In the IN_KIND_POH case:  (Frankendancer) there might be
    1581             :    FD_SHRED_BATCH_FEC_SETS_MAX FEC sets;  (Firedancer) that is
    1582             :    FD_SHRED_BATCH_FEC_SETS_MAX frags to repair (one per FEC set).
    1583             :    Therefore, the worst case is IN_KIND_POH for Frankendancer. */
    1584           0 : #define STEM_BURST (FD_SHRED_BATCH_FEC_SETS_MAX + 40UL)
    1585             : 
    1586             : /* See explanation in fd_pack */
    1587           0 : #define STEM_LAZY  (128L*3000L)
    1588             : 
    1589           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_shred_ctx_t
    1590           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_shred_ctx_t)
    1591             : 
    1592           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
    1593           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
    1594           0 : #define STEM_CALLBACK_BEFORE_FRAG         before_frag
    1595           0 : #define STEM_CALLBACK_DURING_FRAG         during_frag
    1596           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
    1597             : 
    1598             : #include "../stem/fd_stem.c"
    1599             : 
    1600             : fd_topo_run_tile_t fd_tile_shred = {
    1601             :   .name                     = "shred",
    1602             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    1603             :   .populate_allowed_fds     = populate_allowed_fds,
    1604             :   .scratch_align            = scratch_align,
    1605             :   .scratch_footprint        = scratch_footprint,
    1606             :   .privileged_init          = privileged_init,
    1607             :   .unprivileged_init        = unprivileged_init,
    1608             :   .run                      = stem_run,
    1609             : };

Generated by: LCOV version 1.14