LCOV - code coverage report
Current view: top level - flamenco/gossip/crds - fd_crds_peer_samplers.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 92 146 63.0 %
Date: 2025-09-19 04:41:14 Functions: 9 13 69.2 %

          Line data    Source code
       1             : /* Owned by CRDS table, tightly coupled with contact info side table.
       2             : 
       3             :   The gossip table (currently) needs 25 (active set rotation)
       4             :   + 1 (tx pull req) stake-weighted peer samplers.
       5             : 
       6             :   Each sampler needs a different weight scoring implementation. This
       7             :   compile unit:
       8             :     - defines weight scoring implementations for each sampler.
       9             :     - defines functions to insert, remove, and update all samplers in
      10             :       one go
      11             : 
      12             :   The sample sets are designed to closely track modifications to the
      13             :   CRDS contact info table. */
      14             : 
      15             : #include "fd_crds.h"
      16             : #include "../fd_active_set_private.h"
      17             : 
      18         150 : #define SAMPLE_IDX_SENTINEL ULONG_MAX
      19             : 
      20             : #define SET_NAME peer_enabled
      21          78 : #define SET_MAX  CRDS_MAX_CONTACT_INFO
      22             : #include "../../../util/tmpl/fd_set.c"
      23             : 
      24             : /* This is a very rudimentary implementation of a weighted sampler. We will
      25             :    eventually want to use a modified fd_wsample. Using this until then. */
      26             : struct wpeer_sampler {
      27             :   /* Cumulative weight for each peer.
      28             :      Individual peer weight can be derived with cumul_weight[i]-cumul_weight[i-1]  */
      29             :   ulong          cumul_weight[ CRDS_MAX_CONTACT_INFO ];
      30             : 
      31             :   /* peer_enabled_test( peer_enabled, i ) determines if peer i is enabled. A
      32             :      disabled peer should not be scored (e.g., during an update) and should
      33             :      have a peer weight of 0 (i.e,. cumul_weight[i] - cumul_weight[i-1]==0).
      34             : 
      35             :      Why not just remove the peer?
      36             :       Adding/removing should strictly track the CRDS Contact Info
      37             :       table. That is to say a peer must have an entry in the sampler
      38             :       if it has an entry in the table, and vice versa. This simplifies
      39             :       the crds_samplers management logic which ties the peer to the
      40             :       same index across all samplers. A single sampler can "disable" a
      41             :       peer without affecting the other samplers.
      42             : 
      43             :      Why not just set the peer's weight to 0?
      44             :       0 is a legitimate score for a peer (wpeer_sampler_peer_score).
      45             :       A futre upsert into the CRDS table might trigger a re-score, which
      46             :       might inadvertently re-enable the peer in the sampler. We  want to
      47             :       distinguish peers that are "removed" from the sampler from peers
      48             :       that have a score of 0 based on the parameters provided. */
      49             :   peer_enabled_t peer_enabled[ peer_enabled_word_cnt ];
      50             : };
      51             : 
      52             : typedef struct wpeer_sampler wpeer_sampler_t;
      53             : 
      54             : 
      55             : struct crds_samplers {
      56             :   wpeer_sampler_t   pr_sampler[1];
      57             :   wpeer_sampler_t   bucket_samplers[25];
      58             :   fd_crds_entry_t * ele[ CRDS_MAX_CONTACT_INFO ];
      59             :   ulong             ele_cnt;
      60             : };
      61             : 
      62             : typedef struct crds_samplers crds_samplers_t;
      63             : 
      64             : #define PREV_PEER_WEIGHT( ps, idx ) \
      65        3903 :   ( (idx) ? (ps)->cumul_weight[(idx)-1] : 0UL )
      66             : int
      67          78 : wpeer_sampler_init( wpeer_sampler_t * ps ) {
      68          78 :   if( FD_UNLIKELY( !ps ) ) return -1;
      69     2555982 :   for( ulong i = 0UL; i < CRDS_MAX_CONTACT_INFO; i++ ) {
      70     2555904 :     ps->cumul_weight[i] = 0UL;
      71     2555904 :   }
      72             :   /* All peers are "enabled" by default as they get added. */
      73          78 :   peer_enabled_full( ps->peer_enabled );
      74          78 :   return 0;
      75          78 : }
      76             : 
      77             : ulong
      78             : wpeer_sampler_sample( wpeer_sampler_t const * ps,
      79             :                       fd_rng_t *              rng,
      80           3 :                       ulong                   ele_cnt ) {
      81           3 :   if( FD_UNLIKELY( !ele_cnt || !ps->cumul_weight[ele_cnt-1] ) ) {
      82           0 :     return SAMPLE_IDX_SENTINEL;
      83           0 :   }
      84             : 
      85           3 :   ulong sample = fd_rng_ulong_roll( rng, ps->cumul_weight[ele_cnt-1] );
      86             :   /* avoid sampling 0 */
      87           3 :   sample = fd_ulong_min( sample+1UL, ps->cumul_weight[ele_cnt-1] );
      88             : 
      89             :   /* Binary search for the smallest cumulative weight >= sample */
      90           3 :   ulong left = 0UL;
      91           3 :   ulong right = ele_cnt;
      92          18 :   while( left < right ) {
      93          15 :     ulong mid = left + (right - left) / 2UL;
      94          15 :     if( ps->cumul_weight[mid]<sample ) {
      95          12 :       left = mid + 1UL;
      96          12 :     } else {
      97           3 :       right = mid;
      98           3 :     }
      99          15 :   }
     100           3 :   return left;
     101           3 : }
     102             : 
     103             : int
     104             : wpeer_sampler_upd( wpeer_sampler_t * ps,
     105             :                    ulong             weight,
     106             :                    ulong             idx,
     107        3903 :                    ulong             ele_cnt ) {
     108        3903 :   if( FD_UNLIKELY( !ps ) ) return -1;
     109             : 
     110             :   /* Disabled peers should not be scored */
     111        3903 :   weight *= (ulong)peer_enabled_test( ps->peer_enabled, idx );
     112             : 
     113        3903 :   ulong old_weight = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx );
     114        3903 :   if( FD_UNLIKELY( old_weight==weight ) ) return 0;
     115             : 
     116        3903 :   if( weight>old_weight ) {
     117         156 :     for( ulong i=idx; i<ele_cnt; i++ ) {
     118          78 :       ps->cumul_weight[i] += (weight - old_weight);
     119          78 :     }
     120        3825 :   } else {
     121        7722 :     for( ulong i=idx; i<ele_cnt; i++ ) {
     122        3897 :       ps->cumul_weight[i] -= (old_weight - weight);
     123        3897 :     }
     124        3825 :   }
     125        3903 :   return 0;
     126        3903 : }
     127             : 
     128             : int
     129             : wpeer_sampler_disable( wpeer_sampler_t * ps,
     130             :                        ulong             idx,
     131           3 :                        ulong             ele_cnt ) {
     132           3 :   if( FD_UNLIKELY( !ps || idx>=ele_cnt ) ) return -1;
     133             : 
     134             :   /* Set the peer weight to zero */
     135           3 :   if( FD_UNLIKELY( wpeer_sampler_upd( ps, 0UL, idx, ele_cnt )<0 ) ) return -1;
     136             : 
     137             :   /* Disable the peer in the enabled set */
     138           3 :   peer_enabled_remove( ps->peer_enabled, idx );
     139           3 :   return 0;
     140           3 : }
     141             : 
     142             : int
     143             : wpeer_sampler_enable( wpeer_sampler_t * ps,
     144             :                       ulong             idx,
     145           0 :                       ulong             ele_cnt ) {
     146           0 :   if( FD_UNLIKELY( !ps || idx>=ele_cnt ) ) return -1;
     147           0 :   peer_enabled_insert( ps->peer_enabled, idx );
     148           0 :   return 0;
     149           0 : }
     150             : 
     151             : /* NOTE: this should only be called if the peer is dropped from the
     152             :          Contact Info table. Use wpeer_sampler_disable otherwise */
     153             : int
     154             : wpeer_sampler_rem( wpeer_sampler_t * ps,
     155             :                    ulong             idx,
     156           0 :                    ulong             ele_cnt ) {
     157           0 :   ulong score = ps->cumul_weight[idx] - PREV_PEER_WEIGHT( ps, idx );
     158             : 
     159           0 :   for( ulong i = idx+1; i < ele_cnt; i++ ) {
     160           0 :     ps->cumul_weight[i]  -= score;
     161           0 :     ps->cumul_weight[i-1] = ps->cumul_weight[i];
     162             : 
     163           0 :     peer_enabled_insert_if( ps->peer_enabled,  peer_enabled_test( ps->peer_enabled, i ), i-1UL );
     164           0 :     peer_enabled_remove_if( ps->peer_enabled, !peer_enabled_test( ps->peer_enabled, i ), i-1UL );
     165           0 :   }
     166             : 
     167           0 :   peer_enabled_insert( ps->peer_enabled, ele_cnt-1UL );
     168           0 :   return 0;
     169           0 : }
     170             : 
     171         150 : #define BASE_WEIGHT 100UL /* TODO: figure this out!! */
     172             : ulong
     173             : wpeer_sampler_peer_score( fd_crds_entry_t * peer,
     174         150 :                           long              now ) {
     175         150 :   if( FD_UNLIKELY( !peer->contact_info.is_active ) ) return 0;
     176         150 :   ulong score  = BASE_WEIGHT;
     177         150 :         score += peer->stake;
     178         150 :   if( FD_UNLIKELY( peer->wallclock_nanos<now-60*1000L*1000L*1000L ) ) score/=100;
     179             : 
     180         150 :   return score;
     181         150 : }
     182             : 
     183             : ulong
     184             : wpeer_sampler_bucket_score( fd_crds_entry_t * peer,
     185        3750 :                             ulong             bucket ) {
     186        3750 :   ulong peer_bucket = fd_active_set_stake_bucket( peer->stake );
     187        3750 :   ulong score       = fd_ulong_sat_add( fd_ulong_min( bucket, peer_bucket ), 1UL );
     188             : 
     189        3750 :   return score*score;
     190        3750 : }
     191             : 
     192             : 
     193             : 
     194             : void
     195           3 : crds_samplers_new( crds_samplers_t * ps ) {
     196           3 :   if( FD_UNLIKELY( !ps ) ) return;
     197             : 
     198           3 :   wpeer_sampler_init( ps->pr_sampler );
     199          78 :   for( ulong i=0UL; i<25UL; i++ ) {
     200          75 :     wpeer_sampler_init( &ps->bucket_samplers[i] );
     201          75 :   }
     202           3 :   ps->ele_cnt = 0UL;
     203       98307 :   for( ulong i=0UL; i<CRDS_MAX_CONTACT_INFO; i++ ) {
     204       98304 :     ps->ele[i] = NULL;
     205       98304 :   }
     206           3 : }
     207             : 
     208             : int
     209             : crds_samplers_upd_peer_at_idx( crds_samplers_t * ps,
     210             :                                fd_crds_entry_t * peer,
     211             :                                ulong             idx,
     212         150 :                                long              now ) {
     213         150 :   if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) {
     214           0 :     FD_LOG_WARNING(( "Bad peer idx supplied in sample update" ));
     215           0 :     return -1;
     216           0 :   }
     217         150 :   ps->ele[idx] = peer;
     218         150 :   peer->contact_info.sampler_idx = idx;
     219         150 :   ulong peer_score = wpeer_sampler_peer_score( peer, now );
     220         150 :   if( FD_UNLIKELY( wpeer_sampler_upd( ps->pr_sampler, peer_score, idx, ps->ele_cnt )<0 ) ) return -1;
     221             : 
     222        3900 :   for( ulong i=0UL; i<25UL; i++ ) {
     223        3750 :     ulong bucket_score = wpeer_sampler_bucket_score( peer, i );
     224        3750 :     if( FD_UNLIKELY( !bucket_score ) ) FD_LOG_ERR(( "0-weighted peer in bucket, should not be possible" ));
     225        3750 :     if( FD_UNLIKELY( wpeer_sampler_upd( &ps->bucket_samplers[i], bucket_score, idx, ps->ele_cnt )<0 ) ) return -1;
     226        3750 :   }
     227             : 
     228         150 :   return 0;
     229         150 : }
     230             : 
     231             : int
     232             : crds_samplers_swap_peer_at_idx( crds_samplers_t *  ps,
     233             :                                 fd_crds_entry_t *  new_peer,
     234           0 :                                 ulong              idx ) {
     235           0 :   if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) {
     236           0 :     FD_LOG_WARNING(( "Bad peer idx supplied in sample update" ));
     237           0 :     return -1;
     238           0 :   }
     239           0 :   fd_crds_entry_t * old_peer = ps->ele[idx];
     240           0 :   if( FD_UNLIKELY( !old_peer ) ) {
     241           0 :     FD_LOG_ERR(( "No peer at index %lu in samplers" , idx ));
     242           0 :   }
     243             : 
     244           0 :   ps->ele[idx]                       = new_peer;
     245           0 :   new_peer->contact_info.sampler_idx = idx;
     246           0 :   old_peer->contact_info.sampler_idx = SAMPLE_IDX_SENTINEL;
     247           0 :   return 0;
     248           0 : }
     249             : 
     250             : int
     251             : crds_samplers_add_peer( crds_samplers_t * ps,
     252             :                         fd_crds_entry_t * peer,
     253         150 :                         long              now ) {
     254         150 :   ulong idx = fd_ulong_min( ps->ele_cnt, (CRDS_MAX_CONTACT_INFO)-1UL );
     255         150 :   ps->ele_cnt++;
     256         150 :   if( FD_UNLIKELY( !!crds_samplers_upd_peer_at_idx( ps, peer, idx, now ) ) ){
     257           0 :     FD_LOG_WARNING(( "Failed to update peer in samplers" ));
     258           0 :     ps->ele_cnt--;
     259           0 :     ps->ele[idx] = NULL;
     260           0 :     return -1;
     261           0 :   }
     262         150 :   return 0;
     263         150 : }
     264             : 
     265             : int
     266             : crds_samplers_rem_peer( crds_samplers_t * ps,
     267           0 :                         fd_crds_entry_t * peer ) {
     268           0 :   ulong idx = peer->contact_info.sampler_idx;
     269           0 :   if( FD_UNLIKELY( idx>=ps->ele_cnt ) ) return -1;
     270           0 :   if( FD_UNLIKELY( wpeer_sampler_rem( ps->pr_sampler, idx, ps->ele_cnt )<0 ) ) return -1;
     271           0 :   for( ulong i=0UL; i<25UL; i++ ) {
     272           0 :     if( FD_UNLIKELY( wpeer_sampler_rem( &ps->bucket_samplers[i], idx, ps->ele_cnt )<0 ) ) return -1;
     273           0 :   }
     274             : 
     275             :   // Shift the elements down in elems array
     276           0 :   for( ulong i = idx+1; i < ps->ele_cnt; i++ ) {
     277           0 :     ps->ele[i-1]                           = ps->ele[i];
     278           0 :     ps->ele[i-1]->contact_info.sampler_idx = i-1;
     279           0 :   }
     280           0 :   ps->ele_cnt--;
     281           0 :   return 0;
     282           0 : }

Generated by: LCOV version 1.14