LCOV - code coverage report
Current view: top level - discof/restore/utils - fd_sspeer_selector.c (source / functions)
                                            Hit    Total    Coverage
Test: cov.lcov                 Lines:       374      425      88.0 %
Date: 2026-03-31 06:22:16      Functions:    21       21     100.0 %

          Line data    Source code
       1             : #include "fd_sspeer_selector.h"
       2             : #include "../../../util/bits/fd_sat.h"
       3             : #include "../../../util/log/fd_log.h"
       4             : 
                       /* fd_sspeer_key_private_eq is the key equality predicate plugged
                          into peer_map_by_key as MAP_KEY_EQ.  Returns 1 if k0 and k1
                          identify the same peer and 0 otherwise.  Keys of different kinds
                          (URL vs pubkey) never match.  URL keys match when the hostnames
                          compare equal (bounded by the hostname array size) and the
                          resolved_addr.l words are equal.  Pubkey keys match when all
                          FD_PUBKEY_FOOTPRINT bytes are equal.

                          NOTE(review): fd_sspeer_key_private_hash avoids reading
                          resolved_addr.l because the word may contain unused bytes, but
                          this predicate compares .l directly.  Confirm that .l is always
                          fully initialized, otherwise keys that hash alike could compare
                          unequal. */
       5             : static int
       6             : fd_sspeer_key_private_eq( fd_sspeer_key_t const * k0,
       7         456 :                           fd_sspeer_key_t const * k1 ) {
       8         456 :   if( k0->is_url!=k1->is_url ) return 0; /* different key kinds */
       9         420 :   if( k0->is_url ) {
      10         201 :     return !strncmp( k0->url.hostname, k1->url.hostname, sizeof(k0->url.hostname) )
      11         201 :            && k0->url.resolved_addr.l==k1->url.resolved_addr.l;
      12         201 :   }
      13         219 :   return !memcmp( k0->pubkey, k1->pubkey, FD_PUBKEY_FOOTPRINT ); /* both are pubkey keys */
      14         420 : }
      15             : 
                       /* fd_sspeer_key_private_hash is the key hash plugged into
                          peer_map_by_key as MAP_KEY_HASH.  For a URL key it hashes the
                          hostname bytes up to the first NUL (or the whole array if there
                          is none) with fd_hash and XORs in a word composed from the
                          resolved address and port.  For a pubkey key it hashes the
                          FD_PUBKEY_FOOTPRINT bytes of the pubkey.  seed is forwarded to
                          fd_hash in both cases. */
      16             : static ulong
      17             : fd_sspeer_key_private_hash( fd_sspeer_key_t const * key,
      18        1575 :                             ulong                   seed ) {
      19        1575 :   if( key->is_url ) {
      20             :     /* Use strnlen in case the string is not properly \0 terminated.
      21             :        Ideally, one would prefer sizeof(key->url.hostname) but that
      22             :        requires guaranteed zero-padding. */
      23         798 :     ulong h = fd_hash( seed, key->url.hostname, strnlen( key->url.hostname, sizeof(key->url.hostname) ) );
      24             :     /* fd_ip4_port_t is not a complete 64bit ulong, therefore compose
      25             :        the word from its parts to avoid random unused bytes. */
      26         798 :     ulong a = (ulong)key->url.resolved_addr.addr | ( ((ulong)key->url.resolved_addr.port) << 32 ); /* addr in the low bits, port shifted up by 32 */
      27             :     /* Chaining "a" through fd_hash would give better avalanche
      28             :        properties, but it is probably overkill for a chain hash map. */
      29         798 :     return h ^ a;
      30         798 :   }
      31         777 :   return fd_hash( seed, key->pubkey, FD_PUBKEY_FOOTPRINT );
      32        1575 : }
      33             : 
                       /* fd_sspeer_private is the per-peer record.  One record is a
                          peer_pool element and is simultaneously linked into
                          peer_map_by_key, peer_map_by_addr and a score_treap through the
                          embedded link structs below. */
      34             : struct fd_sspeer_private {
      35             :   fd_sspeer_key_t key; /* identity of the peer, key of peer_map_by_key */
      36             :   fd_ip4_port_t addr; /* address of the peer, key of peer_map_by_addr */
      37             :   ulong         full_slot; /* full snapshot slot, or FD_SSPEER_SLOT_UNKNOWN */
      38             :   ulong         incr_slot; /* incremental snapshot slot, or FD_SSPEER_SLOT_UNKNOWN */
      39             :   uchar         full_hash[ FD_HASH_FOOTPRINT ]; /* hash stored alongside full_slot */
      40             :   uchar         incr_hash[ FD_HASH_FOOTPRINT ]; /* hash stored alongside incr_slot, zeroed when the incremental is cleared */
      41             :   ulong         latency; /* last latency passed in, may be FD_SSPEER_LATENCY_UNKNOWN */
      42             :   ulong         score; /* value from fd_sspeer_selector_score, ordering key of the treap */
      43             :   int           valid; /* update_on_resolve sets this to full_slot!=FD_SSPEER_SLOT_UNKNOWN */
      44             : 
      45             :   struct {
      46             :     ulong next; /* POOL_NEXT link of peer_pool */
      47             :   } pool;
      48             : 
      49             :   struct {
      50             :     ulong next; /* MAP_NEXT link of peer_map_by_key */
      51             :     ulong prev; /* MAP_PREV link of peer_map_by_key */
      52             :   } map_by_key;
      53             : 
      54             :   struct {
      55             :     ulong next; /* MAP_NEXT link of peer_map_by_addr */
      56             :     ulong prev; /* MAP_PREV link of peer_map_by_addr */
      57             :   } map_by_addr;
      58             : 
      59             :   struct {
      60             :     ulong parent; /* TREAP_PARENT */
      61             :     ulong left; /* TREAP_LEFT */
      62             :     ulong right; /* TREAP_RIGHT */
      63             :     ulong prio; /* TREAP_PRIO, seeded by score_treap_seed in fd_sspeer_selector_new */
      64             :   } score_treap;
      65             : };
      66             : 
      67             : typedef struct fd_sspeer_private fd_sspeer_private_t;
      68             : 
                       /* Instantiate peer_pool from the fd_pool.c template: a pool of
                          fd_sspeer_private_t elements indexed by ulong, chained through
                          the pool.next field. */
      69             : #define POOL_NAME  peer_pool
      70         270 : #define POOL_T     fd_sspeer_private_t
      71             : #define POOL_IDX_T ulong
      72     8652126 : #define POOL_NEXT  pool.next
      73             : #include "../../../util/tmpl/fd_pool.c"
      74             : 
                       /* Instantiate peer_map_by_key from the fd_map_chain.c template: a
                          chain map over fd_sspeer_private_t keyed by the key field, using
                          the map_by_key prev/next links and the private eq/hash helpers
                          defined earlier in this file.
                          NOTE(review): MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL presumably is
                          what requires the prev link and enables removal by element;
                          confirm against fd_map_chain.c. */
      75             : #define MAP_NAME               peer_map_by_key
      76         858 : #define MAP_KEY                key
      77         435 : #define MAP_ELE_T              fd_sspeer_private_t
      78             : #define MAP_KEY_T              fd_sspeer_key_t
      79         678 : #define MAP_PREV               map_by_key.prev
      80        1695 : #define MAP_NEXT               map_by_key.next
      81         456 : #define MAP_KEY_EQ(k0,k1)      (fd_sspeer_key_private_eq(k0,k1))
      82        1575 : #define MAP_KEY_HASH(key,seed) (fd_sspeer_key_private_hash(key,seed))
      83             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      84             : #include "../../../util/tmpl/fd_map_chain.c"
      85             : 
                       /* Instantiate peer_map_by_addr from the fd_map_chain.c template: a
                          chain map over fd_sspeer_private_t keyed by the addr field, using
                          the map_by_addr prev/next links.  Keys are compared and hashed
                          through the l word of fd_ip4_port_t.  MAP_MULTI is set, and
                          fd_sspeer_selector_update_on_ping walks every element stored
                          under one address, so several peers may share an address. */
      86             : #define MAP_NAME               peer_map_by_addr
      87         729 : #define MAP_KEY                addr
      88         429 : #define MAP_ELE_T              fd_sspeer_private_t
      89             : #define MAP_KEY_T              fd_ip4_port_t
      90         816 : #define MAP_PREV               map_by_addr.prev
      91        1467 : #define MAP_NEXT               map_by_addr.next
      92          60 : #define MAP_KEY_EQ(k0,k1)      ((k0)->l==(k1)->l)
      93         810 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
      94             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      95             : #define MAP_MULTI              1
      96             : #include "../../../util/tmpl/fd_map_chain.c"
      97             : 
                       /* COMPARE_WORSE(x,y) is true when x has a strictly smaller score
                          than y.  It is used as TREAP_LT below, so it defines the order of
                          elements in score_treap. */
      98         888 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
      99             : 
                       /* Instantiate score_treap from the fd_treap.c template over
                          fd_sspeer_private_t, using the score_treap parent/left/right/prio
                          fields as links. */
     100             : #define TREAP_T         fd_sspeer_private_t
     101             : #define TREAP_NAME      score_treap
     102             : #define TREAP_QUERY_T   void *                                         /* We don't use query ... */
     103             : #define TREAP_CMP(a,b)  (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
     104             :                                                                           implementation to cmp either */
     105        2973 : #define TREAP_IDX_T     ulong
     106         888 : #define TREAP_LT        COMPARE_WORSE
     107        2145 : #define TREAP_PARENT    score_treap.parent
     108        2019 : #define TREAP_LEFT      score_treap.left
     109        1278 : #define TREAP_RIGHT     score_treap.right
     110     8652459 : #define TREAP_PRIO      score_treap.prio
     111             : #include "../../../util/tmpl/fd_treap.c"
     112             : 
                       /* Slot distance assumed by fd_sspeer_selector_score for a peer whose
                          full slot is FD_SSPEER_SLOT_UNKNOWN. */
     113         525 : #define DEFAULT_SLOTS_BEHIND         (1000UL*1000UL) /* 1,000,000 slots behind */
     114             : /* Assumed latency (in nanos) for peers that have not been pinged yet.
     115             :    Pings are sent immediately on peer discovery, so this default is
     116             :    short-lived.  100ms is a neutral middle-ground: high enough that
     117             :    any peer with a measured latency is preferred, low enough that slot
     118             :    distance still meaningfully differentiates unpinged peers. */
     119         531 : #define DEFAULT_PEER_LATENCY         (100UL*1000UL*1000UL)  /* 100ms */
                       /* Multiplier applied to the number of slots behind before it is
                          added to the latency in fd_sspeer_selector_score.  With latency
                          in nanos, each slot behind weighs as much as 1000 nanos. */
     120         525 : #define DEFAULT_SLOTS_BEHIND_PENALTY (1000UL)
     121             : 
                       /* Debug switch, defined as 0.  It is not referenced in the visible
                          portion of this listing. */
     122             : #define FD_SSPEER_SELECTOR_DEBUG 0
     123             : 
                       /* fd_sspeer_selector_private is the selector state placed at the
                          start of the memory region formatted by fd_sspeer_selector_new.
                          The pointer members are joins into sub-regions that follow it in
                          the same allocation. */
     124             : struct fd_sspeer_selector_private {
     125             :   fd_sspeer_private_t *     pool; /* peer_pool join, created with 2*max_peers elements */
     126             :   peer_map_by_key_t *       map_by_key; /* key -> peer */
     127             :   peer_map_by_addr_t *      map_by_addr; /* addr -> peers (multi map) */
     128             :   score_treap_t *           score_treap; /* peers ordered by score */
     129             :   score_treap_t *           shadow_score_treap; /* second treap of the same size; its use is not visible in this portion */
     130             :   ulong *                   peer_idx_list; /* room for max_peers ulongs; its use is not visible in this portion */
     131             :   fd_sscluster_slot_t       cluster_slot; /* reference full/incremental slots that scoring compares peers against */
     132             :   int                       incremental_snapshot_fetch; /* stored as passed to fd_sspeer_selector_new */
     133             :   ulong                     max_peers; /* cap on the number of elements in score_treap */
     134             : 
     135             :   ulong                     magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
     136             : };
     137             : 
                       /* fd_sspeer_selector_align returns the alignment required for the
                          memory region of a selector: the largest of the alignments of the
                          selector struct, the peer pool, both maps, the score treap and
                          ulong (for the peer index list). */
     138             : FD_FN_CONST ulong
     139         396 : fd_sspeer_selector_align( void ) {
     140         396 :   return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(),
     141         396 :           fd_ulong_max( peer_map_by_key_align(), fd_ulong_max( peer_map_by_addr_align(),
     142         396 :           fd_ulong_max( score_treap_align(), alignof(ulong) ) ) ) ) );
     143         396 : }
     144             : 
                       /* fd_sspeer_selector_footprint returns the number of bytes needed
                          for a selector that tracks up to max_peers peers.  The layout is
                          appended in the same order and with the same sizes as the scratch
                          allocations in fd_sspeer_selector_new: the selector struct, a
                          pool and two maps sized for 2*max_peers, two treaps sized for
                          max_peers, and a list of max_peers ulongs.
                          NOTE(review): 2UL*max_peers and max_peers*sizeof(ulong) are not
                          checked for overflow here; confirm callers bound max_peers. */
     145             : FD_FN_CONST ulong
     146          12 : fd_sspeer_selector_footprint( ulong max_peers ) {
     147          12 :   ulong l;
     148          12 :   l = FD_LAYOUT_INIT;
     149          12 :   l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
     150          12 :   l = FD_LAYOUT_APPEND( l, peer_pool_align(),             peer_pool_footprint( 2UL*max_peers ) );
     151          12 :   l = FD_LAYOUT_APPEND( l, peer_map_by_key_align(),       peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) ) );
     152          12 :   l = FD_LAYOUT_APPEND( l, peer_map_by_addr_align(),      peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) ) );
     153          12 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) ); /* score_treap */
     154          12 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) ); /* shadow_score_treap */
     155          12 :   l = FD_LAYOUT_APPEND( l, alignof(ulong),                max_peers * sizeof(ulong) ); /* peer_idx_list */
     156          12 :   return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
     157          12 : }
     158             : 
                       /* fd_sspeer_selector_new formats the memory region at shmem as a
                          selector for up to max_peers peers.

                          shmem must be non-NULL and aligned to fd_sspeer_selector_align();
                          max_peers must be at least 1.  incremental_snapshot_fetch is
                          stored in the selector as is.  seed is used for the treap
                          priorities and for both hash maps.

                          Returns the selector pointer on success.  Logs a warning and
                          returns NULL if an argument is rejected.  The sub-structures are
                          both created and joined here, and the joins are stored in the
                          selector struct.  The magic is written last. */
     159             : void *
     160             : fd_sspeer_selector_new( void * shmem,
     161             :                         ulong  max_peers,
     162             :                         int    incremental_snapshot_fetch,
     163          90 :                         ulong  seed ) {
     164          90 :   if( FD_UNLIKELY( !shmem ) ) {
     165           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     166           0 :     return NULL;
     167           0 :   }
     168             : 
     169          90 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
     170           0 :     FD_LOG_WARNING(( "unaligned shmem" ));
     171           0 :     return NULL;
     172           0 :   }
     173             : 
     174          90 :   if( FD_UNLIKELY( max_peers < 1UL ) ) {
     175           0 :     FD_LOG_WARNING(( "max_peers must be at least 1" ));
     176           0 :     return NULL;
     177           0 :   }
     178             : 
                         /* Carve the region in the same order and with the same sizes as
                            fd_sspeer_selector_footprint. */
     179          90 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     180          90 :   fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
     181          90 :   void * _pool                    = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(),        peer_pool_footprint( 2UL*max_peers ) );
     182          90 :   void * _map                     = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_key_align(),  peer_map_by_key_footprint( peer_map_by_key_chain_cnt_est( 2UL*max_peers ) )  );
     183          90 :   void * _multimap_by_addr        = FD_SCRATCH_ALLOC_APPEND( l, peer_map_by_addr_align(), peer_map_by_addr_footprint( peer_map_by_addr_chain_cnt_est( 2UL*max_peers ) )  );
     184          90 :   void * _score_treap             = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),      score_treap_footprint( max_peers ) );
     185          90 :   void * _shadow_score_treap      = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),      score_treap_footprint( max_peers ) );
     186          90 :   void * _peer_idx_list           = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong),           max_peers * sizeof(ulong) );
     187             : 
     188           0 :   selector->pool               = peer_pool_join( peer_pool_new( _pool, 2UL*max_peers ) );
     189             :   /* Seed treap priorities so the treap is balanced. */
     190          90 :   score_treap_seed( selector->pool, 2UL*max_peers, seed );
     191          90 :   selector->map_by_key         = peer_map_by_key_join( peer_map_by_key_new( _map, peer_map_by_key_chain_cnt_est( 2UL*max_peers ), seed ) );
     192          90 :   selector->map_by_addr        = peer_map_by_addr_join( peer_map_by_addr_new( _multimap_by_addr, peer_map_by_addr_chain_cnt_est( 2UL*max_peers ), seed ) );
     193          90 :   selector->score_treap        = score_treap_join( score_treap_new( _score_treap, max_peers ) );
     194          90 :   selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
     195          90 :   selector->peer_idx_list      = (ulong *)_peer_idx_list;
     196          90 :   selector->max_peers          = max_peers;
     197             : 
                         /* Initial cluster reference: full slot 0, incremental unknown. */
     198          90 :   selector->cluster_slot.full          = 0UL;
     199          90 :   selector->cluster_slot.incremental   = FD_SSPEER_SLOT_UNKNOWN;
     200          90 :   selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
     201             : 
                         /* Write the magic last, between compiler fences, so it is not
                            reordered ahead of the initialization above. */
     202          90 :   FD_COMPILER_MFENCE();
     203          90 :   FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
     204          90 :   FD_COMPILER_MFENCE();
     205             : 
     206          90 :   return (void *)selector;
     207          90 : }
     208             : 
                       /* fd_sspeer_selector_join validates shselector and returns it typed
                          as a selector.  Logs a warning and returns NULL if shselector is
                          NULL, is not aligned to fd_sspeer_selector_align(), or does not
                          carry FD_SSPEER_SELECTOR_MAGIC.  No state is modified. */
     209             : fd_sspeer_selector_t *
     210          90 : fd_sspeer_selector_join( void * shselector ) {
     211          90 :   if( FD_UNLIKELY( !shselector ) ) {
     212           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     213           0 :     return NULL;
     214           0 :   }
     215             : 
     216          90 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     217           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     218           0 :     return NULL;
     219           0 :   }
     220             : 
     221          90 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     222             : 
     223          90 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     224           0 :     FD_LOG_WARNING(( "bad magic" ));
     225           0 :     return NULL;
     226           0 :   }
     227             : 
     228          90 :   return selector;
     229          90 : }
     230             : 
                       /* fd_sspeer_selector_leave leaves the pool, both maps and both
                          treaps, storing each leave result back into the selector, and
                          returns the selector as a void pointer.  Logs a warning and
                          returns NULL without touching anything if selector is NULL,
                          misaligned, or has a bad magic. */
     231             : void *
     232          90 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
     233          90 :   if( FD_UNLIKELY( !selector ) ) {
     234           0 :     FD_LOG_WARNING(( "NULL selector" ));
     235           0 :     return NULL;
     236           0 :   }
     237             : 
     238          90 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
     239           0 :     FD_LOG_WARNING(( "misaligned selector" ));
     240           0 :     return NULL;
     241           0 :   }
     242             : 
     243          90 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     244           0 :     FD_LOG_WARNING(( "bad magic" ));
     245           0 :     return NULL;
     246           0 :   }
     247             : 
     248          90 :   selector->pool               = peer_pool_leave( selector->pool );
     249          90 :   selector->map_by_key         = peer_map_by_key_leave( selector->map_by_key );
     250          90 :   selector->map_by_addr        = peer_map_by_addr_leave( selector->map_by_addr );
     251          90 :   selector->score_treap        = score_treap_leave( selector->score_treap );
     252          90 :   selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
     253             : 
     254          90 :   return (void *)selector;
     255          90 : }
     256             : 
                       /* fd_sspeer_selector_delete unformats a selector region.  It deletes
                          the pool, both maps and both treaps through the pointers held in
                          the selector, then clears the magic between compiler fences so
                          later joins fail the magic check.  Returns shselector on success.
                          Logs a warning and returns NULL if shselector is NULL, misaligned,
                          or has a bad magic.
                          NOTE(review): the delete calls receive whatever the selector
                          members currently hold; presumably this is meant to run after
                          fd_sspeer_selector_leave - confirm with callers. */
     257             : void *
     258          90 : fd_sspeer_selector_delete( void * shselector ) {
     259          90 :   if( FD_UNLIKELY( !shselector ) ) {
     260           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     261           0 :     return NULL;
     262           0 :   }
     263             : 
     264          90 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     265           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     266           0 :     return NULL;
     267           0 :   }
     268             : 
     269          90 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     270             : 
     271          90 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     272           0 :     FD_LOG_WARNING(( "bad magic" ));
     273           0 :     return NULL;
     274           0 :   }
     275             : 
     276          90 :   selector->pool               = peer_pool_delete( selector->pool );
     277          90 :   selector->map_by_key         = peer_map_by_key_delete( selector->map_by_key );
     278          90 :   selector->map_by_addr        = peer_map_by_addr_delete( selector->map_by_addr );
     279          90 :   selector->score_treap        = score_treap_delete( selector->score_treap );
     280          90 :   selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
     281             : 
     282          90 :   FD_COMPILER_MFENCE();
     283          90 :   FD_VOLATILE( selector->magic ) = 0UL;
     284          90 :   FD_COMPILER_MFENCE();
     285             : 
     286          90 :   return (void *)selector;
     287          90 : }
     288             : 
     289             : /* Calculates a score for a peer given its latency and its resolved
     290             :    full and incremental slots.
                          The score is latency + DEFAULT_SLOTS_BEHIND_PENALTY*slots_behind,
                          computed with saturating arithmetic and capped at
                          FD_SSPEER_SCORE_MAX.

                          An unknown latency is replaced by DEFAULT_PEER_LATENCY.
                          slots_behind is chosen as follows:
                          - full_slot unknown: DEFAULT_SLOTS_BEHIND.
                          - incr_slot and the cluster incremental slot both known: how far
                            incr_slot trails the cluster incremental slot (0 if it does
                            not trail).
                          - otherwise: how far full_slot trails the cluster full slot (0
                            if it does not trail).

                          The selector is only read. */
     291             : static ulong
     292             : fd_sspeer_selector_score( fd_sspeer_selector_t const * selector,
     293             :                           ulong                        peer_latency,
     294             :                           ulong                        full_slot,
     295         525 :                           ulong                        incr_slot ) {
     296         525 :   peer_latency = peer_latency!=FD_SSPEER_LATENCY_UNKNOWN ? peer_latency : DEFAULT_PEER_LATENCY;
     297             : 
     298         525 :   ulong slots_behind = DEFAULT_SLOTS_BEHIND;
     299             : 
     300         525 :   if( FD_LIKELY( full_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     301         507 :     if( FD_LIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     302         507 :                    selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     303         345 :       slots_behind = selector->cluster_slot.incremental>incr_slot ? selector->cluster_slot.incremental - incr_slot : 0UL;
     304         345 :     } else {
     305             :       /* Either the peer has no incremental or the cluster has no
     306             :          incremental reference yet.  Fall back to comparing full_slot
     307             :          against the cluster full slot. */
     308         162 :       slots_behind = selector->cluster_slot.full>full_slot ? selector->cluster_slot.full - full_slot : 0UL;
     309         162 :     }
     310         507 :   }
     311             : 
     312             :   /* Using saturating arithmetic to avoid overflow and cap at
     313             :      FD_SSPEER_SCORE_MAX. */
     314         525 :   ulong penalty = fd_ulong_sat_mul( DEFAULT_SLOTS_BEHIND_PENALTY, slots_behind );
     315         525 :   ulong score   = fd_ulong_sat_add( peer_latency, penalty );
     316         525 :   return fd_ulong_min( score, FD_SSPEER_SCORE_MAX );
     317         525 : }
     318             : 
     319             : /* Validates slot arguments for both new and existing peers.  Returns
     320             :    0 on success, -1 on failure due to incr_slot<full_slot, and -2 on
     321             :    failure due to full_slot==UNKNOWN with incr_slot!=UNKNOWN.  The
     322             :    caller passes in the effective (already-resolved) full_slot and
     323             :    incr_slot values.  No log on failure (the caller is responsible
     324             :    for logging whenever needed).
     325             : 
     326             :    Two invariants are enforced:
     327             :    1. When both slots are known, incr_slot must be >= full_slot.
     328             :    2. An incremental slot requires a known full slot.
                          The two failure conditions are mutually exclusive (the first
                          needs a known full_slot, the second an unknown one), so the order
                          of the checks does not change which code is returned.  Both
                          slots unknown, or only full_slot known, is accepted. */
     329             : static int
     330             : fd_sspeer_validate_slot_args( ulong full_slot,
     331         426 :                               ulong incr_slot ) {
     332         426 :   if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     333         426 :                    full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     334         426 :                    incr_slot<full_slot ) ) {
     335           9 :     return -1; /* invariant 1 violated */
     336           9 :   }
     337             : 
     338         417 :   if( FD_UNLIKELY( full_slot==FD_SSPEER_SLOT_UNKNOWN &&
     339         417 :                    incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     340           9 :     return -2; /* invariant 2 violated */
     341           9 :   }
     342             : 
     343         408 :   return 0;
     344         417 : }
     345             : 
     346             : /* Updates a peer's score with new values for latency and/or resolved
     347             :    full/incremental slots.  Returns FD_SSPEER_UPDATE_SUCCESS on
     348             :    success, or the specific fd_sspeer_validate_slot_args error code
     349             :    on failure without modifying the peer or any data structure.
     350             : 
     351             :    Slot-based incremental clearing: when the caller provides
     352             :    incr_slot==UNKNOWN and full_slot!=UNKNOWN, the peer's existing
     353             :    incremental data is cleared if it is stale (peer->incr_slot <
     354             :    full_slot).  Otherwise, the existing incremental data is preserved.
                          An UNKNOWN latency, full_slot or incr_slot argument means "keep
                          the value currently stored on the peer".  A NULL full_hash or
                          incr_hash likewise leaves the stored hash untouched, except that
                          incr_hash is zeroed when the incremental is cleared as stale.

                          The peer must currently be in selector->score_treap: it is
                          removed from the treap before its score changes and inserted
                          again afterwards.

                          NOTE(review): when a new incr_slot is given with a NULL
                          incr_hash, the previous incr_hash stays paired with the new
                          slot; confirm that callers always pass both together. */
     355             : static int
     356             : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
     357             :                            fd_sspeer_private_t *  peer,
     358             :                            ulong                  latency,
     359             :                            ulong                  full_slot,
     360             :                            ulong                  incr_slot,
     361             :                            uchar const            full_hash[ FD_HASH_FOOTPRINT ],
     362          99 :                            uchar const            incr_hash[ FD_HASH_FOOTPRINT ] ) {
                         /* Resolve the effective values: an argument wins when it is
                            known, otherwise fall back to what the peer already has. */
     363          99 :   ulong peer_latency   = latency!=FD_SSPEER_LATENCY_UNKNOWN ? latency : peer->latency;
     364          99 :   ulong peer_full_slot = full_slot!=FD_SSPEER_SLOT_UNKNOWN ? full_slot : peer->full_slot;
     365             : 
     366          99 :   ulong peer_incr_slot;
     367          99 :   int   clear_incr = 0;
     368          99 :   if( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) {
     369          39 :     peer_incr_slot = incr_slot;
     370          60 :   } else if( full_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     371          60 :              peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN &&
     372          60 :              peer->incr_slot<full_slot ) {
     373             :     /* The caller is providing a new full_slot that has advanced past
     374             :        the peer's existing incremental — the incremental is stale. */
     375          12 :     peer_incr_slot = FD_SSPEER_SLOT_UNKNOWN;
     376          12 :     clear_incr     = 1;
     377          48 :   } else {
     378          48 :     peer_incr_slot = peer->incr_slot;
     379          48 :   }
     380             : 
                         /* Validate before touching the peer or the treap so a failure
                            leaves everything as it was. */
     381          99 :   int validate_err = fd_sspeer_validate_slot_args( peer_full_slot, peer_incr_slot );
     382          99 :   if( FD_UNLIKELY( validate_err ) ) return validate_err;
     383             : 
                         /* The treap is ordered by score, so take the peer out while the
                            score is rewritten. */
     384          90 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     385             : 
     386          90 :   peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_full_slot, peer_incr_slot );
     387             : 
     388          90 :   peer->latency   = peer_latency;
     389          90 :   peer->full_slot = peer_full_slot;
     390          90 :   peer->incr_slot = peer_incr_slot;
     391          90 :   if( FD_LIKELY( full_hash ) ) {
     392          36 :     fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
     393          36 :   }
     394          90 :   if( FD_UNLIKELY( clear_incr ) ) {
     395          12 :     fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT ); /* clear_incr implies incr_slot was UNKNOWN; any incr_hash argument is ignored */
     396          78 :   } else if( FD_LIKELY( incr_hash ) ) {
     397          15 :     fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
     398          15 :   }
     399             : 
     400          90 :   score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     401          90 :   return FD_SSPEER_UPDATE_SUCCESS;
     402          99 : }
     403             : 
                       /* fd_sspeer_selector_update_on_resolve applies newly resolved slot
                          and hash information to the peer identified by key.  The latency
                          of the peer is left unchanged.

                          Returns FD_SSPEER_UPDATE_SUCCESS on success,
                          FD_SSPEER_UPDATE_ERR_NULL_KEY if key is NULL,
                          FD_SSPEER_UPDATE_ERR_NOT_FOUND if no peer has this key, and
                          FD_SSPEER_UPDATE_ERR_INVALID_ARG if the resulting slots fail
                          validation (both validation error codes are folded into this
                          one).  On success the valid flag of the peer is set to whether
                          its full slot is known.  On failure the peer is not modified. */
     404             : int
     405             : fd_sspeer_selector_update_on_resolve( fd_sspeer_selector_t *  selector,
     406             :                                       fd_sspeer_key_t const * key,
     407             :                                       ulong                   full_slot,
     408             :                                       ulong                   incr_slot,
     409             :                                       uchar const             full_hash[ FD_HASH_FOOTPRINT ],
     410          42 :                                       uchar const             incr_hash[ FD_HASH_FOOTPRINT ] ) {
     411          42 :   if( FD_UNLIKELY( key==NULL ) ) return FD_SSPEER_UPDATE_ERR_NULL_KEY;
     412          39 :   fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
     413          39 :   if( FD_UNLIKELY( peer==NULL ) ) return FD_SSPEER_UPDATE_ERR_NOT_FOUND;
     414          30 :   int update_status = fd_sspeer_selector_update( selector, peer, FD_SSPEER_LATENCY_UNKNOWN, full_slot, incr_slot, full_hash, incr_hash );
     415          30 :   if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) return FD_SSPEER_UPDATE_ERR_INVALID_ARG;
     416          24 :   peer->valid = peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN;
     417          24 :   return FD_SSPEER_UPDATE_SUCCESS;
     418          30 : }
     419             : 
                       /* fd_sspeer_selector_update_on_ping records a measured latency for
                          every peer stored under addr in the address multi map and
                          rescores each of them.  Slots and hashes are left unchanged.
                          Returns the number of peers visited, which is 0 when no peer has
                          this address.  A peer is counted even if its update reports an
                          error; that case is only logged as a warning. */
     420             : ulong
     421             : fd_sspeer_selector_update_on_ping( fd_sspeer_selector_t * selector,
     422             :                                    fd_ip4_port_t          addr,
     423          21 :                                    ulong                  latency ) {
     424          21 :   ulong ele_idx = peer_map_by_addr_idx_query_const( selector->map_by_addr, &addr, ULONG_MAX, selector->pool );
     425          21 :   ulong cnt = 0UL;
     426          45 :   for(;;) {
     427          45 :     if( FD_UNLIKELY( ele_idx==ULONG_MAX ) ) break; /* ULONG_MAX is the sentinel passed to the query calls */
     428          24 :     fd_sspeer_private_t * peer = selector->pool + ele_idx;
     429             :     /* Update cannot fail here: slots are FD_SSPEER_SLOT_UNKNOWN and
     430             :        hashes are NULL, so fd_sspeer_validate_slot_args always
     431             :        returns FD_SSPEER_UPDATE_SUCCESS (no clear_incr trigger,
     432             :        no incr<full violation). */
     433          24 :     int update_status = fd_sspeer_selector_update( selector, peer, latency,
     434          24 :                                                    FD_SSPEER_SLOT_UNKNOWN, FD_SSPEER_SLOT_UNKNOWN,
     435          24 :                                                    NULL, NULL );
     436          24 :     if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) {
     437             :       /* A warning is a tradeoff between crashing with FD_LOG_CRIT and
     438             :          potentially missing the log altogether with FD_LOG_DEBUG. */
     439           0 :       if( peer->key.is_url ) {
     440           0 :         FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
     441           0 :                          update_status, peer->key.url.hostname,
     442           0 :                          FD_IP4_ADDR_FMT_ARGS( peer->key.url.resolved_addr.addr ), fd_ushort_bswap( peer->key.url.resolved_addr.port ) ));
     443           0 :       } else {
     444           0 :         FD_BASE58_ENCODE_32_BYTES( peer->key.pubkey->uc, peer_pubkey_b58 );
     445           0 :         FD_LOG_WARNING(( "unexpected selector update returned %d for peer %s " FD_IP4_ADDR_FMT ":%hu",
     446           0 :                          update_status, peer_pubkey_b58,
     447           0 :                          FD_IP4_ADDR_FMT_ARGS( peer->addr.addr ), fd_ushort_bswap( peer->addr.port ) ));
     448           0 :       }
     449           0 :     }
     450          24 :     ele_idx = peer_map_by_addr_idx_next_const( ele_idx, ULONG_MAX, selector->pool ); /* next peer sharing this address, if any */
     451          24 :     cnt++;
     452          24 :   }
     453          21 :   return cnt;
     454          21 : }
     455             : 
     /* fd_sspeer_selector_add inserts the peer identified by key into the
        selector, or refreshes it when the key is already present in
        map_by_key.  Returns the peer's resulting score on success.
        Returns FD_SSPEER_SCORE_INVALID when key is NULL, when addr.l is
        zero, when updating an existing peer does not report
        FD_SSPEER_UPDATE_SUCCESS, when the pool has no free element, when
        the score treap already holds max_peers elements, or when
        fd_sspeer_validate_slot_args returns non-zero for the slot pair.
        For a new peer, a NULL full_hash or incr_hash is stored as all
        zeros.  selector is dereferenced without a NULL check. */
     456             : ulong
     457             : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
     458             :                         fd_sspeer_key_t const * key,
     459             :                         fd_ip4_port_t          addr,
     460             :                         ulong                  latency,
     461             :                         ulong                  full_slot,
     462             :                         ulong                  incr_slot,
     463             :                         uchar const            full_hash[ FD_HASH_FOOTPRINT ],
     464         399 :                         uchar const            incr_hash[ FD_HASH_FOOTPRINT ] ) {
     465         399 :   if( FD_UNLIKELY( key==NULL ) ) return FD_SSPEER_SCORE_INVALID;
     466             :   /* A peer without a valid address cannot be added to the selector.
     467             :      For an existing peer changing from a valid address to 0, it is
     468             :      the caller's responsibility to remove them. */
     469         393 :   if( FD_UNLIKELY( !addr.l ) ) return FD_SSPEER_SCORE_INVALID;
     470             : 
                         /* Look the key up first: a known peer is updated in
                            place, an unknown one is allocated in the else
                            branch. */
     471         381 :   fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
     472         381 :   if( FD_LIKELY( peer ) ) {
     473          45 :     int update_status = fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot, full_hash, incr_hash );
     474          45 :     if( FD_UNLIKELY( update_status!=FD_SSPEER_UPDATE_SUCCESS ) ) return FD_SSPEER_SCORE_INVALID;
     475             :     /* Update the addr map after the selector update so that the peer
     476             :        is not mutated when the update fails. */
     477          42 :     if( FD_UNLIKELY( peer->addr.l!=addr.l ) ) {
                           /* The element is unlinked from map_by_addr before
                              peer->addr is overwritten and re-inserted
                              afterwards.  NOTE(review): presumably because
                              addr is the map key -- confirm against the map
                              definition. */
     478           6 :       peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
     479           6 :       peer->addr = addr;
     480           6 :       peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
     481           6 :     }
     482         336 :   } else {
                         /* New peer: all rejection checks run before a pool
                            element is acquired, so the failure paths leave the
                            selector unchanged. */
     483         336 :     if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) {
     484           0 :       FD_LOG_WARNING(( "peer selector pool exhausted" ));
     485           0 :       return FD_SSPEER_SCORE_INVALID;
     486           0 :     }
     487         336 :     if( FD_UNLIKELY( score_treap_ele_cnt(selector->score_treap)>=selector->max_peers ) ) {
     488           9 :       FD_LOG_WARNING(( "peer selector at max capacity" ));
     489           9 :       return FD_SSPEER_SCORE_INVALID;
     490           9 :     }
     491             : 
     492         327 :     if( FD_UNLIKELY( fd_sspeer_validate_slot_args( full_slot, incr_slot ) ) ) {
     493           9 :       return FD_SSPEER_SCORE_INVALID;
     494           9 :     }
     495             : 
     496         318 :     peer = peer_pool_ele_acquire( selector->pool );
     497         318 :     peer->key       = *key;
     498         318 :     peer->addr      = addr;
     499         318 :     peer->latency   = latency;
     500         318 :     peer->score     = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
     501         318 :     peer->full_slot = full_slot;
     502         318 :     peer->incr_slot = incr_slot;
     503         318 :     if( FD_LIKELY( full_hash ) ) fd_memcpy( peer->full_hash, full_hash, FD_HASH_FOOTPRINT );
     504         306 :     else                         fd_memset( peer->full_hash, 0, FD_HASH_FOOTPRINT );
     505             :     /* full_hash and incr_hash are treated independently here. */
     506         318 :     if( FD_LIKELY( incr_hash ) ) fd_memcpy( peer->incr_hash, incr_hash, FD_HASH_FOOTPRINT );
     507         306 :     else                         fd_memset( peer->incr_hash, 0, FD_HASH_FOOTPRINT );
                         /* The element is linked into both maps and the score
                            treap only once it is fully populated. */
     508         318 :     peer_map_by_key_ele_insert( selector->map_by_key, peer, selector->pool );
     509         318 :     peer_map_by_addr_ele_insert( selector->map_by_addr, peer, selector->pool );
     510         318 :     score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     511         318 :   }
                         /* valid is recomputed from the stored full_slot on both
                            the update path and the insert path. */
     512         360 :   peer->valid = peer->full_slot!=FD_SSPEER_SLOT_UNKNOWN;
     513         360 :   return peer->score;
     514         381 : }
     515             : 
     /* fd_sspeer_selector_remove removes the peer matching key from the
        score treap, map_by_key and map_by_addr, then releases its element
        back to the pool.  It is a no-op when key is NULL or when no peer
        with that key is found in map_by_key. */
     516             : void
     517             : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
     518         303 :                            fd_sspeer_key_t const * key ) {
     519         303 :   if( FD_UNLIKELY( key==NULL ) ) return;
     520         297 :   fd_sspeer_private_t * peer = peer_map_by_key_ele_query( selector->map_by_key, key, NULL, selector->pool );
     521         297 :   if( FD_UNLIKELY( peer==NULL ) ) return;
                         /* Unlink from every index before the element is handed
                            back to the pool. */
     522         282 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     523         282 :   peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
     524         282 :   peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
     525         282 :   peer_pool_ele_release( selector->pool, peer );
     526         282 : }
     527             : 
     /* fd_sspeer_selector_remove_by_addr removes every peer found in
        map_by_addr under addr.  The loop keeps pulling elements out of
        map_by_addr until the removal returns NULL, so if several peers
        share the address all of them are removed.  Each removed peer is
        also taken out of the score treap and map_by_key and released to
        the pool.  It does nothing when no peer is mapped to addr. */
     528             : void
     529             : fd_sspeer_selector_remove_by_addr( fd_sspeer_selector_t * selector,
     530          24 :                                    fd_ip4_port_t          addr ) {
     531          60 :   for(;;) {
                           /* This call both finds the element and unlinks it
                              from map_by_addr, so only the treap and
                              map_by_key remain to be cleaned up. */
     532          60 :     fd_sspeer_private_t * peer = peer_map_by_addr_ele_remove( selector->map_by_addr, &addr, NULL, selector->pool );
     533          60 :     if( FD_UNLIKELY( peer==NULL ) ) break;
     534          36 :     score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     535          36 :     peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
     536          36 :     peer_pool_ele_release( selector->pool, peer );
     537          36 :   }
     538          24 : }
     539             : 
     /* fd_sspeer_selector_best returns a copy (addr, slots, score and both
        hashes) of the first eligible peer met while walking the score
        treap in forward iteration order.  When incremental is zero, any
        peer with valid set is eligible.  When incremental is non-zero, the
        peer must also have full_slot equal to base_slot and a known
        incr_slot.  If no peer is eligible, or if incremental is requested
        with base_slot==FD_SSPEER_SLOT_UNKNOWN (a warning is logged), the
        result has addr.l==0, both slots FD_SSPEER_SLOT_UNKNOWN, score
        FD_SSPEER_SCORE_INVALID and zeroed hashes.
        NOTE(review): presumably forward treap order visits the best score
        first -- confirm against the treap comparator. */
     540             : fd_sspeer_t
     541             : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
     542             :                          int                    incremental,
     543         282 :                          ulong                  base_slot ) {
     544         282 :   if( FD_UNLIKELY( incremental && base_slot==FD_SSPEER_SLOT_UNKNOWN ) ) {
     545           3 :     FD_LOG_WARNING(( "incremental selection requires a valid base_slot" ));
     546           3 :     return (fd_sspeer_t){
     547           3 :       .addr      = { .l=0UL },
     548           3 :       .full_slot = FD_SSPEER_SLOT_UNKNOWN,
     549           3 :       .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
     550           3 :       .score     = FD_SSPEER_SCORE_INVALID,
     551           3 :       .full_hash = {0},
     552           3 :       .incr_hash = {0},
     553           3 :     };
     554           3 :   }
     555             : 
     556         279 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     557         336 :        !score_treap_fwd_iter_done( iter );
     558         297 :        iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     559         297 :     fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
     560             :     /* For full selection (!incremental), any valid peer is eligible.
     561             :        For incremental selection, the peer must serve the same base full
     562             :        snapshot and must actually offer an incremental snapshot. */
     563         297 :     if( FD_LIKELY( peer->valid &&
     564         297 :                    (!incremental ||
     565         297 :                    (peer->full_slot==base_slot && peer->incr_slot!=FD_SSPEER_SLOT_UNKNOWN) ) ) ) {
                           /* The first match ends the search.  The hashes are
                              arrays, so they are copied separately after the
                              designated initializer. */
     566         240 :       fd_sspeer_t best = {
     567         240 :         .addr      = peer->addr,
     568         240 :         .full_slot = peer->full_slot,
     569         240 :         .incr_slot = peer->incr_slot,
     570         240 :         .score     = peer->score,
     571         240 :       };
     572         240 :       fd_memcpy( best.full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
     573         240 :       fd_memcpy( best.incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
     574         240 :       return best;
     575         240 :     }
     576         297 :   }
     577             : 
                         /* No eligible peer was found: return the same sentinel
                            as the invalid-argument path above. */
     578          39 :   return (fd_sspeer_t){
     579          39 :     .addr      = { .l=0UL },
     580          39 :     .full_slot = FD_SSPEER_SLOT_UNKNOWN,
     581          39 :     .incr_slot = FD_SSPEER_SLOT_UNKNOWN,
     582          39 :     .score     = FD_SSPEER_SCORE_INVALID,
     583          39 :     .full_hash = {0},
     584          39 :     .incr_hash = {0},
     585          39 :   };
     586         279 : }
     587             : 
     /* fd_sspeer_selector_process_cluster_slot records a new cluster
        (full_slot, incr_slot) pair and then rescores every peer.  The call
        returns without changing anything when full_slot is
        FD_SSPEER_SLOT_UNKNOWN, when a known incr_slot is less than
        full_slot, or when the pair fails the advance rules in the body
        (which differ depending on selector->incremental_snapshot_fetch).

        On acceptance cluster_slot.full is set to full_slot.
        cluster_slot.incremental is set to incr_slot when that is known;
        otherwise it is reset to FD_SSPEER_SLOT_UNKNOWN if the stored value
        is less than the new full_slot.

        If the score treap is non-empty, each peer is copied into a newly
        acquired pool element with a recomputed score and inserted into
        shadow_score_treap, and both maps are repointed at the copy.  The
        originals are then removed from score_treap and released, and the
        two treap pointers are swapped.

        NOTE(review): the rescoring pass acquires one extra pool element
        per peer without checking peer_pool_free, and writes one entry per
        peer into peer_idx_list.  This assumes both are sized for it --
        confirm against the selector constructor. */
     588             : void
     589             : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
     590             :                                          ulong                  full_slot,
     591         180 :                                          ulong                  incr_slot ) {
     592         180 :   if( FD_UNLIKELY( full_slot==FD_SSPEER_SLOT_UNKNOWN ) ) return;
     593             : 
     594             :   /* Reject cluster slot updates where the incremental slot is before
     595             :      the full slot.  Both must be known for the check to apply.  Genesis
     596             :      (full_slot=0, incr_slot=0) is supported. */
     597         174 :   if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN && incr_slot<full_slot ) ) return;
     598             : 
     599         171 :   if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
     600             :     /* The full slot must never regress, regardless of incr_slot. */
     601         150 :     if( FD_UNLIKELY( full_slot<selector->cluster_slot.full ) ) return;
     602             : 
     603             :     /* Reject updates that do not advance the cluster slot.
     604             :        incr_slot     | stored incr   | reject when
     605             :        --------------|---------------|--------------------------------
     606             :        valid         | valid         | incr_slot < stored.incremental
     607             :                      |               |   OR (incr_slot == stored.incremental
     608             :                      |               |       AND full_slot <= stored.full)
     609             :        valid         | _SLOT_UNKNOWN | incr_slot <  stored.full
     610             :                      |               |   (strict: genesis accepted)
     611             :        _SLOT_UNKNOWN | valid         | full_slot <= stored.full
     612             :        _SLOT_UNKNOWN | _SLOT_UNKNOWN | full_slot <= stored.full  */
                         /* NOTE(review): this condition is hinted FD_UNLIKELY
                            here, but the same condition is hinted FD_LIKELY
                            where the incremental slot is stored further down.
                            Hints only; the logic is unaffected. */
     613         141 :     if( FD_UNLIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     614         120 :       if( FD_UNLIKELY( selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     615          45 :         if( FD_UNLIKELY( ( incr_slot<selector->cluster_slot.incremental ||
     616          45 :                          ( incr_slot==selector->cluster_slot.incremental &&
     617          45 :                            full_slot<=selector->cluster_slot.full ) ) ) ) return;
     618          75 :       } else {
     619          75 :         if( FD_UNLIKELY( incr_slot<selector->cluster_slot.full ) ) return;
     620          75 :       }
     621         120 :     } else if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
     622             : 
     623         141 :   } else {
                         /* Without incremental fetching only a strictly greater
                            full slot is accepted. */
     624          21 :     if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
     625          21 :   }
     626             : 
                         /* The update is accepted from here on. */
     627         141 :   selector->cluster_slot.full = full_slot;
     628         141 :   if( FD_LIKELY( incr_slot!=FD_SSPEER_SLOT_UNKNOWN ) ) {
     629         108 :     selector->cluster_slot.incremental = incr_slot;
     630         108 :   } else if( FD_UNLIKELY( selector->cluster_slot.incremental!=FD_SSPEER_SLOT_UNKNOWN &&
     631          33 :                            selector->cluster_slot.incremental<full_slot ) ) {
     632             :     /* The full slot advanced past the incremental slot, so the
     633             :        incremental reference is stale and must be invalidated. */
     634           9 :     selector->cluster_slot.incremental = FD_SSPEER_SLOT_UNKNOWN;
     635           9 :   }
     636             : 
     637         141 :   if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
     638             : 
     639             :   /* Rescore all peers
     640             :      TODO: make more performant, maybe make a treap rebalance API */
     641          33 :   ulong idx = 0UL;
     642          33 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     643         150 :         !score_treap_fwd_iter_done( iter );
     644         117 :         iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     645             :     /* Do not remove the peer from the treap while the iterator is
     646             :        running.  Removing from peer_map(s) here is ok. */
     647         117 :     fd_sspeer_private_t * peer = score_treap_fwd_iter_ele( iter, selector->pool );
                           /* Copy the peer into a fresh pool element that
                              carries the recomputed score.  The original stays
                              linked in score_treap until the iteration is
                              over; its pool index is remembered in
                              peer_idx_list for the clean-up loop. */
     648         117 :     fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
     649         117 :     shadow_peer->latency   = peer->latency;
     650         117 :     shadow_peer->full_slot = peer->full_slot;
     651         117 :     shadow_peer->incr_slot = peer->incr_slot;
     652         117 :     shadow_peer->addr      = peer->addr;
     653         117 :     shadow_peer->key       = peer->key;
     654         117 :     shadow_peer->score     = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
     655         117 :     shadow_peer->valid     = peer->valid;
     656         117 :     fd_memcpy( shadow_peer->full_hash, peer->full_hash, FD_HASH_FOOTPRINT );
     657         117 :     fd_memcpy( shadow_peer->incr_hash, peer->incr_hash, FD_HASH_FOOTPRINT );
     658         117 :     score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
     659         117 :     selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
                           /* Repoint both maps from the original element to
                              the copy: the original is removed before the
                              copy is inserted. */
     660         117 :     peer_map_by_key_ele_remove_fast( selector->map_by_key, peer, selector->pool );
     661         117 :     peer_map_by_addr_ele_remove_fast( selector->map_by_addr, peer, selector->pool );
     662         117 :     peer_map_by_key_ele_insert( selector->map_by_key, shadow_peer, selector->pool );
     663         117 :     peer_map_by_addr_ele_insert( selector->map_by_addr, shadow_peer, selector->pool );
     664         117 :   }
     665             : 
     666             :   /* Empty the old score treap and release the original elements. */
     667         150 :   for( ulong i=0UL; i<idx; i++ ) {
     668         117 :     fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
     669         117 :     score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     670         117 :     peer_pool_ele_release( selector->pool, peer );
     671         117 :   }
     672             : 
                         /* Swap the treap pointers: the rebuilt treap becomes
                            the live one, and the treap that was just emptied
                            becomes the shadow. */
     673          33 :   score_treap_t * tmp          = selector->score_treap;
     674          33 :   selector->score_treap        = selector->shadow_score_treap;
     675          33 :   selector->shadow_score_treap = tmp;
     676             : 
     677             : #if FD_SSPEER_SELECTOR_DEBUG
     678             :   FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
     679             : #endif
     680          33 : }
     681             : 
     /* fd_sspeer_selector_cluster_slot returns, by value, the cluster slot
        pair currently stored in the selector. */
     682             : fd_sscluster_slot_t
     683         111 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
     684         111 :   return selector->cluster_slot;
     685         111 : }
     686             : 
     /* fd_sspeer_selector_peer_map_by_key_ele_cnt returns the number of
        elements in map_by_key.  The count is obtained by iterating the
        whole map, so the cost grows with the number of elements. */
     687             : ulong
     688         180 : fd_sspeer_selector_peer_map_by_key_ele_cnt( fd_sspeer_selector_t * selector ) {
     689         180 :   ulong cnt = 0UL;
     690         180 :   for( peer_map_by_key_iter_t iter = peer_map_by_key_iter_init( selector->map_by_key, selector->pool );
     691         777 :       !peer_map_by_key_iter_done( iter, selector->map_by_key, selector->pool );
     692         597 :       iter = peer_map_by_key_iter_next( iter, selector->map_by_key, selector->pool ) ) {
     693         597 :     cnt++;
     694         597 :   }
     695         180 :   return cnt;
     696         180 : }
     697             : 
     /* fd_sspeer_selector_peer_map_by_addr_ele_cnt returns the number of
        elements in map_by_addr.  The count is obtained by iterating the
        whole map, so the cost grows with the number of elements. */
     698             : ulong
     699         156 : fd_sspeer_selector_peer_map_by_addr_ele_cnt( fd_sspeer_selector_t * selector ) {
     700         156 :   ulong cnt = 0UL;
     701         156 :   for( peer_map_by_addr_iter_t iter = peer_map_by_addr_iter_init( selector->map_by_addr, selector->pool );
     702         546 :       !peer_map_by_addr_iter_done( iter, selector->map_by_addr, selector->pool );
     703         390 :       iter = peer_map_by_addr_iter_next( iter, selector->map_by_addr, selector->pool ) ) {
     704         390 :     cnt++;
     705         390 :   }
     706         156 :   return cnt;
     707         156 : }

Generated by: LCOV version 1.14