Line data Source code
1 : #include "fd_gossip.h"
2 : #include "fd_bloom.h"
3 : #include "fd_gossip_message.h"
4 : #include "fd_gossip_txbuild.h"
5 : #include "fd_active_set.h"
6 : #include "fd_ping_tracker.h"
7 : #include "fd_prune_finder.h"
8 : #include "fd_gossip_wsample.h"
9 : #include "../../disco/keyguard/fd_keyguard.h"
10 : #include "../../ballet/sha256/fd_sha256.h"
11 : #include "../leaders/fd_leaders_base.h"
12 :
13 : FD_STATIC_ASSERT( FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT==FD_GOSSIP_MESSAGE_CNT,
14 : "FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT must match FD_GOSSIP_MESSAGE_CNT" );
15 :
16 : FD_STATIC_ASSERT( FD_METRICS_ENUM_CRDS_VALUE_CNT==FD_GOSSIP_VALUE_CNT,
17 : "FD_METRICS_ENUM_CRDS_VALUE_CNT must match FD_GOSSIP_VALUE_CNT" );
18 :
19 0 : #define BLOOM_FALSE_POSITIVE_RATE (0.1)
20 0 : #define BLOOM_NUM_KEYS (8.0)
21 :
22 : struct stake {
23 : fd_pubkey_t pubkey;
24 : ulong stake;
25 :
26 : struct {
27 : ulong prev;
28 : ulong next;
29 : } map;
30 :
31 : struct {
32 : ulong next;
33 : } pool;
34 : };
35 :
36 : typedef struct stake stake_t;
37 :
38 : /* NOTE: Since the staked count is known at the time we populate
39 : the map, we can treat the pool as an array instead. This means we
40 : can bypass the acquire/release model and quickly iterate through the
41 : pool when we repopulate the map on every fd_gossip_stakes_update
42 : iteration. */
43 : #define POOL_NAME stake_pool
44 0 : #define POOL_T stake_t
45 : #define POOL_IDX_T ulong
46 0 : #define POOL_NEXT pool.next
47 : #include "../../util/tmpl/fd_pool.c"
48 :
49 : #define MAP_NAME stake_map
50 0 : #define MAP_KEY pubkey
51 : #define MAP_ELE_T stake_t
52 : #define MAP_KEY_T fd_pubkey_t
53 0 : #define MAP_PREV map.prev
54 0 : #define MAP_NEXT map.next
55 0 : #define MAP_KEY_EQ(k0,k1) fd_pubkey_eq( k0, k1 )
56 0 : #define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
57 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
58 : #include "../../util/tmpl/fd_map_chain.c"
59 :
60 : struct fd_gossip_private {
61 : uchar identity_pubkey[ 32UL ];
62 : ulong identity_stake;
63 :
64 : fd_gossip_metrics_t metrics[1];
65 :
66 : fd_gossip_wsample_t * wsample;
67 : fd_crds_t * crds;
68 : fd_gossip_purged_t * purged;
69 : fd_active_set_t * active_set;
70 : fd_ping_tracker_t * ping_tracker;
71 : fd_prune_finder_t * prune_finder;
72 :
73 : fd_sha256_t sha256[1];
74 : fd_sha512_t sha512[1];
75 :
76 : ulong entrypoints_cnt;
77 : fd_ip4_port_t entrypoints[ 16UL ];
78 :
79 : fd_rng_t * rng;
80 :
81 : struct {
82 : ulong count;
83 : stake_t * pool;
84 : stake_map_t * map;
85 : } stake;
86 :
87 : struct {
88 : long next_pull_request;
89 : long next_active_set_refresh;
90 : long next_contact_info_refresh;
91 : long next_flush_push_state;
92 : } timers;
93 :
94 : /* Token-bucket rate limiter for outbound pull response data.
95 : Matches Agave's DataBudget: replenished every 100ms with
96 : num_staked*1024 bytes, capped at 5x that amount. Only
97 : pull responses are rate-limited; push messages are not. */
98 : struct {
99 : ulong remaining; /* bytes remaining in budget (signed) */
100 : long last_replenish_nanos; /* last replenish timestamp in nanos */
101 : } outbound_budget;
102 :
103 : /* Callbacks */
104 : fd_gossip_sign_fn sign_fn;
105 : void * sign_ctx;
106 :
107 : fd_gossip_send_fn send_fn;
108 : void * send_ctx;
109 :
110 : fd_ping_tracker_change_fn ping_tracker_change_fn;
111 : void * ping_tracker_change_fn_ctx;
112 :
113 : struct {
114 : uchar crds_val[ FD_GOSSIP_VALUE_MAX_SZ ];
115 : ulong crds_val_sz;
116 : fd_gossip_value_t ci[1];
117 : } my_contact_info;
118 :
119 : fd_gossip_out_ctx_t * gossip_net_out;
120 : };
121 :
122 : FD_FN_CONST ulong
123 0 : fd_gossip_align( void ) {
124 0 : return 128uL;
125 0 : }
126 :
127 : FD_FN_CONST ulong
128 : fd_gossip_footprint( ulong max_values,
129 0 : ulong entrypoints_len ) {
130 0 : ulong l;
131 0 : l = FD_LAYOUT_INIT;
132 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
133 0 : l = FD_LAYOUT_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values ) );
134 0 : l = FD_LAYOUT_APPEND( l, fd_gossip_wsample_align(),fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
135 0 : l = FD_LAYOUT_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values ) );
136 0 : l = FD_LAYOUT_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
137 0 : l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
138 0 : l = FD_LAYOUT_APPEND( l, fd_prune_finder_align(), fd_prune_finder_footprint() );
139 0 : l = FD_LAYOUT_APPEND( l, stake_pool_align(), stake_pool_footprint( MAX_STAKED_LEADERS ) );
140 0 : l = FD_LAYOUT_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
141 0 : l = FD_LAYOUT_FINI( l, fd_gossip_align() );
142 0 : return l;
143 0 : }
144 :
145 : static void
146 : ping_tracker_change( void * _ctx,
147 : uchar const * peer_pubkey,
148 : fd_ip4_port_t peer_address,
149 : long now,
150 0 : int change_type ) {
151 0 : fd_gossip_t * ctx = (fd_gossip_t *)_ctx;
152 :
153 0 : if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return;
154 :
155 0 : if( FD_LIKELY( change_type==FD_PING_TRACKER_CHANGE_TYPE_ACTIVE ) ) {
156 0 : fd_gossip_purged_drain_no_contact_info( ctx->purged, peer_pubkey );
157 0 : }
158 :
159 0 : ulong ci_idx = fd_crds_ci_idx( ctx->crds, peer_pubkey );
160 0 : if( FD_UNLIKELY( ci_idx!=ULONG_MAX ) ) {
161 0 : switch( change_type ) {
162 0 : case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE:
163 0 : fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 1 );
164 0 : break;
165 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE:
166 0 : case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED:
167 0 : fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 0 );
168 0 : fd_active_set_remove_peer( ctx->active_set, ci_idx );
169 0 : break;
170 0 : default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return;
171 0 : }
172 0 : }
173 :
174 0 : ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type );
175 0 : }
176 :
177 : static inline void
178 : refresh_contact_info( fd_gossip_t * gossip,
179 0 : long now ) {
180 0 : fd_memcpy( gossip->my_contact_info.ci->origin, gossip->identity_pubkey, 32UL );
181 0 : gossip->my_contact_info.ci->wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
182 0 : long sz = fd_gossip_value_serialize( gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, FD_GOSSIP_VALUE_MAX_SZ );
183 0 : FD_TEST( sz!=-1L );
184 0 : gossip->my_contact_info.crds_val_sz = (ulong)sz;
185 :
186 0 : gossip->sign_fn( gossip->sign_ctx,
187 0 : gossip->my_contact_info.crds_val+64UL,
188 0 : gossip->my_contact_info.crds_val_sz-64UL,
189 0 : FD_KEYGUARD_SIGN_TYPE_ED25519,
190 0 : gossip->my_contact_info.crds_val );
191 :
192 : /* We don't have stem_ctx here so we pre-empt in next
193 : fd_gossip_advance iteration instead. */
194 0 : gossip->timers.next_contact_info_refresh = now;
195 0 : }
196 :
197 : void *
198 : fd_gossip_new( void * shmem,
199 : fd_rng_t * rng,
200 : ulong max_values,
201 : ulong entrypoints_len,
202 : fd_ip4_port_t const * entrypoints,
203 : uchar const * identity_pubkey,
204 : fd_gossip_contact_info_t const * my_contact_info,
205 : long now,
206 : fd_gossip_send_fn send_fn,
207 : void * send_ctx,
208 : fd_gossip_sign_fn sign_fn,
209 : void * sign_ctx,
210 : fd_ping_tracker_change_fn ping_tracker_change_fn,
211 : void * ping_tracker_change_fn_ctx,
212 : fd_gossip_activity_update_fn activity_update_fn,
213 : void * activity_update_fn_ctx,
214 : fd_gossip_out_ctx_t * gossip_update_out,
215 0 : fd_gossip_out_ctx_t * gossip_net_out ) {
216 0 : if( FD_UNLIKELY( !shmem ) ) {
217 0 : FD_LOG_WARNING(( "NULL shmem" ));
218 0 : return NULL;
219 0 : }
220 :
221 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) {
222 0 : FD_LOG_WARNING(( "misaligned shmem" ));
223 0 : return NULL;
224 0 : }
225 :
226 0 : if( FD_UNLIKELY( entrypoints_len>16UL ) ) {
227 0 : FD_LOG_WARNING(( "entrypoints_cnt must be in [0, 16]" ));
228 0 : return NULL;
229 0 : }
230 :
231 0 : if( FD_UNLIKELY( !fd_ulong_is_pow2( max_values ) ) ) {
232 0 : FD_LOG_WARNING(( "max_values must be a power of 2" ));
233 0 : return NULL;
234 0 : }
235 :
236 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
237 0 : fd_gossip_t * gossip = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t), sizeof(fd_gossip_t) );
238 0 : void * purged = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values ) );
239 0 : void * wsample = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_wsample_align(), fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE ) );
240 0 : void * crds = FD_SCRATCH_ALLOC_APPEND( l, fd_crds_align(), fd_crds_footprint( max_values ) );
241 0 : void * active_set = FD_SCRATCH_ALLOC_APPEND( l, fd_active_set_align(), fd_active_set_footprint() );
242 0 : void * ping_tracker = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(), fd_ping_tracker_footprint( entrypoints_len ) );
243 0 : void * prune_finder = FD_SCRATCH_ALLOC_APPEND( l, fd_prune_finder_align(), fd_prune_finder_footprint() );
244 0 : void * stake_pool = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(), stake_pool_footprint( MAX_STAKED_LEADERS ) );
245 0 : void * stake_weights = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
246 :
247 0 : gossip->gossip_net_out = gossip_net_out;
248 :
249 0 : gossip->entrypoints_cnt = entrypoints_len;
250 0 : fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) );
251 :
252 0 : gossip->purged = fd_gossip_purged_join( fd_gossip_purged_new( purged, rng, max_values ) );
253 0 : FD_TEST( gossip->purged );
254 :
255 0 : gossip->wsample = fd_gossip_wsample_join( fd_gossip_wsample_new( wsample, rng, FD_CONTACT_INFO_TABLE_SIZE ) );
256 0 : FD_TEST( gossip->wsample );
257 :
258 0 : gossip->crds = fd_crds_join( fd_crds_new( crds, entrypoints, entrypoints_len, gossip->wsample, active_set, rng, max_values, gossip->purged, activity_update_fn, activity_update_fn_ctx, gossip_update_out ) );
259 0 : FD_TEST( gossip->crds );
260 :
261 0 : gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, gossip->wsample, gossip->crds, rng, identity_pubkey, 0UL, send_fn, send_ctx ) );
262 0 : FD_TEST( gossip->active_set );
263 :
264 0 : gossip->ping_tracker = fd_ping_tracker_join( fd_ping_tracker_new( ping_tracker, rng, gossip->entrypoints_cnt, gossip->entrypoints, ping_tracker_change, gossip ) );
265 0 : FD_TEST( gossip->ping_tracker );
266 :
267 0 : gossip->prune_finder = fd_prune_finder_join( fd_prune_finder_new( prune_finder ) );
268 0 : FD_TEST( gossip->prune_finder );
269 :
270 0 : gossip->stake.count = 0UL;
271 0 : gossip->stake.pool = stake_pool_join( stake_pool_new( stake_pool, MAX_STAKED_LEADERS ) );
272 0 : FD_TEST( gossip->stake.pool );
273 :
274 0 : gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt_est( MAX_STAKED_LEADERS ), fd_rng_ulong( rng ) ) );
275 0 : FD_TEST( gossip->stake.map );
276 :
277 0 : FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
278 0 : FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );
279 :
280 0 : gossip->rng = rng;
281 :
282 0 : gossip->timers.next_pull_request = 0L;
283 0 : gossip->timers.next_active_set_refresh = 0L;
284 0 : gossip->timers.next_contact_info_refresh = 0L;
285 0 : gossip->timers.next_flush_push_state = 0L;
286 :
287 0 : gossip->outbound_budget.remaining = 0UL;
288 0 : gossip->outbound_budget.last_replenish_nanos = now;
289 :
290 0 : gossip->send_fn = send_fn;
291 0 : gossip->send_ctx = send_ctx;
292 0 : gossip->sign_fn = sign_fn;
293 0 : gossip->sign_ctx = sign_ctx;
294 0 : gossip->ping_tracker_change_fn = ping_tracker_change_fn;
295 0 : gossip->ping_tracker_change_fn_ctx = ping_tracker_change_fn_ctx;
296 :
297 0 : gossip->my_contact_info.ci->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
298 0 : *gossip->my_contact_info.ci->contact_info = *my_contact_info;
299 0 : fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
300 0 : gossip->identity_stake = 0UL;
301 0 : refresh_contact_info( gossip, now );
302 :
303 0 : fd_memset( gossip->metrics, 0, sizeof(fd_gossip_metrics_t) );
304 :
305 0 : return gossip;
306 0 : }
307 :
308 : fd_gossip_t *
309 0 : fd_gossip_join( void * shgossip ) {
310 0 : if( FD_UNLIKELY( !shgossip ) ) {
311 0 : FD_LOG_WARNING(( "NULL shgossip" ));
312 0 : return NULL;
313 0 : }
314 :
315 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) {
316 0 : FD_LOG_WARNING(( "misaligned shgossip" ));
317 0 : return NULL;
318 0 : }
319 :
320 0 : return (fd_gossip_t *)shgossip;
321 0 : }
322 :
323 : fd_gossip_metrics_t const *
324 0 : fd_gossip_metrics( fd_gossip_t const * gossip ) {
325 0 : return gossip->metrics;
326 0 : }
327 :
328 : fd_crds_metrics_t const *
329 0 : fd_gossip_crds_metrics( fd_gossip_t const * gossip ) {
330 0 : return fd_crds_metrics( gossip->crds );
331 0 : }
332 :
333 : fd_ping_tracker_metrics_t const *
334 0 : fd_gossip_ping_tracker_metrics( fd_gossip_t const * gossip ) {
335 0 : return fd_ping_tracker_metrics( gossip->ping_tracker );
336 0 : }
337 :
338 : fd_gossip_purged_metrics_t const *
339 0 : fd_gossip_purged_metrics2( fd_gossip_t const * gossip ) {
340 0 : return fd_gossip_purged_metrics( gossip->purged );
341 0 : }
342 :
343 : fd_active_set_metrics_t const *
344 0 : fd_gossip_active_set_metrics2( fd_gossip_t const * gossip ) {
345 0 : return fd_active_set_metrics( gossip->active_set );
346 0 : }
347 :
348 : static fd_ip4_port_t
349 0 : random_entrypoint( fd_gossip_t const * gossip ) {
350 0 : ulong idx = fd_rng_ulong_roll( gossip->rng, gossip->entrypoints_cnt );
351 0 : return gossip->entrypoints[ idx ];
352 0 : }
353 :
354 : ulong
355 : get_stake( fd_gossip_t const * gossip,
356 0 : uchar const * pubkey ) {
357 0 : stake_t const * entry = stake_map_ele_query_const( gossip->stake.map, (fd_pubkey_t const *)pubkey, NULL, gossip->stake.pool );
358 0 : if( FD_UNLIKELY( !entry ) ) return 0UL;
359 0 : return entry->stake;
360 0 : }
361 :
362 : void
363 : fd_gossip_set_identity( fd_gossip_t * gossip,
364 : uchar const * identity_pubkey,
365 0 : long now ) {
366 0 : int identity_changed = memcmp( gossip->identity_pubkey, identity_pubkey, 32UL );
367 0 : if( FD_UNLIKELY( !identity_changed ) ) return;
368 :
369 0 : ulong new_ci_idx = fd_crds_ci_idx( gossip->crds, identity_pubkey );
370 :
371 : /* The new identity may already exist in CRDS as a normal peer (active
372 : in the wsample and potentially present in the active set). We
373 : must deactivate it before updating identity_pubkey to maintain the
374 : invariant that our own identity is never sampleable. */
375 0 : if( FD_UNLIKELY( new_ci_idx!=ULONG_MAX ) ) fd_active_set_remove_peer( gossip->active_set, new_ci_idx );
376 :
377 0 : fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
378 0 : gossip->identity_stake = get_stake( gossip, identity_pubkey );
379 0 : fd_gossip_wsample_set_identity( gossip->wsample, new_ci_idx );
380 0 : fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
381 0 : fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
382 0 : fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
383 0 : refresh_contact_info( gossip, now );
384 0 : }
385 :
386 : void
387 : fd_gossip_set_shred_version( fd_gossip_t * gossip,
388 : ushort shred_version,
389 0 : long now ) {
390 0 : gossip->my_contact_info.ci->contact_info->shred_version = shred_version;
391 0 : refresh_contact_info( gossip, now );
392 0 : }
393 :
394 : void
395 : fd_gossip_stakes_update( fd_gossip_t * gossip,
396 : fd_stake_weight_t const * stake_weights,
397 0 : ulong stake_weights_cnt ) {
398 0 : stake_map_reset( gossip->stake.map );
399 0 : stake_pool_reset( gossip->stake.pool );
400 :
401 0 : for( ulong i=0UL; i<stake_weights_cnt; i++ ) {
402 0 : stake_t * entry = stake_pool_ele_acquire( gossip->stake.pool );
403 0 : entry->pubkey = stake_weights[i].key;
404 0 : entry->stake = stake_weights[i].stake;
405 0 : stake_map_ele_insert( gossip->stake.map, entry, gossip->stake.pool );
406 0 : }
407 :
408 0 : gossip->identity_stake = get_stake( gossip, gossip->identity_pubkey );
409 0 : fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
410 0 : fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
411 0 : fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
412 0 : gossip->stake.count = stake_pool_used( gossip->stake.pool );
413 0 : }
414 :
415 : /* Outbound data budget constants (matching Agave's DataBudget for gossip).
416 : Budget is replenished every BUDGET_REPLENISH_INTERVAL_NS with
417 : num_staked * BUDGET_BYTES_PER_INTERVAL bytes, capped at
418 : BUDGET_MAX_MULTIPLE * num_staked * BUDGET_BYTES_PER_INTERVAL. */
419 :
420 : #define BUDGET_REPLENISH_INTERVAL_NS (100L*1000L*1000L) /* 100 ms */
421 0 : #define BUDGET_BYTES_PER_INTERVAL (1024UL) /* per staked validator */
422 0 : #define BUDGET_MAX_MULTIPLE (5UL) /* max accumulation */
423 0 : #define BUDGET_MIN_STAKED (2UL) /* floor for num_staked */
424 :
425 : /* Lazily replenish the outbound pull-response budget if at least
426 : BUDGET_REPLENISH_INTERVAL_NS have elapsed since last replenish.
427 : Returns current remaining budget in bytes. */
428 :
429 : static inline ulong
430 : outbound_budget_replenish( fd_gossip_t * gossip,
431 0 : long now ) {
432 0 : long elapsed = now-gossip->outbound_budget.last_replenish_nanos;
433 :
434 0 : if( FD_LIKELY( elapsed>=BUDGET_REPLENISH_INTERVAL_NS ) ) {
435 0 : ulong num_staked = fd_ulong_max( gossip->stake.count, BUDGET_MIN_STAKED );
436 0 : ulong increment = num_staked * BUDGET_BYTES_PER_INTERVAL;
437 0 : ulong cap = BUDGET_MAX_MULTIPLE * increment;
438 0 : ulong remaining = gossip->outbound_budget.remaining + increment;
439 0 : gossip->outbound_budget.remaining = fd_ulong_min( remaining, cap );
440 0 : gossip->outbound_budget.last_replenish_nanos = now;
441 0 : }
442 0 : return gossip->outbound_budget.remaining;
443 0 : }
444 :
445 : static inline void
446 : txbuild_flush( fd_gossip_t * gossip,
447 : fd_gossip_txbuild_t * txbuild,
448 : fd_stem_context_t * stem,
449 : fd_ip4_port_t dest_addr,
450 0 : long now ) {
451 0 : if( FD_UNLIKELY( !txbuild->crds_len ) ) return;
452 :
453 : /* Debit the outbound data budget (gossip payload bytes only, not
454 : including IP/UDP headers — matching Agave's DataBudget which
455 : operates on serialized gossip-layer packet sizes). */
456 0 : gossip->outbound_budget.remaining -= fd_ulong_min( txbuild->bytes_len, gossip->outbound_budget.remaining );
457 :
458 0 : gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now );
459 :
460 0 : gossip->metrics->message_tx[ txbuild->tag ]++;
461 0 : gossip->metrics->message_tx_bytes[ txbuild->tag ] += txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
462 0 : for( ulong i=0UL; i<txbuild->crds_len; i++ ) {
463 0 : gossip->metrics->crds_tx_pull_response[ txbuild->crds[ i ].tag ]++;
464 0 : gossip->metrics->crds_tx_pull_response_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
465 0 : }
466 :
467 0 : fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag );
468 0 : }
469 :
470 : static void
471 : rx_pull_request( fd_gossip_t * gossip,
472 : fd_gossip_pull_request_t const * pr_view,
473 : fd_ip4_port_t peer_addr,
474 : fd_stem_context_t * stem,
475 0 : long now ) {
476 : /* Replenish and check outbound data budget. If the budget is
477 : exhausted, skip generating pull responses entirely. */
478 0 : if( FD_UNLIKELY( !outbound_budget_replenish( gossip, now ) ) ) return;
479 :
480 : /* When responding to a pull request, we skip CRDS entries whose
481 : wallclock is newer than the caller's wallclock + a random jitter.
482 : The jitter is drawn uniformly from [0, TIMEOUT/4) ms, matching
483 : Agave's behavior (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS = 15000ms).
484 : This prevents all responders from consistently excluding the same
485 : set of very-recent CRDS values. */
486 0 : #define FD_GOSSIP_PULL_JITTER_BOUND_MS (15000UL/4UL)
487 :
488 : /* Generate a random jitter in [0, 3750) ms, added to the caller's
489 : wallclock. CRDS entries newer than this adjusted threshold are
490 : excluded from the response. The jitter prevents all responders
491 : from consistently excluding the same near-boundary entries,
492 : improving cluster-wide convergence of recent values. */
493 0 : ulong caller_wallclock_ms = pr_view->contact_info->wallclock;
494 0 : ulong jitter_ms = fd_rng_ulong_roll( gossip->rng, FD_GOSSIP_PULL_JITTER_BOUND_MS );
495 0 : ulong adjusted_wallclock_ms = caller_wallclock_ms + jitter_ms;
496 :
497 0 : ulong keys[ sizeof(pr_view->crds_filter->filter->keys)/sizeof(ulong) ];
498 0 : ulong bits[ sizeof(pr_view->crds_filter->filter->bits)/sizeof(ulong) ];
499 0 : fd_memcpy( keys, pr_view->crds_filter->filter->keys, sizeof(pr_view->crds_filter->filter->keys) );
500 0 : fd_memcpy( bits, pr_view->crds_filter->filter->bits, sizeof(pr_view->crds_filter->filter->bits) );
501 :
502 0 : fd_bloom_t filter[1];
503 0 : filter->keys_len = pr_view->crds_filter->filter->keys_len;
504 0 : filter->keys = keys;
505 :
506 0 : filter->bits_len = pr_view->crds_filter->filter->bits_len;
507 0 : filter->bits = bits;
508 :
509 0 : fd_gossip_txbuild_t pull_resp[1];
510 0 : fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
511 :
512 0 : uchar iter_mem[ 16UL ];
513 :
514 0 : for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->crds_filter->mask, pr_view->crds_filter->mask_bits, iter_mem );
515 0 : !fd_crds_mask_iter_done( it, gossip->crds );
516 0 : it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
517 0 : fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );
518 :
519 : /* Skip CRDS entries whose originator wallclock is newer than the
520 : caller's wallclock + jitter. The caller hasn't had time to
521 : observe these values yet, so including them would be wasteful. */
522 0 : if( FD_UNLIKELY( fd_crds_entry_wallclock( candidate )>adjusted_wallclock_ms ) ) continue;
523 :
524 0 : if( FD_UNLIKELY( fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue;
525 :
526 0 : uchar const * crds_val;
527 0 : ulong crds_size;
528 0 : fd_crds_entry_value( candidate, &crds_val, &crds_size );
529 0 : if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
530 0 : fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
531 0 : if( FD_UNLIKELY( !gossip->outbound_budget.remaining ) ) break;
532 0 : }
533 :
534 0 : txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
535 0 : }
536 :
537 : static void
538 : rx_values( fd_gossip_t * gossip,
539 : ulong values_len,
540 : fd_gossip_value_t const * values,
541 : uchar const * payload,
542 : uchar const * failed,
543 : fd_stem_context_t * stem,
544 : long now,
545 0 : long results[ static 17UL ] ) {
546 0 : for( ulong i=0UL; i<values_len; i++ ) {
547 0 : fd_gossip_value_t const * value = &values[ i ];
548 :
549 0 : if( FD_UNLIKELY( failed[ i ] ) ) {
550 0 : uchar candidate_hash[ 32UL ];
551 0 : fd_sha256_hash( payload+value->offset, value->length, candidate_hash );
552 0 : if( FD_LIKELY( failed[ i ]==FD_GOSSIP_FAILED_NO_CONTACT_INFO ) ) fd_gossip_purged_insert_no_contact_info( gossip->purged, value->origin, candidate_hash, now );
553 0 : else fd_gossip_purged_insert_failed_insert( gossip->purged, candidate_hash, now );
554 0 : continue;
555 0 : }
556 :
557 0 : ulong origin_stake = get_stake( gossip, value->origin );
558 0 : int origin_ping_tracker_active = fd_ping_tracker_active( gossip->ping_tracker, value->origin );
559 0 : int is_me = !memcmp( value->origin, gossip->identity_pubkey, 32UL );
560 :
561 0 : results[ i ] = fd_crds_insert( gossip->crds, value, payload+value->offset, value->length, origin_stake, origin_ping_tracker_active, is_me, now, stem );
562 0 : if( FD_UNLIKELY( results[ i ] ) ) continue;
563 :
564 0 : if( FD_UNLIKELY( value->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
565 0 : fd_ip4_port_t origin_addr = {
566 0 : .addr = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
567 0 : .port = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
568 0 : };
569 0 : if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, value->origin, origin_stake, origin_addr, now );
570 :
571 : /* We just learned this peer's contact info. Drain any
572 : no_contact_info hashes associated with this origin from the
573 : purged set so peers re-send those CRDS values. */
574 0 : if( FD_LIKELY( fd_ping_tracker_active( gossip->ping_tracker, value->origin ) ) ) fd_gossip_purged_drain_no_contact_info( gossip->purged, value->origin );
575 0 : }
576 :
577 0 : fd_active_set_push( gossip->active_set, payload+value->offset, value->length, value->origin, origin_stake, stem, now, 0 );
578 0 : }
579 0 : }
580 :
581 : static void
582 : rx_pull_response( fd_gossip_t * gossip,
583 : fd_gossip_pull_response_t const * pull_response,
584 : uchar const * payload,
585 : uchar const * failed,
586 : fd_stem_context_t * stem,
587 0 : long now ) {
588 0 : long results[ 17UL ];
589 0 : rx_values( gossip, pull_response->values_len, pull_response->values, payload, failed, stem, now, results );
590 0 : for( ulong i=0UL; i<pull_response->values_len; i++ ) {
591 0 : if( FD_UNLIKELY( failed[ i ] ) ) continue;
592 0 : if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PULL_RESPONSE_IDX ]++;
593 0 : else if( results[ i ]<0L ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_STALE_IDX ]++;
594 0 : else gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_DUPLICATE_IDX ]++;
595 0 : }
596 0 : }
597 :
598 : /* tx_prune constructs, signs, and sends a prune message telling
599 : `relayer` to stop pushing CRDS values originating from `origin`.
600 :
601 : On-wire layout (bincode):
602 : Protocol tag 4 (FD_GOSSIP_MESSAGE_PRUNE = 3)
603 : sender pubkey 32 (= identity_pubkey, outer PruneMessage field)
604 : PruneData.pubkey 32 (= identity_pubkey)
605 : prunes_len 8
606 : prunes[1] 32
607 : signature 64
608 : destination 32
609 : wallclock 8
610 :
611 : The signable data (input to Ed25519 sign) is the PruneData fields
612 : excluding signature:
613 : prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
614 : This must match fd_keyguard_payload_matches_prune_data (106 + 32 bytes). */
615 :
616 : static void
617 : tx_prune( fd_gossip_t * gossip,
618 : uchar const * relayer,
619 : uchar const * origin,
620 : fd_stem_context_t * stem,
621 0 : long now ) {
622 0 : ulong ci_idx = fd_crds_ci_idx( gossip->crds, relayer );
623 0 : if( FD_UNLIKELY( ci_idx==ULONG_MAX ) ) return;
624 :
625 0 : fd_gossip_contact_info_t const * ci = fd_crds_ci( gossip->crds, ci_idx );
626 0 : fd_ip4_port_t dest_addr = {
627 0 : .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
628 0 : .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
629 0 : };
630 0 : if( FD_UNLIKELY( !dest_addr.addr || !dest_addr.port ) ) return;
631 :
632 0 : ulong wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
633 :
634 : /* Build the signable payload:
635 : prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8] */
636 0 : uchar signable[ 26UL + 32UL + 8UL + 32UL + 32UL + 8UL ];
637 0 : uchar * p = signable;
638 0 : FD_STORE( ulong, p, 18UL ); p += 8UL;
639 0 : fd_memcpy( p, "\xffSOLANA_PRUNE_DATA", 18UL ); p += 18UL;
640 0 : fd_memcpy( p, gossip->identity_pubkey, 32UL ); p += 32UL;
641 0 : FD_STORE( ulong, p, 1UL ); p += 8UL;
642 0 : fd_memcpy( p, origin, 32UL ); p += 32UL;
643 0 : fd_memcpy( p, relayer, 32UL ); p += 32UL;
644 0 : FD_STORE( ulong, p, wallclock ); p += 8UL;
645 :
646 0 : uchar signature[ 64UL ];
647 0 : gossip->sign_fn( gossip->sign_ctx, signable, sizeof(signable), FD_KEYGUARD_SIGN_TYPE_ED25519, signature );
648 :
649 : /* Build the on-wire packet:
650 : tag(4) + sender(32) + pubkey(32) + prunes_len(8) + prunes[32]
651 : + signature(64) + destination(32) + wallclock(8) */
652 0 : uchar pkt[ 4UL + 32UL + 32UL + 8UL + 32UL + 64UL + 32UL + 8UL ];
653 0 : uchar * q = pkt;
654 0 : FD_STORE( uint, q, FD_GOSSIP_MESSAGE_PRUNE ); q += 4UL;
655 0 : fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL; /* sender */
656 0 : fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL; /* PruneData.pubkey */
657 0 : FD_STORE( ulong, q, 1UL ); q += 8UL;
658 0 : fd_memcpy( q, origin, 32UL ); q += 32UL;
659 0 : fd_memcpy( q, signature, 64UL ); q += 64UL;
660 0 : fd_memcpy( q, relayer, 32UL ); q += 32UL;
661 0 : FD_STORE( ulong, q, wallclock ); q += 8UL;
662 :
663 0 : gossip->send_fn( gossip->send_ctx, stem, pkt, sizeof(pkt), &dest_addr, (ulong)now );
664 :
665 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PRUNE ]++;
666 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PRUNE ] += sizeof(pkt) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
667 0 : }
668 :
669 : static void
670 : tx_prunes( fd_gossip_t * gossip,
671 : fd_stem_context_t * stem,
672 0 : long now ) {
673 0 : uchar const * relayer;
674 0 : uchar const * origin;
675 0 : while( fd_prune_finder_pop_prune( gossip->prune_finder, &relayer, &origin ) ) {
676 0 : tx_prune( gossip, relayer, origin, stem, now );
677 0 : }
678 0 : }
679 :
680 : static void
681 : rx_push( fd_gossip_t * gossip,
682 : fd_gossip_push_t const * push,
683 : uchar const * payload,
684 : uchar const * failed,
685 : long now,
686 0 : fd_stem_context_t * stem ) {
687 0 : long results[ 17UL ];
688 0 : rx_values( gossip, push->values_len, push->values, payload, failed, stem, now, results );
689 :
690 0 : for( ulong i=0UL; i<push->values_len; i++ ) {
691 0 : if( FD_UNLIKELY( failed[ i ] ) ) continue;
692 0 : if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PUSH_IDX ]++;
693 0 : else if( results[ i ]<0L ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_STALE_IDX ]++;
694 0 : else gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_DUPLICATE_IDX ]++;
695 :
696 0 : ulong num_dups;
697 0 : if( FD_LIKELY( !results[ i ] ) ) num_dups = 0UL;
698 0 : else if( FD_UNLIKELY( results[ i ]<0L ) ) num_dups = ULONG_MAX; /* stale => never timely */
699 0 : else num_dups = (ulong)results[ i ];
700 :
701 0 : ulong origin_stake = get_stake( gossip, push->values[ i ].origin );
702 0 : fd_prune_finder_record( gossip->prune_finder, push->values[ i ].origin, origin_stake, push->from, get_stake( gossip, push->from ), num_dups );
703 0 : }
704 :
705 0 : tx_prunes( gossip, stem, now );
706 0 : }
707 :
708 : static void
709 : rx_prune( fd_gossip_t * gossip,
710 0 : fd_gossip_prune_t const * prune ) {
711 0 : for( ulong i=0UL; i<prune->prunes_len; i++ ) {
712 0 : fd_active_set_prune( gossip->active_set,
713 0 : prune->pubkey,
714 0 : prune->prunes[ i ],
715 0 : get_stake( gossip, prune->prunes[ i ] ) );
716 0 : }
717 0 : }
718 :
719 :
720 : static void
721 : rx_ping( fd_gossip_t * gossip,
722 : fd_gossip_ping_t const * ping,
723 : fd_ip4_port_t peer_address,
724 : fd_stem_context_t * stem,
725 0 : long now ) {
726 0 : uchar out_payload[ sizeof(fd_gossip_pong_t)+4UL];
727 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PONG );
728 :
729 0 : fd_gossip_pong_t * out_pong = (fd_gossip_pong_t *)(out_payload + 4UL);
730 0 : fd_memcpy( out_pong->from, gossip->identity_pubkey, 32UL );
731 :
732 : /* fd_keyguard checks payloads for certain patterns before performing the
733 : sign. Pattern-matching can't be done on hashed data, so we need
734 : to supply the pre-hashed image to the sign fn (fd_keyguard will hash when
735 : supplied with FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519) while also hashing
736 : the image ourselves onto pong->ping_hash */
737 :
738 0 : uchar pre_image[ 48UL ];
739 0 : fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
740 0 : fd_memcpy( pre_image+16UL, ping->token, 32UL );
741 :
742 0 : fd_sha256_hash( pre_image, 48UL, out_pong->hash );
743 :
744 0 : gossip->sign_fn( gossip->sign_ctx, pre_image, 48UL, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519, out_pong->signature );
745 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), &peer_address, (ulong)now );
746 :
747 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PONG ]++;
748 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PONG ] += sizeof(out_payload)+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
749 0 : }
750 :
751 : static void
752 : rx_pong( fd_gossip_t * gossip,
753 : fd_gossip_pong_t const * pong,
754 : fd_ip4_port_t peer_address,
755 0 : long now ) {
756 0 : ulong stake = get_stake( gossip, pong->from );
757 0 : fd_ping_tracker_register( gossip->ping_tracker, pong->from, stake, peer_address, pong->hash, now );
758 0 : }
759 :
760 : void
761 : fd_gossip_rx( fd_gossip_t * gossip,
762 : fd_ip4_port_t peer,
763 : uchar const * data,
764 : ulong data_sz,
765 : long now,
766 0 : fd_stem_context_t * stem ) {
767 : /* TODO: Implement traffic shaper / bandwidth limiter */
768 0 : FD_TEST( data_sz>=sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS );
769 0 : fd_gossip_message_t const * message = (fd_gossip_message_t const *)data;
770 0 : uchar const * failed = data+sizeof(fd_gossip_message_t);
771 0 : uchar const * payload = data+sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS;
772 :
773 0 : switch( message->tag ) {
774 0 : case FD_GOSSIP_MESSAGE_PULL_REQUEST: rx_pull_request( gossip, message->pull_request, peer, stem, now ); break;
775 0 : case FD_GOSSIP_MESSAGE_PULL_RESPONSE: rx_pull_response( gossip, message->pull_response, payload, failed, stem, now ); break;
776 0 : case FD_GOSSIP_MESSAGE_PUSH: rx_push( gossip, message->push, payload, failed, now, stem ); break;
777 0 : case FD_GOSSIP_MESSAGE_PRUNE: rx_prune( gossip, message->prune ); break;
778 0 : case FD_GOSSIP_MESSAGE_PING: rx_ping( gossip, message->ping, peer, stem, now ); break;
779 0 : case FD_GOSSIP_MESSAGE_PONG: rx_pong( gossip, message->pong, peer, now ); break;
780 0 : default:
781 0 : FD_LOG_CRIT(( "Unknown gossip message type %u", message->tag ));
782 0 : break;
783 0 : }
784 0 : }
785 :
786 : static int
787 : fd_gossip_push( fd_gossip_t * gossip,
788 : fd_gossip_value_t const * value,
789 : fd_stem_context_t * stem,
790 0 : long now ) {
791 0 : uchar serialized[ FD_GOSSIP_VALUE_MAX_SZ ];
792 0 : long serialized_sz = fd_gossip_value_serialize( value, serialized, sizeof(serialized) );
793 0 : FD_TEST( serialized_sz!=-1L );
794 0 : gossip->sign_fn( gossip->sign_ctx, serialized+64UL, (ulong)serialized_sz-64UL, FD_KEYGUARD_SIGN_TYPE_ED25519, serialized );
795 :
796 0 : int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
797 0 : if( FD_UNLIKELY( fd_crds_insert( gossip->crds, value, serialized, (ulong)serialized_sz, gossip->identity_stake, origin_active, 1, now, stem ) ) ) return -1;
798 :
799 0 : fd_active_set_push( gossip->active_set, serialized, (ulong)serialized_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
800 0 : return 0;
801 0 : }
802 :
803 : int
804 : fd_gossip_push_vote( fd_gossip_t * gossip,
805 : uchar const * txn,
806 : ulong txn_sz,
807 : fd_stem_context_t * stem,
808 0 : long now ) {
809 0 : fd_gossip_value_t value = {
810 0 : .tag = FD_GOSSIP_VALUE_VOTE,
811 0 : .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
812 0 : .vote = {{
813 0 : .index = 0UL, /* TODO */
814 0 : .transaction_len = txn_sz,
815 0 : }}
816 0 : };
817 0 : fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
818 0 : FD_TEST( txn_sz<=sizeof(value.vote->transaction) );
819 0 : fd_memcpy( value.vote->transaction, txn, txn_sz );
820 :
821 0 : return fd_gossip_push( gossip, &value, stem, now );
822 0 : }
823 :
824 : int
825 : fd_gossip_push_duplicate_shred( fd_gossip_t * gossip,
826 : fd_gossip_duplicate_shred_t const * duplicate_shred,
827 : fd_stem_context_t * stem,
828 0 : long now ) {
829 0 : fd_gossip_value_t value = {
830 0 : .tag = FD_GOSSIP_VALUE_DUPLICATE_SHRED,
831 0 : .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
832 0 : };
833 0 : fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
834 0 : *value.duplicate_shred = *duplicate_shred;
835 :
836 0 : return fd_gossip_push( gossip, &value, stem, now );
837 0 : }
838 :
839 : static void
840 : tx_ping( fd_gossip_t * gossip,
841 : fd_stem_context_t * stem,
842 : long now,
843 0 : int * charge_busy ) {
844 0 : uchar out_payload[ sizeof(fd_gossip_ping_t) + 4UL ];
845 0 : FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PING );
846 :
847 0 : fd_gossip_ping_t * out_ping = (fd_gossip_ping_t *)( out_payload+4UL );
848 0 : fd_memcpy( out_ping->from, gossip->identity_pubkey, 32UL );
849 :
850 0 : uchar const * peer_pubkey;
851 0 : uchar const * ping_token;
852 0 : fd_ip4_port_t const * peer_address;
853 0 : while( fd_ping_tracker_pop_request( gossip->ping_tracker,
854 0 : now,
855 0 : &peer_pubkey,
856 0 : &peer_address,
857 0 : &ping_token ) ) {
858 0 : fd_memcpy( out_ping->token, ping_token, 32UL );
859 :
860 0 : gossip->sign_fn( gossip->sign_ctx, out_ping->token, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519, out_ping->signature );
861 0 : gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), peer_address, (ulong)now );
862 :
863 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PING ]++;
864 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PING ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
865 0 : if( charge_busy ) *charge_busy = 1;
866 0 : }
867 0 : }
868 :
869 : /* Construct and send a pull request to a random peer. The pull
870 : request contains a bloom filter over our known CRDS hashes so that
871 : the peer can respond with values we are missing.
872 :
873 : NOTE: Divergence from Agave:
874 : - Agave builds up to 2^mask_bits filters per pull period
875 : (sampling up to 1024), each covering a distinct partition of
876 : the hash space. We build and send exactly one filter per
877 : pull period, covering 1/2^mask_bits of the space.
878 :
879 : Maximum bloom filter bits in a PullRequest packet:
880 :
881 : PACKET_DATA_SIZE = 1232 (= 1280 - 40 - 8)
882 :
883 : Bytes consumed by non-bloom fields:
884 : discriminant(4) + keys_len(8) + keys(8*num_keys) +
885 : has_bits(1) + bloom_vec_len(8) + bloom_bits_count(8) +
886 : bloom_num_bits_set(8) + mask(8) + mask_bits(4)
887 : + contact_info_crds_val(crds_val_sz)
888 : = 49 + 8*num_keys + crds_val_sz
889 :
890 : The bitvec is serialized as u64 words, so the bitvec storage is
891 : ceil(num_bits/64)*8 bytes. The remaining packet bytes must
892 : accommodate this.
893 :
894 : Agave determines the max_bytes parameter (input to Bloom::random)
895 : via an empirical cache (get_max_bloom_filter_bytes). max_bytes*8
896 : is passed as the max_bits cap to Bloom::random, but actual
897 : num_bits is only ~83% of max_bits (the E/D ratio for p=0.1).
898 : We replicate this with a closed-form inversion: the largest
899 : max_bytes where ceil(num_bits/64)*8 fits in remaining space is
900 : max_bytes = floor(D * floor(64*W/E) / 8), where W is the max
901 : number of u64 words, E and D are the bloom filter constants.
902 :
903 : num_keys depends on the bloom sizing, which depends on the
904 : overhead, which depends on num_keys. However there is a closed
905 : form: compute num_keys from the pessimistic KEYS=8 overhead, then
906 : recompute the tight overhead with the true num_keys. This always
907 : converges in one step because the optimal key count is
908 : E*ln(2) ≈ 3.32 (where E = ln(p)/ln(1/2^ln2)), far from any
909 : rounding boundary. For p=0.1 and KEYS=8, num_keys is always 3.
910 :
911 : NB: The has_bits(1) + bloom_vec_len(8) are only written when
912 : num_bits>=1. fd_bloom_num_bits clamps to [1, max_bits], so
913 : num_bits>=1 always holds and this layout is correct. */
914 :
915 : static void
916 : tx_pull_request( fd_gossip_t * gossip,
917 : fd_stem_context_t * stem,
918 0 : long now ) {
919 0 : ulong total_crds_vals = fd_crds_len( gossip->crds ) + fd_gossip_purged_len( gossip->purged );
920 0 : ulong num_items = fd_ulong_max( 65536UL, total_crds_vals );
921 0 : ulong crds_val_sz = gossip->my_contact_info.crds_val_sz;
922 :
923 : /* Step 1: Compute num_keys from the pessimistic KEYS=8 overhead
924 : (same initial estimate Agave uses in CrdsFilterSet::new). */
925 0 : ulong pessimistic_overhead = 49UL + 8UL*(ulong)BLOOM_NUM_KEYS + crds_val_sz;
926 0 : FD_TEST( pessimistic_overhead<FD_GOSSIP_MTU );
927 0 : double pessimistic_max_bits = (double)( 8UL*( FD_GOSSIP_MTU - pessimistic_overhead ) );
928 0 : double pessimistic_items = fd_bloom_max_items( pessimistic_max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
929 0 : FD_TEST( pessimistic_items>0.0 );
930 0 : ulong pessimistic_num_bits = fd_bloom_num_bits( pessimistic_items, BLOOM_FALSE_POSITIVE_RATE, pessimistic_max_bits );
931 0 : ulong num_keys = fd_bloom_num_keys( (double)pessimistic_num_bits, pessimistic_items );
932 :
933 : /* Step 2: Recompute with the tight overhead using the true num_keys.
934 : Find the largest max_bytes parameter (matching Agave's
935 : get_max_bloom_filter_bytes cache) such that the resulting bitvec
936 : fits in the remaining packet space.
937 :
938 : Given:
939 : max_items = ceil(max_bits / D) where D = -K / ln(1-exp(ln(p)/K))
940 : num_bits = ceil(max_items * E) where E = ln(p) / ln(1/2^ln2)
941 :
942 : We need ceil(num_bits/64)*8 <= remaining, i.e. num_bits <= 64*W
943 : where W = floor(remaining/8). Working backwards:
944 : max_items <= I where I = floor(64*W / E)
945 : max_bytes <= D*I / 8
946 :
947 : So max_bytes = floor(D * floor(64*W/E) / 8). */
948 0 : ulong overhead = 49UL + 8UL*num_keys + crds_val_sz;
949 0 : FD_TEST( overhead<FD_GOSSIP_MTU );
950 0 : ulong remaining = FD_GOSSIP_MTU - overhead;
951 0 : ulong max_words = remaining / 8UL; /* max u64 words for bitvec */
952 :
953 0 : double E = log( BLOOM_FALSE_POSITIVE_RATE ) / log( 1.0 / pow( 2.0, log( 2.0 ) ) );
954 0 : double D = -BLOOM_NUM_KEYS / log( 1.0 - exp( log( BLOOM_FALSE_POSITIVE_RATE ) / BLOOM_NUM_KEYS ) );
955 0 : ulong I = (ulong)floor( 64.0 * (double)max_words / E );
956 0 : ulong max_bytes = (ulong)floor( D * (double)I / 8.0 );
957 :
958 0 : double max_bits = (double)( max_bytes * 8UL );
959 0 : double max_items = fd_bloom_max_items( max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
960 0 : FD_TEST( max_items>0.0 );
961 0 : ulong num_bits = fd_bloom_num_bits( max_items, BLOOM_FALSE_POSITIVE_RATE, max_bits );
962 0 : FD_TEST( num_bits>=1UL );
963 0 : FD_TEST( (num_bits+63UL)/64UL<=max_words ); /* verify bitvec fits */
964 0 : FD_TEST( fd_bloom_num_keys( (double)num_bits, max_items )==num_keys ); /* verify convergence */
965 :
966 0 : double _mask_bits = ceil( log2( (double)num_items / max_items ) );
967 0 : uint mask_bits = _mask_bits >= 0.0 ? fd_uint_min( (uint)_mask_bits, 63U ) : 0U;
968 0 : ulong mask = fd_rng_ulong( gossip->rng ) | (~0UL>>(mask_bits));
969 :
970 0 : uchar payload[ FD_GOSSIP_MTU ] = {0};
971 :
972 0 : ulong * keys_ptr, * bits_ptr, * bits_set;
973 0 : long payload_sz = fd_gossip_pull_request_init( payload,
974 0 : FD_GOSSIP_MTU,
975 0 : num_keys,
976 0 : num_bits,
977 0 : mask,
978 0 : mask_bits,
979 0 : gossip->my_contact_info.crds_val,
980 0 : gossip->my_contact_info.crds_val_sz,
981 0 : &keys_ptr,
982 0 : &bits_ptr,
983 0 : &bits_set );
984 0 : FD_TEST( -1L!=payload_sz );
985 :
986 0 : fd_bloom_t filter[1];
987 0 : fd_bloom_init_inplace( keys_ptr, bits_ptr, num_keys, num_bits, 0, gossip->rng, BLOOM_FALSE_POSITIVE_RATE, filter );
988 :
989 0 : uchar iter_mem[ 16UL ];
990 0 : for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
991 0 : !fd_crds_mask_iter_done( it, gossip->crds );
992 0 : it = fd_crds_mask_iter_next( it, gossip->crds ) ) {
993 0 : fd_bloom_insert( filter, fd_crds_entry_hash( fd_crds_mask_iter_entry( it, gossip->crds ) ), 32UL );
994 0 : }
995 :
996 0 : for( fd_gossip_purged_mask_iter_t * it = fd_gossip_purged_mask_iter_init( gossip->purged, mask, mask_bits, iter_mem );
997 0 : !fd_gossip_purged_mask_iter_done( it, gossip->purged );
998 0 : it = fd_gossip_purged_mask_iter_next( it, gossip->purged ) ){
999 0 : fd_bloom_insert( filter, fd_gossip_purged_mask_iter_hash( it, gossip->purged ), 32UL );
1000 0 : }
1001 :
1002 0 : int num_bits_set = 0;
1003 0 : for( ulong i=0UL; i<(num_bits+63)/64UL; i++ ) num_bits_set += fd_ulong_popcnt( bits_ptr[ i ] );
1004 0 : *bits_set = (ulong)num_bits_set;
1005 :
1006 0 : ulong idx = fd_gossip_wsample_sample_pull_request( gossip->wsample );
1007 0 : fd_ip4_port_t peer_addr;
1008 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) {
1009 0 : if( FD_UNLIKELY( !gossip->entrypoints_cnt ) ) {
1010 : /* We are the bootstrapping node, and nobody else is present in
1011 : the cluster. Nowhere to send the pull request. */
1012 0 : return;
1013 0 : }
1014 0 : peer_addr = random_entrypoint( gossip );
1015 0 : } else {
1016 0 : fd_gossip_contact_info_t const * peer = fd_crds_ci( gossip->crds, idx );
1017 0 : peer_addr.addr = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0 : peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4;
1018 0 : peer_addr.port = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port;
1019 0 : }
1020 0 : gossip->send_fn( gossip->send_ctx, stem, payload, (ulong)payload_sz, &peer_addr, (ulong)now );
1021 :
1022 0 : gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PULL_REQUEST ]++;
1023 0 : gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PULL_REQUEST ] += (ulong)payload_sz + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
1024 0 : }
1025 :
1026 : void
1027 : fd_gossip_advance( fd_gossip_t * gossip,
1028 : long now,
1029 : fd_stem_context_t * stem,
1030 0 : int * charge_busy ) {
1031 0 : outbound_budget_replenish( gossip, now );
1032 :
1033 0 : fd_gossip_purged_expire( gossip->purged, now );
1034 0 : fd_active_set_advance( gossip->active_set, stem, now, charge_busy );
1035 0 : fd_crds_advance( gossip->crds, now, stem, charge_busy );
1036 :
1037 0 : tx_ping( gossip, stem, now, charge_busy );
1038 0 : if( FD_UNLIKELY( now>=gossip->timers.next_pull_request ) ) {
1039 0 : tx_pull_request( gossip, stem, now );
1040 0 : if( charge_busy ) *charge_busy = 1;
1041 : /* 1.6ms (625/s). Agave sends min(1024, ceil(2^mask_bits/8))
1042 : filters every 500ms. For a typical mainnet table (~65k items,
1043 : mask_bits≈7) that is ~16 filters/500ms = one every 31ms. We
1044 : send a single filter per round, so we fire ~20× more often to
1045 : compensate for sending one filter instead of many per period.
1046 :
1047 : We considered dynamically matching Agave's exact rate by
1048 : computing 500ms/filters_per_round from mask_bits each round,
1049 : but this caused slow table fill on startup (mask_bits starts
1050 : low -> long intervals -> few pulls -> slow CRDS population).
1051 : Adaptive boosting (counter-based, timestamp-based, and
1052 : threshold-based) all added complexity without clear benefit:
1053 : counter decay lost state between send and response arrival,
1054 : timestamp checks never disarmed because trickle inserts kept
1055 : refreshing the window, and threshold heuristics required
1056 : tuning constants that varied by cluster size.
1057 :
1058 : A fixed 1.6ms is simpler and robust: the cost of a redundant
1059 : pull request is negligible (a single 1232-byte packet whose
1060 : reply will be empty if we're already caught up), and it
1061 : guarantees fast table fill on startup without any adaptive
1062 : machinery. */
1063 0 : gossip->timers.next_pull_request = now+1600L*1000L;
1064 0 : }
1065 0 : if( FD_UNLIKELY( now>=gossip->timers.next_contact_info_refresh ) ) {
1066 : /* TODO: Frequency of this? More often if observing? */
1067 0 : refresh_contact_info( gossip, now );
1068 0 : int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
1069 0 : fd_crds_insert( gossip->crds, gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_stake, origin_active, 1, now, stem );
1070 0 : fd_active_set_push( gossip->active_set, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
1071 0 : gossip->timers.next_contact_info_refresh = now+15L*500L*1000L*1000L; /* TODO: Jitter */
1072 0 : if( charge_busy ) *charge_busy = 1;
1073 0 : }
1074 0 : }
1075 :
1076 : void
1077 : fd_gossip_ping_tracker_track( fd_gossip_t * gossip,
1078 : uchar const * peer_pubkey,
1079 : fd_ip4_port_t peer_address,
1080 0 : long now ) {
1081 0 : ulong origin_stake = get_stake( gossip, peer_pubkey );
1082 0 : fd_ping_tracker_track( gossip->ping_tracker, peer_pubkey, origin_stake, peer_address, now );
1083 0 : }
|