LCOV - code coverage report
Current view: top level - discof/restore/utils - fd_sspeer_selector.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 174 245 71.0 %
Date: 2025-10-13 04:42:14 Functions: 10 13 76.9 %

          Line data    Source code
       1             : #include "fd_sspeer_selector.h"
       2             : 
       3             : struct fd_sspeer_private {
       4             :   fd_ip4_port_t addr;
       5             :   fd_ssinfo_t   ssinfo;
       6             :   ulong         latency;
       7             :   ulong         score;
       8             : 
       9             :   struct {
      10             :     ulong next;
      11             :   } pool;
      12             : 
      13             :   struct {
      14             :     ulong next;
      15             :     ulong prev;
      16             :   } map;
      17             : 
      18             :   struct {
      19             :     ulong parent;
      20             :     ulong left;
      21             :     ulong right;
      22             :     ulong prio;
      23             :   } score_treap;
      24             : };
      25             : 
      26             : typedef struct fd_sspeer_private fd_sspeer_private_t;
      27             : 
      28             : #define POOL_NAME  peer_pool
      29           9 : #define POOL_T     fd_sspeer_private_t
      30             : #define POOL_IDX_T ulong
      31      196674 : #define POOL_NEXT  pool.next
      32             : #include "../../../util/tmpl/fd_pool.c"
      33             : 
      34             : #define MAP_NAME               peer_map
      35          45 : #define MAP_KEY                addr
      36           0 : #define MAP_ELE_T              fd_sspeer_private_t
      37             : #define MAP_KEY_T              fd_ip4_port_t
      38         105 : #define MAP_PREV               map.prev
      39         204 : #define MAP_NEXT               map.next
      40         150 : #define MAP_KEY_EQ(k0,k1)      ((k0)->l==(k1)->l)
      41          90 : #define MAP_KEY_HASH(key,seed) (seed^(key)->l)
      42             : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
      43             : #include "../../../util/tmpl/fd_map_chain.c"
      44             : 
      45          72 : #define COMPARE_WORSE(x,y) ( (x)->score<(y)->score )
      46             : 
      47             : #define TREAP_T         fd_sspeer_private_t
      48             : #define TREAP_NAME      score_treap
      49             : #define TREAP_QUERY_T   void *                                         /* We don't use query ... */
      50             : #define TREAP_CMP(a,b)  (__extension__({ (void)(a); (void)(b); -1; })) /* which means we don't need to give a real
      51             :                                                                           implementation to cmp either */
      52         207 : #define TREAP_IDX_T     ulong
      53          72 : #define TREAP_LT        COMPARE_WORSE
      54         183 : #define TREAP_PARENT    score_treap.parent
      55         222 : #define TREAP_LEFT      score_treap.left
      56          99 : #define TREAP_RIGHT     score_treap.right
      57          87 : #define TREAP_PRIO      score_treap.prio
      58             : #include "../../../util/tmpl/fd_treap.c"
      59             : 
      60          45 : #define DEFAULT_SLOTS_BEHIND   (1000UL*1000UL)        /* 1,000,000 slots behind */
      61          45 : #define DEFAULT_PEER_LATENCY   (100L*1000L*1000L)     /* 100ms */
      62             : 
      63             : #define FD_SSPEER_SELECTOR_DEBUG 0
      64             : 
      65             : struct fd_sspeer_selector_private {
      66             :   fd_sspeer_private_t * pool;
      67             :   peer_map_t *          map;
      68             :   score_treap_t *       score_treap;
      69             :   score_treap_t *       shadow_score_treap;
      70             :   ulong *               peer_idx_list;
      71             :   fd_sscluster_slot_t   cluster_slot;
      72             :   int                   incremental_snapshot_fetch;
      73             : 
      74             :   ulong                 magic; /* ==FD_SSPEER_SELECTOR_MAGIC */
      75             : };
      76             : 
      77             : FD_FN_CONST ulong
      78          21 : fd_sspeer_selector_align( void ) {
      79          21 :   return fd_ulong_max( alignof( fd_sspeer_selector_t), fd_ulong_max( peer_pool_align(), fd_ulong_max( score_treap_align(), alignof(ulong) ) ) );
      80          21 : }
      81             : 
      82             : FD_FN_CONST ulong
      83           3 : fd_sspeer_selector_footprint( ulong max_peers ) {
      84           3 :   ulong l;
      85           3 :   l = FD_LAYOUT_INIT;
      86           3 :   l = FD_LAYOUT_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
      87           3 :   l = FD_LAYOUT_APPEND( l, peer_pool_align(),             peer_pool_footprint( 2UL*max_peers ) );
      88           3 :   l = FD_LAYOUT_APPEND( l, peer_map_align(),              peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) ) );
      89           3 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
      90           3 :   l = FD_LAYOUT_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
      91           3 :   l = FD_LAYOUT_APPEND( l, alignof(ulong),                max_peers * sizeof(ulong) );
      92           3 :   return FD_LAYOUT_FINI( l, fd_sspeer_selector_align() );
      93           3 : }
      94             : 
      95             : void *
      96             : fd_sspeer_selector_new( void * shmem,
      97             :                         ulong  max_peers,
      98             :                         int    incremental_snapshot_fetch,
      99           3 :                         ulong  seed ) {
     100           3 :   if( FD_UNLIKELY( !shmem ) ) {
     101           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     102           0 :     return NULL;
     103           0 :   }
     104             : 
     105           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_sspeer_selector_align() ) ) ) {
     106           0 :     FD_LOG_WARNING(( "unaligned shmem" ));
     107           0 :     return NULL;
     108           0 :   }
     109             : 
     110           3 :   if( FD_UNLIKELY( max_peers < 1UL ) ) {
     111           0 :     FD_LOG_WARNING(( "max_peers must be at least 1" ));
     112           0 :     return NULL;
     113           0 :   }
     114             : 
     115           3 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     116           3 :   fd_sspeer_selector_t * selector = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sspeer_selector_t), sizeof(fd_sspeer_selector_t) );
     117           3 :   void * _pool                    = FD_SCRATCH_ALLOC_APPEND( l, peer_pool_align(),             peer_pool_footprint( 2UL*max_peers ) );
     118           3 :   void * _map                     = FD_SCRATCH_ALLOC_APPEND( l, peer_map_align(),              peer_map_footprint( peer_map_chain_cnt_est( 2UL*max_peers ) )  );
     119           3 :   void * _score_treap             = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
     120           3 :   void * _shadow_score_treap      = FD_SCRATCH_ALLOC_APPEND( l, score_treap_align(),           score_treap_footprint( max_peers ) );
     121           3 :   void * _peer_idx_list           = FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong),                max_peers * sizeof(ulong) );
     122             : 
     123           0 :   selector->pool               = peer_pool_join( peer_pool_new( _pool, max_peers ) );
     124           3 :   selector->map                = peer_map_join( peer_map_new( _map, peer_map_chain_cnt_est( 2UL*max_peers ), seed ) );
     125           3 :   selector->score_treap        = score_treap_join( score_treap_new( _score_treap, max_peers ) );
     126           3 :   selector->shadow_score_treap = score_treap_join( score_treap_new( _shadow_score_treap, max_peers ) );
     127           3 :   selector->peer_idx_list      = (ulong *)_peer_idx_list;
     128             : 
     129           3 :   selector->cluster_slot.full          = 0UL;
     130           3 :   selector->cluster_slot.incremental   = 0UL;
     131           3 :   selector->incremental_snapshot_fetch = incremental_snapshot_fetch;
     132             : 
     133           3 :   FD_COMPILER_MFENCE();
     134           3 :   FD_VOLATILE( selector->magic ) = FD_SSPEER_SELECTOR_MAGIC;
     135           3 :   FD_COMPILER_MFENCE();
     136             : 
     137           3 :   return (void *)selector;
     138           3 : }
     139             : 
     140             : fd_sspeer_selector_t *
     141           3 : fd_sspeer_selector_join( void * shselector ) {
     142           3 :   if( FD_UNLIKELY( !shselector ) ) {
     143           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     144           0 :     return NULL;
     145           0 :   }
     146             : 
     147           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     148           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     149           0 :     return NULL;
     150           0 :   }
     151             : 
     152           3 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     153             : 
     154           3 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     155           0 :     FD_LOG_WARNING(( "bad magic" ));
     156           0 :     return NULL;
     157           0 :   }
     158             : 
     159           3 :   return selector;
     160           3 : }
     161             : 
     162             : void *
     163           3 : fd_sspeer_selector_leave( fd_sspeer_selector_t * selector ) {
     164           3 :   if( FD_UNLIKELY( !selector ) ) {
     165           0 :     FD_LOG_WARNING(( "NULL selector" ));
     166           0 :     return NULL;
     167           0 :   }
     168             : 
     169           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)selector, fd_sspeer_selector_align() ) ) ) {
     170           0 :     FD_LOG_WARNING(( "misaligned selector" ));
     171           0 :     return NULL;
     172           0 :   }
     173             : 
     174           3 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     175           0 :     FD_LOG_WARNING(( "bad magic" ));
     176           0 :     return NULL;
     177           0 :   }
     178             : 
     179           3 :   selector->pool               = peer_pool_leave( selector->pool );
     180           3 :   selector->map                = peer_map_leave( selector->map );
     181           3 :   selector->score_treap        = score_treap_leave( selector->score_treap );
     182           3 :   selector->shadow_score_treap = score_treap_leave( selector->shadow_score_treap );
     183             : 
     184           3 :   return (void *)selector;
     185           3 : }
     186             : 
     187             : void *
     188           3 : fd_sspeer_selector_delete( void * shselector ) {
     189           3 :   if( FD_UNLIKELY( !shselector ) ) {
     190           0 :     FD_LOG_WARNING(( "NULL shselector" ));
     191           0 :     return NULL;
     192           0 :   }
     193             : 
     194           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shselector, fd_sspeer_selector_align() ) ) ) {
     195           0 :     FD_LOG_WARNING(( "misaligned shselector" ));
     196           0 :     return NULL;
     197           0 :   }
     198             : 
     199           3 :   fd_sspeer_selector_t * selector = (fd_sspeer_selector_t *)shselector;
     200             : 
     201           3 :   if( FD_UNLIKELY( selector->magic!=FD_SSPEER_SELECTOR_MAGIC ) ) {
     202           0 :     FD_LOG_WARNING(( "bad magic" ));
     203           0 :     return NULL;
     204           0 :   }
     205             : 
     206           3 :   selector->pool               = peer_pool_delete( selector->pool );
     207           3 :   selector->map                = peer_map_delete( selector->map );
     208           3 :   selector->score_treap        = score_treap_delete( selector->score_treap );
     209           3 :   selector->shadow_score_treap = score_treap_delete( selector->shadow_score_treap );
     210             : 
     211           3 :   FD_COMPILER_MFENCE();
     212           3 :   FD_VOLATILE( selector->magic ) = 0UL;
     213           3 :   FD_COMPILER_MFENCE();
     214             : 
     215           3 :   return (void *)selector;
     216           3 : }
     217             : 
     218             : /* Calculates a score for a peer given its latency and ssinfo */
     219             : ulong
     220             : fd_sspeer_selector_score( fd_sspeer_selector_t * selector,
     221             :                           ulong                  peer_latency,
     222          45 :                           fd_ssinfo_t const *    ssinfo ) {
     223          45 :   static const ulong slots_behind_penalty = 1000UL;
     224          45 :   ulong slot                              = ULONG_MAX;
     225          45 :   ulong slots_behind                      = DEFAULT_SLOTS_BEHIND;
     226          45 :   peer_latency = peer_latency!=ULONG_MAX ? peer_latency : DEFAULT_PEER_LATENCY;
     227             : 
     228          45 :   if( FD_LIKELY( ssinfo && ssinfo->full.slot!=ULONG_MAX ) ) {
     229          39 :     if( FD_UNLIKELY( ssinfo->incremental.slot==ULONG_MAX ) ) {
     230           0 :       slot         = ssinfo->full.slot;
     231           0 :       slots_behind = selector->cluster_slot.full>slot ? selector->cluster_slot.full - slot : 0UL;
     232          39 :     } else {
     233          39 :       slot         = ssinfo->incremental.slot;
     234          39 :       slots_behind = selector->cluster_slot.incremental>slot ? selector->cluster_slot.incremental - slot : 0UL;
     235          39 :     }
     236          39 :   }
     237             : 
     238             :   /* TODO: come up with a better/more dynamic score function */
     239          45 :   return peer_latency + slots_behind_penalty*slots_behind;
     240          45 : }
     241             : 
     242             : /* Updates a peer's score with new values for latency and/or ssinfo */
     243             : static void
     244             : fd_sspeer_selector_update( fd_sspeer_selector_t * selector,
     245             :                            fd_sspeer_private_t *  peer,
     246             :                            ulong                  latency,
     247           0 :                            fd_ssinfo_t const *    ssinfo ) {
     248           0 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     249             : 
     250           0 :   ulong               peer_latency = latency!=ULONG_MAX ? latency : peer->latency;
     251           0 :   fd_ssinfo_t const * peer_ssinfo  = ssinfo ? ssinfo : &peer->ssinfo;
     252             : 
     253           0 :   peer->score = fd_sspeer_selector_score( selector, peer_latency, peer_ssinfo );
     254             : 
     255           0 :   if( FD_LIKELY( ssinfo ) ) {
     256           0 :     peer->ssinfo = *ssinfo;
     257           0 :   }
     258             : 
     259           0 :   if( FD_LIKELY( latency!=ULONG_MAX ) ) {
     260           0 :     peer->latency = latency;
     261           0 :   }
     262             : 
     263           0 :   score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     264           0 : }
     265             : 
     266             : ulong
     267             : fd_sspeer_selector_add( fd_sspeer_selector_t * selector,
     268             :                         fd_ip4_port_t          addr,
     269             :                         ulong                  latency,
     270          21 :                         fd_ssinfo_t const *    ssinfo ) {
     271          21 :   fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
     272          21 :   if( FD_LIKELY( peer ) ) {
     273           0 :     fd_sspeer_selector_update( selector, peer, latency, ssinfo );
     274          21 :   } else {
     275          21 :     if( FD_UNLIKELY( !peer_pool_free( selector->pool ) ) ) return ULONG_MAX;
     276             : 
     277          21 :     peer = peer_pool_ele_acquire( selector->pool );
     278          21 :     FD_TEST( peer );
     279          21 :     if( FD_LIKELY( ssinfo ) ) {
     280          18 :       peer->ssinfo  = *ssinfo;
     281          18 :     } else {
     282           3 :       peer->ssinfo.full.slot             = ULONG_MAX;
     283           3 :       peer->ssinfo.incremental.slot      = ULONG_MAX;
     284           3 :       peer->ssinfo.incremental.base_slot = ULONG_MAX;
     285           3 :     }
     286             : 
     287          21 :     peer->addr    = addr;
     288          21 :     peer->latency = latency;
     289          21 :     peer->score   = fd_sspeer_selector_score( selector, latency, ssinfo );
     290          21 :     peer_map_ele_insert( selector->map, peer, selector->pool );
     291          21 :     score_treap_ele_insert( selector->score_treap, peer, selector->pool );
     292          21 :   }
     293          21 :   return peer->score;
     294          21 : }
     295             : 
     296             : void
     297             : fd_sspeer_selector_remove( fd_sspeer_selector_t * selector,
     298           0 :                            fd_ip4_port_t          addr ) {
     299           0 :   fd_sspeer_private_t * peer = peer_map_ele_query( selector->map, &addr, NULL, selector->pool );
     300           0 :   if( FD_UNLIKELY( !peer ) ) return;
     301             : 
     302           0 :   score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     303           0 :   peer_map_ele_remove_fast( selector->map, peer, selector->pool );
     304           0 :   peer_pool_ele_release( selector->pool, peer );
     305           0 : }
     306             : 
     307             : fd_sspeer_t
     308             : fd_sspeer_selector_best( fd_sspeer_selector_t * selector,
     309             :                          int                    incremental,
     310          24 :                          ulong                  base_slot ) {
     311          24 :   if( FD_UNLIKELY( incremental ) ) {
     312           9 :     FD_TEST( base_slot!=ULONG_MAX );
     313           9 :   }
     314             : 
     315          24 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     316          30 :        !score_treap_fwd_iter_done( iter );
     317          30 :        iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     318          30 :     fd_sspeer_private_t const * peer = score_treap_fwd_iter_ele_const( iter, selector->pool );
     319          30 :     if( FD_LIKELY( peer->ssinfo.full.slot!=ULONG_MAX &&
     320          30 :                    (!incremental ||
     321          30 :                    (incremental && peer->ssinfo.incremental.base_slot==base_slot) ) ) ) {
     322          24 :       return (fd_sspeer_t){
     323          24 :         .addr    = peer->addr,
     324          24 :         .ssinfo  = peer->ssinfo,
     325          24 :         .score   = peer->score,
     326          24 :       };
     327          24 :     }
     328          30 :   }
     329             : 
     330           0 :   return (fd_sspeer_t){
     331           0 :     .addr={ .l=0UL },
     332           0 :     .ssinfo={ .full={ .slot=ULONG_MAX }, .incremental={ .base_slot=ULONG_MAX, .slot=ULONG_MAX } },
     333           0 :     .score=ULONG_MAX,
     334           0 :   };
     335          24 : }
     336             : 
     337             : void
     338             : fd_sspeer_selector_process_cluster_slot( fd_sspeer_selector_t * selector,
     339             :                                          ulong                  full_slot,
     340           9 :                                          ulong                  incremental_slot ) {
     341           9 :   if( full_slot==ULONG_MAX && incremental_slot==ULONG_MAX ) return;
     342             : 
     343           9 :   FD_TEST( full_slot!=ULONG_MAX );
     344           9 :   if( FD_LIKELY( selector->incremental_snapshot_fetch ) ) {
     345             :     /* incremental slot is less than or equal to cluster incremental slot */
     346           9 :     if( FD_UNLIKELY( incremental_slot!=ULONG_MAX && selector->cluster_slot.incremental!=ULONG_MAX && incremental_slot<=selector->cluster_slot.incremental ) ) return;
     347             :     /* incremental slot is less than or equal to cluster full slot when cluster incremental slot does not exist */
     348           9 :     else if( FD_UNLIKELY( incremental_slot!=ULONG_MAX && selector->cluster_slot.incremental==ULONG_MAX && incremental_slot<=selector->cluster_slot.full ) )   return;
     349             :     /* full slot is less than cluster full slot when incremental slot does not exist */
     350           9 :     else if( FD_UNLIKELY( incremental_slot==ULONG_MAX && full_slot<=selector->cluster_slot.full ) )                                                           return;
     351           9 :   } else {
     352           0 :     if( FD_UNLIKELY( full_slot<=selector->cluster_slot.full ) ) return;
     353           0 :   }
     354             : 
     355           9 :   selector->cluster_slot.full        = full_slot;
     356           9 :   selector->cluster_slot.incremental = incremental_slot;
     357             : 
     358           9 :   if( FD_UNLIKELY( score_treap_ele_cnt( selector->score_treap )==0UL ) ) return;
     359             : 
     360             :   /* Rescore all peers
     361             :      TODO: make more performant, maybe make a treap rebalance API */
     362           6 :   ulong idx = 0UL;
     363           6 :   for( score_treap_fwd_iter_t iter = score_treap_fwd_iter_init( selector->score_treap, selector->pool );
     364          30 :         !score_treap_fwd_iter_done( iter );
     365          24 :         iter = score_treap_fwd_iter_next( iter, selector->pool ) ) {
     366          24 :     fd_sspeer_private_t const * peer  = score_treap_fwd_iter_ele_const( iter, selector->pool );
     367          24 :     fd_sspeer_private_t * shadow_peer = peer_pool_ele_acquire( selector->pool );
     368          24 :     shadow_peer->latency = peer->latency;
     369          24 :     shadow_peer->ssinfo  = peer->ssinfo;
     370          24 :     shadow_peer->addr    = peer->addr;
     371          24 :     shadow_peer->score   = fd_sspeer_selector_score( selector, shadow_peer->latency, &shadow_peer->ssinfo );
     372          24 :     score_treap_ele_insert( selector->shadow_score_treap, shadow_peer, selector->pool );
     373          24 :     selector->peer_idx_list[ idx++ ] = peer_pool_idx( selector->pool, peer );
     374          24 :     peer_map_ele_remove( selector->map, &peer->addr, NULL, selector->pool );
     375          24 :     peer_map_ele_insert( selector->map, shadow_peer, selector->pool );
     376          24 :   }
     377             : 
     378             :   /* clear score treap*/
     379          30 :   for( ulong i=0UL; i<idx; i++ ) {
     380          24 :     fd_sspeer_private_t * peer = peer_pool_ele( selector->pool, selector->peer_idx_list[ i ] );
     381          24 :     score_treap_ele_remove( selector->score_treap, peer, selector->pool );
     382          24 :     peer_pool_ele_release( selector->pool, peer );
     383          24 :   }
     384             : 
     385           6 :   score_treap_t * tmp          = selector->score_treap;
     386           6 :   selector->score_treap        = selector->shadow_score_treap;
     387           6 :   selector->shadow_score_treap = tmp;
     388             : 
     389             : #if FD_SSPEER_SELECTOR_DEBUG
     390             :   FD_TEST( score_treap_verify( selector->score_treap, selector->pool )==0 );
     391             : #endif
     392           6 : }
     393             : 
     394             : fd_sscluster_slot_t
     395           0 : fd_sspeer_selector_cluster_slot( fd_sspeer_selector_t * selector ) {
     396           0 :   return selector->cluster_slot;
     397           0 : }

Generated by: LCOV version 1.14