Line data Source code
1 : /* Repair tile runs the repair protocol for a Firedancer node. */
2 : #define _GNU_SOURCE
3 :
4 : #include "../../disco/topo/fd_topo.h"
5 : #include "generated/fd_repair_tile_seccomp.h"
6 :
7 : #include "../store/util.h"
8 :
9 : #include "../../flamenco/repair/fd_repair.h"
10 : #include "../../flamenco/runtime/fd_blockstore.h"
11 : #include "../../disco/fd_disco.h"
12 : #include "../../disco/keyguard/fd_keyload.h"
13 : #include "../../disco/keyguard/fd_keyguard_client.h"
14 : #include "../../disco/net/fd_net_tile.h"
15 : #include "../../disco/shred/fd_stake_ci.h"
16 : #include "../../disco/topo/fd_pod_format.h"
17 : #include "../../util/net/fd_net_headers.h"
18 :
19 : #include <unistd.h>
20 : #include <arpa/inet.h>
21 : #include <linux/unistd.h>
22 : #include <sys/random.h>
23 : #include <netdb.h>
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 :
/* Input link positions within tile->in_link_id.  unprivileged_init
   checks each position against the expected link name. */
#define NET_IN_IDX     (0)
#define CONTACT_IN_IDX (1)
#define STAKE_IN_IDX   (2)
#define STORE_IN_IDX   (3)
#define SIGN_IN_IDX    (4)

/* Output link positions within tile->out_link_id. */
#define STORE_OUT_IDX  (0)
#define NET_OUT_IDX    (1)
#define SIGN_OUT_IDX   (2)

/* Largest contact-info destination count the tile will accept; it also
   sizes the per-frag staging buffer in the tile ctx. */
#define MAX_REPAIR_PEERS 40200UL
#define MAX_BUFFER_SIZE  ( MAX_REPAIR_PEERS * sizeof(fd_shred_dest_wire_t) )
39 :
40 : struct fd_repair_tile_ctx {
41 : fd_repair_t * repair;
42 : fd_repair_config_t repair_config;
43 :
44 : ulong repair_seed;
45 :
46 : fd_repair_peer_addr_t repair_intake_addr;
47 : fd_repair_peer_addr_t repair_serve_addr;
48 :
49 : ushort repair_intake_listen_port;
50 : ushort repair_serve_listen_port;
51 :
52 : uchar identity_private_key[ 32 ];
53 : fd_pubkey_t identity_public_key;
54 :
55 : fd_wksp_t * wksp;
56 :
57 : fd_wksp_t * contact_in_mem;
58 : ulong contact_in_chunk0;
59 : ulong contact_in_wmark;
60 :
61 : fd_wksp_t * stake_weights_in_mem;
62 : ulong stake_weights_in_chunk0;
63 : ulong stake_weights_in_wmark;
64 :
65 : fd_wksp_t * repair_req_in_mem;
66 : ulong repair_req_in_chunk0;
67 : ulong repair_req_in_wmark;
68 :
69 : fd_net_rx_bounds_t net_in_bounds;
70 :
71 : fd_frag_meta_t * net_out_mcache;
72 : ulong * net_out_sync;
73 : ulong net_out_depth;
74 : ulong net_out_seq;
75 :
76 : fd_wksp_t * net_out_mem;
77 : ulong net_out_chunk0;
78 : ulong net_out_wmark;
79 : ulong net_out_chunk;
80 :
81 : fd_frag_meta_t * store_out_mcache;
82 : ulong * store_out_sync;
83 : ulong store_out_depth;
84 : ulong store_out_seq;
85 :
86 : fd_wksp_t * store_out_mem;
87 : ulong store_out_chunk0;
88 : ulong store_out_wmark;
89 : ulong store_out_chunk;
90 :
91 : ushort net_id;
92 : /* Includes Ethernet, IP, UDP headers */
93 : uchar buffer[ MAX_BUFFER_SIZE ];
94 : fd_ip4_udp_hdrs_t intake_hdr[1];
95 : fd_ip4_udp_hdrs_t serve_hdr [1];
96 :
97 : fd_stake_ci_t * stake_ci;
98 :
99 : fd_stem_context_t * stem;
100 :
101 : fd_wksp_t * blockstore_wksp;
102 : fd_blockstore_t blockstore_ljoin;
103 : fd_blockstore_t * blockstore;
104 :
105 : fd_keyguard_client_t keyguard_client[1];
106 : };
107 : typedef struct fd_repair_tile_ctx fd_repair_tile_ctx_t;
108 :
109 : FD_FN_CONST static inline ulong
110 0 : scratch_align( void ) {
111 0 : return 128UL;
112 0 : }
113 :
114 : FD_FN_PURE static inline ulong
115 0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
116 0 : return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
117 0 : }
118 :
119 : FD_FN_PURE static inline ulong
120 0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
121 :
122 0 : ulong l = FD_LAYOUT_INIT;
123 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
124 0 : l = FD_LAYOUT_APPEND( l, fd_repair_align(), fd_repair_footprint() );
125 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
126 0 : l = FD_LAYOUT_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
127 0 : l = FD_LAYOUT_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
128 0 : return FD_LAYOUT_FINI( l, scratch_align() );
129 0 : }
130 :
131 : void
132 : repair_signer( void * signer_ctx,
133 : uchar signature[ static 64 ],
134 : uchar const * buffer,
135 : ulong len,
136 0 : int sign_type ) {
137 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *) signer_ctx;
138 0 : fd_keyguard_client_sign( ctx->keyguard_client, signature, buffer, len, sign_type );
139 0 : }
140 :
141 : static void
142 : send_packet( fd_repair_tile_ctx_t * ctx,
143 : int is_intake,
144 : uint dst_ip_addr,
145 : ushort dst_port,
146 : uint src_ip_addr,
147 : uchar const * payload,
148 : ulong payload_sz,
149 0 : ulong tsorig ) {
150 0 : uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
151 0 : fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
152 0 : *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr);
153 :
154 0 : fd_ip4_hdr_t * ip4 = hdr->ip4;
155 0 : ip4->saddr = src_ip_addr;
156 0 : ip4->daddr = dst_ip_addr;
157 0 : ip4->net_id = fd_ushort_bswap( ctx->net_id++ );
158 0 : ip4->check = 0U;
159 0 : ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
160 0 : ip4->check = fd_ip4_hdr_check_fast( ip4 );
161 :
162 0 : fd_udp_hdr_t * udp = hdr->udp;
163 0 : udp->net_dport = dst_port;
164 0 : udp->net_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
165 0 : fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
166 0 : hdr->udp->check = 0U;
167 :
168 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
169 0 : ulong sig = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
170 0 : ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
171 0 : fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk, packet_sz, 0UL, tsorig, tspub );
172 0 : ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL );
173 0 : ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
174 0 : }
175 :
176 : static inline void
177 : handle_new_cluster_contact_info( fd_repair_tile_ctx_t * ctx,
178 : uchar const * buf,
179 0 : ulong buf_sz ) {
180 0 : fd_shred_dest_wire_t const * in_dests = (fd_shred_dest_wire_t const *)fd_type_pun_const( buf );
181 :
182 0 : ulong dest_cnt = buf_sz;
183 0 : if( FD_UNLIKELY( dest_cnt >= MAX_REPAIR_PEERS ) ) {
184 0 : FD_LOG_WARNING(( "Cluster nodes had %lu destinations, which was more than the max of %lu", dest_cnt, MAX_REPAIR_PEERS ));
185 0 : return;
186 0 : }
187 :
188 0 : for( ulong i=0UL; i<dest_cnt; i++ ) {
189 0 : fd_repair_peer_addr_t repair_peer = {
190 0 : .addr = in_dests[i].ip4_addr,
191 0 : .port = fd_ushort_bswap( in_dests[i].udp_port ),
192 0 : };
193 :
194 0 : fd_repair_add_active_peer( ctx->repair, &repair_peer, in_dests[i].pubkey );
195 0 : }
196 0 : }
197 :
198 : static inline void
199 : handle_new_repair_requests( fd_repair_tile_ctx_t * ctx,
200 : uchar const * buf,
201 0 : ulong buf_sz ) {
202 :
203 0 : fd_repair_request_t const * repair_reqs = (fd_repair_request_t const *) buf;
204 0 : ulong repair_req_cnt = buf_sz / sizeof(fd_repair_request_t);
205 :
206 0 : for( ulong i=0UL; i<repair_req_cnt; i++ ) {
207 0 : fd_repair_request_t const * repair_req = &repair_reqs[i];
208 0 : int rc = 0;
209 0 : switch(repair_req->type) {
210 0 : case FD_REPAIR_REQ_TYPE_NEED_WINDOW_INDEX: {
211 0 : rc = fd_repair_need_window_index( ctx->repair, repair_req->slot, repair_req->shred_index );
212 0 : break;
213 0 : }
214 0 : case FD_REPAIR_REQ_TYPE_NEED_HIGHEST_WINDOW_INDEX: {
215 0 : rc = fd_repair_need_highest_window_index( ctx->repair, repair_req->slot, repair_req->shred_index );
216 0 : break;
217 0 : }
218 0 : case FD_REPAIR_REQ_TYPE_NEED_ORPHAN: {
219 0 : rc = fd_repair_need_orphan( ctx->repair, repair_req->slot );
220 0 : break;
221 0 : }
222 0 : }
223 :
224 0 : if( rc < 0 ) {
225 0 : FD_LOG_DEBUG(( "failed to issue repair request" ));
226 0 : }
227 0 : }
228 :
229 0 : }
230 :
231 : static inline void
232 0 : handle_new_stake_weights( fd_repair_tile_ctx_t * ctx ) {
233 0 : ulong stakes_cnt = ctx->stake_ci->scratch->staked_cnt;
234 :
235 0 : if( stakes_cnt >= MAX_REPAIR_PEERS ) {
236 0 : FD_LOG_ERR(( "Cluster nodes had %lu stake weights, which was more than the max of %lu", stakes_cnt, MAX_REPAIR_PEERS ));
237 0 : }
238 :
239 0 : fd_stake_weight_t const * in_stake_weights = ctx->stake_ci->stake_weight;
240 0 : fd_repair_set_stake_weights( ctx->repair, in_stake_weights, stakes_cnt );
241 0 : }
242 :
243 :
244 : static void
245 : repair_send_intake_packet( uchar const * msg,
246 : size_t msglen,
247 : fd_gossip_peer_addr_t const * addr,
248 : uint src_addr,
249 0 : void * arg ) {
250 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
251 0 : send_packet( arg, 1, addr->addr, addr->port, src_addr, msg, msglen, tsorig );
252 0 : }
253 :
254 : static void
255 : repair_send_serve_packet( uchar const * msg,
256 : size_t msglen,
257 : fd_gossip_peer_addr_t const * addr,
258 : uint src_addr,
259 0 : void * arg ) {
260 0 : ulong tsorig = fd_frag_meta_ts_comp( fd_tickcount() );
261 0 : send_packet( arg, 0, addr->addr, addr->port, src_addr, msg, msglen, tsorig );
262 0 : }
263 :
264 : static void
265 : repair_shred_deliver( fd_shred_t const * shred,
266 : ulong shred_sz,
267 : fd_repair_peer_addr_t const * from FD_PARAM_UNUSED,
268 : fd_pubkey_t const * id FD_PARAM_UNUSED,
269 0 : void * arg ) {
270 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg;
271 :
272 0 : fd_shred_t * out_shred = fd_chunk_to_laddr( ctx->store_out_mem, ctx->store_out_chunk );
273 0 : fd_memcpy( out_shred, shred, shred_sz );
274 :
275 0 : ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
276 0 : ulong sig = 0UL;
277 0 : fd_stem_publish( ctx->stem, 0UL, sig, ctx->store_out_chunk, shred_sz, 0UL, 0UL, tspub );
278 0 : ctx->store_out_chunk = fd_dcache_compact_next( ctx->store_out_chunk, shred_sz, ctx->store_out_chunk0, ctx->store_out_wmark );
279 0 : }
280 :
281 : static void
282 : repair_shred_deliver_fail( fd_pubkey_t const * id FD_PARAM_UNUSED,
283 : ulong slot,
284 : uint shred_index,
285 : void * arg FD_PARAM_UNUSED,
286 0 : int reason ) {
287 0 : FD_LOG_WARNING(( "repair failed to get shred - slot: %lu, shred_index: %u, reason: %d", slot, shred_index, reason ));
288 0 : }
289 :
290 : static inline int
291 : before_frag( fd_repair_tile_ctx_t * ctx,
292 : ulong in_idx,
293 : ulong seq,
294 0 : ulong sig ) {
295 0 : (void)ctx;
296 0 : (void)seq;
297 :
298 0 : if( FD_LIKELY( in_idx==NET_IN_IDX ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
299 0 : return 0;
300 0 : }
301 :
302 : static void
303 : during_frag( fd_repair_tile_ctx_t * ctx,
304 : ulong in_idx,
305 : ulong seq FD_PARAM_UNUSED,
306 : ulong sig FD_PARAM_UNUSED,
307 : ulong chunk,
308 : ulong sz,
309 0 : ulong ctl ) {
310 :
311 0 : uchar const * dcache_entry;
312 0 : ulong dcache_entry_sz;
313 :
314 : // TODO: check for sz>MTU for failure once MTUs are decided
315 0 : if( FD_UNLIKELY( in_idx==CONTACT_IN_IDX ) ) {
316 0 : if( FD_UNLIKELY( chunk<ctx->contact_in_chunk0 || chunk>ctx->contact_in_wmark ) ) {
317 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
318 0 : ctx->contact_in_chunk0, ctx->contact_in_wmark ));
319 0 : }
320 0 : dcache_entry = fd_chunk_to_laddr_const( ctx->contact_in_mem, chunk );
321 0 : dcache_entry_sz = sz * sizeof(fd_shred_dest_wire_t);
322 :
323 0 : } else if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) {
324 0 : if( FD_UNLIKELY( chunk<ctx->stake_weights_in_chunk0 || chunk>ctx->stake_weights_in_wmark ) ) {
325 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
326 0 : ctx->stake_weights_in_chunk0, ctx->stake_weights_in_wmark ));
327 0 : }
328 :
329 0 : dcache_entry = fd_chunk_to_laddr_const( ctx->stake_weights_in_mem, chunk );
330 0 : fd_stake_ci_stake_msg_init( ctx->stake_ci, dcache_entry );
331 0 : return;
332 :
333 0 : } else if( FD_UNLIKELY( in_idx==STORE_IN_IDX ) ) {
334 0 : if( FD_UNLIKELY( chunk<ctx->repair_req_in_chunk0 || chunk>ctx->repair_req_in_wmark ) ) {
335 0 : FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz,
336 0 : ctx->repair_req_in_chunk0, ctx->repair_req_in_wmark ));
337 0 : }
338 :
339 0 : dcache_entry = fd_chunk_to_laddr_const( ctx->repair_req_in_mem, chunk );
340 0 : dcache_entry_sz = sz;
341 0 : } else if ( FD_LIKELY( in_idx == NET_IN_IDX ) ) {
342 0 : dcache_entry = fd_net_rx_translate_frag( &ctx->net_in_bounds, chunk, ctl, sz );
343 0 : dcache_entry_sz = sz;
344 0 : } else {
345 0 : FD_LOG_ERR(("Unknown in_idx %lu for repair", in_idx));
346 0 : }
347 :
348 0 : fd_memcpy( ctx->buffer, dcache_entry, dcache_entry_sz );
349 0 : }
350 :
351 : static void
352 : after_frag( fd_repair_tile_ctx_t * ctx,
353 : ulong in_idx,
354 : ulong seq,
355 : ulong sig,
356 : ulong sz,
357 : ulong tsorig,
358 : ulong tspub,
359 0 : fd_stem_context_t * stem ) {
360 0 : (void)seq;
361 0 : (void)tsorig;
362 0 : (void)tspub;
363 :
364 0 : if( FD_UNLIKELY( in_idx==CONTACT_IN_IDX ) ) {
365 0 : handle_new_cluster_contact_info( ctx, ctx->buffer, sz );
366 0 : return;
367 0 : }
368 :
369 0 : if( FD_UNLIKELY( in_idx==STAKE_IN_IDX ) ) {
370 0 : fd_stake_ci_stake_msg_fini( ctx->stake_ci );
371 0 : handle_new_stake_weights( ctx );
372 0 : return;
373 0 : }
374 :
375 0 : if( FD_UNLIKELY( in_idx==STORE_IN_IDX ) ) {
376 0 : handle_new_repair_requests( ctx, ctx->buffer, sz );
377 0 : return;
378 0 : }
379 :
380 0 : ctx->stem = stem;
381 0 : ulong hdr_sz = fd_disco_netmux_sig_hdr_sz( sig );
382 0 : fd_eth_hdr_t const * eth = (fd_eth_hdr_t const *)ctx->buffer;
383 0 : fd_ip4_hdr_t const * ip4 = (fd_ip4_hdr_t const *)( eth+1 );
384 0 : fd_udp_hdr_t const * udp = (fd_udp_hdr_t const *)( (ulong)ip4 + FD_IP4_GET_LEN( *ip4 ) );
385 0 : if( FD_UNLIKELY( (ulong)(udp+1) > (ulong)eth+sz ) ) return;
386 :
387 0 : fd_repair_peer_addr_t peer_addr;
388 0 : peer_addr.l = 0;
389 0 : peer_addr.addr = ip4->saddr;
390 0 : peer_addr.port = udp->net_sport;
391 :
392 0 : ushort dport = udp->net_dport;
393 0 : if( ctx->repair_intake_addr.port == dport ) {
394 0 : fd_repair_recv_clnt_packet( ctx->repair, ctx->buffer + hdr_sz, sz - hdr_sz, &peer_addr, ip4->daddr );
395 0 : } else if( ctx->repair_serve_addr.port == dport ) {
396 0 : fd_repair_recv_serv_packet( ctx->repair, ctx->buffer + hdr_sz, sz - hdr_sz, &peer_addr, ip4->daddr );
397 0 : } else {
398 0 : FD_LOG_ERR(( "Unexpectedly received packet for port %u", (uint)fd_ushort_bswap( dport ) ));
399 0 : }
400 0 : }
401 :
402 : static inline void
403 : after_credit( fd_repair_tile_ctx_t * ctx,
404 : fd_stem_context_t * stem,
405 : int * opt_poll_in,
406 0 : int * charge_busy ) {
407 0 : (void)stem;
408 0 : (void)opt_poll_in;
409 :
410 : /* TODO: Don't charge the tile as busy if after_credit isn't actually
411 : doing any work. */
412 0 : *charge_busy = 1;
413 :
414 0 : fd_mcache_seq_update( ctx->net_out_sync, ctx->net_out_seq );
415 :
416 0 : fd_repair_continue( ctx->repair );
417 0 : }
418 :
419 : static inline void
420 0 : during_housekeeping( fd_repair_tile_ctx_t * ctx ) {
421 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
422 0 : }
423 :
424 : static long
425 : repair_get_shred( ulong slot,
426 : uint shred_idx,
427 : void * buf,
428 : ulong buf_max,
429 0 : void * arg ) {
430 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg;
431 0 : fd_blockstore_t * blockstore = ctx->blockstore;
432 0 : if( FD_UNLIKELY( blockstore == NULL ) ) {
433 0 : return -1;
434 0 : }
435 :
436 0 : if( shred_idx == UINT_MAX ) {
437 0 : int err = FD_MAP_ERR_AGAIN;
438 0 : while( err == FD_MAP_ERR_AGAIN ) {
439 0 : fd_block_map_query_t query[1] = { 0 };
440 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
441 0 : fd_block_info_t * meta = fd_block_map_query_ele( query );
442 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return -1L;
443 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
444 0 : shred_idx = (uint)meta->slot_complete_idx;
445 0 : err = fd_block_map_query_test( query );
446 0 : }
447 0 : }
448 0 : long sz = fd_buf_shred_query_copy_data( blockstore, slot, shred_idx, buf, buf_max );
449 0 : return sz;
450 0 : }
451 :
452 : static ulong
453 : repair_get_parent( ulong slot,
454 0 : void * arg ) {
455 0 : fd_repair_tile_ctx_t * ctx = (fd_repair_tile_ctx_t *)arg;
456 0 : fd_blockstore_t * blockstore = ctx->blockstore;
457 0 : if( FD_UNLIKELY( blockstore == NULL ) ) {
458 0 : return FD_SLOT_NULL;
459 0 : }
460 0 : return fd_blockstore_parent_slot_query( blockstore, slot );
461 0 : }
462 :
463 : static void
464 : privileged_init( fd_topo_t * topo,
465 0 : fd_topo_tile_t * tile ) {
466 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
467 :
468 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
469 0 : fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
470 :
471 0 : uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 0 );
472 0 : fd_memcpy( ctx->identity_private_key, identity_key, sizeof(fd_pubkey_t) );
473 0 : fd_memcpy( ctx->identity_public_key.uc, identity_key + 32UL, sizeof(fd_pubkey_t) );
474 :
475 0 : ctx->repair_config.private_key = ctx->identity_private_key;
476 0 : ctx->repair_config.public_key = &ctx->identity_public_key;
477 :
478 0 : tile->repair.good_peer_cache_file_fd = open( tile->repair.good_peer_cache_file, O_RDWR | O_CREAT, 0644 );
479 0 : if( FD_UNLIKELY( tile->repair.good_peer_cache_file_fd==-1 ) ) {
480 0 : FD_LOG_WARNING(( "Failed to open the good peer cache file (%s) (%i-%s)", tile->repair.good_peer_cache_file, errno, fd_io_strerror( errno ) ));
481 0 : }
482 0 : ctx->repair_config.good_peer_cache_file_fd = tile->repair.good_peer_cache_file_fd;
483 :
484 0 : FD_TEST( sizeof(ulong) == getrandom( &ctx->repair_seed, sizeof(ulong), 0 ) );
485 0 : }
486 :
487 : static void
488 : unprivileged_init( fd_topo_t * topo,
489 0 : fd_topo_tile_t * tile ) {
490 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
491 :
492 0 : if( FD_UNLIKELY( tile->in_cnt != 5 ||
493 0 : strcmp( topo->links[ tile->in_link_id[ NET_IN_IDX ] ].name, "net_repair") ||
494 0 : strcmp( topo->links[ tile->in_link_id[ CONTACT_IN_IDX ] ].name, "gossip_repai" ) ||
495 0 : strcmp( topo->links[ tile->in_link_id[ STAKE_IN_IDX ] ].name, "stake_out" ) ||
496 0 : strcmp( topo->links[ tile->in_link_id[ STORE_IN_IDX ] ].name, "store_repair" ) ||
497 0 : strcmp( topo->links[ tile->in_link_id[ SIGN_IN_IDX ] ].name, "sign_repair" ) ) ) {
498 0 : FD_LOG_ERR(( "repair tile has none or unexpected input links %lu %s %s",
499 0 : tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
500 0 : }
501 :
502 0 : if( FD_UNLIKELY( tile->out_cnt != 3 ||
503 0 : strcmp( topo->links[ tile->out_link_id[ STORE_OUT_IDX ] ].name, "repair_store" ) ||
504 0 : strcmp( topo->links[ tile->out_link_id[ NET_OUT_IDX ] ].name, "repair_net" ) ||
505 0 : strcmp( topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ].name, "repair_sign" ) ) ) {
506 0 : FD_LOG_ERR(( "repair tile has none or unexpected output links %lu %s %s",
507 0 : tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name, topo->links[ tile->out_link_id[ 1 ] ].name ));
508 0 : }
509 :
510 0 : if( FD_UNLIKELY( !tile->out_cnt ) ) FD_LOG_ERR(( "repair tile has no primary output link" ));
511 :
512 : /* Scratch mem setup */
513 :
514 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
515 0 : fd_repair_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_tile_ctx_t), sizeof(fd_repair_tile_ctx_t) );
516 0 : ctx->blockstore = &ctx->blockstore_ljoin;
517 0 : ctx->repair = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(), fd_repair_footprint() );
518 :
519 0 : void * smem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_smem_align(), fd_scratch_smem_footprint( FD_REPAIR_SCRATCH_MAX ) );
520 0 : void * fmem = FD_SCRATCH_ALLOC_APPEND( l, fd_scratch_fmem_align(), fd_scratch_fmem_footprint( FD_REPAIR_SCRATCH_DEPTH ) );
521 :
522 0 : FD_TEST( ( !!smem ) & ( !!fmem ) );
523 0 : fd_scratch_attach( smem, fmem, FD_REPAIR_SCRATCH_MAX, FD_REPAIR_SCRATCH_DEPTH );
524 :
525 0 : ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
526 :
527 0 : ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
528 0 : ctx->repair_serve_addr.port = fd_ushort_bswap( tile->repair.repair_serve_listen_port );
529 :
530 0 : ctx->repair_intake_listen_port = tile->repair.repair_intake_listen_port;
531 0 : ctx->repair_serve_listen_port = tile->repair.repair_serve_listen_port;
532 :
533 0 : void * _stake_ci = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_ci_align(), fd_stake_ci_footprint() );
534 0 : ctx->stake_ci = fd_stake_ci_join( fd_stake_ci_new( _stake_ci , &ctx->identity_public_key ) );
535 :
536 0 : ctx->net_id = (ushort)0;
537 :
538 0 : fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_intake_listen_port );
539 0 : fd_ip4_udp_hdr_init( ctx->serve_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, ctx->repair_serve_listen_port );
540 :
541 : /* Keyguard setup */
542 0 : fd_topo_link_t * sign_in = &topo->links[ tile->in_link_id[ SIGN_IN_IDX ] ];
543 0 : fd_topo_link_t * sign_out = &topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ];
544 0 : if( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
545 0 : sign_out->mcache,
546 0 : sign_out->dcache,
547 0 : sign_in->mcache,
548 0 : sign_in->dcache ) ) == NULL ) {
549 0 : FD_LOG_ERR(( "Keyguard join failed" ));
550 0 : }
551 :
552 : /* Blockstore setup */
553 0 : ulong blockstore_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "blockstore" );
554 0 : FD_TEST( blockstore_obj_id!=ULONG_MAX );
555 0 : ctx->blockstore_wksp = topo->workspaces[ topo->objs[ blockstore_obj_id ].wksp_id ].wksp;
556 :
557 0 : if( ctx->blockstore_wksp==NULL ) {
558 0 : FD_LOG_ERR(( "no blocktore workspace" ));
559 0 : }
560 :
561 0 : ctx->blockstore = fd_blockstore_join( &ctx->blockstore_ljoin, fd_topo_obj_laddr( topo, blockstore_obj_id ) );
562 0 : FD_TEST( ctx->blockstore!=NULL );
563 :
564 0 : fd_topo_link_t * netmux_link = &topo->links[ tile->in_link_id[ 0 ] ];
565 0 : fd_net_rx_bounds_init( &ctx->net_in_bounds, netmux_link->dcache );
566 :
567 0 : FD_LOG_NOTICE(( "repair starting" ));
568 :
569 0 : fd_topo_link_t * net_out = &topo->links[ tile->out_link_id[ NET_OUT_IDX ] ];
570 0 : ctx->net_out_mcache = net_out->mcache;
571 0 : ctx->net_out_sync = fd_mcache_seq_laddr( ctx->net_out_mcache );
572 0 : ctx->net_out_depth = fd_mcache_depth( ctx->net_out_mcache );
573 0 : ctx->net_out_seq = fd_mcache_seq_query( ctx->net_out_sync );
574 0 : ctx->net_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( net_out->dcache ), net_out->dcache );
575 0 : ctx->net_out_mem = topo->workspaces[ topo->objs[ net_out->dcache_obj_id ].wksp_id ].wksp;
576 0 : ctx->net_out_wmark = fd_dcache_compact_wmark( ctx->net_out_mem, net_out->dcache, net_out->mtu );
577 0 : ctx->net_out_chunk = ctx->net_out_chunk0;
578 :
579 :
580 0 : fd_topo_link_t * store_out = &topo->links[ tile->out_link_id[ 0 ] ];
581 0 : ctx->store_out_mcache = store_out->mcache;
582 0 : ctx->store_out_sync = fd_mcache_seq_laddr( ctx->store_out_mcache );
583 0 : ctx->store_out_depth = fd_mcache_depth( ctx->store_out_mcache );
584 0 : ctx->store_out_seq = fd_mcache_seq_query( ctx->store_out_sync );
585 0 : ctx->store_out_chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( store_out->dcache ), store_out->dcache );
586 0 : ctx->store_out_mem = topo->workspaces[ topo->objs[ store_out->dcache_obj_id ].wksp_id ].wksp;
587 0 : ctx->store_out_wmark = fd_dcache_compact_wmark( ctx->store_out_mem, store_out->dcache, store_out->mtu );
588 0 : ctx->store_out_chunk = ctx->store_out_chunk0;
589 :
590 : /* Set up contact info tile input */
591 0 : fd_topo_link_t * contact_in_link = &topo->links[ tile->in_link_id[ CONTACT_IN_IDX ] ];
592 0 : ctx->contact_in_mem = topo->workspaces[ topo->objs[ contact_in_link->dcache_obj_id ].wksp_id ].wksp;
593 0 : ctx->contact_in_chunk0 = fd_dcache_compact_chunk0( ctx->contact_in_mem, contact_in_link->dcache );
594 0 : ctx->contact_in_wmark = fd_dcache_compact_wmark ( ctx->contact_in_mem, contact_in_link->dcache, contact_in_link->mtu );
595 :
596 : /* Set up tile stake weight tile input */
597 0 : fd_topo_link_t * stake_weights_in_link = &topo->links[ tile->in_link_id[ STAKE_IN_IDX ] ];
598 0 : ctx->stake_weights_in_mem = topo->workspaces[ topo->objs[ stake_weights_in_link->dcache_obj_id ].wksp_id ].wksp;
599 0 : ctx->stake_weights_in_chunk0 = fd_dcache_compact_chunk0( ctx->stake_weights_in_mem, stake_weights_in_link->dcache );
600 0 : ctx->stake_weights_in_wmark = fd_dcache_compact_wmark ( ctx->stake_weights_in_mem, stake_weights_in_link->dcache, stake_weights_in_link->mtu );
601 :
602 : /* Set up tile repair request input */
603 0 : fd_topo_link_t * repair_req_in_link = &topo->links[ tile->in_link_id[ STORE_IN_IDX ] ];
604 0 : ctx->repair_req_in_mem = topo->workspaces[ topo->objs[ repair_req_in_link->dcache_obj_id ].wksp_id ].wksp;
605 0 : ctx->repair_req_in_chunk0 = fd_dcache_compact_chunk0( ctx->repair_req_in_mem, repair_req_in_link->dcache );
606 0 : ctx->repair_req_in_wmark = fd_dcache_compact_wmark ( ctx->repair_req_in_mem, repair_req_in_link->dcache, repair_req_in_link->mtu );
607 :
608 : /* Repair set up */
609 :
610 0 : ctx->repair = fd_repair_join( fd_repair_new( ctx->repair, ctx->repair_seed ) );
611 :
612 0 : FD_LOG_NOTICE(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
613 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
614 0 : FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));
615 :
616 0 : ctx->repair_config.fun_arg = ctx;
617 0 : ctx->repair_config.deliver_fun = repair_shred_deliver;
618 0 : ctx->repair_config.deliver_fail_fun = repair_shred_deliver_fail;
619 0 : ctx->repair_config.clnt_send_fun = repair_send_intake_packet;
620 0 : ctx->repair_config.serv_send_fun = repair_send_serve_packet;
621 0 : ctx->repair_config.serv_get_shred_fun = repair_get_shred;
622 0 : ctx->repair_config.serv_get_parent_fun = repair_get_parent;
623 0 : ctx->repair_config.sign_fun = repair_signer;
624 0 : ctx->repair_config.sign_arg = ctx;
625 :
626 0 : if( fd_repair_set_config( ctx->repair, &ctx->repair_config ) ) {
627 0 : FD_LOG_ERR( ( "error setting repair config" ) );
628 0 : }
629 :
630 0 : fd_repair_update_addr( ctx->repair, &ctx->repair_intake_addr, &ctx->repair_serve_addr );
631 :
632 0 : fd_repair_settime( ctx->repair, fd_log_wallclock() );
633 0 : fd_repair_start( ctx->repair );
634 :
635 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
636 0 : if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
637 0 : FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
638 0 : }
639 :
640 : static ulong
641 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
642 : fd_topo_tile_t const * tile,
643 : ulong out_cnt,
644 0 : struct sock_filter * out ) {
645 0 : populate_sock_filter_policy_fd_repair_tile(
646 0 : out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)tile->repair.good_peer_cache_file_fd );
647 0 : return sock_filter_policy_fd_repair_tile_instr_cnt;
648 0 : }
649 :
650 : static ulong
651 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
652 : fd_topo_tile_t const * tile,
653 : ulong out_fds_cnt,
654 0 : int * out_fds ) {
655 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
656 :
657 0 : ulong out_cnt = 0UL;
658 0 : out_fds[ out_cnt++ ] = 2; /* stderr */
659 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
660 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
661 0 : if( FD_LIKELY( -1!=tile->repair.good_peer_cache_file_fd ) )
662 0 : out_fds[ out_cnt++ ] = tile->repair.good_peer_cache_file_fd; /* good peer cache file */
663 0 : return out_cnt;
664 0 : }
665 :
666 : static inline void
667 0 : fd_repair_update_repair_metrics( fd_repair_metrics_t * metrics ) {
668 0 : FD_MCNT_SET( REPAIR, RECV_CLNT_PKT, metrics->recv_clnt_pkt );
669 0 : FD_MCNT_SET( REPAIR, RECV_SERV_PKT, metrics->recv_serv_pkt );
670 0 : FD_MCNT_SET( REPAIR, RECV_SERV_CORRUPT_PKT, metrics->recv_serv_corrupt_pkt );
671 0 : FD_MCNT_SET( REPAIR, RECV_SERV_INVALID_SIGNATURE, metrics->recv_serv_invalid_signature );
672 0 : FD_MCNT_SET( REPAIR, RECV_SERV_FULL_PING_TABLE, metrics->recv_serv_full_ping_table );
673 0 : FD_MCNT_ENUM_COPY( REPAIR, RECV_SERV_PKT_TYPES, metrics->recv_serv_pkt_types );
674 0 : FD_MCNT_SET( REPAIR, RECV_PKT_CORRUPTED_MSG, metrics->recv_pkt_corrupted_msg );
675 0 : FD_MCNT_SET( REPAIR, SEND_PKT_CNT, metrics->send_pkt_cnt );
676 0 : FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES, metrics->sent_pkt_types );
677 0 : }
678 :
679 : static inline void
680 0 : metrics_write( fd_repair_tile_ctx_t * ctx ) {
681 : /* Repair-protocol-specific metrics */
682 0 : fd_repair_update_repair_metrics( fd_repair_get_metrics( ctx->repair ) );
683 0 : }
684 :
685 : /* TODO: This is probably not correct. */
686 0 : #define STEM_BURST (1UL)
687 :
688 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_repair_tile_ctx_t
689 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_repair_tile_ctx_t)
690 :
691 0 : #define STEM_CALLBACK_AFTER_CREDIT after_credit
692 0 : #define STEM_CALLBACK_BEFORE_FRAG before_frag
693 0 : #define STEM_CALLBACK_DURING_FRAG during_frag
694 0 : #define STEM_CALLBACK_AFTER_FRAG after_frag
695 0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
696 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
697 :
698 : #include "../../disco/stem/fd_stem.c"
699 :
700 : fd_topo_run_tile_t fd_tile_repair = {
701 : .name = "repair",
702 : .loose_footprint = loose_footprint,
703 : .populate_allowed_seccomp = populate_allowed_seccomp,
704 : .populate_allowed_fds = populate_allowed_fds,
705 : .scratch_align = scratch_align,
706 : .scratch_footprint = scratch_footprint,
707 : .unprivileged_init = unprivileged_init,
708 : .privileged_init = privileged_init,
709 : .run = stem_run,
710 : };
|