Line data Source code
1 : #define _GNU_SOURCE 1
2 : #include "fd_gossip.h"
3 : #include "../../ballet/sha256/fd_sha256.h"
4 : #include "../../ballet/ed25519/fd_ed25519.h"
5 : #include "../../ballet/base58/fd_base58.h"
6 : #include "../../disco/keyguard/fd_keyguard.h"
7 : #include "../../util/net/fd_eth.h"
8 : #include "../../util/rng/fd_rng.h"
9 : #include <string.h>
10 : #include <stdio.h>
11 : #include <math.h>
12 : #include <sys/types.h>
13 : #include <sys/socket.h>
14 : #include <arpa/inet.h>
15 : #include <netinet/in.h>
16 : #include <sys/random.h>
17 :
18 : #pragma GCC diagnostic ignored "-Wstrict-aliasing"
19 :
20 : /* Maximum size of a network packet */
21 0 : #define PACKET_DATA_SIZE 1232
22 : /* How long do we remember values (in millisecs) */
23 0 : #define FD_GOSSIP_VALUE_EXPIRE ((ulong)(60e3)) /* 1 minute */
24 : /* Max age that values can be pushed/pulled (in millisecs) */
25 0 : #define FD_GOSSIP_PULL_TIMEOUT ((ulong)(15e3)) /* 15 seconds */
26 : /* Max number of validators that can be actively pinged */
27 0 : #define FD_ACTIVE_KEY_MAX (1<<8)
28 : /* Max number of values that can be remembered */
29 0 : #define FD_VALUE_KEY_MAX (1<<16)
30 : /* Max number of pending timed events */
31 0 : #define FD_PENDING_MAX (1<<9)
32 : /* Number of bloom filter bits in an outgoing pull request packet */
33 0 : #define FD_BLOOM_NUM_BITS (512U*8U) /* 0.5 Kbyte */
34 : /* Max number of bloom filter keys in an outgoing pull request packet */
35 0 : #define FD_BLOOM_MAX_KEYS 32U
36 : /* Max number of packets in an outgoing pull request batch */
37 0 : #define FD_BLOOM_MAX_PACKETS 32U
38 : /* Number of bloom bits in a push prune filter */
39 0 : #define FD_PRUNE_NUM_BITS (512U*8U) /* 0.5 Kbyte */
40 : /* Number of bloom keys in a push prune filter */
41 0 : #define FD_PRUNE_NUM_KEYS 4U
42 : /* Max number of destinations a single message can be pushed */
43 0 : #define FD_PUSH_VALUE_MAX 9
44 : /* Max number of push destinations that we track */
45 0 : #define FD_PUSH_LIST_MAX 12
46 : /* Max length of queue of values that need pushing */
47 0 : #define FD_NEED_PUSH_MAX (1<<12)
48 : /* Max size of receive statistics table */
49 0 : #define FD_STATS_KEY_MAX (1<<8)
50 : /* Sha256 pre-image size for pings/pongs */
51 0 : #define FD_PING_PRE_IMAGE_SZ (48UL)
52 : /* Number of recognized CRDS enum members */
53 0 : #define FD_KNOWN_CRDS_ENUM_MAX (12UL)
54 :
55 0 : #define FD_NANOSEC_TO_MILLI(_ts_) ((ulong)(_ts_/1000000))
56 :
57 : /* Maximum number of stake weights, mirrors fd_stake_ci */
58 0 : #define MAX_STAKE_WEIGHTS (40200UL)
59 :
60 0 : #define MAX_PEER_PING_COUNT (10000U)
61 :
62 : /* Test if two addresses are equal */
63 0 : static int fd_gossip_peer_addr_eq( const fd_gossip_peer_addr_t * key1, const fd_gossip_peer_addr_t * key2 ) {
64 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
65 0 : return key1->l == key2->l;
66 0 : }
67 :
68 : /* Hash an address */
69 0 : static ulong fd_gossip_peer_addr_hash( const fd_gossip_peer_addr_t * key, ulong seed ) {
70 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
71 0 : return (key->l + seed + 7242237688154252699UL)*9540121337UL;
72 0 : }
73 :
74 : /* Efficiently copy an address */
75 0 : static void fd_gossip_peer_addr_copy( fd_gossip_peer_addr_t * keyd, const fd_gossip_peer_addr_t * keys ) {
76 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
77 0 : keyd->l = keys->l;
78 0 : }
79 :
80 : /* All peers table element. The peers table is all known validator addresses/ids. */
81 : struct fd_peer_elem {
82 : fd_gossip_peer_addr_t key;
83 : ulong next;
84 : fd_pubkey_t id; /* Public indentifier */
85 : ulong wallclock; /* last time we heard about this peer */
86 : ulong stake; /* Staking for this validator. Unimplemented. */
87 : };
88 : /* All peers table */
89 : typedef struct fd_peer_elem fd_peer_elem_t;
90 : #define MAP_NAME fd_peer_table
91 : #define MAP_KEY_T fd_gossip_peer_addr_t
92 0 : #define MAP_KEY_EQ fd_gossip_peer_addr_eq
93 0 : #define MAP_KEY_HASH fd_gossip_peer_addr_hash
94 0 : #define MAP_KEY_COPY fd_gossip_peer_addr_copy
95 0 : #define MAP_T fd_peer_elem_t
96 : #include "../../util/tmpl/fd_map_giant.c"
97 :
98 : /* Active table element. This table is all validators that we are
99 : aggressively pinging for liveness checking. */
100 : struct fd_active_elem {
101 : fd_gossip_peer_addr_t key;
102 : ulong next;
103 : fd_pubkey_t id; /* Public indentifier */
104 : long pingtime; /* Last time we sent a ping */
105 : uint pingcount; /* Number of pings it took to get a pong */
106 : fd_hash_t pingtoken; /* Random data used in ping/pong */
107 : long pongtime; /* Last time we received a pong */
108 : ulong weight; /* Selection weight */
109 : };
110 : /* Active table */
111 : typedef struct fd_active_elem fd_active_elem_t;
112 : #define MAP_NAME fd_active_table
113 : #define MAP_KEY_T fd_gossip_peer_addr_t
114 0 : #define MAP_KEY_EQ fd_gossip_peer_addr_eq
115 0 : #define MAP_KEY_HASH fd_gossip_peer_addr_hash
116 0 : #define MAP_KEY_COPY fd_gossip_peer_addr_copy
117 0 : #define MAP_T fd_active_elem_t
118 : #include "../../util/tmpl/fd_map_giant.c"
119 :
120 : /* Initialize an active table element value */
121 : void
122 0 : fd_active_new_value(fd_active_elem_t * val) {
123 0 : val->pingcount = 1;
124 0 : val->pingtime = val->pongtime = 0;
125 0 : val->weight = 0;
126 0 : fd_memset(val->id.uc, 0, 32U);
127 0 : fd_memset(val->pingtoken.uc, 0, 32U);
128 0 : }
129 :
130 : /* Test if two hash values are equal */
131 0 : int fd_hash_eq( const fd_hash_t * key1, const fd_hash_t * key2 ) {
132 0 : for (ulong i = 0; i < 32U/sizeof(ulong); ++i)
133 0 : if (key1->ul[i] != key2->ul[i])
134 0 : return 0;
135 0 : return 1;
136 0 : }
137 :
138 : /* Hash a hash value */
139 0 : ulong fd_hash_hash( const fd_hash_t * key, ulong seed ) {
140 0 : return key->ul[0] ^ seed;
141 0 : }
142 :
143 : /* Copy a hash value */
144 0 : void fd_hash_copy( fd_hash_t * keyd, const fd_hash_t * keys ) {
145 0 : for (ulong i = 0; i < 32U/sizeof(ulong); ++i)
146 0 : keyd->ul[i] = keys->ul[i];
147 0 : }
148 :
149 : /* Value table element. This table stores all received crds
150 : values. Keyed by the hash of the value data. */
151 : struct fd_value_elem {
152 : fd_hash_t key;
153 : ulong next;
154 : fd_pubkey_t origin; /* Where did this value originate */
155 : ulong wallclock; /* Original timestamp of value in millis */
156 : uchar data[PACKET_DATA_SIZE]; /* Serialized form of value (bincode) including signature */
157 : ulong datalen;
158 : };
159 : /* Value table */
160 : typedef struct fd_value_elem fd_value_elem_t;
161 : #define MAP_NAME fd_value_table
162 : #define MAP_KEY_T fd_hash_t
163 0 : #define MAP_KEY_EQ fd_hash_eq
164 0 : #define MAP_KEY_HASH fd_hash_hash
165 0 : #define MAP_KEY_COPY fd_hash_copy
166 0 : #define MAP_T fd_value_elem_t
167 : #include "../../util/tmpl/fd_map_giant.c"
168 :
169 : /* Weights table element. This table stores the weight for each peer
170 : (determined by stake). */
171 : struct fd_weights_elem {
172 : fd_pubkey_t key;
173 : ulong next;
174 : ulong weight;
175 : };
176 : /* Weights table */
177 : typedef struct fd_weights_elem fd_weights_elem_t;
178 : #define MAP_NAME fd_weights_table
179 : #define MAP_KEY_T fd_hash_t
180 0 : #define MAP_KEY_EQ fd_hash_eq
181 0 : #define MAP_KEY_HASH fd_hash_hash
182 0 : #define MAP_KEY_COPY fd_hash_copy
183 0 : #define MAP_T fd_weights_elem_t
184 : #include "../../util/tmpl/fd_map_giant.c"
185 :
186 : /* Queue of pending timed events, stored as a priority heap */
187 : union fd_pending_event_arg {
188 : fd_gossip_peer_addr_t key;
189 : };
190 : typedef union fd_pending_event_arg fd_pending_event_arg_t;
191 : typedef void (*fd_pending_event_fun)(struct fd_gossip * glob, fd_pending_event_arg_t * arg);
192 : struct fd_pending_event {
193 : ulong left;
194 : ulong right;
195 : long key;
196 : fd_pending_event_fun fun;
197 : fd_pending_event_arg_t fun_arg;
198 : };
199 : typedef struct fd_pending_event fd_pending_event_t;
200 : #define POOL_NAME fd_pending_pool
201 0 : #define POOL_T fd_pending_event_t
202 0 : #define POOL_NEXT left
203 : #include "../../util/tmpl/fd_pool.c"
204 : #define HEAP_NAME fd_pending_heap
205 : #define HEAP_T fd_pending_event_t
206 0 : #define HEAP_LT(e0,e1) (e0->key < e1->key)
207 : #include "../../util/tmpl/fd_heap.c"
208 :
209 : /* Data structure representing an active push destination. There are
210 : only a small number of these. */
211 : struct fd_push_state {
212 : fd_gossip_peer_addr_t addr; /* Destination address */
213 : fd_pubkey_t id; /* Public indentifier */
214 : ulong drop_cnt; /* Number of values dropped due to pruning */
215 : ulong prune_keys[FD_PRUNE_NUM_KEYS]; /* Keys used for bloom filter for pruning */
216 : ulong prune_bits[FD_PRUNE_NUM_BITS/64U]; /* Bits table used for bloom filter for pruning */
217 : uchar packet[PACKET_DATA_SIZE]; /* Partially assembled packet containing a fd_gossip_push_msg_t */
218 : uchar * packet_end_init; /* Initial end of the packet when there are zero values */
219 : uchar * packet_end; /* Current end of the packet including values so far */
220 : ulong next;
221 : };
222 : typedef struct fd_push_state fd_push_state_t;
223 :
224 : #define POOL_NAME fd_push_states_pool
225 0 : #define POOL_T fd_push_state_t
226 : #include "../../util/tmpl/fd_pool.c"
227 :
228 : /* Receive statistics table element. */
229 : struct fd_stats_elem {
230 : fd_gossip_peer_addr_t key; /* Keyed by sender */
231 : ulong next;
232 : long last; /* Timestamp of last update */
233 : /* Duplicate counts by origin */
234 : struct {
235 : fd_pubkey_t origin;
236 : ulong cnt;
237 : } dups[8];
238 : ulong dups_cnt;
239 : };
240 : /* Receive statistics table. */
241 : typedef struct fd_stats_elem fd_stats_elem_t;
242 : #define MAP_NAME fd_stats_table
243 : #define MAP_KEY_T fd_gossip_peer_addr_t
244 0 : #define MAP_KEY_EQ fd_gossip_peer_addr_eq
245 0 : #define MAP_KEY_HASH fd_gossip_peer_addr_hash
246 0 : #define MAP_KEY_COPY fd_gossip_peer_addr_copy
247 0 : #define MAP_T fd_stats_elem_t
248 : #include "../../util/tmpl/fd_map_giant.c"
249 :
250 : struct fd_msg_stats_elem {
251 : ulong bytes_rx_cnt;
252 : ulong total_cnt;
253 : ulong dups_cnt;
254 : };
255 : /* Receive type statistics table. */
256 : typedef struct fd_msg_stats_elem fd_msg_stats_elem_t;
257 :
258 : /* Global data for gossip service */
259 : struct fd_gossip {
260 : /* Concurrency lock */
261 : volatile ulong lock;
262 : /* Current time in nanosecs */
263 : long now;
264 : /* My public/private key */
265 : fd_pubkey_t * public_key;
266 : uchar * private_key;
267 : /* My gossip port address */
268 : fd_gossip_peer_addr_t my_addr;
269 : /* My official contact info in the gossip protocol */
270 : fd_gossip_contact_info_v1_t my_contact_info;
271 : fd_gossip_version_v2_t my_version;
272 : /* Function used to deliver gossip messages to the application */
273 : fd_gossip_data_deliver_fun deliver_fun;
274 : /* Argument to fd_gossip_data_deliver_fun */
275 : void * deliver_arg;
276 : /* Function used to send raw packets on the network */
277 : fd_gossip_send_packet_fun send_fun;
278 : /* Argument to fd_gossip_send_packet_fun */
279 : void * send_arg;
280 : /* Function used to send packets for signing to remote tile */
281 : fd_gossip_sign_fun sign_fun;
282 : /* Argument to fd_gossip_sign_fun */
283 : void * sign_arg;
284 : /* Table of all known validators, keyed by gossip address */
285 : fd_peer_elem_t * peers;
286 : /* Table of validators that we are actively pinging, keyed by gossip address */
287 : fd_active_elem_t * actives;
288 : /* Queue of validators that might be added to actives */
289 : fd_gossip_peer_addr_t * inactives;
290 : ulong inactives_cnt;
291 0 : #define INACTIVES_MAX 1024U
292 : /* Table of crds values that we have received in the last 5 minutes, keys by hash */
293 : fd_value_elem_t * values;
294 : /* The last timestamp hash that we pushed our own contact info */
295 : long last_contact_time;
296 : fd_hash_t last_contact_info_key;
297 : fd_hash_t last_version_key;
298 : fd_hash_t last_contact_info_v2_key;
299 : fd_hash_t last_node_instance_key;
300 :
301 : /* Array of push destinations currently in use */
302 : fd_push_state_t * push_states[FD_PUSH_LIST_MAX];
303 : ulong push_states_cnt;
304 : fd_push_state_t * push_states_pool;
305 : /* Queue of values that need pushing */
306 : fd_hash_t * need_push;
307 : ulong need_push_head;
308 : ulong need_push_cnt;
309 : /* Table of receive statistics */
310 : fd_stats_elem_t * stats;
311 : /* Table of message type stats */
312 : fd_msg_stats_elem_t msg_stats[ FD_KNOWN_CRDS_ENUM_MAX ];
313 : /* Heap/queue of pending timed events */
314 : fd_pending_event_t * event_pool;
315 : fd_pending_heap_t * event_heap;
316 : /* Random number generator */
317 : fd_rng_t rng[1];
318 : /* RNG seed */
319 : ulong seed;
320 : /* Total number of packeets received */
321 : ulong recv_pkt_cnt;
322 : /* Total number of duplicate values received */
323 : ulong recv_dup_cnt;
324 : /* Total number of non-duplicate values received */
325 : ulong recv_nondup_cnt;
326 : /* Count of values pushed */
327 : ulong push_cnt;
328 : /* Count of values not pushed due to pruning */
329 : ulong not_push_cnt;
330 : /* Stake weights */
331 : fd_weights_elem_t * weights;
332 : /* List of added entrypoints at startup */
333 : ulong entrypoints_cnt;
334 : fd_gossip_peer_addr_t entrypoints[16];
335 : };
336 :
337 : ulong
338 0 : fd_gossip_align ( void ) { return 128UL; }
339 :
340 : ulong
341 0 : fd_gossip_footprint( void ) {
342 0 : ulong l = FD_LAYOUT_INIT;
343 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
344 0 : l = FD_LAYOUT_APPEND( l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX) );
345 0 : l = FD_LAYOUT_APPEND( l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX) );
346 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t) );
347 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_hash_t), FD_NEED_PUSH_MAX*sizeof(fd_hash_t) );
348 0 : l = FD_LAYOUT_APPEND( l, fd_value_table_align(), fd_value_table_footprint(FD_VALUE_KEY_MAX) );
349 0 : l = FD_LAYOUT_APPEND( l, fd_pending_pool_align(), fd_pending_pool_footprint(FD_PENDING_MAX) );
350 0 : l = FD_LAYOUT_APPEND( l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX) );
351 0 : l = FD_LAYOUT_APPEND( l, fd_stats_table_align(), fd_stats_table_footprint(FD_STATS_KEY_MAX) );
352 0 : l = FD_LAYOUT_APPEND( l, fd_weights_table_align(), fd_weights_table_footprint(MAX_STAKE_WEIGHTS) );
353 0 : l = FD_LAYOUT_APPEND( l, fd_push_states_pool_align(), fd_push_states_pool_footprint(FD_PUSH_LIST_MAX) );
354 0 : l = FD_LAYOUT_FINI(l, fd_gossip_align());
355 0 : return l;
356 0 : }
357 :
358 : void *
359 0 : fd_gossip_new ( void * shmem, ulong seed ) {
360 0 : FD_SCRATCH_ALLOC_INIT(l, shmem);
361 0 : fd_gossip_t * glob = (fd_gossip_t*)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t)) ;
362 0 : fd_memset(glob, 0, sizeof(fd_gossip_t));
363 0 : glob->seed = seed;
364 :
365 0 : void * shm = FD_SCRATCH_ALLOC_APPEND(l, fd_peer_table_align(), fd_peer_table_footprint(FD_PEER_KEY_MAX));
366 0 : glob->peers = fd_peer_table_join(fd_peer_table_new(shm, FD_PEER_KEY_MAX, seed));
367 :
368 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_active_table_align(), fd_active_table_footprint(FD_ACTIVE_KEY_MAX));
369 0 : glob->actives = fd_active_table_join(fd_active_table_new(shm, FD_ACTIVE_KEY_MAX, seed));
370 :
371 0 : glob->inactives = (fd_gossip_peer_addr_t*)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_gossip_peer_addr_t), INACTIVES_MAX*sizeof(fd_gossip_peer_addr_t));
372 0 : glob->need_push = (fd_hash_t*)FD_SCRATCH_ALLOC_APPEND(l, alignof(fd_hash_t), FD_NEED_PUSH_MAX*sizeof(fd_hash_t));
373 :
374 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_value_table_align(), fd_value_table_footprint(FD_VALUE_KEY_MAX));
375 0 : glob->values = fd_value_table_join(fd_value_table_new(shm, FD_VALUE_KEY_MAX, seed));
376 :
377 0 : glob->last_contact_time = 0;
378 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_pending_pool_align(), fd_pending_pool_footprint(FD_PENDING_MAX));
379 0 : glob->event_pool = fd_pending_pool_join(fd_pending_pool_new(shm, FD_PENDING_MAX));
380 :
381 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_pending_heap_align(), fd_pending_heap_footprint(FD_PENDING_MAX));
382 0 : glob->event_heap = fd_pending_heap_join(fd_pending_heap_new(shm, FD_PENDING_MAX));
383 :
384 0 : fd_rng_new(glob->rng, (uint)seed, 0UL);
385 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_stats_table_align(), fd_stats_table_footprint(FD_STATS_KEY_MAX));
386 0 : glob->stats = fd_stats_table_join(fd_stats_table_new(shm, FD_STATS_KEY_MAX, seed));
387 :
388 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_weights_table_align(), fd_weights_table_footprint(MAX_STAKE_WEIGHTS));
389 0 : glob->weights = fd_weights_table_join( fd_weights_table_new( shm, MAX_STAKE_WEIGHTS, seed ) );
390 :
391 0 : shm = FD_SCRATCH_ALLOC_APPEND(l, fd_push_states_pool_align(), fd_push_states_pool_footprint(FD_PUSH_LIST_MAX));
392 0 : glob->push_states_pool = fd_push_states_pool_join( fd_push_states_pool_new( shm, FD_PUSH_LIST_MAX ) );
393 :
394 0 : ulong scratch_top = FD_SCRATCH_ALLOC_FINI(l, 1UL);
395 0 : if ( scratch_top > (ulong)shmem + fd_gossip_footprint() ) {
396 0 : FD_LOG_ERR(("Not enough space allocated for gossip"));
397 0 : }
398 0 : return glob;
399 0 : }
400 :
401 : fd_gossip_t *
402 0 : fd_gossip_join ( void * shmap ) { return (fd_gossip_t *)shmap; }
403 :
404 : void *
405 0 : fd_gossip_leave ( fd_gossip_t * join ) { return join; }
406 :
407 : void *
408 0 : fd_gossip_delete ( void * shmap ) {
409 0 : fd_gossip_t * glob = (fd_gossip_t *)shmap;
410 0 : fd_peer_table_delete( fd_peer_table_leave( glob->peers ) );
411 0 : fd_active_table_delete( fd_active_table_leave( glob->actives ) );
412 :
413 0 : fd_value_table_delete( fd_value_table_leave( glob->values ) );
414 0 : fd_pending_pool_delete( fd_pending_pool_leave( glob->event_pool ) );
415 0 : fd_pending_heap_delete( fd_pending_heap_leave( glob->event_heap ) );
416 0 : fd_stats_table_delete( fd_stats_table_leave( glob->stats ) );
417 0 : fd_weights_table_delete( fd_weights_table_leave( glob->weights ) );
418 0 : fd_push_states_pool_delete( fd_push_states_pool_leave( glob->push_states_pool ) );
419 :
420 0 : return glob;
421 0 : }
422 :
423 : static void
424 0 : fd_gossip_lock( fd_gossip_t * gossip ) {
425 0 : # if FD_HAS_THREADS
426 0 : for(;;) {
427 0 : if( FD_LIKELY( !FD_ATOMIC_CAS( &gossip->lock, 0UL, 1UL) ) ) break;
428 0 : FD_SPIN_PAUSE();
429 0 : }
430 : # else
431 : gossip->lock = 1;
432 : # endif
433 0 : FD_COMPILER_MFENCE();
434 0 : }
435 :
436 : static void
437 0 : fd_gossip_unlock( fd_gossip_t * gossip ) {
438 0 : FD_COMPILER_MFENCE();
439 0 : FD_VOLATILE( gossip->lock ) = 0UL;
440 0 : }
441 :
442 : void
443 : fd_gossip_contact_info_v2_to_v1( fd_gossip_contact_info_v2_t const * v2,
444 0 : fd_gossip_contact_info_v1_t * v1 ) {
445 0 : memset( v1, 0, sizeof(fd_gossip_contact_info_v1_t) );
446 0 : v1->id = v2->from;
447 0 : v1->shred_version = v2->shred_version;
448 0 : v1->wallclock = v2->wallclock;
449 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_GOSSIP, &v1->gossip );
450 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_SERVE_REPAIR, &v1->serve_repair );
451 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_TPU, &v1->tpu );
452 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_TPU_VOTE, &v1->tpu_vote );
453 0 : fd_gossip_contact_info_v2_find_proto_ident( v2, FD_GOSSIP_SOCKET_TAG_TVU, &v1->tvu );
454 0 : }
455 :
456 : int
457 : fd_gossip_contact_info_v2_find_proto_ident( fd_gossip_contact_info_v2_t const * contact_info,
458 : uchar proto_ident,
459 0 : fd_gossip_socket_addr_t * out_addr ) {
460 0 : ushort port = 0;
461 0 : for( ulong i = 0UL; i<contact_info->sockets_len; i++ ) {
462 0 : fd_gossip_socket_entry_t const * socket_entry = &contact_info->sockets[ i ];
463 0 : port = (ushort)( port + socket_entry->offset );
464 0 : if( socket_entry->key==proto_ident ) {
465 0 : if( socket_entry->index>=contact_info->addrs_len) {
466 0 : continue;
467 0 : }
468 0 : out_addr->addr = contact_info->addrs[ socket_entry->index ];
469 0 : out_addr->port = port;
470 0 : return 1;
471 0 : }
472 0 : }
473 :
474 0 : return 0;
475 0 : }
476 :
477 : /* Convert my style of address to solana style */
478 : int
479 0 : fd_gossip_to_soladdr( fd_gossip_socket_addr_t * dst, fd_gossip_peer_addr_t const * src ) {
480 0 : dst->port = ntohs(src->port);
481 0 : fd_gossip_ip_addr_new_disc(&dst->addr, fd_gossip_ip_addr_enum_ip4);
482 0 : dst->addr.inner.ip4 = src->addr;
483 0 : return 0;
484 0 : }
485 :
486 : /* Convert my style of address from solana style */
487 : int
488 0 : fd_gossip_from_soladdr(fd_gossip_peer_addr_t * dst, fd_gossip_socket_addr_t const * src ) {
489 0 : FD_STATIC_ASSERT(sizeof(fd_gossip_peer_addr_t) == sizeof(ulong),"messed up size");
490 0 : dst->l = 0;
491 0 : dst->port = htons(src->port);
492 0 : if (src->addr.discriminant == fd_gossip_ip_addr_enum_ip4) {
493 0 : dst->addr = src->addr.inner.ip4;
494 0 : return 0;
495 0 : } else {
496 0 : FD_LOG_ERR(("invalid address family"));
497 0 : return -1;
498 0 : }
499 0 : }
500 :
501 : /* Convert an address to a human readable string */
502 0 : const char * fd_gossip_addr_str( char * dst, size_t dstlen, fd_gossip_peer_addr_t const * src ) {
503 0 : char tmp[INET_ADDRSTRLEN];
504 0 : snprintf(dst, dstlen, "%s:%u", inet_ntop(AF_INET, &src->addr, tmp, INET_ADDRSTRLEN), (uint)ntohs(src->port));
505 0 : return dst;
506 0 : }
507 :
508 : /* Set the gossip configuration */
509 : int
510 0 : fd_gossip_set_config( fd_gossip_t * glob, const fd_gossip_config_t * config ) {
511 0 : fd_gossip_lock( glob );
512 :
513 0 : char tmp[100];
514 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
515 0 : fd_base58_encode_32( config->public_key->uc, NULL, keystr );
516 0 : FD_LOG_NOTICE(("configuring address %s id %s", fd_gossip_addr_str(tmp, sizeof(tmp), &config->my_addr), keystr));
517 :
518 0 : glob->public_key = config->public_key;
519 0 : glob->private_key = config->private_key;
520 0 : fd_hash_copy(&glob->my_contact_info.id, config->public_key);
521 0 : fd_gossip_peer_addr_copy(&glob->my_addr, &config->my_addr);
522 0 : fd_gossip_to_soladdr(&glob->my_contact_info.gossip, &config->my_addr);
523 0 : glob->my_contact_info.shred_version = config->shred_version;
524 0 : glob->my_version = config->my_version;
525 0 : glob->deliver_fun = config->deliver_fun;
526 0 : glob->deliver_arg = config->deliver_arg;
527 0 : glob->send_fun = config->send_fun;
528 0 : glob->send_arg = config->send_arg;
529 0 : glob->sign_fun = config->sign_fun;
530 0 : glob->sign_arg = config->sign_arg;
531 :
532 0 : fd_gossip_unlock( glob );
533 :
534 0 : return 0;
535 0 : }
536 :
537 : int
538 0 : fd_gossip_update_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * my_addr ) {
539 0 : char tmp[100];
540 0 : FD_LOG_NOTICE(("updating address %s", fd_gossip_addr_str(tmp, sizeof(tmp), my_addr)));
541 :
542 0 : fd_gossip_lock( glob );
543 0 : fd_gossip_peer_addr_copy(&glob->my_addr, my_addr);
544 0 : fd_gossip_to_soladdr(&glob->my_contact_info.gossip, my_addr);
545 0 : fd_gossip_unlock( glob );
546 0 : return 0;
547 0 : }
548 :
549 : int
550 0 : fd_gossip_update_repair_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * serve ) {
551 0 : char tmp[100];
552 0 : FD_LOG_NOTICE(("updating repair service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), serve)));
553 :
554 0 : fd_gossip_lock( glob );
555 0 : fd_gossip_to_soladdr(&glob->my_contact_info.serve_repair, serve);
556 0 : fd_gossip_unlock( glob );
557 0 : return 0;
558 0 : }
559 :
560 : int
561 0 : fd_gossip_update_tvu_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tvu, const fd_gossip_peer_addr_t * tvu_fwd ) {
562 0 : char tmp[100];
563 0 : FD_LOG_NOTICE(("updating tvu service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tvu)));
564 0 : FD_LOG_NOTICE(("updating tvu_fwd service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tvu_fwd)));
565 :
566 0 : fd_gossip_lock( glob );
567 0 : fd_gossip_to_soladdr(&glob->my_contact_info.tvu, tvu);
568 0 : fd_gossip_to_soladdr(&glob->my_contact_info.tvu_fwd, tvu_fwd);
569 0 : fd_gossip_unlock( glob );
570 0 : return 0;
571 0 : }
572 :
573 : int
574 : fd_gossip_update_tpu_addr( fd_gossip_t * glob,
575 : fd_gossip_peer_addr_t const * tpu,
576 0 : fd_gossip_peer_addr_t const * tpu_fwd ) {
577 0 : char tmp[100];
578 0 : FD_LOG_NOTICE(("updating tpu service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tpu) ));
579 0 : FD_LOG_NOTICE(("updating tpu_fwd service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tpu_fwd ) ));
580 :
581 0 : fd_gossip_lock( glob );
582 0 : fd_gossip_to_soladdr(&glob->my_contact_info.tpu, tpu);
583 0 : fd_gossip_to_soladdr(&glob->my_contact_info.tpu_fwd, tpu);
584 0 : fd_gossip_unlock( glob );
585 :
586 0 : return 0;
587 0 : }
588 :
589 : int
590 0 : fd_gossip_update_tpu_vote_addr( fd_gossip_t * glob, const fd_gossip_peer_addr_t * tpu_vote ) {
591 0 : char tmp[100];
592 0 : FD_LOG_NOTICE(("updating tpu vote service address %s", fd_gossip_addr_str(tmp, sizeof(tmp), tpu_vote)));
593 :
594 0 : fd_gossip_lock( glob );
595 0 : fd_gossip_to_soladdr(&glob->my_contact_info.tpu_vote, tpu_vote);
596 0 : fd_gossip_unlock( glob );
597 :
598 0 : return 0;
599 0 : }
600 :
601 : void
602 0 : fd_gossip_set_shred_version( fd_gossip_t * glob, ushort shred_version ) {
603 0 : glob->my_contact_info.shred_version = shred_version;
604 0 : }
605 :
606 : /* Add an event to the queue of pending timed events. The resulting
607 : value needs "fun" and "fun_arg" to be set. */
608 : static fd_pending_event_t *
609 0 : fd_gossip_add_pending( fd_gossip_t * glob, long when ) {
610 0 : if (fd_pending_pool_free( glob->event_pool ) == 0)
611 0 : return NULL;
612 0 : fd_pending_event_t * ev = fd_pending_pool_ele_acquire( glob->event_pool );
613 0 : ev->key = when;
614 0 : fd_pending_heap_ele_insert( glob->event_heap, ev, glob->event_pool );
615 0 : return ev;
616 0 : }
617 :
618 : /* Send raw data as a UDP packet to an address */
619 : static void
620 0 : fd_gossip_send_raw( fd_gossip_t * glob, const fd_gossip_peer_addr_t * dest, void * data, size_t sz) {
621 0 : if ( sz > PACKET_DATA_SIZE )
622 0 : FD_LOG_ERR(("sending oversized packet, size=%lu", sz));
623 0 : fd_gossip_unlock( glob );
624 0 : (*glob->send_fun)(data, sz, dest, glob->send_arg);
625 0 : fd_gossip_lock( glob );
626 0 : }
627 :
628 : /* Send a gossip message to an address */
629 : static void
630 0 : fd_gossip_send( fd_gossip_t * glob, const fd_gossip_peer_addr_t * dest, fd_gossip_msg_t * gmsg ) {
631 : /* Encode the data */
632 0 : uchar buf[PACKET_DATA_SIZE];
633 0 : fd_bincode_encode_ctx_t ctx;
634 0 : ctx.data = buf;
635 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
636 0 : if ( fd_gossip_msg_encode( gmsg, &ctx ) ) {
637 0 : FD_LOG_WARNING(("fd_gossip_msg_encode failed"));
638 0 : return;
639 0 : }
640 0 : size_t sz = (size_t)((const uchar *)ctx.data - buf);
641 0 : fd_gossip_send_raw( glob, dest, buf, sz);
642 : // char tmp[100];
643 : // FD_LOG_WARNING(("sent msg type %u to %s size=%lu", gmsg->discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), dest), sz));
644 0 : }
645 :
646 : /* Initiate the ping/pong protocol to a validator address */
647 : static void
648 0 : fd_gossip_make_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
649 : /* Update the active table where we track the state of the ping/pong
650 : protocol */
651 0 : fd_gossip_peer_addr_t * key = &arg->key;
652 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, key, NULL);
653 0 : if (val == NULL) {
654 0 : if (fd_active_table_is_full(glob->actives))
655 0 : return;
656 0 : val = fd_active_table_insert(glob->actives, key);
657 0 : fd_active_new_value(val);
658 0 : } else {
659 0 : if (val->pongtime != 0)
660 : /* Success */
661 0 : return;
662 0 : if (val->pingcount++ >= MAX_PEER_PING_COUNT) {
663 : /* Give up. This is a bad peer. */
664 0 : fd_active_table_remove(glob->actives, key);
665 0 : fd_peer_table_remove(glob->peers, key);
666 0 : return;
667 0 : }
668 0 : }
669 0 : val->pingtime = glob->now;
670 : /* Generate a new token when we start a fresh round of pinging */
671 0 : if (val->pingcount == 1U) {
672 0 : for ( ulong i = 0; i < FD_HASH_FOOTPRINT / sizeof(ulong); ++i )
673 0 : val->pingtoken.ul[i] = fd_rng_ulong(glob->rng);
674 0 : }
675 :
676 : /* Keep pinging until we succeed */
677 0 : fd_pending_event_t * ev = fd_gossip_add_pending( glob, glob->now + (long)2e8 /* 200 ms */ );
678 0 : if (ev != NULL) {
679 0 : ev->fun = fd_gossip_make_ping;
680 0 : fd_gossip_peer_addr_copy(&ev->fun_arg.key, key);
681 0 : }
682 :
683 0 : fd_pubkey_t * public_key = glob->public_key;
684 :
685 : /* Build a ping message */
686 0 : fd_gossip_msg_t gmsg;
687 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_ping);
688 0 : fd_gossip_ping_t * ping = &gmsg.inner.ping;
689 0 : fd_hash_copy( &ping->from, public_key );
690 :
691 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
692 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
693 0 : fd_memcpy( pre_image+16UL, val->pingtoken.uc, 32UL );
694 :
695 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &ping->token );
696 :
697 : /* Sign it */
698 :
699 0 : (*glob->sign_fun)( glob->sign_arg, ping->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
700 :
701 0 : fd_gossip_send( glob, key, &gmsg );
702 0 : }
703 :
704 : /* Respond to a ping from another validator */
705 : static void
706 0 : fd_gossip_handle_ping( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_ping_t const * ping ) {
707 : /* Verify the signature */
708 0 : fd_sha512_t sha2[1];
709 0 : if (fd_ed25519_verify( /* msg */ ping->token.uc,
710 0 : /* sz */ 32UL,
711 0 : /* sig */ ping->signature.uc,
712 0 : /* public_key */ ping->from.uc,
713 0 : sha2 )) {
714 0 : FD_LOG_WARNING(("received ping with invalid signature"));
715 0 : return;
716 0 : }
717 :
718 : /* Build a pong message */
719 0 : fd_gossip_msg_t gmsg;
720 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pong);
721 0 : fd_gossip_ping_t * pong = &gmsg.inner.pong;
722 :
723 0 : fd_pubkey_t * public_key = glob->public_key;
724 :
725 0 : fd_hash_copy( &pong->from, public_key );
726 :
727 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
728 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
729 0 : fd_memcpy( pre_image+16UL, ping->token.uc, 32UL);
730 :
731 : /* Generate response hash token */
732 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, &pong->token );
733 :
734 : /* Sign it */
735 0 : (*glob->sign_fun)( glob->sign_arg, pong->signature.uc, pre_image, FD_PING_PRE_IMAGE_SZ, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519 );
736 :
737 0 : fd_gossip_send(glob, from, &gmsg);
738 0 : }
739 :
740 : /* Sign/timestamp an outgoing crds value */
741 : static void
742 0 : fd_gossip_sign_crds_value( fd_gossip_t * glob, fd_crds_value_t * crd ) {
743 : /* Update the identifier and timestamp */
744 0 : fd_pubkey_t * pubkey;
745 0 : ulong * wallclock;
746 0 : switch (crd->data.discriminant) {
747 0 : case fd_crds_data_enum_contact_info_v1:
748 0 : pubkey = &crd->data.inner.contact_info_v1.id;
749 0 : wallclock = &crd->data.inner.contact_info_v1.wallclock;
750 0 : break;
751 0 : case fd_crds_data_enum_vote:
752 0 : pubkey = &crd->data.inner.vote.from;
753 0 : wallclock = &crd->data.inner.vote.wallclock;
754 0 : break;
755 0 : case fd_crds_data_enum_lowest_slot:
756 0 : pubkey = &crd->data.inner.lowest_slot.from;
757 0 : wallclock = &crd->data.inner.lowest_slot.wallclock;
758 0 : break;
759 0 : case fd_crds_data_enum_snapshot_hashes:
760 0 : pubkey = &crd->data.inner.snapshot_hashes.from;
761 0 : wallclock = &crd->data.inner.snapshot_hashes.wallclock;
762 0 : break;
763 0 : case fd_crds_data_enum_accounts_hashes:
764 0 : pubkey = &crd->data.inner.accounts_hashes.from;
765 0 : wallclock = &crd->data.inner.accounts_hashes.wallclock;
766 0 : break;
767 0 : case fd_crds_data_enum_epoch_slots:
768 0 : pubkey = &crd->data.inner.epoch_slots.from;
769 0 : wallclock = &crd->data.inner.epoch_slots.wallclock;
770 0 : break;
771 0 : case fd_crds_data_enum_version_v1:
772 0 : pubkey = &crd->data.inner.version_v1.from;
773 0 : wallclock = &crd->data.inner.version_v1.wallclock;
774 0 : break;
775 0 : case fd_crds_data_enum_version_v2:
776 0 : pubkey = &crd->data.inner.version_v2.from;
777 0 : wallclock = &crd->data.inner.version_v2.wallclock;
778 0 : break;
779 0 : case fd_crds_data_enum_node_instance:
780 0 : pubkey = &crd->data.inner.node_instance.from;
781 0 : wallclock = &crd->data.inner.node_instance.wallclock;
782 0 : break;
783 0 : case fd_crds_data_enum_duplicate_shred:
784 0 : pubkey = &crd->data.inner.duplicate_shred.from;
785 0 : wallclock = &crd->data.inner.duplicate_shred.wallclock;
786 0 : break;
787 0 : case fd_crds_data_enum_incremental_snapshot_hashes:
788 0 : pubkey = &crd->data.inner.incremental_snapshot_hashes.from;
789 0 : wallclock = &crd->data.inner.incremental_snapshot_hashes.wallclock;
790 0 : break;
791 0 : case fd_crds_data_enum_contact_info_v2:
792 0 : pubkey = &crd->data.inner.contact_info_v2.from;
793 0 : wallclock = &crd->data.inner.contact_info_v2.wallclock;
794 0 : break;
795 0 : default:
796 0 : return;
797 0 : }
798 0 : fd_pubkey_t * public_key = glob->public_key;
799 0 : fd_hash_copy(pubkey, public_key);
800 0 : *wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* convert to ms */
801 :
802 : /* Sign it */
803 0 : uchar buf[PACKET_DATA_SIZE];
804 0 : fd_bincode_encode_ctx_t ctx;
805 0 : ctx.data = buf;
806 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
807 0 : if ( fd_crds_data_encode( &crd->data, &ctx ) ) {
808 0 : FD_LOG_WARNING(("fd_crds_data_encode failed"));
809 0 : return;
810 0 : }
811 :
812 0 : (*glob->sign_fun)( glob->sign_arg, crd->signature.uc, buf, (ulong)((uchar*)ctx.data - buf), FD_KEYGUARD_SIGN_TYPE_ED25519 );
813 0 : }
814 :
815 : /* Convert a hash to a bloom filter bit position */
816 : static ulong
817 0 : fd_gossip_bloom_pos( fd_hash_t * hash, ulong key, ulong nbits) {
818 0 : for ( ulong i = 0; i < 32U; ++i) {
819 0 : key ^= (ulong)(hash->uc[i]);
820 0 : key *= 1099511628211UL;
821 0 : }
822 0 : return key % nbits;
823 0 : }
824 :
825 : /* Choose a random active peer with good ping count */
826 : static fd_active_elem_t *
827 0 : fd_gossip_random_active( fd_gossip_t * glob ) {
828 : /* Create a list of active peers with minimal pings */
829 0 : fd_active_elem_t * list[FD_ACTIVE_KEY_MAX];
830 0 : ulong listlen = 0;
831 0 : ulong totweight = 0;
832 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
833 0 : !fd_active_table_iter_done( glob->actives, iter );
834 0 : iter = fd_active_table_iter_next( glob->actives, iter ) ) {
835 0 : fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );
836 :
837 0 : if (ele->pongtime == 0 && !fd_gossip_is_allowed_entrypoint( glob, &ele->key )) {
838 0 : continue;
839 0 : } else if (listlen == 0) {
840 0 : list[0] = ele;
841 0 : listlen = 1;
842 0 : totweight = ele->weight;
843 0 : } else if (ele->pingcount > list[0]->pingcount) {
844 0 : continue;
845 0 : } else if (ele->pingcount < list[0]->pingcount) {
846 : /* Reset the list */
847 0 : list[0] = ele;
848 0 : listlen = 1;
849 0 : totweight = ele->weight;
850 0 : } else {
851 0 : list[listlen++] = ele;
852 0 : totweight += ele->weight;
853 0 : }
854 0 : }
855 0 : if (listlen == 0 || totweight == 0)
856 0 : return NULL;
857 : /* Choose a random list element by weight */
858 0 : ulong w = fd_rng_ulong(glob->rng) % totweight;
859 0 : ulong j = 0;
860 0 : for( ulong i = 0; i < listlen; ++i) {
861 0 : if( w < j + list[i]->weight )
862 0 : return list[i];
863 0 : j += list[i]->weight;
864 0 : }
865 0 : FD_LOG_CRIT(( "I shouldn't be here" ));
866 0 : return NULL;
867 0 : }
868 :
869 : /* Generate a pull request for a random active peer */
870 : static void
871 0 : fd_gossip_random_pull( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
872 0 : (void)arg;
873 :
874 : /* Try again in 5 sec */
875 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)100e6);
876 0 : if (ev) {
877 0 : ev->fun = fd_gossip_random_pull;
878 0 : }
879 :
880 : /* Pick a random partner */
881 0 : fd_active_elem_t * ele = fd_gossip_random_active(glob);
882 0 : if (ele == NULL)
883 0 : return;
884 :
885 : /* Compute the number of packets needed for all the bloom filter parts */
886 0 : ulong nitems = fd_value_table_key_cnt(glob->values);
887 0 : ulong nkeys = 1;
888 0 : ulong npackets = 1;
889 0 : uint nmaskbits = 0;
890 0 : double e = 0;
891 0 : if (nitems > 0) {
892 0 : do {
893 0 : double n = ((double)nitems)/((double)npackets); /* Assume even division of values */
894 0 : double m = (double)FD_BLOOM_NUM_BITS;
895 0 : nkeys = fd_ulong_max(1U, (ulong)((m/n)*0.69314718055994530941723212145818 /* ln(2) */));
896 0 : nkeys = fd_ulong_min(nkeys, FD_BLOOM_MAX_KEYS);
897 0 : if (npackets == FD_BLOOM_MAX_PACKETS)
898 0 : break;
899 0 : double k = (double)nkeys;
900 0 : e = pow(1.0 - exp(-k*n/m), k);
901 0 : if (e < 0.001)
902 0 : break;
903 0 : nmaskbits++;
904 0 : npackets = 1U<<nmaskbits;
905 0 : } while (1);
906 0 : }
907 0 : FD_LOG_DEBUG(("making bloom filter for %lu items with %lu packets and %lu keys %g error", nitems, npackets, nkeys, e));
908 :
909 : /* Generate random keys */
910 0 : ulong keys[FD_BLOOM_MAX_KEYS];
911 0 : for (ulong i = 0; i < nkeys; ++i)
912 0 : keys[i] = fd_rng_ulong(glob->rng);
913 : /* Set all the bits */
914 0 : ulong num_bits_set[FD_BLOOM_MAX_PACKETS];
915 0 : for (ulong i = 0; i < npackets; ++i)
916 0 : num_bits_set[i] = 0;
917 0 : #define CHUNKSIZE (FD_BLOOM_NUM_BITS/64U)
918 0 : ulong bits[CHUNKSIZE * FD_BLOOM_MAX_PACKETS];
919 0 : fd_memset(bits, 0, CHUNKSIZE*8U*npackets);
920 0 : ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_VALUE_EXPIRE;
921 0 : for( fd_value_table_iter_t iter = fd_value_table_iter_init( glob->values );
922 0 : !fd_value_table_iter_done( glob->values, iter );
923 0 : iter = fd_value_table_iter_next( glob->values, iter ) ) {
924 0 : fd_value_elem_t * ele = fd_value_table_iter_ele( glob->values, iter );
925 0 : fd_hash_t * hash = &(ele->key);
926 : /* Purge expired values */
927 0 : if (ele->wallclock < expire) {
928 0 : fd_value_table_remove( glob->values, hash );
929 0 : continue;
930 0 : }
931 : /* Choose which filter packet based on the high bits in the hash */
932 0 : ulong index = (nmaskbits == 0 ? 0UL : ( hash->ul[0] >> (64U - nmaskbits) ));
933 0 : ulong * chunk = bits + (index*CHUNKSIZE);
934 0 : for (ulong i = 0; i < nkeys; ++i) {
935 0 : ulong pos = fd_gossip_bloom_pos(hash, keys[i], FD_BLOOM_NUM_BITS);
936 0 : ulong * j = chunk + (pos>>6U); /* divide by 64 */
937 0 : ulong bit = 1UL<<(pos & 63U);
938 0 : if (!((*j) & bit)) {
939 0 : *j |= bit;
940 0 : num_bits_set[index]++;
941 0 : }
942 0 : }
943 0 : }
944 :
945 : /* Assemble the packets */
946 0 : fd_gossip_msg_t gmsg;
947 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pull_req);
948 0 : fd_gossip_pull_req_t * req = &gmsg.inner.pull_req;
949 0 : fd_crds_filter_t * filter = &req->filter;
950 0 : filter->mask_bits = nmaskbits;
951 0 : filter->filter.keys_len = nkeys;
952 0 : filter->filter.keys = keys;
953 0 : fd_gossip_bitvec_u64_t * bitvec = &filter->filter.bits;
954 0 : bitvec->len = FD_BLOOM_NUM_BITS;
955 0 : bitvec->has_bits = 1;
956 0 : bitvec->bits.vec_len = FD_BLOOM_NUM_BITS/64U;
957 :
958 : /* The "value" in the request is always my own contact info */
959 0 : fd_crds_value_t * value = &req->value;
960 0 : fd_crds_data_new_disc(&value->data, fd_crds_data_enum_contact_info_v1);
961 0 : fd_gossip_contact_info_v1_t * ci = &value->data.inner.contact_info_v1;
962 0 : fd_memcpy(ci, &glob->my_contact_info, sizeof(fd_gossip_contact_info_v1_t));
963 0 : fd_gossip_sign_crds_value(glob, value);
964 :
965 0 : for (uint i = 0; i < npackets; ++i) {
966 : /* Update the filter mask specific part */
967 0 : filter->mask = (nmaskbits == 0 ? ~0UL : ((i << (64U - nmaskbits)) | (~0UL >> nmaskbits)));
968 0 : filter->filter.num_bits_set = num_bits_set[i];
969 0 : bitvec->bits.vec = bits + (i*CHUNKSIZE);
970 0 : fd_gossip_send(glob, &ele->key, &gmsg);
971 0 : }
972 0 : }
973 :
974 : /* Handle a pong response */
975 : static void
976 0 : fd_gossip_handle_pong( fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_ping_t const * pong ) {
977 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, from, NULL);
978 0 : if (val == NULL) {
979 0 : FD_LOG_DEBUG(("received pong too late"));
980 0 : return;
981 0 : }
982 :
983 0 : uchar pre_image[FD_PING_PRE_IMAGE_SZ];
984 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
985 0 : fd_memcpy( pre_image+16UL, val->pingtoken.uc, 32UL );
986 :
987 :
988 0 : fd_hash_t pre_image_hash;
989 0 : fd_sha256_hash( pre_image, FD_PING_PRE_IMAGE_SZ, pre_image_hash.uc );
990 :
991 : /* Confirm response hash token */
992 0 : fd_sha256_t sha[1];
993 0 : fd_sha256_init( sha );
994 0 : fd_sha256_append( sha, "SOLANA_PING_PONG", 16UL );
995 :
996 0 : fd_sha256_append( sha, pre_image_hash.uc, 32UL );
997 0 : fd_hash_t pongtoken;
998 0 : fd_sha256_fini( sha, pongtoken.uc );
999 0 : if (memcmp(pongtoken.uc, pong->token.uc, 32UL) != 0) {
1000 0 : FD_LOG_DEBUG(( "received pong with wrong token" ));
1001 0 : return;
1002 0 : }
1003 :
1004 : /* Verify the signature */
1005 0 : fd_sha512_t sha2[1];
1006 0 : if (fd_ed25519_verify( /* msg */ pong->token.uc,
1007 0 : /* sz */ 32UL,
1008 0 : /* sig */ pong->signature.uc,
1009 0 : /* public_key */ pong->from.uc,
1010 0 : sha2 )) {
1011 0 : FD_LOG_WARNING(("received pong with invalid signature"));
1012 0 : return;
1013 0 : }
1014 :
1015 0 : val->pongtime = glob->now;
1016 0 : fd_hash_copy(&val->id, &pong->from);
1017 :
1018 : /* Remember that this is a good peer */
1019 0 : fd_peer_elem_t * peerval = fd_peer_table_query(glob->peers, from, NULL);
1020 0 : if (peerval == NULL) {
1021 0 : if (fd_peer_table_is_full(glob->peers)) {
1022 0 : FD_LOG_DEBUG(("too many peers"));
1023 0 : return;
1024 0 : }
1025 0 : peerval = fd_peer_table_insert(glob->peers, from);
1026 0 : peerval->stake = 0;
1027 0 : }
1028 0 : peerval->wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* In millisecs */
1029 0 : fd_hash_copy(&peerval->id, &pong->from);
1030 :
1031 0 : fd_weights_elem_t const * val2 = fd_weights_table_query_const( glob->weights, &val->id, NULL );
1032 0 : val->weight = ( val2 == NULL ? 1UL : val2->weight );
1033 :
1034 0 : }
1035 :
1036 : /* Initiate a ping/pong with a random active partner to confirm it is
1037 : still alive. */
1038 : static void
1039 0 : fd_gossip_random_ping( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
1040 0 : (void)arg;
1041 :
1042 : /* Try again in 1 sec */
1043 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)100e8);
1044 0 : if (ev) {
1045 0 : ev->fun = fd_gossip_random_ping;
1046 0 : }
1047 :
1048 0 : if (fd_pending_pool_free( glob->event_pool ) < 100U)
1049 0 : return;
1050 :
1051 0 : ulong cnt = fd_active_table_key_cnt(glob->actives);
1052 0 : if (cnt == 0 && glob->inactives_cnt == 0)
1053 0 : return;
1054 0 : fd_gossip_peer_addr_t * addr = NULL;
1055 0 : if (glob->inactives_cnt > 0 && cnt < FD_ACTIVE_KEY_MAX)
1056 : /* Try a new peer */
1057 0 : addr = glob->inactives + (--(glob->inactives_cnt));
1058 0 : else {
1059 : /* Choose a random active peer */
1060 0 : ulong i = fd_rng_ulong(glob->rng) % cnt;
1061 0 : ulong j = 0;
1062 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( glob->actives );
1063 0 : !fd_active_table_iter_done( glob->actives, iter );
1064 0 : iter = fd_active_table_iter_next( glob->actives, iter ) ) {
1065 0 : if (i == j++) {
1066 0 : fd_active_elem_t * ele = fd_active_table_iter_ele( glob->actives, iter );
1067 0 : if (glob->now - ele->pingtime < (long)60e9) /* minute cooldown */
1068 0 : return;
1069 0 : ele->pingcount = 0;
1070 0 : ele->pongtime = 0;
1071 0 : addr = &(ele->key);
1072 0 : break;
1073 0 : }
1074 0 : }
1075 0 : }
1076 :
1077 0 : fd_pending_event_arg_t arg2;
1078 0 : fd_gossip_peer_addr_copy(&arg2.key, addr);
1079 0 : fd_gossip_make_ping(glob, &arg2);
1080 0 : }
1081 :
1082 : /* Process an incoming crds value */
1083 : static void
1084 0 : fd_gossip_recv_crds_value(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_pubkey_t * pubkey, fd_crds_value_t* crd) {
1085 : /* Verify the signature */
1086 0 : ulong wallclock;
1087 0 : switch (crd->data.discriminant) {
1088 0 : case fd_crds_data_enum_contact_info_v1:
1089 0 : pubkey = &crd->data.inner.contact_info_v1.id;
1090 0 : wallclock = crd->data.inner.contact_info_v1.wallclock;
1091 0 : break;
1092 0 : case fd_crds_data_enum_vote:
1093 0 : pubkey = &crd->data.inner.vote.from;
1094 0 : wallclock = crd->data.inner.vote.wallclock;
1095 0 : break;
1096 0 : case fd_crds_data_enum_lowest_slot:
1097 0 : pubkey = &crd->data.inner.lowest_slot.from;
1098 0 : wallclock = crd->data.inner.lowest_slot.wallclock;
1099 0 : break;
1100 0 : case fd_crds_data_enum_snapshot_hashes:
1101 0 : pubkey = &crd->data.inner.snapshot_hashes.from;
1102 0 : wallclock = crd->data.inner.snapshot_hashes.wallclock;
1103 0 : break;
1104 0 : case fd_crds_data_enum_accounts_hashes:
1105 0 : pubkey = &crd->data.inner.accounts_hashes.from;
1106 0 : wallclock = crd->data.inner.accounts_hashes.wallclock;
1107 0 : break;
1108 0 : case fd_crds_data_enum_epoch_slots:
1109 0 : pubkey = &crd->data.inner.epoch_slots.from;
1110 0 : wallclock = crd->data.inner.epoch_slots.wallclock;
1111 0 : break;
1112 0 : case fd_crds_data_enum_version_v1:
1113 0 : pubkey = &crd->data.inner.version_v1.from;
1114 0 : wallclock = crd->data.inner.version_v1.wallclock;
1115 0 : break;
1116 0 : case fd_crds_data_enum_version_v2:
1117 0 : pubkey = &crd->data.inner.version_v2.from;
1118 0 : wallclock = crd->data.inner.version_v2.wallclock;
1119 0 : break;
1120 0 : case fd_crds_data_enum_node_instance:
1121 0 : pubkey = &crd->data.inner.node_instance.from;
1122 0 : wallclock = crd->data.inner.node_instance.wallclock;
1123 0 : break;
1124 0 : case fd_crds_data_enum_duplicate_shred:
1125 0 : pubkey = &crd->data.inner.duplicate_shred.from;
1126 0 : wallclock = crd->data.inner.duplicate_shred.wallclock;
1127 0 : break;
1128 0 : case fd_crds_data_enum_incremental_snapshot_hashes:
1129 0 : pubkey = &crd->data.inner.incremental_snapshot_hashes.from;
1130 0 : wallclock = crd->data.inner.incremental_snapshot_hashes.wallclock;
1131 0 : break;
1132 0 : case fd_crds_data_enum_contact_info_v2:
1133 0 : pubkey = &crd->data.inner.contact_info_v2.from;
1134 0 : wallclock = crd->data.inner.contact_info_v2.wallclock;
1135 0 : break;
1136 0 : default:
1137 0 : wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* In millisecs */
1138 0 : break;
1139 0 : }
1140 0 : if (memcmp(pubkey->uc, glob->public_key->uc, 32U) == 0)
1141 : /* Ignore my own messages */
1142 0 : return;
1143 0 : if( crd->data.discriminant>=FD_KNOWN_CRDS_ENUM_MAX ) {
1144 0 : return;
1145 0 : }
1146 :
1147 : /* Perform the value hash to get the value table key */
1148 0 : uchar buf[PACKET_DATA_SIZE];
1149 0 : fd_bincode_encode_ctx_t ctx;
1150 0 : ctx.data = buf;
1151 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1152 0 : if ( fd_crds_value_encode( crd, &ctx ) ) {
1153 0 : FD_LOG_ERR(("fd_crds_value_encode failed"));
1154 0 : return;
1155 0 : }
1156 0 : ulong datalen = (ulong)((uchar*)ctx.data - buf);
1157 0 : fd_sha256_t sha2[1];
1158 0 : fd_sha256_init( sha2 );
1159 0 : fd_sha256_append( sha2, buf, datalen );
1160 0 : fd_hash_t key;
1161 0 : fd_sha256_fini( sha2, key.uc );
1162 :
1163 0 : fd_msg_stats_elem_t * msg_stat = &glob->msg_stats[ crd->data.discriminant ];
1164 0 : msg_stat->total_cnt++;
1165 0 : msg_stat->bytes_rx_cnt += datalen;
1166 0 : fd_value_elem_t * msg = fd_value_table_query(glob->values, &key, NULL);
1167 0 : if (msg != NULL) {
1168 : /* Already have this value */
1169 0 : msg_stat->dups_cnt++;
1170 0 : glob->recv_dup_cnt++;
1171 0 : if (from != NULL) {
1172 : /* Record the dup in the receive statistics table */
1173 0 : fd_stats_elem_t * val = fd_stats_table_query(glob->stats, from, NULL);
1174 0 : if (val == NULL) {
1175 0 : if (!fd_stats_table_is_full(glob->stats)) {
1176 0 : val = fd_stats_table_insert(glob->stats, from);
1177 0 : val->dups_cnt = 0;
1178 0 : }
1179 0 : }
1180 0 : if (val != NULL) {
1181 0 : val->last = glob->now;
1182 0 : for (ulong i = 0; i < val->dups_cnt; ++i)
1183 0 : if (fd_hash_eq(&val->dups[i].origin, pubkey)) {
1184 0 : val->dups[i].cnt++;
1185 0 : goto found_origin;
1186 0 : }
1187 0 : if (val->dups_cnt < 8) {
1188 0 : ulong i = val->dups_cnt++;
1189 0 : fd_hash_copy(&val->dups[i].origin, pubkey);
1190 0 : val->dups[i].cnt = 1;
1191 0 : }
1192 0 : found_origin: ;
1193 0 : }
1194 0 : }
1195 0 : return;
1196 0 : }
1197 :
1198 0 : ctx.data = buf;
1199 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1200 0 : if ( fd_crds_data_encode( &crd->data, &ctx ) ) {
1201 0 : FD_LOG_ERR(("fd_crds_data_encode failed"));
1202 0 : return;
1203 0 : }
1204 :
1205 0 : fd_sha512_t sha[1];
1206 0 : if (fd_ed25519_verify( /* msg */ buf,
1207 0 : /* sz */ (ulong)((uchar*)ctx.data - buf),
1208 0 : /* sig */ crd->signature.uc,
1209 0 : /* public_key */ pubkey->uc,
1210 0 : sha )) {
1211 0 : FD_LOG_DEBUG(("received crds_value with invalid signature"));
1212 0 : return;
1213 0 : }
1214 :
1215 : /* Store the value for later pushing/duplicate detection */
1216 0 : glob->recv_nondup_cnt++;
1217 0 : if (fd_value_table_is_full(glob->values)) {
1218 0 : FD_LOG_DEBUG(("too many values"));
1219 0 : return;
1220 0 : }
1221 0 : msg = fd_value_table_insert(glob->values, &key);
1222 0 : msg->wallclock = wallclock;
1223 0 : fd_hash_copy(&msg->origin, pubkey);
1224 :
1225 : /* We store the serialized form for convenience */
1226 0 : fd_memcpy(msg->data, buf, datalen);
1227 0 : msg->datalen = datalen;
1228 :
1229 0 : if (glob->need_push_cnt < FD_NEED_PUSH_MAX) {
1230 : /* Remember that I need to push this value */
1231 0 : ulong i = ((glob->need_push_head + (glob->need_push_cnt++)) & (FD_NEED_PUSH_MAX-1U));
1232 0 : fd_hash_copy(glob->need_push + i, &key);
1233 0 : }
1234 :
1235 0 : if (crd->data.discriminant == fd_crds_data_enum_contact_info_v1) {
1236 0 : fd_gossip_contact_info_v1_t * info = &crd->data.inner.contact_info_v1;
1237 0 : if (info->gossip.port != 0) {
1238 : /* Remember the peer */
1239 0 : fd_gossip_peer_addr_t pkey;
1240 0 : fd_memset(&pkey, 0, sizeof(pkey));
1241 0 : fd_gossip_from_soladdr(&pkey, &info->gossip);
1242 0 : fd_peer_elem_t * val = fd_peer_table_query(glob->peers, &pkey, NULL);
1243 0 : if (val == NULL) {
1244 0 : if (fd_peer_table_is_full(glob->peers)) {
1245 0 : FD_LOG_DEBUG(("too many peers"));
1246 0 : } else {
1247 0 : val = fd_peer_table_insert(glob->peers, &pkey);
1248 0 : if (glob->inactives_cnt < INACTIVES_MAX &&
1249 0 : fd_active_table_query(glob->actives, &pkey, NULL) == NULL) {
1250 : /* Queue this peer for later pinging */
1251 0 : fd_gossip_peer_addr_copy(glob->inactives + (glob->inactives_cnt++), &pkey);
1252 0 : }
1253 0 : }
1254 0 : }
1255 0 : if (val != NULL) {
1256 0 : val->wallclock = wallclock;
1257 0 : val->stake = 0;
1258 0 : fd_hash_copy(&val->id, &info->id);
1259 0 : }
1260 0 : }
1261 :
1262 0 : fd_gossip_peer_addr_t peer_addr = { .addr = crd->data.inner.contact_info_v1.gossip.addr.inner.ip4,
1263 0 : .port = fd_ushort_bswap( crd->data.inner.contact_info_v1.gossip.port ) };
1264 0 : if (glob->my_contact_info.shred_version == 0U && fd_gossip_is_allowed_entrypoint( glob, &peer_addr )) {
1265 0 : FD_LOG_NOTICE(("using shred version %lu", (ulong)crd->data.inner.contact_info_v1.shred_version));
1266 0 : glob->my_contact_info.shred_version = crd->data.inner.contact_info_v1.shred_version;
1267 0 : }
1268 0 : }
1269 :
1270 0 : if (crd->data.discriminant == fd_crds_data_enum_contact_info_v2) {
1271 0 : fd_gossip_contact_info_v2_t * info = &crd->data.inner.contact_info_v2;
1272 0 : fd_gossip_socket_addr_t socket_addr;
1273 0 : if( fd_gossip_contact_info_v2_find_proto_ident( info, FD_GOSSIP_SOCKET_TAG_GOSSIP, &socket_addr ) ) {
1274 0 : if (socket_addr.port != 0) {
1275 : /* Remember the peer */
1276 0 : fd_gossip_peer_addr_t pkey;
1277 0 : fd_memset(&pkey, 0, sizeof(pkey));
1278 0 : fd_gossip_from_soladdr(&pkey, &socket_addr);
1279 0 : fd_peer_elem_t * val = fd_peer_table_query(glob->peers, &pkey, NULL);
1280 0 : if (val == NULL) {
1281 0 : if (fd_peer_table_is_full(glob->peers)) {
1282 0 : FD_LOG_DEBUG(("too many peers"));
1283 0 : } else {
1284 0 : val = fd_peer_table_insert(glob->peers, &pkey);
1285 0 : if (glob->inactives_cnt < INACTIVES_MAX &&
1286 0 : fd_active_table_query(glob->actives, &pkey, NULL) == NULL) {
1287 : /* Queue this peer for later pinging */
1288 0 : fd_gossip_peer_addr_copy(glob->inactives + (glob->inactives_cnt++), &pkey);
1289 0 : }
1290 0 : }
1291 0 : }
1292 0 : if (val != NULL) {
1293 0 : val->wallclock = wallclock;
1294 0 : val->stake = 0;
1295 0 : fd_hash_copy(&val->id, &info->from);
1296 0 : }
1297 0 : }
1298 :
1299 0 : fd_gossip_peer_addr_t peer_addr = { .addr = socket_addr.addr.inner.ip4,
1300 0 : .port = fd_ushort_bswap( socket_addr.port ) };
1301 0 : if (glob->my_contact_info.shred_version == 0U && fd_gossip_is_allowed_entrypoint( glob, &peer_addr )) {
1302 0 : FD_LOG_NOTICE(("using shred version %lu", (ulong)crd->data.inner.contact_info_v2.shred_version));
1303 0 : glob->my_contact_info.shred_version = crd->data.inner.contact_info_v2.shred_version;
1304 0 : }
1305 0 : }
1306 0 : }
1307 :
1308 : /* Deliver the data upstream */
1309 0 : fd_gossip_unlock( glob );
1310 0 : (*glob->deliver_fun)(&crd->data, glob->deliver_arg);
1311 0 : fd_gossip_lock( glob );
1312 0 : }
1313 :
1314 : /* Handle a prune request from somebody else */
1315 : static void
1316 0 : fd_gossip_handle_prune(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_prune_msg_t * msg) {
1317 0 : (void)from;
1318 :
1319 : /* Confirm the message is for me */
1320 0 : if (memcmp(msg->data.destination.uc, glob->public_key->uc, 32U) != 0)
1321 0 : return;
1322 :
1323 : /* Verify the signature. This is hacky for prune messages */
1324 0 : fd_gossip_prune_sign_data_t signdata;
1325 0 : signdata.pubkey = msg->data.pubkey;
1326 0 : signdata.prunes_len = msg->data.prunes_len;
1327 0 : signdata.prunes = msg->data.prunes;
1328 0 : signdata.destination = msg->data.destination;
1329 0 : signdata.wallclock = msg->data.wallclock;
1330 :
1331 : /* Verify the signature. You would think that solana would use
1332 : msg->pubkey.uc for this, but that pubkey is actually ignored. The
1333 : inclusion of two pubkeys in this message is confusing and
1334 : problematic. */
1335 0 : uchar buf[PACKET_DATA_SIZE];
1336 0 : fd_bincode_encode_ctx_t ctx;
1337 0 : ctx.data = buf;
1338 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1339 0 : if ( fd_gossip_prune_sign_data_encode( &signdata, &ctx ) ) {
1340 0 : FD_LOG_ERR(("fd_gossip_prune_sign_data_encode failed"));
1341 0 : return;
1342 0 : }
1343 0 : fd_sha512_t sha[1];
1344 0 : if (fd_ed25519_verify( /* msg */ buf,
1345 0 : /* sz */ (ulong)((uchar*)ctx.data - buf),
1346 0 : /* sig */ msg->data.signature.uc,
1347 0 : /* public_key */ msg->data.pubkey.uc,
1348 0 : sha )) {
1349 0 : FD_LOG_WARNING(("received prune_msg with invalid signature"));
1350 0 : return;
1351 0 : }
1352 :
1353 : /* Find the active push state which needs to be pruned */
1354 0 : fd_push_state_t* ps = NULL;
1355 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
1356 0 : fd_push_state_t* s = glob->push_states[i];
1357 0 : if (memcmp(msg->data.pubkey.uc, s->id.uc, 32U) == 0) {
1358 0 : ps = s;
1359 0 : break;
1360 0 : }
1361 0 : }
1362 0 : if (ps == NULL)
1363 0 : return;
1364 :
1365 : /* Set the bloom filter prune bits */
1366 0 : for (ulong i = 0; i < msg->data.prunes_len; ++i) {
1367 0 : fd_pubkey_t * p = msg->data.prunes + i;
1368 0 : for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j) {
1369 0 : ulong pos = fd_gossip_bloom_pos(p, ps->prune_keys[j], FD_PRUNE_NUM_BITS);
1370 0 : ulong * j = ps->prune_bits + (pos>>6U); /* divide by 64 */
1371 0 : ulong bit = 1UL<<(pos & 63U);
1372 0 : *j |= bit;
1373 0 : }
1374 0 : }
1375 0 : }
1376 :
1377 : static int
1378 : fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt );
1379 :
1380 : /* Push an updated version of my contact info into values */
1381 : static void
1382 0 : fd_gossip_push_updated_contact(fd_gossip_t * glob) {
1383 : /* See if we have a shred version yet */
1384 0 : if (glob->my_contact_info.shred_version == 0U)
1385 0 : return;
1386 : /* Update every 1 secs */
1387 0 : if (glob->now - glob->last_contact_time < (long)1e9)
1388 0 : return;
1389 :
1390 0 : if (glob->last_contact_time != 0) {
1391 : /* Remove the old contact value */
1392 0 : fd_value_elem_t * ele = fd_value_table_query(glob->values, &glob->last_contact_info_key, NULL);
1393 0 : if (ele != NULL) {
1394 0 : fd_value_table_remove( glob->values, &glob->last_contact_info_key );
1395 0 : }
1396 :
1397 : /* Remove the old version value */
1398 0 : ele = fd_value_table_query(glob->values, &glob->last_version_key, NULL);
1399 0 : if (ele != NULL) {
1400 0 : fd_value_table_remove( glob->values, &glob->last_version_key );
1401 0 : }
1402 :
1403 0 : ele = fd_value_table_query(glob->values, &glob->last_contact_info_v2_key, NULL);
1404 0 : if (ele != NULL) {
1405 0 : fd_value_table_remove( glob->values, &glob->last_contact_info_v2_key );
1406 0 : }
1407 :
1408 0 : ele = fd_value_table_query(glob->values, &glob->last_node_instance_key, NULL);
1409 0 : if (ele != NULL) {
1410 0 : fd_value_table_remove( glob->values, &glob->last_node_instance_key );
1411 0 : }
1412 0 : }
1413 :
1414 0 : glob->last_contact_time = glob->now;
1415 :
1416 0 : {
1417 0 : fd_crds_data_t crd;
1418 0 : fd_crds_data_new_disc(&crd, fd_crds_data_enum_contact_info_v1);
1419 0 : fd_gossip_contact_info_v1_t * ci = &crd.inner.contact_info_v1;
1420 0 : fd_memcpy(ci, &glob->my_contact_info, sizeof(fd_gossip_contact_info_v1_t));
1421 :
1422 0 : fd_gossip_push_value_nolock(glob, &crd, &glob->last_contact_info_key);
1423 0 : }
1424 :
1425 0 : {
1426 0 : fd_crds_data_t crd;
1427 0 : fd_crds_data_new_disc(&crd, fd_crds_data_enum_contact_info_v2);
1428 0 : fd_gossip_contact_info_v2_t * ci = &crd.inner.contact_info_v2;
1429 :
1430 0 : fd_gossip_ip_addr_t addrs[ 256 ] = {0};
1431 0 : fd_gossip_socket_entry_t sockets[ 256 ] = {0};
1432 : // uint extentions[ 1 ] = {0};
1433 0 : ci->addrs = addrs;
1434 0 : ci->sockets = sockets;
1435 :
1436 0 : ushort last_port = 0;
1437 0 : uchar cnt = 0;
1438 0 : for(;;) {
1439 0 : fd_gossip_ip_addr_t * min_addr = NULL;
1440 0 : ushort min_port = USHORT_MAX;
1441 0 : uchar min_key = 0;
1442 :
1443 0 : ushort gossip_port = glob->my_contact_info.gossip.port;
1444 0 : ushort serve_repair_port = glob->my_contact_info.serve_repair.port;
1445 0 : ushort tvu_port = glob->my_contact_info.tvu.port;
1446 0 : ushort tpu_port = glob->my_contact_info.tpu.port;
1447 0 : ushort tpu_quic_port = (ushort)( glob->my_contact_info.tpu.port + 6 );
1448 0 : ushort tpu_vote_port = glob->my_contact_info.tpu_vote.port;
1449 0 : if( gossip_port > 0 && gossip_port > last_port && gossip_port < min_port ) {
1450 0 : min_key = FD_GOSSIP_SOCKET_TAG_GOSSIP;
1451 0 : min_addr = &glob->my_contact_info.gossip.addr;
1452 0 : min_port = glob->my_contact_info.gossip.port;
1453 0 : }
1454 0 : if( serve_repair_port > 0 && serve_repair_port > last_port && serve_repair_port < min_port ) {
1455 0 : min_key = FD_GOSSIP_SOCKET_TAG_SERVE_REPAIR;
1456 0 : min_addr = &glob->my_contact_info.serve_repair.addr;
1457 0 : min_port = glob->my_contact_info.serve_repair.port;
1458 0 : }
1459 0 : if( tvu_port > 0 && tvu_port > last_port && tvu_port < min_port ) {
1460 0 : min_key = FD_GOSSIP_SOCKET_TAG_TVU;
1461 0 : min_addr = &glob->my_contact_info.tvu.addr;
1462 0 : min_port = glob->my_contact_info.tvu.port;
1463 0 : }
1464 0 : if( tpu_port > 0 && tpu_port > last_port && tpu_port < min_port ) {
1465 0 : min_key = FD_GOSSIP_SOCKET_TAG_TPU;
1466 0 : min_addr = &glob->my_contact_info.tpu.addr;
1467 0 : min_port = glob->my_contact_info.tpu.port;
1468 0 : }
1469 0 : if( tpu_quic_port > 0 && tpu_quic_port > last_port && tpu_quic_port < min_port ) {
1470 0 : min_key = FD_GOSSIP_SOCKET_TAG_TPU_QUIC;
1471 0 : min_addr = &glob->my_contact_info.tpu.addr;
1472 0 : min_port = (ushort)( glob->my_contact_info.tpu.port + 6 );
1473 0 : }
1474 0 : if( tpu_vote_port > 0 && tpu_vote_port > last_port && tpu_vote_port < min_port ) {
1475 0 : min_key = FD_GOSSIP_SOCKET_TAG_TPU_VOTE;
1476 0 : min_addr = &glob->my_contact_info.tpu_vote.addr;
1477 0 : min_port = glob->my_contact_info.tpu_vote.port;
1478 0 : }
1479 0 : if( min_port==USHORT_MAX ) {
1480 0 : break;
1481 0 : }
1482 :
1483 0 : ci->addrs[ cnt ] = *min_addr;
1484 0 : ci->sockets[ cnt ].index = 0;
1485 0 : ci->sockets[ cnt ].offset = (ushort)( min_port - last_port );
1486 0 : ci->sockets[ cnt ].key = min_key;
1487 0 : cnt++;
1488 0 : last_port = min_port;
1489 :
1490 0 : if( min_key ==FD_GOSSIP_SOCKET_TAG_TPU) {
1491 0 : ci->addrs[ cnt ] = *min_addr;
1492 0 : ci->sockets[ cnt ].index = 0;
1493 0 : ci->sockets[ cnt ].offset = 0;
1494 0 : ci->sockets[ cnt ].key = FD_GOSSIP_SOCKET_TAG_TPU_VOTE;
1495 0 : cnt++;
1496 0 : last_port = min_port;
1497 0 : }
1498 0 : }
1499 :
1500 0 : ci->addrs_len = 1;
1501 0 : ci->sockets_len = cnt;
1502 : // ci->extensions_len = 0;
1503 : // ci->extensions = extentions;
1504 0 : ci->shred_version = glob->my_contact_info.shred_version;
1505 0 : ci->wallclock = FD_NANOSEC_TO_MILLI(glob->now);
1506 0 : ci->from = glob->my_contact_info.id;
1507 0 : ci->outset = 1UL;
1508 0 : ci->version.client = 2;
1509 0 : ci->version.commit = 42;
1510 0 : ci->version.feature_set = 42;
1511 0 : ci->version.major = 42;
1512 0 : ci->version.minor = 42;
1513 0 : ci->version.patch = 42;
1514 :
1515 0 : fd_gossip_push_value_nolock(glob, &crd, &glob->last_contact_info_v2_key);
1516 0 : }
1517 :
1518 0 : {
1519 0 : fd_crds_data_t crd;
1520 0 : fd_crds_data_new_disc(&crd, fd_crds_data_enum_node_instance);
1521 0 : fd_gossip_node_instance_t * node_instance = &crd.inner.node_instance;
1522 0 : node_instance->from = glob->my_contact_info.id;
1523 0 : node_instance->timestamp = (long)FD_NANOSEC_TO_MILLI(glob->now);
1524 0 : node_instance->wallclock = FD_NANOSEC_TO_MILLI(glob->now);
1525 0 : node_instance->token = fd_rng_ulong( glob->rng );
1526 :
1527 0 : fd_gossip_push_value_nolock(glob, &crd, &glob->last_node_instance_key);
1528 0 : }
1529 :
1530 0 : {
1531 0 : fd_crds_data_t crd;
1532 0 : fd_crds_data_new_disc(&crd, fd_crds_data_enum_version_v2);
1533 0 : fd_gossip_version_v2_t * version = &crd.inner.version_v2;
1534 0 : fd_memcpy(version, &glob->my_version, sizeof(fd_gossip_version_v2_t));
1535 :
1536 0 : fd_gossip_push_value_nolock(glob, &crd, &glob->last_version_key);
1537 0 : }
1538 0 : }
1539 :
1540 : /* Respond to a pull request */
1541 : static void
1542 0 : fd_gossip_handle_pull_req(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_pull_req_t * msg) {
1543 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, from, NULL);
1544 0 : if (val == NULL || val->pongtime == 0) {
1545 : /* Ping new peers before responding to requests */
1546 0 : if (fd_pending_pool_free( glob->event_pool ) < 100U)
1547 0 : return;
1548 0 : fd_pending_event_arg_t arg2;
1549 0 : fd_gossip_peer_addr_copy(&arg2.key, from);
1550 0 : fd_gossip_make_ping(glob, &arg2);
1551 0 : return;
1552 0 : }
1553 :
1554 : /* Encode an empty pull response as a template */
1555 0 : fd_gossip_msg_t gmsg;
1556 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_pull_resp);
1557 0 : fd_gossip_pull_resp_t * pull_resp = &gmsg.inner.pull_resp;
1558 0 : fd_hash_copy( &pull_resp->pubkey, glob->public_key );
1559 :
1560 0 : uchar buf[PACKET_DATA_SIZE];
1561 0 : fd_bincode_encode_ctx_t ctx;
1562 0 : ctx.data = buf;
1563 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1564 0 : if ( fd_gossip_msg_encode( &gmsg, &ctx ) ) {
1565 0 : FD_LOG_WARNING(("fd_gossip_msg_encode failed"));
1566 0 : return;
1567 0 : }
1568 : /* Reach into buffer to get the number of values */
1569 0 : uchar * newend = (uchar *)ctx.data;
1570 0 : ulong * crds_len = (ulong *)(newend - sizeof(ulong));
1571 :
1572 : /* Push an updated version of my contact info into values */
1573 0 : fd_gossip_push_updated_contact(glob);
1574 :
1575 : /* Apply the bloom filter to my table of values */
1576 0 : fd_crds_filter_t * filter = &msg->filter;
1577 0 : ulong nkeys = filter->filter.keys_len;
1578 0 : ulong * keys = filter->filter.keys;
1579 0 : fd_gossip_bitvec_u64_t * bitvec = &filter->filter.bits;
1580 0 : ulong * bitvec2 = bitvec->bits.vec;
1581 0 : ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_PULL_TIMEOUT;
1582 0 : ulong hits = 0;
1583 0 : ulong misses = 0;
1584 0 : uint npackets = 0;
1585 0 : for( fd_value_table_iter_t iter = fd_value_table_iter_init( glob->values );
1586 0 : !fd_value_table_iter_done( glob->values, iter );
1587 0 : iter = fd_value_table_iter_next( glob->values, iter ) ) {
1588 0 : fd_value_elem_t * ele = fd_value_table_iter_ele( glob->values, iter );
1589 0 : fd_hash_t * hash = &(ele->key);
1590 0 : if (ele->wallclock < expire)
1591 0 : continue;
1592 : /* Execute the bloom filter */
1593 0 : if (filter->mask_bits != 0U) {
1594 0 : ulong m = (~0UL >> filter->mask_bits);
1595 0 : if ((hash->ul[0] | m) != filter->mask)
1596 0 : continue;
1597 0 : }
1598 0 : int miss = 0;
1599 0 : for (ulong i = 0; i < nkeys; ++i) {
1600 0 : ulong pos = fd_gossip_bloom_pos(hash, keys[i], bitvec->len);
1601 0 : ulong * j = bitvec2 + (pos>>6U); /* divide by 64 */
1602 0 : ulong bit = 1UL<<(pos & 63U);
1603 0 : if (!((*j) & bit)) {
1604 0 : miss = 1;
1605 0 : break;
1606 0 : }
1607 0 : }
1608 0 : if (!miss) {
1609 0 : hits++;
1610 0 : continue;
1611 0 : }
1612 0 : misses++;
1613 : /* Add the value in already encoded form */
1614 0 : if (newend + ele->datalen - buf > PACKET_DATA_SIZE) {
1615 : /* Packet is getting too large. Flush it */
1616 0 : ulong sz = (ulong)(newend - buf);
1617 0 : fd_gossip_send_raw(glob, from, buf, sz);
1618 0 : char tmp[100];
1619 0 : FD_LOG_DEBUG(("sent msg type %u to %s size=%lu", gmsg.discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), from), sz));
1620 0 : ++npackets;
1621 0 : newend = (uchar *)ctx.data;
1622 0 : *crds_len = 0;
1623 0 : }
1624 0 : fd_memcpy(newend, ele->data, ele->datalen);
1625 0 : newend += ele->datalen;
1626 0 : (*crds_len)++;
1627 0 : }
1628 :
1629 : /* Flush final packet */
1630 0 : if (newend > (uchar *)ctx.data) {
1631 0 : ulong sz = (ulong)(newend - buf);
1632 0 : fd_gossip_send_raw(glob, from, buf, sz);
1633 0 : char tmp[100];
1634 0 : FD_LOG_DEBUG(("sent msg type %u to %s size=%lu", gmsg.discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), from), sz));
1635 0 : ++npackets;
1636 0 : }
1637 :
1638 0 : if (misses)
1639 0 : FD_LOG_DEBUG(("responded to pull request with %lu values in %u packets (%lu filtered out)", misses, npackets, hits));
1640 0 : }
1641 :
1642 : /* Handle any gossip message */
1643 : static void
1644 0 : fd_gossip_recv(fd_gossip_t * glob, const fd_gossip_peer_addr_t * from, fd_gossip_msg_t * gmsg) {
1645 0 : switch (gmsg->discriminant) {
1646 0 : case fd_gossip_msg_enum_pull_req:
1647 0 : fd_gossip_handle_pull_req(glob, from, &gmsg->inner.pull_req);
1648 0 : break;
1649 0 : case fd_gossip_msg_enum_pull_resp: {
1650 0 : fd_gossip_pull_resp_t * pull_resp = &gmsg->inner.pull_resp;
1651 0 : for (ulong i = 0; i < pull_resp->crds_len; ++i)
1652 0 : fd_gossip_recv_crds_value(glob, NULL, &pull_resp->pubkey, pull_resp->crds + i);
1653 0 : break;
1654 0 : }
1655 0 : case fd_gossip_msg_enum_push_msg: {
1656 0 : fd_gossip_push_msg_t * push_msg = &gmsg->inner.push_msg;
1657 0 : for (ulong i = 0; i < push_msg->crds_len; ++i)
1658 0 : fd_gossip_recv_crds_value(glob, from, &push_msg->pubkey, push_msg->crds + i);
1659 0 : break;
1660 0 : }
1661 0 : case fd_gossip_msg_enum_prune_msg:
1662 0 : fd_gossip_handle_prune(glob, from, &gmsg->inner.prune_msg);
1663 0 : break;
1664 0 : case fd_gossip_msg_enum_ping:
1665 0 : fd_gossip_handle_ping(glob, from, &gmsg->inner.ping);
1666 0 : break;
1667 0 : case fd_gossip_msg_enum_pong:
1668 0 : fd_gossip_handle_pong(glob, from, &gmsg->inner.pong);
1669 0 : break;
1670 0 : }
1671 0 : }
1672 :
1673 : /* Initiate connection to a peer */
1674 : int
1675 0 : fd_gossip_add_active_peer( fd_gossip_t * glob, fd_gossip_peer_addr_t * addr ) {
1676 0 : fd_gossip_lock( glob );
1677 0 : fd_active_elem_t * val = fd_active_table_query(glob->actives, addr, NULL);
1678 0 : if (val == NULL) {
1679 0 : if (fd_active_table_is_full(glob->actives)) {
1680 0 : FD_LOG_WARNING(("too many actives"));
1681 0 : fd_gossip_unlock( glob );
1682 0 : return -1;
1683 0 : }
1684 0 : val = fd_active_table_insert(glob->actives, addr);
1685 0 : fd_active_new_value(val);
1686 0 : val->pingcount = 0; /* Incremented in fd_gossip_make_ping */
1687 0 : }
1688 0 : fd_gossip_unlock( glob );
1689 0 : return 0;
1690 0 : }
1691 :
1692 : /* Improve the set of active push states */
1693 : static void
1694 0 : fd_gossip_refresh_push_states( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
1695 0 : (void)arg;
1696 :
1697 : /* Try again in 20 sec */
1698 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)20e9);
1699 0 : if (ev) {
1700 0 : ev->fun = fd_gossip_refresh_push_states;
1701 0 : }
1702 :
1703 : /* Delete states which no longer have active peers */
1704 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
1705 0 : fd_push_state_t* s = glob->push_states[i];
1706 0 : if (fd_active_table_query(glob->actives, &s->addr, NULL) == NULL) {
1707 0 : fd_push_states_pool_ele_release(glob->push_states_pool, glob->push_states[i]);
1708 : /* Replace with the one at the end */
1709 0 : glob->push_states[i--] = glob->push_states[--(glob->push_states_cnt)];
1710 0 : }
1711 0 : }
1712 0 : if (glob->push_states_cnt == FD_PUSH_LIST_MAX) {
1713 : /* Delete the worst destination based prune count */
1714 0 : fd_push_state_t * worst_s = glob->push_states[0];
1715 0 : ulong worst_i = 0;
1716 0 : for (ulong i = 1; i < glob->push_states_cnt; ++i) {
1717 0 : fd_push_state_t* s = glob->push_states[i];
1718 0 : if (s->drop_cnt > worst_s->drop_cnt) {
1719 0 : worst_s = s;
1720 0 : worst_i = i;
1721 0 : }
1722 0 : }
1723 0 : fd_push_states_pool_ele_release(glob->push_states_pool, worst_s);
1724 : /* Replace with the one at the end */
1725 0 : glob->push_states[worst_i] = glob->push_states[--(glob->push_states_cnt)];
1726 0 : }
1727 :
1728 : /* Add random actives as new pushers */
1729 0 : int failcnt = 0;
1730 0 : while (glob->push_states_cnt < FD_PUSH_LIST_MAX && failcnt < 5) {
1731 0 : fd_active_elem_t * a = fd_gossip_random_active( glob );
1732 0 : if( a == NULL ) break;
1733 :
1734 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
1735 0 : fd_push_state_t* s = glob->push_states[i];
1736 0 : if (fd_gossip_peer_addr_eq(&s->addr, &a->key))
1737 0 : goto skipadd;
1738 0 : }
1739 0 : failcnt = 0;
1740 :
1741 : /* Build the pusher state */
1742 0 : fd_push_state_t * s = fd_push_states_pool_ele_acquire(glob->push_states_pool);
1743 0 : fd_memset(s, 0, sizeof(fd_push_state_t));
1744 0 : fd_gossip_peer_addr_copy(&s->addr, &a->key);
1745 0 : fd_hash_copy(&s->id, &a->id);
1746 0 : for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j)
1747 0 : s->prune_keys[j] = fd_rng_ulong(glob->rng);
1748 :
1749 : /* Encode an empty push msg template */
1750 0 : fd_gossip_msg_t gmsg;
1751 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_push_msg);
1752 0 : fd_gossip_push_msg_t * push_msg = &gmsg.inner.push_msg;
1753 0 : fd_hash_copy( &push_msg->pubkey, glob->public_key );
1754 0 : fd_bincode_encode_ctx_t ctx;
1755 0 : ctx.data = s->packet;
1756 0 : ctx.dataend = s->packet + PACKET_DATA_SIZE;
1757 0 : if ( fd_gossip_msg_encode( &gmsg, &ctx ) ) {
1758 0 : FD_LOG_WARNING(("fd_gossip_msg_encode failed"));
1759 0 : return;
1760 0 : }
1761 0 : s->packet_end_init = s->packet_end = (uchar *)ctx.data;
1762 :
1763 0 : glob->push_states[glob->push_states_cnt++] = s;
1764 0 : break;
1765 :
1766 0 : skipadd:
1767 0 : ++failcnt;
1768 0 : }
1769 0 : }
1770 :
1771 : /* Push the latest values */
1772 : static void
1773 0 : fd_gossip_push( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
1774 0 : (void)arg;
1775 :
1776 : /* Try again in 100 msec */
1777 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)1e8);
1778 0 : if (ev) {
1779 0 : ev->fun = fd_gossip_push;
1780 0 : }
1781 :
1782 : /* Push an updated version of my contact info into values */
1783 0 : fd_gossip_push_updated_contact(glob);
1784 :
1785 : /* Iterate across recent values */
1786 0 : ulong expire = FD_NANOSEC_TO_MILLI(glob->now) - FD_GOSSIP_PULL_TIMEOUT;
1787 0 : while (glob->need_push_cnt > 0) {
1788 0 : fd_hash_t * h = glob->need_push + ((glob->need_push_head++) & (FD_NEED_PUSH_MAX-1));
1789 0 : glob->need_push_cnt--;
1790 :
1791 0 : fd_value_elem_t * msg = fd_value_table_query(glob->values, h, NULL);
1792 0 : if (msg == NULL || msg->wallclock < expire)
1793 0 : continue;
1794 :
1795 : /* Iterate across push states */
1796 0 : ulong npush = 0;
1797 0 : for (ulong i = 0; i < glob->push_states_cnt && npush < FD_PUSH_VALUE_MAX; ++i) {
1798 0 : fd_push_state_t* s = glob->push_states[i];
1799 :
1800 : /* Apply the pruning bloom filter */
1801 0 : int pass = 0;
1802 0 : for (ulong j = 0; j < FD_PRUNE_NUM_KEYS; ++j) {
1803 0 : ulong pos = fd_gossip_bloom_pos(&msg->origin, s->prune_keys[j], FD_PRUNE_NUM_BITS);
1804 0 : ulong * j = s->prune_bits + (pos>>6U); /* divide by 64 */
1805 0 : ulong bit = 1UL<<(pos & 63U);
1806 0 : if (!(*j & bit)) {
1807 0 : pass = 1;
1808 0 : break;
1809 0 : }
1810 0 : }
1811 0 : if (!pass) {
1812 0 : s->drop_cnt++;
1813 0 : glob->not_push_cnt++;
1814 0 : continue;
1815 0 : }
1816 0 : glob->push_cnt++;
1817 0 : npush++;
1818 :
1819 0 : ulong * crds_len = (ulong *)(s->packet_end_init - sizeof(ulong));
1820 : /* Add the value in already encoded form */
1821 0 : if (s->packet_end + msg->datalen - s->packet > PACKET_DATA_SIZE) {
1822 : /* Packet is getting too large. Flush it */
1823 0 : ulong sz = (ulong)(s->packet_end - s->packet);
1824 0 : fd_gossip_send_raw(glob, &s->addr, s->packet, sz);
1825 0 : char tmp[100];
1826 0 : FD_LOG_DEBUG(("push to %s size=%lu", fd_gossip_addr_str(tmp, sizeof(tmp), &s->addr), sz));
1827 0 : s->packet_end = s->packet_end_init;
1828 0 : *crds_len = 0;
1829 0 : }
1830 0 : fd_memcpy(s->packet_end, msg->data, msg->datalen);
1831 0 : s->packet_end += msg->datalen;
1832 0 : (*crds_len)++;
1833 0 : }
1834 0 : }
1835 :
1836 : /* Flush partially full packets */
1837 0 : for (ulong i = 0; i < glob->push_states_cnt; ++i) {
1838 0 : fd_push_state_t* s = glob->push_states[i];
1839 0 : if (s->packet_end != s->packet_end_init) {
1840 0 : ulong * crds_len = (ulong *)(s->packet_end_init - sizeof(ulong));
1841 0 : ulong sz = (ulong)(s->packet_end - s->packet);
1842 0 : fd_gossip_send_raw(glob, &s->addr, s->packet, sz);
1843 0 : char tmp[100];
1844 0 : FD_LOG_DEBUG(("push to %s size=%lu", fd_gossip_addr_str(tmp, sizeof(tmp), &s->addr), sz));
1845 0 : s->packet_end = s->packet_end_init;
1846 0 : *crds_len = 0;
1847 0 : }
1848 0 : }
1849 0 : }
1850 :
1851 : /* Publish an outgoing value. The source id and wallclock are set by this function */
1852 : static int
1853 0 : fd_gossip_push_value_nolock( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt ) {
1854 : /* Wrap the data in a value stub. Sign it. */
1855 0 : fd_crds_value_t crd;
1856 0 : fd_memcpy(&crd.data, data, sizeof(fd_crds_data_t));
1857 0 : fd_gossip_sign_crds_value(glob, &crd);
1858 :
1859 : /* Perform the value hash to get the value table key */
1860 0 : uchar buf[PACKET_DATA_SIZE];
1861 0 : fd_bincode_encode_ctx_t ctx;
1862 0 : ctx.data = buf;
1863 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1864 0 : if ( fd_crds_value_encode( &crd, &ctx ) ) {
1865 0 : FD_LOG_ERR(("fd_crds_value_encode failed"));
1866 0 : return -1;
1867 0 : }
1868 0 : fd_sha256_t sha2[1];
1869 0 : fd_sha256_init( sha2 );
1870 0 : ulong datalen = (ulong)((uchar*)ctx.data - buf);
1871 0 : fd_sha256_append( sha2, buf, datalen );
1872 0 : fd_hash_t key;
1873 0 : fd_sha256_fini( sha2, key.uc );
1874 0 : if ( key_opt != NULL )
1875 0 : fd_hash_copy( key_opt, &key );
1876 :
1877 : /* Store the value for later pushing/duplicate detection */
1878 0 : fd_value_elem_t * msg = fd_value_table_query(glob->values, &key, NULL);
1879 0 : if (msg != NULL) {
1880 : /* Already have this value, which is strange! */
1881 0 : return -1;
1882 0 : }
1883 0 : if (fd_value_table_is_full(glob->values)) {
1884 0 : FD_LOG_DEBUG(("too many values"));
1885 0 : return -1;
1886 0 : }
1887 0 : msg = fd_value_table_insert(glob->values, &key);
1888 0 : msg->wallclock = FD_NANOSEC_TO_MILLI(glob->now); /* convert to ms */
1889 0 : fd_hash_copy(&msg->origin, glob->public_key);
1890 :
1891 : /* We store the serialized form for convenience */
1892 0 : fd_memcpy(msg->data, buf, datalen);
1893 0 : msg->datalen = datalen;
1894 :
1895 0 : if (glob->need_push_cnt < FD_NEED_PUSH_MAX) {
1896 : /* Remember that I need to push this value */
1897 0 : ulong i = ((glob->need_push_head + (glob->need_push_cnt++)) & (FD_NEED_PUSH_MAX-1U));
1898 0 : fd_hash_copy(glob->need_push + i, &key);
1899 0 : }
1900 0 : return 0;
1901 0 : }
1902 :
1903 : int
1904 0 : fd_gossip_push_value( fd_gossip_t * glob, fd_crds_data_t * data, fd_hash_t * key_opt ) {
1905 0 : fd_gossip_lock( glob );
1906 0 : int rc = fd_gossip_push_value_nolock( glob, data, key_opt );
1907 0 : fd_gossip_unlock( glob );
1908 0 : return rc;
1909 0 : }
1910 :
1911 : /* Periodically make prune messages */
1912 : static void
1913 0 : fd_gossip_make_prune( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
1914 0 : (void)arg;
1915 :
1916 : /* Try again in 30 sec */
1917 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)30e9);
1918 0 : if (ev) {
1919 0 : ev->fun = fd_gossip_make_prune;
1920 0 : }
1921 :
1922 0 : long expire = glob->now - (long)FD_GOSSIP_VALUE_EXPIRE*((long)1e6);
1923 0 : for( fd_stats_table_iter_t iter = fd_stats_table_iter_init( glob->stats );
1924 0 : !fd_stats_table_iter_done( glob->stats, iter );
1925 0 : iter = fd_stats_table_iter_next( glob->stats, iter ) ) {
1926 0 : fd_stats_elem_t * ele = fd_stats_table_iter_ele( glob->stats, iter );
1927 0 : if (ele->last < expire) {
1928 : /* Entry hasn't been updated for a long time */
1929 0 : fd_stats_table_remove( glob->stats, &ele->key );
1930 0 : continue;
1931 0 : }
1932 : /* Look for high duplicate counts */
1933 0 : fd_pubkey_t origins[8];
1934 0 : ulong origins_cnt = 0;
1935 0 : for (ulong i = 0; i < ele->dups_cnt; ++i) {
1936 0 : if (ele->dups[i].cnt >= 20U)
1937 0 : fd_hash_copy(&origins[origins_cnt++], &ele->dups[i].origin);
1938 0 : }
1939 0 : if (origins_cnt == 0U)
1940 0 : continue;
1941 : /* Get the peer id */
1942 0 : fd_peer_elem_t * peerval = fd_peer_table_query(glob->peers, &ele->key, NULL);
1943 : /* Always clean up to restart the dup counter */
1944 0 : fd_stats_table_remove( glob->stats, &ele->key );
1945 0 : if (peerval == NULL)
1946 0 : continue;
1947 :
1948 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
1949 0 : fd_base58_encode_32( peerval->id.uc, NULL, keystr );
1950 0 : FD_LOG_DEBUG(("sending prune request for %lu origins to %s", origins_cnt, keystr));
1951 :
1952 : /* Make a prune request */
1953 0 : fd_gossip_msg_t gmsg;
1954 0 : fd_gossip_msg_new_disc(&gmsg, fd_gossip_msg_enum_prune_msg);
1955 0 : fd_gossip_prune_msg_t * prune_msg = &gmsg.inner.prune_msg;
1956 0 : fd_hash_copy(&prune_msg->data.pubkey, glob->public_key);
1957 0 : prune_msg->data.prunes_len = origins_cnt;
1958 0 : prune_msg->data.prunes = origins;;
1959 0 : fd_hash_copy(&prune_msg->data.destination, &peerval->id);
1960 0 : ulong wc = prune_msg->data.wallclock = FD_NANOSEC_TO_MILLI(glob->now);
1961 :
1962 0 : fd_gossip_prune_sign_data_t signdata;
1963 0 : fd_hash_copy(&signdata.pubkey, glob->public_key);
1964 0 : signdata.prunes_len = origins_cnt;
1965 0 : signdata.prunes = origins;;
1966 0 : fd_hash_copy(&signdata.destination, &peerval->id);
1967 0 : signdata.wallclock = wc;
1968 :
1969 0 : uchar buf[PACKET_DATA_SIZE];
1970 0 : fd_bincode_encode_ctx_t ctx;
1971 0 : ctx.data = buf;
1972 0 : ctx.dataend = buf + PACKET_DATA_SIZE;
1973 0 : if ( fd_gossip_prune_sign_data_encode( &signdata, &ctx ) ) {
1974 0 : FD_LOG_ERR(("fd_gossip_prune_sign_data_encode failed"));
1975 0 : return;
1976 0 : }
1977 :
1978 0 : (*glob->sign_fun)( glob->sign_arg, prune_msg->data.signature.uc, buf, (ulong)((uchar*)ctx.data - buf), FD_KEYGUARD_SIGN_TYPE_ED25519 );
1979 :
1980 0 : fd_gossip_send(glob, &peerval->key, &gmsg);
1981 0 : }
1982 0 : }
1983 :
1984 : /* Periodically log status. Removes old peers as a side event. */
1985 : static void
1986 0 : fd_gossip_log_stats( fd_gossip_t * glob, fd_pending_event_arg_t * arg ) {
1987 0 : (void)arg;
1988 :
1989 : /* Try again in 60 sec */
1990 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)60e9);
1991 0 : if (ev) {
1992 0 : ev->fun = fd_gossip_log_stats;
1993 0 : }
1994 :
1995 0 : if( glob->recv_pkt_cnt == 0 )
1996 0 : FD_LOG_WARNING(("received no gossip packets!!"));
1997 0 : else
1998 0 : FD_LOG_INFO(("received %lu packets", glob->recv_pkt_cnt));
1999 0 : glob->recv_pkt_cnt = 0;
2000 0 : FD_LOG_INFO(("received %lu dup values and %lu new", glob->recv_dup_cnt, glob->recv_nondup_cnt));
2001 0 : glob->recv_dup_cnt = glob->recv_nondup_cnt = 0;
2002 0 : FD_LOG_INFO(("pushed %lu values and filtered %lu", glob->push_cnt, glob->not_push_cnt));
2003 0 : glob->push_cnt = glob->not_push_cnt = 0;
2004 :
2005 0 : for( ulong i = 0UL; i<FD_KNOWN_CRDS_ENUM_MAX; i++ ) {
2006 0 : FD_LOG_INFO(( "received values - type: %2lu, total: %12lu, dups: %12lu, bytes: %12lu", i, glob->msg_stats[i].total_cnt, glob->msg_stats[i].dups_cnt, glob->msg_stats[i].bytes_rx_cnt ));
2007 0 : }
2008 :
2009 0 : int need_inactive = (glob->inactives_cnt == 0);
2010 :
2011 0 : ulong wc = FD_NANOSEC_TO_MILLI(glob->now);
2012 0 : ulong expire = wc - 4U*FD_GOSSIP_VALUE_EXPIRE;
2013 0 : for( fd_peer_table_iter_t iter = fd_peer_table_iter_init( glob->peers );
2014 0 : !fd_peer_table_iter_done( glob->peers, iter );
2015 0 : iter = fd_peer_table_iter_next( glob->peers, iter ) ) {
2016 0 : fd_peer_elem_t * ele = fd_peer_table_iter_ele( glob->peers, iter );
2017 0 : if (ele->wallclock < expire) {
2018 : /* Peer hasn't been updated for a long time */
2019 0 : fd_peer_table_remove( glob->peers, &ele->key );
2020 0 : continue;
2021 0 : }
2022 0 : fd_active_elem_t * act = fd_active_table_query(glob->actives, &ele->key, NULL);
2023 0 : char buf[100];
2024 0 : char keystr[ FD_BASE58_ENCODED_32_SZ ];
2025 0 : fd_base58_encode_32( ele->id.uc, NULL, keystr );
2026 0 : FD_LOG_DEBUG(("peer at %s id %s age %.3f %s",
2027 0 : fd_gossip_addr_str(buf, sizeof(buf), &ele->key),
2028 0 : keystr,
2029 0 : ((double)(wc - ele->wallclock))*0.001,
2030 0 : ((act != NULL && act->pongtime != 0) ? "(active)" : "")));
2031 0 : if (need_inactive && act == NULL && glob->inactives_cnt < INACTIVES_MAX)
2032 0 : fd_gossip_peer_addr_copy(glob->inactives + (glob->inactives_cnt++), &ele->key);
2033 0 : }
2034 0 : }
2035 :
2036 : /* Set the current protocol time in nanosecs */
2037 : void
2038 0 : fd_gossip_settime( fd_gossip_t * glob, long ts ) {
2039 0 : glob->now = ts;
2040 0 : }
2041 :
2042 : /* Get the current protocol time in nanosecs */
2043 : long
2044 0 : fd_gossip_gettime( fd_gossip_t * glob ) {
2045 0 : return glob->now;
2046 0 : }
2047 :
2048 : /* Start timed events and other protocol behavior */
2049 : int
2050 0 : fd_gossip_start( fd_gossip_t * glob ) {
2051 0 : fd_gossip_lock( glob );
2052 : /* Start pulling and pinging on a timer */
2053 0 : fd_pending_event_t * ev = fd_gossip_add_pending(glob, glob->now + (long)1e9);
2054 0 : ev->fun = fd_gossip_random_pull;
2055 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)5e9);
2056 0 : ev->fun = fd_gossip_random_ping;
2057 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)60e9);
2058 0 : ev->fun = fd_gossip_log_stats;
2059 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)20e9);
2060 0 : ev->fun = fd_gossip_refresh_push_states;
2061 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)1e8);
2062 0 : ev->fun = fd_gossip_push;
2063 0 : ev = fd_gossip_add_pending(glob, glob->now + (long)30e9);
2064 0 : ev->fun = fd_gossip_make_prune;
2065 0 : fd_gossip_unlock( glob );
2066 :
2067 0 : return 0;
2068 0 : }
2069 :
2070 : /* Dispatch timed events and other protocol behavior. This should be
2071 : * called inside the main spin loop. */
2072 : int
2073 0 : fd_gossip_continue( fd_gossip_t * glob ) {
2074 0 : fd_gossip_lock( glob );
2075 0 : do {
2076 0 : fd_pending_event_t * ev = fd_pending_heap_ele_peek_min( glob->event_heap, glob->event_pool );
2077 0 : if (ev == NULL || ev->key > glob->now)
2078 0 : break;
2079 0 : fd_pending_event_t evcopy;
2080 0 : fd_memcpy(&evcopy, ev, sizeof(evcopy));
2081 0 : fd_pending_heap_ele_remove_min( glob->event_heap, glob->event_pool );
2082 0 : fd_pending_pool_ele_release( glob->event_pool, ev );
2083 0 : (*evcopy.fun)(glob, &evcopy.fun_arg);
2084 0 : } while (1);
2085 0 : fd_gossip_unlock( glob );
2086 0 : return 0;
2087 0 : }
2088 :
2089 : /* Pass a raw gossip packet into the protocol. msg_name is the unix socket address of the sender */
2090 : int
2091 0 : fd_gossip_recv_packet( fd_gossip_t * glob, uchar const * msg, ulong msglen, fd_gossip_peer_addr_t const * from ) {
2092 0 : fd_gossip_lock( glob );
2093 0 : FD_SCRATCH_SCOPE_BEGIN {
2094 0 : glob->recv_pkt_cnt++;
2095 : /* Deserialize the message */
2096 0 : fd_gossip_msg_t gmsg;
2097 0 : fd_bincode_decode_ctx_t ctx;
2098 0 : ctx.data = msg;
2099 0 : ctx.dataend = msg + msglen;
2100 0 : ctx.valloc = fd_scratch_virtual();
2101 0 : if (fd_gossip_msg_decode(&gmsg, &ctx)) {
2102 0 : FD_LOG_WARNING(("corrupt gossip message"));
2103 0 : fd_gossip_unlock( glob );
2104 0 : return -1;
2105 0 : }
2106 0 : if (ctx.data != ctx.dataend) {
2107 0 : FD_LOG_WARNING(("corrupt gossip message"));
2108 0 : fd_gossip_unlock( glob );
2109 0 : return -1;
2110 0 : }
2111 :
2112 0 : char tmp[100];
2113 :
2114 0 : FD_LOG_DEBUG(("recv msg type %u from %s", gmsg.discriminant, fd_gossip_addr_str(tmp, sizeof(tmp), from)));
2115 0 : fd_gossip_recv(glob, from, &gmsg);
2116 :
2117 0 : fd_gossip_unlock( glob );
2118 0 : } FD_SCRATCH_SCOPE_END;
2119 0 : return 0;
2120 0 : }
2121 :
2122 : ushort
2123 0 : fd_gossip_get_shred_version( fd_gossip_t const * glob ) {
2124 0 : return glob->my_contact_info.shred_version;
2125 0 : }
2126 :
2127 : void
2128 : fd_gossip_set_stake_weights( fd_gossip_t * gossip,
2129 : fd_stake_weight_t const * stake_weights,
2130 0 : ulong stake_weights_cnt ) {
2131 0 : if( stake_weights == NULL ) {
2132 0 : FD_LOG_ERR(( "stake weights NULL" ));
2133 0 : }
2134 :
2135 0 : if( stake_weights_cnt > MAX_STAKE_WEIGHTS ) {
2136 0 : FD_LOG_ERR(( "num stake weights (%lu) is larger than max allowed stake weights", stake_weights_cnt ));
2137 0 : }
2138 :
2139 0 : fd_gossip_lock( gossip );
2140 :
2141 : /* Clear out the table for new stake weights. */
2142 0 : for ( fd_weights_table_iter_t iter = fd_weights_table_iter_init( gossip->weights );
2143 0 : !fd_weights_table_iter_done( gossip->weights, iter);
2144 0 : iter = fd_weights_table_iter_next( gossip->weights, iter ) ) {
2145 0 : fd_weights_elem_t * e = fd_weights_table_iter_ele( gossip->weights, iter );
2146 0 : fd_weights_table_remove( gossip->weights, &e->key );
2147 0 : }
2148 :
2149 0 : for( ulong i = 0; i < stake_weights_cnt; ++i ) {
2150 0 : if( !stake_weights[i].stake ) continue;
2151 0 : fd_weights_elem_t * val = fd_weights_table_insert( gossip->weights, &stake_weights[i].key );
2152 : // Weight is log2(stake)^2
2153 0 : ulong w = (ulong)fd_ulong_find_msb( stake_weights[i].stake ) + 1;
2154 0 : val->weight = w*w;
2155 0 : }
2156 :
2157 0 : for( fd_active_table_iter_t iter = fd_active_table_iter_init( gossip->actives );
2158 0 : !fd_active_table_iter_done( gossip->actives, iter );
2159 0 : iter = fd_active_table_iter_next( gossip->actives, iter ) ) {
2160 0 : fd_active_elem_t * ele = fd_active_table_iter_ele( gossip->actives, iter );
2161 0 : fd_weights_elem_t const * val = fd_weights_table_query_const( gossip->weights, &ele->id, NULL );
2162 0 : ele->weight = ( val == NULL ? 1UL : val->weight );
2163 0 : }
2164 :
2165 0 : fd_gossip_unlock( gossip );
2166 0 : }
2167 :
2168 : void
2169 : fd_gossip_set_entrypoints( fd_gossip_t * gossip,
2170 : uint entrypoints[static 16],
2171 : ulong entrypoints_cnt,
2172 0 : ushort * ports ) {
2173 0 : gossip->entrypoints_cnt = entrypoints_cnt;
2174 0 : for( ulong i = 0UL; i<entrypoints_cnt; i++) {
2175 0 : fd_gossip_peer_addr_t addr;
2176 0 : addr.addr = entrypoints[i];
2177 0 : addr.port = fd_ushort_bswap( ports[i] );
2178 0 : FD_LOG_NOTICE(( "gossip initial peer - addr: " FD_IP4_ADDR_FMT ":%u",
2179 0 : FD_IP4_ADDR_FMT_ARGS( addr.addr ), fd_ushort_bswap( addr.port ) ));
2180 0 : fd_gossip_add_active_peer( gossip, &addr );
2181 0 : gossip->entrypoints[i] = addr;
2182 0 : }
2183 0 : }
2184 :
2185 : uint
2186 0 : fd_gossip_is_allowed_entrypoint( fd_gossip_t * gossip, fd_gossip_peer_addr_t * addr ) {
2187 0 : for( ulong i = 0UL; i<gossip->entrypoints_cnt; i++) {
2188 0 : if (fd_gossip_peer_addr_eq( addr, &gossip->entrypoints[i]) ) return 1;
2189 0 : }
2190 0 : return 0;
2191 0 : }
|