Line data Source code
1 : #define _GNU_SOURCE 1
2 : #include "fd_repair.h"
3 : #include "../../ballet/sha256/fd_sha256.h"
4 : #include "../../ballet/ed25519/fd_ed25519.h"
5 : #include "../../ballet/base58/fd_base58.h"
6 : #include "../../disco/keyguard/fd_keyguard.h"
7 : #include "../../util/net/fd_eth.h"
8 : #include "../../util/rng/fd_rng.h"
9 : #include <string.h>
10 : #include <stdio.h>
11 : #include <math.h>
12 : #include <arpa/inet.h>
13 : #include <sys/socket.h>
14 :
15 : #pragma GCC diagnostic ignored "-Wstrict-aliasing"
16 :
17 : /* Max number of validators that can be actively queried */
18 0 : #define FD_ACTIVE_KEY_MAX (1<<12)
19 : /* Max number of pending shred requests */
20 0 : #define FD_NEEDED_KEY_MAX (1<<20)
21 : /* Max number of sticky repair peers */
22 0 : #define FD_REPAIR_STICKY_MAX 1024
23 : /* Max number of validator identities in stake weights */
24 0 : #define FD_STAKE_WEIGHTS_MAX (1<<14)
25 : /* Max number of validator clients that we ping */
26 0 : #define FD_REPAIR_PINGED_MAX (1<<14)
27 : /* Sha256 pre-image size for pings */
28 0 : #define FD_PING_PRE_IMAGE_SZ (48UL)
29 : /* Number of peers to send requests to. */
30 0 : #define FD_REPAIR_NUM_NEEDED_PEERS (4)
31 :
32 : /* Test if two hash values are equal */
33 0 : static int fd_hash_eq( const fd_hash_t * key1, const fd_hash_t * key2 ) {
34 0 : for (ulong i = 0; i < 32U/sizeof(ulong); ++i)
35 0 : if (key1->ul[i] != key2->ul[i])
36 0 : return 0;
37 0 : return 1;
38 0 : }
39 :
40 : /* Hash a hash value */
41 0 : static ulong fd_hash_hash( const fd_hash_t * key, ulong seed ) {
42 0 : return key->ul[0] ^ seed;
43 0 : }
44 :
45 : /* Copy a hash value */
46 0 : static void fd_hash_copy( fd_hash_t * keyd, const fd_hash_t * keys ) {
47 0 : for (ulong i = 0; i < 32U/sizeof(ulong); ++i)
48 0 : keyd->ul[i] = keys->ul[i];
49 0 : }
50 :
51 : /* Test if two addresses are equal */
52 0 : int fd_repair_peer_addr_eq( const fd_repair_peer_addr_t * key1, const fd_repair_peer_addr_t * key2 ) {
53 0 : FD_STATIC_ASSERT(sizeof(fd_repair_peer_addr_t) == sizeof(ulong),"messed up size");
54 0 : return key1->l == key2->l;
55 0 : }
56 :
57 : /* Hash an address */
58 0 : ulong fd_repair_peer_addr_hash( const fd_repair_peer_addr_t * key, ulong seed ) {
59 0 : FD_STATIC_ASSERT(sizeof(fd_repair_peer_addr_t) == sizeof(ulong),"messed up size");
60 0 : return (key->l + seed + 7242237688154252699UL)*9540121337UL;
61 0 : }
62 :
63 : /* Efficiently copy an address */
64 0 : void fd_repair_peer_addr_copy( fd_repair_peer_addr_t * keyd, const fd_repair_peer_addr_t * keys ) {
65 0 : FD_STATIC_ASSERT(sizeof(fd_repair_peer_addr_t) == sizeof(ulong),"messed up size");
66 0 : keyd->l = keys->l;
67 0 : }
68 :
69 : typedef uint fd_repair_nonce_t;
70 :
71 : /* Active table element. This table is all validators that we are
72 : asking for repairs. */
73 : struct fd_active_elem {
74 : fd_pubkey_t key; /* Public identifier and map key */
75 : ulong next; /* used internally by fd_map_giant */
76 :
77 : fd_repair_peer_addr_t addr;
78 : ulong avg_reqs; /* Moving average of the number of requests */
79 : ulong avg_reps; /* Moving average of the number of requests */
80 : long avg_lat; /* Moving average of response latency */
81 : uchar sticky;
82 : uchar permanent;
83 : long first_request_time;
84 : ulong stake;
85 : };
86 : /* Active table */
87 : typedef struct fd_active_elem fd_active_elem_t;
88 : #define MAP_NAME fd_active_table
89 : #define MAP_KEY_T fd_pubkey_t
90 0 : #define MAP_KEY_EQ fd_hash_eq
91 0 : #define MAP_KEY_HASH fd_hash_hash
92 0 : #define MAP_KEY_COPY fd_hash_copy
93 0 : #define MAP_T fd_active_elem_t
94 : #include "../../util/tmpl/fd_map_giant.c"
95 :
/* Kind of repair request that a needed/dupdetect entry stands for. */
enum fd_needed_elem_type {
  fd_needed_window_index,         /* request a specific shred (slot, shred_index) */
  fd_needed_highest_window_index, /* request the highest-window-index variant for (slot, shred_index) */
  fd_needed_orphan                /* orphan request, identified by slot only */
};
99 :
100 : struct fd_dupdetect_key {
101 : enum fd_needed_elem_type type;
102 : ulong slot;
103 : uint shred_index;
104 : };
105 : typedef struct fd_dupdetect_key fd_dupdetect_key_t;
106 :
107 : struct fd_dupdetect_elem {
108 : fd_dupdetect_key_t key;
109 : long last_send_time;
110 : uint req_cnt;
111 : ulong next;
112 : };
113 : typedef struct fd_dupdetect_elem fd_dupdetect_elem_t;
114 :
115 0 : int fd_dupdetect_eq( const fd_dupdetect_key_t * key1, const fd_dupdetect_key_t * key2 ) {
116 0 : return (key1->type == key2->type) &&
117 0 : (key1->slot == key2->slot) &&
118 0 : (key1->shred_index == key2->shred_index);
119 0 : }
120 :
121 0 : ulong fd_dupdetect_hash( const fd_dupdetect_key_t * key, ulong seed ) {
122 0 : return (key->slot + seed)*9540121337UL + key->shred_index*131U;
123 0 : }
124 :
125 0 : void fd_dupdetect_copy( fd_dupdetect_key_t * keyd, const fd_dupdetect_key_t * keys ) {
126 0 : *keyd = *keys;
127 0 : }
128 :
129 : #define MAP_NAME fd_dupdetect_table
130 : #define MAP_KEY_T fd_dupdetect_key_t
131 0 : #define MAP_KEY_EQ fd_dupdetect_eq
132 0 : #define MAP_KEY_HASH fd_dupdetect_hash
133 0 : #define MAP_KEY_COPY fd_dupdetect_copy
134 0 : #define MAP_T fd_dupdetect_elem_t
135 : #include "../../util/tmpl/fd_map_giant.c"
136 :
137 0 : int fd_repair_nonce_eq( const fd_repair_nonce_t * key1, const fd_repair_nonce_t * key2 ) {
138 0 : return *key1 == *key2;
139 0 : }
140 :
141 0 : ulong fd_repair_nonce_hash( const fd_repair_nonce_t * key, ulong seed ) {
142 0 : return (*key + seed + 7242237688154252699UL)*9540121337UL;
143 0 : }
144 :
145 0 : void fd_repair_nonce_copy( fd_repair_nonce_t * keyd, const fd_repair_nonce_t * keys ) {
146 0 : *keyd = *keys;
147 0 : }
148 :
149 : struct fd_needed_elem {
150 : fd_repair_nonce_t key;
151 : ulong next;
152 : fd_pubkey_t id;
153 : fd_dupdetect_key_t dupkey;
154 : long when;
155 : };
156 : typedef struct fd_needed_elem fd_needed_elem_t;
157 : #define MAP_NAME fd_needed_table
158 : #define MAP_KEY_T fd_repair_nonce_t
159 0 : #define MAP_KEY_EQ fd_repair_nonce_eq
160 0 : #define MAP_KEY_HASH fd_repair_nonce_hash
161 0 : #define MAP_KEY_COPY fd_repair_nonce_copy
162 0 : #define MAP_T fd_needed_elem_t
163 : #include "../../util/tmpl/fd_map_giant.c"
164 :
165 : struct fd_pinged_elem {
166 : fd_repair_peer_addr_t key;
167 : ulong next;
168 : fd_pubkey_t id;
169 : fd_hash_t token;
170 : int good;
171 : };
172 : typedef struct fd_pinged_elem fd_pinged_elem_t;
173 : #define MAP_NAME fd_pinged_table
174 : #define MAP_KEY_T fd_repair_peer_addr_t
175 0 : #define MAP_KEY_EQ fd_repair_peer_addr_eq
176 0 : #define MAP_KEY_HASH fd_repair_peer_addr_hash
177 0 : #define MAP_KEY_COPY fd_repair_peer_addr_copy
178 0 : #define MAP_T fd_pinged_elem_t
179 : #include "../../util/tmpl/fd_map_giant.c"
180 :
181 : /* Global data for repair service */
182 : struct fd_repair {
183 : /* Concurrency lock */
184 : volatile ulong lock;
185 : /* Current time in nanosecs */
186 : long now;
187 : /* My public/private key */
188 : fd_pubkey_t * public_key;
189 : uchar * private_key;
190 : /* My repair addresses */
191 : fd_repair_peer_addr_t service_addr;
192 : fd_repair_peer_addr_t intake_addr;
193 : /* Function used to deliver repair messages to the application */
194 : fd_repair_shred_deliver_fun deliver_fun;
195 : /* Functions used to handle repair requests */
196 : fd_repair_serv_get_shred_fun serv_get_shred_fun;
197 : fd_repair_serv_get_parent_fun serv_get_parent_fun;
198 : /* Function used to send raw packets on the network */
199 : fd_repair_send_packet_fun clnt_send_fun; /* Client requests */
200 : fd_repair_send_packet_fun serv_send_fun; /* Service responses */
201 : /* Function used to send packets for signing to remote tile */
202 : fd_repair_sign_fun sign_fun;
203 : /* Argument to fd_repair_sign_fun */
204 : void * sign_arg;
205 : /* Function used to deliver repair failure on the network */
206 : fd_repair_shred_deliver_fail_fun deliver_fail_fun;
207 : void * fun_arg;
208 : /* Table of validators that we are actively pinging, keyed by repair address */
209 : fd_active_elem_t * actives;
210 : fd_pubkey_t actives_sticky[FD_REPAIR_STICKY_MAX]; /* cache of chosen repair peer samples */
211 : ulong actives_sticky_cnt;
212 : ulong actives_random_seed;
213 : /* Duplicate request detection table */
214 : fd_dupdetect_elem_t * dupdetect;
215 : /* Table of needed shreds */
216 : fd_needed_elem_t * needed;
217 : fd_repair_nonce_t oldest_nonce;
218 : fd_repair_nonce_t current_nonce;
219 : fd_repair_nonce_t next_nonce;
220 : /* Table of validator clients that we have pinged */
221 : fd_pinged_elem_t * pinged;
222 : /* Last batch of sends */
223 : long last_sends;
224 : /* Last statistics decay */
225 : long last_decay;
226 : /* Last statistics printout */
227 : long last_print;
228 : /* Random number generator */
229 : fd_rng_t rng[1];
230 : /* RNG seed */
231 : ulong seed;
232 : /* Stake weights */
233 : ulong stake_weights_cnt;
234 : fd_stake_weight_t * stake_weights;
235 : };
236 :
237 : ulong
238 0 : fd_repair_align ( void ) { return 128UL; }
239 :
240 : ulong
241 0 : fd_repair_footprint( void ) {
242 0 : ulong l = FD_LAYOUT_INIT;
243 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) );
244 0 : l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
245 0 : l = FD_LAYOUT_APPEND( l, fd_needed_table_align(), fd_needed_table_footprint(FD_NEEDED_KEY_MAX) );
246 0 : l = FD_LAYOUT_APPEND( l, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX) );
247 0 : l = FD_LAYOUT_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) );
248 0 : l = FD_LAYOUT_APPEND( l, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() );
249 0 : return FD_LAYOUT_FINI(l, fd_repair_align() );
250 0 : }
251 :
252 : void *
253 0 : fd_repair_new ( void * shmem, ulong seed ) {
254 0 : FD_SCRATCH_ALLOC_INIT(l, shmem);
255 0 : fd_repair_t * glob = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_repair_t), sizeof(fd_repair_t) );
256 0 : fd_memset(glob, 0, sizeof(fd_repair_t));
257 0 : void * shm = FD_SCRATCH_ALLOC_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
258 0 : glob->actives = fd_active_table_join(fd_active_table_new(shm, FD_ACTIVE_KEY_MAX, seed));
259 0 : glob->seed = seed;
260 0 : shm = FD_SCRATCH_ALLOC_APPEND( l, fd_needed_table_align(), fd_needed_table_footprint(FD_NEEDED_KEY_MAX) );
261 0 : glob->needed = fd_needed_table_join(fd_needed_table_new(shm, FD_NEEDED_KEY_MAX, seed));
262 0 : shm = FD_SCRATCH_ALLOC_APPEND( l, fd_dupdetect_table_align(), fd_dupdetect_table_footprint(FD_NEEDED_KEY_MAX) );
263 0 : glob->dupdetect = fd_dupdetect_table_join(fd_dupdetect_table_new(shm, FD_NEEDED_KEY_MAX, seed));
264 0 : shm = FD_SCRATCH_ALLOC_APPEND( l, fd_pinged_table_align(), fd_pinged_table_footprint(FD_REPAIR_PINGED_MAX) );
265 0 : glob->pinged = fd_pinged_table_join(fd_pinged_table_new(shm, FD_REPAIR_PINGED_MAX, seed));
266 0 : glob->stake_weights = FD_SCRATCH_ALLOC_APPEND( l, fd_stake_weight_align(), FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() );
267 0 : glob->stake_weights_cnt = 0;
268 0 : glob->last_sends = 0;
269 0 : glob->last_decay = 0;
270 0 : glob->last_print = 0;
271 0 : glob->oldest_nonce = glob->current_nonce = glob->next_nonce = 0;
272 0 : fd_rng_new(glob->rng, (uint)seed, 0UL);
273 :
274 0 : glob->actives_sticky_cnt = 0;
275 0 : glob->actives_random_seed = 0;
276 :
277 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI(l, 1UL);
278 0 : if ( scratch_top > (ulong)shmem + fd_repair_footprint() ) {
279 0 : FD_LOG_ERR(("Enough space not allocated for repair"));
280 0 : }
281 :
282 0 : return glob;
283 0 : }
284 :
285 : fd_repair_t *
286 0 : fd_repair_join ( void * shmap ) { return (fd_repair_t *)shmap; }
287 :
288 : void *
289 0 : fd_repair_leave ( fd_repair_t * join ) { return join; }
290 :
291 : void *
292 0 : fd_repair_delete ( void * shmap ) {
293 0 : fd_repair_t * glob = (fd_repair_t *)shmap;
294 0 : fd_active_table_delete( fd_active_table_leave( glob->actives ) );
295 0 : fd_needed_table_delete( fd_needed_table_leave( glob->needed ) );
296 0 : fd_dupdetect_table_delete( fd_dupdetect_table_leave( glob->dupdetect ) );
297 0 : fd_pinged_table_delete( fd_pinged_table_leave( glob->pinged ) );
298 0 : return glob;
299 0 : }
300 :
301 : static void
302 0 : fd_repair_lock( fd_repair_t * repair ) {
303 0 : # if FD_HAS_THREADS
304 0 : for(;;) {
305 0 : if( FD_LIKELY( !FD_ATOMIC_CAS( &repair->lock, 0UL, 1UL) ) ) break;
306 0 : FD_SPIN_PAUSE();
307 0 : }
308 : # else
309 : repair->lock = 1;
310 : # endif
311 0 : FD_COMPILER_MFENCE();
312 0 : }
313 :
314 : static void
315 0 : fd_repair_unlock( fd_repair_t * repair ) {
316 0 : FD_COMPILER_MFENCE();
317 0 : FD_VOLATILE( repair->lock ) = 0UL;
318 0 : }
319 :
320 : /* Convert an address to a human readable string */
321 0 : const char * fd_repair_addr_str( char * dst, size_t dstlen, fd_repair_peer_addr_t const * src ) {
322 0 : char tmp[INET_ADDRSTRLEN];
323 0 : snprintf(dst, dstlen, "%s:%u", inet_ntop(AF_INET, &src->addr, tmp, INET_ADDRSTRLEN), (uint)ntohs(src->port));
324 0 : return dst;
325 0 : }
326 :
327 : /* Set the repair configuration */
328 : int
329 0 : fd_repair_set_config( fd_repair_t * glob, const fd_repair_config_t * config ) {
330 0 : char tmp[100];
331 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
332 0 : fd_base58_encode_32( config->public_key->uc, NULL, keystr );
333 0 : FD_LOG_NOTICE(("configuring address %s key %s", fd_repair_addr_str(tmp, sizeof(tmp), &config->intake_addr), keystr));
334 :
335 0 : glob->public_key = config->public_key;
336 0 : glob->private_key = config->private_key;
337 0 : fd_repair_peer_addr_copy(&glob->intake_addr, &config->intake_addr);
338 0 : fd_repair_peer_addr_copy(&glob->service_addr, &config->service_addr);
339 0 : glob->deliver_fun = config->deliver_fun;
340 0 : glob->serv_get_shred_fun = config->serv_get_shred_fun;
341 0 : glob->serv_get_parent_fun = config->serv_get_parent_fun;
342 0 : glob->clnt_send_fun = config->clnt_send_fun;
343 0 : glob->serv_send_fun = config->serv_send_fun;
344 0 : glob->fun_arg = config->fun_arg;
345 0 : glob->sign_fun = config->sign_fun;
346 0 : glob->sign_arg = config->sign_arg;
347 0 : glob->deliver_fail_fun = config->deliver_fail_fun;
348 0 : return 0;
349 0 : }
350 :
351 : int
352 0 : fd_repair_update_addr( fd_repair_t * glob, const fd_repair_peer_addr_t * intake_addr, const fd_repair_peer_addr_t * service_addr ) {
353 0 : char tmp[100];
354 0 : FD_LOG_NOTICE(("updating address %s", fd_repair_addr_str(tmp, sizeof(tmp), intake_addr)));
355 :
356 0 : fd_repair_peer_addr_copy(&glob->intake_addr, intake_addr);
357 0 : fd_repair_peer_addr_copy(&glob->service_addr, service_addr);
358 0 : return 0;
359 0 : }
360 :
361 : /* Initiate connection to a peer */
362 : int
363 0 : fd_repair_add_active_peer( fd_repair_t * glob, fd_repair_peer_addr_t const * addr, fd_pubkey_t const * id ) {
364 0 : fd_repair_lock( glob );
365 0 : char tmp[100];
366 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
367 0 : fd_base58_encode_32( id->uc, NULL, keystr );
368 0 : FD_LOG_DEBUG(("adding active peer address %s key %s", fd_repair_addr_str(tmp, sizeof(tmp), addr), keystr));
369 :
370 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, id, NULL);
371 0 : if (val == NULL) {
372 0 : if (fd_active_table_is_full(glob->actives)) {
373 0 : FD_LOG_DEBUG(("too many actives"));
374 0 : fd_repair_unlock( glob );
375 0 : return -1;
376 0 : }
377 0 : val = fd_active_table_insert(glob->actives, id);
378 0 : fd_repair_peer_addr_copy(&val->addr, addr);
379 0 : val->avg_reqs = 0;
380 0 : val->avg_reps = 0;
381 0 : val->avg_lat = 0;
382 0 : val->sticky = 0;
383 0 : val->first_request_time = 0;
384 0 : val->permanent = 0;
385 0 : val->stake = 0UL;
386 0 : FD_LOG_DEBUG(( "adding repair peer %s", FD_BASE58_ENC_32_ALLOCA( val->key.uc ) ));
387 0 : }
388 0 : fd_repair_unlock( glob );
389 0 : return 0;
390 0 : }
391 :
392 : /* Set the current protocol time in nanosecs */
393 : void
394 0 : fd_repair_settime( fd_repair_t * glob, long ts ) {
395 0 : glob->now = ts;
396 0 : }
397 :
398 : /* Get the current protocol time in nanosecs */
399 : long
400 0 : fd_repair_gettime( fd_repair_t * glob ) {
401 0 : return glob->now;
402 0 : }
403 :
404 : static void
405 : fd_repair_sign_and_send( fd_repair_t * glob,
406 : fd_repair_protocol_t * protocol,
407 0 : fd_gossip_peer_addr_t * addr ) {
408 :
409 0 : uchar _buf[1024];
410 0 : uchar * buf = _buf;
411 0 : ulong buflen = sizeof(_buf);
412 0 : fd_bincode_encode_ctx_t ctx = { .data = buf, .dataend = buf + buflen };
413 0 : if( FD_UNLIKELY( fd_repair_protocol_encode( protocol, &ctx ) != FD_BINCODE_SUCCESS ) ) {
414 0 : FD_LOG_CRIT(( "Failed to encode repair message (type %#x)", protocol->discriminant ));
415 0 : }
416 :
417 0 : buflen = (ulong)ctx.data - (ulong)buf;
418 0 : if( FD_UNLIKELY( buflen<68 ) ) {
419 0 : FD_LOG_CRIT(( "Attempted to sign unsigned repair message type (type %#x)", protocol->discriminant ));
420 0 : }
421 :
422 : /* At this point buffer contains
423 :
424 : [ discriminant ] [ signature ] [ payload ]
425 : ^ ^ ^
426 : 0 4 68 */
427 :
428 : /* https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L874 */
429 :
430 0 : fd_memcpy( buf+64, buf, 4 );
431 0 : buf += 64UL;
432 0 : buflen -= 64UL;
433 :
434 : /* Now it contains
435 :
436 : [ discriminant ] [ payload ]
437 : ^ ^
438 : 0 4 */
439 :
440 0 : fd_signature_t sig;
441 0 : (*glob->sign_fun)( glob->sign_arg, sig.uc, buf, buflen, FD_KEYGUARD_SIGN_TYPE_ED25519 );
442 :
443 : /* Reintroduce the signature */
444 :
445 0 : buf -= 64UL;
446 0 : buflen += 64UL;
447 0 : fd_memcpy( buf + 4U, &sig, 64U );
448 :
449 0 : (*glob->clnt_send_fun)( buf, buflen, addr, glob->fun_arg );
450 0 : }
451 :
452 : static void
453 0 : fd_repair_send_requests( fd_repair_t * glob ) {
454 : /* Garbage collect old requests */
455 0 : long expire = glob->now - (long)5e9; /* 5 seconds */
456 0 : fd_repair_nonce_t n;
457 0 : for ( n = glob->oldest_nonce; n != glob->next_nonce; ++n ) {
458 0 : fd_needed_elem_t * ele = fd_needed_table_query( glob->needed, &n, NULL );
459 0 : if ( NULL == ele )
460 0 : continue;
461 0 : if (ele->when > expire)
462 0 : break;
463 : // (*glob->deliver_fail_fun)( &ele->key, ele->slot, ele->shred_index, glob->fun_arg, FD_REPAIR_DELIVER_FAIL_TIMEOUT );
464 0 : fd_dupdetect_elem_t * dup = fd_dupdetect_table_query( glob->dupdetect, &ele->dupkey, NULL );
465 0 : if( dup && --dup->req_cnt == 0) {
466 0 : fd_dupdetect_table_remove( glob->dupdetect, &ele->dupkey );
467 0 : }
468 0 : fd_needed_table_remove( glob->needed, &n );
469 0 : }
470 0 : glob->oldest_nonce = n;
471 :
472 : /* Send requests starting where we left off last time */
473 0 : if ( (int)(n - glob->current_nonce) < 0 )
474 0 : n = glob->current_nonce;
475 0 : ulong j = 0;
476 0 : ulong k = 0;
477 0 : for ( ; n != glob->next_nonce; ++n ) {
478 0 : ++k;
479 0 : fd_needed_elem_t * ele = fd_needed_table_query( glob->needed, &n, NULL );
480 0 : if ( NULL == ele )
481 0 : continue;
482 :
483 0 : if(j == 128U) break;
484 0 : ++j;
485 :
486 : /* Track statistics */
487 0 : ele->when = glob->now;
488 :
489 0 : fd_active_elem_t * active = fd_active_table_query( glob->actives, &ele->id, NULL );
490 0 : if ( active == NULL) {
491 0 : fd_dupdetect_elem_t * dup = fd_dupdetect_table_query( glob->dupdetect, &ele->dupkey, NULL );
492 0 : if( dup && --dup->req_cnt == 0) {
493 0 : fd_dupdetect_table_remove( glob->dupdetect, &ele->dupkey );
494 0 : }
495 0 : fd_needed_table_remove( glob->needed, &n );
496 0 : continue;
497 0 : }
498 :
499 0 : active->avg_reqs++;
500 :
501 0 : fd_repair_protocol_t protocol;
502 0 : switch (ele->dupkey.type) {
503 0 : case fd_needed_window_index: {
504 0 : fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_window_index);
505 0 : fd_repair_window_index_t * wi = &protocol.inner.window_index;
506 0 : fd_hash_copy(&wi->header.sender, glob->public_key);
507 0 : fd_hash_copy(&wi->header.recipient, &active->key);
508 0 : wi->header.timestamp = glob->now/1000000L;
509 0 : wi->header.nonce = n;
510 0 : wi->slot = ele->dupkey.slot;
511 0 : wi->shred_index = ele->dupkey.shred_index;
512 : // FD_LOG_INFO(("[repair]"))
513 0 : break;
514 0 : }
515 :
516 0 : case fd_needed_highest_window_index: {
517 0 : fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_highest_window_index);
518 0 : fd_repair_highest_window_index_t * wi = &protocol.inner.highest_window_index;
519 0 : fd_hash_copy(&wi->header.sender, glob->public_key);
520 0 : fd_hash_copy(&wi->header.recipient, &active->key);
521 0 : wi->header.timestamp = glob->now/1000000L;
522 0 : wi->header.nonce = n;
523 0 : wi->slot = ele->dupkey.slot;
524 0 : wi->shred_index = ele->dupkey.shred_index;
525 0 : break;
526 0 : }
527 :
528 0 : case fd_needed_orphan: {
529 0 : fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_orphan);
530 0 : fd_repair_orphan_t * wi = &protocol.inner.orphan;
531 0 : fd_hash_copy(&wi->header.sender, glob->public_key);
532 0 : fd_hash_copy(&wi->header.recipient, &active->key);
533 0 : wi->header.timestamp = glob->now/1000000L;
534 0 : wi->header.nonce = n;
535 0 : wi->slot = ele->dupkey.slot;
536 0 : break;
537 0 : }
538 0 : }
539 :
540 0 : fd_repair_sign_and_send(glob, &protocol, &active->addr);
541 :
542 0 : }
543 0 : glob->current_nonce = n;
544 0 : if( k )
545 0 : FD_LOG_DEBUG(("checked %lu nonces, sent %lu packets, total %lu", k, j, fd_needed_table_key_cnt( glob->needed )));
546 0 : }
547 :
548 : static void
549 0 : fd_repair_decay_stats( fd_repair_t * glob ) {
550 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
551 0 : !fd_active_table_iter_done( glob->actives, iter );
552 0 : iter = fd_active_table_iter_next( glob->actives, iter ) ) {
553 0 : fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );
554 0 : #define DECAY(_v_) _v_ = _v_ - ((_v_)>>3U) /* Reduce by 12.5% */
555 0 : DECAY(ele->avg_reqs);
556 0 : DECAY(ele->avg_reps);
557 0 : DECAY(ele->avg_lat);
558 0 : #undef DECAY
559 0 : }
560 0 : }
561 :
562 : /* Start timed events and other protocol behavior */
563 : int
564 0 : fd_repair_start( fd_repair_t * glob ) {
565 0 : glob->last_sends = glob->now;
566 0 : glob->last_decay = glob->now;
567 0 : glob->last_print = glob->now;
568 0 : return 0;
569 0 : }
570 :
571 : static void fd_repair_print_all_stats( fd_repair_t * glob );
572 : static void fd_actives_shuffle( fd_repair_t * repair );
573 :
574 : /* Dispatch timed events and other protocol behavior. This should be
575 : * called inside the main spin loop. */
576 : int
577 0 : fd_repair_continue( fd_repair_t * glob ) {
578 0 : fd_repair_lock( glob );
579 0 : if ( glob->now - glob->last_sends > (long)1e6 ) { /* 1 millisecond */
580 0 : fd_repair_send_requests( glob );
581 0 : glob->last_sends = glob->now;
582 0 : }
583 0 : if ( glob->now - glob->last_print > (long)30e9 ) { /* 30 seconds */
584 0 : fd_repair_print_all_stats( glob );
585 0 : glob->last_print = glob->now;
586 0 : fd_actives_shuffle( glob );
587 0 : fd_repair_decay_stats( glob );
588 0 : glob->last_decay = glob->now;
589 0 : } else if ( glob->now - glob->last_decay > (long)15e9 ) { /* 15 seconds */
590 0 : fd_actives_shuffle( glob );
591 0 : fd_repair_decay_stats( glob );
592 0 : glob->last_decay = glob->now;
593 0 : }
594 0 : fd_repair_unlock( glob );
595 0 : return 0;
596 0 : }
597 :
598 : static void
599 0 : fd_repair_recv_ping(fd_repair_t * glob, fd_gossip_ping_t const * ping, fd_gossip_peer_addr_t const * from) {
600 0 : (void)from;
601 0 : fd_repair_protocol_t protocol;
602 0 : fd_repair_protocol_new_disc(&protocol, fd_repair_protocol_enum_pong);
603 0 : fd_gossip_ping_t * pong = &protocol.inner.pong;
604 :
605 0 : fd_hash_copy( &pong->from, glob->public_key );
606 :
607 : /* Generate response hash token */
608 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
609 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
610 0 : memcpy( pre_image+16UL, ping->token.uc, 32UL);
611 :
612 : /* Generate response hash token */
613 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );
614 :
615 : /* Sign it */
616 0 : (*glob->sign_fun)( glob->sign_arg, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
617 :
618 0 : fd_bincode_encode_ctx_t ctx;
619 0 : uchar buf[1024];
620 0 : ctx.data = buf;
621 0 : ctx.dataend = buf + sizeof(buf);
622 0 : FD_TEST(0 == fd_repair_protocol_encode(&protocol, &ctx));
623 0 : ulong buflen = (ulong)((uchar*)ctx.data - buf);
624 :
625 0 : (*glob->clnt_send_fun)(buf, buflen, from, glob->fun_arg);
626 0 : }
627 :
628 : int
629 0 : fd_repair_recv_clnt_packet(fd_repair_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from) {
630 0 : fd_repair_lock( glob );
631 0 : FD_SCRATCH_SCOPE_BEGIN {
632 0 : while (1) {
633 0 : fd_repair_response_t gmsg;
634 0 : fd_bincode_decode_ctx_t ctx;
635 0 : ctx.data = msg;
636 0 : ctx.dataend = msg + msglen;
637 0 : ctx.valloc = fd_scratch_virtual();
638 0 : if (fd_repair_response_decode(&gmsg, &ctx)) {
639 : /* Solana falls back to assuming we got a shred in this case
640 : https://github.com/solana-labs/solana/blob/master/core/src/repair/serve_repair.rs#L1198 */
641 0 : break;
642 0 : }
643 0 : fd_bincode_destroy_ctx_t ctx2;
644 0 : ctx2.valloc = fd_scratch_virtual();
645 0 : if (ctx.data != ctx.dataend) {
646 0 : fd_repair_response_destroy(&gmsg, &ctx2);
647 0 : break;
648 0 : }
649 :
650 0 : switch (gmsg.discriminant) {
651 0 : case fd_repair_response_enum_ping:
652 0 : fd_repair_recv_ping(glob, &gmsg.inner.ping, from);
653 0 : break;
654 0 : }
655 :
656 0 : fd_repair_response_destroy(&gmsg, &ctx2);
657 0 : fd_repair_unlock( glob );
658 0 : return 0;
659 0 : }
660 :
661 : /* Look at the nonse */
662 0 : if ( msglen < sizeof(fd_repair_nonce_t) ) {
663 0 : fd_repair_unlock( glob );
664 0 : return 0;
665 0 : }
666 0 : ulong shredlen = msglen - sizeof(fd_repair_nonce_t); /* Nonce is at the end */
667 0 : fd_repair_nonce_t key = *(fd_repair_nonce_t const *)(msg + shredlen);
668 0 : fd_needed_elem_t * val = fd_needed_table_query(glob->needed, &key, NULL);
669 0 : if ( NULL == val ) {
670 0 : fd_repair_unlock( glob );
671 0 : return 0;
672 0 : }
673 :
674 0 : fd_active_elem_t * active = fd_active_table_query( glob->actives, &val->id, NULL );
675 0 : if ( NULL != active ) {
676 : /* Update statistics */
677 0 : active->avg_reps++;
678 0 : active->avg_lat += glob->now - val->when;
679 0 : }
680 :
681 0 : fd_shred_t const * shred = fd_shred_parse(msg, shredlen);
682 0 : fd_repair_unlock( glob );
683 0 : if (shred == NULL) {
684 0 : FD_LOG_WARNING(("invalid shread"));
685 0 : } else {
686 0 : (*glob->deliver_fun)(shred, shredlen, from, &val->id, glob->fun_arg);
687 0 : }
688 0 : } FD_SCRATCH_SCOPE_END;
689 0 : return 0;
690 0 : }
691 :
692 : int
693 0 : fd_repair_is_full( fd_repair_t * glob ) {
694 0 : return fd_needed_table_is_full(glob->needed);
695 0 : }
696 :
697 : /* Test if a peer is good. Returns 1 if the peer is "great", 0 if the peer is "good", and -1 if the peer sucks */
698 : static int
699 0 : is_good_peer( fd_active_elem_t * val ) {
700 0 : if( FD_UNLIKELY( NULL == val ) ) return -1; /* Very bad */
701 0 : if( val->avg_reqs > 10U && val->avg_reps == 0U ) return -1; /* Bad, no response after 10 requests */
702 0 : if( val->avg_reqs < 20U ) return 0; /* Not sure yet, good enough for now */
703 0 : if( (float)val->avg_reps < 0.01f*((float)val->avg_reqs) ) return -1; /* Very bad */
704 0 : if( (float)val->avg_reps < 0.8f*((float)val->avg_reqs) ) return 0; /* 80%, Good but not great */
705 0 : if( (float)val->avg_lat > 2500e9f*((float)val->avg_reps) ) return 0; /* 300ms, Good but not great */
706 0 : return 1; /* Great! */
707 0 : }
708 :
709 : #define SORT_NAME fd_latency_sort
710 0 : #define SORT_KEY_T long
711 0 : #define SORT_BEFORE(a,b) (a)<(b)
712 : #include "../../util/tmpl/fd_sort.c"
713 :
714 : static void
715 0 : fd_actives_shuffle( fd_repair_t * repair ) {
716 0 : if( repair->stake_weights_cnt == 0 ) {
717 0 : FD_LOG_NOTICE(( "repair does not have stake weights yet, shuffling active set" ));
718 0 : }
719 :
720 0 : FD_SCRATCH_SCOPE_BEGIN {
721 0 : ulong prev_sticky_cnt = repair->actives_sticky_cnt;
722 : /* Find all the usable stake holders */
723 0 : fd_active_elem_t ** leftovers = fd_scratch_alloc(
724 0 : alignof( fd_active_elem_t * ),
725 0 : sizeof( fd_active_elem_t * ) * repair->stake_weights_cnt );
726 0 : ulong leftovers_cnt = 0;
727 :
728 0 : if( repair->stake_weights_cnt==0 ) {
729 0 : leftovers = fd_scratch_alloc(
730 0 : alignof( fd_active_elem_t * ),
731 0 : sizeof( fd_active_elem_t * ) * fd_active_table_key_cnt( repair->actives ) );
732 :
733 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( repair->actives );
734 0 : !fd_active_table_iter_done( repair->actives, iter );
735 0 : iter = fd_active_table_iter_next( repair->actives, iter ) ) {
736 0 : fd_active_elem_t * peer = fd_active_table_iter_ele( repair->actives, iter );
737 0 : if( peer->sticky ) continue;
738 0 : leftovers[leftovers_cnt++] = peer;
739 0 : }
740 0 : } else {
741 0 : leftovers = fd_scratch_alloc(
742 0 : alignof( fd_active_elem_t * ),
743 0 : sizeof( fd_active_elem_t * ) * repair->stake_weights_cnt );
744 :
745 0 : for( ulong i = 0; i < repair->stake_weights_cnt; i++ ) {
746 0 : fd_stake_weight_t const * stake_weight = &repair->stake_weights[i];
747 0 : ulong stake = stake_weight->stake;
748 0 : if( !stake ) continue;
749 0 : fd_pubkey_t const * key = &stake_weight->key;
750 0 : fd_active_elem_t * peer = fd_active_table_query( repair->actives, key, NULL );
751 0 : if( peer!=NULL ) {
752 0 : peer->stake = stake;
753 0 : }
754 0 : if( NULL == peer || peer->sticky ) continue;
755 0 : leftovers[leftovers_cnt++] = peer;
756 0 : }
757 0 : }
758 :
759 0 : fd_active_elem_t * best[FD_REPAIR_STICKY_MAX];
760 0 : ulong best_cnt = 0;
761 0 : fd_active_elem_t * good[FD_REPAIR_STICKY_MAX];
762 0 : ulong good_cnt = 0;
763 :
764 0 : long latencies[ FD_REPAIR_STICKY_MAX ];
765 0 : ulong latencies_cnt = 0UL;
766 :
767 0 : long first_quartile_latency = LONG_MAX;
768 :
769 : /* fetch all latencies */
770 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( repair->actives );
771 0 : !fd_active_table_iter_done( repair->actives, iter );
772 0 : iter = fd_active_table_iter_next( repair->actives, iter ) ) {
773 0 : fd_active_elem_t * peer = fd_active_table_iter_ele( repair->actives, iter );
774 :
775 0 : if( !peer->sticky ) {
776 0 : continue;
777 0 : }
778 :
779 0 : if( peer->avg_lat==0L || peer->avg_reps==0UL ) {
780 0 : continue;
781 0 : }
782 :
783 0 : latencies[ latencies_cnt++ ] = peer->avg_lat/(long)peer->avg_reps;
784 0 : }
785 :
786 0 : if( latencies_cnt >= 4 ) {
787 : /* we probably want a few peers before sorting and pruning them based on
788 : latency. */
789 0 : fd_latency_sort_inplace( latencies, latencies_cnt );
790 0 : first_quartile_latency = latencies[ latencies_cnt / 4UL ];
791 0 : FD_LOG_NOTICE(( "repair peers first quartile latency - latency: %6.6f ms", (double)first_quartile_latency * 1e-6 ));
792 0 : }
793 :
794 : /* select an upper bound */
795 : /* acceptable latency is 2 * first quartile latency */
796 0 : long acceptable_latency = first_quartile_latency != LONG_MAX ? 2L * first_quartile_latency : LONG_MAX;
797 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( repair->actives );
798 0 : !fd_active_table_iter_done( repair->actives, iter );
799 0 : iter = fd_active_table_iter_next( repair->actives, iter ) ) {
800 0 : fd_active_elem_t * peer = fd_active_table_iter_ele( repair->actives, iter );
801 0 : uchar sticky = peer->sticky;
802 0 : peer->sticky = 0; /* Already clear the sticky bit */
803 0 : if( peer->permanent ) {
804 0 : best[best_cnt++] = peer;
805 0 : } else if( sticky ) {
806 : /* See if we still like this peer */
807 0 : if( peer->avg_reps>0UL && ( peer->avg_lat/(long)peer->avg_reps ) >= acceptable_latency ) {
808 0 : continue;
809 0 : }
810 0 : int r = is_good_peer( peer );
811 0 : if( r == 1 ) best[best_cnt++] = peer;
812 0 : else if( r == 0 ) good[good_cnt++] = peer;
813 0 : }
814 0 : }
815 :
816 0 : ulong tot_cnt = 0;
817 0 : for( ulong i = 0; i < best_cnt && tot_cnt < FD_REPAIR_STICKY_MAX - 2U; ++i ) {
818 0 : repair->actives_sticky[tot_cnt++] = best[i]->key;
819 0 : best[i]->sticky = (uchar)1;
820 0 : }
821 0 : for( ulong i = 0; i < good_cnt && tot_cnt < FD_REPAIR_STICKY_MAX - 2U; ++i ) {
822 0 : repair->actives_sticky[tot_cnt++] = good[i]->key;
823 0 : good[i]->sticky = (uchar)1;
824 0 : }
825 0 : if( leftovers_cnt ) {
826 : /* Always try afew new ones */
827 0 : ulong seed = repair->actives_random_seed;
828 0 : for( ulong i = 0; i < 64 && tot_cnt < FD_REPAIR_STICKY_MAX && tot_cnt < fd_active_table_key_cnt( repair->actives ); ++i ) {
829 0 : seed = ( seed + 774583887101UL ) * 131UL;
830 0 : fd_active_elem_t * peer = leftovers[seed % leftovers_cnt];
831 0 : repair->actives_sticky[tot_cnt++] = peer->key;
832 0 : peer->sticky = (uchar)1;
833 0 : }
834 0 : repair->actives_random_seed = seed;
835 0 : }
836 0 : repair->actives_sticky_cnt = tot_cnt;
837 :
838 0 : FD_LOG_NOTICE(
839 0 : ( "selected %lu (previously: %lu) peers for repair (best was %lu, good was %lu, leftovers was %lu) (nonce_diff: %u)",
840 0 : tot_cnt,
841 0 : prev_sticky_cnt,
842 0 : best_cnt,
843 0 : good_cnt,
844 0 : leftovers_cnt,
845 0 : repair->next_nonce - repair->current_nonce ) );
846 0 : }
847 0 : FD_SCRATCH_SCOPE_END;
848 0 : }
849 :
850 : static fd_active_elem_t *
851 0 : actives_sample( fd_repair_t * repair ) {
852 0 : ulong seed = repair->actives_random_seed;
853 0 : while( repair->actives_sticky_cnt ) {
854 0 : seed += 774583887101UL;
855 0 : fd_pubkey_t * id = &repair->actives_sticky[seed % repair->actives_sticky_cnt];
856 0 : fd_active_elem_t * peer = fd_active_table_query( repair->actives, id, NULL );
857 0 : if( NULL != peer ) {
858 0 : if( peer->first_request_time == 0U ) peer->first_request_time = repair->now;
859 : /* Aggressively throw away bad peers */
860 0 : if( peer->permanent ||
861 0 : repair->now - peer->first_request_time < (long)5e9 || /* Sample the peer for at least 5 seconds */
862 0 : is_good_peer( peer ) != -1 ) {
863 0 : repair->actives_random_seed = seed;
864 0 : return peer;
865 0 : }
866 0 : peer->sticky = 0;
867 0 : }
868 0 : *id = repair->actives_sticky[--( repair->actives_sticky_cnt )];
869 0 : }
870 0 : return NULL;
871 0 : }
872 :
873 : static int
874 0 : fd_repair_create_needed_request( fd_repair_t * glob, int type, ulong slot, uint shred_index ) {
875 0 : fd_repair_lock( glob );
876 0 : fd_pubkey_t * ids[FD_REPAIR_NUM_NEEDED_PEERS] = {0};
877 0 : uint found_peer = 0;
878 0 : uint peer_cnt = fd_uint_min( (uint)glob->actives_sticky_cnt, FD_REPAIR_NUM_NEEDED_PEERS );
879 0 : for( ulong i=0UL; i<peer_cnt; i++ ) {
880 0 : fd_active_elem_t * peer = actives_sample( glob );
881 0 : if(!peer) continue;
882 0 : found_peer = 1;
883 :
884 0 : ids[i] = &peer->key;
885 0 : }
886 :
887 0 : if (!found_peer) {
888 0 : FD_LOG_DEBUG( ( "failed to find a good peer." ) );
889 0 : fd_repair_unlock( glob );
890 0 : return -1;
891 0 : };
892 :
893 0 : fd_dupdetect_key_t dupkey = { .type = (enum fd_needed_elem_type)type, .slot = slot, .shred_index = shred_index };
894 0 : fd_dupdetect_elem_t * dupelem = fd_dupdetect_table_query( glob->dupdetect, &dupkey, NULL );
895 0 : if( dupelem == NULL ) {
896 0 : dupelem = fd_dupdetect_table_insert( glob->dupdetect, &dupkey );
897 0 : dupelem->last_send_time = 0L;
898 0 : } else if( ( dupelem->last_send_time+(long)200e6 )<glob->now ) {
899 0 : fd_repair_unlock( glob );
900 0 : return 0;
901 0 : }
902 :
903 0 : dupelem->last_send_time = glob->now;
904 0 : dupelem->req_cnt = peer_cnt;
905 :
906 0 : if (fd_needed_table_is_full(glob->needed)) {
907 0 : fd_repair_unlock( glob );
908 0 : FD_LOG_NOTICE(("table full"));
909 0 : ( *glob->deliver_fail_fun )(ids[0], slot, shred_index, glob->fun_arg, FD_REPAIR_DELIVER_FAIL_REQ_LIMIT_EXCEEDED );
910 0 : return -1;
911 0 : }
912 0 : for( ulong i=0UL; i<peer_cnt; i++ ) {
913 0 : fd_repair_nonce_t key = glob->next_nonce++;
914 0 : fd_needed_elem_t * val = fd_needed_table_insert(glob->needed, &key);
915 0 : fd_hash_copy(&val->id, ids[i]);
916 0 : val->dupkey = dupkey;
917 0 : val->when = glob->now;
918 0 : }
919 0 : fd_repair_unlock( glob );
920 0 : return 0;
921 0 : }
922 :
923 : int
924 0 : fd_repair_need_window_index( fd_repair_t * glob, ulong slot, uint shred_index ) {
925 : // FD_LOG_INFO( ( "[repair] need window %lu, shred_index %lu", slot, shred_index ) );
926 0 : return fd_repair_create_needed_request( glob, fd_needed_window_index, slot, shred_index );
927 0 : }
928 :
929 : int
930 0 : fd_repair_need_highest_window_index( fd_repair_t * glob, ulong slot, uint shred_index ) {
931 : // FD_LOG_INFO( ( "[repair] need highest %lu", slot ) );
932 0 : return fd_repair_create_needed_request( glob, fd_needed_highest_window_index, slot, shred_index );
933 0 : }
934 :
935 : int
936 0 : fd_repair_need_orphan( fd_repair_t * glob, ulong slot ) {
937 0 : FD_LOG_NOTICE( ( "[repair] need orphan %lu", slot ) );
938 0 : return fd_repair_create_needed_request( glob, fd_needed_orphan, slot, UINT_MAX );
939 0 : }
940 :
941 : static void
942 0 : print_stats( fd_active_elem_t * val ) {
943 0 : fd_pubkey_t const * id = &val->key;
944 0 : if( FD_UNLIKELY( NULL == val ) ) return;
945 0 : if( val->avg_reqs == 0 )
946 0 : FD_LOG_DEBUG(( "repair peer %s: no requests sent, stake=%lu", FD_BASE58_ENC_32_ALLOCA( id ), val->stake / (ulong)1e9 ));
947 0 : else if( val->avg_reps == 0 )
948 0 : FD_LOG_DEBUG(( "repair peer %s: avg_requests=%lu, no responses received, stake=%lu", FD_BASE58_ENC_32_ALLOCA( id ), val->avg_reqs, val->stake / (ulong)1e9 ));
949 0 : else
950 0 : FD_LOG_DEBUG(( "repair peer %s: avg_requests=%lu, response_rate=%f, latency=%f, stake=%lu",
951 0 : FD_BASE58_ENC_32_ALLOCA( id ),
952 0 : val->avg_reqs,
953 0 : ((double)val->avg_reps)/((double)val->avg_reqs),
954 0 : 1.0e-9*((double)val->avg_lat)/((double)val->avg_reps),
955 0 : val->stake / (ulong)1e9 ));
956 0 : }
957 :
958 : static void
959 0 : fd_repair_print_all_stats( fd_repair_t * glob ) {
960 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
961 0 : !fd_active_table_iter_done( glob->actives, iter );
962 0 : iter = fd_active_table_iter_next( glob->actives, iter ) ) {
963 0 : fd_active_elem_t * val = fd_active_table_iter_ele( glob->actives, iter );
964 0 : if( !val->sticky ) continue;
965 0 : print_stats( val );
966 0 : }
967 0 : FD_LOG_INFO( ( "peer count: %lu", fd_active_table_key_cnt( glob->actives ) ) );
968 0 : }
969 :
970 0 : void fd_repair_add_sticky( fd_repair_t * glob, fd_pubkey_t const * id ) {
971 0 : fd_repair_lock( glob );
972 0 : glob->actives_sticky[glob->actives_sticky_cnt++] = *id;
973 0 : fd_repair_unlock( glob );
974 0 : }
975 :
976 0 : void fd_repair_set_permanent( fd_repair_t * glob, fd_pubkey_t const * id ) {
977 0 : fd_repair_lock( glob );
978 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, id, NULL);
979 0 : if( FD_LIKELY( val ) )
980 0 : val->permanent = 1;
981 0 : fd_repair_unlock( glob );
982 0 : }
983 :
984 : void
985 : fd_repair_set_stake_weights( fd_repair_t * repair,
986 : fd_stake_weight_t const * stake_weights,
987 0 : ulong stake_weights_cnt ) {
988 0 : if( stake_weights == NULL ) {
989 0 : FD_LOG_ERR(( "stake weights NULL" ));
990 0 : }
991 0 : if( stake_weights_cnt > FD_STAKE_WEIGHTS_MAX ) {
992 0 : FD_LOG_ERR(( "too many stake weights" ));
993 0 : }
994 :
995 0 : fd_repair_lock( repair );
996 :
997 0 : fd_memset( repair->stake_weights, 0, FD_STAKE_WEIGHTS_MAX * fd_stake_weight_footprint() );
998 0 : fd_memcpy( repair->stake_weights, stake_weights, stake_weights_cnt * sizeof(fd_stake_weight_t) );
999 0 : repair->stake_weights_cnt = stake_weights_cnt;
1000 :
1001 0 : fd_repair_unlock( repair );
1002 0 : }
1003 :
1004 : static void
1005 0 : fd_repair_send_ping(fd_repair_t * glob, fd_gossip_peer_addr_t const * addr, fd_pinged_elem_t * val) {
1006 0 : fd_repair_response_t gmsg;
1007 0 : fd_repair_response_new_disc( &gmsg, fd_repair_response_enum_ping );
1008 0 : fd_gossip_ping_t * ping = &gmsg.inner.ping;
1009 0 : fd_hash_copy( &ping->from, glob->public_key );
1010 :
1011 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
1012 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
1013 0 : memcpy( pre_image+16UL, val->token.uc, 32UL );
1014 :
1015 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token );
1016 :
1017 0 : (*glob->sign_fun)( glob->sign_arg, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
1018 :
1019 0 : fd_bincode_encode_ctx_t ctx;
1020 0 : uchar buf[1024];
1021 0 : ctx.data = buf;
1022 0 : ctx.dataend = buf + sizeof(buf);
1023 0 : FD_TEST(0 == fd_repair_response_encode(&gmsg, &ctx));
1024 0 : ulong buflen = (ulong)((uchar*)ctx.data - buf);
1025 :
1026 0 : (*glob->serv_send_fun)(buf, buflen, addr, glob->fun_arg);
1027 0 : }
1028 :
1029 : static void
1030 0 : fd_repair_recv_pong(fd_repair_t * glob, fd_gossip_ping_t const * pong, fd_gossip_peer_addr_t const * from) {
1031 0 : fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL);
1032 0 : if( val == NULL || !fd_hash_eq( &val->id, &pong->from ) )
1033 0 : return;
1034 :
1035 : /* Verify response hash token */
1036 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
1037 0 : memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
1038 0 : memcpy( pre_image+16UL, val->token.uc, 32UL );
1039 :
1040 0 : fd_hash_t pre_image_hash;
1041 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc );
1042 :
1043 0 : fd_sha256_t sha[1];
1044 0 : fd_sha256_init( sha );
1045 0 : fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL );
1046 0 : fd_sha256_append( sha, pre_image_hash.uc, 32UL );
1047 0 : fd_hash_t golden;
1048 0 : fd_sha256_fini( sha, golden.uc );
1049 :
1050 0 : fd_sha512_t sha2[1];
1051 0 : if( fd_ed25519_verify( /* msg */ golden.uc,
1052 0 : /* sz */ 32U,
1053 0 : /* sig */ pong->signature.uc,
1054 0 : /* public_key */ pong->from.uc,
1055 0 : sha2 )) {
1056 0 : FD_LOG_WARNING(("Failed sig verify for pong"));
1057 0 : return;
1058 0 : }
1059 :
1060 0 : val->good = 1;
1061 0 : }
1062 :
1063 : int
1064 0 : fd_repair_recv_serv_packet(fd_repair_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from) {
1065 0 : FD_SCRATCH_SCOPE_BEGIN {
1066 0 : fd_repair_protocol_t protocol;
1067 0 : fd_bincode_decode_ctx_t ctx;
1068 0 : ctx.data = msg;
1069 0 : ctx.dataend = msg + msglen;
1070 0 : ctx.valloc = fd_scratch_virtual();
1071 0 : if( fd_repair_protocol_decode(&protocol, &ctx) ||
1072 0 : ctx.data != ctx.dataend ) {
1073 0 : FD_LOG_WARNING(( "failed to decode repair request packet" ));
1074 0 : return 0;
1075 0 : }
1076 :
1077 0 : fd_repair_request_header_t * header;
1078 0 : switch (protocol.discriminant) {
1079 0 : case fd_repair_protocol_enum_pong:
1080 0 : fd_repair_lock( glob );
1081 0 : fd_repair_recv_pong( glob, &protocol.inner.pong, from );
1082 0 : fd_repair_unlock( glob );
1083 0 : return 0;
1084 0 : case fd_repair_protocol_enum_window_index: {
1085 0 : fd_repair_window_index_t * wi = &protocol.inner.window_index;
1086 0 : header = &wi->header;
1087 0 : break;
1088 0 : }
1089 0 : case fd_repair_protocol_enum_highest_window_index: {
1090 0 : fd_repair_highest_window_index_t * wi = &protocol.inner.highest_window_index;
1091 0 : header = &wi->header;
1092 0 : break;
1093 0 : }
1094 0 : case fd_repair_protocol_enum_orphan: {
1095 0 : fd_repair_orphan_t * wi = &protocol.inner.orphan;
1096 0 : header = &wi->header;
1097 0 : break;
1098 0 : }
1099 :
1100 0 : default:
1101 0 : FD_LOG_WARNING(( "received repair request of unknown type: %d", (int)protocol.discriminant ));
1102 0 : return 0;
1103 0 : }
1104 :
1105 0 : if( !fd_hash_eq( &header->recipient, glob->public_key ) ) {
1106 0 : FD_LOG_WARNING(( "received repair request with wrong recipient, %s instead of %s", FD_BASE58_ENC_32_ALLOCA( header->recipient.uc ), FD_BASE58_ENC_32_ALLOCA( glob->public_key ) ));
1107 0 : return 0;
1108 0 : }
1109 :
1110 : /* Verify the signature */
1111 0 : fd_sha512_t sha2[1];
1112 0 : fd_signature_t sig;
1113 0 : fd_memcpy( &sig, header->signature.uc, sizeof(sig) );
1114 0 : fd_memcpy( (uchar *)msg + 64U, msg, 4U );
1115 0 : if( fd_ed25519_verify( /* msg */ msg + 64U,
1116 0 : /* sz */ msglen - 64U,
1117 0 : /* sig */ sig.uc,
1118 0 : /* public_key */ header->sender.uc,
1119 0 : sha2 )) {
1120 0 : FD_LOG_WARNING(("received repair request with with invalid signature"));
1121 0 : return 0;
1122 0 : }
1123 :
1124 0 : fd_repair_lock( glob );
1125 :
1126 0 : fd_pinged_elem_t * val = fd_pinged_table_query(glob->pinged, from, NULL);
1127 0 : if( val == NULL || !val->good || !fd_hash_eq( &val->id, &header->sender ) ) {
1128 : /* Need to ping this client */
1129 0 : if( val == NULL ) {
1130 0 : if( fd_pinged_table_is_full(glob->pinged) ) {
1131 0 : FD_LOG_WARNING(("pinged table is full"));
1132 0 : fd_repair_unlock( glob );
1133 0 : return 0;
1134 0 : }
1135 0 : val = fd_pinged_table_insert(glob->pinged, from);
1136 0 : for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); i++ )
1137 0 : val->token.ul[i] = fd_rng_ulong(glob->rng);
1138 0 : }
1139 0 : fd_hash_copy( &val->id, &header->sender );
1140 0 : val->good = 0;
1141 0 : fd_repair_send_ping( glob, from, val );
1142 :
1143 0 : } else {
1144 0 : uchar buf[FD_SHRED_MAX_SZ + sizeof(uint)];
1145 0 : switch (protocol.discriminant) {
1146 0 : case fd_repair_protocol_enum_window_index: {
1147 0 : fd_repair_window_index_t const * wi = &protocol.inner.window_index;
1148 0 : long sz = (*glob->serv_get_shred_fun)( wi->slot, (uint)wi->shred_index, buf, FD_SHRED_MAX_SZ, glob->fun_arg );
1149 0 : if( sz < 0 ) break;
1150 0 : *(uint *)(buf + sz) = wi->header.nonce;
1151 0 : (*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg );
1152 0 : break;
1153 0 : }
1154 :
1155 0 : case fd_repair_protocol_enum_highest_window_index: {
1156 0 : fd_repair_highest_window_index_t const * wi = &protocol.inner.highest_window_index;
1157 0 : long sz = (*glob->serv_get_shred_fun)( wi->slot, UINT_MAX, buf, FD_SHRED_MAX_SZ, glob->fun_arg );
1158 0 : if( sz < 0 ) break;
1159 0 : *(uint *)(buf + sz) = wi->header.nonce;
1160 0 : (*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg );
1161 0 : break;
1162 0 : }
1163 :
1164 0 : case fd_repair_protocol_enum_orphan: {
1165 0 : fd_repair_orphan_t const * wi = &protocol.inner.orphan;
1166 0 : ulong slot = wi->slot;
1167 0 : for(unsigned i = 0; i < 10; ++i) {
1168 0 : slot = (*glob->serv_get_parent_fun)( slot, glob->fun_arg );
1169 : /* We cannot serve slots <= 1 since they are empy and created at genesis. */
1170 0 : if( slot == FD_SLOT_NULL || slot <= 1UL ) break;
1171 0 : long sz = (*glob->serv_get_shred_fun)( slot, UINT_MAX, buf, FD_SHRED_MAX_SZ, glob->fun_arg );
1172 0 : if( sz < 0 ) continue;
1173 0 : *(uint *)(buf + sz) = wi->header.nonce;
1174 0 : (*glob->serv_send_fun)( buf, (ulong)sz + sizeof(uint), from, glob->fun_arg );
1175 0 : }
1176 0 : break;
1177 0 : }
1178 :
1179 0 : default:
1180 0 : break;
1181 0 : }
1182 0 : }
1183 :
1184 0 : fd_repair_unlock( glob );
1185 0 : } FD_SCRATCH_SCOPE_END;
1186 0 : return 0;
1187 0 : }
|