1 : /* The repair tile is responsible for repairing missing shreds that were
2 : not received via Turbine.
3 :
4 : Generally there are two distinct traffic patterns:
5 :
6 : 1. Firedancer boots up and fires off a large number of repairs to
7 : recover all the blocks between the snapshot on which it is booting
8 : and the head of the chain. In this mode, repair tile utilization
9 : is very high along with net and sign utilization.
10 :
11 : 2. Firedancer catches up to the head of the chain and enters steady
12 :      state where most shred traffic is delivered over Turbine. In this
13 : state, repairs are only occasionally needed to recover shreds lost
14 :      due to anomalies such as packet loss, a leader that never sent
15 :      them, or even a malicious leader. */
16 :
17 : #define _GNU_SOURCE
18 :
19 : #include "../genesis/fd_genesi_tile.h"
20 : #include "../../disco/topo/fd_topo.h"
21 : #include "generated/fd_repair_tile_seccomp.h"
22 : #include "../../disco/fd_disco.h"
23 : #include "../../disco/keyguard/fd_keyload.h"
24 : #include "../../disco/keyguard/fd_keyguard.h"
25 : #include "../../disco/net/fd_net_tile.h"
26 : #include "../../flamenco/gossip/fd_gossip_types.h"
27 : #include "../tower/fd_tower_tile.h"
28 : #include "../../discof/restore/utils/fd_ssmsg.h"
29 : #include "../../util/net/fd_net_headers.h"
30 : #include "../../tango/fd_tango_base.h"
31 :
32 : #include "../forest/fd_forest.h"
33 : #include "fd_repair_metrics.h"
34 : #include "fd_inflight.h"
35 : #include "fd_repair.h"
36 : #include "fd_policy.h"
37 :
38 : #define LOGGING 1
39 : #define DEBUG_LOGGING 0
40 :
41 : #define IN_KIND_CONTACT (0)
42 0 : #define IN_KIND_NET (1)
43 0 : #define IN_KIND_TOWER (2)
44 0 : #define IN_KIND_SHRED (3)
45 0 : #define IN_KIND_SIGN (4)
46 0 : #define IN_KIND_SNAP (5)
47 0 : #define IN_KIND_STAKE (6)
48 0 : #define IN_KIND_GOSSIP (7)
49 0 : #define IN_KIND_GENESIS (8)
50 :
51 : #define MAX_IN_LINKS (32)
52 :
53 : #define MAX_REPAIR_PEERS 40200UL
54 : #define MAX_BUFFER_SIZE ( MAX_REPAIR_PEERS * sizeof( fd_shred_dest_wire_t ) )
55 : #define MAX_SHRED_TILE_CNT ( 16UL )
56 : #define MAX_SIGN_TILE_CNT ( 16UL )
57 :
58 : /* Maximum size of a network packet */
59 0 : #define FD_REPAIR_MAX_PACKET_SIZE 1232
60 : /* Max number of validators that can be actively queried */
61 0 : #define FD_ACTIVE_KEY_MAX (FD_CONTACT_INFO_TABLE_SIZE)
62 : /* Max number of pending shred requests */
63 0 : #define FD_NEEDED_KEY_MAX (1<<20)
64 :
65 : /* static map from request type to metric array index */
66 : static uint metric_index[FD_REPAIR_KIND_ORPHAN + 1] = {
67 : [FD_REPAIR_KIND_SHRED] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_WINDOW_IDX,
68 : [FD_REPAIR_KIND_HIGHEST_SHRED] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_HIGHEST_WINDOW_IDX,
69 : [FD_REPAIR_KIND_ORPHAN] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_ORPHAN_IDX,
70 : };
71 :
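/* Per-in-link context. net_repair links use the fd_net_rx_bounds_t
   view (consumed via fd_net_rx_translate_frag in during_frag); all
   other links use the mem/chunk0/wmark/mtu dcache bounds. The two
   views can alias in a union because any given link only ever uses
   one of them. */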
72 : typedef union {
73 : struct {
74 : fd_wksp_t * mem;
75 : ulong chunk0;
76 : ulong wmark;
77 : ulong mtu;
78 : };
79 : fd_net_rx_bounds_t net_rx;
80 : } in_ctx_t;
81 :
82 : struct out_ctx {
83 : ulong idx;
84 : fd_wksp_t * mem;
85 : ulong chunk0;
86 : ulong wmark;
87 : ulong chunk;
88 :
89 :   /* The repair tile directly tracks credits outside of stem for
90 :      these asynchronous sign links. In particular, credits tracks
91 :      occupancy of the RETURN sign_repair link. This is because
92 :      repair_sign is reliable and sign_repair is unreliable. If both
93 :      links were reliable and both filled completely, stem would
94 :      deadlock: neither repair nor sign would have credits, which
95 :      would prevent frags from getting polled in repair or sign,
96 :      which would prevent any credits from getting returned to
97 :      either tile. So the sign_repair return link must be
98 :      unreliable. credits / max_credits are charged on the
99 :      repair_sign link but bound the RETURN sign_repair link.
100 :
101 : Consider the scenario:
102 :
103 : repair_sign (depth 128) sign_repair (depth 128)
104 : repair ----------------------> sign ------------------------> repair
105 : [rest free, r130, r129] [r128, r127, ... , r1] (full)
106 :
107 :        If repair is publishing too many requests too fast (common in
108 :        catchup), and not polling enough frags from sign, without
109 :        manual management the sign_repair link would be overrun.
110 :        Nothing stops repair from publishing more requests, because
111 :        sign drains the repair_sign link fast enough to handle them.
112 :        And nothing stops sign from polling the next request, signing
113 :        it, and PUBLISHING it on the sign_repair link that is already
114 :        full, because the sign_repair link is unreliable.
115 :
116 : This is why we need to manually track credits for the sign_repair
117 : link. We must ensure that there are never more than 128 items in
118 : the ENTIRE repair_sign -> sign tile -> sign_repair work queue, else
119 : there is always a possibility of an overrun in the sign_repair
120 : link.
121 :
122 : We can furthermore ensure some nice properties by having the
123 : repair_sign link have a greater depth than the sign_repair link.
124 : This way, we exclusively use manual credit management to control
125 : the rate at which we publish requests to sign. We can then avoid
126 : being stem backpressured, which allows us to keep polling frags and
127 :        reading incoming shreds, even when the repair_sign link is
128 :        "full." This property is not strictly necessary, but it is
129 :        good for performance.
130 :
131 :        Losing a frag to overrun isn't necessarily critical, but in
132 :        general the repair tile relies on the fact that a signing
133 :        task published to the sign tile will always come back. If we
134 :        lose a frag to overrun, an entry in the pending signs map is
135 :        never removed, and theoretically the map could fill up. With
136 :        a reliable repair->sign->repair loop, no eviction is needed. */
137 :
138 : ulong in_idx; /* index of the incoming link */
139 : ulong credits; /* available credits for link */
140 : ulong max_credits; /* maximum credits (depth) */
141 : };
142 : typedef struct out_ctx out_ctx_t;
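/* A minimal sketch of the credit discipline described above. It is
   illustrative only (guarded out of compilation) and these helpers are
   hypothetical, not part of this tile; the real accounting is inlined
   in fd_repair_send_sign_request and after_sign below. The invariant:
   frags in flight across repair_sign -> sign -> sign_repair never
   exceed the sign_repair depth, so the unreliable return link cannot
   be overrun. */
#if 0
static inline int
credit_try_acquire( out_ctx_t * out ) { /* before fd_stem_publish */
  if( FD_UNLIKELY( !out->credits ) ) return 0; /* no credit: defer the request */
  out->credits--;                              /* one more frag now in flight  */
  return 1;
}

static inline void
credit_release( out_ctx_t * out ) {     /* when a signed frag returns */
  if( FD_LIKELY( out->credits<out->max_credits ) ) out->credits++; /* cap at depth */
}
#endif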
143 :
144 : struct fd_fec_sig {
145 : ulong key; /* map key. 32 msb = slot, 32 lsb = fec_set_idx */
146 : fd_ed25519_sig_t sig; /* Ed25519 sig identifier of the FEC. */
147 : };
148 : typedef struct fd_fec_sig fd_fec_sig_t;
149 :
150 : #define MAP_NAME fd_fec_sig
151 0 : #define MAP_T fd_fec_sig_t
152 : #define MAP_MEMOIZE 0
153 : #include "../../util/tmpl/fd_map_dynamic.c"
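/* Illustrative helpers (guarded out of compilation, not part of the
   original file) spelling out the fd_fec_sig key packing described
   above: slot in the 32 msb, fec_set_idx in the 32 lsb, matching the
   (shred->slot << 32) | shred->fec_set_idx expressions used below. */
#if 0
static inline ulong fec_sig_key         ( ulong slot, uint fec_set_idx ) { return (slot<<32) | (ulong)fec_set_idx; }
static inline ulong fec_sig_key_slot    ( ulong key ) { return key>>32;   }
static inline uint  fec_sig_key_fec_idx ( ulong key ) { return (uint)key; }
#endif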
154 :
155 : /* Data needed to sign and send a pong that is not contained in the
156 : pong msg itself. */
157 :
158 : struct pong_data {
159 : fd_ip4_port_t peer_addr;
160 : fd_hash_t hash;
161 : uint daddr;
162 : };
163 : typedef struct pong_data pong_data_t;
164 :
165 : struct sign_req {
166 : ulong key; /* map key, ctx->pending_key_next */
167 : ulong buflen;
168 : union {
169 : uchar buf[sizeof(fd_repair_msg_t)];
170 : fd_repair_msg_t msg;
171 : };
172 : pong_data_t pong_data; /* populated only for pong msgs */
173 : };
174 : typedef struct sign_req sign_req_t;
175 :
176 : #define MAP_NAME fd_signs_map
177 0 : #define MAP_KEY key
178 0 : #define MAP_KEY_NULL ULONG_MAX
179 0 : #define MAP_KEY_INVAL(k) (k==ULONG_MAX)
180 0 : #define MAP_T sign_req_t
181 : #define MAP_MEMOIZE 0
182 : #include "../../util/tmpl/fd_map_dynamic.c"
183 :
184 : /* Because the sign tiles could all be busy when a contact info
185 :    arrives, we need to save messages to be signed in a queue and
186 :    dispatch them in after_credit when a sign tile is available. The
187 :    size of the queue was determined as follows: we can limit it to
188 :    the maximum number of active keys, which is equal to the number
189 :    of warm up requests we might queue. The queue will also hold
190 :    pongs, but in order for the ping to arrive the warm up request
191 :    must have left the queue. It is possible that we start up and get
192 :    FD_ACTIVE_KEY_MAX peers gossiped to us, and as we queue up their
193 :    warmup requests they all drop and another FD_ACTIVE_KEY_MAX new
194 :    peers gossip to us, causing us to fill the queue. Overall this
195 :    scenario is highly unlikely, and it's not the end of the world if
196 :    we drop a warmup req or pong to a peer, because the first real
197 :    request to them will retrigger it anyway.
198 :
199 :    The typical flow is that a pong gets added to the sign_queue
200 :    during an after_frag call. On the following after_credit it gets
201 :    popped from the sign_queue, added to the sign_map, and then
202 :    dispatched to the sign tile. */
203 :
204 : struct sign_pending {
205 : fd_repair_msg_t msg;
206 : pong_data_t pong_data; /* populated only for pong msgs */
207 : };
208 : typedef struct sign_pending sign_pending_t;
209 :
210 : #define QUEUE_NAME fd_signs_queue
211 0 : #define QUEUE_T sign_pending_t
212 0 : #define QUEUE_MAX (2*FD_ACTIVE_KEY_MAX)
213 : #include "../../util/tmpl/fd_queue.c"
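/* Illustrative sketch (guarded out of compilation; ctx_t and
   fd_repair_send_sign_request are defined further down in this file)
   of the dispatch half of the flow described above, mirroring the
   logic in after_credit: pop one queued message and hand it to a sign
   tile that has credits available. */
#if 0
static void
sign_queue_dispatch_one( ctx_t * ctx, out_ctx_t * sign_out ) {
  if( FD_UNLIKELY( fd_signs_queue_empty( ctx->sign_queue ) ) ) return;
  sign_pending_t signable = fd_signs_queue_pop( ctx->sign_queue );
  pong_data_t const * pong = signable.msg.kind==FD_REPAIR_KIND_PONG ? &signable.pong_data : NULL;
  fd_repair_send_sign_request( ctx, sign_out, &signable.msg, pong ); /* inserts into signs_map */
}
#endif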
214 :
215 : struct ctx {
216 : long tsdebug; /* timestamp for debug printing */
217 :
218 : ulong repair_seed;
219 :
220 : fd_ip4_port_t repair_intake_addr;
221 : fd_ip4_port_t repair_serve_addr;
222 :
223 : fd_forest_t * forest;
224 : fd_fec_sig_t * fec_sigs;
225 : fd_policy_t * policy;
226 : fd_inflights_t * inflights;
227 : fd_repair_t * protocol;
228 :
229 : fd_pubkey_t identity_public_key;
230 :
231 : fd_wksp_t * wksp;
232 :
233 : fd_stem_context_t * stem;
234 :
235 : uchar in_kind[ MAX_IN_LINKS ];
236 : in_ctx_t in_links[ MAX_IN_LINKS ];
237 :
238 : int skip_frag;
239 :
240 : uint net_out_idx;
241 : fd_wksp_t * net_out_mem;
242 : ulong net_out_chunk0;
243 : ulong net_out_wmark;
244 : ulong net_out_chunk;
245 :
246 : ulong snap_out_chunk;
247 :
248 : uint shred_tile_cnt;
249 : out_ctx_t shred_out_ctx[ MAX_SHRED_TILE_CNT ];
250 :
251 : /* repair_sign links (to sign tiles 1+) - for round-robin distribution */
252 :
253 : ulong repair_sign_cnt;
254 : out_ctx_t repair_sign_out_ctx[ MAX_SIGN_TILE_CNT ];
255 :
256 : ulong sign_rrobin_idx;
257 :
258 : /* Pending sign requests for async operations */
259 :
260 : uint pending_key_next;
261 : sign_req_t * signs_map; /* contains any request currently in the repair->sign or sign->repair dcache */
262 : sign_pending_t * sign_queue; /* contains any request waiting to be dispatched to repair->sign */
263 :
264 : ushort net_id;
265 : uchar buffer[ MAX_BUFFER_SIZE ]; /* includes Ethernet, IP, UDP headers */
266 : fd_ip4_udp_hdrs_t intake_hdr[1];
267 : fd_ip4_udp_hdrs_t serve_hdr [1];
268 :
269 : ulong manifest_slot;
270 : struct {
271 : ulong send_pkt_cnt;
272 : ulong sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_CNT];
273 : ulong repaired_slots;
274 : ulong current_slot;
275 : ulong sign_tile_unavail;
276 : ulong rerequest;
277 : ulong malformed_ping;
278 : fd_histf_t slot_compl_time[ 1 ];
279 : fd_histf_t response_latency[ 1 ];
280 : } metrics[ 1 ];
281 :
282 : /* Slot-level metrics */
283 :
284 : fd_repair_metrics_t * slot_metrics;
285 : ulong turbine_slot0; // catchup considered complete after this slot
286 : struct {
287 : int enabled;
288 : ulong end_slot;
289 : int complete;
290 : } profiler;
291 : };
292 : typedef struct ctx ctx_t;
293 :
294 : FD_FN_CONST static inline ulong
295 0 : scratch_align( void ) {
296 0 : return 128UL;
297 0 : }
298 :
299 : FD_FN_PURE static inline ulong
300 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
301 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
302 0 : }
303 :
304 : FD_FN_PURE static inline ulong
305 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
306 0 : ulong total_sign_depth = tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt;
307 0 : int lg_sign_depth = fd_ulong_find_msb( fd_ulong_pow2_up(total_sign_depth) ) + 1;
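  /* Illustrative arithmetic (sizing rationale, assuming the
     power-of-two rounding above): with total_sign_depth = 200, pow2_up
     gives 256, find_msb gives 8, and the +1 makes lg_sign_depth 9,
     i.e. a map with 2^9 = 512 slots -- at least 2x the worst-case
     number of concurrently in-flight sign requests. */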
308 :
309 0 : ulong l = FD_LAYOUT_INIT;
310 0 : l = FD_LAYOUT_APPEND( l, alignof(ctx_t), sizeof(ctx_t) );
311 0 : l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint () );
312 0 : l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint ( tile->repair.slot_max ) );
313 0 : l = FD_LAYOUT_APPEND( l, fd_policy_align(), fd_policy_footprint ( FD_NEEDED_KEY_MAX, FD_ACTIVE_KEY_MAX ) );
314 0 : l = FD_LAYOUT_APPEND( l, fd_inflights_align(), fd_inflights_footprint () );
315 0 : l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint ( 20 ) );
316 0 : l = FD_LAYOUT_APPEND( l, fd_signs_map_align(), fd_signs_map_footprint ( lg_sign_depth ) );
317 0 : l = FD_LAYOUT_APPEND( l, fd_signs_queue_align(), fd_signs_queue_footprint() );
318 0 : l = FD_LAYOUT_APPEND( l, fd_repair_metrics_align(), fd_repair_metrics_footprint() );
319 0 : return FD_LAYOUT_FINI( l, scratch_align() );
320 0 : }
321 :
322 : /* Below functions manage the current pending sign requests. */
323 :
324 : sign_req_t *
325 : sign_map_insert( ctx_t * ctx,
326 : fd_repair_msg_t const * msg,
327 0 : pong_data_t const * opt_pong_data ) {
328 :
329 : /* Check if there is any space for a new pending sign request. Should never fail as long as credit management is working. */
330 0 : if( FD_UNLIKELY( fd_signs_map_key_cnt( ctx->signs_map )==fd_signs_map_key_max( ctx->signs_map ) ) ) return NULL;
331 :
332 0 : sign_req_t * pending = fd_signs_map_insert( ctx->signs_map, ctx->pending_key_next++ );
333 0 : if( FD_UNLIKELY( !pending ) ) return NULL; // Not possible, unless the same nonce is used twice.
334 0 : pending->msg = *msg;
335 0 : pending->buflen = fd_repair_sz( msg );
336 0 : if( FD_UNLIKELY( opt_pong_data ) ) pending->pong_data = *opt_pong_data;
337 0 : return pending;
338 0 : }
339 :
340 : int
341 : sign_map_remove( ctx_t * ctx,
342 0 : ulong key ) {
343 0 : sign_req_t * pending = fd_signs_map_query( ctx->signs_map, key, NULL );
344 0 : if( FD_UNLIKELY( !pending ) ) return -1;
345 0 : fd_signs_map_remove( ctx->signs_map, pending );
346 0 : return 0;
347 0 : }
348 :
349 : static void
350 : send_packet( ctx_t * ctx,
351 : fd_stem_context_t * stem,
352 : int is_intake,
353 : uint dst_ip_addr,
354 : ushort dst_port,
355 : uint src_ip_addr,
356 : uchar const * payload,
357 : ulong payload_sz,
358 0 : ulong tsorig ) {
359 0 : ctx->metrics->send_pkt_cnt++;
360 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
361 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
362 0 : *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr);
363 :
364 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
365 0 : ip4->saddr = src_ip_addr;
366 0 : ip4->daddr = dst_ip_addr;
367 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
368 0 : ip4->check = 0U;
369 0 : ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
370 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
371 :
372 0 : fd_udp_hdr_t * udp = hdr->udp;
373 0 : udp->net_dport = dst_port;
374 0 : udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
375 0 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
376 0 : hdr->udp->check = 0U;
377 :
378 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
379 0 : ulong sig = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
380 0 : ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
381 0 : ulong chunk = ctx->net_out_chunk;
382 0 : fd_stem_publish( stem, ctx->net_out_idx, sig, chunk, packet_sz, 0UL, tsorig, tspub );
383 0 : ctx->net_out_chunk = fd_dcache_compact_next( chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
384 0 : }
385 :
386 : /* Returns a sign_out context with max available credits.
387 : If no sign_out context has available credits, returns NULL. */
388 : static out_ctx_t *
389 0 : sign_avail_credits( ctx_t * ctx ) {
390 0 : out_ctx_t * sign_out = NULL;
391 0 : ulong max_credits = 0;
392 0 : for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
393 0 : if( ctx->repair_sign_out_ctx[i].credits > max_credits ) {
394 0 : max_credits = ctx->repair_sign_out_ctx[i].credits;
395 0 : sign_out = &ctx->repair_sign_out_ctx[i];
396 0 : }
397 0 : }
398 0 : return sign_out;
399 0 : }
400 :
401 : /* Prepares the signing preimage and publishes a signing request that
402 : will be signed asynchronously by the sign tile. The signed data will
403 : be returned via dcache as a frag. */
404 : static void
405 : fd_repair_send_sign_request( ctx_t * ctx,
406 : out_ctx_t * sign_out,
407 : fd_repair_msg_t const * msg,
408 0 : pong_data_t const * opt_pong_data ){
409 : /* New sign request */
410 0 : sign_req_t * pending = sign_map_insert( ctx, msg, opt_pong_data );
411 0 : if( FD_UNLIKELY( !pending ) ) return;
412 :
413 0 : ulong sig = 0;
414 0 : ulong preimage_sz = 0;
415 0 : uchar * dst = fd_chunk_to_laddr( sign_out->mem, sign_out->chunk );
416 :
417 0 : if( FD_UNLIKELY( msg->kind == FD_REPAIR_KIND_PONG ) ) {
418 0 : uchar pre_image[FD_REPAIR_PONG_PREIMAGE_SZ];
419 0 : preimage_pong( &opt_pong_data->hash, pre_image, sizeof(pre_image) );
420 0 : preimage_sz = FD_REPAIR_PONG_PREIMAGE_SZ;
421 0 : fd_memcpy( dst, pre_image, preimage_sz );
422 0 : sig = ((ulong)pending->key << 32) | (uint)FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519;
423 0 : } else {
424 : /* Sign and prepare the message directly into the pending buffer */
425 0 : uchar * preimage = preimage_req( &pending->msg, &preimage_sz );
426 0 : fd_memcpy( dst, preimage, preimage_sz );
427 0 : sig = ((ulong)pending->key << 32) | (uint)FD_KEYGUARD_SIGN_TYPE_ED25519;
428 0 : }
429 :
430 0 : fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, preimage_sz, 0UL, 0UL, 0UL );
431 0 : sign_out->chunk = fd_dcache_compact_next( sign_out->chunk, preimage_sz, sign_out->chunk0, sign_out->wmark );
432 :
433 0 : ctx->metrics->sent_pkt_types[metric_index[msg->kind]]++;
434 0 : sign_out->credits--;
435 0 : }
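/* Illustrative helpers (guarded out of compilation, not part of the
   original file) for the frag sig layout used on the repair_sign /
   sign_repair links above and in after_sign below: the pending-request
   map key rides in the upper 32 bits and the fd_keyguard sign type in
   the lower 32, so the key is recovered with sig>>32. */
#if 0
static inline ulong sign_frag_sig ( ulong pending_key, int sign_type ) { return (pending_key<<32) | (ulong)(uint)sign_type; }
static inline ulong sign_frag_key ( ulong sig )                        { return sig>>32; }
#endif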
436 :
437 : static inline int
438 : before_frag( ctx_t * ctx,
439 : ulong in_idx,
440 : ulong seq FD_PARAM_UNUSED,
441 0 : ulong sig ) {
442 0 : uint in_kind = ctx->in_kind[ in_idx ];
443 0 : if( FD_LIKELY ( in_kind==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
444 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) return fd_int_if( fd_forest_root_slot( ctx->forest )==ULONG_MAX, -1, 0 ); /* not ready to read frag */
445 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
446 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
447 0 : sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
448 0 : }
449 0 : return 0;
450 0 : }
451 :
452 : static void
453 : during_frag( ctx_t * ctx,
454 : ulong in_idx,
455 : ulong seq FD_PARAM_UNUSED,
456 : ulong sig FD_PARAM_UNUSED,
457 : ulong chunk,
458 : ulong sz,
459 0 : ulong ctl ) {
460 0 : ctx->skip_frag = 0;
461 :
462 0 : uint in_kind = ctx->in_kind[ in_idx ];
463 0 : in_ctx_t const * in_ctx = &ctx->in_links[ in_idx ];
464 :
465 0 : if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
466 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
467 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
468 0 : }
469 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
470 0 : fd_memcpy( ctx->buffer, dcache_entry, sz );
471 0 : return;
472 0 : }
473 :
474 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS ) ) {
475 0 : return;
476 0 : }
477 0 : if( FD_UNLIKELY( in_kind==IN_KIND_NET ) ) {
478 0 : uchar const * dcache_entry = fd_net_rx_translate_frag( &in_ctx->net_rx, chunk, ctl, sz );
479 0 : fd_memcpy( ctx->buffer, dcache_entry, sz );
480 0 : return;
481 0 : }
482 :
483 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
484 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
485 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
486 0 : }
487 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
488 0 : fd_memcpy( ctx->buffer, dcache_entry, sz );
489 0 : return;
490 0 : }
491 :
492 0 : if( FD_LIKELY ( in_kind==IN_KIND_SHRED ) ) {
493 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
494 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
495 0 : }
496 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
497 0 : if( FD_LIKELY( sz > 0 ) ) fd_memcpy( ctx->buffer, dcache_entry, sz );
498 0 : return;
499 0 : }
500 :
501 0 : if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
502 0 : return;
503 0 : }
504 :
505 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
506 0 : if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) ctx->snap_out_chunk = chunk;
507 0 : return;
508 0 : }
509 :
510 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
511 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
512 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
513 0 : }
514 0 : uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
515 0 : fd_memcpy( ctx->buffer, dcache_entry, sz );
516 0 : return;
517 0 : }
518 :
519 0 : FD_LOG_ERR(( "Frag from unknown link (kind=%u in_idx=%lu)", in_kind, in_idx ));
520 0 : }
521 :
522 : static inline void
523 : after_snap( ctx_t * ctx,
524 : ulong sig,
525 0 : uchar const * chunk ) {
526 0 : if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) return;
527 0 :   fd_snapshot_manifest_t const * manifest = (fd_snapshot_manifest_t const *)chunk;
528 :
529 0 : fd_forest_init( ctx->forest, manifest->slot );
530 0 : FD_TEST( fd_forest_root_slot( ctx->forest )!=ULONG_MAX );
531 :
532 0 : }
533 :
534 : static inline void
535 0 : after_contact( ctx_t * ctx, fd_gossip_update_message_t const * msg ) {
536 0 : fd_contact_info_t const * contact_info = msg->contact_info.contact_info;
537 0 : fd_ip4_port_t repair_peer = contact_info->sockets[ FD_CONTACT_INFO_SOCKET_SERVE_REPAIR ];
538 0 : if( FD_UNLIKELY( !repair_peer.addr || !repair_peer.port ) ) return;
539 0 : fd_policy_peer_t const * peer = fd_policy_peer_insert( ctx->policy, &contact_info->pubkey, &repair_peer );
540 0 : if( peer ) {
541 : /* The repair process uses a Ping-Pong protocol that incurs one
542 : round-trip time (RTT) for the initial repair request. To
543 : optimize this, we proactively send a placeholder repair request
544 : as soon as we receive a peer's contact information for the first
545 : time, effectively prepaying the RTT cost. */
546 0 : fd_repair_msg_t * init = fd_repair_shred( ctx->protocol, &contact_info->pubkey, (ulong)fd_log_wallclock()/1000000L, 0, 0, 0 );
547 0 : fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = *init } );
548 0 : }
549 0 : }
550 :
551 : static inline void
552 : after_sign( ctx_t * ctx,
553 : ulong in_idx,
554 : ulong sig,
555 0 : fd_stem_context_t * stem ) {
556 0 : ulong pending_key = sig >> 32;
557 : /* Look up the pending request. Since the repair_sign links are
558 : reliable, the incoming sign_repair fragments represent a complete
559 : set of the previously sent outgoing messages. However, with
560 : multiple sign tiles, the responses may arrive interleaved. */
561 :
562 : /* Find which sign tile sent this response and increment its credits */
563 0 : for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
564 0 : if( ctx->repair_sign_out_ctx[i].in_idx == in_idx ) {
565 0 : if( ctx->repair_sign_out_ctx[i].credits < ctx->repair_sign_out_ctx[i].max_credits ) {
566 0 : ctx->repair_sign_out_ctx[i].credits++;
567 0 : }
568 0 : break;
569 0 : }
570 0 : }
571 :
572 0 : sign_req_t * pending_ = fd_signs_map_query( ctx->signs_map, pending_key, NULL );
573 0 : if( FD_UNLIKELY( !pending_ ) ) FD_LOG_CRIT(( "No pending request found for key %lu", pending_key ));
574 :
575 0 : sign_req_t pending[1] = { *pending_ }; /* Make a copy of the pending request so we can sign_map_remove immediately. */
576 0 : sign_map_remove( ctx, pending_key );
577 :
578 :   /* This is a pong message */
579 0 : if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_PONG ) ) {
580 0 : fd_memcpy( pending->msg.pong.sig, ctx->buffer, 64UL );
581 0 : send_packet( ctx, stem, 1, pending->pong_data.peer_addr.addr, pending->pong_data.peer_addr.port, pending->pong_data.daddr, pending->buf, fd_repair_sz( &pending->msg ), fd_frag_meta_ts_comp( fd_tickcount() ) );
582 0 : return;
583 0 : }
584 :
585 :   /* Inject the signature into the pending request (the signature field sits right after the first 4 bytes of the message) */
586 0 : fd_memcpy( pending->buf + 4, ctx->buffer, 64UL );
587 0 : uint src_ip4 = 0U;
588 :
589 : /* This is a warmup message */
590 0 : if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.slot == 0 ) ) {
591 0 : fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
592 0 : if( FD_UNLIKELY( active ) ) send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
593 0 : else { /* This is a warmup request for a peer that is no longer active. There's no reason to pick another peer for a warmup rq, so just drop it. */ }
594 0 : return;
595 0 : }
596 :
597 : /* This is a regular repair shred request
598 :
599 :      TODO: any way to make this less complicated? Essentially we need to
600 : ensure we always send out any shred requests we have, because policy_next
601 : has no way to revisit a shred. But the fact that peers can drop out
602 : of the active peer list makes this complicated.
603 :
604 : 1. If the peer is still there (common), it's fine.
605 : 2. If the peer is not there, we can select another peer and send the request.
606 : 3. If the peer is not there, and we have no other peers, we can add
607 : this request to the inflights table, pretend we've sent it and
608 : let the inflight timeout request it down the line.
609 : */
610 0 : fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
611 0 : int is_regular_req = pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.nonce > 0; // not a highest/orphan request
612 :
613 0 : if( FD_UNLIKELY( !active ) ) {
614 0 : fd_pubkey_t const * new_peer = fd_policy_peer_select( ctx->policy );
615 0 : if( FD_LIKELY( new_peer ) ) {
616 : /* We have a new peer, so we can send the request */
617 0 : pending->msg.shred.to = *new_peer;
618 0 : fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = pending->msg } );
619 0 : }
620 :
621 0 : if( FD_UNLIKELY( !new_peer && is_regular_req ) ) {
622 :       /* This is real devastation: we clearly had a peer at the time of
623 : making this request, but for some reason we now have ZERO
624 : peers. The only thing we can do is to add this artificially to
625 : the inflights table, pretend we've sent it and let the inflight
626 : timeout request it down the line. */
627 0 : fd_inflights_request_insert( ctx->inflights, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
628 0 : }
629 0 : return;
630 0 : }
631 : /* Happy path - all is well, our peer didn't drop out from beneath us. */
632 0 : if( FD_LIKELY( is_regular_req ) ) {
633 0 : fd_inflights_request_insert( ctx->inflights, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
634 0 : fd_policy_peer_request_update( ctx->policy, &pending->msg.shred.to );
635 0 : }
636 0 : send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
637 0 : }
638 :
639 : static inline void
640 : after_shred( ctx_t * ctx,
641 : ulong sig,
642 : fd_shred_t * shred,
643 0 : ulong nonce ) {
644 :   /* Record the shred in the forest and, if it satisfied one of our
645 :      repair requests, credit the responding peer with the observed RTT. */
646 0 : int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) );
647 0 : int src = fd_disco_shred_out_shred_sig_is_turbine( sig ) ? SHRED_SRC_TURBINE : SHRED_SRC_REPAIR;
648 0 : if( FD_LIKELY( !is_code ) ) {
649 0 : long rtt = 0;
650 0 : fd_pubkey_t peer;
651 0 : if( FD_UNLIKELY( src == SHRED_SRC_REPAIR && ( rtt = fd_inflights_request_remove( ctx->inflights, nonce, &peer ) ) > 0 ) ) {
652 0 : fd_policy_peer_response_update( ctx->policy, &peer, rtt );
653 0 : fd_histf_sample( ctx->metrics->response_latency, (ulong)rtt );
654 0 : }
655 :
656 0 : int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
657 0 : int ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
658 0 : fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
659 0 : if( FD_UNLIKELY( ctx->profiler.enabled && shred->slot == ctx->profiler.end_slot ) ) fd_forest_blk_parent_update( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
660 0 : fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick, src );
661 0 : } else {
662 0 : fd_forest_code_shred_insert( ctx->forest, shred->slot, shred->idx );
663 0 : }
664 0 : }
665 :
666 : static inline void
667 : after_fec( ctx_t * ctx,
668 0 : fd_shred_t * shred ) {
669 :
670 : /* When this is a FEC completes msg, it is implied that all the
671 : other shreds in the FEC set can also be inserted. Shred inserts
672 : into the forest are idempotent so it is fine to insert the same
673 : shred multiple times. */
674 :
675 0 : int slot_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE );
676 0 : int ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
677 :
678 0 : fd_forest_blk_t * ele = fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
679 0 : fd_forest_fec_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick );
680 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
681 0 : if( FD_LIKELY( fec_sig ) ) fd_fec_sig_remove( ctx->fec_sigs, fec_sig );
682 0 :   FD_TEST( ele ); /* must be non-NULL */
683 :
684 : /* metrics for completed slots */
685 0 : if( FD_UNLIKELY( ele->complete_idx != UINT_MAX && ele->buffered_idx==ele->complete_idx &&
686 0 : 0==memcmp( ele->cmpl, ele->fecs, sizeof(fd_forest_blk_idxs_t) * fd_forest_blk_idxs_word_cnt ) ) ) {
687 0 : long now = fd_tickcount();
688 0 : long start_ts = ele->first_req_ts == 0 || ele->slot >= ctx->turbine_slot0 ? ele->first_shred_ts : ele->first_req_ts;
689 0 : ulong duration_ticks = (ulong)(now - start_ts);
690 0 : fd_histf_sample( ctx->metrics->slot_compl_time, duration_ticks );
691 0 : fd_repair_metrics_add_slot( ctx->slot_metrics, ele->slot, start_ts, now, ele->repair_cnt, ele->turbine_cnt );
692 0 : FD_LOG_INFO(( "slot is complete %lu. num_data_shreds: %u, num_repaired: %u, num_turbine: %u, num_recovered: %u, duration: %.2f ms", ele->slot, ele->complete_idx + 1, ele->repair_cnt, ele->turbine_cnt, ele->recovered_cnt, (double)fd_metrics_convert_ticks_to_nanoseconds(duration_ticks) / 1e6 ));
693 0 : }
694 :
695 0 : if( FD_UNLIKELY( ctx->profiler.enabled ) ) {
696 : // If turbine slot 0 is in the consumed frontier, and it satisfies the
697 : // above conditions for completions, then catchup is complete
698 0 : fd_forest_blk_t * turbine0 = fd_forest_query( ctx->forest, ctx->turbine_slot0 );
699 0 : ulong turbine0_idx = fd_forest_pool_idx( fd_forest_pool( ctx->forest ), turbine0 );
700 0 : fd_forest_ref_t * consumed = fd_forest_consumed_ele_query( fd_forest_consumed( ctx->forest ), &turbine0_idx, NULL, fd_forest_conspool( ctx->forest ) );
701 0 : if( FD_UNLIKELY( consumed && turbine0->complete_idx != UINT_MAX && turbine0->complete_idx == turbine0->buffered_idx &&
702 0 : 0==memcmp( turbine0->cmpl, turbine0->fecs, sizeof(fd_forest_blk_idxs_t) * fd_forest_blk_idxs_word_cnt ) ) ) {
703 0 : FD_COMPILER_MFENCE();
704 0 : FD_VOLATILE( ctx->profiler.complete ) = 1;
705 0 : }
706 0 : }
707 0 : }
708 :
709 : static inline void
710 : after_net( ctx_t * ctx,
711 0 : ulong sz ) {
712 0 : fd_eth_hdr_t * eth; fd_ip4_hdr_t * ip4; fd_udp_hdr_t * udp;
713 0 : uchar * data; ulong data_sz;
714 0 : FD_TEST( fd_ip4_udp_hdr_strip( ctx->buffer, sz, &data, &data_sz, ð, &ip4, &udp ) );
715 0 : fd_ip4_port_t peer_addr = { .addr=ip4->saddr, .port=udp->net_sport };
716 0 : if( FD_UNLIKELY( data_sz != sizeof(fd_repair_ping_t) ) ) {
717 0 : ctx->metrics->malformed_ping++;
718 0 : return;
719 0 : }
720 0 : fd_repair_ping_t * res = (fd_repair_ping_t *)fd_type_pun( data );
721 0 : if( FD_UNLIKELY( res->kind != FD_REPAIR_KIND_PING ) ) {
722 0 : ctx->metrics->malformed_ping++;
723 0 : return;
724 0 : }
725 0 : fd_repair_msg_t * pong = fd_repair_pong( ctx->protocol, &res->ping.hash );
726 0 : fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = *pong, .pong_data = { .peer_addr = peer_addr, .hash = res->ping.hash, .daddr = ip4->daddr } } );
727 0 : }
728 :
729 : static inline void
730 : after_evict( ctx_t * ctx,
731 0 : ulong sig ) {
732 0 : ulong spilled_slot = fd_disco_shred_out_shred_sig_slot ( sig );
733 0 : uint spilled_fec_set_idx = fd_disco_shred_out_shred_sig_fec_set_idx( sig );
734 0 : uint spilled_max_idx = fd_disco_shred_out_shred_sig_data_cnt ( sig );
735 :
736 0 : fd_forest_fec_clear( ctx->forest, spilled_slot, spilled_fec_set_idx, spilled_max_idx );
737 0 : }
738 :
739 : static void
740 : after_frag( ctx_t * ctx,
741 : ulong in_idx,
742 : ulong seq FD_PARAM_UNUSED,
743 : ulong sig,
744 : ulong sz,
745 : ulong tsorig FD_PARAM_UNUSED,
746 : ulong tspub FD_PARAM_UNUSED,
747 0 : fd_stem_context_t * stem ) {
748 0 : if( FD_UNLIKELY( ctx->skip_frag ) ) return;
749 :
750 0 : ctx->stem = stem;
751 :
752 0 : uint in_kind = ctx->in_kind[ in_idx ];
753 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS && sig==GENESI_SIG_BOOTSTRAP_COMPLETED ) ) {
754 0 : fd_forest_init( ctx->forest, 0 );
755 0 : return;
756 0 : }
757 :
758 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
759 0 : fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( ctx->buffer );
760 0 : if( FD_LIKELY( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) ){
761 0 : after_contact( ctx, msg );
762 0 : } else {
763 0 : fd_policy_peer_remove( ctx->policy, (fd_pubkey_t const *)fd_type_pun_const( msg->origin_pubkey ) );
764 0 : }
765 0 : return;
766 0 : }
767 :
768 0 : if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
769 0 : if( FD_LIKELY( sig==FD_TOWER_SIG_SLOT_DONE ) ) {
770 0 : fd_tower_slot_done_t const * msg = (fd_tower_slot_done_t const *)fd_type_pun_const( ctx->buffer );
771 0 : if( FD_LIKELY( msg->root_slot!=ULONG_MAX ) ) fd_forest_publish( ctx->forest, msg->root_slot );
772 0 : }
773 0 : return;
774 0 : }
775 :
776 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
777 0 : after_sign( ctx, in_idx, sig, stem );
778 0 : return;
779 0 : }
780 :
781 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) {
782 :
783 : /* There are 3 message types from shred:
784 : 1. resolver evict - incomplete FEC set is evicted by resolver
785 : 2. fec complete - FEC set is completed by resolver. Also contains a shred.
786 : 3. shred - new shred
787 :
788 : Msgs 2 and 3 have a shred header in ctx->buffer */
789 :
790 0 : int resolver_evicted = sz == 0;
791 0 : int fec_completes = sz == FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) + sizeof(fd_hash_t) + sizeof(int);
792 0 : if( FD_UNLIKELY( resolver_evicted ) ) {
793 0 : after_evict( ctx, sig );
794 0 : return;
795 0 : }
796 :
797 0 : fd_shred_t * shred = (fd_shred_t *)fd_type_pun( ctx->buffer );
798 0 : uint nonce = FD_LOAD(uint, ctx->buffer + fd_shred_header_sz( shred->variant ) );
799 0 : if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) {
800 0 : FD_LOG_INFO(( "shred %lu %u %u too old, ignoring", shred->slot, shred->idx, shred->fec_set_idx ));
801 0 : return;
802 0 :     }
803 :
804 0 : if( FD_UNLIKELY( ctx->profiler.enabled && ctx->turbine_slot0 != ULONG_MAX && shred->slot > ctx->turbine_slot0 ) ) return;
805 0 : # if LOGGING
806 0 : if( FD_UNLIKELY( shred->slot > ctx->metrics->current_slot ) ) {
807 0 : FD_LOG_INFO(( "\n\n[Turbine]\n"
808 0 : "slot: %lu\n"
809 0 : "root: %lu\n",
810 0 : shred->slot,
811 0 : fd_forest_root_slot( ctx->forest ) ));
812 0 : }
813 0 : # endif
814 0 : ctx->metrics->current_slot = fd_ulong_max( shred->slot, ctx->metrics->current_slot );
815 0 : if( FD_UNLIKELY( ctx->turbine_slot0 == ULONG_MAX ) ) {
816 :
817 0 : if( FD_UNLIKELY( ctx->profiler.enabled ) ) {
818 : /* we wait until the first turbine shred arrives to kick off
819 : the profiler. This is to let gossip peers accumulate similar
820 : to a regular Firedancer run. */
821 0 : fd_forest_blk_insert( ctx->forest, ctx->profiler.end_slot, ctx->profiler.end_slot - 1 );
822 0 : fd_forest_code_shred_insert( ctx->forest, ctx->profiler.end_slot, 0 );
823 :
824 0 : ctx->turbine_slot0 = ctx->profiler.end_slot;
825 0 : fd_repair_metrics_set_turbine_slot0( ctx->slot_metrics, ctx->profiler.end_slot );
826 0 : fd_policy_set_turbine_slot0( ctx->policy, ctx->profiler.end_slot );
827 0 : return;
828 0 : }
829 :
830 0 : ctx->turbine_slot0 = shred->slot;
831 0 : fd_repair_metrics_set_turbine_slot0( ctx->slot_metrics, shred->slot );
832 0 : fd_policy_set_turbine_slot0( ctx->policy, shred->slot );
833 0 : }
834 :
835 0 : if( FD_UNLIKELY( fec_completes ) ) {
836 0 : after_fec( ctx, shred );
837 0 : } else {
838 : /* Don't want to reinsert the shred sig for an already complete FEC set */
839 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
840 0 : if( FD_UNLIKELY( !fec_sig ) ) {
841 0 : fec_sig = fd_fec_sig_insert( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx );
842 0 : memcpy( fec_sig->sig, shred->signature, sizeof(fd_ed25519_sig_t) );
843 0 : }
844 0 : after_shred( ctx, sig, shred, nonce );
845 0 : }
846 :
847 : /* Check if there are FECs to force complete. Algorithm: window
848 : through the idxs in interval [i, j). If j = next fec_set_idx
849 : then we know we can force complete the FEC set interval [i, j)
850 : (assuming it wasn't already completed based on `cmpl`). */
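  /* Worked example (illustrative): say consumed_idx+1 = 0, shreds
     0..63 are buffered, and `fecs` is set at idxs 31 and 63 (the last
     data shred idx of each FEC set). If `cmpl` is set at 31, that set
     already completed on its own and we just advance past it; at j=63
     the set starting at i=32 is force-completed with
     last_idx = 63 - 32 = 31. */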
851 :
852 0 : fd_forest_blk_t * blk = fd_forest_query( ctx->forest, shred->slot );
853 0 : if( blk ) {
854 0 : uint i = blk->consumed_idx + 1;
855 0 : for( uint j = i; j < blk->buffered_idx + 1; j++ ) {
856 0 : if( FD_UNLIKELY( fd_forest_blk_idxs_test( blk->fecs, j ) ) ) {
857 0 : if( FD_UNLIKELY( fd_forest_blk_idxs_test( blk->cmpl, j ) ) ) {
858 : /* already been completed without force complete */
859 0 : } else {
860 : /* force completeable */
861 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | i, NULL );
862 0 : if( FD_LIKELY( fec_sig ) ) {
863 0 : ulong sig = fd_ulong_load_8( fec_sig->sig );
864 0 : ulong tile_idx = sig % ctx->shred_tile_cnt;
865 0 : uint last_idx = j - i;
866 :
867 0 : uchar * chunk = fd_chunk_to_laddr( ctx->shred_out_ctx[tile_idx].mem, ctx->shred_out_ctx[tile_idx].chunk );
868 0 : memcpy( chunk, fec_sig->sig, sizeof(fd_ed25519_sig_t) );
869 0 : fd_fec_sig_remove( ctx->fec_sigs, fec_sig );
870 0 : fd_stem_publish( stem, ctx->shred_out_ctx[tile_idx].idx, last_idx, ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), 0UL, 0UL, 0UL );
871 0 : ctx->shred_out_ctx[tile_idx].chunk = fd_dcache_compact_next( ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), ctx->shred_out_ctx[tile_idx].chunk0, ctx->shred_out_ctx[tile_idx].wmark );
872 0 : }
873 0 : }
874 : /* advance consumed */
875 0 : blk->consumed_idx = j;
876 0 : i = j + 1;
877 0 : }
878 0 : }
879 0 : }
880 : /* update metrics */
881 0 : ctx->metrics->repaired_slots = fd_forest_highest_repaired_slot( ctx->forest );
882 0 : return;
883 0 : }
884 :
885 0 : if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
886 0 : return;
887 0 : }
888 :
889 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
890 0 : after_snap( ctx, sig, fd_chunk_to_laddr( ctx->in_links[ in_idx ].mem, ctx->snap_out_chunk ) );
891 0 : return;
892 0 : }
893 :
894 0 : if( FD_UNLIKELY( in_kind==IN_KIND_NET ) ) {
895 0 : after_net( ctx, sz );
896 0 : return;
897 0 : }
898 0 : }
899 :
900 : static inline void
901 : after_credit( ctx_t * ctx,
902 : fd_stem_context_t * stem FD_PARAM_UNUSED,
903 : int * opt_poll_in FD_PARAM_UNUSED,
904 0 : int * charge_busy ) {
905 0 : long now = fd_log_wallclock();
906 :
907 : /* Verify that there is at least one sign tile with available credits.
908 : If not, we can't send any requests and leave early. */
909 0 : out_ctx_t * sign_out = sign_avail_credits( ctx );
910 0 : if( FD_UNLIKELY( !sign_out ) ) {
911 0 : ctx->metrics->sign_tile_unavail++;
912 0 : return;
913 0 : }
914 0 : if( FD_UNLIKELY( !fd_signs_queue_empty( ctx->sign_queue ) ) ) {
915 0 : sign_pending_t signable = fd_signs_queue_pop( ctx->sign_queue );
916 0 : fd_repair_send_sign_request( ctx, sign_out, &signable.msg, signable.msg.kind == FD_REPAIR_KIND_PONG ? &signable.pong_data : NULL );
917 0 : *charge_busy = 1;
918 0 : return;
919 0 : }
920 :
921 0 : if( FD_UNLIKELY( fd_inflights_should_drain( ctx->inflights, now ) ) ) {
922 0 : ulong nonce; ulong slot; ulong shred_idx;
923 0 : *charge_busy = 1;
924 0 : fd_inflights_request_pop( ctx->inflights, &nonce, &slot, &shred_idx );
925 0 : fd_forest_blk_t * blk = fd_forest_query( ctx->forest, slot );
926 0 : if( FD_UNLIKELY( blk && !fd_forest_blk_idxs_test( blk->idxs, shred_idx ) ) ) {
927 0 : fd_pubkey_t const * peer = fd_policy_peer_select( ctx->policy );
928 0 : ctx->metrics->rerequest++;
929 0 : if( FD_UNLIKELY( !peer ) ) {
930 : /* No peers. But we CANNOT lose this request. */
931 : /* Add this request to the inflights table, pretend we've sent it and let the inflight timeout request it down the line. */
932 0 : fd_hash_t hash = { .ul[0] = 0 };
933 0 : fd_inflights_request_insert( ctx->inflights, ctx->policy->nonce++, &hash, slot, shred_idx );
934 0 : } else {
935 0 :         fd_repair_msg_t * msg = fd_repair_shred( ctx->protocol, peer, (ulong)now/1000000L, ctx->policy->nonce++, slot, shred_idx );
936 0 : fd_repair_send_sign_request( ctx, sign_out, msg, NULL );
937 0 : return;
938 0 : }
939 0 : }
940 0 : }
941 :
942 0 : fd_repair_msg_t const * cout = fd_policy_next( ctx->policy, ctx->forest, ctx->protocol, now, ctx->metrics->current_slot, charge_busy );
943 0 : if( FD_UNLIKELY( !cout ) ) return;
944 :
945 0 : fd_repair_send_sign_request( ctx, sign_out, cout, NULL );
946 0 : }
947 :
948 : static inline void
949 0 : during_housekeeping( ctx_t * ctx ) {
950 0 : (void)ctx;
951 : # if DEBUG_LOGGING
952 : long now = fd_log_wallclock();
953 : if( FD_UNLIKELY( now - ctx->tsdebug > (long)10e9 ) ) {
954 : fd_forest_print( ctx->forest );
955 : ctx->tsdebug = fd_log_wallclock();
956 : }
957 : # endif
958 0 : }
959 :
960 : static void
961 : privileged_init( fd_topo_t * topo,
962 0 : fd_topo_tile_t * tile ) {
963 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
964 :
965 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
966 0 : ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(ctx_t), sizeof(ctx_t) );
967 0 : fd_memset( ctx, 0, sizeof(ctx_t) );
968 :
969 0 : uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 0 );
970 0 : fd_memcpy( ctx->identity_public_key.uc, identity_key + 32UL, sizeof(fd_pubkey_t) );
971 :
972 0 : FD_TEST( fd_rng_secure( &ctx->repair_seed, sizeof(ulong) ) );
973 0 : }
974 :
975 : static void
976 : unprivileged_init( fd_topo_t * topo,
977 0 : fd_topo_tile_t * tile ) {
978 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
979 :
980 0 : ulong total_sign_depth = tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt;
981 0 : int lg_sign_depth = fd_ulong_find_msb( fd_ulong_pow2_up(total_sign_depth) ) + 1;
982 :
983 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
984 0 : ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(ctx_t), sizeof(ctx_t) );
985 0 : ctx->protocol = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint () );
986 0 : ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint ( tile->repair.slot_max ) );
987 0 : ctx->policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), fd_policy_footprint ( FD_NEEDED_KEY_MAX, FD_ACTIVE_KEY_MAX ) );
988 0 : ctx->inflights = FD_SCRATCH_ALLOC_APPEND( l, fd_inflights_align(), fd_inflights_footprint () );
989 0 : ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint ( 20 ) );
990 0 : ctx->signs_map = FD_SCRATCH_ALLOC_APPEND( l, fd_signs_map_align(), fd_signs_map_footprint ( lg_sign_depth ) );
991 0 : ctx->sign_queue = FD_SCRATCH_ALLOC_APPEND( l, fd_signs_queue_align(), fd_signs_queue_footprint() );
992 0 : ctx->slot_metrics = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_metrics_align(), fd_repair_metrics_footprint() );
993 0 : FD_TEST( FD_SCRATCH_ALLOC_FINI( l, scratch_align() ) == (ulong)scratch + scratch_footprint( tile ) );
994 :
995 0 : ctx->protocol = fd_repair_join ( fd_repair_new ( ctx->protocol, &ctx->identity_public_key ) );
996 0 : ctx->forest = fd_forest_join ( fd_forest_new ( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) );
997 0 : ctx->policy = fd_policy_join ( fd_policy_new ( ctx->policy, FD_NEEDED_KEY_MAX, FD_ACTIVE_KEY_MAX, ctx->repair_seed ) );
998 0 : ctx->inflights = fd_inflights_join ( fd_inflights_new ( ctx->inflights ) );
999 0 : ctx->fec_sigs = fd_fec_sig_join ( fd_fec_sig_new ( ctx->fec_sigs, 20, 0UL ) );
1000 0 : ctx->signs_map = fd_signs_map_join ( fd_signs_map_new ( ctx->signs_map, lg_sign_depth, 0UL ) );
1001 0 : ctx->sign_queue = fd_signs_queue_join ( fd_signs_queue_new ( ctx->sign_queue ) );
1002 0 : ctx->slot_metrics = fd_repair_metrics_join( fd_repair_metrics_new( ctx->slot_metrics ) );
1003 :
1004 : /* Process in links */
1005 :
1006 0 : if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" ));
1007 :
1008 0 : uint sign_repair_in_idx[ MAX_SIGN_TILE_CNT ] = {0};
1009 0 : uint sign_repair_idx = 0;
1010 0 : ulong sign_link_depth = 0;
1011 :
1012 0 : for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) {
1013 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ];
1014 0 : if( 0==strcmp( link->name, "net_repair" ) ) {
1015 0 : ctx->in_kind[ in_idx ] = IN_KIND_NET;
1016 0 : fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache );
1017 0 : continue;
1018 0 : } else if( 0==strcmp( link->name, "sign_repair" ) ) {
1019 0 : ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
1020 0 : sign_repair_in_idx[ sign_repair_idx++ ] = in_idx;
1021 0 : sign_link_depth = link->depth;
1022 0 : }
1023 0 : else if( 0==strcmp( link->name, "gossip_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_GOSSIP;
1024 0 : else if( 0==strcmp( link->name, "tower_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_TOWER;
1025 0 : else if( 0==strcmp( link->name, "shred_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_SHRED;
1026 0 : else if( 0==strcmp( link->name, "snapin_manif" ) ) ctx->in_kind[ in_idx ] = IN_KIND_SNAP;
1027 0 : else if( 0==strcmp( link->name, "replay_stake" ) ) ctx->in_kind[ in_idx ] = IN_KIND_STAKE;
1028 0 : else if( 0==strcmp( link->name, "genesi_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_GENESIS;
1029 0 : else FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
1030 :
1031 0 : ctx->in_links[ in_idx ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1032 0 : ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
1033 0 : ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
1034 0 : ctx->in_links[ in_idx ].mtu = link->mtu;
1035 :
1036 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
1037 0 : }
1038 :
1039 0 : ctx->net_out_idx = UINT_MAX;
1040 0 : ctx->shred_tile_cnt = 0;
1041 0 : ctx->repair_sign_cnt = 0;
1042 0 : ctx->sign_rrobin_idx = 0;
1043 :
1044 0 : for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) {
1045 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ];
1046 :
1047 0 : if( 0==strcmp( link->name, "repair_net" ) ) {
1048 :
1049 0 : if( ctx->net_out_idx!=UINT_MAX ) continue; /* only use first net link */
1050 0 : ctx->net_out_idx = out_idx;
1051 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1052 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, link->dcache );
1053 0 : ctx->net_out_wmark = fd_dcache_compact_wmark( ctx->net_out_mem, link->dcache, link->mtu );
1054 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
1055 :
1056 0 : } else if( 0==strcmp( link->name, "repair_shred" ) ) {
1057 :
1058 0 : out_ctx_t * shred_out = &ctx->shred_out_ctx[ ctx->shred_tile_cnt++ ];
1059 0 : shred_out->idx = out_idx;
1060 0 : shred_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1061 0 : shred_out->chunk0 = fd_dcache_compact_chunk0( shred_out->mem, link->dcache );
1062 0 : shred_out->wmark = fd_dcache_compact_wmark( shred_out->mem, link->dcache, link->mtu );
1063 0 : shred_out->chunk = shred_out->chunk0;
1064 :
1065 0 : } else if( 0==strcmp( link->name, "repair_sign" ) ) {
1066 :
1067 0 : out_ctx_t * repair_sign_out = &ctx->repair_sign_out_ctx[ ctx->repair_sign_cnt ];
1068 0 : repair_sign_out->idx = out_idx;
1069 0 : repair_sign_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1070 0 : repair_sign_out->chunk0 = fd_dcache_compact_chunk0( repair_sign_out->mem, link->dcache );
1071 0 : repair_sign_out->wmark = fd_dcache_compact_wmark( repair_sign_out->mem, link->dcache, link->mtu );
1072 0 : repair_sign_out->chunk = repair_sign_out->chunk0;
1073 0 : repair_sign_out->in_idx = sign_repair_in_idx[ ctx->repair_sign_cnt++ ]; /* match to the sign_repair input link */
1074 0 : repair_sign_out->max_credits = sign_link_depth;
1075 0 : repair_sign_out->credits = sign_link_depth;
1076 :
1077 0 : } else {
1078 0 : FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name ));
1079 0 : }
1080 0 : }
1081 0 : if( FD_UNLIKELY( ctx->net_out_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_net link" ));
1082 0 : if( FD_UNLIKELY( ctx->repair_sign_cnt!=sign_repair_idx ) ) {
1083 0 : FD_LOG_ERR(( "Mismatch between repair_sign output links (%lu) and sign_repair input links (%u)", ctx->repair_sign_cnt, sign_repair_idx ));
1084 0 : }
1085 :
1086 0 : FD_TEST( ctx->shred_tile_cnt == fd_topo_tile_name_cnt( topo, "shred" ) );
1087 :
1088 : # if DEBUG_LOGGING
1089 : if( fd_signs_map_key_max( ctx->signs_map ) < tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt ) {
1090 : FD_LOG_ERR(( "repair pending signs tracking map is too small: %lu < %lu. Increase the key_max", fd_signs_map_key_max( ctx->signs_map ), tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt ));
1091 : }
1092 : # endif
1093 :
1094 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
1095 0 : ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
1096 0 : ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );
1097 :
1098 0 : ctx->net_id = (ushort)0;
1099 0 : fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, tile->repair.repair_intake_listen_port );
1100 0 : fd_ip4_udp_hdr_init( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, tile->repair.repair_serve_listen_port );
1101 :
1102 : /* Repair set up */
1103 :
1104 0 : ctx->turbine_slot0 = ULONG_MAX;
1105 0 : FD_LOG_INFO(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
1106 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
1107 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));
1108 :
1109 0 : memset( ctx->metrics, 0, sizeof(ctx->metrics) );
1110 :
1111 0 : fd_histf_join( fd_histf_new( ctx->metrics->slot_compl_time, FD_MHIST_SECONDS_MIN( REPAIR, SLOT_COMPLETE_TIME ),
1112 0 : FD_MHIST_SECONDS_MAX( REPAIR, SLOT_COMPLETE_TIME ) ) );
1113 0 : fd_histf_join( fd_histf_new( ctx->metrics->response_latency, FD_MHIST_MIN( REPAIR, RESPONSE_LATENCY ),
1114 0 : FD_MHIST_MAX( REPAIR, RESPONSE_LATENCY ) ) );
1115 :
1116 0 : ctx->tsdebug = fd_log_wallclock();
1117 0 : ctx->pending_key_next = 0;
1118 0 : ctx->profiler.enabled = tile->repair.end_slot != 0UL;
1119 0 : ctx->profiler.end_slot = tile->repair.end_slot;
1120 0 : if( ctx->profiler.enabled ) {
1121 0 : ctx->metrics->current_slot = tile->repair.end_slot + 1; /* +1 to allow the turbine slot 0 to be completed */
1122 0 : ctx->profiler.complete = 0;
1123 0 : }
1124 0 : }
1125 :
1126 : static ulong
1127 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
1128 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
1129 : ulong out_cnt,
1130 0 : struct sock_filter * out ) {
1131 0 : populate_sock_filter_policy_fd_repair_tile(
1132 0 : out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)-1 );
1133 0 : return sock_filter_policy_fd_repair_tile_instr_cnt;
1134 0 : }
1135 :
1136 : static ulong
1137 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
1138 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
1139 : ulong out_fds_cnt,
1140 0 : int * out_fds ) {
1141 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1142 :
1143 0 : ulong out_cnt = 0UL;
1144 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1145 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1146 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1147 0 : return out_cnt;
1148 0 : }
1149 :
1150 : static inline void
1151 0 : metrics_write( ctx_t * ctx ) {
1152 0 : FD_MCNT_SET( REPAIR, CURRENT_SLOT, ctx->metrics->current_slot );
1153 0 : FD_MCNT_SET( REPAIR, REPAIRED_SLOTS, ctx->metrics->repaired_slots );
1154 0 : FD_MCNT_SET( REPAIR, REQUEST_PEERS, fd_peer_pool_used( ctx->policy->peers.pool ) );
1155 0 : FD_MCNT_SET( REPAIR, SIGN_TILE_UNAVAIL, ctx->metrics->sign_tile_unavail );
1156 0 : FD_MCNT_SET( REPAIR, REREQUEST_QUEUE, ctx->metrics->rerequest );
1157 :
1158 0 : FD_MCNT_SET ( REPAIR, TOTAL_PKT_COUNT, ctx->metrics->send_pkt_cnt );
1159 0 : FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, ctx->metrics->sent_pkt_types );
1160 :
1161 0 : FD_MHIST_COPY( REPAIR, SLOT_COMPLETE_TIME, ctx->metrics->slot_compl_time );
1162 0 : FD_MHIST_COPY( REPAIR, RESPONSE_LATENCY, ctx->metrics->response_latency );
1163 0 : }
1164 :
1165 : #undef DEBUG_LOGGING
1166 :
1167 : /* TODO: This is not correct, but is temporary and will be fixed
1168 : when fixed FEC 32 goes in, and we can finally get rid of force
1169 : completes BS. */
1170 0 : #define STEM_BURST (64UL)
1171 :
1172 : /* Manual sign credit management, backpressure, sign tile count, and
1173 :    sign speed all affect this lazy value. The main goal of repair's
1174 :    highest workload (catchup) is a high send packet rate. Repair is
1175 : regularly idle, and mostly waiting for dispatched signs to come
1176 : in. Processing shreds from shred tile is a relatively fast operation.
1177 : Thus we only worry about fully utilizing the sign tiles' capacity.
1178 :
1179 : Assuming standard 2 sign tiles & reasonably fast signing rate & if
1180 : repair_sign_depth==sign_repair_depth: the lower the LAZY, the less
1181 : time is spent in backpressure, and the higher the packet send rate
1182 : gets. As expected, up until a certain point, credit return is slower
1183 : than signing. This starts to plateau at ~10k LAZY (for a box that can
1184 : sign at ~20k repair pps, but is fully dependent on the sign tile's
1185 : speed).
1186 :
1187 : At this point we start returning credits faster than we actually get
1188 : them from the sign tile, so signing becomes the bottleneck. The
1189 : extreme case is when we set it to standard lazy (289 ns);
1190 : housekeeping time spikes, but backpressure time drops (to a lower but
1191 : inconsistent value). But because we are usually idling in the repair
1192 :    tile, higher housekeeping doesn't really affect the send packet rate.
1193 :
1194 : Recall that repair_sign_depth is actually > sign_repair_depth (see
1195 : long comment in ctx_t struct). So repair_sign is NEVER
1196 : backpressuring the repair tile. When we set
1197 : repair_sign_depth>sign_repair_depth, we spend very little time in
1198 : backpressure (repair_sign always has available credits), and most of
1199 :    the time idling. Theoretically, this decouples the repair tile
1200 :    from credit return, so we send about as fast as we can sign.
1201 : This is a small improvement over the first case (low lazy,
1202 : repair_sign_depth==sign_repair_depth).
1203 :
1204 :    Since we never fill up the repair_sign link, we can set LAZY to any
1205 : reasonable value that keeps housekeeping time low. */
1206 0 : #define STEM_LAZY (64000)
1207 :
1208 0 : #define STEM_CALLBACK_CONTEXT_TYPE ctx_t
1209 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(ctx_t)
1210 :
1211 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
1212 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
1213 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
1214 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
1215 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
1216 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1217 :
1218 : #include "../../disco/stem/fd_stem.c"
1219 :
1220 : fd_topo_run_tile_t fd_tile_repair = {
1221 : .name = "repair",
1222 : .loose_footprint = loose_footprint,
1223 : .populate_allowed_seccomp = populate_allowed_seccomp,
1224 : .populate_allowed_fds = populate_allowed_fds,
1225 : .scratch_align = scratch_align,
1226 : .scratch_footprint = scratch_footprint,
1227 : .unprivileged_init = unprivileged_init,
1228 : .privileged_init = privileged_init,
1229 : .run = stem_run,
1230 : };