Line data Source code
1 : #define _GNU_SOURCE 1
2 : #include "fd_gossip.h"
3 : #include "../../ballet/sha256/fd_sha256.h"
4 : #include "../../ballet/ed25519/fd_ed25519.h"
5 : #include "../../ballet/base58/fd_base58.h"
6 : #include "../../disco/keyguard/fd_keyguard.h"
7 : #include "../../util/rng/fd_rng.h"
8 : #include <string.h>
9 : #include <stdio.h>
10 : #include <math.h>
11 : #include <sys/types.h>
12 : #include <sys/socket.h>
13 : #include <arpa/inet.h>
14 : #include <netinet/in.h>
15 : #include <sys/random.h>
16 :
17 : /* Maximum size of a network packet */
18 0 : #define PACKET_DATA_SIZE 1232
19 : /* How long do we remember values (in millisecs) */
20 0 : #define FD_GOSSIP_VALUE_EXPIRE ((ulong)(3600e3)) /* 1 hr */
21 : /* Max age that values can be pushed/pulled (in millisecs) */
22 0 : #define FD_GOSSIP_PULL_TIMEOUT ((ulong)(15e3)) /* 15 seconds */
23 : /* Max number of validators that can be actively pinged */
24 0 : #define FD_ACTIVE_KEY_MAX (1<<8)
25 :
26 : /* Max number of CRDS values that can be remembered.
27 :
28 : As of 2.1.11 on 02/05/2025, Agave value table (approx) sizes on an unstaked node:
29 : | cluster | num unique entries | num total (unique + purged) |
30 : | testnet | ~800k | 1.4m |
31 : | mainnet | ~750k | 1m |
32 :
33 : Purged values are counted because:
34 : - Our table (currently) does not "purge" values in the same sense Agave does
35 : - Purged values are included in bloom filter construction, so we need them anyway */
36 0 : #define FD_VALUE_KEY_MAX (1<<21)
37 : /* Max number of pending timed events */
38 0 : #define FD_PENDING_MAX (1<<9)
39 : /* Sample rate of bloom filters. */
40 0 : #define FD_BLOOM_SAMPLE_RATE 8U
41 : /* Number of bloom filter bits in an outgoing pull request packet */
42 0 : #define FD_BLOOM_NUM_BITS (512U*8U) /* 0.5 Kbyte */
43 : /* Max number of bloom filter keys in an outgoing pull request packet */
44 0 : #define FD_BLOOM_MAX_KEYS 32U
45 : /* Max number of packets in an outgoing pull request batch */
46 0 : #define FD_BLOOM_MAX_PACKETS 32U
47 : /* Number of bloom bits in a push prune filter */
48 0 : #define FD_PRUNE_NUM_BITS (512U*8U) /* 0.5 Kbyte */
49 : /* Number of bloom keys in a push prune filter */
50 0 : #define FD_PRUNE_NUM_KEYS 4U
51 : /* Max number of destinations a single message can be pushed */
52 0 : #define FD_PUSH_VALUE_MAX 9
53 : /* Max number of push destinations that we track */
54 0 : #define FD_PUSH_LIST_MAX 12
55 : /* Max length of queue of values that need pushing */
56 0 : #define FD_NEED_PUSH_MAX (1<<12)
57 : /* Max size of receive statistics table */
58 0 : #define FD_STATS_KEY_MAX (1<<8)
59 : /* Sha256 pre-image size for pings/pongs */
60 0 : #define FD_PING_PRE_IMAGE_SZ (48UL)
61 : /* Number of recognized CRDS enum members */
62 0 : #define FD_KNOWN_CRDS_ENUM_MAX (14UL)
63 : /* Prune data prefix
64 : https://github.com/anza-xyz/agave/blob/0c264859b127940f13673b5fea300131a70b1a8d/gossip/src/protocol.rs#L39 */
65 0 : #define FD_GOSSIP_PRUNE_DATA_PREFIX "\xffSOLANA_PRUNE_DATA"
66 :
67 0 : #define FD_NANOSEC_TO_MILLI(_ts_) ((ulong)(_ts_/1000000))
68 :
69 : /* Maximum number of stake weights, mirrors fd_stake_ci */
70 0 : #define MAX_STAKE_WEIGHTS (40200UL)
71 :
72 0 : #define MAX_PEER_PING_COUNT (10000U)
73 :
74 : /* Test if two addresses are equal */
75 0 : FD_FN_PURE static int fd_gossip_peer_addr_eq( const fd_gossip_peer_addr_t * key1, const fd_gossip_peer_addr_t * key2 ) {
76 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
77 0 : return key1->l == key2->l;
78 0 : }
79 :
80 : /* Hash an address */
81 0 : FD_FN_PURE static ulong fd_gossip_peer_addr_hash( const fd_gossip_peer_addr_t * key, ulong seed ) {
82 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
83 0 : return (key->l + seed + 7242237688154252699UL)*9540121337UL;
84 0 : }
85 :
86 : /* Efficiently copy an address */
87 0 : static void fd_gossip_peer_addr_copy( fd_gossip_peer_addr_t * keyd, const fd_gossip_peer_addr_t * keys ) {
88 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
89 0 : keyd->l = keys->l;
90 0 : }
91 :
/* All peers table element. The peers table is all known validator
   addresses/ids, keyed by gossip address. */
struct fd_peer_elem {
    fd_gossip_peer_addr_t key;
    ulong next;                 /* map-internal chain link */
    fd_pubkey_t id;             /* Public identifier (validator pubkey) */
    ulong wallclock;            /* last time we heard about this peer */
    ulong stake;                /* Staking for this validator. Unimplemented. */
};
/* All peers table (fd_map_giant instantiation) */
typedef struct fd_peer_elem fd_peer_elem_t;
#define MAP_NAME fd_peer_table
#define MAP_KEY_T fd_gossip_peer_addr_t
#define MAP_KEY_EQ fd_gossip_peer_addr_eq
#define MAP_KEY_HASH fd_gossip_peer_addr_hash
#define MAP_KEY_COPY fd_gossip_peer_addr_copy
#define MAP_T fd_peer_elem_t
#include "../../util/tmpl/fd_map_giant.c"
109 :
/* Active table element. This table is all validators that we are
   aggressively pinging for liveness checking, keyed by gossip address. */
struct fd_active_elem {
    fd_gossip_peer_addr_t key;
    ulong next;                 /* map-internal chain link */
    fd_pubkey_t id;             /* Public identifier (validator pubkey) */
    long pingtime;              /* Last time we sent a ping */
    uint pingcount;             /* Number of pings it took to get a pong */
    fd_hash_t pingtoken;        /* Random data used in ping/pong */
    long pongtime;              /* Last time we received a pong */
    ulong weight;               /* Selection weight */
};
/* Active table (fd_map_giant instantiation) */
typedef struct fd_active_elem fd_active_elem_t;
#define MAP_NAME fd_active_table
#define MAP_KEY_T fd_gossip_peer_addr_t
#define MAP_KEY_EQ fd_gossip_peer_addr_eq
#define MAP_KEY_HASH fd_gossip_peer_addr_hash
#define MAP_KEY_COPY fd_gossip_peer_addr_copy
#define MAP_T fd_active_elem_t
#include "../../util/tmpl/fd_map_giant.c"
131 :
132 : /* Initialize an active table element value */
133 : void
134 0 : fd_active_new_value(fd_active_elem_t * val) {
135 0 : val->pingcount = 1;
136 0 : val->pingtime = val->pongtime = 0;
137 0 : val->weight = 0;
138 0 : fd_memset(val->id.uc, 0, 32U);
139 0 : fd_memset(val->pingtoken.uc, 0, 32U);
140 0 : }
141 :
142 : /* Test if two hash values are equal */
143 0 : int fd_hash_eq( const fd_hash_t * key1, const fd_hash_t * key2 ) {
144 0 : for (ulong i = 0; i < 32U/sizeof(ulong); ++i)
145 0 : if (key1->ul[i] != key2->ul[i])
146 0 : return 0;
147 0 : return 1;
148 0 : }
149 :
150 : /* Hash a hash value */
151 0 : ulong fd_hash_hash( const fd_hash_t * key, ulong seed ) {
152 0 : return key->ul[0] ^ seed;
153 0 : }
154 :
155 : /* Copy a hash value */
156 0 : void fd_hash_copy( fd_hash_t * keyd, const fd_hash_t * keys ) {
157 0 : for (ulong i = 0; i < 32U/sizeof(ulong); ++i)
158 0 : keyd->ul[i] = keys->ul[i];
159 0 : }
160 :
/* Value table element. This table stores all received crds
   values. Keyed by the hash of the value data. */
struct fd_value_elem {
    fd_hash_t key;
    ulong next;                   /* map-internal chain link */
    fd_pubkey_t origin;           /* Where did this value originate */
    ulong wallclock;              /* Original timestamp of value in millis */
    uchar data[PACKET_DATA_SIZE]; /* Serialized form of value (bincode) including signature */
    ulong datalen;                /* Number of valid bytes in data */
};
/* Value table (fd_map_giant instantiation) */
typedef struct fd_value_elem fd_value_elem_t;
#define MAP_NAME fd_value_table
#define MAP_KEY_T fd_hash_t
#define MAP_KEY_EQ fd_hash_eq
#define MAP_KEY_HASH fd_hash_hash
#define MAP_KEY_COPY fd_hash_copy
#define MAP_T fd_value_elem_t
#include "../../util/tmpl/fd_map_giant.c"
180 :
/* Weights table element. This table stores the weight for each peer
   (determined by stake), keyed by peer pubkey. */
struct fd_weights_elem {
    fd_pubkey_t key;
    ulong next;   /* map-internal chain link */
    ulong weight;
};
/* Weights table (fd_map_giant instantiation).
   NOTE(review): MAP_KEY_T is fd_hash_t while the element key field is
   fd_pubkey_t -- presumably fd_pubkey_t aliases fd_hash_t (both 32
   bytes); confirm against fd_types. */
typedef struct fd_weights_elem fd_weights_elem_t;
#define MAP_NAME fd_weights_table
#define MAP_KEY_T fd_hash_t
#define MAP_KEY_EQ fd_hash_eq
#define MAP_KEY_HASH fd_hash_hash
#define MAP_KEY_COPY fd_hash_copy
#define MAP_T fd_weights_elem_t
#include "../../util/tmpl/fd_map_giant.c"
197 :
/* Queue of pending timed events, stored as a priority heap ordered by
   trigger time.  Events are allocated out of a pool and each carries a
   callback plus a single argument. */
union fd_pending_event_arg {
    fd_gossip_peer_addr_t key;
};
typedef union fd_pending_event_arg fd_pending_event_arg_t;
/* Event callback: invoked with the owning gossip object and the event's arg. */
typedef void (*fd_pending_event_fun)(struct fd_gossip * glob, fd_pending_event_arg_t * arg);
struct fd_pending_event {
    ulong left;                     /* heap child link; doubles as pool free-list link (POOL_NEXT) */
    ulong right;                    /* heap child link */
    long key;                       /* trigger timestamp; heap pops smallest key first (HEAP_LT) */
    fd_pending_event_fun fun;       /* what to run when the event fires */
    fd_pending_event_arg_t fun_arg; /* argument passed to fun */
};
typedef struct fd_pending_event fd_pending_event_t;
#define POOL_NAME fd_pending_pool
#define POOL_T fd_pending_event_t
#define POOL_NEXT left
#include "../../util/tmpl/fd_pool.c"
#define HEAP_NAME fd_pending_heap
#define HEAP_T fd_pending_event_t
#define HEAP_LT(e0,e1) (e0->key < e1->key)
#include "../../util/tmpl/fd_heap.c"
220 :
/* Data structure representing an active push destination. There are
   only a small number of these (at most FD_PUSH_LIST_MAX; allocated
   from a pool). */
struct fd_push_state {
    fd_gossip_peer_addr_t addr;              /* Destination address */
    fd_pubkey_t id;                          /* Public identifier */
    ulong drop_cnt;                          /* Number of values dropped due to pruning */
    ulong prune_keys[FD_PRUNE_NUM_KEYS];     /* Keys used for bloom filter for pruning */
    ulong prune_bits[FD_PRUNE_NUM_BITS/64U]; /* Bits table used for bloom filter for pruning */
    uchar packet[PACKET_DATA_SIZE];          /* Partially assembled packet containing a fd_gossip_push_msg_t */
    uchar * packet_end_init;                 /* Initial end of the packet when there are zero values */
    uchar * packet_end;                      /* Current end of the packet including values so far */
    ulong next;                              /* pool free-list link */
};
typedef struct fd_push_state fd_push_state_t;

#define POOL_NAME fd_push_states_pool
#define POOL_T fd_push_state_t
#include "../../util/tmpl/fd_pool.c"
239 :
/* Receive statistics table element. */
struct fd_stats_elem {
    fd_gossip_peer_addr_t key; /* Keyed by sender */
    ulong next;                /* map-internal chain link */
    long last;                 /* Timestamp of last update */
    /* Duplicate counts by origin (up to 8 distinct origins tracked per sender) */
    struct {
        fd_pubkey_t origin;
        ulong cnt;
    } dups[8];
    ulong dups_cnt;
};
/* Receive statistics table (fd_map_giant instantiation). */
typedef struct fd_stats_elem fd_stats_elem_t;
#define MAP_NAME fd_stats_table
#define MAP_KEY_T fd_gossip_peer_addr_t
#define MAP_KEY_EQ fd_gossip_peer_addr_eq
#define MAP_KEY_HASH fd_gossip_peer_addr_hash
#define MAP_KEY_COPY fd_gossip_peer_addr_copy
#define MAP_T fd_stats_elem_t
#include "../../util/tmpl/fd_map_giant.c"

/* Small set sized to the max packets in an outgoing pull-request batch
   (FD_BLOOM_MAX_PACKETS); presumably used to select which filters of a
   batch to send -- usage is outside this chunk. */
#define SET_NAME fd_gossip_filter_selection
#define SET_MAX FD_BLOOM_MAX_PACKETS
#include "../../util/tmpl/fd_smallset.c"
265 :
/* Per-message-type receive statistics (stored in fd_gossip as an array
   of FD_KNOWN_CRDS_ENUM_MAX entries). */
struct fd_msg_stats_elem {
    ulong bytes_rx_cnt; /* total bytes received for this type */
    ulong total_cnt;    /* total values received for this type */
    ulong dups_cnt;     /* duplicate values received for this type */
};
/* Receive type statistics table. */
typedef struct fd_msg_stats_elem fd_msg_stats_elem_t;
273 :
/* Internal struct for maintaining a contact_info_v2 entry.  The crd
   union owns the value; ci points at its contact_info_v2 member whose
   addrs/sockets pointers are backed by the fixed-size arrays below
   (wired up by fd_gossip_init_node_contact). */
typedef struct {
    fd_crds_data_t crd;
    fd_gossip_contact_info_v2_t * ci; /* = &crd.inner.contact_info_v2 */
    fd_gossip_ip_addr_t addrs[FD_GOSSIP_SOCKET_TAG_MAX];
    fd_gossip_socket_entry_t sockets[FD_GOSSIP_SOCKET_TAG_MAX];
    // uint extentions[1]; /* Unused */
} fd_gossip_node_contact_t;
283 :
284 : static void
285 0 : fd_gossip_init_node_contact( fd_gossip_node_contact_t * contact ) {
286 0 : fd_crds_data_new_disc( &contact->crd, fd_crds_data_enum_contact_info_v2 );
287 0 : contact->ci = &contact->crd.inner.contact_info_v2;
288 0 : contact->ci->addrs = contact->addrs;
289 0 : contact->ci->sockets = contact->sockets;
290 0 : }
291 :
/* Internal node address book.
   Both ip addr and port* are stored
   in network order

   * contact_info_v2 stores port numbers
   in host order, so they need to be converted
   when copying over. See
   fd_gossip_refresh_contact_info_v2_sockets() */

typedef struct {
    fd_gossip_peer_addr_t gossip;       /* gossip protocol endpoint */
    fd_gossip_peer_addr_t serve_repair; /* repair service endpoint */
    fd_gossip_peer_addr_t tvu;          /* shred receive endpoint */
    fd_gossip_peer_addr_t tpu;          /* transaction receive endpoint */
    fd_gossip_peer_addr_t tpu_quic;     /* QUIC transaction endpoint */
    fd_gossip_peer_addr_t tpu_vote;     /* vote transaction endpoint */
} fd_gossip_node_addrs_t;
309 :
/* Global data for gossip service */
struct fd_gossip {
    /* Concurrency lock (0 = free, 1 = held; see fd_gossip_lock/unlock) */
    volatile ulong lock;
    /* Current time in nanosecs */
    long now;

    /* My official contact info in the gossip protocol */
    fd_gossip_node_contact_t my_contact;
    fd_gossip_node_addrs_t my_node_addrs;
    /* My gossip port address (ptr to gossip entry in my node addrs) */
    fd_gossip_peer_addr_t * my_addr;
    /* My public key (ptr to entry in my contact info) */
    fd_pubkey_t * public_key;

    /* Function used to deliver gossip messages to the application */
    fd_gossip_data_deliver_fun deliver_fun;
    /* Argument to fd_gossip_data_deliver_fun */
    void * deliver_arg;
    /* Function used to send raw packets on the network */
    fd_gossip_send_packet_fun send_fun;
    /* Argument to fd_gossip_send_packet_fun */
    void * send_arg;
    /* Function used to send packets for signing to remote tile */
    fd_gossip_sign_fun sign_fun;
    /* Argument to fd_gossip_sign_fun */
    void * sign_arg;

    /* Table of all known validators, keyed by gossip address */
    fd_peer_elem_t * peers;
    /* Table of validators that we are actively pinging, keyed by gossip address */
    fd_active_elem_t * actives;
    /* Queue of validators that might be added to actives */
    fd_gossip_peer_addr_t * inactives;
    ulong inactives_cnt;
#define INACTIVES_MAX 1024U

    /* Table of crds values that we have received in the last 60 minutes, keyed by hash */
    fd_value_elem_t * values;
    /* The last timestamp that we pushed our own contact info */
    long last_contact_time;
    fd_hash_t last_contact_info_v2_key;

    /* Array of push destinations currently in use */
    fd_push_state_t * push_states[FD_PUSH_LIST_MAX];
    ulong push_states_cnt;
    fd_push_state_t * push_states_pool;
    /* Queue of values that need pushing */
    fd_hash_t * need_push;
    ulong need_push_head;
    ulong need_push_cnt;

    /* Table of receive statistics */
    fd_stats_elem_t * stats;
    /* Table of message type stats */
    fd_msg_stats_elem_t msg_stats[FD_KNOWN_CRDS_ENUM_MAX];

    /* Heap/queue of pending timed events */
    fd_pending_event_t * event_pool;
    fd_pending_heap_t * event_heap;

    /* Random number generator */
    fd_rng_t rng[1];
    /* RNG seed */
    ulong seed;
    /* Total number of packets received */
    ulong recv_pkt_cnt;
    /* Total number of duplicate values received */
    ulong recv_dup_cnt;
    /* Total number of non-duplicate values received */
    ulong recv_nondup_cnt;
    /* Count of values pushed */
    ulong push_cnt;
    /* Count of values not pushed due to pruning */
    ulong not_push_cnt;

    /* Stake weights */
    fd_weights_elem_t * weights;

    /* List of added entrypoints at startup */
    ulong entrypoints_cnt;
    fd_gossip_peer_addr_t entrypoints[16];
    /* Metrics */
    fd_gossip_metrics_t metrics;
};
395 :
396 : fd_gossip_metrics_t *
397 0 : fd_gossip_get_metrics( fd_gossip_t * gossip ) {
398 0 : return &gossip->metrics;
399 0 : }
400 :
/* Alignment required for the memory region passed to fd_gossip_new. */
FD_FN_CONST ulong
fd_gossip_align ( void ) { return 128UL; }
403 :
/* Memory footprint of a gossip object.  The append order and sizes here
   must match the allocation sequence in fd_gossip_new exactly. */
FD_FN_CONST ulong
fd_gossip_footprint( void ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
  l = FD_LAYOUT_APPEND( l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t) );
  l = FD_LAYOUT_APPEND( l, alignof(fd_hash_t), FD_NEED_PUSH_MAX*sizeof(fd_hash_t) );
  l = FD_LAYOUT_APPEND( l, fd_value_table_align(), fd_value_table_footprint(FD_VALUE_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_pending_pool_align(), fd_pending_pool_footprint(FD_PENDING_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_stats_table_align(), fd_stats_table_footprint(FD_STATS_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_weights_table_align(), fd_weights_table_footprint(MAX_STAKE_WEIGHTS) );
  l = FD_LAYOUT_APPEND( l, fd_push_states_pool_align(), fd_push_states_pool_footprint(FD_PUSH_LIST_MAX) );
  l = FD_LAYOUT_FINI( l, fd_gossip_align() );
  return l;
}
421 :
/* Format a memory region (of fd_gossip_footprint() bytes, aligned to
   fd_gossip_align()) as a gossip object.  Carves the region into the
   header plus every table/pool; allocation order must match
   fd_gossip_footprint.  Returns the (zero-initialized) object. */
void *
fd_gossip_new ( void * shmem, ulong seed ) {
  FD_SCRATCH_ALLOC_INIT(l, shmem);
  fd_gossip_t * glob = (fd_gossip_t*)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t)) ;
  fd_memset(glob, 0, sizeof(fd_gossip_t));
  glob->seed = seed;

  void * shm = FD_SCRATCH_ALLOC_APPEND(l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX));
  glob->peers = fd_peer_table_join(fd_peer_table_new(shm, FD_PEER_KEY_MAX, seed));

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX));
  glob->actives = fd_active_table_join(fd_active_table_new(shm, FD_ACTIVE_KEY_MAX, seed));

  /* Flat arrays: inactive-peer queue and need-push hash queue */
  glob->inactives = (fd_gossip_peer_addr_t*)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t));
  glob->need_push = (fd_hash_t*)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_hash_t), FD_NEED_PUSH_MAX*sizeof(fd_hash_t));

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_value_table_align(), fd_value_table_footprint(FD_VALUE_KEY_MAX));
  glob->values = fd_value_table_join(fd_value_table_new(shm, FD_VALUE_KEY_MAX, seed));

  glob->last_contact_time = 0;
  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_pending_pool_align(), fd_pending_pool_footprint(FD_PENDING_MAX));
  glob->event_pool = fd_pending_pool_join(fd_pending_pool_new(shm, FD_PENDING_MAX));

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX));
  glob->event_heap = fd_pending_heap_join(fd_pending_heap_new(shm, FD_PENDING_MAX));

  fd_rng_new(glob->rng, (uint)seed, 0UL);
  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_stats_table_align(), fd_stats_table_footprint(FD_STATS_KEY_MAX));
  glob->stats = fd_stats_table_join(fd_stats_table_new(shm, FD_STATS_KEY_MAX, seed));

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_weights_table_align(), fd_weights_table_footprint(MAX_STAKE_WEIGHTS));
  glob->weights = fd_weights_table_join( fd_weights_table_new( shm, MAX_STAKE_WEIGHTS, seed ) );

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_push_states_pool_align(), fd_push_states_pool_footprint(FD_PUSH_LIST_MAX));
  glob->push_states_pool = fd_push_states_pool_join( fd_push_states_pool_new( shm, FD_PUSH_LIST_MAX ) );

  /* Sanity check that the carve stayed inside the declared footprint */
  ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, fd_gossip_align() );
  if ( scratch_top > (ulong)shmem + fd_gossip_footprint() ) {
    FD_LOG_ERR(("Not enough space allocated for gossip"));
  }
  return glob;
}
464 :
/* Join a gossip object formatted by fd_gossip_new (trivial cast join). */
fd_gossip_t *
fd_gossip_join ( void * shmap ) { return (fd_gossip_t *)shmap; }
467 :
/* Leave a join; returns the underlying shared memory region. */
void *
fd_gossip_leave ( fd_gossip_t * join ) { return join; }
470 :
471 : void *
472 0 : fd_gossip_delete ( void * shmap ) {
473 0 : fd_gossip_t * glob = (fd_gossip_t *)shmap;
474 0 : fd_peer_table_delete( fd_peer_table_leave( glob->peers ) );
475 0 : fd_active_table_delete( fd_active_table_leave( glob->actives ) );
476 :
477 0 : fd_value_table_delete( fd_value_table_leave( glob->values ) );
478 0 : fd_pending_pool_delete( fd_pending_pool_leave( glob->event_pool ) );
479 0 : fd_pending_heap_delete( fd_pending_heap_leave( glob->event_heap ) );
480 0 : fd_stats_table_delete( fd_stats_table_leave( glob->stats ) );
481 0 : fd_weights_table_delete( fd_weights_table_leave( glob->weights ) );
482 0 : fd_push_states_pool_delete( fd_push_states_pool_leave( glob->push_states_pool ) );
483 :
484 0 : return glob;
485 0 : }
486 :
/* Acquire the gossip object's spin lock.  FD_ATOMIC_CAS returns the
   previous value, so 0 means we won the lock.  The trailing fence
   orders the critical section's accesses after acquisition.  Paired
   with fd_gossip_unlock. */
static void
fd_gossip_lock( fd_gossip_t * gossip ) {
# if FD_HAS_THREADS
  for(;;) {
    if( FD_LIKELY( !FD_ATOMIC_CAS( &gossip->lock, 0UL, 1UL) ) ) break;
    FD_SPIN_PAUSE();
  }
# else
  gossip->lock = 1;
# endif
  FD_COMPILER_MFENCE();
}
499 :
/* Release the gossip lock.  The leading fence orders the critical
   section's accesses before the release store. */
static void
fd_gossip_unlock( fd_gossip_t * gossip ) {
  FD_COMPILER_MFENCE();
  FD_VOLATILE( gossip->lock ) = 0UL;
}
505 :
506 : /* FIXME: do these go in fd_types_custom instead? */
507 : void
508 0 : fd_gossip_ipaddr_from_socketaddr( fd_gossip_socket_addr_t const * addr, fd_gossip_ip_addr_t * out ) {
509 0 : if( FD_LIKELY( addr->discriminant == fd_gossip_socket_addr_enum_ip4 ) ) {
510 0 : fd_gossip_ip_addr_new_disc(out, fd_gossip_ip_addr_enum_ip4);
511 0 : out->inner.ip4 = addr->inner.ip4.addr;
512 0 : } else {
513 0 : fd_gossip_ip_addr_new_disc(out, fd_gossip_ip_addr_enum_ip6);
514 0 : out->inner.ip6 = addr->inner.ip6.addr;
515 0 : }
516 0 : }
517 :
518 : ushort
519 0 : fd_gossip_port_from_socketaddr( fd_gossip_socket_addr_t const * addr ) {
520 0 : if( FD_LIKELY( addr->discriminant == fd_gossip_socket_addr_enum_ip4 ) ) {
521 0 : return addr->inner.ip4.port;
522 0 : } else {
523 0 : return addr->inner.ip6.port;
524 0 : }
525 0 : }
526 :
527 :
528 : void
529 : fd_gossip_contact_info_v2_to_v1( fd_gossip_contact_info_v2_t const * v2,
530 0 : fd_gossip_contact_info_v1_t * v1 ) {
531 0 : memset( v1, 0, sizeof(fd_gossip_contact_info_v1_t) );
532 0 : v1->id = v2->from;
533 0 : v1->shred_version = v2->shred_version;
534 0 : v1->wallclock = v2->wallclock;
535 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_GOSSIP, &v1->gossip );
536 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_SERVE_REPAIR, &v1->serve_repair );
537 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_TPU, &v1->tpu );
538 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_TPU_VOTE, &v1->tpu_vote );
539 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_TVU, &v1->tvu );
540 0 : }
541 :
/* Look up the socket address advertised for proto_ident
   (FD_GOSSIP_SOCKET_TAG_*) in a contact_info_v2.  Ports are
   delta-encoded: each socket entry stores an offset from the previous
   entry's port, so the absolute port is accumulated while scanning.
   On success writes the address/port to out_addr and returns 1; returns
   0 if no valid entry with that tag exists.  Per the address-book note
   above, ports here are in host order. */
int
fd_gossip_contact_info_v2_find_proto_ident( fd_gossip_contact_info_v2_t const * contact_info,
                                            uchar proto_ident,
                                            fd_gossip_socket_addr_t * out_addr ) {
  ushort port = 0;
  for( ulong i = 0UL; i<contact_info->sockets_len; i++ ) {
    fd_gossip_socket_entry_t const * socket_entry = &contact_info->sockets[ i ];
    port = (ushort)( port + socket_entry->offset );
    if( socket_entry->key==proto_ident ) {
      /* Malformed entry: address index out of range; keep scanning */
      if( socket_entry->index>=contact_info->addrs_len) {
        continue;
      }

      /* fd_gossip_socket_addr->inner and fd_gossip_ip_addr
         are slightly different, so we can't just
         out_addr->ip = contact_info->addrs[ idx ] */
      fd_gossip_ip_addr_t * tmp = &contact_info->addrs[ socket_entry->index ];
      if( FD_LIKELY( tmp->discriminant == fd_gossip_ip_addr_enum_ip4 ) ) {
        out_addr->discriminant = fd_gossip_socket_addr_enum_ip4;
        out_addr->inner.ip4.addr = tmp->inner.ip4;
        out_addr->inner.ip4.port = port;
      } else {
        out_addr->discriminant = fd_gossip_socket_addr_enum_ip6;
        out_addr->inner.ip6.addr = tmp->inner.ip6;
        out_addr->inner.ip6.port = port;
      }
      return 1;
    }
  }

  return 0;
}
574 :
575 : /* Convert my style of address to solana style */
576 : int
577 0 : fd_gossip_to_soladdr( fd_gossip_socket_addr_t * dst, fd_gossip_peer_addr_t const * src ) {
578 0 : fd_gossip_socket_addr_new_disc( dst, fd_gossip_socket_addr_enum_ip4 );
579 0 : dst->inner.ip4.port = ntohs(src->port);
580 0 : dst->inner.ip4.addr = src->addr;
581 0 : return 0;
582 0 : }
583 :
584 : /* Convert my style of address from solana style */
585 : int
586 0 : fd_gossip_from_soladdr(fd_gossip_peer_addr_t * dst, fd_gossip_socket_addr_t const * src ) {
587 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
588 0 : dst->l = 0;
589 0 : if (src->discriminant == fd_gossip_socket_addr_enum_ip4) {
590 0 : dst->port = htons(src->inner.ip4.port);
591 0 : dst->addr = src->inner.ip4.addr;
592 0 : return 0;
593 0 : } else {
594 0 : FD_LOG_ERR(("invalid address family %lu", (ulong)src->discriminant));
595 0 : return -1;
596 0 : }
597 0 : }
598 :
599 : /* Convert an address to a human readable string */
600 0 : const char * fd_gossip_addr_str( char * dst, size_t dstlen, fd_gossip_peer_addr_t const * src ) {
601 0 : char tmp[INET_ADDRSTRLEN];
602 0 : snprintf(dst, dstlen, "%s:%u", inet_ntop(AF_INET, &src->addr, tmp, INET_ADDRSTRLEN), (uint)ntohs(src->port));
603 0 : return dst;
604 0 : }
605 :
606 : static void
607 : fd_gossip_refresh_contact_info_v2_sockets( fd_gossip_node_addrs_t const * addrs, fd_gossip_node_contact_t * ci_int );
608 :
/* Set the gossip configuration: copy identity, gossip address, shred
   version, version info and all callbacks out of config, and rebuild
   the contact_info_v2 socket list.  Takes the gossip lock.  Returns 0. */
int
fd_gossip_set_config( fd_gossip_t * glob, const fd_gossip_config_t * config ) {
  fd_gossip_lock( glob );

  char tmp[100];
  char keystr[ FD_BASE58_ENCODED_32_SZ ];
  fd_base58_encode_32( config->public_key->uc, NULL, keystr );
  FD_LOG_NOTICE(("configuring address %s id %s", fd_gossip_addr_str(tmp, sizeof(tmp), &config->my_addr), keystr));

  fd_gossip_init_node_contact( &glob->my_contact );

  /* Copy the 32-byte pubkey into the contact info and keep a pointer to it */
  fd_hash_copy(&glob->my_contact.ci->from, config->public_key);
  glob->public_key = &glob->my_contact.ci->from;

  /* Record the gossip endpoint and regenerate the v2 socket/addr lists */
  fd_gossip_peer_addr_copy( &glob->my_node_addrs.gossip, &config->my_addr );
  fd_gossip_refresh_contact_info_v2_sockets( &glob->my_node_addrs, &glob->my_contact );
  glob->my_addr = &glob->my_node_addrs.gossip;

  glob->my_contact.ci->shred_version = config->shred_version;
  glob->my_contact.ci->outset = (ulong)config->node_outset;
  glob->my_contact.ci->version = config->my_version;

  /* Application callbacks */
  glob->deliver_fun = config->deliver_fun;
  glob->deliver_arg = config->deliver_arg;
  glob->send_fun = config->send_fun;
  glob->send_arg = config->send_arg;
  glob->sign_fun = config->sign_fun;
  glob->sign_arg = config->sign_arg;

  fd_gossip_unlock( glob );

  return 0;
}
643 :
/* Updates the sockets and addrs lists in contact_info_v2.

   contact_info_v2 stores socket entries sorted by port, with each
   entry's port delta-encoded as an offset from the previous entry.
   This walks the configured services in ascending port order,
   de-duplicating ip addresses into the addrs list as it goes.  A port
   of 0 means "service not configured" and is skipped. */
static void
fd_gossip_refresh_contact_info_v2_sockets( fd_gossip_node_addrs_t const * addrs, fd_gossip_node_contact_t * ci_int ) {
  fd_gossip_contact_info_v2_t * info = ci_int->ci;
  ushort last_port = 0;
  uchar addr_cnt = 0;
  uchar sock_cnt = 0;

  /* contact_info_v2 stores port numbers in host order,
     ip addresses remain in network order... */
  ushort gossip_port = fd_ushort_bswap( addrs->gossip.port );
  ushort serve_repair_port = fd_ushort_bswap( addrs->serve_repair.port );
  ushort tvu_port = fd_ushort_bswap( addrs->tvu.port );
  ushort tpu_port = fd_ushort_bswap( addrs->tpu.port );
  ushort tpu_quic_port = fd_ushort_bswap( (ushort)(addrs->tpu_quic.port) );
  ushort tpu_vote_port = fd_ushort_bswap( addrs->tpu_vote.port );

  /* Loop is bounded by number of if( xx_port > 0 ... ) statements + 1, so 7. */
  for(;;) {
    fd_gossip_peer_addr_t const * min_addr = NULL;
    ushort min_port = USHORT_MAX;
    uchar min_key = 0;

    /* Find the next socket to add to the list: the smallest configured
       port strictly greater than the last one emitted. */
    if( gossip_port > 0 && gossip_port > last_port && gossip_port < min_port ) {
      min_key = FD_GOSSIP_SOCKET_TAG_GOSSIP;
      min_addr = &addrs->gossip;
      min_port = gossip_port;
    }
    if( serve_repair_port > 0 && serve_repair_port > last_port && serve_repair_port < min_port ) {
      min_key = FD_GOSSIP_SOCKET_TAG_SERVE_REPAIR;
      min_addr = &addrs->serve_repair;
      min_port = serve_repair_port;
    }
    if( tvu_port > 0 && tvu_port > last_port && tvu_port < min_port ) {
      min_key = FD_GOSSIP_SOCKET_TAG_TVU;
      min_addr = &addrs->tvu;
      min_port = tvu_port;
    }
    if( tpu_port > 0 && tpu_port > last_port && tpu_port < min_port ) {
      min_key = FD_GOSSIP_SOCKET_TAG_TPU;
      min_addr = &addrs->tpu;
      min_port = tpu_port;
    }
    if( tpu_quic_port > 0 && tpu_quic_port > last_port && tpu_quic_port < min_port ) {
      min_key = FD_GOSSIP_SOCKET_TAG_TPU_QUIC;
      /* NOTE(review): advertises the TPU ip (not tpu_quic's) for the
         QUIC socket -- presumably both share an ip; confirm intended. */
      min_addr = &addrs->tpu;
      min_port = tpu_quic_port;
    }
    if( tpu_vote_port > 0 && tpu_vote_port > last_port && tpu_vote_port < min_port ) {
      min_key = FD_GOSSIP_SOCKET_TAG_TPU_VOTE;
      min_addr = &addrs->tpu_vote;
      min_port = tpu_vote_port;
    }
    /* No remaining port above last_port: all services emitted */
    if( min_port==USHORT_MAX ) {
      break;
    }

    if( FD_UNLIKELY( sock_cnt >= FD_GOSSIP_SOCKET_TAG_MAX )) {
      FD_LOG_ERR(("Too many sockets in contact info, possible broken implementation of fd_gossip_refresh_contact_info_v2_sockets"));
    }

    /* Check if ip addr exists in addrs list already */
    uchar min_addr_idx = 0;
    for( ; min_addr_idx < addr_cnt; min_addr_idx++ ) {
      if( info->addrs[min_addr_idx].inner.ip4 == min_addr->addr ) {
        break;
      }
    }

    if( min_addr_idx == addr_cnt ) {
      /* ip addr not found, add it */
      fd_gossip_ip_addr_new_disc( &info->addrs[ addr_cnt ], fd_gossip_ip_addr_enum_ip4 );
      info->addrs[ addr_cnt ].inner.ip4 = min_addr->addr;
      min_addr_idx = addr_cnt;
      addr_cnt++;
    }

    /* Emit the socket entry with its port delta from the previous one */
    info->sockets[ sock_cnt ].index = min_addr_idx;
    info->sockets[ sock_cnt ].offset = (ushort)( min_port - last_port );
    info->sockets[ sock_cnt ].key = min_key;
    sock_cnt++;
    last_port = min_port;

    if( FD_UNLIKELY( min_key == FD_GOSSIP_SOCKET_TAG_TPU ) ){
      /* FIXME: This is a hack around the fact that TPU and TPU vote
         share the same port. */
      info->sockets[ sock_cnt ].index = min_addr_idx;
      info->sockets[ sock_cnt ].offset = 0U;
      info->sockets[ sock_cnt ].key = FD_GOSSIP_SOCKET_TAG_TPU_VOTE;
      sock_cnt++;
    }
  }

  info->addrs_len = addr_cnt;
  info->sockets_len = sock_cnt;
  info->extensions_len = 0;
}
742 :
743 : int
744 0 : fd_gossip_update_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * my_addr ) {
745 0 : char tmp[100];
746 0 : FD_LOG_NOTICE(("updating address %s", fd_gossip_addr_str(tmp, sizeof(tmp), my_addr)));
747 :
748 0 : fd_gossip_lock( glob );
749 0 : fd_gossip_peer_addr_copy( &glob->my_node_addrs.gossip, my_addr );
750 :
751 0 : fd_gossip_refresh_contact_info_v2_sockets( &glob->my_node_addrs, &glob->my_contact );
752 0 : fd_gossip_unlock( glob );
753 0 : return 0;
754 0 : }
755 :
756 : int
757 0 : fd_gossip_update_repair_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * serve ) {
758 0 : char tmp[100];
759 0 : FD_LOG_NOTICE(("updating repair service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), serve)));
760 :
761 0 : fd_gossip_lock( glob );
762 0 : fd_gossip_peer_addr_copy( &glob->my_node_addrs.serve_repair, serve );
763 :
764 0 : fd_gossip_refresh_contact_info_v2_sockets( &glob->my_node_addrs, &glob->my_contact );
765 0 : fd_gossip_unlock( glob );
766 0 : return 0;
767 0 : }
768 :
769 : int
770 0 : fd_gossip_update_tvu_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tvu ) {
771 0 : char tmp[100];
772 0 : FD_LOG_NOTICE(("updating tvu service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tvu)));
773 :
774 0 : fd_gossip_lock( glob );
775 0 : fd_gossip_peer_addr_copy( &glob->my_node_addrs.tvu, tvu );
776 :
777 0 : fd_gossip_refresh_contact_info_v2_sockets( &glob->my_node_addrs, &glob->my_contact );
778 0 : fd_gossip_unlock( glob );
779 0 : return 0;
780 0 : }
781 :
782 : int
783 : fd_gossip_update_tpu_addr( fd_gossip_t * glob,
784 : fd_gossip_peer_addr_t const * tpu,
785 0 : fd_gossip_peer_addr_t const * tpu_quic ) {
786 0 : char tmp[100];
787 0 : FD_LOG_NOTICE(("updating tpu service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tpu) ));
788 0 : FD_LOG_NOTICE(("updating tpu_quic service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tpu_quic ) ));
789 :
790 0 : fd_gossip_lock( glob );
791 0 : fd_gossip_peer_addr_copy( &glob->my_node_addrs.tpu, tpu );
792 0 : fd_gossip_peer_addr_copy( &glob->my_node_addrs.tpu_quic, tpu_quic );
793 :
794 0 : fd_gossip_refresh_contact_info_v2_sockets( &glob->my_node_addrs, &glob->my_contact );
795 0 : fd_gossip_unlock( glob );
796 :
797 0 : return 0;
798 0 : }
799 :
800 : int
801 0 : fd_gossip_update_tpu_vote_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tpu_vote ) {
802 0 : char tmp[100];
803 0 : FD_LOG_NOTICE(("updating tpu vote service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tpu_vote)));
804 :
805 0 : fd_gossip_lock( glob );
806 0 : fd_gossip_peer_addr_copy(&glob->my_node_addrs.tpu_vote, tpu_vote);
807 :
808 0 : fd_gossip_refresh_contact_info_v2_sockets( &glob->my_node_addrs, &glob->my_contact );
809 0 : fd_gossip_unlock( glob );
810 :
811 0 : return 0;
812 0 : }
813 :
void
fd_gossip_set_shred_version( fd_gossip_t * glob, ushort shred_version ) {
  /* Record the cluster shred version in our own contact info so it is
     carried in our advertised CRDS contact-info value.
     NOTE(review): not taken under the gossip lock — presumably only
     called during single-threaded setup; confirm against callers. */
  glob->my_contact.ci->shred_version = shred_version;
}
818 :
819 : /* Add an event to the queue of pending timed events. The resulting
820 : value needs "fun" and "fun_arg" to be set. */
821 : static fd_pending_event_t *
822 0 : fd_gossip_add_pending( fd_gossip_t * glob, long when ) {
823 0 : if (fd_pending_pool_free( glob->event_pool ) == 0)
824 0 : return NULL;
825 0 : fd_pending_event_t * ev = fd_pending_pool_ele_acquire( glob->event_pool );
826 0 : ev->key = when;
827 0 : fd_pending_heap_ele_insert( glob->event_heap, ev, glob->event_pool );
828 0 : return ev;
829 0 : }
830 :
831 : /* Send raw data as a UDP packet to an address */
static void
fd_gossip_send_raw( fd_gossip_t * glob, const fd_gossip_peer_addr_t * dest, void * data, size_t sz) {
  /* An oversized packet indicates an encoding bug upstream;
     FD_LOG_ERR is fatal. */
  if ( sz > PACKET_DATA_SIZE ) {
    FD_LOG_ERR(("sending oversized packet, size=%lu", sz));
  }
  glob->metrics.send_packet_cnt += 1UL;
  /* The gossip lock is released around the send callback —
     NOTE(review): presumably so the callback may block or re-enter
     gossip code without deadlocking; confirm callback contract. */
  fd_gossip_unlock( glob );
  (*glob->send_fun)(data, sz, dest, glob->send_arg);
  fd_gossip_lock( glob );
}
842 :
843 : /* Send a gossip message to an address */
static void
fd_gossip_send( fd_gossip_t * glob, const fd_gossip_peer_addr_t * dest, fd_gossip_msg_t * gmsg ) {
  /* Bincode-encode the message into a stack buffer and hand it to the
     raw send path. Encoding failure is fatal (FD_LOG_ERR) since all
     outgoing messages are constructed locally and must fit. */
  /* Encode the data */
  uchar buf[PACKET_DATA_SIZE];
  fd_bincode_encode_ctx_t ctx;
  ctx.data = buf;
  ctx.dataend = buf + PACKET_DATA_SIZE;
  if ( fd_gossip_msg_encode( gmsg, &ctx ) ) {
    FD_LOG_ERR(( "fd_gossip_msg_encode failed" ));
  }
  /* ctx.data was advanced by the encoder; the distance from buf is the
     encoded size. */
  size_t sz = (size_t)((const uchar *)ctx.data - buf);
  fd_gossip_send_raw( glob, dest, buf, sz);
  /* Per-message-type send counter, indexed by discriminant. */
  glob->metrics.send_message[ gmsg->discriminant ] += 1UL;
  // char tmp[100];
  // FD_LOG_WARNING(("sent msg type %u to %s size=%lu", gmsg->discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), dest), sz));
}
860 :
861 : /* Initiate the ping/pong protocol to a validator address */
static void
fd_gossip_make_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  /* Send a ping to the peer at arg->key and track the exchange in the
     actives table. Re-arms itself every 200 ms until the peer pongs
     (pongtime set), the peer exceeds MAX_PEER_PING_COUNT attempts, or
     the actives table is full. */

  /* Update the active table where we track the state of the ping/pong
     protocol */
  fd_gossip_peer_addr_t * key = &arg->key;
  fd_active_elem_t * val = fd_active_table_query(glob->actives, key, NULL);
  if (val == NULL) {
    /* First ping to this peer: create a fresh actives entry. */
    if (fd_active_table_is_full(glob->actives)) {
      glob->metrics.send_ping_events[ FD_METRICS_ENUM_SEND_PING_EVENT_V_ACTIVES_TABLE_FULL_IDX ] += 1UL;
      return;
    }
    val = fd_active_table_insert(glob->actives, key);
    fd_active_new_value(val);
    glob->metrics.send_ping_events[ FD_METRICS_ENUM_SEND_PING_EVENT_V_ACTIVES_TABLE_INSERT_IDX ] += 1UL;
  } else {
    if (val->pongtime != 0)
      /* Success */
      return;
    if (val->pingcount++ >= MAX_PEER_PING_COUNT) {
      /* Give up. This is a bad peer. */
      glob->metrics.send_ping_events[ FD_METRICS_ENUM_SEND_PING_EVENT_V_MAX_PING_COUNT_EXCEEDED_IDX ] += 1UL;
      fd_active_table_remove(glob->actives, key);
      fd_peer_table_remove(glob->peers, key);
      return;
    }
  }
  val->pingtime = glob->now;
  /* Generate a new token when we start a fresh round of pinging */
  if (val->pingcount == 1U) {
    for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); ++i ) {
      val->pingtoken.ul[i] = fd_rng_ulong(glob->rng);
    }
  }

  /* Keep pinging until we succeed: schedule ourselves again in
     2e8 ns = 200 ms. */
  fd_pending_event_t * ev = fd_gossip_add_pending( glob, glob->now + (long)2e8 /* 200 ms */ );
  if (ev != NULL) {
    ev->fun = fd_gossip_make_ping;
    fd_gossip_peer_addr_copy(&ev->fun_arg.key, key);
  }

  fd_pubkey_t * public_key = glob->public_key;

  /* Build a ping message */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_ping);
  fd_gossip_ping_t * ping = &gmsg.inner.ping;
  fd_hash_copy( &ping->from, public_key );

  /* The wire token is sha256("SOLANA_PING_PONG" || pingtoken); the
     raw pingtoken never leaves this node. */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  fd_memcpy( pre_image+16UL, val->pingtoken.uc, 32UL );

  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token );

  /* Sign it (signature is over the pre-image, via the
     sha256-then-ed25519 keyguard path). */

  (*glob->sign_fun)( glob->sign_arg, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );

  fd_gossip_send( glob, key, &gmsg );
}
923 :
924 : /* Respond to a ping from another validator */
static void
fd_gossip_handle_ping( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_ping_t const * ping ) {
  /* Answer an incoming ping: verify the sender's signature over its
     32-byte token, then reply with a pong whose token is
     sha256("SOLANA_PING_PONG" || ping->token), signed by us. */

  /* Verify the signature */
  fd_sha512_t sha2[1];
  if (fd_ed25519_verify( /* msg */ ping->token.uc,
                         /* sz */ 32UL,
                         /* sig */ ping->signature.uc,
                         /* public_key */ ping->from.uc,
                         sha2 )) {
    glob->metrics.recv_ping_invalid_signature += 1UL;
    FD_LOG_WARNING(("received ping with invalid signature"));
    return;
  }

  /* Build a pong message */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pong);
  fd_gossip_ping_t * pong = &gmsg.inner.pong;

  fd_pubkey_t * public_key = glob->public_key;

  fd_hash_copy( &pong->from, public_key );

  /* Pre-image is the fixed domain tag followed by the ping's token. */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  fd_memcpy( pre_image+16UL, ping->token.uc, 32UL);

  /* Generate response hash token */
  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );

  /* Sign it (over the pre-image, sha256-then-ed25519) */
  (*glob->sign_fun)( glob->sign_arg, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );

  fd_gossip_send(glob, from, &gmsg);
}
960 :
961 : /* Sign/timestamp an outgoing crds value */
static void
fd_gossip_sign_crds_value( fd_gossip_t * glob, fd_crds_value_t * crd ) {
  /* Stamp an outgoing CRDS value as ours (our pubkey, current
     wallclock in ms) and sign the bincode-encoded payload into
     crd->signature. Values with an unrecognized discriminant are left
     untouched (early return, unsigned). */

  /* Locate the origin-pubkey and wallclock fields inside the variant;
     their position depends on the discriminant. */
  fd_pubkey_t * pubkey;
  ulong * wallclock;
  switch (crd->data.discriminant) {
  case fd_crds_data_enum_contact_info_v1:
    pubkey = &crd->data.inner.contact_info_v1.id;
    wallclock = &crd->data.inner.contact_info_v1.wallclock;
    break;
  case fd_crds_data_enum_vote:
    pubkey = &crd->data.inner.vote.from;
    wallclock = &crd->data.inner.vote.wallclock;
    break;
  case fd_crds_data_enum_lowest_slot:
    pubkey = &crd->data.inner.lowest_slot.from;
    wallclock = &crd->data.inner.lowest_slot.wallclock;
    break;
  case fd_crds_data_enum_snapshot_hashes:
    pubkey = &crd->data.inner.snapshot_hashes.from;
    wallclock = &crd->data.inner.snapshot_hashes.wallclock;
    break;
  case fd_crds_data_enum_accounts_hashes:
    pubkey = &crd->data.inner.accounts_hashes.from;
    wallclock = &crd->data.inner.accounts_hashes.wallclock;
    break;
  case fd_crds_data_enum_epoch_slots:
    pubkey = &crd->data.inner.epoch_slots.from;
    wallclock = &crd->data.inner.epoch_slots.wallclock;
    break;
  case fd_crds_data_enum_version_v1:
    pubkey = &crd->data.inner.version_v1.from;
    wallclock = &crd->data.inner.version_v1.wallclock;
    break;
  case fd_crds_data_enum_version_v2:
    pubkey = &crd->data.inner.version_v2.from;
    wallclock = &crd->data.inner.version_v2.wallclock;
    break;
  case fd_crds_data_enum_node_instance:
    pubkey = &crd->data.inner.node_instance.from;
    wallclock = &crd->data.inner.node_instance.wallclock;
    break;
  case fd_crds_data_enum_duplicate_shred:
    pubkey = &crd->data.inner.duplicate_shred.from;
    wallclock = &crd->data.inner.duplicate_shred.wallclock;
    break;
  case fd_crds_data_enum_incremental_snapshot_hashes:
    pubkey = &crd->data.inner.incremental_snapshot_hashes.from;
    wallclock = &crd->data.inner.incremental_snapshot_hashes.wallclock;
    break;
  case fd_crds_data_enum_contact_info_v2:
    pubkey = &crd->data.inner.contact_info_v2.from;
    wallclock = &crd->data.inner.contact_info_v2.wallclock;
    break;
  case fd_crds_data_enum_restart_last_voted_fork_slots:
    pubkey = &crd->data.inner.restart_last_voted_fork_slots.from;
    wallclock = &crd->data.inner.restart_last_voted_fork_slots.wallclock;
    break;
  case fd_crds_data_enum_restart_heaviest_fork:
    pubkey = &crd->data.inner.restart_heaviest_fork.from;
    wallclock = &crd->data.inner.restart_heaviest_fork.wallclock;
    break;
  default:
    /* Unknown variant: do not claim or sign it. */
    return;
  }
  fd_pubkey_t * public_key = glob->public_key;
  fd_hash_copy(pubkey, public_key);
  *wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* convert to ms */

  /* Sign it: the signature covers the bincode encoding of the data
     variant only (not the signature field itself). */
  uchar buf[PACKET_DATA_SIZE];
  fd_bincode_encode_ctx_t ctx;
  ctx.data = buf;
  ctx.dataend = buf + PACKET_DATA_SIZE;
  if ( fd_crds_data_encode( &crd->data, &ctx ) ) {
    FD_LOG_WARNING(("fd_crds_data_encode failed"));
    return;
  }

  (*glob->sign_fun)( glob->sign_arg, crd->signature.uc, buf, (ulong)((uchar*)ctx.data - buf), FD_KEYGUARD_SIGN_TYPE_ED25519 );
}
1043 :
1044 : /* Convert a hash to a bloom filter bit position
1045 : https://github.com/anza-xyz/agave/blob/v2.1.7/bloom/src/bloom.rs#L136 */
1046 : static ulong
1047 0 : fd_gossip_bloom_pos( fd_hash_t * hash, ulong key, ulong nbits) {
1048 0 : for ( ulong i = 0; i < 32U; ++i) {
1049 0 : key ^= (ulong)(hash->uc[i]);
1050 0 : key *= 1099511628211UL; // FNV prime
1051 0 : }
1052 0 : return key % nbits;
1053 0 : }
1054 :
1055 : /* Choose a random active peer with good ping count */
static fd_active_elem_t *
fd_gossip_random_active( fd_gossip_t * glob ) {
  /* Pick one active peer at random, weighted by ele->weight, from the
     subset of peers with the minimal pingcount seen. Peers that have
     never ponged are skipped unless they are allowed entrypoints.
     Returns NULL if no candidate exists or total weight is zero. */

  /* Create a list of active peers with minimal pings */
  fd_active_elem_t * list[FD_ACTIVE_KEY_MAX];
  ulong listlen = 0;
  ulong totweight = 0;
  for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
       !fd_active_table_iter_done( glob->actives, iter );
       iter = fd_active_table_iter_next( glob->actives, iter ) ) {
    fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );

    if (ele->pongtime == 0 && !fd_gossip_is_allowed_entrypoint( glob, &ele->key )) {
      /* Unresponsive non-entrypoint: not a candidate. */
      continue;
    } else if (listlen == 0) {
      list[0] = ele;
      listlen = 1;
      totweight = ele->weight;
    } else if (ele->pingcount > list[0]->pingcount) {
      /* Worse ping count than current minimum: skip. */
      continue;
    } else if (ele->pingcount < list[0]->pingcount) {
      /* Reset the list: a new, lower minimum pingcount was found. */
      list[0] = ele;
      listlen = 1;
      totweight = ele->weight;
    } else {
      /* Ties the minimum: add to the candidate list. */
      list[listlen++] = ele;
      totweight += ele->weight;
    }
  }
  if (listlen == 0 || totweight == 0)
    return NULL;
  /* Choose a random list element by weight: w falls in exactly one
     peer's cumulative-weight interval, so the loop always returns. */
  ulong w = fd_rng_ulong(glob->rng) % totweight;
  ulong j = 0;
  for( ulong i = 0; i < listlen; ++i) {
    if( w < j + list[i]->weight )
      return list[i];
    j += list[i]->weight;
  }
  FD_LOG_CRIT(( "I shouldn't be here" ));
  return NULL;
}
1098 :
1099 : /* Generate a pull request for a random active peer */
static void
fd_gossip_random_pull( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Reschedule this event 100e6 ns = 100 millisecs from now.
     (Earlier comment said "5 sec", which did not match the code.) */
  fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)100e6);
  if (ev) {
    ev->fun = fd_gossip_random_pull;
  }

  /* Pick a random partner */
  fd_active_elem_t * ele = fd_gossip_random_active(glob);
  if (ele == NULL)
    return;

  /* Compute the number of packets needed for all the bloom filter parts
     with a desired false positive rate <0.1% (upper bounded by FD_BLOOM_MAX_PACKETS ) */
  ulong nitems = fd_value_table_key_cnt(glob->values);
  ulong nkeys = 1;
  ulong npackets = 1;
  uint nmaskbits = 0;
  double e = 0;
  if (nitems > 0) {
    do {
      double n = ((double)nitems)/((double)npackets); /* Assume even division of values */
      double m = (double)FD_BLOOM_NUM_BITS;
      /* Optimal key count for a bloom filter: (m/n)*ln(2), clamped to
         [1, FD_BLOOM_MAX_KEYS]. */
      nkeys = fd_ulong_max(1U, (ulong)((m/n)*0.69314718055994530941723212145818 /* ln(2) */));
      nkeys = fd_ulong_min(nkeys, FD_BLOOM_MAX_KEYS);
      if (npackets == FD_BLOOM_MAX_PACKETS)
        break;
      double k = (double)nkeys;
      /* Standard bloom false-positive estimate: (1 - e^{-kn/m})^k. */
      e = pow(1.0 - exp(-k*n/m), k);
      if (e < 0.001)
        break;
      /* Too many values per packet: double the packet count. */
      nmaskbits++;
      npackets = 1U<<nmaskbits;
    } while (1);
  }
  FD_LOG_DEBUG(("making bloom filter for %lu items with %lu packets, %u maskbits and %lu keys %g error", nitems, npackets, nmaskbits, nkeys, e));

  /* Generate random keys */
  ulong keys[FD_BLOOM_MAX_KEYS];
  for (ulong i = 0; i < nkeys; ++i)
    keys[i] = fd_rng_ulong(glob->rng);
  /* Set all the bits */
  ulong num_bits_set[FD_BLOOM_MAX_PACKETS];
  for (ulong i = 0; i < npackets; ++i)
    num_bits_set[i] = 0;

  /* Bloom filter set sampling

     This effectively translates to sending out 1/(SAMPLE_RATE)
     of the full bloom filter set every new pull request
     loop.

     Some observations:
      - Duplicate CRDS received rate* decreases dramatically
        (from being 90% of incoming CRDS traffic to nearly negligible)
      - Unique CRDS received rate* doesn't appear to be affected by
        sampling

     * rate measured in values/minute
     https://github.com/anza-xyz/agave/blob/v2.1.11/gossip/src/crds_gossip_pull.rs#L157-L163 */
  ushort indices[FD_BLOOM_MAX_PACKETS];
  for( ushort i = 0; i < npackets; ++i) {
    indices[i] = i;
  }
  ushort indices_len = (ushort)npackets;
  ulong filter_sample_size = (indices_len + FD_BLOOM_SAMPLE_RATE - 1) / FD_BLOOM_SAMPLE_RATE;
  FD_TEST( filter_sample_size <= FD_BLOOM_MAX_PACKETS );

  fd_gossip_filter_selection_t selected_filters = fd_gossip_filter_selection_null();

  /* Partial Fisher-Yates draw: pick filter_sample_size distinct packet
     indices uniformly at random. */
  for( ushort i = 0; i < filter_sample_size; ++i) {
    ulong idx = fd_rng_ushort( glob->rng ) % indices_len;

    /* swap and remove */
    ushort filter_idx = indices[ idx ];
    indices[ idx ] = indices[ --indices_len ];

    /* insert */
    selected_filters = fd_gossip_filter_selection_insert( selected_filters, filter_idx );
  }

  FD_TEST( fd_gossip_filter_selection_cnt( selected_filters ) == filter_sample_size );


#define CHUNKSIZE (FD_BLOOM_NUM_BITS/64U)
  ulong bits[CHUNKSIZE * FD_BLOOM_MAX_PACKETS]; /* TODO: can we bound size based on sample rate instead? */
  fd_memset(bits, 0, CHUNKSIZE*8U*npackets);
  ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_VALUE_EXPIRE;
  for( fd_value_table_iter_t iter = fd_value_table_iter_init( glob->values );
       !fd_value_table_iter_done( glob->values, iter );
       iter = fd_value_table_iter_next( glob->values, iter ) ) {
    /* NOTE: this "ele" shadows the chosen peer "ele" above; the outer
       one is used again in the send loop at the bottom. */
    fd_value_elem_t * ele = fd_value_table_iter_ele( glob->values, iter );
    fd_hash_t * hash = &(ele->key);
    /* Purge expired values */
    if (ele->wallclock < expire) {
      fd_value_table_remove( glob->values, hash );
      continue;
    }
    /* Choose which filter packet based on the high bits in the hash,
       https://github.com/anza-xyz/agave/blob/v2.1.7/gossip/src/crds_gossip_pull.rs#L167

       skip if packet not part of sample
       https://github.com/anza-xyz/agave/blob/v2.1.11/gossip/src/crds_gossip_pull.rs#L175-L177
    */

    ulong index = (nmaskbits == 0 ? 0UL : ( hash->ul[0] >> (64U - nmaskbits) ));
    if( FD_LIKELY( !fd_gossip_filter_selection_test( selected_filters, index ) ) ) {
      continue;
    };

    ulong * chunk = bits + (index*CHUNKSIZE);

    /* https://github.com/anza-xyz/agave/blob/v2.1.7/bloom/src/bloom.rs#L191 */
    for (ulong i = 0; i < nkeys; ++i) {
      ulong pos = fd_gossip_bloom_pos(hash, keys[i], FD_BLOOM_NUM_BITS);
      /* https://github.com/anza-xyz/agave/blob/v2.1.7/bloom/src/bloom.rs#L182-L185 */
      ulong * j = chunk + (pos>>6U); /* divide by 64 */
      ulong bit = 1UL<<(pos & 63U);
      if (!((*j) & bit)) {
        *j |= bit;
        num_bits_set[index]++;
      }
    }
  }

  /* Assemble the packets */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pull_req);
  fd_gossip_pull_req_t * req = &gmsg.inner.pull_req;
  fd_crds_filter_t * filter = &req->filter;
  filter->mask_bits = nmaskbits;
  filter->filter.keys_len = nkeys;
  filter->filter.keys = keys;
  fd_gossip_bitvec_u64_t * bitvec = &filter->filter.bits;
  bitvec->len = FD_BLOOM_NUM_BITS;
  bitvec->has_bits = 1;
  bitvec->bits.vec_len = FD_BLOOM_NUM_BITS/64U;

  /* The "value" in the request is always my own contact info (v2) */
  fd_crds_value_t * value = &req->value;
  fd_crds_data_new_disc(&value->data, fd_crds_data_enum_contact_info_v2);
  fd_gossip_contact_info_v2_t * ci = &value->data.inner.contact_info_v2;
  fd_memcpy(ci, &glob->my_contact.crd.inner.contact_info_v2, sizeof(fd_gossip_contact_info_v2_t) );
  fd_gossip_sign_crds_value(glob, value);

  /* One packet per sampled filter index; only mask/num_bits_set/vec
     change between packets, the rest of gmsg is shared. */
  for( fd_gossip_filter_selection_iter_t iter = fd_gossip_filter_selection_iter_init( selected_filters );
       !fd_gossip_filter_selection_iter_done( iter );
       iter = fd_gossip_filter_selection_iter_next( iter ) ){
    ulong index = fd_gossip_filter_selection_iter_idx( iter );
    filter->mask = (nmaskbits == 0 ? ~0UL : ((index << (64U - nmaskbits)) | (~0UL >> nmaskbits)));
    filter->filter.num_bits_set = num_bits_set[index];
    bitvec->bits.vec = bits + (index*CHUNKSIZE);
    fd_gossip_send(glob, &ele->key, &gmsg);
  }
}
1258 :
1259 : /* Handle a pong response */
static void
fd_gossip_handle_pong( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_ping_t const * pong ) {
  /* Validate an incoming pong against the outstanding ping state for
     "from": the pong token must equal
     sha256("SOLANA_PING_PONG" || sha256("SOLANA_PING_PONG" || pingtoken))
     and the signature over the token must verify against the claimed
     pubkey. On success, mark the peer alive and record it in the
     peers table with its stake weight. */
#define INC_RECV_PONG_EVENT_CNT( REASON ) glob->metrics.recv_pong_events[ FD_CONCAT3( FD_METRICS_ENUM_RECV_PONG_EVENT_V_, REASON, _IDX ) ] += 1UL
  fd_active_elem_t * val = fd_active_table_query(glob->actives, from, NULL);
  if (val == NULL) {
    /* No outstanding ping for this address (entry expired/removed). */
    INC_RECV_PONG_EVENT_CNT( EXPIRED );
    FD_LOG_DEBUG(("received pong too late"));
    return;
  }

  /* Reconstruct the token we originally sent in the ping. */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  fd_memcpy( pre_image+16UL, val->pingtoken.uc, 32UL );


  fd_hash_t pre_image_hash;
  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc );

  /* Confirm response hash token: what a correct responder would have
     computed from our ping token. */
  fd_sha256_t sha[1];
  fd_sha256_init( sha );
  fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL );

  fd_sha256_append( sha, pre_image_hash.uc, 32UL );
  fd_hash_t pongtoken;
  fd_sha256_fini( sha, pongtoken.uc );
  if (memcmp(pongtoken.uc, pong->token.uc, 32UL) != 0) {
    INC_RECV_PONG_EVENT_CNT( WRONG_TOKEN );
    FD_LOG_DEBUG(( "received pong with wrong token" ));
    return;
  }

  /* Verify the signature */
  fd_sha512_t sha2[1];
  if (fd_ed25519_verify( /* msg */ pong->token.uc,
                         /* sz */ 32UL,
                         /* sig */ pong->signature.uc,
                         /* public_key */ pong->from.uc,
                         sha2 )) {
    INC_RECV_PONG_EVENT_CNT( INVALID_SIGNATURE );
    FD_LOG_WARNING(("received pong with invalid signature"));
    return;
  }

  /* Peer is confirmed alive: record pong time and its identity. */
  val->pongtime = glob->now;
  fd_hash_copy(&val->id, &pong->from);

  /* Remember that this is a good peer */
  fd_peer_elem_t * peerval = fd_peer_table_query(glob->peers, from, NULL);
  if (peerval == NULL) {
    INC_RECV_PONG_EVENT_CNT( NEW_PEER );
    if (fd_peer_table_is_full(glob->peers)) {
      INC_RECV_PONG_EVENT_CNT( TABLE_FULL );
      FD_LOG_DEBUG(("too many peers"));
      return;
    }
    peerval = fd_peer_table_insert(glob->peers, from);
    peerval->stake = 0;
  }
  peerval->wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* In millisecs */
  fd_hash_copy(&peerval->id, &pong->from);

  /* Weight the peer by its entry in the stake-weights table;
     unknown identities get weight 1. */
  fd_weights_elem_t const * val2 = fd_weights_table_query_const( glob->weights, &val->id, NULL );
  val->weight = ( val2 == NULL ? 1UL : val2->weight );

}
1326 :
1327 : /* Initiate a ping/pong with a random active partner to confirm it is
1328 : still alive. */
static void
fd_gossip_random_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Reschedule this event 100e8 ns = 10 seconds from now.
     (Earlier comment said "1 sec", which did not match the code.) */
  fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)100e8);
  if (ev) {
    ev->fun = fd_gossip_random_ping;
  }

  /* Back off if the event pool is nearly exhausted — make_ping
     schedules follow-up events of its own. */
  if (fd_pending_pool_free( glob->event_pool ) < 100U)
    return;

  ulong cnt = fd_active_table_key_cnt(glob->actives);
  if (cnt == 0 && glob->inactives_cnt == 0)
    return;
  fd_gossip_peer_addr_t * addr = NULL;
  if (glob->inactives_cnt > 0 && cnt < FD_ACTIVE_KEY_MAX)
    /* Try a new peer: pop the most recently queued inactive address. */
    addr = glob->inactives + (--(glob->inactives_cnt));
  else {
    /* Choose a random active peer (cnt > 0 here, so the loop always
       selects one). */
    ulong i = fd_rng_ulong(glob->rng) % cnt;
    ulong j = 0;
    for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
         !fd_active_table_iter_done( glob->actives, iter );
         iter = fd_active_table_iter_next( glob->actives, iter ) ) {
      if (i == j++) {
        fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );
        if (glob->now - ele->pingtime < (long)60e9) /* minute cooldown */
          return;
        /* Reset ping/pong state to start a fresh round. */
        ele->pingcount = 0;
        ele->pongtime = 0;
        addr = &(ele->key);
        break;
      }
    }
  }

  fd_pending_event_arg_t arg2;
  fd_gossip_peer_addr_copy(&arg2.key, addr);
  fd_gossip_make_ping(glob, &arg2);
}
1372 :
1373 : /* CRDS processing utils.
1374 : TODO: move to a separate fd_crds file? Need to decouple gossip metrics first */
1375 :
/* Working state for one CRDS value as it moves through the receive
   pipeline (decode -> filter -> sigverify -> insert). */
typedef struct{
  fd_crds_value_t * crd;                // Decoded CRDS value
  uchar encoded_val[PACKET_DATA_SIZE];  // Raw encoded form (signature followed by payload)
  ulong encoded_len;                    // Length of encoded data
  fd_hash_t val_hash;                   // Hash of the value (used as key in the value table)
  fd_pubkey_t* pubkey;                  // Reference to origin's pubkey (points into *crd)
  ulong wallclock;                      // Timestamp in millisecs
} fd_crds_value_processed_t;
1384 :
/* fd_crds_dup_check returns 1 if key exists in the CRDS value table, 0 otherwise.
   On a duplicate it also records:
   - the host that sent the duplicate message
   - the origin of the actual CRDS value
   for use in making prune messages. */
static int
fd_crds_dup_check( fd_gossip_t * glob, fd_hash_t * key, const fd_gossip_peer_addr_t * from, const fd_pubkey_t * origin ) {
  /* Returns 1 if key is already in the value table (duplicate),
     0 otherwise. When from is non-NULL, duplicate hits are tallied
     per-sender and per-origin in the stats table (feeds prune
     message construction). */
  fd_value_elem_t * msg = fd_value_table_query(glob->values, key, NULL);

  if (msg != NULL) {
    /* Already have this value */
    if (from != NULL) {
      /* Record the dup in the receive statistics table */
      fd_stats_elem_t * val = fd_stats_table_query(glob->stats, from, NULL);
      if (val == NULL) {
        /* First dup from this sender; silently skip accounting if the
           stats table is full. */
        if (!fd_stats_table_is_full(glob->stats)) {
          val = fd_stats_table_insert(glob->stats, from);
          val->dups_cnt = 0;
        }
      }
      if (val != NULL) {
        val->last = glob->now;
        /* Bump the counter for this origin if already tracked... */
        for (ulong i = 0; i < val->dups_cnt; ++i){
          if (fd_hash_eq(&val->dups[i].origin, origin)) {
            val->dups[i].cnt++;
            goto found_origin;
          }
        }
        /* ...otherwise start tracking it (at most 8 origins per
           sender; further origins are ignored). */
        if (val->dups_cnt < 8) {
          ulong i = val->dups_cnt++;
          fd_hash_copy(&val->dups[i].origin, origin);
          val->dups[i].cnt = 1;
        }
        found_origin: ;
      }
    }
    return 1;
  } else {
    return 0;
  }
}
1426 : /* fd_crds_sigverify verifies the data in an encoded CRDS value.
1427 : Assumes the following CRDS value layout (packed)
1428 : {
1429 : fd_signature_t signature;
1430 : uchar* data;
1431 : } */
1432 : static int
1433 0 : fd_crds_sigverify( uchar * crds_encoded_val, ulong crds_encoded_len, fd_pubkey_t * pubkey ) {
1434 :
1435 0 : fd_signature_t * sig = (fd_signature_t *)crds_encoded_val;
1436 0 : uchar * data = (crds_encoded_val + sizeof(fd_signature_t));
1437 0 : ulong datalen = crds_encoded_len - sizeof(fd_signature_t);
1438 :
1439 0 : static fd_sha512_t sha[1]; /* static is ok since ed25519_verify calls sha512_init */
1440 0 : return fd_ed25519_verify( data,
1441 0 : datalen,
1442 0 : sig->uc,
1443 0 : pubkey->uc,
1444 0 : sha );
1445 0 : }
1446 :
1447 :
1448 0 : #define INC_RECV_CRDS_DROP_METRIC( REASON ) glob->metrics.recv_crds_drop_reason[ FD_CONCAT3( FD_METRICS_ENUM_CRDS_DROP_REASON_V_, REASON, _IDX ) ] += 1UL
1449 :
1450 : /* fd_crds_insert sets up and inserts a fd_value_elem_t entry
1451 : from a fully populated crd_p.
1452 :
1453 : This only fails if the table is full. */
static void
fd_crds_insert( fd_gossip_t * glob, fd_crds_value_processed_t * crd_p ) {
  /* Insert a fully-processed CRDS value into the value table, keyed by
     its hash. Silently drops the value (with a metric bump) if the
     table is full. */
  if( FD_UNLIKELY( fd_value_table_is_full( glob->values ) ) ) {
    INC_RECV_CRDS_DROP_METRIC( TABLE_FULL );
    FD_LOG_DEBUG(( "too many values" ));
    return;
  }

  fd_value_elem_t * ele = fd_value_table_insert(glob->values, &crd_p->val_hash );
  ele->wallclock = crd_p->wallclock;
  fd_hash_copy( &ele->origin, crd_p->pubkey );

  /* Store encoded form of full CRDS value (including signature) */
  fd_memcpy( ele->data, crd_p->encoded_val, crd_p->encoded_len );
  ele->datalen = crd_p->encoded_len;
}
1470 :
1471 : /* fd_gossip_recv_crds_array processes crds_len crds values. First
1472 : performs a filter pass, dropping duplicate/own values and
1473 : exiting* on any sigverify failures. Then inserts the filtered
1474 : values into the CRDS value table. Also performs housekeeping, like
1475 : updating contact infos and push queue. The filtered crds data is
1476 : finally dispatched via the glob->deliver_fun callback.
1477 :
1478 : *(an exit drops the full packet, so no values are inserted into the table)
1479 : This only fails on a crds value encode failure, which is impossible if
1480 : the value was derived from a gossip message decode. */
1481 : static void
1482 0 : fd_gossip_recv_crds_array( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_crds_value_t * crds, ulong crds_len, fd_gossip_crds_route_t route ) {
1483 0 : FD_SCRATCH_SCOPE_BEGIN{
1484 0 : fd_crds_value_processed_t * filtered_crds = (fd_crds_value_processed_t *)fd_scratch_alloc( alignof(fd_crds_value_processed_t), sizeof(fd_crds_value_processed_t) * crds_len );
1485 0 : ulong num_filtered_crds = 0;
1486 :
1487 : /* OK to reuse since sha256_init is called in every iteration */
1488 0 : fd_sha256_t sha2[1];
1489 :
1490 : /* Filter pass */
1491 0 : for( ulong i = 0; i < crds_len; ++i ) {
1492 0 : fd_crds_value_processed_t * tmp = &filtered_crds[ num_filtered_crds ]; /* This will overwrite if previous value was dedup, should be safe */
1493 0 : tmp->crd = &crds[ i ];
1494 :
1495 : /* Setup fd_crds_value_processed_t entry */
1496 :
1497 0 : if( FD_UNLIKELY( tmp->crd->data.discriminant>=FD_KNOWN_CRDS_ENUM_MAX ) ) {
1498 0 : INC_RECV_CRDS_DROP_METRIC( UNKNOWN_DISCRIMINANT );
1499 0 : } else {
1500 0 : glob->metrics.recv_crds[ route ][ tmp->crd->data.discriminant ] += 1UL;
1501 0 : }
1502 0 : switch (tmp->crd->data.discriminant) {
1503 0 : case fd_crds_data_enum_contact_info_v1:
1504 0 : tmp->pubkey = &tmp->crd->data.inner.contact_info_v1.id;
1505 0 : tmp->wallclock = tmp->crd->data.inner.contact_info_v1.wallclock;
1506 0 : break;
1507 0 : case fd_crds_data_enum_vote:
1508 0 : tmp->pubkey = &tmp->crd->data.inner.vote.from;
1509 0 : tmp->wallclock = tmp->crd->data.inner.vote.wallclock;
1510 0 : break;
1511 0 : case fd_crds_data_enum_lowest_slot:
1512 0 : tmp->pubkey = &tmp->crd->data.inner.lowest_slot.from;
1513 0 : tmp->wallclock = tmp->crd->data.inner.lowest_slot.wallclock;
1514 0 : break;
1515 0 : case fd_crds_data_enum_snapshot_hashes:
1516 0 : tmp->pubkey = &tmp->crd->data.inner.snapshot_hashes.from;
1517 0 : tmp->wallclock = tmp->crd->data.inner.snapshot_hashes.wallclock;
1518 0 : break;
1519 0 : case fd_crds_data_enum_accounts_hashes:
1520 0 : tmp->pubkey = &tmp->crd->data.inner.accounts_hashes.from;
1521 0 : tmp->wallclock = tmp->crd->data.inner.accounts_hashes.wallclock;
1522 0 : break;
1523 0 : case fd_crds_data_enum_epoch_slots:
1524 0 : tmp->pubkey = &tmp->crd->data.inner.epoch_slots.from;
1525 0 : tmp->wallclock = tmp->crd->data.inner.epoch_slots.wallclock;
1526 0 : break;
1527 0 : case fd_crds_data_enum_version_v1:
1528 0 : tmp->pubkey = &tmp->crd->data.inner.version_v1.from;
1529 0 : tmp->wallclock = tmp->crd->data.inner.version_v1.wallclock;
1530 0 : break;
1531 0 : case fd_crds_data_enum_version_v2:
1532 0 : tmp->pubkey = &tmp->crd->data.inner.version_v2.from;
1533 0 : tmp->wallclock = tmp->crd->data.inner.version_v2.wallclock;
1534 0 : break;
1535 0 : case fd_crds_data_enum_node_instance:
1536 0 : tmp->pubkey = &tmp->crd->data.inner.node_instance.from;
1537 0 : tmp->wallclock = tmp->crd->data.inner.node_instance.wallclock;
1538 0 : break;
1539 0 : case fd_crds_data_enum_duplicate_shred:
1540 0 : tmp->pubkey = &tmp->crd->data.inner.duplicate_shred.from;
1541 0 : tmp->wallclock = tmp->crd->data.inner.duplicate_shred.wallclock;
1542 0 : break;
1543 0 : case fd_crds_data_enum_incremental_snapshot_hashes:
1544 0 : tmp->pubkey = &tmp->crd->data.inner.incremental_snapshot_hashes.from;
1545 0 : tmp->wallclock = tmp->crd->data.inner.incremental_snapshot_hashes.wallclock;
1546 0 : break;
1547 0 : case fd_crds_data_enum_contact_info_v2:
1548 0 : tmp->pubkey = &tmp->crd->data.inner.contact_info_v2.from;
1549 0 : tmp->wallclock = tmp->crd->data.inner.contact_info_v2.wallclock;
1550 0 : break;
1551 0 : case fd_crds_data_enum_restart_last_voted_fork_slots:
1552 0 : tmp->pubkey = &tmp->crd->data.inner.restart_last_voted_fork_slots.from;
1553 0 : tmp->wallclock = tmp->crd->data.inner.restart_last_voted_fork_slots.wallclock;
1554 0 : break;
1555 0 : case fd_crds_data_enum_restart_heaviest_fork:
1556 0 : tmp->pubkey = &tmp->crd->data.inner.restart_heaviest_fork.from;
1557 0 : tmp->wallclock = tmp->crd->data.inner.restart_heaviest_fork.wallclock;
1558 0 : break;
1559 0 : default:
1560 0 : tmp->wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* In millisecs */
1561 0 : break;
1562 0 : }
1563 0 : if (memcmp(tmp->pubkey->uc, glob->public_key->uc, 32U) == 0) {
1564 : /* skip my own messages */
1565 0 : INC_RECV_CRDS_DROP_METRIC( OWN_MESSAGE );
1566 0 : continue;
1567 0 : }
1568 :
1569 : /* Encode */
1570 0 : fd_bincode_encode_ctx_t ctx;
1571 0 : ctx.data = tmp->encoded_val;
1572 0 : ctx.dataend = tmp->encoded_val + PACKET_DATA_SIZE;
1573 0 : if ( fd_crds_value_encode( tmp->crd, &ctx ) ) {
1574 0 : FD_LOG_ERR(("fd_crds_value_encode failed"));
1575 0 : }
1576 0 : tmp->encoded_len = (ulong)((uchar *)ctx.data - tmp->encoded_val);
1577 :
1578 : /* Get hash */
1579 0 : fd_sha256_init( sha2 );
1580 0 : fd_sha256_append( sha2, tmp->encoded_val, tmp->encoded_len );
1581 0 : fd_sha256_fini( sha2, tmp->val_hash.uc );
1582 :
1583 0 : fd_msg_stats_elem_t * msg_stat = &glob->msg_stats[ tmp->crd->data.discriminant ];
1584 0 : msg_stat->total_cnt++;
1585 0 : msg_stat->bytes_rx_cnt += tmp->encoded_len;
1586 :
1587 : /* Dedup first */
1588 0 : if ( fd_crds_dup_check( glob, &tmp->val_hash, from, tmp->pubkey ) ) {
1589 0 : msg_stat->dups_cnt++;
1590 0 : glob->recv_dup_cnt++;
1591 0 : glob->metrics.recv_crds_duplicate_message[ route ][ tmp->crd->data.discriminant ]++;
1592 0 : continue; /* skip this entry */
1593 0 : }
1594 :
1595 0 : glob->recv_nondup_cnt++;
1596 : /* Sigverify step
1597 : Skip verifying epoch slots because they:
1598 : - are not used anywhere within the client
1599 : - still store them in table for forwarding +
1600 : prune message construction
1601 : - represent a significant portion of inbound CRDS
1602 : traffic (~90%)
1603 : - will be deprecated soon */
1604 0 : if( tmp->crd->data.discriminant != fd_crds_data_enum_epoch_slots &&
1605 0 : fd_crds_sigverify( tmp->encoded_val, tmp->encoded_len, tmp->pubkey ) ) {
1606 0 : INC_RECV_CRDS_DROP_METRIC( INVALID_SIGNATURE );
1607 : /* drop full packet on bad signature
1608 : https://github.com/anza-xyz/agave/commit/d68b5de6c0fc07d60cf9749ae82c2651a549e81b */
1609 0 : return;
1610 0 : }
1611 0 : num_filtered_crds++;
1612 0 : }
1613 :
1614 : /* Insert pass */
1615 0 : for( ulong i = 0; i < num_filtered_crds; ++i ) {
1616 0 : fd_crds_value_processed_t * crd_p = &filtered_crds[ i ];
1617 0 : fd_crds_insert( glob, crd_p );
1618 :
1619 0 : if ( FD_UNLIKELY( glob->need_push_cnt < FD_NEED_PUSH_MAX ) ) {
1620 : /* Remember that I need to push this value */
1621 0 : ulong i = ((glob->need_push_head + (glob->need_push_cnt++)) & (FD_NEED_PUSH_MAX-1U));
1622 0 : fd_hash_copy(glob->need_push + i, &crd_p->val_hash);
1623 0 : glob->metrics.push_crds_queue_cnt = glob->need_push_cnt;
1624 0 : } else {
1625 0 : INC_RECV_CRDS_DROP_METRIC( PUSH_QUEUE_FULL );
1626 0 : }
1627 :
1628 0 : fd_crds_value_t * crd = crd_p->crd;
1629 :
1630 0 : if (crd->data.discriminant == fd_crds_data_enum_contact_info_v2) {
1631 0 : fd_gossip_contact_info_v2_t * info = &crd->data.inner.contact_info_v2;
1632 0 : fd_gossip_socket_addr_t socket_addr;
1633 0 : if( fd_gossip_contact_info_v2_find_proto_ident( info, FD_GOSSIP_SOCKET_TAG_GOSSIP, &socket_addr ) ) {
1634 0 : if( fd_gossip_port_from_socketaddr( &socket_addr ) != 0) {
1635 : /* Remember the peer */
1636 0 : fd_gossip_peer_addr_t pkey;
1637 0 : fd_memset(&pkey, 0, sizeof(pkey));
1638 0 : fd_gossip_from_soladdr(&pkey, &socket_addr);
1639 0 : fd_peer_elem_t * val = fd_peer_table_query(glob->peers, &pkey, NULL);
1640 0 : if (val == NULL) {
1641 0 : if (fd_peer_table_is_full(glob->peers)) {
1642 0 : INC_RECV_CRDS_DROP_METRIC( PEER_TABLE_FULL );
1643 0 : FD_LOG_DEBUG(("too many peers"));
1644 0 : } else {
1645 0 : val = fd_peer_table_insert(glob->peers, &pkey);
1646 :
1647 0 : if ( glob->inactives_cnt >= INACTIVES_MAX ) {
1648 0 : INC_RECV_CRDS_DROP_METRIC( INACTIVES_QUEUE_FULL );
1649 0 : } else if ( fd_active_table_query(glob->actives, &pkey, NULL) == NULL ) {
1650 : /* Queue this peer for later pinging */
1651 0 : fd_gossip_peer_addr_copy(glob->inactives + (glob->inactives_cnt++), &pkey);
1652 0 : }
1653 0 : }
1654 0 : }
1655 0 : if (val != NULL) {
1656 0 : val->wallclock = crd_p->wallclock;
1657 0 : val->stake = 0;
1658 0 : fd_hash_copy(&val->id, &info->from);
1659 0 : } else {
1660 0 : INC_RECV_CRDS_DROP_METRIC( DISCARDED_PEER );
1661 0 : }
1662 0 : }
1663 :
1664 0 : fd_gossip_peer_addr_t peer_addr = { .addr = socket_addr.inner.ip4.addr,
1665 : /* FIXME: hardcode to ip4 inner? */
1666 0 : .port = fd_ushort_bswap( fd_gossip_port_from_socketaddr( &socket_addr ) ) };
1667 0 : if (glob->my_contact.ci->shred_version == 0U && fd_gossip_is_allowed_entrypoint( glob, &peer_addr )) {
1668 0 : FD_LOG_NOTICE(("using shred version %lu", (ulong)crd->data.inner.contact_info_v2.shred_version));
1669 0 : glob->my_contact.ci->shred_version = crd->data.inner.contact_info_v2.shred_version;
1670 0 : }
1671 0 : }
1672 0 : }
1673 :
1674 0 : glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_INACTIVE_IDX ] = glob->inactives_cnt;
1675 :
1676 : /* Deliver the data upstream */
1677 0 : fd_gossip_unlock( glob );
1678 0 : (*glob->deliver_fun)(&crd->data, glob->deliver_arg);
1679 0 : fd_gossip_lock( glob );
1680 0 : }
1681 :
1682 0 : } FD_SCRATCH_SCOPE_END;
1683 0 : }
1684 : #undef INC_RECV_CRDS_DROP_METRIC
1685 :
1686 : static int
1687 0 : verify_signable_data_with_prefix( fd_gossip_t * glob, fd_gossip_prune_msg_t * msg ) {
1688 0 : fd_gossip_prune_sign_data_with_prefix_t signdata[1] = {0};
1689 0 : signdata->prefix = (uchar *)&FD_GOSSIP_PRUNE_DATA_PREFIX;
1690 0 : signdata->prefix_len = 18UL;
1691 0 : signdata->data.pubkey = msg->data.pubkey;
1692 0 : signdata->data.prunes_len = msg->data.prunes_len;
1693 0 : signdata->data.prunes = msg->data.prunes;
1694 0 : signdata->data.destination = msg->data.destination;
1695 0 : signdata->data.wallclock = msg->data.wallclock;
1696 :
1697 0 : uchar buf[PACKET_DATA_SIZE];
1698 0 : fd_bincode_encode_ctx_t ctx;
1699 0 : ctx.data = buf;
1700 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1701 0 : if ( fd_gossip_prune_sign_data_with_prefix_encode( signdata, &ctx ) ) {
1702 0 : glob->metrics.handle_prune_fails[ FD_METRICS_ENUM_PRUNE_FAILURE_REASON_V_SIGN_ENCODING_FAILED_IDX ] += 1UL;
1703 0 : FD_LOG_WARNING(("fd_gossip_prune_sign_data_encode failed"));
1704 0 : return 1;
1705 0 : }
1706 :
1707 0 : fd_sha512_t sha[1];
1708 0 : return fd_ed25519_verify( /* msg */ buf,
1709 0 : /* sz */ (ulong)((uchar*)ctx.data - buf),
1710 0 : /* sig */ msg->data.signature.uc,
1711 0 : /* public_key */ msg->data.pubkey.uc,
1712 0 : sha );
1713 0 : }
1714 :
1715 : static int
1716 0 : verify_signable_data( fd_gossip_t * glob, fd_gossip_prune_msg_t * msg ) {
1717 0 : fd_gossip_prune_sign_data_t signdata;
1718 0 : signdata.pubkey = msg->data.pubkey;
1719 0 : signdata.prunes_len = msg->data.prunes_len;
1720 0 : signdata.prunes = msg->data.prunes;
1721 0 : signdata.destination = msg->data.destination;
1722 0 : signdata.wallclock = msg->data.wallclock;
1723 :
1724 0 : uchar buf[PACKET_DATA_SIZE];
1725 0 : fd_bincode_encode_ctx_t ctx;
1726 0 : ctx.data = buf;
1727 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1728 0 : if ( fd_gossip_prune_sign_data_encode( &signdata, &ctx ) ) {
1729 0 : glob->metrics.handle_prune_fails[ FD_METRICS_ENUM_PRUNE_FAILURE_REASON_V_SIGN_ENCODING_FAILED_IDX ] += 1UL;
1730 0 : FD_LOG_WARNING(("fd_gossip_prune_sign_data_encode failed"));
1731 0 : return 1;
1732 0 : }
1733 :
1734 0 : fd_sha512_t sha[1];
1735 0 : return fd_ed25519_verify( /* msg */ buf,
1736 0 : /* sz */ (ulong)((uchar*)ctx.data - buf),
1737 0 : /* sig */ msg->data.signature.uc,
1738 0 : /* public_key */ msg->data.pubkey.uc,
1739 0 : sha );
1740 0 : }
1741 :
1742 : /* Handle a prune request from somebody else */
1743 : static void
1744 0 : fd_gossip_handle_prune(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_prune_msg_t * msg) {
1745 0 : (void)from;
1746 :
1747 : /* Confirm the message is for me */
1748 0 : if (memcmp(msg->data.destination.uc, glob->public_key->uc, 32U) != 0)
1749 0 : return;
1750 :
1751 : /* Try to verify the signed data either with the prefix and not the prefix */
1752 0 : if ( ! ( verify_signable_data( glob, msg ) == FD_ED25519_SUCCESS ||
1753 0 : verify_signable_data_with_prefix( glob, msg ) == FD_ED25519_SUCCESS ) ) {
1754 0 : glob->metrics.handle_prune_fails[ FD_METRICS_ENUM_PRUNE_FAILURE_REASON_V_INVALID_SIGNATURE_IDX ] += 1UL;
1755 0 : FD_LOG_WARNING(( "received prune message with invalid signature" ));
1756 0 : return;
1757 0 : }
1758 :
1759 : /* Find the active push state which needs to be pruned */
1760 0 : fd_push_state_t* ps = NULL;
1761 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
1762 0 : fd_push_state_t* s = glob->push_states[i];
1763 0 : if (memcmp(msg->data.pubkey.uc, s->id.uc, 32U) == 0) {
1764 0 : ps = s;
1765 0 : break;
1766 0 : }
1767 0 : }
1768 0 : if (ps == NULL)
1769 0 : return;
1770 :
1771 : /* Set the bloom filter prune bits */
1772 0 : for (ulong i = 0; i < msg->data.prunes_len; ++i) {
1773 0 : fd_pubkey_t * p = msg->data.prunes + i;
1774 0 : for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j) {
1775 0 : ulong pos = fd_gossip_bloom_pos(p, ps->prune_keys[j], FD_PRUNE_NUM_BITS);
1776 0 : ulong * j = ps->prune_bits + (pos>>6U); /* divide by 64 */
1777 0 : ulong bit = 1UL<<(pos & 63U);
1778 0 : *j |= bit;
1779 0 : }
1780 0 : }
1781 0 : }
1782 :
1783 : static int
1784 : fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt );
1785 :
1786 : /* Push an updated version of my contact info into values */
1787 : static void
1788 0 : fd_gossip_push_updated_contact(fd_gossip_t * glob) {
1789 : /* See if we have a shred version yet */
1790 0 : if (glob->my_contact.ci->shred_version == 0U)
1791 0 : return;
1792 : /* Update every 1 secs */
1793 0 : if (glob->now - glob->last_contact_time < (long)1e9)
1794 0 : return;
1795 :
1796 0 : if (glob->last_contact_time != 0) {
1797 0 : fd_value_elem_t * ele = fd_value_table_query(glob->values, &glob->last_contact_info_v2_key, NULL);
1798 0 : if (ele != NULL) {
1799 0 : fd_value_table_remove( glob->values, &glob->last_contact_info_v2_key );
1800 0 : }
1801 0 : }
1802 :
1803 0 : glob->last_contact_time = glob->now;
1804 0 : glob->my_contact.ci->wallclock = FD_NANOSEC_TO_MILLI(glob->now);
1805 0 : fd_gossip_push_value_nolock(glob, &glob->my_contact.crd, &glob->last_contact_info_v2_key);
1806 0 : }
1807 :
/* Respond to a pull request.  Applies the requester's bloom filter to
   our value table and streams back the values the requester appears to
   be missing, packed into one or more pull-response packets. */
static void
fd_gossip_handle_pull_req(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_pull_req_t * msg) {
#define INC_HANDLE_PULL_REQ_FAIL_METRIC( REASON ) glob->metrics.handle_pull_req_fails[ FD_CONCAT3( FD_METRICS_ENUM_PULL_REQ_FAIL_REASON_V_, REASON, _IDX ) ] += 1UL
  /* Only answer peers that are in the active table and have responded
     to a ping (pongtime != 0) */
  fd_active_elem_t * val = fd_active_table_query(glob->actives, from, NULL);
  if (val == NULL || val->pongtime == 0) {

    if ( val == NULL ) {
      INC_HANDLE_PULL_REQ_FAIL_METRIC( PEER_NOT_IN_ACTIVES );
    }
    else if ( val->pongtime == 0 ) {
      INC_HANDLE_PULL_REQ_FAIL_METRIC( UNRESPONSIVE_PEER );
    }

    /* Ping new peers before responding to requests */
    /* TODO: is this the right thing to do here? */
    if (fd_pending_pool_free( glob->event_pool ) < 100U) {
      INC_HANDLE_PULL_REQ_FAIL_METRIC( PENDING_POOL_FULL );
      return;
    }
    fd_pending_event_arg_t arg2;
    fd_gossip_peer_addr_copy(&arg2.key, from);
    fd_gossip_make_ping(glob, &arg2);
    return;
  }

  /* Encode an empty pull response as a template */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pull_resp);
  fd_gossip_pull_resp_t * pull_resp = &gmsg.inner.pull_resp;
  fd_hash_copy( &pull_resp->pubkey, glob->public_key );

  uchar buf[PACKET_DATA_SIZE];
  fd_bincode_encode_ctx_t ctx;
  ctx.data = buf;
  ctx.dataend = buf + PACKET_DATA_SIZE;
  if ( fd_gossip_msg_encode( &gmsg, &ctx ) ) {
    INC_HANDLE_PULL_REQ_FAIL_METRIC( ENCODING_FAILED );
    FD_LOG_WARNING(("fd_gossip_msg_encode failed"));
    return;
  }
  /* Reach into buffer to get the number of values.  The empty template
     ends with the crds vector length (a ulong); pre-encoded values are
     appended at newend and this count is bumped in place. */
  uchar * newend = (uchar *)ctx.data;
  ulong * crds_len = (ulong *)(newend - sizeof(ulong));

  /* Push an updated version of my contact info into values */
  fd_gossip_push_updated_contact(glob);

  /* Apply the bloom filter to my table of values */
  fd_crds_filter_t * filter = &msg->filter;
  ulong nkeys = filter->filter.keys_len;
  ulong * keys = filter->filter.keys;
  fd_gossip_bitvec_u64_t * bitvec = &filter->filter.bits;
  ulong * bitvec2 = bitvec->bits.vec;
  /* Values older than the pull timeout are not returned */
  ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_PULL_TIMEOUT;
  ulong hits = 0;
  ulong misses = 0;
  uint npackets = 0;
  for( fd_value_table_iter_t iter = fd_value_table_iter_init( glob->values );
       !fd_value_table_iter_done( glob->values, iter );
       iter = fd_value_table_iter_next( glob->values, iter ) ) {
    fd_value_elem_t * ele = fd_value_table_iter_ele( glob->values, iter );
    fd_hash_t * hash = &(ele->key);
    if (ele->wallclock < expire)
      continue;
    /* The filter partitions the key space: skip values whose hash does
       not fall in the mask's partition */
    if (filter->mask_bits != 0U) {
      ulong m = (~0UL >> filter->mask_bits);
      if ((hash->ul[0] | m) != filter->mask)
        continue;
    }
    /* Execute the bloom filter: a value is returned only if at least
       one of its filter bits is unset (requester likely lacks it) */
    int miss = 0;
    for (ulong i = 0; i < nkeys; ++i) {
      ulong pos = fd_gossip_bloom_pos(hash, keys[i], bitvec->len);
      ulong * j = bitvec2 + (pos>>6U); /* divide by 64 */
      ulong bit = 1UL<<(pos & 63U);
      if (!((*j) & bit)) {
        miss = 1;
        break;
      }
    }
    if (!miss) {
      hits++;
      continue;
    }
    misses++;

    /* Add the value in already encoded form */
    if (newend + ele->datalen - buf > PACKET_DATA_SIZE) {
      /* Packet is getting too large. Flush it */
      ulong sz = (ulong)(newend - buf);
      fd_gossip_send_raw(glob, from, buf, sz);
      glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PULL_RESPONSE_IDX ]++;
      char tmp[100];
      FD_LOG_DEBUG(("sent msg type %u to %s size=%lu", gmsg.discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), from), sz));
      ++npackets;
      /* Reset to the empty-template state and start a fresh packet */
      newend = (uchar *)ctx.data;
      *crds_len = 0;
    }
    fd_memcpy(newend, ele->data, ele->datalen);
    newend += ele->datalen;
    (*crds_len)++;
  }
  /* Record the number of hits and misses

     These metrics are imprecise as the numbers will vary
     per pull request. We keep them to surface
     obvious issues like 100% miss rate. */
  glob->metrics.handle_pull_req_bloom_filter_result[ FD_METRICS_ENUM_PULL_REQ_BLOOM_FILTER_RESULT_V_HIT_IDX ] += hits;
  glob->metrics.handle_pull_req_bloom_filter_result[ FD_METRICS_ENUM_PULL_REQ_BLOOM_FILTER_RESULT_V_MISS_IDX ] += misses;

  /* Flush final packet */
  if (newend > (uchar *)ctx.data) {
    ulong sz = (ulong)(newend - buf);
    fd_gossip_send_raw(glob, from, buf, sz);
    glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PULL_RESPONSE_IDX ]++;
    char tmp[100];
    FD_LOG_DEBUG(("sent msg type %u to %s size=%lu", gmsg.discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), from), sz));
    ++npackets;
  }

  if (misses) {
    FD_LOG_DEBUG(("responded to pull request with %lu values in %u packets (%lu filtered out)", misses, npackets, hits));
  }
  glob->metrics.handle_pull_req_npackets = npackets;
#undef INC_HANDLE_PULL_REQ_FAIL_METRIC
}
1935 :
1936 :
1937 :
1938 : /* Handle any gossip message */
1939 : static void
1940 0 : fd_gossip_recv(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_msg_t * gmsg) {
1941 0 : if ( FD_LIKELY( gmsg->discriminant < FD_METRICS_COUNTER_GOSSIP_RECEIVED_GOSSIP_MESSAGES_CNT ) ) {
1942 0 : glob->metrics.recv_message[gmsg->discriminant] += 1UL;
1943 0 : } else {
1944 0 : glob->metrics.recv_unknown_message += 1UL;
1945 0 : }
1946 0 : switch (gmsg->discriminant) {
1947 0 : case fd_gossip_msg_enum_pull_req:
1948 0 : fd_gossip_handle_pull_req(glob, from, &gmsg->inner.pull_req);
1949 0 : break;
1950 0 : case fd_gossip_msg_enum_pull_resp: {
1951 0 : fd_gossip_pull_resp_t * pull_resp = &gmsg->inner.pull_resp;
1952 0 : fd_gossip_recv_crds_array( glob, NULL, pull_resp->crds, pull_resp->crds_len, FD_GOSSIP_CRDS_ROUTE_PULL_RESP );
1953 0 : break;
1954 0 : }
1955 0 : case fd_gossip_msg_enum_push_msg: {
1956 0 : fd_gossip_push_msg_t * push_msg = &gmsg->inner.push_msg;
1957 0 : fd_gossip_recv_crds_array( glob, from, push_msg->crds, push_msg->crds_len, FD_GOSSIP_CRDS_ROUTE_PUSH );
1958 0 : break;
1959 0 : }
1960 0 : case fd_gossip_msg_enum_prune_msg:
1961 0 : fd_gossip_handle_prune(glob, from, &gmsg->inner.prune_msg);
1962 0 : break;
1963 0 : case fd_gossip_msg_enum_ping:
1964 0 : fd_gossip_handle_ping(glob, from, &gmsg->inner.ping);
1965 0 : break;
1966 0 : case fd_gossip_msg_enum_pong:
1967 0 : fd_gossip_handle_pong(glob, from, &gmsg->inner.pong);
1968 0 : break;
1969 0 : }
1970 0 : }
1971 :
1972 : /* Initiate connection to a peer */
1973 : int
1974 : fd_gossip_add_active_peer( fd_gossip_t * glob,
1975 0 : fd_gossip_peer_addr_t const * addr ) {
1976 0 : fd_gossip_lock( glob );
1977 0 : fd_active_elem_t * val = fd_active_table_query( glob->actives, addr, NULL );
1978 0 : if( val==NULL ) {
1979 0 : if( fd_active_table_is_full( glob->actives )) {
1980 0 : FD_LOG_WARNING(( "too many actives" ));
1981 0 : fd_gossip_unlock( glob );
1982 0 : return -1;
1983 0 : }
1984 0 : val = fd_active_table_insert( glob->actives, addr );
1985 0 : fd_active_new_value( val );
1986 0 : val->pingcount = 0; /* Incremented in fd_gossip_make_ping */
1987 0 : }
1988 0 : fd_gossip_unlock( glob );
1989 0 : return 0;
1990 0 : }
1991 :
/* Improve the set of active push states: reschedules itself, prunes
   states whose peers went inactive, evicts the most-pruned state when
   the list is full, and adds at most one randomly chosen active peer as
   a new push destination per invocation. */
static void
fd_gossip_refresh_push_states( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Try again in 20 sec */
  fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)20e9);
  if (ev) {
    ev->fun = fd_gossip_refresh_push_states;
  }

  /* Delete states which no longer have active peers */
  for (ulong i = 0; i < glob->push_states_cnt; ++i) {
    fd_push_state_t* s = glob->push_states[i];
    if (fd_active_table_query(glob->actives, &s->addr, NULL) == NULL) {
      fd_push_states_pool_ele_release(glob->push_states_pool, glob->push_states[i]);
      /* Replace with the one at the end; i-- re-examines this slot */
      glob->push_states[i--] = glob->push_states[--(glob->push_states_cnt)];
    }
  }
  if (glob->push_states_cnt == FD_PUSH_LIST_MAX) {
    /* List is full: delete the worst destination by prune (drop) count
       to make room for a fresh one */
    fd_push_state_t * worst_s = glob->push_states[0];
    ulong worst_i = 0;
    for (ulong i = 1; i < glob->push_states_cnt; ++i) {
      fd_push_state_t* s = glob->push_states[i];
      if (s->drop_cnt > worst_s->drop_cnt) {
        worst_s = s;
        worst_i = i;
      }
    }
    fd_push_states_pool_ele_release(glob->push_states_pool, worst_s);
    /* Replace with the one at the end */
    glob->push_states[worst_i] = glob->push_states[--(glob->push_states_cnt)];
  }

  /* Add random actives as new pushers.  Gives up after 5 consecutive
     picks that were already push destinations; breaks after adding a
     single new state. */
  int failcnt = 0;
  while (glob->push_states_cnt < FD_PUSH_LIST_MAX && failcnt < 5) {
    fd_active_elem_t * a = fd_gossip_random_active( glob );
    if( a == NULL ) break;

    /* Skip peers that already have a push state */
    for (ulong i = 0; i < glob->push_states_cnt; ++i) {
      fd_push_state_t* s = glob->push_states[i];
      if (fd_gossip_peer_addr_eq(&s->addr, &a->key))
        goto skipadd;
    }
    failcnt = 0;

    /* Build the pusher state */
    fd_push_state_t * s = fd_push_states_pool_ele_acquire(glob->push_states_pool);
    fd_memset(s, 0, sizeof(fd_push_state_t));
    fd_gossip_peer_addr_copy(&s->addr, &a->key);
    fd_hash_copy(&s->id, &a->id);
    /* Fresh random keys for this destination's prune bloom filter */
    for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j)
      s->prune_keys[j] = fd_rng_ulong(glob->rng);

    /* Encode an empty push msg template; values are later appended
       after packet_end_init by fd_gossip_push */
    fd_gossip_msg_t gmsg[1] = {0};
    fd_gossip_msg_new_disc(gmsg, fd_gossip_msg_enum_push_msg);
    fd_gossip_push_msg_t * push_msg = &gmsg->inner.push_msg;
    fd_hash_copy( &push_msg->pubkey, glob->public_key );
    fd_bincode_encode_ctx_t ctx;
    ctx.data = s->packet;
    ctx.dataend = s->packet + PACKET_DATA_SIZE;
    if ( fd_gossip_msg_encode( gmsg, &ctx ) ) {
      FD_LOG_ERR(("fd_gossip_msg_encode failed"));
      return;
    }
    s->packet_end_init = s->packet_end = (uchar *)ctx.data;

    glob->push_states[glob->push_states_cnt++] = s;
    break;

  skipadd:
    ++failcnt;
  }

  glob->metrics.active_push_destinations = glob->push_states_cnt;
  glob->metrics.refresh_push_states_failcnt = (ulong)failcnt;
}
2073 :
2074 : /* Push the latest values */
2075 : static void
2076 0 : fd_gossip_push( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
2077 0 : (void)arg;
2078 :
2079 : /* Try again in 100 msec */
2080 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)1e8);
2081 0 : if (ev) {
2082 0 : ev->fun = fd_gossip_push;
2083 0 : }
2084 :
2085 : /* Push an updated version of my contact info into values */
2086 0 : fd_gossip_push_updated_contact(glob);
2087 :
2088 : /* Iterate across recent values */
2089 0 : ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_PULL_TIMEOUT;
2090 0 : while (glob->need_push_cnt > 0) {
2091 0 : fd_hash_t * h = glob->need_push + ((glob->need_push_head++) & (FD_NEED_PUSH_MAX-1));
2092 0 : glob->need_push_cnt--;
2093 :
2094 0 : fd_value_elem_t * msg = fd_value_table_query(glob->values, h, NULL);
2095 0 : if (msg == NULL || msg->wallclock < expire)
2096 0 : continue;
2097 :
2098 : /* Iterate across push states */
2099 0 : ulong npush = 0;
2100 0 : for (ulong i = 0; i < glob->push_states_cnt && npush < FD_PUSH_VALUE_MAX; ++i) {
2101 0 : fd_push_state_t* s = glob->push_states[i];
2102 :
2103 : /* Apply the pruning bloom filter */
2104 0 : int pass = 0;
2105 0 : for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j) {
2106 0 : ulong pos = fd_gossip_bloom_pos(&msg->origin, s->prune_keys[j], FD_PRUNE_NUM_BITS);
2107 0 : ulong * j = s->prune_bits + (pos>>6U); /* divide by 64 */
2108 0 : ulong bit = 1UL<<(pos & 63U);
2109 0 : if (!(*j & bit)) {
2110 0 : pass = 1;
2111 0 : break;
2112 0 : }
2113 0 : }
2114 0 : if (!pass) {
2115 0 : s->drop_cnt++;
2116 0 : glob->not_push_cnt++;
2117 0 : continue;
2118 0 : }
2119 0 : glob->push_cnt++;
2120 0 : npush++;
2121 0 : glob->metrics.push_crds[ (uint)msg->data[sizeof(fd_signature_t)] ]++; /* discriminant */
2122 :
2123 0 : ulong * crds_len = (ulong *)(s->packet_end_init - sizeof(ulong));
2124 : /* Add the value in already encoded form */
2125 0 : if (s->packet_end + msg->datalen - s->packet > PACKET_DATA_SIZE) {
2126 : /* Packet is getting too large. Flush it */
2127 0 : ulong sz = (ulong)(s->packet_end - s->packet);
2128 0 : glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PUSH_IDX ]++;
2129 0 : fd_gossip_send_raw(glob, &s->addr, s->packet, sz);
2130 0 : char tmp[100];
2131 0 : FD_LOG_DEBUG(("push to %s size=%lu", fd_gossip_addr_str(tmp, sizeof(tmp), &s->addr), sz));
2132 0 : s->packet_end = s->packet_end_init;
2133 0 : *crds_len = 0;
2134 0 : }
2135 0 : fd_memcpy(s->packet_end, msg->data, msg->datalen);
2136 0 : s->packet_end += msg->datalen;
2137 0 : (*crds_len)++;
2138 0 : }
2139 0 : }
2140 :
2141 : /* Flush partially full packets */
2142 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
2143 0 : fd_push_state_t* s = glob->push_states[i];
2144 0 : if (s->packet_end != s->packet_end_init) {
2145 0 : ulong * crds_len = (ulong *)(s->packet_end_init - sizeof(ulong));
2146 0 : ulong sz = (ulong)(s->packet_end - s->packet);
2147 0 : fd_gossip_send_raw(glob, &s->addr, s->packet, sz);
2148 0 : glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PUSH_IDX ]++;
2149 0 : char tmp[100];
2150 0 : FD_LOG_DEBUG(("push to %s size=%lu", fd_gossip_addr_str(tmp, sizeof(tmp), &s->addr), sz));
2151 0 : s->packet_end = s->packet_end_init;
2152 0 : *crds_len = 0;
2153 0 : }
2154 0 : }
2155 0 : }
2156 :
/* Publish an outgoing value. The source id and wallclock are set by this
   function.  The value is signed, hashed (sha256 of its bincode
   encoding) to form its table key, stored in serialized form for later
   pushing/duplicate detection, and queued for push.  Returns 0 on
   success, -1 on drop (unknown discriminant, duplicate, or full table).
   Caller must hold the gossip lock. */
static int
fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt ) {
#define INC_PUSH_CRDS_DROP_METRIC( REASON ) \
  glob->metrics.push_crds_drop_reason[ FD_CONCAT3( FD_METRICS_ENUM_CRDS_DROP_REASON_V_, REASON , _IDX ) ] += 1UL

  if ( FD_UNLIKELY( data->discriminant >= FD_KNOWN_CRDS_ENUM_MAX ) ) {
    INC_PUSH_CRDS_DROP_METRIC( UNKNOWN_DISCRIMINANT );
    return -1;
  }


  /* Wrap the data in a value stub. Sign it. */
  fd_crds_value_t crd;
  fd_memcpy(&crd.data, data, sizeof(fd_crds_data_t));
  fd_gossip_sign_crds_value(glob, &crd);

  /* Perform the value hash to get the value table key */
  uchar buf[PACKET_DATA_SIZE];
  fd_bincode_encode_ctx_t ctx;
  ctx.data = buf;
  ctx.dataend = buf + PACKET_DATA_SIZE;
  if ( fd_crds_value_encode( &crd, &ctx ) ) {
    INC_PUSH_CRDS_DROP_METRIC( ENCODING_FAILED );
    FD_LOG_ERR(("fd_crds_value_encode failed"));
    return -1;
  }
  fd_sha256_t sha2[1];
  fd_sha256_init( sha2 );
  ulong datalen = (ulong)((uchar*)ctx.data - buf);
  fd_sha256_append( sha2, buf, datalen );
  fd_hash_t key;
  fd_sha256_fini( sha2, key.uc );
  /* Optionally report the key back to the caller */
  if ( key_opt != NULL )
    fd_hash_copy( key_opt, &key );

  /* Store the value for later pushing/duplicate detection */
  fd_value_elem_t * msg = fd_value_table_query(glob->values, &key, NULL);
  if (msg != NULL) {
    /* Already have this value, which is strange!
       NOTE: This is a different list from duplicate crds values received
       from the network (see metrics.recv_crds_duplicate_message).
       Reaching this path implies a crds value generated internally was
       detected as a duplicate. */
    glob->metrics.push_crds_duplicate[ data->discriminant ] += 1UL;
    return -1;
  }
  if (fd_value_table_is_full(glob->values)) {
    INC_PUSH_CRDS_DROP_METRIC( TABLE_FULL );
    FD_LOG_DEBUG(("too many values"));
    return -1;
  }
  msg = fd_value_table_insert(glob->values, &key);
  msg->wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* convert to ms */
  /* We are the origin of values we publish ourselves */
  fd_hash_copy(&msg->origin, glob->public_key);

  /* We store the serialized form for convenience */
  fd_memcpy(msg->data, buf, datalen);
  msg->datalen = datalen;

  if (glob->need_push_cnt < FD_NEED_PUSH_MAX) {
    /* Remember that I need to push this value */
    ulong i = ((glob->need_push_head + (glob->need_push_cnt++)) & (FD_NEED_PUSH_MAX-1U));
    fd_hash_copy(glob->need_push + i, &key);
  } else {
    INC_PUSH_CRDS_DROP_METRIC( PUSH_QUEUE_FULL );
  }

  glob->metrics.push_crds_queue_cnt = glob->need_push_cnt;
  glob->metrics.push_crds[ data->discriminant ] += 1UL;
  return 0;

#undef INC_PUSH_CRDS_DROP_METRIC
}
2231 :
2232 : int
2233 0 : fd_gossip_push_value( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt ) {
2234 0 : fd_gossip_lock( glob );
2235 0 : int rc = fd_gossip_push_value_nolock( glob, data, key_opt );
2236 0 : fd_gossip_unlock( glob );
2237 0 : return rc;
2238 0 : }
2239 :
2240 : /* Periodically make prune messages */
2241 : static void
2242 0 : fd_gossip_make_prune( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
2243 0 : (void)arg;
2244 :
2245 : /* Try again in 30 sec */
2246 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)30e9);
2247 0 : if (ev) {
2248 0 : ev->fun = fd_gossip_make_prune;
2249 0 : }
2250 :
2251 0 : long expire = glob->now - (long)FD_GOSSIP_VALUE_EXPIRE*((long)1e6);
2252 0 : for( fd_stats_table_iter_t iter = fd_stats_table_iter_init( glob->stats );
2253 0 : !fd_stats_table_iter_done( glob->stats, iter );
2254 0 : iter = fd_stats_table_iter_next( glob->stats, iter ) ) {
2255 0 : fd_stats_elem_t * ele = fd_stats_table_iter_ele( glob->stats, iter );
2256 0 : if (ele->last < expire) {
2257 : /* Entry hasn't been updated for a long time */
2258 0 : glob->metrics.make_prune_stale_entry += 1UL;
2259 0 : fd_stats_table_remove( glob->stats, &ele->key );
2260 0 : continue;
2261 0 : }
2262 : /* Look for high duplicate counts */
2263 0 : fd_pubkey_t origins[8];
2264 0 : ulong origins_cnt = 0;
2265 0 : for (ulong i = 0; i < ele->dups_cnt; ++i) {
2266 0 : if (ele->dups[i].cnt >= 20U) {
2267 0 : fd_hash_copy(&origins[origins_cnt++], &ele->dups[i].origin);
2268 0 : glob->metrics.make_prune_high_duplicates += 1UL;
2269 0 : }
2270 0 : }
2271 0 : glob->metrics.make_prune_requested_origins = origins_cnt;
2272 0 : if (origins_cnt == 0U)
2273 0 : continue;
2274 : /* Get the peer id */
2275 0 : fd_peer_elem_t * peerval = fd_peer_table_query(glob->peers, &ele->key, NULL);
2276 : /* Always clean up to restart the dup counter */
2277 0 : fd_stats_table_remove( glob->stats, &ele->key );
2278 0 : if (peerval == NULL)
2279 0 : continue;
2280 :
2281 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
2282 0 : fd_base58_encode_32( peerval->id.uc, NULL, keystr );
2283 0 : FD_LOG_DEBUG(("sending prune request for %lu origins to %s", origins_cnt, keystr));
2284 :
2285 : /* Make a prune request */
2286 0 : fd_gossip_msg_t gmsg;
2287 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_prune_msg);
2288 0 : fd_gossip_prune_msg_t * prune_msg = &gmsg.inner.prune_msg;
2289 0 : fd_hash_copy(&prune_msg->data.pubkey, glob->public_key);
2290 0 : prune_msg->data.prunes_len = origins_cnt;
2291 0 : prune_msg->data.prunes = origins;;
2292 0 : fd_hash_copy(&prune_msg->data.destination, &peerval->id);
2293 0 : ulong wc = prune_msg->data.wallclock = FD_NANOSEC_TO_MILLI(glob->now);
2294 :
2295 0 : fd_gossip_prune_sign_data_t signdata;
2296 0 : fd_hash_copy(&signdata.pubkey, glob->public_key);
2297 0 : signdata.prunes_len = origins_cnt;
2298 0 : signdata.prunes = origins;
2299 0 : fd_hash_copy(&signdata.destination, &peerval->id);
2300 0 : signdata.wallclock = wc;
2301 :
2302 0 : uchar buf[PACKET_DATA_SIZE];
2303 0 : fd_bincode_encode_ctx_t ctx;
2304 0 : ctx.data = buf;
2305 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
2306 0 : if ( fd_gossip_prune_sign_data_encode( &signdata, &ctx ) ) {
2307 0 : glob->metrics.make_prune_sign_data_encode_failed += 1UL;
2308 0 : FD_LOG_ERR(("fd_gossip_prune_sign_data_encode failed"));
2309 0 : return;
2310 0 : }
2311 :
2312 0 : (*glob->sign_fun)( glob->sign_arg, prune_msg->data.signature.uc, buf, (ulong)((uchar*)ctx.data - buf), FD_KEYGUARD_SIGN_TYPE_ED25519 );
2313 :
2314 0 : fd_gossip_send(glob, &peerval->key, &gmsg);
2315 0 : }
2316 0 : }
2317 :
2318 : /* Periodically log status. Removes old peers as a side event. */
static void
fd_gossip_log_stats( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Try again in 60 sec */
  /* NOTE(review): fd_gossip_add_pending may return NULL when the event
     pool is exhausted; in that case this event silently stops
     rescheduling itself. */
  fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)60e9);
  if (ev) {
    ev->fun = fd_gossip_log_stats;
  }

  /* Log and reset the per-interval packet/value/push counters. */
  if( glob->recv_pkt_cnt == 0 )
    FD_LOG_WARNING(("received no gossip packets!!"));
  else
    FD_LOG_INFO(("received %lu packets", glob->recv_pkt_cnt));
  glob->recv_pkt_cnt = 0;
  FD_LOG_INFO(("received %lu dup values and %lu new", glob->recv_dup_cnt, glob->recv_nondup_cnt));
  glob->recv_dup_cnt = glob->recv_nondup_cnt = 0;
  FD_LOG_INFO(("pushed %lu values and filtered %lu", glob->push_cnt, glob->not_push_cnt));
  glob->push_cnt = glob->not_push_cnt = 0;

  /* Per-CRDS-type receive statistics (not reset here). */
  for( ulong i = 0UL; i<FD_KNOWN_CRDS_ENUM_MAX; i++ ) {
    FD_LOG_INFO(( "received values - type: %2lu, total: %12lu, dups: %12lu, bytes: %12lu", i, glob->msg_stats[i].total_cnt, glob->msg_stats[i].dups_cnt, glob->msg_stats[i].bytes_rx_cnt ));
  }

  /* Only refill the inactive-peer list once it has been fully drained. */
  int need_inactive = (glob->inactives_cnt == 0);

  /* Peers not updated within 4x the value-expiration window (both in
     millisecs) are evicted from the peer table below. */
  ulong wc = FD_NANOSEC_TO_MILLI(glob->now);
  ulong expire = wc - 4U*FD_GOSSIP_VALUE_EXPIRE;
  for( fd_peer_table_iter_t iter = fd_peer_table_iter_init( glob->peers );
       !fd_peer_table_iter_done( glob->peers, iter );
       iter = fd_peer_table_iter_next( glob->peers, iter ) ) {
    fd_peer_elem_t * ele = fd_peer_table_iter_ele( glob->peers, iter );
    if (ele->wallclock < expire) {
      /* Peer hasn't been updated for a long time */
      fd_peer_table_remove( glob->peers, &ele->key );
      continue;
    }
    /* "Active" here means the peer is in the ping table and has
       answered at least one ping (pongtime != 0). */
    fd_active_elem_t * act = fd_active_table_query(glob->actives, &ele->key, NULL);
    char buf[100];
    char keystr[ FD_BASE58_ENCODED_32_SZ ];
    fd_base58_encode_32( ele->id.uc, NULL, keystr );
    FD_LOG_DEBUG(("peer at %s id %s age %.3f %s",
                  fd_gossip_addr_str(buf, sizeof(buf), &ele->key),
                  keystr,
                  ((double)(wc - ele->wallclock))*0.001,
                  ((act != NULL && act->pongtime != 0) ? "(active)" : "")));
    /* Collect non-active peers as future ping candidates, up to capacity. */
    if (need_inactive && act == NULL && glob->inactives_cnt < INACTIVES_MAX)
      fd_gossip_peer_addr_copy(glob->inactives + (glob->inactives_cnt++), &ele->key);
  }


  /* Export current peer counts to metrics. */
  glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_TOTAL_IDX ] = fd_peer_table_key_cnt( glob->peers );
  glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_ACTIVE_IDX ] = fd_active_table_key_cnt( glob->actives );
  glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_INACTIVE_IDX ] = (ulong)glob->inactives_cnt;
}
2374 :
/* Set the current protocol time in nanosecs.  Should be called
   regularly by the driver loop: all pending-event deadlines and
   wallclock stamps in this module derive from glob->now. */
void
fd_gossip_settime( fd_gossip_t * glob, long ts ) {
  glob->now = ts;
}
2380 :
/* Get the current protocol time in nanosecs (i.e. the value most
   recently passed to fd_gossip_settime). */
long
fd_gossip_gettime( fd_gossip_t * glob ) {
  return glob->now;
}
2386 :
2387 : /* Start timed events and other protocol behavior */
2388 : int
2389 0 : fd_gossip_start( fd_gossip_t * glob ) {
2390 0 : fd_gossip_lock( glob );
2391 : /* Start pulling and pinging on a timer */
2392 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)1e9);
2393 0 : ev->fun = fd_gossip_random_pull;
2394 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)5e9);
2395 0 : ev->fun = fd_gossip_random_ping;
2396 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)60e9);
2397 0 : ev->fun = fd_gossip_log_stats;
2398 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)20e9);
2399 0 : ev->fun = fd_gossip_refresh_push_states;
2400 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)1e8);
2401 0 : ev->fun = fd_gossip_push;
2402 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)30e9);
2403 0 : ev->fun = fd_gossip_make_prune;
2404 0 : fd_gossip_unlock( glob );
2405 :
2406 0 : return 0;
2407 0 : }
2408 :
2409 : /* Dispatch timed events and other protocol behavior. This should be
2410 : * called inside the main spin loop. */
2411 : int
2412 0 : fd_gossip_continue( fd_gossip_t * glob ) {
2413 0 : fd_gossip_lock( glob );
2414 0 : do {
2415 0 : fd_pending_event_t * ev = fd_pending_heap_ele_peek_min( glob->event_heap, glob->event_pool );
2416 0 : if (ev == NULL || ev->key > glob->now)
2417 0 : break;
2418 0 : fd_pending_event_t evcopy;
2419 0 : fd_memcpy(&evcopy, ev, sizeof(evcopy));
2420 0 : fd_pending_heap_ele_remove_min( glob->event_heap, glob->event_pool );
2421 0 : fd_pending_pool_ele_release( glob->event_pool, ev );
2422 0 : (*evcopy.fun)(glob, &evcopy.fun_arg);
2423 0 : } while (1);
2424 0 : fd_gossip_unlock( glob );
2425 0 : return 0;
2426 0 : }
2427 :
/* Pass a raw gossip packet into the protocol.  from is the peer
   address of the sender.  Returns 0 on success, -1 if the packet
   could not be deserialized.  The decoded message lives in a scratch
   frame that is reclaimed at FD_SCRATCH_SCOPE_END, so it must be
   fully consumed by fd_gossip_recv before this function returns. */
int
fd_gossip_recv_packet( fd_gossip_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from ) {
  fd_gossip_lock( glob );
  FD_SCRATCH_SCOPE_BEGIN {
    glob->recv_pkt_cnt++;
    glob->metrics.recv_pkt_cnt++;
    /* Deserialize the message */
    fd_bincode_decode_ctx_t ctx = {
      .data = msg,
      .dataend = msg + msglen,
    };

    /* First pass: compute the size of the decoded representation
       without materializing it. */
    ulong total_sz = 0UL;
    if( FD_UNLIKELY( fd_gossip_msg_decode_footprint( &ctx, &total_sz ) ) ) {
      glob->metrics.recv_pkt_corrupted_msg += 1UL;
      FD_LOG_WARNING(( "corrupt gossip message" ));
      fd_gossip_unlock( glob );
      return -1;
    }

    uchar * mem = fd_scratch_alloc( fd_gossip_msg_align(), total_sz );
    if( FD_UNLIKELY( !mem ) ) {
      FD_LOG_ERR(( "Unable to allocate memory for gossip msg" ));
    }

    /* Second pass: actually decode into the scratch allocation. */
    fd_gossip_msg_t * gmsg = fd_gossip_msg_decode( mem, &ctx );

    /* Reject trailing garbage: a well-formed packet decodes to exactly
       msglen bytes. */
    if( FD_UNLIKELY( ctx.data != ctx.dataend ) ) {
      glob->metrics.recv_pkt_corrupted_msg += 1UL;
      FD_LOG_WARNING(( "corrupt gossip message" ));
      fd_gossip_unlock( glob );
      return -1;
    }

    char tmp[100];

    FD_LOG_DEBUG(( "recv msg type %u from %s", gmsg->discriminant, fd_gossip_addr_str( tmp, sizeof(tmp), from ) ));
    fd_gossip_recv( glob, from, gmsg );

    fd_gossip_unlock( glob );
  } FD_SCRATCH_SCOPE_END;
  return 0;
}
2472 :
/* Return the shred version currently advertised in our own contact
   info. */
ushort
fd_gossip_get_shred_version( fd_gossip_t const * glob ) {
  return glob->my_contact.ci->shred_version;
}
2477 :
2478 : void
2479 : fd_gossip_set_stake_weights( fd_gossip_t * gossip,
2480 : fd_stake_weight_t const * stake_weights,
2481 0 : ulong stake_weights_cnt ) {
2482 0 : if( stake_weights == NULL ) {
2483 0 : FD_LOG_ERR(( "stake weights NULL" ));
2484 0 : }
2485 :
2486 0 : if( stake_weights_cnt > MAX_STAKE_WEIGHTS ) {
2487 0 : FD_LOG_ERR(( "num stake weights (%lu) is larger than max allowed stake weights", stake_weights_cnt ));
2488 0 : }
2489 :
2490 0 : fd_gossip_lock( gossip );
2491 :
2492 : /* Clear out the table for new stake weights. */
2493 0 : for ( fd_weights_table_iter_t iter = fd_weights_table_iter_init( gossip->weights );
2494 0 : !fd_weights_table_iter_done( gossip->weights, iter);
2495 0 : iter = fd_weights_table_iter_next( gossip->weights, iter ) ) {
2496 0 : fd_weights_elem_t * e = fd_weights_table_iter_ele( gossip->weights, iter );
2497 0 : fd_weights_table_remove( gossip->weights, &e->key );
2498 0 : }
2499 :
2500 0 : for( ulong i = 0; i < stake_weights_cnt; ++i ) {
2501 0 : if( !stake_weights[i].stake ) continue;
2502 0 : fd_weights_elem_t * val = fd_weights_table_insert( gossip->weights, &stake_weights[i].key );
2503 : // Weight is log2(stake)^2
2504 0 : ulong w = (ulong)fd_ulong_find_msb( stake_weights[i].stake ) + 1;
2505 0 : val->weight = w*w;
2506 0 : }
2507 :
2508 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( gossip->actives );
2509 0 : !fd_active_table_iter_done( gossip->actives, iter );
2510 0 : iter = fd_active_table_iter_next( gossip->actives, iter ) ) {
2511 0 : fd_active_elem_t * ele = fd_active_table_iter_ele( gossip->actives, iter );
2512 0 : fd_weights_elem_t const * val = fd_weights_table_query_const( gossip->weights, &ele->id, NULL );
2513 0 : ele->weight = ( val == NULL ? 1UL : val->weight );
2514 0 : }
2515 :
2516 0 : fd_gossip_unlock( gossip );
2517 0 : }
2518 :
2519 : void
2520 : fd_gossip_set_entrypoints( fd_gossip_t * gossip,
2521 : fd_ip4_port_t const * entrypoints,
2522 0 : ulong entrypoints_cnt ) {
2523 0 : gossip->entrypoints_cnt = entrypoints_cnt;
2524 0 : for( ulong i=0UL; i<entrypoints_cnt; i++ ) {
2525 0 : FD_LOG_NOTICE(( "gossip initial peer - addr: " FD_IP4_ADDR_FMT ":%u",
2526 0 : FD_IP4_ADDR_FMT_ARGS( entrypoints[i].addr ), fd_ushort_bswap( entrypoints[i].port ) ));
2527 0 : fd_gossip_add_active_peer( gossip, &entrypoints[i] );
2528 0 : gossip->entrypoints[i] = entrypoints[i];
2529 0 : }
2530 0 : }
2531 :
2532 : uint
2533 0 : fd_gossip_is_allowed_entrypoint( fd_gossip_t * gossip, fd_gossip_peer_addr_t * addr ) {
2534 0 : for( ulong i = 0UL; i<gossip->entrypoints_cnt; i++) {
2535 0 : if (fd_gossip_peer_addr_eq( addr, &gossip->entrypoints[i]) ) return 1;
2536 0 : }
2537 0 : return 0;
2538 0 : }
|