Line data Source code
1 : #define _GNU_SOURCE 1
2 : #include "fd_repair.h"
3 : #include "../../ballet/sha256/fd_sha256.h"
4 : #include "../../ballet/ed25519/fd_ed25519.h"
5 : #include "../../ballet/base58/fd_base58.h"
6 : #include "../../disco/keyguard/fd_keyguard.h"
7 : #include "../../util/rng/fd_rng.h"
8 : #include "../../flamenco/fd_flamenco_base.h"
9 : #include <string.h>
10 : #include <stdio.h>
11 : #include <stdlib.h>
12 : #include <errno.h>
13 : #include <arpa/inet.h>
14 : #include <unistd.h>
15 : #include <sys/socket.h>
16 :
17 : void *
18 0 : fd_repair_new ( void * shmem, ulong seed ) {
19 0 : FD_SCRATCH_ALLOC_INIT(l, shmem);
20 0 : fd_repair_t * repair = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) );
21 0 : void * actives = FD_SCRATCH_ALLOC_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
22 0 : void * inflight = FD_SCRATCH_ALLOC_APPEND( l, fd_inflight_table_align(), fd_inflight_table_footprint(FD_NEEDED_KEY_MAX) );
23 0 : void * inflpool = FD_SCRATCH_ALLOC_APPEND( l, fd_inflight_pool_align(), fd_inflight_pool_footprint(FD_NEEDED_KEY_MAX) );
24 0 : void * inflmap = FD_SCRATCH_ALLOC_APPEND( l, fd_inflight_map_align(), fd_inflight_map_footprint(FD_NEEDED_KEY_MAX) );
25 0 : void * infldl = FD_SCRATCH_ALLOC_APPEND( l, fd_inflight_dlist_align(), fd_inflight_dlist_footprint() );
26 0 : void * pinged = FD_SCRATCH_ALLOC_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) );
27 0 : void * signpool = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_pool_align(), fd_repair_pending_sign_req_pool_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) );
28 0 : void * signmap = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_pending_sign_req_map_align(), fd_repair_pending_sign_req_map_footprint( FD_REPAIR_PENDING_SIGN_REQ_MAX ) );
29 :
30 0 : fd_memset(repair, 0, sizeof(fd_repair_t));
31 0 : repair->seed = seed;
32 :
33 0 : repair->actives = fd_active_table_join ( fd_active_table_new ( actives, FD_ACTIVE_KEY_MAX, seed ));
34 0 : repair->dupdetect = fd_inflight_table_join( fd_inflight_table_new( inflight, FD_NEEDED_KEY_MAX, seed ));
35 0 : repair->inflight_pool = fd_inflight_pool_join ( fd_inflight_pool_new ( inflpool, FD_NEEDED_KEY_MAX ));
36 0 : repair->inflight_map = fd_inflight_map_join ( fd_inflight_map_new ( inflmap, FD_NEEDED_KEY_MAX, seed ));
37 0 : repair->inflight_dlist = fd_inflight_dlist_join( fd_inflight_dlist_new( infldl ));
38 0 : repair->pinged = fd_pinged_table_join ( fd_pinged_table_new ( pinged, FD_REPAIR_PINGED_MAX, seed ));
39 0 : repair->pending_sign_pool = fd_repair_pending_sign_req_pool_join( fd_repair_pending_sign_req_pool_new( signpool, FD_REPAIR_PENDING_SIGN_REQ_MAX ) );
40 0 : repair->pending_sign_map = fd_repair_pending_sign_req_map_join ( fd_repair_pending_sign_req_map_new ( signmap, FD_REPAIR_PENDING_SIGN_REQ_MAX, seed ) );
41 0 : repair->last_decay = 0;
42 0 : repair->last_print = 0;
43 0 : repair->next_nonce = 0;
44 0 : fd_rng_new(repair->rng, (uint)seed, 0UL);
45 0 : repair->peer_cnt = 0;
46 0 : repair->peer_idx = 0;
47 :
48 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI(l, 1UL);
49 0 : if ( scratch_top > (ulong)shmem + fd_repair_footprint() ) {
50 0 : FD_LOG_ERR(("Enough space not allocated for repair"));
51 0 : }
52 :
53 0 : return repair;
54 0 : }
55 :
56 : fd_repair_t *
57 0 : fd_repair_join ( void * shmap ) { return (fd_repair_t *)shmap; }
58 :
59 : void *
60 0 : fd_repair_leave ( fd_repair_t * join ) { return join; }
61 :
62 : void *
63 0 : fd_repair_delete ( void * shmap ) {
64 0 : fd_repair_t * glob = (fd_repair_t *)shmap;
65 0 : fd_active_table_delete( fd_active_table_leave( glob->actives ) );
66 0 : fd_inflight_table_delete( fd_inflight_table_leave( glob->dupdetect ) );
67 0 : fd_pinged_table_delete( fd_pinged_table_leave( glob->pinged ) );
68 0 : fd_repair_pending_sign_req_pool_delete( fd_repair_pending_sign_req_pool_leave( glob->pending_sign_pool ) );
69 0 : fd_repair_pending_sign_req_map_delete( fd_repair_pending_sign_req_map_leave( glob->pending_sign_map ) );
70 0 : return glob;
71 0 : }
72 :
73 : /* Convert an address to a human readable string */
74 0 : const char * fd_repair_addr_str( char * dst, size_t dstlen, fd_ip4_port_t const * src ) {
75 0 : char tmp[INET_ADDRSTRLEN];
76 0 : snprintf(dst, dstlen, "%s:%u", inet_ntop(AF_INET, &src->addr, tmp, INET_ADDRSTRLEN), (uint)ntohs(src->port));
77 0 : return dst;
78 0 : }
79 :
80 : /* Set the repair configuration */
81 : int
82 0 : fd_repair_set_config( fd_repair_t * glob, const fd_repair_config_t * config ) {
83 0 : char tmp[100];
84 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
85 0 : fd_base58_encode_32( config->public_key->uc, NULL, keystr );
86 0 : FD_LOG_INFO(("configuring address %s key %s", fd_repair_addr_str(tmp, sizeof(tmp), &config->intake_addr), keystr));
87 :
88 0 : glob->public_key = config->public_key;
89 0 : glob->private_key = config->private_key;
90 0 : fd_repair_peer_addr_copy(&glob->intake_addr, &config->intake_addr);
91 0 : fd_repair_peer_addr_copy(&glob->service_addr, &config->service_addr);
92 0 : return 0;
93 0 : }
94 :
95 : int
96 0 : fd_repair_update_addr( fd_repair_t * glob, const fd_ip4_port_t * intake_addr, const fd_ip4_port_t * service_addr ) {
97 0 : char tmp[100];
98 0 : FD_LOG_INFO(("updating address %s", fd_repair_addr_str(tmp, sizeof(tmp), intake_addr)));
99 :
100 0 : fd_repair_peer_addr_copy(&glob->intake_addr, intake_addr);
101 0 : fd_repair_peer_addr_copy(&glob->service_addr, service_addr);
102 0 : return 0;
103 0 : }
104 :
105 : /* Initiate connection to a peer */
106 : int
107 0 : fd_repair_add_active_peer( fd_repair_t * glob, fd_ip4_port_t const * addr, fd_pubkey_t const * id ) {
108 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, id, NULL);
109 0 : if (val == NULL) {
110 0 : val = fd_active_table_insert(glob->actives, id);
111 0 : fd_repair_peer_addr_copy(&val->addr, addr);
112 0 : val->resp_cnt = 0;
113 0 : val->req_cnt = 0;
114 0 : val->first_req_ts = 0;
115 0 : val->last_req_ts = 0;
116 0 : val->first_resp_ts = 0;
117 0 : val->last_resp_ts = 0;
118 0 : val->total_latency = 0;
119 0 : val->stake = 0;
120 :
121 : /* This memory could get read by the catchup tool, so keep a valid
122 : state of the peer list at all times.*/
123 :
124 0 : glob->peers[ glob->peer_cnt ] = (fd_peer_t){
125 0 : .key = *id,
126 0 : .ip4 = *addr
127 0 : };
128 0 : FD_COMPILER_MFENCE();
129 0 : glob->peer_cnt++;
130 :
131 0 : return 0;
132 0 : }
133 0 : return 1;
134 0 : }
135 :
136 : /* Set the current protocol time in nanosecs */
137 : void
138 0 : fd_repair_settime( fd_repair_t * glob, long ts ) {
139 0 : glob->now = ts;
140 0 : }
141 :
142 : /* Get the current protocol time in nanosecs */
143 : long
144 0 : fd_repair_gettime( fd_repair_t * glob ) {
145 0 : return glob->now;
146 0 : }
147 : /* Start timed events and other protocol behavior */
148 : int
149 0 : fd_repair_start( fd_repair_t * glob ) {
150 0 : glob->last_sends = glob->now;
151 0 : glob->last_decay = glob->now;
152 0 : glob->last_print = glob->now;
153 0 : return 0;
154 0 : }
155 :
156 : /* Dispatch timed events and other protocol behavior. This should be
157 : * called inside the main spin loop. */
158 : int
159 0 : fd_repair_continue( fd_repair_t * glob ) {
160 0 : if ( glob->now - glob->last_print > (long)30e9 ) { /* 30 seconds */
161 0 : glob->last_print = glob->now;
162 0 : glob->last_decay = glob->now;
163 0 : } else if ( glob->now - glob->last_decay > (long)15e9 ) { /* 15 seconds */
164 0 : glob->last_decay = glob->now;
165 0 : }
166 0 : return 0;
167 0 : }
168 :
169 : int
170 : fd_repair_construct_request_protocol( fd_repair_t * glob,
171 : fd_repair_protocol_t * protocol,
172 : enum fd_needed_elem_type type,
173 : ulong slot,
174 : uint shred_index,
175 : fd_pubkey_t const * recipient,
176 : uint nonce,
177 0 : long now ) {
178 0 : switch( type ) {
179 0 : case fd_needed_window_index: {
180 0 : fd_repair_protocol_new_disc(protocol, fd_repair_protocol_enum_window_index);
181 0 : fd_repair_window_index_t * wi = &protocol->inner.window_index;
182 0 : wi->header.sender = *glob->public_key;
183 0 : wi->header.recipient = *recipient;
184 0 : wi->header.timestamp = (ulong)now/1000000L;
185 0 : wi->header.nonce = nonce;
186 0 : wi->slot = slot;
187 0 : wi->shred_index = shred_index;
188 0 : return 1;
189 0 : }
190 :
191 0 : case fd_needed_highest_window_index: {
192 0 : fd_repair_protocol_new_disc( protocol, fd_repair_protocol_enum_highest_window_index );
193 0 : fd_repair_highest_window_index_t * wi = &protocol->inner.highest_window_index;
194 0 : wi->header.sender = *glob->public_key;
195 0 : wi->header.recipient = *recipient;
196 0 : wi->header.timestamp = (ulong)now/1000000L;
197 0 : wi->header.nonce = nonce;
198 0 : wi->slot = slot;
199 0 : wi->shred_index = shred_index;
200 0 : return 1;
201 0 : }
202 :
203 0 : case fd_needed_orphan: {
204 0 : fd_repair_protocol_new_disc( protocol, fd_repair_protocol_enum_orphan );
205 0 : fd_repair_orphan_t * wi = &protocol->inner.orphan;
206 0 : wi->header.sender = *glob->public_key;
207 0 : wi->header.recipient = *recipient;
208 0 : wi->header.timestamp = (ulong)now/1000000L;
209 0 : wi->header.nonce = nonce;
210 0 : wi->slot = slot;
211 0 : return 1;
212 0 : }
213 0 : }
214 0 : return 0;
215 0 : }
216 :
217 : /*
218 : * This function is called when the inflight table becomes full and we need to insert
219 : * a new repair request. Rather than failing the new request, we attempt to free up
220 : * space by removing old entries that are unlikely to receive responses. This happens
221 : * in fd_repair_create_inflight_request() when fd_inflight_table_is_full() returns true.
222 : */
223 : static int
224 0 : fd_repair_evict_old_inflight_request( fd_repair_t * glob, long now ) {
225 0 : int evicted = 0;
226 :
227 : /* Iterate through all entries in the inflight request tracking table.
228 : * This table (glob->dupdetect) maps repair request keys to tracking info. */
229 0 : for( fd_inflight_table_iter_t iter = fd_inflight_table_iter_init( glob->dupdetect );
230 0 : !fd_inflight_table_iter_done( glob->dupdetect, iter );
231 0 : iter = fd_inflight_table_iter_next( glob->dupdetect, iter ) ) {
232 :
233 0 : fd_inflight_elem_t * ele = fd_inflight_table_iter_ele( glob->dupdetect, iter );
234 :
235 : /* Check if this request is stale (older than 80ms).
236 : * last_send_time is in nanoseconds, so 80e6 = 80 million nanoseconds = 80ms.
237 : * FD_UNLIKELY hint suggests most entries will not be stale when this function is called. */
238 0 : if( FD_UNLIKELY( ele->last_send_time + (long)80e6 < now ) ) {
239 : /* Remove the stale entry from the inflight table.
240 : * This frees up space and allows the same shred to be requested again. */
241 0 : fd_inflight_table_remove( glob->dupdetect, &ele->key );
242 0 : evicted++;
243 0 : }
244 0 : }
245 :
246 0 : return evicted;
247 0 : }
248 :
249 : /* Returns 1 if its valid to send a request for the given shred. 0 if
250 : it is not, i.e., there is an inflight request for it that was sent
251 : within the last x ms. */
252 : static int
253 0 : fd_repair_create_dedup_request( fd_repair_t * glob, int type, ulong slot, uint shred_index, long now ) {
254 :
255 : /* If there are no active sticky peers from which to send requests to, refresh the sticky peers
256 : selection. It may be that stake weights were not available before, and now they are. */
257 :
258 0 : fd_inflight_key_t dupkey = { .type = (enum fd_needed_elem_type)type, .slot = slot, .shred_index = shred_index };
259 0 : fd_inflight_elem_t * dupelem = fd_inflight_table_query( glob->dupdetect, &dupkey, NULL );
260 :
261 0 : if( dupelem == NULL ) {
262 : /* No existing inflight request for this shred, need to create a new tracking entry.
263 : * First check if the inflight table has space available. */
264 0 : if( FD_UNLIKELY( fd_inflight_table_is_full( glob->dupdetect ) ) ) {
265 : /* Table is full - attempt to free space by evicting stale entries.
266 : * This prevents the repair system from getting stuck when the table fills up
267 : * with old requests that will never receive responses. */
268 0 : if( 0 == fd_repair_evict_old_inflight_request( glob, now ) ) {
269 : /* No stale entries found to evict - all requests are recent.
270 : * This likely indicates high repair request volume or very slow network. */
271 0 : FD_LOG_WARNING(( "Failed to evict old inflight requests for slot %lu, shred_index %u.", slot, shred_index ));
272 0 : return 0;
273 0 : }
274 : /* Verify that eviction freed up space as expected */
275 0 : FD_TEST( !fd_inflight_table_is_full( glob->dupdetect ) );
276 0 : }
277 : /* Insert new tracking entry for this repair request */
278 0 : dupelem = fd_inflight_table_insert( glob->dupdetect, &dupkey );
279 0 : dupelem->last_send_time = 0L; /* Will be set to current time below if request is sent */
280 0 : }
281 :
282 0 : if( FD_LIKELY( dupelem->last_send_time+(long)80e6 < now ) ) { /* 80ms */
283 0 : dupelem->last_send_time = now;
284 0 : dupelem->req_cnt = FD_REPAIR_NUM_NEEDED_PEERS;
285 0 : return 1;
286 0 : }
287 0 : return 0;
288 0 : }
289 :
290 : long
291 : fd_repair_inflight_remove( fd_repair_t * glob,
292 : ulong slot,
293 : uint shred_index,
294 0 : ulong nonce ) {
295 : /* If we have a shred, we can remove it from the inflight table */
296 : // FIXME: might be worth adding eviction logic here for orphan / highest window reqs
297 :
298 0 : fd_inflight_key_t dupkey = { .type = fd_needed_window_index, .slot = slot, .shred_index = shred_index };
299 0 : fd_inflight_elem_t * dupelem = fd_inflight_table_query( glob->dupdetect, &dupkey, NULL );
300 0 : if( dupelem ) {
301 : /* Remove the element from the inflight table */
302 0 : fd_inflight_table_remove( glob->dupdetect, &dupkey );
303 0 : }
304 :
305 0 : long now = fd_log_wallclock();
306 0 : fd_inflight_t * inflight_req = fd_inflight_map_ele_query( glob->inflight_map, &nonce, NULL, glob->inflight_pool );
307 0 : if( inflight_req ) {
308 0 : long rtt = now - inflight_req->timestamp_ns;
309 :
310 : /* update peer stats */
311 0 : fd_active_elem_t * active_elem = fd_active_table_query( glob->actives, &inflight_req->pubkey, NULL );
312 0 : if( FD_LIKELY( active_elem ) ) {
313 0 : active_elem->resp_cnt++;
314 0 : if( FD_UNLIKELY( active_elem->first_resp_ts == 0 ) ) active_elem->first_resp_ts = now;
315 0 : active_elem->last_resp_ts = now;
316 0 : active_elem->total_latency += rtt;
317 0 : }
318 : /* Remove the element from the inflight table */
319 0 : fd_inflight_map_ele_remove ( glob->inflight_map, &nonce, NULL, glob->inflight_pool );
320 0 : fd_inflight_dlist_ele_remove( glob->inflight_dlist, inflight_req, glob->inflight_pool );
321 0 : fd_inflight_pool_ele_release( glob->inflight_pool, inflight_req );
322 0 : return rtt;
323 0 : }
324 :
325 0 : return 0;
326 0 : }
327 :
328 : int
329 0 : fd_repair_need_window_index( fd_repair_t * glob, ulong slot, uint shred_index ) {
330 : // FD_LOG_NOTICE(( "[%s] need window %lu, shred_index %u", __func__, slot, shred_index ));
331 0 : return fd_repair_create_dedup_request( glob, fd_needed_window_index, slot, shred_index, glob->now );
332 0 : }
333 :
334 : int
335 0 : fd_repair_need_highest_window_index( fd_repair_t * glob, ulong slot, uint shred_index ) {
336 : // FD_LOG_NOTICE(( "[%s] need highest %lu", __func__, slot ));
337 0 : return fd_repair_create_dedup_request( glob, fd_needed_highest_window_index, slot, shred_index, glob->now );
338 0 : }
339 :
340 : int
341 0 : fd_repair_need_orphan( fd_repair_t * glob, ulong slot ) {
342 : // FD_LOG_NOTICE( ( "[repair] need orphan %lu", slot ) );
343 0 : return fd_repair_create_dedup_request( glob, fd_needed_orphan, slot, UINT_MAX, glob->now );
344 0 : }
345 :
/* Pending Sign Request API

   These functions manage the pool and map of pending sign requests in
   the repair module.  Each request is identified by a unique nonce,
   which is used as the key in the map.

   fd_repair_pending_sign_req_t * fd_repair_insert_pending_request(...);
   Acquires a pending sign request from the pool, assigns it the next
   nonce, inserts it into the map and builds the repair request
   message.  A matching inflight request is recorded as well.  Returns
   a pointer to the pending request, or NULL if the request could not
   be created (e.g. the pool is full).

   fd_repair_pending_sign_req_t * fd_repair_query_pending_request(...);
   Finds a pending sign request by nonce.  Returns pointer or NULL.

   int fd_repair_remove_pending_request(...);
   Removes a pending sign request by nonce and releases it back to the
   pool.  Returns 0 on success, -1 if not found.

   All functions assume the repair context is valid and not used concurrently.
*/
371 :
372 : fd_repair_pending_sign_req_t *
373 : fd_repair_insert_pending_request( fd_repair_t * repair,
374 : fd_repair_protocol_t * protocol,
375 : uint dst_ip_addr,
376 : ushort dst_port,
377 : enum fd_needed_elem_type type,
378 : ulong slot,
379 : uint shred_index,
380 : long now,
381 0 : fd_pubkey_t const * recipient ) {
382 :
383 : /* Check if there is any space for a new pending sign request */
384 0 : if( FD_UNLIKELY( fd_repair_pending_sign_req_pool_free( repair->pending_sign_pool ) == 0 ) ) {
385 0 : return NULL;
386 0 : }
387 :
388 0 : fd_repair_pending_sign_req_t * pending = fd_repair_pending_sign_req_pool_ele_acquire( repair->pending_sign_pool );
389 0 : if (FD_UNLIKELY( !pending ) ) {
390 0 : return NULL;
391 0 : }
392 :
393 0 : pending->nonce = repair->next_nonce;
394 :
395 0 : fd_repair_pending_sign_req_map_ele_insert( repair->pending_sign_map, pending, repair->pending_sign_pool );
396 0 : fd_repair_construct_request_protocol( repair, protocol, type, slot, shred_index, recipient, repair->next_nonce, now );
397 :
398 0 : pending->sig_offset = 4;
399 0 : pending->dst_ip_addr = dst_ip_addr;
400 0 : pending->dst_port = dst_port;
401 0 : pending->recipient = *recipient;
402 0 : pending->type = (uchar)type;
403 :
404 : /* Add the request to the inflight table */
405 0 : fd_inflight_t * inflight_req = fd_inflight_pool_ele_acquire( repair->inflight_pool );
406 0 : if( FD_UNLIKELY( !inflight_req ) ) {
407 0 : FD_LOG_ERR(("Failed to acquire inflight request from pool, implement eviction"));
408 0 : }
409 0 : inflight_req->nonce = repair->next_nonce;
410 0 : inflight_req->timestamp_ns = now;
411 0 : inflight_req->pubkey = *recipient;
412 :
413 0 : fd_inflight_map_ele_insert( repair->inflight_map, inflight_req, repair->inflight_pool );
414 0 : fd_inflight_dlist_ele_push_tail( repair->inflight_dlist, inflight_req, repair->inflight_pool );
415 :
416 0 : repair->next_nonce++;
417 0 : return pending;
418 0 : }
419 :
420 : fd_repair_pending_sign_req_t *
421 : fd_repair_query_pending_request( fd_repair_t * repair,
422 0 : ulong nonce ) {
423 0 : return fd_repair_pending_sign_req_map_ele_query( repair->pending_sign_map, &nonce, NULL, repair->pending_sign_pool );
424 0 : }
425 :
426 : int
427 : fd_repair_remove_pending_request( fd_repair_t * repair,
428 0 : ulong nonce ) {
429 0 : fd_repair_pending_sign_req_t * pending = fd_repair_pending_sign_req_map_ele_query( repair->pending_sign_map, &nonce, NULL, repair->pending_sign_pool );
430 0 : if( FD_UNLIKELY( !pending ) ) {
431 0 : return -1;
432 0 : }
433 0 : fd_repair_pending_sign_req_map_ele_remove( repair->pending_sign_map, &nonce, NULL, repair->pending_sign_pool );
434 0 : fd_repair_pending_sign_req_pool_ele_release( repair->pending_sign_pool, pending );
435 0 : return 0;
436 0 : }
|