#include "../tiles.h"

#include "generated/fd_shred_tile_seccomp.h"
#include "../topo/fd_pod_format.h"
#include "../shred/fd_shredder.h"
#include "../shred/fd_shred_dest.h"
#include "../shred/fd_fec_resolver.h"
#include "../shred/fd_stake_ci.h"
#include "../keyguard/fd_keyload.h"
#include "../keyguard/fd_keyguard.h"
#include "../keyguard/fd_keyswitch.h"
#include "../fd_disco.h"
#include "../../flamenco/leaders/fd_leaders.h"
#include "../../flamenco/runtime/fd_blockstore.h"
#include "../../util/net/fd_net_headers.h"

#include <linux/unistd.h>

/* The shred tile handles shreds from two data sources: shreds
   generated from microblocks from the banking tile, and shreds
   retransmitted from the network.

   They have rather different semantics, but at the end of the day, they
   both result in a bunch of shreds and FEC sets that need to be sent to
   the blockstore and on the network, which is why one tile handles
   both.

   We segment the memory for the two types of shreds into two halves of
   a dcache because they follow somewhat different flow control
   patterns.  For flow control, the normal guarantee we want to provide
   is that the dcache entry is not overwritten unless the mcache entry
   has also been overwritten.  The normal way to do this when using both
   cyclically and with a 1-to-1 mapping is to make the dcache at least
   `burst` entries bigger than the mcache.

   In this tile, we use one output mcache with one output dcache (which
   is logically partitioned into two) for the two sources of data.  The
   worst case for flow control is when we're only sending with one of
   the dcache partitions at a time though, so we can consider them
   separately.

   From bank: Every FEC set triggers at least two mcache entries (one
   for parity and one for data), so at most, we have ceil(mcache
   depth/2) FEC sets exposed.  This means we need to decompose the
   dcache into at least ceil(mcache depth/2)+1 FEC sets.

   From the network: The FEC resolver doesn't use a cyclic order, but it
   does promise that once it returns an FEC set, it will return at least
   complete_depth FEC sets before returning it again.  This means we
   want at most complete_depth-1 FEC sets exposed, so
   complete_depth=ceil(mcache depth/2)+1 FEC sets as above.  The FEC
   resolver has the ability to keep individual shreds for partial_depth
   calls, but because in this version of the shred tile, we send each
   shred to all its destinations as soon as we get it, we don't need
   that functionality, so we set partial_depth=1.

   Adding these up, we get 2*ceil(mcache_depth/2)+3+fec_resolver_depth
   FEC sets, which is no more than mcache_depth+4+fec_resolver_depth.
   Each FEC set is paired with 4 fd_shred34_t structs, so that means we
   need to decompose the dcache into 4*mcache_depth +
   4*fec_resolver_depth + 16 fd_shred34_t structs. */
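
/* Worked example with illustrative (not necessarily configured)
   depths: mcache_depth=128, fec_resolver_depth=1024.  From bank:
   ceil(128/2)+1 = 65 FEC sets.  From the network: complete_depth = 65
   and partial_depth = 1, plus the 1024 in-progress sets held inside
   the resolver.  Total: 65+65+1+1024 = 1155 <= 128+4+1024 = 1156 FEC
   sets, which matches fec_set_cnt = depth + fec_resolver_depth + 4 in
   scratch_footprint below, i.e. 4*1156 = 4624 fd_shred34_t dcache
   entries. */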


/* The memory this tile uses is a bit complicated and has some logical
   aliasing to facilitate zero-copy use.  We have a dcache containing
   fd_shred34_t objects, which are basically 34 fd_shred_t objects
   padded to their max size, where 34 is set so that the size of the
   fd_shred34_t object (including some metadata) is less than
   USHORT_MAX, which facilitates sending it using Tango.  Then, for each
   set of 4 consecutive fd_shred34_t objects, we have an fd_fec_set_t.
   The first 34 data shreds point to the payload section of each of the
   packets in the first fd_shred34_t.  The other 33 data shreds point
   into the second fd_shred34_t.  Similarly, the parity shreds point
   into the third and fourth fd_shred34_t. */
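
/* Concretely, the aliasing works out to:
     fec_sets[ i ].data_shreds  [ j ] == shred34[ 4*i +     j/34 ].pkts[ j%34 ].buffer
     fec_sets[ i ].parity_shreds[ j ] == shred34[ 4*i + 2 + j/34 ].pkts[ j%34 ].buffer
   (see the wiring loop in unprivileged_init below).  The static
   asserts below guarantee that 4*34 entries are enough to cover
   FD_REEDSOL_DATA_SHREDS_MAX + FD_REEDSOL_PARITY_SHREDS_MAX shreds. */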

/* There's nothing deep about this max, but I just find it easier to
   have a max and use statically sized arrays than alloca. */
#define MAX_BANK_CNT 64UL

/* MAX_SHRED_DESTS indicates the maximum number of destinations (i.e. a
   pubkey -> ip, port) that the shred tile can keep track of. */
#define MAX_SHRED_DESTS 40200UL

#define FD_SHRED_TILE_SCRATCH_ALIGN 128UL

#define IN_KIND_CONTACT (0UL)
#define IN_KIND_STAKE   (1UL)
#define IN_KIND_POH     (2UL)
#define IN_KIND_NET     (3UL)
#define IN_KIND_SIGN    (4UL)

#define STORE_OUT_IDX   0
#define NET_OUT_IDX     1
#define SIGN_OUT_IDX    2
#define REPLAY_OUT_IDX  3

#define MAX_SLOTS_PER_EPOCH 432000UL
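/* 432000 slots at Solana's target 400 ms slot time is 172800 s, i.e.
   an epoch is roughly two days. */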

#define DCACHE_ENTRIES_PER_FEC_SET (4UL)
FD_STATIC_ASSERT( sizeof(fd_shred34_t) < USHORT_MAX, shred_34 );
FD_STATIC_ASSERT( 34*DCACHE_ENTRIES_PER_FEC_SET >= FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX, shred_34 );
FD_STATIC_ASSERT( sizeof(fd_shred34_t) == FD_SHRED_STORE_MTU, shred_34 );

FD_STATIC_ASSERT( sizeof(fd_entry_batch_meta_t)==24UL, poh_shred_mtu );

#define FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT 2

typedef struct {
  fd_wksp_t * mem;
  ulong       chunk0;
  ulong       wmark;
} fd_shred_in_ctx_t;

typedef struct {
  fd_shredder_t      * shredder;
  fd_fec_resolver_t  * resolver;
  fd_pubkey_t          identity_key[1]; /* Just the public key */

  ulong                round_robin_id;
  ulong                round_robin_cnt;
  /* Number of batches shredded from PoH during the current slot.
     This should be the same for all the shred tiles. */
  ulong                batch_cnt;
  /* Slot of the most recent microblock we've seen from PoH,
     or ULONG_MAX if we haven't seen one yet */
  ulong                slot;

  fd_keyswitch_t *     keyswitch;
  fd_keyguard_client_t keyguard_client[1];

  /* shred34 and fec_sets are closely related: fec_sets[i] has pointers
     to the shreds in shred34[4*i + k] for k=0,1,2,3. */
  fd_shred34_t       * shred34;
  fd_fec_set_t       * fec_sets;

  fd_stake_ci_t      * stake_ci;
  /* These are used between during_frag and after_frag */
  fd_shred_dest_weighted_t * new_dest_ptr;
  ulong                      new_dest_cnt;
  ulong                      shredded_txn_cnt;

  ulong poh_in_expect_seq;

  ushort net_id;

  int skip_frag;

  fd_ip4_udp_hdrs_t data_shred_net_hdr  [1];
  fd_ip4_udp_hdrs_t parity_shred_net_hdr[1];

  fd_wksp_t * shred_store_wksp;

  ulong shredder_fec_set_idx;     /* In [0, shredder_max_fec_set_idx) */
  ulong shredder_max_fec_set_idx; /* exclusive */

  ulong send_fec_set_idx;
  ulong tsorig;  /* timestamp of the last packet in compressed form */

  /* Includes Ethernet, IP, UDP headers */
  ulong shred_buffer_sz;
  uchar shred_buffer[ FD_NET_MTU ];

  fd_shred_in_ctx_t in[ 32 ];
  int               in_kind[ 32 ];

  fd_frag_meta_t * net_out_mcache;
  ulong *          net_out_sync;
  ulong            net_out_depth;
  ulong            net_out_seq;

  fd_wksp_t * net_out_mem;
  ulong       net_out_chunk0;
  ulong       net_out_wmark;
  ulong       net_out_chunk;

  fd_wksp_t * store_out_mem;
  ulong       store_out_chunk0;
  ulong       store_out_wmark;
  ulong       store_out_chunk;

  fd_wksp_t * replay_out_mem;
  ulong       replay_out_chunk0;
  ulong       replay_out_wmark;
  ulong       replay_out_chunk;

  fd_blockstore_t   blockstore_ljoin;
  fd_blockstore_t * blockstore;

  struct {
    fd_histf_t contact_info_cnt[ 1 ];
    fd_histf_t batch_sz[ 1 ];
    fd_histf_t batch_microblock_cnt[ 1 ];
    fd_histf_t shredding_timing[ 1 ];
    fd_histf_t add_shred_timing[ 1 ];
    ulong shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ];
  } metrics[ 1 ];

  struct {
    ulong txn_cnt;
    ulong pos; /* in payload, so 0<=pos<63671 */
    ulong slot; /* set to 0 when pos==0 */
    union {
      struct {
        ulong microblock_cnt;
        uchar payload[ 63679UL - 8UL ];
      };
      uchar raw[ 63679UL ]; /* The largest that fits in 1 FEC set */
    };
  } pending_batch;
} fd_shred_ctx_t;

/* PENDING_BATCH_WMARK: Following along the lines of dcache, batch
   microblocks until either the slot ends or we exceed the watermark.
   We know that if we're <= watermark, we can always accept a message of
   maximum size. */
#define PENDING_BATCH_WMARK (63679UL - 8UL - FD_POH_SHRED_MTU)
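/* i.e. whenever pending_batch.pos <= PENDING_BATCH_WMARK, we have
   pos + FD_POH_SHRED_MTU <= 63679-8 = sizeof(pending_batch.payload),
   so the next maximum-size PoH frag is guaranteed to fit. */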

FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 128UL;
}

FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile ) {

  ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, tile->shred.depth,
                                                            128UL * tile->shred.fec_resolver_depth );
  ulong fec_set_cnt = tile->shred.depth + tile->shred.fec_resolver_depth + 4UL;

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_shred_ctx_t),          sizeof(fd_shred_ctx_t)                  );
  l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(),              fd_stake_ci_footprint()                 );
  l = FD_LAYOUT_APPEND( l, fd_fec_resolver_align(),          fec_resolver_footprint                  );
  l = FD_LAYOUT_APPEND( l, fd_shredder_align(),              fd_shredder_footprint()                 );
  l = FD_LAYOUT_APPEND( l, alignof(fd_fec_set_t),            sizeof(fd_fec_set_t)*fec_set_cnt        );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
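
/* Note: the FD_LAYOUT_APPEND sequence above must stay in sync with
   the FD_SCRATCH_ALLOC_APPEND sequence in unprivileged_init below
   (ctx, stake_ci, resolver, shredder, fec_sets); both describe the
   same scratch region. */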

static inline void
during_housekeeping( fd_shred_ctx_t * ctx ) {
  if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
    ulong seq_must_complete = ctx->keyswitch->param;

    if( FD_UNLIKELY( fd_seq_lt( ctx->poh_in_expect_seq, seq_must_complete ) ) ) {
      /* See fd_keyswitch.h, we need to flush any in-flight shreds from
         the leader pipeline before switching key. */
      FD_LOG_WARNING(( "Flushing in-flight unpublished shreds, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->poh_in_expect_seq ));
      return;
    }

    fd_memcpy( ctx->identity_key->uc, ctx->keyswitch->bytes, 32UL );
    fd_stake_ci_set_identity( ctx->stake_ci, ctx->identity_key );
    fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
  }
}

static inline void
metrics_write( fd_shred_ctx_t * ctx ) {
  FD_MHIST_COPY( SHRED, CLUSTER_CONTACT_INFO_CNT,   ctx->metrics->contact_info_cnt      );
  FD_MHIST_COPY( SHRED, BATCH_SZ,                   ctx->metrics->batch_sz              );
  FD_MHIST_COPY( SHRED, BATCH_MICROBLOCK_CNT,       ctx->metrics->batch_microblock_cnt  );
  FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing      );
  FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing      );

  FD_MCNT_ENUM_COPY( SHRED, SHRED_PROCESSED, ctx->metrics->shred_processing_result      );
}

static inline void
handle_new_cluster_contact_info( fd_shred_ctx_t * ctx,
                                 uchar const    * buf ) {
  ulong const * header = (ulong const *)fd_type_pun_const( buf );

  ulong dest_cnt = header[ 0 ];
  fd_histf_sample( ctx->metrics->contact_info_cnt, dest_cnt );

  if( dest_cnt >= MAX_SHRED_DESTS )
    FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS ));

  fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
  fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci );

  ctx->new_dest_ptr = dests;
  ctx->new_dest_cnt = dest_cnt;

  for( ulong i=0UL; i<dest_cnt; i++ ) {
    memcpy( dests[i].pubkey.uc, in_dests[i].pubkey, 32UL );
    dests[i].ip4  = in_dests[i].ip4_addr;
    dests[i].port = in_dests[i].udp_port;
  }
}

static inline void
finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
  fd_stake_ci_dest_add_fini( ctx->stake_ci, ctx->new_dest_cnt );
}
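
/* The contact frag handled above is laid out as one ulong count
   followed by a packed array:
     [ ulong dest_cnt ][ fd_shred_dest_wire_t in_dests[ dest_cnt ] ]
   handle_new_cluster_contact_info only stages the new destinations
   (fd_stake_ci_dest_add_init); finalize_new_cluster_contact_info
   commits them, and is only called from after_frag once we know the
   frag was not overrun. */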

static inline int
before_frag( fd_shred_ctx_t * ctx,
             ulong            in_idx,
             ulong            seq,
             ulong            sig ) {
  if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) ctx->poh_in_expect_seq = seq+1UL;

  if(      FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED;
  else if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) return fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK;

  return 0;
}
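
/* Net effect of the checks above: net frags that don't carry
   DST_PROTO_SHRED and PoH frags that aren't POH_PKT_TYPE_MICROBLOCK
   are filtered before any copying; contact, stake, and sign frags
   always proceed to during_frag. */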

static void
during_frag( fd_shred_ctx_t * ctx,
             ulong            in_idx,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig,
             ulong            chunk,
             ulong            sz,
             ulong            ctl ) {

  ctx->skip_frag = 0;

  ctx->tsorig = fd_frag_meta_ts_comp( fd_tickcount() );

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
    handle_new_cluster_contact_info( ctx, dcache_entry );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
    fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
    /* This is a frag from the PoH tile.  We'll copy it to our pending
       microblock batch and shred it if necessary (last in block or
       above watermark).  We just go ahead and shred it here, even
       though we may get overrun.  If we do end up getting overrun, we
       just won't send these shreds out and we'll reuse the FEC set for
       the next one.  From a higher level though, if we do get overrun,
       a bunch of shreds will never be transmitted, and we'll end up
       producing a block that never lands on chain. */
    fd_fec_set_t * out = ctx->fec_sets + ctx->shredder_fec_set_idx;

    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_POH_SHRED_MTU ||
        sz<(sizeof(fd_entry_batch_meta_t)+sizeof(fd_entry_batch_header_t)) ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
            ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

    fd_entry_batch_meta_t const * entry_meta = (fd_entry_batch_meta_t const *)dcache_entry;
    uchar const *                 entry      = dcache_entry + sizeof(fd_entry_batch_meta_t);
    ulong                         entry_sz   = sz           - sizeof(fd_entry_batch_meta_t);

    fd_entry_batch_header_t const * microblock = (fd_entry_batch_header_t const *)entry;

    /* It should never be possible for this to fail, but we check it
       anyway. */
    FD_TEST( entry_sz + ctx->pending_batch.pos <= sizeof(ctx->pending_batch.payload) );

    ulong target_slot = fd_disco_poh_sig_slot( sig );
    if( FD_UNLIKELY( (ctx->pending_batch.microblock_cnt>0) & (ctx->pending_batch.slot!=target_slot) ) ) {
      /* TODO: The Agave client sends a dummy entry batch with only 1
         byte and the block-complete bit set.  This helps other
         validators know that the block is dead and they should not try
         to continue building a fork on it.  We probably want a similar
         approach eventually. */
      FD_LOG_WARNING(( "Abandoning %lu microblocks for slot %lu and switching to slot %lu",
            ctx->pending_batch.microblock_cnt, ctx->pending_batch.slot, target_slot ));
      ctx->pending_batch.slot           = 0UL;
      ctx->pending_batch.pos            = 0UL;
      ctx->pending_batch.microblock_cnt = 0UL;
      ctx->pending_batch.txn_cnt        = 0UL;
      ctx->batch_cnt                    = 0UL;

      FD_MCNT_INC( SHRED, MICROBLOCKS_ABANDONED, 1UL );
    }

    ctx->pending_batch.slot = target_slot;
    if( FD_UNLIKELY( target_slot!=ctx->slot ) ) {
      /* Reset batch count if we are in a new slot */
      ctx->batch_cnt = 0UL;
      ctx->slot      = target_slot;
    }
    if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) {
      /* Ugh, yet another memcpy */
      fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
    } else {
      /* If we are not processing this batch, filter */
      ctx->skip_frag = 1;
    }
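    /* Note that the bookkeeping below advances even when we filtered
       the frag above: every shred tile tracks every batch so that the
       batch boundaries and the batch_cnt round-robin stay in sync
       across tiles. */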
    ctx->pending_batch.pos            += entry_sz;
    ctx->pending_batch.microblock_cnt += 1UL;
    ctx->pending_batch.txn_cnt        += microblock->txn_cnt;

    int last_in_batch = entry_meta->block_complete | (ctx->pending_batch.pos > PENDING_BATCH_WMARK);

    ctx->send_fec_set_idx = ULONG_MAX;
    if( FD_UNLIKELY( last_in_batch ) ) {
      if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) {
        /* If it's our turn, shred this batch.  FD_UNLIKELY because the
           shred tile cnt is generally >= 2. */
        ulong batch_sz = sizeof(ulong)+ctx->pending_batch.pos;

        /* We sized this so it fits in one FEC set */
        long shredding_timing = -fd_tickcount();

        if( FD_UNLIKELY( entry_meta->block_complete && batch_sz < FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ ) ) {

          /* Ensure the last batch generates >= 32 data shreds by
             padding with 0s.  Because the last FEC set is "oddly sized"
             we only expect this code path to execute for blocks
             containing less data than can fill 32 data shred payloads
             (hence FD_UNLIKELY).

             See documentation for FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ
             for further context. */

          fd_memset( ctx->pending_batch.payload + ctx->pending_batch.pos, 0, FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ - batch_sz );
          batch_sz = FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ;
        }

        fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, batch_sz, target_slot, entry_meta );
        FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out ) );
        fd_shredder_fini_batch( ctx->shredder );
        shredding_timing += fd_tickcount();

        d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd   ) ) ) );
        p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) );
        ctx->shredded_txn_cnt = ctx->pending_batch.txn_cnt;

        ctx->send_fec_set_idx = ctx->shredder_fec_set_idx;

        /* Update metrics */
        fd_histf_sample( ctx->metrics->batch_sz,             batch_sz                          );
        fd_histf_sample( ctx->metrics->batch_microblock_cnt, ctx->pending_batch.microblock_cnt );
        fd_histf_sample( ctx->metrics->shredding_timing,     (ulong)shredding_timing           );
      } else {
        /* If it's not our turn, update the indices for this slot */
        fd_shredder_skip_batch( ctx->shredder, sizeof(ulong)+ctx->pending_batch.pos, target_slot );
      }

      ctx->pending_batch.slot           = 0UL;
      ctx->pending_batch.pos            = 0UL;
      ctx->pending_batch.microblock_cnt = 0UL;
      ctx->pending_batch.txn_cnt        = 0UL;
      ctx->batch_cnt++;
    }
  } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
    /* The common case, from the net tile.  The FEC resolver API does
       not present a prepare/commit model.  If we get overrun between
       when the FEC resolver verifies the signature and when it stores
       the local copy, we could end up storing and retransmitting
       garbage.  Instead we copy it locally, sadly, and only give it to
       the FEC resolver when we know it won't be overrun anymore. */
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_NET_MTU ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
    uchar const * dcache_entry = (uchar const *)fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk ) + ctl;
    ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
    FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
    fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
    if( FD_UNLIKELY( !shred ) ) {
      ctx->skip_frag = 1;
      return;
    }
    /* All shreds in the same FEC set will have the same signature, so
       we can round-robin shreds between the shred tiles based on just
       the signature without splitting individual FEC sets. */
    ulong sig = fd_ulong_load_8( shred->signature );
    if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
      ctx->skip_frag = 1;
      return;
    }
    fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
    ctx->shred_buffer_sz = sz-hdr_sz;
  }
}

static inline void
send_shred( fd_shred_ctx_t *    ctx,
            fd_shred_t const *  shred,
            fd_shred_dest_t *   sdest,
            fd_shred_dest_idx_t dest_idx,
            ulong               tsorig ) {
  fd_shred_dest_weighted_t * dest = fd_shred_dest_idx_to_dest( sdest, dest_idx );

  if( FD_UNLIKELY( !dest->ip4 ) ) return;

  uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );

  int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
  fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
  *hdr = *( is_data ? ctx->data_shred_net_hdr : ctx->parity_shred_net_hdr );

  fd_ip4_hdr_t * ip4 = hdr->ip4;
  ip4->daddr  = dest->ip4;
  ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
  ip4->check  = 0U;
  ip4->check  = fd_ip4_hdr_check_fast( ip4 );

  hdr->udp->net_dport = fd_ushort_bswap( dest->port );

  ulong shred_sz = fd_ulong_if( is_data, FD_SHRED_MIN_SZ, FD_SHRED_MAX_SZ );
  fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), shred, shred_sz );

  ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
  ulong tspub  = fd_frag_meta_ts_comp( fd_tickcount() );
  ulong sig    = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
  fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, pkt_sz, 0UL, tsorig, tspub );
  ctx->net_out_seq   = fd_seq_inc( ctx->net_out_seq, 1UL );
  ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
}
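
/* Packet layout produced above: [ fd_ip4_udp_hdrs_t ][ shred ].  Data
   shreds are sent at FD_SHRED_MIN_SZ and parity shreds at
   FD_SHRED_MAX_SZ; these correspond to the 1203 and 1228 byte
   shred_sz values wired into the shred34 dcache entries in
   unprivileged_init below. */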

static void
after_frag( fd_shred_ctx_t *    ctx,
            ulong               in_idx,
            ulong               seq,
            ulong               sig,
            ulong               sz,
            ulong               tsorig,
            ulong               _tspub,
            fd_stem_context_t * stem ) {
  (void)seq;
  (void)sig;
  (void)sz;
  (void)tsorig;
  (void)_tspub;

  if( FD_UNLIKELY( ctx->skip_frag ) ) return;

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
    finalize_new_cluster_contact_info( ctx );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
    fd_stake_ci_stake_msg_fini( ctx->stake_ci );
    return;
  }

  if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_POH) & (ctx->send_fec_set_idx==ULONG_MAX) ) ) {
    /* Entry from PoH that didn't trigger a new FEC set to be made */
    return;
  }

  const ulong fanout = 200UL;
  fd_shred_dest_idx_t _dests[ 200*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];

  if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
    uchar * shred_buffer    = ctx->shred_buffer;
    ulong   shred_buffer_sz = ctx->shred_buffer_sz;

    fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz );
    if( FD_UNLIKELY( !shred       ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }

    fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
    if( FD_UNLIKELY( !lsched      ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; }

    fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, shred->slot );
    if( FD_UNLIKELY( !slot_leader ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; } /* Count this as bad slot too */

    fd_fec_set_t const * out_fec_set[1];
    fd_shred_t const   * out_shred[1];
    fd_bmtree_node_t     out_merkle_root[1];

    long add_shred_timing  = -fd_tickcount();
    int rv = fd_fec_resolver_add_shred( ctx->resolver, shred, shred_buffer_sz, slot_leader->uc, out_fec_set, out_shred, out_merkle_root );
    add_shred_timing      +=  fd_tickcount();

    fd_histf_sample( ctx->metrics->add_shred_timing, (ulong)add_shred_timing );
    ctx->metrics->shred_processing_result[ rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]++;

    if( (rv==FD_FEC_RESOLVER_SHRED_OKAY) | (rv==FD_FEC_RESOLVER_SHRED_COMPLETES) ) {
      /* Relay this shred */
      ulong max_dest_cnt[1];
      do {
        /* If we've validated the shred and it COMPLETES but we can't
           compute the destination for whatever reason, don't forward
           the shred, but still send it to the blockstore. */
        fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, shred->slot );
        if( FD_UNLIKELY( !sdest ) ) break;
        fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, _dests, 1UL, fanout, fanout, max_dest_cnt );
        if( FD_UNLIKELY( !dests ) ) break;

        for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, *out_shred, sdest, dests[ j ], ctx->tsorig );
      } while( 0 );

      if( FD_LIKELY( ctx->blockstore && rv==FD_FEC_RESOLVER_SHRED_OKAY ) ) { /* optimize for the compiler - branch predictor will still be correct */
        uchar * buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
        ulong   sz  = fd_shred_header_sz( shred->variant );
        fd_memcpy( buf, shred, sz );
        ulong tspub       = fd_frag_meta_ts_comp( fd_tickcount() );
        ulong replay_sig  = fd_disco_shred_replay_sig( shred->slot, shred->idx, shred->fec_set_idx, fd_shred_is_code( fd_shred_type( shred->variant ) ), 0 );
        fd_stem_publish( stem, REPLAY_OUT_IDX, replay_sig, ctx->replay_out_chunk, sz, 0UL, ctx->tsorig, tspub );
        ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sz, ctx->replay_out_chunk0, ctx->replay_out_wmark );
      }
    }
    if( FD_LIKELY( rv!=FD_FEC_RESOLVER_SHRED_COMPLETES ) ) return;

    FD_TEST( ctx->fec_sets <= *out_fec_set );
    ctx->send_fec_set_idx = (ulong)(*out_fec_set - ctx->fec_sets);
    ctx->shredded_txn_cnt = 0UL;
  } else {
    /* We know we didn't get overrun, so advance the index */
    ctx->shredder_fec_set_idx = (ctx->shredder_fec_set_idx+1UL)%ctx->shredder_max_fec_set_idx;
  }
  /* If this was the shred that completed an FEC set or this was a
     microblock we shredded ourselves, we now have a full FEC set that
     we need to send to the blockstore and on the network (skipping any
     shreds we already sent). */

  fd_fec_set_t * set = ctx->fec_sets + ctx->send_fec_set_idx;
  fd_shred34_t * s34 = ctx->shred34 + 4UL*ctx->send_fec_set_idx;

  s34[ 0 ].shred_cnt =                         fd_ulong_min( set->data_shred_cnt,   34UL );
  s34[ 1 ].shred_cnt = set->data_shred_cnt   - fd_ulong_min( set->data_shred_cnt,   34UL );
  s34[ 2 ].shred_cnt =                         fd_ulong_min( set->parity_shred_cnt, 34UL );
  s34[ 3 ].shred_cnt = set->parity_shred_cnt - fd_ulong_min( set->parity_shred_cnt, 34UL );

  ulong s34_cnt     = 2UL + !!(s34[ 1 ].shred_cnt) + !!(s34[ 3 ].shred_cnt);
  ulong txn_per_s34 = ctx->shredded_txn_cnt / s34_cnt;

  /* Attribute the transactions evenly to the non-empty shred34s */
  for( ulong j=0UL; j<4UL; j++ ) s34[ j ].est_txn_cnt = fd_ulong_if( s34[ j ].shred_cnt>0UL, txn_per_s34, 0UL );

  /* Add whatever is left to the last shred34 */
  s34[ fd_ulong_if( s34[ 3 ].shred_cnt>0UL, 3, 2 ) ].est_txn_cnt += ctx->shredded_txn_cnt - txn_per_s34*s34_cnt;
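  /* Worked example: a set with 50 data and 50 parity shreds gives
     shred_cnts of 34,16,34,16, so s34_cnt=4.  With shredded_txn_cnt=10,
     txn_per_s34=2, each shred34 gets 2, and the remaining 10-2*4=2 are
     added to s34[ 3 ]. */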

  /* Set the sz field so that metrics are more accurate. */
  ulong sz0 = sizeof(fd_shred34_t) - (34UL - s34[ 0 ].shred_cnt)*FD_SHRED_MAX_SZ;
  ulong sz1 = sizeof(fd_shred34_t) - (34UL - s34[ 1 ].shred_cnt)*FD_SHRED_MAX_SZ;
  ulong sz2 = sizeof(fd_shred34_t) - (34UL - s34[ 2 ].shred_cnt)*FD_SHRED_MAX_SZ;
  ulong sz3 = sizeof(fd_shred34_t) - (34UL - s34[ 3 ].shred_cnt)*FD_SHRED_MAX_SZ;

  if( FD_LIKELY( ctx->blockstore ) ) {
    /* If the shred has a completes flag, then in the replay tile it
       will do immediate polling for shreds in that FEC set, under
       the assumption that they live in the blockstore.  When a shred
       completes an FEC set, we need to add the shreds to the
       blockstore before we notify replay of a completed FEC set.
       Replay does not poll the blockstore for shreds on notifies of
       a regular non-completing shred. */

    for( ulong i=0UL; i<set->data_shred_cnt; i++ ) {
      fd_shred_t const * data_shred = (fd_shred_t const *)fd_type_pun_const( set->data_shreds[ i ] );
      fd_blockstore_shred_insert( ctx->blockstore, data_shred );
    }
    if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
      /* Shred came from a block we didn't produce.  This is not our
         leader slot. */
      fd_shred_t const * shred = (fd_shred_t const *)fd_type_pun_const( ctx->shred_buffer );
      uchar * buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
      ulong   sz  = fd_shred_header_sz( shred->variant );
      fd_memcpy( buf, shred, sz );
      ulong tspub       = fd_frag_meta_ts_comp( fd_tickcount() );
      ulong replay_sig  = fd_disco_shred_replay_sig( shred->slot, shred->idx, shred->fec_set_idx, fd_shred_is_code( fd_shred_type( shred->variant ) ), 1 );
      fd_stem_publish( stem, REPLAY_OUT_IDX, replay_sig, ctx->replay_out_chunk, sz, 0UL, ctx->tsorig, tspub );
      ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sz, ctx->replay_out_chunk0, ctx->replay_out_wmark );
    }
  }

  /* Send to the blockstore, skipping any empty fd_shred34_t entries. */
  ulong new_sig = ctx->in_kind[ in_idx ]!=IN_KIND_NET; /* sig==0 means the store tile will do extra checks */
  ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
  fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+0UL ), sz0, 0UL, ctx->tsorig, tspub );
  if( FD_UNLIKELY( s34[ 1 ].shred_cnt ) )
    fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+1UL ), sz1, 0UL, ctx->tsorig, tspub );
  fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+2UL ), sz2, 0UL, ctx->tsorig, tspub );
  if( FD_UNLIKELY( s34[ 3 ].shred_cnt ) )
    fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+3UL ), sz3, 0UL, ctx->tsorig, tspub );

  /* Compute all the destinations for all the new shreds */

  fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
  ulong k=0UL;
  for( ulong i=0UL; i<set->data_shred_cnt; i++ )
    if( !d_rcvd_test( set->data_shred_rcvd,   i ) )  new_shreds[ k++ ] = (fd_shred_t const *)set->data_shreds  [ i ];
  for( ulong i=0UL; i<set->parity_shred_cnt; i++ )
    if( !p_rcvd_test( set->parity_shred_rcvd, i ) )  new_shreds[ k++ ] = (fd_shred_t const *)set->parity_shreds[ i ];

  if( FD_UNLIKELY( !k ) ) return;
  fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, new_shreds[ 0 ]->slot );
  if( FD_UNLIKELY( !sdest ) ) return;

  ulong out_stride;
  ulong max_dest_cnt[1];
  fd_shred_dest_idx_t * dests;
  if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
    out_stride = k;
    dests = fd_shred_dest_compute_children( sdest, new_shreds, k, _dests, k, fanout, fanout, max_dest_cnt );
  } else {
    out_stride = 1UL;
    *max_dest_cnt = 1UL;
    dests = fd_shred_dest_compute_first   ( sdest, new_shreds, k, _dests );
  }
  if( FD_UNLIKELY( !dests ) ) return;

  /* Send only the ones we didn't receive. */
  for( ulong i=0UL; i<k; i++ ) for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, new_shreds[ i ], sdest, dests[ j*out_stride+i ], ctx->tsorig );
}

static void
privileged_init( fd_topo_t *      topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );

  if( FD_UNLIKELY( !strcmp( tile->shred.identity_key_path, "" ) ) )
    FD_LOG_ERR(( "identity_key_path not set" ));

  ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->shred.identity_key_path, /* pubkey only: */ 1 ) );
}

static void
fd_shred_signer( void *        signer_ctx,
                 uchar         signature[ static 64 ],
                 uchar const   merkle_root[ static 32 ] ) {
  fd_keyguard_client_sign( signer_ctx, signature, merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
}

     734             : static void
     735             : unprivileged_init( fd_topo_t *      topo,
     736           0 :                    fd_topo_tile_t * tile ) {
     737           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     738             : 
     739           0 :   if( FD_LIKELY( tile->out_cnt==3UL ) ) { /* frankendancer */
     740           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[STORE_OUT_IDX]].name,  "shred_store"  ) );
     741           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[NET_OUT_IDX]].name,    "shred_net"    ) );
     742           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[SIGN_OUT_IDX]].name,   "shred_sign"   ) );
     743           0 :   } else if( FD_LIKELY( tile->out_cnt==4UL ) ) { /* firedancer */
     744           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[STORE_OUT_IDX]].name,  "shred_storei"  ) );
     745           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[NET_OUT_IDX]].name,    "shred_net"    ) );
     746           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[SIGN_OUT_IDX]].name,   "shred_sign"   ) );
     747           0 :     FD_TEST( 0==strcmp( topo->links[tile->out_link_id[REPLAY_OUT_IDX]].name, "shred_replay" ) );
     748           0 :   } else {
     749           0 :     FD_LOG_ERR(( "shred tile has unexpected cnt of output links %lu", tile->out_cnt ));
     750           0 :   }
     751             : 
     752           0 :   if( FD_UNLIKELY( !tile->out_cnt ) )
     753           0 :     FD_LOG_ERR(( "shred tile has no primary output link" ));
     754             : 
     755           0 :   ulong shred_store_mcache_depth = tile->shred.depth;
     756           0 :   if( topo->links[ tile->out_link_id[ 0 ] ].depth != shred_store_mcache_depth )
     757           0 :     FD_LOG_ERR(( "shred tile out depths are not equal %lu %lu",
     758           0 :                  topo->links[ tile->out_link_id[ 0 ] ].depth, shred_store_mcache_depth ));
     759             : 
     760           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     761           0 :   fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
     762             : 
     763           0 :   ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
     764           0 :   ctx->round_robin_id  = tile->kind_id;
     765           0 :   ctx->batch_cnt       = 0UL;
     766           0 :   ctx->slot            = ULONG_MAX;
     767             : 
     768           0 :   ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth,
     769           0 :                                                             128UL * tile->shred.fec_resolver_depth );
     770           0 :   ulong fec_set_cnt            = shred_store_mcache_depth + tile->shred.fec_resolver_depth + 4UL;
     771             : 
     772           0 :   void * store_out_dcache = topo->links[ tile->out_link_id[ 0 ] ].dcache;
     773             : 
     774           0 :   ulong required_dcache_sz = fec_set_cnt*DCACHE_ENTRIES_PER_FEC_SET*sizeof(fd_shred34_t);
     775           0 :   if( fd_dcache_data_sz( store_out_dcache )<required_dcache_sz ) {
     776           0 :     FD_LOG_ERR(( "shred->store dcache too small. It is %lu bytes but must be at least %lu bytes.",
     777           0 :                  fd_dcache_data_sz( store_out_dcache ),
     778           0 :                  required_dcache_sz ));
     779           0 :   }
     780             : 
     781           0 :   if( FD_UNLIKELY( !tile->shred.fec_resolver_depth ) ) FD_LOG_ERR(( "fec_resolver_depth not set" ));
     782           0 :   if( FD_UNLIKELY( !tile->shred.shred_listen_port  ) ) FD_LOG_ERR(( "shred_listen_port not set" ));
     783             : 
     784           0 :   ulong bank_cnt   = fd_topo_tile_name_cnt( topo, "bank" );
     785           0 :   ulong replay_cnt = fd_topo_tile_name_cnt( topo, "replay" );
     786             : 
     787           0 :   if( FD_UNLIKELY( !bank_cnt && !replay_cnt ) ) FD_LOG_ERR(( "0 bank/replay tiles" ));
     788           0 :   if( FD_UNLIKELY( bank_cnt>MAX_BANK_CNT ) ) FD_LOG_ERR(( "Too many banks" ));
     789             : 
     790           0 :   void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(),              fd_stake_ci_footprint()            );
     791           0 :   void * _resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_resolver_align(),          fec_resolver_footprint             );
     792           0 :   void * _shredder = FD_SCRATCH_ALLOC_APPEND( l, fd_shredder_align(),              fd_shredder_footprint()            );
     793           0 :   void * _fec_sets = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_fec_set_t),            sizeof(fd_fec_set_t)*fec_set_cnt   );
     794             : 
     795           0 :   fd_fec_set_t * fec_sets = (fd_fec_set_t *)_fec_sets;
     796           0 :   fd_shred34_t * shred34  = (fd_shred34_t *)store_out_dcache;
     797             : 
     798           0 :   for( ulong i=0UL; i<fec_set_cnt; i++ ) {
     799           0 :     fd_shred34_t * p34_base = shred34 + i*DCACHE_ENTRIES_PER_FEC_SET;
     800           0 :     for( ulong k=0UL; k<DCACHE_ENTRIES_PER_FEC_SET; k++ ) {
     801           0 :       fd_shred34_t * p34 = p34_base + k;
     802             : 
     803           0 :       p34->stride   = (ulong)p34->pkts[1].buffer - (ulong)p34->pkts[0].buffer;
     804           0 :       p34->offset   = (ulong)p34->pkts[0].buffer - (ulong)p34;
     805           0 :       p34->shred_sz = fd_ulong_if( k<2UL, 1203UL, 1228UL );
     806           0 :     }
     807             : 
     808           0 :     uchar ** data_shred   = fec_sets[ i ].data_shreds;
     809           0 :     uchar ** parity_shred = fec_sets[ i ].parity_shreds;
     810           0 :     for( ulong j=0UL; j<FD_REEDSOL_DATA_SHREDS_MAX;   j++ ) data_shred  [ j ] = p34_base[       j/34UL ].pkts[ j%34UL ].buffer;
     811           0 :     for( ulong j=0UL; j<FD_REEDSOL_PARITY_SHREDS_MAX; j++ ) parity_shred[ j ] = p34_base[ 2UL + j/34UL ].pkts[ j%34UL ].buffer;
     812           0 :   }
     813             : 
     814           0 : #define NONNULL( x ) (__extension__({                                        \
     815           0 :       __typeof__((x)) __x = (x);                                             \
     816           0 :       if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
     817           0 :       __x; }))
     818             : 
     819           0 :   ulong expected_shred_version = tile->shred.expected_shred_version;
     820           0 :   if( FD_LIKELY( !expected_shred_version ) ) {
     821           0 :     ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
     822           0 :     FD_TEST( busy_obj_id!=ULONG_MAX );
     823           0 :     ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
     824           0 :     FD_LOG_INFO(( "Waiting for shred version to be determined via gossip." ));
     825           0 :     do {
     826           0 :       expected_shred_version = FD_VOLATILE_CONST( *gossip_shred_version );
     827           0 :     } while( expected_shred_version==ULONG_MAX );
     828           0 :   }
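                     :
                     :   /* The loop above spins on an fseq published under the "poh_shred"
                     :      pod key until another tile (gossip) stores a real shred version
                     :      over the ULONG_MAX sentinel.  A minimal sketch of the publishing
                     :      side, assuming the writer joins the same fseq object
                     :      (illustrative only; the actual publisher lives in another tile,
                     :      and observed_shred_version is a hypothetical local):
                     :
                     :        ulong * sv = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
                     :        fd_fseq_update( sv, (ulong)observed_shred_version );
                     :
                     :      fd_fseq_update is a fenced volatile store, so once the reader
                     :      observes a value other than ULONG_MAX it can use it directly. */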
     829             : 
     830           0 :   if( FD_UNLIKELY( expected_shred_version > USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
     831           0 :   FD_LOG_INFO(( "Using shred version %hu", (ushort)expected_shred_version ));
     832             : 
     833           0 :   ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) );
     834           0 :   FD_TEST( ctx->keyswitch );
     835             : 
      836             :   /* Populate ctx: remote signing client, shredder, FEC resolver, stake weights, and link state. */
     837           0 :   ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_shred", tile->kind_id );
     838           0 :   FD_TEST( sign_in_idx!=ULONG_MAX );
     839           0 :   fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
     840           0 :   fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
     841           0 :   NONNULL( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
     842           0 :                                                             sign_out->mcache,
     843           0 :                                                             sign_out->dcache,
     844           0 :                                                             sign_in->mcache,
     845           0 :                                                             sign_in->dcache ) ) );
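                     :
                     :   /* The keyguard client above gives this tile an RPC-like channel to
                     :      the signing tile: requests go out on sign_out, signatures come
                     :      back on sign_in.  fd_shred_signer (defined earlier in this file
                     :      and passed to the shredder and FEC resolver below) adapts it to
                     :      the signer callback shape those objects expect.  A sketch of
                     :      that shape, with the sign-type constant an assumption for
                     :      illustration:
                     :
                     :        static void
                     :        my_signer( void * ctx, uchar signature[ 64 ], uchar const merkle_root[ 32 ] ) {
                     :          fd_keyguard_client_sign( (fd_keyguard_client_t *)ctx, signature,
                     :                                   merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
                     :        }
                     :   */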
     846             : 
     847           0 :   ulong shred_limit = fd_ulong_if( tile->shred.larger_shred_limits_per_block, 32UL*32UL*1024UL, 32UL*1024UL );
     848           0 :   fd_fec_set_t * resolver_sets = fec_sets + (shred_store_mcache_depth+1UL)/2UL + 1UL;
     849           0 :   ctx->shredder = NONNULL( fd_shredder_join     ( fd_shredder_new     ( _shredder, fd_shred_signer, ctx->keyguard_client, (ushort)expected_shred_version ) ) );
     850           0 :   ctx->resolver = NONNULL( fd_fec_resolver_join ( fd_fec_resolver_new ( _resolver,
     851           0 :                                                                         fd_shred_signer, ctx->keyguard_client,
     852           0 :                                                                         tile->shred.fec_resolver_depth, 1UL,
     853           0 :                                                                         (shred_store_mcache_depth+3UL)/2UL,
     854           0 :                                                                         128UL * tile->shred.fec_resolver_depth, resolver_sets,
     855           0 :                                                                         (ushort)expected_shred_version,
     856           0 :                                                                         shred_limit                                           ) ) );
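                     :
                     :   /* fec_sets is carved into two disjoint regions, one per data
                     :      source.  With depth = shred_store_mcache_depth, the shredder
                     :      (bank path) owns sets [0, (depth+1)/2+1) and the FEC resolver
                     :      (net path) owns the rest, starting at resolver_sets.  This
                     :      mirrors the flow control analysis in the comment at the top of
                     :      this file: the bank path exposes at most ceil(depth/2) FEC sets
                     :      at once, so it needs one more than that, while the resolver
                     :      region backs the depth + partial_depth + complete_depth working
                     :      sets that fd_fec_resolver_new is given. */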
     857             : 
     858           0 :   ctx->shred34  = shred34;
     859           0 :   ctx->fec_sets = fec_sets;
     860             : 
     861           0 :   ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci, ctx->identity_key ) );
     862             : 
     863           0 :   ctx->net_id   = (ushort)0;
     864             : 
     865           0 :   fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr,   FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
     866           0 :   fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
     867             : 
     868           0 :   for( ulong i=0UL; i<tile->in_cnt; i++ ) {
     869           0 :     fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
     870           0 :     fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
     871             : 
     872           0 :     if( FD_LIKELY(      !strcmp( link->name, "net_shred"   ) ) ) ctx->in_kind[ i ] = IN_KIND_NET;
     873           0 :     else if( FD_LIKELY( !strcmp( link->name, "poh_shred"   ) ) ) ctx->in_kind[ i ] = IN_KIND_POH;
     874           0 :     else if( FD_LIKELY( !strcmp( link->name, "stake_out"   ) ) ) ctx->in_kind[ i ] = IN_KIND_STAKE;
     875           0 :     else if( FD_LIKELY( !strcmp( link->name, "crds_shred"  ) ) ) ctx->in_kind[ i ] = IN_KIND_CONTACT;
     876           0 :     else if( FD_LIKELY( !strcmp( link->name, "sign_shred"  ) ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
     877           0 :     else FD_LOG_ERR(( "shred tile has unexpected input link %lu %s", i, link->name ));
     878             : 
     879           0 :     ctx->in[ i ].mem    = link_wksp->wksp;
     880           0 :     ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
     881           0 :     ctx->in[ i ].wmark  = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
     882           0 :   }
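                     :
                     :   /* chunk0 and wmark bound the valid chunk indices for each in
                     :      link's compact dcache.  The frag callbacks typically validate an
                     :      incoming descriptor against them before touching memory, with a
                     :      pattern of the form (illustrative sketch):
                     :
                     :        if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
                     :          FD_LOG_ERR(( "chunk %lu not in range [%lu,%lu]",
                     :                       chunk, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
                     :        uchar const * frag = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
                     :
                     :      so a corrupt producer fails loudly instead of causing a stray
                     :      read. */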
     883             : 
     884           0 :   fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
     885             : 
     886           0 :   ctx->net_out_mcache = net_out->mcache;
     887           0 :   ctx->net_out_sync   = fd_mcache_seq_laddr( ctx->net_out_mcache );
     888           0 :   ctx->net_out_depth  = fd_mcache_depth( ctx->net_out_mcache );
     889           0 :   ctx->net_out_seq    = fd_mcache_seq_query( ctx->net_out_sync );
     890           0 :   ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
     891           0 :   ctx->net_out_mem    = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
     892           0 :   ctx->net_out_wmark  = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
     893           0 :   ctx->net_out_chunk  = ctx->net_out_chunk0;
     894             : 
     895           0 :   fd_topo_link_t * store_out = &topo->links[ tile->out_link_id[ STORE_OUT_IDX ] ];
     896             : 
     897           0 :   ctx->store_out_mem    = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
     898           0 :   ctx->store_out_chunk0 = fd_dcache_compact_chunk0( ctx->store_out_mem, store_out->dcache );
     899           0 :   ctx->store_out_wmark  = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
     900           0 :   ctx->store_out_chunk  = ctx->store_out_chunk0;
     901             : 
      902           0 :   if( FD_LIKELY( tile->out_cnt==4UL ) ) { /* firedancer topology: a fourth out link feeds replay */
     903           0 :     fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ];
     904             : 
     905           0 :     ctx->replay_out_mem    = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
     906           0 :     ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );
     907           0 :     ctx->replay_out_wmark  = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
     908           0 :     ctx->replay_out_chunk  = ctx->replay_out_chunk0;
     909           0 :   }
     910             : 
     911           0 :   ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
      912           0 :   if( FD_LIKELY( blockstore_obj_id!=ULONG_MAX ) ) {
      913           0 :     ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
      914           0 :     FD_TEST( ctx->blockstore->shmem->magic==FD_BLOCKSTORE_MAGIC );
     915           0 :   } else {
     916           0 :     ctx->blockstore = NULL;
     917           0 :   }
     918             : 
     919           0 :   ctx->poh_in_expect_seq = 0UL;
     920             : 
     921           0 :   ctx->shredder_fec_set_idx = 0UL;
     922           0 :   ctx->shredder_max_fec_set_idx = (shred_store_mcache_depth+1UL)/2UL + 1UL;
     923             : 
     924           0 :   ctx->send_fec_set_idx    = ULONG_MAX;
     925             : 
     926           0 :   ctx->shred_buffer_sz  = 0UL;
     927           0 :   fd_memset( ctx->shred_buffer, 0xFF, FD_NET_MTU );
     928             : 
     929           0 :   fd_histf_join( fd_histf_new( ctx->metrics->contact_info_cnt,     FD_MHIST_MIN(         SHRED, CLUSTER_CONTACT_INFO_CNT   ),
     930           0 :                                                                    FD_MHIST_MAX(         SHRED, CLUSTER_CONTACT_INFO_CNT   ) ) );
     931           0 :   fd_histf_join( fd_histf_new( ctx->metrics->batch_sz,             FD_MHIST_MIN(         SHRED, BATCH_SZ                   ),
     932           0 :                                                                    FD_MHIST_MAX(         SHRED, BATCH_SZ                   ) ) );
     933           0 :   fd_histf_join( fd_histf_new( ctx->metrics->batch_microblock_cnt, FD_MHIST_MIN(         SHRED, BATCH_MICROBLOCK_CNT       ),
     934           0 :                                                                    FD_MHIST_MAX(         SHRED, BATCH_MICROBLOCK_CNT       ) ) );
     935           0 :   fd_histf_join( fd_histf_new( ctx->metrics->shredding_timing,     FD_MHIST_SECONDS_MIN( SHRED, SHREDDING_DURATION_SECONDS ),
     936           0 :                                                                    FD_MHIST_SECONDS_MAX( SHRED, SHREDDING_DURATION_SECONDS ) ) );
     937           0 :   fd_histf_join( fd_histf_new( ctx->metrics->add_shred_timing,     FD_MHIST_SECONDS_MIN( SHRED, ADD_SHRED_DURATION_SECONDS ),
     938           0 :                                                                    FD_MHIST_SECONDS_MAX( SHRED, ADD_SHRED_DURATION_SECONDS ) ) );
     939           0 :   memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) );
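                     :
                     :   /* The histograms joined above are sampled on the hot path and
                     :      drained to the metrics workspace by the metrics_write callback.
                     :      Illustrative usage with the standard fd_histf API (the real call
                     :      sites are elsewhere in this file; the sampled values here are
                     :      hypothetical locals):
                     :
                     :        fd_histf_sample( ctx->metrics->batch_sz,         batch_sz );
                     :        fd_histf_sample( ctx->metrics->shredding_timing, (ulong)(end_ticks-start_ticks) );
                     :   */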
     940             : 
     941           0 :   ctx->pending_batch.microblock_cnt = 0UL;
     942           0 :   ctx->pending_batch.txn_cnt        = 0UL;
     943           0 :   ctx->pending_batch.pos            = 0UL;
     944           0 :   ctx->pending_batch.slot           = 0UL;
     945           0 :   fd_memset( ctx->pending_batch.payload, 0, sizeof(ctx->pending_batch.payload) );
     946             : 
     947           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
     948           0 :   if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
     949           0 :     FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
     950           0 : }
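                     :
                     : /* unprivileged_init above follows the standard scratch layout
                     :    pattern: FD_SCRATCH_ALLOC_INIT near the top, one
                     :    FD_SCRATCH_ALLOC_APPEND( l, align, footprint ) per object, and the
                     :    FD_SCRATCH_ALLOC_FINI overflow check at the end.  The check is
                     :    only meaningful if scratch_footprint( tile ) performs the same
                     :    sequence of appends, e.g. (sketch):
                     :
                     :      FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
                     :
                     :    in both places, so any drift between layout and footprint surfaces
                     :    as the "scratch overflow" error rather than silent corruption. */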
     951             : 
     952             : static ulong
     953             : populate_allowed_seccomp( fd_topo_t const *      topo,
     954             :                           fd_topo_tile_t const * tile,
     955             :                           ulong                  out_cnt,
     956           0 :                           struct sock_filter *   out ) {
     957           0 :   (void)topo;
     958           0 :   (void)tile;
     959             : 
     960           0 :   populate_sock_filter_policy_fd_shred_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     961           0 :   return sock_filter_policy_fd_shred_tile_instr_cnt;
     962           0 : }
     963             : 
     964             : static ulong
     965             : populate_allowed_fds( fd_topo_t const *      topo,
     966             :                       fd_topo_tile_t const * tile,
     967             :                       ulong                  out_fds_cnt,
     968           0 :                       int *                  out_fds ) {
     969           0 :   (void)topo;
     970           0 :   (void)tile;
     971             : 
     972           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     973             : 
     974           0 :   ulong out_cnt = 0UL;
     975           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
     976           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     977           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     978           0 :   return out_cnt;
     979           0 : }
     980             : 
     981           0 : #define STEM_BURST (5UL)
     982             : 
     983             : /* See explanation in fd_pack */
     984           0 : #define STEM_LAZY  (128L*3000L)
     985             : 
     986           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_shred_ctx_t
     987           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_shred_ctx_t)
     988             : 
     989           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
     990           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
     991           0 : #define STEM_CALLBACK_BEFORE_FRAG         before_frag
     992           0 : #define STEM_CALLBACK_DURING_FRAG         during_frag
     993           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
     994             : 
     995             : #include "../stem/fd_stem.c"
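                     :
                     : /* fd_stem.c is a template: given the STEM_* macros above, it expands
                     :    into the stem_run used below.  In outline (a summary of the stem
                     :    framework; see ../stem/fd_stem.c for the authoritative behavior),
                     :    the generated loop ensures at least STEM_BURST (5) credits on the
                     :    out mcache before taking a fragment, so after_frag can always
                     :    publish its worst-case output, then invokes before_frag (filter),
                     :    during_frag (speculative copy), and after_frag (commit) in order,
                     :    with during_housekeeping and metrics_write run on the housekeeping
                     :    cadence set by STEM_LAZY. */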
     996             : 
     997             : fd_topo_run_tile_t fd_tile_shred = {
     998             :   .name                     = "shred",
     999             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    1000             :   .populate_allowed_fds     = populate_allowed_fds,
    1001             :   .scratch_align            = scratch_align,
    1002             :   .scratch_footprint        = scratch_footprint,
    1003             :   .privileged_init          = privileged_init,
    1004             :   .unprivileged_init        = unprivileged_init,
    1005             :   .run                      = stem_run,
    1006             : };
