LCOV - code coverage report
Current view: top level - disco/shred - fd_shred_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 716 0.0 %
Date: 2025-07-01 05:00:49 Functions: 0 15 0.0 %

          Line data    Source code
       1             : #include "../tiles.h"
       2             : 
       3             : #include "generated/fd_shred_tile_seccomp.h"
       4             : #include "../../util/pod/fd_pod_format.h"
       5             : #include "../shred/fd_shredder.h"
       6             : #include "../shred/fd_shred_batch.h"
       7             : #include "../shred/fd_shred_dest.h"
       8             : #include "../shred/fd_fec_resolver.h"
       9             : #include "../shred/fd_stake_ci.h"
      10             : #include "../keyguard/fd_keyload.h"
      11             : #include "../keyguard/fd_keyguard.h"
      12             : #include "../keyguard/fd_keyswitch.h"
      13             : #include "../fd_disco.h"
      14             : #include "../net/fd_net_tile.h"
      15             : #include "../../flamenco/leaders/fd_leaders.h"
      16             : #include "../../flamenco/runtime/fd_blockstore.h"
      17             : #include "../../util/net/fd_net_headers.h"
      18             : 
      19             : #include <linux/unistd.h>
      20             : 
      21             : /* The shred tile handles shreds from two data sources: shreds
      22             :    generated from microblocks from the banking tile, and shreds
      23             :    retransmitted from the network.
      24             : 
      25             :    They have rather different semantics, but at the end of the day, they
      26             :    both result in a bunch of shreds and FEC sets that need to be sent to
      27             :    the blockstore and on the network, which is why one tile handles
      28             :    both.
      29             : 
      30             :    We segment the memory for the two types of shreds into two halves of
      31             :    a dcache because they follow somewhat different flow control
      32             :    patterns. For flow control, the normal guarantee we want to provide
      33             :    is that the dcache entry is not overwritten unless the mcache entry
      34             :    has also been overwritten.  The normal way to do this when using both
      35             :    cyclically and with a 1-to-1 mapping is to make the dcache at least
      36             :    `burst` entries bigger than the mcache.
      37             : 
      38             :    In this tile, we use one output mcache with one output dcache (which
      39             :    is logically partitioned into two) for the two sources of data.  The
      40             :    worst case for flow control is when we're only sending with one of
      41             :    the dcache partitions at a time though, so we can consider them
      42             :    separately.
      43             : 
      44             :    From bank: Every FEC set triggers at least two mcache entries (one
      45             :    for parity and one for data), so at most, we have ceil(mcache
      46             :    depth/2) FEC sets exposed.  This means we need to decompose dcache
      47             :    into at least ceil(mcache depth/2)+1 FEC sets.
      48             : 
      49             :    From the network: The FEC resolver doesn't use a cyclic order, but it
      50             :    does promise that once it returns an FEC set, it will return at least
      51             :    complete_depth FEC sets before returning it again.  This means we
      52             :    want at most complete_depth-1 FEC sets exposed, so
      53             :    complete_depth=ceil(mcache depth/2)+1 FEC sets as above.  The FEC
      54             :    resolver has the ability to keep individual shreds for partial_depth
      55             :    calls, but because in this version of the shred tile, we send each
      56             :    shred to all its destinations as soon as we get it, we don't need
      57             :    that functionality, so we set partial_depth=1.
      58             : 
      59             :    Adding these up, we get 2*ceil(mcache_depth/2)+3+fec_resolver_depth
      60             :    FEC sets, which is no more than mcache_depth+4+fec_resolver_depth.
      61             :    Each FEC is paired with 4 fd_shred34_t structs, so that means we need
      62             :    to decompose the dcache into 4*mcache_depth + 4*fec_resolver_depth +
      63             :    16 fd_shred34_t structs.
      64             : 
      65             :    A note on parallelization.  From the network, shreds are distributed
      66             :    to tiles by their signature, so all the shreds for a given FEC set
      67             :    are processed by the same tile.  From bank, the original implementation
      68             :    used to parallelize by batch of microblocks (so within a block, batches
      69             :    were distributed to different tiles).  To support chained merkle shreds,
      70             :    the current implementation processes all the batches on tile 0 -- this
      71             :    should be a temporary state while Solana moves to a newer shred format
      72             :    that supports better parallelization. */
      73             : 
      74             : /* The memory this tile uses is a bit complicated and has some logical
      75             :    aliasing to facilitate zero-copy use.  We have a dcache containing
      76             :    fd_shred34_t objects, which are basically 34 fd_shred_t objects
      77             :    padded to their max size, where 34 is set so that the size of the
      78             :    fd_shred34_t object (including some metadata) is less than
      79             :    USHORT_MAX, which facilitates sending it using Tango.  Then, for each
      80             :    set of 4 consecutive fd_shred34_t objects, we have an fd_fec_set_t.
      81             :    The first 34 data shreds point to the payload section of each of
      82             :    the packets in the first fd_shred34_t.  The other
      83             :    33 data shreds point into the second fd_shred34_t.  Similar for the
      84             :    parity shreds pointing into the third and fourth fd_shred34_t. */
      85             : 
      86             : /* There's nothing deep about this max, but I just find it easier to
      87             :    have a max and use statically sized arrays than alloca. */
      88             : #define MAX_BANK_CNT 64UL
      89             : 
      90             : #define FD_SHRED_TILE_SCRATCH_ALIGN 128UL
      91             : 
      92           0 : #define IN_KIND_CONTACT (0UL)
      93           0 : #define IN_KIND_STAKE   (1UL)
      94           0 : #define IN_KIND_POH     (2UL)
      95           0 : #define IN_KIND_NET     (3UL)
      96           0 : #define IN_KIND_SIGN    (4UL)
      97           0 : #define IN_KIND_REPAIR  (5UL)
      98             : 
      99           0 : #define NET_OUT_IDX     1
     100           0 : #define SIGN_OUT_IDX    2
     101             : 
     102           0 : #define DCACHE_ENTRIES_PER_FEC_SET (4UL)
     103             : FD_STATIC_ASSERT( sizeof(fd_shred34_t) < USHORT_MAX, shred_34 );
     104             : FD_STATIC_ASSERT( 34*DCACHE_ENTRIES_PER_FEC_SET >= FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX, shred_34 );
     105             : FD_STATIC_ASSERT( sizeof(fd_shred34_t) == FD_SHRED_STORE_MTU, shred_34 );
     106             : 
     107             : FD_STATIC_ASSERT( sizeof(fd_entry_batch_meta_t)==56UL, poh_shred_mtu );
     108             : 
     109           0 : #define FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT 2
     110             : 
     111             : /* See note on parallelization above. Currently we process all batches in tile 0. */
     112             : #if 1
     113             : #define SHOULD_PROCESS_THESE_SHREDS ( ctx->round_robin_id==0 )
     114             : #else
     115             : #define SHOULD_PROCESS_THESE_SHREDS ( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id )
     116             : #endif
     117             : 
     118             : /* The behavior of the shred tile is slightly different for
     119             :    Frankendancer vs Firedancer.  For example, Frankendancer produces
     120             :    chained merkle shreds, while Firedancer doesn't yet.  We can check
     121             :    at runtime the difference by inspecting the topology. The simplest
     122             :    way is to test if ctx->blockstore is enabled. */
     123             : #define IS_FIREDANCER ( ctx->blockstore!=NULL )
     124             : 
/* fd_shred_in_ctx_t: per-input-link descriptor.  For dcache-backed
   links, (mem, chunk0, wmark) gives the workspace and the valid chunk
   range used to validate incoming frags (see the bounds checks in
   during_frag).  The aliased net_rx view is presumably used for
   IN_KIND_NET links instead -- TODO confirm in the net frag handler
   (outside this chunk). */
typedef union {
  struct {
    fd_wksp_t * mem;    /* workspace holding the link's dcache */
    ulong       chunk0; /* first valid chunk index */
    ulong       wmark;  /* last valid chunk index (inclusive) */
  };
  fd_net_rx_bounds_t net_rx;
} fd_shred_in_ctx_t;
     133             : 
/* fd_shred_ctx_t: all state for one shred tile.  See the long comment
   at the top of this file for how the shred34/fec_sets memory is laid
   out and how flow control is reasoned about. */
typedef struct {
  fd_shredder_t      * shredder; /* shreds entry batches on the leader path */
  fd_fec_resolver_t  * resolver; /* reassembles FEC sets from network shreds */
  fd_pubkey_t          identity_key[1]; /* Just the public key */

  ulong                round_robin_id;  /* this tile's index among the shred tiles (see SHOULD_PROCESS_THESE_SHREDS) */
  ulong                round_robin_cnt; /* total number of shred tiles */
  /* Number of batches shredded from PoH during the current slot.
     This should be the same for all the shred tiles. */
  ulong                batch_cnt;
  /* Slot of the most recent microblock we've seen from PoH,
     or 0 if we haven't seen one yet */
  ulong                slot;

  fd_keyswitch_t *     keyswitch;          /* polled in during_housekeeping for identity key changes */
  fd_keyguard_client_t keyguard_client[1];

  /* shred34 and fec_sets are very related: fec_sets[i] has pointers
     to the shreds in shred34[4*i + k] for k=0,1,2,3. */
  fd_shred34_t       * shred34;
  fd_fec_set_t       * fec_sets;

  fd_stake_ci_t      * stake_ci;
  /* These are used in between during_frag and after_frag */
  fd_shred_dest_weighted_t * new_dest_ptr;
  ulong                      new_dest_cnt;
  ulong                      shredded_txn_cnt;

  /* Next sequence number expected on the PoH input; compared against
     the keyswitch param so in-flight leader shreds are flushed before
     switching identity (see during_housekeeping). */
  ulong poh_in_expect_seq;

  /* NOTE(review): presumably the IP packet-id counter for outgoing
     shred packets -- confirm where headers are populated. */
  ushort net_id;

  /* Set by during_frag when the current frag should be ignored by
     after_frag (e.g. PoH batches this tile does not process). */
  int skip_frag;

  fd_shred_dest_weighted_t adtl_dest[1];

  /* Prototype Ethernet/IP/UDP headers for outgoing shred packets. */
  fd_ip4_udp_hdrs_t data_shred_net_hdr  [1];
  fd_ip4_udp_hdrs_t parity_shred_net_hdr[1];

  fd_wksp_t * shred_store_wksp;

  ulong shredder_fec_set_idx;     /* In [0, shredder_max_fec_set_idx) */
  ulong shredder_max_fec_set_idx; /* exclusive */

  /* FEC sets staged for publication; cnt is reset at the start of each
     PoH frag in during_frag. */
  ulong send_fec_set_idx[ FD_SHRED_BATCH_FEC_SETS_MAX ];
  ulong send_fec_set_cnt;
  ulong tsorig;  /* timestamp of the last packet in compressed form */

  /* Includes Ethernet, IP, UDP headers */
  ulong shred_buffer_sz;
  uchar shred_buffer[ FD_NET_MTU ];

  /* Per-input-link bounds and IN_KIND_* tag, indexed by in_idx. */
  fd_shred_in_ctx_t in[ 32 ];
  int               in_kind[ 32 ];

  /* Bookkeeping for the net output link (mcache plus dcache chunk
     window). */
  fd_frag_meta_t * net_out_mcache;
  ulong *          net_out_sync;
  ulong            net_out_depth;
  ulong            net_out_seq;

  fd_wksp_t * net_out_mem;
  ulong       net_out_chunk0;
  ulong       net_out_wmark;
  ulong       net_out_chunk;

  /* Bookkeeping for the store output link. */
  ulong       store_out_idx;
  fd_wksp_t * store_out_mem;
  ulong       store_out_chunk0;
  ulong       store_out_wmark;
  ulong       store_out_chunk;

  /* Bookkeeping for the repair output link. */
  ulong       repair_out_idx;
  fd_wksp_t * repair_out_mem;
  ulong       repair_out_chunk0;
  ulong       repair_out_wmark;
  ulong       repair_out_chunk;

  /* blockstore is NULL on Frankendancer; non-NULL selects the
     Firedancer code paths (see IS_FIREDANCER above). */
  fd_blockstore_t   blockstore_ljoin;
  fd_blockstore_t * blockstore;

  /* Tile-local metric accumulators, published by metrics_write. */
  struct {
    fd_histf_t contact_info_cnt[ 1 ];
    fd_histf_t batch_sz[ 1 ];
    fd_histf_t batch_microblock_cnt[ 1 ];
    fd_histf_t shredding_timing[ 1 ];
    fd_histf_t add_shred_timing[ 1 ];
    ulong shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ];
    ulong invalid_block_id_cnt;
    ulong shred_rejected_unchained_cnt;
  } metrics[ 1 ];

  /* Accumulator for the entry batch currently being built from PoH
     microblocks (leader path); shredded when it completes the block or
     would exceed the batch watermark. */
  struct {
    ulong txn_cnt;
    ulong pos; /* in payload, range [0, FD_SHRED_BATCH_RAW_BUF_SZ-8UL) */
    ulong slot; /* set to 0 when pos==0 */
    union {
      struct {
        ulong microblock_cnt;
        uchar payload[ FD_SHRED_BATCH_RAW_BUF_SZ - 8UL ];
      };
      uchar raw[ FD_SHRED_BATCH_RAW_BUF_SZ ];
    };
  } pending_batch;

  /* Relevant feature-activation slots, received from the PoH tile as
     POH_PKT_TYPE_FEAT_ACT_SLOT frags (see during_frag). */
  fd_shred_features_activation_t features_activation[1];
  /* too large to be left in the stack */
  fd_shred_dest_idx_t scratchpad_dests[ FD_SHRED_DEST_MAX_FANOUT*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];

  /* Merkle root of the last FEC set of the parent block, copied from
     entry_meta->parent_block_id at the start of a slot; used to chain
     merkle shreds. */
  uchar chained_merkle_root[ FD_SHRED_MERKLE_ROOT_SZ ];
} fd_shred_ctx_t;
     244             : 
     245             : FD_FN_CONST static inline ulong
     246           0 : scratch_align( void ) {
     247           0 :   return 128UL;
     248           0 : }
     249             : 
     250             : FD_FN_PURE static inline ulong
     251           0 : scratch_footprint( fd_topo_tile_t const * tile ) {
     252             : 
     253           0 :   ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, tile->shred.depth,
     254           0 :                                                             128UL * tile->shred.fec_resolver_depth );
     255           0 :   ulong fec_set_cnt = tile->shred.depth + tile->shred.fec_resolver_depth + 4UL;
     256             : 
     257           0 :   ulong l = FD_LAYOUT_INIT;
     258           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_shred_ctx_t),          sizeof(fd_shred_ctx_t)                  );
     259           0 :   l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(),              fd_stake_ci_footprint()                 );
     260           0 :   l = FD_LAYOUT_APPEND( l, fd_fec_resolver_align(),          fec_resolver_footprint                  );
     261           0 :   l = FD_LAYOUT_APPEND( l, fd_shredder_align(),              fd_shredder_footprint()                 );
     262           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_fec_set_t),            sizeof(fd_fec_set_t)*fec_set_cnt        );
     263           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
     264           0 : }
     265             : 
     266             : static inline void
     267           0 : during_housekeeping( fd_shred_ctx_t * ctx ) {
     268           0 :   if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
     269           0 :     ulong seq_must_complete = ctx->keyswitch->param;
     270             : 
     271           0 :     if( FD_UNLIKELY( fd_seq_lt( ctx->poh_in_expect_seq, seq_must_complete ) ) ) {
     272             :       /* See fd_keyswitch.h, we need to flush any in-flight shreds from
     273             :          the leader pipeline before switching key. */
     274           0 :       FD_LOG_WARNING(( "Flushing in-flight unpublished shreds, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->poh_in_expect_seq ));
     275           0 :       return;
     276           0 :     }
     277             : 
     278           0 :     memcpy( ctx->identity_key->uc, ctx->keyswitch->bytes, 32UL );
     279           0 :     fd_stake_ci_set_identity( ctx->stake_ci, ctx->identity_key );
     280           0 :     fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
     281           0 :   }
     282           0 : }
     283             : 
/* metrics_write: publish this tile's locally-accumulated metrics
   (histograms, counters, and the per-return-value shred processing
   tallies) to the shared metrics facility. */
static inline void
metrics_write( fd_shred_ctx_t * ctx ) {
  FD_MHIST_COPY( SHRED, CLUSTER_CONTACT_INFO_CNT,   ctx->metrics->contact_info_cnt             );
  FD_MHIST_COPY( SHRED, BATCH_SZ,                   ctx->metrics->batch_sz                     );
  FD_MHIST_COPY( SHRED, BATCH_MICROBLOCK_CNT,       ctx->metrics->batch_microblock_cnt         );
  FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing             );
  FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing             );

  FD_MCNT_SET  ( SHRED, INVALID_BLOCK_ID,           ctx->metrics->invalid_block_id_cnt         );
  FD_MCNT_SET  ( SHRED, SHRED_REJECTED_UNCHAINED,   ctx->metrics->shred_rejected_unchained_cnt );

  FD_MCNT_ENUM_COPY( SHRED, SHRED_PROCESSED, ctx->metrics->shred_processing_result             );
}
     297             : 
     298             : static inline void
     299             : handle_new_cluster_contact_info( fd_shred_ctx_t * ctx,
     300           0 :                                  uchar const    * buf ) {
     301           0 :   ulong const * header = (ulong const *)fd_type_pun_const( buf );
     302             : 
     303           0 :   ulong dest_cnt = header[ 0 ];
     304           0 :   fd_histf_sample( ctx->metrics->contact_info_cnt, dest_cnt );
     305             : 
     306           0 :   if( dest_cnt >= MAX_SHRED_DESTS )
     307           0 :     FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS ));
     308             : 
     309           0 :   fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
     310           0 :   fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci );
     311             : 
     312           0 :   ctx->new_dest_ptr = dests;
     313           0 :   ctx->new_dest_cnt = dest_cnt;
     314             : 
     315           0 :   for( ulong i=0UL; i<dest_cnt; i++ ) {
     316           0 :     memcpy( dests[i].pubkey.uc, in_dests[i].pubkey, 32UL );
     317           0 :     dests[i].ip4  = in_dests[i].ip4_addr;
     318           0 :     dests[i].port = in_dests[i].udp_port;
     319           0 :   }
     320           0 : }
     321             : 
/* finalize_new_cluster_contact_info: commit the destination list
   staged by handle_new_cluster_contact_info into the stake contact
   info object. */
static inline void
finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
  fd_stake_ci_dest_add_fini( ctx->stake_ci, ctx->new_dest_cnt );
}
     326             : 
     327             : static inline int
     328             : before_frag( fd_shred_ctx_t * ctx,
     329             :              ulong            in_idx,
     330             :              ulong            seq,
     331           0 :              ulong            sig ) {
     332           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
     333           0 :     ctx->poh_in_expect_seq = seq+1UL;
     334           0 :     return (fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK) & (fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_FEAT_ACT_SLOT);
     335           0 :   }
     336           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     337           0 :     return (fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
     338           0 :   }
     339           0 :   return 0;
     340           0 : }
     341             : 
     342             : static void
     343             : during_frag( fd_shred_ctx_t * ctx,
     344             :              ulong            in_idx,
     345             :              ulong            seq FD_PARAM_UNUSED,
     346             :              ulong            sig,
     347             :              ulong            chunk,
     348             :              ulong            sz,
     349           0 :              ulong            ctl ) {
     350             : 
     351           0 :   ctx->skip_frag = 0;
     352             : 
     353           0 :   ctx->tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
     354             : 
     355           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
     356           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     357           0 :     FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     358           0 :                 ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     359             : 
     360           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     361           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
     362           0 :     return;
     363           0 :   }
     364             : 
     365           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
     366           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     367           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     368           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     369             : 
     370           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     371           0 :     handle_new_cluster_contact_info( ctx, dcache_entry );
     372           0 :     return;
     373           0 :   }
     374             : 
     375           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
     376           0 :     if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
     377           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     378           0 :                    ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     379             : 
     380           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     381           0 :     fd_stake_ci_stake_msg_init( ctx->stake_ci, fd_type_pun_const( dcache_entry ) );
     382           0 :     return;
     383           0 :   }
     384             : 
     385           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
     386           0 :     ctx->send_fec_set_cnt = 0UL;
     387             : 
     388           0 :     if( FD_UNLIKELY( (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_FEAT_ACT_SLOT) ) ) {
     389             :       /* There is a subset of FD_SHRED_FEATURES_ACTIVATION_... slots that
     390             :           the shred tile needs to be aware of.  Since this requires the
     391             :           bank, we are forced (so far) to receive them from the poh tile
     392             :           (as a POH_PKT_TYPE_FEAT_ACT_SLOT).  This is not elegant, and it
     393             :           should be revised in the future (TODO), but it provides a
     394             :           "temporary" working solution to handle features activation. */
     395           0 :       uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     396           0 :       if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=(sizeof(fd_shred_features_activation_t)) ) )
     397           0 :         FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     398           0 :               ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     399             : 
     400           0 :       fd_shred_features_activation_t const * act_data = (fd_shred_features_activation_t const *)dcache_entry;
     401           0 :       memcpy( ctx->features_activation, act_data, sizeof(fd_shred_features_activation_t) );
     402           0 :     }
     403           0 :     else { /* (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK) */
     404             :       /* This is a frag from the PoH tile.  We'll copy it to our pending
     405             :         microblock batch and shred it if necessary (last in block or
     406             :         above watermark).  We just go ahead and shred it here, even
     407             :         though we may get overrun.  If we do end up getting overrun, we
     408             :         just won't send these shreds out and we'll reuse the FEC set for
     409             :         the next one.  From a higher level though, if we do get overrun,
     410             :         a bunch of shreds will never be transmitted, and we'll end up
     411             :         producing a block that never lands on chain. */
     412             : 
     413           0 :       uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
     414           0 :       if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_POH_SHRED_MTU ||
     415           0 :           sz<(sizeof(fd_entry_batch_meta_t)+sizeof(fd_entry_batch_header_t)) ) )
     416           0 :         FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
     417           0 :               ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
     418             : 
     419           0 :       fd_entry_batch_meta_t const * entry_meta = (fd_entry_batch_meta_t const *)dcache_entry;
     420           0 :       uchar const *                 entry      = dcache_entry + sizeof(fd_entry_batch_meta_t);
     421           0 :       ulong                         entry_sz   = sz           - sizeof(fd_entry_batch_meta_t);
     422             : 
     423           0 :       fd_entry_batch_header_t const * microblock = (fd_entry_batch_header_t const *)entry;
     424             : 
     425             :       /* It should never be possible for this to fail, but we check it
     426             :         anyway. */
     427           0 :       FD_TEST( entry_sz + ctx->pending_batch.pos <= sizeof(ctx->pending_batch.payload) );
     428             : 
     429           0 :       ulong target_slot = fd_disco_poh_sig_slot( sig );
     430           0 :       if( FD_UNLIKELY( (ctx->pending_batch.microblock_cnt>0) & (ctx->pending_batch.slot!=target_slot) ) ) {
     431             :         /* TODO: The Agave client sends a dummy entry batch with only 1
     432             :           byte and the block-complete bit set.  This helps other
     433             :           validators know that the block is dead and they should not try
     434             :           to continue building a fork on it.  We probably want a similar
     435             :           approach eventually. */
     436           0 :         FD_LOG_WARNING(( "Abandoning %lu microblocks for slot %lu and switching to slot %lu",
     437           0 :               ctx->pending_batch.microblock_cnt, ctx->pending_batch.slot, target_slot ));
     438           0 :         ctx->pending_batch.slot           = 0UL;
     439           0 :         ctx->pending_batch.pos            = 0UL;
     440           0 :         ctx->pending_batch.microblock_cnt = 0UL;
     441           0 :         ctx->pending_batch.txn_cnt        = 0UL;
     442           0 :         ctx->batch_cnt                    = 0UL;
     443             : 
     444           0 :         FD_MCNT_INC( SHRED, MICROBLOCKS_ABANDONED, 1UL );
     445           0 :       }
     446             : 
     447           0 :       ctx->pending_batch.slot = target_slot;
     448           0 :       if( FD_UNLIKELY( target_slot!=ctx->slot )) {
     449             :         /* Reset batch count if we are in a new slot */
     450           0 :         ctx->batch_cnt = 0UL;
     451           0 :         ctx->slot      = target_slot;
     452             : 
     453             :         /* Only copy parent_block_id to chained_merkle_root at the beginning
     454             :            of a new slot*/
     455           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     456             :           /* chained_merkle_root is set as the merkle root of the last FEC set
     457             :             of the parent block (and passed in by POH tile) */
     458           0 :           if( FD_LIKELY( entry_meta->parent_block_id_valid ) ) {
     459           0 :             memcpy( ctx->chained_merkle_root, entry_meta->parent_block_id, FD_SHRED_MERKLE_ROOT_SZ );
     460           0 :           } else {
     461           0 :             ctx->metrics->invalid_block_id_cnt++;
     462           0 :             memset( ctx->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ );
     463           0 :           }
     464           0 :         }
     465           0 :       }
     466             : 
     467           0 :       if( FD_LIKELY( !SHOULD_PROCESS_THESE_SHREDS ) ) {
     468             :         /* If we are not processing this batch, filter in after_frag. */
     469           0 :         ctx->skip_frag = 1;
     470           0 :       }
     471             : 
     472           0 :       ulong   pending_batch_wmark = FD_SHRED_BATCH_WMARK_CHAINED;
     473           0 :       uchar * chained_merkle_root = ctx->chained_merkle_root;
     474           0 :       ulong   load_for_32_shreds  = FD_SHREDDER_CHAINED_FEC_SET_PAYLOAD_SZ;
     475             :       /* All fec sets in the last batch of a block need to be resigned.
     476             :          This needs to match Agave's behavior - as a reference, see:
     477             :          https://github.com/anza-xyz/agave/blob/v2.3/ledger/src/shred/merkle.rs#L1040 */
     478           0 :       if( FD_UNLIKELY( entry_meta->block_complete ) ) {
     479           0 :         pending_batch_wmark = FD_SHRED_BATCH_WMARK_RESIGNED;
     480             :         /* chained_merkle_root also applies to resigned FEC sets. */
     481           0 :         load_for_32_shreds = FD_SHREDDER_RESIGNED_FEC_SET_PAYLOAD_SZ;
     482           0 :       }
     483             :       /* TODO remove once unchained fec sets have been deprecated. */
     484           0 :       if( FD_LIKELY( IS_FIREDANCER ) ) {
     485           0 :         pending_batch_wmark = FD_SHRED_BATCH_WMARK_NORMAL;
     486           0 :         chained_merkle_root = NULL;
     487           0 :         load_for_32_shreds  = FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ;
     488           0 :       }
     489             : 
     490             :       /* If this microblock completes the block, the batch is then
     491             :          finalized here.  Otherwise, we check whether the new entry
     492             :          would exceed the pending_batch_wmark.  If true, then the
     493             :          batch is closed now, shredded, and a new batch is started
     494             :          with the incoming microblock.  If false, no shredding takes
     495             :          place, and the microblock is added to the current batch. */
     496           0 :       int batch_would_exceed_wmark = ( ctx->pending_batch.pos + entry_sz ) > pending_batch_wmark;
     497           0 :       int include_in_current_batch = entry_meta->block_complete | ( !batch_would_exceed_wmark );
     498           0 :       int process_current_batch    = entry_meta->block_complete | batch_would_exceed_wmark;
     499           0 :       int init_new_batch           = !include_in_current_batch;
     500             : 
     501           0 :       if( FD_LIKELY( include_in_current_batch ) ) {
     502           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     503             :           /* Ugh, yet another memcpy */
     504           0 :           fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
     505           0 :         }
     506           0 :         ctx->pending_batch.pos            += entry_sz;
     507           0 :         ctx->pending_batch.microblock_cnt += 1UL;
     508           0 :         ctx->pending_batch.txn_cnt        += microblock->txn_cnt;
     509           0 :       }
     510             : 
     511           0 :       if( FD_LIKELY( process_current_batch )) {
     512             :         /* Batch and padding size calculation. */
     513           0 :         ulong batch_sz        = sizeof(ulong) + ctx->pending_batch.pos; /* without padding */
     514           0 :         ulong batch_sz_padded = load_for_32_shreds * ( ( batch_sz + load_for_32_shreds - 1UL ) / load_for_32_shreds );
     515           0 :         ulong padding_sz      = batch_sz_padded - batch_sz;
     516             : 
     517           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     518             :           /* If it's our turn, shred this batch. FD_UNLIKELY because shred
     519             :              tile cnt generally >= 2 */
     520             : 
     521           0 :           long shredding_timing = -fd_tickcount();
     522             : 
     523           0 :           fd_memset( ctx->pending_batch.payload + ctx->pending_batch.pos, 0, padding_sz );
     524             : 
     525           0 :           ctx->send_fec_set_cnt = 0UL; /* verbose */
     526           0 :           ctx->shredded_txn_cnt = ctx->pending_batch.txn_cnt;
     527             : 
     528           0 :           fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, batch_sz_padded, target_slot, entry_meta );
     529             : 
     530           0 :           ulong pend_sz = batch_sz_padded;
     531           0 :           while( pend_sz > 0UL ) {
     532             : 
     533           0 :             fd_fec_set_t * out = ctx->fec_sets + ctx->shredder_fec_set_idx;
     534             : 
     535           0 :             FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out, chained_merkle_root ) );
     536             : 
     537           0 :             d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd   ) ) ) );
     538           0 :             p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) );
     539             : 
     540           0 :             ctx->send_fec_set_idx[ ctx->send_fec_set_cnt ] = ctx->shredder_fec_set_idx;
     541           0 :             ctx->send_fec_set_cnt += 1UL;
     542           0 :             ctx->shredder_fec_set_idx = (ctx->shredder_fec_set_idx+1UL)%ctx->shredder_max_fec_set_idx;
     543             : 
     544           0 :             pend_sz -= load_for_32_shreds;
     545           0 :           }
     546             : 
     547           0 :           fd_shredder_fini_batch( ctx->shredder );
     548           0 :           shredding_timing += fd_tickcount();
     549             : 
     550             :           /* Update metrics */
     551           0 :           fd_histf_sample( ctx->metrics->batch_sz,             batch_sz /* without padding */    );
     552           0 :           fd_histf_sample( ctx->metrics->batch_microblock_cnt, ctx->pending_batch.microblock_cnt );
     553           0 :           fd_histf_sample( ctx->metrics->shredding_timing,     (ulong)shredding_timing           );
     554           0 :         } else {
     555           0 :           ctx->send_fec_set_cnt = 0UL; /* verbose */
     556             : 
     557           0 :           ulong shred_type = FD_SHRED_TYPE_MERKLE_DATA_CHAINED;
     558           0 :           if( FD_UNLIKELY( entry_meta->block_complete ) ) {
     559           0 :             shred_type = FD_SHRED_TYPE_MERKLE_DATA_CHAINED_RESIGNED;
     560           0 :           }
     561           0 :           if( FD_LIKELY( IS_FIREDANCER ) ) {
     562           0 :             shred_type = FD_SHRED_TYPE_MERKLE_DATA;
     563           0 :           }
     564           0 :           fd_shredder_skip_batch( ctx->shredder, batch_sz_padded, target_slot, shred_type );
     565           0 :         }
     566             : 
     567           0 :         ctx->pending_batch.slot           = 0UL;
     568           0 :         ctx->pending_batch.pos            = 0UL;
     569           0 :         ctx->pending_batch.microblock_cnt = 0UL;
     570           0 :         ctx->pending_batch.txn_cnt        = 0UL;
     571           0 :         ctx->batch_cnt++;
     572           0 :       }
     573             : 
     574           0 :       if( FD_UNLIKELY( init_new_batch ) ) {
     575             :         /* TODO: this assumes that SHOULD_PROCESS_THESE_SHREDS is
     576             :            constant across batches.  Otherwise, the condition may
     577             :            need to be removed (or adjusted). */
     578           0 :         if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
     579             :           /* Ugh, yet another memcpy */
     580           0 :           fd_memcpy( ctx->pending_batch.payload + 0UL /* verbose */, entry, entry_sz );
     581           0 :         }
     582           0 :         ctx->pending_batch.slot           = target_slot;
     583           0 :         ctx->pending_batch.pos            = entry_sz;
     584           0 :         ctx->pending_batch.microblock_cnt = 1UL;
     585           0 :         ctx->pending_batch.txn_cnt        = microblock->txn_cnt;
     586           0 :       }
     587           0 :     }
     588           0 :   } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     589             :     /* The common case, from the net tile.  The FEC resolver API does
     590             :        not present a prepare/commit model. If we get overrun between
     591             :        when the FEC resolver verifies the signature and when it stores
     592             :        the local copy, we could end up storing and retransmitting
     593             :        garbage.  Instead we copy it locally, sadly, and only give it to
     594             :        the FEC resolver when we know it won't be overrun anymore. */
     595           0 :     uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in[ in_idx ].net_rx, chunk, ctl, sz );
     596           0 :     ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
     597           0 :     FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
     598           0 :     fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
     599           0 :     if( FD_UNLIKELY( !shred ) ) {
     600           0 :       ctx->skip_frag = 1;
     601           0 :       return;
     602           0 :     };
     603             : 
     604             :     /* Drop unchained merkle shreds (if feature is active) */
     605           0 :     int is_unchained = !fd_shred_is_chained( fd_shred_type( shred->variant ) );
     606           0 :     if( FD_UNLIKELY( is_unchained && shred->slot >= ctx->features_activation->drop_unchained_merkle_shreds ) ) {
     607           0 :       ctx->metrics->shred_rejected_unchained_cnt++;
     608           0 :       ctx->skip_frag = 1;
     609           0 :       return;
     610           0 :     };
     611             : 
     612             :     /* all shreds in the same FEC set will have the same signature
     613             :        so we can round-robin shreds between the shred tiles based on
     614             :        just the signature without splitting individual FEC sets. */
     615           0 :     ulong sig = fd_ulong_load_8( shred->signature );
     616           0 :     if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
     617           0 :       ctx->skip_frag = 1;
     618           0 :       return;
     619           0 :     }
     620           0 :     fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
     621           0 :     ctx->shred_buffer_sz = sz-hdr_sz;
     622           0 :   }
     623           0 : }
     624             : 
     625             : static inline void
     626             : send_shred( fd_shred_ctx_t                 * ctx,
     627             :             fd_shred_t const               * shred,
     628             :             fd_shred_dest_weighted_t const * dest,
     629           0 :             ulong                            tsorig ) {
     630             : 
     631           0 :   if( FD_UNLIKELY( !dest->ip4 ) ) return;
     632             : 
     633           0 :   uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
     634             : 
     635           0 :   int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
     636           0 :   fd_ip4_udp_hdrs_t * hdr  = (fd_ip4_udp_hdrs_t *)packet;
     637           0 :   *hdr = *( is_data ? ctx->data_shred_net_hdr : ctx->parity_shred_net_hdr );
     638             : 
     639           0 :   fd_ip4_hdr_t * ip4 = hdr->ip4;
     640           0 :   ip4->daddr  = dest->ip4;
     641           0 :   ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
     642           0 :   ip4->check  = 0U;
     643           0 :   ip4->check  = fd_ip4_hdr_check_fast( ip4 );
     644             : 
     645           0 :   hdr->udp->net_dport = fd_ushort_bswap( dest->port );
     646             : 
     647           0 :   ulong shred_sz = fd_ulong_if( is_data, FD_SHRED_MIN_SZ, FD_SHRED_MAX_SZ );
     648           0 : #if FD_HAS_AVX
     649             :   /* We're going to copy this shred potentially a bunch of times without
     650             :      reading it again, and we'd rather not thrash our cache, so we want
     651             :      to use non-temporal writes here.  We need to make sure we don't
     652             :      touch the cache line containing the network headers that we just
     653             :      wrote to though.  We know the destination is 64 byte aligned.  */
     654           0 :   FD_STATIC_ASSERT( sizeof(*hdr)<64UL, non_temporal );
     655             :   /* src[0:sizeof(hdrs)] is invalid, but now we want to copy
     656             :      dest[i]=src[i] for i>=sizeof(hdrs), so it simplifies the code. */
     657           0 :   uchar const * src = (uchar const *)((ulong)shred - sizeof(fd_ip4_udp_hdrs_t));
     658           0 :   memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), src+sizeof(fd_ip4_udp_hdrs_t), 64UL-sizeof(fd_ip4_udp_hdrs_t) );
     659             : 
     660           0 :   ulong end_offset = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
     661           0 :   ulong i;
     662           0 :   for( i=64UL; end_offset-i<64UL; i+=64UL ) {
     663           0 : #  if FD_HAS_AVX512
     664           0 :     _mm512_stream_si512( (void *)(packet+i     ), _mm512_loadu_si512( (void const *)(src+i     ) ) );
     665             : #  else
     666           0 :     _mm256_stream_si256( (void *)(packet+i     ), _mm256_loadu_si256( (void const *)(src+i     ) ) );
     667           0 :     _mm256_stream_si256( (void *)(packet+i+32UL), _mm256_loadu_si256( (void const *)(src+i+32UL) ) );
     668           0 : #  endif
     669           0 :   }
     670           0 :   _mm_sfence();
     671           0 :   fd_memcpy( packet+i, src+i, end_offset-i ); /* Copy the last partial cache line */
     672             : 
     673             : #else
     674             :   fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), shred, shred_sz );
     675             : #endif
     676             : 
     677           0 :   ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
     678           0 :   ulong tspub  = fd_frag_meta_ts_comp( fd_tickcount() );
     679           0 :   ulong sig    = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
     680           0 :   fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, pkt_sz, 0UL, tsorig, tspub );
     681           0 :   ctx->net_out_seq   = fd_seq_inc( ctx->net_out_seq, 1UL );
     682           0 :   ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
     683           0 : }
     684             : 
     685             : static void
     686             : after_frag( fd_shred_ctx_t *    ctx,
     687             :             ulong               in_idx,
     688             :             ulong               seq,
     689             :             ulong               sig,
     690             :             ulong               sz,
     691             :             ulong               tsorig,
     692             :             ulong               _tspub,
     693           0 :             fd_stem_context_t * stem ) {
     694           0 :   (void)seq;
     695           0 :   (void)sz;
     696           0 :   (void)tsorig;
     697           0 :   (void)_tspub;
     698             : 
     699           0 :   if( FD_UNLIKELY( ctx->skip_frag ) ) return;
     700             : 
     701           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
     702           0 :     finalize_new_cluster_contact_info( ctx );
     703           0 :     return;
     704           0 :   }
     705             : 
     706           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
     707           0 :     fd_stake_ci_stake_msg_fini( ctx->stake_ci );
     708           0 :     return;
     709           0 :   }
     710             : 
     711           0 :   if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_POH) & (ctx->send_fec_set_cnt==0UL) ) ) {
     712             :     /* Entry from PoH that didn't trigger a new FEC set to be made */
     713           0 :     return;
     714           0 :   }
     715             : 
     716           0 :   if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
     717           0 :     FD_MCNT_INC( SHRED, FORCE_COMPLETE_REQUEST, 1UL );
     718           0 :     fd_ed25519_sig_t const * shred_sig = (fd_ed25519_sig_t const *)fd_type_pun( ctx->shred_buffer );
     719           0 :     if( FD_UNLIKELY( fd_fec_resolver_done_contains( ctx->resolver, shred_sig ) ) ) {
     720             :       /* This is a FEC completion message from the repair tile.  We need
     721             :          to make sure that we don't force complete something that's just
     722             :          been completed. */
     723           0 :       FD_MCNT_INC( SHRED, FORCE_COMPLETE_FAILURE, 1UL );
     724           0 :       return;
     725           0 :     }
     726             : 
     727           0 :     uint last_idx = fd_disco_repair_shred_sig_last_shred_idx( sig );
     728           0 :     uchar buf_last_shred[FD_SHRED_MIN_SZ];
     729           0 :     int rv = fd_fec_resolver_shred_query( ctx->resolver, shred_sig, last_idx, buf_last_shred );
     730           0 :     if( FD_UNLIKELY( rv != FD_FEC_RESOLVER_SHRED_OKAY ) ) {
     731             : 
     732             :       /* We will hit this case if FEC is no longer in curr_map, or if
     733             :          the shred signature is invalid, which is okay.
     734             : 
     735             :          There's something of a race condition here.  It's possible (but
     736             :          very unlikely) that between when the repair tile observed the
     737             :          FEC set needed to be force completed and now, the FEC set was
     738             :          completed, and then so many additional FEC sets were completed
     739             :          that it fell off the end of the done list.  In that case
     740             :          fd_fec_resolver_done_contains would have returned false, but
     741             :          fd_fec_resolver_shred_query will not return OKAY, which means
     742             :          we'll end up in this block of code.  If the FEC set was
     743             :          completed, then there's nothing we need to do.  If it was
     744             :          spilled, then we'll need to re-repair all the shreds in the FEC
     745             :          set, but it's not fatal. */
     746             : 
     747           0 :       FD_MCNT_INC( SHRED, FORCE_COMPLETE_FAILURE, 1UL );
     748           0 :       return;
     749           0 :     }
     750           0 :     fd_shred_t * out_last_shred = (fd_shred_t *)fd_type_pun( buf_last_shred );
     751             : 
     752           0 :     fd_fec_set_t const * out_fec_set[1];
     753           0 :     rv = fd_fec_resolver_force_complete( ctx->resolver, out_last_shred, out_fec_set );
     754           0 :     if( FD_UNLIKELY( rv != FD_FEC_RESOLVER_SHRED_COMPLETES ) ){
     755           0 :       FD_LOG_WARNING(( "Shred tile %lu cannot force complete the slot %lu fec_set_idx %u %s", ctx->round_robin_id, out_last_shred->slot, out_last_shred->fec_set_idx, FD_BASE58_ENC_32_ALLOCA( shred_sig ) ));
     756           0 :       FD_MCNT_INC( SHRED, FORCE_COMPLETE_FAILURE, 1UL );
     757           0 :       return;
     758           0 :     }
     759           0 :     FD_MCNT_INC( SHRED, FORCE_COMPLETE_SUCCESS, 1UL );
     760           0 :     FD_TEST( ctx->fec_sets <= *out_fec_set );
     761           0 :     ctx->send_fec_set_idx[ 0UL ] = (ulong)(*out_fec_set - ctx->fec_sets);
     762           0 :     ctx->send_fec_set_cnt = 1UL;
     763           0 :     ctx->shredded_txn_cnt = 0UL;
     764           0 :   }
     765             : 
     766           0 :   ulong fanout = 200UL; /* Default Agave's DATA_PLANE_FANOUT = 200UL */
     767             : 
     768           0 :   fd_bmtree_node_t out_merkle_root;
     769           0 :   if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     770           0 :     uchar * shred_buffer    = ctx->shred_buffer;
     771           0 :     ulong   shred_buffer_sz = ctx->shred_buffer_sz;
     772             : 
     773           0 :     fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz );
     774             : 
     775           0 :     if( FD_UNLIKELY( !shred       ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }
     776             : 
     777           0 :     fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
     778           0 :     if( FD_UNLIKELY( !lsched      ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; }
     779             : 
     780           0 :     fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, shred->slot );
     781           0 :     if( FD_UNLIKELY( !slot_leader ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; } /* Count this as bad slot too */
     782             : 
     783           0 :     fd_fec_set_t const * out_fec_set[1];
     784           0 :     fd_shred_t const   * out_shred[1];
     785             : 
     786           0 :     long add_shred_timing  = -fd_tickcount();
     787           0 :     int rv = fd_fec_resolver_add_shred( ctx->resolver, shred, shred_buffer_sz, slot_leader->uc, out_fec_set, out_shred, &out_merkle_root );
     788           0 :     add_shred_timing      +=  fd_tickcount();
     789             : 
     790           0 :     fd_histf_sample( ctx->metrics->add_shred_timing, (ulong)add_shred_timing );
     791           0 :     ctx->metrics->shred_processing_result[ rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]++;
     792             : 
     793             :     /* Fanout is subject to feature activation. The code below replicates
     794             :         Agave's get_data_plane_fanout() in turbine/src/cluster_nodes.rs
     795             :         on 2025-03-25. Default Agave's DATA_PLANE_FANOUT = 200UL.
     796             :         TODO once the experiments are disabled, consider removing these
     797             :         fanout variations from the code. */
     798           0 :     if( FD_LIKELY( shred->slot >= ctx->features_activation->disable_turbine_fanout_experiments ) ) {
     799           0 :       fanout = 200UL;
     800           0 :     } else {
     801           0 :       if( FD_LIKELY( shred->slot >= ctx->features_activation->enable_turbine_extended_fanout_experiments ) ) {
     802           0 :         switch( shred->slot % 359 ) {
     803           0 :           case  11UL: fanout = 1152UL;  break;
     804           0 :           case  61UL: fanout = 1280UL;  break;
     805           0 :           case 111UL: fanout = 1024UL;  break;
     806           0 :           case 161UL: fanout = 1408UL;  break;
     807           0 :           case 211UL: fanout =  896UL;  break;
     808           0 :           case 261UL: fanout = 1536UL;  break;
     809           0 :           case 311UL: fanout =  768UL;  break;
     810           0 :           default   : fanout =  200UL;
     811           0 :         }
     812           0 :       } else {
     813           0 :         switch( shred->slot % 359 ) {
     814           0 :           case  11UL: fanout =   64UL;  break;
     815           0 :           case  61UL: fanout =  768UL;  break;
     816           0 :           case 111UL: fanout =  128UL;  break;
     817           0 :           case 161UL: fanout =  640UL;  break;
     818           0 :           case 211UL: fanout =  256UL;  break;
     819           0 :           case 261UL: fanout =  512UL;  break;
     820           0 :           case 311UL: fanout =  384UL;  break;
     821           0 :           default   : fanout =  200UL;
     822           0 :         }
     823           0 :       }
     824           0 :     }
     825             : 
     826           0 :     if( (rv==FD_FEC_RESOLVER_SHRED_OKAY) | (rv==FD_FEC_RESOLVER_SHRED_COMPLETES) ) {
     827           0 :       if( FD_LIKELY( fd_disco_netmux_sig_proto( sig ) != DST_PROTO_REPAIR ) ) {
     828             :         /* Relay this shred */
     829           0 :         ulong max_dest_cnt[1];
     830           0 :         do {
     831             :           /* If we've validated the shred and it COMPLETES but we can't
     832             :             compute the destination for whatever reason, don't forward
     833             :             the shred, but still send it to the blockstore. */
     834           0 :           fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, shred->slot );
     835           0 :           if( FD_UNLIKELY( !sdest ) ) break;
     836           0 :           fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, ctx->scratchpad_dests, 1UL, fanout, fanout, max_dest_cnt );
     837           0 :           if( FD_UNLIKELY( !dests ) ) break;
     838             : 
     839           0 :           send_shred( ctx, *out_shred, ctx->adtl_dest, ctx->tsorig );
     840           0 :           for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ]), ctx->tsorig );
     841           0 :         } while( 0 );
     842           0 :       }
     843             : 
     844           0 :       if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* Only send to repair in full Firedancer */
     845             : 
     846             :         /* Construct the sig from the shred. */
     847             : 
     848           0 :         int  is_code               = fd_shred_is_code( fd_shred_type( shred->variant ) );
     849           0 :         uint shred_idx_or_data_cnt = shred->idx;
     850           0 :         int  completes             = 0;
     851           0 :         if( FD_LIKELY( is_code ) ) shred_idx_or_data_cnt = shred->code.data_cnt;  /* optimize for code_cnt >= data_cnt */
     852           0 :         else  completes = shred->data.flags & ( FD_SHRED_DATA_FLAG_SLOT_COMPLETE | FD_SHRED_DATA_FLAG_DATA_COMPLETE );
     853           0 :         ulong sig = fd_disco_shred_repair_shred_sig( !!completes, shred->slot, shred->fec_set_idx, is_code, shred_idx_or_data_cnt );
     854             : 
     855             :         /* Copy the shred header into the frag and publish. */
     856             : 
     857           0 :         ulong sz = fd_shred_header_sz( shred->variant );
     858           0 :         fd_memcpy( fd_chunk_to_laddr( ctx->repair_out_mem, ctx->repair_out_chunk ), shred, sz );
     859           0 :         ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     860           0 :         fd_stem_publish( stem, ctx->repair_out_idx, sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub );
     861           0 :         ctx->repair_out_chunk = fd_dcache_compact_next( ctx->repair_out_chunk, sz, ctx->repair_out_chunk0, ctx->repair_out_wmark );
     862           0 :       }
     863           0 :     }
     864           0 :     if( FD_LIKELY( rv!=FD_FEC_RESOLVER_SHRED_COMPLETES ) ) return;
     865             : 
     866           0 :     FD_TEST( ctx->fec_sets <= *out_fec_set );
     867           0 :     ctx->send_fec_set_idx[ 0UL ] = (ulong)(*out_fec_set - ctx->fec_sets);
     868           0 :     ctx->send_fec_set_cnt = 1UL;
     869           0 :     ctx->shredded_txn_cnt = 0UL;
     870           0 :   }
     871             : 
     872           0 :   if( FD_UNLIKELY( ctx->send_fec_set_cnt==0UL ) ) return;
     873             : 
     874             :   /* Try to distribute shredded txn count across the fec sets.
     875             :      This is an approximation, but it is acceptable. */
     876           0 :   ulong shredded_txn_cnt_per_fec_set  = ctx->shredded_txn_cnt / ctx->send_fec_set_cnt;
     877           0 :   ulong shredded_txn_cnt_remain       = ctx->shredded_txn_cnt - shredded_txn_cnt_per_fec_set * ctx->send_fec_set_cnt;
     878           0 :   ulong shredded_txn_cnt_last_fec_set = shredded_txn_cnt_per_fec_set + shredded_txn_cnt_remain;
     879             : 
     880             :   /* If this shred completes a FEC set or is part of a microblock from
     881             :     pack (ie. we're leader), we now have a full FEC set: so we notify
     882             :     repair and insert into the blockstore, as well as retransmit. */
     883             : 
     884           0 :   for( ulong fset_k=0; fset_k<ctx->send_fec_set_cnt; fset_k++ ) {
     885             : 
     886           0 :     fd_fec_set_t * set = ctx->fec_sets + ctx->send_fec_set_idx[ fset_k ];
     887           0 :     fd_shred34_t * s34 = ctx->shred34 + 4UL*ctx->send_fec_set_idx[ fset_k ];
     888             : 
     889           0 :     s34[ 0 ].shred_cnt =                         fd_ulong_min( set->data_shred_cnt,   34UL );
     890           0 :     s34[ 1 ].shred_cnt = set->data_shred_cnt   - fd_ulong_min( set->data_shred_cnt,   34UL );
     891           0 :     s34[ 2 ].shred_cnt =                         fd_ulong_min( set->parity_shred_cnt, 34UL );
     892           0 :     s34[ 3 ].shred_cnt = set->parity_shred_cnt - fd_ulong_min( set->parity_shred_cnt, 34UL );
     893             : 
     894           0 :     ulong s34_cnt     = 2UL + !!(s34[ 1 ].shred_cnt) + !!(s34[ 3 ].shred_cnt);
     895           0 :     ulong txn_per_s34 = fd_ulong_if( fset_k<( ctx->send_fec_set_cnt - 1UL ), shredded_txn_cnt_per_fec_set, shredded_txn_cnt_last_fec_set ) / s34_cnt;
     896             : 
     897             :     /* Attribute the transactions evenly to the non-empty shred34s */
     898           0 :     for( ulong j=0UL; j<4UL; j++ ) s34[ j ].est_txn_cnt = fd_ulong_if( s34[ j ].shred_cnt>0UL, txn_per_s34, 0UL );
     899             : 
     900             :     /* Add whatever is left to the last shred34 */
     901           0 :     s34[ fd_ulong_if( s34[ 3 ].shred_cnt>0UL, 3, 2 ) ].est_txn_cnt += ctx->shredded_txn_cnt - txn_per_s34*s34_cnt;
     902             : 
     903             :     /* Set the sz field so that metrics are more accurate. */
     904           0 :     ulong sz0 = sizeof(fd_shred34_t) - (34UL - s34[ 0 ].shred_cnt)*FD_SHRED_MAX_SZ;
     905           0 :     ulong sz1 = sizeof(fd_shred34_t) - (34UL - s34[ 1 ].shred_cnt)*FD_SHRED_MAX_SZ;
     906           0 :     ulong sz2 = sizeof(fd_shred34_t) - (34UL - s34[ 2 ].shred_cnt)*FD_SHRED_MAX_SZ;
     907           0 :     ulong sz3 = sizeof(fd_shred34_t) - (34UL - s34[ 3 ].shred_cnt)*FD_SHRED_MAX_SZ;
     908             : 
     909           0 :     if( FD_LIKELY( ctx->blockstore ) ) { /* firedancer-only */
     910             : 
     911             :       /* Insert shreds into the blockstore. Note we do this regardless of
     912             :         whether the shreds are for one of our leader slots or not. Even
     913             :         though there is a separate link that directly connects pack and
     914             :         replay when we are leader, we still need the shreds in the
     915             :         blockstore to serve repair requests. */
     916             : 
     917           0 :       for( ulong i=0UL; i<set->data_shred_cnt; i++ ) {
     918           0 :         fd_shred_t const * data_shred = (fd_shred_t const *)fd_type_pun_const( set->data_shreds[ i ] );
     919           0 :         fd_blockstore_shred_insert( ctx->blockstore, data_shred );
     920           0 :       }
     921           0 :     }
     922             : 
     923           0 :     if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
     924             : 
     925             :       /* Additionally, publish a frag to notify repair that the FEC set is
     926             :         complete. Note the ordering wrt blockstore shred insertion above
     927             :         is intentional: shreds are inserted into the blockstore before
     928             :         notifying repair. This is because the replay tile is downstream
     929             :         of repair, and replay assumes the shreds are already in the
     930             :         blockstore when repair notifies it that the FEC set is complete,
     931             :         and we don't know whether shred will finish inserting into
     932             :         blockstore first or repair will finish validating the FEC set
     933             :         first. */
     934             : 
     935           0 :       fd_shred_t const * last = (fd_shred_t const *)fd_type_pun_const( set->data_shreds[ set->data_shred_cnt - 1 ] );
     936             : 
     937             :       /* Copy the last data shred's header and merkle root of the FEC set into the frag. */
     938           0 :       ulong   sig   =  fd_disco_shred_repair_fec_sig( last->slot, last->fec_set_idx, (uint)set->data_shred_cnt, last->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE, last->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE );
     939           0 :       uchar * chunk = fd_chunk_to_laddr( ctx->repair_out_mem, ctx->repair_out_chunk );
     940           0 :       memcpy( chunk, last, FD_SHRED_DATA_HEADER_SZ );
     941           0 :       memcpy( chunk+FD_SHRED_DATA_HEADER_SZ, out_merkle_root.hash, FD_SHRED_MERKLE_ROOT_SZ );
     942           0 :       ulong sz    = FD_SHRED_DATA_HEADER_SZ + FD_SHRED_MERKLE_ROOT_SZ;
     943           0 :       ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     944           0 :       fd_stem_publish( stem, ctx->repair_out_idx, sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub );
     945           0 :       ctx->repair_out_chunk = fd_dcache_compact_next( ctx->repair_out_chunk, sz, ctx->repair_out_chunk0, ctx->repair_out_wmark );
     946           0 :     } else if( FD_UNLIKELY( ctx->store_out_idx != ULONG_MAX ) ) { /* frankendancer-only */
     947             : 
     948             :       /* Send to the blockstore, skipping any empty shred34_t s. */
     949             : 
     950           0 :       ulong new_sig = ctx->in_kind[ in_idx ]!=IN_KIND_NET; /* sig==0 means the store tile will do extra checks */
     951           0 :       ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
     952           0 :       fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+0UL ), sz0, 0UL, ctx->tsorig, tspub );
     953           0 :       if( FD_UNLIKELY( s34[ 1 ].shred_cnt ) )
     954           0 :         fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+1UL ), sz1, 0UL, ctx->tsorig, tspub );
     955           0 :       if( FD_UNLIKELY( s34[ 2 ].shred_cnt ) )
     956           0 :         fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+2UL), sz2, 0UL, ctx->tsorig, tspub );
     957           0 :       if( FD_UNLIKELY( s34[ 3 ].shred_cnt ) )
     958           0 :         fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+3UL ), sz3, 0UL, ctx->tsorig, tspub );
     959           0 :     }
     960             : 
     961             :     /* Compute all the destinations for all the new shreds */
     962             : 
     963           0 :     fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
     964           0 :     ulong k=0UL;
     965           0 :     for( ulong i=0UL; i<set->data_shred_cnt; i++ )
     966           0 :       if( !d_rcvd_test( set->data_shred_rcvd,   i ) )  new_shreds[ k++ ] = (fd_shred_t const *)set->data_shreds  [ i ];
     967           0 :     for( ulong i=0UL; i<set->parity_shred_cnt; i++ )
     968           0 :       if( !p_rcvd_test( set->parity_shred_rcvd, i ) )  new_shreds[ k++ ] = (fd_shred_t const *)set->parity_shreds[ i ];
     969             : 
     970           0 :     if( FD_UNLIKELY( !k ) ) return;
     971           0 :     fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, new_shreds[ 0 ]->slot );
     972           0 :     if( FD_UNLIKELY( !sdest ) ) return;
     973             : 
     974           0 :     ulong out_stride;
     975           0 :     ulong max_dest_cnt[1];
     976           0 :     fd_shred_dest_idx_t * dests;
     977           0 :     if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
     978           0 :       out_stride = k;
     979             :       /* In the case of feature activation, the fanout used below is
     980             :           the same as the one calculated/modified previously at the
     981             :           begining of after_frag() for IN_KIND_NET in this slot. */
     982           0 :       dests = fd_shred_dest_compute_children( sdest, new_shreds, k, ctx->scratchpad_dests, k, fanout, fanout, max_dest_cnt );
     983           0 :     } else {
     984           0 :       out_stride = 1UL;
     985           0 :       *max_dest_cnt = 1UL;
     986           0 :       dests = fd_shred_dest_compute_first   ( sdest, new_shreds, k, ctx->scratchpad_dests );
     987           0 :     }
     988           0 :     if( FD_UNLIKELY( !dests ) ) return;
     989             : 
     990             :     /* Send only the ones we didn't receive. */
     991           0 :     for( ulong i=0UL; i<k; i++ ) {
     992           0 :       send_shred( ctx, new_shreds[ i ], ctx->adtl_dest, ctx->tsorig );
     993           0 :       for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
     994           0 :     }
     995           0 :   }
     996           0 : }
     997             : 
     998             : static void
     999             : privileged_init( fd_topo_t *      topo,
    1000           0 :                  fd_topo_tile_t * tile ) {
    1001           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1002           0 :   FD_TEST( scratch!=NULL );
    1003             : 
    1004           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1005           0 :   fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
    1006             : 
    1007           0 :   if( FD_UNLIKELY( !strcmp( tile->shred.identity_key_path, "" ) ) )
    1008           0 :     FD_LOG_ERR(( "identity_key_path not set" ));
    1009             : 
    1010           0 :   ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->shred.identity_key_path, /* pubkey only: */ 1 ) );
    1011           0 : }
    1012             : 
    1013             : static void
    1014             : fd_shred_signer( void *        signer_ctx,
    1015             :                  uchar         signature[ static 64 ],
    1016           0 :                  uchar const   merkle_root[ static 32 ] ) {
    1017           0 :   fd_keyguard_client_sign( signer_ctx, signature, merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
    1018           0 : }
    1019             : 
    1020             : static void
    1021             : unprivileged_init( fd_topo_t *      topo,
    1022           0 :                    fd_topo_tile_t * tile ) {
    1023             : 
    1024           0 :   FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ NET_OUT_IDX   ]].name, "shred_net"   ) );
    1025           0 :   FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ SIGN_OUT_IDX  ]].name, "shred_sign"  ) );
    1026             : 
    1027           0 :   if( FD_UNLIKELY( !tile->out_cnt ) )
    1028           0 :     FD_LOG_ERR(( "shred tile has no primary output link" ));
    1029             : 
    1030           0 :   ulong shred_store_mcache_depth = tile->shred.depth;
    1031           0 :   if( topo->links[ tile->out_link_id[ 0 ] ].depth != shred_store_mcache_depth )
    1032           0 :     FD_LOG_ERR(( "shred tile out depths are not equal %lu %lu",
    1033           0 :                  topo->links[ tile->out_link_id[ 0 ] ].depth, shred_store_mcache_depth ));
    1034             : 
    1035           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
    1036           0 :   FD_TEST( scratch!=NULL );
    1037             : 
    1038           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
    1039           0 :   fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
    1040             : 
    1041           0 :   ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
    1042           0 :   ctx->round_robin_id  = tile->kind_id;
    1043           0 :   ctx->batch_cnt       = 0UL;
    1044           0 :   ctx->slot            = ULONG_MAX;
    1045             : 
    1046             :   /* If the default partial_depth is ever changed, correspondingly
    1047             :      change the size of the fd_fec_intra_pool in fd_fec_repair. */
    1048           0 :   ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth,
    1049           0 :                                                             128UL * tile->shred.fec_resolver_depth );
    1050           0 :   ulong fec_set_cnt            = shred_store_mcache_depth + tile->shred.fec_resolver_depth + 4UL;
    1051           0 :   ulong fec_sets_required_sz   = fec_set_cnt*DCACHE_ENTRIES_PER_FEC_SET*sizeof(fd_shred34_t);
    1052             : 
    1053           0 :   void * fec_sets_shmem = NULL;
    1054           0 :   ctx->repair_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_repair", ctx->round_robin_id );
    1055           0 :   ctx->store_out_idx  = fd_topo_find_tile_out_link( topo, tile, "shred_store",  ctx->round_robin_id );
    1056           0 :   if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
    1057           0 :     fd_topo_link_t * repair_out = &topo->links[ tile->out_link_id[ ctx->repair_out_idx ] ];
    1058           0 :     ctx->repair_out_mem    = topo->workspaces[ topo->objs[ repair_out->dcache_obj_id ].wksp_id ].wksp;
    1059           0 :     ctx->repair_out_chunk0 = fd_dcache_compact_chunk0( ctx->repair_out_mem, repair_out->dcache );
    1060           0 :     ctx->repair_out_wmark  = fd_dcache_compact_wmark ( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu );
    1061           0 :     ctx->repair_out_chunk  = ctx->repair_out_chunk0;
    1062           0 :     FD_TEST( fd_dcache_compact_is_safe( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu, repair_out->depth ) );
    1063           0 :     ulong fec_sets_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "fec_sets" );
    1064           0 :     if( FD_UNLIKELY( fec_sets_obj_id == ULONG_MAX ) ) FD_LOG_ERR(( "invalid firedancer topo" ));
    1065           0 :     fd_topo_obj_t const * obj = &topo->objs[ fec_sets_obj_id ];
    1066           0 :     if( FD_UNLIKELY( obj->footprint<(fec_sets_required_sz*ctx->round_robin_cnt) ) ) {
    1067           0 :       FD_LOG_ERR(( "fec_sets wksp obj too small. It is %lu bytes but must be at least %lu bytes. ",
    1068           0 :                    obj->footprint,
    1069           0 :                    fec_sets_required_sz ));
    1070           0 :     }
    1071           0 :     fec_sets_shmem = (uchar *)fd_topo_obj_laddr( topo, fec_sets_obj_id ) + (ctx->round_robin_id * fec_sets_required_sz);
    1072           0 :   } else if ( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
    1073           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ ctx->store_out_idx ]].name, "shred_store" ) );
    1074           0 :     fec_sets_shmem = topo->links[ tile->out_link_id[ ctx->store_out_idx ] ].dcache;
    1075           0 :     if( FD_UNLIKELY( fd_dcache_data_sz( fec_sets_shmem )<fec_sets_required_sz ) ) {
    1076           0 :       FD_LOG_ERR(( "shred_store dcache too small. It is %lu bytes but must be at least %lu bytes. ",
    1077           0 :                   fd_dcache_data_sz( fec_sets_shmem ),
    1078           0 :                   fec_sets_required_sz ));
    1079           0 :     }
    1080           0 :   }
    1081             : 
    1082           0 :   if( FD_UNLIKELY( !tile->shred.fec_resolver_depth ) ) FD_LOG_ERR(( "fec_resolver_depth not set" ));
    1083           0 :   if( FD_UNLIKELY( !tile->shred.shred_listen_port  ) ) FD_LOG_ERR(( "shred_listen_port not set" ));
    1084             : 
    1085           0 :   ulong bank_cnt   = fd_topo_tile_name_cnt( topo, "bank" );
    1086           0 :   ulong replay_cnt = fd_topo_tile_name_cnt( topo, "replay" );
    1087             : 
    1088           0 :   if( FD_UNLIKELY( !bank_cnt && !replay_cnt ) ) FD_LOG_ERR(( "0 bank/replay tiles" ));
    1089           0 :   if( FD_UNLIKELY( bank_cnt>MAX_BANK_CNT ) ) FD_LOG_ERR(( "Too many banks" ));
    1090             : 
    1091           0 :   void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(),              fd_stake_ci_footprint()            );
    1092           0 :   void * _resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_resolver_align(),          fec_resolver_footprint             );
    1093           0 :   void * _shredder = FD_SCRATCH_ALLOC_APPEND( l, fd_shredder_align(),              fd_shredder_footprint()            );
    1094           0 :   void * _fec_sets = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_fec_set_t),            sizeof(fd_fec_set_t)*fec_set_cnt   );
    1095             : 
    1096           0 :   fd_fec_set_t * fec_sets = (fd_fec_set_t *)_fec_sets;
    1097           0 :   fd_shred34_t * shred34  = (fd_shred34_t *)fec_sets_shmem;
    1098             : 
    1099           0 :   for( ulong i=0UL; i<fec_set_cnt; i++ ) {
    1100           0 :     fd_shred34_t * p34_base = shred34 + i*DCACHE_ENTRIES_PER_FEC_SET;
    1101           0 :     for( ulong k=0UL; k<DCACHE_ENTRIES_PER_FEC_SET; k++ ) {
    1102           0 :       fd_shred34_t * p34 = p34_base + k;
    1103             : 
    1104           0 :       p34->stride   = (ulong)p34->pkts[1].buffer - (ulong)p34->pkts[0].buffer;
    1105           0 :       p34->offset   = (ulong)p34->pkts[0].buffer - (ulong)p34;
    1106           0 :       p34->shred_sz = fd_ulong_if( k<2UL, 1203UL, 1228UL );
    1107           0 :     }
    1108             : 
    1109           0 :     uchar ** data_shred   = fec_sets[ i ].data_shreds;
    1110           0 :     uchar ** parity_shred = fec_sets[ i ].parity_shreds;
    1111           0 :     for( ulong j=0UL; j<FD_REEDSOL_DATA_SHREDS_MAX;   j++ ) data_shred  [ j ] = p34_base[       j/34UL ].pkts[ j%34UL ].buffer;
    1112           0 :     for( ulong j=0UL; j<FD_REEDSOL_PARITY_SHREDS_MAX; j++ ) parity_shred[ j ] = p34_base[ 2UL + j/34UL ].pkts[ j%34UL ].buffer;
    1113           0 :   }
    1114             : 
    1115           0 : #define NONNULL( x ) (__extension__({                                        \
    1116           0 :       __typeof__((x)) __x = (x);                                             \
    1117           0 :       if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
    1118           0 :       __x; }))
    1119             : 
    1120           0 :   ulong expected_shred_version = tile->shred.expected_shred_version;
    1121           0 :   if( FD_LIKELY( !expected_shred_version ) ) {
    1122           0 :     ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
    1123           0 :     FD_TEST( busy_obj_id!=ULONG_MAX );
    1124           0 :     ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
    1125           0 :     FD_LOG_INFO(( "Waiting for shred version to be determined via gossip." ));
    1126           0 :     do {
    1127           0 :       expected_shred_version = FD_VOLATILE_CONST( *gossip_shred_version );
    1128           0 :     } while( expected_shred_version==ULONG_MAX );
    1129           0 :   }
    1130             : 
    1131           0 :   if( FD_UNLIKELY( expected_shred_version > USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
    1132           0 :   FD_LOG_INFO(( "Using shred version %hu", (ushort)expected_shred_version ));
    1133             : 
    1134           0 :   ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) );
    1135           0 :   FD_TEST( ctx->keyswitch );
    1136             : 
    1137             :   /* populate ctx */
    1138           0 :   ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_shred", tile->kind_id );
    1139           0 :   FD_TEST( sign_in_idx!=ULONG_MAX );
    1140           0 :   fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
    1141           0 :   fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
    1142           0 :   NONNULL( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
    1143           0 :                                                             sign_out->mcache,
    1144           0 :                                                             sign_out->dcache,
    1145           0 :                                                             sign_in->mcache,
    1146           0 :                                                             sign_in->dcache ) ) );
    1147             : 
    1148           0 :   ulong shred_limit = fd_ulong_if( tile->shred.larger_shred_limits_per_block, 32UL*32UL*1024UL, 32UL*1024UL );
    1149           0 :   fd_fec_set_t * resolver_sets = fec_sets + (shred_store_mcache_depth+1UL)/2UL + 1UL;
    1150           0 :   ctx->shredder = NONNULL( fd_shredder_join     ( fd_shredder_new     ( _shredder, fd_shred_signer, ctx->keyguard_client, (ushort)expected_shred_version ) ) );
    1151           0 :   ctx->resolver = NONNULL( fd_fec_resolver_join ( fd_fec_resolver_new ( _resolver,
    1152           0 :                                                                         fd_shred_signer, ctx->keyguard_client,
    1153           0 :                                                                         tile->shred.fec_resolver_depth, 1UL,
    1154           0 :                                                                         (shred_store_mcache_depth+3UL)/2UL,
    1155           0 :                                                                         128UL * tile->shred.fec_resolver_depth, resolver_sets,
    1156           0 :                                                                         (ushort)expected_shred_version,
    1157           0 :                                                                         shred_limit ) ) );
    1158             : 
    1159           0 :   ctx->shred34  = shred34;
    1160           0 :   ctx->fec_sets = fec_sets;
    1161             : 
    1162           0 :   ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci, ctx->identity_key ) );
    1163             : 
    1164           0 :   ctx->net_id   = (ushort)0;
    1165             : 
    1166           0 :   fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr,   FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
    1167           0 :   fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
    1168             : 
    1169           0 :   ctx->adtl_dest->ip4  = tile->shred.adtl_dest.ip;
    1170           0 :   ctx->adtl_dest->port = tile->shred.adtl_dest.port;
    1171             : 
    1172           0 :   for( ulong i=0UL; i<tile->in_cnt; i++ ) {
    1173           0 :     fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
    1174           0 :     fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
    1175             : 
    1176           0 :     if( FD_LIKELY(      !strcmp( link->name, "net_shred"    ) ) ) { ctx->in_kind[ i ] = IN_KIND_NET;
    1177           0 :       fd_net_rx_bounds_init( &ctx->in[ i ].net_rx, link->dcache );
    1178           0 :       continue; /* only net_rx needs to be set in this case. */ }
    1179           0 :     else if( FD_LIKELY( !strcmp( link->name, "poh_shred"    ) ) ) ctx->in_kind[ i ] = IN_KIND_POH;
    1180           0 :     else if( FD_LIKELY( !strcmp( link->name, "stake_out"    ) ) ) ctx->in_kind[ i ] = IN_KIND_STAKE;
    1181           0 :     else if( FD_LIKELY( !strcmp( link->name, "crds_shred"   ) ) ) ctx->in_kind[ i ] = IN_KIND_CONTACT;
    1182           0 :     else if( FD_LIKELY( !strcmp( link->name, "sign_shred"   ) ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
    1183           0 :     else if( FD_LIKELY( !strcmp( link->name, "repair_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_REPAIR;
    1184           0 :     else FD_LOG_ERR(( "shred tile has unexpected input link %lu %s", i, link->name ));
    1185             : 
    1186           0 :     ctx->in[ i ].mem    = link_wksp->wksp;
    1187           0 :     ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
    1188           0 :     ctx->in[ i ].wmark  = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
    1189           0 :   }
    1190             : 
    1191           0 :   fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
    1192             : 
    1193           0 :   ctx->net_out_mcache = net_out->mcache;
    1194           0 :   ctx->net_out_sync   = fd_mcache_seq_laddr( ctx->net_out_mcache );
    1195           0 :   ctx->net_out_depth  = fd_mcache_depth( ctx->net_out_mcache );
    1196           0 :   ctx->net_out_seq    = fd_mcache_seq_query( ctx->net_out_sync );
    1197           0 :   ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
    1198           0 :   ctx->net_out_mem    = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
    1199           0 :   ctx->net_out_wmark  = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
    1200           0 :   ctx->net_out_chunk  = ctx->net_out_chunk0;
    1201             : 
    1202           0 :   ctx->blockstore = NULL;
    1203           0 :   ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
    1204           0 :   if( FD_LIKELY( blockstore_obj_id!=ULONG_MAX ) ) { /* firedancer-only */
    1205           0 :     ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
    1206           0 :     FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
    1207           0 :   }
    1208             : 
    1209           0 :   if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
    1210           0 :     fd_topo_link_t * repair_out = &topo->links[ tile->out_link_id[ ctx->repair_out_idx ] ];
    1211           0 :     ctx->repair_out_mem         = topo->workspaces[ topo->objs[ repair_out->dcache_obj_id ].wksp_id ].wksp;
    1212           0 :     ctx->repair_out_chunk0      = fd_dcache_compact_chunk0( ctx->repair_out_mem, repair_out->dcache );
    1213           0 :     ctx->repair_out_wmark       = fd_dcache_compact_wmark ( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu );
    1214           0 :     ctx->repair_out_chunk       = ctx->repair_out_chunk0;
    1215           0 :     FD_TEST( fd_dcache_compact_is_safe( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu, repair_out->depth ) );
    1216           0 :   }
    1217             : 
    1218           0 :   if( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
    1219           0 :     fd_topo_link_t * store_out = &topo->links[ tile->out_link_id[ ctx->store_out_idx ] ];
    1220           0 :     ctx->store_out_mem         = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
    1221           0 :     ctx->store_out_chunk0      = fd_dcache_compact_chunk0( ctx->store_out_mem, store_out->dcache );
    1222           0 :     ctx->store_out_wmark       = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
    1223           0 :     ctx->store_out_chunk       = ctx->store_out_chunk0;
    1224           0 :     FD_TEST( fd_dcache_compact_is_safe( ctx->store_out_mem, store_out->dcache, store_out->mtu, store_out->depth ) );
    1225           0 :   }
    1226             : 
    1227           0 :   ctx->poh_in_expect_seq = 0UL;
    1228             : 
    1229           0 :   ctx->shredder_fec_set_idx = 0UL;
    1230           0 :   ctx->shredder_max_fec_set_idx = (shred_store_mcache_depth+1UL)/2UL + 1UL;
    1231             : 
    1232           0 :   for( ulong i=0UL; i<FD_SHRED_BATCH_FEC_SETS_MAX; i++ ) { ctx->send_fec_set_idx[ i ] = ULONG_MAX; }
    1233           0 :   ctx->send_fec_set_cnt = 0UL;
    1234             : 
    1235           0 :   ctx->shred_buffer_sz  = 0UL;
    1236           0 :   memset( ctx->shred_buffer, 0xFF, FD_NET_MTU );
    1237             : 
    1238           0 :   fd_histf_join( fd_histf_new( ctx->metrics->contact_info_cnt,     FD_MHIST_MIN(         SHRED, CLUSTER_CONTACT_INFO_CNT   ),
    1239           0 :                                                                    FD_MHIST_MAX(         SHRED, CLUSTER_CONTACT_INFO_CNT   ) ) );
    1240           0 :   fd_histf_join( fd_histf_new( ctx->metrics->batch_sz,             FD_MHIST_MIN(         SHRED, BATCH_SZ                   ),
    1241           0 :                                                                    FD_MHIST_MAX(         SHRED, BATCH_SZ                   ) ) );
    1242           0 :   fd_histf_join( fd_histf_new( ctx->metrics->batch_microblock_cnt, FD_MHIST_MIN(         SHRED, BATCH_MICROBLOCK_CNT       ),
    1243           0 :                                                                    FD_MHIST_MAX(         SHRED, BATCH_MICROBLOCK_CNT       ) ) );
    1244           0 :   fd_histf_join( fd_histf_new( ctx->metrics->shredding_timing,     FD_MHIST_SECONDS_MIN( SHRED, SHREDDING_DURATION_SECONDS ),
    1245           0 :                                                                    FD_MHIST_SECONDS_MAX( SHRED, SHREDDING_DURATION_SECONDS ) ) );
    1246           0 :   fd_histf_join( fd_histf_new( ctx->metrics->add_shred_timing,     FD_MHIST_SECONDS_MIN( SHRED, ADD_SHRED_DURATION_SECONDS ),
    1247           0 :                                                                    FD_MHIST_SECONDS_MAX( SHRED, ADD_SHRED_DURATION_SECONDS ) ) );
    1248           0 :   memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) );
    1249           0 :   ctx->metrics->invalid_block_id_cnt = 0UL;
    1250           0 :   ctx->metrics->shred_rejected_unchained_cnt = 0UL;
    1251             : 
    1252           0 :   ctx->pending_batch.microblock_cnt = 0UL;
    1253           0 :   ctx->pending_batch.txn_cnt        = 0UL;
    1254           0 :   ctx->pending_batch.pos            = 0UL;
    1255           0 :   ctx->pending_batch.slot           = 0UL;
    1256           0 :   memset( ctx->pending_batch.payload, 0, sizeof(ctx->pending_batch.payload) );
    1257             : 
    1258           0 :   for( ulong i=0UL; i<FD_SHRED_FEATURES_ACTIVATION_SLOT_CNT; i++ )
    1259           0 :     ctx->features_activation->slots[i] = FD_SHRED_FEATURES_ACTIVATION_SLOT_DISABLED;
    1260             : 
    1261           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
    1262           0 :   if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
    1263           0 :     FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
    1264           0 : }
    1265             : 
    1266             : static ulong
    1267             : populate_allowed_seccomp( fd_topo_t const *      topo,
    1268             :                           fd_topo_tile_t const * tile,
    1269             :                           ulong                  out_cnt,
    1270           0 :                           struct sock_filter *   out ) {
    1271           0 :   (void)topo;
    1272           0 :   (void)tile;
    1273             : 
    1274           0 :   populate_sock_filter_policy_fd_shred_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
    1275           0 :   return sock_filter_policy_fd_shred_tile_instr_cnt;
    1276           0 : }
    1277             : 
    1278             : static ulong
    1279             : populate_allowed_fds( fd_topo_t const *      topo,
    1280             :                       fd_topo_tile_t const * tile,
    1281             :                       ulong                  out_fds_cnt,
    1282           0 :                       int *                  out_fds ) {
    1283           0 :   (void)topo;
    1284           0 :   (void)tile;
    1285             : 
    1286           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
    1287             : 
    1288           0 :   ulong out_cnt = 0UL;
    1289           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
    1290           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
    1291           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
    1292           0 :   return out_cnt;
    1293           0 : }
    1294             : 
/* Excluding net_out (where the link is unreliable), STEM_BURST needs
   to guarantee enough credits for the worst case. There are 4 cases
   to consider: (IN_KIND_NET/IN_KIND_POH) x (Frankendancer/Firedancer)
   In the IN_KIND_NET case:  (Frankendancer) that can be 4 frags to
   store;  (Firedancer) that is one frag for the shred to repair, and
   then another frag to repair for the FEC set.
   In the IN_KIND_POH case:  (Frankendancer) there might be
   FD_SHRED_BATCH_FEC_SETS_MAX FEC sets, but we know they are 32:32,
   which means only two shred34s per FEC set;  (Firedancer) that is
   FD_SHRED_BATCH_FEC_SETS_MAX frags to repair (one per FEC set).
   Therefore, the worst case is IN_KIND_POH for Frankendancer. */
#define STEM_BURST (FD_SHRED_BATCH_FEC_SETS_MAX*2UL)

/* See explanation in fd_pack */
#define STEM_LAZY  (128L*3000L)

/* The stem state machine carries a pointer to this tile's context
   struct through each callback. */
#define STEM_CALLBACK_CONTEXT_TYPE  fd_shred_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_shred_ctx_t)

/* Wire up the tile-local callbacks defined earlier in this file; the
   included fd_stem.c template expands into the stem_run entry point
   used by fd_tile_shred below. */
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE       metrics_write
#define STEM_CALLBACK_BEFORE_FRAG         before_frag
#define STEM_CALLBACK_DURING_FRAG         during_frag
#define STEM_CALLBACK_AFTER_FRAG          after_frag

#include "../stem/fd_stem.c"
    1321             : 
/* Tile descriptor registered with the topology runner.  Binds the
   shred tile's name to its sandbox policies (seccomp filter and
   allowed fds), scratch sizing, two-phase init (privileged before the
   sandbox, unprivileged after), and the stem_run main loop generated
   by the fd_stem.c include above. */
fd_topo_run_tile_t fd_tile_shred = {
  .name                     = "shred",
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};

Generated by: LCOV version 1.14