Line data Source code
1 : #include "fd_gossip.h"
2 : #include "../../ballet/base58/fd_base58.h"
3 : #include "../../disco/keyguard/fd_keyguard.h"
4 : #include <math.h>
5 : #include "fd_contact_info.h"
6 :
/* Maximum size of a network packet (conservative MTU payload bound) */
#define PACKET_DATA_SIZE 1232
/* How long do we remember values (in millisecs) */
#define FD_GOSSIP_VALUE_EXPIRE ((ulong)(3600e3)) /* 1 hr */
/* Max age that values can be pushed/pulled (in millisecs) */
#define FD_GOSSIP_PULL_TIMEOUT ((ulong)(15e3)) /* 15 seconds */
/* Max number of validators that can be actively pinged */
#define FD_ACTIVE_KEY_MAX (1<<8)

/* Max number of CRDS values that can be remembered.

   As of 2.1.11 on 02/05/2025, Agave value table (approx) sizes on an unstaked node:
   | cluster | num unique entries | num total (unique + purged) |
   | testnet | ~800k              | 1.4m                        |
   | mainnet | ~750k              | 1m                          |

   Purged values are counted because:
   - Our table (currently) does not "purge" values in the same sense Agave does
   - Purged values are included in bloom filter construction, so we need them anyway */
#define FD_VALUE_KEY_MAX (1<<24) // includes purged values
#define FD_VALUE_DATA_MAX (1<<21)
/* Max number of pending timed events */
#define FD_PENDING_MAX (1<<9)
/* Sample rate of bloom filters. */
#define FD_BLOOM_SAMPLE_RATE 8U
/* Number of bloom filter bits in an outgoing pull request packet */
#define FD_BLOOM_NUM_BITS (512U*8U) /* 0.5 Kbyte */
/* Max number of bloom filter keys in an outgoing pull request packet */
#define FD_BLOOM_MAX_KEYS 32U
/* Max number of packets in an outgoing pull request batch */
#define FD_BLOOM_MAX_PACKETS 32U
/* Number of bloom bits in a push prune filter */
#define FD_PRUNE_NUM_BITS (512U*8U) /* 0.5 Kbyte */
/* Number of bloom keys in a push prune filter */
#define FD_PRUNE_NUM_KEYS 4U
/* Max number of destinations a single message can be pushed */
#define FD_PUSH_VALUE_MAX 9
/* Max number of push destinations that we track */
#define FD_PUSH_LIST_MAX 12
/* Max length of queue of values that need pushing */
#define FD_NEED_PUSH_MAX (1<<12)
/* Max size of receive statistics table */
#define FD_STATS_KEY_MAX (1<<8)
/* Sha256 pre-image size for pings/pongs */
#define FD_PING_PRE_IMAGE_SZ (48UL)
/* Number of recognized CRDS enum members */
#define FD_KNOWN_CRDS_ENUM_MAX (14UL)
/* Prune data prefix
   https://github.com/anza-xyz/agave/blob/0c264859b127940f13673b5fea300131a70b1a8d/gossip/src/protocol.rs#L39 */
#define FD_GOSSIP_PRUNE_DATA_PREFIX "\xffSOLANA_PRUNE_DATA"

/* Maximum buffer size for decoding a gossip message. This is a guess,
   but it should be large enough to hold the largest message we expect
   to receive. The current worst-case estimate is a push/pullresp
   message with 2 vote entries, which takes up 2*2144 + 48 ~= 4.2kb.
   Provide an order of magnitude for safety, so we use 64k.

   TODO: formally verify this */
#define FD_GOSSIP_DECODE_BUFFER_MAX (1 << 16) /* 64k */

/* Loose estimate of the maximum number of CRDS values a
   push/pullresp message can hold. For static allocation
   purposes. Derived by taking total packet size (1232b)
   divided by the minimum encoded size of a CRDS value
   consisting of just a signature (64b) and discriminant (4b).
   This does not include the actual CRDS data, so we will
   always run into a decode overflow before exceeding this value. */
#define FD_GOSSIP_MAX_CRDS_VALS (PACKET_DATA_SIZE / (64U + 4U))

/* Convert a nanosecond timestamp to milliseconds (integer division) */
#define FD_NANOSEC_TO_MILLI(_ts_) ((ulong)(_ts_/1000000))

/* Maximum number of stake weights, mirrors fd_stake_ci */
#define MAX_STAKE_WEIGHTS (40200UL)

/* Give up pinging a peer after this many unanswered pings */
#define MAX_PEER_PING_COUNT (10000U)
83 :
84 : /* Test if two addresses are equal */
85 0 : FD_FN_PURE static int fd_gossip_peer_addr_eq( const fd_gossip_peer_addr_t * key1, const fd_gossip_peer_addr_t * key2 ) {
86 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
87 0 : return key1->l == key2->l;
88 0 : }
89 :
90 : /* Hash an address */
91 0 : FD_FN_PURE static ulong fd_gossip_peer_addr_hash( const fd_gossip_peer_addr_t * key, ulong seed ) {
92 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
93 0 : return (key->l + seed + 7242237688154252699UL)*9540121337UL;
94 0 : }
95 :
96 : /* Efficiently copy an address */
97 0 : static void fd_gossip_peer_addr_copy( fd_gossip_peer_addr_t * keyd, const fd_gossip_peer_addr_t * keys ) {
98 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
99 0 : keyd->l = keys->l;
100 0 : }
101 :
/* All peers table element. The peers table is all known validator addresses/ids. */
struct fd_peer_elem {
    fd_gossip_peer_addr_t key; /* Gossip address of the peer (map key) */
    ulong next;                /* Reserved for the map chain; do not touch */
    fd_pubkey_t id; /* Public identifier */
    ulong wallclock; /* last time we heard about this peer */
    ulong stake; /* Staking for this validator. Unimplemented. */
};
/* All peers table */
typedef struct fd_peer_elem fd_peer_elem_t;
#define MAP_NAME     fd_peer_table
#define MAP_KEY_T    fd_gossip_peer_addr_t
#define MAP_KEY_EQ   fd_gossip_peer_addr_eq
#define MAP_KEY_HASH fd_gossip_peer_addr_hash
#define MAP_KEY_COPY fd_gossip_peer_addr_copy
#define MAP_T        fd_peer_elem_t
#include "../../util/tmpl/fd_map_giant.c"
119 :
/* Active table element. This table is all validators that we are
   aggressively pinging for liveness checking. */
struct fd_active_elem {
    fd_gossip_peer_addr_t key; /* Gossip address of the peer (map key) */
    ulong next;                /* Reserved for the map chain; do not touch */
    fd_pubkey_t id; /* Public identifier */
    long pingtime; /* Last time we sent a ping */
    uint pingcount; /* Number of pings it took to get a pong */
    fd_hash_t pingtoken; /* Random data used in ping/pong */
    long pongtime; /* Last time we received a pong */
    ulong weight; /* Selection weight */
};
/* Active table */
typedef struct fd_active_elem fd_active_elem_t;
#define MAP_NAME     fd_active_table
#define MAP_KEY_T    fd_gossip_peer_addr_t
#define MAP_KEY_EQ   fd_gossip_peer_addr_eq
#define MAP_KEY_HASH fd_gossip_peer_addr_hash
#define MAP_KEY_COPY fd_gossip_peer_addr_copy
#define MAP_T        fd_active_elem_t
#include "../../util/tmpl/fd_map_giant.c"
141 :
142 : /* Initialize an active table element value */
143 : void
144 0 : fd_active_new_value(fd_active_elem_t * val) {
145 0 : val->pingcount = 1;
146 0 : val->pingtime = val->pongtime = 0;
147 0 : val->weight = 0;
148 0 : fd_memset(val->id.uc, 0, 32U);
149 0 : fd_memset(val->pingtoken.uc, 0, 32U);
150 0 : }
151 :
152 : /* Hash a hash value */
153 0 : ulong fd_hash_hash( const fd_hash_t * key, ulong seed ) {
154 0 : return key->ul[0] ^ seed;
155 0 : }
156 :
/************ Gossip Value Table Structures **************/
/* The fd_gossip value table is backed by two structures:
   a value metadata map and a value vector. The lifetime
   requirement of a full value is much smaller than its
   metadata (15s vs 1hr), while a full value has a much
   larger size footprint (100x). Decoupling the two
   allows for retaining more metadata without significantly
   increasing memory footprint and iteration overhead.

   An entry in the vector must have a corresponding entry
   in the metadata map, while an entry in the metadata
   map may not have an entry in the vector (denoted by a
   NULL in the meta_t value ptr). */

/* Full gossip value representation. Stores the encoded
   form of a CRDS value and other metadata. */
struct fd_value {
    fd_hash_t key; /* Hash of the value data */
    ulong wallclock; /* Original timestamp of value in millis */
    fd_pubkey_t origin; /* Where did this value originate */
    uchar data[PACKET_DATA_SIZE]; /* Serialized form of value (bincode) including signature */
    ulong datalen; /* Number of valid bytes in data */
    ulong del; /* Set to queue for deletion in fd_gossip_cleanup */
};

typedef struct fd_value fd_value_t;
186 :
/* Expands a drop-reason name into its metrics enum index */
#define CRDS_DROP_REASON_IDX( REASON ) FD_CONCAT3( FD_METRICS_ENUM_CRDS_DROP_REASON_V_, REASON, _IDX )
/* Populate val from a decoded CRDS value: record the origin pubkey and
   wallclock for the recognized discriminant, bincode-encode crd into
   val->data (fatal via FD_LOG_ERR if encoding fails), and key the
   value by the sha256 of the encoded bytes.  Returns 0 on success or
   the UNKNOWN_DISCRIMINANT drop-reason metric index for unrecognized
   discriminants.  NOTE(review): the static sha2 state makes this
   non-reentrant; callers appear to serialize via the gossip lock --
   confirm before using from multiple threads. */
static inline int
fd_value_from_crds( fd_value_t * val,
                    fd_crds_value_t const * crd ) {
  /* OK to reuse since sha256_init is called */
  static fd_sha256_t sha2[1];
  val->del = 0;
  switch( crd->data.discriminant ) {
  case fd_crds_data_enum_contact_info_v1:
    val->origin = crd->data.inner.contact_info_v1.id;
    val->wallclock = crd->data.inner.contact_info_v1.wallclock;
    break;
  case fd_crds_data_enum_vote:
    val->origin = crd->data.inner.vote.from;
    val->wallclock = crd->data.inner.vote.wallclock;
    break;
  case fd_crds_data_enum_lowest_slot:
    val->origin = crd->data.inner.lowest_slot.from;
    val->wallclock = crd->data.inner.lowest_slot.wallclock;
    break;
  case fd_crds_data_enum_snapshot_hashes:
    val->origin = crd->data.inner.snapshot_hashes.from;
    val->wallclock = crd->data.inner.snapshot_hashes.wallclock;
    break;
  case fd_crds_data_enum_accounts_hashes:
    val->origin = crd->data.inner.accounts_hashes.from;
    val->wallclock = crd->data.inner.accounts_hashes.wallclock;
    break;
  case fd_crds_data_enum_epoch_slots:
    val->origin = crd->data.inner.epoch_slots.from;
    val->wallclock = crd->data.inner.epoch_slots.wallclock;
    break;
  case fd_crds_data_enum_version_v1:
    val->origin = crd->data.inner.version_v1.from;
    val->wallclock = crd->data.inner.version_v1.wallclock;
    break;
  case fd_crds_data_enum_version_v2:
    val->origin = crd->data.inner.version_v2.from;
    val->wallclock = crd->data.inner.version_v2.wallclock;
    break;
  case fd_crds_data_enum_node_instance:
    val->origin = crd->data.inner.node_instance.from;
    val->wallclock = crd->data.inner.node_instance.wallclock;
    break;
  case fd_crds_data_enum_duplicate_shred:
    val->origin = crd->data.inner.duplicate_shred.from;
    val->wallclock = crd->data.inner.duplicate_shred.wallclock;
    break;
  case fd_crds_data_enum_incremental_snapshot_hashes:
    val->origin = crd->data.inner.incremental_snapshot_hashes.from;
    val->wallclock = crd->data.inner.incremental_snapshot_hashes.wallclock;
    break;
  case fd_crds_data_enum_contact_info_v2:
    val->origin = crd->data.inner.contact_info_v2.from;
    val->wallclock = crd->data.inner.contact_info_v2.wallclock;
    break;
  case fd_crds_data_enum_restart_last_voted_fork_slots:
    val->origin = crd->data.inner.restart_last_voted_fork_slots.from;
    val->wallclock = crd->data.inner.restart_last_voted_fork_slots.wallclock;
    break;
  case fd_crds_data_enum_restart_heaviest_fork:
    val->origin = crd->data.inner.restart_heaviest_fork.from;
    val->wallclock = crd->data.inner.restart_heaviest_fork.wallclock;
    break;
  default:
    return CRDS_DROP_REASON_IDX( UNKNOWN_DISCRIMINANT );
  }

  /* Encode */
  fd_bincode_encode_ctx_t ctx;
  ctx.data = val->data;
  ctx.dataend = val->data + PACKET_DATA_SIZE;
  if( fd_crds_value_encode( crd, &ctx ) ) {
    FD_LOG_ERR(("fd_crds_value_encode failed"));
  }
  val->datalen = (ulong)((uchar *)ctx.data - val->data);

  /* Get hash */
  fd_sha256_init( sha2 );
  fd_sha256_append( sha2, val->data, val->datalen );
  fd_sha256_fini( sha2, val->key.uc );

  return 0;
}
271 :
/* Value vector that:
   - backs the values pointed by fd_value_meta_t->value
   - is used in generating push and pull resp
     messages */
#define VEC_NAME fd_value_vec
#define VEC_T    fd_value_t
#include "../../util/tmpl/fd_vec.c"

/* Minimized form of fd_value that only holds metadata */
struct fd_value_meta {
  fd_hash_t key; /* Hash of the value data, also functions as map key */
  ulong wallclock; /* Timestamp of value (millis) */
  fd_value_t * value; /* Pointer to the actual value element (backed by the value vector); NULL when only metadata is retained */
  ulong next; /* Reserved for the map chain; do not touch */
};
typedef struct fd_value_meta fd_value_meta_t;
288 :
/* Value map, holds hashes of processed CRDS entries.
   Used in pull request generation and de-duplication.
   Also holds pointer to corresponding element in
   value vec, if available. */
#define MAP_NAME     fd_value_meta_map
#define MAP_KEY_T    fd_hash_t
#define MAP_KEY_EQ(a,b) (0==memcmp( (a),(b),sizeof(fd_hash_t) ))
#define MAP_KEY_HASH fd_hash_hash
#define MAP_T        fd_value_meta_t
#include "../../util/tmpl/fd_map_giant.c"
299 :
300 : static void
301 : fd_value_meta_map_value_init( fd_value_meta_t * meta,
302 : ulong wallclock,
303 0 : fd_value_t * value ) {
304 : /* Key should have been initialized in fd_value_meta_map_insert */
305 0 : meta->wallclock = wallclock;
306 0 : meta->value = value;
307 0 : }
308 :
/* Weights table element. This table stores the weight for each peer
   (determined by stake). */
struct fd_weights_elem {
    fd_pubkey_t key; /* Peer identity pubkey (map key) */
    ulong next;      /* Reserved for the map chain; do not touch */
    ulong weight;    /* Stake-derived selection weight */
};
/* Weights table.  NOTE(review): MAP_KEY_T is fd_hash_t while the
   struct key is fd_pubkey_t -- presumably these are the same 32-byte
   type; confirm in fd_types. */
typedef struct fd_weights_elem fd_weights_elem_t;
#define MAP_NAME     fd_weights_table
#define MAP_KEY_T    fd_hash_t
#define MAP_KEY_EQ   fd_hash_eq
#define MAP_KEY_HASH fd_hash_hash
#define MAP_T        fd_weights_elem_t
#include "../../util/tmpl/fd_map_giant.c"
324 :
/* Queue of pending timed events, stored as a priority heap */
/* Argument passed to a pending-event callback: either a peer address
   or a plain ulong, overlaid in a single word-sized union. */
union fd_pending_event_arg {
    fd_gossip_peer_addr_t key;
    ulong ul;
};
typedef union fd_pending_event_arg fd_pending_event_arg_t;
331 :
332 : static inline fd_pending_event_arg_t
333 0 : fd_pending_event_arg_null( void ) {
334 0 : return (fd_pending_event_arg_t){ .ul=0 };
335 0 : }
336 :
337 : static inline fd_pending_event_arg_t
338 0 : fd_pending_event_arg_peer_addr( fd_gossip_peer_addr_t key ) {
339 0 : return (fd_pending_event_arg_t){ .key=key };
340 0 : }
341 :
/* Callback type invoked when a pending event fires. */
typedef void (*fd_pending_event_fun)(struct fd_gossip * glob, fd_pending_event_arg_t * arg);
/* A single timed event: callback plus argument, ordered in the heap
   below by the timeout field (fd_prq convention -- confirm). */
struct fd_pending_event {
    long timeout;                   /* When the event should fire */
    fd_pending_event_fun fun;       /* Callback to invoke */
    fd_pending_event_arg_t fun_arg; /* Argument handed to the callback */
};
typedef struct fd_pending_event fd_pending_event_t;
#define PRQ_NAME fd_pending_heap
#define PRQ_T    fd_pending_event_t
#include "../../util/tmpl/fd_prq.c"
352 :
/* Data structure representing an active push destination. There are
   only a small number of these. */
struct fd_push_state {
    fd_gossip_peer_addr_t addr; /* Destination address */
    fd_pubkey_t id; /* Public identifier */
    ulong drop_cnt; /* Number of values dropped due to pruning */
    ulong prune_keys[FD_PRUNE_NUM_KEYS]; /* Keys used for bloom filter for pruning */
    ulong prune_bits[FD_PRUNE_NUM_BITS/64U]; /* Bits table used for bloom filter for pruning */
    uchar packet[PACKET_DATA_SIZE]; /* Partially assembled packet containing a fd_gossip_push_msg_t */
    uchar * packet_end_init; /* Initial end of the packet when there are zero values */
    uchar * packet_end; /* Current end of the packet including values so far */
    ulong next; /* Reserved for the pool free list; do not touch */
};
typedef struct fd_push_state fd_push_state_t;

#define POOL_NAME fd_push_states_pool
#define POOL_T    fd_push_state_t
#include "../../util/tmpl/fd_pool.c"
371 :
/* Max number of distinct origins tracked per sender for dup counting */
#define MAX_DUP_ORIGINS 8U
/* Receive statistics table element. */
struct fd_stats_elem {
    fd_gossip_peer_addr_t key; /* Keyed by sender */
    ulong next; /* Reserved for the map chain; do not touch */
    long last;  /* Timestamp of last update */
    /* Duplicate counts by origin */
    struct {
        fd_pubkey_t origin;
        ulong cnt;
    } dups[MAX_DUP_ORIGINS];
    ulong dups_cnt; /* Number of dups[] entries in use */
};
/* Receive statistics table. */
typedef struct fd_stats_elem fd_stats_elem_t;
#define MAP_NAME     fd_stats_table
#define MAP_KEY_T    fd_gossip_peer_addr_t
#define MAP_KEY_EQ   fd_gossip_peer_addr_eq
#define MAP_KEY_HASH fd_gossip_peer_addr_hash
#define MAP_KEY_COPY fd_gossip_peer_addr_copy
#define MAP_T        fd_stats_elem_t
#include "../../util/tmpl/fd_map_giant.c"
394 :
/* Small set used to pick which bloom-filter packets of a pull-request
   batch to actually send. */
#define SET_NAME fd_gossip_filter_selection
#define SET_MAX  FD_BLOOM_MAX_PACKETS
#include "../../util/tmpl/fd_smallset.c"

/* Per-CRDS-message-type receive counters. */
struct fd_msg_stats_elem {
    ulong bytes_rx_cnt; /* Total bytes received for this type */
    ulong total_cnt;    /* Total values received for this type */
    ulong dups_cnt;     /* Duplicate values received for this type */
};
/* Receive type statistics table. */
typedef struct fd_msg_stats_elem fd_msg_stats_elem_t;
406 :
/* Global data for gossip service */
struct fd_gossip {
    /* Concurrency lock (0=free, 1=held; see fd_gossip_lock/unlock) */
    volatile ulong lock;
    /* Current time in nanosecs */
    long now;
    fd_spad_t * decode_spad; /* For holding CRDS decode artifacts */

    /* My official contact info in the gossip protocol */
    fd_contact_info_t my_contact;
    /* My public key (ptr to entry in my contact info) */
    fd_pubkey_t * public_key;

    /* Function used to deliver gossip messages to the application */
    fd_gossip_data_deliver_fun deliver_fun;
    /* Argument to fd_gossip_data_deliver_fun */
    void * deliver_arg;
    /* Function used to send raw packets on the network */
    fd_gossip_send_packet_fun send_fun;
    /* Argument to fd_gossip_send_packet_fun */
    void * send_arg;
    /* Function used to send packets for signing to remote tile */
    fd_gossip_sign_fun sign_fun;
    /* Argument to fd_gossip_sign_fun */
    void * sign_arg;

    /* Table of all known validators, keyed by gossip address */
    fd_peer_elem_t * peers;
    /* Table of validators that we are actively pinging, keyed by gossip address */
    fd_active_elem_t * actives;
    /* Queue of validators that might be added to actives */
    fd_gossip_peer_addr_t * inactives;
    ulong inactives_cnt;
#define INACTIVES_MAX 1024U

    /* Table of crds metadata, keyed by hash of the encoded data */
    fd_value_meta_t * value_metas;
    fd_value_t * values; /* Vector of full values */
    /* The last timestamp that we pushed our own contact info */
    long last_contact_time;
    fd_hash_t last_contact_info_v2_key;

    /* Array of push destinations currently in use */
    fd_push_state_t * push_states[FD_PUSH_LIST_MAX];
    ulong push_states_cnt;
    fd_push_state_t * push_states_pool;
    /* Index into values vector */
    ulong need_push_head;

    /* Table of receive statistics */
    fd_stats_elem_t * stats;
    /* Table of message type stats */
    fd_msg_stats_elem_t msg_stats[FD_KNOWN_CRDS_ENUM_MAX];

    /* Heap/queue of pending timed events */
    fd_pending_event_t * event_heap;

    /* Random number generator */
    fd_rng_t rng[1];
    /* RNG seed */
    ulong seed;
    /* Total number of packets received */
    ulong recv_pkt_cnt;
    /* Total number of duplicate values received */
    ulong recv_dup_cnt;
    /* Total number of non-duplicate values received */
    ulong recv_nondup_cnt;
    /* Count of values pushed */
    ulong push_cnt;
    /* Count of values not pushed due to pruning */
    ulong not_push_cnt;

    /* Stake weights */
    fd_weights_elem_t * weights;

    /* List of added entrypoints at startup */
    ulong entrypoints_cnt;
    fd_gossip_peer_addr_t entrypoints[16];
    /* Metrics */
    fd_gossip_metrics_t metrics;
};
488 :
489 : fd_gossip_metrics_t *
490 0 : fd_gossip_get_metrics( fd_gossip_t * gossip ) {
491 0 : return &gossip->metrics;
492 0 : }
493 :
494 : FD_FN_CONST ulong
495 0 : fd_gossip_align ( void ) { return 128UL; }
496 :
/* Total memory footprint of a fd_gossip_t region.  The append order
   here must mirror the carve order in fd_gossip_new exactly; the
   FD_LOG_ERR check there catches any divergence. */
FD_FN_CONST ulong
fd_gossip_footprint( void ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
  l = FD_LAYOUT_APPEND( l, fd_spad_align(), fd_spad_footprint( FD_GOSSIP_DECODE_BUFFER_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t) );
  l = FD_LAYOUT_APPEND( l, fd_value_meta_map_align(), fd_value_meta_map_footprint( FD_VALUE_KEY_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_value_vec_align(), fd_value_vec_footprint( FD_VALUE_DATA_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_stats_table_align(), fd_stats_table_footprint(FD_STATS_KEY_MAX) );
  l = FD_LAYOUT_APPEND( l, fd_weights_table_align(), fd_weights_table_footprint(MAX_STAKE_WEIGHTS) );
  l = FD_LAYOUT_APPEND( l, fd_push_states_pool_align(), fd_push_states_pool_footprint(FD_PUSH_LIST_MAX) );
  l = FD_LAYOUT_FINI( l, fd_gossip_align() );
  return l;
}
514 :
/* Format the memory region shmem as a gossip state and construct every
   sub-structure in place.  The carve order must match
   fd_gossip_footprint exactly; the FD_LOG_ERR check at the end catches
   any mismatch.  Returns the region pointer (usable as fd_gossip_t
   after fd_gossip_join). */
void *
fd_gossip_new ( void * shmem, ulong seed ) {
  FD_SCRATCH_ALLOC_INIT(l, shmem);
  fd_gossip_t * glob = (fd_gossip_t*)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t)) ;
  fd_memset(glob, 0, sizeof(fd_gossip_t));
  glob->seed = seed;

  /* Scratch pad used for transient CRDS decode artifacts */
  void * spad_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_spad_align(), fd_spad_footprint( FD_GOSSIP_DECODE_BUFFER_MAX ) );
  glob->decode_spad = fd_spad_join( fd_spad_new( spad_mem, FD_GOSSIP_DECODE_BUFFER_MAX ) );

  void * shm = FD_SCRATCH_ALLOC_APPEND(l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX));
  glob->peers = fd_peer_table_join(fd_peer_table_new(shm, FD_PEER_KEY_MAX, seed));

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX));
  glob->actives = fd_active_table_join(fd_active_table_new(shm, FD_ACTIVE_KEY_MAX, seed));

  glob->inactives = (fd_gossip_peer_addr_t*)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t));

  shm = FD_SCRATCH_ALLOC_APPEND( l, fd_value_meta_map_align(), fd_value_meta_map_footprint( FD_VALUE_KEY_MAX ) );
  glob->value_metas = fd_value_meta_map_join( fd_value_meta_map_new( shm, FD_VALUE_KEY_MAX, seed ) );

  shm = FD_SCRATCH_ALLOC_APPEND( l, fd_value_vec_align(), fd_value_vec_footprint( FD_VALUE_DATA_MAX ) );
  glob->values = fd_value_vec_join( fd_value_vec_new( shm, FD_VALUE_DATA_MAX ) );
  glob->need_push_head = 0; // point to start of values

  glob->last_contact_time = 0;

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX));
  glob->event_heap = fd_pending_heap_join(fd_pending_heap_new(shm, FD_PENDING_MAX));

  fd_rng_new(glob->rng, (uint)seed, 0UL);
  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_stats_table_align(), fd_stats_table_footprint(FD_STATS_KEY_MAX));
  glob->stats = fd_stats_table_join(fd_stats_table_new(shm, FD_STATS_KEY_MAX, seed));

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_weights_table_align(), fd_weights_table_footprint(MAX_STAKE_WEIGHTS));
  glob->weights = fd_weights_table_join( fd_weights_table_new( shm, MAX_STAKE_WEIGHTS, seed ) );

  shm = FD_SCRATCH_ALLOC_APPEND(l, fd_push_states_pool_align(), fd_push_states_pool_footprint(FD_PUSH_LIST_MAX));
  glob->push_states_pool = fd_push_states_pool_join( fd_push_states_pool_new( shm, FD_PUSH_LIST_MAX ) );

  /* Sanity check that the carve stayed within the declared footprint */
  ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, fd_gossip_align() );
  if ( scratch_top > (ulong)shmem + fd_gossip_footprint() ) {
    FD_LOG_ERR(("Not enough space allocated for gossip"));
  }

  fd_contact_info_init( &glob->my_contact );
  return glob;
}
563 :
564 : fd_gossip_t *
565 0 : fd_gossip_join ( void * shmap ) { return (fd_gossip_t *)shmap; }
566 :
567 : void *
568 0 : fd_gossip_leave ( fd_gossip_t * join ) { return join; }
569 :
/* Unformat a gossip region: leave and delete each sub-structure that
   fd_gossip_new constructed.  Returns the raw region pointer.
   NOTE(review): decode_spad is not left/deleted here, unlike the other
   sub-structures -- confirm this is intentional. */
void *
fd_gossip_delete ( void * shmap ) {
  fd_gossip_t * glob = (fd_gossip_t *)shmap;
  fd_peer_table_delete( fd_peer_table_leave( glob->peers ) );
  fd_active_table_delete( fd_active_table_leave( glob->actives ) );

  fd_value_meta_map_delete( fd_value_meta_map_leave( glob->value_metas ) );
  fd_value_vec_delete( fd_value_vec_leave( glob->values ) );
  fd_pending_heap_delete( fd_pending_heap_leave( glob->event_heap ) );
  fd_stats_table_delete( fd_stats_table_leave( glob->stats ) );
  fd_weights_table_delete( fd_weights_table_leave( glob->weights ) );
  fd_push_states_pool_delete( fd_push_states_pool_leave( glob->push_states_pool ) );

  return glob;
}
585 :
/* Acquire the gossip lock.  Spins on an atomic compare-and-swap of
   lock from 0 to 1; the trailing fence orders subsequent reads/writes
   after acquisition.  In non-threaded builds this degenerates to a
   plain store. */
static void
fd_gossip_lock( fd_gossip_t * gossip ) {
# if FD_HAS_THREADS
  for(;;) {
    /* CAS returns the prior value: 0 means we won the lock */
    if( FD_LIKELY( !FD_ATOMIC_CAS( &gossip->lock, 0UL, 1UL) ) ) break;
    FD_SPIN_PAUSE();
  }
# else
  gossip->lock = 1;
# endif
  FD_COMPILER_MFENCE();
}
598 :
/* Release the gossip lock.  The fence orders prior writes before the
   volatile store that makes the lock available again. */
static void
fd_gossip_unlock( fd_gossip_t * gossip ) {
  FD_COMPILER_MFENCE();
  FD_VOLATILE( gossip->lock ) = 0UL;
}
604 :
605 : /* FIXME: do these go in fd_types_custom instead? */
606 : void
607 0 : fd_gossip_ipaddr_from_socketaddr( fd_gossip_socket_addr_t const * addr, fd_gossip_ip_addr_t * out ) {
608 0 : if( FD_LIKELY( addr->discriminant == fd_gossip_socket_addr_enum_ip4 ) ) {
609 0 : fd_gossip_ip_addr_new_disc(out, fd_gossip_ip_addr_enum_ip4);
610 0 : out->inner.ip4 = addr->inner.ip4.addr;
611 0 : } else {
612 0 : fd_gossip_ip_addr_new_disc(out, fd_gossip_ip_addr_enum_ip6);
613 0 : out->inner.ip6 = addr->inner.ip6.addr;
614 0 : }
615 0 : }
616 :
617 : ushort
618 0 : fd_gossip_port_from_socketaddr( fd_gossip_socket_addr_t const * addr ) {
619 0 : if( FD_LIKELY( addr->discriminant == fd_gossip_socket_addr_enum_ip4 ) ) {
620 0 : return addr->inner.ip4.port;
621 0 : } else {
622 0 : return addr->inner.ip6.port;
623 0 : }
624 0 : }
625 :
626 : /* Convert my style of address to solana style */
627 : int
628 0 : fd_gossip_to_soladdr( fd_gossip_socket_addr_t * dst, fd_gossip_peer_addr_t const * src ) {
629 0 : fd_gossip_socket_addr_new_disc( dst, fd_gossip_socket_addr_enum_ip4 );
630 0 : dst->inner.ip4.port = fd_ushort_bswap( src->port );
631 0 : dst->inner.ip4.addr = src->addr;
632 0 : return 0;
633 0 : }
634 :
/* Convert my style of address from solana style.  Only the ip4
   variant is supported; anything else hits FD_LOG_ERR (fatal in this
   codebase -- the -1 return is presumably unreachable; confirm).
   Returns 0 on success. */
int
fd_gossip_from_soladdr(fd_gossip_peer_addr_t * dst, fd_gossip_socket_addr_t const * src ) {
  FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
  dst->l = 0;
  if (src->discriminant == fd_gossip_socket_addr_enum_ip4) {
    /* Port stored byte-swapped (network order) in the peer addr */
    dst->port = fd_ushort_bswap( src->inner.ip4.port );
    dst->addr = src->inner.ip4.addr;
    return 0;
  } else {
    FD_LOG_ERR(("invalid address family %lu", (ulong)src->discriminant));
    return -1;
  }
}
649 :
/* printf-style format helper for fd_gossip_peer_addr_t: dotted-quad
   IPv4 plus port (the port is stored byte-swapped, hence the bswap). */
#define GOSSIP_ADDR_FMT FD_IP4_ADDR_FMT ":%u"
#define GOSSIP_ADDR_FMT_ARGS( k ) FD_IP4_ADDR_FMT_ARGS( (k).addr ), fd_ushort_bswap( (k).port )
652 :
653 :
/* Set the gossip configuration: install our identity/contact info and
   the application callbacks (deliver/send/sign).  Takes the gossip
   lock for the duration.  Returns 0. */
int
fd_gossip_set_config( fd_gossip_t * glob, const fd_gossip_config_t * config ) {
  fd_gossip_lock( glob );

  char keystr[ FD_BASE58_ENCODED_32_SZ ];
  fd_base58_encode_32( config->public_key->uc, NULL, keystr );
  FD_LOG_NOTICE(("configuring address " GOSSIP_ADDR_FMT " id %s", GOSSIP_ADDR_FMT_ARGS( config->my_addr ), keystr));

  fd_contact_info_insert_socket( &glob->my_contact, &config->my_addr, FD_GOSSIP_SOCKET_TAG_GOSSIP );

  fd_gossip_contact_info_v2_t * info = &glob->my_contact.ci_crd;
  info->from = *config->public_key;
  /* public_key aliases the pubkey stored inside my_contact */
  glob->public_key = &info->from;

  info->shred_version = config->shred_version;
  info->outset = (ulong)config->node_outset;
  info->version = config->my_version;

  glob->deliver_fun = config->deliver_fun;
  glob->deliver_arg = config->deliver_arg;
  glob->send_fun = config->send_fun;
  glob->send_arg = config->send_arg;
  glob->sign_fun = config->sign_fun;
  glob->sign_arg = config->sign_arg;

  fd_gossip_unlock( glob );

  return 0;
}
684 :
685 : /* Updates the sockets and addrs lists in contact_info_v2 */
686 :
687 : static int
688 0 : fd_gossip_update_addr_internal( fd_gossip_t * glob, const fd_gossip_peer_addr_t * my_addr, uchar tag ) {
689 :
690 0 : fd_gossip_lock( glob );
691 0 : int res = fd_contact_info_insert_socket( &glob->my_contact, my_addr, tag );
692 0 : fd_gossip_unlock( glob );
693 :
694 :
695 0 : return res;
696 0 : }
697 :
/* Update the gossip socket advertised in our contact info. */
int
fd_gossip_update_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * my_addr ) {
  FD_LOG_NOTICE(("updating address " GOSSIP_ADDR_FMT, GOSSIP_ADDR_FMT_ARGS( *my_addr ) ));
  return fd_gossip_update_addr_internal( glob, my_addr, FD_GOSSIP_SOCKET_TAG_GOSSIP );
}
703 :
/* Update the serve-repair socket advertised in our contact info. */
int
fd_gossip_update_repair_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * serve ) {
  FD_LOG_NOTICE(("updating repair service address " GOSSIP_ADDR_FMT, GOSSIP_ADDR_FMT_ARGS( *serve ) ));
  return fd_gossip_update_addr_internal( glob, serve, FD_GOSSIP_SOCKET_TAG_SERVE_REPAIR );
}
709 :
/* Update the TVU socket advertised in our contact info. */
int
fd_gossip_update_tvu_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tvu ) {
  FD_LOG_NOTICE(("updating tvu service address " GOSSIP_ADDR_FMT, GOSSIP_ADDR_FMT_ARGS( *tvu ) ));
  return fd_gossip_update_addr_internal( glob, tvu, FD_GOSSIP_SOCKET_TAG_TVU );
}
715 :
/* Update both the TPU and TPU-QUIC sockets advertised in our contact
   info.  Returns the bitwise OR of the two insert results (nonzero if
   either failed). */
int
fd_gossip_update_tpu_addr( fd_gossip_t * glob,
                           fd_gossip_peer_addr_t const * tpu,
                           fd_gossip_peer_addr_t const * tpu_quic ) {
  FD_LOG_NOTICE(("updating tpu service address " GOSSIP_ADDR_FMT, GOSSIP_ADDR_FMT_ARGS( *tpu ) ));
  FD_LOG_NOTICE(("updating tpu_quic service address " GOSSIP_ADDR_FMT, GOSSIP_ADDR_FMT_ARGS( *tpu_quic ) ));

  int res = fd_gossip_update_addr_internal( glob, tpu, FD_GOSSIP_SOCKET_TAG_TPU );
  res |= fd_gossip_update_addr_internal( glob, tpu_quic, FD_GOSSIP_SOCKET_TAG_TPU_QUIC );
  return res;
}
727 :
/* Update the TPU-vote socket advertised in our contact info. */
int
fd_gossip_update_tpu_vote_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tpu_vote ) {
  FD_LOG_NOTICE(("updating tpu vote service address " GOSSIP_ADDR_FMT, GOSSIP_ADDR_FMT_ARGS( *tpu_vote ) ));

  return fd_gossip_update_addr_internal( glob, tpu_vote, FD_GOSSIP_SOCKET_TAG_TPU_VOTE );
}
734 :
/* Read the shred version currently advertised in our contact info. */
ushort
fd_gossip_get_shred_version( fd_gossip_t const * glob ) {
  return fd_contact_info_get_shred_version( &glob->my_contact);
}
739 :
/* Set the shred version advertised in our contact info. */
void
fd_gossip_set_shred_version( fd_gossip_t * glob, ushort shred_version ) {
  fd_contact_info_set_shred_version( &glob->my_contact, shred_version );
}
744 :
745 : /* Add an event to the queue of pending timed events. */
746 : static int
747 : fd_gossip_add_pending( fd_gossip_t * glob,
748 : fd_pending_event_fun fun,
749 : fd_pending_event_arg_t fun_arg,
750 0 : long timeout ) {
751 0 : if( FD_UNLIKELY( fd_pending_heap_cnt( glob->event_heap )>=fd_pending_heap_max( glob->event_heap ) ) )
752 0 : return 0;
753 0 : fd_pending_event_t ev = { .fun=fun, .fun_arg=fun_arg, .timeout=timeout };
754 0 : fd_pending_heap_insert( glob->event_heap, &ev );
755 0 : return 1;
756 0 : }
757 :
758 : /* Send raw data as a UDP packet to an address */
759 : static void
760 0 : fd_gossip_send_raw( fd_gossip_t * glob, const fd_gossip_peer_addr_t * dest, void * data, size_t sz) {
761 0 : if ( sz > PACKET_DATA_SIZE ) {
762 0 : FD_LOG_ERR(("sending oversized packet, size=%lu", sz));
763 0 : }
764 0 : glob->metrics.send_packet_cnt += 1UL;
765 0 : fd_gossip_unlock( glob );
766 0 : (*glob->send_fun)(data, sz, dest, glob->send_arg);
767 0 : fd_gossip_lock( glob );
768 0 : }
769 :
770 : /* Send a gossip message to an address */
771 : static void
772 0 : fd_gossip_send( fd_gossip_t * glob, const fd_gossip_peer_addr_t * dest, fd_gossip_msg_t * gmsg ) {
773 : /* Encode the data */
774 0 : uchar buf[PACKET_DATA_SIZE];
775 0 : fd_bincode_encode_ctx_t ctx;
776 0 : ctx.data = buf;
777 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
778 0 : if ( fd_gossip_msg_encode( gmsg, &ctx ) ) {
779 0 : FD_LOG_ERR(( "fd_gossip_msg_encode failed" ));
780 0 : }
781 0 : size_t sz = (size_t)((const uchar *)ctx.data - buf);
782 0 : fd_gossip_send_raw( glob, dest, buf, sz);
783 0 : glob->metrics.send_message[ gmsg->discriminant ] += 1UL;
784 : // FD_LOG_WARNING(("sent msg type %u to " GOSSIP_ADDR_FMT " size=%lu", gmsg->discriminant, GOSSIP_ADDR_FMT_ARGS( *dest ), sz));
785 0 : }
786 :
/* Initiate the ping/pong protocol to a validator address.  Invoked for
   fresh peers and re-armed as a pending event every 200ms until either
   a pong arrives or MAX_PEER_PING_COUNT pings go unanswered. */
static void
fd_gossip_make_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  /* Update the active table where we track the state of the ping/pong
     protocol */
  fd_gossip_peer_addr_t * key = &arg->key;
  fd_active_elem_t * val = fd_active_table_query(glob->actives, key, NULL);
  if (val == NULL) {
    if (fd_active_table_is_full(glob->actives)) {
      glob->metrics.send_ping_events[ FD_METRICS_ENUM_SEND_PING_EVENT_V_ACTIVES_TABLE_FULL_IDX ] += 1UL;
      return;
    }
    val = fd_active_table_insert(glob->actives, key);
    fd_active_new_value(val);
    glob->metrics.send_ping_events[ FD_METRICS_ENUM_SEND_PING_EVENT_V_ACTIVES_TABLE_INSERT_IDX ] += 1UL;
  } else {
    if (val->pongtime != 0)
      /* Success: the peer already answered a previous ping */
      return;
    if (val->pingcount++ >= MAX_PEER_PING_COUNT) {
      /* Give up. This is a bad peer. */
      glob->metrics.send_ping_events[ FD_METRICS_ENUM_SEND_PING_EVENT_V_MAX_PING_COUNT_EXCEEDED_IDX ] += 1UL;
      fd_active_table_remove(glob->actives, key);
      fd_peer_table_remove(glob->peers, key);
      return;
    }
  }
  val->pingtime = glob->now;
  /* Generate a new token when we start a fresh round of pinging */
  if (val->pingcount == 1U) {
    for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); ++i ) {
      val->pingtoken.ul[i] = fd_rng_ulong(glob->rng);
    }
  }

  /* Keep pinging until we succeed: re-arm this event */
  fd_gossip_add_pending( glob,
                         fd_gossip_make_ping, fd_pending_event_arg_peer_addr( *key ),
                         glob->now + (long)2e8 /* 200 ms */ );

  fd_pubkey_t * public_key = glob->public_key;

  /* Build a ping message */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_ping);
  fd_gossip_ping_t * ping = &gmsg.inner.ping;
  ping->from = *public_key;

  /* Ping token sent on the wire is sha256("SOLANA_PING_PONG" || pingtoken) */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  fd_memcpy( pre_image+16UL, val->pingtoken.uc, 32UL );

  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token );

  /* Sign the pre-image using the sha256-then-ed25519 scheme */

  (*glob->sign_fun)( glob->sign_arg, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );

  fd_gossip_send( glob, key, &gmsg );
}
847 :
/* Respond to a ping from another validator: verify the ping's
   signature over its 32-byte token, then answer with a signed pong
   whose token is sha256("SOLANA_PING_PONG" || ping token). */
static void
fd_gossip_handle_ping( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_ping_t const * ping ) {
  /* Verify the signature */
  fd_sha512_t sha2[1];
  if (fd_ed25519_verify( /* msg */ ping->token.uc,
                         /* sz */ 32UL,
                         /* sig */ ping->signature.uc,
                         /* public_key */ ping->from.uc,
                         sha2 )) {
    glob->metrics.recv_ping_invalid_signature += 1UL;
    FD_LOG_WARNING(("received ping with invalid signature"));
    return;
  }

  /* Build a pong message */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pong);
  fd_gossip_ping_t * pong = &gmsg.inner.pong;

  pong->from = *glob->public_key;

  /* Pong pre-image is the domain prefix followed by the ping's token */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  fd_memcpy( pre_image+16UL, ping->token.uc, 32UL);

  /* Generate response hash token */
  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );

  /* Sign the pre-image using the sha256-then-ed25519 scheme */
  (*glob->sign_fun)( glob->sign_arg, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );

  fd_gossip_send(glob, from, &gmsg);
}
882 :
/* Sign/timestamp an outgoing crds value.  Overwrites the value's
   origin pubkey with our own identity and its wallclock with the
   current time (in ms), then signs the bincode-encoded payload with
   our identity key.  Values with an unrecognized discriminant are
   left untouched and unsigned. */
static void
fd_gossip_sign_crds_value( fd_gossip_t * glob, fd_crds_value_t * crd ) {
  /* Locate the pubkey/wallclock fields inside the variant payload */
  fd_pubkey_t * pubkey;
  ulong * wallclock;
  switch (crd->data.discriminant) {
  case fd_crds_data_enum_contact_info_v1:
    pubkey = &crd->data.inner.contact_info_v1.id;
    wallclock = &crd->data.inner.contact_info_v1.wallclock;
    break;
  case fd_crds_data_enum_vote:
    pubkey = &crd->data.inner.vote.from;
    wallclock = &crd->data.inner.vote.wallclock;
    break;
  case fd_crds_data_enum_lowest_slot:
    pubkey = &crd->data.inner.lowest_slot.from;
    wallclock = &crd->data.inner.lowest_slot.wallclock;
    break;
  case fd_crds_data_enum_snapshot_hashes:
    pubkey = &crd->data.inner.snapshot_hashes.from;
    wallclock = &crd->data.inner.snapshot_hashes.wallclock;
    break;
  case fd_crds_data_enum_accounts_hashes:
    pubkey = &crd->data.inner.accounts_hashes.from;
    wallclock = &crd->data.inner.accounts_hashes.wallclock;
    break;
  case fd_crds_data_enum_epoch_slots:
    pubkey = &crd->data.inner.epoch_slots.from;
    wallclock = &crd->data.inner.epoch_slots.wallclock;
    break;
  case fd_crds_data_enum_version_v1:
    pubkey = &crd->data.inner.version_v1.from;
    wallclock = &crd->data.inner.version_v1.wallclock;
    break;
  case fd_crds_data_enum_version_v2:
    pubkey = &crd->data.inner.version_v2.from;
    wallclock = &crd->data.inner.version_v2.wallclock;
    break;
  case fd_crds_data_enum_node_instance:
    pubkey = &crd->data.inner.node_instance.from;
    wallclock = &crd->data.inner.node_instance.wallclock;
    break;
  case fd_crds_data_enum_duplicate_shred:
    pubkey = &crd->data.inner.duplicate_shred.from;
    wallclock = &crd->data.inner.duplicate_shred.wallclock;
    break;
  case fd_crds_data_enum_incremental_snapshot_hashes:
    pubkey = &crd->data.inner.incremental_snapshot_hashes.from;
    wallclock = &crd->data.inner.incremental_snapshot_hashes.wallclock;
    break;
  case fd_crds_data_enum_contact_info_v2:
    pubkey = &crd->data.inner.contact_info_v2.from;
    wallclock = &crd->data.inner.contact_info_v2.wallclock;
    break;
  case fd_crds_data_enum_restart_last_voted_fork_slots:
    pubkey = &crd->data.inner.restart_last_voted_fork_slots.from;
    wallclock = &crd->data.inner.restart_last_voted_fork_slots.wallclock;
    break;
  case fd_crds_data_enum_restart_heaviest_fork:
    pubkey = &crd->data.inner.restart_heaviest_fork.from;
    wallclock = &crd->data.inner.restart_heaviest_fork.wallclock;
    break;
  default:
    /* Unknown variant: nothing to stamp or sign */
    return;
  }
  *pubkey = *glob->public_key;
  *wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* convert to ms */

  /* Sign the bincode-encoded data payload */
  uchar buf[PACKET_DATA_SIZE];
  fd_bincode_encode_ctx_t ctx;
  ctx.data = buf;
  ctx.dataend = buf + PACKET_DATA_SIZE;
  if ( fd_crds_data_encode( &crd->data, &ctx ) ) {
    FD_LOG_WARNING(("fd_crds_data_encode failed"));
    return;
  }

  (*glob->sign_fun)( glob->sign_arg, crd->signature.uc, buf, (ulong)((uchar*)ctx.data - buf), FD_KEYGUARD_SIGN_TYPE_ED25519 );
}
964 :
965 : /* Convert a hash to a bloom filter bit position
966 : https://github.com/anza-xyz/agave/blob/v2.1.7/bloom/src/bloom.rs#L136 */
967 : static ulong
968 0 : fd_gossip_bloom_pos( fd_hash_t * hash, ulong key, ulong nbits) {
969 0 : for ( ulong i = 0; i < 32U; ++i) {
970 0 : key ^= (ulong)(hash->uc[i]);
971 0 : key *= 1099511628211UL; // FNV prime
972 0 : }
973 0 : return key % nbits;
974 0 : }
975 :
/* Choose a random active peer with good ping count.  Builds the set of
   peers tied at the *lowest* observed pingcount (skipping peers that
   never ponged unless they are a configured entrypoint), then samples
   one from that set with probability proportional to its weight.
   Returns NULL if no eligible peer exists or all weights are zero. */
static fd_active_elem_t *
fd_gossip_random_active( fd_gossip_t * glob ) {
  /* Create a list of active peers with minimal pings */
  fd_active_elem_t * list[FD_ACTIVE_KEY_MAX];
  ulong listlen = 0;
  ulong totweight = 0;
  for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
       !fd_active_table_iter_done( glob->actives, iter );
       iter = fd_active_table_iter_next( glob->actives, iter ) ) {
    fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );

    if (ele->pongtime == 0 && !fd_gossip_is_allowed_entrypoint( glob, &ele->key )) {
      /* Never ponged and not an entrypoint: ineligible */
      continue;
    } else if (listlen == 0) {
      /* First eligible peer seeds the list */
      list[0] = ele;
      listlen = 1;
      totweight = ele->weight;
    } else if (ele->pingcount > list[0]->pingcount) {
      /* Worse ping count than current minimum: skip */
      continue;
    } else if (ele->pingcount < list[0]->pingcount) {
      /* Reset the list: a new, lower minimum was found */
      list[0] = ele;
      listlen = 1;
      totweight = ele->weight;
    } else {
      /* Tied with the minimum: add to the candidate set */
      list[listlen++] = ele;
      totweight += ele->weight;
    }
  }
  if (listlen == 0 || totweight == 0)
    return NULL;
  /* Choose a random list element by weight */
  ulong w = fd_rng_ulong(glob->rng) % totweight;
  ulong j = 0;
  for( ulong i = 0; i < listlen; ++i) {
    if( w < j + list[i]->weight )
      return list[i];
    j += list[i]->weight;
  }
  /* Unreachable: w < totweight == sum of weights in list */
  FD_LOG_CRIT(( "I shouldn't be here" ));
  return NULL;
}
1019 :
/* Generate a pull request for a random active peer.  Builds a set of
   bloom filters over all known value hashes, samples a subset of the
   filter partitions, and sends one pull-request packet per sampled
   partition.  Also purges expired values encountered during the scan. */
static void
fd_gossip_random_pull( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Re-arm: try again in 100 ms (glob->now is in ns) */
  fd_gossip_add_pending( glob, fd_gossip_random_pull, fd_pending_event_arg_null(), glob->now + (long)100e6 );

  /* Pick a random partner */
  fd_active_elem_t * ele = fd_gossip_random_active(glob);
  if (ele == NULL)
    return;

  /* Compute the number of packets needed for all the bloom filter parts
     with a desired false positive rate <0.1% (upper bounded by FD_BLOOM_MAX_PACKETS ) */
  ulong nitems = fd_value_meta_map_key_cnt( glob->value_metas );
  ulong nkeys = 1;
  ulong npackets = 1;
  uint nmaskbits = 0;
  double e = 0;
  if (nitems > 0) {
    do {
      double n = ((double)nitems)/((double)npackets); /* Assume even division of values */
      double m = (double)FD_BLOOM_NUM_BITS;
      /* Optimal bloom key count: (m/n)*ln(2), clamped to [1, FD_BLOOM_MAX_KEYS] */
      nkeys = fd_ulong_max(1U, (ulong)((m/n)*0.69314718055994530941723212145818 /* ln(2) */));
      nkeys = fd_ulong_min(nkeys, FD_BLOOM_MAX_KEYS);
      if (npackets == FD_BLOOM_MAX_PACKETS)
        break;
      double k = (double)nkeys;
      /* Expected false positive rate: (1 - e^(-k*n/m))^k */
      e = pow(1.0 - exp(-k*n/m), k);
      if (e < 0.001)
        break;
      nmaskbits++;
      npackets = 1U<<nmaskbits;
    } while (1);
  }
  FD_LOG_DEBUG(("making bloom filter for %lu items with %lu packets, %u maskbits and %lu keys %g error", nitems, npackets, nmaskbits, nkeys, e));

  /* Generate random keys */
  ulong keys[FD_BLOOM_MAX_KEYS];
  for (ulong i = 0; i < nkeys; ++i)
    keys[i] = fd_rng_ulong(glob->rng);
  /* Set all the bits */
  ulong num_bits_set[FD_BLOOM_MAX_PACKETS];
  for (ulong i = 0; i < npackets; ++i)
    num_bits_set[i] = 0;

  /* Bloom filter set sampling

     This effectively translates to sending out 1/(SAMPLE_RATE)
     of the full bloom filter set every new pull request
     loop.

     Some observations:
      - Duplicate CRDS received rate* decreases dramatically
        (from being 90% of incoming CRDS traffic to nearly negligible)
      - Unique CRDS received rate* doesn't appear to be affected by
        sampling

     * rate measured in values/minute
     https://github.com/anza-xyz/agave/blob/v2.1.11/gossip/src/crds_gossip_pull.rs#L157-L163 */
  ushort indices[FD_BLOOM_MAX_PACKETS];
  for( ushort i = 0; i < npackets; ++i) {
    indices[i] = i;
  }
  ushort indices_len = (ushort)npackets;
  ulong filter_sample_size = (indices_len + FD_BLOOM_SAMPLE_RATE - 1) / FD_BLOOM_SAMPLE_RATE;
  FD_TEST( filter_sample_size <= FD_BLOOM_MAX_PACKETS );

  fd_gossip_filter_selection_t selected_filters = fd_gossip_filter_selection_null();

  /* Partial Fisher-Yates: draw filter_sample_size distinct partition
     indices uniformly at random */
  for( ushort i = 0; i < filter_sample_size; ++i) {
    ulong idx = fd_rng_ushort( glob->rng ) % indices_len;

    /* swap and remove */
    ushort filter_idx = indices[ idx ];
    indices[ idx ] = indices[ --indices_len ];

    /* insert */
    selected_filters = fd_gossip_filter_selection_insert( selected_filters, filter_idx );
  }

  FD_TEST( fd_gossip_filter_selection_cnt( selected_filters ) == filter_sample_size );


#define CHUNKSIZE (FD_BLOOM_NUM_BITS/64U)
  ulong bits[CHUNKSIZE * FD_BLOOM_MAX_PACKETS]; /* TODO: can we bound size based on sample rate instead? */
  fd_memset(bits, 0, CHUNKSIZE*8U*npackets);
  ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_VALUE_EXPIRE;
  for( fd_value_meta_map_iter_t iter = fd_value_meta_map_iter_init( glob->value_metas );
       !fd_value_meta_map_iter_done( glob->value_metas, iter );
       iter = fd_value_meta_map_iter_next( glob->value_metas, iter ) ) {
    fd_value_meta_t * ele = fd_value_meta_map_iter_ele( glob->value_metas, iter );
    fd_hash_t * hash = &(ele->key);

    /* Purge expired value's data entry */
    if( ele->wallclock<expire && ele->value!=NULL ) {
      ele->value->del = 1; // Mark for deletion
      ele->value = NULL;
      continue;
    }
    /* Choose which filter packet based on the high bits in the hash,
       https://github.com/anza-xyz/agave/blob/v2.1.7/gossip/src/crds_gossip_pull.rs#L167

       skip if packet not part of sample
       https://github.com/anza-xyz/agave/blob/v2.1.11/gossip/src/crds_gossip_pull.rs#L175-L177
    */

    ulong index = (nmaskbits == 0 ? 0UL : ( hash->ul[0] >> (64U - nmaskbits) ));
    if( FD_LIKELY( !fd_gossip_filter_selection_test( selected_filters, index ) ) ) {
      continue;
    };

    ulong * chunk = bits + (index*CHUNKSIZE);

    /* https://github.com/anza-xyz/agave/blob/v2.1.7/bloom/src/bloom.rs#L191 */
    for (ulong i = 0; i < nkeys; ++i) {
      ulong pos = fd_gossip_bloom_pos(hash, keys[i], FD_BLOOM_NUM_BITS);
      /* https://github.com/anza-xyz/agave/blob/v2.1.7/bloom/src/bloom.rs#L182-L185 */
      ulong * j = chunk + (pos>>6U); /* divide by 64 */
      ulong bit = 1UL<<(pos & 63U);
      if (!((*j) & bit)) {
        *j |= bit;
        num_bits_set[index]++;
      }
    }
  }

  /* Assemble the packets */
  fd_gossip_msg_t gmsg;
  fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pull_req);
  fd_gossip_pull_req_t * req = &gmsg.inner.pull_req;
  fd_crds_filter_t * filter = &req->filter;
  filter->mask_bits = nmaskbits;
  filter->filter.keys_len = nkeys;
  filter->filter.keys = keys;
  filter->filter.bits_len = FD_BLOOM_NUM_BITS;
  filter->filter.has_bits = 1;
  filter->filter.bits_bitvec_len = FD_BLOOM_NUM_BITS/64U;

  /* The "value" in the request is always my own contact info (v2) */
  fd_crds_value_t * value = &req->value;
  fd_crds_data_new_disc(&value->data, fd_crds_data_enum_contact_info_v2);
  value->data.inner.contact_info_v2 = glob->my_contact.ci_crd;
  fd_gossip_sign_crds_value(glob, value);

  /* One packet per sampled partition: patch in that partition's mask
     and bit chunk, then send to the chosen partner */
  for( fd_gossip_filter_selection_iter_t iter = fd_gossip_filter_selection_iter_init( selected_filters );
       !fd_gossip_filter_selection_iter_done( iter );
       iter = fd_gossip_filter_selection_iter_next( iter ) ){
    ulong index = fd_gossip_filter_selection_iter_idx( iter );
    filter->mask = (nmaskbits == 0 ? ~0UL : ((index << (64U - nmaskbits)) | (~0UL >> nmaskbits)));
    filter->filter.num_bits_set = num_bits_set[index];
    filter->filter.bits_bitvec = bits + (index*CHUNKSIZE);
    fd_gossip_send(glob, &ele->key, &gmsg);
  }
}
1176 :
/* Handle a pong response: check it matches the outstanding ping token,
   verify its signature, then mark the sender as a live peer and refresh
   its stake-derived weight. */
static void
fd_gossip_handle_pong( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_ping_t const * pong ) {
#define INC_RECV_PONG_EVENT_CNT( REASON ) glob->metrics.recv_pong_events[ FD_CONCAT3( FD_METRICS_ENUM_RECV_PONG_EVENT_V_, REASON, _IDX ) ] += 1UL
  fd_active_elem_t * val = fd_active_table_query(glob->actives, from, NULL);
  if (val == NULL) {
    /* No outstanding ping state for this address */
    INC_RECV_PONG_EVENT_CNT( EXPIRED );
    FD_LOG_DEBUG(("received pong too late"));
    return;
  }

  /* Reconstruct the ping token we sent: sha256(prefix || pingtoken) */
  uchar pre_image[FD_PING_PRE_IMAGE_SZ];
  fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
  fd_memcpy( pre_image+16UL, val->pingtoken.uc, 32UL );


  fd_hash_t pre_image_hash;
  fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc );

  /* Confirm response hash token: expected pong token is
     sha256(prefix || sha256(prefix || pingtoken)) */
  fd_sha256_t sha[1];
  fd_sha256_init( sha );
  fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL );

  fd_sha256_append( sha, pre_image_hash.uc, 32UL );
  fd_hash_t pongtoken;
  fd_sha256_fini( sha, pongtoken.uc );
  if (memcmp(pongtoken.uc, pong->token.uc, 32UL) != 0) {
    INC_RECV_PONG_EVENT_CNT( WRONG_TOKEN );
    FD_LOG_DEBUG(( "received pong with wrong token" ));
    return;
  }

  /* Verify the signature */
  fd_sha512_t sha2[1];
  if (fd_ed25519_verify( /* msg */ pong->token.uc,
                         /* sz */ 32UL,
                         /* sig */ pong->signature.uc,
                         /* public_key */ pong->from.uc,
                         sha2 )) {
    INC_RECV_PONG_EVENT_CNT( INVALID_SIGNATURE );
    FD_LOG_WARNING(("received pong with invalid signature"));
    return;
  }

  /* Ping/pong round complete */
  val->pongtime = glob->now;
  val->id = pong->from;

  /* Remember that this is a good peer */
  fd_peer_elem_t * peerval = fd_peer_table_query(glob->peers, from, NULL);
  if (peerval == NULL) {
    INC_RECV_PONG_EVENT_CNT( NEW_PEER );
    if (fd_peer_table_is_full(glob->peers)) {
      INC_RECV_PONG_EVENT_CNT( TABLE_FULL );
      FD_LOG_DEBUG(("too many peers"));
      return;
    }
    peerval = fd_peer_table_insert(glob->peers, from);
    peerval->stake = 0;
  }
  peerval->wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* In millisecs */
  peerval->id = pong->from;

  /* Refresh selection weight from the stake-weights table (default 1) */
  fd_weights_elem_t const * val2 = fd_weights_table_query_const( glob->weights, &val->id, NULL );
  val->weight = ( val2 == NULL ? 1UL : val2->weight );

}
1244 :
/* Initiate a ping/pong with a random active partner to confirm it is
   still alive.  Prefers a queued inactive peer when there is room in
   the active table; otherwise re-pings a random active peer (subject
   to a one-minute cooldown). */
static void
fd_gossip_random_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Re-arm: try again in 100 ms (glob->now is in ns) */
  fd_gossip_add_pending( glob,
                         fd_gossip_random_ping, fd_pending_event_arg_null(),
                         glob->now + (long)100e6 );

  ulong cnt = fd_active_table_key_cnt( glob->actives );
  if( cnt == 0 && glob->inactives_cnt == 0 )
    return;
  fd_gossip_peer_addr_t * addr = NULL;
  if( glob->inactives_cnt > 0 && cnt < FD_ACTIVE_KEY_MAX )
    /* Try a new peer: pop from the inactives queue */
    addr = glob->inactives + (--(glob->inactives_cnt));
  else {
    /* Choose a random active peer */
    ulong i = fd_rng_ulong( glob->rng ) % cnt;
    ulong j = 0;
    for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
         !fd_active_table_iter_done( glob->actives, iter );
         iter = fd_active_table_iter_next( glob->actives, iter ) ) {
      if( FD_UNLIKELY( i==j++ ) ) {
        fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );
        if( (glob->now - ele->pingtime)<(long)60e9 ) /* minute cooldown */
          return;
        /* Reset ping/pong state so a fresh round begins */
        ele->pingcount = 0;
        ele->pongtime = 0;
        addr = &(ele->key);
        break;
      }
    }
  }
  fd_pending_event_arg_t arg2;
  fd_gossip_peer_addr_copy( &arg2.key, addr );
  fd_gossip_make_ping( glob, &arg2 );
}
1285 :
1286 : /* CRDS processing utils.
1287 : TODO: move to a separate fd_crds file? Need to decouple gossip metrics first */
1288 :
/* fd_crds_dup_check returns 1 if key exists in the CRDS value table, 0 otherwise.
   On a duplicate it also records, in the per-host receive statistics
   table:
   - the host that sent the duplicate message
   - the origin of the actual CRDS value
   for use in making prune messages. */
static int
fd_crds_dup_check( fd_gossip_t * glob, fd_hash_t * key, const fd_gossip_peer_addr_t * from, const fd_pubkey_t * origin ) {
  fd_value_meta_t * msg = fd_value_meta_map_query( glob->value_metas, key, NULL );

  if( msg!=NULL ) {
    /* Already have this value */
    if( from!=NULL ) {
      /* Record the dup in the receive statistics table */
      fd_stats_elem_t * val = fd_stats_table_query(glob->stats, from, NULL);
      if( val==NULL ) {
        /* Best effort: if the stats table is full, skip recording */
        if (!fd_stats_table_is_full(glob->stats)) {
          val = fd_stats_table_insert(glob->stats, from);
          val->dups_cnt = 0;
        }
      }
      if( val!=NULL ) {
        val->last = glob->now;
        /* Bump the counter for this origin if already tracked... */
        for( ulong i = 0; i<val->dups_cnt; ++i ){
          if( fd_hash_eq(&val->dups[i].origin, origin ) ) {
            val->dups[i].cnt++;
            goto found_origin;
          }
        }
        /* ...otherwise start tracking it if there is room */
        if( val->dups_cnt<MAX_DUP_ORIGINS ) {
          ulong i = val->dups_cnt++;
          val->dups[i].origin = *origin;
          val->dups[i].cnt = 1;
        }
        found_origin: ;
      }
    }
    return 1;
  } else {
    return 0;
  }
}
/* fd_crds_sigverify verifies the data in an encoded CRDS value.
   Assumes the following CRDS value layout (packed)
   {
     fd_signature_t signature;
     uchar* data;
   }
   Returns 0 on success (fd_ed25519_verify convention), nonzero
   otherwise.  NOTE(review): the sha512 scratch state is static, so
   concurrent callers would race — assumed to run under the gossip
   lock; confirm. */
static int
fd_crds_sigverify( uchar * crds_encoded_val, ulong crds_encoded_len, fd_pubkey_t * pubkey ) {

  fd_signature_t * sig = (fd_signature_t *)crds_encoded_val;
  uchar * data = (crds_encoded_val + sizeof(fd_signature_t));
  ulong datalen = crds_encoded_len - sizeof(fd_signature_t);

  static fd_sha512_t sha[1]; /* static is ok since ed25519_verify calls sha512_init */
  return fd_ed25519_verify( data,
                            datalen,
                            sig->uc,
                            pubkey->uc,
                            sha );
}
1350 :
1351 :
1352 0 : #define INC_RECV_CRDS_DROP_METRIC( REASON ) glob->metrics.recv_crds_drop_reason[ CRDS_DROP_REASON_IDX( REASON ) ] += 1UL
1353 :
/* fd_gossip_recv_crds_array processes crds_len crds values. First
   performs a filter pass, dropping duplicate/own values and
   exiting* on any sigverify failures. Then inserts the filtered
   values into the CRDS value table. Also performs housekeeping, like
   updating contact infos and push queue. The filtered crds data is
   finally dispatched via the glob->deliver_fun callback.

   *(an exit drops the full packet, so no values are inserted into the table)
   This only fails on a crds value encode failure, which is impossible if
   the value was derived from a gossip message decode. */
static void
fd_gossip_recv_crds_array( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_crds_value_t * crds, ulong crds_len, fd_gossip_crds_route_t route ) {
  /* Sanity check */
  if( FD_UNLIKELY( crds_len > FD_GOSSIP_MAX_CRDS_VALS ) ) {
    FD_LOG_ERR(( "too many CRDS values, max %u vs %lu received", FD_GOSSIP_MAX_CRDS_VALS, crds_len ));
  }

  if( FD_UNLIKELY( fd_value_vec_cnt( glob->values ) + crds_len > fd_value_vec_max( glob->values ))){
    INC_RECV_CRDS_DROP_METRIC( TABLE_FULL );
    FD_LOG_DEBUG(( "too many values" ));
    return;
  }
  /* Reserve crds_len slots up front; the vector is contracted below by
     however many entries end up filtered out */
  fd_value_t * retained_vals = fd_value_vec_expand( glob->values, crds_len );
  fd_crds_value_t * retained_crds[FD_GOSSIP_MAX_CRDS_VALS]; /* store pointers to decoded crds entries we retain */
  ulong num_retained_crds = 0;


  /**************** Filter pass ******************/
  for( ulong i = 0; i < crds_len; ++i ) {
    fd_value_t * val = &retained_vals[ num_retained_crds ]; /* This will overwrite if previous value was filtered, should be safe */
    fd_crds_value_t * crd = &crds[ i ];
    retained_crds[ num_retained_crds ] = crd; /* for use in insert pass */

    int drop_reason_idx = fd_value_from_crds( val, crd );
    if( FD_UNLIKELY( drop_reason_idx ) ) {
      glob->metrics.recv_crds_drop_reason[ drop_reason_idx ] += 1UL;
      return; /* Drop full packet if any issues extracting CRDS */
    };

    glob->metrics.recv_crds[ route ][ crd->data.discriminant ] += 1UL;

    if( memcmp( val->origin.uc, glob->public_key->uc, 32U )==0 ) {
      /* skip my own messages */
      INC_RECV_CRDS_DROP_METRIC( OWN_MESSAGE );
      continue;
    }

    fd_msg_stats_elem_t * msg_stat = &glob->msg_stats[ crd->data.discriminant ];
    msg_stat->total_cnt++;
    msg_stat->bytes_rx_cnt += val->datalen;

    /* Dedup first (also feeds the duplicate-origin stats used for prunes) */
    if ( fd_crds_dup_check( glob, &val->key, from, &val->origin ) ) {
      msg_stat->dups_cnt++;
      glob->recv_dup_cnt++;
      glob->metrics.recv_crds_duplicate_message[ route ][ crd->data.discriminant ]++;
      continue; /* skip this entry */
    }

    glob->recv_nondup_cnt++;
    /* Sigverify step
       Skip verifying epoch slots because they:
        - are not used anywhere within the client
        - still store them in table for forwarding
        - represent a significant portion of inbound CRDS
          traffic (~50%)
        - will be deprecated soon */
    if( crd->data.discriminant!=fd_crds_data_enum_epoch_slots &&
        fd_crds_sigverify( val->data, val->datalen, &val->origin ) ) {
      INC_RECV_CRDS_DROP_METRIC( INVALID_SIGNATURE );
      /* drop full packet on bad signature
         https://github.com/anza-xyz/agave/commit/d68b5de6c0fc07d60cf9749ae82c2651a549e81b */
      fd_value_vec_contract( glob->values, crds_len );
      return;
    }
    num_retained_crds++;
  }

  /* Contract vector by number of values not retained */
  fd_value_vec_contract( glob->values, crds_len - num_retained_crds );

  /**************** Insert pass ****************/
  for( ulong i = 0; i < num_retained_crds; ++i ) {
    fd_value_t * val = &retained_vals[ i ];
    /* Technically not needed, len(value_key_map) >>> len(values) */
    if( FD_UNLIKELY( fd_value_meta_map_is_full( glob->value_metas ) ) ) {
      INC_RECV_CRDS_DROP_METRIC( TABLE_FULL );
      FD_LOG_DEBUG(( "too many values" ));
      fd_value_vec_contract( glob->values, num_retained_crds );
      return;
    }

    /* Insert into the value set (duplicate check performed in filter pass) */
    fd_value_meta_t * ele = fd_value_meta_map_insert( glob->value_metas, &val->key );

    fd_value_meta_map_value_init( ele,
                                  val->wallclock,
                                  val );


    fd_crds_value_t * crd = retained_crds[ i ];

    /* Housekeeping for contact info: track the peer and possibly adopt
       a shred version from an entrypoint */
    if( crd->data.discriminant==fd_crds_data_enum_contact_info_v2 ) {
      fd_gossip_contact_info_v2_t * info = &crd->data.inner.contact_info_v2;
      fd_gossip_socket_addr_t socket_addr;
      if( fd_gossip_contact_info_v2_find_proto_ident( info, FD_GOSSIP_SOCKET_TAG_GOSSIP, &socket_addr ) ) {
        if( fd_gossip_port_from_socketaddr( &socket_addr )!=0 &&
            fd_gossip_socket_addr_is_ip4( &socket_addr ) ) { /* Only support ipv4 */
          /* Remember the peer */
          fd_gossip_peer_addr_t pkey;
          fd_memset( &pkey, 0, sizeof(pkey) );
          fd_gossip_from_soladdr( &pkey, &socket_addr );
          fd_peer_elem_t * peer = fd_peer_table_query( glob->peers, &pkey, NULL );
          if( peer==NULL ) {
            if(fd_peer_table_is_full( glob->peers ) ) {
              INC_RECV_CRDS_DROP_METRIC( PEER_TABLE_FULL );
              FD_LOG_DEBUG(( "too many peers" ));
            } else {
              peer = fd_peer_table_insert( glob->peers, &pkey );

              if( glob->inactives_cnt>=INACTIVES_MAX ) {
                INC_RECV_CRDS_DROP_METRIC( INACTIVES_QUEUE_FULL );
              } else if( fd_active_table_query( glob->actives, &pkey, NULL )==NULL ) {
                /* Queue this peer for later pinging */
                fd_gossip_peer_addr_copy( glob->inactives + (glob->inactives_cnt++), &pkey );
              }
            }
          }
          if( peer!=NULL ) {
            peer->wallclock = val->wallclock;
            peer->stake = 0;
            peer->id = info->from;
          } else {
            INC_RECV_CRDS_DROP_METRIC( DISCARDED_PEER );
          }
        }

        fd_gossip_peer_addr_t peer_addr = { .addr = socket_addr.inner.ip4.addr,
                                            /* FIXME: hardcode to ip4 inner? */
                                            .port = fd_ushort_bswap( fd_gossip_port_from_socketaddr( &socket_addr ) ) };
        if( fd_gossip_get_shred_version( glob )==0U &&
            fd_gossip_is_allowed_entrypoint( glob, &peer_addr ) ) {
          FD_LOG_NOTICE(( "using shred version %lu", (ulong)crd->data.inner.contact_info_v2.shred_version ));
          fd_gossip_set_shred_version( glob, crd->data.inner.contact_info_v2.shred_version );
        }
      }
    }

    glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_INACTIVE_IDX ] = glob->inactives_cnt;
    glob->metrics.value_meta_cnt = fd_value_meta_map_key_cnt( glob->value_metas );
    glob->metrics.value_vec_cnt = fd_value_vec_cnt( glob->values );

    /* Deliver the data upstream (lock released around the callback) */
    fd_gossip_unlock( glob );
    (*glob->deliver_fun)( &crd->data, glob->deliver_arg );
    fd_gossip_lock( glob );
  }

}
1513 : #undef INC_RECV_CRDS_DROP_METRIC
1514 :
/* Verify a prune message signature against the "prefixed" sign-data
   encoding (the 18-byte FD_GOSSIP_PRUNE_DATA_PREFIX followed by the
   prune payload).  Returns 0 on success (fd_ed25519_verify
   convention); nonzero on encode or verification failure. */
static int
verify_signable_data_with_prefix( fd_gossip_t * glob, fd_gossip_prune_msg_t * msg ) {
  /* Copy the prune payload into the prefixed sign-data wrapper */
  fd_gossip_prune_sign_data_with_prefix_t signdata[1] = {0};
  signdata->prefix = (uchar *)&FD_GOSSIP_PRUNE_DATA_PREFIX;
  signdata->prefix_len = 18UL;
  signdata->data.pubkey = msg->data.pubkey;
  signdata->data.prunes_len = msg->data.prunes_len;
  signdata->data.prunes = msg->data.prunes;
  signdata->data.destination = msg->data.destination;
  signdata->data.wallclock = msg->data.wallclock;

  /* Bincode-encode the sign data to reproduce the signed bytes */
  uchar buf[PACKET_DATA_SIZE];
  fd_bincode_encode_ctx_t ctx;
  ctx.data = buf;
  ctx.dataend = buf + PACKET_DATA_SIZE;
  if ( fd_gossip_prune_sign_data_with_prefix_encode( signdata, &ctx ) ) {
    glob->metrics.handle_prune_fails[ FD_METRICS_ENUM_PRUNE_FAILURE_REASON_V_SIGN_ENCODING_FAILED_IDX ] += 1UL;
    FD_LOG_WARNING(("fd_gossip_prune_sign_data_encode failed"));
    return 1;
  }

  fd_sha512_t sha[1];
  return fd_ed25519_verify( /* msg */ buf,
                            /* sz */ (ulong)((uchar*)ctx.data - buf),
                            /* sig */ msg->data.signature.uc,
                            /* public_key */ msg->data.pubkey.uc,
                            sha );
}
1543 :
1544 : static int
1545 0 : verify_signable_data( fd_gossip_t * glob, fd_gossip_prune_msg_t * msg ) {
1546 0 : fd_gossip_prune_sign_data_t signdata;
1547 0 : signdata.pubkey = msg->data.pubkey;
1548 0 : signdata.prunes_len = msg->data.prunes_len;
1549 0 : signdata.prunes = msg->data.prunes;
1550 0 : signdata.destination = msg->data.destination;
1551 0 : signdata.wallclock = msg->data.wallclock;
1552 :
1553 0 : uchar buf[PACKET_DATA_SIZE];
1554 0 : fd_bincode_encode_ctx_t ctx;
1555 0 : ctx.data = buf;
1556 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1557 0 : if ( fd_gossip_prune_sign_data_encode( &signdata, &ctx ) ) {
1558 0 : glob->metrics.handle_prune_fails[ FD_METRICS_ENUM_PRUNE_FAILURE_REASON_V_SIGN_ENCODING_FAILED_IDX ] += 1UL;
1559 0 : FD_LOG_WARNING(("fd_gossip_prune_sign_data_encode failed"));
1560 0 : return 1;
1561 0 : }
1562 :
1563 0 : fd_sha512_t sha[1];
1564 0 : return fd_ed25519_verify( /* msg */ buf,
1565 0 : /* sz */ (ulong)((uchar*)ctx.data - buf),
1566 0 : /* sig */ msg->data.signature.uc,
1567 0 : /* public_key */ msg->data.pubkey.uc,
1568 0 : sha );
1569 0 : }
1570 :
1571 : /* Handle a prune request from somebody else */
1572 : static void
1573 0 : fd_gossip_handle_prune(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_prune_msg_t * msg) {
1574 0 : (void)from;
1575 :
1576 : /* Confirm the message is for me */
1577 0 : if (memcmp(msg->data.destination.uc, glob->public_key->uc, 32U) != 0)
1578 0 : return;
1579 :
1580 : /* Try to verify the signed data either with the prefix and not the prefix */
1581 0 : if ( ! ( verify_signable_data( glob, msg ) == FD_ED25519_SUCCESS ||
1582 0 : verify_signable_data_with_prefix( glob, msg ) == FD_ED25519_SUCCESS ) ) {
1583 0 : glob->metrics.handle_prune_fails[ FD_METRICS_ENUM_PRUNE_FAILURE_REASON_V_INVALID_SIGNATURE_IDX ] += 1UL;
1584 0 : FD_LOG_WARNING(( "received prune message with invalid signature" ));
1585 0 : return;
1586 0 : }
1587 :
1588 : /* Find the active push state which needs to be pruned */
1589 0 : fd_push_state_t* ps = NULL;
1590 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
1591 0 : fd_push_state_t* s = glob->push_states[i];
1592 0 : if (memcmp(msg->data.pubkey.uc, s->id.uc, 32U) == 0) {
1593 0 : ps = s;
1594 0 : break;
1595 0 : }
1596 0 : }
1597 0 : if (ps == NULL)
1598 0 : return;
1599 :
1600 : /* Set the bloom filter prune bits */
1601 0 : for (ulong i = 0; i < msg->data.prunes_len; ++i) {
1602 0 : fd_pubkey_t * p = msg->data.prunes + i;
1603 0 : for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j) {
1604 0 : ulong pos = fd_gossip_bloom_pos(p, ps->prune_keys[j], FD_PRUNE_NUM_BITS);
1605 0 : ulong * j = ps->prune_bits + (pos>>6U); /* divide by 64 */
1606 0 : ulong bit = 1UL<<(pos & 63U);
1607 0 : *j |= bit;
1608 0 : }
1609 0 : }
1610 0 : }
1611 :
1612 : static int
1613 : fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt );
1614 :
1615 : /* Push an updated version of my contact info into values */
1616 : static void
1617 0 : fd_gossip_push_updated_contact(fd_gossip_t * glob) {
1618 : /* See if we have a shred version yet */
1619 0 : if ( fd_gossip_get_shred_version( glob )==0U )
1620 0 : return;
1621 :
1622 0 : if( (glob->now - glob->last_contact_time)<(long)1e9 )
1623 0 : return;
1624 :
1625 0 : if( glob->last_contact_time!=0 ) {
1626 0 : fd_value_meta_t * ele = fd_value_meta_map_query( glob->value_metas, &glob->last_contact_info_v2_key, NULL );
1627 0 : if( ele!=NULL ) {
1628 0 : ele->value->del = 1UL;
1629 0 : fd_value_meta_map_remove( glob->value_metas, &glob->last_contact_info_v2_key );
1630 0 : }
1631 0 : }
1632 :
1633 0 : glob->last_contact_time = glob->now;
1634 0 : glob->my_contact.ci_crd.wallclock = FD_NANOSEC_TO_MILLI( glob->now );
1635 :
1636 0 : fd_crds_data_t ci_v2;
1637 0 : fd_crds_data_new_disc( &ci_v2, fd_crds_data_enum_contact_info_v2 );
1638 0 : ci_v2.inner.contact_info_v2 = glob->my_contact.ci_crd;
1639 :
1640 0 : fd_gossip_push_value_nolock( glob, &ci_v2, &glob->last_contact_info_v2_key );
1641 0 : }
1642 :
1643 : /* Respond to a pull request */
1644 : static void
1645 0 : fd_gossip_handle_pull_req(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_pull_req_t * msg) {
1646 0 : #define INC_HANDLE_PULL_REQ_FAIL_METRIC( REASON ) glob->metrics.handle_pull_req_fails[ FD_CONCAT3( FD_METRICS_ENUM_PULL_REQ_FAIL_REASON_V_, REASON, _IDX ) ] += 1UL
1647 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, from, NULL);
1648 0 : if (val == NULL || val->pongtime == 0) {
1649 :
1650 0 : if ( val == NULL ) {
1651 0 : INC_HANDLE_PULL_REQ_FAIL_METRIC( PEER_NOT_IN_ACTIVES );
1652 0 : }
1653 0 : else if ( val->pongtime == 0 ) {
1654 0 : INC_HANDLE_PULL_REQ_FAIL_METRIC( UNRESPONSIVE_PEER );
1655 0 : }
1656 :
1657 : /* Ping new peers before responding to requests */
1658 : /* TODO: is this the right thing to do here? */
1659 0 : if( fd_pending_heap_cnt( glob->event_heap )+100U > fd_pending_heap_max( glob->event_heap ) ) {
1660 0 : INC_HANDLE_PULL_REQ_FAIL_METRIC( PENDING_POOL_FULL );
1661 0 : return;
1662 0 : }
1663 0 : fd_pending_event_arg_t arg2;
1664 0 : fd_gossip_peer_addr_copy(&arg2.key, from);
1665 0 : fd_gossip_make_ping(glob, &arg2);
1666 0 : return;
1667 0 : }
1668 :
1669 : /* Encode an empty pull response as a template */
1670 0 : fd_gossip_msg_t gmsg;
1671 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pull_resp);
1672 0 : fd_gossip_pull_resp_t * pull_resp = &gmsg.inner.pull_resp;
1673 0 : pull_resp->pubkey = *glob->public_key;
1674 :
1675 0 : uchar buf[PACKET_DATA_SIZE];
1676 0 : fd_bincode_encode_ctx_t ctx;
1677 0 : ctx.data = buf;
1678 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1679 0 : if ( fd_gossip_msg_encode( &gmsg, &ctx ) ) {
1680 0 : INC_HANDLE_PULL_REQ_FAIL_METRIC( ENCODING_FAILED );
1681 0 : FD_LOG_WARNING(("fd_gossip_msg_encode failed"));
1682 0 : return;
1683 0 : }
1684 : /* Reach into buffer to get the number of values */
1685 0 : uchar * newend = (uchar *)ctx.data;
1686 0 : ulong * crds_len = (ulong *)(newend - sizeof(ulong));
1687 :
1688 : /* Push an updated version of my contact info into values */
1689 0 : fd_gossip_push_updated_contact(glob);
1690 :
1691 : /* Apply the bloom filter to my table of values */
1692 0 : fd_crds_filter_t * filter = &msg->filter;
1693 0 : ulong nkeys = filter->filter.keys_len;
1694 0 : ulong * keys = filter->filter.keys;
1695 0 : ulong * inner = filter->filter.bits_bitvec;
1696 0 : ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_PULL_TIMEOUT;
1697 0 : ulong hits = 0;
1698 0 : ulong misses = 0;
1699 0 : uint npackets = 0;
1700 0 : for( ulong i = 0UL; i < fd_value_vec_cnt( glob->values ); ++i ) {
1701 0 : fd_value_t * ele = &glob->values[ i ];
1702 0 : fd_hash_t * hash = &(ele->key);
1703 0 : if (ele->wallclock < expire)
1704 0 : continue;
1705 : /* Execute the bloom filter */
1706 0 : if (filter->mask_bits != 0U) {
1707 0 : ulong m = (~0UL >> filter->mask_bits);
1708 0 : if ((hash->ul[0] | m) != filter->mask)
1709 0 : continue;
1710 0 : }
1711 0 : int miss = 0;
1712 0 : for (ulong i = 0; i < nkeys; ++i) {
1713 0 : ulong pos = fd_gossip_bloom_pos(hash, keys[i], filter->filter.bits_len);
1714 0 : ulong * j = inner + (pos>>6U); /* divide by 64 */
1715 0 : ulong bit = 1UL<<(pos & 63U);
1716 0 : if (!((*j) & bit)) {
1717 0 : miss = 1;
1718 0 : break;
1719 0 : }
1720 0 : }
1721 0 : if (!miss) {
1722 0 : hits++;
1723 0 : continue;
1724 0 : }
1725 0 : misses++;
1726 :
1727 : /* Add the value in already encoded form */
1728 0 : if (newend + ele->datalen - buf > PACKET_DATA_SIZE) {
1729 : /* Packet is getting too large. Flush it */
1730 0 : ulong sz = (ulong)(newend - buf);
1731 0 : fd_gossip_send_raw(glob, from, buf, sz);
1732 0 : glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PULL_RESPONSE_IDX ]++;
1733 0 : FD_LOG_DEBUG(("sent msg type %u to " GOSSIP_ADDR_FMT " size=%lu", gmsg.discriminant, GOSSIP_ADDR_FMT_ARGS( *from ), sz));
1734 0 : ++npackets;
1735 0 : newend = (uchar *)ctx.data;
1736 0 : *crds_len = 0;
1737 0 : }
1738 0 : fd_memcpy(newend, ele->data, ele->datalen);
1739 0 : newend += ele->datalen;
1740 0 : (*crds_len)++;
1741 0 : }
1742 : /* Record the number of hits and misses
1743 :
1744 : These metrics are imprecise as the numbers will vary
1745 : per pull request. We keep them to surface
1746 : obvious issues like 100% miss rate. */
1747 0 : glob->metrics.handle_pull_req_bloom_filter_result[ FD_METRICS_ENUM_PULL_REQ_BLOOM_FILTER_RESULT_V_HIT_IDX ] += hits;
1748 0 : glob->metrics.handle_pull_req_bloom_filter_result[ FD_METRICS_ENUM_PULL_REQ_BLOOM_FILTER_RESULT_V_MISS_IDX ] += misses;
1749 :
1750 : /* Flush final packet */
1751 0 : if (newend > (uchar *)ctx.data) {
1752 0 : ulong sz = (ulong)(newend - buf);
1753 0 : fd_gossip_send_raw(glob, from, buf, sz);
1754 0 : glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PULL_RESPONSE_IDX ]++;
1755 0 : FD_LOG_DEBUG(("sent msg type %u to " GOSSIP_ADDR_FMT " size=%lu", gmsg.discriminant, GOSSIP_ADDR_FMT_ARGS( *from ), sz));
1756 0 : ++npackets;
1757 0 : }
1758 :
1759 0 : if (misses) {
1760 0 : FD_LOG_DEBUG(("responded to pull request with %lu values in %u packets (%lu filtered out)", misses, npackets, hits));
1761 0 : }
1762 0 : glob->metrics.handle_pull_req_npackets = npackets;
1763 0 : #undef INC_HANDLE_PULL_REQ_FAIL_METRIC
1764 0 : }
1765 :
1766 :
1767 :
1768 : /* Handle any gossip message */
1769 : static void
1770 0 : fd_gossip_recv(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_msg_t * gmsg) {
1771 0 : if ( FD_LIKELY( gmsg->discriminant < FD_METRICS_COUNTER_GOSSIP_RECEIVED_GOSSIP_MESSAGES_CNT ) ) {
1772 0 : glob->metrics.recv_message[gmsg->discriminant] += 1UL;
1773 0 : } else {
1774 0 : glob->metrics.recv_unknown_message += 1UL;
1775 0 : }
1776 0 : switch (gmsg->discriminant) {
1777 0 : case fd_gossip_msg_enum_pull_req:
1778 0 : fd_gossip_handle_pull_req(glob, from, &gmsg->inner.pull_req);
1779 0 : break;
1780 0 : case fd_gossip_msg_enum_pull_resp: {
1781 0 : fd_gossip_pull_resp_t * pull_resp = &gmsg->inner.pull_resp;
1782 0 : fd_gossip_recv_crds_array( glob, NULL, pull_resp->crds, pull_resp->crds_len, FD_GOSSIP_CRDS_ROUTE_PULL_RESP );
1783 0 : break;
1784 0 : }
1785 0 : case fd_gossip_msg_enum_push_msg: {
1786 0 : fd_gossip_push_msg_t * push_msg = &gmsg->inner.push_msg;
1787 0 : fd_gossip_recv_crds_array( glob, from, push_msg->crds, push_msg->crds_len, FD_GOSSIP_CRDS_ROUTE_PUSH );
1788 0 : break;
1789 0 : }
1790 0 : case fd_gossip_msg_enum_prune_msg:
1791 0 : fd_gossip_handle_prune(glob, from, &gmsg->inner.prune_msg);
1792 0 : break;
1793 0 : case fd_gossip_msg_enum_ping:
1794 0 : fd_gossip_handle_ping(glob, from, &gmsg->inner.ping);
1795 0 : break;
1796 0 : case fd_gossip_msg_enum_pong:
1797 0 : fd_gossip_handle_pong(glob, from, &gmsg->inner.pong);
1798 0 : break;
1799 0 : }
1800 0 : }
1801 :
1802 : /* Initiate connection to a peer */
1803 : int
1804 : fd_gossip_add_active_peer( fd_gossip_t * glob,
1805 0 : fd_gossip_peer_addr_t const * addr ) {
1806 0 : fd_gossip_lock( glob );
1807 0 : fd_active_elem_t * val = fd_active_table_query( glob->actives, addr, NULL );
1808 0 : if( val==NULL ) {
1809 0 : if( fd_active_table_is_full( glob->actives )) {
1810 0 : FD_LOG_WARNING(( "too many actives" ));
1811 0 : fd_gossip_unlock( glob );
1812 0 : return -1;
1813 0 : }
1814 0 : val = fd_active_table_insert( glob->actives, addr );
1815 0 : fd_active_new_value( val );
1816 0 : val->pingcount = 0; /* Incremented in fd_gossip_make_ping */
1817 0 : }
1818 0 : fd_gossip_unlock( glob );
1819 0 : return 0;
1820 0 : }
1821 :
/* Improve the set of active push states: drop states whose peer is no
   longer active, evict the most-pruned state when the list is full, and
   try to add one fresh random active peer as a push destination.
   Self-reschedules every 20 seconds. */
static void
fd_gossip_refresh_push_states( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Try again in 20 sec */
  fd_gossip_add_pending( glob, fd_gossip_refresh_push_states, fd_pending_event_arg_null(), glob->now + (long)20e9 );

  /* Delete states which no longer have active peers */
  for (ulong i = 0; i < glob->push_states_cnt; ++i) {
    fd_push_state_t* s = glob->push_states[i];
    if (fd_active_table_query(glob->actives, &s->addr, NULL) == NULL) {
      fd_push_states_pool_ele_release(glob->push_states_pool, glob->push_states[i]);
      /* Replace with the one at the end; i-- revisits this slot on the
         next iteration since it now holds an unexamined state */
      glob->push_states[i--] = glob->push_states[--(glob->push_states_cnt)];
    }
  }
  if (glob->push_states_cnt == FD_PUSH_LIST_MAX) {
    /* Delete the worst destination based prune count (drop_cnt tracks
       values this destination's prune filter has blocked) */
    fd_push_state_t * worst_s = glob->push_states[0];
    ulong worst_i = 0;
    for (ulong i = 1; i < glob->push_states_cnt; ++i) {
      fd_push_state_t* s = glob->push_states[i];
      if (s->drop_cnt > worst_s->drop_cnt) {
        worst_s = s;
        worst_i = i;
      }
    }
    fd_push_states_pool_ele_release(glob->push_states_pool, worst_s);
    /* Replace with the one at the end */
    glob->push_states[worst_i] = glob->push_states[--(glob->push_states_cnt)];
  }

  /* Add random actives as new pushers.  The trailing `break` below
     means at most ONE new push state is added per invocation; up to 5
     consecutive already-present picks abort the attempt. */
  int failcnt = 0;
  while (glob->push_states_cnt < FD_PUSH_LIST_MAX && failcnt < 5) {
    fd_active_elem_t * a = fd_gossip_random_active( glob );
    if( a == NULL ) break;

    /* Skip peers that already have a push state */
    for (ulong i = 0; i < glob->push_states_cnt; ++i) {
      fd_push_state_t* s = glob->push_states[i];
      if (fd_gossip_peer_addr_eq(&s->addr, &a->key))
        goto skipadd;
    }
    failcnt = 0;

    /* Build the pusher state: zeroed, with fresh random prune bloom
       filter keys for this destination */
    fd_push_state_t * s = fd_push_states_pool_ele_acquire(glob->push_states_pool);
    fd_memset(s, 0, sizeof(fd_push_state_t));
    fd_gossip_peer_addr_copy(&s->addr, &a->key);
    s->id = a->id;
    for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j)
      s->prune_keys[j] = fd_rng_ulong(glob->rng);

    /* Encode an empty push msg template; values are appended after
       packet_end_init and the trailing CRDS count is patched in place */
    fd_gossip_msg_t gmsg[1] = {0};
    fd_gossip_msg_new_disc(gmsg, fd_gossip_msg_enum_push_msg);
    fd_gossip_push_msg_t * push_msg = &gmsg->inner.push_msg;
    push_msg->pubkey = *glob->public_key;
    fd_bincode_encode_ctx_t ctx;
    ctx.data = s->packet;
    ctx.dataend = s->packet + PACKET_DATA_SIZE;
    if ( fd_gossip_msg_encode( gmsg, &ctx ) ) {
      /* NOTE(review): FD_LOG_ERR presumably terminates the process,
         making the return below unreachable -- confirm log semantics */
      FD_LOG_ERR(("fd_gossip_msg_encode failed"));
      return;
    }
    s->packet_end_init = s->packet_end = (uchar *)ctx.data;

    glob->push_states[glob->push_states_cnt++] = s;
    break;

  skipadd:
    ++failcnt;
  }

  /* Refresh gauges */
  glob->metrics.active_push_destinations = glob->push_states_cnt;
  glob->metrics.refresh_push_states_failcnt = (ulong)failcnt;
}
1900 :
1901 : /* Push the latest values */
1902 : static void
1903 0 : fd_gossip_push( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
1904 0 : (void)arg;
1905 :
1906 : /* Try again in 100 msec */
1907 0 : fd_gossip_add_pending( glob, fd_gossip_push, fd_pending_event_arg_null(), glob->now + (long)1e8 );
1908 :
1909 : /* Push an updated version of my contact info into values */
1910 0 : fd_gossip_push_updated_contact(glob);
1911 :
1912 0 : ulong pending_values_cnt = fd_value_vec_cnt( glob->values ) - glob->need_push_head;
1913 0 : ulong need_push_cnt = fd_ulong_if( pending_values_cnt < FD_NEED_PUSH_MAX, pending_values_cnt, FD_NEED_PUSH_MAX );
1914 :
1915 : /* Iterate across recent values */
1916 0 : ulong expire = FD_NANOSEC_TO_MILLI( glob->now ) - FD_GOSSIP_PULL_TIMEOUT;
1917 0 : while( need_push_cnt>0 ) {
1918 0 : fd_value_t * msg = &glob->values[ glob->need_push_head++ ];
1919 0 : need_push_cnt--;
1920 :
1921 0 : if( msg->wallclock<expire )
1922 0 : continue;
1923 :
1924 : /* Iterate across push states */
1925 0 : ulong npush = 0;
1926 0 : for( ulong i = 0; i<glob->push_states_cnt && npush<FD_PUSH_VALUE_MAX; ++i ) {
1927 0 : fd_push_state_t* s = glob->push_states[i];
1928 :
1929 : /* Apply the pruning bloom filter */
1930 0 : int pass = 0;
1931 0 : for( ulong j = 0; j<FD_PRUNE_NUM_KEYS; ++j ) {
1932 0 : ulong pos = fd_gossip_bloom_pos( &msg->origin, s->prune_keys[j], FD_PRUNE_NUM_BITS );
1933 0 : ulong * j = s->prune_bits + (pos>>6U); /* divide by 64 */
1934 0 : ulong bit = 1UL<<(pos & 63U);
1935 0 : if( !(*j & bit) ) {
1936 0 : pass = 1;
1937 0 : break;
1938 0 : }
1939 0 : }
1940 0 : if( !pass ) {
1941 0 : s->drop_cnt++;
1942 0 : glob->not_push_cnt++;
1943 0 : continue;
1944 0 : }
1945 0 : glob->push_cnt++;
1946 0 : npush++;
1947 0 : glob->metrics.push_crds[ (uint)msg->data[sizeof(fd_signature_t)] ]++; /* discriminant */
1948 :
1949 0 : ulong * crds_len = (ulong *)(s->packet_end_init - sizeof(ulong));
1950 : /* Add the value in already encoded form */
1951 0 : if( (s->packet_end + msg->datalen - s->packet)>PACKET_DATA_SIZE ) {
1952 : /* Packet is getting too large. Flush it */
1953 0 : ulong sz = (ulong)(s->packet_end - s->packet);
1954 0 : glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PUSH_IDX ]++;
1955 0 : fd_gossip_send_raw( glob, &s->addr, s->packet, sz );
1956 0 : FD_LOG_DEBUG(("push to " GOSSIP_ADDR_FMT " size=%lu", GOSSIP_ADDR_FMT_ARGS( s->addr ), sz));
1957 0 : s->packet_end = s->packet_end_init;
1958 0 : *crds_len = 0;
1959 0 : }
1960 0 : fd_memcpy( s->packet_end, msg->data, msg->datalen );
1961 0 : s->packet_end += msg->datalen;
1962 0 : (*crds_len)++;
1963 0 : }
1964 0 : }
1965 :
1966 : /* Flush partially full packets */
1967 0 : for( ulong i = 0; i < glob->push_states_cnt; ++i ) {
1968 0 : fd_push_state_t* s = glob->push_states[i];
1969 0 : if ( s->packet_end != s->packet_end_init ) {
1970 0 : ulong * crds_len = (ulong *)(s->packet_end_init - sizeof(ulong));
1971 0 : ulong sz = (ulong)(s->packet_end - s->packet);
1972 0 : fd_gossip_send_raw( glob, &s->addr, s->packet, sz );
1973 0 : glob->metrics.send_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PUSH_IDX ]++;
1974 0 : FD_LOG_DEBUG(( "push to " GOSSIP_ADDR_FMT " size=%lu", GOSSIP_ADDR_FMT_ARGS( s->addr ), sz ));
1975 0 : s->packet_end = s->packet_end_init;
1976 0 : *crds_len = 0;
1977 0 : }
1978 0 : }
1979 0 : }
1980 :
1981 : /* Publish an outgoing value. The source id and wallclock are set by this function */
1982 : static int
1983 0 : fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt ) {
1984 0 : #define INC_PUSH_CRDS_DROP_METRIC( REASON ) \
1985 0 : glob->metrics.push_crds_drop_reason[ CRDS_DROP_REASON_IDX( REASON ) ] += 1UL
1986 :
1987 0 : if( FD_UNLIKELY( data->discriminant >= FD_KNOWN_CRDS_ENUM_MAX ) ) {
1988 0 : INC_PUSH_CRDS_DROP_METRIC( UNKNOWN_DISCRIMINANT );
1989 0 : return -1;
1990 0 : }
1991 :
1992 : /* Wrap the data in a value stub. Sign it. */
1993 0 : fd_crds_value_t crd;
1994 0 : crd.data = *data;
1995 0 : fd_gossip_sign_crds_value(glob, &crd);
1996 :
1997 0 : if( fd_value_meta_map_is_full( glob->value_metas ) || fd_value_vec_is_full( glob->values ) ) {
1998 0 : INC_PUSH_CRDS_DROP_METRIC( TABLE_FULL );
1999 0 : FD_LOG_DEBUG(("too many values"));
2000 0 : return -1;
2001 0 : }
2002 0 : fd_value_t * val = fd_value_vec_expand( glob->values, 1UL );
2003 0 : int drop_reason_idx = fd_value_from_crds( val, &crd );
2004 0 : if( FD_UNLIKELY( drop_reason_idx ) ) {
2005 0 : glob->metrics.push_crds_drop_reason[ drop_reason_idx ] += 1UL;
2006 0 : return -1;
2007 0 : }
2008 :
2009 0 : if( key_opt!=NULL ) *key_opt = val->key;
2010 :
2011 : /* Store the value for later pushing/duplicate detection */
2012 0 : fd_value_meta_t * ele = fd_value_meta_map_query( glob->value_metas, &val->key, NULL );
2013 0 : if( ele != NULL ) {
2014 : /* Already have this value, which is strange!
2015 : NOTE: This is a different list from duplicate crds values received
2016 : from the network (see metrics.recv_crds_duplicate_message).
2017 : Reaching this path implies a crds value generated internally was
2018 : detected as a duplicate. */
2019 0 : glob->metrics.push_crds_duplicate[ data->discriminant ] += 1UL;
2020 0 : fd_value_vec_contract( glob->values, 1UL );
2021 0 : return -1;
2022 0 : }
2023 :
2024 0 : ele = fd_value_meta_map_insert( glob->value_metas, &val->key );
2025 :
2026 0 : fd_value_meta_map_value_init( ele,
2027 0 : val->wallclock,
2028 0 : val );
2029 :
2030 0 : glob->metrics.push_crds_queue_cnt = fd_value_vec_cnt( glob->values ) - glob->need_push_head;
2031 0 : glob->metrics.push_crds[ data->discriminant ] += 1UL;
2032 0 : return 0;
2033 :
2034 0 : #undef INC_PUSH_CRDS_DROP_METRIC
2035 0 : }
2036 :
2037 : int
2038 0 : fd_gossip_push_value( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt ) {
2039 0 : fd_gossip_lock( glob );
2040 0 : int rc = fd_gossip_push_value_nolock( glob, data, key_opt );
2041 0 : fd_gossip_unlock( glob );
2042 0 : return rc;
2043 0 : }
2044 :
/* Periodically make prune messages: scan the per-peer duplicate
   statistics, and for any peer relaying 20+ duplicates from some
   origin, send it a signed prune request for those origins.
   Self-reschedules every 30 seconds. */
static void
fd_gossip_make_prune( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Try again in 30 sec */
  fd_gossip_add_pending( glob, fd_gossip_make_prune, fd_pending_event_arg_null(), glob->now + (long)30e9 );

  /* glob->now is nanosecs, FD_GOSSIP_VALUE_EXPIRE is millisecs, hence
     the 1e6 scaling */
  long expire = glob->now - (long)FD_GOSSIP_VALUE_EXPIRE*((long)1e6);
  for( fd_stats_table_iter_t iter = fd_stats_table_iter_init( glob->stats );
       !fd_stats_table_iter_done( glob->stats, iter );
       iter = fd_stats_table_iter_next( glob->stats, iter ) ) {
    fd_stats_elem_t * ele = fd_stats_table_iter_ele( glob->stats, iter );
    if (ele->last < expire) {
      /* Entry hasn't been updated for a long time */
      glob->metrics.make_prune_stale_entry += 1UL;
      /* NOTE(review): removes while iterating -- assumes the table
         iterator tolerates removal of the current element; confirm
         against fd_stats_table's iteration contract */
      fd_stats_table_remove( glob->stats, &ele->key );
      continue;
    }
    /* Look for high duplicate counts: 20+ dups from one origin through
       this peer makes that origin a prune candidate */
    fd_pubkey_t origins[MAX_DUP_ORIGINS];
    ulong origins_cnt = 0;
    for (ulong i = 0; i < ele->dups_cnt; ++i) {
      if (ele->dups[i].cnt >= 20U) {
        origins[origins_cnt++] = ele->dups[i].origin;
        glob->metrics.make_prune_high_duplicates += 1UL;
      }
    }
    glob->metrics.make_prune_requested_origins = origins_cnt;
    if (origins_cnt == 0U)
      continue;
    /* Get the peer id */
    fd_peer_elem_t * peerval = fd_peer_table_query(glob->peers, &ele->key, NULL);
    /* Always clean up to restart the dup counter */
    fd_stats_table_remove( glob->stats, &ele->key );
    if (peerval == NULL)
      continue;

    char keystr[ FD_BASE58_ENCODED_32_SZ ];
    fd_base58_encode_32( peerval->id.uc, NULL, keystr );
    FD_LOG_DEBUG(("sending prune request for %lu origins to %s", origins_cnt, keystr));

    /* Make a prune request */
    fd_gossip_msg_t gmsg;
    fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_prune_msg);
    fd_gossip_prune_msg_t * prune_msg = &gmsg.inner.prune_msg;
    prune_msg->data.pubkey = *glob->public_key;
    prune_msg->data.prunes_len = origins_cnt;
    prune_msg->data.prunes = origins;
    prune_msg->data.destination = peerval->id;
    ulong wc = prune_msg->data.wallclock = FD_NANOSEC_TO_MILLI(glob->now);

    /* Sign-data mirrors the prune payload (unprefixed form; see
       verify_signable_data for the inverse check) */
    fd_gossip_prune_sign_data_t signdata;
    signdata.pubkey = *glob->public_key;
    signdata.prunes_len = origins_cnt;
    signdata.prunes = origins;
    signdata.destination = peerval->id;
    signdata.wallclock = wc;

    /* Bincode-encode the sign data into a stack buffer */
    uchar buf[PACKET_DATA_SIZE];
    fd_bincode_encode_ctx_t ctx;
    ctx.data = buf;
    ctx.dataend = buf + PACKET_DATA_SIZE;
    if ( fd_gossip_prune_sign_data_encode( &signdata, &ctx ) ) {
      glob->metrics.make_prune_sign_data_encode_failed += 1UL;
      /* NOTE(review): FD_LOG_ERR presumably terminates the process,
         making the return below unreachable -- confirm log semantics */
      FD_LOG_ERR(("fd_gossip_prune_sign_data_encode failed"));
      return;
    }

    /* Delegate signing to the configured signer callback */
    (*glob->sign_fun)( glob->sign_arg, prune_msg->data.signature.uc, buf, (ulong)((uchar*)ctx.data - buf), FD_KEYGUARD_SIGN_TYPE_ED25519 );

    fd_gossip_send(glob, &peerval->key, &gmsg);
  }
}
2119 :
/* Periodically log status. Removes old peers as a side event.
   Also refills the inactive-peer queue when it runs dry and refreshes
   the peer-count gauges.  Self-reschedules every 60 seconds. */
static void
fd_gossip_log_stats( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
  (void)arg;

  /* Try again in 60 sec */
  fd_gossip_add_pending( glob, fd_gossip_log_stats, fd_pending_event_arg_null(), glob->now + (long)60e9 );

  if( glob->recv_pkt_cnt == 0 )
    FD_LOG_WARNING(("received no gossip packets!!"));
  else
    FD_LOG_INFO(("received %lu packets", glob->recv_pkt_cnt));

  /* TODO: Come up with a better way to detect bad shred version */
  /* Peers are known but no CRDS traffic arrived at all: the cluster is
     likely rejecting our shred version */
  if( fd_peer_table_key_cnt( glob->peers )!=0 &&
      ( glob->metrics.recv_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PULL_RESPONSE_IDX ]==0 ||
        glob->metrics.recv_message[ FD_METRICS_ENUM_GOSSIP_MESSAGE_V_PUSH_IDX ]==0 ) ) {
    FD_LOG_WARNING(( "received no CRDS traffic! Likely bad shred version (current: %u)", glob->my_contact.ci_crd.shred_version ));
  }

  /* The counters below are per-interval: reset after reporting */
  glob->recv_pkt_cnt = 0;
  FD_LOG_INFO(("received %lu dup values and %lu new", glob->recv_dup_cnt, glob->recv_nondup_cnt));
  glob->recv_dup_cnt = glob->recv_nondup_cnt = 0;
  FD_LOG_INFO(("pushed %lu values and filtered %lu", glob->push_cnt, glob->not_push_cnt));
  glob->push_cnt = glob->not_push_cnt = 0;

  /* Per-CRDS-type receive statistics */
  for( ulong i = 0UL; i<FD_KNOWN_CRDS_ENUM_MAX; i++ ) {
    FD_LOG_INFO(( "received values - type: %2lu, total: %12lu, dups: %12lu, bytes: %12lu", i, glob->msg_stats[i].total_cnt, glob->msg_stats[i].dups_cnt, glob->msg_stats[i].bytes_rx_cnt ));
  }

  /* Refill the inactive queue from known peers only when it is empty */
  int need_inactive = (glob->inactives_cnt == 0);

  /* wc/expire are in millisecs; peers idle for 4 expiry periods are
     dropped */
  ulong wc = FD_NANOSEC_TO_MILLI(glob->now);
  ulong expire = wc - 4U*FD_GOSSIP_VALUE_EXPIRE;
  for( fd_peer_table_iter_t iter = fd_peer_table_iter_init( glob->peers );
       !fd_peer_table_iter_done( glob->peers, iter );
       iter = fd_peer_table_iter_next( glob->peers, iter ) ) {
    fd_peer_elem_t * ele = fd_peer_table_iter_ele( glob->peers, iter );
    if (ele->wallclock < expire) {
      /* Peer hasn't been updated for a long time */
      /* NOTE(review): removes while iterating -- assumes the table
         iterator tolerates removal of the current element; confirm
         against fd_peer_table's iteration contract */
      fd_peer_table_remove( glob->peers, &ele->key );
      continue;
    }
    fd_active_elem_t * act = fd_active_table_query(glob->actives, &ele->key, NULL);
    char keystr[ FD_BASE58_ENCODED_32_SZ ];
    fd_base58_encode_32( ele->id.uc, NULL, keystr );
    FD_LOG_DEBUG(( "peer at " GOSSIP_ADDR_FMT " id %s age %.3f %s",
                   GOSSIP_ADDR_FMT_ARGS( ele->key ),
                   keystr,
                   ((double)(wc - ele->wallclock))*0.001,
                   ((act != NULL && act->pongtime != 0) ? "(active)" : "")));
    if (need_inactive && act == NULL && glob->inactives_cnt < INACTIVES_MAX)
      fd_gossip_peer_addr_copy(glob->inactives + (glob->inactives_cnt++), &ele->key);
  }


  /* Refresh peer-count gauges */
  glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_TOTAL_IDX ] = fd_peer_table_key_cnt( glob->peers );
  glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_ACTIVE_IDX ] = fd_active_table_key_cnt( glob->actives );
  glob->metrics.gossip_peer_cnt[ FD_METRICS_ENUM_GOSSIP_PEER_STATE_V_INACTIVE_IDX ] = (ulong)glob->inactives_cnt;
}
2180 :
/* Set the current protocol time in nanosecs.  All timers, expiry
   computations and wallclock stamps in this module derive from this
   caller-supplied value. */
void
fd_gossip_settime( fd_gossip_t * glob, long ts ) {
  glob->now = ts;
}
2186 :
/* Get the current protocol time in nanosecs (the value last passed to
   fd_gossip_settime). */
long
fd_gossip_gettime( fd_gossip_t * glob ) {
  return glob->now;
}
2192 :
2193 : /* Single pass values vector compaction. This
2194 : preserves ordering, which means the push queue
2195 : is unbroken. It is also performed in place which
2196 : means the values vector, which takes up a few GBs
2197 : on its own, does not need to be reallocated.
2198 :
2199 : TODO: This has high runtime complexity, but is only run
2200 : once every 15 seconds. We can simplify/improve on this by
2201 : breaking up pushed and un-pushed values into two vectors
2202 : (with the full vector being the concat of both) so that
2203 : order preservation is not needed. This lets us use a more
2204 : efficient cleanup (i.e,. swap in place) while also
2205 : simplifying the push queue management.
2206 :
2207 : See fd_gossip_cleanup_values for an example */
2208 : static ulong
2209 0 : fd_gossip_compact_values( fd_gossip_t * glob ) {
2210 0 : fd_value_t * vec = glob->values;
2211 :
2212 0 : ulong start = 0;
2213 0 : ulong cur_count = fd_value_vec_cnt( vec );
2214 : /* find first element to delete */
2215 0 : for( ; start<cur_count ; start++ ) {
2216 0 : if( FD_UNLIKELY( vec[start].del ) ) break;
2217 0 : }
2218 :
2219 0 : ulong next = start + 1;
2220 :
2221 0 : ulong num_deleted = 0UL;
2222 0 : ulong push_head_snapshot = ULONG_MAX;
2223 :
2224 0 : while( next<cur_count ) {
2225 0 : if( FD_UNLIKELY( vec[next].del ) ) {
2226 0 : if( next>(start + 1) ) {
2227 : /* move all values between start and next in place */
2228 0 : memmove( &vec[start - num_deleted],
2229 0 : &vec[start + 1],
2230 0 : (next - start - 1) * sizeof(fd_value_t) );
2231 0 : }
2232 0 : start = next;
2233 0 : next = start + 1;
2234 0 : num_deleted++;
2235 : /* Need to adjust push queue */
2236 0 : if( FD_UNLIKELY( glob->need_push_head > start &&
2237 0 : glob->need_push_head <= next ) ) {
2238 0 : push_head_snapshot = num_deleted;
2239 0 : }
2240 0 : } else {
2241 0 : next++;
2242 0 : }
2243 0 : }
2244 :
2245 0 : glob->need_push_head -= fd_ulong_if( push_head_snapshot != ULONG_MAX, push_head_snapshot, num_deleted );
2246 0 : fd_value_vec_contract( glob->values, num_deleted );
2247 0 : glob->metrics.value_vec_cnt = fd_value_vec_cnt( glob->values );
2248 0 : FD_LOG_INFO(( "GOSSIP compacted %lu values", num_deleted ));
2249 0 : return num_deleted;
2250 0 : }
2251 :
/* Implements a two-stage cleanup:
   1. Iterate through the metas map and remove entries that meet
      the removal conditions (currently just the expiry window).
      If an entry has a corresponding value entry, mark that
      value for deletion as well*.  A value might already have
      been marked for deletion; that is harmless.

   2. Iterate through the values vector and remove entries
      marked for deletion in the first stage.  A separate pass
      preserves the ordering of the values vector.
      See fd_gossip_compact_values for more details.

   * TODO: In the current implementation, the conditions for
     removing a meta entry and a value are the same, but they
     can differ.  Ideally values should be more aggressively
     cleaned up as they are only needed for push messages and
     processing.  Will come in a separate PR. */
2272 :
2273 : static void
2274 : fd_gossip_cleanup_values( fd_gossip_t * glob,
2275 0 : fd_pending_event_arg_t * arg FD_PARAM_UNUSED ) {
2276 0 : fd_gossip_add_pending( glob, fd_gossip_cleanup_values, fd_pending_event_arg_null(), glob->now + (long)15e9 );
2277 :
2278 0 : ulong value_expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_VALUE_EXPIRE;
2279 0 : for( fd_value_meta_map_iter_t iter = fd_value_meta_map_iter_init( glob->value_metas );
2280 0 : !fd_value_meta_map_iter_done( glob->value_metas, iter );
2281 0 : iter = fd_value_meta_map_iter_next( glob->value_metas, iter ) ) {
2282 0 : fd_value_meta_t * ele = fd_value_meta_map_iter_ele( glob->value_metas, iter );
2283 0 : if ( ele->wallclock<value_expire ) {
2284 : /* This value has expired, mark it for deletion in the value vector and remove from map */
2285 0 : if( ele->value!=NULL ){
2286 0 : ele->value->del = 1UL;
2287 0 : }
2288 0 : fd_value_meta_map_remove( glob->value_metas, &ele->key ); /* Remove from the value set */
2289 0 : }
2290 0 : }
2291 :
2292 0 : fd_gossip_compact_values( glob );
2293 0 : glob->metrics.value_meta_cnt = fd_value_meta_map_key_cnt( glob->value_metas );
2294 0 : glob->metrics.value_vec_cnt = fd_value_vec_cnt( glob->values );
2295 0 : }
2296 :
2297 : /* Start timed events and other protocol behavior */
2298 : int
2299 0 : fd_gossip_start( fd_gossip_t * glob ) {
2300 0 : fd_gossip_lock( glob );
2301 0 : fd_gossip_add_pending( glob, fd_gossip_random_pull, fd_pending_event_arg_null(), glob->now + (long) 2e9 );
2302 0 : fd_gossip_add_pending( glob, fd_gossip_random_ping, fd_pending_event_arg_null(), glob->now + (long) 1e9 );
2303 0 : fd_gossip_add_pending( glob, fd_gossip_log_stats, fd_pending_event_arg_null(), glob->now + (long) 60e9 );
2304 0 : fd_gossip_add_pending( glob, fd_gossip_refresh_push_states, fd_pending_event_arg_null(), glob->now + (long) 20e9 );
2305 0 : fd_gossip_add_pending( glob, fd_gossip_push, fd_pending_event_arg_null(), glob->now + (long) 1e8 );
2306 0 : fd_gossip_add_pending( glob, fd_gossip_make_prune, fd_pending_event_arg_null(), glob->now + (long) 30e9 );
2307 0 : fd_gossip_add_pending( glob, fd_gossip_cleanup_values, fd_pending_event_arg_null(), glob->now + (long) 15e9 );
2308 0 : fd_gossip_unlock( glob );
2309 0 : return 0;
2310 0 : }
2311 :
2312 : /* Dispatch timed events and other protocol behavior. This should be
2313 : * called inside the main spin loop. */
2314 : int
2315 0 : fd_gossip_continue( fd_gossip_t * glob ) {
2316 0 : fd_gossip_lock( glob );
2317 0 : fd_pending_event_t * events = glob->event_heap;
2318 0 : while( fd_pending_heap_cnt( events ) ) {
2319 0 : if( events[0].timeout > glob->now ) break;
2320 0 : (events[0].fun)( glob, &events[0].fun_arg );
2321 0 : fd_pending_heap_remove_min( events );
2322 0 : }
2323 0 : fd_gossip_unlock( glob );
2324 0 : return 0;
2325 0 : }
2326 :
2327 : /* Pass a raw gossip packet into the protocol. msg_name is the unix socket address of the sender */
2328 : int
2329 0 : fd_gossip_recv_packet( fd_gossip_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from ) {
2330 0 : fd_gossip_lock( glob );
2331 0 : FD_SPAD_FRAME_BEGIN( glob->decode_spad ) {
2332 0 : glob->recv_pkt_cnt++;
2333 0 : glob->metrics.recv_pkt_cnt++;
2334 :
2335 0 : ulong decoded_sz;
2336 0 : fd_gossip_msg_t * gmsg = fd_bincode_decode1_spad(
2337 0 : gossip_msg,
2338 0 : glob->decode_spad,
2339 0 : msg, msglen,
2340 0 : NULL,
2341 0 : &decoded_sz );
2342 0 : if( FD_UNLIKELY( !gmsg ) ) {
2343 0 : glob->metrics.recv_pkt_corrupted_msg += 1UL;
2344 0 : FD_LOG_WARNING(( "corrupt gossip message" ));
2345 0 : fd_gossip_unlock( glob );
2346 0 : return -1;
2347 0 : }
2348 :
2349 0 : if( FD_UNLIKELY( decoded_sz != msglen ) ) {
2350 0 : glob->metrics.recv_pkt_corrupted_msg += 1UL;
2351 0 : FD_LOG_WARNING(( "corrupt gossip message" ));
2352 0 : fd_gossip_unlock( glob );
2353 0 : return -1;
2354 0 : }
2355 :
2356 0 : FD_LOG_DEBUG(( "recv msg type %u from " GOSSIP_ADDR_FMT,
2357 0 : gmsg->discriminant, GOSSIP_ADDR_FMT_ARGS( *from ) ));
2358 0 : fd_gossip_recv( glob, from, gmsg );
2359 :
2360 0 : fd_gossip_unlock( glob );
2361 0 : } FD_SPAD_FRAME_END;
2362 0 : return 0;
2363 0 : }
2364 :
2365 :
2366 : void
2367 : fd_gossip_set_stake_weights( fd_gossip_t * gossip,
2368 : fd_stake_weight_t const * stake_weights,
2369 0 : ulong stake_weights_cnt ) {
2370 0 : if( stake_weights == NULL ) {
2371 0 : FD_LOG_ERR(( "stake weights NULL" ));
2372 0 : }
2373 :
2374 0 : if( stake_weights_cnt > MAX_STAKE_WEIGHTS ) {
2375 0 : FD_LOG_ERR(( "num stake weights (%lu) is larger than max allowed stake weights", stake_weights_cnt ));
2376 0 : }
2377 :
2378 0 : fd_gossip_lock( gossip );
2379 :
2380 : /* Clear out the table for new stake weights. */
2381 0 : for ( fd_weights_table_iter_t iter = fd_weights_table_iter_init( gossip->weights );
2382 0 : !fd_weights_table_iter_done( gossip->weights, iter);
2383 0 : iter = fd_weights_table_iter_next( gossip->weights, iter ) ) {
2384 0 : fd_weights_elem_t * e = fd_weights_table_iter_ele( gossip->weights, iter );
2385 0 : fd_weights_table_remove( gossip->weights, &e->key );
2386 0 : }
2387 :
2388 0 : for( ulong i = 0; i < stake_weights_cnt; ++i ) {
2389 0 : if( !stake_weights[i].stake ) continue;
2390 0 : fd_weights_elem_t * val = fd_weights_table_insert( gossip->weights, &stake_weights[i].key );
2391 : // Weight is log2(stake)^2
2392 0 : ulong w = (ulong)fd_ulong_find_msb( stake_weights[i].stake ) + 1;
2393 0 : val->weight = w*w;
2394 0 : }
2395 :
2396 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( gossip->actives );
2397 0 : !fd_active_table_iter_done( gossip->actives, iter );
2398 0 : iter = fd_active_table_iter_next( gossip->actives, iter ) ) {
2399 0 : fd_active_elem_t * ele = fd_active_table_iter_ele( gossip->actives, iter );
2400 0 : fd_weights_elem_t const * val = fd_weights_table_query_const( gossip->weights, &ele->id, NULL );
2401 0 : ele->weight = ( val == NULL ? 1UL : val->weight );
2402 0 : }
2403 :
2404 0 : fd_gossip_unlock( gossip );
2405 0 : }
2406 :
2407 : void
2408 : fd_gossip_set_entrypoints( fd_gossip_t * gossip,
2409 : fd_ip4_port_t const * entrypoints,
2410 0 : ulong entrypoints_cnt ) {
2411 0 : gossip->entrypoints_cnt = entrypoints_cnt;
2412 0 : for( ulong i=0UL; i<entrypoints_cnt; i++ ) {
2413 0 : FD_LOG_NOTICE(( "gossip initial peer - addr: " FD_IP4_ADDR_FMT ":%u",
2414 0 : FD_IP4_ADDR_FMT_ARGS( entrypoints[i].addr ), fd_ushort_bswap( entrypoints[i].port ) ));
2415 0 : fd_gossip_add_active_peer( gossip, &entrypoints[i] );
2416 0 : gossip->entrypoints[i] = entrypoints[i];
2417 0 : }
2418 0 : }
2419 :
2420 : uint
2421 0 : fd_gossip_is_allowed_entrypoint( fd_gossip_t * gossip, fd_gossip_peer_addr_t * addr ) {
2422 0 : for( ulong i = 0UL; i<gossip->entrypoints_cnt; i++) {
2423 0 : if (fd_gossip_peer_addr_eq( addr, &gossip->entrypoints[i]) ) return 1;
2424 0 : }
2425 0 : return 0;
2426 0 : }
2427 :
|