LCOV - code coverage report
Current view: top level - flamenco/gossip - fd_gossip.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 550 0.0 %
Date: 2026-03-31 06:22:16 Functions: 0 35 0.0 %

          Line data    Source code
       1             : #include "fd_gossip.h"
       2             : #include "fd_bloom.h"
       3             : #include "fd_gossip_message.h"
       4             : #include "fd_gossip_txbuild.h"
       5             : #include "fd_active_set.h"
       6             : #include "fd_ping_tracker.h"
       7             : #include "fd_prune_finder.h"
       8             : #include "fd_gossip_wsample.h"
       9             : #include "../../disco/keyguard/fd_keyguard.h"
      10             : #include "../../ballet/sha256/fd_sha256.h"
      11             : #include "../leaders/fd_leaders_base.h"
      12             : 
      13             : FD_STATIC_ASSERT( FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT==FD_GOSSIP_MESSAGE_CNT,
      14             :                   "FD_METRICS_ENUM_GOSSIP_MESSAGE_CNT must match FD_GOSSIP_MESSAGE_CNT" );
      15             : 
      16             : FD_STATIC_ASSERT( FD_METRICS_ENUM_CRDS_VALUE_CNT==FD_GOSSIP_VALUE_CNT,
      17             :                   "FD_METRICS_ENUM_CRDS_VALUE_CNT must match FD_GOSSIP_VALUE_CNT" );
      18             : 
      19           0 : #define BLOOM_FALSE_POSITIVE_RATE (0.1)
      20           0 : #define BLOOM_NUM_KEYS            (8.0)
      21             : 
      22             : struct stake {
      23             :   fd_pubkey_t pubkey;
      24             :   ulong       stake;
      25             : 
      26             :   struct {
      27             :     ulong prev;
      28             :     ulong next;
      29             :   } map;
      30             : 
      31             :   struct {
      32             :     ulong next;
      33             :   } pool;
      34             : };
      35             : 
      36             : typedef struct stake stake_t;
      37             : 
      38             : /* NOTE: Since the staked count is known at the time we populate
      39             :    the map, we can treat the pool as an array instead. This means we
      40             :    can bypass the acquire/release model and quickly iterate through the
      41             :    pool when we repopulate the map on every fd_gossip_stakes_update
      42             :    iteration. */
      43             : #define POOL_NAME  stake_pool
      44           0 : #define POOL_T     stake_t
      45             : #define POOL_IDX_T ulong
      46           0 : #define POOL_NEXT  pool.next
      47             : #include "../../util/tmpl/fd_pool.c"
      48             : 
      49             : #define MAP_NAME               stake_map
      50           0 : #define MAP_KEY                pubkey
      51             : #define MAP_ELE_T              stake_t
      52             : #define MAP_KEY_T              fd_pubkey_t
      53           0 : #define MAP_PREV               map.prev
      54           0 : #define MAP_NEXT               map.next
      55           0 : #define MAP_KEY_EQ(k0,k1)      fd_pubkey_eq( k0, k1 )
      56           0 : #define MAP_KEY_HASH(key,seed) (seed^fd_ulong_load_8( (key)->uc ))
      57             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      58             : #include "../../util/tmpl/fd_map_chain.c"
      59             : 
      60             : struct fd_gossip_private {
      61             :   uchar               identity_pubkey[ 32UL ];
      62             :   ulong               identity_stake;
      63             : 
      64             :   fd_gossip_metrics_t metrics[1];
      65             : 
      66             :   fd_gossip_wsample_t * wsample;
      67             :   fd_crds_t *           crds;
      68             :   fd_gossip_purged_t *  purged;
      69             :   fd_active_set_t *     active_set;
      70             :   fd_ping_tracker_t *   ping_tracker;
      71             :   fd_prune_finder_t *   prune_finder;
      72             : 
      73             :   fd_sha256_t sha256[1];
      74             :   fd_sha512_t sha512[1];
      75             : 
      76             :   ulong         entrypoints_cnt;
      77             :   fd_ip4_port_t entrypoints[ 16UL ];
      78             : 
      79             :   fd_rng_t * rng;
      80             : 
      81             :   struct {
      82             :     ulong         count;
      83             :     stake_t *     pool;
      84             :     stake_map_t * map;
      85             :   } stake;
      86             : 
      87             :   struct {
      88             :     long next_pull_request;
      89             :     long next_active_set_refresh;
      90             :     long next_contact_info_refresh;
      91             :     long next_flush_push_state;
      92             :   } timers;
      93             : 
      94             :   /* Token-bucket rate limiter for outbound pull response data.
      95             :      Matches Agave's DataBudget: replenished every 100ms with
      96             :      num_staked*1024 bytes, capped at 5x that amount.  Only
      97             :      pull responses are rate-limited; push messages are not. */
      98             :   struct {
      99             :     ulong remaining;           /* bytes remaining in budget (signed) */
     100             :     long last_replenish_nanos; /* last replenish timestamp in nanos  */
     101             :   } outbound_budget;
     102             : 
     103             :   /* Callbacks */
     104             :   fd_gossip_sign_fn   sign_fn;
     105             :   void *              sign_ctx;
     106             : 
     107             :   fd_gossip_send_fn   send_fn;
     108             :   void *              send_ctx;
     109             : 
     110             :   fd_ping_tracker_change_fn ping_tracker_change_fn;
     111             :   void *                    ping_tracker_change_fn_ctx;
     112             : 
     113             :   struct {
     114             :     uchar             crds_val[ FD_GOSSIP_VALUE_MAX_SZ ];
     115             :     ulong             crds_val_sz;
     116             :     fd_gossip_value_t ci[1];
     117             :   } my_contact_info;
     118             : 
     119             :   fd_gossip_out_ctx_t * gossip_net_out;
     120             : };
     121             : 
     122             : FD_FN_CONST ulong
     123           0 : fd_gossip_align( void ) {
     124           0 :   return 128uL;
     125           0 : }
     126             : 
     127             : FD_FN_CONST ulong
     128             : fd_gossip_footprint( ulong max_values,
     129           0 :                      ulong entrypoints_len ) {
     130           0 :   ulong l;
     131           0 :   l = FD_LAYOUT_INIT;
     132           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_gossip_t),     sizeof(fd_gossip_t)                                                  );
     133           0 :   l = FD_LAYOUT_APPEND( l, fd_gossip_purged_align(), fd_gossip_purged_footprint( max_values )                             );
     134           0 :   l = FD_LAYOUT_APPEND( l, fd_gossip_wsample_align(),fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE )            );
     135           0 :   l = FD_LAYOUT_APPEND( l, fd_crds_align(),          fd_crds_footprint( max_values )                                      );
     136           0 :   l = FD_LAYOUT_APPEND( l, fd_active_set_align(),    fd_active_set_footprint()                                            );
     137           0 :   l = FD_LAYOUT_APPEND( l, fd_ping_tracker_align(),  fd_ping_tracker_footprint( entrypoints_len )                         );
     138           0 :   l = FD_LAYOUT_APPEND( l, fd_prune_finder_align(),  fd_prune_finder_footprint()                                          );
     139           0 :   l = FD_LAYOUT_APPEND( l, stake_pool_align(),       stake_pool_footprint( MAX_STAKED_LEADERS )                           );
     140           0 :   l = FD_LAYOUT_APPEND( l, stake_map_align(),        stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
     141           0 :   l = FD_LAYOUT_FINI( l, fd_gossip_align() );
     142           0 :   return l;
     143           0 : }
     144             : 
     145             : static void
     146             : ping_tracker_change( void *        _ctx,
     147             :                      uchar const * peer_pubkey,
     148             :                      fd_ip4_port_t peer_address,
     149             :                      long          now,
     150           0 :                      int           change_type ) {
     151           0 :   fd_gossip_t * ctx = (fd_gossip_t *)_ctx;
     152             : 
     153           0 :   if( FD_UNLIKELY( !memcmp( peer_pubkey, ctx->identity_pubkey, 32UL ) ) ) return;
     154             : 
     155           0 :   if( FD_LIKELY( change_type==FD_PING_TRACKER_CHANGE_TYPE_ACTIVE ) ) {
     156           0 :     fd_gossip_purged_drain_no_contact_info( ctx->purged, peer_pubkey );
     157           0 :   }
     158             : 
     159           0 :   ulong ci_idx = fd_crds_ci_idx( ctx->crds, peer_pubkey );
     160           0 :   if( FD_UNLIKELY( ci_idx!=ULONG_MAX ) ) {
     161           0 :     switch( change_type ) {
     162           0 :       case FD_PING_TRACKER_CHANGE_TYPE_ACTIVE:
     163           0 :         fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 1 );
     164           0 :         break;
     165           0 :       case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE:
     166           0 :       case FD_PING_TRACKER_CHANGE_TYPE_INACTIVE_STAKED:
     167           0 :         fd_gossip_wsample_ping_tracked( ctx->wsample, ci_idx, 0 );
     168           0 :         fd_active_set_remove_peer( ctx->active_set, ci_idx );
     169           0 :         break;
     170           0 :       default: FD_LOG_ERR(( "Unknown change type %d", change_type )); return;
     171           0 :     }
     172           0 :   }
     173             : 
     174           0 :   ctx->ping_tracker_change_fn( ctx->ping_tracker_change_fn_ctx, peer_pubkey, peer_address, now, change_type );
     175           0 : }
     176             : 
     177             : static inline void
     178             : refresh_contact_info( fd_gossip_t * gossip,
     179           0 :                       long          now ) {
     180           0 :   fd_memcpy( gossip->my_contact_info.ci->origin, gossip->identity_pubkey, 32UL );
     181           0 :   gossip->my_contact_info.ci->wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
     182           0 :   long sz = fd_gossip_value_serialize( gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, FD_GOSSIP_VALUE_MAX_SZ );
     183           0 :   FD_TEST( sz!=-1L );
     184           0 :   gossip->my_contact_info.crds_val_sz = (ulong)sz;
     185             : 
     186           0 :   gossip->sign_fn( gossip->sign_ctx,
     187           0 :                    gossip->my_contact_info.crds_val+64UL,
     188           0 :                    gossip->my_contact_info.crds_val_sz-64UL,
     189           0 :                    FD_KEYGUARD_SIGN_TYPE_ED25519,
     190           0 :                    gossip->my_contact_info.crds_val );
     191             : 
     192             :   /* We don't have stem_ctx here so we pre-empt in next
     193             :      fd_gossip_advance iteration instead. */
     194           0 :   gossip->timers.next_contact_info_refresh = now;
     195           0 : }
     196             : 
     197             : void *
     198             : fd_gossip_new( void *                           shmem,
     199             :                fd_rng_t *                       rng,
     200             :                ulong                            max_values,
     201             :                ulong                            entrypoints_len,
     202             :                fd_ip4_port_t const *            entrypoints,
     203             :                uchar const *                    identity_pubkey,
     204             :                fd_gossip_contact_info_t const * my_contact_info,
     205             :                long                             now,
     206             :                fd_gossip_send_fn                send_fn,
     207             :                void *                           send_ctx,
     208             :                fd_gossip_sign_fn                sign_fn,
     209             :                void *                           sign_ctx,
     210             :                fd_ping_tracker_change_fn        ping_tracker_change_fn,
     211             :                void *                           ping_tracker_change_fn_ctx,
     212             :                fd_gossip_activity_update_fn     activity_update_fn,
     213             :                void *                           activity_update_fn_ctx,
     214             :                fd_gossip_out_ctx_t *            gossip_update_out,
     215           0 :                fd_gossip_out_ctx_t *            gossip_net_out ) {
     216           0 :   if( FD_UNLIKELY( !shmem ) ) {
     217           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     218           0 :     return NULL;
     219           0 :   }
     220             : 
     221           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_gossip_align() ) ) ) {
     222           0 :     FD_LOG_WARNING(( "misaligned shmem" ));
     223           0 :     return NULL;
     224           0 :   }
     225             : 
     226           0 :   if( FD_UNLIKELY( entrypoints_len>16UL ) ) {
     227           0 :     FD_LOG_WARNING(( "entrypoints_cnt must be in [0, 16]" ));
     228           0 :     return NULL;
     229           0 :   }
     230             : 
     231           0 :   if( FD_UNLIKELY( !fd_ulong_is_pow2( max_values ) ) ) {
     232           0 :     FD_LOG_WARNING(( "max_values must be a power of 2" ));
     233           0 :     return NULL;
     234           0 :   }
     235             : 
     236           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     237           0 :   fd_gossip_t * gossip  = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_gossip_t),      sizeof(fd_gossip_t)                                                  );
     238           0 :   void * purged         = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_purged_align(),  fd_gossip_purged_footprint( max_values )                             );
     239           0 :   void * wsample        = FD_SCRATCH_ALLOC_APPEND( l, fd_gossip_wsample_align(), fd_gossip_wsample_footprint( FD_CONTACT_INFO_TABLE_SIZE )            );
     240           0 :   void * crds           = FD_SCRATCH_ALLOC_APPEND( l, fd_crds_align(),           fd_crds_footprint( max_values )                                      );
     241           0 :   void * active_set     = FD_SCRATCH_ALLOC_APPEND( l, fd_active_set_align(),     fd_active_set_footprint()                                            );
     242           0 :   void * ping_tracker   = FD_SCRATCH_ALLOC_APPEND( l, fd_ping_tracker_align(),   fd_ping_tracker_footprint( entrypoints_len )                         );
     243           0 :   void * prune_finder   = FD_SCRATCH_ALLOC_APPEND( l, fd_prune_finder_align(),   fd_prune_finder_footprint()                                          );
     244           0 :   void * stake_pool     = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(),        stake_pool_footprint( MAX_STAKED_LEADERS )                           );
     245           0 :   void * stake_weights  = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(),         stake_map_footprint( stake_map_chain_cnt_est( MAX_STAKED_LEADERS ) ) );
     246             : 
     247           0 :   gossip->gossip_net_out  = gossip_net_out;
     248             : 
     249           0 :   gossip->entrypoints_cnt = entrypoints_len;
     250           0 :   fd_memcpy( gossip->entrypoints, entrypoints, entrypoints_len*sizeof(fd_ip4_port_t) );
     251             : 
     252           0 :   gossip->purged = fd_gossip_purged_join( fd_gossip_purged_new( purged, rng, max_values ) );
     253           0 :   FD_TEST( gossip->purged );
     254             : 
     255           0 :   gossip->wsample = fd_gossip_wsample_join( fd_gossip_wsample_new( wsample, rng, FD_CONTACT_INFO_TABLE_SIZE ) );
     256           0 :   FD_TEST( gossip->wsample );
     257             : 
     258           0 :   gossip->crds = fd_crds_join( fd_crds_new( crds, entrypoints, entrypoints_len, gossip->wsample, active_set, rng, max_values, gossip->purged, activity_update_fn, activity_update_fn_ctx, gossip_update_out ) );
     259           0 :   FD_TEST( gossip->crds );
     260             : 
     261           0 :   gossip->active_set = fd_active_set_join( fd_active_set_new( active_set, gossip->wsample, gossip->crds, rng, identity_pubkey, 0UL, send_fn, send_ctx ) );
     262           0 :   FD_TEST( gossip->active_set );
     263             : 
     264           0 :   gossip->ping_tracker = fd_ping_tracker_join( fd_ping_tracker_new( ping_tracker, rng, gossip->entrypoints_cnt, gossip->entrypoints, ping_tracker_change, gossip ) );
     265           0 :   FD_TEST( gossip->ping_tracker );
     266             : 
     267           0 :   gossip->prune_finder = fd_prune_finder_join( fd_prune_finder_new( prune_finder ) );
     268           0 :   FD_TEST( gossip->prune_finder );
     269             : 
     270           0 :   gossip->stake.count = 0UL;
     271           0 :   gossip->stake.pool = stake_pool_join( stake_pool_new( stake_pool, MAX_STAKED_LEADERS ) );
     272           0 :   FD_TEST( gossip->stake.pool );
     273             : 
     274           0 :   gossip->stake.map = stake_map_join( stake_map_new( stake_weights, stake_map_chain_cnt_est( MAX_STAKED_LEADERS ), fd_rng_ulong( rng ) ) );
     275           0 :   FD_TEST( gossip->stake.map );
     276             : 
     277           0 :   FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
     278           0 :   FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );
     279             : 
     280           0 :   gossip->rng = rng;
     281             : 
     282           0 :   gossip->timers.next_pull_request = 0L;
     283           0 :   gossip->timers.next_active_set_refresh = 0L;
     284           0 :   gossip->timers.next_contact_info_refresh = 0L;
     285           0 :   gossip->timers.next_flush_push_state = 0L;
     286             : 
     287           0 :   gossip->outbound_budget.remaining            = 0UL;
     288           0 :   gossip->outbound_budget.last_replenish_nanos = now;
     289             : 
     290           0 :   gossip->send_fn  = send_fn;
     291           0 :   gossip->send_ctx = send_ctx;
     292           0 :   gossip->sign_fn  = sign_fn;
     293           0 :   gossip->sign_ctx = sign_ctx;
     294           0 :   gossip->ping_tracker_change_fn     = ping_tracker_change_fn;
     295           0 :   gossip->ping_tracker_change_fn_ctx = ping_tracker_change_fn_ctx;
     296             : 
     297           0 :   gossip->my_contact_info.ci->tag = FD_GOSSIP_VALUE_CONTACT_INFO;
     298           0 :   *gossip->my_contact_info.ci->contact_info = *my_contact_info;
     299           0 :   fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
     300           0 :   gossip->identity_stake = 0UL;
     301           0 :   refresh_contact_info( gossip, now );
     302             : 
     303           0 :   fd_memset( gossip->metrics, 0, sizeof(fd_gossip_metrics_t) );
     304             : 
     305           0 :   return gossip;
     306           0 : }
     307             : 
     308             : fd_gossip_t *
     309           0 : fd_gossip_join( void * shgossip ) {
     310           0 :   if( FD_UNLIKELY( !shgossip ) ) {
     311           0 :     FD_LOG_WARNING(( "NULL shgossip" ));
     312           0 :     return NULL;
     313           0 :   }
     314             : 
     315           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shgossip, fd_gossip_align() ) ) ) {
     316           0 :     FD_LOG_WARNING(( "misaligned shgossip" ));
     317           0 :     return NULL;
     318           0 :   }
     319             : 
     320           0 :   return (fd_gossip_t *)shgossip;
     321           0 : }
     322             : 
     323             : fd_gossip_metrics_t const *
     324           0 : fd_gossip_metrics( fd_gossip_t const * gossip ) {
     325           0 :   return gossip->metrics;
     326           0 : }
     327             : 
     328             : fd_crds_metrics_t const *
     329           0 : fd_gossip_crds_metrics( fd_gossip_t const * gossip ) {
     330           0 :   return fd_crds_metrics( gossip->crds );
     331           0 : }
     332             : 
     333             : fd_ping_tracker_metrics_t const *
     334           0 : fd_gossip_ping_tracker_metrics( fd_gossip_t const * gossip ) {
     335           0 :   return fd_ping_tracker_metrics( gossip->ping_tracker );
     336           0 : }
     337             : 
     338             : fd_gossip_purged_metrics_t const *
     339           0 : fd_gossip_purged_metrics2( fd_gossip_t const * gossip ) {
     340           0 :   return fd_gossip_purged_metrics( gossip->purged );
     341           0 : }
     342             : 
     343             : fd_active_set_metrics_t const *
     344           0 : fd_gossip_active_set_metrics2( fd_gossip_t const * gossip ) {
     345           0 :   return fd_active_set_metrics( gossip->active_set );
     346           0 : }
     347             : 
     348             : static fd_ip4_port_t
     349           0 : random_entrypoint( fd_gossip_t const * gossip ) {
     350           0 :   ulong idx = fd_rng_ulong_roll( gossip->rng, gossip->entrypoints_cnt );
     351           0 :   return gossip->entrypoints[ idx ];
     352           0 : }
     353             : 
     354             : ulong
     355             : get_stake( fd_gossip_t const * gossip,
     356           0 :            uchar const *       pubkey ) {
     357           0 :   stake_t const * entry = stake_map_ele_query_const( gossip->stake.map, (fd_pubkey_t const *)pubkey, NULL, gossip->stake.pool );
     358           0 :   if( FD_UNLIKELY( !entry ) ) return 0UL;
     359           0 :   return entry->stake;
     360           0 : }
     361             : 
     362             : void
     363             : fd_gossip_set_identity( fd_gossip_t * gossip,
     364             :                         uchar const * identity_pubkey,
     365           0 :                         long          now ) {
     366           0 :   int identity_changed = memcmp( gossip->identity_pubkey, identity_pubkey, 32UL );
     367           0 :   if( FD_UNLIKELY( !identity_changed ) ) return;
     368             : 
     369           0 :   ulong new_ci_idx = fd_crds_ci_idx( gossip->crds, identity_pubkey );
     370             : 
     371             :   /* The new identity may already exist in CRDS as a normal peer (active
     372             :      in the wsample and potentially present in the active set).  We
     373             :      must deactivate it before updating identity_pubkey to maintain the
     374             :      invariant that our own identity is never sampleable. */
     375           0 :   if( FD_UNLIKELY( new_ci_idx!=ULONG_MAX ) ) fd_active_set_remove_peer( gossip->active_set, new_ci_idx );
     376             : 
     377           0 :   fd_memcpy( gossip->identity_pubkey, identity_pubkey, 32UL );
     378           0 :   gossip->identity_stake = get_stake( gossip, identity_pubkey );
     379           0 :   fd_gossip_wsample_set_identity( gossip->wsample, new_ci_idx );
     380           0 :   fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
     381           0 :   fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
     382           0 :   fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
     383           0 :   refresh_contact_info( gossip, now );
     384           0 : }
     385             : 
     386             : void
     387             : fd_gossip_set_shred_version( fd_gossip_t * gossip,
     388             :                              ushort        shred_version,
     389           0 :                              long          now ) {
     390           0 :   gossip->my_contact_info.ci->contact_info->shred_version = shred_version;
     391           0 :   refresh_contact_info( gossip, now );
     392           0 : }
     393             : 
     394             : void
     395             : fd_gossip_stakes_update( fd_gossip_t *             gossip,
     396             :                          fd_stake_weight_t const * stake_weights,
     397           0 :                          ulong                     stake_weights_cnt ) {
     398           0 :   stake_map_reset( gossip->stake.map );
     399           0 :   stake_pool_reset( gossip->stake.pool );
     400             : 
     401           0 :   for( ulong i=0UL; i<stake_weights_cnt; i++ ) {
     402           0 :     stake_t * entry = stake_pool_ele_acquire( gossip->stake.pool );
     403           0 :     entry->pubkey = stake_weights[i].key;
     404           0 :     entry->stake  = stake_weights[i].stake;
     405           0 :     stake_map_ele_insert( gossip->stake.map, entry, gossip->stake.pool );
     406           0 :   }
     407             : 
     408           0 :   gossip->identity_stake = get_stake( gossip, gossip->identity_pubkey );
     409           0 :   fd_gossip_wsample_self_stake( gossip->wsample, gossip->identity_stake );
     410           0 :   fd_active_set_set_identity( gossip->active_set, gossip->identity_pubkey, gossip->identity_stake );
     411           0 :   fd_prune_finder_set_identity( gossip->prune_finder, gossip->identity_pubkey, gossip->identity_stake );
     412           0 :   gossip->stake.count = stake_pool_used( gossip->stake.pool );
     413           0 : }
     414             : 
     415             : /* Outbound data budget constants (matching Agave's DataBudget for gossip).
     416             :    Budget is replenished every BUDGET_REPLENISH_INTERVAL_NS with
     417             :    num_staked * BUDGET_BYTES_PER_INTERVAL bytes, capped at
     418             :    BUDGET_MAX_MULTIPLE * num_staked * BUDGET_BYTES_PER_INTERVAL. */
     419             : 
     420             : #define BUDGET_REPLENISH_INTERVAL_NS (100L*1000L*1000L) /* 100 ms */
     421           0 : #define BUDGET_BYTES_PER_INTERVAL    (1024UL)           /* per staked validator */
     422           0 : #define BUDGET_MAX_MULTIPLE          (5UL)              /* max accumulation */
     423           0 : #define BUDGET_MIN_STAKED            (2UL)              /* floor for num_staked */
     424             : 
     425             : /* Lazily replenish the outbound pull-response budget if at least
     426             :    BUDGET_REPLENISH_INTERVAL_NS have elapsed since last replenish.
     427             :    Returns current remaining budget in bytes. */
     428             : 
     429             : static inline ulong
     430             : outbound_budget_replenish( fd_gossip_t * gossip,
     431           0 :                            long          now ) {
     432           0 :   long elapsed = now-gossip->outbound_budget.last_replenish_nanos;
     433             : 
     434           0 :   if( FD_LIKELY( elapsed>=BUDGET_REPLENISH_INTERVAL_NS ) ) {
     435           0 :     ulong num_staked = fd_ulong_max( gossip->stake.count, BUDGET_MIN_STAKED );
     436           0 :     ulong increment  = num_staked * BUDGET_BYTES_PER_INTERVAL;
     437           0 :     ulong cap        = BUDGET_MAX_MULTIPLE * increment;
     438           0 :     ulong remaining  = gossip->outbound_budget.remaining + increment;
     439           0 :     gossip->outbound_budget.remaining            = fd_ulong_min( remaining, cap );
     440           0 :     gossip->outbound_budget.last_replenish_nanos = now;
     441           0 :   }
     442           0 :   return gossip->outbound_budget.remaining;
     443           0 : }
     444             : 
     445             : static inline void
     446             : txbuild_flush( fd_gossip_t *         gossip,
     447             :                fd_gossip_txbuild_t * txbuild,
     448             :                fd_stem_context_t *   stem,
     449             :                fd_ip4_port_t         dest_addr,
     450           0 :                long                  now ) {
     451           0 :   if( FD_UNLIKELY( !txbuild->crds_len ) ) return;
     452             : 
     453             :   /* Debit the outbound data budget (gossip payload bytes only, not
     454             :      including IP/UDP headers — matching Agave's DataBudget which
     455             :      operates on serialized gossip-layer packet sizes). */
     456           0 :   gossip->outbound_budget.remaining -= fd_ulong_min( txbuild->bytes_len, gossip->outbound_budget.remaining );
     457             : 
     458           0 :   gossip->send_fn( gossip->send_ctx, stem, txbuild->bytes, txbuild->bytes_len, &dest_addr, (ulong)now );
     459             : 
     460           0 :   gossip->metrics->message_tx[ txbuild->tag ]++;
     461           0 :   gossip->metrics->message_tx_bytes[ txbuild->tag ] += txbuild->bytes_len+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     462           0 :   for( ulong i=0UL; i<txbuild->crds_len; i++ ) {
     463           0 :     gossip->metrics->crds_tx_pull_response[ txbuild->crds[ i ].tag ]++;
     464           0 :     gossip->metrics->crds_tx_pull_response_bytes[ txbuild->crds[ i ].tag ] += txbuild->crds[ i ].sz;
     465           0 :   }
     466             : 
     467           0 :   fd_gossip_txbuild_init( txbuild, gossip->identity_pubkey, txbuild->tag );
     468           0 : }
     469             : 
     470             : static void
     471             : rx_pull_request( fd_gossip_t *                    gossip,
     472             :                  fd_gossip_pull_request_t const * pr_view,
     473             :                  fd_ip4_port_t                    peer_addr,
     474             :                  fd_stem_context_t *              stem,
     475           0 :                  long                             now ) {
     476             :   /* rx_pull_request answers a pull request received from peer_addr.
                     :      It walks the CRDS entries selected by the request's mask, skips
                     :      the ones that are too new or that the requester's bloom filter
                     :      already contains, and packs the rest into pull response
                     :      messages addressed to peer_addr.
                     :
                     :      Replenish and check outbound data budget.  If the budget is
     477             :      exhausted, skip generating pull responses entirely. */
     478           0 :   if( FD_UNLIKELY( !outbound_budget_replenish( gossip, now ) ) ) return;
     479             : 
     480             :   /* When responding to a pull request, we skip CRDS entries whose
     481             :      wallclock is newer than the caller's wallclock + a random jitter.
     482             :      The jitter is drawn uniformly from [0, TIMEOUT/4) ms, matching
     483             :      Agave's behavior (CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS = 15000ms).
     484             :      This prevents all responders from consistently excluding the same
     485             :      set of very-recent CRDS values. */
     486           0 : #define FD_GOSSIP_PULL_JITTER_BOUND_MS  (15000UL/4UL)
     487             : 
     488             :   /* Draw the jitter from [0, 3750) ms and add it to the caller's
     489             :      wallclock to get the exclusion threshold used in the loop below.
     490             :      NOTE(review): this is an unchecked ulong add on a value taken
     491             :      from the request, so it wraps if the caller's wallclock is within
     492             :      3750 of ULONG_MAX -- confirm the wallclock is bounded upstream. */
     493           0 :   ulong caller_wallclock_ms    = pr_view->contact_info->wallclock;
     494           0 :   ulong jitter_ms              = fd_rng_ulong_roll( gossip->rng, FD_GOSSIP_PULL_JITTER_BOUND_MS );
     495           0 :   ulong adjusted_wallclock_ms  = caller_wallclock_ms + jitter_ms;
     496             : 
     497           0 :   ulong keys[ sizeof(pr_view->crds_filter->filter->keys)/sizeof(ulong) ]; /* local copies of the request's bloom keys ... */
     498           0 :   ulong bits[ sizeof(pr_view->crds_filter->filter->bits)/sizeof(ulong) ]; /* ... and bloom bit words */
     499           0 :   fd_memcpy( keys, pr_view->crds_filter->filter->keys, sizeof(pr_view->crds_filter->filter->keys) );
     500           0 :   fd_memcpy( bits, pr_view->crds_filter->filter->bits, sizeof(pr_view->crds_filter->filter->bits) );
     501             : 
     502           0 :   fd_bloom_t filter[1]; /* only keys/keys_len and bits/bits_len are assigned below */
     503           0 :   filter->keys_len = pr_view->crds_filter->filter->keys_len;
     504           0 :   filter->keys = keys;
     505             : 
     506           0 :   filter->bits_len = pr_view->crds_filter->filter->bits_len;
     507           0 :   filter->bits     = bits;
     508             : 
     509           0 :   fd_gossip_txbuild_t pull_resp[1];
     510           0 :   fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
     511             : 
     512           0 :   uchar iter_mem[ 16UL ]; /* scratch storage handed to fd_crds_mask_iter_init */
     513             : 
     514           0 :   for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->crds_filter->mask, pr_view->crds_filter->mask_bits, iter_mem );
     515           0 :        !fd_crds_mask_iter_done( it, gossip->crds );
     516           0 :        it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
     517           0 :     fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );
     518             : 
     519             :     /* Skip CRDS entries whose originator wallclock is newer than the
     520             :        caller's wallclock + jitter.  The caller hasn't had time to
     521             :        observe these values yet, so including them would be wasteful. */
     522           0 :     if( FD_UNLIKELY( fd_crds_entry_wallclock( candidate )>adjusted_wallclock_ms ) ) continue;
     523             : 
     524           0 :     if( FD_UNLIKELY( fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue; /* requester's filter already has this hash */
     525             : 
     526           0 :     uchar const * crds_val;
     527           0 :     ulong         crds_size;
     528           0 :     fd_crds_entry_value( candidate, &crds_val, &crds_size );
     529           0 :     if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
     530           0 :     fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
     531           0 :     if( FD_UNLIKELY( !gossip->outbound_budget.remaining ) ) break; /* stop once the remaining outbound budget reaches zero */
     532           0 :   }
     533             : 
     534           0 :   txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
     535           0 : }
     536             : 
     537             : static void
     538             : rx_values( fd_gossip_t *             gossip,
     539             :            ulong                     values_len,
     540             :            fd_gossip_value_t const * values,
     541             :            uchar const *             payload,
     542             :            uchar const *             failed,
     543             :            fd_stem_context_t *       stem,
     544             :            long                      now,
     545           0 :            long                      results[ static 17UL ] ) { /*
                     :   rx_values processes the values_len CRDS values carried by a push
                     :   or pull response.  values[ i ] locates the serialized value inside
                     :   payload (offset/length) and failed[ i ] is a per-value flag.
                     :
                     :   - failed[ i ]!=0: the serialized value is hashed and recorded in
                     :     the purged set, and results[ i ] is left unwritten.
                     :   - otherwise results[ i ] receives the fd_crds_insert return value;
                     :     only a zero result continues on to ping tracking (contact info
                     :     values) and to the active set push. */
     546           0 :   for( ulong i=0UL; i<values_len; i++ ) {
     547           0 :     fd_gossip_value_t const * value = &values[ i ];
     548             : 
     549           0 :     if( FD_UNLIKELY( failed[ i ] ) ) {
     550           0 :       uchar candidate_hash[ 32UL ];
     551           0 :       fd_sha256_hash( payload+value->offset, value->length, candidate_hash );
     552           0 :       if( FD_LIKELY( failed[ i ]==FD_GOSSIP_FAILED_NO_CONTACT_INFO ) ) fd_gossip_purged_insert_no_contact_info( gossip->purged, value->origin, candidate_hash, now );
     553           0 :       else                                                             fd_gossip_purged_insert_failed_insert( gossip->purged, candidate_hash, now );
     554           0 :       continue;
     555           0 :     }
     556             : 
     557           0 :     ulong origin_stake = get_stake( gossip, value->origin );
     558           0 :     int origin_ping_tracker_active = fd_ping_tracker_active( gossip->ping_tracker, value->origin );
     559           0 :     int is_me = !memcmp( value->origin, gossip->identity_pubkey, 32UL );
     560             : 
     561           0 :     results[ i ] = fd_crds_insert( gossip->crds, value, payload+value->offset, value->length, origin_stake, origin_ping_tracker_active, is_me, now, stem );
     562           0 :     if( FD_UNLIKELY( results[ i ] ) ) continue; /* not inserted; callers classify the nonzero result by sign */
     563             : 
     564           0 :     if( FD_UNLIKELY( value->tag==FD_GOSSIP_VALUE_CONTACT_INFO ) ) {
     565           0 :       fd_ip4_port_t origin_addr = {
     566           0 :         .addr = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
     567           0 :         .port = value->contact_info->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
     568           0 :       };
     569           0 :       if( FD_LIKELY( !is_me ) ) fd_ping_tracker_track( gossip->ping_tracker, value->origin, origin_stake, origin_addr, now );
     570             : 
     571             :       /* We just learned this peer's contact info.  Drain any
     572             :          no_contact_info hashes associated with this origin from the
     573             :          purged set so peers re-send those CRDS values. */
     574           0 :       if( FD_LIKELY( fd_ping_tracker_active( gossip->ping_tracker, value->origin ) ) ) fd_gossip_purged_drain_no_contact_info( gossip->purged, value->origin );
     575           0 :     }
     576             : 
     577           0 :     fd_active_set_push( gossip->active_set, payload+value->offset, value->length, value->origin, origin_stake, stem, now, 0 );
     578           0 :   }
     579           0 : }
     580             : 
     581             : static void
     582             : rx_pull_response( fd_gossip_t *                     gossip,
     583             :                   fd_gossip_pull_response_t const * pull_response,
     584             :                   uchar const *                     payload,
     585             :                   uchar const *                     failed,
     586             :                   fd_stem_context_t *               stem,
     587           0 :                   long                              now ) { /*
                     :   Applies the values of a pull response through rx_values, then bumps
                     :   one crds_rx_count outcome per non-failed value based on its result:
                     :   zero => upserted, negative => stale, positive => duplicate. */
     588           0 :   long results[ 17UL ]; /* sized to match rx_values' results[ static 17UL ]; assumes values_len<=17 -- TODO confirm */
     589           0 :   rx_values( gossip, pull_response->values_len, pull_response->values, payload, failed, stem, now, results );
     590           0 :   for( ulong i=0UL; i<pull_response->values_len; i++ ) {
     591           0 :     if( FD_UNLIKELY( failed[ i ] ) ) continue; /* rx_values wrote no result for failed values */
     592           0 :     if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PULL_RESPONSE_IDX ]++;
     593           0 :     else if( results[ i ]<0L )       gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_STALE_IDX ]++;
     594           0 :     else                             gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PULL_RESPONSE_DUPLICATE_IDX ]++;
     595           0 :   }
     596           0 : }
     597             : 
     598             : /* tx_prune constructs, signs, and sends a prune message telling
     599             :    `relayer` to stop pushing CRDS values originating from `origin`.
                     :    Nothing is signed or sent when `relayer` has no contact info entry
                     :    in the CRDS, or when its gossip socket is IPv6 or has a zero
                     :    address or port.
     600             : 
     601             :    On-wire layout (bincode):
     602             :      Protocol tag        4  (FD_GOSSIP_MESSAGE_PRUNE = 3)
     603             :      sender pubkey      32  (= identity_pubkey, outer PruneMessage field)
     604             :      PruneData.pubkey   32  (= identity_pubkey)
     605             :      prunes_len          8
     606             :      prunes[1]          32
     607             :      signature          64
     608             :      destination        32
     609             :      wallclock           8
     610             : 
     611             :    The signable data (input to Ed25519 sign) is the PruneData fields
     612             :    excluding signature:
     613             :      prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
     614             :    This must match fd_keyguard_payload_matches_prune_data (106 + 32 bytes). */
     615             : 
     616             : static void
     617             : tx_prune( fd_gossip_t *       gossip,
     618             :           uchar const *       relayer,
     619             :           uchar const *       origin,
     620             :           fd_stem_context_t * stem,
     621           0 :           long                now ) {
     622           0 :   ulong ci_idx = fd_crds_ci_idx( gossip->crds, relayer );
     623           0 :   if( FD_UNLIKELY( ci_idx==ULONG_MAX ) ) return; /* no contact info for relayer: nowhere to send */
     624             : 
     625           0 :   fd_gossip_contact_info_t const * ci = fd_crds_ci( gossip->crds, ci_idx );
     626           0 :   fd_ip4_port_t dest_addr = {
     627           0 :     .addr = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0U : ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4,
     628           0 :     .port = ci->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port
     629           0 :   };
     630           0 :   if( FD_UNLIKELY( !dest_addr.addr || !dest_addr.port ) ) return; /* IPv6 sockets were mapped to addr 0 above */
     631             : 
     632           0 :   ulong wallclock = (ulong)FD_NANOSEC_TO_MILLI( now );
     633             : 
     634             :   /* Build the signable payload:
     635             :      prefix[26] + pubkey[32] + prunes_len[8] + prunes[32] + destination[32] + wallclock[8]
                     :      where prefix is the 8-byte length 18 followed by the 18-byte
                     :      string "\xffSOLANA_PRUNE_DATA", and destination is `relayer`. */
     636           0 :   uchar signable[ 26UL + 32UL + 8UL + 32UL + 32UL + 8UL ];
     637           0 :   uchar * p = signable;
     638           0 :   FD_STORE( ulong, p, 18UL );                    p += 8UL;
     639           0 :   fd_memcpy( p, "\xffSOLANA_PRUNE_DATA", 18UL ); p += 18UL;
     640           0 :   fd_memcpy( p, gossip->identity_pubkey, 32UL ); p += 32UL;
     641           0 :   FD_STORE( ulong, p, 1UL );                     p += 8UL;
     642           0 :   fd_memcpy( p, origin, 32UL );                  p += 32UL;
     643           0 :   fd_memcpy( p, relayer, 32UL );                 p += 32UL;
     644           0 :   FD_STORE( ulong, p, wallclock );               p += 8UL;
     645             : 
     646           0 :   uchar signature[ 64UL ];
     647           0 :   gossip->sign_fn( gossip->sign_ctx, signable, sizeof(signable), FD_KEYGUARD_SIGN_TYPE_ED25519, signature );
     648             : 
     649             :   /* Build the on-wire packet:
     650             :      tag(4) + sender(32) + pubkey(32) + prunes_len(8) + prunes[32]
     651             :      + signature(64) + destination(32) + wallclock(8) */
     652           0 :   uchar pkt[ 4UL + 32UL + 32UL + 8UL + 32UL + 64UL + 32UL + 8UL ];
     653           0 :   uchar * q = pkt;
     654           0 :   FD_STORE( uint, q, FD_GOSSIP_MESSAGE_PRUNE );  q += 4UL;
     655           0 :   fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL;  /* sender */
     656           0 :   fd_memcpy( q, gossip->identity_pubkey, 32UL ); q += 32UL;  /* PruneData.pubkey */
     657           0 :   FD_STORE( ulong, q, 1UL );                     q += 8UL;
     658           0 :   fd_memcpy( q, origin, 32UL );                  q += 32UL;
     659           0 :   fd_memcpy( q, signature, 64UL );               q += 64UL;
     660           0 :   fd_memcpy( q, relayer, 32UL );                 q += 32UL;
     661           0 :   FD_STORE( ulong, q, wallclock );               q += 8UL;
     662             : 
     663           0 :   gossip->send_fn( gossip->send_ctx, stem, pkt, sizeof(pkt), &dest_addr, (ulong)now );
     664             : 
     665           0 :   gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PRUNE ]++;
     666           0 :   gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PRUNE ] += sizeof(pkt) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     667           0 : }
     668             : 
     669             : static void
     670             : tx_prunes( fd_gossip_t *       gossip,
     671             :              fd_stem_context_t * stem,
     672           0 :              long                now ) { /*
                     :   Calls tx_prune for every (relayer, origin) pair handed out by
                     :   fd_prune_finder_pop_prune, until that call returns zero. */
     673           0 :   uchar const * relayer;
     674           0 :   uchar const * origin;
     675           0 :   while( fd_prune_finder_pop_prune( gossip->prune_finder, &relayer, &origin ) ) {
     676           0 :     tx_prune( gossip, relayer, origin, stem, now );
     677           0 :   }
     678           0 : }
     679             : 
     680             : static void
     681             : rx_push( fd_gossip_t *            gossip,
     682             :          fd_gossip_push_t const * push,
     683             :          uchar const *            payload,
     684             :          uchar const *            failed,
     685             :          long                     now,
     686           0 :          fd_stem_context_t *      stem ) { /*
                     :   Applies the values of a push message through rx_values.  For every
                     :   non-failed value it then bumps one crds_rx_count outcome (zero
                     :   result => upserted, negative => stale, positive => duplicate) and
                     :   records the value with the prune finder, keyed by the value's
                     :   origin and by the relayer push->from.  Finishes by calling
                     :   tx_prunes. */
     687           0 :   long results[ 17UL ]; /* sized to match rx_values' results[ static 17UL ]; assumes values_len<=17 -- TODO confirm */
     688           0 :   rx_values( gossip, push->values_len, push->values, payload, failed, stem, now, results );
     689             : 
     690           0 :   for( ulong i=0UL; i<push->values_len; i++ ) {
     691           0 :     if( FD_UNLIKELY( failed[ i ] ) ) continue; /* rx_values wrote no result for failed values */
     692           0 :     if( FD_LIKELY( !results[ i ] ) ) gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_UPSERTED_PUSH_IDX ]++;
     693           0 :     else if( results[ i ]<0L )       gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_STALE_IDX ]++;
     694           0 :     else                             gossip->metrics->crds_rx_count[ FD_METRICS_ENUM_GOSSIP_CRDS_OUTCOME_V_DROPPED_PUSH_DUPLICATE_IDX ]++;
     695             : 
     696           0 :     ulong num_dups;
     697           0 :     if( FD_LIKELY( !results[ i ] ) )          num_dups = 0UL; /* newly inserted */
     698           0 :     else if( FD_UNLIKELY( results[ i ]<0L ) ) num_dups = ULONG_MAX; /* stale => never timely */
     699           0 :     else                                      num_dups = (ulong)results[ i ]; /* positive result is passed on as the duplicate count */
     700             : 
     701           0 :     ulong origin_stake = get_stake( gossip, push->values[ i ].origin );
     702           0 :     fd_prune_finder_record( gossip->prune_finder, push->values[ i ].origin, origin_stake, push->from, get_stake( gossip, push->from ), num_dups );
     703           0 :   }
     704             : 
     705           0 :   tx_prunes( gossip, stem, now );
     706           0 : }
     707             : 
     708             : static void
     709             : rx_prune( fd_gossip_t *             gossip,
     710           0 :           fd_gossip_prune_t const * prune ) { /*
                     :   Handles a received prune message: for each of the prunes_len
                     :   entries, forwards (prune->pubkey, prunes[ i ], stake of
                     :   prunes[ i ]) to fd_active_set_prune. */
     711           0 :   for( ulong i=0UL; i<prune->prunes_len; i++ ) {
     712           0 :     fd_active_set_prune( gossip->active_set,
     713           0 :                          prune->pubkey,
     714           0 :                          prune->prunes[ i ],
     715           0 :                          get_stake( gossip, prune->prunes[ i ] ) );
     716           0 :   }
     717           0 : }
     718             : 
     719             : 
     720             : static void
     721             : rx_ping( fd_gossip_t *            gossip,
     722             :          fd_gossip_ping_t const * ping,
     723             :          fd_ip4_port_t            peer_address,
     724             :          fd_stem_context_t *      stem,
     725           0 :          long                     now ) { /*
                     :   Replies to a ping with a pong sent to peer_address.  The pong
                     :   carries our identity pubkey, the SHA-256 of the 48-byte pre-image
                     :   "SOLANA_PING_PONG" || ping->token, and a signature requested over
                     :   that same pre-image. */
     726           0 :   uchar out_payload[ sizeof(fd_gossip_pong_t)+4UL]; /* 4-byte message tag followed by the pong body */
     727           0 :   FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PONG );
     728             : 
     729           0 :   fd_gossip_pong_t * out_pong = (fd_gossip_pong_t *)(out_payload + 4UL);
     730           0 :   fd_memcpy( out_pong->from, gossip->identity_pubkey, 32UL );
     731             : 
     732             :   /* fd_keyguard checks payloads for certain patterns before performing the
     733             :      sign. Pattern-matching can't be done on hashed data, so we need
     734             :      to supply the pre-hashed image to the sign fn (fd_keyguard will hash when
     735             :      supplied with FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519) while also hashing
     736             :      the image ourselves into out_pong->hash. */
     737             : 
     738           0 :   uchar pre_image[ 48UL ];
     739           0 :   fd_memcpy( pre_image, "SOLANA_PING_PONG", 16UL );
     740           0 :   fd_memcpy( pre_image+16UL, ping->token, 32UL );
     741             : 
     742           0 :   fd_sha256_hash( pre_image, 48UL, out_pong->hash );
     743             : 
     744           0 :   gossip->sign_fn( gossip->sign_ctx, pre_image, 48UL, FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519, out_pong->signature );
     745           0 :   gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), &peer_address, (ulong)now );
     746             : 
     747           0 :   gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PONG ]++;
     748           0 :   gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PONG ] += sizeof(out_payload)+42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     749           0 : }
     750             : 
     751             : static void
     752             : rx_pong( fd_gossip_t *            gossip,
     753             :          fd_gossip_pong_t const * pong,
     754             :          fd_ip4_port_t            peer_address,
     755           0 :          long                     now ) { /*
                     :   Hands a received pong to the ping tracker: the sender's pubkey,
                     :   its stake as looked up by get_stake, the address the pong came
                     :   from, and the pong's hash. */
     756           0 :   ulong stake = get_stake( gossip, pong->from );
     757           0 :   fd_ping_tracker_register( gossip->ping_tracker, pong->from, stake, peer_address, pong->hash, now );
     758           0 : }
     759             : 
     760             : void
     761             : fd_gossip_rx( fd_gossip_t *       gossip,
     762             :               fd_ip4_port_t       peer,
     763             :               uchar const *       data,
     764             :               ulong               data_sz,
     765             :               long                now,
     766           0 :               fd_stem_context_t * stem ) { /*
                     :   Entry point for one received gossip message from peer.  data is
                     :   read as three consecutive regions:
                     :     - an fd_gossip_message_t,
                     :     - FD_GOSSIP_MESSAGE_MAX_CRDS bytes of per-value failure flags,
                     :     - the payload bytes that CRDS value offsets are relative to.
                     :   data_sz must cover at least the first two regions (FD_TEST).  The
                     :   message is dispatched on message->tag; an unknown tag is logged
                     :   with FD_LOG_CRIT.  Presumably data was already parsed by the
                     :   caller -- TODO confirm. */
     767             :   /* TODO: Implement traffic shaper / bandwidth limiter */
     768           0 :   FD_TEST( data_sz>=sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS );
     769           0 :   fd_gossip_message_t const * message = (fd_gossip_message_t const *)data;
     770           0 :   uchar const *               failed  = data+sizeof(fd_gossip_message_t);
     771           0 :   uchar const *               payload = data+sizeof(fd_gossip_message_t)+FD_GOSSIP_MESSAGE_MAX_CRDS;
     772             : 
     773           0 :   switch( message->tag ) {
     774           0 :     case FD_GOSSIP_MESSAGE_PULL_REQUEST:  rx_pull_request( gossip, message->pull_request, peer, stem, now );              break;
     775           0 :     case FD_GOSSIP_MESSAGE_PULL_RESPONSE: rx_pull_response( gossip, message->pull_response, payload, failed, stem, now ); break;
     776           0 :     case FD_GOSSIP_MESSAGE_PUSH:          rx_push( gossip, message->push, payload, failed, now, stem );                   break;
     777           0 :     case FD_GOSSIP_MESSAGE_PRUNE:         rx_prune( gossip, message->prune );                                             break;
     778           0 :     case FD_GOSSIP_MESSAGE_PING:          rx_ping( gossip, message->ping, peer, stem, now );                              break;
     779           0 :     case FD_GOSSIP_MESSAGE_PONG:          rx_pong( gossip, message->pong, peer, now );                                    break;
     780           0 :     default:
     781           0 :       FD_LOG_CRIT(( "Unknown gossip message type %u", message->tag ));
     782           0 :       break;
     783           0 :   }
     784           0 : }
     785             : 
     786             : static int
     787             : fd_gossip_push( fd_gossip_t *             gossip,
     788             :                 fd_gossip_value_t const * value,
     789             :                 fd_stem_context_t *       stem,
     790           0 :                 long                      now ) { /*
                     :   Serializes value, has it signed, and inserts it into the CRDS as
                     :   one of our own values (is_me=1, identity stake).  The signature is
                     :   written to the first 64 bytes of the serialization and covers the
                     :   bytes after them.  Returns -1 if fd_crds_insert returns nonzero;
                     :   otherwise pushes the value to the active set and returns 0. */
     791           0 :   uchar serialized[ FD_GOSSIP_VALUE_MAX_SZ ];
     792           0 :   long serialized_sz = fd_gossip_value_serialize( value, serialized, sizeof(serialized) );
     793           0 :   FD_TEST( serialized_sz!=-1L );
     794           0 :   gossip->sign_fn( gossip->sign_ctx, serialized+64UL, (ulong)serialized_sz-64UL, FD_KEYGUARD_SIGN_TYPE_ED25519, serialized ); /* assumes serialized_sz>=64 -- TODO confirm */
     795             : 
     796           0 :   int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
     797           0 :   if( FD_UNLIKELY( fd_crds_insert( gossip->crds, value, serialized, (ulong)serialized_sz, gossip->identity_stake, origin_active, 1, now, stem ) ) ) return -1;
     798             : 
     799           0 :   fd_active_set_push( gossip->active_set, serialized, (ulong)serialized_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
     800           0 :   return 0;
     801           0 : }
     802             : 
     803             : int
     804             : fd_gossip_push_vote( fd_gossip_t *       gossip,
     805             :                      uchar const *       txn,
     806             :                      ulong               txn_sz,
     807             :                      fd_stem_context_t * stem,
     808           0 :                      long                now ) { /*
                     :   Wraps the txn_sz bytes at txn in a vote CRDS value originating from
                     :   our identity, with wallclock set from now converted to
                     :   milliseconds, and hands it to fd_gossip_push.  Returns
                     :   fd_gossip_push's result (0 or -1).  txn_sz must not exceed the
                     :   value's transaction buffer (FD_TEST). */
     809           0 :   fd_gossip_value_t value = {
     810           0 :     .tag = FD_GOSSIP_VALUE_VOTE,
     811           0 :     .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
     812           0 :     .vote = {{
     813           0 :       .index = 0UL, /* TODO */
     814           0 :       .transaction_len = txn_sz,
     815           0 :     }}
     816           0 :   };
     817           0 :   fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
     818           0 :   FD_TEST( txn_sz<=sizeof(value.vote->transaction) );
     819           0 :   fd_memcpy( value.vote->transaction, txn, txn_sz );
     820             : 
     821           0 :   return fd_gossip_push( gossip, &value, stem, now );
     822           0 : }
     823             : 
     824             : int
     825             : fd_gossip_push_duplicate_shred( fd_gossip_t *                       gossip,
     826             :                                 fd_gossip_duplicate_shred_t const * duplicate_shred,
     827             :                                 fd_stem_context_t *                 stem,
     828           0 :                                 long                                now ) { /*
                     :   Copies *duplicate_shred into a duplicate shred CRDS value
                     :   originating from our identity, with wallclock set from now
                     :   converted to milliseconds, and hands it to fd_gossip_push.
                     :   Returns fd_gossip_push's result (0 or -1). */
     829           0 :   fd_gossip_value_t value = {
     830           0 :     .tag = FD_GOSSIP_VALUE_DUPLICATE_SHRED,
     831           0 :     .wallclock = (ulong)FD_NANOSEC_TO_MILLI( now ),
     832           0 :   };
     833           0 :   fd_memcpy( value.origin, gossip->identity_pubkey, 32UL );
     834           0 :   *value.duplicate_shred = *duplicate_shred;
     835             : 
     836           0 :   return fd_gossip_push( gossip, &value, stem, now );
     837           0 : }
     838             : 
     839             : static void
     840             : tx_ping( fd_gossip_t *       gossip,
     841             :          fd_stem_context_t * stem,
     842             :          long                now,
     843           0 :          int *               charge_busy ) { /*
                     :   Sends a signed ping to every peer that
                     :   fd_ping_tracker_pop_request hands out for now.  The message tag
                     :   and our pubkey are written once; only the token and its signature
                     :   change per peer.  If charge_busy is non-NULL it is set to 1 each
                     :   time a ping is sent and is otherwise left untouched. */
     844           0 :   uchar out_payload[ sizeof(fd_gossip_ping_t) + 4UL ]; /* 4-byte message tag followed by the ping body */
     845           0 :   FD_STORE( uint, out_payload, FD_GOSSIP_MESSAGE_PING );
     846             : 
     847           0 :   fd_gossip_ping_t * out_ping = (fd_gossip_ping_t *)( out_payload+4UL );
     848           0 :   fd_memcpy( out_ping->from, gossip->identity_pubkey, 32UL );
     849             : 
     850           0 :   uchar const *         peer_pubkey;
     851           0 :   uchar const *         ping_token;
     852           0 :   fd_ip4_port_t const * peer_address;
     853           0 :   while( fd_ping_tracker_pop_request( gossip->ping_tracker,
     854           0 :                                       now,
     855           0 :                                       &peer_pubkey,
     856           0 :                                       &peer_address,
     857           0 :                                       &ping_token ) ) {
     858           0 :     fd_memcpy( out_ping->token, ping_token, 32UL );
     859             : 
     860           0 :     gossip->sign_fn( gossip->sign_ctx, out_ping->token, 32UL, FD_KEYGUARD_SIGN_TYPE_ED25519, out_ping->signature ); /* signature covers the 32-byte token only */
     861           0 :     gossip->send_fn( gossip->send_ctx, stem, out_payload, sizeof(out_payload), peer_address, (ulong)now );
     862             : 
     863           0 :     gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PING ]++;
     864           0 :     gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PING ] += sizeof(out_payload) + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
     865           0 :     if( charge_busy ) *charge_busy = 1;
     866           0 :   }
     867           0 : }
     868             : 
     869             : /* Construct and send a pull request to a random peer.  The pull
     870             :    request contains a bloom filter over our known CRDS hashes so that
     871             :    the peer can respond with values we are missing.
     872             : 
     873             :    NOTE: Divergence from Agave:
     874             :     - Agave builds up to 2^mask_bits filters per pull period
     875             :       (sampling up to 1024), each covering a distinct partition of
     876             :       the hash space.  We build and send exactly one filter per
     877             :       pull period, covering 1/2^mask_bits of the space.
     878             : 
     879             :    Maximum bloom filter bits in a PullRequest packet:
     880             : 
     881             :      PACKET_DATA_SIZE             = 1232   (= 1280 - 40 - 8)
     882             : 
     883             :      Bytes consumed by non-bloom fields:
     884             :        discriminant(4) + keys_len(8) + keys(8*num_keys) +
     885             :        has_bits(1) + bloom_vec_len(8) + bloom_bits_count(8) +
     886             :        bloom_num_bits_set(8) + mask(8) + mask_bits(4)
     887             :        + contact_info_crds_val(crds_val_sz)
     888             :        = 49 + 8*num_keys + crds_val_sz
     889             : 
     890             :      The bitvec is serialized as u64 words, so the bitvec storage is
     891             :      ceil(num_bits/64)*8 bytes.  The remaining packet bytes must
     892             :      accommodate this.
     893             : 
     894             :      Agave determines the max_bytes parameter (input to Bloom::random)
     895             :      via an empirical cache (get_max_bloom_filter_bytes).  max_bytes*8
     896             :      is passed as the max_bits cap to Bloom::random, but actual
     897             :      num_bits is only ~83% of max_bits (the E/D ratio for p=0.1).
     898             :      We replicate this with a closed-form inversion: the largest
     899             :      max_bytes where ceil(num_bits/64)*8 fits in remaining space is
     900             :      max_bytes = floor(D * floor(64*W/E) / 8), where W is the max
     901             :      number of u64 words, E and D are the bloom filter constants.
     902             : 
     903             :      num_keys depends on the bloom sizing, which depends on the
     904             :      overhead, which depends on num_keys.  However there is a closed
     905             :      form: compute num_keys from the pessimistic KEYS=8 overhead, then
     906             :      recompute the tight overhead with the true num_keys.  This always
     907             :      converges in one step because the optimal key count is
     908             :      E*ln(2) ≈ 3.32 (where E = ln(p)/ln(1/2^ln2)), far from any
     909             :      rounding boundary.  For p=0.1 and KEYS=8, num_keys is always 3.
     910             : 
     911             :      NB: The has_bits(1) + bloom_vec_len(8) are only written when
     912             :      num_bits>=1.  fd_bloom_num_bits clamps to [1, max_bits], so
     913             :      num_bits>=1 always holds and this layout is correct. */
     914             : 
     915             : static void
     916             : tx_pull_request( fd_gossip_t *       gossip,
     917             :                  fd_stem_context_t * stem,
     918           0 :                  long                now ) {
     919           0 :   ulong total_crds_vals = fd_crds_len( gossip->crds ) + fd_gossip_purged_len( gossip->purged );
     920           0 :   ulong num_items       = fd_ulong_max( 65536UL, total_crds_vals );
     921           0 :   ulong crds_val_sz     = gossip->my_contact_info.crds_val_sz;
     922             : 
     923             :   /* Step 1: Compute num_keys from the pessimistic KEYS=8 overhead
     924             :      (same initial estimate Agave uses in CrdsFilterSet::new). */
     925           0 :   ulong  pessimistic_overhead = 49UL + 8UL*(ulong)BLOOM_NUM_KEYS + crds_val_sz;
     926           0 :   FD_TEST( pessimistic_overhead<FD_GOSSIP_MTU );
     927           0 :   double pessimistic_max_bits = (double)( 8UL*( FD_GOSSIP_MTU - pessimistic_overhead ) );
     928           0 :   double pessimistic_items    = fd_bloom_max_items( pessimistic_max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
     929           0 :   FD_TEST( pessimistic_items>0.0 );
     930           0 :   ulong  pessimistic_num_bits = fd_bloom_num_bits( pessimistic_items, BLOOM_FALSE_POSITIVE_RATE, pessimistic_max_bits );
     931           0 :   ulong  num_keys             = fd_bloom_num_keys( (double)pessimistic_num_bits, pessimistic_items );
     932             : 
     933             :   /* Step 2: Recompute with the tight overhead using the true num_keys.
     934             :      Find the largest max_bytes parameter (matching Agave's
     935             :      get_max_bloom_filter_bytes cache) such that the resulting bitvec
     936             :      fits in the remaining packet space.
     937             : 
     938             :      Given:
     939             :        max_items = ceil(max_bits / D)   where D = -K / ln(1-exp(ln(p)/K))
     940             :        num_bits  = ceil(max_items * E)  where E = ln(p) / ln(1/2^ln2)
     941             : 
     942             :      We need ceil(num_bits/64)*8 <= remaining, i.e. num_bits <= 64*W
     943             :      where W = floor(remaining/8).  Working backwards:
     944             :        max_items <= I  where I = floor(64*W / E)
     945             :        max_bytes <= D*I / 8
     946             : 
     947             :      So max_bytes = floor(D * floor(64*W/E) / 8). */
     948           0 :   ulong  overhead       = 49UL + 8UL*num_keys + crds_val_sz;
     949           0 :   FD_TEST( overhead<FD_GOSSIP_MTU );
     950           0 :   ulong  remaining      = FD_GOSSIP_MTU - overhead;
     951           0 :   ulong  max_words      = remaining / 8UL; /* max u64 words for bitvec */
     952             : 
     953           0 :   double E = log( BLOOM_FALSE_POSITIVE_RATE ) / log( 1.0 / pow( 2.0, log( 2.0 ) ) );
     954           0 :   double D = -BLOOM_NUM_KEYS / log( 1.0 - exp( log( BLOOM_FALSE_POSITIVE_RATE ) / BLOOM_NUM_KEYS ) );
     955           0 :   ulong  I = (ulong)floor( 64.0 * (double)max_words / E );
     956           0 :   ulong  max_bytes = (ulong)floor( D * (double)I / 8.0 );
     957             : 
     958           0 :   double max_bits  = (double)( max_bytes * 8UL );
     959           0 :   double max_items = fd_bloom_max_items( max_bits, BLOOM_NUM_KEYS, BLOOM_FALSE_POSITIVE_RATE );
     960           0 :   FD_TEST( max_items>0.0 );
     961           0 :   ulong  num_bits  = fd_bloom_num_bits( max_items, BLOOM_FALSE_POSITIVE_RATE, max_bits );
     962           0 :   FD_TEST( num_bits>=1UL );
     963           0 :   FD_TEST( (num_bits+63UL)/64UL<=max_words ); /* verify bitvec fits */
     964           0 :   FD_TEST( fd_bloom_num_keys( (double)num_bits, max_items )==num_keys ); /* verify convergence */
     965             : 
     966           0 :   double _mask_bits     = ceil( log2( (double)num_items / max_items ) );
     967           0 :   uint   mask_bits      = _mask_bits >= 0.0 ? fd_uint_min( (uint)_mask_bits, 63U ) : 0U;
     968           0 :   ulong  mask           = fd_rng_ulong( gossip->rng ) | (~0UL>>(mask_bits));
     969             : 
     970           0 :   uchar payload[ FD_GOSSIP_MTU ] = {0};
     971             : 
     972           0 :   ulong * keys_ptr, * bits_ptr, * bits_set;
     973           0 :   long payload_sz = fd_gossip_pull_request_init( payload,
     974           0 :                                                  FD_GOSSIP_MTU,
     975           0 :                                                  num_keys,
     976           0 :                                                  num_bits,
     977           0 :                                                  mask,
     978           0 :                                                  mask_bits,
     979           0 :                                                  gossip->my_contact_info.crds_val,
     980           0 :                                                  gossip->my_contact_info.crds_val_sz,
     981           0 :                                                  &keys_ptr,
     982           0 :                                                  &bits_ptr,
     983           0 :                                                  &bits_set );
     984           0 :   FD_TEST( -1L!=payload_sz );
     985             : 
     986           0 :   fd_bloom_t filter[1];
     987           0 :   fd_bloom_init_inplace( keys_ptr, bits_ptr, num_keys, num_bits, 0, gossip->rng, BLOOM_FALSE_POSITIVE_RATE, filter );
     988             : 
     989           0 :   uchar iter_mem[ 16UL ];
     990           0 :   for( fd_crds_mask_iter_t * it = fd_crds_mask_iter_init( gossip->crds, mask, mask_bits, iter_mem );
     991           0 :        !fd_crds_mask_iter_done( it, gossip->crds );
     992           0 :        it = fd_crds_mask_iter_next( it, gossip->crds ) ) {
     993           0 :     fd_bloom_insert( filter, fd_crds_entry_hash( fd_crds_mask_iter_entry( it, gossip->crds ) ), 32UL );
     994           0 :   }
     995             : 
     996           0 :   for( fd_gossip_purged_mask_iter_t * it = fd_gossip_purged_mask_iter_init( gossip->purged, mask, mask_bits, iter_mem );
     997           0 :        !fd_gossip_purged_mask_iter_done( it, gossip->purged );
     998           0 :        it = fd_gossip_purged_mask_iter_next( it, gossip->purged ) ){
     999           0 :     fd_bloom_insert( filter, fd_gossip_purged_mask_iter_hash( it, gossip->purged ), 32UL );
    1000           0 :   }
    1001             : 
    1002           0 :   int num_bits_set = 0;
    1003           0 :   for( ulong i=0UL; i<(num_bits+63)/64UL; i++ ) num_bits_set += fd_ulong_popcnt( bits_ptr[ i ] );
    1004           0 :   *bits_set = (ulong)num_bits_set;
    1005             : 
    1006           0 :   ulong idx = fd_gossip_wsample_sample_pull_request( gossip->wsample );
    1007           0 :   fd_ip4_port_t peer_addr;
    1008           0 :   if( FD_UNLIKELY( idx==ULONG_MAX ) ) {
    1009           0 :     if( FD_UNLIKELY( !gossip->entrypoints_cnt ) ) {
    1010             :       /* We are the bootstrapping node, and nobody else is present in
    1011             :          the cluster.  Nowhere to send the pull request. */
    1012           0 :       return;
    1013           0 :     }
    1014           0 :     peer_addr = random_entrypoint( gossip );
    1015           0 :   } else {
    1016           0 :     fd_gossip_contact_info_t const * peer = fd_crds_ci( gossip->crds, idx );
    1017           0 :     peer_addr.addr = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].is_ipv6 ? 0 : peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].ip4;
    1018           0 :     peer_addr.port = peer->sockets[ FD_GOSSIP_CONTACT_INFO_SOCKET_GOSSIP ].port;
    1019           0 :   }
    1020           0 :   gossip->send_fn( gossip->send_ctx, stem, payload, (ulong)payload_sz, &peer_addr, (ulong)now );
    1021             : 
    1022           0 :   gossip->metrics->message_tx[ FD_GOSSIP_MESSAGE_PULL_REQUEST ]++;
    1023           0 :   gossip->metrics->message_tx_bytes[ FD_GOSSIP_MESSAGE_PULL_REQUEST ] += (ulong)payload_sz + 42UL; /* 42 = sizeof(fd_ip4_udp_hdrs_t) */
    1024           0 : }
    1025             : /* fd_gossip_advance performs the periodic gossip work due at time
                          now.  On every call it invokes, in this order,
                          outbound_budget_replenish, fd_gossip_purged_expire,
                          fd_active_set_advance, fd_crds_advance and tx_ping.  In addition,
                          whenever the matching deadline in gossip->timers has been reached
                          it sends one pull request and/or refreshes, re-inserts and pushes
                          our own contact info, and then re-arms that deadline relative to
                          now.

                          now is treated as nanoseconds here (the pull request interval
                          below is written as 1600L*1000L and described as 1.6ms).  stem
                          is passed through unchanged to the helpers called below.
                          charge_busy, if non-NULL, is set to 1 when a pull request or a
                          contact info refresh was done in this call.

                          NOTE(review): charge_busy is NULL checked in this function but
                          is also passed as is to fd_active_set_advance, fd_crds_advance
                          and tx_ping; whether those accept NULL is not visible here. */
    1026             : void
    1027             : fd_gossip_advance( fd_gossip_t *       gossip,
    1028             :                    long                now,
    1029             :                    fd_stem_context_t * stem,
    1030           0 :                    int *               charge_busy ) {
                         /* Unconditional work done on every call. */
    1031           0 :   outbound_budget_replenish( gossip, now );
    1032             : 
    1033           0 :   fd_gossip_purged_expire( gossip->purged, now );
    1034           0 :   fd_active_set_advance( gossip->active_set, stem, now, charge_busy );
    1035           0 :   fd_crds_advance( gossip->crds, now, stem, charge_busy );
    1036             : 
    1037           0 :   tx_ping( gossip, stem, now, charge_busy );
                         /* Timer driven: at most one pull request per call, then re-arm. */
    1038           0 :   if( FD_UNLIKELY( now>=gossip->timers.next_pull_request ) ) {
    1039           0 :     tx_pull_request( gossip, stem, now );
    1040           0 :     if( charge_busy ) *charge_busy = 1;
    1041             :     /* 1.6ms (625/s).  Agave sends min(1024, ceil(2^mask_bits/8))
    1042             :        filters every 500ms.  For a typical mainnet table (~65k items,
    1043             :        mask_bits≈7) that is ~16 filters/500ms = one every 31ms.  We
    1044             :        send a single filter per round, so we fire ~20× more often to
    1045             :        compensate for sending one filter instead of many per period.
    1046             : 
    1047             :        We considered dynamically matching Agave's exact rate by
    1048             :        computing 500ms/filters_per_round from mask_bits each round,
    1049             :        but this caused slow table fill on startup (mask_bits starts
    1050             :        low -> long intervals -> few pulls -> slow CRDS population).
    1051             :        Adaptive boosting (counter-based, timestamp-based, and
    1052             :        threshold-based) all added complexity without clear benefit:
    1053             :        counter decay lost state between send and response arrival,
    1054             :        timestamp checks never disarmed because trickle inserts kept
    1055             :        refreshing the window, and threshold heuristics required
    1056             :        tuning constants that varied by cluster size.
    1057             : 
    1058             :        A fixed 1.6ms is simpler and robust: the cost of a redundant
    1059             :        pull request is negligible (a single 1232-byte packet whose
    1060             :        reply will be empty if we're already caught up), and it
    1061             :        guarantees fast table fill on startup without any adaptive
    1062             :        machinery. */
    1063           0 :     gossip->timers.next_pull_request = now+1600L*1000L;
    1064           0 :   }
                         /* Timer driven: rebuild our own contact info, insert it into
                            the CRDS table and push it through the active set. */
    1065           0 :   if( FD_UNLIKELY( now>=gossip->timers.next_contact_info_refresh ) ) {
    1066             :     /* TODO: Frequency of this? More often if observing? */
    1067           0 :     refresh_contact_info( gossip, now );
    1068           0 :     int origin_active = 0; /* Value doesn't matter, since is_me=1 it's never used. */
    1069           0 :     fd_crds_insert( gossip->crds, gossip->my_contact_info.ci, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_stake, origin_active, 1, now, stem );
    1070           0 :     fd_active_set_push( gossip->active_set, gossip->my_contact_info.crds_val, gossip->my_contact_info.crds_val_sz, gossip->identity_pubkey, gossip->identity_stake, stem, now, 1 );
    1071           0 :     gossip->timers.next_contact_info_refresh = now+15L*500L*1000L*1000L; /* 15*500ms = 7.5s with now in ns.  TODO: Jitter */
    1072           0 :     if( charge_busy ) *charge_busy = 1;
    1073           0 :   }
    1074           0 : }
    1075             : /* fd_gossip_ping_tracker_track looks up the stake associated with
                          peer_pubkey via get_stake and forwards the peer (pubkey, stake,
                          peer_address) together with now to fd_ping_tracker_track on
                          gossip->ping_tracker.  Returns nothing; any effect is whatever
                          fd_ping_tracker_track does with the peer (not visible here). */
    1076             : void
    1077             : fd_gossip_ping_tracker_track( fd_gossip_t * gossip,
    1078             :                               uchar const * peer_pubkey,
    1079             :                               fd_ip4_port_t peer_address,
    1080           0 :                               long          now ) {
    1081           0 :   ulong origin_stake = get_stake( gossip, peer_pubkey );
    1082           0 :   fd_ping_tracker_track( gossip->ping_tracker, peer_pubkey, origin_stake, peer_address, now );
    1083           0 : }

Generated by: LCOV version 1.14