LCOV - code coverage report
Current view: top level - discof/restore/utils - fd_sspeer_selector.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 171 241 71.0 %
Date: 2026-01-23 05:02:40 Functions: 10 13 76.9 %

          Line data    Source code
       1             : #include "fd_sspeer_selector.h"
       2             : 
       3             : struct fd_sspeer_private {
       4             :   fd_ip4_port_t addr;
       5             :   ulong         full_slot;
       6             :   ulong         incr_slot;
       7             :   ulong         latency;
       8             :   ulong         score;
       9             :   int           valid;
      10             : 
      11             :   struct {
      12             :     ulong next;
      13             :   } pool;
      14             : 
      15             :   struct {
      16             :     ulong next;
      17             :     ulong prev;
      18             :   } map;
      19             : 
      20             :   struct {
      21             :     ulong parent;
      22             :     ulong left;
      23             :     ulong right;
      24             :     ulong prio;
      25             :   } score_treap;
      26             : };
      27             : 
      28             : typedef struct fd_sspeer_private fd_sspeer_private_t;
      29             : 
      30             : #define POOL_NAME  peer_pool
      31           9 : #define POOL_T     fd_sspeer_private_t
      32             : #define POOL_IDX_T ulong
      33      196674 : #define POOL_NEXT  pool.next
      34             : #include "../../../util/tmpl/fd_pool.c"
      35             : 
      36             : #define MAP_NAME               peer_map
      37          45 : #define MAP_KEY                addr
      38           0 : #define MAP_ELE_T              fd_sspeer_private_t
      39             : #define MAP_KEY_T              fd_ip4_port_t
      40         105 : #define MAP_PREV               map.prev
      41         204 : #define MAP_NEXT               map.next
      42         150 : #define MAP_KEY_EQ(k0,k1)      ((k0)->l==(k1)->l)
      43          90 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
      44             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      45             : #include "../../../util/tmpl/fd_map_chain.c"
      46             : 
      47          72 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
      48             : 
      49             : #define TREAP_T         fd_sspeer_private_t
      50             : #define TREAP_NAME      score_treap
      51             : #define TREAP_QUERY_T   void *                                         /* We don't use query ... */
      52             : #define TREAP_CMP(a,b)  (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
      53             :                                                                           implementation to cmp either */
      54         207 : #define TREAP_IDX_T     ulong
      55          72 : #define TREAP_LT        COMPARE_WORSE
      56         183 : #define TREAP_PARENT    score_treap.parent
      57         222 : #define TREAP_LEFT      score_treap.left
      58          99 : #define TREAP_RIGHT     score_treap.right
      59          87 : #define TREAP_PRIO      score_treap.prio
      60             : #include "../../../util/tmpl/fd_treap.c"
      61             : 
      62          45 : #define DEFAULT_SLOTS_BEHIND   (1000UL*1000UL)        /* 1,000,000 slots behind */
      63          45 : #define DEFAULT_PEER_LATENCY   (100L*1000L*1000L)     /* 100ms */
      64             : 
      65             : #define FD_SSPEER_SELECTOR_DEBUG 0
      66             : 
      67             : struct fd_sspeer_selector_private {
      68             :   fd_sspeer_private_t * pool;
      69             :   peer_map_t *          map;
      70             :   score_treap_t *       score_treap;
      71             :   score_treap_t *       shadow_score_treap;
      72             :   ulong *               peer_idx_list;
      73             :   fd_sscluster_slot_t   cluster_slot;
      74             :   int                   incremental_snapshot_fetch;
      75             : 
      76             :   ulong                 magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
      77             : };
      78             : 
      79             : FD_FN_CONST ulong
      80          21 : fd_sspeer_selector_align( void ) {
      81          21 :   return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(), fd_ulong_max( score_treap_align(), alignof(ulong) ) ) );
      82          21 : }
      83             : 
      84             : FD_FN_CONST ulong
      85           3 : fd_sspeer_selector_footprint( ulong max_peers ) {
      86           3 :   ulong l;
      87           3 :   l = FD_LAYOUT_INIT;
      88           3 :   l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
      89           3 :   l = FD_LAYOUT_APPEND( l, peer_pool_align(),             peer_pool_footprint( 2UL*max_peers ) );
      90           3 :   l = FD_LAYOUT_APPEND( l, peer_map_align(),              peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
      91           3 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
      92           3 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
      93           3 :   l = FD_LAYOUT_APPEND( l, alignof(ulong),                max_peers * sizeof(ulong) );
      94           3 :   return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
      95           3 : }
      96             : 
      97             : void *
      98             : fd_sspeer_selector_new( void * shmem,
      99             :                         ulong  max_peers,
     100             :                         int    incremental_snapshot_fetch,
     101           3 :                         ulong  seed ) {
     102           3 :   if( FD_UNLIKELY( !shmem ) ) {
     103           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     104           0 :     return NULL;
     105           0 :   }
     106             : 
     107           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
     108           0 :     FD_LOG_WARNING(( "unaligned shmem" ));
     109           0 :     return NULL;
     110           0 :   }
     111             : 
     112           3 :   if( FD_UNLIKELY( max_peers < 1UL ) ) {
     113           0 :     FD_LOG_WARNING(( "max_peers must be at least 1" ));
     114           0 :     return NULL;
     115           0 :   }
     116             : 
     117           3 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     118           3 :   fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
     119           3 :   void * _pool                    = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(),             peer_pool_footprint( 2UL*max_peers ) );
     120           3 :   void * _map                     = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(),              peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) )  );
     121           3 :   void * _score_treap             = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
     122           3 :   void * _shadow_score_treap      = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
     123           3 :   void * _peer_idx_list           = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong),                max_peers * sizeof(ulong) );
     124             : 
     125           0 :   selector->pool               = peer_pool_join( peer_pool_new( _pool, max_peers ) );
     126           3 :   selector->map                = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( 2UL*max_peers ), seed ) );
     127           3 :   selector->score_treap        = score_treap_join( score_treap_new( _score_treap, max_peers ) );
     128           3 :   selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
     129           3 :   selector->peer_idx_list      = (ulong *)_peer_idx_list;
     130             : 
     131           3 :   selector->cluster_slot.full          = 0UL;
     132           3 :   selector->cluster_slot.incremental   = 0UL;
     133           3 :   selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
     134             : 
     135           3 :   FD_COMPILER_MFENCE();
     136           3 :   FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
     137           3 :   FD_COMPILER_MFENCE();
     138             : 
     139           3 :   return (void *)selector;
     140           3 : }
     141             : 
     142             : fd_sspeer_selector_t *
     143           3 : fd_sspeer_selector_join( void * shselector ) {
     144           3 :   if( FD_UNLIKELY( !shselector ) ) {
     145           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     146           0 :     return NULL;
     147           0 :   }
     148             : 
     149           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     150           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     151           0 :     return NULL;
     152           0 :   }
     153             : 
     154           3 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     155             : 
     156           3 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     157           0 :     FD_LOG_WARNING(( "bad magic" ));
     158           0 :     return NULL;
     159           0 :   }
     160             : 
     161           3 :   return selector;
     162           3 : }
     163             : 
     164             : void *
     165           3 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
     166           3 :   if( FD_UNLIKELY( !selector ) ) {
     167           0 :     FD_LOG_WARNING(( "NULL selector" ));
     168           0 :     return NULL;
     169           0 :   }
     170             : 
     171           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
     172           0 :     FD_LOG_WARNING(( "misaligned selector" ));
     173           0 :     return NULL;
     174           0 :   }
     175             : 
     176           3 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     177           0 :     FD_LOG_WARNING(( "bad magic" ));
     178           0 :     return NULL;
     179           0 :   }
     180             : 
     181           3 :   selector->pool               = peer_pool_leave( selector->pool );
     182           3 :   selector->map                = peer_map_leave( selector->map );
     183           3 :   selector->score_treap        = score_treap_leave( selector->score_treap );
     184           3 :   selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
     185             : 
     186           3 :   return (void *)selector;
     187           3 : }
     188             : 
     189             : void *
     190           3 : fd_sspeer_selector_delete( void * shselector ) {
     191           3 :   if( FD_UNLIKELY( !shselector ) ) {
     192           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     193           0 :     return NULL;
     194           0 :   }
     195             : 
     196           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     197           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     198           0 :     return NULL;
     199           0 :   }
     200             : 
     201           3 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     202             : 
     203           3 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     204           0 :     FD_LOG_WARNING(( "bad magic" ));
     205           0 :     return NULL;
     206           0 :   }
     207             : 
     208           3 :   selector->pool               = peer_pool_delete( selector->pool );
     209           3 :   selector->map                = peer_map_delete( selector->map );
     210           3 :   selector->score_treap        = score_treap_delete( selector->score_treap );
     211           3 :   selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
     212             : 
     213           3 :   FD_COMPILER_MFENCE();
     214           3 :   FD_VOLATILE( selector->magic ) = 0UL;
     215           3 :   FD_COMPILER_MFENCE();
     216             : 
     217           3 :   return (void *)selector;
     218           3 : }
     219             : 
     220             : /* Calculates a score for a peer given its latency and its resolved
     221             :    full and incremental slots */
     222             : ulong
     223             : fd_sspeer_selector_score( fd_sspeer_selector_t * selector,
     224             :                           ulong                  peer_latency,
     225             :                           ulong                  full_slot,
     226          45 :                           ulong                  incr_slot ) {
     227          45 :   static const ulong slots_behind_penalty = 1000UL;
     228          45 :   ulong slot                              = ULONG_MAX;
     229          45 :   ulong slots_behind                      = DEFAULT_SLOTS_BEHIND;
     230          45 :   peer_latency = peer_latency!=ULONG_MAX ? peer_latency : DEFAULT_PEER_LATENCY;
     231             : 
     232          45 :   if( FD_LIKELY( full_slot!=ULONG_MAX ) ) {
     233          39 :     if( FD_UNLIKELY( incr_slot==ULONG_MAX ) ) {
     234           0 :       slot         = full_slot;
     235           0 :       slots_behind = selector->cluster_slot.full>slot ? selector->cluster_slot.full - slot : 0UL;
     236          39 :     } else {
     237          39 :       slot         = incr_slot;
     238          39 :       slots_behind = selector->cluster_slot.incremental>slot ? selector->cluster_slot.incremental - slot : 0UL;
     239          39 :     }
     240          39 :   }
     241             : 
     242             :   /* TODO: come up with a better/more dynamic score function */
     243          45 :   return peer_latency + slots_behind_penalty*slots_behind;
     244          45 : }
     245             : 
     246             : /* Updates a peer's score with new values for latency and/or resolved
     247             :    full/incremental slots */
     248             : static void
     249             : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
     250             :                            fd_sspeer_private_t *  peer,
     251             :                            ulong                  latency,
     252             :                            ulong                  full_slot,
     253           0 :                            ulong                  incr_slot ) {
     254           0 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     255             : 
     256           0 :   ulong peer_latency = latency!=ULONG_MAX ? latency : peer->latency;
     257           0 :   peer->score = fd_sspeer_selector_score( selector, peer_latency, full_slot, incr_slot );
     258             : 
     259           0 :   peer->full_slot = full_slot!=ULONG_MAX ? full_slot : peer->full_slot;
     260           0 :   peer->incr_slot = incr_slot!=ULONG_MAX ? incr_slot : peer->incr_slot;
     261             : 
     262           0 :   if( FD_LIKELY( latency!=ULONG_MAX ) ) {
     263           0 :     peer->latency = latency;
     264           0 :   }
     265             : 
     266           0 :   score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     267           0 : }
     268             : 
     269             : ulong
     270             : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
     271             :                         fd_ip4_port_t          addr,
     272             :                         ulong                  latency,
     273             :                         ulong                  full_slot,
     274          21 :                         ulong                  incr_slot ) {
     275          21 :   fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
     276          21 :   if( FD_LIKELY( peer ) ) {
     277           0 :     fd_sspeer_selector_update( selector, peer, latency, full_slot, incr_slot );
     278          21 :   } else {
     279          21 :     if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) return ULONG_MAX;
     280             : 
     281          21 :     peer = peer_pool_ele_acquire( selector->pool );
     282          21 :     peer->addr      = addr;
     283          21 :     peer->latency   = latency;
     284          21 :     peer->score     = fd_sspeer_selector_score( selector, latency, full_slot, incr_slot );
     285          21 :     peer->full_slot = full_slot;
     286          21 :     peer->incr_slot = incr_slot;
     287          21 :     peer_map_ele_insert( selector->map, peer, selector->pool );
     288          21 :     score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     289          21 :   }
     290          21 :   peer->valid = peer->latency!=ULONG_MAX && peer->full_slot!=ULONG_MAX;
     291          21 :   return peer->score;
     292          21 : }
     293             : 
     294             : void
     295             : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
     296           0 :                            fd_ip4_port_t          addr ) {
     297           0 :   fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
     298           0 :   if( FD_UNLIKELY( !peer ) ) return;
     299             : 
     300           0 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     301           0 :   peer_map_ele_remove_fast( selector->map, peer, selector->pool );
     302           0 :   peer_pool_ele_release( selector->pool, peer );
     303           0 : }
     304             : 
     305             : fd_sspeer_t
     306             : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
     307             :                          int                    incremental,
     308          24 :                          ulong                  base_slot ) {
     309          24 :   if( FD_UNLIKELY( incremental ) ) {
     310           9 :     FD_TEST( base_slot!=ULONG_MAX );
     311           9 :   }
     312             : 
     313          24 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     314          30 :        !score_treap_fwd_iter_done( iter );
     315          30 :        iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     316          30 :     fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
     317          30 :     if( FD_LIKELY( peer->valid &&
     318          30 :                    (!incremental ||
     319          30 :                    (incremental && peer->full_slot==base_slot) ) ) ) {
     320          24 :       return (fd_sspeer_t){
     321          24 :         .addr      = peer->addr,
     322          24 :         .full_slot = peer->full_slot,
     323          24 :         .incr_slot = peer->incr_slot,
     324          24 :         .score     = peer->score,
     325          24 :       };
     326          24 :     }
     327          30 :   }
     328             : 
     329           0 :   return (fd_sspeer_t){
     330           0 :     .addr      = { .l=0UL },
     331           0 :     .full_slot = ULONG_MAX,
     332           0 :     .incr_slot = ULONG_MAX,
     333           0 :     .score     = ULONG_MAX,
     334           0 :   };
     335          24 : }
     336             : 
     337             : void
     338             : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
     339             :                                          ulong                  full_slot,
     340           9 :                                          ulong                  incr_slot ) {
     341           9 :   if( full_slot==ULONG_MAX && incr_slot==ULONG_MAX ) return;
     342             : 
     343           9 :   FD_TEST( full_slot!=ULONG_MAX );
     344           9 :   if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
     345             :     /* incremental slot is less than or equal to cluster incremental slot */
     346           9 :     if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental!=ULONG_MAX && incr_slot<=selector->cluster_slot.incremental ) ) return;
     347             :     /* incremental slot is less than or equal to cluster full slot when cluster incremental slot does not exist */
     348           9 :     else if( FD_UNLIKELY( incr_slot!=ULONG_MAX && selector->cluster_slot.incremental==ULONG_MAX && incr_slot<=selector->cluster_slot.full ) )   return;
     349             :     /* full slot is less than cluster full slot when incremental slot does not exist */
     350           9 :     else if( FD_UNLIKELY( incr_slot==ULONG_MAX && full_slot<=selector->cluster_slot.full ) )                                                           return;
     351           9 :   } else {
     352           0 :     if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
     353           0 :   }
     354             : 
     355           9 :   selector->cluster_slot.full        = full_slot;
     356           9 :   selector->cluster_slot.incremental = incr_slot;
     357             : 
     358           9 :   if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
     359             : 
     360             :   /* Rescore all peers
     361             :      TODO: make more performant, maybe make a treap rebalance API */
     362           6 :   ulong idx = 0UL;
     363           6 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     364          30 :         !score_treap_fwd_iter_done( iter );
     365          24 :         iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     366          24 :     fd_sspeer_private_t const * peer  = score_treap_fwd_iter_ele_const( iter, selector->pool );
     367          24 :     fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
     368          24 :     shadow_peer->latency   = peer->latency;
     369          24 :     shadow_peer->full_slot = peer->full_slot;
     370          24 :     shadow_peer->incr_slot = peer->incr_slot;
     371          24 :     shadow_peer->addr      = peer->addr;
     372          24 :     shadow_peer->score     = fd_sspeer_selector_score( selector, shadow_peer->latency, shadow_peer->full_slot, shadow_peer->incr_slot );
     373          24 :     score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
     374          24 :     selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
     375          24 :     peer_map_ele_remove( selector->map, &peer->addr, NULL, selector->pool );
     376          24 :     peer_map_ele_insert( selector->map, shadow_peer, selector->pool );
     377          24 :   }
     378             : 
     379             :   /* clear score treap*/
     380          30 :   for( ulong i=0UL; i<idx; i++ ) {
     381          24 :     fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
     382          24 :     score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     383          24 :     peer_pool_ele_release( selector->pool, peer );
     384          24 :   }
     385             : 
     386           6 :   score_treap_t * tmp          = selector->score_treap;
     387           6 :   selector->score_treap        = selector->shadow_score_treap;
     388           6 :   selector->shadow_score_treap = tmp;
     389             : 
     390             : #if FD_SSPEER_SELECTOR_DEBUG
     391             :   FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
     392             : #endif
     393           6 : }
     394             : 
     395             : fd_sscluster_slot_t
     396           0 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
     397           0 :   return selector->cluster_slot;
     398           0 : }

Generated by: LCOV version 1.14