Line data Source code
/* Repair tile runs the repair protocol for a Firedancer node. */

/* _GNU_SOURCE must be defined before the first header is included to
   take effect; previously it appeared after an include. */
#define _GNU_SOURCE

#include "../../disco/topo/fd_topo.h"
#include "generated/fd_repair_tile_seccomp.h"

#include "../../flamenco/repair/fd_repair.h"
#include "../../flamenco/leaders/fd_leaders_base.h"
#include "../../disco/fd_disco.h"
#include "../../disco/keyguard/fd_keyload.h"
#include "../../disco/keyguard/fd_keyguard_client.h"
#include "../../disco/keyguard/fd_keyguard.h"
#include "../../disco/net/fd_net_tile.h"
#include "../../disco/store/fd_store.h"
#include "../../util/pod/fd_pod_format.h"
#include "../../util/net/fd_net_headers.h"

#include "../forest/fd_forest.h"
#include "fd_fec_chainer.h"
21 :
22 0 : #define IN_KIND_NET (0)
23 0 : #define IN_KIND_CONTACT (1)
24 0 : #define IN_KIND_STAKE (2)
25 0 : #define IN_KIND_SHRED (3)
26 0 : #define IN_KIND_SIGN (4)
27 : #define MAX_IN_LINKS (16)
28 :
29 : #define NET_OUT_IDX (0)
30 : #define SIGN_OUT_IDX (1)
31 0 : #define REPLAY_OUT_IDX (2)
32 : #define ARCHIVE_OUT_IDX (3)
33 :
34 : #define MAX_REPAIR_PEERS 40200UL
35 : #define MAX_BUFFER_SIZE ( MAX_REPAIR_PEERS * sizeof(fd_shred_dest_wire_t))
36 : #define MAX_SHRED_TILE_CNT (16UL)
37 :
/* fd_repair_in_ctx_t describes one inbound link.  For dcache-backed
   links the anonymous struct carries the frag translation window
   (wksp, chunk bounds, mtu); for net links the same storage is
   reinterpreted as the fd_net_rx_bounds_t consumed by
   fd_net_rx_translate_frag (see during_frag). */
typedef union {
  struct {
    fd_wksp_t * mem;    /* workspace backing the dcache */
    ulong       chunk0; /* first valid chunk */
    ulong       wmark;  /* last valid chunk */
    ulong       mtu;    /* max frag payload size */
  };
  fd_net_rx_bounds_t net_rx; /* used when in_kind==IN_KIND_NET */
} fd_repair_in_ctx_t;
47 :
/* fd_repair_out_ctx_t describes one outbound link: the stem output
   index plus the dcache chunk window advanced on each publish. */
struct fd_repair_out_ctx {
  ulong       idx;    /* stem out-link index */
  fd_wksp_t * mem;    /* workspace backing the dcache */
  ulong       chunk0; /* first valid chunk */
  ulong       wmark;  /* last valid chunk */
  ulong       chunk;  /* next chunk to write */
};
typedef struct fd_repair_out_ctx fd_repair_out_ctx_t;
56 :
/* fd_fec_sig_t maps a (slot, fec_set_idx) pair to the Ed25519
   signature shared by every shred in that FEC set.  Used in
   after_frag to route FEC force-complete messages to the shred tile
   that owns the set (tile selection hashes on the signature). */
struct fd_fec_sig {
  ulong            key; /* map key. 32 msb = slot, 32 lsb = fec_set_idx */
  fd_ed25519_sig_t sig; /* Ed25519 sig identifier of the FEC. */
};
typedef struct fd_fec_sig fd_fec_sig_t;

#define MAP_NAME    fd_fec_sig
#define MAP_T       fd_fec_sig_t
#define MAP_MEMOIZE 0
#include "../../util/tmpl/fd_map_dynamic.c"
67 :
/* fd_reasm_t is a per-slot map entry keyed by slot.
   NOTE(review): cnt is presumably a reassembly counter for the slot —
   the field is never read or written in this translation unit;
   confirm against users of ctx->reasm. */
struct fd_reasm {
  ulong slot; /* map key */
  uint  cnt;
};
typedef struct fd_reasm fd_reasm_t;

#define MAP_NAME    fd_reasm
#define MAP_T       fd_reasm_t
#define MAP_KEY     slot
#define MAP_MEMOIZE 0
#include "../../util/tmpl/fd_map_dynamic.c"
79 :
/* fd_repair_tile_ctx_t is the complete per-tile state of the repair
   tile: rate-limit timers, the repair protocol instance, block
   reassembly state (forest, FEC sig map, FEC chainer), the shared
   store, inbound/outbound link windows and packet staging buffers. */
struct fd_repair_tile_ctx {
  long tsprint;  /* timestamp for printing */
  long tsrepair; /* timestamp for repair */
  long tsreset;  /* timestamp for resetting iterator */

  ulong * wmark;      /* fseq holding the current root watermark; drives forest/chainer init+publish (see after_frag) */
  ulong   prev_wmark; /* last watermark value already applied */

  fd_repair_t *      repair; /* repair protocol state */
  fd_repair_config_t repair_config;

  ulong repair_seed;

  fd_repair_peer_addr_t repair_intake_addr; /* addr for repair responses (intake) */
  fd_repair_peer_addr_t repair_serve_addr;  /* addr for serving repair requests */

  ushort repair_intake_listen_port;
  ushort repair_serve_listen_port;

  fd_forest_t *      forest;      /* tree of observed slots / shred indices */
  fd_fec_sig_t *     fec_sigs;    /* (slot, fec_set_idx) -> shred signature map */
  fd_reasm_t *       reasm;       /* per-slot map (not referenced in this file) */
  fd_fec_chainer_t * fec_chainer; /* chains completed FEC sets across slots */
  fd_forest_iter_t   repair_iter; /* frontier iterator used to schedule requests */
  fd_store_t *       store;       /* shared FEC payload store (shared/exclusive locked) */

  ulong * turbine_slot0; /* fseq: slot of the first shred seen from the shred link */
  ulong * turbine_slot;  /* fseq: highest slot seen from the shred link */

  uchar       identity_private_key[ 32 ];
  fd_pubkey_t identity_public_key;

  fd_wksp_t * wksp;

  uchar              in_kind [ MAX_IN_LINKS ]; /* IN_KIND_* per inbound link */
  fd_repair_in_ctx_t in_links[ MAX_IN_LINKS ];

  int skip_frag; /* cleared in during_frag; when set, after_frag drops the frag */

  uint        net_out_idx;
  fd_wksp_t * net_out_mem;
  ulong       net_out_chunk0;
  ulong       net_out_wmark;
  ulong       net_out_chunk;

  fd_wksp_t * replay_out_mem;
  ulong       replay_out_chunk0;
  ulong       replay_out_wmark;
  ulong       replay_out_chunk;

  /* These will only be used if shredcap is enabled */
  uint        shredcap_out_idx;
  uint        shredcap_enabled;
  fd_wksp_t * shredcap_out_mem;
  ulong       shredcap_out_chunk0;
  ulong       shredcap_out_wmark;
  ulong       shredcap_out_chunk;

  uint                shred_tile_cnt;
  fd_repair_out_ctx_t shred_out_ctx[ MAX_SHRED_TILE_CNT ];

  ushort net_id; /* rolling IP4 identification counter for outgoing packets */
  /* Includes Ethernet, IP, UDP headers */
  uchar buffer[ MAX_BUFFER_SIZE ]; /* staging copy of the current frag */
  fd_ip4_udp_hdrs_t intake_hdr[1]; /* header template for intake traffic */
  fd_ip4_udp_hdrs_t serve_hdr [1]; /* header template for serve traffic */

  fd_keyguard_client_t keyguard_client[1]; /* remote signer for repair messages */
};
typedef struct fd_repair_tile_ctx fd_repair_tile_ctx_t;
149 :
/* scratch_align returns the alignment, in bytes, required for this
   tile's scratch region. */
FD_FN_CONST static inline ulong
scratch_align( void ) {
  return 128UL;
}
154 :
/* loose_footprint returns the additional loosely-allocated workspace
   this tile reserves (one gigantic page) on top of the tight layout
   computed by scratch_footprint. */
FD_FN_PURE static inline ulong
loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
  return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
}
159 :
/* scratch_footprint returns the total size, in bytes, of the tile's
   scratch region: the tile ctx followed by the repair, forest,
   FEC-sig map, reasm map, FEC chainer and fd_scratch regions, laid
   out via FD_LAYOUT.  The map footprint parameter (20) and chainer
   capacity (1<<20) are hard-coded — see the inline TODO. */
FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile ) {

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
  l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() );
  l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) );
  l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) );
  l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 20 ) );
  // l = FD_LAYOUT_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) );
  l = FD_LAYOUT_APPEND( l, fd_fec_chainer_align(), fd_fec_chainer_footprint( 1 << 20 ) ); // TODO: fix this
  l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
175 :
176 : static void
177 : repair_signer( void * signer_ctx,
178 : uchar signature[ static 64 ],
179 : uchar const * buffer,
180 : ulong len,
181 0 : int sign_type ) {
182 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx;
183 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, buffer, len, sign_type );
184 0 : }
185 :
/* send_packet assembles an Ethernet/IP4/UDP frame around payload and
   publishes it on the net out link.  is_intake selects which
   pre-computed header template to clone (intake vs serve).  The IP
   header checksum is recomputed after patching addresses/lengths; the
   UDP checksum is set to 0, which for UDP over IPv4 means "no
   checksum". */
static void
send_packet( fd_repair_tile_ctx_t * ctx,
             fd_stem_context_t *    stem,
             int                    is_intake,
             uint                   dst_ip_addr,
             ushort                 dst_port,
             uint                   src_ip_addr,
             uchar const *          payload,
             ulong                  payload_sz,
             ulong                  tsorig ) {
  uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
  fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
  *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr);

  fd_ip4_hdr_t * ip4 = hdr->ip4;
  ip4->saddr       = src_ip_addr;
  ip4->daddr       = dst_ip_addr;
  ip4->net_id      = fd_ushort_bswap( ctx->net_id++ ); /* rolling IP identification */
  ip4->check       = 0U;                               /* zero before checksumming */
  ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
  ip4->check       = fd_ip4_hdr_check_fast( ip4 );

  fd_udp_hdr_t * udp = hdr->udp;
  udp->net_dport = dst_port;
  udp->net_len   = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
  fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
  hdr->udp->check = 0U; /* UDP checksum disabled (legal for IPv4) */

  ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
  /* NOTE(review): dst_ip_addr is passed for both the first and third
     arguments of fd_disco_netmux_sig — confirm this matches the
     intended (src, src_port, dst, ...) parameter order. */
  ulong sig       = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
  ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
  ulong chunk     = ctx->net_out_chunk;
  fd_stem_publish( stem, ctx->net_out_idx, sig, chunk, packet_sz, 0UL, tsorig, tspub );
  ctx->net_out_chunk = fd_dcache_compact_next( chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
}
221 :
/* handle_new_cluster_contact_info registers gossiped cluster contact
   info as repair peers.  buf holds an array of fd_shred_dest_wire_t
   and buf_sz is the ENTRY COUNT, not a byte size (during_frag scales
   the frag's sz by sizeof(fd_shred_dest_wire_t) when copying but
   after_frag forwards the raw count here). */
static inline void
handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx,
                                 uchar const *          buf,
                                 ulong                  buf_sz ) {
  fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( buf );

  ulong dest_cnt = buf_sz;
  if( FD_UNLIKELY( dest_cnt >= MAX_REPAIR_PEERS ) ) {
    FD_LOG_WARNING(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_REPAIR_PEERS ));
    return;
  }

  /* Stop adding peers after we reach the peer max, but we may want to
     consider an eviction policy. */
  for( ulong i=0UL; i<dest_cnt; i++ ) {
    if( FD_UNLIKELY( ctx->repair->peer_cnt >= FD_ACTIVE_KEY_MAX ) ) break;// FIXME: aiming to move all peer tracking out of lib into tile, leaving like this for now
    fd_repair_peer_addr_t repair_peer = {
      .addr = in_dests[i].ip4_addr,
      .port = fd_ushort_bswap( in_dests[i].udp_port ), /* byte-swapped — wire port presumably big-endian */
    };
    int dup = fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey );
    if( !dup ) {
      /* hash_src is a 20-bit hash of (ip, port), logged for debugging */
      ulong hash_src = 0xfffffUL & fd_ulong_hash( (ulong)in_dests[i].ip4_addr | ((ulong)repair_peer.port<<32) );
      FD_LOG_INFO(( "Added repair peer: pubkey %s hash_src %lu", FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), hash_src ));
    }
  }
}
249 :
/* fd_repair_handle_ping builds a signed pong response to a received
   repair ping and bincode-encodes it into msg_buf (capacity
   msg_buf_sz bytes).  Returns the encoded length in bytes.  The pong
   token is sha256( "SOLANA_PING_PONG" || ping->token ); the signature
   is requested over the same pre-image with sign type
   SHA256_ED25519 (the keyguard presumably hashes the pre-image before
   signing — confirm against keyguard semantics). */
ulong
fd_repair_handle_ping( fd_repair_tile_ctx_t *        repair_tile_ctx,
                       fd_repair_t *                 glob,
                       fd_gossip_ping_t const *      ping,
                       fd_gossip_peer_addr_t const * peer_addr FD_PARAM_UNUSED,
                       uint                          self_ip4_addr FD_PARAM_UNUSED,
                       uchar *                       msg_buf,
                       ulong                         msg_buf_sz ) {
  fd_repair_protocol_t protocol;
  fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_pong);
  fd_gossip_ping_t * pong = &protocol.inner.pong;

  pong->from = *glob->public_key;

  /* Build the pre-image: 16-byte domain prefix || 32-byte ping token */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  memcpy( pre_image+16UL, ping->token.uc, 32UL);

  /* Generate response hash token */
  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );

  /* Sign it */
  repair_signer( repair_tile_ctx, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );

  /* Serialize the pong; encode failure would indicate msg_buf_sz is
     too small and aborts via FD_TEST. */
  fd_bincode_encode_ctx_t ctx;
  ctx.data    = msg_buf;
  ctx.dataend = msg_buf + msg_buf_sz;
  FD_TEST(0 == fd_repair_protocol_encode(&protocol, &ctx));
  ulong buflen = (ulong)((uchar*)ctx.data - msg_buf);
  return buflen;
}
282 :
/* Pass a raw client response packet into the protocol. addr is the address of the sender */
/* NOTE(review): the while(1) below executes at most one iteration —
   every path either breaks or returns.  It serves only to let `break`
   bail out of the decode path to the common return. */
static int
fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t *        repair_tile_ctx,
                            fd_stem_context_t *           stem,
                            fd_repair_t *                 glob,
                            uchar const *                 msg,
                            ulong                         msglen,
                            fd_repair_peer_addr_t const * src_addr,
                            uint                          dst_ip4_addr ) {
  glob->metrics.recv_clnt_pkt++;

  FD_SCRATCH_SCOPE_BEGIN {
    while( 1 ) {
      ulong decoded_sz;
      fd_repair_response_t * gmsg = fd_bincode_decode1_scratch(
        repair_response, msg, msglen, NULL, &decoded_sz );
      if( FD_UNLIKELY( !gmsg ) ) {
        /* Solana falls back to assuming we got a shred in this case
           https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
        break;
      }
      if( FD_UNLIKELY( decoded_sz != msglen ) ) {
        /* trailing bytes: treat as not-a-response, same as above */
        break;
      }

      switch( gmsg->discriminant ) {
      case fd_repair_response_enum_ping:
        {
          /* Answer the ping with a signed pong so the peer will serve
             our future repair requests. */
          uchar buf[1024];
          ulong buflen = fd_repair_handle_ping( repair_tile_ctx, glob, &gmsg->inner.ping, src_addr, dst_ip4_addr, buf, sizeof(buf) );
          ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
          send_packet( repair_tile_ctx, stem, 1, src_addr->addr, src_addr->port, dst_ip4_addr, buf, buflen, tsorig );
          break;
        }
      /* other discriminants are intentionally ignored */
      }

      return 0;
    }
  } FD_SCRATCH_SCOPE_END;
  return 0;
}
324 :
/* fd_repair_sign_and_send bincode-encodes protocol into buf, signs it
   with the tile's identity key via the keyguard, and returns the
   final wire length.  The wire layout puts the signature right after
   the 4-byte discriminant, so the discriminant is temporarily copied
   next to the payload to make the signed region contiguous, then the
   signature is written back into its slot.  buf must be at least 1024
   bytes (enforced below). */
static ulong
fd_repair_sign_and_send( fd_repair_tile_ctx_t *  repair_tile_ctx,
                         fd_repair_protocol_t *  protocol,
                         fd_gossip_peer_addr_t * addr FD_PARAM_UNUSED,
                         uchar *                 buf,
                         ulong                   buflen ) {

  FD_TEST( buflen >= 1024UL );
  fd_bincode_encode_ctx_t ctx = { .data = buf, .dataend = buf + buflen };
  if( FD_UNLIKELY( fd_repair_protocol_encode( protocol, &ctx ) != FD_BINCODE_SUCCESS ) ) {
    FD_LOG_CRIT(( "Failed to encode repair message (type %#x)", protocol->discriminant ));
  }

  buflen = (ulong)ctx.data - (ulong)buf;
  if( FD_UNLIKELY( buflen<68 ) ) {
    /* anything shorter cannot contain discriminant + signature */
    FD_LOG_CRIT(( "Attempted to sign unsigned repair message type (type %#x)", protocol->discriminant ));
  }

  /* At this point buffer contains

     [ discriminant ] [ signature ] [ payload ]
     ^                ^             ^
     0                4             68 */

  /* https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1258 */

  fd_memcpy( buf+64, buf, 4 );
  buf    += 64UL;
  buflen -= 64UL;

  /* Now it contains

     [ discriminant ] [ payload ]
     ^                ^
     buf              buf+4 */

  fd_signature_t sig;
  repair_signer( repair_tile_ctx, sig.uc, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 );

  /* Reintroduce the signature */

  buf    -= 64UL;
  buflen += 64UL;
  fd_memcpy( buf + 4U, &sig, 64U );

  return buflen;
}
372 :
373 :
374 : static void
375 : fd_repair_send_request( fd_repair_tile_ctx_t * repair_tile_ctx,
376 : fd_stem_context_t * stem,
377 : fd_repair_t * glob,
378 : enum fd_needed_elem_type type,
379 : ulong slot,
380 : uint shred_index,
381 : fd_pubkey_t const * recipient,
382 0 : long now ) {
383 :
384 : /* Send requests starting where we left off last time. i.e. if n < current_nonce, seek forward */
385 : /* Track statistics */
386 0 : fd_repair_protocol_t protocol;
387 0 : fd_repair_construct_request_protocol( glob, &protocol, type, slot, shred_index, recipient, glob->next_nonce, now );
388 0 : glob->next_nonce++;
389 0 : fd_active_elem_t * active = fd_active_table_query( glob->actives, recipient, NULL );
390 :
391 0 : active->avg_reqs++;
392 0 : glob->metrics.send_pkt_cnt++;
393 :
394 0 : uchar buf[1024];
395 0 : ulong buflen = fd_repair_sign_and_send( repair_tile_ctx, &protocol, &active->addr, buf, sizeof(buf) );
396 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
397 0 : uint src_ip4_addr = 0U; /* unknown */
398 0 : send_packet( repair_tile_ctx, stem, 1, active->addr.addr, active->addr.port, src_ip4_addr, buf, buflen, tsorig );
399 0 : }
400 :
401 : static void
402 : fd_repair_send_requests( fd_repair_tile_ctx_t * ctx,
403 : fd_stem_context_t * stem,
404 : enum fd_needed_elem_type type,
405 : ulong slot,
406 : uint shred_index,
407 0 : long now ){
408 0 : fd_repair_t * glob = ctx->repair;
409 :
410 0 : for( uint i=0; i<FD_REPAIR_NUM_NEEDED_PEERS; i++ ) {
411 0 : fd_pubkey_t const * id = &glob->peers[ glob->peer_idx++ ].key;
412 0 : fd_repair_send_request( ctx, stem, glob, type, slot, shred_index, id, now );
413 0 : if( FD_UNLIKELY( glob->peer_idx >= glob->peer_cnt ) ) glob->peer_idx = 0; /* wrap around */
414 0 : }
415 0 : }
416 :
417 :
418 : static inline int
419 : before_frag( fd_repair_tile_ctx_t * ctx,
420 : ulong in_idx,
421 : ulong seq FD_PARAM_UNUSED,
422 0 : ulong sig ) {
423 0 : uint in_kind = ctx->in_kind[ in_idx ];
424 0 : if( FD_LIKELY( in_kind==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
425 0 : return 0;
426 0 : }
427 :
428 : static void
429 : during_frag( fd_repair_tile_ctx_t * ctx,
430 : ulong in_idx,
431 : ulong seq FD_PARAM_UNUSED,
432 : ulong sig FD_PARAM_UNUSED,
433 : ulong chunk,
434 : ulong sz,
435 0 : ulong ctl ) {
436 0 : ctx->skip_frag = 0;
437 :
438 0 : uchar const * dcache_entry;
439 0 : ulong dcache_entry_sz;
440 :
441 : // TODO: check for sz>MTU for failure once MTUs are decided
442 0 : uint in_kind = ctx->in_kind[ in_idx ];
443 0 : fd_repair_in_ctx_t const * in_ctx = &ctx->in_links[ in_idx ];
444 0 : if( FD_LIKELY( in_kind==IN_KIND_NET ) ) {
445 0 : dcache_entry = fd_net_rx_translate_frag( &in_ctx->net_rx, chunk, ctl, sz );
446 0 : dcache_entry_sz = sz;
447 :
448 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_CONTACT ) ) {
449 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
450 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
451 0 : }
452 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
453 0 : dcache_entry_sz = sz * sizeof(fd_shred_dest_wire_t);
454 :
455 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
456 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
457 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
458 0 : }
459 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
460 0 : fd_stake_weight_msg_t const * msg = fd_type_pun_const( dcache_entry );
461 0 : fd_repair_set_stake_weights_init( ctx->repair, msg->weights, msg->staked_cnt );
462 0 : return;
463 :
464 0 : } else if( FD_LIKELY( in_kind==IN_KIND_SHRED ) ) {
465 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
466 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
467 0 : }
468 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
469 0 : dcache_entry_sz = sz;
470 :
471 0 : } else {
472 0 : FD_LOG_ERR(( "Frag from unknown link (kind=%u in_idx=%lu)", in_kind, in_idx ));
473 0 : }
474 :
475 0 : fd_memcpy( ctx->buffer, dcache_entry, dcache_entry_sz );
476 0 : }
477 :
/* after_frag processes the frag staged by during_frag.  Contact and
   stake frags update peer/stake state.  Shred frags drive
   forest/FEC-chainer bookkeeping and may publish FEC force-complete
   messages back to the shred tiles.  Anything else is treated as a
   raw net packet: its UDP payload is dispatched by destination
   port. */
static void
after_frag( fd_repair_tile_ctx_t * ctx,
            ulong                  in_idx,
            ulong                  seq FD_PARAM_UNUSED,
            ulong                  sig FD_PARAM_UNUSED,
            ulong                  sz,
            ulong                  tsorig FD_PARAM_UNUSED,
            ulong                  tspub FD_PARAM_UNUSED,
            fd_stem_context_t *    stem ) {

  if( FD_UNLIKELY( ctx->skip_frag ) ) return;

  uint in_kind = ctx->in_kind[ in_idx ];
  if( FD_UNLIKELY( in_kind==IN_KIND_CONTACT ) ) {
    handle_new_cluster_contact_info( ctx, ctx->buffer, sz );
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
    fd_repair_set_stake_weights_fini( ctx->repair );
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) {

    /* Initialize the forest, which requires the root to be ready. This
       must be the case if we have received a frag from shred, because
       shred requires stake weights, which implies a genesis or snapshot
       slot has been loaded. */

    ulong wmark = fd_fseq_query( ctx->wmark );
    if( FD_UNLIKELY( fd_forest_root_slot( ctx->forest ) == ULONG_MAX ) ) {
      FD_LOG_NOTICE(( "Forest initializing with root %lu", wmark ));
      fd_forest_init( ctx->forest, wmark );
      fd_hash_t mr = { 0 }; /* FIXME */
      fd_fec_chainer_init( ctx->fec_chainer, wmark, &mr );
      FD_TEST( fd_forest_root_slot( ctx->forest ) != ULONG_MAX );
      ctx->prev_wmark = wmark;
    }

    /* Advance forest and chainer roots when the watermark moves. */

    if( FD_UNLIKELY( ctx->prev_wmark < wmark ) ) {
      fd_forest_publish( ctx->forest, wmark );
      fd_fec_chainer_publish( ctx->fec_chainer, wmark );
      ctx->prev_wmark = wmark;
      // invalidate our repair iterator
      ctx->repair_iter = fd_forest_iter_init( ctx->forest );
    }

    fd_shred_t * shred = (fd_shred_t *)fd_type_pun( ctx->buffer );
    if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) {
      // FD_LOG_WARNING(( "shred %lu %u %u too old, ignoring", shred->slot, shred->idx, shred->fec_set_idx ));
      return;
    };

    /* Update turbine_slot0 and turbine_slot. */

    if( FD_UNLIKELY( fd_fseq_query( ctx->turbine_slot0 )==ULONG_MAX ) ) {
      fd_fseq_update( ctx->turbine_slot0, shred->slot );
      FD_LOG_NOTICE(("First turbine slot %lu", shred->slot));
    }
    fd_fseq_update( ctx->turbine_slot, fd_ulong_max( shred->slot, fd_fseq_query( ctx->turbine_slot ) ) );
    /* NOTE(review): this root check is redundant — the identical
       condition already returned above and the root has not moved
       since. */
    if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) return; /* shred too old */

    /* TODO add automated caught-up test */

    /* Insert the shred sig (shared by all shred members in the FEC set)
       into the map. */

    fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
    if( FD_UNLIKELY( !fec_sig ) ) {
      fec_sig = fd_fec_sig_insert( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx );
      memcpy( fec_sig->sig, shred->signature, sizeof(fd_ed25519_sig_t) );
    }

    /* When this is a FEC completes msg, it is implied that all the
       other shreds in the FEC set can also be inserted. Shred inserts
       into the forest are idempotent so it is fine to insert the same
       shred multiple times. */

    if( FD_UNLIKELY( sz == FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) + sizeof(fd_hash_t) ) ) {
      fd_forest_ele_t * ele = NULL;
      for( uint idx = shred->fec_set_idx; idx <= shred->idx; idx++ ) {
        ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, idx, shred->fec_set_idx, 0, 0 );
      }
      FD_TEST( ele ); /* must be non-empty */
      fd_forest_ele_idxs_insert( ele->cmpl, shred->fec_set_idx );

      /* The two hashes appended after the data header are the FEC
         set's merkle root and chained merkle root. */
      fd_hash_t const * merkle_root  = (fd_hash_t const *)fd_type_pun_const( ctx->buffer + FD_SHRED_DATA_HEADER_SZ );
      fd_hash_t const * chained_root = (fd_hash_t const *)fd_type_pun_const( ctx->buffer + FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) );

      int data_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE );
      int slot_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE );

      FD_TEST( fd_fec_pool_free( ctx->fec_chainer->pool ) );
      FD_TEST( !fd_fec_chainer_query( ctx->fec_chainer, shred->slot, shred->fec_set_idx ) );
      FD_TEST( fd_fec_chainer_insert( ctx->fec_chainer, shred->slot, shred->fec_set_idx, (ushort)(shred->idx - shred->fec_set_idx + 1), data_complete, slot_complete, shred->data.parent_off, merkle_root, chained_root ) );
    }

    /* Insert the shred into the map. */

    int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) );
    if( FD_LIKELY( !is_code ) ) {
      fd_repair_inflight_remove( ctx->repair, shred->slot, shred->idx );

      int data_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE);
      int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
      fd_forest_ele_t * ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, shred->idx, shred->fec_set_idx, data_complete, slot_complete );

      /* Check if there are FECs to force complete. Algorithm: window
         through the idxs in interval [i, j). If j = next fec_set_idx
         then we know we can force complete the FEC set interval [i, j)
         (assuming it wasn't already completed based on `cmpl`). */

      uint i = 0;
      for( uint j = 1; j < ele->buffered_idx + 1; j++ ) { /* TODO iterate by word */
        if( FD_UNLIKELY( fd_forest_ele_idxs_test( ele->cmpl, i ) && fd_forest_ele_idxs_test( ele->fecs, j ) ) ) {
          i = j;
        } else if( FD_UNLIKELY( fd_forest_ele_idxs_test( ele->fecs, j ) || j == ele->complete_idx ) ) {
          if ( j == ele->complete_idx ) j++;
          fd_forest_ele_idxs_insert( ele->cmpl, i );

          /* Find the shred tile owning this FEC set. */

          /* NOTE(review): fec_sig is dereferenced below without a NULL
             check — assumes an fd_fec_sig entry was inserted for every
             FEC boundary reached here; confirm inserts always precede
             this path. */
          fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | i, NULL );

          ulong sig      = fd_ulong_load_8( fec_sig->sig );
          ulong tile_idx = sig % ctx->shred_tile_cnt;
          uint  last_idx = j - i - 1;

          uchar * chunk = fd_chunk_to_laddr( ctx->shred_out_ctx[tile_idx].mem, ctx->shred_out_ctx[tile_idx].chunk );
          memcpy( chunk, fec_sig->sig, sizeof(fd_ed25519_sig_t) );
          fd_stem_publish( stem, ctx->shred_out_ctx[tile_idx].idx, last_idx, ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), 0UL, 0UL, 0UL );
          ctx->shred_out_ctx[tile_idx].chunk = fd_dcache_compact_next( ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), ctx->shred_out_ctx[tile_idx].chunk0, ctx->shred_out_ctx[tile_idx].wmark );
          i = j;
        } else {
          // FD_LOG_NOTICE(( "not a fec boundary %lu %u", ele->slot, j ));
        }
      }
    }
    return;
  }

  /* Raw net frag: parse Ethernet/IP4/UDP and bounds-check against the
     frag size before touching the payload. */

  fd_eth_hdr_t const * eth  = (fd_eth_hdr_t const *)ctx->buffer;
  fd_ip4_hdr_t const * ip4  = (fd_ip4_hdr_t const *)( (ulong)eth + sizeof(fd_eth_hdr_t) );
  fd_udp_hdr_t const * udp  = (fd_udp_hdr_t const *)( (ulong)ip4 + FD_IP4_GET_LEN( *ip4 ) );
  uchar *              data = (uchar *)( (ulong)udp + sizeof(fd_udp_hdr_t) );
  if( FD_UNLIKELY( (ulong)udp+sizeof(fd_udp_hdr_t) > (ulong)eth+sz ) ) return;
  ulong udp_sz = fd_ushort_bswap( udp->net_len );
  if( FD_UNLIKELY( udp_sz<sizeof(fd_udp_hdr_t) ) ) return;
  ulong data_sz = udp_sz-sizeof(fd_udp_hdr_t);
  if( FD_UNLIKELY( (ulong)data+data_sz > (ulong)eth+sz ) ) return;

  fd_gossip_peer_addr_t peer_addr = { .addr=ip4->saddr, .port=udp->net_sport };
  ushort dport = udp->net_dport;
  if( ctx->repair_intake_addr.port == dport ) {
    fd_repair_recv_clnt_packet( ctx, stem, ctx->repair, data, data_sz, &peer_addr, ip4->daddr );
  } else if( ctx->repair_serve_addr.port == dport ) {
    /* serve-port traffic is intentionally ignored here */
  } else {
    FD_LOG_WARNING(( "Unexpectedly received packet for port %u", (uint)fd_ushort_bswap( dport ) ));
  }
}
639 :
640 0 : #define MAX_REQ_PER_CREDIT 1
641 :
642 : static inline void
643 : after_credit( fd_repair_tile_ctx_t * ctx,
644 : fd_stem_context_t * stem,
645 : int * opt_poll_in FD_PARAM_UNUSED,
646 0 : int * charge_busy ) {
647 :
648 0 : if( FD_LIKELY( !fd_fec_out_empty( ctx->fec_chainer->out ) && ctx->store ) ) {
649 :
650 0 : fd_fec_out_t out = fd_fec_out_pop_head( ctx->fec_chainer->out );
651 0 : fd_hash_t * cmr = &out.chained_root;
652 0 : if( FD_UNLIKELY( ctx->store->slot0==(out.slot - out.parent_off) ) ) {
653 :
654 : /* FIXME This is a hack to handle the fact the `block_id` field is
655 : not available in the snapshot manifest, which is the chained
656 : merkle root of the very first FEC after the snapshot slot. */
657 :
658 0 : fd_hash_t null = { 0 };
659 0 : memcpy( cmr, &null, sizeof(fd_hash_t) );
660 0 : }
661 :
662 : /* Linking only requires a shared lock because the fields that are
663 : modified are only read on publish which uses exclusive lock. */
664 0 : long shacq_start, shacq_end, shrel_end;
665 :
666 0 : FD_STORE_SHACQ_TIMED( ctx->store, shacq_start, shacq_end );
667 0 : if( FD_UNLIKELY( !fd_store_link( ctx->store, &out.merkle_root, &out.chained_root ) ) ) FD_LOG_WARNING(( "failed to link %s %s. slot %lu fec_set_idx %u", FD_BASE58_ENC_32_ALLOCA( &out.merkle_root ), FD_BASE58_ENC_32_ALLOCA( &out.chained_root ), out.slot, out.fec_set_idx ));
668 0 : FD_STORE_SHREL_TIMED( ctx->store, shrel_end );
669 :
670 0 : memcpy( fd_chunk_to_laddr( ctx->replay_out_mem, ctx->replay_out_chunk ), &out, sizeof(fd_fec_out_t) );
671 0 : ulong sig = out.slot << 32 | out.fec_set_idx;
672 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
673 0 : fd_stem_publish( stem, REPLAY_OUT_IDX, sig, ctx->replay_out_chunk, sizeof(fd_fec_out_t), 0, 0, tspub );
674 0 : ctx->replay_out_chunk = fd_dcache_compact_next( ctx->replay_out_chunk, sizeof(fd_fec_out_t), ctx->replay_out_chunk0, ctx->replay_out_wmark );
675 :
676 0 : fd_histf_sample( ctx->repair->metrics.store_link_wait, (ulong)fd_long_max(shacq_end - shacq_start, 0) );
677 0 : fd_histf_sample( ctx->repair->metrics.store_link_work, (ulong)fd_long_max(shrel_end - shacq_end, 0) );
678 :
679 0 : if( FD_UNLIKELY( ctx->shredcap_enabled ) ) {
680 0 : uchar * chunk = fd_chunk_to_laddr( ctx->shredcap_out_mem, ctx->shredcap_out_chunk );
681 0 : ulong sz = 0;
682 :
683 0 : memcpy( chunk + sz, &out.merkle_root, sizeof(fd_hash_t) );
684 0 : sz += sizeof(fd_hash_t);
685 :
686 0 : fd_store_shacq( ctx->store );
687 :
688 0 : fd_store_fec_t const * fec = fd_store_query_const( ctx->store, &out.merkle_root );
689 :
690 0 : memcpy( chunk + sz, &fec->data_sz, sizeof(ulong) );
691 0 : sz += sizeof(ulong);
692 :
693 0 : memcpy( chunk + sz, fec->data, fec->data_sz );
694 0 : sz += fec->data_sz;
695 :
696 0 : fd_store_shrel( ctx->store );
697 :
698 0 : memcpy( chunk + sz, &out, sizeof(fd_fec_out_t) );
699 0 : sz += sizeof(fd_fec_out_t);
700 :
701 0 : fd_stem_publish( stem, ctx->shredcap_out_idx, sz, ctx->shredcap_out_chunk, sz, 0, 0, tspub );
702 0 : ctx->shredcap_out_chunk = fd_dcache_compact_next( ctx->shredcap_out_chunk, sz, ctx->shredcap_out_chunk0, ctx->shredcap_out_wmark );
703 0 : }
704 :
705 : /* We might have more reassembled FEC sets to deliver to the
706 : downstream consumer, so prioritize that over sending out repairs
707 : (which will only increase the number of buffered to send.) */
708 :
709 : /* FIXME instead of draining the chainer, only skip the rest of
710 : after_credit and after_frag when the chainer pool is full.
711 : requires a refactor to the chainer and topology. */
712 :
713 0 : *opt_poll_in = 0; *charge_busy = 1; return;
714 0 : }
715 :
716 0 : if( FD_UNLIKELY( ctx->forest->root==ULONG_MAX ) ) return;
717 0 : if( FD_UNLIKELY( ctx->repair->peer_cnt==0 ) ) return; /* no peers to send requests to */
718 :
719 0 : *charge_busy = 1;
720 :
721 0 : long now = fd_log_wallclock();
722 :
723 : #if MAX_REQ_PER_CREDIT > FD_REPAIR_NUM_NEEDED_PEERS
724 : /* If the requests are > 1 per credit then we need to starve
725 : after_credit for after_frag to get the chance to be called. We could
726 : get rid of this all together considering max requests per credit is
727 : 1 currently, but it could be useful for benchmarking purposes in the
728 : future. */
729 : if( FD_UNLIKELY( now - ctx->tsrepair < (long)20e6 ) ) {
730 : return;
731 : }
732 : ctx->tsrepair = now;
733 : #endif
734 :
735 0 : fd_forest_t * forest = ctx->forest;
736 0 : fd_forest_ele_t * pool = fd_forest_pool( forest );
737 0 : fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest );
738 :
739 : // Always request orphans
740 :
741 0 : int total_req = 0;
742 0 : for( fd_forest_orphaned_iter_t iter = fd_forest_orphaned_iter_init( orphaned, pool );
743 0 : !fd_forest_orphaned_iter_done( iter, orphaned, pool );
744 0 : iter = fd_forest_orphaned_iter_next( iter, orphaned, pool ) ) {
745 0 : fd_forest_ele_t * orphan = fd_forest_orphaned_iter_ele( iter, orphaned, pool );
746 0 : if( fd_repair_need_orphan( ctx->repair, orphan->slot ) ) {
747 0 : fd_repair_send_requests( ctx, stem, fd_needed_orphan, orphan->slot, UINT_MAX, now );
748 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
749 0 : }
750 0 : }
751 :
752 0 : if( FD_UNLIKELY( total_req >= MAX_REQ_PER_CREDIT ) ) {
753 0 : fd_repair_continue( ctx->repair );
754 0 : return; /* we have already sent enough requests */
755 0 : }
756 :
757 : // Travel down frontier
758 :
759 : /* Every so often we'll need to reset the frontier iterator to the
760 : head of frontier, because we could end up traversing down a very
761 : long tree if we are far behind. */
762 :
763 0 : if( FD_UNLIKELY( now - ctx->tsreset > (long)40e6 ) ) {
764 : // reset iterator to the beginning of the forest frontier
765 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
766 0 : ctx->tsreset = now;
767 0 : }
768 :
769 : /* We are at the head of the turbine, so we should give turbine the
770 : chance to complete the shreds. !ele handles an edgecase where all
771 : frontier are fully complete and the iter is done */
772 :
773 0 : fd_forest_ele_t const * ele = fd_forest_pool_ele_const( pool, ctx->repair_iter.ele_idx );
774 0 : if( FD_LIKELY( !ele || ( ele->slot == fd_fseq_query( ctx->turbine_slot ) && ( now - ctx->tsreset ) < (long)30e6 ) ) ){
775 0 : return;
776 0 : }
777 :
778 0 : while( total_req < MAX_REQ_PER_CREDIT ){
779 0 : ele = fd_forest_pool_ele_const( pool, ctx->repair_iter.ele_idx );
780 : // Request first, advance iterator second.
781 0 : if( ctx->repair_iter.shred_idx == UINT_MAX && fd_repair_need_highest_window_index( ctx->repair, ele->slot, 0 ) ){
782 0 : fd_repair_send_requests( ctx, stem, fd_needed_highest_window_index, ele->slot, 0, now );
783 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
784 0 : } else if( fd_repair_need_window_index( ctx->repair, ele->slot, ctx->repair_iter.shred_idx ) ) {
785 0 : fd_repair_send_requests( ctx, stem, fd_needed_window_index, ele->slot, ctx->repair_iter.shred_idx, now );
786 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
787 0 : }
788 :
789 0 : ctx->repair_iter = fd_forest_iter_next( ctx->repair_iter, forest );
790 :
791 0 : if( FD_UNLIKELY( fd_forest_iter_done( ctx->repair_iter, forest ) ) ) {
792 : /* No more elements in the forest frontier, or the iterator got
793 : invalidated, so we can start from top again. */
794 0 : ctx->repair_iter = fd_forest_iter_init( forest );
795 0 : break;
796 0 : }
797 0 : }
798 :
799 0 : fd_repair_continue( ctx->repair );
800 0 : }
801 :
802 : static inline void
803 0 : during_housekeeping( fd_repair_tile_ctx_t * ctx ) {
804 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
805 :
806 0 : long now = fd_log_wallclock();
807 0 : if( FD_UNLIKELY( now - ctx->tsprint > (long)1e9 ) ) {
808 0 : fd_forest_print( ctx->forest );
809 0 : ctx->tsprint = fd_log_wallclock();
810 0 : }
811 0 : }
812 :
/* privileged_init runs before the sandbox (seccomp) is applied.  It
   performs the two operations that require elevated privileges /
   filesystem access: loading the node identity keypair from disk and
   seeding the repair RNG from a secure entropy source.  It also zeroes
   the tile context carved out of the tile's scratch region. */
static void
privileged_init( fd_topo_t * topo,
                 fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  FD_SCRATCH_ALLOC_INIT( l, scratch );
  fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
  fd_memset( ctx, 0, sizeof(fd_repair_tile_ctx_t) );

  /* Load the full keypair (second arg 0 => not pubkey-only).  The
     private half is at offset 0, the public half at offset 32.
     NOTE(review): the private-key copy length is sizeof(fd_pubkey_t);
     correct iff both halves are 32 bytes — confirm against
     fd_keyload_load's layout. */
  uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 0 );
  fd_memcpy( ctx->identity_private_key, identity_key, sizeof(fd_pubkey_t) );
  fd_memcpy( ctx->identity_public_key.uc, identity_key + 32UL, sizeof(fd_pubkey_t) );

  /* Repair config holds pointers into ctx, not copies, so ctx must
     outlive the repair object (it does: both live in tile scratch). */
  ctx->repair_config.private_key = ctx->identity_private_key;
  ctx->repair_config.public_key = &ctx->identity_public_key;
  ctx->repair_config.good_peer_cache_file_fd = -1;

  /* Draw the RNG seed while we still have privileges to do so. */
  FD_TEST( fd_rng_secure( &ctx->repair_seed, sizeof(ulong) ) );
}
832 :
/* unprivileged_init runs after privileged_init, inside the sandbox.
   It wires the tile to its topology links (inputs: net/gossip/stake/
   shred/sign; outputs: net/sign/replay/shred/shredcap), carves the
   remaining objects out of the tile scratch region, joins shared
   objects (store, turbine/root fseqs), and boots the repair protocol
   state.  Aborts via FD_LOG_ERR on any topology mismatch. */
static void
unprivileged_init( fd_topo_t * topo,
                   fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  /* Must mirror privileged_init's first allocation so ctx is the same
     object that privileged_init populated. */
  fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
  ctx->tsprint = fd_log_wallclock();
  ctx->tsrepair = fd_log_wallclock();
  ctx->tsreset = fd_log_wallclock();

  if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" ));

  /* Classify each input link by name.  Link names are truncated to fit
     the topology's fixed-width name field (e.g. "gossip_repai",
     "repair_repla"), so the apparent typos are intentional. */
  uint sign_link_in_idx = UINT_MAX;
  for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) {
    fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ];
    if( 0==strcmp( link->name, "net_repair" ) ) {
      ctx->in_kind[ in_idx ] = IN_KIND_NET;
      /* Net links use the fd_net_rx bounds scheme instead of the
         compact-dcache fields below, hence the continue. */
      fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache );
      continue;
    } else if( 0==strcmp( link->name, "gossip_repai" ) ) {
      ctx->in_kind[ in_idx ] = IN_KIND_CONTACT;
    } else if( 0==strcmp( link->name, "stake_out" ) ) {
      ctx->in_kind[ in_idx ] = IN_KIND_STAKE;
    } else if( 0==strcmp( link->name, "shred_repair" ) ) {
      ctx->in_kind[ in_idx ] = IN_KIND_SHRED;
    } else if( 0==strcmp( link->name, "sign_repair" ) ) {
      ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
      sign_link_in_idx = in_idx;
    } else {
      FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
    }

    /* Common compact-dcache bounds for all non-net inputs. */
    ctx->in_links[ in_idx ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
    ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
    ctx->in_links[ in_idx ].mtu = link->mtu;
    FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
  }
  if( FD_UNLIKELY( sign_link_in_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing sign_repair link" ));

  /* Classify output links.  NOTE(review): shred_tile_idx++ writes into
     ctx->shred_out_ctx before any bound check; a topology with more
     than MAX_SHRED_TILE_CNT repair_shred links would overflow before
     the FD_TEST below fires — confirm upstream topology validation
     caps this. */
  uint net_link_out_idx = UINT_MAX;
  uint sign_link_out_idx = UINT_MAX;
  uint shred_tile_idx = 0;
  for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) {
    fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ];

    if( 0==strcmp( link->name, "repair_net" ) ) {

      if( net_link_out_idx!=UINT_MAX ) continue; /* only use first net link */
      net_link_out_idx = out_idx;
      ctx->net_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, link->dcache );
      ctx->net_out_wmark = fd_dcache_compact_wmark( ctx->net_out_mem, link->dcache, link->mtu );
      ctx->net_out_chunk = ctx->net_out_chunk0;

    } else if( 0==strcmp( link->name, "repair_sign" ) ) {

      sign_link_out_idx = out_idx;

    } else if( 0==strcmp( link->name, "repair_repla" ) ) {

      ctx->replay_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, link->dcache );
      ctx->replay_out_wmark = fd_dcache_compact_wmark( ctx->replay_out_mem, link->dcache, link->mtu );
      ctx->replay_out_chunk = ctx->replay_out_chunk0;

    } else if( 0==strcmp( link->name, "repair_shred" ) ) {

      /* One out link per shred tile. */
      fd_repair_out_ctx_t * shred_out = &ctx->shred_out_ctx[ shred_tile_idx++ ];
      shred_out->idx = out_idx;
      shred_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      shred_out->chunk0 = fd_dcache_compact_chunk0( shred_out->mem, link->dcache );
      shred_out->wmark = fd_dcache_compact_wmark( shred_out->mem, link->dcache, link->mtu );
      shred_out->chunk = shred_out->chunk0;

    } else if( 0==strcmp( link->name, "repair_scap" ) ) {

      /* Optional shred-capture link; presence enables shredcap. */
      ctx->shredcap_enabled = 1;
      ctx->shredcap_out_idx = out_idx;
      ctx->shredcap_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      ctx->shredcap_out_chunk0 = fd_dcache_compact_chunk0( ctx->shredcap_out_mem, link->dcache );
      ctx->shredcap_out_wmark = fd_dcache_compact_wmark( ctx->shredcap_out_mem, link->dcache, link->mtu );
      ctx->shredcap_out_chunk = ctx->shredcap_out_chunk0;

    } else {
      FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name ));
    }

  }
  if( FD_UNLIKELY( sign_link_out_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_sign link" ));
  if( FD_UNLIKELY( net_link_out_idx ==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_net link" ));
  ctx->shred_tile_cnt = shred_tile_idx;
  FD_TEST( ctx->shred_tile_cnt == fd_topo_tile_name_cnt( topo, "shred" ) );

  /* Scratch mem setup */

  /* Allocation order/sizes must match scratch_footprint (checked at the
     bottom via FD_SCRATCH_ALLOC_FINI). */
  ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );
  ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) );
  ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) );
  ctx->reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 20 ) );
  // ctx->fec_repair = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) );
  /* Look at fec_repair.h for an explanation of this fec_max. */

  ctx->fec_chainer = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_chainer_align(), fd_fec_chainer_footprint( 1 << 20 ) );

  /* The store object is only present in firedancer-only topologies;
     ctx->store stays NULL otherwise. */
  ctx->store = NULL;
  ulong store_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "store" );
  if( FD_LIKELY( store_obj_id!=ULONG_MAX ) ) { /* firedancer-only */
    ctx->store = fd_store_join( fd_topo_obj_laddr( topo, store_obj_id ) );
    FD_TEST( ctx->store->magic == FD_STORE_MAGIC );
  }

  void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
  void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );

  FD_TEST( ( !!smem ) & ( !!fmem ) );
  fd_scratch_attach( smem, fmem, FD_REPAIR_SCRATCH_MAX, FD_REPAIR_SCRATCH_DEPTH );

  ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;

  /* Ports are stored big-endian (network order) in the addr structs
     and host order in the *_listen_port fields. */
  ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
  ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );

  ctx->repair_intake_listen_port = tile->repair.repair_intake_listen_port;
  ctx->repair_serve_listen_port = tile->repair.repair_serve_listen_port;

  ctx->net_id = (ushort)0;

  /* Pre-build the IPv4/UDP header templates used for outgoing packets
     (src ip filled in later from contact info). */
  fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_intake_listen_port );
  fd_ip4_udp_hdr_init( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_serve_listen_port );

  /* Keyguard setup */
  fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_link_in_idx ] ];
  fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ sign_link_out_idx ] ];
  if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
                                                       sign_out->mcache,
                                                       sign_out->dcache,
                                                       sign_in->mcache,
                                                       sign_in->dcache ) ) == NULL ) {
    FD_LOG_ERR(( "Keyguard join failed" ));
  }

  FD_LOG_NOTICE(( "repair starting" ));

  /* Repair set up */

  ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) );
  ctx->forest = fd_forest_join( fd_forest_new( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) );
  // ctx->fec_repair = fd_fec_repair_join( fd_fec_repair_new( ctx->fec_repair, ( tile->repair.max_pending_shred_sets + 2 ), tile->repair.shred_tile_cnt, 0 ) );
  ctx->fec_sigs = fd_fec_sig_join( fd_fec_sig_new( ctx->fec_sigs, 20 ) );
  ctx->reasm = fd_reasm_join( fd_reasm_new( ctx->reasm, 20 ) );
  ctx->fec_chainer = fd_fec_chainer_join( fd_fec_chainer_new( ctx->fec_chainer, 1 << 20, 0 ) );
  /* Iterator over an empty forest must start out "done". */
  ctx->repair_iter = fd_forest_iter_init( ctx->forest );
  FD_TEST( fd_forest_iter_done( ctx->repair_iter, ctx->forest ) );

  /**********************************************************************/
  /* turbine_slot fseq */
  /**********************************************************************/

  ulong turbine_slot0_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot0" );
  FD_TEST( turbine_slot0_obj_id!=ULONG_MAX );
  ctx->turbine_slot0 = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot0_obj_id ) );
  FD_TEST( ctx->turbine_slot0 );
  /* turbine_slot0 must still be unset (ULONG_MAX) at boot. */
  FD_TEST( fd_fseq_query( ctx->turbine_slot0 )==ULONG_MAX );

  ulong turbine_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot" );
  FD_TEST( turbine_slot_obj_id!=ULONG_MAX );
  ctx->turbine_slot = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot_obj_id ) );
  FD_TEST( ctx->turbine_slot );
  fd_fseq_update( ctx->turbine_slot, 0UL );

  FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
    FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
    FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));

  /* The root-slot fseq is the watermark shared with replay. */
  ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
  FD_TEST( root_slot_obj_id!=ULONG_MAX );
  ctx->wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
  if( FD_UNLIKELY( !ctx->wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
  ctx->prev_wmark = fd_fseq_query( ctx->wmark );

  if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) {
    FD_LOG_ERR( ( "error setting repair config" ) );
  }

  fd_repair_update_addr( ctx->repair, &ctx->repair_intake_addr, &ctx->repair_serve_addr );

  fd_histf_join( fd_histf_new( ctx->repair->metrics.store_link_wait, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WAIT ),
                                                                     FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WAIT ) ) );
  fd_histf_join( fd_histf_new( ctx->repair->metrics.store_link_work, FD_MHIST_SECONDS_MIN( REPAIR, STORE_LINK_WORK ),
                                                                     FD_MHIST_SECONDS_MAX( REPAIR, STORE_LINK_WORK ) ) );

  fd_repair_settime( ctx->repair, fd_log_wallclock() );
  fd_repair_start( ctx->repair );

  /* Verify all scratch allocations above fit in the declared
     footprint. */
  ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
  if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
    FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
}
1032 :
1033 : static ulong
1034 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
1035 : fd_topo_tile_t const * tile,
1036 : ulong out_cnt,
1037 0 : struct sock_filter * out ) {
1038 0 : populate_sock_filter_policy_fd_repair_tile(
1039 0 : out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)tile->repair.good_peer_cache_file_fd );
1040 0 : return sock_filter_policy_fd_repair_tile_instr_cnt;
1041 0 : }
1042 :
1043 : static ulong
1044 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
1045 : fd_topo_tile_t const * tile,
1046 : ulong out_fds_cnt,
1047 0 : int * out_fds ) {
1048 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1049 :
1050 0 : ulong out_cnt = 0UL;
1051 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1052 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1053 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1054 0 : if( FD_LIKELY( -1!=tile->repair.good_peer_cache_file_fd ) )
1055 0 : out_fds[ out_cnt++ ] = tile->repair.good_peer_cache_file_fd; /* good peer cache file */
1056 0 : return out_cnt;
1057 0 : }
1058 :
/* metrics_write snapshots the repair protocol's internal counters and
   histograms into the tile's shared metrics region.  Called
   periodically by the stem loop (STEM_CALLBACK_METRICS_WRITE). */
static inline void
metrics_write( fd_repair_tile_ctx_t * ctx ) {
  /* Repair-protocol-specific metrics */
  fd_repair_metrics_t * metrics = fd_repair_get_metrics( ctx->repair );
  /* Packet receive/send counters. */
  FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, metrics->recv_clnt_pkt );
  FD_MCNT_SET( REPAIR, RECV_SERV_PKT, metrics->recv_serv_pkt );
  FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, metrics->recv_serv_corrupt_pkt );
  FD_MCNT_SET( REPAIR, RECV_SERV_INVALID_SIGNATURE, metrics->recv_serv_invalid_signature );
  FD_MCNT_SET( REPAIR, RECV_SERV_FULL_PING_TABLE, metrics->recv_serv_full_ping_table );
  FD_MCNT_ENUM_COPY( REPAIR, RECV_SERV_PKT_TYPES, metrics->recv_serv_pkt_types );
  FD_MCNT_SET( REPAIR, RECV_PKT_CORRUPTED_MSG, metrics->recv_pkt_corrupted_msg );
  FD_MCNT_SET( REPAIR, SEND_PKT_CNT, metrics->send_pkt_cnt );
  FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, metrics->sent_pkt_types );
  /* Store-link wait/work time histograms. */
  FD_MHIST_COPY( REPAIR, STORE_LINK_WAIT, metrics->store_link_wait );
  FD_MHIST_COPY( REPAIR, STORE_LINK_WORK, metrics->store_link_work );
}
1075 :
/* TODO: This is not correct, but is temporary and will be fixed
   when the new store is implemented allowing the burst to be increased.
   The burst should be bounded by the number of stem_publishes that
   occur in a single frag loop. */
#define STEM_BURST (64UL)

/* The stem run loop is specialized for this tile by defining its
   context type and callbacks, then textually including the generic
   stem implementation below. */
#define STEM_CALLBACK_CONTEXT_TYPE fd_repair_tile_ctx_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_repair_tile_ctx_t)

#define STEM_CALLBACK_AFTER_CREDIT after_credit
#define STEM_CALLBACK_BEFORE_FRAG before_frag
#define STEM_CALLBACK_DURING_FRAG during_frag
#define STEM_CALLBACK_AFTER_FRAG after_frag
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
#define STEM_CALLBACK_METRICS_WRITE metrics_write

#include "../../disco/stem/fd_stem.c"
1093 :
/* Tile descriptor registered with the topology runner: names the tile
   and wires up its init, sandboxing, footprint, and run entry points.
   privileged_init runs before sandboxing, unprivileged_init after. */
fd_topo_run_tile_t fd_tile_repair = {
  .name = "repair",
  .loose_footprint = loose_footprint,
  .populate_allowed_seccomp = populate_allowed_seccomp,
  .populate_allowed_fds = populate_allowed_fds,
  .scratch_align = scratch_align,
  .scratch_footprint = scratch_footprint,
  .unprivileged_init = unprivileged_init,
  .privileged_init = privileged_init,
  .run = stem_run,
};
|