Line data Source code
1 : #include "fd_gossip.h"
2 : #include "fd_bloom.h"
3 : #include "fd_gossip_message.h"
4 : #include "fd_gossip_txbuild.h"
5 : #include "fd_active_set.h"
6 : #include "fd_ping_tracker.h"
7 : #include "fd_prune_finder.h"
8 : #include "fd_gossip_wsample.h"
9 : #include "../../disco/keyguard/fd_keyguard.h"
10 : #include "../../ballet/sha256/fd_sha256.h"
11 : #include "../leaders/fd_leaders_base.h"
12 :
/* The metrics arrays in fd_gossip_metrics_t are indexed directly by
   gossip message tag and by CRDS value tag (see txbuild_flush and
   tx_prune), so the metrics enum cardinalities must match the gossip
   enum cardinalities exactly. */
FD_STATIC_ASSERT( FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT==FD_GOSSIP_MESSAGE_CNT,
                  "FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT must match FD_GOSSIP_MESSAGE_CNT" );

FD_STATIC_ASSERT( FD_METRICS_ENUM_CRDS_VALUE_CNT==FD_GOSSIP_VALUE_CNT,
                  "FD_METRICS_ENUM_CRDS_VALUE_CNT must match FD_GOSSIP_VALUE_CNT" );

/* Bloom filter tuning: target false positive rate and key count.
   NOTE(review): not referenced in the visible part of this file;
   presumably consumed by outgoing pull request construction further
   down -- confirm. */
#define BLOOM_FALSE_POSITIVE_RATE (0.1)
#define BLOOM_NUM_KEYS            (8.0)
21 :
/* stake_t is one entry of the identity -> stake lookup table that is
   rebuilt by fd_gossip_stakes_update and queried by get_stake.  Each
   element lives in stake_pool and is linked into stake_map. */
struct stake {
  fd_pubkey_t pubkey; /* map key */
  ulong       stake;  /* stake associated with pubkey */

  /* Chain links used by the stake_map template (MAP_PREV/MAP_NEXT). */
  struct {
    ulong prev;
    ulong next;
  } map;

  /* Free-list link used by the stake_pool template (POOL_NEXT). */
  struct {
    ulong next;
  } pool;
};

typedef struct stake stake_t;

/* NOTE: The pool is never drained element by element.  On every
   fd_gossip_stakes_update the map and the pool are both reset and the
   pool is refilled from scratch (one acquire per stake weight), so no
   individual releases are ever needed. */
#define POOL_NAME  stake_pool
#define POOL_T     stake_t
#define POOL_IDX_T ulong
#define POOL_NEXT  pool.next
#include "../../util/tmpl/fd_pool.c"

/* Chained hash map keyed by pubkey.  The hash is the seed XORed with the
   first 8 bytes of the pubkey; this assumes pubkeys are already close
   to uniformly distributed (NOTE(review): true for ed25519 public keys,
   confirm no adversarially chosen keys matter here -- the seed is drawn
   from the rng in fd_gossip_new). */
#define MAP_NAME                           stake_map
#define MAP_KEY                            pubkey
#define MAP_ELE_T                          stake_t
#define MAP_KEY_T                          fd_pubkey_t
#define MAP_PREV                           map.prev
#define MAP_NEXT                           map.next
#define MAP_KEY_EQ(k0,k1)                  fd_pubkey_eq( k0, k1 )
#define MAP_KEY_HASH(key,seed)             (seed^fd_ulong_load_8( (key)->uc ))
#define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
#include "../../util/tmpl/fd_map_chain.c"
59 :
/* fd_gossip_private is the full gossip state object.  It is laid out at
   the start of the shared memory region; the sub-objects it points to
   (purged, wsample, crds, active_set, ping_tracker, prune_finder, stake
   pool/map) are laid out immediately after it by fd_gossip_new, in the
   order given by fd_gossip_footprint. */
struct fd_gossip_private {
  uchar identity_pubkey[ 32UL ]; /* our current identity */
  ulong identity_stake;          /* stake of identity_pubkey (0 if unstaked) */

  fd_gossip_metrics_t metrics[1];

  fd_gossip_wsample_t * wsample;
  fd_crds_t *           crds;
  fd_gossip_purged_t *  purged;
  fd_active_set_t *     active_set;
  fd_ping_tracker_t *   ping_tracker;
  fd_prune_finder_t *   prune_finder;

  fd_sha256_t sha256[1];
  fd_sha512_t sha512[1];

  ulong         entrypoints_cnt;      /* in [0,16], checked by fd_gossip_new */
  fd_ip4_port_t entrypoints[ 16UL ];

  fd_rng_t * rng;

  /* Identity -> stake lookup, rebuilt by fd_gossip_stakes_update.
     count is the number of entries currently in the pool. */
  struct {
    ulong          count;
    stake_t *      pool;
    stake_map_t *  map;
  } stake;

  /* Absolute timestamps (same clock as `now`) for periodic work. */
  struct {
    long next_pull_request;
    long next_active_set_refresh;
    long next_contact_info_refresh;
    long next_flush_push_state;
  } timers;

  /* Token-bucket rate limiter for outbound pull response data.
     Matches Agave's DataBudget: replenished every 100ms with
     num_staked*1024 bytes, capped at 5x that amount.  Only
     pull responses are rate-limited; push messages are not. */
  struct {
    ulong remaining;            /* bytes remaining in budget (unsigned; debits saturate at 0) */
    long  last_replenish_nanos; /* last replenish timestamp in nanos */
  } outbound_budget;

  /* Per-request iteration budget for the CRDS treap scan in
     rx_pull_request.  Reset at the start of each request. */
  struct {
    ulong remaining;
  } scan_budget;

  /* Callbacks */
  fd_gossip_sign_fn sign_fn;
  void *            sign_ctx;

  fd_gossip_send_fn send_fn;
  void *            send_ctx;

  fd_ping_tracker_change_fn ping_tracker_change_fn;
  void *                    ping_tracker_change_fn_ctx;

  /* Our own contact info: the decoded value (ci) and its signed,
     serialized form (crds_val / crds_val_sz), refreshed by
     refresh_contact_info. */
  struct {
    uchar             crds_val[ FD_GOSSIP_VALUE_MAX_SZ ];
    ulong             crds_val_sz;
    fd_gossip_value_t ci[1];
  } my_contact_info;

  fd_gossip_out_ctx_t * gossip_net_out;
};
127 :
128 : FD_FN_CONST ulong
129 0 : fd_gossip_align( void ) {
130 0 : return 128uL;
131 0 : }
132 :
133 : FD_FN_CONST ulong
134 : fd_gossip_footprint( ulong max_values,
135 0 : ulong entrypoints_len ) {
136 0 : ulong l;
137 0 : l = FD_LAYOUT_INIT;
138 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
139 0 : l = FD_LAYOUT_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values ) );
140 0 : l = FD_LAYOUT_APPEND( l, fd_gossip_wsample_align(),fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
141 0 : l = FD_LAYOUT_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values ) );
142 0 : l = FD_LAYOUT_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
143 0 : l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
144 0 : l = FD_LAYOUT_APPEND( l, fd_prune_finder_align(), fd_prune_finder_footprint() );
145 0 : l = FD_LAYOUT_APPEND( l, stake_pool_align(), stake_pool_footprint( MAX_SHRED_DESTS ) );
146 0 : l = FD_LAYOUT_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( MAX_SHRED_DESTS ) ) );
147 0 : l = FD_LAYOUT_FINI( l, fd_gossip_align() );
148 0 : return l;
149 0 : }
150 :
151 : static void
152 : ping_tracker_change( void * _ctx,
153 : uchar const * peer_pubkey,
154 : fd_ip4_port_t peer_address,
155 : long now,
156 0 : int change_type ) {
157 0 : fd_gossip_t * ctx = (fd_gossip_t *)_ctx;
158 :
159 0 : if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return;
160 :
161 0 : if( FD_LIKELY( change_type==FD_PING_TRACKER_CHANGE_TYPE_ACTIVE ) ) {
162 0 : fd_gossip_purged_drain_no_contact_info( ctx->purged, peer_pubkey );
163 0 : }
164 :
165 0 : ulong ci_idx = fd_crds_ci_idx( ctx->crds, peer_pubkey );
166 0 : if( FD_UNLIKELY( ci_idx!=ULONG_MAX ) ) {
167 0 : switch( change_type ) {
168 0 : case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE:
169 0 : fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 1 );
170 0 : break;
171 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE:
172 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED:
173 0 : fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 0 );
174 0 : fd_active_set_remove_peer( ctx->active_set, ci_idx );
175 0 : break;
176 0 : default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return;
177 0 : }
178 0 : }
179 :
180 0 : ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type );
181 0 : }
182 :
183 : static inline void
184 : refresh_contact_info( fd_gossip_t * gossip,
185 0 : long now ) {
186 0 : fd_memcpy( gossip->my_contact_info.ci->origin, gossip->identity_pubkey, 32UL );
187 0 : gossip->my_contact_info.ci->wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
188 0 : long sz = fd_gossip_value_serialize( gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, FD_GOSSIP_VALUE_MAX_SZ );
189 0 : FD_TEST( sz!=-1L );
190 0 : gossip->my_contact_info.crds_val_sz = (ulong)sz;
191 :
192 0 : gossip->sign_fn( gossip->sign_ctx,
193 0 : gossip->my_contact_info.crds_val+64UL,
194 0 : gossip->my_contact_info.crds_val_sz-64UL,
195 0 : FD_KEYGUARD_SIGN_TYPE_ED25519,
196 0 : gossip->my_contact_info.crds_val );
197 :
198 : /* We don't have stem_ctx here so we pre-empt in next
199 : fd_gossip_advance iteration instead. */
200 0 : gossip->timers.next_contact_info_refresh = now;
201 0 : }
202 :
/* fd_gossip_new formats the memory at shmem (fd_gossip_align() aligned,
   fd_gossip_footprint( max_values, entrypoints_len ) bytes) as a gossip
   object and returns a pointer to it, or NULL (with a warning logged)
   if shmem is NULL/misaligned, entrypoints_len>16, or max_values is not
   a power of two.  Sub-object construction failures abort via FD_TEST.

   The sub-objects are carved from shmem in exactly the order used by
   fd_gossip_footprint.  Several constructors draw from rng, so the
   construction order below also determines the rng sequence. */
void *
fd_gossip_new( void *                           shmem,
               fd_rng_t *                       rng,
               ulong                            max_values,
               ulong                            entrypoints_len,
               fd_ip4_port_t const *            entrypoints,
               uchar const *                    identity_pubkey,
               fd_gossip_contact_info_t const * my_contact_info,
               long                             now,
               fd_gossip_send_fn                send_fn,
               void *                           send_ctx,
               fd_gossip_sign_fn                sign_fn,
               void *                           sign_ctx,
               fd_ping_tracker_change_fn        ping_tracker_change_fn,
               void *                           ping_tracker_change_fn_ctx,
               fd_gossip_activity_update_fn     activity_update_fn,
               void *                           activity_update_fn_ctx,
               fd_gossip_out_ctx_t *            gossip_update_out,
               fd_gossip_out_ctx_t *            gossip_net_out ) {
  if( FD_UNLIKELY( !shmem ) ) {
    FD_LOG_WARNING(( "NULL shmem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned shmem" ));
    return NULL;
  }

  /* entrypoints[] in fd_gossip_private holds at most 16 entries. */
  if( FD_UNLIKELY( entrypoints_len>16UL ) ) {
    FD_LOG_WARNING(( "entrypoints_cnt must be in [0, 16]" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_pow2( max_values ) ) ) {
    FD_LOG_WARNING(( "max_values must be a power of 2" ));
    return NULL;
  }

  /* Carve the region; must mirror fd_gossip_footprint. */
  FD_SCRATCH_ALLOC_INIT( l, shmem );
  fd_gossip_t * gossip  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t),       sizeof(fd_gossip_t)                                                    );
  void * purged         = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_purged_align(),   fd_gossip_purged_footprint( max_values )                               );
  void * wsample        = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_wsample_align(),  fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE )              );
  void * crds           = FD_SCRATCH_ALLOC_APPEND( l, fd_crds_align(),            fd_crds_footprint( max_values )                                        );
  void * active_set     = FD_SCRATCH_ALLOC_APPEND( l, fd_active_set_align(),      fd_active_set_footprint()                                              );
  void * ping_tracker   = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(),    fd_ping_tracker_footprint( entrypoints_len )                           );
  void * prune_finder   = FD_SCRATCH_ALLOC_APPEND( l, fd_prune_finder_align(),    fd_prune_finder_footprint()                                            );
  void * stake_pool     = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(),         stake_pool_footprint( MAX_SHRED_DESTS )                                );
  void * stake_weights  = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(),          stake_map_footprint( stake_map_chain_cnt_est( MAX_SHRED_DESTS ) )      );

  gossip->gossip_net_out = gossip_net_out;

  gossip->entrypoints_cnt = entrypoints_len;
  fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) );

  gossip->purged = fd_gossip_purged_join( fd_gossip_purged_new( purged, rng, max_values ) );
  FD_TEST( gossip->purged );

  gossip->wsample = fd_gossip_wsample_join( fd_gossip_wsample_new( wsample, rng, FD_CONTACT_INFO_TABLE_SIZE ) );
  FD_TEST( gossip->wsample );

  /* NOTE(review): crds is handed the raw active_set memory before the
     active set itself is constructed below; presumably fd_crds_new only
     stores the pointer (and the join returns the same address) --
     confirm. */
  gossip->crds = fd_crds_join( fd_crds_new( crds, entrypoints, entrypoints_len, gossip->wsample, active_set, rng, max_values, gossip->purged, activity_update_fn, activity_update_fn_ctx, gossip_update_out ) );
  FD_TEST( gossip->crds );

  /* Our identity starts with 0 stake; the real value arrives with the
     first fd_gossip_stakes_update. */
  gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, gossip->wsample, gossip->crds, rng, identity_pubkey, 0UL, send_fn, send_ctx ) );
  FD_TEST( gossip->active_set );

  /* NOTE(review): ping_tracker_change forwards to
     gossip->ping_tracker_change_fn, which is only assigned further
     below; confirm fd_ping_tracker_new never invokes the change
     callback during construction. */
  gossip->ping_tracker = fd_ping_tracker_join( fd_ping_tracker_new( ping_tracker, rng, gossip->entrypoints_cnt, gossip->entrypoints, ping_tracker_change, gossip ) );
  FD_TEST( gossip->ping_tracker );

  gossip->prune_finder = fd_prune_finder_join( fd_prune_finder_new( prune_finder ) );
  FD_TEST( gossip->prune_finder );

  gossip->stake.count = 0UL;
  gossip->stake.pool = stake_pool_join( stake_pool_new( stake_pool, MAX_SHRED_DESTS ) );
  FD_TEST( gossip->stake.pool );

  /* The stake map hash seed is drawn from rng. */
  gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt_est( MAX_SHRED_DESTS ), fd_rng_ulong( rng ) ) );
  FD_TEST( gossip->stake.map );

  FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
  FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );

  gossip->rng = rng;

  /* All periodic work is due immediately. */
  gossip->timers.next_pull_request         = 0L;
  gossip->timers.next_active_set_refresh   = 0L;
  gossip->timers.next_contact_info_refresh = 0L;
  gossip->timers.next_flush_push_state     = 0L;

  /* Outbound pull-response budget starts empty; the first replenish
     happens once BUDGET_REPLENISH_INTERVAL_NS has elapsed from now. */
  gossip->outbound_budget.remaining            = 0UL;
  gossip->outbound_budget.last_replenish_nanos = now;

  gossip->send_fn                    = send_fn;
  gossip->send_ctx                   = send_ctx;
  gossip->sign_fn                    = sign_fn;
  gossip->sign_ctx                   = sign_ctx;
  gossip->ping_tracker_change_fn     = ping_tracker_change_fn;
  gossip->ping_tracker_change_fn_ctx = ping_tracker_change_fn_ctx;

  /* Build, serialize and sign our own contact info.  sign_fn must be
     set before refresh_contact_info is called. */
  gossip->my_contact_info.ci->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
  *gossip->my_contact_info.ci->contact_info = *my_contact_info;
  fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
  gossip->identity_stake = 0UL;
  refresh_contact_info( gossip, now );

  fd_memset( gossip->metrics, 0, sizeof(fd_gossip_metrics_t) );

  return gossip;
}
313 :
314 : fd_gossip_t *
315 0 : fd_gossip_join( void * shgossip ) {
316 0 : if( FD_UNLIKELY( !shgossip ) ) {
317 0 : FD_LOG_WARNING(( "NULL shgossip" ));
318 0 : return NULL;
319 0 : }
320 :
321 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) {
322 0 : FD_LOG_WARNING(( "misaligned shgossip" ));
323 0 : return NULL;
324 0 : }
325 :
326 0 : return (fd_gossip_t *)shgossip;
327 0 : }
328 :
329 : fd_gossip_metrics_t const *
330 0 : fd_gossip_metrics( fd_gossip_t const * gossip ) {
331 0 : return gossip->metrics;
332 0 : }
333 :
334 : fd_crds_metrics_t const *
335 0 : fd_gossip_crds_metrics( fd_gossip_t const * gossip ) {
336 0 : return fd_crds_metrics( gossip->crds );
337 0 : }
338 :
339 : fd_ping_tracker_metrics_t const *
340 0 : fd_gossip_ping_tracker_metrics( fd_gossip_t const * gossip ) {
341 0 : return fd_ping_tracker_metrics( gossip->ping_tracker );
342 0 : }
343 :
344 : fd_gossip_purged_metrics_t const *
345 0 : fd_gossip_purged_metrics2( fd_gossip_t const * gossip ) {
346 0 : return fd_gossip_purged_metrics( gossip->purged );
347 0 : }
348 :
349 : fd_active_set_metrics_t const *
350 0 : fd_gossip_active_set_metrics2( fd_gossip_t const * gossip ) {
351 0 : return fd_active_set_metrics( gossip->active_set );
352 0 : }
353 :
354 : static fd_ip4_port_t
355 0 : random_entrypoint( fd_gossip_t const * gossip ) {
356 0 : ulong idx = fd_rng_ulong_roll( gossip->rng, gossip->entrypoints_cnt );
357 0 : return gossip->entrypoints[ idx ];
358 0 : }
359 :
360 : ulong
361 : get_stake( fd_gossip_t const * gossip,
362 0 : uchar const * pubkey ) {
363 0 : stake_t const * entry = stake_map_ele_query_const( gossip->stake.map, (fd_pubkey_t const *)pubkey, NULL, gossip->stake.pool );
364 0 : if( FD_UNLIKELY( !entry ) ) return 0UL;
365 0 : return entry->stake;
366 0 : }
367 :
/* fd_gossip_set_identity switches our gossip identity to
   identity_pubkey.  No-op if it equals the current identity.  The new
   identity is first scrubbed from the active set and the ping tracker
   (we never push to or ping ourselves), then identity_pubkey and the
   cached identity stake are updated, the new values are propagated to
   wsample, active_set and prune_finder, and our contact info is
   re-signed under the new identity. */
void
fd_gossip_set_identity( fd_gossip_t * gossip,
                        uchar const * identity_pubkey,
                        long          now ) {
  int identity_changed = memcmp( gossip->identity_pubkey, identity_pubkey, 32UL );
  if( FD_UNLIKELY( !identity_changed ) ) return;

  /* ULONG_MAX if we hold no contact info for the new identity. */
  ulong new_ci_idx = fd_crds_ci_idx( gossip->crds, identity_pubkey );

  /* The new identity may already exist in CRDS as a normal peer (active
     in the wsample and potentially present in the active set).  We
     must deactivate it before updating identity_pubkey to maintain the
     invariant that our own identity is never sampleable. */
  if( FD_UNLIKELY( new_ci_idx!=ULONG_MAX ) ) fd_active_set_remove_peer( gossip->active_set, new_ci_idx );

  /* Also remove the new identity from the ping tracker (we don't
     track ourselves). */
  fd_ping_tracker_remove( gossip->ping_tracker, identity_pubkey, now );

  fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
  gossip->identity_stake = get_stake( gossip, identity_pubkey );
  /* new_ci_idx may be ULONG_MAX here; NOTE(review): presumably wsample
     treats that as "identity not present in the table" -- confirm. */
  fd_gossip_wsample_set_identity( gossip->wsample, new_ci_idx );
  fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
  fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
  fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );

  refresh_contact_info( gossip, now );
}
396 :
397 : void
398 : fd_gossip_set_shred_version( fd_gossip_t * gossip,
399 : ushort shred_version,
400 0 : long now ) {
401 0 : gossip->my_contact_info.ci->contact_info->shred_version = shred_version;
402 0 : refresh_contact_info( gossip, now );
403 0 : }
404 :
405 : void
406 : fd_gossip_stakes_update( fd_gossip_t * gossip,
407 : fd_stake_weight_t const * stake_weights,
408 0 : ulong stake_weights_cnt ) {
409 0 : stake_map_reset( gossip->stake.map );
410 0 : stake_pool_reset( gossip->stake.pool );
411 :
412 0 : for( ulong i=0UL; i<stake_weights_cnt; i++ ) {
413 0 : stake_t * entry = stake_pool_ele_acquire( gossip->stake.pool );
414 0 : entry->pubkey = stake_weights[i].key;
415 0 : entry->stake = stake_weights[i].stake;
416 0 : stake_map_ele_insert( gossip->stake.map, entry, gossip->stake.pool );
417 0 : }
418 :
419 0 : gossip->identity_stake = get_stake( gossip, gossip->identity_pubkey );
420 0 : fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
421 0 : fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
422 0 : fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
423 0 : gossip->stake.count = stake_pool_used( gossip->stake.pool );
424 0 : }
425 :
426 : /* Outbound data budget constants (matching Agave's DataBudget for gossip).
427 : Budget is replenished every BUDGET_REPLENISH_INTERVAL_NS with
428 : num_staked * BUDGET_BYTES_PER_INTERVAL bytes, capped at
429 : BUDGET_MAX_MULTIPLE * num_staked * BUDGET_BYTES_PER_INTERVAL. */
430 :
431 : #define BUDGET_REPLENISH_INTERVAL_NS (100L*1000L*1000L) /* 100 ms */
432 0 : #define BUDGET_BYTES_PER_INTERVAL (1024UL) /* per staked validator */
433 0 : #define BUDGET_MAX_MULTIPLE (5UL) /* max accumulation */
434 0 : #define BUDGET_MIN_STAKED (2UL) /* floor for num_staked */
435 :
436 : /* Lazily replenish the outbound pull-response budget if at least
437 : BUDGET_REPLENISH_INTERVAL_NS have elapsed since last replenish.
438 : Returns current remaining budget in bytes. */
439 :
440 : static inline ulong
441 : outbound_budget_replenish( fd_gossip_t * gossip,
442 0 : long now ) {
443 0 : long elapsed = now-gossip->outbound_budget.last_replenish_nanos;
444 :
445 0 : if( FD_LIKELY( elapsed>=BUDGET_REPLENISH_INTERVAL_NS ) ) {
446 0 : ulong num_staked = fd_ulong_max( gossip->stake.count, BUDGET_MIN_STAKED );
447 0 : ulong increment = num_staked * BUDGET_BYTES_PER_INTERVAL;
448 0 : ulong cap = BUDGET_MAX_MULTIPLE * increment;
449 0 : ulong remaining = gossip->outbound_budget.remaining + increment;
450 0 : gossip->outbound_budget.remaining = fd_ulong_min( remaining, cap );
451 0 : gossip->outbound_budget.last_replenish_nanos = now;
452 0 : }
453 0 : return gossip->outbound_budget.remaining;
454 0 : }
455 :
456 : static inline void
457 : txbuild_flush( fd_gossip_t * gossip,
458 : fd_gossip_txbuild_t * txbuild,
459 : fd_stem_context_t * stem,
460 : fd_ip4_port_t dest_addr,
461 0 : long now ) {
462 0 : if( FD_UNLIKELY( !txbuild->crds_len ) ) return;
463 :
464 : /* Debit the outbound data budget (gossip payload bytes only, not
465 : including IP/UDP headers — matching Agave's DataBudget which
466 : operates on serialized gossip-layer packet sizes). */
467 0 : gossip->outbound_budget.remaining -= fd_ulong_min( txbuild->bytes_len, gossip->outbound_budget.remaining );
468 :
469 0 : gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now );
470 :
471 0 : gossip->metrics->message_tx[ txbuild->tag ]++;
472 0 : gossip->metrics->message_tx_bytes[ txbuild->tag ] += txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
473 0 : for( ulong i=0UL; i<txbuild->crds_len; i++ ) {
474 0 : gossip->metrics->crds_tx_pull_response[ txbuild->crds[ i ].tag ]++;
475 0 : gossip->metrics->crds_tx_pull_response_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
476 0 : }
477 :
478 0 : fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag );
479 0 : }
480 :
481 : /* pull_scan_range iterates CRDS entries in [start_hash, end_hash] and
482 : appends values the caller is missing to pull_resp. Returns 1 if a
483 : budget was exhausted (iteration or outbound data), 0 if the range was
484 : fully scanned. */
485 :
486 : static int
487 : pull_scan_range( fd_gossip_t * gossip,
488 : ulong start_hash,
489 : ulong end_hash,
490 : fd_bloom_t * filter,
491 : ulong adjusted_wallclock_ms,
492 : fd_gossip_txbuild_t * pull_resp,
493 : fd_stem_context_t * stem,
494 : fd_ip4_port_t peer_addr,
495 0 : long now ) {
496 0 : uchar iter_mem[ 16UL ];
497 :
498 0 : for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init_range( gossip->crds, start_hash, end_hash, iter_mem );
499 0 : !fd_crds_mask_iter_done( it, gossip->crds );
500 0 : it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
501 0 : if( FD_UNLIKELY( !gossip->scan_budget.remaining ) ) return 1;
502 0 : gossip->scan_budget.remaining--;
503 :
504 0 : fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );
505 :
506 0 : if( FD_UNLIKELY( fd_crds_entry_wallclock( candidate )>adjusted_wallclock_ms ) ) continue;
507 :
508 0 : if( FD_UNLIKELY( fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue;
509 :
510 0 : uchar const * crds_val;
511 0 : ulong crds_size;
512 0 : fd_crds_entry_value( candidate, &crds_val, &crds_size );
513 0 : if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
514 0 : fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
515 :
516 0 : if( FD_UNLIKELY( !gossip->outbound_budget.remaining ) ) return 1;
517 0 : }
518 0 : return 0;
519 0 : }
520 :
521 : static void
522 : rx_pull_request( fd_gossip_t * gossip,
523 : fd_gossip_pull_request_t const * pr_view,
524 : fd_ip4_port_t peer_addr,
525 : fd_stem_context_t * stem,
526 0 : long now ) {
527 : /* Replenish and check outbound data budget. If the budget is
528 : exhausted, skip generating pull responses entirely. */
529 0 : if( FD_UNLIKELY( !outbound_budget_replenish( gossip, now ) ) ) return;
530 :
531 : /* When responding to a pull request, we skip CRDS entries whose
532 : wallclock is newer than the caller's wallclock + a random jitter.
533 : The jitter is drawn uniformly from [0, TIMEOUT/4) ms, matching
534 : Agave's behavior (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS = 15000ms).
535 : This prevents all responders from consistently excluding the same
536 : set of very-recent CRDS values. */
537 0 : #define FD_GOSSIP_PULL_JITTER_BOUND_MS (15000UL/4UL)
538 :
539 : /* Generate a random jitter in [0, 3750) ms, added to the caller's
540 : wallclock. CRDS entries newer than this adjusted threshold are
541 : excluded from the response. The jitter prevents all responders
542 : from consistently excluding the same near-boundary entries,
543 : improving cluster-wide convergence of recent values. */
544 0 : ulong caller_wallclock_ms = pr_view->contact_info->wallclock;
545 0 : ulong jitter_ms = fd_rng_ulong_roll( gossip->rng, FD_GOSSIP_PULL_JITTER_BOUND_MS );
546 0 : ulong adjusted_wallclock_ms = caller_wallclock_ms + jitter_ms;
547 :
548 0 : ulong keys[ sizeof(pr_view->crds_filter->filter->keys)/sizeof(ulong) ];
549 0 : ulong bits[ sizeof(pr_view->crds_filter->filter->bits)/sizeof(ulong) ];
550 0 : fd_memcpy( keys, pr_view->crds_filter->filter->keys, sizeof(pr_view->crds_filter->filter->keys) );
551 0 : fd_memcpy( bits, pr_view->crds_filter->filter->bits, sizeof(pr_view->crds_filter->filter->bits) );
552 :
553 0 : fd_bloom_t filter[1];
554 0 : filter->keys_len = pr_view->crds_filter->filter->keys_len;
555 0 : filter->keys = keys;
556 :
557 0 : filter->bits_len = pr_view->crds_filter->filter->bits_len;
558 0 : filter->bits = bits;
559 :
560 0 : fd_gossip_txbuild_t pull_resp[1];
561 0 : fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
562 :
563 : /* CPU budget for the CRDS treap scan. An honest sender picks
564 :
565 : mask_bits = ceil(log2(num_items / max_items))
566 : num_items >= MIN_NUM_BLOOM_ITEMS (65536)
567 :
568 : max_items is computed by fd_bloom_max_items as:
569 :
570 : max_items = ceil( max_bits / D(K) )
571 : D(K) = -K / ln(1 - exp(ln(p)/K)), p=0.1
572 :
573 : where K is the number of bloom hash functions. D(K) is minimized
574 : at K=3 where D(3)=4.808. max_bits is bounded by the bloom struct's
575 : bits[] array which holds at most 151 u64 words = 9664 bits (see
576 : fd_gossip_message.h). Therefore,
577 :
578 : max_items <= ceil(9664/4.808) = 2010 < 2048.
579 :
580 : The sender's mask_bits partitions the hash space into 2^mask_bits
581 : partitions, each covering ~max_items of the sender's entries.
582 : What's the worst case iteration count for the receiver given an
583 : honest sender?
584 :
585 : 2^mask_bits >= MIN_NUM_BLOOM_ITEMS / max_items
586 : iterations <= crds_len / 2^mask_bits
587 : iterations <= crds_len * max_items / MIN_NUM_BLOOM_ITEMS
588 : <= crds_len * 2048 / 65536
589 : <= crds_len / 32
590 :
591 : A floor of 1024 handles small tables where the budget
592 : would otherwise round to near-zero. */
593 :
594 0 : gossip->scan_budget.remaining = fd_ulong_max( fd_crds_len( gossip->crds ) / 32UL, 1024UL );
595 :
596 : /* Compute the hash range [start_hash, end_hash] for this mask
597 : partition, then pick a random starting point within it.
598 : We iterate [mid_hash, end_hash] then [start_hash, mid_hash)
599 : so that when the iteration budget truncates the scan, different
600 : entries are skipped each time rather than always penalizing the
601 : tail of the partition. */
602 :
603 0 : ulong start_hash, end_hash;
604 0 : fd_gossip_purged_generate_masks( pr_view->crds_filter->mask, pr_view->crds_filter->mask_bits, &start_hash, &end_hash );
605 0 : ulong range = end_hash - start_hash;
606 0 : ulong mid_hash = start_hash + ( range==ULONG_MAX ? fd_rng_ulong( gossip->rng ) : fd_rng_ulong( gossip->rng ) % (range + 1UL) );
607 :
608 0 : int exhausted = pull_scan_range( gossip, mid_hash, end_hash, filter, adjusted_wallclock_ms, pull_resp, stem, peer_addr, now );
609 0 : if( FD_LIKELY( !exhausted && mid_hash>start_hash ) ) pull_scan_range( gossip, start_hash, mid_hash-1UL, filter, adjusted_wallclock_ms, pull_resp, stem, peer_addr, now );
610 :
611 0 : txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
612 0 : }
613 :
/* rx_values processes the CRDS values of an incoming push or pull
   response message.

   values[0..values_len) are the parsed values, whose serialized bytes
   live at payload+values[i].offset (values[i].length bytes).  failed[i]
   is nonzero for values that were rejected before reaching here.  For
   those, the value's sha256 hash is recorded in the purged set (as
   no_contact_info or failed_insert) and results[i] is left unwritten.
   Otherwise results[i] receives the fd_crds_insert return value (0 on
   upsert; the callers treat <0 as stale and >0 as duplicate).  Each
   successfully inserted value is forwarded to the push active set.

   results must hold at least 17 entries.  NOTE(review): this assumes
   values_len<=17, presumably enforced by the message parser -- confirm. */
static void
rx_values( fd_gossip_t *             gossip,
           ulong                     values_len,
           fd_gossip_value_t const * values,
           uchar const *             payload,
           uchar const *             failed,
           fd_stem_context_t *       stem,
           long                      now,
           long                      results[ static 17UL ] ) {
  for( ulong i=0UL; i<values_len; i++ ) {
    fd_gossip_value_t const * value = &values[ i ];

    if( FD_UNLIKELY( failed[ i ] ) ) {
      /* Remember the hash of the rejected value in the purged set. */
      uchar candidate_hash[ 32UL ];
      fd_sha256_hash( payload+value->offset, value->length, candidate_hash );
      if( FD_LIKELY( failed[ i ]==FD_GOSSIP_FAILED_NO_CONTACT_INFO ) ) fd_gossip_purged_insert_no_contact_info( gossip->purged, value->origin, candidate_hash, now );
      else                                                             fd_gossip_purged_insert_failed_insert( gossip->purged, candidate_hash, now );
      continue;
    }

    ulong origin_stake = get_stake( gossip, value->origin );
    int   is_me        = !memcmp( value->origin, gossip->identity_pubkey, 32UL );

    /* For contact info values, derive the origin's gossip address
       (0 for IPv6 sockets) and whether the ping tracker already
       considers that (origin, address) pair active. */
    int origin_ping_tracker_active = 0;
    fd_ip4_port_t origin_addr = { 0 };
    if( FD_UNLIKELY( value->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
      origin_addr = (fd_ip4_port_t){
        .addr = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
        .port = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
      };
      origin_ping_tracker_active = fd_ping_tracker_active( gossip->ping_tracker, value->origin, origin_addr );
    }

    results[ i ] = fd_crds_insert( gossip->crds, value, payload+value->offset, value->length, origin_stake, origin_ping_tracker_active, is_me, now, stem );
    if( FD_UNLIKELY( results[ i ] ) ) continue; /* not upserted: no further processing */

    if( FD_UNLIKELY( value->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
      if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, value->origin, origin_stake, origin_addr, now );

      /* We just learned this peer's contact info.  Drain any
         no_contact_info hashes associated with this origin from the
         purged set so peers re-send those CRDS values.  The active
         state is re-queried because fd_ping_tracker_track above may
         have changed it. */
      if( FD_LIKELY( fd_ping_tracker_active( gossip->ping_tracker, value->origin, origin_addr ) ) ) fd_gossip_purged_drain_no_contact_info( gossip->purged, value->origin );
    }

    fd_active_set_push( gossip->active_set, payload+value->offset, value->length, value->origin, origin_stake, stem, now, 0 );
  }
}
662 :
663 : static void
664 : rx_pull_response( fd_gossip_t * gossip,
665 : fd_gossip_pull_response_t const * pull_response,
666 : uchar const * payload,
667 : uchar const * failed,
668 : fd_stem_context_t * stem,
669 0 : long now ) {
670 0 : long results[ 17UL ];
671 0 : rx_values( gossip, pull_response->values_len, pull_response->values, payload, failed, stem, now, results );
672 0 : for( ulong i=0UL; i<pull_response->values_len; i++ ) {
673 0 : if( FD_UNLIKELY( failed[ i ] ) ) continue;
674 0 : if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PULL_RESPONSE_IDX ]++;
675 0 : else if( results[ i ]<0L ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_STALE_IDX ]++;
676 0 : else gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_DUPLICATE_IDX ]++;
677 0 : }
678 0 : }
679 :
/* tx_prune constructs, signs, and sends a prune message telling
   `relayer` to stop pushing CRDS values originating from `origin`.
   Nothing is sent if we hold no contact info for relayer, or if its
   gossip socket is IPv6 or has a zero address/port.

   On-wire layout (bincode):
     Protocol tag        4   (FD_GOSSIP_MESSAGE_PRUNE = 3)
     sender pubkey      32   (= identity_pubkey, outer PruneMessage field)
     PruneData.pubkey   32   (= identity_pubkey)
     prunes_len          8
     prunes[1]          32
     signature          64
     destination        32
     wallclock           8

   The signable data (input to Ed25519 sign) is the PruneData fields
   excluding signature, preceded by the length-prefixed
   "\xffSOLANA_PRUNE_DATA" prefix:
     prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
   This must match fd_keyguard_payload_matches_prune_data (106 bytes of
   fixed fields + 32 bytes for the single prune = 138 bytes). */

static void
tx_prune( fd_gossip_t *       gossip,
          uchar const *       relayer,
          uchar const *       origin,
          fd_stem_context_t * stem,
          long                now ) {
  ulong ci_idx = fd_crds_ci_idx( gossip->crds, relayer );
  if( FD_UNLIKELY( ci_idx==ULONG_MAX ) ) return;

  fd_gossip_contact_info_t const * ci = fd_crds_ci( gossip->crds, ci_idx );
  fd_ip4_port_t dest_addr = {
    .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
    .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
  };
  if( FD_UNLIKELY( !dest_addr.addr || !dest_addr.port ) ) return;

  ulong wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );

  /* Build the signable payload:
     prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
     The 26-byte prefix is a u64 length (18) followed by the 18 prefix
     bytes. */
  uchar signable[ 26UL + 32UL + 8UL + 32UL + 32UL + 8UL ];
  uchar * p = signable;
  FD_STORE( ulong, p, 18UL );                           p += 8UL;
  fd_memcpy( p, "\xffSOLANA_PRUNE_DATA", 18UL );        p += 18UL;
  fd_memcpy( p, gossip->identity_pubkey, 32UL );        p += 32UL;
  FD_STORE( ulong, p, 1UL );                            p += 8UL;
  fd_memcpy( p, origin, 32UL );                         p += 32UL;
  fd_memcpy( p, relayer, 32UL );                        p += 32UL;
  FD_STORE( ulong, p, wallclock );                      p += 8UL;

  uchar signature[ 64UL ];
  gossip->sign_fn( gossip->sign_ctx, signable, sizeof(signable), FD_KEYGUARD_SIGN_TYPE_ED25519, signature );

  /* Build the on-wire packet:
     tag(4) + sender(32) + pubkey(32) + prunes_len(8) + prunes[32]
     + signature(64) + destination(32) + wallclock(8) */
  uchar pkt[ 4UL + 32UL + 32UL + 8UL + 32UL + 64UL + 32UL + 8UL ];
  uchar * q = pkt;
  FD_STORE( uint, q, FD_GOSSIP_MESSAGE_PRUNE );     q += 4UL;
  fd_memcpy( q, gossip->identity_pubkey, 32UL );    q += 32UL; /* sender */
  fd_memcpy( q, gossip->identity_pubkey, 32UL );    q += 32UL; /* PruneData.pubkey */
  FD_STORE( ulong, q, 1UL );                        q += 8UL;  /* prunes_len */
  fd_memcpy( q, origin, 32UL );                     q += 32UL; /* prunes[0] */
  fd_memcpy( q, signature, 64UL );                  q += 64UL;
  fd_memcpy( q, relayer, 32UL );                    q += 32UL; /* destination */
  FD_STORE( ulong, q, wallclock );                  q += 8UL;

  gossip->send_fn( gossip->send_ctx, stem, pkt, sizeof(pkt), &dest_addr, (ulong)now );

  gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PRUNE ]++;
  gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PRUNE ] += sizeof(pkt) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
}
750 :
751 : static void
752 : tx_prunes( fd_gossip_t * gossip,
753 : fd_stem_context_t * stem,
754 0 : long now ) {
755 0 : uchar const * relayer;
756 0 : uchar const * origin;
757 0 : while( fd_prune_finder_pop_prune( gossip->prune_finder, &relayer, &origin ) ) {
758 0 : tx_prune( gossip, relayer, origin, stem, now );
759 0 : }
760 0 : }
761 :
762 : static void
763 : rx_push( fd_gossip_t * gossip,
764 : fd_gossip_push_t const * push,
765 : uchar const * payload,
766 : uchar const * failed,
767 : long now,
768 0 : fd_stem_context_t * stem ) {
769 0 : long results[ 17UL ];
770 0 : rx_values( gossip, push->values_len, push->values, payload, failed, stem, now, results );
771 :
772 0 : for( ulong i=0UL; i<push->values_len; i++ ) {
773 0 : if( FD_UNLIKELY( failed[ i ] ) ) continue;
774 0 : if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PUSH_IDX ]++;
775 0 : else if( results[ i ]<0L ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_STALE_IDX ]++;
776 0 : else gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_DUPLICATE_IDX ]++;
777 :
778 0 : ulong num_dups;
779 0 : if( FD_LIKELY( !results[ i ] ) ) num_dups = 0UL;
780 0 : else if( FD_UNLIKELY( results[ i ]<0L ) ) num_dups = ULONG_MAX; /* stale => never timely */
781 0 : else num_dups = (ulong)results[ i ];
782 :
783 0 : ulong origin_stake = get_stake( gossip, push->values[ i ].origin );
784 0 : fd_prune_finder_record( gossip->prune_finder, push->values[ i ].origin, origin_stake, push->from, get_stake( gossip, push->from ), num_dups );
785 0 : }
786 :
787 0 : tx_prunes( gossip, stem, now );
788 0 : }
789 :
790 : static void
791 : rx_prune( fd_gossip_t * gossip,
792 0 : fd_gossip_prune_t const * prune ) {
793 0 : for( ulong i=0UL; i<prune->prunes_len; i++ ) {
794 0 : fd_active_set_prune( gossip->active_set,
795 0 : prune->pubkey,
796 0 : prune->prunes[ i ],
797 0 : get_stake( gossip, prune->prunes[ i ] ) );
798 0 : }
799 0 : }
800 :
801 :
802 : static void
803 : rx_ping( fd_gossip_t * gossip,
804 : fd_gossip_ping_t const * ping,
805 : fd_ip4_port_t peer_address,
806 : fd_stem_context_t * stem,
807 0 : long now ) {
808 0 : uchar out_payload[ sizeof(fd_gossip_pong_t)+4UL];
809 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PONG );
810 :
811 0 : fd_gossip_pong_t * out_pong = (fd_gossip_pong_t *)(out_payload + 4UL);
812 0 : fd_memcpy( out_pong->from, gossip->identity_pubkey, 32UL );
813 :
814 : /* fd_keyguard checks payloads for certain patterns before performing the
815 : sign. Pattern-matching can't be done on hashed data, so we need
816 : to supply the pre-hashed image to the sign fn (fd_keyguard will hash when
817 : supplied with FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519) while also hashing
818 : the image ourselves onto pong->ping_hash */
819 :
820 0 : uchar pre_image[ 48UL ];
821 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
822 0 : fd_memcpy( pre_image+16UL, ping->token, 32UL );
823 :
824 0 : fd_sha256_hash( pre_image, 48UL, out_pong->hash );
825 :
826 0 : gossip->sign_fn( gossip->sign_ctx, pre_image, 48UL, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519, out_pong->signature );
827 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), &peer_address, (ulong)now );
828 :
829 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PONG ]++;
830 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PONG ] += sizeof(out_payload)+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
831 0 : }
832 :
833 : static void
834 : rx_pong( fd_gossip_t * gossip,
835 : fd_gossip_pong_t const * pong,
836 : fd_ip4_port_t peer_address,
837 0 : long now ) {
838 0 : ulong stake = get_stake( gossip, pong->from );
839 0 : fd_ping_tracker_register( gossip->ping_tracker, pong->from, stake, peer_address, pong->hash, now );
840 0 : }
841 :
842 : void
843 : fd_gossip_rx( fd_gossip_t * gossip,
844 : fd_ip4_port_t peer,
845 : uchar const * data,
846 : ulong data_sz,
847 : long now,
848 0 : fd_stem_context_t * stem ) {
849 : /* TODO: Implement traffic shaper / bandwidth limiter */
850 0 : FD_TEST( data_sz>=sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS );
851 0 : fd_gossip_message_t const * message = (fd_gossip_message_t const *)data;
852 0 : uchar const * failed = data+sizeof(fd_gossip_message_t);
853 0 : uchar const * payload = data+sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS;
854 :
855 0 : switch( message->tag ) {
856 0 : case FD_GOSSIP_MESSAGE_PULL_REQUEST: rx_pull_request( gossip, message->pull_request, peer, stem, now ); break;
857 0 : case FD_GOSSIP_MESSAGE_PULL_RESPONSE: rx_pull_response( gossip, message->pull_response, payload, failed, stem, now ); break;
858 0 : case FD_GOSSIP_MESSAGE_PUSH: rx_push( gossip, message->push, payload, failed, now, stem ); break;
859 0 : case FD_GOSSIP_MESSAGE_PRUNE: rx_prune( gossip, message->prune ); break;
860 0 : case FD_GOSSIP_MESSAGE_PING: rx_ping( gossip, message->ping, peer, stem, now ); break;
861 0 : case FD_GOSSIP_MESSAGE_PONG: rx_pong( gossip, message->pong, peer, now ); break;
862 0 : default:
863 0 : FD_LOG_CRIT(( "Unknown gossip message type %u", message->tag ));
864 0 : break;
865 0 : }
866 0 : }
867 :
868 : static int
869 : fd_gossip_push( fd_gossip_t * gossip,
870 : fd_gossip_value_t const * value,
871 : fd_stem_context_t * stem,
872 0 : long now ) {
873 0 : uchar serialized[ FD_GOSSIP_VALUE_MAX_SZ ];
874 0 : long serialized_sz = fd_gossip_value_serialize( value, serialized, sizeof(serialized) );
875 0 : FD_TEST( serialized_sz!=-1L );
876 0 : gossip->sign_fn( gossip->sign_ctx, serialized+64UL, (ulong)serialized_sz-64UL, FD_KEYGUARD_SIGN_TYPE_ED25519, serialized );
877 :
878 0 : int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
879 0 : if( FD_UNLIKELY( fd_crds_insert( gossip->crds, value, serialized, (ulong)serialized_sz, gossip->identity_stake, origin_active, 1, now, stem ) ) ) return -1;
880 :
881 0 : fd_active_set_push( gossip->active_set, serialized, (ulong)serialized_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
882 0 : return 0;
883 0 : }
884 :
885 : int
886 : fd_gossip_push_vote( fd_gossip_t * gossip,
887 : uchar const * txn,
888 : ulong txn_sz,
889 : fd_stem_context_t * stem,
890 0 : long now ) {
891 0 : fd_gossip_value_t value = {
892 0 : .tag = FD_GOSSIP_VALUE_VOTE,
893 0 : .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
894 0 : .vote = {{
895 0 : .index = 0UL, /* TODO */
896 0 : .transaction_len = txn_sz,
897 0 : }}
898 0 : };
899 0 : fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
900 0 : FD_TEST( txn_sz<=sizeof(value.vote->transaction) );
901 0 : fd_memcpy( value.vote->transaction, txn, txn_sz );
902 :
903 0 : return fd_gossip_push( gossip, &value, stem, now );
904 0 : }
905 :
906 : int
907 : fd_gossip_push_duplicate_shred( fd_gossip_t * gossip,
908 : fd_gossip_duplicate_shred_t const * duplicate_shred,
909 : fd_stem_context_t * stem,
910 0 : long now ) {
911 0 : fd_gossip_value_t value = {
912 0 : .tag = FD_GOSSIP_VALUE_DUPLICATE_SHRED,
913 0 : .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
914 0 : };
915 0 : fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
916 0 : *value.duplicate_shred = *duplicate_shred;
917 :
918 0 : return fd_gossip_push( gossip, &value, stem, now );
919 0 : }
920 :
921 : static void
922 : tx_ping( fd_gossip_t * gossip,
923 : fd_stem_context_t * stem,
924 : long now,
925 0 : int * charge_busy ) {
926 0 : uchar out_payload[ sizeof(fd_gossip_ping_t) + 4UL ];
927 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PING );
928 :
929 0 : fd_gossip_ping_t * out_ping = (fd_gossip_ping_t *)( out_payload+4UL );
930 0 : fd_memcpy( out_ping->from, gossip->identity_pubkey, 32UL );
931 :
932 0 : uchar const * peer_pubkey;
933 0 : uchar const * ping_token;
934 0 : fd_ip4_port_t const * peer_address;
935 0 : while( fd_ping_tracker_pop_request( gossip->ping_tracker,
936 0 : now,
937 0 : &peer_pubkey,
938 0 : &peer_address,
939 0 : &ping_token ) ) {
940 0 : fd_memcpy( out_ping->token, ping_token, 32UL );
941 :
942 0 : gossip->sign_fn( gossip->sign_ctx, out_ping->token, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519, out_ping->signature );
943 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), peer_address, (ulong)now );
944 :
945 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PING ]++;
946 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PING ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
947 0 : if( charge_busy ) *charge_busy = 1;
948 0 : }
949 0 : }
950 :
951 : /* Construct and send a pull request to a random peer. The pull
952 : request contains a bloom filter over our known CRDS hashes so that
953 : the peer can respond with values we are missing.
954 :
955 : NOTE: Divergence from Agave:
956 : - Agave builds up to 2^mask_bits filters per pull period
957 : (sampling up to 1024), each covering a distinct partition of
958 : the hash space. We build and send exactly one filter per
959 : pull period, covering 1/2^mask_bits of the space.
960 :
961 : Maximum bloom filter bits in a PullRequest packet:
962 :
963 : PACKET_DATA_SIZE = 1232 (= 1280 - 40 - 8)
964 :
965 : Bytes consumed by non-bloom fields:
966 : discriminant(4) + keys_len(8) + keys(8*num_keys) +
967 : has_bits(1) + bloom_vec_len(8) + bloom_bits_count(8) +
968 : bloom_num_bits_set(8) + mask(8) + mask_bits(4)
969 : + contact_info_crds_val(crds_val_sz)
970 : = 49 + 8*num_keys + crds_val_sz
971 :
972 : The bitvec is serialized as u64 words, so the bitvec storage is
973 : ceil(num_bits/64)*8 bytes. The remaining packet bytes must
974 : accommodate this.
975 :
976 : Agave determines the max_bytes parameter (input to Bloom::random)
977 : via an empirical cache (get_max_bloom_filter_bytes). max_bytes*8
978 : is passed as the max_bits cap to Bloom::random, but actual
979 : num_bits is only ~83% of max_bits (the E/D ratio for p=0.1).
980 : We replicate this with a closed-form inversion: the largest
981 : max_bytes where ceil(num_bits/64)*8 fits in remaining space is
982 : max_bytes = floor(D * floor(64*W/E) / 8), where W is the max
983 : number of u64 words, E and D are the bloom filter constants.
984 :
985 : num_keys depends on the bloom sizing, which depends on the
986 : overhead, which depends on num_keys. However there is a closed
987 : form: compute num_keys from the pessimistic KEYS=8 overhead, then
988 : recompute the tight overhead with the true num_keys. This always
989 : converges in one step because the optimal key count is
990 : D*ln(2) ≈ 3.32 (where D = ln(p)/ln(1/2^ln2)), far from any
991 : rounding boundary. For p=0.1 and KEYS=8, num_keys is always 3.
992 :
993 : NB: The has_bits(1) + bloom_vec_len(8) are only written when
994 : num_bits>=1. fd_bloom_num_bits clamps to [1, max_bits], so
995 : num_bits>=1 always holds and this layout is correct. */
996 :
997 : static void
998 : tx_pull_request( fd_gossip_t * gossip,
999 : fd_stem_context_t * stem,
1000 0 : long now ) {
1001 0 : ulong total_crds_vals = fd_crds_len( gossip->crds ) + fd_gossip_purged_len( gossip->purged );
1002 0 : ulong num_items = fd_ulong_max( 65536UL, total_crds_vals );
1003 0 : ulong crds_val_sz = gossip->my_contact_info.crds_val_sz;
1004 :
1005 : /* Step 1: Compute num_keys from the pessimistic KEYS=8 overhead
1006 : (same initial estimate Agave uses in CrdsFilterSet::new). */
1007 0 : ulong pessimistic_overhead = 49UL + 8UL*(ulong)BLOOM_NUM_KEYS + crds_val_sz;
1008 0 : FD_TEST( pessimistic_overhead<FD_GOSSIP_MTU );
1009 0 : double pessimistic_max_bits = (double)( 8UL*( FD_GOSSIP_MTU - pessimistic_overhead ) );
1010 0 : double pessimistic_items = fd_bloom_max_items( pessimistic_max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
1011 0 : FD_TEST( pessimistic_items>0.0 );
1012 0 : ulong pessimistic_num_bits = fd_bloom_num_bits( pessimistic_items, BLOOM_FALSE_POSITIVE_RATE, pessimistic_max_bits );
1013 0 : ulong num_keys = fd_bloom_num_keys( (double)pessimistic_num_bits, pessimistic_items );
1014 :
1015 : /* Step 2: Recompute with the tight overhead using the true num_keys.
1016 : Find the largest max_bytes parameter (matching Agave's
1017 : get_max_bloom_filter_bytes cache) such that the resulting bitvec
1018 : fits in the remaining packet space.
1019 :
1020 : Given:
1021 : max_items = ceil(max_bits / D) where D = -K / ln(1-exp(ln(p)/K))
1022 : num_bits = ceil(max_items * E) where E = ln(p) / ln(1/2^ln2)
1023 :
1024 : We need ceil(num_bits/64)*8 <= remaining, i.e. num_bits <= 64*W
1025 : where W = floor(remaining/8). Working backwards:
1026 : max_items <= I where I = floor(64*W / E)
1027 : max_bytes <= D*I / 8
1028 :
1029 : So max_bytes = floor(D * floor(64*W/E) / 8). */
1030 0 : ulong overhead = 49UL + 8UL*num_keys + crds_val_sz;
1031 0 : FD_TEST( overhead<FD_GOSSIP_MTU );
1032 0 : ulong remaining = FD_GOSSIP_MTU - overhead;
1033 0 : ulong max_words = remaining / 8UL; /* max u64 words for bitvec */
1034 :
1035 0 : double E = log( BLOOM_FALSE_POSITIVE_RATE ) / log( 1.0 / pow( 2.0, log( 2.0 ) ) );
1036 0 : double D = -BLOOM_NUM_KEYS / log( 1.0 - exp( log( BLOOM_FALSE_POSITIVE_RATE ) / BLOOM_NUM_KEYS ) );
1037 0 : ulong I = (ulong)floor( 64.0 * (double)max_words / E );
1038 0 : ulong max_bytes = (ulong)floor( D * (double)I / 8.0 );
1039 :
1040 0 : double max_bits = (double)( max_bytes * 8UL );
1041 0 : double max_items = fd_bloom_max_items( max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
1042 0 : FD_TEST( max_items>0.0 );
1043 0 : ulong num_bits = fd_bloom_num_bits( max_items, BLOOM_FALSE_POSITIVE_RATE, max_bits );
1044 0 : FD_TEST( num_bits>=1UL );
1045 0 : FD_TEST( (num_bits+63UL)/64UL<=max_words ); /* verify bitvec fits */
1046 0 : FD_TEST( fd_bloom_num_keys( (double)num_bits, max_items )==num_keys ); /* verify convergence */
1047 :
1048 0 : double _mask_bits = ceil( log2( (double)num_items / max_items ) );
1049 0 : uint mask_bits = _mask_bits >= 0.0 ? fd_uint_min( (uint)_mask_bits, 63U ) : 0U;
1050 0 : ulong mask = fd_rng_ulong( gossip->rng ) | (~0UL>>(mask_bits));
1051 :
1052 0 : uchar payload[ FD_GOSSIP_MTU ] = {0};
1053 :
1054 0 : ulong * keys_ptr, * bits_ptr, * bits_set;
1055 0 : long payload_sz = fd_gossip_pull_request_init( payload,
1056 0 : FD_GOSSIP_MTU,
1057 0 : num_keys,
1058 0 : num_bits,
1059 0 : mask,
1060 0 : mask_bits,
1061 0 : gossip->my_contact_info.crds_val,
1062 0 : gossip->my_contact_info.crds_val_sz,
1063 0 : &keys_ptr,
1064 0 : &bits_ptr,
1065 0 : &bits_set );
1066 0 : FD_TEST( -1L!=payload_sz );
1067 :
1068 0 : fd_bloom_t filter[1];
1069 0 : fd_bloom_init_inplace( keys_ptr, bits_ptr, num_keys, num_bits, 0, gossip->rng, BLOOM_FALSE_POSITIVE_RATE, filter );
1070 :
1071 0 : uchar iter_mem[ 16UL ];
1072 0 : for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
1073 0 : !fd_crds_mask_iter_done( it, gossip->crds );
1074 0 : it = fd_crds_mask_iter_next( it, gossip->crds ) ) {
1075 0 : fd_bloom_insert( filter, fd_crds_entry_hash( fd_crds_mask_iter_entry( it, gossip->crds ) ), 32UL );
1076 0 : }
1077 :
1078 0 : for( fd_gossip_purged_mask_iter_t * it = fd_gossip_purged_mask_iter_init( gossip->purged, mask, mask_bits, iter_mem );
1079 0 : !fd_gossip_purged_mask_iter_done( it, gossip->purged );
1080 0 : it = fd_gossip_purged_mask_iter_next( it, gossip->purged ) ){
1081 0 : fd_bloom_insert( filter, fd_gossip_purged_mask_iter_hash( it, gossip->purged ), 32UL );
1082 0 : }
1083 :
1084 0 : int num_bits_set = 0;
1085 0 : for( ulong i=0UL; i<(num_bits+63)/64UL; i++ ) num_bits_set += fd_ulong_popcnt( bits_ptr[ i ] );
1086 0 : *bits_set = (ulong)num_bits_set;
1087 :
1088 0 : ulong idx = fd_gossip_wsample_sample_pull_request( gossip->wsample );
1089 0 : fd_ip4_port_t peer_addr;
1090 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) {
1091 0 : if( FD_UNLIKELY( !gossip->entrypoints_cnt ) ) {
1092 : /* We are the bootstrapping node, and nobody else is present in
1093 : the cluster. Nowhere to send the pull request. */
1094 0 : return;
1095 0 : }
1096 0 : peer_addr = random_entrypoint( gossip );
1097 0 : } else {
1098 0 : fd_gossip_contact_info_t const * peer = fd_crds_ci( gossip->crds, idx );
1099 0 : peer_addr.addr = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0 : peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4;
1100 0 : peer_addr.port = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port;
1101 0 : }
1102 0 : gossip->send_fn( gossip->send_ctx, stem, payload, (ulong)payload_sz, &peer_addr, (ulong)now );
1103 :
1104 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PULL_REQUEST ]++;
1105 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PULL_REQUEST ] += (ulong)payload_sz + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
1106 0 : }
1107 :
1108 : void
1109 : fd_gossip_advance( fd_gossip_t * gossip,
1110 : long now,
1111 : fd_stem_context_t * stem,
1112 0 : int * charge_busy ) {
1113 0 : outbound_budget_replenish( gossip, now );
1114 :
1115 0 : fd_gossip_purged_expire( gossip->purged, now );
1116 0 : fd_active_set_advance( gossip->active_set, stem, now, charge_busy );
1117 0 : fd_crds_advance( gossip->crds, now, stem, charge_busy );
1118 :
1119 0 : tx_ping( gossip, stem, now, charge_busy );
1120 0 : if( FD_UNLIKELY( now>=gossip->timers.next_pull_request ) ) {
1121 0 : tx_pull_request( gossip, stem, now );
1122 0 : if( charge_busy ) *charge_busy = 1;
1123 : /* 1.6ms (625/s). Agave sends min(1024, ceil(2^mask_bits/8))
1124 : filters every 500ms. For a typical mainnet table (~65k items,
1125 : mask_bits≈7) that is ~16 filters/500ms = one every 31ms. We
1126 : send a single filter per round, so we fire ~20× more often to
1127 : compensate for sending one filter instead of many per period.
1128 :
1129 : We considered dynamically matching Agave's exact rate by
1130 : computing 500ms/filters_per_round from mask_bits each round,
1131 : but this caused slow table fill on startup (mask_bits starts
1132 : low -> long intervals -> few pulls -> slow CRDS population).
1133 : Adaptive boosting (counter-based, timestamp-based, and
1134 : threshold-based) all added complexity without clear benefit:
1135 : counter decay lost state between send and response arrival,
1136 : timestamp checks never disarmed because trickle inserts kept
1137 : refreshing the window, and threshold heuristics required
1138 : tuning constants that varied by cluster size.
1139 :
1140 : A fixed 1.6ms is simpler and robust: the cost of a redundant
1141 : pull request is negligible (a single 1232-byte packet whose
1142 : reply will be empty if we're already caught up), and it
1143 : guarantees fast table fill on startup without any adaptive
1144 : machinery. */
1145 0 : gossip->timers.next_pull_request = now+1600L*1000L;
1146 0 : }
1147 0 : if( FD_UNLIKELY( now>=gossip->timers.next_contact_info_refresh ) ) {
1148 : /* TODO: Frequency of this? More often if observing? */
1149 0 : refresh_contact_info( gossip, now );
1150 0 : int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
1151 0 : fd_crds_insert( gossip->crds, gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_stake, origin_active, 1, now, stem );
1152 0 : fd_active_set_push( gossip->active_set, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
1153 0 : gossip->timers.next_contact_info_refresh = now+15L*500L*1000L*1000L; /* TODO: Jitter */
1154 0 : if( charge_busy ) *charge_busy = 1;
1155 0 : }
1156 0 : }
1157 :
1158 : void
1159 : fd_gossip_ping_tracker_track( fd_gossip_t * gossip,
1160 : uchar const * peer_pubkey,
1161 : fd_ip4_port_t peer_address,
1162 0 : long now ) {
1163 : /* Don't track ourselves. */
1164 0 : if( FD_UNLIKELY( !memcmp( peer_pubkey, gossip->identity_pubkey, 32UL ) ) ) return;
1165 :
1166 0 : ulong origin_stake = get_stake( gossip, peer_pubkey );
1167 0 : fd_ping_tracker_track( gossip->ping_tracker, peer_pubkey, origin_stake, peer_address, now );
1168 0 : }
|