Line data Source code
1 : #include "../tiles.h"
2 :
3 : #include "generated/fd_shred_tile_seccomp.h"
4 : #include "../../util/pod/fd_pod_format.h"
5 : #include "../shred/fd_shredder.h"
6 : #include "../shred/fd_shred_batch.h"
7 : #include "../shred/fd_shred_dest.h"
8 : #include "../shred/fd_fec_resolver.h"
9 : #include "../shred/fd_stake_ci.h"
10 : #include "../store/fd_store.h"
11 : #include "../keyguard/fd_keyload.h"
12 : #include "../keyguard/fd_keyguard.h"
13 : #include "../keyguard/fd_keyswitch.h"
14 : #include "../fd_disco.h"
15 : #include "../net/fd_net_tile.h"
16 : #include "../../flamenco/leaders/fd_leaders.h"
17 : #include "../../util/net/fd_net_headers.h"
18 :
19 : #include <linux/unistd.h>
20 :
21 : /* The shred tile handles shreds from two data sources: shreds generated
22 : from microblocks from the banking tile, and shreds retransmitted from
23 : the network.
24 :
25 : They have rather different semantics, but at the end of the day, they
26 : both result in a bunch of shreds and FEC sets that need to be sent to
27 : the blockstore and on the network, which is why one tile handles
28 : both.
29 :
30 : We segment the memory for the two types of shreds into two halves of
31 : a dcache because they follow somewhat different flow control
32 : patterns. For flow control, the normal guarantee we want to provide
33 : is that the dcache entry is not overwritten unless the mcache entry
34 : has also been overwritten. The normal way to do this when using both
35 : cyclically and with a 1-to-1 mapping is to make the dcache at least
36 : `burst` entries bigger than the mcache.
37 :
38 : In this tile, we use one output mcache with one output dcache (which
39 : is logically partitioned into two) for the two sources of data. The
40 : worst case for flow control is when we're only sending with one of
41 : the dcache partitions at a time though, so we can consider them
42 : separately.
43 :
44 : From bank: Every FEC set triggers at least two mcache entries (one
45 : for parity and one for data), so at most, we have ceil(mcache
46 : depth/2) FEC sets exposed. This means we need to decompose dcache
47 : into at least ceil(mcache depth/2)+1 FEC sets.
48 :
49 : From the network: The FEC resolver doesn't use a cyclic order, but it
50 : does promise that once it returns an FEC set, it will return at least
51 : complete_depth FEC sets before returning it again. This means we
52 : want at most complete_depth-1 FEC sets exposed, so
53 : complete_depth=ceil(mcache depth/2)+1 FEC sets as above. The FEC
54 : resolver has the ability to keep individual shreds for partial_depth
55 : calls, but because in this version of the shred tile, we send each
56 : shred to all its destinations as soon as we get it, we don't need
57 : that functionality, so we set partial_depth=1.
58 :
59 : Adding these up, we get 2*ceil(mcache_depth/2)+3+fec_resolver_depth
60 : FEC sets, which is no more than mcache_depth+4+fec_resolver_depth.
61 : Each FEC is paired with 4 fd_shred34_t structs, so that means we need
62 : to decompose the dcache into 4*mcache_depth + 4*fec_resolver_depth +
63 : 16 fd_shred34_t structs.
64 :
65 : A note on parallelization. From the network, shreds are distributed
66 : to tiles by their signature, so all the shreds for a given FEC set
67 : are processed by the same tile. From bank, the original
68 : implementation used to parallelize by batch of microblocks (so within
69 : a block, batches were distributed to different tiles). To support
70 : chained merkle shreds, the current implementation processes all the
71 : batches on tile 0 -- this should be a temporary state while Solana
   moves to a newer shred format that supports better parallelization. */
73 :
74 : /* The memory this tile uses is a bit complicated and has some logical
75 : aliasing to facilitate zero-copy use. We have a dcache containing
76 : fd_shred34_t objects, which are basically 34 fd_shred_t objects
77 : padded to their max size, where 34 is set so that the size of the
78 : fd_shred34_t object (including some metadata) is less than
79 : USHORT_MAX, which facilitates sending it using Tango. Then, for each
80 : set of 4 consecutive fd_shred34_t objects, we have an fd_fec_set_t.
   The first 34 data shreds point to the payload section of each of
   the packets in the first fd_shred34_t.  The other
83 : 33 data shreds point into the second fd_shred34_t. Similar for the
84 : parity shreds pointing into the third and fourth fd_shred34_t. */
85 :
#define FD_SHRED_TILE_SCRATCH_ALIGN 128UL

/* Tags stored in ctx->in_kind[ i ] identifying what kind of link input
   i is; before_frag/during_frag/after_frag dispatch on these. */
#define IN_KIND_CONTACT (0UL)
#define IN_KIND_STAKE   (1UL)
#define IN_KIND_POH     (2UL)
#define IN_KIND_NET     (3UL)
#define IN_KIND_SIGN    (4UL)
#define IN_KIND_REPAIR  (5UL)

/* Fixed indices of the net and sign output links in this tile's out
   link set (send_shred publishes to NET_OUT_IDX). */
#define NET_OUT_IDX      1
#define SIGN_OUT_IDX     2

/* Each FEC set is stored as 4 consecutive fd_shred34_t entries in the
   dcache; see the memory layout note above. */
#define DCACHE_ENTRIES_PER_FEC_SET (4UL)
FD_STATIC_ASSERT( sizeof(fd_shred34_t) < USHORT_MAX, shred_34 );
FD_STATIC_ASSERT( 34*DCACHE_ENTRIES_PER_FEC_SET >= FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX, shred_34 );
FD_STATIC_ASSERT( sizeof(fd_shred34_t) == FD_SHRED_STORE_MTU, shred_34 );

FD_STATIC_ASSERT( sizeof(fd_entry_batch_meta_t)==56UL, poh_shred_mtu );

/* Extra result slots this tile adds to the metrics array on top of the
   FEC resolver's own add-shred return values. */
#define FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT 2

/* See note on parallelization above.  Currently we process all batches in tile 0. */
#if 1
#define SHOULD_PROCESS_THESE_SHREDS ( ctx->round_robin_id==0 )
#else
#define SHOULD_PROCESS_THESE_SHREDS ( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id )
#endif

/* The behavior of the shred tile is slightly different for
   Frankendancer vs Firedancer.  For example, Frankendancer produces
   chained merkle shreds, while Firedancer doesn't yet.  We can check
   at runtime the difference by inspecting the topology.  The simplest
   way is to test if ctx->store is initialized.

   FIXME don't assume only frank vs. fire */
#define IS_FIREDANCER ( ctx->store!=NULL )
122 :
/* Per-input-link translation context.  Most input links are dcache
   backed and use mem/chunk0/wmark for chunk-to-laddr translation and
   bounds checking; IN_KIND_NET links instead use the net_rx bounds
   object (see fd_net_rx_translate_frag in during_frag).  The union
   aliases the two representations since any given link only ever uses
   one of them. */
typedef union {
  struct {
    fd_wksp_t * mem;
    ulong       chunk0;
    ulong       wmark;
  };
  fd_net_rx_bounds_t net_rx;
} fd_shred_in_ctx_t;
131 :
/* Run-time context for the shred tile.  One instance lives in the
   tile's scratch region; all state below is owned by this tile. */
typedef struct {
  fd_shredder_t *      shredder;        /* produces FEC sets from entry batches (leader path) */
  fd_fec_resolver_t *  resolver;        /* reassembles FEC sets from network shreds (non-leader path) */
  fd_pubkey_t          identity_key[1]; /* Just the public key */

  ulong                round_robin_id;  /* this tile's index among the shred tiles */
  ulong                round_robin_cnt; /* total number of shred tiles */
  /* Number of batches shredded from PoH during the current slot.
     This should be the same for all the shred tiles. */
  ulong                batch_cnt;
  /* Slot of the most recent microblock we've seen from PoH,
     or 0 if we haven't seen one yet */
  ulong                slot;

  fd_keyswitch_t *     keyswitch;       /* serviced in during_housekeeping */
  fd_keyguard_client_t keyguard_client[1];

  /* shred34 and fec_sets are very related: fec_sets[i] has pointers
     to the shreds in shred34[4*i + k] for k=0,1,2,3. */
  fd_shred34_t       * shred34;
  fd_fec_set_t       * fec_sets;

  fd_stake_ci_t      * stake_ci;        /* stake weights + cluster contact info */
  /* These are used in between during_frag and after_frag */
  fd_shred_dest_weighted_t * new_dest_ptr; /* staged contact-info destination list */
  ulong                      new_dest_cnt;
  ulong                      shredded_txn_cnt;

  ulong                poh_in_expect_seq; /* next expected seq on the PoH link; used to drain before keyswitch */

  ushort               net_id;          /* rolling IP identification field for outgoing packets (send_shred) */

  int                  skip_frag;       /* set in during_frag => after_frag ignores the frag */

  ulong                      adtl_dests_leader_cnt;
  fd_shred_dest_weighted_t   adtl_dests_leader    [ FD_TOPO_ADTL_DESTS_MAX ];
  ulong                      adtl_dests_retransmit_cnt;
  fd_shred_dest_weighted_t   adtl_dests_retransmit[ FD_TOPO_ADTL_DESTS_MAX ];

  /* Pre-built header templates cloned into every outgoing packet. */
  fd_ip4_udp_hdrs_t    data_shred_net_hdr  [1];
  fd_ip4_udp_hdrs_t    parity_shred_net_hdr[1];

  fd_wksp_t *          shred_store_wksp;

  ulong                shredder_fec_set_idx;     /* In [0, shredder_max_fec_set_idx) */
  ulong                shredder_max_fec_set_idx; /* exclusive */

  /* FEC set indices produced by the current batch, staged between
     during_frag and after_frag. */
  ulong                send_fec_set_idx[ FD_SHRED_BATCH_FEC_SETS_MAX ];
  ulong                send_fec_set_cnt;
  ulong                tsorig; /* timestamp of the last packet in compressed form */

  /* Includes Ethernet, IP, UDP headers */
  ulong                shred_buffer_sz;
  uchar                shred_buffer[ FD_NET_MTU ]; /* overrun-safe local copy of the in-flight shred/repair frag */

  fd_shred_in_ctx_t    in[ 32 ];
  int                  in_kind[ 32 ];   /* one of the IN_KIND_* tags per input link */

  fd_wksp_t *          net_out_mem;
  ulong                net_out_chunk0;
  ulong                net_out_wmark;
  ulong                net_out_chunk;

  ulong                store_out_idx;
  fd_wksp_t *          store_out_mem;
  ulong                store_out_chunk0;
  ulong                store_out_wmark;
  ulong                store_out_chunk;

  ulong                repair_out_idx;
  fd_wksp_t *          repair_out_mem;
  ulong                repair_out_chunk0;
  ulong                repair_out_wmark;
  ulong                repair_out_chunk;

  fd_store_t *         store;           /* non-NULL exactly when running as Firedancer (see IS_FIREDANCER) */

  /* Locally accumulated metrics, flushed by metrics_write. */
  struct {
    fd_histf_t contact_info_cnt[ 1 ];
    fd_histf_t batch_sz[ 1 ];
    fd_histf_t batch_microblock_cnt[ 1 ];
    fd_histf_t shredding_timing[ 1 ];
    fd_histf_t add_shred_timing[ 1 ];
    ulong      shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ];
    ulong      invalid_block_id_cnt;
    ulong      shred_rejected_unchained_cnt;
    fd_histf_t store_insert_wait[ 1 ];
    fd_histf_t store_insert_work[ 1 ];
  } metrics[ 1 ];

  /* Accumulator for the entry batch currently being built from PoH
     microblocks; shredded when it completes (see during_frag). */
  struct {
    ulong txn_cnt;
    ulong pos;  /* in payload, range [0, FD_SHRED_BATCH_RAW_BUF_SZ-8UL) */
    ulong slot; /* set to 0 when pos==0 */
    union {
      struct {
        ulong microblock_cnt;
        uchar payload[ FD_SHRED_BATCH_RAW_BUF_SZ - 8UL ];
      };
      uchar raw[ FD_SHRED_BATCH_RAW_BUF_SZ ]; /* aliases microblock_cnt||payload so the batch can be shredded as one buffer */
    };
  } pending_batch;

  fd_shred_features_activation_t features_activation[1]; /* feature activation slots received from PoH */
  /* too large to be left in the stack */
  fd_shred_dest_idx_t scratchpad_dests[ FD_SHRED_DEST_MAX_FANOUT*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];

  uchar chained_merkle_root[ FD_SHRED_MERKLE_ROOT_SZ ]; /* merkle root of the parent block's last FEC set */
} fd_shred_ctx_t;
241 :
242 : FD_FN_CONST static inline ulong
243 0 : scratch_align( void ) {
244 0 : return 128UL;
245 0 : }
246 :
/* scratch_footprint computes the total size of the scratch region this
   tile needs: the context struct, the stake contact-info object, the
   FEC resolver, the shredder, and the fec_sets array.  The order of
   FD_LAYOUT_APPEND calls here must match how the region is carved up
   at init time. */
FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile ) {

  /* The 1UL argument is presumably partial_depth=1 per the flow-control
     note at the top of this file; the 128UL*fec_resolver_depth term
     sizes the resolver's done list -- TODO confirm against
     fd_fec_resolver.h. */
  ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, tile->shred.depth,
                                                            128UL * tile->shred.fec_resolver_depth );
  /* mcache depth + fec_resolver_depth + 4 FEC sets, per the worst-case
     accounting in the comment at the top of this file. */
  ulong fec_set_cnt = tile->shred.depth + tile->shred.fec_resolver_depth + 4UL;

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_shred_ctx_t), sizeof(fd_shred_ctx_t) );
  l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
  l = FD_LAYOUT_APPEND( l, fd_fec_resolver_align(), fec_resolver_footprint );
  l = FD_LAYOUT_APPEND( l, fd_shredder_align(), fd_shredder_footprint() );
  l = FD_LAYOUT_APPEND( l, alignof(fd_fec_set_t), sizeof(fd_fec_set_t)*fec_set_cnt );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
262 :
/* during_housekeeping services pending identity keyswitch requests.
   The switch is deferred (with a warning) until the PoH input has been
   consumed up to keyswitch->param, so that no in-flight leader shreds
   are published under a stale identity; once caught up, the new key is
   installed into the stake contact-info object and the switch is
   marked completed. */
static inline void
during_housekeeping( fd_shred_ctx_t * ctx ) {
  if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
    ulong seq_must_complete = ctx->keyswitch->param;

    if( FD_UNLIKELY( fd_seq_lt( ctx->poh_in_expect_seq, seq_must_complete ) ) ) {
      /* See fd_keyswitch.h, we need to flush any in-flight shreds from
         the leader pipeline before switching key. */
      FD_LOG_WARNING(( "Flushing in-flight unpublished shreds, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->poh_in_expect_seq ));
      return; /* stay pending; retry on the next housekeeping pass */
    }

    memcpy( ctx->identity_key->uc, ctx->keyswitch->bytes, 32UL );
    fd_stake_ci_set_identity( ctx->stake_ci, ctx->identity_key );
    fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
  }
}
280 :
/* metrics_write copies this tile's locally accumulated histograms and
   counters out to the shared metrics system.  Values are copied, not
   reset, so the locals keep accumulating. */
static inline void
metrics_write( fd_shred_ctx_t * ctx ) {
  FD_MHIST_COPY( SHRED, CLUSTER_CONTACT_INFO_CNT, ctx->metrics->contact_info_cnt );
  FD_MHIST_COPY( SHRED, BATCH_SZ, ctx->metrics->batch_sz );
  FD_MHIST_COPY( SHRED, BATCH_MICROBLOCK_CNT, ctx->metrics->batch_microblock_cnt );
  FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing );
  FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing );

  FD_MCNT_SET  ( SHRED, INVALID_BLOCK_ID, ctx->metrics->invalid_block_id_cnt );
  FD_MCNT_SET  ( SHRED, SHRED_REJECTED_UNCHAINED, ctx->metrics->shred_rejected_unchained_cnt );
  FD_MHIST_COPY( SHRED, STORE_INSERT_WAIT, ctx->metrics->store_insert_wait );
  FD_MHIST_COPY( SHRED, STORE_INSERT_WORK, ctx->metrics->store_insert_work );

  FD_MCNT_ENUM_COPY( SHRED, SHRED_PROCESSED, ctx->metrics->shred_processing_result );
}
296 :
297 : static inline void
298 : handle_new_cluster_contact_info( fd_shred_ctx_t * ctx,
299 0 : uchar const * buf ) {
300 0 : ulong const * header = (ulong const *)fd_type_pun_const( buf );
301 :
302 0 : ulong dest_cnt = header[ 0 ];
303 0 : fd_histf_sample( ctx->metrics->contact_info_cnt, dest_cnt );
304 :
305 0 : if( dest_cnt >= MAX_SHRED_DESTS )
306 0 : FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS ));
307 :
308 0 : fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
309 0 : fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci );
310 :
311 0 : ctx->new_dest_ptr = dests;
312 0 : ctx->new_dest_cnt = dest_cnt;
313 :
314 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
315 0 : memcpy( dests[i].pubkey.uc, in_dests[i].pubkey, 32UL );
316 0 : dests[i].ip4 = in_dests[i].ip4_addr;
317 0 : dests[i].port = in_dests[i].udp_port;
318 0 : }
319 0 : }
320 :
/* finalize_new_cluster_contact_info commits the destination list that
   handle_new_cluster_contact_info staged (ctx->new_dest_cnt entries)
   into the stake contact-info object.  Called from after_frag once the
   contact-info frag is known not to have been overrun. */
static inline void
finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
  fd_stake_ci_dest_add_fini( ctx->stake_ci, ctx->new_dest_cnt );
}
325 :
326 : static inline int
327 : before_frag( fd_shred_ctx_t * ctx,
328 : ulong in_idx,
329 : ulong seq,
330 0 : ulong sig ) {
331 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
332 0 : ctx->poh_in_expect_seq = seq+1UL;
333 0 : return (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK) & (int)(fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_FEAT_ACT_SLOT);
334 0 : }
335 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
336 0 : return (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED) & (int)(fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR);
337 0 : }
338 0 : return 0;
339 0 : }
340 :
/* during_frag speculatively processes an in-flight frag, dispatching
   on the input link kind.  Anything that must survive a potential
   overrun is copied into ctx-owned memory here; commits and
   publication happen in after_frag.  Sets ctx->skip_frag when
   after_frag should ignore the frag entirely. */
static void
during_frag( fd_shred_ctx_t * ctx,
             ulong            in_idx,
             ulong            seq FD_PARAM_UNUSED,
             ulong            sig,
             ulong            chunk,
             ulong            sz,
             ulong            ctl ) {

  ctx->skip_frag = 0;

  ctx->tsorig = fd_frag_meta_ts_comp( fd_tickcount() );

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

    /* Stash a local copy (force-complete request); consumed by
       after_frag. */
    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
    fd_memcpy( ctx->shred_buffer, dcache_entry, sz );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

    /* Stage the new destination list; committed in after_frag. */
    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
    handle_new_cluster_contact_info( ctx, dcache_entry );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
    if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
      FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                   ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

    /* Begin a stake message; finished in after_frag via
       fd_stake_ci_stake_msg_fini. */
    uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
    fd_stake_ci_stake_msg_init( ctx->stake_ci, fd_type_pun_const( dcache_entry ) );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
    ctx->send_fec_set_cnt = 0UL;

    if( FD_UNLIKELY( (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_FEAT_ACT_SLOT) ) ) {
      /* There is a subset of FD_SHRED_FEATURES_ACTIVATION_... slots that
         the shred tile needs to be aware of.  Since this requires the
         bank, we are forced (so far) to receive them from the poh tile
         (as a POH_PKT_TYPE_FEAT_ACT_SLOT).  This is not elegant, and it
         should be revised in the future (TODO), but it provides a
         "temporary" working solution to handle features activation. */
      uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
      if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=(sizeof(fd_shred_features_activation_t)) ) )
        FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                     ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

      fd_shred_features_activation_t const * act_data = (fd_shred_features_activation_t const *)dcache_entry;
      memcpy( ctx->features_activation, act_data, sizeof(fd_shred_features_activation_t) );
    }
    else { /* (fd_disco_poh_sig_pkt_type( sig )==POH_PKT_TYPE_MICROBLOCK) */
      /* This is a frag from the PoH tile.  We'll copy it to our pending
         microblock batch and shred it if necessary (last in block or
         above watermark).  We just go ahead and shred it here, even
         though we may get overrun.  If we do end up getting overrun, we
         just won't send these shreds out and we'll reuse the FEC set for
         the next one.  From a higher level though, if we do get overrun,
         a bunch of shreds will never be transmitted, and we'll end up
         producing a block that never lands on chain. */

      uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
      if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_POH_SHRED_MTU ||
                       sz<(sizeof(fd_entry_batch_meta_t)+sizeof(fd_entry_batch_header_t)) ) )
        FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
                     ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

      /* Frag layout: fd_entry_batch_meta_t header followed by the
         entry batch bytes (which begin with an
         fd_entry_batch_header_t). */
      fd_entry_batch_meta_t const * entry_meta = (fd_entry_batch_meta_t const *)dcache_entry;
      uchar const *                 entry      = dcache_entry + sizeof(fd_entry_batch_meta_t);
      ulong                         entry_sz   = sz - sizeof(fd_entry_batch_meta_t);

      fd_entry_batch_header_t const * microblock = (fd_entry_batch_header_t const *)entry;

      /* It should never be possible for this to fail, but we check it
         anyway. */
      FD_TEST( entry_sz + ctx->pending_batch.pos <= sizeof(ctx->pending_batch.payload) );

      ulong target_slot = fd_disco_poh_sig_slot( sig );
      if( FD_UNLIKELY( (ctx->pending_batch.microblock_cnt>0) & (ctx->pending_batch.slot!=target_slot) ) ) {
        /* TODO: The Agave client sends a dummy entry batch with only 1
           byte and the block-complete bit set.  This helps other
           validators know that the block is dead and they should not try
           to continue building a fork on it.  We probably want a similar
           approach eventually. */
        FD_LOG_WARNING(( "Abandoning %lu microblocks for slot %lu and switching to slot %lu",
                         ctx->pending_batch.microblock_cnt, ctx->pending_batch.slot, target_slot ));
        ctx->pending_batch.slot           = 0UL;
        ctx->pending_batch.pos            = 0UL;
        ctx->pending_batch.microblock_cnt = 0UL;
        ctx->pending_batch.txn_cnt        = 0UL;
        ctx->batch_cnt                    = 0UL;

        FD_MCNT_INC( SHRED, MICROBLOCKS_ABANDONED, 1UL );
      }

      ctx->pending_batch.slot = target_slot;
      if( FD_UNLIKELY( target_slot!=ctx->slot )) {
        /* Reset batch count if we are in a new slot */
        ctx->batch_cnt = 0UL;
        ctx->slot      = target_slot;

        /* Only copy parent_block_id to chained_merkle_root at the beginning
           of a new slot*/
        if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
          /* chained_merkle_root is set as the merkle root of the last FEC set
             of the parent block (and passed in by POH tile) */
          if( FD_LIKELY( entry_meta->parent_block_id_valid ) ) {
            memcpy( ctx->chained_merkle_root, entry_meta->parent_block_id, FD_SHRED_MERKLE_ROOT_SZ );
          } else {
            ctx->metrics->invalid_block_id_cnt++;
            memset( ctx->chained_merkle_root, 0, FD_SHRED_MERKLE_ROOT_SZ );
          }
        }
      }

      if( FD_LIKELY( !SHOULD_PROCESS_THESE_SHREDS ) ) {
        /* If we are not processing this batch, filter in after_frag. */
        ctx->skip_frag = 1;
      }

      /* Defaults assume chained merkle shreds (Frankendancer). */
      ulong   pending_batch_wmark = FD_SHRED_BATCH_WMARK_CHAINED;
      uchar * chained_merkle_root = ctx->chained_merkle_root;
      ulong   load_for_32_shreds  = FD_SHREDDER_CHAINED_FEC_SET_PAYLOAD_SZ;
      /* All fec sets in the last batch of a block need to be resigned.
         This needs to match Agave's behavior - as a reference, see:
         https://github.com/anza-xyz/agave/blob/v2.3/ledger/src/shred/merkle.rs#L1040 */
      if( FD_UNLIKELY( entry_meta->block_complete ) ) {
        pending_batch_wmark = FD_SHRED_BATCH_WMARK_RESIGNED;
        /* chained_merkle_root also applies to resigned FEC sets. */
        load_for_32_shreds  = FD_SHREDDER_RESIGNED_FEC_SET_PAYLOAD_SZ;
      }
      /* TODO remove once unchained fec sets have been deprecated. */
      if( FD_LIKELY( IS_FIREDANCER ) ) {
        pending_batch_wmark = FD_SHRED_BATCH_WMARK_NORMAL;
        chained_merkle_root = NULL;
        load_for_32_shreds  = FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ;
      }

      /* If this microblock completes the block, the batch is then
         finalized here.  Otherwise, we check whether the new entry
         would exceed the pending_batch_wmark.  If true, then the
         batch is closed now, shredded, and a new batch is started
         with the incoming microblock.  If false, no shredding takes
         place, and the microblock is added to the current batch. */
      int batch_would_exceed_wmark = ( ctx->pending_batch.pos + entry_sz ) > pending_batch_wmark;
      int include_in_current_batch = entry_meta->block_complete | ( !batch_would_exceed_wmark );
      int process_current_batch    = entry_meta->block_complete | batch_would_exceed_wmark;
      int init_new_batch           = !include_in_current_batch;

      if( FD_LIKELY( include_in_current_batch ) ) {
        if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
          /* Ugh, yet another memcpy */
          fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
        }
        /* Counters advance even when skipping so all shred tiles stay
           in sync on batch boundaries. */
        ctx->pending_batch.pos            += entry_sz;
        ctx->pending_batch.microblock_cnt += 1UL;
        ctx->pending_batch.txn_cnt        += microblock->txn_cnt;
      }

      if( FD_LIKELY( process_current_batch )) {
        /* Batch and padding size calculation.  The batch is padded up
           to a whole number of 32-shred payload loads. */
        ulong batch_sz        = sizeof(ulong) + ctx->pending_batch.pos; /* without padding */
        ulong batch_sz_padded = load_for_32_shreds * ( ( batch_sz + load_for_32_shreds - 1UL ) / load_for_32_shreds );
        ulong padding_sz      = batch_sz_padded - batch_sz;

        if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
          /* If it's our turn, shred this batch.  FD_UNLIKELY because shred
             tile cnt generally >= 2 */

          long shredding_timing = -fd_tickcount();

          fd_memset( ctx->pending_batch.payload + ctx->pending_batch.pos, 0, padding_sz );

          ctx->send_fec_set_cnt = 0UL; /* verbose */
          ctx->shredded_txn_cnt = ctx->pending_batch.txn_cnt;

          fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, batch_sz_padded, target_slot, entry_meta );

          /* One FEC set per load_for_32_shreds bytes of padded batch. */
          ulong pend_sz = batch_sz_padded;
          while( pend_sz > 0UL ) {

            fd_fec_set_t * out = ctx->fec_sets + ctx->shredder_fec_set_idx;

            FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out, chained_merkle_root ) );

            /* Reset the received-shred bitsets for this freshly
               produced set. */
            d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd   ) ) ) );
            p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) );

            ctx->send_fec_set_idx[ ctx->send_fec_set_cnt ] = ctx->shredder_fec_set_idx;
            ctx->send_fec_set_cnt += 1UL;
            ctx->shredder_fec_set_idx = (ctx->shredder_fec_set_idx+1UL)%ctx->shredder_max_fec_set_idx;

            pend_sz -= load_for_32_shreds;
          }

          fd_shredder_fini_batch( ctx->shredder );
          shredding_timing      += fd_tickcount();

          /* Update metrics */
          fd_histf_sample( ctx->metrics->batch_sz,             batch_sz /* without padding */ );
          fd_histf_sample( ctx->metrics->batch_microblock_cnt, ctx->pending_batch.microblock_cnt );
          fd_histf_sample( ctx->metrics->shredding_timing,     (ulong)shredding_timing );
        } else {
          ctx->send_fec_set_cnt = 0UL; /* verbose */

          /* Not our batch: advance the shredder's indices without
             producing shreds, so FEC set indexing stays consistent
             across tiles. */
          ulong shred_type = FD_SHRED_TYPE_MERKLE_DATA_CHAINED;
          if( FD_UNLIKELY( entry_meta->block_complete ) ) {
            shred_type = FD_SHRED_TYPE_MERKLE_DATA_CHAINED_RESIGNED;
          }
          if( FD_LIKELY( IS_FIREDANCER ) ) {
            shred_type = FD_SHRED_TYPE_MERKLE_DATA;
          }
          fd_shredder_skip_batch( ctx->shredder, batch_sz_padded, target_slot, shred_type );
        }

        ctx->pending_batch.slot           = 0UL;
        ctx->pending_batch.pos            = 0UL;
        ctx->pending_batch.microblock_cnt = 0UL;
        ctx->pending_batch.txn_cnt        = 0UL;
        ctx->batch_cnt++;
      }

      if( FD_UNLIKELY( init_new_batch ) ) {
        /* TODO: this assumes that SHOULD_PROCESS_THESE_SHREDS is
           constant across batches.  Otherwise, the condition may
           need to be removed (or adjusted). */
        if( FD_UNLIKELY( SHOULD_PROCESS_THESE_SHREDS ) ) {
          /* Ugh, yet another memcpy */
          fd_memcpy( ctx->pending_batch.payload + 0UL /* verbose */, entry, entry_sz );
        }
        ctx->pending_batch.slot           = target_slot;
        ctx->pending_batch.pos            = entry_sz;
        ctx->pending_batch.microblock_cnt = 1UL;
        ctx->pending_batch.txn_cnt        = microblock->txn_cnt;
      }
    }
  } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
    /* The common case, from the net tile.  The FEC resolver API does
       not present a prepare/commit model.  If we get overrun between
       when the FEC resolver verifies the signature and when it stores
       the local copy, we could end up storing and retransmitting
       garbage.  Instead we copy it locally, sadly, and only give it to
       the FEC resolver when we know it won't be overrun anymore. */
    uchar const * dcache_entry = fd_net_rx_translate_frag( &ctx->in[ in_idx ].net_rx, chunk, ctl, sz );
    ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
    FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
    fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
    if( FD_UNLIKELY( !shred ) ) {
      ctx->skip_frag = 1;
      return;
    };

    /* Drop unchained merkle shreds (if feature is active) */
    int is_unchained = !fd_shred_is_chained( fd_shred_type( shred->variant ) );
    if( FD_UNLIKELY( is_unchained && shred->slot >= ctx->features_activation->drop_unchained_merkle_shreds ) ) {
      ctx->metrics->shred_rejected_unchained_cnt++;
      ctx->skip_frag = 1;
      return;
    };

    /* all shreds in the same FEC set will have the same signature
       so we can round-robin shreds between the shred tiles based on
       just the signature without splitting individual FEC sets.
       NOTE: this local sig shadows the frag sig parameter. */
    ulong sig = fd_ulong_load_8( shred->signature );
    if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
      ctx->skip_frag = 1;
      return;
    }
    /* Local copy (headers stripped) so after_frag can safely hand it
       to the FEC resolver even if this frag gets overrun. */
    fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
    ctx->shred_buffer_sz = sz-hdr_sz;
  }
}
623 :
624 : static inline void
625 : send_shred( fd_shred_ctx_t * ctx,
626 : fd_stem_context_t * stem,
627 : fd_shred_t const * shred,
628 : fd_shred_dest_weighted_t const * dest,
629 0 : ulong tsorig ) {
630 :
631 0 : if( FD_UNLIKELY( !dest->ip4 ) ) return;
632 :
633 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
634 :
635 0 : int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
636 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
637 0 : *hdr = *( is_data ? ctx->data_shred_net_hdr : ctx->parity_shred_net_hdr );
638 :
639 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
640 0 : ip4->daddr = dest->ip4;
641 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
642 0 : ip4->check = 0U;
643 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
644 :
645 0 : hdr->udp->net_dport = fd_ushort_bswap( dest->port );
646 :
647 0 : ulong shred_sz = fd_ulong_if( is_data, FD_SHRED_MIN_SZ, FD_SHRED_MAX_SZ );
648 0 : #if FD_HAS_AVX
649 : /* We're going to copy this shred potentially a bunch of times without
650 : reading it again, and we'd rather not thrash our cache, so we want
651 : to use non-temporal writes here. We need to make sure we don't
652 : touch the cache line containing the network headers that we just
653 : wrote to though. We know the destination is 64 byte aligned. */
654 0 : FD_STATIC_ASSERT( sizeof(*hdr)<64UL, non_temporal );
655 : /* src[0:sizeof(hdrs)] is invalid, but now we want to copy
656 : dest[i]=src[i] for i>=sizeof(hdrs), so it simplifies the code. */
657 0 : uchar const * src = (uchar const *)((ulong)shred - sizeof(fd_ip4_udp_hdrs_t));
658 0 : memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), src+sizeof(fd_ip4_udp_hdrs_t), 64UL-sizeof(fd_ip4_udp_hdrs_t) );
659 :
660 0 : ulong end_offset = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
661 0 : ulong i;
662 0 : for( i=64UL; end_offset-i<64UL; i+=64UL ) {
663 0 : # if FD_HAS_AVX512
664 0 : _mm512_stream_si512( (void *)(packet+i ), _mm512_loadu_si512( (void const *)(src+i ) ) );
665 : # else
666 0 : _mm256_stream_si256( (void *)(packet+i ), _mm256_loadu_si256( (void const *)(src+i ) ) );
667 0 : _mm256_stream_si256( (void *)(packet+i+32UL), _mm256_loadu_si256( (void const *)(src+i+32UL) ) );
668 0 : # endif
669 0 : }
670 0 : _mm_sfence();
671 0 : fd_memcpy( packet+i, src+i, end_offset-i ); /* Copy the last partial cache line */
672 :
673 : #else
674 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), shred, shred_sz );
675 : #endif
676 :
677 0 : ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
678 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
679 0 : ulong sig = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
680 0 : ulong const chunk = ctx->net_out_chunk;
681 0 : fd_stem_publish( stem, NET_OUT_IDX, sig, chunk, pkt_sz, 0UL, tsorig, tspub );
682 0 : ctx->net_out_chunk = fd_dcache_compact_next( chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
683 0 : }
684 :
/* after_frag completes processing of the frag that during_frag staged
   into ctx (skipped entirely when during_frag set ctx->skip_frag).
   Dispatch is on the input link kind:

     CONTACT / STAKE : finalize the staged cluster contact info / stake
                       message and return.
     POH             : if the entry didn't complete a FEC set there is
                       nothing more to do; otherwise fall through to
                       the common completion path below.
     REPAIR          : force-complete the FEC set identified by the
                       frag's shred signature, then fall through.
     NET             : feed the shred to the FEC resolver, retransmit
                       it to our turbine children, notify repair of the
                       individual shred, and fall through only if the
                       shred completed a FEC set.

   The common tail (send_fec_set_cnt>0) then, per completed FEC set:
   populates the shred34 metadata, inserts the data payloads into the
   store (firedancer) or publishes shred34s to the store tile
   (frankendancer), notifies repair that the FEC set is complete, and
   retransmits every shred we did not receive off the network. */
static void
after_frag( fd_shred_ctx_t * ctx,
            ulong in_idx,
            ulong seq,
            ulong sig,
            ulong sz,
            ulong tsorig,
            ulong _tspub,
            fd_stem_context_t * stem ) {
  (void)seq;
  (void)sz;
  (void)tsorig;
  (void)_tspub;

  if( FD_UNLIKELY( ctx->skip_frag ) ) return;

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
    finalize_new_cluster_contact_info( ctx );
    return;
  }

  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
    fd_stake_ci_stake_msg_fini( ctx->stake_ci );
    return;
  }

  if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_POH) & (ctx->send_fec_set_cnt==0UL) ) ) {
    /* Entry from PoH that didn't trigger a new FEC set to be made */
    return;
  }

  /* Filled in by the resolver (force_complete or add_shred) when a FEC
     set completes; consumed by the store insertion / repair
     notification in the common tail below. */
  fd_bmtree_node_t out_merkle_root = { 0 };
  if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_REPAIR ) ) {
    FD_MCNT_INC( SHRED, FORCE_COMPLETE_REQUEST, 1UL );
    fd_ed25519_sig_t const * shred_sig = (fd_ed25519_sig_t const *)fd_type_pun( ctx->shred_buffer );
    if( FD_UNLIKELY( fd_fec_resolver_done_contains( ctx->resolver, shred_sig ) ) ) {
      /* This is a FEC completion message from the repair tile. We need
         to make sure that we don't force complete something that's just
         been completed. */
      FD_MCNT_INC( SHRED, FORCE_COMPLETE_FAILURE, 1UL );
      return;
    }

    uint last_idx = fd_disco_repair_shred_sig_last_shred_idx( sig );
    uchar buf_last_shred[FD_SHRED_MIN_SZ];
    int rv = fd_fec_resolver_shred_query( ctx->resolver, shred_sig, last_idx, buf_last_shred );
    if( FD_UNLIKELY( rv != FD_FEC_RESOLVER_SHRED_OKAY ) ) {

      /* We will hit this case if FEC is no longer in curr_map, or if
         the shred signature is invalid, which is okay.

         There's something of a race condition here. It's possible (but
         very unlikely) that between when the repair tile observed the
         FEC set needed to be force completed and now, the FEC set was
         completed, and then so many additional FEC sets were completed
         that it fell off the end of the done list. In that case
         fd_fec_resolver_done_contains would have returned false, but
         fd_fec_resolver_shred_query will not return OKAY, which means
         we'll end up in this block of code. If the FEC set was
         completed, then there's nothing we need to do. If it was
         spilled, then we'll need to re-repair all the shreds in the FEC
         set, but it's not fatal. */

      FD_MCNT_INC( SHRED, FORCE_COMPLETE_FAILURE, 1UL );
      return;
    }
    fd_shred_t * out_last_shred = (fd_shred_t *)fd_type_pun( buf_last_shred );

    fd_fec_set_t const * out_fec_set[1];
    rv = fd_fec_resolver_force_complete( ctx->resolver, out_last_shred, out_fec_set, &out_merkle_root );
    if( FD_UNLIKELY( rv != FD_FEC_RESOLVER_SHRED_COMPLETES ) ) {
      FD_LOG_WARNING(( "Shred tile %lu cannot force complete the slot %lu fec_set_idx %u %s", ctx->round_robin_id, out_last_shred->slot, out_last_shred->fec_set_idx, FD_BASE58_ENC_32_ALLOCA( shred_sig ) ));
      FD_MCNT_INC( SHRED, FORCE_COMPLETE_FAILURE, 1UL );
      return;
    }
    FD_MCNT_INC( SHRED, FORCE_COMPLETE_SUCCESS, 1UL );
    /* Translate the resolver's FEC set pointer into an index into our
       fec_sets array for the common completion path. */
    FD_TEST( ctx->fec_sets <= *out_fec_set );
    ctx->send_fec_set_idx[ 0UL ] = (ulong)(*out_fec_set - ctx->fec_sets);
    ctx->send_fec_set_cnt = 1UL;
    ctx->shredded_txn_cnt = 0UL;
  }

  ulong fanout = 200UL; /* Default Agave's DATA_PLANE_FANOUT = 200UL */

  if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
    uchar * shred_buffer    = ctx->shred_buffer;
    ulong   shred_buffer_sz = ctx->shred_buffer_sz;

    fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz );

    /* Unparseable shred: count and drop. */
    if( FD_UNLIKELY( !shred       ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }

    /* We need the slot's leader to verify the shred signature; without
       a leader schedule for the slot we can't process the shred. */
    fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
    if( FD_UNLIKELY( !lsched      ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; }

    fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, shred->slot );
    if( FD_UNLIKELY( !slot_leader ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; } /* Count this as bad slot too */

    fd_fec_set_t const * out_fec_set[1];
    fd_shred_t   const * out_shred[1];

    long add_shred_timing  = -fd_tickcount();
    int rv = fd_fec_resolver_add_shred( ctx->resolver, shred, shred_buffer_sz, slot_leader->uc, out_fec_set, out_shred, &out_merkle_root );
    add_shred_timing      +=  fd_tickcount();

    fd_histf_sample( ctx->metrics->add_shred_timing, (ulong)add_shred_timing );
    ctx->metrics->shred_processing_result[ rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]++;

    /* Fanout is subject to feature activation. The code below replicates
       Agave's get_data_plane_fanout() in turbine/src/cluster_nodes.rs
       on 2025-03-25. Default Agave's DATA_PLANE_FANOUT = 200UL.
       TODO once the experiments are disabled, consider removing these
       fanout variations from the code. */
    if( FD_LIKELY( shred->slot >= ctx->features_activation->disable_turbine_fanout_experiments ) ) {
      fanout = 200UL;
    } else {
      if( FD_LIKELY( shred->slot >= ctx->features_activation->enable_turbine_extended_fanout_experiments ) ) {
        switch( shred->slot % 359 ) {
          case  11UL: fanout = 1152UL; break;
          case  61UL: fanout = 1280UL; break;
          case 111UL: fanout = 1024UL; break;
          case 161UL: fanout = 1408UL; break;
          case 211UL: fanout =  896UL; break;
          case 261UL: fanout = 1536UL; break;
          case 311UL: fanout =  768UL; break;
          default   : fanout =  200UL;
        }
      } else {
        switch( shred->slot % 359 ) {
          case  11UL: fanout =  64UL; break;
          case  61UL: fanout = 768UL; break;
          case 111UL: fanout = 128UL; break;
          case 161UL: fanout = 640UL; break;
          case 211UL: fanout = 256UL; break;
          case 261UL: fanout = 512UL; break;
          case 311UL: fanout = 384UL; break;
          default   : fanout = 200UL;
        }
      }
    }

    if( (rv==FD_FEC_RESOLVER_SHRED_OKAY) | (rv==FD_FEC_RESOLVER_SHRED_COMPLETES) ) {
      /* Shred is valid (signature verified by the resolver).  Turbine
         shreds get retransmitted to our children; repair responses do
         not. */
      if( FD_LIKELY( fd_disco_netmux_sig_proto( sig ) != DST_PROTO_REPAIR ) ) {
        /* Relay this shred */
        ulong max_dest_cnt[1];
        do {
          /* If we've validated the shred and it COMPLETES but we can't
             compute the destination for whatever reason, don't forward
             the shred, but still send it to the blockstore. */
          fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, shred->slot );
          if( FD_UNLIKELY( !sdest ) ) break;
          fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, ctx->scratchpad_dests, 1UL, fanout, fanout, max_dest_cnt );
          if( FD_UNLIKELY( !dests ) ) break;

          for( ulong i=0UL; i<ctx->adtl_dests_retransmit_cnt; i++ ) send_shred( ctx, stem, *out_shred, ctx->adtl_dests_retransmit+i,                   ctx->tsorig );
          for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, *out_shred, fd_shred_dest_idx_to_dest( sdest, dests[ j ] ), ctx->tsorig );
        } while( 0 );
      }

      if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* Only send to repair in full Firedancer */

        /* Construct the sig from the shred.  NOTE: this local
           intentionally shadows the function parameter sig. */

        int is_code                = fd_shred_is_code( fd_shred_type( shred->variant ) );
        uint shred_idx_or_data_cnt = shred->idx;
        int completes              = 0;
        if( FD_LIKELY( is_code ) ) shred_idx_or_data_cnt = shred->code.data_cnt; /* optimize for code_cnt >= data_cnt */
        else completes = shred->data.flags & ( FD_SHRED_DATA_FLAG_SLOT_COMPLETE | FD_SHRED_DATA_FLAG_DATA_COMPLETE );
        ulong sig = fd_disco_shred_repair_shred_sig( !!completes, shred->slot, shred->fec_set_idx, is_code, shred_idx_or_data_cnt );

        /* Copy the shred header into the frag and publish. */

        ulong sz = fd_shred_header_sz( shred->variant );
        fd_memcpy( fd_chunk_to_laddr( ctx->repair_out_mem, ctx->repair_out_chunk ), shred, sz );
        ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
        fd_stem_publish( stem, ctx->repair_out_idx, sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub );
        ctx->repair_out_chunk = fd_dcache_compact_next( ctx->repair_out_chunk, sz, ctx->repair_out_chunk0, ctx->repair_out_wmark );
      }
    }
    /* Only continue to the completion path if this shred completed a
       FEC set. */
    if( FD_LIKELY( rv!=FD_FEC_RESOLVER_SHRED_COMPLETES ) ) return;

    FD_TEST( ctx->fec_sets <= *out_fec_set );
    ctx->send_fec_set_idx[ 0UL ] = (ulong)(*out_fec_set - ctx->fec_sets);
    ctx->send_fec_set_cnt = 1UL;
    ctx->shredded_txn_cnt = 0UL;
  }

  if( FD_UNLIKELY( ctx->send_fec_set_cnt==0UL ) ) return;

  /* Try to distribute shredded txn count across the fec sets.
     This is an approximation, but it is acceptable. */
  ulong shredded_txn_cnt_per_fec_set  = ctx->shredded_txn_cnt / ctx->send_fec_set_cnt;
  ulong shredded_txn_cnt_remain       = ctx->shredded_txn_cnt - shredded_txn_cnt_per_fec_set * ctx->send_fec_set_cnt;
  ulong shredded_txn_cnt_last_fec_set = shredded_txn_cnt_per_fec_set + shredded_txn_cnt_remain;

  /* If this shred completes a FEC set or is part of a microblock from
     pack (ie. we're leader), we now have a full FEC set: so we notify
     repair and insert into the blockstore, as well as retransmit. */

  for( ulong fset_k=0; fset_k<ctx->send_fec_set_cnt; fset_k++ ) {

    fd_fec_set_t * set = ctx->fec_sets + ctx->send_fec_set_idx[ fset_k ];
    fd_shred34_t * s34 = ctx->shred34 + 4UL*ctx->send_fec_set_idx[ fset_k ];

    /* A FEC set holds at most 67 data and 67 parity shreds, spread
       across up to two shred34s each (entries 0-1 data, 2-3 parity). */
    s34[ 0 ].shred_cnt =                         fd_ulong_min( set->data_shred_cnt,   34UL );
    s34[ 1 ].shred_cnt = set->data_shred_cnt   - fd_ulong_min( set->data_shred_cnt,   34UL );
    s34[ 2 ].shred_cnt =                         fd_ulong_min( set->parity_shred_cnt, 34UL );
    s34[ 3 ].shred_cnt = set->parity_shred_cnt - fd_ulong_min( set->parity_shred_cnt, 34UL );

    ulong s34_cnt     = 2UL + !!(s34[ 1 ].shred_cnt) + !!(s34[ 3 ].shred_cnt);
    ulong txn_per_s34 = fd_ulong_if( fset_k<( ctx->send_fec_set_cnt - 1UL ), shredded_txn_cnt_per_fec_set, shredded_txn_cnt_last_fec_set ) / s34_cnt;

    /* Attribute the transactions evenly to the non-empty shred34s */
    for( ulong j=0UL; j<4UL; j++ ) s34[ j ].est_txn_cnt = fd_ulong_if( s34[ j ].shred_cnt>0UL, txn_per_s34, 0UL );

    /* Add whatever is left to the last shred34 */
    /* NOTE(review): the remainder is computed from the batch total
       ctx->shredded_txn_cnt rather than this FEC set's share, so with
       send_fec_set_cnt>1 the last shred34 of every set appears
       over-attributed.  est_txn_cnt is only a metrics estimate, but
       confirm this is intentional. */
    s34[ fd_ulong_if( s34[ 3 ].shred_cnt>0UL, 3, 2 ) ].est_txn_cnt += ctx->shredded_txn_cnt - txn_per_s34*s34_cnt;

    /* Set the sz field so that metrics are more accurate. */
    ulong sz0 = sizeof(fd_shred34_t) - (34UL - s34[ 0 ].shred_cnt)*FD_SHRED_MAX_SZ;
    ulong sz1 = sizeof(fd_shred34_t) - (34UL - s34[ 1 ].shred_cnt)*FD_SHRED_MAX_SZ;
    ulong sz2 = sizeof(fd_shred34_t) - (34UL - s34[ 2 ].shred_cnt)*FD_SHRED_MAX_SZ;
    ulong sz3 = sizeof(fd_shred34_t) - (34UL - s34[ 3 ].shred_cnt)*FD_SHRED_MAX_SZ;

    fd_shred_t const * last = (fd_shred_t const *)fd_type_pun_const( set->data_shreds[ set->data_shred_cnt - 1 ] );

    /* Compute merkle root and chained merkle root. */

    if( FD_LIKELY( ctx->store ) ) { /* firedancer-only */

      /* Insert shreds into the store. We do this regardless of whether
         we are leader. */

      /* See top-level documentation in fd_store.h under CONCURRENCY to
         understand why it is safe to use a Store read vs. write lock in
         Shred tile. */

      long shacq_start, shacq_end, shrel_end;
      FD_STORE_SHACQ_TIMED( ctx->store, shacq_start, shacq_end );
      fd_store_fec_t * fec = fd_store_insert( ctx->store, (uint)ctx->round_robin_id, (fd_hash_t *)fd_type_pun( &out_merkle_root ) );
      FD_STORE_SHREL_TIMED( ctx->store, shrel_end );

      for( ulong i=0UL; i<set->data_shred_cnt; i++ ) {
        fd_shred_t * data_shred = (fd_shred_t *)fd_type_pun( set->data_shreds[i] );
        ulong payload_sz = fd_shred_payload_sz( data_shred );
        if( FD_UNLIKELY( fec->data_sz + payload_sz > FD_STORE_DATA_MAX ) ) {

          /* This code is only reachable if shred tile has completed the
             FEC set, which implies it was able to validate it, yet
             somehow the total payload sz of this FEC set exceeds the
             maximum payload sz. This indicates either a serious bug or
             shred tile is compromised so log_crit. */

          FD_LOG_CRIT(( "Shred tile %lu: completed FEC set %lu %u data_sz: %lu exceeds FD_STORE_DATA_MAX: %lu. Ignoring FEC set.", ctx->round_robin_id, data_shred->slot, data_shred->fec_set_idx, fec->data_sz + payload_sz, FD_STORE_DATA_MAX ));
        }
        fd_memcpy( fec->data + fec->data_sz, fd_shred_data_payload( data_shred ), payload_sz );
        fec->data_sz += payload_sz;
      }

      /* It's safe to memcpy the FEC payload outside of the shared-lock,
         because the fec object ptr is guaranteed to be valid. It is
         not possible for a store_publish to free/invalidate the fec
         object during the data memcpy, because the free can only happen
         after the fec is linked to its parent, which happens in the
         repair tile, and crucially, only after we call stem publish in
         this tile. Copying outside the shared lock scope also means
         that we can lower the duration for which the shared lock is
         held, and enables replay to acquire the exclusive lock and
         avoid getting starved. */

      fd_histf_sample( ctx->metrics->store_insert_wait, (ulong)fd_long_max(shacq_end - shacq_start, 0) );
      fd_histf_sample( ctx->metrics->store_insert_work, (ulong)fd_long_max(shrel_end - shacq_end,   0) );
    }

    if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* firedancer-only */

      /* Additionally, publish a frag to notify repair that the FEC set
         is complete. Note the ordering wrt store shred insertion above
         is intentional: shreds are inserted into the store before
         notifying repair. This is because the replay tile is downstream
         of repair, and replay assumes the shreds are already in the
         store when repair notifies it that the FEC set is complete, and
         we don't know whether shred will finish inserting into store
         first or repair will finish validating the FEC set first. The
         header and merkle root of the last shred in the FEC set are
         sent as part of this frag. */

      ulong   sig   = fd_disco_shred_repair_fec_sig( last->slot, last->fec_set_idx, (uint)set->data_shred_cnt, last->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE, last->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE );
      uchar * chunk = fd_chunk_to_laddr( ctx->repair_out_mem, ctx->repair_out_chunk );
      /* Frag layout: data shred header, then merkle root, then chained
         merkle root (read from the shred at its chain offset). */
      memcpy( chunk, last, FD_SHRED_DATA_HEADER_SZ );
      memcpy( chunk+FD_SHRED_DATA_HEADER_SZ, out_merkle_root.hash, FD_SHRED_MERKLE_ROOT_SZ );
      memcpy( chunk+FD_SHRED_DATA_HEADER_SZ + FD_SHRED_MERKLE_ROOT_SZ, (uchar *)last + fd_shred_chain_off( last->variant ), FD_SHRED_MERKLE_ROOT_SZ );
      ulong sz    = FD_SHRED_DATA_HEADER_SZ + FD_SHRED_MERKLE_ROOT_SZ * 2;
      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      fd_stem_publish( stem, ctx->repair_out_idx, sig, ctx->repair_out_chunk, sz, 0UL, ctx->tsorig, tspub );
      ctx->repair_out_chunk = fd_dcache_compact_next( ctx->repair_out_chunk, sz, ctx->repair_out_chunk0, ctx->repair_out_wmark );

    } else if( FD_UNLIKELY( ctx->store_out_idx != ULONG_MAX ) ) { /* frankendancer-only */

      /* Send to the blockstore, skipping any empty shred34_t s. */

      ulong new_sig = ctx->in_kind[ in_idx ]!=IN_KIND_NET; /* sig==0 means the store tile will do extra checks */
      ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
      fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+0UL ), sz0, 0UL, ctx->tsorig, tspub );
      if( FD_UNLIKELY( s34[ 1 ].shred_cnt ) )
        fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+1UL ), sz1, 0UL, ctx->tsorig, tspub );
      if( FD_UNLIKELY( s34[ 2 ].shred_cnt ) )
        fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+2UL), sz2, 0UL, ctx->tsorig, tspub );
      if( FD_UNLIKELY( s34[ 3 ].shred_cnt ) )
        fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+3UL ), sz3, 0UL, ctx->tsorig, tspub );
    }

    /* Compute all the destinations for all the new shreds */

    fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
    ulong k=0UL;
    for( ulong i=0UL; i<set->data_shred_cnt; i++ )
      if( !d_rcvd_test( set->data_shred_rcvd,   i ) )  new_shreds[ k++ ] = (fd_shred_t const *)set->data_shreds  [ i ];
    for( ulong i=0UL; i<set->parity_shred_cnt; i++ )
      if( !p_rcvd_test( set->parity_shred_rcvd, i ) )  new_shreds[ k++ ] = (fd_shred_t const *)set->parity_shreds[ i ];

    if( FD_UNLIKELY( !k ) ) return;
    fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, new_shreds[ 0 ]->slot );
    if( FD_UNLIKELY( !sdest ) ) return;

    ulong out_stride;
    ulong max_dest_cnt[1];
    fd_shred_dest_idx_t * dests;
    if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
      /* Not leader: retransmit reconstructed shreds to our turbine
         children. */
      out_stride = k;
      /* In the case of feature activation, the fanout used below is
         the same as the one calculated/modified previously at the
         begining of after_frag() for IN_KIND_NET in this slot. */
      dests = fd_shred_dest_compute_children( sdest, new_shreds, k, ctx->scratchpad_dests, k, fanout, fanout, max_dest_cnt );
    } else {
      /* Leader: send each shred to the root of its turbine tree. */
      out_stride = 1UL;
      *max_dest_cnt = 1UL;
      dests = fd_shred_dest_compute_first( sdest, new_shreds, k, ctx->scratchpad_dests );
      for( ulong i=0UL; i<k; i++ ) {
        for( ulong j=0UL; j<ctx->adtl_dests_leader_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dests_leader+j, ctx->tsorig );
      }
    }
    if( FD_UNLIKELY( !dests ) ) return;

    /* Send only the ones we didn't receive. */
    for( ulong i=0UL; i<k; i++ ) {
      for( ulong j=0UL; j<ctx->adtl_dests_retransmit_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], ctx->adtl_dests_retransmit+j, ctx->tsorig );
      for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, stem, new_shreds[ i ], fd_shred_dest_idx_to_dest( sdest, dests[ j*out_stride+i ]), ctx->tsorig );
    }
  }
}
1036 :
1037 : static void
1038 : privileged_init( fd_topo_t * topo,
1039 0 : fd_topo_tile_t * tile ) {
1040 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1041 0 : FD_TEST( scratch!=NULL );
1042 :
1043 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1044 0 : fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
1045 :
1046 0 : if( FD_UNLIKELY( !strcmp( tile->shred.identity_key_path, "" ) ) )
1047 0 : FD_LOG_ERR(( "identity_key_path not set" ));
1048 :
1049 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->shred.identity_key_path, /* pubkey only: */ 1 ) );
1050 0 : }
1051 :
/* fd_shred_signer: signing callback handed to the shredder and FEC
   resolver.  signer_ctx is the tile's keyguard client; the 32-byte
   merkle_root is forwarded to it for ed25519 signing (presumably
   serviced by the sign tile the client was joined to) and the 64-byte
   signature is written to signature. */
static void
fd_shred_signer( void * signer_ctx,
                 uchar signature[ static 64 ],
                 uchar const merkle_root[ static 32 ] ) {
  fd_keyguard_client_sign( signer_ctx, signature, merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
}
1058 :
1059 : static void
1060 : unprivileged_init( fd_topo_t * topo,
1061 0 : fd_topo_tile_t * tile ) {
1062 :
1063 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ NET_OUT_IDX ]].name, "shred_net" ) );
1064 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ SIGN_OUT_IDX ]].name, "shred_sign" ) );
1065 :
1066 0 : if( FD_UNLIKELY( !tile->out_cnt ) )
1067 0 : FD_LOG_ERR(( "shred tile has no primary output link" ));
1068 :
1069 0 : ulong shred_store_mcache_depth = tile->shred.depth;
1070 0 : if( topo->links[ tile->out_link_id[ 0 ] ].depth != shred_store_mcache_depth )
1071 0 : FD_LOG_ERR(( "shred tile out depths are not equal %lu %lu",
1072 0 : topo->links[ tile->out_link_id[ 0 ] ].depth, shred_store_mcache_depth ));
1073 :
1074 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1075 0 : FD_TEST( scratch!=NULL );
1076 :
1077 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1078 0 : fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
1079 :
1080 0 : ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
1081 0 : ctx->round_robin_id = tile->kind_id;
1082 0 : ctx->batch_cnt = 0UL;
1083 0 : ctx->slot = ULONG_MAX;
1084 :
1085 : /* If the default partial_depth is ever changed, correspondingly
1086 : change the size of the fd_fec_intra_pool in fd_fec_repair. */
1087 0 : ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth,
1088 0 : 128UL * tile->shred.fec_resolver_depth );
1089 0 : ulong fec_set_cnt = shred_store_mcache_depth + tile->shred.fec_resolver_depth + 4UL;
1090 0 : ulong fec_sets_required_sz = fec_set_cnt*DCACHE_ENTRIES_PER_FEC_SET*sizeof(fd_shred34_t);
1091 :
1092 0 : void * fec_sets_shmem = NULL;
1093 0 : ctx->repair_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_repair", ctx->round_robin_id );
1094 0 : ctx->store_out_idx = fd_topo_find_tile_out_link( topo, tile, "shred_store", ctx->round_robin_id );
1095 0 : if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
1096 0 : fd_topo_link_t * repair_out = &topo->links[ tile->out_link_id[ ctx->repair_out_idx ] ];
1097 0 : ctx->repair_out_mem = topo->workspaces[ topo->objs[ repair_out->dcache_obj_id ].wksp_id ].wksp;
1098 0 : ctx->repair_out_chunk0 = fd_dcache_compact_chunk0( ctx->repair_out_mem, repair_out->dcache );
1099 0 : ctx->repair_out_wmark = fd_dcache_compact_wmark ( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu );
1100 0 : ctx->repair_out_chunk = ctx->repair_out_chunk0;
1101 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu, repair_out->depth ) );
1102 0 : ulong fec_sets_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "fec_sets" );
1103 0 : if( FD_UNLIKELY( fec_sets_obj_id == ULONG_MAX ) ) FD_LOG_ERR(( "invalid firedancer topo" ));
1104 0 : fd_topo_obj_t const * obj = &topo->objs[ fec_sets_obj_id ];
1105 0 : if( FD_UNLIKELY( obj->footprint<(fec_sets_required_sz*ctx->round_robin_cnt) ) ) {
1106 0 : FD_LOG_ERR(( "fec_sets wksp obj too small. It is %lu bytes but must be at least %lu bytes. ",
1107 0 : obj->footprint,
1108 0 : fec_sets_required_sz ));
1109 0 : }
1110 0 : fec_sets_shmem = (uchar *)fd_topo_obj_laddr( topo, fec_sets_obj_id ) + (ctx->round_robin_id * fec_sets_required_sz);
1111 0 : } else if ( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
1112 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[ ctx->store_out_idx ]].name, "shred_store" ) );
1113 0 : fec_sets_shmem = topo->links[ tile->out_link_id[ ctx->store_out_idx ] ].dcache;
1114 0 : if( FD_UNLIKELY( fd_dcache_data_sz( fec_sets_shmem )<fec_sets_required_sz ) ) {
1115 0 : FD_LOG_ERR(( "shred_store dcache too small. It is %lu bytes but must be at least %lu bytes. ",
1116 0 : fd_dcache_data_sz( fec_sets_shmem ),
1117 0 : fec_sets_required_sz ));
1118 0 : }
1119 0 : }
1120 :
1121 0 : if( FD_UNLIKELY( !tile->shred.fec_resolver_depth ) ) FD_LOG_ERR(( "fec_resolver_depth not set" ));
1122 0 : if( FD_UNLIKELY( !tile->shred.shred_listen_port ) ) FD_LOG_ERR(( "shred_listen_port not set" ));
1123 :
1124 0 : void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
1125 0 : void * _resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_resolver_align(), fec_resolver_footprint );
1126 0 : void * _shredder = FD_SCRATCH_ALLOC_APPEND( l, fd_shredder_align(), fd_shredder_footprint() );
1127 0 : void * _fec_sets = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_fec_set_t), sizeof(fd_fec_set_t)*fec_set_cnt );
1128 :
1129 0 : fd_fec_set_t * fec_sets = (fd_fec_set_t *)_fec_sets;
1130 0 : fd_shred34_t * shred34 = (fd_shred34_t *)fec_sets_shmem;
1131 :
1132 0 : for( ulong i=0UL; i<fec_set_cnt; i++ ) {
1133 0 : fd_shred34_t * p34_base = shred34 + i*DCACHE_ENTRIES_PER_FEC_SET;
1134 0 : for( ulong k=0UL; k<DCACHE_ENTRIES_PER_FEC_SET; k++ ) {
1135 0 : fd_shred34_t * p34 = p34_base + k;
1136 :
1137 0 : p34->stride = (ulong)p34->pkts[1].buffer - (ulong)p34->pkts[0].buffer;
1138 0 : p34->offset = (ulong)p34->pkts[0].buffer - (ulong)p34;
1139 0 : p34->shred_sz = fd_ulong_if( k<2UL, 1203UL, 1228UL );
1140 0 : }
1141 :
1142 0 : uchar ** data_shred = fec_sets[ i ].data_shreds;
1143 0 : uchar ** parity_shred = fec_sets[ i ].parity_shreds;
1144 0 : for( ulong j=0UL; j<FD_REEDSOL_DATA_SHREDS_MAX; j++ ) data_shred [ j ] = p34_base[ j/34UL ].pkts[ j%34UL ].buffer;
1145 0 : for( ulong j=0UL; j<FD_REEDSOL_PARITY_SHREDS_MAX; j++ ) parity_shred[ j ] = p34_base[ 2UL + j/34UL ].pkts[ j%34UL ].buffer;
1146 0 : }
1147 :
1148 0 : #define NONNULL( x ) (__extension__({ \
1149 0 : __typeof__((x)) __x = (x); \
1150 0 : if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
1151 0 : __x; }))
1152 :
1153 0 : ulong expected_shred_version = tile->shred.expected_shred_version;
1154 0 : if( FD_LIKELY( !expected_shred_version ) ) {
1155 0 : ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
1156 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
1157 0 : ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
1158 0 : FD_LOG_INFO(( "Waiting for shred version to be determined via gossip." ));
1159 0 : do {
1160 0 : expected_shred_version = FD_VOLATILE_CONST( *gossip_shred_version );
1161 0 : } while( expected_shred_version==ULONG_MAX );
1162 0 : }
1163 :
1164 0 : if( FD_UNLIKELY( expected_shred_version > USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
1165 0 : FD_LOG_INFO(( "Using shred version %hu", (ushort)expected_shred_version ));
1166 :
1167 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) );
1168 0 : FD_TEST( ctx->keyswitch );
1169 :
1170 : /* populate ctx */
1171 0 : ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_shred", tile->kind_id );
1172 0 : FD_TEST( sign_in_idx!=ULONG_MAX );
1173 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
1174 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
1175 0 : NONNULL( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
1176 0 : sign_out->mcache,
1177 0 : sign_out->dcache,
1178 0 : sign_in->mcache,
1179 0 : sign_in->dcache ) ) );
1180 :
1181 0 : ulong shred_limit = fd_ulong_if( tile->shred.larger_shred_limits_per_block, 32UL*32UL*1024UL, 32UL*1024UL );
1182 0 : fd_fec_set_t * resolver_sets = fec_sets + (shred_store_mcache_depth+1UL)/2UL + 1UL;
1183 0 : ctx->shredder = NONNULL( fd_shredder_join ( fd_shredder_new ( _shredder, fd_shred_signer, ctx->keyguard_client, (ushort)expected_shred_version ) ) );
1184 0 : ctx->resolver = NONNULL( fd_fec_resolver_join ( fd_fec_resolver_new ( _resolver,
1185 0 : fd_shred_signer, ctx->keyguard_client,
1186 0 : tile->shred.fec_resolver_depth, 1UL,
1187 0 : (shred_store_mcache_depth+3UL)/2UL,
1188 0 : 128UL * tile->shred.fec_resolver_depth, resolver_sets,
1189 0 : (ushort)expected_shred_version,
1190 0 : shred_limit ) ) );
1191 :
1192 0 : ctx->shred34 = shred34;
1193 0 : ctx->fec_sets = fec_sets;
1194 :
1195 0 : ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci, ctx->identity_key ) );
1196 :
1197 0 : ctx->net_id = (ushort)0;
1198 :
1199 0 : fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr, FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
1200 0 : fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
1201 :
1202 0 : ctx->adtl_dests_retransmit_cnt = tile->shred.adtl_dests_retransmit_cnt;
1203 0 : for( ulong i=0UL; i<ctx->adtl_dests_retransmit_cnt; i++) {
1204 0 : ctx->adtl_dests_retransmit[ i ].ip4 = tile->shred.adtl_dests_retransmit[ i ].ip;
1205 0 : ctx->adtl_dests_retransmit[ i ].port = tile->shred.adtl_dests_retransmit[ i ].port;
1206 0 : }
1207 0 : ctx->adtl_dests_leader_cnt = tile->shred.adtl_dests_leader_cnt;
1208 0 : for( ulong i=0UL; i<ctx->adtl_dests_leader_cnt; i++) {
1209 0 : ctx->adtl_dests_leader[i].ip4 = tile->shred.adtl_dests_leader[i].ip;
1210 0 : ctx->adtl_dests_leader[i].port = tile->shred.adtl_dests_leader[i].port;
1211 0 : }
1212 :
1213 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
1214 0 : fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
1215 0 : fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
1216 :
1217 0 : if( FD_LIKELY( !strcmp( link->name, "net_shred" ) ) ) { ctx->in_kind[ i ] = IN_KIND_NET;
1218 0 : fd_net_rx_bounds_init( &ctx->in[ i ].net_rx, link->dcache );
1219 0 : continue; /* only net_rx needs to be set in this case. */ }
1220 0 : else if( FD_LIKELY( !strcmp( link->name, "poh_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_POH;
1221 0 : else if( FD_LIKELY( !strcmp( link->name, "stake_out" ) ) ) ctx->in_kind[ i ] = IN_KIND_STAKE;
1222 0 : else if( FD_LIKELY( !strcmp( link->name, "crds_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_CONTACT;
1223 0 : else if( FD_LIKELY( !strcmp( link->name, "sign_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
1224 0 : else if( FD_LIKELY( !strcmp( link->name, "repair_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_REPAIR;
1225 0 : else FD_LOG_ERR(( "shred tile has unexpected input link %lu %s", i, link->name ));
1226 :
1227 0 : ctx->in[ i ].mem = link_wksp->wksp;
1228 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
1229 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
1230 0 : }
1231 :
1232 0 : fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
1233 :
1234 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
1235 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
1236 0 : ctx->net_out_wmark = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
1237 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
1238 :
1239 0 : ctx->store = NULL;
1240 0 : ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" );
1241 0 : if( FD_LIKELY( store_obj_id!=ULONG_MAX ) ) { /* firedancer-only */
1242 0 : ctx->store = fd_store_join( fd_topo_obj_laddr( topo, store_obj_id ) );
1243 0 : FD_TEST( ctx->store->magic == FD_STORE_MAGIC );
1244 0 : }
1245 :
1246 0 : if( FD_LIKELY( ctx->repair_out_idx!=ULONG_MAX ) ) { /* firedancer-only */
1247 0 : fd_topo_link_t * repair_out = &topo->links[ tile->out_link_id[ ctx->repair_out_idx ] ];
1248 0 : ctx->repair_out_mem = topo->workspaces[ topo->objs[ repair_out->dcache_obj_id ].wksp_id ].wksp;
1249 0 : ctx->repair_out_chunk0 = fd_dcache_compact_chunk0( ctx->repair_out_mem, repair_out->dcache );
1250 0 : ctx->repair_out_wmark = fd_dcache_compact_wmark ( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu );
1251 0 : ctx->repair_out_chunk = ctx->repair_out_chunk0;
1252 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->repair_out_mem, repair_out->dcache, repair_out->mtu, repair_out->depth ) );
1253 0 : }
1254 :
1255 0 : if( FD_LIKELY( ctx->store_out_idx!=ULONG_MAX ) ) { /* frankendancer-only */
1256 0 : fd_topo_link_t * store_out = &topo->links[ tile->out_link_id[ ctx->store_out_idx ] ];
1257 0 : ctx->store_out_mem = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
1258 0 : ctx->store_out_chunk0 = fd_dcache_compact_chunk0( ctx->store_out_mem, store_out->dcache );
1259 0 : ctx->store_out_wmark = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
1260 0 : ctx->store_out_chunk = ctx->store_out_chunk0;
1261 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->store_out_mem, store_out->dcache, store_out->mtu, store_out->depth ) );
1262 0 : }
1263 :
1264 0 : ctx->poh_in_expect_seq = 0UL;
1265 :
1266 0 : ctx->shredder_fec_set_idx = 0UL;
1267 0 : ctx->shredder_max_fec_set_idx = (shred_store_mcache_depth+1UL)/2UL + 1UL;
1268 :
1269 0 : for( ulong i=0UL; i<FD_SHRED_BATCH_FEC_SETS_MAX; i++ ) { ctx->send_fec_set_idx[ i ] = ULONG_MAX; }
1270 0 : ctx->send_fec_set_cnt = 0UL;
1271 :
1272 0 : ctx->shred_buffer_sz = 0UL;
1273 0 : memset( ctx->shred_buffer, 0xFF, FD_NET_MTU );
1274 :
1275 0 : fd_histf_join( fd_histf_new( ctx->metrics->contact_info_cnt, FD_MHIST_MIN( SHRED, CLUSTER_CONTACT_INFO_CNT ),
1276 0 : FD_MHIST_MAX( SHRED, CLUSTER_CONTACT_INFO_CNT ) ) );
1277 0 : fd_histf_join( fd_histf_new( ctx->metrics->batch_sz, FD_MHIST_MIN( SHRED, BATCH_SZ ),
1278 0 : FD_MHIST_MAX( SHRED, BATCH_SZ ) ) );
1279 0 : fd_histf_join( fd_histf_new( ctx->metrics->batch_microblock_cnt, FD_MHIST_MIN( SHRED, BATCH_MICROBLOCK_CNT ),
1280 0 : FD_MHIST_MAX( SHRED, BATCH_MICROBLOCK_CNT ) ) );
1281 0 : fd_histf_join( fd_histf_new( ctx->metrics->shredding_timing, FD_MHIST_SECONDS_MIN( SHRED, SHREDDING_DURATION_SECONDS ),
1282 0 : FD_MHIST_SECONDS_MAX( SHRED, SHREDDING_DURATION_SECONDS ) ) );
1283 0 : fd_histf_join( fd_histf_new( ctx->metrics->add_shred_timing, FD_MHIST_SECONDS_MIN( SHRED, ADD_SHRED_DURATION_SECONDS ),
1284 0 : FD_MHIST_SECONDS_MAX( SHRED, ADD_SHRED_DURATION_SECONDS ) ) );
1285 0 : fd_histf_join( fd_histf_new( ctx->metrics->store_insert_wait, FD_MHIST_SECONDS_MIN( SHRED, STORE_INSERT_WAIT ),
1286 0 : FD_MHIST_SECONDS_MAX( SHRED, STORE_INSERT_WAIT ) ) );
1287 0 : fd_histf_join( fd_histf_new( ctx->metrics->store_insert_work, FD_MHIST_SECONDS_MIN( SHRED, STORE_INSERT_WORK ),
1288 0 : FD_MHIST_SECONDS_MAX( SHRED, STORE_INSERT_WORK ) ) );
1289 0 : memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) );
1290 0 : ctx->metrics->invalid_block_id_cnt = 0UL;
1291 0 : ctx->metrics->shred_rejected_unchained_cnt = 0UL;
1292 :
1293 0 : ctx->pending_batch.microblock_cnt = 0UL;
1294 0 : ctx->pending_batch.txn_cnt = 0UL;
1295 0 : ctx->pending_batch.pos = 0UL;
1296 0 : ctx->pending_batch.slot = 0UL;
1297 0 : memset( ctx->pending_batch.payload, 0, sizeof(ctx->pending_batch.payload) );
1298 :
1299 0 : for( ulong i=0UL; i<FD_SHRED_FEATURES_ACTIVATION_SLOT_CNT; i++ )
1300 0 : ctx->features_activation->slots[i] = FD_SHRED_FEATURES_ACTIVATION_SLOT_DISABLED;
1301 :
1302 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
1303 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
1304 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
1305 0 : }
1306 :
1307 : static ulong
1308 : populate_allowed_seccomp( fd_topo_t const * topo,
1309 : fd_topo_tile_t const * tile,
1310 : ulong out_cnt,
1311 0 : struct sock_filter * out ) {
1312 0 : (void)topo;
1313 0 : (void)tile;
1314 :
1315 0 : populate_sock_filter_policy_fd_shred_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
1316 0 : return sock_filter_policy_fd_shred_tile_instr_cnt;
1317 0 : }
1318 :
1319 : static ulong
1320 : populate_allowed_fds( fd_topo_t const * topo,
1321 : fd_topo_tile_t const * tile,
1322 : ulong out_fds_cnt,
1323 0 : int * out_fds ) {
1324 0 : (void)topo;
1325 0 : (void)tile;
1326 :
1327 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1328 :
1329 0 : ulong out_cnt = 0UL;
1330 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1331 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1332 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1333 0 : return out_cnt;
1334 0 : }
1335 :
/* Excluding net_out (where the link is unreliable), STEM_BURST needs
   to guarantee enough credits for the worst case.  There are 4 cases
   to consider: (IN_KIND_NET/IN_KIND_POH) x (Frankendancer/Firedancer)
   In the IN_KIND_NET case: (Frankendancer) that can be 4 frags to
   store; (Firedancer) that is one frag for the shred to repair, and
   then another frag to repair for the FEC set.
   In the IN_KIND_POH case: (Frankendancer) there might be
   FD_SHRED_BATCH_FEC_SETS_MAX FEC sets, but we know they are 32:32,
   which means only two shred34s per FEC set; (Firedancer) that is
   FD_SHRED_BATCH_FEC_SETS_MAX frags to repair (one per FEC set).
   Therefore, the worst case is IN_KIND_POH for Frankendancer. */
#define STEM_BURST (FD_SHRED_BATCH_FEC_SETS_MAX*2UL)

/* See explanation in fd_pack */
#define STEM_LAZY  (128L*3000L)

/* Context object passed to every stem callback below, and its
   required alignment. */
#define STEM_CALLBACK_CONTEXT_TYPE  fd_shred_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_shred_ctx_t)

/* Wire this tile's callbacks into the stem run-loop template that is
   textually included below. */
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE       metrics_write
#define STEM_CALLBACK_BEFORE_FRAG         before_frag
#define STEM_CALLBACK_DURING_FRAG         during_frag
#define STEM_CALLBACK_AFTER_FRAG          after_frag

#include "../stem/fd_stem.c"
1361 : #include "../stem/fd_stem.c"
1362 :
/* Tile descriptor registered with the topology runtime.  Binds the
   shred tile's name to its sandbox policies (seccomp filter and
   allowed fds), scratch memory sizing, the two-phase init hooks, and
   the stem run loop defined by the fd_stem.c inclusion above. */
fd_topo_run_tile_t fd_tile_shred = {
  .name                     = "shred",
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds     = populate_allowed_fds,
  .scratch_align            = scratch_align,
  .scratch_footprint        = scratch_footprint,
  .privileged_init          = privileged_init,
  .unprivileged_init        = unprivileged_init,
  .run                      = stem_run,
};
|