Line data Source code
1 : /* REQUEST HANDLING ARCHITECTURE
2 : =========================================
3 :
4 : The repair tile implements two distinct request handling patterns
5 : based on the nature of the operation and its latency requirements:
6 :
7 : 1. SYNCHRONOUS REQUEST HANDLING
8 : -----------------------------------------
9 : Used for lightweight protocol messages that require immediate
10 : signing and response. These operations use the keyguard client for
11 : direct signing, which requires blocking.
12 :
13 : Message types handled synchronously:
14 : - PINGs & PONGs: Handles peer connectivity and liveness with simple
15 : round-trip messages.
16 :
17 : - PEER WARM UPs: On receiving a peer's contact info for the first
18 : time in handle_contact_info_update, we prepay the RTT cost by
19 : sending a placeholder repair request immediately.
20 :
21 : 2. ASYNCHRONOUS REQUEST HANDLING
22 : --------------------------------
23 : Used strictly for repair requests. These requests are sent to the
24 : sign tile, and the repair tile continues handling other operations
25 : without blocking. Once the sign tile has signed the request, the
26 : repair tile will complete the request from its pending sign request
27 : map and send the response.
28 :
29 : Message types handled asynchronously:
30 : - WINDOW_INDEX (exact shred): Requests for a specific shred at a
31 : known slot and index. Used when the repair tile knows exactly
32 : which shred is missing from a FEC set.
33 :
34 : - HIGHEST_WINDOW_INDEX: Requests for the highest shred in a slot.
35 : Used to determine the end boundary of a slot when the exact count
36 : is unknown.
37 :
38 : - ORPHAN: Requests for the highest shred in the parent slot of an
39 : orphaned slot. Used to establish the chain of slot ancestry when a
40 : slot's parent is missing.
41 :
42 : Async requests can be distributed across multiple sign tiles using
43 : round-robin selection with per-tile credit tracking. This provides load
44 : balancing and prevents any single sign tile from becoming a bottleneck. */
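/* Rough flow of an async repair request through this tile (orientation
   only; the functions below are authoritative):

     after_credit
       -> fd_repair_send_requests_async -> fd_repair_send_request_async
            -> fd_repair_insert_pending_request  (stash request by nonce)
            -> fd_repair_sign_and_send (is_async) -> repair_signer_async
                 (publish unsigned message to a repair_sign link)
     ... sign tile produces the signature ...
     after_frag (IN_KIND_SIGN)
       -> fd_repair_handle_sign_response  (match pending request by nonce,
            splice in the signature, send_packet, remove pending request) */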
45 :
46 : #define _GNU_SOURCE
47 :
48 : #include "../../disco/topo/fd_topo.h"
49 : #include "generated/fd_repair_tile_seccomp.h"
50 :
51 : #include "../tower/fd_tower_tile.h"
52 : #include "../../flamenco/repair/fd_repair.h"
53 : #include "../../flamenco/leaders/fd_leaders_base.h"
54 : #include "../../flamenco/gossip/fd_gossip_types.h"
55 : #include "../../disco/fd_disco.h"
56 : #include "../../disco/keyguard/fd_keyload.h"
57 : #include "../../disco/keyguard/fd_keyguard_client.h"
58 : #include "../../disco/keyguard/fd_keyguard.h"
59 : #include "../../disco/net/fd_net_tile.h"
60 : #include "../../disco/store/fd_store.h"
61 : #include "../../discof/restore/utils/fd_ssmsg.h"
62 : #include "../../ballet/sha256/fd_sha256.h"
63 : #include "../../util/pod/fd_pod_format.h"
64 : #include "../../util/net/fd_net_headers.h"
65 : #include "../../tango/fd_tango_base.h"
66 :
67 : #include "../forest/fd_forest.h"
68 : #include "../reasm/fd_reasm.h"
69 : #include "../../flamenco/repair/fd_catchup.h"
70 :
71 : #define LOGGING 1
72 : #define DEBUG_LOGGING 0
73 :
74 : #define IN_KIND_CONTACT (0)
75 0 : #define IN_KIND_NET (1)
76 0 : #define IN_KIND_TOWER (2)
77 0 : #define IN_KIND_SHRED (3)
78 0 : #define IN_KIND_SIGN (4)
79 0 : #define IN_KIND_SNAP (5)
80 0 : #define IN_KIND_STAKE (6)
81 0 : #define IN_KIND_GOSSIP (7)
82 0 : #define IN_KIND_GENESIS (8)
83 :
84 : #define MAX_IN_LINKS (16)
85 :
86 : #define MAX_REPAIR_PEERS 40200UL
87 : #define MAX_BUFFER_SIZE ( MAX_REPAIR_PEERS * sizeof( fd_shred_dest_wire_t ) )
88 : #define MAX_SHRED_TILE_CNT ( 16UL )
89 :
90 : typedef union {
91 : struct {
92 : fd_wksp_t * mem;
93 : ulong chunk0;
94 : ulong wmark;
95 : ulong mtu;
96 : };
97 : fd_net_rx_bounds_t net_rx;
98 : } fd_repair_in_ctx_t;
99 :
100 : struct fd_repair_out_ctx {
101 : ulong idx;
102 : fd_wksp_t * mem;
103 : ulong chunk0;
104 : ulong wmark;
105 : ulong chunk;
106 : ulong in_idx; /* Index of the incoming link */
107 : ulong credits; /* Available credits for this sign tile */
108 : ulong max_credits; /* Maximum credits (depth) */
109 : };
110 : typedef struct fd_repair_out_ctx fd_repair_out_ctx_t;
111 :
112 : struct fd_fec_sig {
113 : ulong key; /* map key. 32 msb = slot, 32 lsb = fec_set_idx */
114 : fd_ed25519_sig_t sig; /* Ed25519 sig identifier of the FEC. */
115 : };
116 : typedef struct fd_fec_sig fd_fec_sig_t;
117 :
118 : #define MAP_NAME fd_fec_sig
119 0 : #define MAP_T fd_fec_sig_t
120 : #define MAP_MEMOIZE 0
121 : #include "../../util/tmpl/fd_map_dynamic.c"
122 :
123 : struct fd_repair_tile_ctx {
124 : long tsprint; /* timestamp for printing */
125 : long tsrepair; /* timestamp for repair */
126 : long tsreset; /* timestamp for resetting iterator */
127 :
128 : fd_repair_t * repair;
129 : fd_repair_config_t repair_config;
130 :
131 : ulong repair_seed;
132 :
133 : fd_ip4_port_t repair_intake_addr;
134 : fd_ip4_port_t repair_serve_addr;
135 :
136 : ushort repair_intake_listen_port;
137 : ushort repair_serve_listen_port;
138 :
139 : fd_forest_t * forest;
140 : fd_fec_sig_t * fec_sigs;
141 : fd_reasm_t * reasm;
142 : fd_forest_iter_t repair_iter;
143 : fd_store_t * store;
144 :
145 : uchar identity_private_key[ 32 ];
146 : fd_pubkey_t identity_public_key;
147 :
148 : fd_wksp_t * wksp;
149 :
150 : fd_stem_context_t * stem;
151 :
152 : uchar in_kind[ MAX_IN_LINKS ];
153 : fd_repair_in_ctx_t in_links[ MAX_IN_LINKS ];
154 :
155 : int skip_frag;
156 :
157 : uint net_out_idx;
158 : fd_wksp_t * net_out_mem;
159 : ulong net_out_chunk0;
160 : ulong net_out_wmark;
161 : ulong net_out_chunk;
162 :
163 : ulong replay_out_idx;
164 : fd_wksp_t * replay_out_mem;
165 : ulong replay_out_chunk0;
166 : ulong replay_out_wmark;
167 : ulong replay_out_chunk;
168 :
169 : ulong snap_out_chunk;
170 :
171 : /* These will only be used if shredcap is enabled */
172 : uint shredcap_out_idx;
173 : uint shredcap_enabled;
174 : fd_wksp_t * shredcap_out_mem;
175 : ulong shredcap_out_chunk0;
176 : ulong shredcap_out_wmark;
177 : ulong shredcap_out_chunk;
178 :
179 : uint shred_tile_cnt;
180 : fd_repair_out_ctx_t shred_out_ctx[ MAX_SHRED_TILE_CNT ];
181 :
182 : /* ping_sign link (to sign tile 0) - used for keyguard client */
183 : ulong ping_sign_in_idx;
184 :
185 : ulong ping_sign_out_idx;
186 : fd_wksp_t * ping_sign_out_mem;
187 : ulong ping_sign_out_chunk0;
188 : ulong ping_sign_out_wmark;
189 : ulong ping_sign_out_chunk;
190 :
191 : /* repair_sign links (to sign tiles 1+) - for round-robin distribution */
192 : ulong repair_sign_cnt;
193 : fd_repair_out_ctx_t repair_sign_out_ctx[ MAX_SHRED_TILE_CNT ];
194 : ulong sign_repair_in_cnt;
195 : ulong sign_repair_in_idx[ MAX_SHRED_TILE_CNT ];
196 : ulong sign_repair_in_depth[ MAX_SHRED_TILE_CNT ];
197 :
198 : ulong round_robin_idx;
199 :
200 : /* Request sequence tracking for async signing */
201 : ulong request_seq;
202 :
203 : ushort net_id;
204 : /* Includes Ethernet, IP, UDP headers */
205 : uchar buffer[ MAX_BUFFER_SIZE ];
206 : fd_ip4_udp_hdrs_t intake_hdr[1];
207 : fd_ip4_udp_hdrs_t serve_hdr [1];
208 :
209 : fd_keyguard_client_t keyguard_client[1];
210 :
211 : ulong manifest_slot;
212 : ulong turbine_slot;
213 :
214 : struct {
215 : ulong recv_clnt_pkt;
216 : ulong recv_serv_pkt;
217 : ulong recv_serv_corrupt_pkt;
218 : ulong recv_serv_invalid_signature;
219 : ulong recv_serv_full_ping_table;
220 : ulong recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_CNT];
221 : ulong recv_pkt_corrupted_msg;
222 : ulong send_pkt_cnt;
223 : ulong sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_CNT];
224 : ulong repaired_slots;
225 : fd_histf_t store_link_wait[ 1 ];
226 : fd_histf_t store_link_work[ 1 ];
227 : fd_histf_t slot_compl_time[ 1 ];
228 : fd_histf_t response_latency[ 1 ];
229 : } metrics[ 1 ];
230 :
231 : /* Catchup metrics */
232 : fd_catchup_t * catchup;
233 :
234 : ulong turbine_slot0; // catchup considered complete after this slot
235 : };
236 : typedef struct fd_repair_tile_ctx fd_repair_tile_ctx_t;
237 :
238 : FD_FN_CONST static inline ulong
239 0 : scratch_align( void ) {
240 0 : return 128UL;
241 0 : }
242 :
243 : FD_FN_PURE static inline ulong
244 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
245 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
246 0 : }
247 :
248 : FD_FN_PURE static inline ulong
249 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
250 :
251 0 : ulong l = FD_LAYOUT_INIT;
252 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
253 0 : l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() );
254 0 : l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint ( tile->repair.slot_max ) );
255 0 : l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint ( 20 ) );
256 0 : l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint ( 1 << 20 ) );
257 0 : l = FD_LAYOUT_APPEND( l, fd_catchup_align(), fd_catchup_footprint() );
258 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint ( FD_REPAIR_SCRATCH_MAX ) );
259 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint ( FD_REPAIR_SCRATCH_DEPTH ) );
260 0 : return FD_LAYOUT_FINI( l, scratch_align() );
261 0 : }
262 :
263 :
264 : /* Wrapper for keyguard client sign */
265 : static void
266 : repair_signer( void * signer_ctx,
267 : uchar signature[ static 64 ],
268 : uchar const * buffer,
269 : ulong len,
270 0 : int sign_type ) {
271 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx;
272 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, buffer, len, sign_type );
273 0 : }
274 :
275 : /* Wrapper for publishing to the sign tile */
276 : static void
277 : repair_signer_async( void * signer_ctx,
278 : ulong nonce,
279 : uchar const * buffer,
280 : ulong len,
281 : int sign_type,
282 0 : fd_repair_out_ctx_t * sign_out) {
283 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx;
284 :
285 0 : uchar * dst = fd_chunk_to_laddr( sign_out->mem, sign_out->chunk );
286 0 : fd_memcpy( dst, buffer, len );
287 :
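  /* Pack the frag sig: the request nonce goes in the upper 32 bits and
     the keyguard sign type in the lower 32 bits (e.g. nonce 7 with sign
     type 1 packs to 0x0000000700000001).  fd_repair_handle_sign_response
     recovers the nonce with sig>>32. */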
288 0 : ulong sig = ((ulong)nonce << 32) | (ulong)(uint)sign_type;
289 0 : fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, len, 0UL, 0UL, 0UL );
290 0 : sign_out->chunk = fd_dcache_compact_next( sign_out->chunk, len, sign_out->chunk0, sign_out->wmark );
291 :
292 0 : ctx->request_seq = fd_seq_inc( ctx->request_seq, 1UL );
293 0 : }
294 :
295 : static void
296 : send_packet( fd_repair_tile_ctx_t * ctx,
297 : fd_stem_context_t * stem,
298 : int is_intake,
299 : uint dst_ip_addr,
300 : ushort dst_port,
301 : uint src_ip_addr,
302 : uchar const * payload,
303 : ulong payload_sz,
304 0 : ulong tsorig ) {
305 :
306 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
307 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
308 0 : *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr);
309 :
310 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
311 0 : ip4->saddr = src_ip_addr;
312 0 : ip4->daddr = dst_ip_addr;
313 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
314 0 : ip4->check = 0U;
315 0 : ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
316 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
317 :
318 0 : fd_udp_hdr_t * udp = hdr->udp;
319 0 : udp->net_dport = dst_port;
320 0 : udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
321 0 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
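  /* The UDP checksum is left zeroed: a zero checksum is permitted for
     UDP over IPv4 and indicates that no checksum was computed. */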
322 0 : hdr->udp->check = 0U;
323 :
324 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
325 0 : ulong sig = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
326 0 : ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
327 0 : ulong chunk = ctx->net_out_chunk;
328 0 : fd_stem_publish( stem, ctx->net_out_idx, sig, chunk, packet_sz, 0UL, tsorig, tspub );
329 0 : ctx->net_out_chunk = fd_dcache_compact_next( chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
330 0 : }
331 :
332 : ulong
333 : fd_repair_handle_ping( fd_repair_tile_ctx_t * repair_tile_ctx,
334 : fd_repair_t * glob,
335 : fd_gossip_ping_t const * ping,
336 : fd_ip4_port_t const * peer_addr FD_PARAM_UNUSED,
337 : uint self_ip4_addr FD_PARAM_UNUSED,
338 : uchar * msg_buf,
339 0 : ulong msg_buf_sz ) {
340 0 : fd_repair_protocol_t protocol;
341 0 : fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_pong);
342 0 : fd_gossip_ping_t * pong = &protocol.inner.pong;
343 :
344 0 : pong->from = *glob->public_key;
345 :
346 : /* Build the ping/pong pre-image from the received token */
347 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
348 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
349 0 : memcpy( pre_image+16UL, ping->token.uc, 32UL);
350 :
351 : /* Generate response hash token */
352 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );
353 :
354 : /* Sign it */
355 0 : repair_signer( repair_tile_ctx, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
356 :
357 0 : fd_bincode_encode_ctx_t ctx;
358 0 : ctx.data = msg_buf;
359 0 : ctx.dataend = msg_buf + msg_buf_sz;
360 0 : FD_TEST(0 == fd_repair_protocol_encode(&protocol, &ctx));
361 0 : ulong buflen = (ulong)((uchar*)ctx.data - msg_buf);
362 0 : return buflen;
363 0 : }
364 :
365 : /* Pass a raw client response packet into the protocol. src_addr is the address of the sender */
366 : static int
367 : fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t *repair_tile_ctx,
368 : fd_stem_context_t * stem,
369 : fd_repair_t * glob,
370 : uchar const * msg,
371 : ulong msglen,
372 : fd_ip4_port_t const * src_addr,
373 0 : uint dst_ip4_addr ) {
374 0 : repair_tile_ctx->metrics->recv_clnt_pkt++;
375 :
376 0 : FD_SCRATCH_SCOPE_BEGIN {
377 0 : while( 1 ) {
378 0 : ulong decoded_sz;
379 0 : fd_repair_response_t * gmsg = fd_bincode_decode1_scratch(
380 0 : repair_response, msg, msglen, NULL, &decoded_sz );
381 0 : if( FD_UNLIKELY( !gmsg ) ) {
382 : /* Solana falls back to assuming we got a shred in this case
383 : https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
384 0 : break;
385 0 : }
386 0 : if( FD_UNLIKELY( decoded_sz != msglen ) ) {
387 0 : break;
388 0 : }
389 :
390 0 : switch( gmsg->discriminant ) {
391 0 : case fd_repair_response_enum_ping:
392 0 : {
393 0 : uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE];
394 0 : ulong buflen = fd_repair_handle_ping( repair_tile_ctx, glob, &gmsg->inner.ping, src_addr, dst_ip4_addr, buf, sizeof(buf) );
395 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
396 0 : send_packet( repair_tile_ctx, stem, 1, src_addr->addr, src_addr->port, dst_ip4_addr, buf, buflen, tsorig );
397 0 : break;
398 0 : }
399 0 : }
400 :
401 0 : return 0;
402 0 : }
403 0 : } FD_SCRATCH_SCOPE_END;
404 0 : return 0;
405 0 : }
406 :
407 : /* Encodes a repair protocol message, signs it either synchronously or
408 : asynchronously, and prepares it for transmission.
410 :
411 : In synchronous mode (is_async == 0), the message is signed
412 : immediately using the keyguard client, and the signature is inserted
413 : into the message buffer before returning.
414 :
415 : In asynchronous mode (is_async != 0), the message is sent to the sign
416 : tile for signing, and the function returns after queuing the request.
417 : The actual sending will be completed once the signature is available.
418 : */
419 : static ulong
420 : fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx,
421 : fd_repair_protocol_t * protocol,
422 : fd_ip4_port_t * addr FD_PARAM_UNUSED,
423 : uchar * buf,
424 : ulong buflen,
425 : int is_async,
426 : ulong nonce,
427 0 : fd_repair_out_ctx_t * sign_out) {
428 :
429 0 : FD_TEST( buflen >= FD_REPAIR_MAX_SIGN_BUF_SIZE );
430 0 : fd_bincode_encode_ctx_t ctx = { .data = buf, .dataend = buf + buflen };
431 0 : if( FD_UNLIKELY( fd_repair_protocol_encode( protocol, &ctx ) != FD_BINCODE_SUCCESS ) ) {
432 0 : FD_LOG_CRIT(( "Failed to encode repair message (type %#x)", protocol->discriminant ));
433 0 : }
434 :
435 0 : buflen = (ulong)ctx.data - (ulong)buf;
436 0 : if( FD_UNLIKELY( buflen<68 ) ) {
437 0 : FD_LOG_CRIT(( "Attempted to sign unsigned repair message type (type %#x)", protocol->discriminant ));
438 0 : }
439 :
440 : /* At this point buffer contains
441 :
442 :      [ discriminant ] [ signature ] [ payload ]
443 :      ^                ^             ^
444 :      0                4             68 */
445 :
446 : /* https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1258 */
447 :
448 0 : fd_memcpy( buf+64, buf, 4 );
449 0 : buf += 64UL;
450 0 : buflen -= 64UL;
451 :
452 : /* Now it contains
453 :
454 :      [ discriminant ] [ payload ]
455 :      ^                ^
456 :      buf              buf+4 */
457 :
458 : /* If async, we send the signing request to the sign tile */
459 0 : if( FD_LIKELY( is_async ) ) {
460 0 : repair_signer_async( repair_tile_ctx, nonce, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519, sign_out);
461 0 : return buflen + 64UL;
462 : /* If sync, we sign using keyguard */
463 0 : } else {
464 0 : fd_signature_t sig;
465 0 : repair_signer( repair_tile_ctx, sig.uc, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 );
466 :
467 : /* Reintroduce the signature */
468 0 : buf -= 64UL;
469 0 : buflen += 64UL;
470 0 : fd_memcpy( buf + 4U, &sig, 64U );
471 :
472 0 : return buflen;
473 0 : }
474 0 : }
475 :
476 : /* Returns a sign_out context that has available credits.
477 : If no sign_out context has available credits, returns NULL. */
478 : static fd_repair_out_ctx_t *
479 0 : sign_avail_credits( fd_repair_tile_ctx_t * ctx ) {
480 0 : fd_repair_out_ctx_t * sign_out = NULL;
481 :
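  /* Scan at most repair_sign_cnt candidates starting at round_robin_idx.
     Credits are decremented when a request is published to a sign tile
     (fd_repair_send_request_async) and replenished when the signed
     response comes back (fd_repair_handle_sign_response). */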
482 0 : for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
483 0 : fd_repair_out_ctx_t * candidate = &ctx->repair_sign_out_ctx[ ctx->round_robin_idx ];
484 0 : ctx->round_robin_idx = (ctx->round_robin_idx + 1) % ctx->repair_sign_cnt;
485 0 : if( candidate->credits > 0 ) {
486 0 : sign_out = candidate;
487 0 : break;
488 0 : }
489 0 : }
490 :
491 0 : return sign_out;
492 0 : }
493 :
494 : static void
495 : fd_repair_send_initial_request( fd_repair_tile_ctx_t * ctx,
496 : fd_stem_context_t * stem,
497 : fd_repair_t * glob,
498 : fd_pubkey_t const * recipient,
499 0 : long now ) {
500 0 : fd_repair_protocol_t protocol;
501 0 : fd_repair_construct_request_protocol( glob, &protocol, fd_needed_window_index, 0, 0, recipient, 0, now );
502 0 : fd_active_elem_t * active = fd_active_table_query( glob->actives, recipient, NULL );
503 :
504 0 : ctx->metrics->sent_pkt_types[fd_needed_window_index]++;
505 :
506 0 : uchar buf[FD_REPAIR_MAX_SIGN_BUF_SIZE];
507 0 : ulong buflen = fd_repair_sign_and_send( ctx, &protocol, &active->addr, buf, sizeof(buf), 0, 1, NULL );
508 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
509 0 : uint src_ip4_addr = 0U; /* unknown */
510 0 : send_packet( ctx, stem, 1, active->addr.addr, active->addr.port, src_ip4_addr, buf, buflen, tsorig );
511 0 : }
512 :
513 : /* Sends a request asynchronously. If successful, adds it to the
514 : pending_sign_map and publishes to the sign tile. If not, the
515 : request is skipped for now and will be retried later by the forest
516 : iterator. */
517 : static void
518 : fd_repair_send_request_async( fd_repair_tile_ctx_t * ctx,
519 : fd_repair_t * glob,
520 : fd_repair_out_ctx_t * sign_out,
521 : enum fd_needed_elem_type type,
522 : ulong slot,
523 : uint shred_index,
524 : fd_pubkey_t const * recipient,
525 0 : long now ){
526 0 : fd_active_elem_t * peer = fd_active_table_query(glob->actives, recipient, NULL);
527 0 : if (!peer) FD_LOG_ERR(( "No active peer found for recipient %s", FD_BASE58_ENC_32_ALLOCA(recipient) ));
528 :
529 : /* Acquire and add a pending request from the pool */
530 0 : fd_repair_protocol_t protocol;
531 0 : fd_repair_pending_sign_req_t * pending = fd_repair_insert_pending_request( ctx->repair, &protocol, peer->addr.addr, peer->addr.port, type, slot, shred_index, now, recipient );
532 :
533 0 : if( FD_UNLIKELY( !pending ) ) {
534 0 : FD_LOG_WARNING(( "No free pending sign reqs" ));
535 0 : return;
536 0 : }
537 0 : ctx->metrics->sent_pkt_types[type]++;
538 : /* Sign and prepare the message directly into the pending buffer */
539 0 : pending->buflen = fd_repair_sign_and_send( ctx, &protocol, &peer->addr, pending->buf, sizeof(pending->buf), 1, pending->nonce, sign_out );
540 :
541 0 : sign_out->credits--;
542 0 : }
543 :
544 : static void
545 : fd_repair_send_requests_async( fd_repair_tile_ctx_t * ctx,
546 : fd_repair_out_ctx_t * sign_out,
547 : enum fd_needed_elem_type type,
548 : ulong slot,
549 : uint shred_index,
550 0 : long now ){
551 0 : fd_repair_t * glob = ctx->repair;
552 0 : fd_pubkey_t const * id = &glob->peers[ glob->peer_idx++ ].key;
553 0 : fd_repair_send_request_async( ctx, glob, sign_out, type, slot, shred_index, id, now );
554 0 : if( FD_UNLIKELY( glob->peer_idx >= glob->peer_cnt ) ) glob->peer_idx = 0;
555 0 : }
556 :
557 : static inline void
558 : handle_contact_info_update( fd_repair_tile_ctx_t * ctx,
559 0 : fd_gossip_update_message_t const * msg ) {
560 0 : fd_contact_info_t const * contact_info = msg->contact_info.contact_info;
561 0 : fd_ip4_port_t repair_peer = contact_info->sockets[ FD_CONTACT_INFO_SOCKET_SERVE_REPAIR ];
562 0 : if( FD_UNLIKELY( !repair_peer.addr || !repair_peer.port ) ) return;
563 0 : int dup = fd_repair_add_active_peer( ctx->repair, &repair_peer, &contact_info->pubkey );
564 0 : if( !dup ) {
565 : /* The repair process uses a Ping-Pong protocol that incurs one
566 : round-trip time (RTT) for the initial repair request. To optimize
567 : this, we proactively send a placeholder Repair request as soon as we
568 : receive a peer's contact information for the first time, effectively
569 : prepaying the RTT cost. */
570 0 : if( FD_LIKELY( ctx->repair_sign_cnt>0 ) ) {
571 0 : fd_repair_send_initial_request( ctx, ctx->stem, ctx->repair, &contact_info->pubkey, fd_log_wallclock() );
572 0 : }
573 0 : }
574 0 : }
575 :
576 : static inline void
577 : handle_contact_info_remove( fd_repair_tile_ctx_t * ctx FD_PARAM_UNUSED,
578 0 : fd_gossip_update_message_t const * msg FD_PARAM_UNUSED ) {
579 : /* TODO: implement me */
580 0 : }
581 :
582 : static inline int
583 : before_frag( fd_repair_tile_ctx_t * ctx,
584 : ulong in_idx,
585 : ulong seq FD_PARAM_UNUSED,
586 0 : ulong sig ) {
587 0 : uint in_kind = ctx->in_kind[ in_idx ];
588 0 : if( FD_LIKELY ( in_kind==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
589 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) return fd_int_if( fd_forest_root_slot( ctx->forest )==ULONG_MAX, -1, 0 ); /* not ready to read frag */
590 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
591 0 : return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
592 0 : sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
593 0 : }
594 0 : return 0;
595 0 : }
596 :
597 : static void
598 : during_frag( fd_repair_tile_ctx_t * ctx,
599 : ulong in_idx,
600 : ulong seq FD_PARAM_UNUSED,
601 : ulong sig FD_PARAM_UNUSED,
602 : ulong chunk,
603 : ulong sz,
604 0 : ulong ctl ) {
605 0 : ctx->skip_frag = 0;
606 :
607 0 : uchar const * dcache_entry;
608 0 : ulong dcache_entry_sz;
609 :
610 : // TODO: check for sz>MTU for failure once MTUs are decided
611 0 : uint in_kind = ctx->in_kind[ in_idx ];
612 0 : fd_repair_in_ctx_t const * in_ctx = &ctx->in_links[ in_idx ];
613 0 : if( FD_LIKELY( in_kind==IN_KIND_NET ) ) {
614 0 : dcache_entry = fd_net_rx_translate_frag( &in_ctx->net_rx, chunk, ctl, sz );
615 0 : dcache_entry_sz = sz;
616 :
617 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP || in_kind==IN_KIND_SHRED ) ) {
618 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
619 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
620 0 : }
621 :
622 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
623 0 : dcache_entry_sz = sz;
624 :
625 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
626 0 : fd_tower_slot_done_t const * msg = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
627 0 : if( FD_LIKELY( msg->new_root ) ) {
628 0 : fd_forest_publish( ctx->forest, msg->root_slot );
629 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
630 0 : fd_reasm_publish( ctx->reasm, &msg->root_block_id );
631 0 : return;
632 0 : }
633 0 : return;
634 :
635 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
636 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
637 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
638 0 : }
639 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
640 0 : fd_stake_weight_msg_t const * msg = fd_type_pun_const( dcache_entry );
641 0 : (void)msg;
642 : //fd_repair_set_stake_weights_init( ctx->repair, msg->weights, msg->staked_cnt );
643 0 : return;
644 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
645 :
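    /* Record the chunk carrying the snapshot manifest; it is only read
       once the FD_SSMSG_DONE frag arrives (see after_frag_snap). */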
646 0 : if( FD_UNLIKELY( ctx->in_kind[in_idx]!=IN_KIND_SNAP || fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) ctx->snap_out_chunk = chunk;
647 0 : return;
648 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS ) ) {
649 0 : return;
650 0 : } else if ( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
651 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
652 0 : dcache_entry_sz = sz;
653 0 : } else {
654 0 : FD_LOG_ERR(( "Frag from unknown link (kind=%u in_idx=%lu)", in_kind, in_idx ));
655 0 : }
656 :
657 0 : if( FD_LIKELY( dcache_entry_sz > 0 ) ) fd_memcpy( ctx->buffer, dcache_entry, dcache_entry_sz );
658 0 : }
659 :
660 : static inline void
661 : after_frag_snap( fd_repair_tile_ctx_t * ctx,
662 : ulong sig,
663 0 : uchar const * chunk ) {
664 0 : if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) return;
665 0 : fd_snapshot_manifest_t * manifest = (fd_snapshot_manifest_t *)chunk;
666 0 : fd_forest_init( ctx->forest, manifest->slot );
667 0 : FD_TEST( fd_forest_root_slot( ctx->forest )!=ULONG_MAX );
668 0 : fd_hash_t manifest_block_id = { .ul = { 0xf17eda2ce7b1d } }; /* FIXME manifest_block_id */
669 0 : fd_reasm_init( ctx->reasm, &manifest_block_id, manifest->slot );
670 0 : }
671 :
672 : static ulong FD_FN_UNUSED
673 : fd_repair_send_ping( fd_repair_tile_ctx_t * repair_tile_ctx,
674 : fd_repair_t * glob,
675 : fd_pinged_elem_t * val,
676 : uchar * buf,
677 0 : ulong buflen ) {
678 0 : fd_repair_response_t gmsg;
679 0 : fd_repair_response_new_disc( &gmsg, fd_repair_response_enum_ping );
680 0 : fd_gossip_ping_t * ping = &gmsg.inner.ping;
681 0 : ping->from = *glob->public_key;
682 0 :
683 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
684 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
685 0 : memcpy( pre_image+16UL, val->token.uc, 32UL );
686 0 :
687 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token );
688 0 :
689 0 : repair_signer( repair_tile_ctx, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
690 0 :
691 0 : fd_bincode_encode_ctx_t ctx;
692 0 : FD_TEST( buflen >= FD_REPAIR_MAX_SIGN_BUF_SIZE );
693 0 : ctx.data = buf;
694 0 : ctx.dataend = buf + buflen;
695 0 : FD_TEST(0 == fd_repair_response_encode(&gmsg, &ctx));
696 0 : return (ulong)((uchar*)ctx.data - buf);
697 0 : }
698 :
699 : static void FD_FN_UNUSED
700 0 : fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_ip4_port_t const * from) {
701 0 : fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL);
702 0 : if( val == NULL || !fd_pubkey_eq( &val->id, &pong->from ) )
703 0 : return;
704 0 :
705 0 : /* Verify response hash token */
706 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
707 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
708 0 : memcpy( pre_image+16UL, val->token.uc, 32UL );
709 0 :
710 0 : fd_hash_t pre_image_hash;
711 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc );
712 0 :
713 0 : fd_sha256_t sha[1];
714 0 : fd_sha256_init( sha );
715 0 : fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL );
716 0 : fd_sha256_append( sha, pre_image_hash.uc, 32UL );
717 0 : fd_hash_t golden;
718 0 : fd_sha256_fini( sha, golden.uc );
719 0 :
720 0 : fd_sha512_t sha2[1];
721 0 : if( fd_ed25519_verify( /* msg */ golden.uc,
722 0 : /* sz */ 32U,
723 0 : /* sig */ pong->signature.uc,
724 0 : /* public_key */ pong->from.uc,
725 0 : sha2 )) {
726 0 : FD_LOG_WARNING(("Failed sig verify for pong"));
727 0 : return;
728 0 : }
729 0 :
730 0 : val->good = 1;
731 0 : }
732 :
733 : static void
734 : fd_repair_handle_sign_response( fd_repair_tile_ctx_t * ctx,
735 : ulong in_idx,
736 : ulong sig,
737 0 : fd_stem_context_t * stem ) {
738 : /* Nonce was packed into sig, so we need to unpack it */
739 0 : ulong response_nonce = sig >> 32;
740 : /* Look up the pending request by nonce. Since the repair_sign links are
741 : reliable, the incoming sign_repair fragments represent a complete
742 : set of the previously sent outgoing messages. However, with
743 : multiple sign tiles, the responses may not arrive in order. */
744 :
745 : /* Find which sign tile sent this response and increment its credits */
746 0 : for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
747 0 : if( ctx->repair_sign_out_ctx[i].in_idx == in_idx ) {
748 0 : if( ctx->repair_sign_out_ctx[i].credits < ctx->repair_sign_out_ctx[i].max_credits ) {
749 0 : ctx->repair_sign_out_ctx[i].credits++;
750 0 : }
751 0 : break;
752 0 : }
753 0 : }
754 :
755 0 : fd_repair_pending_sign_req_t * pending = fd_repair_query_pending_request( ctx->repair, response_nonce );
756 0 : if( FD_LIKELY( pending ) ) {
757 0 : fd_memcpy( pending->buf + pending->sig_offset, ctx->buffer, 64UL );
758 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
759 0 : uint src_ip4_addr = 0U;
760 0 : ctx->metrics->send_pkt_cnt++;
761 0 : fd_active_elem_t * active = fd_active_table_query( ctx->repair->actives, &pending->recipient, NULL );
762 0 : if( FD_LIKELY( active ) ) {
763 0 : active->req_cnt++;
764 0 : active->last_req_ts = fd_tickcount();
765 0 : if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
766 0 : }
767 0 : send_packet( ctx, stem, 1, pending->dst_ip_addr, pending->dst_port, src_ip4_addr, pending->buf, pending->buflen, tsorig );
768 :
769 0 : fd_repair_remove_pending_request( ctx->repair, response_nonce );
770 0 : return;
771 0 : } else {
772 0 : FD_LOG_ERR(( "No pending request found for nonce %lu", response_nonce ));
773 0 : }
774 0 : }
775 :
776 : static void
777 : after_frag( fd_repair_tile_ctx_t * ctx,
778 : ulong in_idx,
779 : ulong seq FD_PARAM_UNUSED,
780 : ulong sig,
781 : ulong sz,
782 : ulong tsorig FD_PARAM_UNUSED,
783 : ulong tspub FD_PARAM_UNUSED,
784 0 : fd_stem_context_t * stem ) {
785 0 : if( FD_UNLIKELY( ctx->skip_frag ) ) return;
786 :
787 0 : ctx->stem = stem;
788 :
789 0 : uint in_kind = ctx->in_kind[ in_idx ];
790 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS ) ) {
791 0 : fd_hash_t manifest_block_id = { .ul = { 0xf17eda2ce7b1d } }; /* FIXME manifest_block_id */
792 0 : fd_reasm_init( ctx->reasm, &manifest_block_id, 0 );
793 0 : fd_forest_init( ctx->forest, 0 );
794 0 : return;
795 0 : }
796 :
797 0 : if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
798 0 : fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( ctx->buffer );
799 0 : if( FD_LIKELY( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) ){
800 0 : handle_contact_info_update( ctx, msg );
801 0 : } else {
802 : /* TODO: this needs to be implemented */
803 0 : handle_contact_info_remove( ctx, msg );
804 0 : }
805 0 : return;
806 0 : }
807 :
808 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
809 0 : fd_repair_handle_sign_response( ctx, in_idx, sig, stem );
810 0 : return;
811 0 : }
812 :
813 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) {
814 0 : int resolver_evicted = sz == 0;
815 0 : int fec_completes = sz == FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) + sizeof(fd_hash_t);
816 0 : if( FD_UNLIKELY( resolver_evicted ) ) {
817 0 : ulong spilled_slot = fd_disco_shred_repair_shred_sig_slot ( sig );
818 0 : uint spilled_fec_set_idx = fd_disco_shred_repair_shred_sig_fec_set_idx( sig );
819 0 : uint spilled_max_idx = fd_disco_shred_repair_shred_sig_data_cnt ( sig );
820 :
821 0 : fd_forest_fec_clear( ctx->forest, spilled_slot, spilled_fec_set_idx, spilled_max_idx );
822 0 : return;
823 0 : }
824 :
825 0 : fd_shred_t * shred = (fd_shred_t *)fd_type_pun( ctx->buffer );
826 0 : uint nonce = FD_LOAD(uint, ctx->buffer + fd_shred_header_sz( shred->variant ) );
827 0 : if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) {
828 0 : FD_LOG_INFO(( "shred %lu %u %u too old, ignoring", shred->slot, shred->idx, shred->fec_set_idx ));
829 0 : return;
830 0 : };
831 0 : # if LOGGING
832 0 : if( FD_UNLIKELY( shred->slot > ctx->turbine_slot ) ) {
833 0 : FD_LOG_INFO(( "\n\n[Turbine]\n"
834 0 : "slot: %lu\n"
835 0 : "root: %lu\n",
836 0 : shred->slot,
837 0 : fd_forest_root_slot( ctx->forest ) ));
838 0 : }
839 0 : # endif
840 0 : ctx->turbine_slot = fd_ulong_max( shred->slot, ctx->turbine_slot );
841 0 : if( FD_UNLIKELY( ctx->turbine_slot0 == ULONG_MAX ) ) {
842 0 : ctx->turbine_slot0 = shred->slot;
843 0 : fd_catchup_set_turbine_slot0( ctx->catchup, shred->slot );
844 0 : }
845 :
846 : /* TODO add automated caught-up test */
847 :
848 : /* Insert the shred sig (shared by all shred members in the FEC set)
849 : into the map. */
850 :
851 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
852 0 : if( FD_UNLIKELY( !fec_sig && !fec_completes ) ) {
853 0 : fec_sig = fd_fec_sig_insert( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx );
854 0 : memcpy( fec_sig->sig, shred->signature, sizeof(fd_ed25519_sig_t) );
855 0 : }
856 :
857 : /* When this is a FEC completes msg, it is implied that all the
858 : other shreds in the FEC set can also be inserted. Shred inserts
859 : into the forest are idempotent so it is fine to insert the same
860 : shred multiple times. */
861 :
862 0 : if( FD_UNLIKELY( fec_completes ) ) {
863 0 : fd_forest_blk_t * ele = fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
864 0 : fd_forest_fec_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, 0 );
865 0 : FD_TEST( ele ); /* must be non-empty */
866 :
867 0 : fd_hash_t const * merkle_root = (fd_hash_t const *)fd_type_pun_const( ctx->buffer + FD_SHRED_DATA_HEADER_SZ );
868 0 : fd_hash_t const * chained_merkle_root = (fd_hash_t const *)fd_type_pun_const( ctx->buffer + FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) );
869 :
870 0 : int data_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE );
871 0 : int slot_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE );
872 :
873 0 : FD_TEST( !fd_reasm_query( ctx->reasm, merkle_root ) );
874 0 : fd_hash_t const * cmr = chained_merkle_root;
875 0 : if( FD_UNLIKELY( shred->slot - shred->data.parent_off == fd_reasm_slot0( ctx->reasm ) && shred->fec_set_idx == 0) ) {
876 0 : cmr = &fd_reasm_root( ctx->reasm )->key;
877 0 : }
878 0 : FD_TEST( fd_reasm_insert( ctx->reasm, merkle_root, cmr, shred->slot, shred->fec_set_idx, shred->data.parent_off, (ushort)(shred->idx - shred->fec_set_idx + 1), data_complete, slot_complete ) );
879 :
880 0 : if( FD_UNLIKELY( ele->complete_idx != UINT_MAX && ele->buffered_idx == ele->complete_idx &&
881 0 : 0==memcmp( ele->cmpl, ele->fecs, sizeof(fd_forest_blk_idxs_t) * fd_forest_blk_idxs_word_cnt ) ) ) {
882 0 : long now = fd_tickcount();
883 0 : long start_ts = ele->first_req_ts == 0 || ele->slot > ctx->turbine_slot0 ? ele->first_shred_ts : ele->first_req_ts;
884 0 : fd_histf_sample( ctx->metrics->slot_compl_time, (ulong)(now - start_ts) );
885 0 : fd_catchup_add_slot( ctx->catchup, ele->slot, start_ts, now, ele->repair_cnt, ele->turbine_cnt );
886 0 : FD_LOG_INFO(( "slot is complete %lu. num_data_shreds: %u, num_repaired: %u, num_turbine: %u", ele->slot, ele->complete_idx + 1, ele->repair_cnt, ele->turbine_cnt ));
887 0 : }
888 0 : }
889 :
890 : /* Insert the shred into the forest. */
891 :
892 0 : int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) );
893 0 : int src = fd_disco_shred_repair_shred_sig_is_turbine( sig ) ? SHRED_SRC_TURBINE : SHRED_SRC_REPAIR;
894 0 : if( FD_LIKELY( !is_code ) ) {
895 0 : long rtt = 0;
896 0 : if( FD_UNLIKELY( ( rtt = fd_repair_inflight_remove( ctx->repair, shred->slot, shred->idx, nonce ) ) > 0 ) ) {
897 0 : fd_histf_sample( ctx->metrics->response_latency, (ulong)rtt );
898 0 : }
899 :
900 0 : int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
901 0 : fd_forest_blk_t * blk = fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
902 0 : fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, src );
903 :
904 : /* Check if there are FECs to force complete. Algorithm: window
905 : through the idxs in interval [i, j). If j = next fec_set_idx
906 : then we know we can force complete the FEC set interval [i, j)
907 : (assuming it wasn't already completed based on `cmpl`). */
908 :
909 0 : uint i = blk->consumed_idx + 1;
910 0 : for( uint j = i; j < blk->buffered_idx + 1; j++ ) {
911 0 : if( FD_UNLIKELY( fd_forest_blk_idxs_test( blk->fecs, j ) ) ) {
912 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | i, NULL );
913 0 : if( FD_LIKELY( fec_sig ) ) {
914 0 : ulong sig = fd_ulong_load_8( fec_sig->sig );
915 0 : ulong tile_idx = sig % ctx->shred_tile_cnt;
916 0 : uint last_idx = j - i;
917 :
918 0 : uchar * chunk = fd_chunk_to_laddr( ctx->shred_out_ctx[tile_idx].mem, ctx->shred_out_ctx[tile_idx].chunk );
919 0 : memcpy( chunk, fec_sig->sig, sizeof(fd_ed25519_sig_t) );
920 0 : fd_fec_sig_remove( ctx->fec_sigs, fec_sig );
921 0 : fd_stem_publish( stem, ctx->shred_out_ctx[tile_idx].idx, last_idx, ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), 0UL, 0UL, 0UL );
922 0 : ctx->shred_out_ctx[tile_idx].chunk = fd_dcache_compact_next( ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), ctx->shred_out_ctx[tile_idx].chunk0, ctx->shred_out_ctx[tile_idx].wmark );
923 0 : blk->consumed_idx = j;
924 0 : i = j + 1;
925 0 : }
926 0 : }
927 0 : }
928 0 : } else {
929 0 : fd_forest_code_shred_insert( ctx->forest, shred->slot, shred->idx );
930 0 : }
931 0 : return;
932 0 : }
933 :
934 0 : if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
935 : //fd_repair_set_stake_weights_fini( ctx->repair );
936 0 : return;
937 0 : }
938 :
939 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
940 0 : after_frag_snap( ctx, sig, fd_chunk_to_laddr( ctx->in_links[ in_idx ].mem, ctx->snap_out_chunk ) );
941 0 : return;
942 0 : }
943 :
944 0 : if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
945 0 : return;
946 0 : }
947 :
948 0 : fd_eth_hdr_t const * eth = (fd_eth_hdr_t const *)ctx->buffer;
949 0 : fd_ip4_hdr_t const * ip4 = (fd_ip4_hdr_t const *)( (ulong)eth + sizeof(fd_eth_hdr_t) );
950 0 : fd_udp_hdr_t const * udp = (fd_udp_hdr_t const *)( (ulong)ip4 + FD_IP4_GET_LEN( *ip4 ) );
951 0 : uchar * data = (uchar *)( (ulong)udp + sizeof(fd_udp_hdr_t) );
952 0 : if( FD_UNLIKELY( (ulong)udp+sizeof(fd_udp_hdr_t) > (ulong)eth+sz ) ) return;
953 0 : ulong udp_sz = fd_ushort_bswap( udp->net_len );
954 0 : if( FD_UNLIKELY( udp_sz<sizeof(fd_udp_hdr_t) ) ) return;
955 0 : ulong data_sz = udp_sz-sizeof(fd_udp_hdr_t);
956 0 : if( FD_UNLIKELY( (ulong)data+data_sz > (ulong)eth+sz ) ) return;
957 :
958 0 : fd_ip4_port_t peer_addr = { .addr=ip4->saddr, .port=udp->net_sport };
959 0 : ushort dport = udp->net_dport;
960 0 : if( ctx->repair_intake_addr.port == dport ) {
961 0 : fd_repair_recv_clnt_packet( ctx, stem, ctx->repair, data, data_sz, &peer_addr, ip4->daddr );
962 0 : } else if( ctx->repair_serve_addr.port == dport ) {
963 0 : } else {
964 0 : FD_LOG_WARNING(( "Unexpectedly received packet for port %u", (uint)fd_ushort_bswap( dport ) ));
965 0 : }
966 0 : }
967 :
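/* Cap on the number of repair requests issued per after_credit
   invocation; see the total_req accounting in after_credit below. */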
968 0 : #define MAX_REQ_PER_CREDIT 1
969 :
970 : static inline void
971 : after_credit( fd_repair_tile_ctx_t * ctx,
972 : fd_stem_context_t * stem,
973 : int * opt_poll_in,
974 0 : int * charge_busy ) {
975 :
976 0 : fd_forest_t * forest = ctx->forest;
977 0 : fd_forest_blk_t * pool = fd_forest_pool( forest );
978 0 : fd_forest_subtrees_t * subtrees = fd_forest_subtrees( forest );
979 :
980 0 : fd_reasm_fec_t * rfec = fd_reasm_next( ctx->reasm );
981 0 : if( FD_LIKELY( rfec ) ) {
982 :
983 0 : if( FD_LIKELY( ctx->store ) ) { /* some topologies don't run with store */
984 :
985 : /* Linking only requires a shared lock because the fields that are
986 : modified are only read on publish which uses exclusive lock. */
987 :
988 0 : long shacq_start, shacq_end, shrel_end;
989 :
990 0 : FD_STORE_SHARED_LOCK( ctx->store, shacq_start, shacq_end, shrel_end ) {
991 0 : if( FD_UNLIKELY( !fd_store_link( ctx->store, &rfec->key, &rfec->cmr ) ) ) FD_LOG_WARNING(( "failed to link %s %s. slot %lu fec_set_idx %u", FD_BASE58_ENC_32_ALLOCA( &rfec->key ), FD_BASE58_ENC_32_ALLOCA( &rfec->cmr ), rfec->slot, rfec->fec_set_idx ));
992 0 : } FD_STORE_SHARED_LOCK_END;
993 0 : fd_histf_sample( ctx->metrics->store_link_wait, (ulong)fd_long_max(shacq_end - shacq_start, 0) );
994 0 : fd_histf_sample( ctx->metrics->store_link_work, (ulong)fd_long_max(shrel_end - shacq_end, 0) );
995 0 : }
996 :
997 0 : ulong sig = rfec->slot << 32 | rfec->fec_set_idx;
998 0 : memcpy( fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk ), rfec, sizeof(fd_reasm_fec_t) );
999 0 : fd_stem_publish( stem, ctx->replay_out_idx, sig, ctx->replay_out_chunk, sizeof(fd_reasm_fec_t), 0, 0, fd_frag_meta_ts_comp( fd_tickcount() ) );
1000 0 : ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sizeof(fd_reasm_fec_t), ctx->replay_out_chunk0, ctx->replay_out_wmark );
1001 :
1002 : /* We might have more reassembled FEC sets to deliver to the
1003 : downstream consumer, so prioritize that over sending out repairs
1004 : (which would only increase the number of buffered FEC sets to send). */
1005 :
1006 : /* FIXME instead of draining the chainer, only skip the rest of
1007 : after_credit and after_frag when the chainer pool is full.
1008 : requires a refactor to the chainer and topology. */
1009 :
1010 0 : *opt_poll_in = 0; *charge_busy = 1; return;
1011 0 : }
1012 :
1013 0 : if( FD_UNLIKELY( ctx->forest->root==ULONG_MAX ) ) return;
1014 0 : if( FD_UNLIKELY( ctx->repair->peer_cnt==0 ) ) return; /* no peers to send requests to */
1015 :
1016 0 : *charge_busy = 1;
1017 :
1018 0 : long now = fd_log_wallclock();
1019 :
1020 : #if MAX_REQ_PER_CREDIT > FD_REPAIR_NUM_NEEDED_PEERS
1021 : /* If we issue more than one request per credit then we need to
1022 : throttle after_credit so that after_frag gets a chance to run. We could
1023 : get rid of this altogether considering max requests per credit is
1024 : 1 currently, but it could be useful for benchmarking purposes in the
1025 : future. */
1026 : if( FD_UNLIKELY( now - ctx->tsrepair < (long)20e6 ) ) {
1027 : return;
1028 : }
1029 : ctx->tsrepair = now;
1030 : #endif
1031 :
1032 : /* Verify that there is at least one sign tile with available credits.
1033 : If not, we can't send any requests and leave early. */
1034 0 : fd_repair_out_ctx_t * sign_out = sign_avail_credits( ctx );
1035 0 : if( FD_UNLIKELY( !sign_out ) ) {
1036 : // FD_LOG_NOTICE(( "No sign tiles have available credits" ));
1037 0 : return;
1038 0 : }
1039 :
1040 : /* Always request orphans first */
1041 0 : int total_req = 0;
1042 0 : for( fd_forest_subtrees_iter_t iter = fd_forest_subtrees_iter_init( subtrees, pool );
1043 0 : !fd_forest_subtrees_iter_done( iter, subtrees, pool );
1044 0 : iter = fd_forest_subtrees_iter_next( iter, subtrees, pool ) ) {
1045 0 : fd_forest_blk_t * orphan = fd_forest_subtrees_iter_ele( iter, subtrees, pool );
1046 0 : if( fd_repair_need_orphan( ctx->repair, orphan->slot ) ) {
1047 0 : fd_repair_send_requests_async( ctx, sign_out, fd_needed_orphan, orphan->slot, UINT_MAX, now );
1048 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
1049 0 : fd_repair_continue( ctx->repair );
1050 0 : return;
1051 0 : }
1052 0 : }
1053 :
1054 0 : if( FD_UNLIKELY( total_req >= MAX_REQ_PER_CREDIT ) ) {
1055 0 : fd_repair_continue( ctx->repair );
1056 0 : return; /* we have already sent enough requests */
1057 0 : }
1058 :
1059 : // Travel down frontier
1060 :
1061 : /* Every so often we'll need to reset the frontier iterator to the
1062 : head of frontier, because we could end up traversing down a very
1063 : long tree if we are far behind. */
1064 :
1065 0 : if( FD_UNLIKELY( now - ctx->tsreset > (long)100e6 ) ) {
1066 : // reset iterator to the beginning of the forest frontier
1067 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
1068 0 : ctx->tsreset = now;
1069 0 : }
1070 :
1071 : /* We are at the head of the turbine, so we should give turbine the
1072 : chance to complete the shreds. !ele handles an edge case where all
1073 : frontier elements are fully complete and the iter is done. */
1074 :
1075 0 : fd_forest_blk_t * ele = fd_forest_pool_ele( pool, ctx->repair_iter.ele_idx );
1076 0 : if( FD_LIKELY( !ele || ( ele->slot==ctx->turbine_slot && (now-ctx->tsreset)<(long)30e6 ) ) ) return;
1077 :
1078 0 : while( total_req < MAX_REQ_PER_CREDIT ){
1079 0 : ele = fd_forest_pool_ele( pool, ctx->repair_iter.ele_idx );
1080 :
1081 : // Request first, advance iterator second.
1082 0 : if( ctx->repair_iter.shred_idx == UINT_MAX && fd_repair_need_highest_window_index( ctx->repair, ele->slot, 0 ) ){
1083 0 : fd_repair_send_requests_async( ctx, sign_out, fd_needed_highest_window_index, ele->slot, 0, now );
1084 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
1085 0 : } else if( fd_repair_need_window_index( ctx->repair, ele->slot, ctx->repair_iter.shred_idx ) ) {
1086 0 : fd_repair_send_requests_async( ctx, sign_out, fd_needed_window_index, ele->slot, ctx->repair_iter.shred_idx, now );
1087 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
1088 0 : if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
1089 0 : }
1090 :
1091 0 : ctx->repair_iter = fd_forest_iter_next( ctx->repair_iter, forest );
1092 :
1093 0 : if( FD_UNLIKELY( fd_forest_iter_done( ctx->repair_iter, forest ) ) ) {
1094 : /* No more elements in the forest frontier, or the iterator got
1095 : invalidated, so we can start from the top again. */
1096 0 : ctx->repair_iter = fd_forest_iter_init( forest );
1097 0 : break;
1098 0 : }
1099 0 : }
1100 :
1101 0 : fd_repair_continue( ctx->repair );
1102 0 : }
1103 :
1104 : static inline void
1105 0 : during_housekeeping( fd_repair_tile_ctx_t * ctx ) {
1106 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
1107 :
1108 : # if DEBUG_LOGGING
1109 : long now = fd_log_wallclock();
1110 : if( FD_UNLIKELY( now - ctx->tsprint > (long)10e9 ) ) {
1111 : fd_forest_print( ctx->forest );
1112 : fd_reasm_print( ctx->reasm );
1113 : ctx->tsprint = fd_log_wallclock();
1114 : }
1115 : # endif
1116 0 : }
1117 :
1118 : static void
1119 : privileged_init( fd_topo_t * topo,
1120 0 : fd_topo_tile_t * tile ) {
1121 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1122 :
1123 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1124 0 : fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
1125 0 : fd_memset( ctx, 0, sizeof(fd_repair_tile_ctx_t) );
1126 :
1127 0 : uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 0 );
1128 0 : fd_memcpy( ctx->identity_private_key, identity_key, sizeof(fd_pubkey_t) );
1129 0 : fd_memcpy( ctx->identity_public_key.uc, identity_key + 32UL, sizeof(fd_pubkey_t) );
1130 :
1131 0 : ctx->repair_config.private_key = ctx->identity_private_key;
1132 0 : ctx->repair_config.public_key = &ctx->identity_public_key;
1133 :
1134 0 : FD_TEST( fd_rng_secure( &ctx->repair_seed, sizeof(ulong) ) );
1135 0 : }
1136 :
1137 : static void
1138 : unprivileged_init( fd_topo_t * topo,
1139 0 : fd_topo_tile_t * tile ) {
1140 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1141 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1142 0 : fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
1143 0 : ctx->tsprint = fd_log_wallclock();
1144 0 : ctx->tsrepair = fd_log_wallclock();
1145 0 : ctx->tsreset = fd_log_wallclock();
1146 :
1147 0 : if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" ));
1148 :
1149 0 : ctx->sign_repair_in_cnt = 0;
1150 0 : for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) {
1151 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ];
1152 0 : if( 0==strcmp( link->name, "net_repair" ) ) {
1153 0 : ctx->in_kind[ in_idx ] = IN_KIND_NET;
1154 0 : fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache );
1155 0 : continue;
1156 0 : } else if( 0==strcmp( link->name, "gossip_out" ) ) {
1157 0 : ctx->in_kind[ in_idx ] = IN_KIND_GOSSIP;
1158 0 : } else if( 0==strcmp( link->name, "tower_out" ) ) {
1159 0 : ctx->in_kind[ in_idx ] = IN_KIND_TOWER;
1160 0 : } else if( 0==strcmp( link->name, "shred_repair" ) ) {
1161 0 : ctx->in_kind[ in_idx ] = IN_KIND_SHRED;
1162 0 : } else if( 0==strcmp( link->name, "sign_repair" ) || 0==strcmp( link->name, "sign_ping" ) ) {
1163 0 : ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
1164 0 : if( 0==strcmp( link->name, "sign_ping" ) ) {
1165 0 : ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
1166 0 : ctx->ping_sign_in_idx = in_idx;
1167 0 : } else if( 0==strcmp( link->name, "sign_repair" ) ) {
1168 0 : ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
1169 0 : ctx->sign_repair_in_idx[ ctx->sign_repair_in_cnt ] = in_idx;
1170 0 : ctx->sign_repair_in_depth[ ctx->sign_repair_in_cnt ] = link->depth;
1171 0 : ctx->sign_repair_in_cnt++;
1172 0 : }
1173 0 : } else if( 0==strcmp( link->name, "snap_out" ) ) {
1174 0 : ctx->in_kind[ in_idx ] = IN_KIND_SNAP;
1175 0 : } else if( 0==strcmp( link->name, "replay_stake" ) ) {
1176 0 : ctx->in_kind[ in_idx ] = IN_KIND_STAKE;
1177 0 : } else if( 0==strcmp( link->name, "genesi_out" ) ) {
1178 0 : ctx->in_kind[ in_idx ] = IN_KIND_GENESIS;
1179 0 : } else {
1180 0 : FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
1181 0 : }
1182 :
1183 : // ulong i = fd_topo_find_tile_in_link( topo, tile, "snap_out", 0 );
1184 : // FD_LOG_ERR(( "snap_out link idx %lu", i ));
1185 :
1186 0 : ctx->in_links[ in_idx ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1187 0 : ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
1188 0 : ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
1189 0 : ctx->in_links[ in_idx ].mtu = link->mtu;
1190 :
1191 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
1192 0 : }
1193 :
1194 0 : uint net_link_out_idx = UINT_MAX;
1195 0 : ctx->ping_sign_out_idx = UINT_MAX;
1196 0 : ctx->repair_sign_cnt = 0;
1197 0 : ctx->request_seq = 0UL;
1198 0 : uint shred_tile_idx = 0;
1199 0 : uint sign_repair_match_cnt = 0;
1200 0 : ctx->round_robin_idx = 0UL;
1201 :
1202 0 : for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) {
1203 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ];
1204 :
1205 0 : if( 0==strcmp( link->name, "repair_net" ) ) {
1206 :
1207 0 : if( net_link_out_idx!=UINT_MAX ) continue; /* only use first net link */
1208 0 : net_link_out_idx = out_idx;
1209 0 : ctx->net_out_idx = out_idx;
1210 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1211 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, link->dcache );
1212 0 : ctx->net_out_wmark = fd_dcache_compact_wmark( ctx->net_out_mem, link->dcache, link->mtu );
1213 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
1214 0 : } else if( 0==strcmp( link->name, "repair_repla" ) ) {
1215 :
1216 0 : ctx->replay_out_idx = out_idx;
1217 0 : ctx->replay_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1218 0 : ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, link->dcache );
1219 0 : ctx->replay_out_wmark = fd_dcache_compact_wmark( ctx->replay_out_mem, link->dcache, link->mtu );
1220 0 : ctx->replay_out_chunk = ctx->replay_out_chunk0;
1221 :
1222 0 : } else if( 0==strcmp( link->name, "repair_shred" ) ) {
1223 :
1224 0 : fd_repair_out_ctx_t * shred_out = &ctx->shred_out_ctx[ shred_tile_idx++ ];
1225 0 : shred_out->idx = out_idx;
1226 0 : shred_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1227 0 : shred_out->chunk0 = fd_dcache_compact_chunk0( shred_out->mem, link->dcache );
1228 0 : shred_out->wmark = fd_dcache_compact_wmark( shred_out->mem, link->dcache, link->mtu );
1229 0 : shred_out->chunk = shred_out->chunk0;
1230 :
1231 0 : } else if( 0==strcmp( link->name, "repair_scap" ) ) {
1232 :
1233 0 : ctx->shredcap_enabled = 1;
1234 0 : ctx->shredcap_out_idx = out_idx;
1235 0 : ctx->shredcap_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1236 0 : ctx->shredcap_out_chunk0 = fd_dcache_compact_chunk0( ctx->shredcap_out_mem, link->dcache );
1237 0 : ctx->shredcap_out_wmark = fd_dcache_compact_wmark( ctx->shredcap_out_mem, link->dcache, link->mtu );
1238 0 : ctx->shredcap_out_chunk = ctx->shredcap_out_chunk0;
1239 :
1240 0 : } else if( 0==strcmp( link->name, "ping_sign" ) ) {
1241 0 : ctx->ping_sign_out_idx = out_idx;
1242 0 : ctx->ping_sign_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1243 0 : ctx->ping_sign_out_chunk0 = fd_dcache_compact_chunk0( ctx->ping_sign_out_mem, link->dcache );
1244 0 : ctx->ping_sign_out_wmark = fd_dcache_compact_wmark( ctx->ping_sign_out_mem, link->dcache, link->mtu );
1245 0 : ctx->ping_sign_out_chunk = ctx->ping_sign_out_chunk0;
1246 :
1247 0 : } else if( 0==strcmp( link->name, "repair_sign" ) ) {
1248 0 : fd_repair_out_ctx_t * repair_sign_out = &ctx->repair_sign_out_ctx[ ctx->repair_sign_cnt++ ];
1249 0 : repair_sign_out->idx = out_idx;
1250 0 : repair_sign_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1251 0 : repair_sign_out->chunk0 = fd_dcache_compact_chunk0( repair_sign_out->mem, link->dcache );
1252 0 : repair_sign_out->wmark = fd_dcache_compact_wmark( repair_sign_out->mem, link->dcache, link->mtu );
1253 0 : repair_sign_out->chunk = repair_sign_out->chunk0;
1254 0 : repair_sign_out->in_idx = ctx->sign_repair_in_idx[ sign_repair_match_cnt ];
1255 0 : repair_sign_out->max_credits = ctx->sign_repair_in_depth[ sign_repair_match_cnt ];
1256 0 : repair_sign_out->credits = ctx->sign_repair_in_depth[ sign_repair_match_cnt ];
1257 0 : sign_repair_match_cnt++;
1258 :
1259 0 : } else {
1260 0 : FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name ));
1261 0 : }
1262 :
1263 0 : }
1264 0 : if( FD_UNLIKELY( ctx->ping_sign_out_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing ping_sign link for keyguard client" ));
1265 0 : if( FD_UNLIKELY( net_link_out_idx ==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_net link" ));
1266 0 : if( FD_UNLIKELY( ctx->repair_sign_cnt != ctx->sign_repair_in_cnt ) ) {
1267 0 : FD_LOG_ERR(( "Mismatch between repair_sign output links (%lu) and sign_repair input links (%lu)",
1268 0 : ctx->repair_sign_cnt, ctx->sign_repair_in_cnt ));
1269 0 : }
1270 :
1271 0 : ctx->shred_tile_cnt = shred_tile_idx;
1272 0 : FD_TEST( ctx->shred_tile_cnt == fd_topo_tile_name_cnt( topo, "shred" ) );
1273 :
1274 : /* Scratch mem setup */
1275 :
1276 0 : ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );
1277 0 : ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) );
1278 0 : ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) );
1279 0 : ctx->reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 1 << 20 ) );
1280 0 : ctx->catchup = FD_SCRATCH_ALLOC_APPEND( l, fd_catchup_align(), fd_catchup_footprint() );
1281 :
1282 0 : ctx->store = NULL;
1283 0 : ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" );
1284 0 : if( FD_LIKELY( store_obj_id!=ULONG_MAX ) ) { /* firedancer-only */
1285 0 : ctx->store = fd_store_join( fd_topo_obj_laddr( topo, store_obj_id ) );
1286 0 : FD_TEST( ctx->store->magic == FD_STORE_MAGIC );
1287 0 : }
1288 :
1289 0 : void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
1290 0 : void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
1291 :
1292 0 : FD_TEST( ( !!smem ) & ( !!fmem ) );
1293 0 : fd_scratch_attach( smem, fmem, FD_REPAIR_SCRATCH_MAX, FD_REPAIR_SCRATCH_DEPTH );
1294 :
1295 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
1296 :
1297 0 : ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
1298 0 : ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );
1299 :
1300 0 : ctx->repair_intake_listen_port = tile->repair.repair_intake_listen_port;
1301 0 : ctx->repair_serve_listen_port = tile->repair.repair_serve_listen_port;
1302 :
1303 0 : ctx->net_id = (ushort)0;
1304 :
1305 0 : fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_intake_listen_port );
1306 0 : fd_ip4_udp_hdr_init( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_serve_listen_port );
1307 :
1308 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ ctx->ping_sign_in_idx ] ];
1309 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ ctx->ping_sign_out_idx ] ];
1310 0 : if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
1311 0 : sign_out->mcache,
1312 0 : sign_out->dcache,
1313 0 : sign_in->mcache,
1314 0 : sign_in->dcache,
1315 0 : sign_out->mtu ) ) == NULL ) {
1316 0 : FD_LOG_ERR(( "Keyguard join failed" ));
1317 0 : }
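/* [Added sketch, not in the original source] The keyguard client
   joined above is what the synchronous path uses to sign small
   protocol messages (pings/pongs) in place.  A hedged usage example
   follows; the exact fd_keyguard_client_sign argument list, the
   FD_KEYGUARD_SIGN_TYPE_ED25519 constant, and the ping_buf names are
   assumptions for illustration:

     uchar sig[ 64UL ];
     fd_keyguard_client_sign( ctx->keyguard_client, sig,
                              ping_buf, ping_buf_sz,
                              FD_KEYGUARD_SIGN_TYPE_ED25519 );

   The call waits for the sign tile's response before returning, so it
   is reserved for lightweight messages where briefly blocking the
   repair tile is acceptable. */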
1318 :
1319 : /* Repair set up */
1320 :
1321 0 : ctx->repair = fd_repair_join ( fd_repair_new ( ctx->repair, ctx->repair_seed ) );
1322 0 : ctx->forest = fd_forest_join ( fd_forest_new ( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) );
1323 0 : ctx->fec_sigs = fd_fec_sig_join( fd_fec_sig_new( ctx->fec_sigs, 20 ) );
1324 0 : ctx->reasm = fd_reasm_join ( fd_reasm_new ( ctx->reasm, 1 << 20, 0 ) );
1325 0 : ctx->catchup = fd_catchup_join( fd_catchup_new( ctx->catchup ) );
1326 :
1327 0 : ctx->repair->next_nonce = 1;
1328 :
1329 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
1330 0 : FD_TEST( fd_forest_iter_done( ctx->repair_iter, ctx->forest ) );
1331 :
1332 0 : ctx->turbine_slot = 0;
1333 0 : ctx->turbine_slot0 = ULONG_MAX;
1334 :
1335 0 : FD_LOG_INFO(( "repair tile addrs - intake addr: " FD_IP4_ADDR_FMT ":%u, serve addr: " FD_IP4_ADDR_FMT ":%u",
1336 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
1337 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));
1338 :
1339 0 : if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) {
1340 0 : FD_LOG_ERR( ( "error setting repair config" ) );
1341 0 : }
1342 :
1343 0 : fd_repair_update_addr( ctx->repair, &ctx->repair_intake_addr, &ctx->repair_serve_addr );
1344 :
1345 0 : fd_histf_join( fd_histf_new( ctx->metrics->store_link_wait, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WAIT ),
1346 0 : FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WAIT ) ) );
1347 0 : fd_histf_join( fd_histf_new( ctx->metrics->store_link_work, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WORK ),
1348 0 : FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WORK ) ) );
1349 0 : fd_histf_join( fd_histf_new( ctx->metrics->slot_compl_time, FD_MHIST_SECONDS_MIN( REPAIR, SLOT_COMPLETE_TIME ),
1350 0 : FD_MHIST_SECONDS_MAX( REPAIR, SLOT_COMPLETE_TIME ) ) );
1351 0 : fd_histf_join( fd_histf_new( ctx->metrics->response_latency, FD_MHIST_MIN( REPAIR, RESPONSE_LATENCY ),
1352 0 : FD_MHIST_MAX( REPAIR, RESPONSE_LATENCY ) ) );
1353 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
1354 0 : fd_repair_start( ctx->repair );
1355 :
1356 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
1357 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
1358 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
1359 0 : }
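/* [Added sketch, not in the original source] The repair_sign out
   contexts initialized in unprivileged_init carry per-link credit
   counters (credits / max_credits) so that asynchronous sign requests
   can be spread across multiple sign tiles.  A minimal, credit-aware,
   nonce-based selection might look like the helper below; the function
   name and its behavior are assumptions for illustration only and do
   not correspond to code elsewhere in this file. */

static fd_repair_out_ctx_t *
pick_repair_sign_link( fd_repair_tile_ctx_t * ctx,
                       ulong                  nonce ) {
  if( FD_UNLIKELY( !ctx->repair_sign_cnt ) ) return NULL;
  ulong start = nonce % ctx->repair_sign_cnt; /* round-robin starting point derived from the request nonce */
  for( ulong i=0UL; i<ctx->repair_sign_cnt; i++ ) {
    fd_repair_out_ctx_t * out = &ctx->repair_sign_out_ctx[ (start+i) % ctx->repair_sign_cnt ];
    if( FD_LIKELY( out->credits ) ) { out->credits--; return out; } /* consume one flow-control credit */
  }
  return NULL; /* all sign links saturated; caller retries later */
}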
1360 :
1361 : static ulong
1362 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
1363 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
1364 : ulong out_cnt,
1365 0 : struct sock_filter * out ) {
1366 0 : populate_sock_filter_policy_fd_repair_tile(
1367 0 : out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)-1 );
1368 0 : return sock_filter_policy_fd_repair_tile_instr_cnt;
1369 0 : }
1370 :
1371 : static ulong
1372 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
1373 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
1374 : ulong out_fds_cnt,
1375 0 : int * out_fds ) {
1376 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1377 :
1378 0 : ulong out_cnt = 0UL;
1379 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1380 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1381 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1382 0 : return out_cnt;
1383 0 : }
1384 :
1385 : static inline void
1386 0 : metrics_write( fd_repair_tile_ctx_t * ctx ) {
1387 : /* Repair-protocol-specific metrics */
1388 0 : FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, ctx->metrics->recv_clnt_pkt );
1389 0 : FD_MCNT_SET( REPAIR, RECV_SERV_PKT, ctx->metrics->recv_serv_pkt );
1390 0 : FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, ctx->metrics->recv_serv_corrupt_pkt );
1391 0 : FD_MCNT_SET( REPAIR, RECV_SERV_INVALID_SIGNATURE, ctx->metrics->recv_serv_invalid_signature );
1392 0 : FD_MCNT_SET( REPAIR, RECV_SERV_FULL_PING_TABLE, ctx->metrics->recv_serv_full_ping_table );
1393 0 : FD_MCNT_SET( REPAIR, RECV_PKT_CORRUPTED_MSG, ctx->metrics->recv_pkt_corrupted_msg );
1394 0 : FD_MCNT_SET( REPAIR, REQUEST_PEERS, ctx->repair->peer_cnt );
1395 :
1396 0 : FD_MCNT_SET ( REPAIR, SHRED_REPAIR_REQ, ctx->metrics->send_pkt_cnt );
1397 0 : FD_MCNT_ENUM_COPY( REPAIR, RECV_SERV_PKT_TYPES, ctx->metrics->recv_serv_pkt_types );
1398 0 : FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, ctx->metrics->sent_pkt_types );
1399 :
1400 0 : FD_MHIST_COPY( REPAIR, STORE_LINK_WAIT, ctx->metrics->store_link_wait );
1401 0 : FD_MHIST_COPY( REPAIR, STORE_LINK_WORK, ctx->metrics->store_link_work );
1402 0 : FD_MHIST_COPY( REPAIR, SLOT_COMPLETE_TIME, ctx->metrics->slot_compl_time );
1403 0 : FD_MHIST_COPY( REPAIR, RESPONSE_LATENCY, ctx->metrics->response_latency );
1404 :
1405 0 : ulong max_repaired_slot = 0;
1406 0 : fd_forest_consumed_t const * consumed = fd_forest_consumed_const( ctx->forest );
1407 0 : fd_forest_cns_t const * conspool = fd_forest_conspool_const( ctx->forest );
1408 0 : fd_forest_blk_t const * pool = fd_forest_pool_const( ctx->forest );
1409 0 : for( fd_forest_consumed_iter_t iter = fd_forest_consumed_iter_init( consumed, conspool );
1410 0 : !fd_forest_consumed_iter_done( iter, consumed, conspool );
1411 0 : iter = fd_forest_consumed_iter_next( iter, consumed, conspool ) ) {
1412 0 : fd_forest_cns_t const * ele = fd_forest_consumed_iter_ele_const( iter, consumed, conspool );
1413 0 : fd_forest_blk_t const * ele_ = fd_forest_pool_ele_const( pool, ele->forest_pool_idx );
1414 0 : if( ele_->slot > max_repaired_slot ) max_repaired_slot = ele_->slot;
1415 0 : }
1416 0 : FD_MCNT_SET( REPAIR, REPAIRED_SLOTS, max_repaired_slot );
1417 0 : }
1418 :
1419 : /* TODO: This burst value is not correct; it is a temporary bound that
1420 : will be fixed when the new store is implemented, allowing the burst to
1421 : be increased. The burst should be bounded by the number of
1422 : stem_publishes that can occur in a single frag loop. */
1423 0 : #define STEM_BURST (64UL)
1424 :
1425 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_repair_tile_ctx_t
1426 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_repair_tile_ctx_t)
1427 :
1428 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
1429 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
1430 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
1431 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
1432 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
1433 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1434 :
1435 : #include "../../disco/stem/fd_stem.c"
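/* [Added note, not in the original source] The include above
   instantiates the generic stem run loop with the callbacks and
   STEM_BURST configured by the macros preceding it; the resulting
   stem_run symbol is what fd_tile_repair.run points at below. */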
1436 :
1437 : fd_topo_run_tile_t fd_tile_repair = {
1438 : .name = "repair",
1439 : .loose_footprint = loose_footprint,
1440 : .populate_allowed_seccomp = populate_allowed_seccomp,
1441 : .populate_allowed_fds = populate_allowed_fds,
1442 : .scratch_align = scratch_align,
1443 : .scratch_footprint = scratch_footprint,
1444 : .unprivileged_init = unprivileged_init,
1445 : .privileged_init = privileged_init,
1446 : .run = stem_run,
1447 : };