Line data Source code
1 : #include "fd_gossip.h"
2 : #include "fd_bloom.h"
3 : #include "fd_gossip_private.h"
4 : #include "fd_gossip_txbuild.h"
5 : #include "fd_active_set.h"
6 : #include "fd_ping_tracker.h"
7 : #include "crds/fd_crds.h"
8 : #include "../../disco/keyguard/fd_keyguard.h"
9 : #include "../../ballet/sha256/fd_sha256.h"
10 :
11 : FD_STATIC_ASSERT( FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT==FD_GOSSIP_MESSAGE_LAST+1UL,
12 : "FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT must match FD_GOSSIP_MESSAGE_LAST+1" );
13 :
14 : FD_STATIC_ASSERT( FD_METRICS_ENUM_CRDS_VALUE_CNT==FD_GOSSIP_VALUE_LAST+1UL,
15 : "FD_METRICS_ENUM_CRDS_VALUE_CNT must match FD_GOSSIP_VALUE_LAST+1" );
16 :
17 : #include <math.h>
18 :
/* Tunables for the bloom filter embedded in outgoing pull requests
   (see tx_pull_request).  Both are floating point literals. */
#define BLOOM_FALSE_POSITIVE_RATE (0.1) /* rate handed to the fd_bloom sizing helpers */
#define BLOOM_NUM_KEYS            (8.0) /* key count handed to the bloom / pull request helpers */
21 :
22 : struct stake {
23 : fd_pubkey_t pubkey;
24 : ulong stake;
25 :
26 : struct {
27 : ulong prev;
28 : ulong next;
29 : } map;
30 :
31 : struct {
32 : ulong next;
33 : } pool;
34 : };
35 :
36 : typedef struct stake stake_t;
37 :
38 : /* NOTE: Since the staked count is known at the time we populate
39 : the map, we can treat the pool as an array instead. This means we
40 : can bypass the acquire/release model and quickly iterate through the
41 : pool when we repopulate the map on every fd_gossip_stakes_update
42 : iteration. */
43 : #define POOL_NAME stake_pool
44 6 : #define POOL_T stake_t
45 : #define POOL_IDX_T ulong
46 98304 : #define POOL_NEXT pool.next
47 : #include "../../util/tmpl/fd_pool.c"
48 :
49 : #define MAP_NAME stake_map
50 0 : #define MAP_KEY pubkey
51 0 : #define MAP_ELE_T stake_t
52 : #define MAP_KEY_T fd_pubkey_t
53 0 : #define MAP_PREV map.prev
54 0 : #define MAP_NEXT map.next
55 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
56 0 : #define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
57 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
58 : #include "../../util/tmpl/fd_map_chain.c"
59 :
60 : #include "fd_push_set_private.c"
61 :
62 : struct fd_gossip_private {
63 : uchar identity_pubkey[ 32UL ];
64 : ulong identity_stake;
65 :
66 : fd_gossip_metrics_t metrics[1];
67 :
68 : fd_crds_t * crds;
69 : fd_active_set_t * active_set;
70 : fd_ping_tracker_t * ping_tracker;
71 :
72 : fd_sha256_t sha256[1];
73 : fd_sha512_t sha512[1];
74 :
75 : ulong entrypoints_cnt;
76 : fd_ip4_port_t entrypoints[ 16UL ];
77 :
78 : fd_rng_t * rng;
79 :
80 : struct {
81 : ulong count;
82 : stake_t * pool;
83 : stake_map_t * map;
84 : } stake;
85 :
86 : struct {
87 : long next_pull_request;
88 : long next_active_set_refresh;
89 : long next_contact_info_refresh;
90 : long next_flush_push_state;
91 : } timers;
92 :
93 : /* Callbacks */
94 : fd_gossip_sign_fn sign_fn;
95 : void * sign_ctx;
96 :
97 : fd_gossip_send_fn send_fn;
98 : void * send_ctx;
99 :
100 : fd_ping_tracker_change_fn ping_tracker_change_fn;
101 : void * ping_tracker_change_fn_ctx;
102 :
103 : struct {
104 : uchar crds_val[ FD_GOSSIP_CRDS_MAX_SZ ];
105 : ulong crds_val_sz;
106 : fd_contact_info_t ci[1];
107 : } my_contact_info;
108 :
109 : /* Push state for each peer in the active set. Tracks the active set,
110 : and must be flushed prior to a call to fd_active_set_rotate or
111 : fd_active_set_prune. */
112 : push_set_t * active_pset;
113 : fd_gossip_out_ctx_t * gossip_net_out;
114 : };
115 :
116 : FD_FN_CONST ulong
117 27 : fd_gossip_align( void ) {
118 27 : return 128uL;
119 27 : }
120 :
121 : FD_FN_CONST ulong
122 : fd_gossip_footprint( ulong max_values,
123 6 : ulong entrypoints_len ) {
124 6 : ulong l;
125 6 : l = FD_LAYOUT_INIT;
126 6 : l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
127 6 : l = FD_LAYOUT_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values, max_values ) );
128 6 : l = FD_LAYOUT_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
129 6 : l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
130 6 : l = FD_LAYOUT_APPEND( l, stake_pool_align(), stake_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
131 6 : l = FD_LAYOUT_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( CRDS_MAX_CONTACT_INFO ) ) );
132 6 : l = FD_LAYOUT_APPEND( l, push_set_align(), push_set_footprint( FD_ACTIVE_SET_MAX_PEERS ) );
133 6 : l = FD_LAYOUT_FINI( l, fd_gossip_align() );
134 6 : return l;
135 6 : }
136 :
137 : static void
138 : ping_tracker_change( void * _ctx,
139 : uchar const * peer_pubkey,
140 : fd_ip4_port_t peer_address,
141 : long now,
142 0 : int change_type ) {
143 0 : fd_gossip_t * ctx = (fd_gossip_t *)_ctx;
144 :
145 0 : if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return;
146 :
147 0 : switch( change_type ) {
148 0 : case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE: fd_crds_peer_active( ctx->crds, peer_pubkey, now ); break;
149 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE: fd_crds_peer_inactive( ctx->crds, peer_pubkey, now ); break;
150 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED: break;
151 0 : default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return;
152 0 : }
153 :
154 0 : ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type );
155 0 : }
156 :
157 : void *
158 : fd_gossip_new( void * shmem,
159 : fd_rng_t * rng,
160 : ulong max_values,
161 : ulong entrypoints_len,
162 : fd_ip4_port_t const * entrypoints,
163 : fd_contact_info_t const * my_contact_info,
164 : long now,
165 : fd_gossip_send_fn send_fn,
166 : void * send_ctx,
167 : fd_gossip_sign_fn sign_fn,
168 : void * sign_ctx,
169 : fd_ping_tracker_change_fn ping_tracker_change_fn,
170 : void * ping_tracker_change_fn_ctx,
171 : fd_gossip_out_ctx_t * gossip_update_out,
172 3 : fd_gossip_out_ctx_t * gossip_net_out ) {
173 3 : if( FD_UNLIKELY( !shmem ) ) {
174 0 : FD_LOG_WARNING(( "NULL shmem" ));
175 0 : return NULL;
176 0 : }
177 :
178 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) {
179 0 : FD_LOG_WARNING(( "misaligned shmem" ));
180 0 : return NULL;
181 0 : }
182 :
183 3 : if( FD_UNLIKELY( entrypoints_len>16UL ) ) {
184 0 : FD_LOG_WARNING(( "entrypoints_cnt must be in [0, 16]" ));
185 0 : return NULL;
186 0 : }
187 :
188 3 : if( FD_UNLIKELY( !fd_ulong_is_pow2( max_values ) ) ) {
189 0 : FD_LOG_WARNING(( "max_values must be a power of 2" ));
190 0 : return NULL;
191 0 : }
192 3 : ulong stake_map_chain_cnt = stake_map_chain_cnt_est( CRDS_MAX_CONTACT_INFO );
193 :
194 3 : FD_SCRATCH_ALLOC_INIT( l, shmem );
195 3 : fd_gossip_t * gossip = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
196 3 : void * crds = FD_SCRATCH_ALLOC_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values, max_values ) );
197 3 : void * active_set = FD_SCRATCH_ALLOC_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
198 3 : void * ping_tracker = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
199 3 : void * stake_pool = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(), stake_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
200 3 : void * stake_weights = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt ) );
201 3 : void * active_ps = FD_SCRATCH_ALLOC_APPEND( l, push_set_align(), push_set_footprint( FD_ACTIVE_SET_MAX_PEERS ) );
202 3 : FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_gossip_align() ) == (ulong)shmem + fd_gossip_footprint( max_values, entrypoints_len ) );
203 :
204 3 : gossip->gossip_net_out = gossip_net_out;
205 :
206 3 : gossip->entrypoints_cnt = entrypoints_len;
207 3 : fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) );
208 :
209 3 : gossip->crds = fd_crds_join( fd_crds_new( crds, rng, max_values, max_values, gossip_update_out ) );
210 3 : FD_TEST( gossip->crds );
211 :
212 3 : gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, rng ) );
213 3 : FD_TEST( gossip->active_set );
214 :
215 3 : gossip->ping_tracker = fd_ping_tracker_join( fd_ping_tracker_new( ping_tracker, rng, gossip->entrypoints_cnt, gossip->entrypoints, ping_tracker_change, gossip ) );
216 3 : FD_TEST( gossip->ping_tracker );
217 :
218 3 : gossip->stake.count = 0UL;
219 3 : gossip->stake.pool = stake_pool_join( stake_pool_new( stake_pool, CRDS_MAX_CONTACT_INFO ) );
220 3 : FD_TEST( gossip->stake.pool );
221 :
222 3 : gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt, fd_rng_ulong( rng ) ) );
223 3 : FD_TEST( gossip->stake.map );
224 :
225 3 : gossip->active_pset = push_set_join( push_set_new( active_ps, FD_ACTIVE_SET_MAX_PEERS ) );
226 3 : FD_TEST( gossip->active_pset );
227 :
228 3 : FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
229 3 : FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );
230 :
231 3 : gossip->rng = rng;
232 :
233 3 : gossip->timers.next_pull_request = 0L;
234 3 : gossip->timers.next_active_set_refresh = 0L;
235 3 : gossip->timers.next_contact_info_refresh = 0L;
236 3 : gossip->timers.next_flush_push_state = 0L;
237 :
238 3 : gossip->send_fn = send_fn;
239 3 : gossip->send_ctx = send_ctx;
240 3 : gossip->sign_fn = sign_fn;
241 3 : gossip->sign_ctx = sign_ctx;
242 3 : gossip->ping_tracker_change_fn = ping_tracker_change_fn;
243 3 : gossip->ping_tracker_change_fn_ctx = ping_tracker_change_fn_ctx;
244 :
245 3 : fd_gossip_set_my_contact_info( gossip, my_contact_info, now );
246 :
247 3 : fd_memset( gossip->metrics, 0, sizeof(fd_gossip_metrics_t) );
248 :
249 3 : return gossip;
250 3 : }
251 :
252 : fd_gossip_t *
253 3 : fd_gossip_join( void * shgossip ) {
254 3 : if( FD_UNLIKELY( !shgossip ) ) {
255 0 : FD_LOG_WARNING(( "NULL shgossip" ));
256 0 : return NULL;
257 0 : }
258 :
259 3 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) {
260 0 : FD_LOG_WARNING(( "misaligned shgossip" ));
261 0 : return NULL;
262 0 : }
263 :
264 3 : return (fd_gossip_t *)shgossip;
265 3 : }
266 :
267 : fd_gossip_metrics_t const *
268 0 : fd_gossip_metrics( fd_gossip_t const * gossip ) {
269 0 : return gossip->metrics;
270 0 : }
271 :
272 : fd_crds_metrics_t const *
273 0 : fd_gossip_crds_metrics( fd_gossip_t const * gossip ) {
274 0 : return fd_crds_metrics( gossip->crds );
275 0 : }
276 :
277 : fd_ping_tracker_metrics_t const *
278 0 : fd_gossip_ping_tracker_metrics( fd_gossip_t const * gossip ) {
279 0 : return fd_ping_tracker_metrics( gossip->ping_tracker );
280 0 : }
281 :
282 : static fd_ip4_port_t
283 0 : random_entrypoint( fd_gossip_t const * gossip ) {
284 0 : ulong idx = fd_rng_ulong_roll( gossip->rng, gossip->entrypoints_cnt );
285 0 : return gossip->entrypoints[ idx ];
286 0 : }
287 :
288 : static void
289 : txbuild_flush( fd_gossip_t * gossip,
290 : fd_gossip_txbuild_t * txbuild,
291 : fd_stem_context_t * stem,
292 : fd_ip4_port_t dest_addr,
293 0 : long now ) {
294 0 : if( FD_UNLIKELY( !txbuild->crds_len ) ) return;
295 :
296 0 : gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now );
297 :
298 0 : gossip->metrics->message_tx[ txbuild->tag ]++;
299 0 : gossip->metrics->message_tx_bytes[ txbuild->tag ] += txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
300 0 : for( ulong i=0UL; i<txbuild->crds_len; i++ ) {
301 0 : if( FD_LIKELY( txbuild->tag==FD_GOSSIP_MESSAGE_PUSH ) ) {
302 0 : gossip->metrics->crds_tx_push[ txbuild->crds[ i ].tag ]++;
303 0 : gossip->metrics->crds_tx_push_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
304 0 : } else {
305 0 : gossip->metrics->crds_tx_pull_response[ txbuild->crds[ i ].tag ]++;
306 0 : gossip->metrics->crds_tx_pull_response_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
307 0 : }
308 0 : }
309 :
310 0 : fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag );
311 0 : }
312 :
313 : /* Note: NOT a no-op in the case contact info does not exist. We
314 : reset and push it back to the last-hit queue instead.
315 :
316 : TODO: Is this desired behavior? */
317 :
318 : static void
319 : active_push_set_flush( fd_gossip_t * gossip,
320 : push_set_t * pset,
321 : ulong idx,
322 : fd_stem_context_t * stem,
323 0 : long now ) {
324 0 : fd_contact_info_t const * ci = fd_crds_contact_info_lookup( gossip->crds, fd_active_set_node_pubkey( gossip->active_set, idx ) );
325 0 : push_set_entry_t * state = pset_entry_pool_ele( pset->pool, idx );
326 0 : if( FD_LIKELY( ci ) ) {
327 0 : txbuild_flush( gossip, state->txbuild, stem, fd_contact_info_gossip_socket( ci ), now );
328 0 : } else {
329 0 : fd_gossip_txbuild_init( state->txbuild, gossip->identity_pubkey, state->txbuild->tag );
330 0 : }
331 0 : push_set_pop_append( pset, state, now );
332 0 : }
333 :
334 : static void
335 : active_push_set_insert( fd_gossip_t * gossip,
336 : uchar const * crds_val,
337 : ulong crds_sz,
338 : uchar const * origin_pubkey,
339 : ulong origin_stake,
340 : fd_stem_context_t * stem,
341 : long now,
342 0 : int flush_immediately ) {
343 0 : ulong out_nodes[ 12UL ];
344 0 : ulong out_nodes_cnt = fd_active_set_nodes( gossip->active_set,
345 0 : gossip->identity_pubkey,
346 0 : gossip->identity_stake,
347 0 : origin_pubkey,
348 0 : origin_stake,
349 0 : 0UL, /* ignore_prunes_if_peer_is_origin TODO */
350 0 : out_nodes );
351 0 : for( ulong j=0UL; j<out_nodes_cnt; j++ ) {
352 0 : ulong idx = out_nodes[ j ];
353 0 : push_set_entry_t * entry = pset_entry_pool_ele( gossip->active_pset->pool, idx );
354 0 : if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( entry->txbuild, crds_sz ) ) ) {
355 0 : active_push_set_flush( gossip, gossip->active_pset, idx, stem, now );
356 0 : }
357 :
358 0 : fd_gossip_txbuild_append( entry->txbuild, crds_sz, crds_val );
359 0 : push_set_pop_append( gossip->active_pset, entry, now );
360 0 : if( FD_UNLIKELY( !!flush_immediately ) ) {
361 0 : active_push_set_flush( gossip, gossip->active_pset, idx, stem, now );
362 0 : }
363 0 : }
364 0 : }
365 :
366 : static void
367 : push_my_contact_info( fd_gossip_t * gossip,
368 : fd_stem_context_t * stem,
369 0 : long now ){
370 0 : active_push_set_insert( gossip,
371 0 : gossip->my_contact_info.crds_val,
372 0 : gossip->my_contact_info.crds_val_sz,
373 0 : gossip->identity_pubkey,
374 0 : gossip->identity_stake,
375 0 : stem,
376 0 : now,
377 0 : 0 /* flush_immediately */ );
378 0 : }
379 :
380 : static inline void
381 : refresh_contact_info( fd_gossip_t * gossip,
382 3 : long now ) {
383 3 : gossip->my_contact_info.ci->wallclock_nanos = now;
384 3 : fd_gossip_contact_info_encode( gossip->my_contact_info.ci,
385 3 : gossip->my_contact_info.crds_val,
386 3 : FD_GOSSIP_CRDS_MAX_SZ,
387 3 : &gossip->my_contact_info.crds_val_sz );
388 3 : gossip->sign_fn( gossip->sign_ctx,
389 3 : gossip->my_contact_info.crds_val+64UL,
390 3 : gossip->my_contact_info.crds_val_sz-64UL,
391 3 : FD_KEYGUARD_SIGN_TYPE_ED25519,
392 3 : gossip->my_contact_info.crds_val );
393 :
394 : /* We don't have stem_ctx here so we pre-empt in next
395 : fd_gossip_advance iteration instead. */
396 3 : gossip->timers.next_contact_info_refresh = now;
397 3 : }
398 :
399 : void
400 : fd_gossip_set_my_contact_info( fd_gossip_t * gossip,
401 : fd_contact_info_t const * contact_info,
402 3 : long now ) {
403 3 : fd_memcpy( gossip->identity_pubkey, contact_info->pubkey.uc, 32UL );
404 :
405 3 : *gossip->my_contact_info.ci = *contact_info;
406 3 : refresh_contact_info( gossip, now );
407 3 : }
408 :
409 : ulong
410 : get_stake( fd_gossip_t const * gossip,
411 0 : uchar const * pubkey ) {
412 0 : stake_t const * entry = stake_map_ele_query_const( gossip->stake.map, (fd_pubkey_t const *)pubkey, NULL, gossip->stake.pool );
413 0 : if( FD_UNLIKELY( !entry ) ) return 0UL;
414 0 : return entry->stake;
415 0 : }
416 :
417 : void
418 : fd_gossip_stakes_update( fd_gossip_t * gossip,
419 : fd_stake_weight_t const * stake_weights,
420 0 : ulong stake_weights_cnt ) {
421 0 : if( FD_UNLIKELY( stake_weights_cnt>CRDS_MAX_CONTACT_INFO ) ) {
422 0 : FD_LOG_ERR(( "stake_weights_cnt %lu exceeds maximum of %d", stake_weights_cnt, CRDS_MAX_CONTACT_INFO ));
423 0 : }
424 :
425 : /* Clear the map, this requires us to iterate through all elements and
426 : individually call map remove. */
427 0 : for( ulong i=0UL; i<gossip->stake.count; i++ ) {
428 0 : stake_map_idx_remove_fast( gossip->stake.map, i, gossip->stake.pool );
429 0 : }
430 :
431 0 : for( ulong i=0UL; i<stake_weights_cnt; i++ ) {
432 0 : stake_t * entry = stake_pool_ele( gossip->stake.pool, i );
433 0 : fd_memcpy( entry->pubkey.uc, stake_weights[i].key.uc, 32UL );
434 0 : entry->stake = stake_weights[i].stake;
435 :
436 0 : stake_map_idx_insert( gossip->stake.map, i, gossip->stake.pool );
437 0 : }
438 : /* Update the identity stake */
439 0 : gossip->identity_stake = get_stake( gossip, gossip->identity_pubkey );
440 0 : gossip->stake.count = stake_weights_cnt;
441 0 : }
442 :
443 : static void
444 : rx_pull_request( fd_gossip_t * gossip,
445 : fd_gossip_view_pull_request_t const * pr_view,
446 : uchar const * payload,
447 : fd_ip4_port_t peer_addr,
448 : fd_stem_context_t * stem,
449 0 : long now ) {
450 : /* TODO: Implement data budget? Or at least limit iteration range */
451 :
452 0 : fd_bloom_t filter[1];
453 0 : filter->keys_len = pr_view->bloom_keys_len;
454 0 : filter->keys = (ulong *)( payload + pr_view->bloom_keys_offset );
455 :
456 0 : filter->bits_len = pr_view->bloom_bits_cnt;
457 0 : filter->bits = (ulong *)( payload + pr_view->bloom_bits_offset );
458 :
459 0 : fd_gossip_txbuild_t pull_resp[1];
460 0 : fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
461 :
462 0 : uchar iter_mem[ 16UL ];
463 :
464 0 : for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->mask, pr_view->mask_bits, iter_mem );
465 0 : !fd_crds_mask_iter_done( it, gossip->crds );
466 0 : it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
467 0 : fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );
468 :
469 : /* TODO: Add jitter here? */
470 : // if( FD_UNLIKELY( fd_crds_value_wallclock( candidate )>contact_info->wallclock_nanos ) ) continue;
471 :
472 0 : if( FD_UNLIKELY( !fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue;
473 :
474 0 : uchar const * crds_val;
475 0 : ulong crds_size;
476 0 : fd_crds_entry_value( candidate, &crds_val, &crds_size );
477 0 : if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) {
478 0 : txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
479 0 : }
480 0 : fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
481 0 : }
482 :
483 0 : txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
484 0 : }
485 :
486 : static void
487 : rx_pull_response( fd_gossip_t * gossip,
488 : fd_gossip_view_pull_response_t const * pull_response,
489 : uchar const * payload,
490 : fd_stem_context_t * stem,
491 0 : long now ) {
492 0 : for( ulong i=0UL; i<pull_response->crds_values_len; i++ ) {
493 0 : fd_gossip_view_crds_value_t const * value = &pull_response->crds_values[ i ];
494 :
495 0 : int checks_res = fd_crds_checks_fast( gossip->crds, value, payload, 0 /* from_push_msg m*/ );
496 0 : if( FD_UNLIKELY( !!checks_res ) ) {
497 0 : checks_res < 0 ? gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_STALE_IDX ]++
498 0 : : gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_DUPLICATE_IDX ]++;
499 0 : continue;
500 0 : }
501 :
502 0 : uchar const * origin_pubkey = payload+value->pubkey_off;
503 0 : ulong origin_stake = get_stake( gossip, origin_pubkey );
504 :
505 : /* TODO: Is this jittered in Agave? */
506 0 : long accept_after_nanos;
507 0 : uchar is_me = !memcmp( origin_pubkey, gossip->identity_pubkey, 32UL );
508 0 : if( FD_UNLIKELY( is_me ) ) {
509 0 : accept_after_nanos = 0L;
510 0 : } else if( !origin_stake && fd_crds_has_staked_node( gossip->crds ) ) {
511 0 : accept_after_nanos = now-15L*1000L*1000L*1000L;
512 0 : } else {
513 0 : accept_after_nanos = now-432000L*400L*1000L*1000L;
514 0 : }
515 :
516 : /* https://github.com/anza-xyz/agave/blob/540d5bc56cd44e3cc61b179bd52e9a782a2c99e4/gossip/src/crds_gossip_pull.rs#L340-L351 */
517 0 : if( FD_UNLIKELY( accept_after_nanos>value->wallclock_nanos &&
518 0 : !fd_crds_contact_info_lookup( gossip->crds, origin_pubkey ) ) ) {
519 0 : gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_WALLCLOCK_IDX ]++;
520 0 : uchar candidate_hash[ 32UL ];
521 0 : fd_crds_generate_hash( gossip->sha256, payload+value->value_off, value->length, candidate_hash );
522 0 : fd_crds_insert_failed_insert( gossip->crds, candidate_hash, now );
523 0 : continue;
524 0 : }
525 :
526 0 : fd_crds_entry_t const * candidate = fd_crds_insert( gossip->crds,
527 0 : value,
528 0 : payload,
529 0 : origin_stake,
530 0 : is_me,
531 0 : now,
532 0 : stem );
533 0 : FD_TEST( candidate );
534 0 : gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PULL_RESPONSE_IDX ]++;
535 0 : if( FD_UNLIKELY( fd_crds_entry_is_contact_info( candidate ) ) ){
536 0 : fd_contact_info_t const * contact_info = fd_crds_entry_contact_info( candidate );
537 :
538 0 : fd_ip4_port_t origin_addr = fd_contact_info_gossip_socket( contact_info );
539 0 : if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, origin_pubkey, origin_stake, origin_addr, now );
540 0 : gossip->metrics->ci_rx_unrecognized_socket_tag_cnt += value->ci_view->unrecognized_socket_tag_cnt;
541 0 : gossip->metrics->ci_rx_ipv6_address_cnt += value->ci_view->ip6_cnt;
542 0 : }
543 0 : active_push_set_insert( gossip, payload+value->value_off, value->length, origin_pubkey, origin_stake, stem, now, 0 /* flush_immediately */ );
544 0 : }
545 0 : }
546 :
547 : /* process_push_crds() > 0 holds the duplicate count */
548 : static int
549 : process_push_crds( fd_gossip_t * gossip,
550 : fd_gossip_view_crds_value_t const * value,
551 : uchar const * payload,
552 : long now,
553 0 : fd_stem_context_t * stem ) {
554 : /* overrides_fast here, either count duplicates or purge if older (how!?) */
555 :
556 : /* return values in both fd_crds_checks_fast and fd_crds_inserted need
557 : to be propagated since they both work the same (error>0 holds duplicate
558 : count). This is quite fragile. */
559 0 : int checks_res = fd_crds_checks_fast( gossip->crds,
560 0 : value,
561 0 : payload,
562 0 : 1 /* from_push_msg */ );
563 0 : if( FD_UNLIKELY( !!checks_res ) ) {
564 0 : checks_res < 0 ? gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_STALE_IDX ]++
565 0 : : gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_DUPLICATE_IDX ]++;
566 0 : return checks_res;
567 0 : }
568 :
569 0 : gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PUSH_IDX ]++;
570 :
571 0 : uchar const * origin_pubkey = payload+value->pubkey_off;
572 0 : uchar is_me = !memcmp( origin_pubkey, gossip->identity_pubkey, 32UL );
573 0 : ulong origin_stake = get_stake( gossip, origin_pubkey );
574 :
575 :
576 0 : fd_crds_entry_t const * candidate = fd_crds_insert( gossip->crds,
577 0 : value,
578 0 : payload,
579 0 : origin_stake,
580 0 : is_me,
581 0 : now,
582 0 : stem );
583 0 : FD_TEST( candidate );
584 0 : if( FD_UNLIKELY( fd_crds_entry_is_contact_info( candidate ) ) ) {
585 0 : fd_contact_info_t const * contact_info = fd_crds_entry_contact_info( candidate );
586 :
587 0 : fd_ip4_port_t origin_addr = contact_info->sockets[ FD_CONTACT_INFO_SOCKET_GOSSIP ];
588 0 : if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, origin_pubkey, origin_stake, origin_addr, now );
589 0 : gossip->metrics->ci_rx_unrecognized_socket_tag_cnt += value->ci_view->unrecognized_socket_tag_cnt;
590 0 : gossip->metrics->ci_rx_ipv6_address_cnt += value->ci_view->ip6_cnt;
591 0 : }
592 0 : active_push_set_insert( gossip, payload+value->value_off, value->length, origin_pubkey, origin_stake, stem, now, 0 /* flush_immediately */ );
593 0 : return 0;
594 0 : }
595 :
596 : static void
597 : rx_push( fd_gossip_t * gossip,
598 : fd_gossip_view_push_t const * push,
599 : uchar const * payload,
600 : long now,
601 0 : fd_stem_context_t * stem ) {
602 0 : for( ulong i=0UL; i<push->crds_values_len; i++ ) {
603 0 : int err = process_push_crds( gossip, &push->crds_values[ i ], payload, now, stem );
604 0 : if( FD_UNLIKELY( err>0 ) ) {
605 : /* TODO: implement prune finder
606 : ulong num_duplicates = (ulong)err;
607 : uchar const * relayer_pubkey = payload+push->from_off;
608 : fd_prune_finder_record( gossip->prune_finder,
609 : origin_pubkey,
610 : origin_stake,
611 : relayer_pubkey,
612 : get_stake( gossip, relayer_pubkey ),
613 : num_duplicates ); */
614 0 : }
615 0 : }
616 0 : }
617 :
618 : static void
619 : rx_prune( fd_gossip_t * gossip,
620 : uchar const * payload,
621 0 : fd_gossip_view_prune_t const * prune ) {
622 0 : uchar const * push_dest_pubkey = payload+prune->pubkey_off;
623 0 : uchar const * origins = payload+prune->origins_off;
624 0 : for( ulong i=0UL; i<prune->origins_len; i++ ) {
625 0 : uchar const * origin_pubkey = &origins[ i*32UL ];
626 0 : ulong origin_stake = get_stake( gossip, origin_pubkey );
627 0 : fd_active_set_prune( gossip->active_set,
628 0 : push_dest_pubkey,
629 0 : origin_pubkey,
630 0 : origin_stake,
631 0 : gossip->identity_pubkey,
632 0 : gossip->identity_stake );
633 0 : }
634 0 : }
635 :
636 :
637 : static void
638 : rx_ping( fd_gossip_t * gossip,
639 : fd_gossip_view_ping_t * ping,
640 : fd_ip4_port_t peer_address,
641 : fd_stem_context_t * stem,
642 0 : long now ) {
643 : /* TODO: have this point to dcache buffer directly instead */
644 0 : uchar out_payload[ sizeof(fd_gossip_view_pong_t) + 4UL];
645 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PONG );
646 :
647 0 : fd_gossip_view_pong_t * out_pong = (fd_gossip_view_pong_t *)(out_payload + 4UL);
648 0 : fd_memcpy( out_pong->pubkey, gossip->identity_pubkey, 32UL );
649 :
650 : /* fd_keyguard checks payloads for certain patterns before performing the
651 : sign. Pattern-matching can't be done on hashed data, so we need
652 : to supply the pre-hashed image to the sign fn (fd_keyguard will hash when
653 : supplied with FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519) while also hashing
654 : the image ourselves onto pong->ping_hash */
655 :
656 0 : uchar pre_image[ 48UL ];
657 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
658 0 : fd_memcpy( pre_image+16UL, ping->ping_token, 32UL );
659 :
660 0 : fd_sha256_hash( pre_image, 48UL, out_pong->ping_hash );
661 :
662 0 : gossip->sign_fn( gossip->sign_ctx, pre_image, 48UL, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519, out_pong->signature );
663 0 : gossip->send_fn( gossip->send_ctx, stem, (uchar *)out_payload, sizeof(out_payload), &peer_address, (ulong)now );
664 :
665 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PONG ]++;
666 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PONG ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
667 0 : }
668 :
669 : static void
670 : rx_pong( fd_gossip_t * gossip,
671 : fd_gossip_view_pong_t * pong,
672 : fd_ip4_port_t peer_address,
673 0 : long now ) {
674 0 : ulong stake = get_stake( gossip, pong->pubkey );
675 0 : fd_ping_tracker_register( gossip->ping_tracker, pong->pubkey, stake, peer_address, pong->ping_hash, now );
676 0 : }
677 :
678 : void
679 : fd_gossip_rx( fd_gossip_t * gossip,
680 : fd_ip4_port_t peer,
681 : uchar const * data,
682 : ulong data_sz,
683 : long now,
684 0 : fd_stem_context_t * stem ) {
685 : /* TODO: Implement traffic shaper / bandwidth limiter */
686 0 : FD_TEST( data_sz>=sizeof(fd_gossip_view_t) );
687 0 : fd_gossip_view_t const * view = (fd_gossip_view_t const *)data;
688 0 : uchar const * payload = data+sizeof(fd_gossip_view_t);
689 :
690 0 : switch( view->tag ) {
691 0 : case FD_GOSSIP_MESSAGE_PULL_REQUEST:
692 0 : rx_pull_request( gossip, view->pull_request, payload, peer, stem, now );
693 0 : break;
694 0 : case FD_GOSSIP_MESSAGE_PULL_RESPONSE:
695 0 : rx_pull_response( gossip, view->pull_response, payload, stem, now );
696 0 : break;
697 0 : case FD_GOSSIP_MESSAGE_PUSH:
698 0 : rx_push( gossip, view->push, payload, now, stem );
699 0 : break;
700 0 : case FD_GOSSIP_MESSAGE_PRUNE:
701 0 : rx_prune( gossip, payload, view->prune );
702 0 : break;
703 0 : case FD_GOSSIP_MESSAGE_PING:
704 0 : rx_ping( gossip, (fd_gossip_view_ping_t *)(payload+view->ping_pong_off), peer, stem, now );
705 0 : break;
706 0 : case FD_GOSSIP_MESSAGE_PONG:
707 0 : rx_pong( gossip, (fd_gossip_view_pong_t *)(payload+view->ping_pong_off), peer, now );
708 0 : break;
709 0 : default:
710 0 : FD_LOG_CRIT(( "Unknown gossip message type %d", view->tag ));
711 0 : break;
712 0 : }
713 0 : }
714 :
715 : int
716 : fd_gossip_push_vote( fd_gossip_t * gossip,
717 : uchar const * txn,
718 : ulong txn_sz,
719 : fd_stem_context_t * stem,
720 0 : long now ) {
721 : /* TODO: we can avoid addt'l memcpy if we pass a propely laid out
722 : crds buffer instead */
723 0 : uchar crds_val[ FD_GOSSIP_CRDS_MAX_SZ ];
724 0 : ulong crds_val_sz;
725 0 : fd_gossip_crds_vote_encode( crds_val,
726 0 : FD_GOSSIP_CRDS_MAX_SZ,
727 0 : txn,
728 0 : txn_sz,
729 0 : gossip->identity_pubkey,
730 0 : now,
731 0 : &crds_val_sz );
732 0 : fd_gossip_view_crds_value_t value[1];
733 :
734 0 : gossip->sign_fn( gossip->sign_ctx,
735 0 : crds_val+64UL,
736 0 : crds_val_sz-64UL,
737 0 : FD_KEYGUARD_SIGN_TYPE_ED25519,
738 0 : crds_val );
739 :
740 0 : value->tag = FD_GOSSIP_VALUE_VOTE;
741 0 : value->value_off = 0UL;
742 0 : value->length = (ushort)crds_val_sz;
743 0 : value->pubkey_off = 64UL+1UL; /* Signature + vote index */
744 0 : value->wallclock_nanos = now;
745 0 : fd_gossip_view_vote_t * vote = value->vote;
746 0 : vote->index = 0UL; /* TODO */
747 0 : vote->txn_sz = (ushort)txn_sz;
748 0 : vote->txn_off = 64UL+1UL+32UL; /* Signature + vote index + pubkey */
749 :
750 0 : int res = fd_crds_checks_fast( gossip->crds, value, crds_val, 0 );
751 0 : if( FD_UNLIKELY( res ) ) return -1;
752 :
753 0 : fd_crds_entry_t const * entry = fd_crds_insert( gossip->crds, value, crds_val, gossip->identity_stake, 1, /* is_me */ now, stem );
754 0 : if( FD_UNLIKELY( !entry ) ) return -1;
755 :
756 0 : active_push_set_insert( gossip,
757 0 : crds_val,
758 0 : crds_val_sz,
759 0 : gossip->identity_pubkey,
760 0 : gossip->identity_stake,
761 0 : stem,
762 0 : now,
763 0 : 1 /* flush_immediately */ );
764 0 : return 0;
765 0 : }
766 :
767 : static void
768 : tx_ping( fd_gossip_t * gossip,
769 : fd_stem_context_t * stem,
770 0 : long now ) {
771 : /* TODO: have this point to dcache buffer directly instead. */
772 0 : uchar out_payload[ sizeof(fd_gossip_view_ping_t) + 4UL ];
773 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PING );
774 :
775 0 : fd_gossip_view_ping_t * out_ping = (fd_gossip_view_ping_t *)( out_payload + 4UL );
776 0 : fd_memcpy( out_ping->pubkey, gossip->identity_pubkey, 32UL );
777 :
778 0 : uchar const * peer_pubkey;
779 0 : uchar const * ping_token;
780 0 : fd_ip4_port_t const * peer_address;
781 0 : while( fd_ping_tracker_pop_request( gossip->ping_tracker,
782 0 : now,
783 0 : &peer_pubkey,
784 0 : &peer_address,
785 0 : &ping_token ) ) {
786 0 : fd_memcpy( out_ping->ping_token, ping_token, 32UL );
787 :
788 0 : gossip->sign_fn( gossip->sign_ctx, out_ping->ping_token, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519, out_ping->signature );
789 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), peer_address, (ulong)now );
790 :
791 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PING ]++;
792 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PING ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
793 0 : }
794 0 : }
795 :
796 : static void
797 : tx_pull_request( fd_gossip_t * gossip,
798 : fd_stem_context_t * stem,
799 0 : long now ) {
800 0 : ulong total_crds_vals = fd_crds_len( gossip->crds ) + fd_crds_purged_len( gossip->crds );
801 0 : ulong num_items = fd_ulong_max( 512UL, total_crds_vals );
802 :
803 0 : double max_bits = (double)fd_gossip_pull_request_max_filter_bits( BLOOM_NUM_KEYS, gossip->my_contact_info.crds_val_sz, FD_GOSSIP_MTU );
804 0 : double max_items = fd_bloom_max_items( max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
805 0 : ulong num_bits = fd_bloom_num_bits( max_items, BLOOM_FALSE_POSITIVE_RATE, max_bits );
806 :
807 0 : double _mask_bits = ceil( log2( (double)num_items / max_items ) );
808 0 : uint mask_bits = _mask_bits >= 0.0 ? (uint)_mask_bits : 0UL;
809 0 : ulong mask = fd_rng_ulong( gossip->rng ) | (~0UL>>(mask_bits));
810 :
811 0 : uchar payload[ FD_GOSSIP_MTU ] = {0};
812 :
813 0 : ulong payload_sz;
814 0 : ulong * keys_ptr, * bits_ptr, * bits_set;
815 :
816 0 : int res = fd_gossip_pull_request_init( payload,
817 0 : FD_GOSSIP_MTU,
818 0 : BLOOM_NUM_KEYS,
819 0 : num_bits,
820 0 : mask,
821 0 : mask_bits,
822 0 : gossip->my_contact_info.crds_val,
823 0 : gossip->my_contact_info.crds_val_sz,
824 0 : &keys_ptr,
825 0 : &bits_ptr,
826 0 : &bits_set,
827 0 : &payload_sz );
828 0 : FD_TEST( !res && payload_sz<=FD_GOSSIP_MTU );
829 :
830 0 : fd_bloom_t filter[1];
831 0 : fd_bloom_init_inplace( keys_ptr, bits_ptr, BLOOM_NUM_KEYS, num_bits, 0, gossip->rng, BLOOM_FALSE_POSITIVE_RATE, filter );
832 :
833 0 : uchar iter_mem[ 16UL ];
834 0 : for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
835 0 : !fd_crds_mask_iter_done( it, gossip->crds );
836 0 : it = fd_crds_mask_iter_next( it, gossip->crds ) ) {
837 0 : fd_bloom_insert( filter, fd_crds_entry_hash( fd_crds_mask_iter_entry( it, gossip->crds ) ), 32UL );
838 0 : }
839 :
840 0 : for( fd_crds_mask_iter_t * it = fd_crds_purged_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
841 0 : !fd_crds_purged_mask_iter_done( it, gossip->crds );
842 0 : it = fd_crds_purged_mask_iter_next( it, gossip->crds ) ){
843 0 : fd_bloom_insert( filter, fd_crds_purged_mask_iter_hash( it, gossip->crds ), 32UL );
844 0 : }
845 :
846 0 : int num_bits_set = 0;
847 0 : for( ulong i=0UL; i<(num_bits+63)/64UL; i++ ) num_bits_set += fd_ulong_popcnt( bits_ptr[ i ] );
848 0 : *bits_set = (ulong)num_bits_set;
849 :
850 0 : fd_contact_info_t const * peer = fd_crds_peer_sample( gossip->crds, gossip->rng );
851 0 : fd_ip4_port_t peer_addr;
852 0 : if( FD_UNLIKELY( !peer ) ) {
853 0 : if( FD_UNLIKELY( !gossip->entrypoints_cnt ) ) {
854 : /* We are the bootstrapping node, and nobody else is present in
855 : the cluster. Nowhere to send the pull request. */
856 0 : return;
857 0 : }
858 0 : peer_addr = random_entrypoint( gossip );
859 0 : } else {
860 0 : peer_addr = fd_contact_info_gossip_socket( peer );
861 0 : }
862 0 : gossip->send_fn( gossip->send_ctx, stem, payload, payload_sz, &peer_addr, (ulong)now );
863 :
864 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PULL_REQUEST ]++;
865 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PULL_REQUEST ] += payload_sz + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
866 0 : }
867 :
868 : static inline long
869 : next_pull_request( fd_gossip_t const * gossip,
870 0 : long now ) {
871 0 : (void)gossip;
872 : /* TODO: Not always every 200 micros ... we should send less frequently
873 : the table is smaller. Agave sends 1024 every 200 millis, but
874 : reduces 1024 to a lower amount as the table size shrinks...
875 : replicate this in the frequency domain. */
876 : /* TODO: Jitter */
877 0 : return now+(long)1e9;
878 0 : }
879 :
880 : static inline void
881 : rotate_active_set( fd_gossip_t * gossip,
882 : fd_stem_context_t * stem,
883 0 : long now ) {
884 0 : push_set_t * pset = gossip->active_pset;
885 0 : ulong replaced_idx = fd_active_set_rotate( gossip->active_set, gossip->crds );
886 0 : if( FD_UNLIKELY( replaced_idx==ULONG_MAX ) ) {
887 0 : return;
888 0 : }
889 0 : push_set_entry_t * entry = pset_entry_pool_ele( pset->pool, replaced_idx );
890 0 : if( FD_LIKELY( !!entry->pool.in_use ) ) {
891 0 : active_push_set_flush( gossip, pset, replaced_idx, stem, now );
892 0 : } else {
893 0 : entry->pool.in_use = 1U;
894 0 : entry->last_hit.wallclock_nanos = now;
895 0 : pset_last_hit_ele_push_tail( pset->last_hit, entry, pset->pool );
896 0 : }
897 :
898 0 : fd_gossip_txbuild_init( entry->txbuild, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PUSH );
899 0 : }
900 :
901 : static inline void
902 : flush_stale_push_states( fd_gossip_t * gossip,
903 : fd_stem_context_t * stem,
904 0 : long now ) {
905 0 : long stale_if_before = now-1*1000L*1000L;
906 0 : push_set_t * push_set = gossip->active_pset;
907 0 : if( FD_UNLIKELY( pset_last_hit_is_empty( push_set->last_hit, push_set->pool ) ) ) return;
908 :
909 0 : for(;;) {
910 0 : push_set_entry_t * entry = pset_last_hit_ele_peek_head( push_set->last_hit, push_set->pool );
911 0 : ulong entry_idx = pset_entry_pool_idx( push_set->pool, entry );
912 0 : if( FD_UNLIKELY( entry->last_hit.wallclock_nanos>stale_if_before ) ) break;
913 0 : active_push_set_flush( gossip, push_set, entry_idx, stem, now );
914 0 : }
915 0 : }
916 :
917 : void
918 : fd_gossip_advance( fd_gossip_t * gossip,
919 : long now,
920 0 : fd_stem_context_t * stem ) {
921 0 : fd_crds_advance( gossip->crds, now, stem );
922 0 : tx_ping( gossip, stem, now );
923 0 : flush_stale_push_states( gossip, stem, now );
924 0 : if( FD_UNLIKELY( now>=gossip->timers.next_pull_request ) ) {
925 0 : tx_pull_request( gossip, stem, now );
926 0 : gossip->timers.next_pull_request = next_pull_request( gossip, now );
927 0 : }
928 0 : if( FD_UNLIKELY( now>=gossip->timers.next_contact_info_refresh ) ) {
929 : /* TODO: Frequency of this? More often if observing? */
930 0 : refresh_contact_info( gossip, now );
931 0 : push_my_contact_info( gossip, stem, now);
932 0 : gossip->timers.next_contact_info_refresh = now+15L*500L*1000L*1000L; /* TODO: Jitter */
933 0 : }
934 0 : if( FD_UNLIKELY( now>=gossip->timers.next_active_set_refresh ) ) {
935 0 : rotate_active_set( gossip, stem, now );
936 0 : gossip->timers.next_active_set_refresh = now+300L*1000L*1000L; /* TODO: Jitter */
937 0 : }
938 0 : }
939 :
940 : void
941 : fd_gossip_ping_tracker_track( fd_gossip_t * gossip,
942 : uchar const * peer_pubkey,
943 : fd_ip4_port_t peer_address,
944 0 : long now ) {
945 0 : ulong origin_stake = get_stake( gossip, peer_pubkey );
946 0 : fd_ping_tracker_track( gossip->ping_tracker, peer_pubkey, origin_stake, peer_address, now );
947 0 : }
|