Line data Source code
1 : /* The repair tile is responsible for repairing missing shreds that were
2 : not received via Turbine. The goal is to ensure that slots we "care"
3 : about have their FEC sets inserted into store.
4 :
5 : Generally there are two distinct traffic patterns:
6 :
7 : a. Firedancer boots up and fires off a large number of repairs to
8 : recover all the blocks between the snapshot on which it is booting
9 : and the head of the chain. In this mode, repair tile utilization
10 : is very high along with net and sign utilization.
11 :
12 : b. Firedancer catches up to the head of the chain and enters steady
13 : state where most shred traffic is delivered over turbine. In this
14 : state, repairs are only occasionally needed to recover shreds lost
15 : due to anomalies like packet loss, transmitter (leader) never sent
16 : them or even a malicious leader etc. On rare occasion, repair
17 : will also need to recover a different version of a block that
18 : equivocated.
19 :
20 : To accomplish the above, repair mainly processes 4 kinds of frags:
21 :
22 : 1. Shred data (from shred tile)
23 :
24 : Any shred (coding or data) that passes validation and filtering in
25 : the shred tile is forwarded to repair. Repair uses these to track
26 : which shreds have been received in `fd_forest`, a tree data
27 : structure that mirrors the block/slot ancestry chain. It also
28 : uses these shreds to discover slots or ancestries that were not
29 : known. fd_forest tracks metadata for each slot, including slot
30 : completion status, merkle roots, and metrics.
31 :
32 : Any shred that we can correlate with a repair request we made is
33 : used to update peer response latency metrics in fd_policy (See
34 : fd_policy.h for more details).
35 :
36 : 2. FEC status messages (from shred tile)
37 :
38 : These fall under two categories: FEC completion and FEC eviction.
39 : When all shreds in a FEC set have been recovered, the shred tile
40 : sends a completion message. This may trigger chained merkle
41 : verification if the slot has a confirmed block_id. The completed
42 : FEC message is always forwarded to replay via repair_out.
43 :
44 : When an incomplete FEC set is evicted from the shred tile's FEC
45 : resolver (e.g. due to capacity), it also notifies repair. Repair
46 : clears the corresponding FEC set entries from the forest so those
47 : shred indices can be re-requested if they are necessary. As
48 : mentioned in fd_forest.h, forest needs to maintain a strict subset
49 : of shreds that are known by fec_resolver, store, and reasm in
50 : order to guarantee forward progress always.
51 :
52 : 3. Pings (from net tile)
53 :
54 : Repair peers use a ping-pong protocol to verify liveness before
55 : serving repair requests. When a ping arrives over the network,
56 : repair validates the message and constructs a pong response. To
57 : prevent spam attacks, repair has stopgaps like tracking how many
58 : pongs per peer are currently in the sign queue and dropping pings
59 : from unknown peers. These are the only untrusted inputs to
60 : repair.
61 :
62 : 4. Sign task responses (from sign tile)
63 :
64 : Repair requests are signed asynchronously; the repair tile
65 : constructs a repair request, dispatches it to a sign tile via the
66 : repair_sign output link. The repair-sign communication is
67 : manually managed via credit tracking in the repair tile (see
68 : comment in out_ctx_t struct definition).
69 :
70 : After receiving the signature back from sign tile, repair injects
71 : the signature into the pending request and dispatches it to the
72 : net tile via the repair_net output link. The behavior depends on
73 : the request type:
74 : - Pong: the signed pong is sent to the peer that pinged us.
75 : - Warmup: a proactive request sent when a new peer's contact info
76 : first arrives, prepaying the ping-pong RTT cost. The signed
77 : request is sent to the peer if it is still active.
78 : - Regular shred request: the request is recorded in an inflight
79 : table (for tracking response latency and timeouts) and the
80 : signed packet is sent to the selected peer.
81 :
82 : Secondary "other" frags that are processed but not part of core
83 : repair logic:
84 :
85 : 5. Confirmation messages from tower
86 :
87 : Tower sends two kinds of messages relevant to repair:
88 : - slot_done: indicates a slot has finished replay and may advance
89 : the root. Repair publishes (roots) the forest up to that slot,
90 : pruning old ancestry.
91 : - slot_confirmed: indicates a slot has reached a confirmation
92 : level (e.g. duplicate-confirmed). If the slot is not yet in the
93 : forest, repair creates a sentinel block so it can be repaired.
94 : It also stores the confirmed block_id and may trigger chained
95 :        merkle verification.  See fd_forest.h for more details about
96 : chained merkle verification.
97 :
98 : 6. Eviction messages from replay (reasm)
99 :
100 : When the replay tile's reassembly buffer evicts a FEC set (e.g.
101 : due to pool capacity) from itself and from store, it notifies
102 : repair with the slot and fec_set_idx. Repair clears those FEC
103 : entries from the forest so the shreds can be re-requested.
104 :
105 : 7. Contact info messages from gossip
106 :
107 : Gossip forwards contact info updates and removals for other
108 : validators. Repair uses these to maintain a list of peers to
109 : make requests to.
110 :
111 : If fd_forest tracks what we know about each shred, fd_policy and
112 : fd_inflights is responsible for deciding what next repair request to
113 : make. fd_policy and fd_inflights split responsibility: fd_policy
114 : makes any new requests, orphan requests, and requests directly off
115 : the forest iterator, while fd_inflights re-requests anything that has
116 : been requested but not received yet within a timeout window. */
117 :
118 : #define _GNU_SOURCE
119 :
120 : #include "../genesis/fd_genesi_tile.h"
121 : #include "../../disco/topo/fd_topo.h"
122 : #include "generated/fd_repair_tile_seccomp.h"
123 : #include "../../disco/keyguard/fd_keyload.h"
124 : #include "../../disco/keyguard/fd_keyguard.h"
125 : #include "../../disco/keyguard/fd_keyswitch.h"
126 : #include "../../disco/metrics/fd_metrics.h"
127 : #include "../../disco/net/fd_net_tile.h"
128 : #include "../../disco/shred/fd_rnonce_ss.h"
129 : #include "../../disco/shred/fd_shred_tile.h"
130 : #include "../../flamenco/gossip/fd_gossip_message.h"
131 : #include "../replay/fd_replay_tile.h"
132 : #include "../tower/fd_tower_tile.h"
133 : #include "../../discof/restore/utils/fd_ssmsg.h"
134 : #include "../../util/net/fd_net_headers.h"
135 : #include "../../util/pod/fd_pod_format.h"
136 : #include "../../tango/fd_tango_base.h"
137 :
138 : #include "../forest/fd_forest.h"
139 : #include "fd_repair_metrics.h"
140 : #include "fd_inflight.h"
141 : #include "fd_repair.h"
142 : #include "fd_policy.h"
143 :
144 : #define DEBUG_LOGGING 0
145 :
146 : #define IN_KIND_CONTACT (0)
147 0 : #define IN_KIND_NET (1)
148 0 : #define IN_KIND_TOWER (2)
149 0 : #define IN_KIND_SHRED (3)
150 0 : #define IN_KIND_SIGN (4)
151 0 : #define IN_KIND_SNAP (5)
152 0 : #define IN_KIND_GOSSIP (6)
153 0 : #define IN_KIND_GENESIS (7)
154 0 : #define IN_KIND_REPLAY (8)
155 :
156 : #define MAX_IN_LINKS (32)
157 : #define MAX_SHRED_TILE_CNT ( 16UL )
158 : #define MAX_SIGN_TILE_CNT ( 16UL )
159 :
160 : /* Max number of validators that can be actively queried */
161 0 : #define FD_REPAIR_PEER_MAX (FD_CONTACT_INFO_TABLE_SIZE)
162 :
163 : /* Max number of pending repair requests recently made to keep track of.
164 : Calculated generally as we estimate around 50k/s/core to sign
165 : requests. Assuming an over-provisioned 4 sign tiles just for repair,
166 : this means we can make up to ~200k requests per second. With a dedup
167 : timeout of 80ms, this means we can make up to ~16k requests within
168 : the dedup timeout window. We round up to the next power of two to
169 : get the dedup cache max. Since we are sizing the dedup cache for a
170 :    generous margin, and this number is not particularly fragile or
171 : sensitive, we can leave it static. */
172 0 : #define FD_DEDUP_CACHE_MAX (1<<15)
173 :
174 : /* static map from request type to metric array index */
175 : static uint metric_index[FD_REPAIR_KIND_ORPHAN + 1] = {
176 : [FD_REPAIR_KIND_PONG] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_PONG_IDX,
177 : [FD_REPAIR_KIND_SHRED] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_WINDOW_IDX,
178 : [FD_REPAIR_KIND_HIGHEST_SHRED] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_HIGHEST_WINDOW_IDX,
179 : [FD_REPAIR_KIND_ORPHAN] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_ORPHAN_IDX,
180 : };
181 :
/* Per-incoming-link receive context.  For most links the anonymous
   struct view is used to validate and translate dcache chunks; for
   IN_KIND_NET links the fd_net_rx_bounds_t view is used instead (the
   two views share storage via the union, see during_frag). */
typedef union {
  struct {
    fd_wksp_t * mem;    /* workspace backing the link's dcache */
    ulong chunk0;       /* lowest valid chunk index */
    ulong wmark;        /* highest valid chunk index (watermark) */
    ulong mtu;          /* max frag payload size accepted on this link */
  };
  fd_net_rx_bounds_t net_rx; /* used only when in_kind==IN_KIND_NET */
} in_ctx_t;
191 :
/* Per-outgoing-link publish context.  Used for the net, repair, and
   repair_sign output links. */
struct out_ctx {
  ulong idx;         /* stem output link index to publish on */
  fd_wksp_t * mem;   /* workspace backing the link's dcache */
  ulong chunk0;      /* lowest valid chunk index */
  ulong wmark;       /* highest valid chunk index (watermark) */
  ulong chunk;       /* next chunk to write */

  /* Repair tile directly tracks credit outside of stem for these
     asynchronous sign links.  In particular, credits tracks the RETURN
     sign_repair link.  This is because repair_sign and
     sign_repair are unreliable.  If both links were reliable, and the
     links filled completely, stem would get into a deadlock.  Neither
     repair or sign would have credits, which would prevent frags from
     getting polled in repair or sign, which would prevent any credits
     from getting returned back to the tiles.  So the sign_repair return
     link must be unreliable.  credits / max_credits are used by the
     repair_sign link, but credits tracks the RETURN
     sign_repair link.

     Consider the scenario:

          repair_sign (depth 128)        sign_repair (depth 128)
   repair ----------------------> sign ------------------------> repair
          [rest free, r130, r129]       [r128, r127, ... , r1] (full)

     If repair is publishing too many requests too fast (common in
     catchup), and not polling enough frags from sign, without manual
     management the sign_repair link would be overrun.  Nothing is
     stopping repair from publishing more requests, because sign is
     functioning fast enough to handle the requests.  However, nothing is
     stopping sign from polling the next request and signing it, and
     PUBLISHING it on the sign_repair link that is already full, because
     the sign_repair link is unreliable.

     This is why we need to manually track credits for the sign_repair
     link.  We must ensure that there are never more than 128 items in
     the ENTIRE repair_sign -> sign tile -> sign_repair work queue, else
     there is always a possibility of an overrun in the sign_repair
     link.

     We can furthermore ensure some nice properties by having the
     repair_sign link have a greater depth than the sign_repair link.
     This way, we exclusively use manual credit management to control
     the rate at which we publish requests to sign.  This allows for
     repair_sign to also be unreliable.  Even when the repair sign link
     is "full", we can avoid backpressure and continue polling frags,
     without overruning the sign_repair link.

     To lose a frag to overrun isn't necessarily critical, but in
     general the repair tile relies on the fact that a signing task
     published to sign tile will always come back.  If we lose a frag to
     overrun, then there will be an entry in the pending signs structure
     that is never removed, and theoretically the map could fill up.
     Conceptually, with a reliable (unreliable links, but strictly
     controlled count-per-link) sign->repair->sign structure, there
     should be no eviction needed in this pending signs structure. */

  ulong in_idx;      /* index of the incoming (sign_repair return) link */
  ulong credits;     /* available credits for link */
  ulong max_credits; /* maximum credits (depth) */
};
typedef struct out_ctx out_ctx_t;
254 :
255 : /* Data needed to sign and send a pong that is not contained in the
256 : pong msg itself. */
257 :
/* Data needed to sign and send a pong that is not contained in the
   pong msg itself. */

struct pong_data {
  fd_ip4_port_t peer_addr; /* address/port the originating ping came from; pong is sent back here */
  fd_hash_t hash;          /* ping token; hashed into the pong signing preimage (see preimage_pong) */
  uint daddr;              /* source IPv4 address to stamp on the outgoing pong packet */
  fd_pubkey_t key;         /* pinging peer's identity; used to decrement its in-queue pong count */
};
typedef struct pong_data pong_data_t;
265 :
/* A repair message parked in signs_map while its signature round-trips
   through a sign tile.  The raw-byte view (buf) and the typed view
   (msg) alias the same storage so the signed wire bytes can be sent
   directly. */
struct sign_req {
  ulong key;    /* map key, ctx->pending_key_next */
  ulong buflen; /* wire size of the message (fd_repair_sz( msg )) */
  union {
    uchar buf[sizeof(fd_repair_msg_t)]; /* raw bytes; signature is injected here in after_sign */
    fd_repair_msg_t msg;                /* typed view of the same bytes */
  };
  pong_data_t pong_data; /* populated only for pong msgs */
};
typedef struct sign_req sign_req_t;
276 :
277 : #define MAP_NAME fd_signs_map
278 0 : #define MAP_KEY key
279 0 : #define MAP_KEY_NULL ULONG_MAX
280 0 : #define MAP_KEY_INVAL(k) (k==ULONG_MAX)
281 0 : #define MAP_T sign_req_t
282 : #define MAP_MEMOIZE 0
283 : #include "../../util/tmpl/fd_map_dynamic.c"
284 :
285 : /* Because the sign tiles could be all busy when a contact info or a
286 : ping arrives, we need to save ping messages to be signed in a queue
287 : and dispatched in after_credit when there are sign tiles available.
288 : The size of the queue is sized to be the number of warm up
289 : requests we might burst to the queue all at once (at most
290 : FD_REPAIR_PEER_MAX), then doubled for good measure.
291 :
292 : There is a possibility that someone could spam pings to block other
293 : peers' pings (and prevent us from responding to those pings). To
294 : mitigate this, we track the number of pings currently living in the
295 : sign queue that belong to each peer. If a peer already has a pong
296 : living in the sign queue, we drop the pings from that peer.
297 :
298 : The peer could send us a new bogus ping every time we pop their ping
299 : from the sign queue, but there would be no way to prevent other
300 : peers' pings from getting processed, so the wasted work and impact
301 : would be minimal.
302 :
303 :    Typical flow is that a pong will get added to the pong_queue during
304 :    an after_frag call.  Then, on the following after_credit, it will
305 :    get popped from the queue, added to the signs_map, and dispatched
306 :    to the sign tile.
307 :
308 : Note that after the first turbine shred arrives, the signs_queue also
309 : stores highest window index requests for slots between snapshot and
310 : turbine_slot0, which are dispatched first before any other requests
311 : as a catchup optimization. This doesn't break any of the inflight
312 : invariants as highest window index requests do not get added to the
313 : inflight table. */
314 :
/* An entry in the signs queue: a repair message waiting for a sign
   tile with available credits before it can be dispatched. */
struct sign_pending {
  fd_repair_msg_t msg;   /* message to be signed */
  pong_data_t pong_data; /* populated only for pong msgs */
};
typedef struct sign_pending sign_pending_t;
320 :
321 : #define QUEUE_NAME fd_signs_queue
322 0 : #define QUEUE_T sign_pending_t
323 0 : #define QUEUE_MAX (2*FD_REPAIR_PEER_MAX)
324 : #include "../../util/tmpl/fd_queue.c"
325 :
/* Repair tile run-time state, allocated in the tile's scratch region
   (see scratch_footprint for the layout of the pointed-to objects). */
struct ctx {
  long tsdebug; /* timestamp for debug printing */

  ulong repair_seed; /* seed for randomized structures */

  fd_keyswitch_t * keyswitch;
  int halt_signing; /* nonzero while a keyswitch is draining pending sign requests */

  fd_ip4_port_t repair_intake_addr; /* local addr/port that receives repair responses + pings */
  fd_ip4_port_t repair_serve_addr;  /* local addr/port that serves repair requests */

  fd_forest_t * forest;       /* tracks received shreds / slot ancestry (see fd_forest.h) */
  fd_policy_t * policy;       /* peer selection + new request policy (see fd_policy.h) */
  fd_inflights_t * inflights; /* outstanding requests awaiting response / timeout */
  fd_repair_t * protocol;     /* repair protocol message construction */

  ulong enforce_fixed_fec_set; /* min slot where the feature is enforced */

  fd_pubkey_t identity_public_key; /* this validator's identity */

  fd_wksp_t * wksp;

  fd_stem_context_t * stem;

  uchar in_kind[ MAX_IN_LINKS ];    /* IN_KIND_* per incoming link */
  in_ctx_t in_links[ MAX_IN_LINKS ];

  int skip_frag; /* set in during_frag to skip processing in after_frag */

  out_ctx_t net_out_ctx[1];    /* repair_net output link */

  out_ctx_t repair_out_ctx[1]; /* repair_out (to replay) output link */

  /* repair_sign links (to sign tiles 1+) - for round-robin distribution */

  ulong repair_sign_cnt;
  out_ctx_t repair_sign_out_ctx[ MAX_SIGN_TILE_CNT ];

  ulong sign_rrobin_idx; /* next sign tile in the round-robin rotation */

  /* Pending sign requests for async operations */

  uint pending_key_next; /* monotonically increasing key for signs_map entries */
  sign_req_t * signs_map; /* contains any request currently in the repair->sign or sign->repair dcache */
  sign_pending_t * pong_queue; /* contains any pong or initial warmup request waiting to be dispatched to repair->sign. Size is 2*FD_REPAIR_PEER_MAX */

  ushort net_id; /* rolling IPv4 identification field for outgoing packets */

  /* Buffers for incoming unreliable frags */
  uchar net_buf[ FD_NET_MTU ];
  uchar sign_buf[ sizeof(fd_ed25519_sig_t) ];

  /* Store chunk for incoming reliable frags */
  ulong chunk;
  ulong snap_out_chunk; /* store second to last chunk for snap_out */

  fd_ip4_udp_hdrs_t intake_hdr[1]; /* prototype headers for packets sent from the intake socket */
  fd_ip4_udp_hdrs_t serve_hdr [1]; /* prototype headers for packets sent from the serve socket */

  fd_rnonce_ss_t repair_nonce_ss[1];

  ulong manifest_slot; /* slot of the loaded snapshot manifest */
  struct {
    ulong send_pkt_cnt;
    ulong sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_CNT];
    ulong repaired_slots;
    ulong current_slot;
    ulong old_shred;
    ulong last_requested_slot;
    ulong last_requested_orphan;
    ulong sign_tile_unavail;
    ulong rerequest;
    ulong malformed_ping;
    ulong unknown_peer_ping;
    ulong fail_sigverify_ping;
    fd_histf_t slot_compl_time[ 1 ];
    fd_histf_t response_latency[ 1 ];
    ulong blk_evicted;        /* count of forest blocks evicted on insert */
    ulong blk_failed_insert;  /* count of failed forest block inserts */

    ulong slot_evicted;       /* most recently evicted slot */
    ulong slot_evicted_by;    /* slot whose insert caused the eviction */
    ulong slot_failed_insert; /* most recent slot that failed to insert */

    ulong failed_chain_verify_cnt;
    ulong failed_chain_verify_slot;
  } metrics[ 1 ];

  /* Slot-level metrics */

  fd_repair_metrics_t * slot_metrics;
  ulong turbine_slot0; // catchup considered complete after this slot
};
typedef struct ctx ctx_t;
420 :
421 : FD_FN_CONST static inline ulong
422 0 : scratch_align( void ) {
423 0 : return 128UL;
424 0 : }
425 :
426 : FD_FN_PURE static inline ulong
427 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
428 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
429 0 : }
430 :
/* scratch_footprint returns the total byte size of the repair tile's
   scratch region: the ctx struct followed by each of its pointed-to
   objects, appended via the FD_LAYOUT chain below.

   The signs map is sized from the total number of sign credits across
   all sign tiles (repair_sign_depth * repair_sign_cnt).  Rounding up
   to a power of two and adding 1 to the msb yields a map capacity of
   at least 2x the total credit count, so with correct credit
   management the map can never fill. */
FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile ) {
  ulong total_sign_depth = tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt;
  int lg_sign_depth = fd_ulong_find_msb( fd_ulong_pow2_up(total_sign_depth) ) + 1;

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(ctx_t),             sizeof(ctx_t)                                             );
  l = FD_LAYOUT_APPEND( l, fd_repair_align(),          fd_repair_footprint        ()                             );
  l = FD_LAYOUT_APPEND( l, fd_forest_align(),          fd_forest_footprint        ( tile->repair.slot_max )      );
  l = FD_LAYOUT_APPEND( l, fd_policy_align(),          fd_policy_footprint        ( FD_DEDUP_CACHE_MAX, FD_REPAIR_PEER_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_inflights_align(),       fd_inflights_footprint     ()                             );
  l = FD_LAYOUT_APPEND( l, fd_signs_map_align(),       fd_signs_map_footprint     ( lg_sign_depth )              );
  l = FD_LAYOUT_APPEND( l, fd_signs_queue_align(),     fd_signs_queue_footprint   ()                             );
  l = FD_LAYOUT_APPEND( l, fd_repair_metrics_align(),  fd_repair_metrics_footprint()                             );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
447 :
448 : /* Below functions manage the current pending sign requests. */
449 :
450 : static sign_req_t *
451 : sign_map_insert( ctx_t * ctx,
452 : fd_repair_msg_t const * msg,
453 0 : pong_data_t const * opt_pong_data ) {
454 :
455 : /* Check if there is any space for a new pending sign request. Should never fail as long as credit management is working. */
456 0 : if( FD_UNLIKELY( fd_signs_map_key_cnt( ctx->signs_map )==fd_signs_map_key_max( ctx->signs_map ) ) ) return NULL;
457 :
458 0 : sign_req_t * pending = fd_signs_map_insert( ctx->signs_map, ctx->pending_key_next++ );
459 0 : if( FD_UNLIKELY( !pending ) ) return NULL; /* Not possible, unless the same key is used twice. */
460 0 : pending->msg = *msg;
461 0 : pending->buflen = fd_repair_sz( msg );
462 0 : if( FD_UNLIKELY( opt_pong_data ) ) pending->pong_data = *opt_pong_data;
463 0 : return pending;
464 0 : }
465 :
466 : static int
467 : sign_map_remove( ctx_t * ctx,
468 0 : ulong key ) {
469 0 : sign_req_t * pending = fd_signs_map_query( ctx->signs_map, key, NULL );
470 0 : if( FD_UNLIKELY( !pending ) ) return -1;
471 0 : fd_signs_map_remove( ctx->signs_map, pending );
472 0 : return 0;
473 0 : }
474 :
/* send_packet wraps payload in IPv4/UDP headers (cloned from the
   intake_hdr prototype) and publishes the resulting packet to the net
   tile on the repair_net link.

   NOTE(review): dst_port is copied unswapped into the UDP net_dport
   field, so it appears to be expected in network byte order already --
   confirm at callers.  The UDP checksum is set to 0 (disabled), which
   is legal for UDP over IPv4. */
static void
send_packet( ctx_t *             ctx,
             fd_stem_context_t * stem,
             uint                dst_ip_addr,
             ushort              dst_port,
             uint                src_ip_addr,
             uchar const *       payload,
             ulong               payload_sz,
             ulong               tsorig ) {
  ctx->metrics->send_pkt_cnt++;
  uchar * packet = fd_chunk_to_laddr( ctx->net_out_ctx->mem, ctx->net_out_ctx->chunk );
  fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
  *hdr = *ctx->intake_hdr;

  fd_ip4_hdr_t * ip4 = hdr->ip4;
  ip4->saddr       = src_ip_addr;
  ip4->daddr       = dst_ip_addr;
  ip4->net_id      = fd_ushort_bswap( ctx->net_id++ ); /* rolling IP identification */
  ip4->check       = 0U;                               /* must be zero while computing the checksum */
  ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
  ip4->check       = fd_ip4_hdr_check_fast( ip4 );

  fd_udp_hdr_t * udp = hdr->udp;
  udp->net_dport = dst_port;
  udp->net_len   = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
  fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
  hdr->udp->check = 0U; /* checksum disabled */

  ulong tspub     = fd_frag_meta_ts_comp( fd_tickcount() );
  ulong sig       = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
  ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
  ulong chunk     = ctx->net_out_ctx->chunk;
  fd_stem_publish( stem, ctx->net_out_ctx->idx, sig, chunk, packet_sz, 0UL, tsorig, tspub );
  ctx->net_out_ctx->chunk = fd_dcache_compact_next( chunk, packet_sz, ctx->net_out_ctx->chunk0, ctx->net_out_ctx->wmark );
}
510 :
511 : /* Returns a sign_out context with max available credits.
512 : If no sign_out context has available credits, returns NULL. */
513 : static out_ctx_t *
514 0 : sign_avail_credits( ctx_t * ctx ) {
515 0 : out_ctx_t * sign_out = NULL;
516 0 : ulong max_credits = 0;
517 0 : for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
518 0 : if( ctx->repair_sign_out_ctx[i].credits > max_credits ) {
519 0 : max_credits = ctx->repair_sign_out_ctx[i].credits;
520 0 : sign_out = &ctx->repair_sign_out_ctx[i];
521 0 : }
522 0 : }
523 0 : return sign_out;
524 0 : }
525 :
526 : /* Prepares the signing preimage and publishes a signing request that
527 : will be signed asynchronously by the sign tile. The signed data will
528 : be returned via dcache as a frag. */
529 : static void
530 : fd_repair_send_sign_request( ctx_t * ctx,
531 : out_ctx_t * sign_out,
532 : fd_repair_msg_t const * msg,
533 0 : pong_data_t const * opt_pong_data ) {
534 :
535 0 : if( FD_UNLIKELY( ctx->halt_signing ) ) FD_LOG_CRIT(( "can't dispatch sign requests while halting signing" ));
536 :
537 : /* New sign request */
538 0 : sign_req_t * pending = sign_map_insert( ctx, msg, opt_pong_data );
539 0 : if( FD_UNLIKELY( !pending ) ) return;
540 :
541 0 : ulong sig = 0;
542 0 : ulong preimage_sz = 0;
543 0 : uchar * dst = fd_chunk_to_laddr( sign_out->mem, sign_out->chunk );
544 :
545 0 : if( FD_UNLIKELY( msg->kind == FD_REPAIR_KIND_PONG ) ) {
546 0 : uchar pre_image[FD_REPAIR_PONG_PREIMAGE_SZ];
547 0 : preimage_pong( &opt_pong_data->hash, pre_image, sizeof(pre_image) );
548 0 : preimage_sz = FD_REPAIR_PONG_PREIMAGE_SZ;
549 0 : fd_memcpy( dst, pre_image, preimage_sz );
550 0 : sig = ((ulong)pending->key << 32) | (uint)FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519;
551 0 : } else {
552 : /* Sign and prepare the message directly into the pending buffer */
553 0 : uchar * preimage = preimage_req( &pending->msg, &preimage_sz );
554 0 : fd_memcpy( dst, preimage, preimage_sz );
555 0 : sig = ((ulong)pending->key << 32) | (uint)FD_KEYGUARD_SIGN_TYPE_ED25519;
556 0 : }
557 :
558 0 : fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, preimage_sz, 0UL, 0UL, 0UL );
559 0 : sign_out->chunk = fd_dcache_compact_next( sign_out->chunk, preimage_sz, sign_out->chunk0, sign_out->wmark );
560 :
561 0 : ctx->metrics->sent_pkt_types[metric_index[msg->kind]]++;
562 0 : sign_out->credits--;
563 0 : }
564 :
565 : static inline int
566 : before_frag( ctx_t * ctx,
567 : ulong in_idx,
568 : ulong seq FD_PARAM_UNUSED,
569 0 : ulong sig ) {
570 0 : uint in_kind = ctx->in_kind[ in_idx ];
571 0 : if( FD_LIKELY ( in_kind==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
572 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) return fd_int_if( fd_forest_root_slot( ctx->forest )==ULONG_MAX, -1, 0 ); /* not ready to read frag */
573 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
574 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
575 0 : sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
576 0 : }
577 0 : if( FD_UNLIKELY( in_kind==IN_KIND_REPLAY ) ) return sig!=REPLAY_SIG_REASM_EVICTED;
578 0 : return 0;
579 0 : }
580 :
/* during_frag copies or records incoming frag payloads while the frag
   is speculatively readable.  Unreliable links (net, sign) are copied
   into tile-local buffers; reliable links just record the chunk for
   after_frag to translate.  Note the chunk/sz bounds check is applied
   only to non-net, non-genesis links: net frags are validated by
   fd_net_rx_translate_frag, and genesis frags carry their size in sig
   (NOTE(review): the FD_TEST below implies sig encodes a byte count for
   genesis frags -- confirm against the genesis tile). */
static void
during_frag( ctx_t * ctx,
             ulong   in_idx,
             ulong   seq FD_PARAM_UNUSED,
             ulong   sig,
             ulong   chunk,
             ulong   sz,
             ulong   ctl ) {
  ctx->skip_frag = 0;

  uint in_kind = ctx->in_kind[ in_idx ];
  in_ctx_t const * in_ctx = &ctx->in_links[ in_idx ];
  ctx->chunk = chunk;

  if( FD_UNLIKELY( in_kind==IN_KIND_NET ) ) {
    ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
    FD_TEST( hdr_sz <= sz ); /* Should be ensured by the net tile */
    uchar const * dcache_entry = fd_net_rx_translate_frag( &in_ctx->net_rx, chunk, ctl, sz );
    fd_memcpy( ctx->net_buf, dcache_entry, sz );
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS ) ) {
    FD_TEST( sizeof(fd_genesis_meta_t)<=sig );
    return;
  }

  if( FD_UNLIKELY( sz!=0UL && ( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) )
    FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu] in kind %u", chunk, sz, in_ctx->chunk0, in_ctx->wmark, in_kind ));

  if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
    /* Remember the last non-DONE chunk (the manifest) for after_snap. */
    if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) ctx->snap_out_chunk = chunk;
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
    /* sign_repair is unreliable, so we copy the frag for convention.
       Theoretically impossible to overrun. */
    uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
    fd_memcpy( ctx->sign_buf, dcache_entry, sz );
    return;
  }
}
624 :
625 : static inline void
626 : after_snap( ctx_t * ctx,
627 : ulong sig,
628 0 : uchar const * chunk ) {
629 0 : if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) return;
630 0 : fd_snapshot_manifest_t * manifest = (fd_snapshot_manifest_t *)chunk;
631 :
632 0 : fd_forest_init( ctx->forest, manifest->slot );
633 0 : }
634 :
/* after_gossip handles contact info updates from the gossip tile.
   Removals drop the peer from the policy's peer table.  Updates upsert
   the peer's serve-repair address (IPv6 addresses are not supported and
   are recorded as 0, then rejected below); on first sight of a peer a
   warmup request is queued to prepay the ping-pong RTT. */
static inline void
after_gossip( ctx_t * ctx, fd_gossip_update_message_t const * msg, ulong sig ) {
  switch( sig ) {
    case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE: {
      fd_policy_peer_remove( ctx->policy, fd_type_pun_const( msg->origin ) );
      break;
    }
    case FD_GOSSIP_UPDATE_TAG_CONTACT_INFO: {
      fd_gossip_contact_info_t const * contact_info = msg->contact_info->value;
      fd_ip4_port_t repair_peer;
      repair_peer.addr = contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_SERVE_REPAIR ].is_ipv6 ? 0U : contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_SERVE_REPAIR ].ip4;
      repair_peer.port = contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_SERVE_REPAIR ].port;
      if( FD_UNLIKELY( !repair_peer.addr || !repair_peer.port ) ) return; /* unusable address (missing or IPv6) */
      fd_policy_peer_t const * peer = fd_policy_peer_upsert( ctx->policy, fd_type_pun_const( msg->origin ), &repair_peer );
      if( FD_LIKELY( peer && !fd_signs_queue_full( ctx->pong_queue ) ) ) {
        /* The repair process uses a Ping-Pong protocol that incurs one
           round-trip time (RTT) for the initial repair request.  To
           optimize this, we proactively send a placeholder repair request
           as soon as we receive a peer's contact information for the first
           time, effectively prepaying the RTT cost.  slot 0 marks the
           request as a warmup (see after_sign). */
        fd_repair_msg_t * init = fd_repair_shred( ctx->protocol, fd_type_pun_const( msg->origin ), (ulong)fd_log_wallclock()/1000000L, 0, 0, 0 );
        fd_signs_queue_push( ctx->pong_queue, (sign_pending_t){ .msg = *init } );
      }
      break;
    }
    default: FD_LOG_ERR(( "bad gossip sig %lu", sig ));
  }
}
663 :
/* after_sign handles a signature response from a sign tile.  The frag
   sig's upper 32 bits carry the pending signs map key (see
   fd_repair_send_sign_request); the 64-byte signature itself was
   copied into ctx->sign_buf by during_frag.  Returns the credit to the
   sign tile that produced the response, injects the signature into the
   pending request, and dispatches the finished packet according to the
   request type (pong / warmup / regular shred request). */
static inline void
after_sign( ctx_t *             ctx,
            ulong               in_idx,
            ulong               sig,
            fd_stem_context_t * stem ) {
  ulong pending_key = sig >> 32;
  /* Look up the pending request.  Since the repair_sign links are
     reliable, the incoming sign_repair fragments represent a complete
     set of the previously sent outgoing messages.  However, with
     multiple sign tiles, the responses may arrive interleaved. */

  /* Find which sign tile sent this response and increment its credits */
  for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
    if( ctx->repair_sign_out_ctx[i].in_idx == in_idx ) {
      if( FD_LIKELY( ctx->repair_sign_out_ctx[i].credits < ctx->repair_sign_out_ctx[i].max_credits ) ) ctx->repair_sign_out_ctx[i].credits++;
      break;
    }
  }

  sign_req_t * pending_ = fd_signs_map_query( ctx->signs_map, pending_key, NULL );
  if( FD_UNLIKELY( !pending_ ) ) FD_LOG_CRIT(( "No pending request found for key %lu", pending_key )); /* implies either bad programmer error or something happened with sign tile */

  sign_req_t pending[1] = { *pending_ }; /* Make a copy of the pending request so we can sign_map_remove immediately. */
  sign_map_remove( ctx, pending_key );

  /* This is a pong message */
  if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_PONG ) ) {
    fd_policy_peer_t * peer = fd_policy_peer_query( ctx->policy, &pending->pong_data.key );
    if( FD_LIKELY( peer && peer->ping ) ) peer->ping--; /* prevent underflow if the peer was removed/readded */

    /* Pong signature has its own field; copy it in and send back to the pinger. */
    fd_memcpy( pending->msg.pong.sig, ctx->sign_buf, 64UL );
    send_packet( ctx, stem, pending->pong_data.peer_addr.addr, pending->pong_data.peer_addr.port, pending->pong_data.daddr, pending->buf, fd_repair_sz( &pending->msg ), fd_frag_meta_ts_comp( fd_tickcount() ) );
    return;
  }

  /* Inject the signature into the pending request.  NOTE(review): the
     signature lives at byte offset 4 of the request wire format --
     confirm against the serialization in fd_repair.h. */
  fd_memcpy( pending->buf + 4, ctx->sign_buf, 64UL );
  uint src_ip4 = 0U;

  /* This is a warmup message (slot 0 marks warmups, see after_gossip) */
  if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.slot == 0 ) ) {
    fd_policy_peer_t * peer = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
    if( FD_UNLIKELY( peer ) ) send_packet( ctx, stem, peer->ip4, peer->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
    else { /* This is a warmup request for a peer that is no longer active. There's no reason to pick another peer for a warmup rq, so just drop it. */ }
    return;
  }

  /* This is a regular repair shred request

     We need to ensure we always send out any shred requests we have,
     because policy_next has no way to revisit a shred.  But the fact
     that peers can drop out of the peer list makes this complicated.
     If the peer is still there (common), it's fine.  If the peer is not
     there, we can add this request to the inflights table, pretend
     we've sent it and let the inflight timeout request it down the
     line. */

  fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
  int is_regular_req = pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.nonce > 0; // not a highest/orphan request

  if( FD_UNLIKELY( !active ) ) {
    if( FD_LIKELY( is_regular_req ) ) {
      /* Artificially add to the inflights table, pretend we've sent it
         and let the inflight timeout request it down the line. */
      fd_inflights_request_insert( ctx->inflights, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
    }
    return;
  }
  /* Happy path - all is well, our peer didn't drop out from beneath us. */
  if( FD_LIKELY( is_regular_req ) ) {
    fd_inflights_request_insert( ctx->inflights, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
    fd_policy_peer_request_update( ctx->policy, &pending->msg.shred.to );
  }

  if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_ORPHAN ) ) ctx->metrics->last_requested_orphan = pending->msg.orphan.slot;
  else                                                            ctx->metrics->last_requested_slot   = pending->msg.shred.slot;

  send_packet( ctx, stem, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
}
743 :
744 : static int
745 0 : blk_insert_check( ctx_t * ctx, fd_forest_blk_t * new_blk, ulong new_slot, ulong evicted ) {
746 0 : if( FD_UNLIKELY( !new_blk ) ) {
747 0 : ctx->metrics->blk_failed_insert++;
748 0 : ctx->metrics->slot_failed_insert = new_slot;
749 0 : return 0;
750 0 : } else {
751 0 : if( FD_UNLIKELY( evicted != ULONG_MAX ) ) {
752 0 : ctx->metrics->blk_evicted++;
753 0 : ctx->metrics->slot_evicted = evicted;
754 0 : ctx->metrics->slot_evicted_by = new_slot;
755 0 : }
756 0 : return 1;
757 0 : }
758 0 : }
759 :
/* after_shred processes a single shred frag forwarded from the shred
   tile.  sig encodes (among other things) the delivery source of the
   shred (turbine vs. repair), nonce is the repair nonce used to
   correlate the shred with an inflight repair request, and mr / cmr
   are the shred's merkle root and chained merkle root. */
static inline void
after_shred( ctx_t * ctx,
             ulong sig,
             fd_shred_t * shred,
             ulong nonce,
             fd_hash_t * mr,
             fd_hash_t * cmr ) {
  /* Insert the shred sig (shared by all shred members in the FEC set)
     into the map. */
  int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) );
  int src = fd_shred_sig_src( sig )==SHRED_SIG_SRC_TURBINE ? SHRED_SRC_TURBINE : SHRED_SRC_REPAIR /* bad or good repair */ ;

  if( FD_LIKELY( !is_code ) ) {
    long rtt = 0;
    fd_pubkey_t peer;
    /* If this data shred answers one of our inflight repair requests,
       the measured round-trip time feeds the responding peer's latency
       tracking in fd_policy and the latency histogram. */
    if( FD_UNLIKELY( src == SHRED_SRC_REPAIR && ( rtt = fd_inflights_request_remove( ctx->inflights, nonce, shred->slot, shred->idx, &peer ) ) > 0 ) ) {
      fd_policy_peer_response_update( ctx->policy, &peer, rtt );
      fd_histf_sample( ctx->metrics->response_latency, (ulong)rtt );
    }

    /* we don't want to add a slot to the forest that chains to a slot
       older than root, to avoid filling forest up with junk.
       Especially if we are close to full and we are having trouble
       rooting, we can't rely on publishing to prune these useless
       subtrees. TODO: do the same with reasm/store/shred? */
    if( FD_UNLIKELY( shred->slot - shred->data.parent_off < fd_forest_root_slot( ctx->forest ) ) ) return;

    int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
    int ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
    /* Ensure the (slot, parent) pair exists in the forest before
       recording the data shred itself. */
    ulong evicted = ULONG_MAX;
    fd_forest_blk_t * blk = fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, &evicted );
    if( FD_LIKELY( blk_insert_check( ctx, blk, shred->slot, evicted ) ) ) {
      fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick, src, mr, cmr );
    }
  } else {
    /* Coding shreds carry no parent_off / flags; just record receipt. */
    fd_forest_code_shred_insert( ctx->forest, shred->slot, shred->idx );
  }
}
798 :
/* Kicks off the chained merkle verification starting at a slot with
   a confirmed, canonical block_id. Either finishes successfully and
   returns early, or detects an incorrect FEC set and clears it. In
   this case the verification is paused and state is saved at where
   it left off. Verification can be re-triggered in after_fec as well. */
static inline void
check_confirmed( ctx_t * ctx,
                 fd_forest_blk_t * blk,
                 fd_hash_t const * confirmed_bid ) {

  /* Only verify when the block has not already chain-verified and
     every data shred of the block has been buffered. */
  if( FD_LIKELY( !blk->chain_confirmed && blk->complete_idx != UINT_MAX && blk->buffered_idx == blk->complete_idx ) ) {
    /* The above conditions say that all the shreds of the block have arrived. */
    fd_forest_blk_t * bad_blk = fd_forest_fec_chain_verify( ctx->forest, blk, confirmed_bid );
    if( FD_LIKELY( !bad_blk ) ) {
      /* chain verified successfully from blk to as far as we have fec data */
      return;
    }

    /* bad_blk is the block where verification failed.  Work out which
       merkle root we expected at the failing FEC set: the block's
       confirmed block_id when the bad FEC is the last one of the block
       (its start index is complete_idx - (FD_FEC_SHRED_CNT-1)),
       otherwise the chained merkle root recorded for the next FEC set.

       NOTE(review): merkle_roots appears to be indexed by FEC ordinal
       (idx/32) here and in after_fec, but the recorded_mr lookup below
       indexes with bad_fec_idx directly — confirm the unit returned by
       fd_forest_merkle_last_incorrect_idx (shred index vs FEC ordinal);
       if it is a shred index, the recorded_mr log line may read the
       wrong entry. */
    uint bad_fec_idx = fd_forest_merkle_last_incorrect_idx( bad_blk );
    fd_hash_t const * expected = (bad_fec_idx == bad_blk->complete_idx - (FD_FEC_SHRED_CNT - 1)) ? &bad_blk->confirmed_bid : &bad_blk->merkle_roots[(bad_fec_idx / 32) + 1].cmr;

    FD_BASE58_ENCODE_32_BYTES( confirmed_bid->uc, confirmed_bid_b58 );
    FD_BASE58_ENCODE_32_BYTES( expected->uc, expected_mr );
    FD_BASE58_ENCODE_32_BYTES( bad_blk->merkle_roots[bad_fec_idx].mr.uc, recorded_mr );

    FD_LOG_WARNING(( "[%s] slot %lu block_id %s confirmation detected incorrect FECs. bad FEC is slot %lu fec set %u. expected mr (%s) != recorded mr (%s)",
                     __func__,
                     blk->slot,
                     confirmed_bid_b58,
                     bad_blk->slot,
                     bad_fec_idx,
                     expected_mr,
                     recorded_mr ));

    ctx->metrics->failed_chain_verify_cnt++;
    ctx->metrics->failed_chain_verify_slot = bad_blk->slot;

    /* If we have a bad block, we need to dump and repair backwards from
       the point where the merkle root is incorrect.
       We start only by dumping the last incorrect FEC. It's possible that
       this is the only incorrect one. If it isn't though, when the slot
       recompletes, this function will trigger again and we will dump the
       second to last incorrect FEC. */

    fd_forest_fec_clear( ctx->forest, bad_blk->slot, bad_fec_idx, FD_FEC_SHRED_CNT - 1 );
  }
}
846 :
/* after_fec handles a FEC-set-complete message from the shred tile.
   shred is the last shred header of the completed FEC set; mr / cmr
   are the set's merkle root and chained merkle root. */
static inline void
after_fec( ctx_t * ctx,
           fd_shred_t * shred,
           fd_hash_t * mr,
           fd_hash_t * cmr ) {

  /* When this is a FEC completes msg, it is implied that all the
     other shreds in the FEC set can also be inserted. Shred inserts
     into the forest are idempotent so it is fine to insert the same
     shred multiple times. */

  int slot_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE );
  int ref_tick = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;

  /* Similar to after_shred, do not insert a slot that chains to a slot older than root */
  if( FD_UNLIKELY( shred->slot - shred->data.parent_off < fd_forest_root_slot( ctx->forest ) ) ) return;
  ulong evicted = ULONG_MAX;
  fd_forest_blk_t * ele = fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, &evicted );
  if( FD_UNLIKELY( !blk_insert_check( ctx, ele, shred->slot, evicted ) ) ) return;
  fd_forest_fec_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick, mr, cmr );

  /* metrics for completed slots */
  if( FD_UNLIKELY( ele->complete_idx != UINT_MAX && ele->buffered_idx==ele->complete_idx ) ) {
    long now = fd_tickcount();
    /* Start the slot timer at first_shred_ts when we never issued a
       repair request for it (first_req_ts==0) or the slot is at/after
       turbine_slot0; otherwise time from the first repair request. */
    long start_ts = ele->first_req_ts == 0 || ele->slot >= ctx->turbine_slot0 ? ele->first_shred_ts : ele->first_req_ts;
    ulong duration_ticks = (ulong)(now - start_ts);
    fd_histf_sample( ctx->metrics->slot_compl_time, duration_ticks );
    fd_repair_metrics_add_slot( ctx->slot_metrics, ele->slot, start_ts, now, ele->repair_cnt, ele->turbine_cnt );
    /* Note: this log does not imply that the slot is fully executable.
       It's possible that we have a slot that doesn't chain verify,
       which could be un-executable. */
    FD_BASE58_ENCODE_32_BYTES( ele->merkle_roots[ele->complete_idx / 32].mr.uc, block_id );
    FD_BASE58_ENCODE_32_BYTES( mr->uc, fec_mr );
    FD_LOG_INFO(( "[%s] slot is complete %lu. num_data_shreds: %u, num_repaired: %u, num_turbine: %u, num_recovered: %u, duration: %.2f ms. last recvd fec: %u, mr %s. current block_id: %s",
                  __func__,
                  ele->slot,
                  ele->complete_idx + 1,
                  ele->repair_cnt,
                  ele->turbine_cnt,
                  ele->recovered_cnt,
                  (double)fd_metrics_convert_ticks_to_nanoseconds(duration_ticks) / 1e6,
                  shred->fec_set_idx,
                  fec_mr,
                  block_id ));
  }

  /* re-trigger continuation of chained merkle verification if this FEC
     set enables it TODO MOVE TO AFTER_SHRED? */
  if( FD_UNLIKELY( ele->lowest_verified_fec == (shred->fec_set_idx / 32UL) + 1 ) &&
      ele->buffered_idx == ele->complete_idx ) {
    check_confirmed( ctx, ele, &ele->confirmed_bid /* if lowest_verified_fec is not UINT_MAX, confirmed_bid must be populated */ );
  }
}
900 :
/* after_net handles a raw UDP packet of size sz received from the net
   tile (payload already copied into ctx->net_buf).  The only traffic
   handled here is repair-protocol pings: a well-formed, signed ping
   from a known peer gets a pong queued for signing (sent later from
   after_credit). */
static inline void
after_net( ctx_t * ctx,
           ulong sz ) {
  fd_eth_hdr_t * eth; fd_ip4_hdr_t * ip4; fd_udp_hdr_t * udp;
  uchar * data; ulong data_sz;
  if( FD_UNLIKELY( !fd_ip4_udp_hdr_strip( ctx->net_buf, sz, &data, &data_sz, &eth, &ip4, &udp ) ) ) {
    ctx->metrics->malformed_ping++;
    return;
  }
  /* Remember the sender so the pong can be addressed back to it. */
  fd_ip4_port_t peer_addr = { .addr=ip4->saddr, .port=udp->net_sport };

  fd_repair_ping_t ping[1];
  int err = fd_repair_ping_de( ping, data, data_sz );
  if( FD_UNLIKELY( err ) ) {
    ctx->metrics->malformed_ping++;
    return;
  }

  /* Only answer pings from peers we actively track. */
  fd_policy_peer_t * peer = fd_policy_peer_query( ctx->policy, &ping->ping.from );
  if( FD_UNLIKELY( !peer ) ) {
    ctx->metrics->unknown_peer_ping++;
    return;
  }
  /* peer->ping non-zero means a pong for this peer is already pending;
     drop further pings until it is answered. */
  if( FD_UNLIKELY( peer->ping ) ) return;
  if( FD_UNLIKELY( fd_signs_queue_full( ctx->pong_queue ) ) ) return;

  /* Sigverify last: it is the most expensive check. sha is scratch
     space for the verification. */
  fd_sha512_t sha[1];
  if( FD_UNLIKELY( FD_ED25519_SUCCESS != fd_ed25519_verify( ping->ping.hash.uc, 32UL, ping->ping.sig, ping->ping.from.uc, sha ) ) ) {
    ctx->metrics->fail_sigverify_ping++;
    return;
  }

  /* Any gossip peer can send a ping, but they are bounded to at most
     one ping in the queue so they can't evict others' pings without
     multiple gossip identities. */

  fd_repair_msg_t * pong = fd_repair_pong( ctx->protocol, &ping->ping.hash );
  fd_signs_queue_push( ctx->pong_queue, (sign_pending_t){ .msg = *pong, .pong_data = { .peer_addr = peer_addr, .hash = ping->ping.hash, .daddr = ip4->daddr, .key = ping->ping.from } } );
  peer->ping++;
}
941 :
/* after_evict handles a FEC-eviction notification: the resolver gave
   up on an incomplete FEC set, so clear its shreds from the forest so
   that the missing pieces can be requested again. */
static inline void
after_evict( ctx_t * ctx,
             fd_fec_evicted_t * evicted ) {
  fd_forest_fec_clear( ctx->forest, evicted->slot, evicted->fec_set_idx, FD_FEC_SHRED_CNT - 1 );
}
947 :
/* after_tower processes consensus updates from the tower tile:
   - SLOT_DONE: a new root slot was chosen; prune (publish) the forest
     up to it.
   - SLOT_CONFIRMED: a slot reached at least duplicate-confirmed
     status; record its canonical block_id and kick off chained merkle
     verification.  All other sigs are ignored. */
static inline void
after_tower( ctx_t * ctx,
             ulong sig,
             uchar const * chunk ) {

  switch( sig ) {
    case FD_TOWER_SIG_SLOT_DONE: {
      fd_tower_slot_done_t const * msg = (fd_tower_slot_done_t const *)fd_type_pun_const( chunk );
      /* Only advance the root forward (and ignore the ULONG_MAX
         "no root" sentinel). */
      if( FD_LIKELY( msg->root_slot!=ULONG_MAX && msg->root_slot > fd_forest_root_slot( ctx->forest ) ) ) fd_forest_publish( ctx->forest, msg->root_slot );
      break;
    }
    case FD_TOWER_SIG_SLOT_CONFIRMED: {
      fd_tower_slot_confirmed_t const * msg = (fd_tower_slot_confirmed_t const *)fd_type_pun_const( chunk );
      /* Only slots past the current root, and only confirmations at
         duplicate-confirmed level or stronger, are acted on. */
      if( msg->slot > fd_forest_root_slot( ctx->forest ) && (msg->level >= FD_TOWER_SLOT_CONFIRMED_DUPLICATE ) ) {
        fd_forest_blk_t * blk = fd_forest_query( ctx->forest, msg->slot );
        if( FD_UNLIKELY( !blk ) ) {
          /* If we receive a confirmation for a slot we don't have,
             create a sentinel forest block that we can repair from. */
          ulong evicted = ULONG_MAX;
          blk = fd_forest_blk_insert( ctx->forest, msg->slot, ULONG_MAX, &evicted );
          FD_LOG_INFO(("[%s] creating sentinel for duplicate confirmed block %lu", __func__, msg->slot));
          if( FD_UNLIKELY( !blk_insert_check( ctx, blk, msg->slot, evicted ) ) ) return;
        }

        /* Confirm the block */
        blk->confirmed_bid = msg->block_id;
        check_confirmed( ctx, blk, &msg->block_id );
      }
      break;
    }
    default: return;
  }
}
981 :
/* after_frag dispatches a received frag to the appropriate handler
   based on the kind of input link it arrived on.  Frags flagged to be
   skipped (ctx->skip_frag — presumably set during before/during_frag;
   not visible in this file section) are dropped immediately. */
static void
after_frag( ctx_t * ctx,
            ulong in_idx,
            ulong seq FD_PARAM_UNUSED,
            ulong sig,
            ulong sz,
            ulong tsorig FD_PARAM_UNUSED,
            ulong tspub,
            fd_stem_context_t * stem ) {
  if( FD_UNLIKELY( ctx->skip_frag ) ) return;

  ctx->stem = stem;
  in_ctx_t const * in_ctx = &ctx->in_links[ in_idx ];
  uint in_kind = ctx->in_kind[ in_idx ];

  switch( in_kind ) {
    /* Unreliable frags */
    case IN_KIND_NET: {
      after_net( ctx, sz );
      break;
    }
    case IN_KIND_SIGN: {
      after_sign( ctx, in_idx, sig, stem );
      break;
    }
    /* Reliable frags read directly from dcache */
    case IN_KIND_SNAP: {
      after_snap( ctx, sig, fd_chunk_to_laddr( ctx->in_links[ in_idx ].mem, ctx->snap_out_chunk ) );
      break;
    }
    case IN_KIND_GENESIS: {
      fd_genesis_meta_t const * meta = (fd_genesis_meta_t const *)fd_type_pun_const( fd_chunk_to_laddr( in_ctx->mem, ctx->chunk ) );
      /* Bootstrapping from genesis: root the forest at slot 0. */
      if( meta->bootstrap ) fd_forest_init( ctx->forest, 0 );
      break;
    }
    case IN_KIND_GOSSIP: {
      fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( fd_chunk_to_laddr( in_ctx->mem, ctx->chunk ) );
      after_gossip( ctx, msg, sig );
      break;
    }
    case IN_KIND_REPLAY: {
      /* Replay evicted a FEC set; clear it so it can be repaired again. */
      fd_replay_fec_evicted_t const * msg = (fd_replay_fec_evicted_t const *)fd_type_pun_const( fd_chunk_to_laddr( in_ctx->mem, ctx->chunk ) );
      fd_forest_fec_clear( ctx->forest, msg->slot, msg->fec_set_idx, FD_FEC_SHRED_CNT - 1 );
      break;
    }
    case IN_KIND_TOWER: {
      after_tower( ctx, sig, fd_chunk_to_laddr( in_ctx->mem, ctx->chunk ) );
      break;
    }
    case IN_KIND_SHRED: {

      /* There are 3 message types from shred:
         1. resolver evict - incomplete FEC set is evicted by resolver
         2. fec complete - FEC set is completed by resolver. Also contains a shred.
         3. shred - new shred

         Msgs 2 and 3 have a shred header in the dcache. Msg 1 is empty. */

      if( FD_UNLIKELY( sig==SHRED_SIG_FEC_EVICTED ) ) {
        fd_fec_evicted_t * evicted = (fd_fec_evicted_t *)fd_type_pun( fd_chunk_to_laddr( in_ctx->mem, ctx->chunk ) );
        after_evict( ctx, evicted );
        return;
      }

      uchar * src = fd_chunk_to_laddr( in_ctx->mem, ctx->chunk );
      fd_shred_base_t * shred_msg = (fd_shred_base_t *)fd_type_pun( src );
      fd_shred_t * shred = &shred_msg->shred; /* completes & shred messages all have a shred header at the same offset (after merkle root) */

      /* Shreds at or below the root are stale; drop them. */
      if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) {
        ctx->metrics->old_shred++;
        return;
      };

      if( FD_UNLIKELY( shred->slot > ctx->metrics->current_slot ) ) {
        FD_LOG_INFO(( "[Turbine] slot: %lu, root: %lu", shred->slot, fd_forest_root_slot( ctx->forest ) ));
        ctx->metrics->current_slot = shred->slot;
      }

      /* First turbine shred ever seen: record turbine_slot0. */
      if( FD_UNLIKELY( ctx->turbine_slot0 == ULONG_MAX ) ) {
        ctx->turbine_slot0 = shred->slot;
        fd_repair_metrics_set_turbine_slot0( ctx->slot_metrics, shred->slot );
        fd_policy_set_turbine_slot0( ctx->policy, shred->slot );

        /* On first turbine shred, seed repair by queuing highest_shred
           requests for slots between snapshot and turbine_slot0. This
           bypasses forest entirely and dispatches directly via the sign
           queue. Cap at half queue capacity to leave room for pongs. */
        ulong root = fd_forest_root_slot( ctx->forest );
        if( FD_LIKELY( root != ULONG_MAX && shred->slot > root ) ) {
          ulong capacity = fd_signs_queue_max( ctx->pong_queue ) - fd_signs_queue_cnt( ctx->pong_queue );
          ulong seed_cnt = fd_ulong_min( shred->slot-root, capacity/2 );
          long now_ms = fd_log_wallclock()/(long)1e6;
          for( ulong i=1; i<=seed_cnt; i++ ) {
            ulong slot = root + i;
            fd_pubkey_t const * peer = fd_policy_peer_select( ctx->policy );
            if( FD_UNLIKELY( !peer ) ) break;
            fd_repair_msg_t * msg = fd_repair_highest_shred( ctx->protocol, peer, (ulong)now_ms, 0, slot, 0 );
            if( FD_LIKELY( msg ) ) fd_signs_queue_push( ctx->pong_queue, (sign_pending_t){ .msg = *msg } );
          }
        }
      }


      if( FD_UNLIKELY( sig==SHRED_SIG_FEC_COMPLETE || sig==SHRED_SIG_FEC_COMPLETE_LEADER ) ) {
        fd_fec_complete_t * complete_msg = (fd_fec_complete_t *)fd_type_pun( src );
        after_fec( ctx, &complete_msg->last_shred_hdr, &complete_msg->merkle_root, &complete_msg->chained_merkle_root );

        /* forward along to replay */
        memcpy( fd_chunk_to_laddr( ctx->repair_out_ctx->mem, ctx->repair_out_ctx->chunk ), src, sz );
        fd_stem_publish( ctx->stem, ctx->repair_out_ctx->idx, sig, ctx->repair_out_ctx->chunk, sz, 0UL, 0UL, tspub );
        ctx->repair_out_ctx->chunk = fd_dcache_compact_next( ctx->repair_out_ctx->chunk, sz, ctx->repair_out_ctx->chunk0, ctx->repair_out_ctx->wmark );
      } else if( FD_LIKELY( fd_shred_sig_res( sig )!=SHRED_SIG_RESULT_EQVOC ) ) {
        /* Regular shred (equivocating shreds are not inserted). The
           chained merkle root lives at a variant-dependent offset in
           the shred payload. */
        fd_hash_t * cmr = (fd_hash_t *)fd_type_pun(shred_msg->shred_ + fd_shred_chain_off( shred->variant ));
        after_shred( ctx, sig, shred, shred_msg->rnonce, &shred_msg->merkle_root, cmr );
      }

      /* update metrics */
      ctx->metrics->repaired_slots = fd_forest_highest_repaired_slot( ctx->forest );
      return;
    }
    default: FD_LOG_ERR(( "bad in_kind %u", in_kind )); /* Should never reach here since before_frag should have filtered out any unexpected frags. */
  }
}
1105 :
/* after_credit performs the tile's send-side work whenever the stem
   loop has spare credits, in priority order:
     1. nothing while signing is halted for a keyswitch,
     2. queued pongs / pre-seeded requests (pong_queue),
     3. one timed-out inflight request that needs re-requesting,
     4. a new request selected by the repair policy. */
static inline void
after_credit( ctx_t * ctx,
              fd_stem_context_t * stem FD_PARAM_UNUSED,
              int * opt_poll_in FD_PARAM_UNUSED,
              int * charge_busy ) {
  long now = fd_log_wallclock();

  /* A keyswitch is in progress: issue no new sign requests until
     signing is unhalted by during_housekeeping. */
  if( FD_UNLIKELY( ctx->halt_signing ) ) {
    *charge_busy = 1;
    return;
  }

  /* Verify that there is at least one sign tile with available credits.
     If not, we can't send any requests and leave early. */
  out_ctx_t * sign_out = sign_avail_credits( ctx );
  if( FD_UNLIKELY( !sign_out ) ) {
    ctx->metrics->sign_tile_unavail++;
    return;
  }

  /* If inflights is at capacity, then the only thing we can send is:
     pongs, initial highest window index requests, or resend things that
     are already inflight. Any new requests that would cause an
     inflight to be added to the queue must be deferred. */

  if( FD_UNLIKELY( !fd_signs_queue_empty( ctx->pong_queue ) ) ) {
    sign_pending_t signable = fd_signs_queue_pop( ctx->pong_queue );
    fd_repair_send_sign_request( ctx, sign_out, &signable.msg, signable.msg.kind == FD_REPAIR_KIND_PONG ? &signable.pong_data : NULL );
    *charge_busy = 1;
    return;
  }

  /* Drain one timed-out inflight: if the shred is still missing in the
     forest, pick a (possibly different) peer and re-request it with a
     freshly-computed nonce. */
  if( FD_UNLIKELY( fd_inflights_should_drain( ctx->inflights, now ) ) ) {
    ulong nonce; ulong slot; ulong shred_idx;
    *charge_busy = 1;
    fd_inflights_request_pop( ctx->inflights, &nonce, &slot, &shred_idx );
    fd_forest_blk_t * blk = fd_forest_query( ctx->forest, slot );
    if( FD_UNLIKELY( blk && !fd_forest_blk_idxs_test( blk->idxs, shred_idx ) ) ) {
      fd_pubkey_t const * peer = fd_policy_peer_select( ctx->policy );
      ctx->metrics->rerequest++;
      nonce = fd_rnonce_ss_compute( ctx->repair_nonce_ss, 1, slot, (uint)shred_idx, now );
      if( FD_UNLIKELY( !peer ) ) {
        /* No peers. But we CANNOT lose this request. */
        /* Add this request to the inflights table, pretend we've sent it and let the inflight timeout request it down the line. */
        fd_hash_t hash = { .ul[0] = 0 };
        fd_inflights_request_insert( ctx->inflights, nonce, &hash, slot, shred_idx );
      } else {
        fd_repair_msg_t * msg = fd_repair_shred( ctx->protocol, peer, (ulong)now/(ulong)1e6, (uint)nonce, slot, shred_idx );
        fd_repair_send_sign_request( ctx, sign_out, msg, NULL );
        return;
      }
    }
  }

  /* Defer brand-new requests while the inflight table lacks headroom
     for the requests already awaiting signatures. */
  if( FD_UNLIKELY( fd_inflights_outstanding_free( ctx->inflights ) <= fd_signs_map_key_cnt( ctx->signs_map ) ) ) return; /* no new requests allowed */

  fd_repair_msg_t const * cout = fd_policy_next( ctx->policy, ctx->forest, ctx->protocol, now, ctx->metrics->current_slot, charge_busy );
  if( FD_UNLIKELY( !cout ) ) return;
  fd_repair_send_sign_request( ctx, sign_out, cout, NULL );
}
1166 :
1167 : static void
1168 0 : signs_queue_update_identity( ctx_t * ctx ) {
1169 0 : ulong queue_cnt = fd_signs_queue_cnt( ctx->pong_queue );
1170 0 : for( ulong i=0UL; i<queue_cnt; i++ ) {
1171 0 : sign_pending_t signable = fd_signs_queue_pop( ctx->pong_queue );
1172 0 : switch( signable.msg.kind ) {
1173 0 : case FD_REPAIR_KIND_PONG:
1174 0 : memcpy( signable.msg.pong.from.uc, ctx->identity_public_key.uc, sizeof(fd_pubkey_t) );
1175 0 : break;
1176 0 : case FD_REPAIR_KIND_SHRED:
1177 0 : memcpy( signable.msg.shred.from.uc, ctx->identity_public_key.uc, sizeof(fd_pubkey_t) );
1178 0 : break;
1179 0 : case FD_REPAIR_KIND_HIGHEST_SHRED:
1180 0 : memcpy( signable.msg.highest_shred.from.uc, ctx->identity_public_key.uc, sizeof(fd_pubkey_t) );
1181 0 : break;
1182 0 : case FD_REPAIR_KIND_ORPHAN:
1183 0 : memcpy( signable.msg.orphan.from.uc, ctx->identity_public_key.uc, sizeof(fd_pubkey_t) );
1184 0 : break;
1185 0 : default:
1186 0 : FD_LOG_CRIT(( "Unhandled repair kind %u", signable.msg.kind ));
1187 0 : break;
1188 0 : }
1189 0 : fd_signs_queue_push( ctx->pong_queue, signable );
1190 0 : }
1191 0 : }
1192 :
/* during_housekeeping performs periodic background work: optional
   debug printing of the forest, and driving the identity keyswitch
   state machine (halt signing, swap in the new key, wait for all
   outstanding sign requests to drain, then mark the switch done). */
static inline void
during_housekeeping( ctx_t * ctx ) {
# if DEBUG_LOGGING
  long now = fd_log_wallclock();
  if( FD_UNLIKELY( now - ctx->tsdebug > (long)10e9 ) ) {
    fd_forest_print( ctx->forest );
    ctx->tsdebug = fd_log_wallclock();
  }
# endif

  /* Unhalt: signing resumes after the keyswitch is rolled back /
     released.  halt_signing must be set at this point. */
  if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_UNHALT_PENDING ) ) {
    FD_LOG_DEBUG(( "keyswitch: unhalting" ));
    FD_CRIT( ctx->halt_signing, "state machine corruption" );
    ctx->halt_signing = 0;
    fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
  }

  if( FD_UNLIKELY( fd_keyswitch_state_query( ctx->keyswitch )==FD_KEYSWITCH_STATE_SWITCH_PENDING ) ) {

    if( !ctx->halt_signing ) {
      /* At this point, stop sending new sign requests to the sign tile
         and wait for all outstanding sign requests to be received back
         from the sign tile. We also need to update any pending
         outgoing sign requests with the new identity key. */
      FD_LOG_DEBUG(( "keyswitch: halting signing" ));
      ctx->halt_signing = 1;
      memcpy( ctx->identity_public_key.uc, ctx->keyswitch->bytes, 32UL );
      ctx->protocol->identity_key = ctx->identity_public_key;
      signs_queue_update_identity( ctx );
    }

    if( fd_signs_map_key_cnt( ctx->signs_map )==0UL ) {
      /* Once there are no more in flight sign requests, we are ready to
         say that the keyswitch is completed. */
      FD_LOG_DEBUG(( "keyswitch: completed, no more outstanding stale sign requests" ));
      fd_keyswitch_state( ctx->keyswitch, FD_KEYSWITCH_STATE_COMPLETED );
    }
  }
}
1232 :
/* privileged_init runs before the tile drops privileges: it loads the
   identity public key from disk, seeds the repair RNG from secure
   entropy, and generates the shared repair nonce secret, publishing
   it for other tiles through the shared topology object. */
static void
privileged_init( fd_topo_t * topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  FD_SCRATCH_ALLOC_INIT( l, scratch );
  ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(ctx_t), sizeof(ctx_t) );
  fd_memset( ctx, 0, sizeof(ctx_t) );

  /* Only the public key is loaded here; signing itself is delegated to
     the sign tile(s). */
  uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 1 );
  fd_memcpy( ctx->identity_public_key.uc, identity_key, sizeof(fd_pubkey_t) );

  FD_TEST( fd_rng_secure( &ctx->repair_seed, sizeof(ulong) ) );

  FD_LOG_DEBUG(( "Generating rnonce_ss" ));
  ulong rnonce_ss_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "rnonce_ss" );
  FD_TEST( rnonce_ss_id!=ULONG_MAX );
  void * shared_rnonce = fd_topo_obj_laddr( topo, rnonce_ss_id );
  /* The "initialized" flag lives immediately after the secret within
     the shared object. */
  ulong * nonce_initialized = (ulong *)(sizeof(fd_rnonce_ss_t)+(uchar *)shared_rnonce);
  FD_TEST( fd_rng_secure( shared_rnonce, sizeof(fd_rnonce_ss_t) ) );
  memcpy( ctx->repair_nonce_ss, shared_rnonce, sizeof(fd_rnonce_ss_t) );
  /* Fence so any consumer polling nonce_initialized observes the fully
     written secret before the flag flips to 1. */
  FD_COMPILER_MFENCE();
  FD_VOLATILE( *nonce_initialized ) = 1UL;
}
1257 :
1258 : static void
1259 : unprivileged_init( fd_topo_t * topo,
1260 0 : fd_topo_tile_t * tile ) {
1261 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1262 :
1263 0 : ulong total_sign_depth = tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt;
1264 0 : int lg_sign_depth = fd_ulong_find_msb( fd_ulong_pow2_up(total_sign_depth) ) + 1;
1265 :
1266 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1267 0 : ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(ctx_t), sizeof(ctx_t) );
1268 0 : ctx->protocol = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );
1269 0 : ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) );
1270 0 : ctx->policy = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(), fd_policy_footprint( FD_DEDUP_CACHE_MAX, FD_REPAIR_PEER_MAX ) );
1271 0 : ctx->inflights = FD_SCRATCH_ALLOC_APPEND( l, fd_inflights_align(), fd_inflights_footprint() );
1272 0 : ctx->signs_map = FD_SCRATCH_ALLOC_APPEND( l, fd_signs_map_align(), fd_signs_map_footprint( lg_sign_depth ) );
1273 0 : ctx->pong_queue = FD_SCRATCH_ALLOC_APPEND( l, fd_signs_queue_align(), fd_signs_queue_footprint() );
1274 0 : ctx->slot_metrics = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_metrics_align(), fd_repair_metrics_footprint() );
1275 0 : FD_TEST( FD_SCRATCH_ALLOC_FINI( l, scratch_align() ) == (ulong)scratch + scratch_footprint( tile ) );
1276 :
1277 0 : ctx->protocol = fd_repair_join ( fd_repair_new ( ctx->protocol, &ctx->identity_public_key ) );
1278 0 : ctx->forest = fd_forest_join ( fd_forest_new ( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) );
1279 0 : ctx->policy = fd_policy_join ( fd_policy_new ( ctx->policy, FD_DEDUP_CACHE_MAX, FD_REPAIR_PEER_MAX, ctx->repair_seed, ctx->repair_nonce_ss ) );
1280 0 : ctx->inflights = fd_inflights_join ( fd_inflights_new ( ctx->inflights, ctx->repair_seed+1234UL ) );
1281 0 : ctx->signs_map = fd_signs_map_join ( fd_signs_map_new ( ctx->signs_map, lg_sign_depth, 0UL ) );
1282 0 : ctx->pong_queue = fd_signs_queue_join ( fd_signs_queue_new ( ctx->pong_queue ) );
1283 0 : ctx->slot_metrics = fd_repair_metrics_join( fd_repair_metrics_new( ctx->slot_metrics ) );
1284 :
1285 0 : ctx->keyswitch = fd_keyswitch_join( fd_topo_obj_laddr( topo, tile->id_keyswitch_obj_id ) );
1286 0 : FD_TEST( ctx->keyswitch );
1287 :
1288 0 : ctx->halt_signing = 0;
1289 :
1290 : /* Process in links */
1291 :
1292 0 : if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" ));
1293 :
1294 0 : uint sign_repair_in_idx[ MAX_SIGN_TILE_CNT ] = {0};
1295 0 : uint sign_repair_idx = 0;
1296 0 : ulong sign_link_depth = 0;
1297 :
1298 0 : for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) {
1299 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ];
1300 0 : if( 0==strcmp( link->name, "net_repair" ) ) {
1301 0 : ctx->in_kind[ in_idx ] = IN_KIND_NET;
1302 0 : fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache );
1303 0 : continue;
1304 0 : } else if( 0==strcmp( link->name, "sign_repair" ) ) {
1305 0 : ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
1306 0 : sign_repair_in_idx[ sign_repair_idx++ ] = in_idx;
1307 0 : sign_link_depth = link->depth;
1308 0 : }
1309 0 : else if( 0==strcmp( link->name, "gossip_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_GOSSIP;
1310 0 : else if( 0==strcmp( link->name, "tower_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_TOWER;
1311 0 : else if( 0==strcmp( link->name, "shred_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_SHRED;
1312 0 : else if( 0==strcmp( link->name, "snapin_manif" ) ) ctx->in_kind[ in_idx ] = IN_KIND_SNAP;
1313 0 : else if( 0==strcmp( link->name, "genesi_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_GENESIS;
1314 0 : else if( 0==strcmp( link->name, "replay_out" ) ) ctx->in_kind[ in_idx ] = IN_KIND_REPLAY;
1315 0 : else FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
1316 :
1317 0 : ctx->in_links[ in_idx ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1318 0 : ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
1319 0 : ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
1320 0 : ctx->in_links[ in_idx ].mtu = link->mtu;
1321 :
1322 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
1323 0 : }
1324 :
1325 0 : ctx->net_out_ctx->idx = UINT_MAX;
1326 0 : ctx->repair_out_ctx->idx = UINT_MAX;
1327 0 : ctx->repair_sign_cnt = 0;
1328 0 : ctx->sign_rrobin_idx = 0;
1329 :
1330 0 : for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) {
1331 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ];
1332 :
1333 0 : if( 0==strcmp( link->name, "repair_net" ) ) {
1334 :
1335 0 : if( ctx->net_out_ctx->idx!=UINT_MAX ) continue; /* only use first net link */
1336 0 : ctx->net_out_ctx->idx = out_idx;
1337 0 : ctx->net_out_ctx->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1338 0 : ctx->net_out_ctx->chunk0 = fd_dcache_compact_chunk0( ctx->net_out_ctx->mem, link->dcache );
1339 0 : ctx->net_out_ctx->wmark = fd_dcache_compact_wmark( ctx->net_out_ctx->mem, link->dcache, link->mtu );
1340 0 : ctx->net_out_ctx->chunk = ctx->net_out_ctx->chunk0;
1341 :
1342 0 : } else if( 0==strcmp( link->name, "repair_out" ) ) {
1343 :
1344 0 : out_ctx_t * replay_out = ctx->repair_out_ctx;
1345 0 : replay_out->idx = out_idx;
1346 0 : replay_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1347 0 : replay_out->chunk0 = fd_dcache_compact_chunk0( replay_out->mem, link->dcache );
1348 0 : replay_out->wmark = fd_dcache_compact_wmark( replay_out->mem, link->dcache, link->mtu );
1349 0 : replay_out->chunk = replay_out->chunk0;
1350 :
1351 0 : } else if( 0==strcmp( link->name, "repair_sign" ) ) {
1352 :
1353 0 : out_ctx_t * repair_sign_out = &ctx->repair_sign_out_ctx[ ctx->repair_sign_cnt ];
1354 0 : repair_sign_out->idx = out_idx;
1355 0 : repair_sign_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1356 0 : repair_sign_out->chunk0 = fd_dcache_compact_chunk0( repair_sign_out->mem, link->dcache );
1357 0 : repair_sign_out->wmark = fd_dcache_compact_wmark( repair_sign_out->mem, link->dcache, link->mtu );
1358 0 : repair_sign_out->chunk = repair_sign_out->chunk0;
1359 0 : repair_sign_out->in_idx = sign_repair_in_idx[ ctx->repair_sign_cnt++ ]; /* match to the sign_repair input link */
1360 0 : repair_sign_out->max_credits = sign_link_depth;
1361 0 : repair_sign_out->credits = sign_link_depth;
1362 :
1363 0 : } else {
1364 0 : FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name ));
1365 0 : }
1366 0 : }
1367 0 : FD_TEST( ctx->net_out_ctx->idx!=UINT_MAX );
1368 0 : FD_TEST( ctx->repair_out_ctx->idx!=UINT_MAX );
1369 0 : if( FD_UNLIKELY( ctx->repair_sign_cnt!=sign_repair_idx ) ) {
1370 0 : FD_LOG_ERR(( "Mismatch between repair_sign output links (%lu) and sign_repair input links (%u)", ctx->repair_sign_cnt, sign_repair_idx ));
1371 0 : }
1372 0 : if( FD_UNLIKELY( fd_signs_map_key_max( ctx->signs_map ) < tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt ) ) {
1373 0 : FD_LOG_ERR(( "Repair pending signs tracking map is too small: %lu < %lu.", fd_signs_map_key_max( ctx->signs_map ), tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt ));
1374 0 : }
1375 :
1376 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
1377 0 : ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
1378 0 : ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );
1379 :
1380 : /* TODO clean these up */
1381 0 : ctx->net_id = (ushort)0;
1382 0 : fd_ip4_udp_hdr_init( ctx->intake_hdr, 0, 0, tile->repair.repair_intake_listen_port );
1383 0 : fd_ip4_udp_hdr_init( ctx->serve_hdr, 0, 0, tile->repair.repair_serve_listen_port );
1384 :
1385 : /* Repair set up */
1386 :
1387 0 : ctx->turbine_slot0 = ULONG_MAX;
1388 0 : FD_LOG_INFO(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
1389 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
1390 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));
1391 :
1392 0 : memset( ctx->metrics, 0, sizeof(ctx->metrics) );
1393 :
1394 0 : fd_histf_join( fd_histf_new( ctx->metrics->slot_compl_time, FD_MHIST_SECONDS_MIN( REPAIR, SLOT_COMPLETE_TIME ),
1395 0 : FD_MHIST_SECONDS_MAX( REPAIR, SLOT_COMPLETE_TIME ) ) );
1396 0 : fd_histf_join( fd_histf_new( ctx->metrics->response_latency, FD_MHIST_MIN( REPAIR, RESPONSE_LATENCY ),
1397 0 : FD_MHIST_MAX( REPAIR, RESPONSE_LATENCY ) ) );
1398 :
1399 0 : ctx->tsdebug = fd_log_wallclock();
1400 0 : ctx->pending_key_next = 0;
1401 0 : }
1402 :
/* populate_allowed_seccomp installs the repair tile's seccomp-BPF
   filter into out, which has capacity out_cnt instructions.  The
   filter program is generated (see the fd_repair_tile seccomp policy)
   and is parameterized only by the logfile fd so that writes to the
   log remain permitted after sandboxing.  Returns the number of BPF
   instructions in the installed filter. */
static ulong
populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
                          fd_topo_tile_t const * tile FD_PARAM_UNUSED,
                          ulong                  out_cnt,
                          struct sock_filter *   out ) {
  populate_sock_filter_policy_fd_repair_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
  return sock_filter_policy_fd_repair_tile_instr_cnt;
}
1411 :
1412 : static ulong
1413 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
1414 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
1415 : ulong out_fds_cnt,
1416 0 : int * out_fds ) {
1417 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1418 :
1419 0 : ulong out_cnt = 0UL;
1420 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1421 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1422 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1423 0 : return out_cnt;
1424 0 : }
1425 :
/* metrics_write snapshots the repair tile's locally accumulated
   counters, gauges and histograms into the shared metrics region.
   Invoked periodically by the stem run loop via
   STEM_CALLBACK_METRICS_WRITE (wired up below). */
static inline void
metrics_write( ctx_t * ctx ) {
  /* Repair progress counters */
  FD_MCNT_SET( REPAIR, CURRENT_SLOT,      ctx->metrics->current_slot );
  FD_MCNT_SET( REPAIR, REPAIRED_SLOTS,    ctx->metrics->repaired_slots );
  FD_MCNT_SET( REPAIR, OLD_SHRED,         ctx->metrics->old_shred );
  FD_MCNT_SET( REPAIR, REQUEST_PEERS,     fd_policy_peer_pool_used( ctx->policy->peers.pool ) );
  FD_MCNT_SET( REPAIR, SIGN_TILE_UNAVAIL, ctx->metrics->sign_tile_unavail );
  FD_MCNT_SET( REPAIR, REREQUEST_QUEUE,   ctx->metrics->rerequest );

  /* Outstanding request gauges.  Popped inflight entries are subtracted
     so the gauge reflects only requests still awaiting a response. */
  FD_MGAUGE_SET( REPAIR, LAST_REQUESTED_SLOT,   ctx->metrics->last_requested_slot );
  FD_MGAUGE_SET( REPAIR, LAST_REQUESTED_ORPHAN, ctx->metrics->last_requested_orphan );
  FD_MGAUGE_SET( REPAIR, INFLIGHT_REQUESTS,     fd_inflight_pool_used( ctx->inflights->pool ) - ctx->inflights->popped_cnt );

  /* Outbound packet accounting */
  FD_MCNT_SET      ( REPAIR, TOTAL_PKT_COUNT, ctx->metrics->send_pkt_cnt );
  FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES,  ctx->metrics->sent_pkt_types );

  /* Latency histograms (initialized in tile init) */
  FD_MHIST_COPY( REPAIR, SLOT_COMPLETE_TIME, ctx->metrics->slot_compl_time );
  FD_MHIST_COPY( REPAIR, RESPONSE_LATENCY,   ctx->metrics->response_latency );

  /* Store eviction / insertion failures */
  FD_MCNT_SET  ( REPAIR, BLK_EVICTED,        ctx->metrics->blk_evicted );
  FD_MCNT_SET  ( REPAIR, BLK_FAILED_INSERT,  ctx->metrics->blk_failed_insert );
  FD_MGAUGE_SET( REPAIR, SLOT_EVICTED,       ctx->metrics->slot_evicted );
  FD_MGAUGE_SET( REPAIR, SLOT_EVICTED_BY,    ctx->metrics->slot_evicted_by );
  FD_MGAUGE_SET( REPAIR, SLOT_FAILED_INSERT, ctx->metrics->slot_failed_insert );

  /* Chained merkle verification failures */
  FD_MCNT_SET  ( REPAIR, FAILED_CHAIN_VERIFY_CNT,  ctx->metrics->failed_chain_verify_cnt );
  FD_MGAUGE_SET( REPAIR, FAILED_CHAIN_VERIFY_SLOT, ctx->metrics->failed_chain_verify_slot );

  /* Rejected inbound ping traffic */
  FD_MCNT_SET( REPAIR, UNKNOWN_PEER_PING,      ctx->metrics->unknown_peer_ping );
  FD_MCNT_SET( REPAIR, MALFORMED_PING,         ctx->metrics->malformed_ping );
  FD_MCNT_SET( REPAIR, FAILED_SIGVERIFY_PING,  ctx->metrics->fail_sigverify_ping );
}
1458 :
#undef DEBUG_LOGGING

/* Stem template configuration.  The macros below parameterize the
   generic tile run loop in fd_stem.c, which is #include'd immediately
   after. */

/* At most one sign request is made in after_credit. Then at most one
   message is published in after_frag. */
#define STEM_BURST (2UL)

/* Set LAZY to a reasonable value that keeps housekeeping time low.
   Repair tile's only reliable consumer is replay. */
#define STEM_LAZY (64000)

#define STEM_CALLBACK_CONTEXT_TYPE  ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(ctx_t)

/* Callback handlers implemented earlier in this file. */
#define STEM_CALLBACK_AFTER_CREDIT        after_credit
#define STEM_CALLBACK_BEFORE_FRAG         before_frag
#define STEM_CALLBACK_DURING_FRAG         during_frag
#define STEM_CALLBACK_AFTER_FRAG          after_frag
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE       metrics_write
1478 :
1479 : #include "../../disco/stem/fd_stem.c"
1480 :
/* Tile descriptor registered with the topology runner.  Binds the
   repair tile's sizing hooks, sandbox policy callbacks, init routines
   (privileged_init runs before privileges are dropped,
   unprivileged_init after), and the stem_run loop instantiated above. */
fd_topo_run_tile_t fd_tile_repair = {
    .name                     = "repair",
    .loose_footprint          = loose_footprint,
    .populate_allowed_seccomp = populate_allowed_seccomp,
    .populate_allowed_fds     = populate_allowed_fds,
    .scratch_align            = scratch_align,
    .scratch_footprint        = scratch_footprint,
    .unprivileged_init        = unprivileged_init,
    .privileged_init          = privileged_init,
    .run                      = stem_run,
};
|