1 : /* Repair tile runs the repair protocol for a Firedancer node. */
2 : #define _GNU_SOURCE
3 :
4 :
5 : #include "../../disco/topo/fd_topo.h"
6 : #include "generated/fd_repair_tile_seccomp.h"
7 :
8 : #include "../../flamenco/repair/fd_repair.h"
9 : #include "../../flamenco/runtime/fd_blockstore.h"
10 : #include "../../flamenco/leaders/fd_leaders_base.h"
11 : #include "../../disco/fd_disco.h"
12 : #include "../../disco/keyguard/fd_keyload.h"
13 : #include "../../disco/keyguard/fd_keyguard_client.h"
14 : #include "../../disco/keyguard/fd_keyguard.h"
15 : #include "../../disco/net/fd_net_tile.h"
16 : #include "../../discof/replay/fd_exec.h"
17 : #include "../../util/pod/fd_pod_format.h"
18 : #include "../../choreo/fd_choreo_base.h"
19 : #include "../../util/net/fd_net_headers.h"
20 :
21 : #include "../forest/fd_forest.h"
22 : #include "fd_fec_repair.h"
23 : #include "fd_fec_chainer.h"
24 :
25 : #include <errno.h>
26 :
27 0 : #define IN_KIND_NET (0)
28 0 : #define IN_KIND_CONTACT (1)
29 0 : #define IN_KIND_STAKE (2)
30 0 : #define IN_KIND_SHRED (3)
31 0 : #define IN_KIND_SIGN (4)
32 : #define MAX_IN_LINKS (16)
33 :
34 : #define NET_OUT_IDX (0)
35 : #define SIGN_OUT_IDX (1)
36 0 : #define REPLAY_OUT_IDX (2)
37 : #define ARCHIVE_OUT_IDX (3)
38 :
39 : #define MAX_REPAIR_PEERS 40200UL
40 : #define MAX_BUFFER_SIZE ( MAX_REPAIR_PEERS * sizeof(fd_shred_dest_wire_t))
41 : #define MAX_SHRED_TILE_CNT (16UL)
42 :
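/* fd_repair_in_ctx_t describes a single input link.  For net links only
   the fd_net_rx_bounds_t view is used; for dcache-backed links the
   mem/chunk0/wmark/mtu fields describe the backing dcache used to
   validate and translate incoming chunks. */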
43 : typedef union {
44 : struct {
45 : fd_wksp_t * mem;
46 : ulong chunk0;
47 : ulong wmark;
48 : ulong mtu;
49 : };
50 : fd_net_rx_bounds_t net_rx;
51 : } fd_repair_in_ctx_t;
52 :
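/* fd_repair_out_ctx_t describes an output link (used here for the
   repair_shred links): idx is the stem out-link index and the remaining
   fields track the dcache region used for publishing. */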
53 : struct fd_repair_out_ctx {
54 : ulong idx;
55 : fd_wksp_t * mem;
56 : ulong chunk0;
57 : ulong wmark;
58 : ulong chunk;
59 : };
60 : typedef struct fd_repair_out_ctx fd_repair_out_ctx_t;
61 :
62 : struct fd_fec_sig {
63 : ulong key; /* map key. 32 msb = slot, 32 lsb = fec_set_idx */
64 : fd_ed25519_sig_t sig; /* Ed25519 sig identifier of the FEC. */
65 : };
66 : typedef struct fd_fec_sig fd_fec_sig_t;
67 :
68 : #define MAP_NAME fd_fec_sig
69 0 : #define MAP_T fd_fec_sig_t
70 : #define MAP_MEMOIZE 0
71 : #include "../../util/tmpl/fd_map_dynamic.c"
72 :
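/* fd_reasm_t tracks, per slot, how many contiguous data shreds have
   already been announced to replay; see the FEC-complete handling in
   after_frag. */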
73 : struct fd_reasm {
74 : ulong slot;
75 : uint cnt;
76 : };
77 : typedef struct fd_reasm fd_reasm_t;
78 :
79 : #define MAP_NAME fd_reasm
80 0 : #define MAP_T fd_reasm_t
81 0 : #define MAP_KEY slot
82 : #define MAP_MEMOIZE 0
83 : #include "../../util/tmpl/fd_map_dynamic.c"
84 :
85 : struct fd_repair_tile_ctx {
86 : long tsprint; /* timestamp for printing */
87 : long tsrepair; /* timestamp for repair */
88 : long tsreset; /* timestamp for resetting iterator */
89 : ulong * wmark;
90 : ulong prev_wmark;
91 :
92 : fd_repair_t * repair;
93 : fd_repair_config_t repair_config;
94 :
95 : ulong repair_seed;
96 :
97 : fd_repair_peer_addr_t repair_intake_addr;
98 : fd_repair_peer_addr_t repair_serve_addr;
99 :
100 : ushort repair_intake_listen_port;
101 : ushort repair_serve_listen_port;
102 :
103 : fd_forest_t * forest;
104 : fd_fec_sig_t * fec_sigs;
105 : fd_reasm_t * reasm;
106 : fd_fec_chainer_t * fec_chainer;
107 : fd_forest_iter_t repair_iter;
108 :
109 : ulong * turbine_slot0;
110 : ulong * turbine_slot;
111 :
112 : uchar identity_private_key[ 32 ];
113 : fd_pubkey_t identity_public_key;
114 :
115 : fd_wksp_t * wksp;
116 :
117 : uchar in_kind[ MAX_IN_LINKS ];
118 : fd_repair_in_ctx_t in_links[ MAX_IN_LINKS ];
119 :
120 : int skip_frag;
121 :
122 : fd_frag_meta_t * net_out_mcache;
123 : ulong * net_out_sync;
124 : ulong net_out_depth;
125 : ulong net_out_seq;
126 :
127 : fd_wksp_t * net_out_mem;
128 : ulong net_out_chunk0;
129 : ulong net_out_wmark;
130 : ulong net_out_chunk;
131 :
132 : fd_wksp_t * replay_out_mem;
133 : ulong replay_out_chunk0;
134 : ulong replay_out_wmark;
135 : ulong replay_out_chunk;
136 :
137 : uint shred_tile_cnt;
138 : fd_repair_out_ctx_t shred_out_ctx[ MAX_SHRED_TILE_CNT ];
139 :
140 : ushort net_id;
141 : /* Includes Ethernet, IP, UDP headers */
142 : uchar buffer[ MAX_BUFFER_SIZE ];
143 : fd_ip4_udp_hdrs_t intake_hdr[1];
144 : fd_ip4_udp_hdrs_t serve_hdr [1];
145 :
146 : fd_stem_context_t * stem;
147 :
148 : fd_wksp_t * blockstore_wksp;
149 : fd_blockstore_t blockstore_ljoin;
150 : fd_blockstore_t * blockstore;
151 :
152 : fd_keyguard_client_t keyguard_client[1];
153 : };
154 : typedef struct fd_repair_tile_ctx fd_repair_tile_ctx_t;
155 :
156 : FD_FN_CONST static inline ulong
157 0 : scratch_align( void ) {
158 0 : return 128UL;
159 0 : }
160 :
161 : FD_FN_PURE static inline ulong
162 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
163 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
164 0 : }
165 :
166 : FD_FN_PURE static inline ulong
167 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
168 :
169 0 : ulong l = FD_LAYOUT_INIT;
170 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
171 0 : l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() );
172 0 : l = FD_LAYOUT_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) );
173 0 : l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) );
174 0 : l = FD_LAYOUT_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 20 ) );
175 : // l = FD_LAYOUT_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) );
176 0 : l = FD_LAYOUT_APPEND( l, fd_fec_chainer_align(), fd_fec_chainer_footprint( 1 << 20 ) ); // TODO: fix this
177 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
178 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
179 0 : return FD_LAYOUT_FINI( l, scratch_align() );
180 0 : }
181 :
182 : static void
183 : repair_signer( void * signer_ctx,
184 : uchar signature[ static 64 ],
185 : uchar const * buffer,
186 : ulong len,
187 0 : int sign_type ) {
188 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx;
189 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, buffer, len, sign_type );
190 0 : }
191 :
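/* send_packet prepends IPv4/UDP headers (intake or serve flavor,
   selected by is_intake) to payload and publishes the resulting frame
   to the net out link.  The UDP checksum is left at zero, which IPv4
   permits. */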
192 : static void
193 : send_packet( fd_repair_tile_ctx_t * ctx,
194 : int is_intake,
195 : uint dst_ip_addr,
196 : ushort dst_port,
197 : uint src_ip_addr,
198 : uchar const * payload,
199 : ulong payload_sz,
200 0 : ulong tsorig ) {
201 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
202 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
203 0 : *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr);
204 :
205 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
206 0 : ip4->saddr = src_ip_addr;
207 0 : ip4->daddr = dst_ip_addr;
208 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
209 0 : ip4->check = 0U;
210 0 : ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
211 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
212 :
213 0 : fd_udp_hdr_t * udp = hdr->udp;
214 0 : udp->net_dport = dst_port;
215 0 : udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
216 0 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
217 0 : hdr->udp->check = 0U;
218 :
219 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
220 0 : ulong sig = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
221 0 : ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
222 0 : fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, packet_sz, 0UL, tsorig, tspub );
223 0 : ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL );
224 0 : ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
225 0 : }
226 :
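/* buf holds buf_sz fd_shred_dest_wire_t entries; the contact link
   publishes the destination count (not a byte size) in the frag sz. */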
227 : static inline void
228 : handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx,
229 : uchar const * buf,
230 0 : ulong buf_sz ) {
231 0 : fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( buf );
232 :
233 0 : ulong dest_cnt = buf_sz;
234 0 : if( FD_UNLIKELY( dest_cnt >= MAX_REPAIR_PEERS ) ) {
235 0 : FD_LOG_WARNING(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_REPAIR_PEERS ));
236 0 : return;
237 0 : }
238 :
239 : /* Stop adding peers after we reach the peer max, but we may want to
240 : consider an eviction policy. */
241 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
242 0 : if( FD_UNLIKELY( ctx->repair->peer_cnt >= FD_ACTIVE_KEY_MAX ) ) break;// FIXME: aiming to move all peer tracking out of lib into tile, leaving like this for now
243 0 : fd_repair_peer_addr_t repair_peer = {
244 0 : .addr = in_dests[i].ip4_addr,
245 0 : .port = fd_ushort_bswap( in_dests[i].udp_port ),
246 0 : };
247 0 : int dup = fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey );
248 0 : if( !dup ) {
249 0 : ulong hash_src = 0xfffffUL & fd_ulong_hash( (ulong)in_dests[i].ip4_addr | ((ulong)repair_peer.port<<32) );
250 0 : FD_LOG_INFO(( "Added repair peer: pubkey %s hash_src %lu", FD_BASE58_ENC_32_ALLOCA(in_dests[i].pubkey), hash_src ));
251 0 : }
252 0 : }
253 0 : }
254 :
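/* Builds a signed pong in response to ping, bincode-encodes it into
   msg_buf and returns the encoded length. */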
255 : ulong
256 : fd_repair_handle_ping( fd_repair_tile_ctx_t * repair_tile_ctx,
257 : fd_repair_t * glob,
258 : fd_gossip_ping_t const * ping,
259 : fd_gossip_peer_addr_t const * peer_addr FD_PARAM_UNUSED,
260 : uint self_ip4_addr FD_PARAM_UNUSED,
261 : uchar * msg_buf,
262 0 : ulong msg_buf_sz ) {
263 0 : fd_repair_protocol_t protocol;
264 0 : fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_pong);
265 0 : fd_gossip_ping_t * pong = &protocol.inner.pong;
266 :
267 0 : pong->from = *glob->public_key;
268 :
269 : /* Generate response hash token */
270 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
271 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
272 0 : memcpy( pre_image+16UL, ping->token.uc, 32UL);
273 :
274 : /* Hash the pre-image to produce the response token */
275 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );
276 :
277 : /* Sign it */
278 0 : repair_signer( repair_tile_ctx, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
279 :
280 0 : fd_bincode_encode_ctx_t ctx;
281 0 : ctx.data = msg_buf;
282 0 : ctx.dataend = msg_buf + msg_buf_sz;
283 0 : FD_TEST(0 == fd_repair_protocol_encode(&protocol, &ctx));
284 0 : ulong buflen = (ulong)((uchar*)ctx.data - msg_buf);
285 0 : return buflen;
286 0 : }
287 :
288 : /* Pass a raw client response packet into the protocol. src_addr is the address of the sender. */
289 : static int
290 : fd_repair_recv_clnt_packet( fd_repair_tile_ctx_t * repair_tile_ctx,
291 : fd_repair_t * glob,
292 : uchar const * msg,
293 : ulong msglen,
294 : fd_repair_peer_addr_t const * src_addr,
295 0 : uint dst_ip4_addr ) {
296 0 : glob->metrics.recv_clnt_pkt++;
297 :
298 0 : FD_SCRATCH_SCOPE_BEGIN {
299 0 : while( 1 ) {
300 0 : ulong decoded_sz;
301 0 : fd_repair_response_t * gmsg = fd_bincode_decode1_scratch(
302 0 : repair_response, msg, msglen, NULL, &decoded_sz );
303 0 : if( FD_UNLIKELY( !gmsg ) ) {
304 : /* Solana falls back to assuming we got a shred in this case
305 : https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
306 0 : break;
307 0 : }
308 0 : if( FD_UNLIKELY( decoded_sz != msglen ) ) {
309 0 : break;
310 0 : }
311 :
312 0 : switch( gmsg->discriminant ) {
313 0 : case fd_repair_response_enum_ping:
314 0 : {
315 0 : uchar buf[1024];
316 0 : ulong buflen = fd_repair_handle_ping( repair_tile_ctx, glob, &gmsg->inner.ping, src_addr, dst_ip4_addr, buf, sizeof(buf) );
317 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
318 0 : send_packet( repair_tile_ctx, 1, src_addr->addr, src_addr->port, dst_ip4_addr, buf, buflen, tsorig );
319 0 : break;
320 0 : }
321 0 : }
322 :
323 0 : return 0;
324 0 : }
325 0 : } FD_SCRATCH_SCOPE_END;
326 0 : return 0;
327 0 : }
328 :
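/* Encodes protocol into buf, signs it via the keyguard and returns the
   final message length.  The signature covers the 4-byte discriminant
   followed by the payload (the discriminant is temporarily relocated to
   sit immediately before the payload while signing), and the signature
   is then written back at offset 4 of the wire message. */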
329 : static ulong
330 : fd_repair_sign_and_send( fd_repair_tile_ctx_t * repair_tile_ctx,
331 : fd_repair_protocol_t * protocol,
332 : fd_gossip_peer_addr_t * addr FD_PARAM_UNUSED,
333 : uchar * buf,
334 0 : ulong buflen ) {
335 :
336 0 : FD_TEST( buflen >= 1024UL );
337 0 : fd_bincode_encode_ctx_t ctx = { .data = buf, .dataend = buf + buflen };
338 0 : if( FD_UNLIKELY( fd_repair_protocol_encode( protocol, &ctx ) != FD_BINCODE_SUCCESS ) ) {
339 0 : FD_LOG_CRIT(( "Failed to encode repair message (type %#x)", protocol->discriminant ));
340 0 : }
341 :
342 0 : buflen = (ulong)ctx.data - (ulong)buf;
343 0 : if( FD_UNLIKELY( buflen<68 ) ) {
344 0 : FD_LOG_CRIT(( "Attempted to sign unsigned repair message type (type %#x)", protocol->discriminant ));
345 0 : }
346 :
347 : /* At this point buffer contains
348 :
349 : [ discriminant ] [ signature ] [ payload ]
350 : ^ ^ ^
351 : 0 4 68 */
352 :
353 : /* https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1258 */
354 :
355 0 : fd_memcpy( buf+64, buf, 4 );
356 0 : buf += 64UL;
357 0 : buflen -= 64UL;
358 :
359 : /* Now it contains
360 :
361 : [ discriminant ] [ payload ]
362 : ^ ^
363 : buf buf+4 */
364 :
365 0 : fd_signature_t sig;
366 0 : repair_signer( repair_tile_ctx, sig.uc, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 );
367 :
368 : /* Reintroduce the signature */
369 :
370 0 : buf -= 64UL;
371 0 : buflen += 64UL;
372 0 : fd_memcpy( buf + 4U, &sig, 64U );
373 :
374 0 : return buflen;
375 0 : }
376 :
377 :
378 : static void
379 : fd_repair_send_request( fd_repair_tile_ctx_t * repair_tile_ctx,
380 : fd_repair_t * glob,
381 : enum fd_needed_elem_type type,
382 : ulong slot,
383 : uint shred_index,
384 : fd_pubkey_t const * recipient,
385 0 : long now ) {
386 :
387 : /* Construct the request using the next nonce (continuing where we
388 :    left off last time) and track per-peer and global send statistics. */
389 0 : fd_repair_protocol_t protocol;
390 0 : fd_repair_construct_request_protocol( glob, &protocol, type, slot, shred_index, recipient, glob->next_nonce, now );
391 0 : glob->next_nonce++;
392 0 : fd_active_elem_t * active = fd_active_table_query( glob->actives, recipient, NULL );
393 :
394 0 : active->avg_reqs++;
395 0 : glob->metrics.send_pkt_cnt++;
396 :
397 0 : uchar buf[1024];
398 0 : ulong buflen = fd_repair_sign_and_send( repair_tile_ctx, &protocol, &active->addr, buf, sizeof(buf) );
399 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
400 0 : uint src_ip4_addr = 0U; /* unknown */
401 0 : send_packet( repair_tile_ctx, 1, active->addr.addr, active->addr.port, src_ip4_addr, buf, buflen, tsorig );
402 0 : }
403 :
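/* Fans the request out to FD_REPAIR_NUM_NEEDED_PEERS peers, walking the
   peer list round-robin via glob->peer_idx. */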
404 : static void
405 : fd_repair_send_requests( fd_repair_tile_ctx_t * ctx,
406 : enum fd_needed_elem_type type,
407 : ulong slot,
408 : uint shred_index,
409 0 : long now ){
410 0 : fd_repair_t * glob = ctx->repair;
411 :
412 0 : for( uint i=0; i<FD_REPAIR_NUM_NEEDED_PEERS; i++ ) {
413 0 : fd_pubkey_t const * id = &glob->peers[ glob->peer_idx++ ].key;
414 0 : fd_repair_send_request( ctx, glob, type, slot, shred_index, id, now );
415 0 : if( FD_UNLIKELY( glob->peer_idx >= glob->peer_cnt ) ) glob->peer_idx = 0; /* wrap around */
416 0 : }
417 0 : }
418 :
419 :
420 : static inline int
421 : before_frag( fd_repair_tile_ctx_t * ctx,
422 : ulong in_idx,
423 : ulong seq FD_PARAM_UNUSED,
424 0 : ulong sig ) {
425 0 : uint in_kind = ctx->in_kind[ in_idx ];
426 0 : if( FD_LIKELY( in_kind==IN_KIND_NET ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
427 0 : return 0;
428 0 : }
429 :
430 : static int
431 0 : is_fec_completes_msg( ulong sz ) {
432 0 : return sz == FD_SHRED_DATA_HEADER_SZ + FD_SHRED_MERKLE_ROOT_SZ;
433 0 : }
434 :
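/* during_frag speculatively copies the frag payload into ctx->buffer;
   the actual processing happens in after_frag once the frag is known
   not to have been overrun. */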
435 : static void
436 : during_frag( fd_repair_tile_ctx_t * ctx,
437 : ulong in_idx,
438 : ulong seq FD_PARAM_UNUSED,
439 : ulong sig FD_PARAM_UNUSED,
440 : ulong chunk,
441 : ulong sz,
442 0 : ulong ctl ) {
443 0 : ctx->skip_frag = 0;
444 :
445 0 : uchar const * dcache_entry;
446 0 : ulong dcache_entry_sz;
447 :
448 : // TODO: check for sz>MTU for failure once MTUs are decided
449 0 : uint in_kind = ctx->in_kind[ in_idx ];
450 0 : fd_repair_in_ctx_t const * in_ctx = &ctx->in_links[ in_idx ];
451 0 : if( FD_LIKELY( in_kind==IN_KIND_NET ) ) {
452 0 : dcache_entry = fd_net_rx_translate_frag( &in_ctx->net_rx, chunk, ctl, sz );
453 0 : dcache_entry_sz = sz;
454 :
455 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_CONTACT ) ) {
456 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
457 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
458 0 : }
459 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
460 0 : dcache_entry_sz = sz * sizeof(fd_shred_dest_wire_t);
461 :
462 0 : } else if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
463 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
464 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
465 0 : }
466 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
467 0 : fd_stake_weight_msg_t const * msg = fd_type_pun_const( dcache_entry );
468 0 : fd_repair_set_stake_weights_init( ctx->repair, msg->weights, msg->staked_cnt );
469 0 : return;
470 :
471 0 : } else if( FD_LIKELY( in_kind==IN_KIND_SHRED ) ) {
472 0 : if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark ) ) {
473 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
474 0 : }
475 0 : dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
476 0 : dcache_entry_sz = sz;
477 :
478 0 : } else {
479 0 : FD_LOG_ERR(( "Frag from unknown link (kind=%u in_idx=%lu)", in_kind, in_idx ));
480 0 : }
481 :
482 0 : fd_memcpy( ctx->buffer, dcache_entry, dcache_entry_sz );
483 0 : }
484 :
485 : static ulong
486 : fd_repair_send_ping( fd_repair_tile_ctx_t * repair_tile_ctx,
487 : fd_repair_t * glob,
488 : fd_pinged_elem_t * val,
489 : uchar * buf,
490 0 : ulong buflen ) {
491 0 : fd_repair_response_t gmsg;
492 0 : fd_repair_response_new_disc( &gmsg, fd_repair_response_enum_ping );
493 0 : fd_gossip_ping_t * ping = &gmsg.inner.ping;
494 0 : ping->from = *glob->public_key;
495 :
496 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
497 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
498 0 : memcpy( pre_image+16UL, val->token.uc, 32UL );
499 :
500 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token );
501 :
502 0 : repair_signer( repair_tile_ctx, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
503 :
504 0 : fd_bincode_encode_ctx_t ctx;
505 0 : FD_TEST( buflen >= 1024UL );
506 0 : ctx.data = buf;
507 0 : ctx.dataend = buf + buflen;
508 0 : FD_TEST(0 == fd_repair_response_encode(&gmsg, &ctx));
509 0 : return (ulong)((uchar*)ctx.data - buf);
510 0 : }
511 :
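/* Verifies a pong: it is only accepted if we have an outstanding ping
   for this address with a matching peer identity, and the peer must
   have signed sha256( "SOLANA_PING_PONG" || sha256( "SOLANA_PING_PONG"
   || token ) ) where token is the value we sent.  On success the peer
   is marked good. */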
512 : static void
513 0 : fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_gossip_peer_addr_t const * from) {
514 0 : fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL);
515 0 : if( val == NULL || !fd_pubkey_eq( &val->id, &pong->from ) )
516 0 : return;
517 :
518 : /* Verify response hash token */
519 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
520 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
521 0 : memcpy( pre_image+16UL, val->token.uc, 32UL );
522 :
523 0 : fd_hash_t pre_image_hash;
524 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc );
525 :
526 0 : fd_sha256_t sha[1];
527 0 : fd_sha256_init( sha );
528 0 : fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL );
529 0 : fd_sha256_append( sha, pre_image_hash.uc, 32UL );
530 0 : fd_hash_t golden;
531 0 : fd_sha256_fini( sha, golden.uc );
532 :
533 0 : fd_sha512_t sha2[1];
534 0 : if( fd_ed25519_verify( /* msg */ golden.uc,
535 0 : /* sz */ 32U,
536 0 : /* sig */ pong->signature.uc,
537 0 : /* public_key */ pong->from.uc,
538 0 : sha2 )) {
539 0 : FD_LOG_WARNING(("Failed sig verify for pong"));
540 0 : return;
541 0 : }
542 :
543 0 : val->good = 1;
544 0 : }
545 :
546 :
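/* Copies shred (slot, shred_idx) out of the blockstore into buf;
   shred_idx==UINT_MAX selects the slot's last data shred
   (slot_complete_idx).  Returns the shred size, or a negative value on
   failure. */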
547 : static long
548 : repair_get_shred( ulong slot,
549 : uint shred_idx,
550 : void * buf,
551 : ulong buf_max,
552 0 : void * arg ) {
553 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg;
554 0 : fd_blockstore_t * blockstore = ctx->blockstore;
555 0 : if( FD_UNLIKELY( blockstore == NULL ) ) {
556 0 : return -1;
557 0 : }
558 :
559 0 : if( shred_idx == UINT_MAX ) {
560 0 : int err = FD_MAP_ERR_AGAIN;
561 0 : while( err == FD_MAP_ERR_AGAIN ) {
562 0 : fd_block_map_query_t query[1] = { 0 };
563 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
564 0 : fd_block_info_t * meta = fd_block_map_query_ele( query );
565 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return -1L;
566 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
567 0 : shred_idx = (uint)meta->slot_complete_idx;
568 0 : err = fd_block_map_query_test( query );
569 0 : }
570 0 : }
571 0 : long sz = fd_buf_shred_query_copy_data( blockstore, slot, shred_idx, buf, buf_max );
572 0 : return sz;
573 0 : }
574 :
575 : static ulong
576 : repair_get_parent( ulong slot,
577 0 : void * arg ) {
578 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg;
579 0 : fd_blockstore_t * blockstore = ctx->blockstore;
580 0 : if( FD_UNLIKELY( blockstore == NULL ) ) {
581 0 : return FD_SLOT_NULL;
582 0 : }
583 0 : return fd_blockstore_parent_slot_query( blockstore, slot );
584 0 : }
585 :
586 :
587 : /* Pass a raw service request packet into the protocol.
588 : src_addr is the address of the sender
589 : dst_ip4_addr is the dst IPv4 address of the incoming packet (i.e. our IP) */
590 :
591 : static int
592 : fd_repair_recv_serv_packet( fd_repair_tile_ctx_t * repair_tile_ctx,
593 : fd_repair_t * glob,
594 : uchar * msg,
595 : ulong msglen,
596 : fd_repair_peer_addr_t const * peer_addr,
597 0 : uint self_ip4_addr ) {
598 : //ulong recv_serv_packet;
599 : //ulong recv_serv_pkt_types[FD_METRICS_ENUM_SENT_REQUEST_TYPES_CNT];
600 :
601 0 : FD_SCRATCH_SCOPE_BEGIN {
602 0 : ulong decoded_sz;
603 0 : fd_repair_protocol_t * protocol = fd_bincode_decode1_scratch(
604 0 : repair_protocol, msg, msglen, NULL, &decoded_sz );
605 0 : if( FD_UNLIKELY( !protocol ) ) {
606 0 : glob->metrics.recv_serv_corrupt_pkt++;
607 0 : FD_LOG_WARNING(( "Failed to decode repair request packet" ));
608 0 : return 0;
609 0 : }
610 :
611 0 : glob->metrics.recv_serv_pkt++;
612 :
613 0 : if( FD_UNLIKELY( decoded_sz != msglen ) ) {
614 0 : FD_LOG_WARNING(( "failed to decode repair request packet" ));
615 0 : return 0;
616 0 : }
617 :
618 0 : fd_repair_request_header_t * header;
619 0 : switch( protocol->discriminant ) {
620 0 : case fd_repair_protocol_enum_pong:
621 0 : glob->metrics.recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_V_PONG_IDX]++;
622 0 : fd_repair_recv_pong( glob, &protocol->inner.pong, peer_addr );
623 :
624 0 : return 0;
625 0 : case fd_repair_protocol_enum_window_index: {
626 0 : glob->metrics.recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_V_WINDOW_IDX]++;
627 0 : fd_repair_window_index_t * wi = &protocol->inner.window_index;
628 0 : header = &wi->header;
629 0 : break;
630 0 : }
631 0 : case fd_repair_protocol_enum_highest_window_index: {
632 0 : glob->metrics.recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_V_HIGHEST_WINDOW_IDX]++;
633 0 : fd_repair_highest_window_index_t * wi = &protocol->inner.highest_window_index;
634 0 : header = &wi->header;
635 0 : break;
636 0 : }
637 0 : case fd_repair_protocol_enum_orphan: {
638 0 : glob->metrics.recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_V_ORPHAN_IDX]++;
639 0 : fd_repair_orphan_t * wi = &protocol->inner.orphan;
640 0 : header = &wi->header;
641 0 : break;
642 0 : }
643 0 : default: {
644 0 : glob->metrics.recv_serv_pkt_types[FD_METRICS_ENUM_REPAIR_SERV_PKT_TYPES_V_UNKNOWN_IDX]++;
645 0 : FD_LOG_WARNING(( "received repair request of unknown type: %d", (int)protocol->discriminant ));
646 0 : return 0;
647 0 : }
648 0 : }
649 :
650 0 : if( FD_UNLIKELY( !fd_pubkey_eq( &header->recipient, glob->public_key ) ) ) {
651 0 : FD_LOG_WARNING(( "received repair request with wrong recipient, %s instead of %s", FD_BASE58_ENC_32_ALLOCA( header->recipient.uc ), FD_BASE58_ENC_32_ALLOCA( glob->public_key ) ));
652 0 : return 0;
653 0 : }
654 :
655 : /* Verify the signature */
656 0 : fd_sha512_t sha2[1];
657 0 : fd_signature_t sig;
658 0 : fd_memcpy( &sig, header->signature.uc, sizeof(sig) );
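    /* Rebuild the signed layout in place: copy the 4-byte discriminant
       to just before the payload so the verified region matches what
       the sender signed (see fd_repair_sign_and_send). */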
659 0 : fd_memcpy( (uchar *)msg + 64U, msg, 4U );
660 0 : if( fd_ed25519_verify( /* msg */ msg + 64U,
661 0 : /* sz */ msglen - 64U,
662 0 : /* sig */ sig.uc,
663 0 : /* public_key */ header->sender.uc,
664 0 : sha2 )) {
665 0 : glob->metrics.recv_serv_invalid_signature++;
666 0 : FD_LOG_WARNING(( "received repair request with invalid signature" ));
667 0 : return 0;
668 0 : }
669 :
670 0 : fd_pinged_elem_t * val = fd_pinged_table_query( glob->pinged, peer_addr, NULL) ;
671 0 : if( val == NULL || !val->good || !fd_pubkey_eq( &val->id, &header->sender ) ) {
672 : /* Need to ping this client */
673 0 : if( val == NULL ) {
674 0 : if( fd_pinged_table_is_full( glob->pinged ) ) {
675 0 : FD_LOG_WARNING(( "pinged table is full" ));
676 :
677 0 : glob->metrics.recv_serv_full_ping_table++;
678 0 : return 0;
679 0 : }
680 0 : val = fd_pinged_table_insert( glob->pinged, peer_addr );
681 0 : for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); i++ )
682 0 : val->token.ul[i] = fd_rng_ulong(glob->rng);
683 0 : }
684 0 : val->id = header->sender;
685 0 : val->good = 0;
686 0 : uchar buf[1024];
687 0 : ulong buflen = fd_repair_send_ping( repair_tile_ctx, glob, val, buf, sizeof(buf) );
688 0 : send_packet( repair_tile_ctx, 0, peer_addr->addr, peer_addr->port, self_ip4_addr, buf, buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
689 0 : } else {
690 0 : uchar buf[FD_SHRED_MAX_SZ + sizeof(uint)];
691 0 : switch( protocol->discriminant ) {
692 0 : case fd_repair_protocol_enum_window_index: {
693 0 : fd_repair_window_index_t const * wi = &protocol->inner.window_index;
694 0 : long sz = repair_get_shred( wi->slot, (uint)wi->shred_index, buf, FD_SHRED_MAX_SZ, repair_tile_ctx );
695 0 : if( sz < 0 ) break;
696 0 : *(uint *)(buf + sz) = wi->header.nonce;
697 0 : send_packet( repair_tile_ctx, 0, peer_addr->addr, peer_addr->port, self_ip4_addr, buf, (ulong)sz + sizeof(uint), fd_frag_meta_ts_comp( fd_tickcount() ) );
698 0 : break;
699 0 : }
700 :
701 0 : case fd_repair_protocol_enum_highest_window_index: {
702 0 : fd_repair_highest_window_index_t const * wi = &protocol->inner.highest_window_index;
703 0 : long sz = repair_get_shred( wi->slot, UINT_MAX, buf, FD_SHRED_MAX_SZ, repair_tile_ctx );
704 0 : if( sz < 0 ) break;
705 0 : *(uint *)(buf + sz) = wi->header.nonce;
706 0 : send_packet( repair_tile_ctx, 0, peer_addr->addr, peer_addr->port, self_ip4_addr, buf, (ulong)sz + sizeof(uint), fd_frag_meta_ts_comp( fd_tickcount() ) );
707 0 : break;
708 0 : }
709 :
710 0 : case fd_repair_protocol_enum_orphan: {
711 0 : fd_repair_orphan_t const * wi = &protocol->inner.orphan;
712 0 : ulong slot = wi->slot;
713 0 : for(unsigned i = 0; i < 10; ++i) {
714 0 : slot = repair_get_parent( slot, repair_tile_ctx );
715 : /* We cannot serve slots <= 1 since they are empty and created at genesis. */
716 0 : if( slot == FD_SLOT_NULL || slot <= 1UL ) break;
717 0 : long sz = repair_get_shred( slot, UINT_MAX, buf, FD_SHRED_MAX_SZ, repair_tile_ctx );
718 0 : if( sz < 0 ) continue;
719 0 : *(uint *)(buf + sz) = wi->header.nonce;
720 0 : send_packet( repair_tile_ctx, 0, peer_addr->addr, peer_addr->port, self_ip4_addr, buf, (ulong)sz + sizeof(uint), fd_frag_meta_ts_comp( fd_tickcount() ) );
721 0 : }
722 0 : break;
723 0 : }
724 :
725 0 : default:
726 0 : break;
727 0 : }
728 0 : }
729 :
730 :
731 0 : } FD_SCRATCH_SCOPE_END;
732 0 : return 0;
733 0 : }
734 :
735 : static void
736 : after_frag( fd_repair_tile_ctx_t * ctx,
737 : ulong in_idx,
738 : ulong seq FD_PARAM_UNUSED,
739 : ulong sig FD_PARAM_UNUSED,
740 : ulong sz,
741 : ulong tsorig,
742 : ulong tspub FD_PARAM_UNUSED,
743 0 : fd_stem_context_t * stem ) {
744 :
745 0 : if( FD_UNLIKELY( ctx->skip_frag ) ) return;
746 :
747 0 : ctx->stem = stem;
748 :
749 0 : uint in_kind = ctx->in_kind[ in_idx ];
750 0 : if( FD_UNLIKELY( in_kind==IN_KIND_CONTACT ) ) {
751 0 : handle_new_cluster_contact_info( ctx, ctx->buffer, sz );
752 0 : return;
753 0 : }
754 :
755 0 : if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
756 0 : fd_repair_set_stake_weights_fini( ctx->repair );
757 0 : return;
758 0 : }
759 :
760 0 : if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) {
761 :
762 : /* Initialize the forest, which requires the root to be ready. This
763 : must be the case if we have received a frag from shred, because
764 : shred requires stake weights, which implies a genesis or snapshot
765 : slot has been loaded. */
766 :
767 0 : ulong wmark = fd_fseq_query( ctx->wmark );
768 0 : if( FD_UNLIKELY( fd_forest_root_slot( ctx->forest ) == ULONG_MAX ) ) {
769 0 : fd_forest_init( ctx->forest, wmark );
770 0 : uchar mr[ FD_SHRED_MERKLE_ROOT_SZ ] = { 0 }; /* FIXME */
771 0 : fd_fec_chainer_init( ctx->fec_chainer, wmark, mr );
772 0 : FD_TEST( fd_forest_root_slot( ctx->forest ) != ULONG_MAX );
773 0 : FD_LOG_NOTICE(( "Forest initialized with root %lu", wmark ));
774 0 : ctx->prev_wmark = wmark;
775 0 : }
776 0 : if( FD_UNLIKELY( ctx->prev_wmark < wmark ) ) {
777 0 : fd_forest_publish( ctx->forest, wmark );
778 0 : ctx->prev_wmark = wmark;
779 : // invalidate our repair iterator
780 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
781 0 : }
782 :
783 0 : fd_shred_t * shred = (fd_shred_t *)fd_type_pun( ctx->buffer );
784 0 : if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) {
785 0 : FD_LOG_WARNING(( "shred %lu %u %u too old, ignoring", shred->slot, shred->idx, shred->fec_set_idx ));
786 0 : return;
787 0 : }
788 :
789 : /* Update turbine_slot0 and turbine_slot. */
790 :
791 0 : if( FD_UNLIKELY( fd_fseq_query( ctx->turbine_slot0 )==ULONG_MAX ) ) {
792 0 : fd_fseq_update( ctx->turbine_slot0, shred->slot );
793 0 : FD_LOG_NOTICE(("First turbine slot %lu", shred->slot));
794 0 : }
795 0 : fd_fseq_update( ctx->turbine_slot, fd_ulong_max( shred->slot, fd_fseq_query( ctx->turbine_slot ) ) );
796 0 : if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) return; /* shred too old */
797 :
798 :
799 : /* Insert the shred sig (shared by all shred members in the FEC set)
800 : into the map. */
801 :
802 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
803 0 : if( FD_UNLIKELY( !fec_sig ) ) {
804 0 : fec_sig = fd_fec_sig_insert( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx );
805 0 : memcpy( fec_sig->sig, shred->signature, sizeof(fd_ed25519_sig_t) );
806 0 : }
807 :
808 : /* When this is a FEC completes msg, it is implied that all the
809 : other shreds in the FEC set can also be inserted. Shred inserts
810 : into the forest are idempotent so it is fine to insert the same
811 : shred multiple times. */
812 :
813 0 : if( FD_UNLIKELY( is_fec_completes_msg( sz ) ) ) {
814 0 : fd_forest_ele_t * ele = NULL;
815 0 : for( uint idx = shred->fec_set_idx; idx <= shred->idx; idx++ ) {
816 0 : ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, idx, shred->fec_set_idx, 0, 0 );
817 0 : }
818 0 : FD_TEST( ele ); /* must be non-empty */
819 0 : fd_forest_ele_idxs_insert( ele->cmpl, shred->fec_set_idx );
820 :
821 0 : uchar * merkle = ctx->buffer + FD_SHRED_DATA_HEADER_SZ;
822 0 : int data_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE);
823 0 : int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
824 :
825 0 : FD_TEST( fd_fec_pool_free( ctx->fec_chainer->pool ) );
826 0 : FD_TEST( !fd_fec_chainer_query( ctx->fec_chainer, shred->slot, shred->fec_set_idx ) );
827 0 : FD_TEST( fd_fec_chainer_insert( ctx->fec_chainer, shred->slot, shred->fec_set_idx, (ushort)(shred->idx - shred->fec_set_idx + 1), data_complete, slot_complete, shred->data.parent_off, merkle, merkle /* FIXME */ ) );
828 :
829 0 : while( FD_LIKELY( !fd_fec_out_empty( ctx->fec_chainer->out ) ) ) {
830 0 : fd_fec_out_t out = fd_fec_out_pop_head( ctx->fec_chainer->out );
831 0 : if( FD_UNLIKELY( out.err != FD_FEC_CHAINER_SUCCESS ) ) FD_LOG_ERR(( "fec chainer err %d", out.err ));
832 0 : fd_reasm_t * reasm = fd_reasm_query( ctx->reasm, out.slot, NULL );
833 0 : if( FD_UNLIKELY( !reasm ) ) {
834 0 : reasm = fd_reasm_insert( ctx->reasm, out.slot );
835 0 : reasm->cnt = 0;
836 0 : }
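        /* fec_set_idx + data_cnt is the cumulative data shred count for
           the slot; cnt below is only the newly completed portion since
           the last publish to replay. */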
837 0 : if( FD_UNLIKELY( out.data_complete ) ) {
838 0 : uint cnt = out.fec_set_idx + out.data_cnt - reasm->cnt;
839 0 : ulong sig = fd_disco_repair_replay_sig( out.slot, out.parent_off, cnt, out.slot_complete );
840 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
841 0 : reasm->cnt = out.fec_set_idx + out.data_cnt;
842 0 : fd_stem_publish( ctx->stem, REPLAY_OUT_IDX, sig, 0, 0, 0, tsorig, tspub );
843 0 : if( FD_UNLIKELY( out.slot_complete ) ) {
844 0 : fd_reasm_remove( ctx->reasm, reasm );
845 0 : }
846 0 : }
847 0 : }
848 0 : }
849 :
850 : /* Insert the data shred into the forest. */
851 :
852 0 : int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) );
853 : // FD_LOG_NOTICE(( "shred %lu %u %u %d", shred->slot, shred->idx, shred->fec_set_idx, is_code ));
854 0 : if( FD_LIKELY( !is_code ) ) {
855 0 : fd_repair_inflight_remove( ctx->repair, shred->slot, shred->idx );
856 :
857 0 : int data_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE);
858 0 : int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
859 0 : fd_forest_ele_t * ele = fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->data.parent_off, shred->idx, shred->fec_set_idx, data_complete, slot_complete );
860 :
861 : /* Check if there are FECs to force complete. Algorithm: window
862 : through the idxs in interval [i, j). If j = next fec_set_idx
863 : then we know we can force complete the FEC set interval [i, j)
864 : (assuming it wasn't already completed based on `cmpl`). */
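      /* e.g. suppose data shreds 0..63 are buffered, fecs = {0,32} and
         cmpl is empty: when j reaches 32 (the next fec_set_idx) the
         window [0,32) is force-completed, signalled to the owning shred
         tile, and i advances to 32. */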
865 :
866 0 : uint i = 0;
867 0 : for( uint j = 1; j < ele->buffered_idx + 1; j++ ) { /* TODO iterate by word */
868 0 : if( FD_UNLIKELY( fd_forest_ele_idxs_test( ele->cmpl, i ) && fd_forest_ele_idxs_test( ele->fecs, j ) ) ) {
869 0 : i = j;
870 0 : } else if( FD_UNLIKELY( fd_forest_ele_idxs_test( ele->fecs, j ) || j == ele->complete_idx ) ) {
871 0 : if ( j == ele->complete_idx ) j++;
872 0 : fd_forest_ele_idxs_insert( ele->cmpl, i );
873 :
874 : /* Find the shred tile owning this FEC set. */
875 :
876 0 : fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | i, NULL );
877 :
878 0 : ulong sig = fd_ulong_load_8( fec_sig->sig );
879 0 : ulong tile_idx = sig % ctx->shred_tile_cnt;
880 0 : uint last_idx = j - i - 1;
881 :
882 0 : uchar * chunk = fd_chunk_to_laddr( ctx->shred_out_ctx[tile_idx].mem, ctx->shred_out_ctx[tile_idx].chunk );
883 0 : memcpy( chunk, fec_sig->sig, sizeof(fd_ed25519_sig_t) );
884 0 : fd_stem_publish( stem, ctx->shred_out_ctx[tile_idx].idx, last_idx, ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), 0UL, 0UL, 0UL );
885 0 : ctx->shred_out_ctx[tile_idx].chunk = fd_dcache_compact_next( ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), ctx->shred_out_ctx[tile_idx].chunk0, ctx->shred_out_ctx[tile_idx].wmark );
886 0 : i = j;
887 0 : } else {
888 : // FD_LOG_NOTICE(( "not a fec boundary %lu %u", ele->slot, j ));
889 0 : }
890 0 : }
891 0 : }
892 0 : return;
893 0 : }
894 :
895 0 : fd_eth_hdr_t const * eth = (fd_eth_hdr_t const *)ctx->buffer;
896 0 : fd_ip4_hdr_t const * ip4 = (fd_ip4_hdr_t const *)( (ulong)eth + sizeof(fd_eth_hdr_t) );
897 0 : fd_udp_hdr_t const * udp = (fd_udp_hdr_t const *)( (ulong)ip4 + FD_IP4_GET_LEN( *ip4 ) );
898 0 : uchar * data = (uchar *)( (ulong)udp + sizeof(fd_udp_hdr_t) );
899 0 : if( FD_UNLIKELY( (ulong)udp+sizeof(fd_udp_hdr_t) > (ulong)eth+sz ) ) return;
900 0 : ulong udp_sz = fd_ushort_bswap( udp->net_len );
901 0 : if( FD_UNLIKELY( udp_sz<sizeof(fd_udp_hdr_t) ) ) return;
902 0 : ulong data_sz = udp_sz-sizeof(fd_udp_hdr_t);
903 0 : if( FD_UNLIKELY( (ulong)data+data_sz > (ulong)eth+sz ) ) return;
904 :
905 0 : fd_gossip_peer_addr_t peer_addr = { .addr=ip4->saddr, .port=udp->net_sport };
906 0 : ushort dport = udp->net_dport;
907 0 : if( ctx->repair_intake_addr.port == dport ) {
908 0 : fd_repair_recv_clnt_packet( ctx, ctx->repair, data, data_sz, &peer_addr, ip4->daddr );
909 0 : } else if( ctx->repair_serve_addr.port == dport ) {
910 0 : fd_repair_recv_serv_packet( ctx, ctx->repair, data, data_sz, &peer_addr, ip4->daddr );
911 0 : } else {
912 0 : FD_LOG_WARNING(( "Unexpectedly received packet for port %u", (uint)fd_ushort_bswap( dport ) ));
913 0 : }
914 0 : }
915 :
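/* Caps how many request batches the frontier walk in after_credit
   issues per invocation (orphan requests are always sent); each batch
   fans out to FD_REPAIR_NUM_NEEDED_PEERS peers. */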
916 0 : #define MAX_REQ_PER_CREDIT 1
917 :
918 : static inline void
919 : after_credit( fd_repair_tile_ctx_t * ctx,
920 : fd_stem_context_t * stem FD_PARAM_UNUSED,
921 : int * opt_poll_in FD_PARAM_UNUSED,
922 0 : int * charge_busy ) {
923 : /* TODO: Don't charge the tile as busy if after_credit isn't actually
924 : doing any work. */
925 0 : *charge_busy = 1;
926 :
927 0 : if( FD_UNLIKELY( ctx->forest->root == ULONG_MAX ) ) return;
928 0 : if( FD_UNLIKELY( ctx->repair->peer_cnt == 0 ) ) return; /* no peers to send requests to */
929 :
930 0 : long now = fd_log_wallclock();
931 :
932 : #if MAX_REQ_PER_CREDIT > FD_REPAIR_NUM_NEEDED_PEERS
933 : /* If the requests are > 1 per credit then we need to starve
934 : after_credit for after_frag to get the chance to be called. We could
935 : get rid of this all together considering max requests per credit is
936 : 1 currently, but it could be useful for benchmarking purposes in the
937 : future. */
938 : if( FD_UNLIKELY( now - ctx->tsrepair < (long)20e6 ) ) {
939 : return;
940 : }
941 : ctx->tsrepair = now;
942 : #endif
943 :
944 0 : fd_forest_t * forest = ctx->forest;
945 0 : fd_forest_ele_t * pool = fd_forest_pool( forest );
946 0 : fd_forest_orphaned_t * orphaned = fd_forest_orphaned( forest );
947 :
948 : // Always request orphans
949 :
950 0 : int total_req = 0;
951 0 : for( fd_forest_orphaned_iter_t iter = fd_forest_orphaned_iter_init( orphaned, pool );
952 0 : !fd_forest_orphaned_iter_done( iter, orphaned, pool );
953 0 : iter = fd_forest_orphaned_iter_next( iter, orphaned, pool ) ) {
954 0 : fd_forest_ele_t * orphan = fd_forest_orphaned_iter_ele( iter, orphaned, pool );
955 0 : if( fd_repair_need_orphan( ctx->repair, orphan->slot ) ) {
956 0 : fd_repair_send_requests( ctx, fd_needed_orphan, orphan->slot, UINT_MAX, now );
957 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
958 0 : }
959 0 : }
960 :
961 0 : if( FD_UNLIKELY( total_req >= MAX_REQ_PER_CREDIT ) ) {
962 0 : fd_mcache_seq_update( ctx->net_out_sync, ctx->net_out_seq );
963 0 : fd_repair_continue( ctx->repair );
964 0 : return; /* we have already sent enough requests */
965 0 : }
966 :
967 : // Travel down frontier
968 :
969 : /* Every so often we'll need to reset the frontier iterator to the
970 : head of frontier, because we could end up traversing down a very
971 : long tree if we are far behind. */
972 :
973 0 : if( FD_UNLIKELY( now - ctx->tsreset > (long)40e6 ) ) {
974 : // reset iterator to the beginning of the forest frontier
975 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
976 0 : ctx->tsreset = now;
977 0 : }
978 :
979 : /* We are at the head of turbine, so we should give turbine the
980 :    chance to complete the shreds. !ele handles an edge case where all
981 :    frontier elements are fully complete and the iterator is done. */
982 :
983 0 : fd_forest_ele_t const * ele = fd_forest_pool_ele_const( pool, ctx->repair_iter.ele_idx );
984 0 : if( FD_LIKELY( !ele || ( ele->slot == fd_fseq_query( ctx->turbine_slot ) && ( now - ctx->tsreset ) < (long)30e6 ) ) ){
985 0 : return;
986 0 : }
987 :
988 0 : while( total_req < MAX_REQ_PER_CREDIT ){
989 0 : ele = fd_forest_pool_ele_const( pool, ctx->repair_iter.ele_idx );
990 : // Request first, advance iterator second.
991 0 : if( ctx->repair_iter.shred_idx == UINT_MAX && fd_repair_need_highest_window_index( ctx->repair, ele->slot, 0 ) ){
992 0 : fd_repair_send_requests( ctx, fd_needed_highest_window_index, ele->slot, 0, now );
993 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
994 0 : } else if( fd_repair_need_window_index( ctx->repair, ele->slot, ctx->repair_iter.shred_idx ) ) {
995 0 : fd_repair_send_requests( ctx, fd_needed_window_index, ele->slot, ctx->repair_iter.shred_idx, now );
996 0 : total_req += FD_REPAIR_NUM_NEEDED_PEERS;
997 0 : }
998 :
999 0 : ctx->repair_iter = fd_forest_iter_next( ctx->repair_iter, forest );
1000 :
1001 0 : if( FD_UNLIKELY( fd_forest_iter_done( ctx->repair_iter, forest ) ) ) {
1002 : /* No more elements in the forest frontier, or the iterator got
1003 : invalidated, so we can start from top again. */
1004 0 : ctx->repair_iter = fd_forest_iter_init( forest );
1005 0 : break;
1006 0 : }
1007 0 : }
1008 :
1009 0 : fd_mcache_seq_update( ctx->net_out_sync, ctx->net_out_seq );
1010 0 : fd_repair_continue( ctx->repair );
1011 0 : }
1012 :
1013 : static inline void
1014 0 : during_housekeeping( fd_repair_tile_ctx_t * ctx ) {
1015 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
1016 :
1017 0 : long now = fd_log_wallclock();
1018 0 : if( FD_UNLIKELY( now - ctx->tsprint > (long)1e9 ) ) {
1019 0 : fd_forest_print( ctx->forest );
1020 0 : ctx->tsprint = fd_log_wallclock();
1021 0 : }
1022 :
1023 0 : if( FD_UNLIKELY( !ctx->stem ) ) {
1024 0 : return;
1025 0 : }
1026 0 : }
1027 : static void
1028 : privileged_init( fd_topo_t * topo,
1029 0 : fd_topo_tile_t * tile ) {
1030 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1031 :
1032 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1033 0 : fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
1034 0 : fd_memset( ctx, 0, sizeof(fd_repair_tile_ctx_t) );
1035 :
1036 0 : uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 0 );
1037 0 : fd_memcpy( ctx->identity_private_key, identity_key, sizeof(fd_pubkey_t) );
1038 0 : fd_memcpy( ctx->identity_public_key.uc, identity_key + 32UL, sizeof(fd_pubkey_t) );
1039 :
1040 0 : ctx->repair_config.private_key = ctx->identity_private_key;
1041 0 : ctx->repair_config.public_key = &ctx->identity_public_key;
1042 :
1043 0 : tile->repair.good_peer_cache_file_fd = open( tile->repair.good_peer_cache_file, O_RDWR | O_CREAT, 0644 );
1044 0 : if( FD_UNLIKELY( tile->repair.good_peer_cache_file_fd==-1 ) ) {
1045 0 : FD_LOG_WARNING(( "Failed to open the good peer cache file (%s) (%i-%s)", tile->repair.good_peer_cache_file, errno, fd_io_strerror( errno ) ));
1046 0 : }
1047 0 : ctx->repair_config.good_peer_cache_file_fd = tile->repair.good_peer_cache_file_fd;
1048 :
1049 0 : FD_TEST( fd_rng_secure( &ctx->repair_seed, sizeof(ulong) ) );
1050 0 : }
1051 :
1052 : static void
1053 : unprivileged_init( fd_topo_t * topo,
1054 0 : fd_topo_tile_t * tile ) {
1055 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1056 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1057 0 : fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
1058 0 : ctx->tsprint = fd_log_wallclock();
1059 0 : ctx->tsrepair = fd_log_wallclock();
1060 0 : ctx->tsreset = fd_log_wallclock();
1061 :
1062 0 : if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" ));
1063 :
1064 0 : uint sign_link_in_idx = UINT_MAX;
1065 0 : for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) {
1066 0 : fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ];
1067 0 : if( 0==strcmp( link->name, "net_repair" ) ) {
1068 0 : ctx->in_kind[ in_idx ] = IN_KIND_NET;
1069 0 : fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache );
1070 0 : continue;
1071 0 : } else if( 0==strcmp( link->name, "gossip_repai" ) ) {
1072 0 : ctx->in_kind[ in_idx ] = IN_KIND_CONTACT;
1073 0 : } else if( 0==strcmp( link->name, "stake_out" ) ) {
1074 0 : ctx->in_kind[ in_idx ] = IN_KIND_STAKE;
1075 0 : } else if( 0==strcmp( link->name, "shred_repair" ) ) {
1076 0 : ctx->in_kind[ in_idx ] = IN_KIND_SHRED;
1077 0 : } else if( 0==strcmp( link->name, "sign_repair" ) ) {
1078 0 : ctx->in_kind[ in_idx ] = IN_KIND_SIGN;
1079 0 : sign_link_in_idx = in_idx;
1080 0 : } else {
1081 0 : FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));
1082 0 : }
1083 :
1084 0 : ctx->in_links[ in_idx ].mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1085 0 : ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
1086 0 : ctx->in_links[ in_idx ].wmark = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
1087 0 : ctx->in_links[ in_idx ].mtu = link->mtu;
1088 0 : FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
1089 0 : }
1090 0 : if( FD_UNLIKELY( sign_link_in_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing sign_repair link" ));
1091 :
1092 :
1093 0 : uint sign_link_out_idx = UINT_MAX;
1094 0 : uint shred_tile_idx = 0;
1095 0 : for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) {
1096 0 : fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ];
1097 :
1098 0 : if( 0==strcmp( link->name, "repair_net" ) ) {
1099 :
1100 0 : if( FD_UNLIKELY( ctx->net_out_mcache ) ) FD_LOG_ERR(( "repair tile has multiple repair_net out links" ));
1101 0 : ctx->net_out_mcache = link->mcache;
1102 0 : ctx->net_out_sync = fd_mcache_seq_laddr( ctx->net_out_mcache );
1103 0 : ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache );
1104 0 : ctx->net_out_seq = fd_mcache_seq_query( ctx->net_out_sync );
1105 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1106 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, link->dcache );
1107 0 : ctx->net_out_wmark = fd_dcache_compact_wmark( ctx->net_out_mem, link->dcache, link->mtu );
1108 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
1109 :
1110 0 : } else if( 0==strcmp( link->name, "repair_sign" ) ) {
1111 :
1112 0 : sign_link_out_idx = out_idx;
1113 :
1114 0 : } else if( 0==strcmp( link->name, "repair_repla" ) ) {
1115 :
1116 0 : ctx->replay_out_mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1117 0 : ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, link->dcache );
1118 0 : ctx->replay_out_wmark = fd_dcache_compact_wmark( ctx->replay_out_mem, link->dcache, link->mtu );
1119 0 : ctx->replay_out_chunk = ctx->replay_out_chunk0;
1120 :
1121 0 : } else if ( 0==strcmp( link->name, "repair_shred" ) ) {
1122 :
1123 0 : fd_repair_out_ctx_t * shred_out = &ctx->shred_out_ctx[ shred_tile_idx++ ];
1124 0 : shred_out->idx = out_idx;
1125 0 : shred_out->mem = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
1126 0 : shred_out->chunk0 = fd_dcache_compact_chunk0( shred_out->mem, link->dcache );
1127 0 : shred_out->wmark = fd_dcache_compact_wmark( shred_out->mem, link->dcache, link->mtu );
1128 0 : shred_out->chunk = shred_out->chunk0;
1129 :
1130 0 : } else {
1131 0 : FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name ));
1132 0 : }
1133 :
1134 0 : }
1135 0 : if( FD_UNLIKELY( sign_link_out_idx==UINT_MAX ) ) FD_LOG_ERR(( "Missing repair_sign link" ));
1136 0 : ctx->shred_tile_cnt = shred_tile_idx;
1137 0 : FD_TEST( ctx->shred_tile_cnt == fd_topo_tile_name_cnt( topo, "shred" ) );
1138 :
1139 : /* Scratch mem setup */
1140 :
1141 0 : ctx->blockstore = &ctx->blockstore_ljoin;
1142 0 : ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );
1143 0 : ctx->forest = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(), fd_forest_footprint( tile->repair.slot_max ) );
1144 0 : ctx->fec_sigs = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(), fd_fec_sig_footprint( 20 ) );
1145 0 : ctx->reasm = FD_SCRATCH_ALLOC_APPEND( l, fd_reasm_align(), fd_reasm_footprint( 20 ) );
1146 : // ctx->fec_repair = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_repair_align(), fd_fec_repair_footprint( ( 1<<20 ), tile->repair.shred_tile_cnt ) );
1147 : /* Look at fec_repair.h for an explanation of this fec_max. */
1148 :
1149 0 : ctx->fec_chainer = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_chainer_align(), fd_fec_chainer_footprint( 1 << 20 ) );
1150 :
1151 0 : void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
1152 0 : void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
1153 :
1154 0 : FD_TEST( ( !!smem ) & ( !!fmem ) );
1155 0 : fd_scratch_attach( smem, fmem, FD_REPAIR_SCRATCH_MAX, FD_REPAIR_SCRATCH_DEPTH );
1156 :
1157 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
1158 :
1159 0 : ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
1160 0 : ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );
1161 :
1162 0 : ctx->repair_intake_listen_port = tile->repair.repair_intake_listen_port;
1163 0 : ctx->repair_serve_listen_port = tile->repair.repair_serve_listen_port;
1164 :
1165 0 : ctx->net_id = (ushort)0;
1166 :
1167 0 : fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_intake_listen_port );
1168 0 : fd_ip4_udp_hdr_init( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_serve_listen_port );
1169 :
1170 : /* Keyguard setup */
1171 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ sign_link_in_idx ] ];
1172 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ sign_link_out_idx ] ];
1173 0 : if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
1174 0 : sign_out->mcache,
1175 0 : sign_out->dcache,
1176 0 : sign_in->mcache,
1177 0 : sign_in->dcache ) ) == NULL ) {
1178 0 : FD_LOG_ERR(( "Keyguard join failed" ));
1179 0 : }
1180 :
1181 : /* Blockstore setup */
1182 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
1183 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
1184 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
1185 :
1186 0 : if( ctx->blockstore_wksp==NULL ) {
1187 0 : FD_LOG_ERR(( "no blockstore workspace" ));
1188 0 : }
1189 :
1190 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
1191 0 : FD_TEST( ctx->blockstore!=NULL );
1192 :
1193 0 : FD_LOG_NOTICE(( "repair starting" ));
1194 :
1195 : /* Repair set up */
1196 :
1197 0 : ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) );
1198 0 : ctx->forest = fd_forest_join( fd_forest_new( ctx->forest, tile->repair.slot_max, ctx->repair_seed ) );
1199 : // ctx->fec_repair = fd_fec_repair_join( fd_fec_repair_new( ctx->fec_repair, ( tile->repair.max_pending_shred_sets + 2 ), tile->repair.shred_tile_cnt, 0 ) );
1200 0 : ctx->fec_sigs = fd_fec_sig_join( fd_fec_sig_new( ctx->fec_sigs, 20 ) );
1201 0 : ctx->reasm = fd_reasm_join( fd_reasm_new( ctx->reasm, 20 ) );
1202 0 : ctx->fec_chainer = fd_fec_chainer_join( fd_fec_chainer_new( ctx->fec_chainer, 1 << 20, 0 ) );
1203 0 : ctx->repair_iter = fd_forest_iter_init( ctx->forest );
1204 0 : FD_TEST( fd_forest_iter_done( ctx->repair_iter, ctx->forest ) );
1205 :
1206 : /**********************************************************************/
1207 : /* turbine_slot fseq */
1208 : /**********************************************************************/
1209 :
1210 0 : ulong turbine_slot0_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot0" );
1211 0 : FD_TEST( turbine_slot0_obj_id!=ULONG_MAX );
1212 0 : ctx->turbine_slot0 = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot0_obj_id ) );
1213 0 : FD_TEST( ctx->turbine_slot0 );
1214 0 : FD_TEST( fd_fseq_query( ctx->turbine_slot0 )==ULONG_MAX );
1215 :
1216 0 : ulong turbine_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "turbine_slot" );
1217 0 : FD_TEST( turbine_slot_obj_id!=ULONG_MAX );
1218 0 : ctx->turbine_slot = fd_fseq_join( fd_topo_obj_laddr( topo, turbine_slot_obj_id ) );
1219 0 : FD_TEST( ctx->turbine_slot );
1220 0 : fd_fseq_update( ctx->turbine_slot, 0UL );
1221 :
1222 0 : FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
1223 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
1224 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));
1225 :
1226 0 : ulong root_slot_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "root_slot" );
1227 0 : FD_TEST( root_slot_obj_id!=ULONG_MAX );
1228 0 : ctx->wmark = fd_fseq_join( fd_topo_obj_laddr( topo, root_slot_obj_id ) );
1229 0 : if( FD_UNLIKELY( !ctx->wmark ) ) FD_LOG_ERR(( "replay tile has no root_slot fseq" ));
1230 0 : ctx->prev_wmark = fd_fseq_query( ctx->wmark );
1231 :
1232 0 : if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) {
1233 0 : FD_LOG_ERR(( "error setting repair config" ));
1234 0 : }
1235 :
1236 0 : fd_repair_update_addr( ctx->repair, &ctx->repair_intake_addr, &ctx->repair_serve_addr );
1237 :
1238 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
1239 0 : fd_repair_start( ctx->repair );
1240 :
1241 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
1242 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
1243 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
1244 0 : }
1245 :
1246 : static ulong
1247 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
1248 : fd_topo_tile_t const * tile,
1249 : ulong out_cnt,
1250 0 : struct sock_filter * out ) {
1251 0 : populate_sock_filter_policy_fd_repair_tile(
1252 0 : out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)tile->repair.good_peer_cache_file_fd );
1253 0 : return sock_filter_policy_fd_repair_tile_instr_cnt;
1254 0 : }
1255 :
1256 : static ulong
1257 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
1258 : fd_topo_tile_t const * tile,
1259 : ulong out_fds_cnt,
1260 0 : int * out_fds ) {
1261 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
1262 :
1263 0 : ulong out_cnt = 0UL;
1264 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
1265 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
1266 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1267 0 : if( FD_LIKELY( -1!=tile->repair.good_peer_cache_file_fd ) )
1268 0 : out_fds[ out_cnt++ ] = tile->repair.good_peer_cache_file_fd; /* good peer cache file */
1269 0 : return out_cnt;
1270 0 : }
1271 :
1272 : static inline void
1273 0 : fd_repair_update_repair_metrics( fd_repair_metrics_t * metrics ) {
1274 0 : FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, metrics->recv_clnt_pkt );
1275 0 : FD_MCNT_SET( REPAIR, RECV_SERV_PKT, metrics->recv_serv_pkt );
1276 0 : FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, metrics->recv_serv_corrupt_pkt );
1277 0 : FD_MCNT_SET( REPAIR, RECV_SERV_INVALID_SIGNATURE, metrics->recv_serv_invalid_signature );
1278 0 : FD_MCNT_SET( REPAIR, RECV_SERV_FULL_PING_TABLE, metrics->recv_serv_full_ping_table );
1279 0 : FD_MCNT_ENUM_COPY( REPAIR, RECV_SERV_PKT_TYPES, metrics->recv_serv_pkt_types );
1280 0 : FD_MCNT_SET( REPAIR, RECV_PKT_CORRUPTED_MSG, metrics->recv_pkt_corrupted_msg );
1281 0 : FD_MCNT_SET( REPAIR, SEND_PKT_CNT, metrics->send_pkt_cnt );
1282 0 : FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, metrics->sent_pkt_types );
1283 0 : }
1284 :
1285 : static inline void
1286 0 : metrics_write( fd_repair_tile_ctx_t * ctx ) {
1287 : /* Repair-protocol-specific metrics */
1288 0 : fd_repair_update_repair_metrics( fd_repair_get_metrics( ctx->repair ) );
1289 0 : }
1290 :
1291 : /* TODO: This is probably not correct. */
1292 0 : #define STEM_BURST (2UL)
1293 :
1294 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_repair_tile_ctx_t
1295 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_repair_tile_ctx_t)
1296 :
1297 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
1298 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
1299 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
1300 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
1301 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
1302 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
1303 :
1304 : #include "../../disco/stem/fd_stem.c"
1305 :
1306 : fd_topo_run_tile_t fd_tile_repair = {
1307 : .name = "repair",
1308 : .loose_footprint = loose_footprint,
1309 : .populate_allowed_seccomp = populate_allowed_seccomp,
1310 : .populate_allowed_fds = populate_allowed_fds,
1311 : .scratch_align = scratch_align,
1312 : .scratch_footprint = scratch_footprint,
1313 : .unprivileged_init = unprivileged_init,
1314 : .privileged_init = privileged_init,
1315 : .run = stem_run,
1316 : };