Line data Source code
1 : #include "../tiles.h"
2 :
3 : #include "generated/fd_shred_tile_seccomp.h"
4 : #include "../topo/fd_pod_format.h"
5 : #include "../shred/fd_shredder.h"
6 : #include "../shred/fd_shred_dest.h"
7 : #include "../shred/fd_fec_resolver.h"
8 : #include "../shred/fd_stake_ci.h"
9 : #include "../keyguard/fd_keyload.h"
10 : #include "../keyguard/fd_keyguard.h"
11 : #include "../keyguard/fd_keyswitch.h"
12 : #include "../fd_disco.h"
13 : #include "../../flamenco/leaders/fd_leaders.h"
14 : #include "../../flamenco/runtime/fd_blockstore.h"
15 : #include "../../util/net/fd_net_headers.h"
16 :
17 : #include <linux/unistd.h>
18 :
19 : /* The shred tile handles shreds from two data sources: shreds
20 : generated from microblocks produced by the banking tile, and shreds
21 : retransmitted from the network.
22 :
23 : They have rather different semantics, but at the end of the day, they
24 : both result in a bunch of shreds and FEC sets that need to be sent to
25 : the blockstore and on the network, which is why one tile handles
26 : both.
27 :
28 : We segment the memory for the two types of shreds into two halves of
29 : a dcache because they follow somewhat different flow control
30 : patterns. For flow control, the normal guarantee we want to provide
31 : is that the dcache entry is not overwritten unless the mcache entry
32 : has also been overwritten. The normal way to do this when using both
33 : cyclically and with a 1-to-1 mapping is to make the dcache at least
34 : `burst` entries bigger than the mcache.
35 :
36 : In this tile, we use one output mcache with one output dcache (which
37 : is logically partitioned into two) for the two sources of data. The
38 : worst case for flow control is when we're only sending with one of
39 : the dcache partitions at a time though, so we can consider them
40 : separately.
41 :
42 : From bank: Every FEC set triggers at least two mcache entries (one
43 : for parity and one for data), so at most, we have ceil(mcache
44 : depth/2) FEC sets exposed. This means we need to decompose the
45 : dcache into at least ceil(mcache depth/2)+1 FEC sets.
46 :
47 : From the network: The FEC resolver doesn't use a cyclic order, but it
48 : does promise that once it returns an FEC set, it will return at least
49 : complete_depth FEC sets before returning it again. This means we
50 : want at most complete_depth-1 FEC sets exposed, so
51 : complete_depth=ceil(mcache depth/2)+1 FEC sets as above. The FEC
52 : resolver has the ability to keep individual shreds for partial_depth
53 : calls, but because in this version of the shred tile, we send each
54 : shred to all its destinations as soon as we get it, we don't need
55 : that functionality, so we set partial_depth=1.
56 :
57 : Adding these up, we get 2*ceil(mcache_depth/2)+3+fec_resolver_depth
58 : FEC sets, which is no more than mcache_depth+4+fec_resolver_depth.
59 : Each FEC set is paired with 4 fd_shred34_t structs, which means we need
60 : to decompose the dcache into 4*mcache_depth + 4*fec_resolver_depth +
61 : 16 fd_shred34_t structs. */
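/* Illustrative sizing check (added; not part of the original file):
   plugging hypothetical depths into the arithmetic above, say an mcache
   depth of 128 and an fec_resolver_depth of 1024:

     from bank:     ceil(128/2)+1                  =   65 FEC sets
     from network:  complete_depth = ceil(128/2)+1 =   65 FEC sets
                    partial_depth                  =    1 FEC set
                    resolver in-progress sets      = 1024 FEC sets
     total:         65+65+1+1024                   = 1155 FEC sets
                    <= 128+4+1024                  = 1156 FEC sets

   and 4*1156 = 4624 <= 4*128 + 4*1024 + 16 = 4624 fd_shred34_t structs,
   matching the bound stated above.  (Reading the fec_resolver_depth
   term as the number of FEC sets the resolver holds in progress is our
   interpretation, not a statement from the original comment.) */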
62 :
63 :
64 : /* The memory this tile uses is a bit complicated and has some logical
65 : aliasing to facilitate zero-copy use. We have a dcache containing
66 : fd_shred34_t objects, which are basically 34 fd_shred_t objects
67 : padded to their max size, where 34 is set so that the size of the
68 : fd_shred34_t object (including some metadata) is less than
69 : USHORT_MAX, which facilitates sending it using Tango. Then, for each
70 : set of 4 consecutive fd_shred34_t objects, we have an fd_fec_set_t.
71 : The first 34 data shreds point to the payload section of each of the
72 : packets in the first fd_shred34_t. The other 33 data shreds point
73 : into the second fd_shred34_t. Similarly, the parity shreds point
74 : into the third and fourth fd_shred34_t. */
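/* Added worked illustration of the aliasing above, inferred from the
   pointer-wiring loop in unprivileged_init below (example indices
   only): for FEC set i, data shred j aliases

     shred34[ 4*i + j/34 ].pkts[ j%34 ].buffer

   and parity shred j aliases

     shred34[ 4*i + 2 + j/34 ].pkts[ j%34 ].buffer

   e.g. data shred 40 of FEC set 0 lives in shred34[ 1 ].pkts[ 6 ], and
   parity shred 0 of FEC set 2 lives in shred34[ 10 ].pkts[ 0 ]. */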
75 :
76 : /* There's nothing deep about this max, but I just find it easier to
77 : have a max and use statically sized arrays than alloca. */
78 : #define MAX_BANK_CNT 64UL
79 :
80 : /* MAX_SHRED_DESTS indicates the maximum number of destinations (i.e. a
81 : pubkey -> ip, port) that the shred tile can keep track of. */
82 0 : #define MAX_SHRED_DESTS 40200UL
83 :
84 : #define FD_SHRED_TILE_SCRATCH_ALIGN 128UL
85 :
86 0 : #define IN_KIND_CONTACT (0UL)
87 0 : #define IN_KIND_STAKE (1UL)
88 0 : #define IN_KIND_POH (2UL)
89 0 : #define IN_KIND_NET (3UL)
90 0 : #define IN_KIND_SIGN (4UL)
91 :
92 0 : #define STORE_OUT_IDX 0
93 0 : #define NET_OUT_IDX 1
94 0 : #define SIGN_OUT_IDX 2
95 0 : #define REPLAY_OUT_IDX 3
96 :
97 : #define MAX_SLOTS_PER_EPOCH 432000UL
98 :
99 0 : #define DCACHE_ENTRIES_PER_FEC_SET (4UL)
100 : FD_STATIC_ASSERT( sizeof(fd_shred34_t) < USHORT_MAX, shred_34 );
101 : FD_STATIC_ASSERT( 34*DCACHE_ENTRIES_PER_FEC_SET >= FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX, shred_34 );
102 : FD_STATIC_ASSERT( sizeof(fd_shred34_t) == FD_SHRED_STORE_MTU, shred_34 );
103 :
104 : FD_STATIC_ASSERT( sizeof(fd_entry_batch_meta_t)==24UL, poh_shred_mtu );
105 :
106 0 : #define FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT 2
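/* Added note: as we read after_frag below, the two "extra" buckets in
   the shred_processing_result metric (beyond the FEC resolver's own
   return values) count shreds dropped before reaching the resolver:
   bucket 0 for "no leader schedule / unknown slot leader" and bucket 1
   for "failed to parse"; resolver return values are recorded at
   rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF + FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT. */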
107 :
108 : typedef struct {
109 : fd_wksp_t * mem;
110 : ulong chunk0;
111 : ulong wmark;
112 : } fd_shred_in_ctx_t;
113 :
114 : typedef struct {
115 : fd_shredder_t * shredder;
116 : fd_fec_resolver_t * resolver;
117 : fd_pubkey_t identity_key[1]; /* Just the public key */
118 :
119 : ulong round_robin_id;
120 : ulong round_robin_cnt;
121 : /* Number of batches shredded from PoH during the current slot.
122 : This should be the same for all the shred tiles. */
123 : ulong batch_cnt;
124 : /* Slot of the most recent microblock we've seen from PoH,
125 : or ULONG_MAX if we haven't seen one yet */
126 : ulong slot;
127 :
128 : fd_keyswitch_t * keyswitch;
129 : fd_keyguard_client_t keyguard_client[1];
130 :
131 : /* shred34 and fec_sets are very related: fec_sets[i] has pointers
132 : to the shreds in shred34[4*i + k] for k=0,1,2,3. */
133 : fd_shred34_t * shred34;
134 : fd_fec_set_t * fec_sets;
135 :
136 : fd_stake_ci_t * stake_ci;
137 : /* These carry state between during_frag and after_frag. */
138 : fd_shred_dest_weighted_t * new_dest_ptr;
139 : ulong new_dest_cnt;
140 : ulong shredded_txn_cnt;
141 :
142 : ulong poh_in_expect_seq;
143 :
144 : ushort net_id;
145 :
146 : int skip_frag;
147 :
148 : fd_ip4_udp_hdrs_t data_shred_net_hdr [1];
149 : fd_ip4_udp_hdrs_t parity_shred_net_hdr[1];
150 :
151 : fd_wksp_t * shred_store_wksp;
152 :
153 : ulong shredder_fec_set_idx; /* In [0, shredder_max_fec_set_idx) */
154 : ulong shredder_max_fec_set_idx; /* exclusive */
155 :
156 : ulong send_fec_set_idx;
157 : ulong tsorig; /* timestamp of the last packet in compressed form */
158 :
159 : /* Includes Ethernet, IP, UDP headers */
160 : ulong shred_buffer_sz;
161 : uchar shred_buffer[ FD_NET_MTU ];
162 :
163 :
164 : fd_shred_in_ctx_t in[ 32 ];
165 : int in_kind[ 32 ];
166 :
167 : fd_frag_meta_t * net_out_mcache;
168 : ulong * net_out_sync;
169 : ulong net_out_depth;
170 : ulong net_out_seq;
171 :
172 : fd_wksp_t * net_out_mem;
173 : ulong net_out_chunk0;
174 : ulong net_out_wmark;
175 : ulong net_out_chunk;
176 :
177 : fd_wksp_t * store_out_mem;
178 : ulong store_out_chunk0;
179 : ulong store_out_wmark;
180 : ulong store_out_chunk;
181 :
182 : fd_wksp_t * replay_out_mem;
183 : ulong replay_out_chunk0;
184 : ulong replay_out_wmark;
185 : ulong replay_out_chunk;
186 :
187 : fd_blockstore_t blockstore_ljoin;
188 : fd_blockstore_t * blockstore;
189 :
190 : struct {
191 : fd_histf_t contact_info_cnt[ 1 ];
192 : fd_histf_t batch_sz[ 1 ];
193 : fd_histf_t batch_microblock_cnt[ 1 ];
194 : fd_histf_t shredding_timing[ 1 ];
195 : fd_histf_t add_shred_timing[ 1 ];
196 : ulong shred_processing_result[ FD_FEC_RESOLVER_ADD_SHRED_RETVAL_CNT+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ];
197 : } metrics[ 1 ];
198 :
199 : struct {
200 : ulong txn_cnt;
201 : ulong pos; /* in payload, so 0<=pos<63671 */
202 : ulong slot; /* set to 0 when pos==0 */
203 : union {
204 : struct {
205 : ulong microblock_cnt;
206 : uchar payload[ 63679UL - 8UL ];
207 : };
208 : uchar raw[ 63679UL ]; /* The largest that fits in 1 FEC set */
209 : };
210 : } pending_batch;
211 : } fd_shred_ctx_t;
212 :
213 : /* PENDING_BATCH_WMARK: Following along the lines of dcache, batch
214 : microblocks until either the slot ends or we exceed the watermark.
215 : We know that if we're <= watermark, we can always accept a message of
216 : maximum size. */
217 0 : #define PENDING_BATCH_WMARK (63679UL - 8UL - FD_POH_SHRED_MTU)
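/* A minimal compile-time sketch (added, illustrative only) of the
   watermark guarantee described above: if pending_batch.pos is at most
   PENDING_BATCH_WMARK, appending one more entry of at most
   FD_POH_SHRED_MTU bytes cannot overflow the payload buffer. */
FD_STATIC_ASSERT( PENDING_BATCH_WMARK + FD_POH_SHRED_MTU <= 63679UL-8UL, pending_batch_wmark );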
218 :
219 : FD_FN_CONST static inline ulong
220 3 : scratch_align( void ) {
221 3 : return 128UL;
222 3 : }
223 :
224 : FD_FN_PURE static inline ulong
225 3 : scratch_footprint( fd_topo_tile_t const * tile ) {
226 :
227 3 : ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, tile->shred.depth,
228 3 : 128UL * tile->shred.fec_resolver_depth );
229 3 : ulong fec_set_cnt = tile->shred.depth + tile->shred.fec_resolver_depth + 4UL;
230 :
231 3 : ulong l = FD_LAYOUT_INIT;
232 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_shred_ctx_t), sizeof(fd_shred_ctx_t) );
233 3 : l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
234 3 : l = FD_LAYOUT_APPEND( l, fd_fec_resolver_align(), fec_resolver_footprint );
235 3 : l = FD_LAYOUT_APPEND( l, fd_shredder_align(), fd_shredder_footprint() );
236 3 : l = FD_LAYOUT_APPEND( l, alignof(fd_fec_set_t), sizeof(fd_fec_set_t)*fec_set_cnt );
237 3 : return FD_LAYOUT_FINI( l, scratch_align() );
238 3 : }
239 :
240 : static inline void
241 0 : during_housekeeping( fd_shred_ctx_t * ctx ) {
242 0 : if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {
243 0 : ulong seq_must_complete = ctx->keyswitch->param;
244 :
245 0 : if( FD_UNLIKELY( fd_seq_lt( ctx->poh_in_expect_seq, seq_must_complete ) ) ) {
246 : /* See fd_keyswitch.h; we need to flush any in-flight shreds from
247 : the leader pipeline before switching keys. */
248 0 : FD_LOG_WARNING(( "Flushing in-flight unpublished shreds, must reach seq %lu, currently at %lu ...", seq_must_complete, ctx->poh_in_expect_seq ));
249 0 : return;
250 0 : }
251 :
252 0 : fd_memcpy( ctx->identity_key->uc, ctx->keyswitch->bytes, 32UL );
253 0 : fd_stake_ci_set_identity( ctx->stake_ci, ctx->identity_key );
254 0 : fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
255 0 : }
256 0 : }
257 :
258 : static inline void
259 0 : metrics_write( fd_shred_ctx_t * ctx ) {
260 0 : FD_MHIST_COPY( SHRED, CLUSTER_CONTACT_INFO_CNT, ctx->metrics->contact_info_cnt );
261 0 : FD_MHIST_COPY( SHRED, BATCH_SZ, ctx->metrics->batch_sz );
262 0 : FD_MHIST_COPY( SHRED, BATCH_MICROBLOCK_CNT, ctx->metrics->batch_microblock_cnt );
263 0 : FD_MHIST_COPY( SHRED, SHREDDING_DURATION_SECONDS, ctx->metrics->shredding_timing );
264 0 : FD_MHIST_COPY( SHRED, ADD_SHRED_DURATION_SECONDS, ctx->metrics->add_shred_timing );
265 :
266 0 : FD_MCNT_ENUM_COPY( SHRED, SHRED_PROCESSED, ctx->metrics->shred_processing_result );
267 0 : }
268 :
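/* Added note on the contact info message layout, as parsed by
   handle_new_cluster_contact_info below (an inference from this code,
   not a spec reference): the frag starts with a ulong destination
   count, immediately followed by that many fd_shred_dest_wire_t
   entries (pubkey, ip4_addr, udp_port), which get copied into the
   stake contact info's pending destination list. */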
269 : static inline void
270 : handle_new_cluster_contact_info( fd_shred_ctx_t * ctx,
271 0 : uchar const * buf ) {
272 0 : ulong const * header = (ulong const *)fd_type_pun_const( buf );
273 :
274 0 : ulong dest_cnt = header[ 0 ];
275 0 : fd_histf_sample( ctx->metrics->contact_info_cnt, dest_cnt );
276 :
277 0 : if( dest_cnt >= MAX_SHRED_DESTS )
278 0 : FD_LOG_ERR(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_SHRED_DESTS ));
279 :
280 0 : fd_shred_dest_wire_t const * in_dests = fd_type_pun_const( header+1UL );
281 0 : fd_shred_dest_weighted_t * dests = fd_stake_ci_dest_add_init( ctx->stake_ci );
282 :
283 0 : ctx->new_dest_ptr = dests;
284 0 : ctx->new_dest_cnt = dest_cnt;
285 :
286 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
287 0 : memcpy( dests[i].pubkey.uc, in_dests[i].pubkey, 32UL );
288 0 : dests[i].ip4 = in_dests[i].ip4_addr;
289 0 : dests[i].port = in_dests[i].udp_port;
290 0 : }
291 0 : }
292 :
293 : static inline void
294 0 : finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
295 0 : fd_stake_ci_dest_add_fini( ctx->stake_ci, ctx->new_dest_cnt );
296 0 : }
297 :
298 : static inline int
299 : before_frag( fd_shred_ctx_t * ctx,
300 : ulong in_idx,
301 : ulong seq,
302 0 : ulong sig ) {
303 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) ctx->poh_in_expect_seq = seq+1UL;
304 :
305 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED;
306 0 : else if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) return fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK;
307 :
308 0 : return 0;
309 0 : }
310 :
311 : static void
312 : during_frag( fd_shred_ctx_t * ctx,
313 : ulong in_idx,
314 : ulong seq FD_PARAM_UNUSED,
315 : ulong sig,
316 : ulong chunk,
317 : ulong sz,
318 0 : ulong ctl ) {
319 :
320 0 : ctx->skip_frag = 0;
321 :
322 0 : ctx->tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
323 :
324 :
325 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
326 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
327 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
328 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
329 :
330 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
331 0 : handle_new_cluster_contact_info( ctx, dcache_entry );
332 0 : return;
333 0 : }
334 :
335 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
336 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark ) )
337 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
338 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
339 :
340 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
341 0 : fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry );
342 0 : return;
343 0 : }
344 :
345 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_POH ) ) {
346 : /* This is a frag from the PoH tile. We'll copy it to our pending
347 : microblock batch and shred it if necessary (last in block or
348 : above watermark). We just go ahead and shred it here, even
349 : though we may get overrun. If we do end up getting overrun, we
350 : just won't send these shreds out and we'll reuse the FEC set for
351 : the next one. From a higher level though, if we do get overrun,
352 : a bunch of shreds will never be transmitted, and we'll end up
353 : producing a block that never lands on chain. */
354 0 : fd_fec_set_t * out = ctx->fec_sets + ctx->shredder_fec_set_idx;
355 :
356 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk );
357 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_POH_SHRED_MTU ||
358 0 : sz<(sizeof(fd_entry_batch_meta_t)+sizeof(fd_entry_batch_header_t)) ) )
359 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
360 0 : ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
361 :
362 0 : fd_entry_batch_meta_t const * entry_meta = (fd_entry_batch_meta_t const *)dcache_entry;
363 0 : uchar const * entry = dcache_entry + sizeof(fd_entry_batch_meta_t);
364 0 : ulong entry_sz = sz - sizeof(fd_entry_batch_meta_t);
365 :
366 0 : fd_entry_batch_header_t const * microblock = (fd_entry_batch_header_t const *)entry;
367 :
368 : /* It should never be possible for this to fail, but we check it
369 : anyway. */
370 0 : FD_TEST( entry_sz + ctx->pending_batch.pos <= sizeof(ctx->pending_batch.payload) );
371 :
372 0 : ulong target_slot = fd_disco_poh_sig_slot( sig );
373 0 : if( FD_UNLIKELY( (ctx->pending_batch.microblock_cnt>0) & (ctx->pending_batch.slot!=target_slot) ) ) {
374 : /* TODO: The Agave client sends a dummy entry batch with only 1
375 : byte and the block-complete bit set. This helps other
376 : validators know that the block is dead and they should not try
377 : to continue building a fork on it. We probably want a similar
378 : approach eventually. */
379 0 : FD_LOG_WARNING(( "Abandoning %lu microblocks for slot %lu and switching to slot %lu",
380 0 : ctx->pending_batch.microblock_cnt, ctx->pending_batch.slot, target_slot ));
381 0 : ctx->pending_batch.slot = 0UL;
382 0 : ctx->pending_batch.pos = 0UL;
383 0 : ctx->pending_batch.microblock_cnt = 0UL;
384 0 : ctx->pending_batch.txn_cnt = 0UL;
385 0 : ctx->batch_cnt = 0UL;
386 :
387 0 : FD_MCNT_INC( SHRED, MICROBLOCKS_ABANDONED, 1UL );
388 0 : }
389 :
390 0 : ctx->pending_batch.slot = target_slot;
391 0 : if( FD_UNLIKELY( target_slot!=ctx->slot )) {
392 : /* Reset batch count if we are in a new slot */
393 0 : ctx->batch_cnt = 0UL;
394 0 : ctx->slot = target_slot;
395 0 : }
396 0 : if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) {
397 : /* Ugh, yet another memcpy */
398 0 : fd_memcpy( ctx->pending_batch.payload + ctx->pending_batch.pos, entry, entry_sz );
399 0 : } else {
400 : /* If we are not processing this batch, filter */
401 0 : ctx->skip_frag = 1;
402 0 : }
403 0 : ctx->pending_batch.pos += entry_sz;
404 0 : ctx->pending_batch.microblock_cnt += 1UL;
405 0 : ctx->pending_batch.txn_cnt += microblock->txn_cnt;
406 :
407 0 : int last_in_batch = entry_meta->block_complete | (ctx->pending_batch.pos > PENDING_BATCH_WMARK);
408 :
409 0 : ctx->send_fec_set_idx = ULONG_MAX;
410 0 : if( FD_UNLIKELY( last_in_batch )) {
411 0 : if( FD_UNLIKELY( ctx->batch_cnt%ctx->round_robin_cnt==ctx->round_robin_id ) ) {
412 : /* If it's our turn, shred this batch. FD_UNLIKELY because shred tile cnt generally >= 2 */
413 0 : ulong batch_sz = sizeof(ulong)+ctx->pending_batch.pos;
414 :
415 : /* We sized this so it fits in one FEC set */
416 0 : long shredding_timing = -fd_tickcount();
417 :
418 0 : if( FD_UNLIKELY( entry_meta->block_complete && batch_sz < FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ ) ) {
419 :
420 : /* Ensure the last batch generates >= 32 data shreds by
421 : padding with 0s. Because the last FEC set is "oddly sized"
422 : we only expect this code path to execute for blocks
423 : containing less data than can fill 32 data shred payloads
424 : (hence FD_UNLIKELY).
425 :
426 : See documentation for FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ
427 : for further context. */
428 :
429 0 : fd_memset( ctx->pending_batch.payload + ctx->pending_batch.pos, 0, FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ - batch_sz );
430 0 : batch_sz = FD_SHREDDER_NORMAL_FEC_SET_PAYLOAD_SZ;
431 0 : }
432 :
433 0 : fd_shredder_init_batch( ctx->shredder, ctx->pending_batch.raw, batch_sz, target_slot, entry_meta );
434 0 : FD_TEST( fd_shredder_next_fec_set( ctx->shredder, out ) );
435 0 : fd_shredder_fini_batch( ctx->shredder );
436 0 : shredding_timing += fd_tickcount();
437 :
438 0 : d_rcvd_join( d_rcvd_new( d_rcvd_delete( d_rcvd_leave( out->data_shred_rcvd ) ) ) );
439 0 : p_rcvd_join( p_rcvd_new( p_rcvd_delete( p_rcvd_leave( out->parity_shred_rcvd ) ) ) );
440 0 : ctx->shredded_txn_cnt = ctx->pending_batch.txn_cnt;
441 :
442 0 : ctx->send_fec_set_idx = ctx->shredder_fec_set_idx;
443 :
444 : /* Update metrics */
445 0 : fd_histf_sample( ctx->metrics->batch_sz, batch_sz );
446 0 : fd_histf_sample( ctx->metrics->batch_microblock_cnt, ctx->pending_batch.microblock_cnt );
447 0 : fd_histf_sample( ctx->metrics->shredding_timing, (ulong)shredding_timing );
448 0 : } else {
449 : /* If it's not our turn, update the indices for this slot */
450 0 : fd_shredder_skip_batch( ctx->shredder, sizeof(ulong)+ctx->pending_batch.pos, target_slot );
451 0 : }
452 :
453 0 : ctx->pending_batch.slot = 0UL;
454 0 : ctx->pending_batch.pos = 0UL;
455 0 : ctx->pending_batch.microblock_cnt = 0UL;
456 0 : ctx->pending_batch.txn_cnt = 0UL;
457 0 : ctx->batch_cnt++;
458 0 : }
459 0 : } else if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
460 : /* The common case, from the net tile. The FEC resolver API does
461 : not present a prepare/commit model. If we get overrun between
462 : when the FEC resolver verifies the signature and when it stores
463 : the local copy, we could end up storing and retransmitting
464 : garbage. Instead we copy it locally, sadly, and only give it to
465 : the FEC resolver when we know it won't be overrun anymore. */
466 0 : if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz>FD_NET_MTU ) )
467 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
468 0 : uchar const * dcache_entry = (uchar const *)fd_chunk_to_laddr_const( ctx->in[ in_idx ].mem, chunk ) + ctl;
469 0 : ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
470 0 : FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
471 0 : fd_shred_t const * shred = fd_shred_parse( dcache_entry+hdr_sz, sz-hdr_sz );
472 0 : if( FD_UNLIKELY( !shred ) ) {
473 0 : ctx->skip_frag = 1;
474 0 : return;
475 0 : }
476 : /* all shreds in the same FEC set will have the same signature
477 : so we can round-robin shreds between the shred tiles based on
478 : just the signature without splitting individual FEC sets. */
479 0 : ulong sig = fd_ulong_load_8( shred->signature );
480 0 : if( FD_LIKELY( sig%ctx->round_robin_cnt!=ctx->round_robin_id ) ) {
481 0 : ctx->skip_frag = 1;
482 0 : return;
483 0 : }
484 0 : fd_memcpy( ctx->shred_buffer, dcache_entry+hdr_sz, sz-hdr_sz );
485 0 : ctx->shred_buffer_sz = sz-hdr_sz;
486 0 : }
487 0 : }
488 :
489 : static inline void
490 : send_shred( fd_shred_ctx_t * ctx,
491 : fd_shred_t const * shred,
492 : fd_shred_dest_t * sdest,
493 : fd_shred_dest_idx_t dest_idx,
494 0 : ulong tsorig ) {
495 0 : fd_shred_dest_weighted_t * dest = fd_shred_dest_idx_to_dest( sdest, dest_idx );
496 :
497 0 : if( FD_UNLIKELY( !dest->ip4 ) ) return;
498 :
499 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
500 :
501 0 : int is_data = fd_shred_is_data( fd_shred_type( shred->variant ) );
502 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
503 0 : *hdr = *( is_data ? ctx->data_shred_net_hdr : ctx->parity_shred_net_hdr );
504 :
505 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
506 0 : ip4->daddr = dest->ip4;
507 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
508 0 : ip4->check = 0U;
509 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
510 :
511 0 : hdr->udp->net_dport = fd_ushort_bswap( dest->port );
512 :
513 0 : ulong shred_sz = fd_ulong_if( is_data, FD_SHRED_MIN_SZ, FD_SHRED_MAX_SZ );
514 0 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), shred, shred_sz );
515 :
516 0 : ulong pkt_sz = shred_sz + sizeof(fd_ip4_udp_hdrs_t);
517 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
518 0 : ulong sig = fd_disco_netmux_sig( dest->ip4, dest->port, dest->ip4, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
519 0 : fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, pkt_sz, 0UL, tsorig, tspub );
520 0 : ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL );
521 0 : ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
522 0 : }
523 :
524 : static void
525 : after_frag( fd_shred_ctx_t * ctx,
526 : ulong in_idx,
527 : ulong seq,
528 : ulong sig,
529 : ulong sz,
530 : ulong tsorig,
531 : ulong _tspub,
532 0 : fd_stem_context_t * stem ) {
533 0 : (void)seq;
534 0 : (void)sig;
535 0 : (void)sz;
536 0 : (void)tsorig;
537 0 : (void)_tspub;
538 :
539 0 : if( FD_UNLIKELY( ctx->skip_frag ) ) return;
540 :
541 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_CONTACT ) ) {
542 0 : finalize_new_cluster_contact_info( ctx );
543 0 : return;
544 0 : }
545 :
546 0 : if( FD_UNLIKELY( ctx->in_kind[ in_idx ]==IN_KIND_STAKE ) ) {
547 0 : fd_stake_ci_stake_msg_fini( ctx->stake_ci );
548 0 : return;
549 0 : }
550 :
551 0 : if( FD_UNLIKELY( (ctx->in_kind[ in_idx ]==IN_KIND_POH) & (ctx->send_fec_set_idx==ULONG_MAX) ) ) {
552 : /* Entry from PoH that didn't trigger a new FEC set to be made */
553 0 : return;
554 0 : }
555 :
556 0 : const ulong fanout = 200UL;
557 0 : fd_shred_dest_idx_t _dests[ 200*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];
558 :
559 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
560 0 : uchar * shred_buffer = ctx->shred_buffer;
561 0 : ulong shred_buffer_sz = ctx->shred_buffer_sz;
562 :
563 0 : fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz );
564 0 : if( FD_UNLIKELY( !shred ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }
565 :
566 0 : fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
567 0 : if( FD_UNLIKELY( !lsched ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; }
568 :
569 0 : fd_pubkey_t const * slot_leader = fd_epoch_leaders_get( lsched, shred->slot );
570 0 : if( FD_UNLIKELY( !slot_leader ) ) { ctx->metrics->shred_processing_result[ 0 ]++; return; } /* Count this as bad slot too */
571 :
572 0 : fd_fec_set_t const * out_fec_set[1];
573 0 : fd_shred_t const * out_shred[1];
574 0 : fd_bmtree_node_t out_merkle_root[1];
575 :
576 0 : long add_shred_timing = -fd_tickcount();
577 0 : int rv = fd_fec_resolver_add_shred( ctx->resolver, shred, shred_buffer_sz, slot_leader->uc, out_fec_set, out_shred, out_merkle_root );
578 0 : add_shred_timing += fd_tickcount();
579 :
580 0 : fd_histf_sample( ctx->metrics->add_shred_timing, (ulong)add_shred_timing );
581 0 : ctx->metrics->shred_processing_result[ rv + FD_FEC_RESOLVER_ADD_SHRED_RETVAL_OFF+FD_SHRED_ADD_SHRED_EXTRA_RETVAL_CNT ]++;
582 :
583 0 : if( (rv==FD_FEC_RESOLVER_SHRED_OKAY) | (rv==FD_FEC_RESOLVER_SHRED_COMPLETES) ) {
584 : /* Relay this shred */
585 0 : ulong fanout = 200UL;
586 0 : ulong max_dest_cnt[1];
587 0 : do {
588 : /* If we've validated the shred and it COMPLETES but we can't
589 : compute the destination for whatever reason, don't forward
590 : the shred, but still send it to the blockstore. */
591 0 : fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, shred->slot );
592 0 : if( FD_UNLIKELY( !sdest ) ) break;
593 0 : fd_shred_dest_idx_t * dests = fd_shred_dest_compute_children( sdest, &shred, 1UL, _dests, 1UL, fanout, fanout, max_dest_cnt );
594 0 : if( FD_UNLIKELY( !dests ) ) break;
595 :
596 0 : for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, *out_shred, sdest, dests[ j ], ctx->tsorig );
597 0 : } while( 0 );
598 :
599 0 : if( FD_LIKELY( ctx->blockstore && rv==FD_FEC_RESOLVER_SHRED_OKAY ) ) { /* optimize for the compiler - branch predictor will still be correct */
600 0 : uchar * buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
601 0 : ulong sz = fd_shred_header_sz( shred->variant );
602 0 : fd_memcpy( buf, shred, sz );
603 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
604 0 : ulong replay_sig = fd_disco_shred_replay_sig( shred->slot, shred->idx, shred->fec_set_idx, fd_shred_is_code( fd_shred_type( shred->variant ) ), 0 );
605 0 : fd_stem_publish( stem, REPLAY_OUT_IDX, replay_sig, ctx->replay_out_chunk, sz, 0UL, ctx->tsorig, tspub );
606 0 : ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sz, ctx->replay_out_chunk0, ctx->replay_out_wmark );
607 0 : }
608 0 : }
609 0 : if( FD_LIKELY( rv!=FD_FEC_RESOLVER_SHRED_COMPLETES ) ) return;
610 :
611 0 : FD_TEST( ctx->fec_sets <= *out_fec_set );
612 0 : ctx->send_fec_set_idx = (ulong)(*out_fec_set - ctx->fec_sets);
613 0 : ctx->shredded_txn_cnt = 0UL;
614 0 : } else {
615 : /* We know we didn't get overrun, so advance the index */
616 0 : ctx->shredder_fec_set_idx = (ctx->shredder_fec_set_idx+1UL)%ctx->shredder_max_fec_set_idx;
617 0 : }
618 : /* If this was the shred that completed an FEC set or this was a
619 : microblock we shredded ourselves, we now have a full FEC set that we
620 : need to send to the blockstore and on the network (skipping any
621 : shreds we already sent). */
622 :
623 0 : fd_fec_set_t * set = ctx->fec_sets + ctx->send_fec_set_idx;
624 0 : fd_shred34_t * s34 = ctx->shred34 + 4UL*ctx->send_fec_set_idx;
625 :
626 0 : s34[ 0 ].shred_cnt = fd_ulong_min( set->data_shred_cnt, 34UL );
627 0 : s34[ 1 ].shred_cnt = set->data_shred_cnt - fd_ulong_min( set->data_shred_cnt, 34UL );
628 0 : s34[ 2 ].shred_cnt = fd_ulong_min( set->parity_shred_cnt, 34UL );
629 0 : s34[ 3 ].shred_cnt = set->parity_shred_cnt - fd_ulong_min( set->parity_shred_cnt, 34UL );
630 :
631 0 : ulong s34_cnt = 2UL + !!(s34[ 1 ].shred_cnt) + !!(s34[ 3 ].shred_cnt);
632 0 : ulong txn_per_s34 = ctx->shredded_txn_cnt / s34_cnt;
633 :
634 : /* Attribute the transactions evenly to the non-empty shred34s */
635 0 : for( ulong j=0UL; j<4UL; j++ ) s34[ j ].est_txn_cnt = fd_ulong_if( s34[ j ].shred_cnt>0UL, txn_per_s34, 0UL );
636 :
637 : /* Add whatever is left to the last shred34 */
638 0 : s34[ fd_ulong_if( s34[ 3 ].shred_cnt>0UL, 3, 2 ) ].est_txn_cnt += ctx->shredded_txn_cnt - txn_per_s34*s34_cnt;
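/* Added worked example (illustrative numbers): with
   shredded_txn_cnt=100 and three non-empty shred34s (s34_cnt=3),
   txn_per_s34 = 100/3 = 33; the non-empty entries each get 33 and the
   remaining 100 - 3*33 = 1 is added to the last non-empty one, so the
   estimates still sum to the true count. */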
639 :
640 : /* Set the sz field so that metrics are more accurate. */
641 0 : ulong sz0 = sizeof(fd_shred34_t) - (34UL - s34[ 0 ].shred_cnt)*FD_SHRED_MAX_SZ;
642 0 : ulong sz1 = sizeof(fd_shred34_t) - (34UL - s34[ 1 ].shred_cnt)*FD_SHRED_MAX_SZ;
643 0 : ulong sz2 = sizeof(fd_shred34_t) - (34UL - s34[ 2 ].shred_cnt)*FD_SHRED_MAX_SZ;
644 0 : ulong sz3 = sizeof(fd_shred34_t) - (34UL - s34[ 3 ].shred_cnt)*FD_SHRED_MAX_SZ;
645 :
646 0 : if( FD_LIKELY( ctx->blockstore ) ) {
647 : /* If a shred has the completes flag set, the replay tile will
648 : immediately poll the blockstore for the shreds in that FEC set,
649 : under the assumption that they are already there. So when a
650 : shred completes an FEC set, we must insert the shreds into the
651 : blockstore before we notify replay of the completed FEC set.
652 : Replay does not poll the blockstore when notified of a regular,
653 : non-completing shred. */
654 :
655 0 : for( ulong i=0UL; i<set->data_shred_cnt; i++ ) {
656 0 : fd_shred_t const * data_shred = (fd_shred_t const *)fd_type_pun_const( set->data_shreds[ i ] );
657 0 : fd_blockstore_shred_insert( ctx->blockstore, data_shred );
658 0 : }
659 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
660 : /* Shred came from a block we didn't produce. This is not our leader
661 : slot. */
662 0 : fd_shred_t const * shred = (fd_shred_t const *)fd_type_pun_const( ctx->shred_buffer );
663 0 : uchar * buf = fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk );
664 0 : ulong sz = fd_shred_header_sz( shred->variant );
665 0 : fd_memcpy( buf, shred, sz );
666 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
667 0 : ulong replay_sig = fd_disco_shred_replay_sig( shred->slot, shred->idx, shred->fec_set_idx, fd_shred_is_code( fd_shred_type( shred->variant ) ), 1 );
668 0 : fd_stem_publish( stem, REPLAY_OUT_IDX, replay_sig, ctx->replay_out_chunk, sz, 0UL, ctx->tsorig, tspub );
669 0 : ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sz, ctx->replay_out_chunk0, ctx->replay_out_wmark );
670 0 : }
671 0 : }
672 :
673 : /* Send to the blockstore, skipping any empty fd_shred34_t entries. */
674 0 : ulong new_sig = ctx->in_kind[ in_idx ]!=IN_KIND_NET; /* sig==0 means the store tile will do extra checks */
675 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
676 0 : fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+0UL ), sz0, 0UL, ctx->tsorig, tspub );
677 0 : if( FD_UNLIKELY( s34[ 1 ].shred_cnt ) )
678 0 : fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+1UL ), sz1, 0UL, ctx->tsorig, tspub );
679 0 : fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+2UL ), sz2, 0UL, ctx->tsorig, tspub );
680 0 : if( FD_UNLIKELY( s34[ 3 ].shred_cnt ) )
681 0 : fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+3UL ), sz3, 0UL, ctx->tsorig, tspub );
682 :
683 : /* Compute all the destinations for all the new shreds */
684 :
685 0 : fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
686 0 : ulong k=0UL;
687 0 : for( ulong i=0UL; i<set->data_shred_cnt; i++ )
688 0 : if( !d_rcvd_test( set->data_shred_rcvd, i ) ) new_shreds[ k++ ] = (fd_shred_t const *)set->data_shreds [ i ];
689 0 : for( ulong i=0UL; i<set->parity_shred_cnt; i++ )
690 0 : if( !p_rcvd_test( set->parity_shred_rcvd, i ) ) new_shreds[ k++ ] = (fd_shred_t const *)set->parity_shreds[ i ];
691 :
692 0 : if( FD_UNLIKELY( !k ) ) return;
693 0 : fd_shred_dest_t * sdest = fd_stake_ci_get_sdest_for_slot( ctx->stake_ci, new_shreds[ 0 ]->slot );
694 0 : if( FD_UNLIKELY( !sdest ) ) return;
695 :
696 0 : ulong out_stride;
697 0 : ulong max_dest_cnt[1];
698 0 : fd_shred_dest_idx_t * dests;
699 0 : if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
700 0 : out_stride = k;
701 0 : dests = fd_shred_dest_compute_children( sdest, new_shreds, k, _dests, k, fanout, fanout, max_dest_cnt );
702 0 : } else {
703 0 : out_stride = 1UL;
704 0 : *max_dest_cnt = 1UL;
705 0 : dests = fd_shred_dest_compute_first ( sdest, new_shreds, k, _dests );
706 0 : }
707 0 : if( FD_UNLIKELY( !dests ) ) return;
708 :
709 : /* Send only the ones we didn't receive. */
710 0 : for( ulong i=0UL; i<k; i++ ) for( ulong j=0UL; j<*max_dest_cnt; j++ ) send_shred( ctx, new_shreds[ i ], sdest, dests[ j*out_stride+i ], ctx->tsorig );
711 0 : }
712 :
713 : static void
714 : privileged_init( fd_topo_t * topo,
715 0 : fd_topo_tile_t * tile ) {
716 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
717 :
718 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
719 0 : fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
720 :
721 0 : if( FD_UNLIKELY( !strcmp( tile->shred.identity_key_path, "" ) ) )
722 0 : FD_LOG_ERR(( "identity_key_path not set" ));
723 :
724 0 : ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->shred.identity_key_path, /* pubkey only: */ 1 ) );
725 0 : }
726 :
727 : static void
728 : fd_shred_signer( void * signer_ctx,
729 : uchar signature[ static 64 ],
730 0 : uchar const merkle_root[ static 32 ] ) {
731 0 : fd_keyguard_client_sign( signer_ctx, signature, merkle_root, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519 );
732 0 : }
733 :
734 : static void
735 : unprivileged_init( fd_topo_t * topo,
736 0 : fd_topo_tile_t * tile ) {
737 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
738 :
739 0 : if( FD_LIKELY( tile->out_cnt==3UL ) ) { /* frankendancer */
740 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[STORE_OUT_IDX]].name, "shred_store" ) );
741 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[NET_OUT_IDX]].name, "shred_net" ) );
742 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[SIGN_OUT_IDX]].name, "shred_sign" ) );
743 0 : } else if( FD_LIKELY( tile->out_cnt==4UL ) ) { /* firedancer */
744 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[STORE_OUT_IDX]].name, "shred_storei" ) );
745 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[NET_OUT_IDX]].name, "shred_net" ) );
746 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[SIGN_OUT_IDX]].name, "shred_sign" ) );
747 0 : FD_TEST( 0==strcmp( topo->links[tile->out_link_id[REPLAY_OUT_IDX]].name, "shred_replay" ) );
748 0 : } else {
749 0 : FD_LOG_ERR(( "shred tile has unexpected cnt of output links %lu", tile->out_cnt ));
750 0 : }
751 :
752 0 : if( FD_UNLIKELY( !tile->out_cnt ) )
753 0 : FD_LOG_ERR(( "shred tile has no primary output link" ));
754 :
755 0 : ulong shred_store_mcache_depth = tile->shred.depth;
756 0 : if( topo->links[ tile->out_link_id[ 0 ] ].depth != shred_store_mcache_depth )
757 0 : FD_LOG_ERR(( "shred tile out depths are not equal %lu %lu",
758 0 : topo->links[ tile->out_link_id[ 0 ] ].depth, shred_store_mcache_depth ));
759 :
760 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
761 0 : fd_shred_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof( fd_shred_ctx_t ), sizeof( fd_shred_ctx_t ) );
762 :
763 0 : ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
764 0 : ctx->round_robin_id = tile->kind_id;
765 0 : ctx->batch_cnt = 0UL;
766 0 : ctx->slot = ULONG_MAX;
767 :
768 0 : ulong fec_resolver_footprint = fd_fec_resolver_footprint( tile->shred.fec_resolver_depth, 1UL, shred_store_mcache_depth,
769 0 : 128UL * tile->shred.fec_resolver_depth );
770 0 : ulong fec_set_cnt = shred_store_mcache_depth + tile->shred.fec_resolver_depth + 4UL;
771 :
772 0 : void * store_out_dcache = topo->links[ tile->out_link_id[ 0 ] ].dcache;
773 :
774 0 : ulong required_dcache_sz = fec_set_cnt*DCACHE_ENTRIES_PER_FEC_SET*sizeof(fd_shred34_t);
775 0 : if( fd_dcache_data_sz( store_out_dcache )<required_dcache_sz ) {
776 0 : FD_LOG_ERR(( "shred->store dcache too small. It is %lu bytes but must be at least %lu bytes.",
777 0 : fd_dcache_data_sz( store_out_dcache ),
778 0 : required_dcache_sz ));
779 0 : }
780 :
781 0 : if( FD_UNLIKELY( !tile->shred.fec_resolver_depth ) ) FD_LOG_ERR(( "fec_resolver_depth not set" ));
782 0 : if( FD_UNLIKELY( !tile->shred.shred_listen_port ) ) FD_LOG_ERR(( "shred_listen_port not set" ));
783 :
784 0 : ulong bank_cnt = fd_topo_tile_name_cnt( topo, "bank" );
785 0 : ulong replay_cnt = fd_topo_tile_name_cnt( topo, "replay" );
786 :
787 0 : if( FD_UNLIKELY( !bank_cnt && !replay_cnt ) ) FD_LOG_ERR(( "0 bank/replay tiles" ));
788 0 : if( FD_UNLIKELY( bank_cnt>MAX_BANK_CNT ) ) FD_LOG_ERR(( "Too many banks" ));
789 :
790 0 : void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
791 0 : void * _resolver = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_resolver_align(), fec_resolver_footprint );
792 0 : void * _shredder = FD_SCRATCH_ALLOC_APPEND( l, fd_shredder_align(), fd_shredder_footprint() );
793 0 : void * _fec_sets = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_fec_set_t), sizeof(fd_fec_set_t)*fec_set_cnt );
794 :
795 0 : fd_fec_set_t * fec_sets = (fd_fec_set_t *)_fec_sets;
796 0 : fd_shred34_t * shred34 = (fd_shred34_t *)store_out_dcache;
797 :
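/* Added note: the loop below wires up the aliasing described at the top
   of this file.  For each FEC set, the first two fd_shred34_t hold the
   data shred payloads and the last two hold the parity shred payloads;
   stride is the distance between consecutive packet buffers, offset is
   the offset of the first buffer from the start of the struct, and
   shred_sz is 1203 (FD_SHRED_MIN_SZ, data) for the first two entries
   and 1228 (FD_SHRED_MAX_SZ, parity) for the last two, per the
   fd_ulong_if( k<2UL, ... ) below. */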
798 0 : for( ulong i=0UL; i<fec_set_cnt; i++ ) {
799 0 : fd_shred34_t * p34_base = shred34 + i*DCACHE_ENTRIES_PER_FEC_SET;
800 0 : for( ulong k=0UL; k<DCACHE_ENTRIES_PER_FEC_SET; k++ ) {
801 0 : fd_shred34_t * p34 = p34_base + k;
802 :
803 0 : p34->stride = (ulong)p34->pkts[1].buffer - (ulong)p34->pkts[0].buffer;
804 0 : p34->offset = (ulong)p34->pkts[0].buffer - (ulong)p34;
805 0 : p34->shred_sz = fd_ulong_if( k<2UL, 1203UL, 1228UL );
806 0 : }
807 :
808 0 : uchar ** data_shred = fec_sets[ i ].data_shreds;
809 0 : uchar ** parity_shred = fec_sets[ i ].parity_shreds;
810 0 : for( ulong j=0UL; j<FD_REEDSOL_DATA_SHREDS_MAX; j++ ) data_shred [ j ] = p34_base[ j/34UL ].pkts[ j%34UL ].buffer;
811 0 : for( ulong j=0UL; j<FD_REEDSOL_PARITY_SHREDS_MAX; j++ ) parity_shred[ j ] = p34_base[ 2UL + j/34UL ].pkts[ j%34UL ].buffer;
812 0 : }
813 :
814 0 : #define NONNULL( x ) (__extension__({ \
815 0 : __typeof__((x)) __x = (x); \
816 0 : if( FD_UNLIKELY( !__x ) ) FD_LOG_ERR(( #x " was unexpectedly NULL" )); \
817 0 : __x; }))
818 :
819 0 : ulong expected_shred_version = tile->shred.expected_shred_version;
820 0 : if( FD_LIKELY( !expected_shred_version ) ) {
821 0 : ulong busy_obj_id = fd_pod_query_ulong( topo->props, "poh_shred", ULONG_MAX );
822 0 : FD_TEST( busy_obj_id!=ULONG_MAX );
823 0 : ulong * gossip_shred_version = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
824 0 : FD_LOG_INFO(( "Waiting for shred version to be determined via gossip." ));
825 0 : do {
826 0 : expected_shred_version = FD_VOLATILE_CONST( *gossip_shred_version );
827 0 : } while( expected_shred_version==ULONG_MAX );
828 0 : }
829 :
830 0 : if( FD_UNLIKELY( expected_shred_version > USHORT_MAX ) ) FD_LOG_ERR(( "invalid shred version %lu", expected_shred_version ));
831 0 : FD_LOG_INFO(( "Using shred version %hu", (ushort)expected_shred_version ));
832 :
833 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->keyswitch_obj_id ) );
834 0 : FD_TEST( ctx->keyswitch );
835 :
836 : /* populate ctx */
837 0 : ulong sign_in_idx = fd_topo_find_tile_in_link( topo, tile, "sign_shred", tile->kind_id );
838 0 : FD_TEST( sign_in_idx!=ULONG_MAX );
839 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_in_idx ] ];
840 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
841 0 : NONNULL( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
842 0 : sign_out->mcache,
843 0 : sign_out->dcache,
844 0 : sign_in->mcache,
845 0 : sign_in->dcache ) ) );
846 :
847 0 : ulong shred_limit = fd_ulong_if( tile->shred.larger_shred_limits_per_block, 32UL*32UL*1024UL, 32UL*1024UL );
848 0 : fd_fec_set_t * resolver_sets = fec_sets + (shred_store_mcache_depth+1UL)/2UL + 1UL;
849 0 : ctx->shredder = NONNULL( fd_shredder_join ( fd_shredder_new ( _shredder, fd_shred_signer, ctx->keyguard_client, (ushort)expected_shred_version ) ) );
850 0 : ctx->resolver = NONNULL( fd_fec_resolver_join ( fd_fec_resolver_new ( _resolver,
851 0 : fd_shred_signer, ctx->keyguard_client,
852 0 : tile->shred.fec_resolver_depth, 1UL,
853 0 : (shred_store_mcache_depth+3UL)/2UL,
854 0 : 128UL * tile->shred.fec_resolver_depth, resolver_sets,
855 0 : (ushort)expected_shred_version,
856 0 : shred_limit ) ) );
857 :
858 0 : ctx->shred34 = shred34;
859 0 : ctx->fec_sets = fec_sets;
860 :
861 0 : ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci, ctx->identity_key ) );
862 :
863 0 : ctx->net_id = (ushort)0;
864 :
865 0 : fd_ip4_udp_hdr_init( ctx->data_shred_net_hdr, FD_SHRED_MIN_SZ, 0, tile->shred.shred_listen_port );
866 0 : fd_ip4_udp_hdr_init( ctx->parity_shred_net_hdr, FD_SHRED_MAX_SZ, 0, tile->shred.shred_listen_port );
867 :
868 0 : for( ulong i=0UL; i<tile->in_cnt; i++ ) {
869 0 : fd_topo_link_t const * link = &topo->links[ tile->in_link_id[ i ] ];
870 0 : fd_topo_wksp_t const * link_wksp = &topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ];
871 :
872 0 : if( FD_LIKELY( !strcmp( link->name, "net_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_NET;
873 0 : else if( FD_LIKELY( !strcmp( link->name, "poh_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_POH;
874 0 : else if( FD_LIKELY( !strcmp( link->name, "stake_out" ) ) ) ctx->in_kind[ i ] = IN_KIND_STAKE;
875 0 : else if( FD_LIKELY( !strcmp( link->name, "crds_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_CONTACT;
876 0 : else if( FD_LIKELY( !strcmp( link->name, "sign_shred" ) ) ) ctx->in_kind[ i ] = IN_KIND_SIGN;
877 0 : else FD_LOG_ERR(( "shred tile has unexpected input link %lu %s", i, link->name ));
878 :
879 0 : ctx->in[ i ].mem = link_wksp->wksp;
880 0 : ctx->in[ i ].chunk0 = fd_dcache_compact_chunk0( ctx->in[ i ].mem, link->dcache );
881 0 : ctx->in[ i ].wmark = fd_dcache_compact_wmark ( ctx->in[ i ].mem, link->dcache, link->mtu );
882 0 : }
883 :
884 0 : fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
885 :
886 0 : ctx->net_out_mcache = net_out->mcache;
887 0 : ctx->net_out_sync = fd_mcache_seq_laddr( ctx->net_out_mcache );
888 0 : ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache );
889 0 : ctx->net_out_seq = fd_mcache_seq_query( ctx->net_out_sync );
890 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
891 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
892 0 : ctx->net_out_wmark = fd_dcache_compact_wmark ( ctx->net_out_mem, net_out->dcache, net_out->mtu );
893 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
894 :
895 0 : fd_topo_link_t * store_out = &topo->links[ tile->out_link_id[ STORE_OUT_IDX ] ];
896 :
897 0 : ctx->store_out_mem = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
898 0 : ctx->store_out_chunk0 = fd_dcache_compact_chunk0( ctx->store_out_mem, store_out->dcache );
899 0 : ctx->store_out_wmark = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
900 0 : ctx->store_out_chunk = ctx->store_out_chunk0;
901 :
902 0 : if( FD_LIKELY( tile->out_cnt==4UL ) ) { /* firedancer */
903 0 : fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ];
904 :
905 0 : ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
906 0 : ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );
907 0 : ctx->replay_out_wmark = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
908 0 : ctx->replay_out_chunk = ctx->replay_out_chunk0;
909 0 : }
910 :
911 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
912 0 : if( FD_LIKELY( blockstore_obj_id!=ULONG_MAX ) ) {
913 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
914 0 : FD_TEST( ctx->blockstore->shmem->magic == FD_BLOCKSTORE_MAGIC );
915 0 : } else {
916 0 : ctx->blockstore = NULL;
917 0 : }
918 :
919 0 : ctx->poh_in_expect_seq = 0UL;
920 :
921 0 : ctx->shredder_fec_set_idx = 0UL;
922 0 : ctx->shredder_max_fec_set_idx = (shred_store_mcache_depth+1UL)/2UL + 1UL;
923 :
924 0 : ctx->send_fec_set_idx = ULONG_MAX;
925 :
926 0 : ctx->shred_buffer_sz = 0UL;
927 0 : fd_memset( ctx->shred_buffer, 0xFF, FD_NET_MTU );
928 :
929 0 : fd_histf_join( fd_histf_new( ctx->metrics->contact_info_cnt, FD_MHIST_MIN( SHRED, CLUSTER_CONTACT_INFO_CNT ),
930 0 : FD_MHIST_MAX( SHRED, CLUSTER_CONTACT_INFO_CNT ) ) );
931 0 : fd_histf_join( fd_histf_new( ctx->metrics->batch_sz, FD_MHIST_MIN( SHRED, BATCH_SZ ),
932 0 : FD_MHIST_MAX( SHRED, BATCH_SZ ) ) );
933 0 : fd_histf_join( fd_histf_new( ctx->metrics->batch_microblock_cnt, FD_MHIST_MIN( SHRED, BATCH_MICROBLOCK_CNT ),
934 0 : FD_MHIST_MAX( SHRED, BATCH_MICROBLOCK_CNT ) ) );
935 0 : fd_histf_join( fd_histf_new( ctx->metrics->shredding_timing, FD_MHIST_SECONDS_MIN( SHRED, SHREDDING_DURATION_SECONDS ),
936 0 : FD_MHIST_SECONDS_MAX( SHRED, SHREDDING_DURATION_SECONDS ) ) );
937 0 : fd_histf_join( fd_histf_new( ctx->metrics->add_shred_timing, FD_MHIST_SECONDS_MIN( SHRED, ADD_SHRED_DURATION_SECONDS ),
938 0 : FD_MHIST_SECONDS_MAX( SHRED, ADD_SHRED_DURATION_SECONDS ) ) );
939 0 : memset( ctx->metrics->shred_processing_result, '\0', sizeof(ctx->metrics->shred_processing_result) );
940 :
941 0 : ctx->pending_batch.microblock_cnt = 0UL;
942 0 : ctx->pending_batch.txn_cnt = 0UL;
943 0 : ctx->pending_batch.pos = 0UL;
944 0 : ctx->pending_batch.slot = 0UL;
945 0 : fd_memset( ctx->pending_batch.payload, 0, sizeof(ctx->pending_batch.payload) );
946 :
947 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
948 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
949 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
950 0 : }
951 :
952 : static ulong
953 : populate_allowed_seccomp( fd_topo_t const * topo,
954 : fd_topo_tile_t const * tile,
955 : ulong out_cnt,
956 0 : struct sock_filter * out ) {
957 0 : (void)topo;
958 0 : (void)tile;
959 :
960 0 : populate_sock_filter_policy_fd_shred_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
961 0 : return sock_filter_policy_fd_shred_tile_instr_cnt;
962 0 : }
963 :
964 : static ulong
965 : populate_allowed_fds( fd_topo_t const * topo,
966 : fd_topo_tile_t const * tile,
967 : ulong out_fds_cnt,
968 0 : int * out_fds ) {
969 0 : (void)topo;
970 0 : (void)tile;
971 :
972 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
973 :
974 0 : ulong out_cnt = 0UL;
975 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
976 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
977 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
978 0 : return out_cnt;
979 0 : }
980 :
981 0 : #define STEM_BURST (5UL)
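/* Added note: a burst of 5 appears to match the worst case in
   after_frag above, which can publish up to 4 shred_store frags (one
   per non-empty fd_shred34_t) plus 1 shred_replay frag for a single
   input frag; shred_net traffic goes out via its own mcache and does
   not count against the stem burst. */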
982 :
983 : /* See explanation in fd_pack */
984 0 : #define STEM_LAZY (128L*3000L)
985 :
986 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_shred_ctx_t
987 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_shred_ctx_t)
988 :
989 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
990 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
991 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
992 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
993 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
994 :
995 : #include "../stem/fd_stem.c"
996 :
997 : fd_topo_run_tile_t fd_tile_shred = {
998 : .name = "shred",
999 : .populate_allowed_seccomp = populate_allowed_seccomp,
1000 : .populate_allowed_fds = populate_allowed_fds,
1001 : .scratch_align = scratch_align,
1002 : .scratch_footprint = scratch_footprint,
1003 : .privileged_init = privileged_init,
1004 : .unprivileged_init = unprivileged_init,
1005 : .run = stem_run,
1006 : };
|