LCOV - code coverage report
Current view: top level - discof/restore/utils - fd_sspeer_selector.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 374 424 88.2 %
Date: 2026-06-29 05:51:35 Functions: 18 18 100.0 %

          Line data    Source code
       1             : #include "fd_sspeer_selector.h"
       2             : #include "../../../util/bits/fd_sat.h"
       3             : #include "../../../util/log/fd_log.h"
       4             : 
       5             : struct fd_sspeer_private { /* Per-peer record: element type of peer_pool, both peer maps and score_treap. */
       6             :   fd_sspeer_key_t key; /* lookup key for peer_map_by_key (MAP_KEY) */
       7             :   fd_ip4_port_t addr; /* lookup key for peer_map_by_addr; that map is MAP_MULTI, so several peers may share one addr */
       8             :   ulong         full_slot; /* full slot, or FD_SSPEER_SLOT_UNKNOWN */
       9             :   ulong         incr_slot; /* incremental slot, or FD_SSPEER_SLOT_UNKNOWN; >=full_slot when both are known (fd_sspeer_validate_slot_args) */
      10             :   uchar         full_hash[ FD_HASH_FOOTPRINT ]; /* zeroed by fd_sspeer_selector_add when a new peer has no full hash */
      11             :   uchar         incr_hash[ FD_HASH_FOOTPRINT ]; /* zeroed when absent on add, or when a stale incremental is cleared on update */
      12             :   ulong         latency; /* nanos, or FD_SSPEER_LATENCY_UNKNOWN */
      13             :   ulong         score; /* latency plus slots-behind penalty, see fd_sspeer_selector_score; key of score_treap ordering */
      14             :   int           valid; /* set by fd_sspeer_selector_add to (full_slot!=FD_SSPEER_SLOT_UNKNOWN) */
      15             : 
      16             :   struct {
      17             :     ulong next; /* POOL_NEXT link used by peer_pool */
      18             :   } pool;
      19             : 
      20             :   struct {
      21             :     ulong next; /* MAP_NEXT for peer_map_by_key */
      22             :     ulong prev; /* MAP_PREV for peer_map_by_key */
      23             :   } map_by_key;
      24             : 
      25             :   struct {
      26             :     ulong next; /* MAP_NEXT for peer_map_by_addr */
      27             :     ulong prev; /* MAP_PREV for peer_map_by_addr */
      28             :   } map_by_addr;
      29             : 
      30             :   struct {
      31             :     ulong parent; /* TREAP_PARENT */
      32             :     ulong left; /* TREAP_LEFT */
      33             :     ulong right; /* TREAP_RIGHT */
      34             :     ulong prio; /* TREAP_PRIO; seeded via score_treap_seed in fd_sspeer_selector_new */
      35             :   } score_treap;
      36             : };
      37             : 
      38             : typedef struct fd_sspeer_private fd_sspeer_private_t;
      39             : 
      40             : #define POOL_NAME  peer_pool /* Instantiates the fd_pool template as peer_pool_* over fd_sspeer_private_t. */
      41         372 : #define POOL_T     fd_sspeer_private_t
      42             : #define POOL_IDX_T ulong
      43    14356341 : #define POOL_NEXT  pool.next /* link field inside each element */
      44             : #include "../../../util/tmpl/fd_pool.c"
      45             : 
      46             : #define MAP_NAME               peer_map_by_key /* Instantiates fd_map_chain as peer_map_by_key_*: peers looked up by fd_sspeer_key_t. */
      47        1611 : #define MAP_KEY                key
      48         813 : #define MAP_ELE_T              fd_sspeer_private_t
      49             : #define MAP_KEY_T              fd_sspeer_key_t
      50        1092 : #define MAP_PREV               map_by_key.prev
      51        2490 : #define MAP_NEXT               map_by_key.next
      52         681 : #define MAP_KEY_EQ(k0,k1)      (fd_sspeer_key_eq(k0,k1))
      53        2766 : #define MAP_KEY_HASH(key,seed) (fd_sspeer_key_hash(key,seed))
      54             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1 /* callers in this file use peer_map_by_key_ele_remove_fast */
      55             : #include "../../../util/tmpl/fd_map_chain.c"
      56             : 
      57             : #define MAP_NAME               peer_map_by_addr /* Instantiates fd_map_chain as peer_map_by_addr_*: peers looked up by fd_ip4_port_t. */
      58        1503 : #define MAP_KEY                addr
      59         786 : #define MAP_ELE_T              fd_sspeer_private_t
      60             : #define MAP_KEY_T              fd_ip4_port_t
      61        1122 : #define MAP_PREV               map_by_addr.prev
      62        2184 : #define MAP_NEXT               map_by_addr.next
      63          81 : #define MAP_KEY_EQ(k0,k1)      ((k0)->l==(k1)->l) /* addresses compare equal when their packed .l words match */
      64        1629 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l) /* NOTE(review): seed is not parenthesized here, unlike key */
      65             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1 /* callers in this file use peer_map_by_addr_ele_remove_fast */
      66             : #define MAP_MULTI              1 /* several peers may be stored under the same address */
      67             : #include "../../../util/tmpl/fd_map_chain.c"
      68             : 
      69        1347 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score ) /* true when x's score is strictly less than y's; used below as TREAP_LT */
      70             : 
      71             : #define TREAP_T         fd_sspeer_private_t /* Instantiates fd_treap as score_treap_* over peers, ordered by TREAP_LT. */
      72             : #define TREAP_NAME      score_treap
      73             : #define TREAP_QUERY_T   void *                                         /* We don't use query ... */
      74             : #define TREAP_CMP(a,b)  (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
      75             :                                                                           implementation to cmp either */
      76        5028 : #define TREAP_IDX_T     ulong
      77        1347 : #define TREAP_LT        COMPARE_WORSE
      78        3939 : #define TREAP_PARENT    score_treap.parent
      79        3759 : #define TREAP_LEFT      score_treap.left
      80        2532 : #define TREAP_RIGHT     score_treap.right
      81    14356635 : #define TREAP_PRIO      score_treap.prio
      82             : #include "../../../util/tmpl/fd_treap.c"
      83             : 
      84         906 : #define DEFAULT_SLOTS_BEHIND         (1000UL*1000UL) /* 1,000,000 slots behind; used by fd_sspeer_selector_score when the peer's full slot is unknown */
      85             : /* Intentionally less than DEFAULT_SLOTS_BEHIND so that peers with an
      86             :    unknown slot are penalized more than the worst known peer. */
      87         873 : #define MAX_SLOTS_BEHIND_CAP         (864000UL)      /* ~2 epochs worth of slots */
      88             : /* Assumed latency (in nanos) for peers that have not been pinged yet.
      89             :    Pings are sent immediately on peer discovery, so this default is
      90             :    short-lived.  100ms is a neutral middle-ground: high enough that
      91             :    any peer with a measured latency is preferred, low enough that slot
      92             :    distance still meaningfully differentiates unpinged peers. */
      93         912 : #define DEFAULT_PEER_LATENCY         (100UL*1000UL*1000UL)  /* 100ms */
      94         906 : #define DEFAULT_SLOTS_BEHIND_PENALTY (1000UL) /* score added per slot behind; multiplied by slots_behind in fd_sspeer_selector_score */
      95             : 
      96             : #define FD_SSPEER_SELECTOR_DEBUG 0 /* not referenced in the portion of the file visible here */
      97             : 
      98             : struct fd_sspeer_selector_private { /* Selector state; placed at the start of the shmem region laid out by fd_sspeer_selector_new. */
      99             :   fd_sspeer_private_t *     pool; /* joined peer_pool, capacity 2*max_peers */
     100             :   peer_map_by_key_t *       map_by_key; /* key -> peer */
     101             :   peer_map_by_addr_t *      map_by_addr; /* addr -> peer(s) */
     102             :   score_treap_t *           score_treap; /* peers ordered by score; holds at most max_peers elements */
     103             :   score_treap_t *           shadow_score_treap; /* second treap of the same size; only created/left/deleted in the code visible here */
     104             :   ulong *                   peer_idx_list; /* max_peers pool indices for shadow treap swap; not initialized by fd_sspeer_selector_new */
     105             :   fd_sscluster_slot_t       cluster_slot; /* reference full/incremental slots that peer slots are scored against */
     106             :   ulong                     max_peers;
     107             :   int                       cluster_slot_dirty; /* set to 1 when a peer is added/removed or its slots change; consumer not visible here */
     108             : 
     109             :   ulong                     magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
     110             : };
     111             : 
     112             : FD_FN_CONST ulong /* Returns the alignment required for a selector memory region. */
     113         768 : fd_sspeer_selector_align( void ) {
     114         768 :   return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(), /* max over every sub-region's alignment */
     115         768 :           fd_ulong_max( peer_map_by_key_align(), fd_ulong_max( peer_map_by_addr_align(),
     116         768 :           fd_ulong_max( score_treap_align(), alignof(ulong) ) ) ) ) );
     117         768 : }
     118             : 
     119             : FD_FN_CONST ulong /* Returns the byte size of the memory region needed for a selector holding up to max_peers peers. */
     120          54 : fd_sspeer_selector_footprint( ulong max_peers ) {
     121          54 :   ulong l;
     122          54 :   l = FD_LAYOUT_INIT; /* the append order below must match the FD_SCRATCH_ALLOC_APPEND order in fd_sspeer_selector_new */
     123          54 :   l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
     124          54 :   l = FD_LAYOUT_APPEND( l, peer_pool_align(),             peer_pool_footprint( 2UL*max_peers ) ); /* pool and maps are sized for twice max_peers; NOTE(review): 2UL*max_peers is not overflow-checked */
     125          54 :   l = FD_LAYOUT_APPEND( l, peer_map_by_key_align(),       peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
     126          54 :   l = FD_LAYOUT_APPEND( l, peer_map_by_addr_align(),      peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
     127          54 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) ); /* score_treap */
     128          54 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) ); /* shadow_score_treap */
     129          54 :   l = FD_LAYOUT_APPEND( l, alignof(ulong),                max_peers * sizeof(ulong) ); /* pool indices for shadow treap swap */
     130          54 :   return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
     131          54 : }
     132             : 
     133             : void * /* Formats shmem as an empty selector; returns shmem's selector pointer, or NULL (with a warning) on bad arguments. */
     134             : fd_sspeer_selector_new( void * shmem,
     135             :                         ulong  max_peers,
     136         129 :                         ulong  seed ) { /* seed is used for treap priorities and for both map hash seeds */
     137         129 :   if( FD_UNLIKELY( !shmem ) ) {
     138           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     139           0 :     return NULL;
     140           0 :   }
     141             : 
     142         129 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
     143           0 :     FD_LOG_WARNING(( "unaligned shmem" ));
     144           0 :     return NULL;
     145           0 :   }
     146             : 
     147         129 :   if( FD_UNLIKELY( max_peers < 1UL ) ) {
     148           0 :     FD_LOG_WARNING(( "max_peers must be at least 1" ));
     149           0 :     return NULL;
     150           0 :   }
     151             : 
     152         129 :   FD_SCRATCH_ALLOC_INIT( l, shmem ); /* carve sub-regions in the same order as fd_sspeer_selector_footprint */
     153         129 :   fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
     154         129 :   void * _pool                    = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(),        peer_pool_footprint( 2UL*max_peers ) );
     155         129 :   void * _map                     = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_key_align(),  peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) )  );
     156         129 :   void * _multimap_by_addr        = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) )  );
     157         129 :   void * _score_treap             = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),      score_treap_footprint( max_peers ) );
     158         129 :   void * _shadow_score_treap      = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),      score_treap_footprint( max_peers ) );
     159         129 :   void * _peer_idx_list           = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong),           max_peers * sizeof(ulong) ); /* pool indices for shadow treap swap */
     160             : 
     161         129 :   selector->pool               = peer_pool_join( peer_pool_new( _pool, 2UL*max_peers ) );
     162             :   /* Seed treap priorities so the treap is balanced. */
     163         129 :   score_treap_seed( selector->pool, 2UL*max_peers, seed );
     164         129 :   selector->map_by_key         = peer_map_by_key_join( peer_map_by_key_new( _map, peer_map_by_key_chain_cnt_est( 2UL*max_peers ), seed ) );
     165         129 :   selector->map_by_addr        = peer_map_by_addr_join( peer_map_by_addr_new( _multimap_by_addr, peer_map_by_addr_chain_cnt_est( 2UL*max_peers ), seed ) );
     166         129 :   selector->score_treap        = score_treap_join( score_treap_new( _score_treap, max_peers ) );
     167         129 :   selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
     168         129 :   selector->peer_idx_list      = (ulong *)_peer_idx_list; /* contents are left uninitialized here */
     169         129 :   selector->max_peers          = max_peers;
     170             : 
     171         129 :   selector->cluster_slot.full          = 0UL;
     172         129 :   selector->cluster_slot.incremental   = FD_SSPEER_SLOT_UNKNOWN;
     173         129 :   selector->cluster_slot_dirty         = 0;
     174             : 
     175         129 :   FD_COMPILER_MFENCE(); /* magic is written last, between compiler fences, after all other fields are set */
     176         129 :   FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
     177         129 :   FD_COMPILER_MFENCE();
     178             : 
     179         129 :   return (void *)selector;
     180         129 : }
     181             : 
     182             : fd_sspeer_selector_t * /* Validates shselector (non-NULL, aligned, magic) and returns it typed; NULL with a warning otherwise. */
     183         129 : fd_sspeer_selector_join( void * shselector ) {
     184         129 :   if( FD_UNLIKELY( !shselector ) ) {
     185           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     186           0 :     return NULL;
     187           0 :   }
     188             : 
     189         129 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     190           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     191           0 :     return NULL;
     192           0 :   }
     193             : 
     194         129 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     195             : 
     196         129 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) { /* region was not formatted by fd_sspeer_selector_new, or was deleted */
     197           0 :     FD_LOG_WARNING(( "bad magic" ));
     198           0 :     return NULL;
     199           0 :   }
     200             : 
     201         129 :   return selector; /* the sub-structures were already joined in fd_sspeer_selector_new; nothing further is joined here */
     202         129 : }
     203             : 
     204             : void * /* Leaves every sub-structure of a joined selector and returns the selector as void *; NULL with a warning on a bad handle. */
     205         114 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
     206         114 :   if( FD_UNLIKELY( !selector ) ) {
     207           0 :     FD_LOG_WARNING(( "NULL selector" ));
     208           0 :     return NULL;
     209           0 :   }
     210             : 
     211         114 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
     212           0 :     FD_LOG_WARNING(( "misaligned selector" ));
     213           0 :     return NULL;
     214           0 :   }
     215             : 
     216         114 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     217           0 :     FD_LOG_WARNING(( "bad magic" ));
     218           0 :     return NULL;
     219           0 :   }
     220             : 
     221         114 :   selector->pool               = peer_pool_leave( selector->pool ); /* each field is overwritten with the value its *_leave returns */
     222         114 :   selector->map_by_key         = peer_map_by_key_leave( selector->map_by_key );
     223         114 :   selector->map_by_addr        = peer_map_by_addr_leave( selector->map_by_addr );
     224         114 :   selector->score_treap        = score_treap_leave( selector->score_treap );
     225         114 :   selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
     226             : 
     227         114 :   return (void *)selector; /* magic is left intact */
     228         114 : }
     229             : 
     230             : void * /* Unformats a selector region: deletes each sub-structure, clears magic, returns the region; NULL with a warning on a bad handle. */
     231         114 : fd_sspeer_selector_delete( void * shselector ) {
     232         114 :   if( FD_UNLIKELY( !shselector ) ) {
     233           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     234           0 :     return NULL;
     235           0 :   }
     236             : 
     237         114 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     238           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     239           0 :     return NULL;
     240           0 :   }
     241             : 
     242         114 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     243             : 
     244         114 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     245           0 :     FD_LOG_WARNING(( "bad magic" ));
     246           0 :     return NULL;
     247           0 :   }
     248             : 
     249         114 :   selector->pool               = peer_pool_delete( selector->pool ); /* NOTE(review): passes whatever the field currently holds (the *_leave result if leave ran first) - confirm that is what *_delete expects */
     250         114 :   selector->map_by_key         = peer_map_by_key_delete( selector->map_by_key );
     251         114 :   selector->map_by_addr        = peer_map_by_addr_delete( selector->map_by_addr );
     252         114 :   selector->score_treap        = score_treap_delete( selector->score_treap );
     253         114 :   selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
     254             : 
     255         114 :   FD_COMPILER_MFENCE(); /* clear magic between compiler fences so later join/leave/delete calls fail with "bad magic" */
     256         114 :   FD_VOLATILE( selector->magic ) = 0UL;
     257         114 :   FD_COMPILER_MFENCE();
     258             : 
     259         114 :   return (void *)selector;
     260         114 : }
     261             : 
     262             : /* Calculates a score for a peer given its latency and its resolved
     263             :    full and incremental slots */
     264             : static ulong /* returns min( latency + DEFAULT_SLOTS_BEHIND_PENALTY*slots_behind, FD_SSPEER_SCORE_MAX ) */
     265             : fd_sspeer_selector_score( fd_sspeer_selector_t const * selector,
     266             :                           ulong                        peer_latency,
     267             :                           ulong                        full_slot,
     268         906 :                           ulong                        incr_slot ) {
     269         906 :   peer_latency = peer_latency!=FD_SSPEER_LATENCY_UNKNOWN ? peer_latency : DEFAULT_PEER_LATENCY; /* unmeasured peers are scored with the default latency */
     270             : 
     271         906 :   ulong slots_behind = DEFAULT_SLOTS_BEHIND; /* used as-is (uncapped) when full_slot is unknown */
     272             : 
     273         906 :   if( FD_LIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     274         873 :     if( FD_LIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     275         873 :                    selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     276         321 :       slots_behind = selector->cluster_slot.incremental>incr_slot ? selector->cluster_slot.incremental - incr_slot : 0UL; /* a peer ahead of the cluster counts as 0 behind */
     277         552 :     } else {
     278             :       /* Either the peer has no incremental or the cluster has no
     279             :          incremental reference yet.  Fall back to comparing full_slot
     280             :          against the cluster full slot. */
     281         552 :       slots_behind = selector->cluster_slot.full>full_slot ? selector->cluster_slot.full - full_slot : 0UL;
     282         552 :     }
     283         873 :     slots_behind = fd_ulong_min( slots_behind, MAX_SLOTS_BEHIND_CAP ); /* cap is below DEFAULT_SLOTS_BEHIND, so known-slot peers never get the unknown-slot penalty */
     284         873 :   }
     285             : 
     286             :   /* Using saturating arithmetic to avoid overflow and cap at
     287             :      FD_SSPEER_SCORE_MAX. */
     288         906 :   ulong penalty = fd_ulong_sat_mul( DEFAULT_SLOTS_BEHIND_PENALTY, slots_behind );
     289         906 :   ulong score   = fd_ulong_sat_add( peer_latency, penalty );
     290         906 :   return fd_ulong_min( score, FD_SSPEER_SCORE_MAX );
     291         906 : }
     292             : 
     293             : /* Validates slot arguments for both new and existing peers.  Returns
     294             :    0 on success, -1 on failure due to incr_slot<full_slot, and -2 on
     295             :    failure due to full_slot==UNKNOWN with incr_slot!=UNKNOWN.  The
     296             :    caller passes in the effective (already-resolved) full_slot and
     297             :    incr_slot values.  No log on failure (the caller is responsible
     298             :    for logging whenever needed).
     299             : 
     300             :    Two invariants are enforced:
     301             :    1. When both slots are known, incr_slot must be >= full_slot.
     302             :    2. An incremental slot requires a known full slot. */
     303             : static int
     304             : fd_sspeer_validate_slot_args( ulong full_slot,
     305         660 :                               ulong incr_slot ) {
     306         660 :   if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     307         660 :                    full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     308         660 :                    incr_slot<full_slot ) ) {
     309           6 :     return -1; /* invariant 1 violated */
     310           6 :   }
     311             : 
     312         654 :   if( FD_UNLIKELY( full_slot==FD_SSPEER_SLOT_UNKNOWN &&
     313         654 :                    incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     314           9 :     return -2; /* invariant 2 violated */
     315           9 :   }
     316             : 
     317         645 :   return 0; /* callers compare this against FD_SSPEER_UPDATE_SUCCESS */
     318         654 : }
     319             : 
     320             : /* Updates a peer's score with new values for latency and/or resolved
     321             :    full/incremental slots.  Returns FD_SSPEER_UPDATE_SUCCESS on
     322             :    success, or the specific fd_sspeer_validate_slot_args error code
     323             :    on failure without modifying the peer or any data structure.
     324             : 
     325             :    Slot-based incremental clearing: when the caller provides
     326             :    incr_slot==UNKNOWN and full_slot!=UNKNOWN, the peer's existing
     327             :    incremental data is cleared if it is stale (peer->incr_slot <
     328             :    full_slot).  Otherwise, the existing incremental data is preserved. */
     329             : static int
     330             : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
     331             :                            fd_sspeer_private_t *  peer,
     332             :                            ulong                  latency,
     333             :                            ulong                  full_slot,
     334             :                            ulong                  incr_slot,
     335             :                            uchar const            full_hash[ FD_HASH_FOOTPRINT ],
     336          96 :                            uchar const            incr_hash[ FD_HASH_FOOTPRINT ] ) { /* either hash may be NULL, meaning "keep the stored hash" */
     337          96 :   ulong peer_latency   = latency!=FD_SSPEER_LATENCY_UNKNOWN ? latency : peer->latency; /* UNKNOWN arguments fall back to the peer's stored value */
     338          96 :   ulong peer_full_slot = full_slot!=FD_SSPEER_SLOT_UNKNOWN ? full_slot : peer->full_slot;
     339             : 
     340          96 :   ulong peer_incr_slot;
     341          96 :   int   clear_incr = 0;
     342          96 :   if( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) {
     343          36 :     peer_incr_slot = incr_slot;
     344          60 :   } else if( full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     345          60 :              peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     346          60 :              peer->incr_slot<full_slot ) {
     347             :     /* The caller is providing a new full_slot that has advanced past
     348             :        the peer's existing incremental, the incremental is stale. */
     349          12 :     peer_incr_slot = FD_SSPEER_SLOT_UNKNOWN;
     350          12 :     clear_incr     = 1;
     351          48 :   } else {
     352          48 :     peer_incr_slot = peer->incr_slot;
     353          48 :   }
     354             : 
     355          96 :   int validate_err = fd_sspeer_validate_slot_args( peer_full_slot, peer_incr_slot ); /* validate the resolved values before touching any state */
     356          96 :   if( FD_UNLIKELY( validate_err ) ) return validate_err;
     357             : 
     358          90 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool ); /* treap order depends on score (TREAP_LT), so remove before changing it and re-insert after */
     359             : 
     360          90 :   peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_full_slot, peer_incr_slot );
     361             : 
     362          90 :   peer->latency   = peer_latency;
     363          90 :   peer->full_slot = peer_full_slot;
     364          90 :   peer->incr_slot = peer_incr_slot;
     365          90 :   if( FD_LIKELY( full_hash ) ) {
     366          36 :     fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
     367          36 :   }
     368          90 :   if( FD_UNLIKELY( clear_incr ) ) {
     369          12 :     fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT ); /* a stale incremental is wiped even if incr_hash was supplied */
     370          78 :   } else if( FD_LIKELY( incr_hash ) ) {
     371          15 :     fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
     372          15 :   }
     373             : 
     374          90 :   score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     375          90 :   return FD_SSPEER_UPDATE_SUCCESS;
     376          96 : }
     377             : 
     378             : ulong /* Applies a measured latency to every peer stored under addr; returns how many peers were visited. */
     379             : fd_sspeer_selector_update_on_ping( fd_sspeer_selector_t * selector,
     380             :                                    fd_ip4_port_t          addr,
     381          21 :                                    ulong                  latency ) {
     382          21 :   ulong ele_idx = peer_map_by_addr_idx_query_const( selector->map_by_addr, &addr, ULONG_MAX, selector->pool ); /* ULONG_MAX is the "not found" sentinel */
     383          21 :   ulong cnt = 0UL;
     384          45 :   for(;;) {
     385          45 :     if( FD_UNLIKELY( ele_idx==ULONG_MAX ) ) break;
     386          24 :     fd_sspeer_private_t * peer = selector->pool + ele_idx;
     387             :     /* Update cannot fail here: slots are FD_SSPEER_SLOT_UNKNOWN and
     388             :        hashes are NULL, so fd_sspeer_validate_slot_args always
     389             :        returns FD_SSPEER_UPDATE_SUCCESS (no clear_incr trigger,
     390             :        no incr<full violation). */
     391          24 :     int update_status = fd_sspeer_selector_update( selector, peer, latency,
     392          24 :                                                    FD_SSPEER_SLOT_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
     393          24 :                                                    NULL, NULL );
     394          24 :     if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) {
     395             :       /* A warning is a tradeoff between crashing with FD_LOG_CRIT and
     396             :          potentially missing the log altogether with FD_LOG_DEBUG. */
     397           0 :       if( peer->key.is_url ) { /* URL-keyed peers are logged by hostname, others by base58 pubkey */
     398           0 :         FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
     399           0 :                          update_status, peer->key.url.hostname,
     400           0 :                          FD_IP4_ADDR_FMT_ARGS( peer->key.url.resolved_addr.addr ), fd_ushort_bswap( peer->key.url.resolved_addr.port ) ));
     401           0 :       } else {
     402           0 :         FD_BASE58_ENCODE_32_BYTES( peer->key.pubkey->uc, peer_pubkey_b58 );
     403           0 :         FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
     404           0 :                          update_status, peer_pubkey_b58,
     405           0 :                          FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ));
     406           0 :       }
     407           0 :     }
     408          24 :     ele_idx = peer_map_by_addr_idx_next_const( ele_idx, ULONG_MAX, selector->pool ); /* advance to the next peer sharing this addr */
     409          24 :     cnt++; /* counted even if the update reported a failure */
     410          24 :   }
     411          21 :   return cnt;
     412          21 : }
     413             : 
     414             : ulong /* Inserts a new peer or updates the peer already stored under key; returns its score, or FD_SSPEER_SCORE_INVALID on rejection. */
     415             : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
     416             :                         fd_sspeer_key_t const * key,
     417             :                         fd_ip4_port_t          addr,
     418             :                         ulong                  latency,
     419             :                         ulong                  full_slot,
     420             :                         ulong                  incr_slot,
     421             :                         uchar const            full_hash[ FD_HASH_FOOTPRINT ],
     422         675 :                         uchar const            incr_hash[ FD_HASH_FOOTPRINT ] ) { /* either hash may be NULL */
     423         675 :   if( FD_UNLIKELY( key==NULL ) ) return FD_SSPEER_SCORE_INVALID;
     424             :   /* A peer without a valid address cannot be added to the selector.
     425             :      For an existing peer changing from a valid address to 0, it is
     426             :      the caller's responsibility to remove them. */
     427         666 :   if( FD_UNLIKELY( !addr.l ) ) return FD_SSPEER_SCORE_INVALID;
     428             : 
     429             :   /* Reject implausible slot values.  FD_SSPEER_SLOT_UNKNOWN is allowed. */
     430         654 :   if( FD_UNLIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN && full_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return FD_SSPEER_SCORE_INVALID;
     431         648 :   if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot>=FD_SSPEER_PLAUSIBLE_MAX_SLOT ) ) return FD_SSPEER_SCORE_INVALID;
     432             : 
     433         645 :   fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
     434         645 :   if( FD_LIKELY( peer ) ) { /* existing peer: update in place */
     435          72 :     ulong old_full = peer->full_slot;
     436          72 :     ulong old_incr = peer->incr_slot;
     437          72 :     int update_status = fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot, full_hash, incr_hash );
     438          72 :     if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) return FD_SSPEER_SCORE_INVALID;
     439          66 :     if( FD_UNLIKELY( peer->full_slot!=old_full || peer->incr_slot!=old_incr ) ) selector->cluster_slot_dirty = 1; /* only flag when a stored slot actually changed */
     440             :     /* Update the addr map after the selector update so that the peer
     441             :        is not mutated when the update fails. */
     442          66 :     if( FD_UNLIKELY( peer->addr.l!=addr.l ) ) {
     443           6 :       peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool ); /* re-key: remove under the old addr, then insert under the new one */
     444           6 :       peer->addr = addr;
     445           6 :       peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
     446           6 :     }
     447         573 :   } else { /* new peer */
     448         573 :     if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) {
     449           0 :       FD_LOG_WARNING(( "peer selector pool exhausted" ));
     450           0 :       return FD_SSPEER_SCORE_INVALID;
     451           0 :     }
     452         573 :     if( FD_UNLIKELY( score_treap_ele_cnt(selector->score_treap)>=selector->max_peers ) ) { /* the pool holds 2*max_peers, but the treap is limited to max_peers */
     453           9 :       FD_LOG_WARNING(( "peer selector at max capacity" ));
     454           9 :       return FD_SSPEER_SCORE_INVALID;
     455           9 :     }
     456             : 
     457         564 :     if( FD_UNLIKELY( fd_sspeer_validate_slot_args( full_slot, incr_slot ) ) ) { /* validated before acquiring, so a rejected peer leaves no state behind */
     458           9 :       return FD_SSPEER_SCORE_INVALID;
     459           9 :     }
     460             : 
     461         555 :     peer = peer_pool_ele_acquire( selector->pool );
     462         555 :     peer->key       = *key;
     463         555 :     peer->addr      = addr;
     464         555 :     peer->latency   = latency; /* stored as given, possibly FD_SSPEER_LATENCY_UNKNOWN; the default is only substituted inside the score */
     465         555 :     peer->score     = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
     466         555 :     peer->full_slot = full_slot;
     467         555 :     peer->incr_slot = incr_slot;
     468         555 :     if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
     469         543 :     else                         fd_memset( peer->full_hash, 0, FD_HASH_FOOTPRINT );
     470             :     /* full_hash and incr_hash are treated independently here. */
     471         555 :     if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
     472         543 :     else                         fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
     473         555 :     peer_map_by_key_ele_insert( selector->map_by_key, peer, selector->pool );
     474         555 :     peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
     475         555 :     score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     476         555 :     if( FD_LIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN || incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) selector->cluster_slot_dirty = 1;
     477         555 :   }
     478         621 :   peer->valid = peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN; /* recomputed on both the update and the insert path */
     479         621 :   return peer->score;
     480         645 : }
     481             : 
     482             : void
     483             : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
     484         516 :                            fd_sspeer_key_t const * key ) {
     485         516 :   if( FD_UNLIKELY( key==NULL ) ) return;
     486         510 :   fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
     487         510 :   if( FD_UNLIKELY( peer==NULL ) ) return;
     488         495 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     489         495 :   peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
     490         495 :   peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
     491         495 :   peer_pool_ele_release( selector->pool, peer );
     492         495 :   selector->cluster_slot_dirty = 1;
     493         495 : }
     494             : 
     495             : void
     496             : fd_sspeer_selector_remove_by_addr( fd_sspeer_selector_t * selector,
     497          48 :                                    fd_ip4_port_t          addr ) {
     498         105 :   for(;;) {
     499         105 :     fd_sspeer_private_t * peer = peer_map_by_addr_ele_remove( selector->map_by_addr, &addr, NULL, selector->pool );
     500         105 :     if( FD_UNLIKELY( peer==NULL ) ) break;
     501          57 :     score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     502          57 :     peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
     503          57 :     peer_pool_ele_release( selector->pool, peer );
     504          57 :     selector->cluster_slot_dirty = 1;
     505          57 :   }
     506          48 : }
     507             : 
     508             : fd_sspeer_t
     509             : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
     510             :                          int                    incremental,
     511         303 :                          ulong                  base_slot ) {
     512         303 :   if( FD_UNLIKELY( incremental && base_slot==FD_SSPEER_SLOT_UNKNOWN ) ) {
     513           3 :     FD_LOG_WARNING(( "incremental selection requires a valid base_slot" ));
     514           3 :     return (fd_sspeer_t){
     515           3 :       .key       = { .is_url = 0 },
     516           3 :       .addr      = { .l=0UL },
     517           3 :       .full_slot = FD_SSPEER_SLOT_UNKNOWN,
     518           3 :       .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
     519           3 :       .score     = FD_SSPEER_SCORE_INVALID,
     520           3 :       .full_hash = {0},
     521           3 :       .incr_hash = {0},
     522           3 :     };
     523           3 :   }
     524             : 
     525         300 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     526         351 :        !score_treap_fwd_iter_done( iter );
     527         306 :        iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     528         306 :     fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
     529             :     /* For full selection (!incremental), any valid peer is eligible.
     530             :        For incremental selection, the peer must serve the same base full
     531             :        snapshot and must actually offer an incremental snapshot. */
     532         306 :     if( FD_LIKELY( peer->valid &&
     533         306 :                    (!incremental ||
     534         306 :                    (peer->full_slot==base_slot && peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN) ) ) ) {
     535         255 :       fd_sspeer_t best = {
     536         255 :         .key       = peer->key,
     537         255 :         .addr      = peer->addr,
     538         255 :         .full_slot = peer->full_slot,
     539         255 :         .incr_slot = peer->incr_slot,
     540         255 :         .score     = peer->score,
     541         255 :       };
     542         255 :       fd_memcpy( best.full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
     543         255 :       fd_memcpy( best.incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
     544         255 :       return best;
     545         255 :     }
     546         306 :   }
     547             : 
     548          45 :   return (fd_sspeer_t){
     549          45 :     .key       = { .is_url = 0 },
     550          45 :     .addr      = { .l=0UL },
     551          45 :     .full_slot = FD_SSPEER_SLOT_UNKNOWN,
     552          45 :     .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
     553          45 :     .score     = FD_SSPEER_SCORE_INVALID,
     554          45 :     .full_hash = {0},
     555          45 :     .incr_hash = {0},
     556          45 :   };
     557         300 : }
     558             : 
     559             : void
     560         177 : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector ) {
     561         177 :   if( FD_LIKELY( !selector->cluster_slot_dirty ) ) return;
     562         165 :   selector->cluster_slot_dirty = 0;
     563             : 
     564             :   /* Compute max full_slot and incr_slot across all tracked peers. */
     565         165 :   ulong max_full = 0UL;
     566         165 :   ulong max_incr = 0UL;
     567         165 :   ulong full_cnt = 0UL;
     568         165 :   ulong incr_cnt = 0UL;
     569         165 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     570         531 :        !score_treap_fwd_iter_done( iter );
     571         366 :        iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     572         366 :     fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
     573         366 :     if( FD_LIKELY( peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     574         360 :       max_full = fd_ulong_max( max_full, peer->full_slot );
     575         360 :       full_cnt++;
     576         360 :     }
     577         366 :     if( FD_LIKELY( peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     578         264 :       max_incr = fd_ulong_max( max_incr, peer->incr_slot );
     579         264 :       incr_cnt++;
     580         264 :     }
     581         366 :   }
     582             : 
     583             :   /* If no known incr_slot among tracked peers, treat as unknown. */
     584         165 :   if( FD_UNLIKELY( incr_cnt==0UL ) ) {
     585          42 :     max_incr = FD_SSPEER_SLOT_UNKNOWN;
     586          42 :   }
     587             : 
     588             :   /* If no known full_slot among tracked peers, reset so that a
     589             :      subsequent add() is not scored against a stale cluster slot. */
     590         165 :   if( FD_UNLIKELY( full_cnt==0UL ) ) {
     591          27 :     selector->cluster_slot.full        = 0UL;
     592          27 :     selector->cluster_slot.incremental = FD_SSPEER_SLOT_UNKNOWN;
     593          27 :     return;
     594          27 :   }
     595             : 
     596             :   /* The full and incremental maxima are computed from different peer
     597             :      subsets, so max_incr < max_full is possible.  Clamp to maintain
     598             :      the invariant that incremental >= full. */
     599         138 :   if( FD_UNLIKELY( max_incr!=FD_SSPEER_SLOT_UNKNOWN &&
     600         138 :                    max_incr<max_full ) ) {
     601           3 :     max_incr = max_full;
     602           3 :   }
     603             : 
     604             :   /* If neither max changed, no rescore needed. */
     605         138 :   if( FD_LIKELY( max_full==selector->cluster_slot.full &&
     606         138 :                  max_incr==selector->cluster_slot.incremental ) ) return;
     607             : 
     608         114 :   selector->cluster_slot.full        = max_full;
     609         114 :   selector->cluster_slot.incremental = max_incr;
     610             : 
     611             :   /* Rescore all peers using the shadow treap swap mechanism. */
     612         114 :   ulong idx = 0UL;
     613         114 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     614         375 :         !score_treap_fwd_iter_done( iter );
     615         261 :         iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     616         261 :     fd_sspeer_private_t * peer = score_treap_fwd_iter_ele( iter, selector->pool );
     617         261 :     fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
     618         261 :     shadow_peer->latency   = peer->latency;
     619         261 :     shadow_peer->full_slot = peer->full_slot;
     620         261 :     shadow_peer->incr_slot = peer->incr_slot;
     621         261 :     shadow_peer->addr      = peer->addr;
     622         261 :     shadow_peer->key       = peer->key;
     623         261 :     shadow_peer->score     = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
     624         261 :     shadow_peer->valid     = peer->valid;
     625         261 :     fd_memcpy( shadow_peer->full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
     626         261 :     fd_memcpy( shadow_peer->incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
     627         261 :     score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
     628         261 :     selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
     629         261 :     peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
     630         261 :     peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
     631         261 :     peer_map_by_key_ele_insert( selector->map_by_key, shadow_peer, selector->pool );
     632         261 :     peer_map_by_addr_ele_insert( selector->map_by_addr, shadow_peer, selector->pool );
     633         261 :   }
     634             : 
     635             :   /* clear score treap */
     636         375 :   for( ulong i=0UL; i<idx; i++ ) {
     637         261 :     fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
     638         261 :     score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     639         261 :     peer_pool_ele_release( selector->pool, peer );
     640         261 :   }
     641             : 
     642         114 :   score_treap_t * tmp          = selector->score_treap;
     643         114 :   selector->score_treap        = selector->shadow_score_treap;
     644         114 :   selector->shadow_score_treap = tmp;
     645             : 
     646             : #if FD_SSPEER_SELECTOR_DEBUG
     647             :   FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
     648             : #endif
     649         114 : }
     650             : 
     651             : fd_sscluster_slot_t
     652         114 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
     653         114 :   return selector->cluster_slot;
     654         114 : }
     655             : 
     656             : ulong
     657         231 : fd_sspeer_selector_peer_map_by_key_ele_cnt( fd_sspeer_selector_t * selector ) {
     658         231 :   ulong cnt = 0UL;
     659         231 :   for( peer_map_by_key_iter_t iter = peer_map_by_key_iter_init( selector->map_by_key, selector->pool );
     660         828 :       !peer_map_by_key_iter_done( iter, selector->map_by_key, selector->pool );
     661         597 :       iter = peer_map_by_key_iter_next( iter, selector->map_by_key, selector->pool ) ) {
     662         597 :     cnt++;
     663         597 :   }
     664         231 :   return cnt;
     665         231 : }
     666             : 
     667             : ulong
     668         168 : fd_sspeer_selector_peer_map_by_addr_ele_cnt( fd_sspeer_selector_t * selector ) {
     669         168 :   ulong cnt = 0UL;
     670         168 :   for( peer_map_by_addr_iter_t iter = peer_map_by_addr_iter_init( selector->map_by_addr, selector->pool );
     671         558 :       !peer_map_by_addr_iter_done( iter, selector->map_by_addr, selector->pool );
     672         390 :       iter = peer_map_by_addr_iter_next( iter, selector->map_by_addr, selector->pool ) ) {
     673         390 :     cnt++;
     674         390 :   }
     675         168 :   return cnt;
     676         168 : }

Generated by: LCOV version 1.14