Line data Source code
1 : #include "../tiles.h"
2 :
3 : #include "generated/fd_shred_tile_seccomp.h"
4 : #include "../../util/pod/fd_pod_format.h"
5 : #include "fd_shredder.h"
6 : #include "fd_shred_batch.h"
7 : #include "fd_shred_dest.h"
8 : #include "fd_fec_resolver.h"
9 : #include "fd_stake_ci.h"
10 : #include "fd_rnonce_ss.h"
11 : #include "fd_shred_tile.h"
12 : #include "../store/fd_store.h"
13 : #include "../keyguard/fd_keyload.h"
14 : #include "../keyguard/fd_keyguard.h"
15 : #include "../keyguard/fd_keyguard_client.h"
16 : #include "../keyguard/fd_keyswitch.h"
17 : #include "../fd_disco.h"
18 : #include "../net/fd_net_tile.h"
19 : #include "../../flamenco/leaders/fd_leaders.h"
20 : #include "../../util/net/fd_net_headers.h"
21 : #include "../../flamenco/gossip/fd_gossip_message.h"
22 : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
23 : #include "../../discof/tower/fd_tower_slot_rooted.h"
24 :
25 : /* The shred tile handles shreds from two data sources: shreds generated
26 : from microblocks from the leader pipeline, and shreds retransmitted
27 : from the network.
28 :
29 : They have rather different semantics, but at the end of the day, they
30 : both result in a bunch of shreds and FEC sets that need to be sent to
31 : the blockstore and on the network, which is why one tile handles
32 : both.
33 :
34 : We segment the memory for the two types of shreds into two halves of
35 : a dcache because they follow somewhat different flow control
36 : patterns. For flow control, the normal guarantee we want to provide
37 : is that the dcache entry is not overwritten unless the mcache entry
38 : has also been overwritten. The normal way to do this when using both
39 : cyclically and with a 1-to-1 mapping is to make the dcache at least
40 : `burst` entries bigger than the mcache.
41 :
42 : In this tile, we use one output mcache (of depth d) with one output
43 : dcache (which is logically partitioned into two) for the two sources
44 : of data. The worst case for flow control is when we're only sending
45 : with one of the dcache partitions at a time though, so we can
46 : consider them separately.
47 :
48 : Leader pipeline: Every entry triggers s FEC sets to be created, where
49 : s is in [0, FD_SHRED_BATCH_FEC_SETS_MAX]. Each FEC set corresponds
50 : to 1 dcache entry and 1 mcache entry. This means we can have d FEC
51 : sets exposed while producing FD_SHRED_BATCH_FEC_SETS_MAX more FEC
52 : sets, so the leader pipeline section of the dcache needs at least
53 : d+FD_SHRED_BATCH_FEC_SETS_MAX entries.
54 :
55 : From the network: The FEC resolver doesn't use a cyclic order, but it
56 : does promise that once it returns an FEC set, it will return at least
57 : complete_depth FEC sets before returning it again. This means we
58 : want at most complete_depth-1 FEC sets exposed, so
59 : complete_depth=d+1 FEC sets. The FEC resolver has the
60 : ability to keep individual shreds for partial_depth calls, but
61 : because in this version of the shred tile, we send each shred to all
62 : its destinations as soon as we get it, we don't need that
63 : functionality, so we set partial_depth=1.
64 :
65 : Adding these up and plugging in the current value of
   FD_SHRED_BATCH_FEC_SETS_MAX, we get 2*d+6+fec_resolver_depth FEC sets. The
67 : topology code doesn't allow specifying mcache depth and dcache depth
68 : independently. That means we have to lie about the MTU and burst.
69 : We say the MTU is double what it actually is, and then the burst is
70 : 4+fec_resolver_depth/2. That means we get
71 : 2*d+2*(4+fec_resolver_depth/2) >= 2*d+6+fec_resolver_depth FEC sets.
72 :
73 : A note on parallelization. From the network, shreds are distributed
74 : to tiles based on a validator-specific seeded hash of (slot, FEC set
75 : index) so all the shreds for a given FEC set (and any equivocating
76 : FEC set) are processed by the same tile. From the leader pipeline,
77 : the original implementation used to parallelize by batch of
78 : microblocks (so within a block, batches were distributed to different
79 : tiles). To support chained merkle shreds, the current implementation
80 : processes all the batches on tile 0 -- this should be a temporary
   state while Solana moves to a newer shred format that supports better
82 : parallelization. */
83 :
/* Alignment in bytes of the tile's scratch region; the same value that
   scratch_align() returns. */
#define FD_SHRED_TILE_SCRATCH_ALIGN 128UL

/* Input link kinds.  One of these is recorded per input link in
   ctx->in_kind[ in_idx ], and before_frag / during_frag dispatch on it.
   Kinds tagged Firedancer / Frankendancer are only expected in that
   client's topology. */
#define IN_KIND_CONTACT ( 0UL)
#define IN_KIND_EPOCH   ( 1UL) /* Firedancer */
#define IN_KIND_STAKE   ( 2UL) /* Frankendancer */
#define IN_KIND_POH     ( 3UL)
#define IN_KIND_NET     ( 4UL)
#define IN_KIND_SIGN    ( 5UL)
#define IN_KIND_REPAIR  ( 6UL)
#define IN_KIND_IPECHO  ( 7UL)
#define IN_KIND_GOSSIP  ( 8UL)
#define IN_KIND_ROOTED  ( 9UL) /* Firedancer: slot rooted notifications (fd_tower_slot_rooted_t) */
#define IN_KIND_ROOTEDH (10UL) /* Frankendancer: rooted bank messages (fd_rooted_bank_t) */

/* Output link indices for the net and sign links.  NOTE(review):
   presumably positions in this tile's out link list — confirm against
   the topology setup. */
#define NET_OUT_IDX 1
#define SIGN_OUT_IDX 2
100 :
/* Compile-time layout checks: the PoH entry batch metadata prefix must
   be exactly 56 bytes, and an FEC set must be exactly
   FD_SHRED_STORE_MTU bytes. */
FD_STATIC_ASSERT( sizeof(fd_entry_batch_meta_t)==56UL, poh_shred_mtu );
FD_STATIC_ASSERT( sizeof(fd_fec_set_t)==FD_SHRED_STORE_MTU, shred_store_mtu );

/* Number of shred processing outcomes this tile counts in addition to
   the FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT resolver return values.  The
   sum sizes metrics->shred_processing_result. */
#define FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT 2
105 :
/* Number of entries in the block_ids table.  Each entry is a 32-byte
   block id (merkle root).  This table keeps track of the block ids we
   create while leader, so that we can look one up whenever we need a
   *parent* block id for a new block.  A larger table allows retrieving
   older parent block ids.

   Entries are indexed by slot % BLOCK_IDS_TABLE_CNT.  The entry of the
   slot being produced is written while the parent's entry is read.  To
   support the worst-case parent offset of USHORT_MAX (the max allowed
   in a shred), the table therefore needs USHORT_MAX+1 entries.  With
   only USHORT_MAX entries, a parent exactly USHORT_MAX slots back would
   alias the new slot's own entry, and the parent lookup would memcpy
   an entry onto itself, which is undefined behavior.  USHORT_MAX+1
   entries of 32 bytes is exactly 2 MiB.

   See also the comment on chained_merkle_root. */
#define BLOCK_IDS_TABLE_CNT (USHORT_MAX+1)
115 :
/* See note on parallelization above.  Currently we process all batches
   in tile 0.  The #else branch is the round-robin-by-batch policy of
   the original implementation, kept for when batches can again be
   distributed across shred tiles.  Either expansion reads a variable
   named ctx that must be in scope at the use site. */
#if 1
#define SHOULD_PROCESS_THESE_SHREDS ( ctx->round_robin_id==0 )
#else
#define SHOULD_PROCESS_THESE_SHREDS ( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id )
#endif
122 :
/* The behavior of the shred tile is slightly different for
   Frankendancer vs Firedancer.  For example, Frankendancer produces
   chained merkle shreds, while Firedancer doesn't yet.  We can check
   at runtime the difference by inspecting the topology.  The simplest
   way is to test if ctx->store is initialized.  The macro reads a
   variable named ctx that must be in scope at the use site.

   NOTE(review): the chained-merkle example above may be stale.  The
   leader path in during_frag always uses FD_SHRED_BATCH_WMARK_CHAINED
   and a chained merkle root regardless of client; confirm and update.

   FIXME don't assume only frank vs. fire */
#define IS_FIREDANCER ( ctx->store!=NULL )
131 :
/* Per-input-link state, one per input link (see fd_shred_ctx_t.in,
   indexed by in_idx).  during_frag uses the (mem, chunk0, wmark) view
   to bounds-check chunk indices and translate them to local addresses.
   net_rx overlays the same storage with the net receive-bounds type.
   NOTE(review): presumably used for IN_KIND_NET links — confirm where
   it is initialized. */
typedef union {
  struct {
    fd_wksp_t * mem;
    ulong       chunk0;
    ulong       wmark;
  };
  fd_net_rx_bounds_t net_rx;
} fd_shred_in_ctx_t;
140 :
/* fd_shred_ctx_t is the shred tile's context.  It is the first object
   laid out in the tile's scratch region (see scratch_footprint).  Some
   members are large (e.g. block_ids is BLOCK_IDS_TABLE_CNT 32-byte
   entries), so they live here rather than on the stack. */
typedef struct {
  fd_shredder_t *      shredder;
  fd_fec_resolver_t *  resolver;
  ulong                shred_limit;
  fd_pubkey_t          identity_key[1]; /* Just the public key */

  /* This tile's index among the shred tiles and (presumably) the number
     of shred tiles.  Leader-pipeline batches are only shredded where
     SHOULD_PROCESS_THESE_SHREDS holds. */
  ulong                round_robin_id;
  ulong                round_robin_cnt;
  /* Number of batches shredded from PoH during the current slot.
     This should be the same for all the shred tiles. */
  ulong                batch_cnt;
  /* Slot of the most recent microblock we've seen from PoH,
     or 0 if we haven't seen one yet */
  ulong                slot;

  fd_rnonce_ss_t       repair_nonce_ss[1];

  fd_keyswitch_t *     keyswitch;
  fd_keyguard_client_t keyguard_client[1];

  fd_fec_set_t *       fec_sets;

  fd_stake_ci_t *      stake_ci;
  /* These are used in between during_frag and after_frag */
  fd_shred_dest_weighted_t * new_dest_ptr;     /* staged contact info destinations */
  ulong                      new_dest_cnt;
  ulong                      shredded_txn_cnt;
  ulong                      new_root;         /* slot from the latest ROOTED/ROOTEDH message */

  /* seq+1 of the last PoH frag seen by before_frag; during_housekeeping
     compares it against the keyswitch flush sequence. */
  ulong poh_in_expect_seq;

  ushort net_id;

  /* Set by during_frag when after_frag should drop the current frag. */
  int skip_frag;

  ulong                    adtl_dests_leader_cnt;
  fd_shred_dest_weighted_t adtl_dests_leader    [ FD_TOPO_ADTL_DESTS_MAX ];
  ulong                    adtl_dests_retransmit_cnt;
  fd_shred_dest_weighted_t adtl_dests_retransmit[ FD_TOPO_ADTL_DESTS_MAX ];

  fd_ip4_udp_hdrs_t data_shred_net_hdr  [1];
  fd_ip4_udp_hdrs_t parity_shred_net_hdr[1];

  ulong shredder_fec_set_idx;     /* In [0, shredder_max_fec_set_idx) */
  ulong shredder_max_fec_set_idx; /* exclusive */

  uchar shredder_merkle_root[32];

  /* Indices into fec_sets of the FEC sets produced for the current
     leader batch, and how many there are. */
  ulong send_fec_set_idx[ FD_SHRED_BATCH_FEC_SETS_MAX ];
  ulong send_fec_set_cnt;
  ulong tsorig;  /* timestamp of the last packet in compressed form */

  /* Copy of the current repair frag (at most FD_NET_MTU bytes).
     Includes Ethernet, IP, UDP headers */
  ulong shred_buffer_sz;
  uchar shred_buffer[ FD_NET_MTU ];

  /* resolver_seed gets generated in privileged_init but used in
     unprivileged_init, so we store it here in between. */
  ulong resolver_seed;

  /* Per input link state and kind (one of IN_KIND_*), indexed by in_idx. */
  fd_shred_in_ctx_t in[ 32 ];
  int               in_kind[ 32 ];

  fd_wksp_t * net_out_mem;
  ulong       net_out_chunk0;
  ulong       net_out_wmark;
  ulong       net_out_chunk;

  ulong       store_out_idx;
  fd_wksp_t * store_out_mem;
  ulong       store_out_chunk0;
  ulong       store_out_wmark;
  ulong       store_out_chunk;

  /* This is the output link for shreds that is currently consumed by
     the repair and replay tiles. */
  ulong       shred_out_idx;
  fd_wksp_t * shred_out_mem;
  ulong       shred_out_chunk0;
  ulong       shred_out_wmark;
  ulong       shred_out_chunk;

  /* Non-NULL only in Firedancer (see IS_FIREDANCER). */
  fd_store_t * store;

  /* Copy of the latest gossip update frag. */
  fd_gossip_update_message_t gossip_upd_buf[1];

  /* Locally accumulated metrics, published by metrics_write. */
  struct {
    fd_histf_t contact_info_cnt[ 1 ];
    fd_histf_t batch_sz[ 1 ];
    fd_histf_t batch_microblock_cnt[ 1 ];
    fd_histf_t shredding_timing[ 1 ];
    fd_histf_t add_shred_timing[ 1 ];
    ulong shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ];
    ulong invalid_block_id_cnt;
    ulong shred_rejected_unchained_cnt;
    ulong repair_rcv_cnt;
    ulong repair_rcv_bytes;
    ulong turbine_rcv_cnt;
    ulong turbine_rcv_bytes;
    ulong bad_nonce;
  } metrics[ 1 ];

  /* Microblock batch being accumulated from PoH before it is shredded.
     raw (the whole buffer) is what is handed to the shredder; its first
     8 bytes alias microblock_cnt and the rest is payload. */
  struct {
    ulong txn_cnt;
    ulong pos; /* in payload, range [0, FD_SHRED_BATCH_RAW_BUF_SZ-8UL).
                  NOTE(review): the FD_TEST in during_frag allows
                  pos==sizeof(payload), so the upper bound may actually
                  be inclusive — confirm. */
    ulong slot; /* set to 0 when pos==0 */
    union {
      struct {
        ulong microblock_cnt;
        uchar payload[ FD_SHRED_BATCH_RAW_BUF_SZ - 8UL ];
      };
      uchar raw[ FD_SHRED_BATCH_RAW_BUF_SZ ];
    };
  } pending_batch;

  fd_epoch_schedule_t epoch_schedule[1]; /* slots_per_epoch==0 until received */
  fd_shred_features_activation_t features_activation[1];
  /* too large to be left in the stack */
  fd_shred_dest_idx_t scratchpad_dests[ FD_SHRED_DEST_MAX_FANOUT*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];

  /* Points at block_ids[ slot % BLOCK_IDS_TABLE_CNT ] for the slot
     currently being produced as leader. */
  uchar * chained_merkle_root;
  fd_bmtree_node_t out_merkle_roots[ FD_SHRED_BATCH_FEC_SETS_MAX ];
  /* Block ids of slots we produced, indexed by slot % BLOCK_IDS_TABLE_CNT. */
  uchar block_ids[ BLOCK_IDS_TABLE_CNT ][ FD_SHRED_MERKLE_ROOT_SZ ];

  /* Bank object that we receive from the PoH tile and pass on to
     the store tile for setting the block_id of a slot. */
  void const * leader_bank;
} fd_shred_ctx_t;
269 :
270 : /* shred features are generally considered active at the epoch *following*
271 : the epoch in which the feature gate is activated.
272 :
273 : As an optimization, when the activation slot is received, it is converted
274 : into the first slot of the subsequent epoch. This allows for a more
275 : efficient check (shred_slot >= feature_slot) and avoids the overhead of
276 : repeatedly converting slots into epochs for comparison.
277 :
278 : This function is only for Firedancer, while Frankendancer already receives
279 : the final activation slot from POH tile.
280 :
281 : In Agave, this is done with check_feature_activation():
282 : https://github.com/anza-xyz/agave/blob/v3.1.4/turbine/src/cluster_nodes.rs#L771
283 : https://github.com/anza-xyz/agave/blob/v3.1.4/core/src/shred_fetch_stage.rs#L456 */
284 : static inline ulong
285 0 : fd_shred_get_feature_activation_slot0( ulong feature_slot, fd_shred_ctx_t * ctx ) {
286 : /* if the feature does not have an activation slot yet, return ULONG_MAX */
287 0 : if( FD_UNLIKELY( feature_slot==ULONG_MAX ) ) {
288 0 : return ULONG_MAX;
289 0 : }
290 : /* if we don't have an epoch schedule yet, return ULONG_MAX */
291 0 : if( FD_UNLIKELY( ctx->epoch_schedule->slots_per_epoch==0 ) ) {
292 0 : return ULONG_MAX;
293 0 : }
294 : /* compute the activation epoch, add one, return the first slot. */
295 0 : ulong feature_epoch = 1 + fd_slot_to_epoch( ctx->epoch_schedule, feature_slot, NULL );
296 0 : return fd_epoch_slot0( ctx->epoch_schedule, feature_epoch );
297 0 : }
298 :
299 : FD_FN_CONST static inline ulong
300 0 : scratch_align( void ) {
301 0 : return 128UL;
302 0 : }
303 :
304 : FD_FN_PURE static inline ulong
305 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
306 :
307 0 : ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, tile->shred.depth+1UL,
308 0 : 128UL * tile->shred.fec_resolver_depth );
309 0 : ulong l = FD_LAYOUT_INIT;
310 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_shred_ctx_t), sizeof(fd_shred_ctx_t) );
311 0 : l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
312 0 : l = FD_LAYOUT_APPEND( l, fd_fec_resolver_align(), fec_resolver_footprint );
313 0 : l = FD_LAYOUT_APPEND( l, fd_shredder_align(), fd_shredder_footprint() );
314 0 : return FD_LAYOUT_FINI( l, scratch_align() );
315 0 : }
316 :
317 : static inline void
318 0 : during_housekeeping( fd_shred_ctx_t * ctx ) {
319 0 : if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
320 0 : ulong seq_must_complete = ctx->keyswitch->param;
321 :
322 0 : if( FD_UNLIKELY( fd_seq_lt( ctx->poh_in_expect_seq, seq_must_complete ) ) ) {
323 : /* See fd_keyswitch.h, we need to flush any in-flight shreds from
324 : the leader pipeline before switching key. */
325 0 : FD_LOG_WARNING(( "Flushing in-flight unpublished shreds, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->poh_in_expect_seq ));
326 0 : return;
327 0 : }
328 :
329 0 : memcpy( ctx->identity_key->uc, ctx->keyswitch->bytes, 32UL );
330 0 : fd_stake_ci_set_identity( ctx->stake_ci, ctx->identity_key );
331 0 : fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
332 0 : }
333 0 : }
334 :
335 : static inline void
336 0 : metrics_write( fd_shred_ctx_t * ctx ) {
337 0 : FD_MHIST_COPY( SHRED, CONTACT_INFO_PER_MESSAGE, ctx->metrics->contact_info_cnt );
338 0 : FD_MHIST_COPY( SHRED, BATCH_SIZE_BYTES, ctx->metrics->batch_sz );
339 0 : FD_MHIST_COPY( SHRED, MICROBLOCK_PER_BATCH, ctx->metrics->batch_microblock_cnt );
340 0 : FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing );
341 0 : FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing );
342 0 : FD_MCNT_SET ( SHRED, SHRED_REPAIR_RX, ctx->metrics->repair_rcv_cnt );
343 0 : FD_MCNT_SET ( SHRED, SHRED_REPAIR_RX_BYTES, ctx->metrics->repair_rcv_bytes );
344 0 : FD_MCNT_SET ( SHRED, SHRED_TURBINE_RX, ctx->metrics->turbine_rcv_cnt );
345 0 : FD_MCNT_SET ( SHRED, SHRED_TURBINE_RX_BYTES, ctx->metrics->turbine_rcv_bytes );
346 0 : FD_MCNT_SET ( SHRED, NONCE_INVALID, ctx->metrics->bad_nonce );
347 :
348 0 : FD_MCNT_SET ( SHRED, BLOCK_ID_INVALID, ctx->metrics->invalid_block_id_cnt );
349 0 : FD_MCNT_SET ( SHRED, SHRED_UNCHAINED_REJECTED, ctx->metrics->shred_rejected_unchained_cnt );
350 :
351 0 : FD_MCNT_ENUM_COPY( SHRED, SHRED_PROCESSED, ctx->metrics->shred_processing_result );
352 0 : }
353 :
354 : static inline void
355 : handle_new_cluster_contact_info( fd_shred_ctx_t * ctx,
356 0 : uchar const * buf ) {
357 0 : ulong const * header = (ulong const *)fd_type_pun_const( buf );
358 :
359 0 : ulong dest_cnt = header[ 0 ];
360 0 : fd_histf_sample( ctx->metrics->contact_info_cnt, dest_cnt );
361 :
362 0 : if( dest_cnt >= MAX_SHRED_DESTS )
363 0 : FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS ));
364 :
365 0 : fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
366 0 : fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci );
367 :
368 0 : ctx->new_dest_ptr = dests;
369 0 : ctx->new_dest_cnt = dest_cnt;
370 :
371 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
372 0 : memcpy( dests[i].pubkey.uc, in_dests[i].pubkey, 32UL );
373 0 : dests[i].ip4 = in_dests[i].ip4_addr;
374 0 : dests[i].port = in_dests[i].udp_port;
375 0 : }
376 0 : }
377 :
378 : static inline void
379 0 : finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
380 0 : fd_stake_ci_dest_add_fini( ctx->stake_ci, ctx->new_dest_cnt );
381 0 : }
382 :
383 : static inline int
384 : before_frag( fd_shred_ctx_t * ctx,
385 : ulong in_idx,
386 : ulong seq,
387 0 : ulong sig ) {
388 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_IPECHO ) ) {
389 0 : FD_TEST( sig!=0UL && sig<=USHORT_MAX );
390 0 : fd_shredder_set_shred_version ( ctx->shredder, (ushort)sig );
391 0 : fd_fec_resolver_set_shred_version( ctx->resolver, (ushort)sig );
392 0 : return 1;
393 0 : }
394 :
395 0 : if( FD_UNLIKELY( !ctx->shredder->shred_version ) ) return -1;
396 :
397 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
398 0 : ctx->poh_in_expect_seq = seq+1UL;
399 0 : return (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK) &
400 0 : (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_FEAT_ACT_SLOT) &
401 0 : (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_LEADER_BANK);
402 0 : }
403 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
404 0 : return (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
405 0 : }
406 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ){
407 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
408 0 : sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
409 0 : }
410 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTEDH ) ) {
411 0 : return sig!=0UL; /* only care about rooted banks, not completed blockhash */
412 0 : }
413 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTED ) ) {
414 0 : return sig!=FD_TOWER_SIG_SLOT_ROOTED; /* only care about slot_confirmed messages */
415 0 : }
416 0 : return 0;
417 0 : }
418 :
419 : static void
420 : during_frag( fd_shred_ctx_t * ctx,
421 : ulong in_idx,
422 : ulong seq FD_PARAM_UNUSED,
423 : ulong sig,
424 : ulong chunk,
425 : ulong sz,
426 0 : ulong ctl ) {
427 :
428 0 : ctx->skip_frag = 0;
429 :
430 0 : ctx->tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
431 :
432 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
433 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_NET_MTU ) )
434 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
435 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
436 :
437 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
438 0 : fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
439 0 : return;
440 0 : }
441 :
442 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
443 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
444 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
445 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
446 :
447 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
448 0 : handle_new_cluster_contact_info( ctx, dcache_entry );
449 0 : return;
450 0 : }
451 :
452 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
453 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>sizeof(fd_gossip_update_message_t) ) )
454 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
455 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
456 0 : uchar const * gossip_upd_msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
457 0 : fd_memcpy( ctx->gossip_upd_buf, gossip_upd_msg, sz );
458 0 : return;
459 0 : }
460 :
461 : /* Firedancer only */
462 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTED ) ) {
463 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz<sizeof(fd_tower_slot_rooted_t) ) )
464 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
465 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
466 0 : fd_tower_slot_rooted_t const * rooted_msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
467 0 : ctx->new_root = rooted_msg->slot;
468 0 : return;
469 0 : }
470 :
471 : /* Frankendancer only */
472 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_ROOTEDH ) ) {
473 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz<sizeof(fd_rooted_bank_t) ) )
474 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
475 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
476 : /* The message format is a pointer to the bank (which is in the
477 : agave address space, so we couldn't access it even if we wanted
478 : to) followed by the rooted slot. */
479 0 : ulong const * replay_msg = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
480 0 : ctx->new_root = replay_msg[ 1 ];
481 0 : return;
482 0 : }
483 :
484 : /* Firedancer only */
485 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EPOCH ) ) {
486 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
487 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
488 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
489 :
490 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
491 0 : fd_epoch_info_msg_t const * epoch_msg = fd_type_pun_const( dcache_entry );
492 :
493 0 : FD_TEST( epoch_msg->staked_vote_cnt<=MAX_COMPRESSED_STAKE_WEIGHTS );
494 0 : FD_TEST( epoch_msg->staked_id_cnt<=MAX_SHRED_DESTS );
495 :
496 0 : fd_stake_ci_epoch_msg_init( ctx->stake_ci, epoch_msg );
497 :
498 0 : *ctx->epoch_schedule = epoch_msg->epoch_schedule;
499 0 : ctx->features_activation->enforce_fixed_fec_set = fd_shred_get_feature_activation_slot0(
500 0 : epoch_msg->features.enforce_fixed_fec_set, ctx );
501 0 : ctx->features_activation->discard_unexpected_data_complete_shreds = fd_shred_get_feature_activation_slot0(
502 0 : epoch_msg->features.discard_unexpected_data_complete_shreds, ctx );
503 :
504 0 : fd_fec_resolver_set_discard_unexpected_data_complete_shreds( ctx->resolver,
505 0 : ctx->features_activation->discard_unexpected_data_complete_shreds );
506 :
507 0 : return;
508 0 : }
509 :
510 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
511 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
512 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
513 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
514 :
515 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
516 0 : fd_stake_ci_stake_msg_init( ctx->stake_ci, fd_type_pun_const( dcache_entry ) );
517 0 : return;
518 0 : }
519 :
520 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
521 0 : ctx->send_fec_set_cnt = 0UL;
522 :
523 0 : if( FD_UNLIKELY( fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_LEADER_BANK ) ) {
524 : /* Only one tile needs to act on this. Other tiles see the frag but
525 : ignore it. */
526 0 : if( ctx->round_robin_id!=0UL ) return;
527 : /* poh is handing off a leader bank pointer for a slot we may
528 : be leader for. Copy the pointer out of the dcache for
529 : after_frag to attach to the FEC sets for this slot. */
530 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=sizeof(void const *) ) )
531 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
532 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
533 :
534 0 : void const * const * src = (void const * const *)fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
535 0 : ctx->leader_bank = *src;
536 0 : return;
537 0 : }
538 :
539 0 : if( FD_UNLIKELY( (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_FEAT_ACT_SLOT) ) ) {
540 : /* There is a subset of FD_SHRED_FEATURES_ACTIVATION_... slots that
541 : the shred tile needs to be aware of. Since this requires the
542 : bank, we are forced (so far) to receive them from the poh tile
543 : (as a POH_PKT_TYPE_FEAT_ACT_SLOT). */
544 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
545 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=(sizeof(fd_shred_features_activation_t)) ) )
546 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
547 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
548 :
549 0 : fd_shred_features_activation_t const * act_data = (fd_shred_features_activation_t const *)dcache_entry;
550 0 : memcpy( ctx->features_activation, act_data, sizeof(fd_shred_features_activation_t) );
551 :
552 0 : fd_fec_resolver_set_discard_unexpected_data_complete_shreds( ctx->resolver,
553 0 : ctx->features_activation->discard_unexpected_data_complete_shreds );
554 0 : }
555 0 : else { /* (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK) */
556 : /* This is a frag from the PoH tile. We'll copy it to our pending
557 : microblock batch and shred it if necessary (last in block or
558 : above watermark). We just go ahead and shred it here, even
559 : though we may get overrun. If we do end up getting overrun, we
560 : just won't send these shreds out and we'll reuse the FEC set for
561 : the next one. From a higher level though, if we do get overrun,
562 : a bunch of shreds will never be transmitted, and we'll end up
563 : producing a block that never lands on chain. */
564 :
565 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
566 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_POH_SHRED_MTU ||
567 0 : sz<(sizeof(fd_entry_batch_meta_t)+sizeof(fd_entry_batch_header_t)) ) )
568 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
569 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
570 :
571 0 : fd_entry_batch_meta_t const * entry_meta = (fd_entry_batch_meta_t const *)dcache_entry;
572 0 : uchar const * entry = dcache_entry + sizeof(fd_entry_batch_meta_t);
573 0 : ulong entry_sz = sz - sizeof(fd_entry_batch_meta_t);
574 :
575 0 : fd_entry_batch_header_t const * microblock = (fd_entry_batch_header_t const *)entry;
576 :
577 : /* It should never be possible for this to fail, but we check it
578 : anyway. */
579 0 : FD_TEST( entry_sz + ctx->pending_batch.pos <= sizeof(ctx->pending_batch.payload) );
580 :
581 0 : ulong target_slot = fd_disco_poh_sig_slot( sig );
582 0 : if( FD_UNLIKELY( (ctx->pending_batch.microblock_cnt>0) & (ctx->pending_batch.slot!=target_slot) ) ) {
583 : /* TODO: The Agave client sends a dummy entry batch with only 1
584 : byte and the block-complete bit set. This helps other
585 : validators know that the block is dead and they should not try
586 : to continue building a fork on it. We probably want a similar
587 : approach eventually. */
588 0 : FD_LOG_WARNING(( "Abandoning %lu microblocks for slot %lu and switching to slot %lu",
589 0 : ctx->pending_batch.microblock_cnt, ctx->pending_batch.slot, target_slot ));
590 0 : ctx->pending_batch.slot = 0UL;
591 0 : ctx->pending_batch.pos = 0UL;
592 0 : ctx->pending_batch.microblock_cnt = 0UL;
593 0 : ctx->pending_batch.txn_cnt = 0UL;
594 0 : ctx->batch_cnt = 0UL;
595 :
596 0 : FD_MCNT_INC( SHRED, MICROBLOCK_ABANDONED, 1UL );
597 0 : }
598 :
599 0 : ctx->pending_batch.slot = target_slot;
600 : /* We want to send out some shreds immediately when we start a new
601 : slot to help with leader targeting. */
602 0 : int new_slot = 0;
603 0 : if( FD_UNLIKELY( target_slot!=ctx->slot )) {
604 : /* Reset batch count if we are in a new slot */
605 0 : ctx->batch_cnt = 0UL;
606 0 : ctx->slot = target_slot;
607 0 : new_slot = 1;
608 :
609 : /* At the beginning of a new slot, prepare chained_merkle_root.
610 : chained_merkle_root is initialized at the block_id of the parent
611 : block, there's two cases:
612 :
613 : 1. block_id is passed in by the poh tile:
614 : - it's always passed when parent block had a different leader
615 : - it may be passed when we were leader for parent block (there
616 : are race conditions when it's not passed)
617 :
618 : 2. block_id is taken from block_ids table if we were the leader
619 : for the parent block (when we were NOT the leader, because of
620 : equivocation, we can't store block_id in the table)
621 :
622 : chained_merkle_root is stored in block_ids table at target_slot
623 : and it's progressively updated as more microblocks are received.
624 : As a result, when we move to a new slot, the block_ids table at
625 : the old slot will contain the block_id.
626 :
627 : The block_ids table is designed to protect against the race condition
628 : case in 1., therefore the table may not be set in some cases, e.g. if
629 : a validator (re)starts, but in those cases we don't expect the race
630 : condition to apply. */
631 0 : ctx->chained_merkle_root = ctx->block_ids[ target_slot % BLOCK_IDS_TABLE_CNT ];
632 0 : if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
633 0 : if( FD_LIKELY( entry_meta->parent_block_id_valid ) ) {
634 : /* 1. Initialize chained_merkle_root sent from poh tile */
635 0 : memcpy( ctx->chained_merkle_root, entry_meta->parent_block_id, FD_SHRED_MERKLE_ROOT_SZ );
636 0 : } else {
637 0 : ulong parent_slot = target_slot - entry_meta->parent_offset;
638 0 : fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, parent_slot );
639 0 : fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, parent_slot );
640 :
641 0 : if( lsched && slot_leader && fd_memeq( slot_leader, ctx->identity_key, sizeof(fd_pubkey_t) ) ) {
642 : /* 2. Initialize chained_merkle_root from block_ids table, if we were the leader */
643 0 : memcpy( ctx->chained_merkle_root, ctx->block_ids[ parent_slot % BLOCK_IDS_TABLE_CNT ], FD_SHRED_MERKLE_ROOT_SZ );
644 0 : } else {
645 : /* This should never happen, log a metric and set chained_merkle_root to 0 */
646 0 : ctx->metrics->invalid_block_id_cnt++;
647 0 : memset( ctx->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ );
648 0 : }
649 0 : }
650 0 : }
651 0 : }
652 :
653 0 : if( FD_LIKELY( !SHOULD_PROCESS_THESE_SHREDS ) ) {
654 : /* If we are not processing this batch, filter in after_frag. */
655 0 : ctx->skip_frag = 1;
656 0 : }
657 :
658 0 : ulong pending_batch_wmark = FD_SHRED_BATCH_WMARK_CHAINED;
659 0 : uchar * chained_merkle_root = ctx->chained_merkle_root;
660 0 : ulong load_for_32_shreds = FD_SHREDDER_CHAINED_FEC_SET_PAYLOAD_SZ;
661 : /* All fec sets in the last batch of a block need to be resigned.
662 : This needs to match Agave's behavior - as a reference, see:
663 : https://github.com/anza-xyz/agave/blob/v2.3/ledger/src/shred/merkle.rs#L1040 */
664 0 : if( FD_UNLIKELY( entry_meta->block_complete ) ) {
665 0 : pending_batch_wmark = FD_SHRED_BATCH_WMARK_RESIGNED;
666 : /* chained_merkle_root also applies to resigned FEC sets. */
667 0 : load_for_32_shreds = FD_SHREDDER_RESIGNED_FEC_SET_PAYLOAD_SZ;
668 0 : }
669 :
670 : /* If this microblock completes the block, the batch is then
671 : finalized here. Otherwise, we check whether the new entry
672 : would exceed the pending_batch_wmark. If true, then the
673 : batch is closed now, shredded, and a new batch is started
674 : with the incoming microblock. If false, no shredding takes
675 : place, and the microblock is added to the current batch. */
676 0 : int forced_end_batch = entry_meta->block_complete | new_slot;
677 0 : int batch_would_exceed_wmark = ( ctx->pending_batch.pos + entry_sz ) > pending_batch_wmark;
678 0 : int include_in_current_batch = forced_end_batch | ( !batch_would_exceed_wmark );
679 0 : int process_current_batch = forced_end_batch | batch_would_exceed_wmark;
680 0 : int init_new_batch = !include_in_current_batch;
681 :
682 0 : if( FD_LIKELY( include_in_current_batch ) ) {
683 0 : if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
684 : /* Ugh, yet another memcpy */
685 0 : fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
686 0 : }
687 0 : ctx->pending_batch.pos += entry_sz;
688 0 : ctx->pending_batch.microblock_cnt += 1UL;
689 0 : ctx->pending_batch.txn_cnt += microblock->txn_cnt;
690 0 : }
691 :
692 0 : if( FD_LIKELY( process_current_batch )) {
693 : /* Batch and padding size calculation. */
694 0 : ulong batch_sz = sizeof(ulong) + ctx->pending_batch.pos; /* without padding */
695 0 : ulong batch_sz_padded = load_for_32_shreds * ( ( batch_sz + load_for_32_shreds - 1UL ) / load_for_32_shreds );
696 0 : ulong padding_sz = batch_sz_padded - batch_sz;
697 :
698 0 : if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
699 : /* If it's our turn, shred this batch. FD_UNLIKELY because shred
700 : tile cnt generally >= 2 */
701 :
702 0 : long shredding_timing = -fd_tickcount();
703 :
704 0 : fd_memset( ctx->pending_batch.payload + ctx->pending_batch.pos, 0, padding_sz );
705 :
706 0 : ctx->send_fec_set_cnt = 0UL; /* verbose */
707 0 : ctx->shredded_txn_cnt = ctx->pending_batch.txn_cnt;
708 :
709 0 : fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, batch_sz_padded, target_slot, entry_meta );
710 :
711 0 : ulong pend_sz = batch_sz_padded;
712 0 : ulong pend_idx = 0;
713 0 : while( pend_sz > 0UL ) {
714 :
715 0 : fd_fec_set_t * out = ctx->fec_sets + ctx->shredder_fec_set_idx;
716 :
717 0 : FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out, chained_merkle_root ) );
718 0 : memcpy( ctx->out_merkle_roots[pend_idx].hash, chained_merkle_root, 32UL );
719 :
720 0 : out->data_shred_rcvd = 0U;
721 0 : out->parity_shred_rcvd = 0U;
722 :
723 0 : ctx->send_fec_set_idx[ ctx->send_fec_set_cnt ] = ctx->shredder_fec_set_idx;
724 0 : ctx->send_fec_set_cnt += 1UL;
725 0 : ctx->shredder_fec_set_idx = (ctx->shredder_fec_set_idx+1UL)%ctx->shredder_max_fec_set_idx;
726 :
727 0 : pend_sz -= load_for_32_shreds;
728 0 : pend_idx++;
729 0 : }
730 :
731 0 : fd_shredder_fini_batch( ctx->shredder );
732 0 : shredding_timing += fd_tickcount();
733 :
734 : /* Update metrics */
735 0 : fd_histf_sample( ctx->metrics->batch_sz, batch_sz /* without padding */ );
736 0 : fd_histf_sample( ctx->metrics->batch_microblock_cnt, ctx->pending_batch.microblock_cnt );
737 0 : fd_histf_sample( ctx->metrics->shredding_timing, (ulong)shredding_timing );
738 0 : } else {
739 0 : ctx->send_fec_set_cnt = 0UL; /* verbose */
740 :
741 0 : fd_shredder_skip_batch( ctx->shredder, batch_sz_padded, target_slot, entry_meta->block_complete );
742 0 : }
743 :
744 0 : ctx->pending_batch.slot = 0UL;
745 0 : ctx->pending_batch.pos = 0UL;
746 0 : ctx->pending_batch.microblock_cnt = 0UL;
747 0 : ctx->pending_batch.txn_cnt = 0UL;
748 0 : ctx->batch_cnt++;
749 0 : }
750 :
751 0 : if( FD_UNLIKELY( init_new_batch ) ) {
752 : /* TODO: this assumes that SHOULD_PROCESS_THESE_SHREDS is
753 : constant across batches. Otherwise, the condition may
754 : need to be removed (or adjusted). */
755 0 : if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
756 : /* Ugh, yet another memcpy */
757 0 : fd_memcpy( ctx->pending_batch.payload + 0UL /* verbose */, entry, entry_sz );
758 0 : }
759 0 : ctx->pending_batch.slot = target_slot;
760 0 : ctx->pending_batch.pos = entry_sz;
761 0 : ctx->pending_batch.microblock_cnt = 1UL;
762 0 : ctx->pending_batch.txn_cnt = microblock->txn_cnt;
763 0 : }
764 0 : }
765 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
766 : /* The common case, from the net tile. The FEC resolver API does
767 : not present a prepare/commit model. If we get overrun between
768 : when the FEC resolver verifies the signature and when it stores
769 : the local copy, we could end up storing and retransmitting
770 : garbage. Instead we copy it locally, sadly, and only give it to
771 : the FEC resolver when we know it won't be overrun anymore. */
772 0 : uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in[ in_idx ].net_rx, chunk, ctl, sz );
773 0 : ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
774 0 : FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
775 0 : fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz, ctx->shred_limit );
776 0 : if( FD_UNLIKELY( !shred ) ) {
777 0 : ctx->skip_frag = 1;
778 0 : return;
779 0 : };
780 :
781 0 : if( FD_UNLIKELY( fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR ) ) {
782 0 : ctx->metrics->repair_rcv_cnt++;
783 0 : ctx->metrics->repair_rcv_bytes += sz;
784 0 : } else {
785 0 : ctx->metrics->turbine_rcv_cnt++;
786 0 : ctx->metrics->turbine_rcv_bytes += sz;
787 0 : }
788 :
789 : /* Drop unchained merkle shreds */
790 0 : int is_unchained = !fd_shred_is_chained( fd_shred_type( shred->variant ) );
791 0 : if( FD_UNLIKELY( is_unchained ) ) {
792 0 : ctx->metrics->shred_rejected_unchained_cnt++;
793 0 : ctx->skip_frag = 1;
794 0 : return;
795 0 : };
796 :
797 : /* all shreds in the same FEC set will have the same signature
798 : so we can round-robin shreds between the shred tiles based on
799 : just the signature without splitting individual FEC sets. */
800 0 : ulong sig = fd_ulong_load_8( shred->signature );
801 0 : if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
802 0 : ctx->skip_frag = 1;
803 0 : return;
804 0 : }
805 0 : fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
806 0 : ctx->shred_buffer_sz = sz-hdr_sz;
807 0 : }
808 0 : }
809 :
810 : static inline void
811 : send_shred( fd_shred_ctx_t * ctx,
812 : fd_stem_context_t * stem,
813 : fd_shred_t const * shred,
814 : fd_shred_dest_weighted_t const * dest,
815 0 : ulong tsorig ) {
816 :
817 0 : if( FD_UNLIKELY( !dest->ip4 ) ) return;
818 :
819 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
820 :
821 0 : int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
822 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
823 0 : *hdr = *( is_data ? ctx->data_shred_net_hdr : ctx->parity_shred_net_hdr );
824 :
825 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
826 0 : ip4->daddr = dest->ip4;
827 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
828 0 : ip4->check = 0U;
829 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
830 :
831 0 : hdr->udp->net_dport = fd_ushort_bswap( dest->port );
832 :
833 0 : ulong shred_sz = fd_ulong_if( is_data, FD_SHRED_MIN_SZ, FD_SHRED_MAX_SZ );
834 0 : #if FD_HAS_AVX
835 : /* We're going to copy this shred potentially a bunch of times without
836 : reading it again, and we'd rather not thrash our cache, so we want
837 : to use non-temporal writes here. We need to make sure we don't
838 : touch the cache line containing the network headers that we just
839 : wrote to though. We know the destination is 64 byte aligned. */
840 0 : FD_STATIC_ASSERT( sizeof(*hdr)<64UL, non_temporal );
841 : /* src[0:sizeof(hdrs)] is invalid, but now we want to copy
842 : dest[i]=src[i] for i>=sizeof(hdrs), so it simplifies the code. */
843 0 : uchar const * src = (uchar const *)((ulong)shred - sizeof(fd_ip4_udp_hdrs_t));
844 0 : memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), src+sizeof(fd_ip4_udp_hdrs_t), 64UL-sizeof(fd_ip4_udp_hdrs_t) );
845 :
846 0 : ulong end_offset = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
847 0 : ulong i;
848 0 : for( i=64UL; end_offset-i<64UL; i+=64UL ) {
849 0 : # if FD_HAS_AVX512
850 0 : _mm512_stream_si512( (void *)(packet+i ), _mm512_loadu_si512( (void const *)(src+i ) ) );
851 : # else
852 0 : _mm256_stream_si256( (void *)(packet+i ), _mm256_loadu_si256( (void const *)(src+i ) ) );
853 0 : _mm256_stream_si256( (void *)(packet+i+32UL), _mm256_loadu_si256( (void const *)(src+i+32UL) ) );
854 0 : # endif
855 0 : }
856 0 : _mm_sfence();
857 0 : fd_memcpy( packet+i, src+i, end_offset-i ); /* Copy the last partial cache line */
858 :
859 : #else
860 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), shred, shred_sz );
861 : #endif
862 :
863 0 : ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
864 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
865 0 : ulong sig = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
866 0 : ulong const chunk = ctx->net_out_chunk;
867 0 : fd_stem_publish( stem, NET_OUT_IDX, sig, chunk, pkt_sz, 0UL, tsorig, tspub );
868 0 : ctx->net_out_chunk = fd_dcache_compact_next( chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
869 0 : }
870 :
871 : static void
872 : after_frag( fd_shred_ctx_t * ctx,
873 : ulong in_idx,
874 : ulong seq,
875 : ulong sig,
876 : ulong sz,
877 : ulong tsorig,
878 : ulong _tspub,
879 0 : fd_stem_context_t * stem ) {
880 0 : (void)seq;
881 0 : (void)sz;
882 0 : (void)tsorig;
883 0 : (void)_tspub;
884 :
885 0 : if( FD_UNLIKELY( ctx->skip_frag ) ) return;
886 :
887 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
888 0 : finalize_new_cluster_contact_info( ctx );
889 0 : return;
890 0 : }
891 :
892 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_EPOCH ) ) {
893 0 : fd_stake_ci_epoch_msg_fini( ctx->stake_ci );
894 0 : return;
895 0 : }
896 :
897 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
898 0 : fd_stake_ci_stake_msg_fini( ctx->stake_ci );
899 0 : return;
900 0 : }
901 :
902 0 : if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_ROOTED) | (ctx->in_kind[ in_idx ]==IN_KIND_ROOTEDH) ) ) {
903 0 : if( FD_LIKELY( (ctx->new_root > 0UL) & (ctx->new_root<ULONG_MAX) ) ) fd_fec_resolver_advance_slot_old( ctx->resolver, ctx->new_root );
904 0 : return;
905 0 : }
906 :
907 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_GOSSIP ) ) {
908 0 : if( ctx->gossip_upd_buf->tag==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) {
909 0 : fd_gossip_contact_info_t const * ci = ctx->gossip_upd_buf->contact_info->value;
910 0 : fd_ip4_port_t tvu_addr;
911 0 : tvu_addr.addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_TVU ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_TVU ].ip4;
912 0 : tvu_addr.port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_TVU ].port;
913 0 : if( !tvu_addr.l ){
914 0 : fd_stake_ci_dest_remove( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ) );
915 0 : } else {
916 0 : fd_stake_ci_dest_update( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ), tvu_addr.addr, fd_ushort_bswap( tvu_addr.port ) );
917 0 : }
918 0 : } else if( ctx->gossip_upd_buf->tag==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE ) {
919 0 : if( FD_UNLIKELY( !memcmp( ctx->identity_key->uc, ctx->gossip_upd_buf->origin, 32UL ) ) ) {
920 : /* If our own contact info was dropped, we update with dummy IP
921 : instead of removing since stake_ci expects our contact info
922 : in the sdests table all the time. fd_stake_ci_new initializes
923 : both ei->sdests with our contact info so this should always
924 : update (and not append). */
925 0 : fd_stake_ci_dest_update( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ), 1U, 0U );
926 0 : } else {
927 0 : fd_stake_ci_dest_remove( ctx->stake_ci, fd_type_pun_const( ctx->gossip_upd_buf->origin ) );
928 0 : }
929 0 : }
930 0 : return;
931 0 : }
932 :
933 0 : if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_POH) & (ctx->send_fec_set_cnt==0UL) ) ) {
934 : /* Entry from PoH that didn't trigger a new FEC set to be made */
935 0 : return;
936 0 : }
937 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
938 0 : return;
939 0 : }
940 :
941 0 : ulong fanout = 200UL; /* Default Agave's DATA_PLANE_FANOUT = 200UL */
942 :
943 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
944 0 : uchar * shred_buffer = ctx->shred_buffer;
945 0 : ulong shred_buffer_sz = ctx->shred_buffer_sz;
946 :
947 0 : fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz, ctx->shred_limit );
948 :
949 0 : if( FD_UNLIKELY( !shred ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }
950 :
951 0 : fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
952 0 : if( FD_UNLIKELY( !lsched ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; }
953 :
954 0 : fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, shred->slot );
955 0 : if( FD_UNLIKELY( !slot_leader ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; } /* Count this as bad slot too */
956 :
957 0 : fd_fec_set_t const * out_fec_set[1];
958 0 : fd_shred_t const * out_shred[1];
959 0 : fd_fec_resolver_spilled_t spilled_fec = { 0 };
960 0 : int from_repair = 0;
961 :
962 0 : uint nonce = UINT_MAX;
963 0 : ulong shred_sz = fd_shred_sz( shred );
964 0 : if( FD_UNLIKELY( (fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR) & (shred_buffer_sz>=shred_sz+sizeof(uint)) ) ) {
965 0 : nonce = FD_LOAD(uint, shred_buffer + shred_sz );
966 0 : long est_now_ns = fd_log_wallclock(); /* TODO: switch to fd_clock for performance */
967 0 : int slot_complete = fd_shred_is_data( fd_shred_type( shred->variant ) ) && (shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
968 0 : int nonce_okay = fd_rnonce_ss_verify( ctx->repair_nonce_ss, nonce, shred->slot, shred->idx, slot_complete, est_now_ns );
969 0 : ctx->metrics->bad_nonce += (ulong)(!nonce_okay);
970 0 : from_repair = nonce_okay;
971 0 : }
972 :
973 0 : long add_shred_timing = -fd_tickcount();
974 0 : int rv = fd_fec_resolver_add_shred( ctx->resolver, shred, shred_buffer_sz, from_repair, slot_leader->uc, out_fec_set, out_shred, &ctx->out_merkle_roots[0], &spilled_fec );
975 0 : add_shred_timing += fd_tickcount();
976 :
977 0 : fd_histf_sample( ctx->metrics->add_shred_timing, (ulong)add_shred_timing );
978 0 : ctx->metrics->shred_processing_result[ rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]++;
979 :
980 0 : if( FD_UNLIKELY( ctx->shred_out_idx!=ULONG_MAX && /* Only send to repair in full Firedancer */
981 0 : spilled_fec.slot!=0 ) ) {
982 : /* We've spilled an in-progress FEC set in the fec_resolver. We
983 : need to let repair know to clear out it's cached info for that
984 : fec set and re-repair those shreds. */
985 0 : fd_fec_evicted_t * evicted_msg = (fd_fec_evicted_t *)fd_type_pun( fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk ) );
986 0 : evicted_msg->slot = spilled_fec.slot;
987 0 : evicted_msg->fec_set_idx = spilled_fec.fec_set_idx;
988 :
989 0 : fd_stem_publish( stem, ctx->shred_out_idx, SHRED_SIG_FEC_EVICTED, ctx->shred_out_chunk, sizeof(fd_fec_evicted_t), 0, ctx->tsorig, fd_frag_meta_ts_comp( fd_tickcount() ) );
990 0 : ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_fec_evicted_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
991 0 : }
992 :
993 0 : if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX /* Only send to repair/replay in full Firedancer */
994 0 : && ( ( rv==FD_FEC_RESOLVER_SHRED_OKAY )
995 0 : | ( rv==FD_FEC_RESOLVER_SHRED_COMPLETES )
996 0 : | ( rv==FD_FEC_RESOLVER_SHRED_DUPLICATE )
997 0 : | ( rv==FD_FEC_RESOLVER_SHRED_EQUIVOC ) ) ) ) {
998 :
999 : /* Construct the sig from fec_resolver result and shred source. */
1000 :
1001 0 : ulong _sig = fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR
1002 0 : ? ( from_repair /*nonce_okay*/ ? SHRED_SIG_SRC_REPAIR : SHRED_SIG_SRC_BAD_REPAIR )
1003 0 : : ( SHRED_SIG_SRC_TURBINE );
1004 0 : _sig = ((ulong)rv << 32UL) | _sig;
1005 :
1006 : /* Copy the full shred into the frag and publish. */
1007 :
1008 0 : fd_shred_base_t * shred_msg = (fd_shred_base_t *)fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk );
1009 0 : memcpy( shred_msg->shred_, shred, fd_shred_sz( shred ) );
1010 0 : memcpy( &shred_msg->merkle_root, ctx->out_merkle_roots[0].hash, sizeof(fd_hash_t) );
1011 0 : if( FD_UNLIKELY( fd_disco_netmux_sig_proto( sig )==DST_PROTO_REPAIR ) ) { shred_msg->rnonce = nonce; }
1012 :
1013 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
1014 0 : fd_stem_publish( stem, ctx->shred_out_idx, _sig, ctx->shred_out_chunk, sizeof(fd_shred_base_t), 0UL, ctx->tsorig, tspub );
1015 0 : ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_shred_base_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
1016 0 : }
1017 :
1018 0 : if( FD_LIKELY( fd_disco_netmux_sig_proto( sig ) != DST_PROTO_REPAIR &&
1019 0 : ( (rv==FD_FEC_RESOLVER_SHRED_OKAY) | (rv==FD_FEC_RESOLVER_SHRED_COMPLETES) ) ) ) {
1020 : /* Relay this shred */
1021 0 : ulong max_dest_cnt[1];
1022 0 : do {
1023 : /* If we've validated the shred and it COMPLETES but we can't
1024 : compute the destination for whatever reason, don't forward
1025 : the shred, but still send it to the blockstore. */
1026 0 : fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, shred->slot );
1027 0 : if( FD_UNLIKELY( !sdest ) ) break;
1028 0 : fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, ctx->scratchpad_dests, 1UL, fanout, fanout, max_dest_cnt );
1029 0 : if( FD_UNLIKELY( !dests ) ) break;
1030 :
1031 0 : for( ulong i=0UL; i<ctx->adtl_dests_retransmit_cnt; i++ ) send_shred( ctx, stem, *out_shred, ctx->adtl_dests_retransmit+i, ctx->tsorig );
1032 0 : for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ] ), ctx->tsorig );
1033 0 : } while( 0 );
1034 0 : }
1035 :
1036 0 : if( FD_LIKELY( rv!=FD_FEC_RESOLVER_SHRED_COMPLETES ) ) return;
1037 :
1038 0 : FD_TEST( ctx->fec_sets <= *out_fec_set );
1039 0 : ctx->send_fec_set_idx[ 0UL ] = (ulong)(*out_fec_set - ctx->fec_sets);
1040 0 : ctx->send_fec_set_cnt = 1UL;
1041 0 : ctx->shredded_txn_cnt = 0UL;
1042 0 : }
1043 :
1044 0 : if( FD_UNLIKELY( ctx->send_fec_set_cnt==0UL ) ) return;
1045 :
1046 : /* Try to distribute shredded txn count across the fec sets.
1047 : This is an approximation, but it is acceptable. */
1048 0 : ulong shredded_txn_cnt_per_fec_set = ctx->shredded_txn_cnt / ctx->send_fec_set_cnt;
1049 0 : ulong shredded_txn_cnt_remain = ctx->shredded_txn_cnt - shredded_txn_cnt_per_fec_set * ctx->send_fec_set_cnt;
1050 0 : ulong shredded_txn_cnt_last_fec_set = shredded_txn_cnt_per_fec_set + shredded_txn_cnt_remain;
1051 :
1052 : /* If this shred completes a FEC set or is part of a microblock from
1053 : pack (ie. we're leader), we now have a full FEC set: so we notify
1054 : repair and insert into the blockstore, as well as retransmit. */
1055 :
1056 0 : for( ulong fset_k=0; fset_k<ctx->send_fec_set_cnt; fset_k++ ) {
1057 :
1058 0 : fd_fec_set_t * set = ctx->fec_sets + ctx->send_fec_set_idx[ fset_k ];
1059 :
1060 0 : fd_shred_t const * last = set->data_shreds[ FD_FEC_SHRED_CNT - 1 ].s;
1061 :
1062 : /* Compute merkle root and chained merkle root. */
1063 :
1064 0 : int replay_fwd = 1;
1065 0 : if( FD_LIKELY( ctx->store ) ) { /* firedancer-only */
1066 :
1067 0 : set->leader_bank = NULL; /* un-used by firedancer */
1068 :
1069 : /* Insert shreds into the store. We do this regardless of whether
1070 : we are leader. */
1071 :
1072 0 : fd_store_fec_t * fec = fd_store_insert( ctx->store, ctx->round_robin_id, (fd_hash_t *)fd_type_pun( &ctx->out_merkle_roots[fset_k] ) );
1073 :
1074 : /* Firedancer is configured such that the store never fills up, as
1075 : the reasm is responsible for also evicting from store (based on
1076 : its eviction policy, see fd_reasm.h). fec is only NULL when the
1077 : store is full, so this is either a bug or misconfiguration. */
1078 :
1079 0 : if( FD_UNLIKELY( !fec ) ) FD_LOG_CRIT(( "store full" ));
1080 :
1081 : /* It's safe to memcpy the FEC payload outside of the shared lock,
1082 : because the store ele is guaranteed to remain valid here. It
1083 : is not possible for a fd_store_remove to interleave, because
1084 : remove is only called by replay_tile, which (crucially) is only
1085 : sent this FEC via stem publish after we have finished copying.
1086 :
1087 : Copying outside the shared lock scope also means that we can
1088 : lower the duration for which the shared lock is held, and
1089 : enables replay to acquire the exclusive lock for removes
1090 : without getting starved. */
1091 :
1092 : /* if data_sz is non-zero, we've already inserted this FEC set into the store */
1093 0 : if( FD_UNLIKELY( fec->data_sz ) ) replay_fwd = 0;
1094 0 : else {
1095 0 : for( ulong i=0UL; i<FD_FEC_SHRED_CNT; i++ ) {
1096 0 : fd_shred_t * data_shred = set->data_shreds[i].s;
1097 0 : ulong payload_sz = fd_shred_payload_sz( data_shred );
1098 0 : if( FD_UNLIKELY( fec->data_sz + payload_sz > ctx->store->fec_data_max ) ) {
1099 :
1100 : /* This code is only reachable if shred tile has completed the
1101 : FEC set, which implies it was able to validate it, yet
1102 : somehow the total payload sz of this FEC set exceeds the
1103 : maximum payload sz. This indicates either a serious bug or
1104 : shred tile is compromised so FD_LOG_CRIT. */
1105 :
1106 0 : FD_LOG_CRIT(( "Shred tile %lu: completed FEC set %lu %u data_sz: %lu exceeds data_max: %lu. Ignoring FEC set.", ctx->round_robin_id, data_shred->slot, data_shred->fec_set_idx, fec->data_sz + payload_sz, ctx->store->fec_data_max ));
1107 0 : }
1108 0 : fd_memcpy( fd_store_fec_data( ctx->store, fec ) + fec->data_sz, fd_shred_data_payload( data_shred ), payload_sz );
1109 0 : fec->data_sz += payload_sz;
1110 0 : if( FD_LIKELY( i<32UL ) ) fec->shred_offs[ i ] = (uint)payload_sz + (i==0UL ? 0U : fec->shred_offs[ i-1UL ]);
1111 0 : }
1112 0 : }
1113 0 : }
1114 :
1115 0 : if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX && replay_fwd ) ) { /* firedancer-only */
1116 :
1117 : /* Send all of the data shred headers we recovered (weren't received) */
1118 0 : for( int i=0; i<32; i++ ) {
1119 0 : if( fd_uint_extract_bit( set->data_shred_rcvd, i )==0 ) {
1120 0 : fd_shred_t * const missing = &set->data_shreds[ i ].s[0];
1121 :
1122 0 : ulong sig = ((ulong)FD_FEC_RESOLVER_SHRED_COMPLETES << 32UL) | SHRED_SIG_SRC_RECONSTRUCTED;
1123 :
1124 0 : fd_shred_base_t * shred_msg = (fd_shred_base_t *)fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk );
1125 0 : memcpy( shred_msg->shred_, missing, fd_shred_sz( missing ) );
1126 0 : memcpy( &shred_msg->merkle_root, ctx->out_merkle_roots[fset_k].hash, sizeof(fd_hash_t) );
1127 :
1128 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
1129 0 : fd_stem_publish( stem, ctx->shred_out_idx, sig, ctx->shred_out_chunk, sizeof(fd_shred_base_t), 0UL, ctx->tsorig, tspub );
1130 0 : ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_shred_base_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
1131 0 : }
1132 0 : }
1133 :
1134 : /* Additionally, publish a frag to notify repair and replay that
1135 : the FEC set is complete. Note the ordering wrt store shred
1136 : insertion above is intentional: shreds are inserted into the
1137 : store before notifying repair and replay. This is because the
1138 : replay tile assumes the shreds are already in the store when
1139 : replay gets a notification from the shred tile that the FEC is
1140 : complete. We we don't know whether shred will finish inserting
1141 : into store first or repair will finish validating the FEC set
1142 : first. The header and merkle root of the last shred in the FEC
1143 : set are sent as part of this frag.
1144 :
1145 : This message, the shred msg, and the FEC evict msg constitute
1146 : the max 3 possible messages to repair/replay per after_frag.
1147 : In reality, it is only possible to publish all 3 in the case
1148 : where we receive a coding shred first for a FEC set where
1149 : (N=1,K=18), which allows for the FEC set to be instantly
1150 : completed by the singular coding shred, and that also happens
1151 : to evict a FEC set from the curr_map. When fix-32 arrives, the
1152 : link burst value can be lowered to 2. */
1153 0 : ulong sig = ctx->in_kind[ in_idx ]==IN_KIND_POH ? SHRED_SIG_FEC_COMPLETE_LEADER : SHRED_SIG_FEC_COMPLETE;
1154 :
1155 0 : fd_fec_complete_t * complete_msg = fd_chunk_to_laddr( ctx->shred_out_mem, ctx->shred_out_chunk );
1156 0 : complete_msg->last_shred_hdr = *last;
1157 0 : memcpy( &complete_msg->merkle_root, ctx->out_merkle_roots[fset_k].hash, sizeof(fd_hash_t) );
1158 0 : complete_msg->chained_merkle_root = *(fd_hash_t *)fd_type_pun((uchar *)last + fd_shred_chain_off( last->variant ));
1159 :
1160 0 : fd_stem_publish( stem, ctx->shred_out_idx, sig, ctx->shred_out_chunk, sizeof(fd_fec_complete_t), 0UL, ctx->tsorig, fd_frag_meta_ts_comp( fd_tickcount() ) );
1161 0 : ctx->shred_out_chunk = fd_dcache_compact_next( ctx->shred_out_chunk, sizeof(fd_fec_complete_t), ctx->shred_out_chunk0, ctx->shred_out_wmark );
1162 :
1163 0 : } else if( FD_UNLIKELY( ctx->store_out_idx != ULONG_MAX ) ) { /* frankendancer-only */
1164 :
1165 : /* Send to the blockstore */
1166 :
1167 0 : ulong txn_cnt = fd_ulong_if( fset_k==ctx->send_fec_set_cnt-1UL, shredded_txn_cnt_last_fec_set, shredded_txn_cnt_per_fec_set );
1168 : /* If the low 32 bits of sig are 0, the store tile will do extra
1169 : checks */
1170 0 : ulong new_sig = txn_cnt<<32 | (ulong)(ctx->in_kind[ in_idx ]!=IN_KIND_NET);
1171 :
1172 : /* Attach the leader bank pointer and merkle root so that the
1173 : store tile can set the block_id for the slot. Network
1174 : FEC sets have no leader bank. */
1175 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
1176 0 : set->leader_bank = ctx->leader_bank;
1177 0 : memcpy( set->merkle_root, ctx->out_merkle_roots[fset_k].hash, 32UL );
1178 0 : } else {
1179 0 : set->leader_bank = NULL;
1180 0 : }
1181 :
1182 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
1183 : /* The size is actually slightly larger than USHORT_MAX, but the store tile
1184 : knows to use sizeof(fd_fec_set_t) instead of the sz field. Put
1185 : USHORT_MAX so that monitoring tools are at least close. */
1186 0 : ulong sz = fd_ulong_min( sizeof(fd_fec_set_t), USHORT_MAX );
1187 0 : fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, set ), sz, 0UL, ctx->tsorig, tspub );
1188 :
1189 : /* Store tile will release the bank pointer when it sees
1190 : SLOT_COMPLETE FEC set. So we reset our tracking. */
1191 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH &&
1192 0 : (set->data_shreds[ FD_FEC_SHRED_CNT-1UL ].s->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) ) ) {
1193 0 : ctx->leader_bank = NULL;
1194 0 : }
1195 0 : }
1196 :
1197 : /* Compute all the destinations for all the new shreds */
1198 :
1199 0 : fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
1200 0 : ulong k=0UL;
1201 0 : for( ulong i=0UL; i<FD_FEC_SHRED_CNT; i++ )
1202 0 : if( !(set->data_shred_rcvd & (1U<<i)) ) new_shreds[ k++ ] = set->data_shreds [ i ].s;
1203 0 : for( ulong i=0UL; i<FD_FEC_SHRED_CNT; i++ )
1204 0 : if( !(set->parity_shred_rcvd & (1U<<i)) ) new_shreds[ k++ ] = set->parity_shreds[ i ].s;
1205 :
1206 0 : if( FD_UNLIKELY( !k ) ) return;
1207 0 : fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, new_shreds[ 0 ]->slot );
1208 0 : if( FD_UNLIKELY( !sdest ) ) return;
1209 :
1210 0 : ulong out_stride;
1211 0 : ulong max_dest_cnt[1];
1212 0 : fd_shred_dest_idx_t * dests;
1213 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
1214 0 : for( ulong i=0UL; i<k; i++ ) {
1215 0 : for( ulong j=0UL; j<ctx->adtl_dests_retransmit_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dests_retransmit+j, ctx->tsorig );
1216 0 : }
1217 0 : out_stride = k;
1218 : /* In the case of feature activation, the fanout used below is
1219 : the same as the one calculated/modified previously at the
1220 : beginning of after_frag() for IN_KIND_NET in this slot. */
1221 0 : dests = fd_shred_dest_compute_children( sdest, new_shreds, k, ctx->scratchpad_dests, k, fanout, fanout, max_dest_cnt );
1222 0 : } else {
1223 0 : for( ulong i=0UL; i<k; i++ ) {
1224 0 : for( ulong j=0UL; j<ctx->adtl_dests_leader_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dests_leader+j, ctx->tsorig );
1225 0 : }
1226 0 : out_stride = 1UL;
1227 0 : *max_dest_cnt = 1UL;
1228 0 : dests = fd_shred_dest_compute_first ( sdest, new_shreds, k, ctx->scratchpad_dests );
1229 0 : }
1230 0 : if( FD_UNLIKELY( !dests ) ) return;
1231 :
1232 : /* Send only the ones we didn't receive. */
1233 0 : for( ulong i=0UL; i<k; i++ ) {
1234 0 : for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
1235 0 : }
1236 0 : }
1237 0 : }
1238 :
1239 : static void
1240 : privileged_init( fd_topo_t const * topo,
1241 0 : fd_topo_tile_t const * tile ) {
1242 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1243 0 : FD_TEST( scratch!=NULL );
1244 :
1245 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1246 0 : fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
1247 :
1248 0 : if( FD_UNLIKELY( !strcmp( tile->shred.identity_key_path, "" ) ) )
1249 0 : FD_LOG_ERR(( "identity_key_path not set" ));
1250 :
1251 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->shred.identity_key_path, /* pubkey only: */ 1 ) );
1252 :
1253 0 : if( FD_UNLIKELY( !fd_rng_secure( &(ctx->resolver_seed), sizeof(ulong) ) ) ) {
1254 0 : FD_LOG_CRIT(( "fd_rng_secure failed" ));
1255 0 : }
1256 : /* This is only needed in frankendancer, but we'll overwrite it with
1257 : the value the repair tile generated in full firedancer. */
1258 0 : if( FD_UNLIKELY( !fd_rng_secure( ctx->repair_nonce_ss->bytes, sizeof(fd_rnonce_ss_t) ) ) ) {
1259 0 : FD_LOG_CRIT(( "fd_rng_secure failed" ));
1260 0 : }
1261 0 : }
1262 :
1263 : static void
1264 : fd_shred_signer( void * signer_ctx,
1265 : uchar signature[ static 64 ],
1266 0 : uchar const merkle_root[ static 32 ] ) {
1267 0 : fd_keyguard_client_sign( signer_ctx, signature, merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
1268 0 : }
1269 :
1270 : static void
1271 : unprivileged_init( fd_topo_t const * topo,
1272 0 : fd_topo_tile_t const * tile ) {
1273 :
1274 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ NET_OUT_IDX ]].name, "shred_net" ) );
1275 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ SIGN_OUT_IDX ]].name, "shred_sign" ) );
1276 :
1277 0 : if( FD_UNLIKELY( !tile->out_cnt ) )
1278 0 : FD_LOG_ERR(( "shred tile has no primary output link" ));
1279 :
1280 0 : ulong shred_store_mcache_depth = tile->shred.depth;
1281 0 : if( topo->links[ tile->out_link_id[ 0 ] ].depth != shred_store_mcache_depth )
1282 0 : FD_LOG_ERR(( "shred tile out depths are not equal %lu %lu",
1283 0 : topo->links[ tile->out_link_id[ 0 ] ].depth, shred_store_mcache_depth ));
1284 :
1285 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1286 0 : FD_TEST( scratch!=NULL );
1287 :
1288 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1289 0 : fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
1290 :
1291 0 : ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
1292 0 : ctx->round_robin_id = tile->kind_id;
1293 0 : ctx->batch_cnt = 0UL;
1294 0 : ctx->slot = ULONG_MAX;
1295 :
1296 : /* If the default partial_depth is ever changed, correspondingly
1297 : change the size of the fd_fec_intra_pool in fd_fec_repair. */
1298 0 : ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth + 1UL,
1299 0 : 128UL * tile->shred.fec_resolver_depth );
1300 : /* See long comment at the top of this file for the computation of
1301 : fec_set_cnt. */
1302 0 : ulong fec_set_cnt = 2UL*shred_store_mcache_depth + tile->shred.fec_resolver_depth + FD_SHRED_BATCH_FEC_SETS_MAX + 2UL;
1303 0 : ulong fec_sets_required_sz = fec_set_cnt*sizeof(fd_fec_set_t);
1304 :
1305 0 : void * fec_sets_shmem = NULL;
1306 0 : ctx->shred_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_out", ctx->round_robin_id );
1307 0 : ctx->store_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_store", ctx->round_robin_id );
1308 0 : if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
1309 0 : fd_topo_link_t const * shred_out = &topo->links[ tile->out_link_id[ ctx->shred_out_idx ] ];
1310 0 : ctx->shred_out_mem = topo->workspaces[ topo->objs[ shred_out->dcache_obj_id ].wksp_id ].wksp;
1311 0 : ctx->shred_out_chunk0 = fd_dcache_compact_chunk0( ctx->shred_out_mem, shred_out->dcache );
1312 0 : ctx->shred_out_wmark = fd_dcache_compact_wmark ( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu );
1313 0 : ctx->shred_out_chunk = ctx->shred_out_chunk0;
1314 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu, shred_out->depth ) );
1315 0 : ulong fec_sets_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "fec_sets" );
1316 0 : if( FD_UNLIKELY( fec_sets_obj_id == ULONG_MAX ) ) FD_LOG_ERR(( "invalid firedancer topo" ));
1317 0 : fd_topo_obj_t const * obj = &topo->objs[ fec_sets_obj_id ];
1318 0 : if( FD_UNLIKELY( obj->footprint<(fec_sets_required_sz*ctx->round_robin_cnt) ) ) {
1319 0 : FD_LOG_ERR(( "fec_sets wksp obj too small. It is %lu bytes but must be at least %lu bytes. ",
1320 0 : obj->footprint,
1321 0 : fec_sets_required_sz ));
1322 0 : }
1323 0 : fec_sets_shmem = (uchar *)fd_topo_obj_laddr( topo, fec_sets_obj_id ) + (ctx->round_robin_id * fec_sets_required_sz);
1324 :
1325 0 : ulong rnonce_ss_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "rnonce_ss" );
1326 0 : FD_TEST( rnonce_ss_id!=ULONG_MAX );
1327 0 : memcpy( ctx->repair_nonce_ss, fd_topo_obj_laddr( topo, rnonce_ss_id ), sizeof(fd_rnonce_ss_t) );
1328 :
1329 0 : } else if ( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
1330 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ ctx->store_out_idx ]].name, "shred_store" ) );
1331 0 : fec_sets_shmem = topo->links[ tile->out_link_id[ ctx->store_out_idx ] ].dcache;
1332 0 : if( FD_UNLIKELY( fd_dcache_data_sz( fec_sets_shmem )<fec_sets_required_sz ) ) {
1333 0 : FD_LOG_ERR(( "shred_store dcache too small. It is %lu bytes but must be at least %lu bytes. ",
1334 0 : fd_dcache_data_sz( fec_sets_shmem ),
1335 0 : fec_sets_required_sz ));
1336 0 : }
1337 0 : }
1338 :
1339 0 : if( FD_UNLIKELY( !tile->shred.fec_resolver_depth ) ) FD_LOG_ERR(( "fec_resolver_depth not set" ));
1340 0 : if( FD_UNLIKELY( !tile->shred.shred_listen_port ) ) FD_LOG_ERR(( "shred_listen_port not set" ));
1341 :
1342 0 : void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
1343 0 : void * _resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_resolver_align(), fec_resolver_footprint );
1344 0 : void * _shredder = FD_SCRATCH_ALLOC_APPEND( l, fd_shredder_align(), fd_shredder_footprint() );
1345 :
1346 0 : fd_fec_set_t * fec_sets = (fd_fec_set_t *)fec_sets_shmem;
1347 :
1348 0 : #define NONNULL( x ) (__extension__({ \
1349 0 : __typeof__((x)) __x = (x); \
1350 0 : if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
1351 0 : __x; }))
1352 :
1353 0 : int has_ipecho_in = fd_topo_find_tile_in_link( topo, tile, "ipecho_out", 0UL )!=ULONG_MAX;
1354 0 : ushort expected_shred_version = tile->shred.expected_shred_version;
1355 0 : if( FD_UNLIKELY( !has_ipecho_in && !expected_shred_version ) ) {
1356 0 : ulong busy_obj_id = fd_pod_query_ulong( topo->props, "pohh_shred", ULONG_MAX );
1357 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
1358 0 : ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
1359 0 : FD_LOG_INFO(( "Waiting for shred version to be determined via gossip." ));
1360 0 : ulong _expected_shred_version = ULONG_MAX;
1361 0 : do {
1362 0 : _expected_shred_version = FD_VOLATILE_CONST( *gossip_shred_version );
1363 0 : } while( _expected_shred_version==ULONG_MAX );
1364 :
1365 0 : if( FD_UNLIKELY( _expected_shred_version>USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", _expected_shred_version ));
1366 0 : FD_LOG_INFO(( "Using shred version %hu", (ushort)_expected_shred_version ));
1367 0 : expected_shred_version = (ushort)_expected_shred_version;
1368 0 : }
1369 :
1370 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->id_keyswitch_obj_id ) );
1371 0 : FD_TEST( ctx->keyswitch );
1372 :
1373 : /* populate ctx */
1374 0 : ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_shred", tile->kind_id );
1375 0 : FD_TEST( sign_in_idx!=ULONG_MAX );
1376 0 : fd_topo_link_t const * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
1377 0 : fd_topo_link_t const * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
1378 0 : NONNULL( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
1379 0 : sign_out->mcache,
1380 0 : sign_out->dcache,
1381 0 : sign_in->mcache,
1382 0 : sign_in->dcache,
1383 0 : sign_out->mtu ) ) );
1384 :
1385 0 : ulong shred_limit = fd_ulong_if( tile->shred.larger_shred_limits_per_block, 32UL*32UL*1024UL, 32UL*1024UL );
1386 0 : ctx->shred_limit = shred_limit;
1387 0 : fd_fec_set_t * resolver_sets = fec_sets + shred_store_mcache_depth + FD_SHRED_BATCH_FEC_SETS_MAX;
1388 0 : ctx->shredder = NONNULL( fd_shredder_join ( fd_shredder_new ( _shredder, fd_shred_signer, ctx->keyguard_client ) ) );
1389 0 : ctx->resolver = NONNULL( fd_fec_resolver_join ( fd_fec_resolver_new ( _resolver,
1390 0 : fd_shred_signer, ctx->keyguard_client,
1391 0 : tile->shred.fec_resolver_depth, 1UL,
1392 0 : shred_store_mcache_depth+1UL,
1393 0 : 128UL * tile->shred.fec_resolver_depth, resolver_sets,
1394 0 : shred_limit,
1395 0 : ctx->resolver_seed ) ) );
1396 :
1397 0 : if( FD_LIKELY( !!expected_shred_version ) ) {
1398 0 : fd_shredder_set_shred_version ( ctx->shredder, expected_shred_version );
1399 0 : fd_fec_resolver_set_shred_version( ctx->resolver, expected_shred_version );
1400 0 : }
1401 :
1402 0 : ctx->fec_sets = fec_sets;
1403 :
1404 0 : ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci, ctx->identity_key ) );
1405 :
1406 0 : ctx->net_id = (ushort)0;
1407 :
1408 0 : fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr, FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
1409 0 : fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
1410 :
1411 0 : ctx->adtl_dests_retransmit_cnt = tile->shred.adtl_dests_retransmit_cnt;
1412 0 : for( ulong i=0UL; i<ctx->adtl_dests_retransmit_cnt; i++) {
1413 0 : ctx->adtl_dests_retransmit[ i ].ip4 = tile->shred.adtl_dests_retransmit[ i ].ip;
1414 0 : ctx->adtl_dests_retransmit[ i ].port = tile->shred.adtl_dests_retransmit[ i ].port;
1415 0 : }
1416 0 : ctx->adtl_dests_leader_cnt = tile->shred.adtl_dests_leader_cnt;
1417 0 : for( ulong i=0UL; i<ctx->adtl_dests_leader_cnt; i++) {
1418 0 : ctx->adtl_dests_leader[i].ip4 = tile->shred.adtl_dests_leader[i].ip;
1419 0 : ctx->adtl_dests_leader[i].port = tile->shred.adtl_dests_leader[i].port;
1420 0 : }
1421 :
1422 0 : uchar has_contact_info_in = 0;
1423 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
1424 0 : fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
1425 0 : fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
1426 :
1427 0 : if( FD_LIKELY( !strcmp( link->name, "net_shred" ) ) ) {
1428 0 : ctx->in_kind[ i ] = IN_KIND_NET;
1429 0 : fd_net_rx_bounds_init( &ctx->in[ i ].net_rx, link->dcache );
1430 0 : continue; /* only net_rx needs to be set in this case. */
1431 0 : }
1432 0 : else if( FD_LIKELY( !strcmp( link->name, "poh_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_POH; /* Firedancer */
1433 0 : else if( FD_LIKELY( !strcmp( link->name, "pohh_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_POH; /* Frankendancer */
1434 0 : else if( FD_LIKELY( !strcmp( link->name, "stake_out" ) ) ) ctx->in_kind[ i ] = IN_KIND_STAKE; /* Frankendancer */
1435 0 : else if( FD_LIKELY( !strcmp( link->name, "replay_epoch" ) ) ) ctx->in_kind[ i ] = IN_KIND_EPOCH; /* Firedancer */
1436 0 : else if( FD_LIKELY( !strcmp( link->name, "sign_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
1437 0 : else if( FD_LIKELY( !strcmp( link->name, "ipecho_out" ) ) ) ctx->in_kind[ i ] = IN_KIND_IPECHO;
1438 0 : else if( FD_LIKELY( !strcmp( link->name, "tower_out" ) ) ) ctx->in_kind[ i ] = IN_KIND_ROOTED;
1439 0 : else if( FD_LIKELY( !strcmp( link->name, "replay_resol" ) ) ) ctx->in_kind[ i ] = IN_KIND_ROOTEDH;
1440 0 : else if( FD_LIKELY( !strcmp( link->name, "crds_shred" ) ) ) { ctx->in_kind[ i ] = IN_KIND_CONTACT;
1441 0 : if( FD_UNLIKELY( has_contact_info_in ) ) FD_LOG_ERR(( "shred tile has multiple contact info in link types, can only be either gossip_out or crds_shred" ));
1442 0 : has_contact_info_in = 1;
1443 0 : }
1444 0 : else if( FD_LIKELY( !strcmp( link->name, "gossip_out" ) ) ) { ctx->in_kind[ i ] = IN_KIND_GOSSIP;
1445 0 : if( FD_UNLIKELY( has_contact_info_in ) ) FD_LOG_ERR(( "shred tile has multiple contact info in link types, can only be either gossip_out or crds_shred" ));
1446 0 : has_contact_info_in = 1;
1447 0 : }
1448 :
1449 0 : else FD_LOG_ERR(( "shred tile has unexpected input link %lu %s", i, link->name ));
1450 :
1451 0 : if( FD_LIKELY( !!link->mtu ) ) {
1452 0 : ctx->in[ i ].mem = link_wksp->wksp;
1453 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
1454 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
1455 0 : }
1456 0 : }
1457 :
1458 0 : fd_topo_link_t const * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
1459 :
1460 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
1461 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
1462 0 : ctx->net_out_wmark = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
1463 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
1464 :
1465 0 : ctx->store = NULL;
1466 0 : ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" );
1467 0 : if( FD_LIKELY( store_obj_id!=ULONG_MAX ) ) { /* firedancer-only */
1468 0 : ctx->store = fd_store_join( fd_topo_obj_laddr( topo, store_obj_id ) );
1469 0 : FD_TEST( ctx->store->magic==FD_STORE_MAGIC );
1470 0 : FD_TEST( ctx->store->part_cnt==ctx->round_robin_cnt ); /* single-writer (shred tile) per store part */
1471 0 : FD_TEST( !fd_store_verify( ctx->store ) );
1472 0 : }
1473 :
1474 0 : if( FD_LIKELY( ctx->shred_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
1475 0 : fd_topo_link_t const * shred_out = &topo->links[ tile->out_link_id[ ctx->shred_out_idx ] ];
1476 0 : ctx->shred_out_mem = topo->workspaces[ topo->objs[ shred_out->dcache_obj_id ].wksp_id ].wksp;
1477 0 : ctx->shred_out_chunk0 = fd_dcache_compact_chunk0( ctx->shred_out_mem, shred_out->dcache );
1478 0 : ctx->shred_out_wmark = fd_dcache_compact_wmark ( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu );
1479 0 : ctx->shred_out_chunk = ctx->shred_out_chunk0;
1480 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->shred_out_mem, shred_out->dcache, shred_out->mtu, shred_out->depth ) );
1481 0 : }
1482 :
1483 0 : if( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
1484 0 : fd_topo_link_t const * store_out = &topo->links[ tile->out_link_id[ ctx->store_out_idx ] ];
1485 0 : ctx->store_out_mem = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
1486 0 : ctx->store_out_chunk0 = fd_dcache_compact_chunk0( ctx->store_out_mem, store_out->dcache );
1487 0 : ctx->store_out_wmark = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
1488 0 : ctx->store_out_chunk = ctx->store_out_chunk0;
1489 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->store_out_mem, store_out->dcache, store_out->mtu, store_out->depth ) );
1490 0 : }
1491 :
1492 0 : ctx->poh_in_expect_seq = 0UL;
1493 :
1494 0 : ctx->shredder_fec_set_idx = 0UL;
1495 0 : ctx->shredder_max_fec_set_idx = shred_store_mcache_depth + FD_SHRED_BATCH_FEC_SETS_MAX;
1496 :
1497 0 : ctx->chained_merkle_root = NULL;
1498 0 : memset( ctx->out_merkle_roots, 0, sizeof(ctx->out_merkle_roots) );
1499 :
1500 0 : for( ulong i=0UL; i<FD_SHRED_BATCH_FEC_SETS_MAX; i++ ) { ctx->send_fec_set_idx[ i ] = ULONG_MAX; }
1501 0 : ctx->send_fec_set_cnt = 0UL;
1502 :
1503 0 : ctx->shred_buffer_sz = 0UL;
1504 0 : memset( ctx->shred_buffer, 0xFF, FD_NET_MTU );
1505 :
1506 0 : ctx->leader_bank = NULL;
1507 :
1508 0 : fd_histf_join( fd_histf_new( ctx->metrics->contact_info_cnt, FD_MHIST_MIN( SHRED, CONTACT_INFO_PER_MESSAGE ),
1509 0 : FD_MHIST_MAX( SHRED, CONTACT_INFO_PER_MESSAGE ) ) );
1510 0 : fd_histf_join( fd_histf_new( ctx->metrics->batch_sz, FD_MHIST_MIN( SHRED, BATCH_SIZE_BYTES ),
1511 0 : FD_MHIST_MAX( SHRED, BATCH_SIZE_BYTES ) ) );
1512 0 : fd_histf_join( fd_histf_new( ctx->metrics->batch_microblock_cnt, FD_MHIST_MIN( SHRED, MICROBLOCK_PER_BATCH ),
1513 0 : FD_MHIST_MAX( SHRED, MICROBLOCK_PER_BATCH ) ) );
1514 0 : fd_histf_join( fd_histf_new( ctx->metrics->shredding_timing, FD_MHIST_SECONDS_MIN( SHRED, SHREDDING_DURATION_SECONDS ),
1515 0 : FD_MHIST_SECONDS_MAX( SHRED, SHREDDING_DURATION_SECONDS ) ) );
1516 0 : fd_histf_join( fd_histf_new( ctx->metrics->add_shred_timing, FD_MHIST_SECONDS_MIN( SHRED, ADD_SHRED_DURATION_SECONDS ),
1517 0 : FD_MHIST_SECONDS_MAX( SHRED, ADD_SHRED_DURATION_SECONDS ) ) );
1518 0 : memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) );
1519 0 : ctx->metrics->invalid_block_id_cnt = 0UL;
1520 0 : ctx->metrics->shred_rejected_unchained_cnt = 0UL;
1521 0 : ctx->metrics->repair_rcv_cnt = 0UL;
1522 0 : ctx->metrics->repair_rcv_bytes = 0UL;
1523 0 : ctx->metrics->turbine_rcv_cnt = 0UL;
1524 0 : ctx->metrics->turbine_rcv_bytes = 0UL;
1525 0 : ctx->metrics->bad_nonce = 0UL;
1526 :
1527 0 : ctx->pending_batch.microblock_cnt = 0UL;
1528 0 : ctx->pending_batch.txn_cnt = 0UL;
1529 0 : ctx->pending_batch.pos = 0UL;
1530 0 : ctx->pending_batch.slot = 0UL;
1531 0 : memset( ctx->pending_batch.payload, 0, sizeof(ctx->pending_batch.payload) );
1532 :
1533 0 : memset( ctx->epoch_schedule, 0, sizeof(ctx->epoch_schedule) );
1534 0 : for( ulong i=0UL; i<FD_SHRED_FEATURES_ACTIVATION_SLOT_CNT; i++ ) {
1535 0 : ctx->features_activation->slots[i] = FD_SHRED_FEATURES_ACTIVATION_SLOT_DISABLED;
1536 0 : }
1537 :
1538 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
1539 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
1540 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
1541 :
1542 0 : memset( ctx->block_ids, 0, sizeof(ctx->block_ids) );
1543 0 : }
1544 :
1545 : static ulong
1546 : populate_allowed_seccomp( fd_topo_t const * topo,
1547 : fd_topo_tile_t const * tile,
1548 : ulong out_cnt,
1549 0 : struct sock_filter * out ) {
1550 0 : (void)topo;
1551 0 : (void)tile;
1552 :
1553 0 : populate_sock_filter_policy_fd_shred_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
1554 0 : return sock_filter_policy_fd_shred_tile_instr_cnt;
1555 0 : }
1556 :
1557 : static ulong
1558 : populate_allowed_fds( fd_topo_t const * topo,
1559 : fd_topo_tile_t const * tile,
1560 : ulong out_fds_cnt,
1561 0 : int * out_fds ) {
1562 0 : (void)topo;
1563 0 : (void)tile;
1564 :
1565 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1566 :
1567 0 : ulong out_cnt = 0UL;
1568 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1569 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1570 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1571 0 : return out_cnt;
1572 0 : }
1573 :
1574 : /* Excluding net_out (where the link is unreliable), STEM_BURST needs
1575 : to guarantee enough credits for the worst case. There are 4 cases
1576 : to consider: (IN_KIND_NET/IN_KIND_POH) x (Frankendancer/Firedancer)
1577 : In the IN_KIND_NET case: (Frankendancer) sends 1 frag to
1578 : store; (Firedancer) that is one frag for the shred to repair, and
1579 : then another frag to repair for the FEC set.
1580 : In the IN_KIND_POH case: (Frankendancer) there might be
1581 : FD_SHRED_BATCH_FEC_SETS_MAX FEC sets; (Firedancer) that is
1582 : FD_SHRED_BATCH_FEC_SETS_MAX frags to repair (one per FEC set).
1583 : Therefore, the worst case is IN_KIND_POH for Frankendancer. */
1584 0 : #define STEM_BURST (FD_SHRED_BATCH_FEC_SETS_MAX + 40UL)
1585 :
1586 : /* See explanation in fd_pack */
1587 0 : #define STEM_LAZY (128L*3000L)
1588 :
1589 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_shred_ctx_t
1590 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_shred_ctx_t)
1591 :
1592 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
1593 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1594 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
1595 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
1596 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
1597 :
1598 : #include "../stem/fd_stem.c"
1599 :
1600 : fd_topo_run_tile_t fd_tile_shred = {
1601 : .name = "shred",
1602 : .populate_allowed_seccomp = populate_allowed_seccomp,
1603 : .populate_allowed_fds = populate_allowed_fds,
1604 : .scratch_align = scratch_align,
1605 : .scratch_footprint = scratch_footprint,
1606 : .privileged_init = privileged_init,
1607 : .unprivileged_init = unprivileged_init,
1608 : .run = stem_run,
1609 : };
|