LCOV - code coverage report
Current view: top level - discof/repair - fd_policy.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 251 0.0 %
Date: 2025-12-07 04:58:33 Functions: 0 16 0.0 %

          Line data    Source code
       1             : #include "fd_policy.h"
       2             : #include "../../disco/metrics/fd_metrics.h"
       3             : 
       4             : #define NONCE_NULL        (UINT_MAX)
       5           0 : #define DEFER_REPAIR_MS   (200UL)
       6           0 : #define TARGET_TICK_PER_SLOT (64.0)
       7           0 : #define MS_PER_TICK          (400.0 / TARGET_TICK_PER_SLOT)
       8             : 
       9             : void *
      10           0 : fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {
      11             : 
      12           0 :   if( FD_UNLIKELY( !shmem ) ) {
      13           0 :     FD_LOG_WARNING(( "NULL mem" ));
      14           0 :     return NULL;
      15           0 :   }
      16             : 
      17           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
      18           0 :     FD_LOG_WARNING(( "misaligned mem" ));
      19           0 :     return NULL;
      20           0 :   }
      21             : 
      22           0 :   ulong footprint = fd_policy_footprint( dedup_max, peer_max );
      23           0 :   fd_memset( shmem, 0, footprint );
      24             : 
      25           0 :   int lg_peer_max = fd_ulong_find_msb( fd_ulong_pow2_up( peer_max ) );
      26           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
      27           0 :   fd_policy_t * policy     = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(),            sizeof(fd_policy_t)                           );
      28           0 :   void *        dedup_map  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(),  fd_policy_dedup_map_footprint ( dedup_max   ) );
      29           0 :   void *        dedup_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max   ) );
      30           0 :   void *        dedup_lru  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(),  fd_policy_dedup_lru_footprint()               );
      31           0 :   void *        peers      = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(),   fd_policy_peer_map_footprint  ( lg_peer_max ) );
      32           0 :   void *        peers_pool = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(),         fd_peer_pool_footprint        ( peer_max    ) );
      33           0 :   void *        peers_fast = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(),        fd_peer_dlist_footprint()                     );
      34           0 :   void *        peers_slow = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(),        fd_peer_dlist_footprint()                     );
      35           0 :   FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );
      36             : 
      37           0 :   policy->dedup.map     = fd_policy_dedup_map_new ( dedup_map,  dedup_max,   seed );
      38           0 :   policy->dedup.pool    = fd_policy_dedup_pool_new( dedup_pool, dedup_max         );
      39           0 :   policy->dedup.lru     = fd_policy_dedup_lru_new ( dedup_lru                     );
      40           0 :   policy->peers.map     = fd_policy_peer_map_new  ( peers,      lg_peer_max, seed );
      41           0 :   policy->peers.pool    = fd_peer_pool_new        ( peers_pool, peer_max          );
      42           0 :   policy->peers.fast    = fd_peer_dlist_new       ( peers_fast                    );
      43           0 :   policy->peers.slow    = fd_peer_dlist_new       ( peers_slow                    );
      44           0 :   policy->turbine_slot0 = ULONG_MAX;
      45           0 :   policy->nonce         = 1;
      46             : 
      47           0 :   return shmem;
      48           0 : }
      49             : 
      50             : fd_policy_t *
      51           0 : fd_policy_join( void * shpolicy ) {
      52           0 :   fd_policy_t * policy = (fd_policy_t *)shpolicy;
      53             : 
      54           0 :   if( FD_UNLIKELY( !policy ) ) {
      55           0 :     FD_LOG_WARNING(( "NULL policy" ));
      56           0 :     return NULL;
      57           0 :   }
      58             : 
      59           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
      60           0 :     FD_LOG_WARNING(( "misaligned policy" ));
      61           0 :     return NULL;
      62           0 :   }
      63             : 
      64           0 :   fd_wksp_t * wksp = fd_wksp_containing( policy );
      65           0 :   if( FD_UNLIKELY( !wksp ) ) {
      66           0 :     FD_LOG_WARNING(( "policy must be part of a workspace" ));
      67           0 :     return NULL;
      68           0 :   }
      69             : 
      70           0 :   policy->dedup.map  = fd_policy_dedup_map_join ( policy->dedup.map   );
      71           0 :   policy->dedup.pool = fd_policy_dedup_pool_join( policy->dedup.pool  );
      72           0 :   policy->dedup.lru  = fd_policy_dedup_lru_join ( policy->dedup.lru   );
      73           0 :   policy->peers.map  = fd_policy_peer_map_join  ( policy->peers.map   );
      74           0 :   policy->peers.pool = fd_peer_pool_join        ( policy->peers.pool  );
      75           0 :   policy->peers.fast = fd_peer_dlist_join       ( policy->peers.fast  );
      76           0 :   policy->peers.slow = fd_peer_dlist_join       ( policy->peers.slow );
      77             : 
      78           0 :   policy->peers.select.iter  = fd_peer_dlist_iter_fwd_init( policy->peers.slow, policy->peers.pool );
      79           0 :   policy->peers.select.stage = 0;
      80             : 
      81           0 :   return policy;
      82           0 : }
      83             : 
      84             : void *
      85           0 : fd_policy_leave( fd_policy_t const * policy ) {
      86             : 
      87           0 :   if( FD_UNLIKELY( !policy ) ) {
      88           0 :     FD_LOG_WARNING(( "NULL policy" ));
      89           0 :     return NULL;
      90           0 :   }
      91             : 
      92           0 :   return (void *)policy;
      93           0 : }
      94             : 
      95             : void *
      96           0 : fd_policy_delete( void * policy ) {
      97             : 
      98           0 :   if( FD_UNLIKELY( !policy ) ) {
      99           0 :     FD_LOG_WARNING(( "NULL policy" ));
     100           0 :     return NULL;
     101           0 :   }
     102             : 
     103           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
     104           0 :     FD_LOG_WARNING(( "misaligned policy" ));
     105           0 :     return NULL;
     106           0 :   }
     107             : 
     108           0 :   return policy;
     109           0 : }
     110             : 
     111             : /* dedup_evict evicts the first element returned by the map iterator. */
     112             : 
     113             : static void
     114           0 : dedup_evict( fd_policy_t * policy ) {
     115           0 :   fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
     116           0 :   fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
     117           0 :   fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
     118           0 : }
     119             : 
     120             : /* dedup_next returns 1 if key is deduped, 0 otherwise. */
     121             : static int
     122           0 : dedup_next( fd_policy_t * policy, ulong key, long now ) {
     123           0 :   fd_policy_dedup_t *     dedup = &policy->dedup;
     124           0 :   fd_policy_dedup_ele_t * ele   = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
     125           0 :   if( FD_UNLIKELY( !ele ) ) {
     126           0 :     if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
     127           0 :     ele         = fd_policy_dedup_pool_ele_acquire( dedup->pool );
     128           0 :     ele->key    = key;
     129           0 :     ele->req_ts = 0;
     130           0 :     fd_policy_dedup_map_ele_insert   ( dedup->map, ele, dedup->pool );
     131           0 :     fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
     132           0 :   }
     133           0 :   if( FD_LIKELY( now < ele->req_ts + (long)FD_POLICY_DEDUP_TIMEOUT ) ) {
     134           0 :     return 1;
     135           0 :   }
     136           0 :   ele->req_ts = now;
     137           0 :   return 0;
     138           0 : }
     139             : 
     140           0 : static ulong ts_ms( long wallclock ) {
     141           0 :   return (ulong)wallclock / (ulong)1e6;
     142           0 : }
     143             : 
     144             : static int
     145           0 : passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
     146           0 :   if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
     147             :   /* Essentially is checking if current duration of block ( from the
     148             :      first shred received until now ) is greater than the highest tick
     149             :      received + 200ms. */
     150           0 :   double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL);
     151           0 :   double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; // change to 400e6 for a slot duration policy
     152             : 
     153           0 :   if( current_duration >= tick_plus_buffer ){
     154           0 :     FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
     155           0 :     return 1;
     156           0 :   }
     157           0 :   return 0;
     158           0 : }
     159             : 
     160             : fd_pubkey_t const *
     161           0 : fd_policy_peer_select( fd_policy_t * policy ) {
     162           0 :   fd_peer_dlist_t * best_dlist  = policy->peers.fast;
     163           0 :   fd_peer_dlist_t * worst_dlist = policy->peers.slow;
     164           0 :   fd_peer_t       * pool        = policy->peers.pool;
     165             : 
     166           0 :   if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;
     167             : 
     168           0 :   fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
     169             : 
     170           0 :   while( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.select.iter, dlist, pool ) ) ) {
     171           0 :     policy->peers.select.stage = (policy->peers.select.stage + 1) % (sizeof(bucket_stages) / sizeof(uint));
     172           0 :     dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? best_dlist : worst_dlist;
     173           0 :     policy->peers.select.iter = fd_peer_dlist_iter_fwd_init( dlist, pool );
     174           0 :   }
     175           0 :   fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.select.iter, dlist, pool );
     176           0 :   policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, pool );
     177           0 :   return &select->identity;
     178           0 : }
     179             : 
/* fd_policy_next drives the repair policy one step: it emits at most
   one repair message (orphan, highest-shred, or shred request) and
   returns it, or returns NULL if nothing is ready.  now is a wallclock
   in ns; highest_known_slot bounds highest-shred requests; *charge_busy
   is set to 1 iff meaningful work was done this call.  Each emitted
   message consumes one nonce. */
fd_repair_msg_t const *
fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot, int * charge_busy ) {
  fd_forest_blk_t *      pool     = fd_forest_pool( forest );
  fd_forest_subtlist_t * subtlist = fd_forest_subtlist( forest );
  *charge_busy = 0;

  /* Nothing to do without a rooted forest or without any repair peer. */
  if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;
  if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;

  fd_repair_msg_t * out = NULL;
  ulong now_ms = ts_ms( now );

  /* First priority: orphan requests for disconnected subtrees, subject
     to the dedup timeout.  Emit at most one and return immediately. */
  for( fd_forest_subtlist_iter_t iter = fd_forest_subtlist_iter_fwd_init( subtlist, pool );
                                       !fd_forest_subtlist_iter_done    ( iter, subtlist, pool );
                                 iter = fd_forest_subtlist_iter_fwd_next( iter, subtlist, pool ) ) {
    *charge_busy = 1;
    fd_forest_blk_t * orphan = fd_forest_subtlist_iter_ele( iter, subtlist, pool );
    ulong key                = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
    if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
      out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, orphan->slot );
      policy->nonce++;
      return out;
    }
  }

  /* Select a slot to operate on 🔪. Advance either the orphan iter or
     regular iter. */
  fd_forest_iter_t * iter = NULL;
  if( FD_UNLIKELY( fd_forest_reqslist_is_empty( fd_forest_reqslist( forest ), fd_forest_reqspool( forest ) ) ) ) {
    /* If the main tree has nothing to iterate at the moment, we can
       request down the ORPHAN trees on slots we know about. */
    iter = &forest->orphiter;
  } else {
    iter = &forest->iter;
  }

  fd_forest_iter_next( iter, forest );
  if( FD_UNLIKELY( fd_forest_iter_done( iter, forest ) ) ) {
    // This happens when we have already requested all the shreds we know about.
    return NULL;
  }

  fd_forest_blk_t * ele = fd_forest_pool_ele( pool, iter->ele_idx );
  if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
    /* When we are at the head of the turbine, we should give turbine the
       chance to complete the shreds.  Agave waits 200ms from the
       estimated "correct time" of the highest shred received to repair.
       i.e. if we've received the first 200 shreds, the 200th has a tick
       of x. Translate that to millis, and we should wait to request shred
       201 until x + 200ms.  If we have a hole, i.e. first 200 shreds
       receive except shred 100, and the 101th shred has a tick of y, we
       should wait until y + 200ms to request shred 100.

       Here we did not pass the timeout threshold, so we are not ready
       to repair this slot yet.  But it's possible we have another fork
       that we need to repair... so we just should skip to the next SLOT
       in the main tree iterator.  The likelihood that this ele is the
       head of turbine is high, which means that the shred_idx of the
       iterf is likely to be UINT_MAX, which means calling
       fd_forest_iter_next will advance the iterf to the next slot. */
    iter->shred_idx = UINT_MAX;
    /* TODO: Heinous... but the easiest way to ensure this slot gets
       added back to the requests deque is if we set the shred_idx to
       UINT_MAX, but maybe there should be an explicit API for it. */

    return NULL;
  }

  *charge_busy = 1;

  /* shred_idx == UINT_MAX means the iterator is positioned on a slot
     (not a specific shred): ask for the slot's highest shred instead,
     but only for slots strictly below the highest known slot. */
  if( FD_UNLIKELY( iter->shred_idx == UINT_MAX ) ) {
    if( FD_UNLIKELY( ele->slot < highest_known_slot ) ) {
      // We'll never know the the highest shred for the current turbine slot, so there's no point in requesting it.
      out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
      policy->nonce++;
    }
  } else {
    /* Request a specific missing shred; remember when this block was
       first requested for latency accounting. */
    out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, iter->shred_idx );
    policy->nonce++;
    if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
  }
  return out;
}
     263             : 
     264             : fd_policy_peer_t const *
     265           0 : fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
     266           0 :   fd_policy_peer_t * peer_map = policy->peers.map;
     267           0 :   fd_policy_peer_t * peer = fd_policy_peer_map_query( peer_map, *key, NULL );
     268           0 :   if( FD_UNLIKELY( !peer && fd_policy_peer_map_key_cnt( peer_map ) < fd_policy_peer_map_key_max( peer_map ) ) ) {
     269           0 :     peer = fd_policy_peer_map_insert( policy->peers.map, *key );
     270           0 :     peer->key  = *key;
     271           0 :     peer->ip4  = addr->addr;
     272           0 :     peer->port = addr->port;
     273           0 :     peer->req_cnt       = 0;
     274           0 :     peer->res_cnt       = 0;
     275           0 :     peer->first_req_ts  = 0;
     276           0 :     peer->last_req_ts   = 0;
     277           0 :     peer->first_resp_ts = 0;
     278           0 :     peer->last_resp_ts  = 0;
     279           0 :     peer->total_lat     = 0;
     280           0 :     peer->stake         = 0;
     281             : 
     282           0 :     fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool );
     283           0 :     peer->pool_idx       = fd_peer_pool_idx( policy->peers.pool, peer_ele );
     284           0 :     peer_ele->identity   = *key;
     285           0 :     fd_peer_dlist_ele_push_tail( policy->peers.slow, peer_ele, policy->peers.pool );
     286           0 :     return peer;
     287           0 :   }
     288           0 :   return NULL;
     289           0 : }
     290             : 
     291             : fd_policy_peer_t *
     292           0 : fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
     293           0 :   if( FD_UNLIKELY( memcmp( key->key, null_pubkey.key, 32UL ) == 0 ) ) {
     294           0 :     FD_LOG_WARNING(( "Repair policy peer with null pubkey." ));
     295           0 :     return NULL;
     296           0 :   };
     297           0 :   return fd_policy_peer_map_query( policy->peers.map, *key, NULL );
     298           0 : }
     299             : 
/* fd_policy_peer_remove untracks the peer identified by key, removing
   it from its latency bucket, the element pool, and the peer map.
   Returns 1 if a peer was removed, 0 if key was not tracked. */
int
fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
  fd_policy_peer_t * peer = fd_policy_peer_map_query( policy->peers.map, *key, NULL );
  if( FD_UNLIKELY( !peer ) ) return 0;
  fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );

  if( FD_UNLIKELY( policy->peers.select.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
    /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
    /* Step the selection cursor off the victim before unlinking it.
       The current dlist is derived from the selection stage, mirroring
       fd_policy_peer_select. */
    fd_peer_dlist_t * dlist = bucket_stages[policy->peers.select.stage] == FD_POLICY_LATENCY_FAST ? policy->peers.fast : policy->peers.slow;
    policy->peers.select.iter = fd_peer_dlist_iter_fwd_next( policy->peers.select.iter, dlist, policy->peers.pool );
  }

  /* The peer's current bucket is a function of its latency stats. */
  fd_peer_dlist_t * bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
  fd_peer_dlist_ele_remove( bucket, peer_ele, policy->peers.pool );
  fd_peer_pool_ele_release( policy->peers.pool, peer_ele );

  fd_policy_peer_map_remove( policy->peers.map, peer );
  return 1;
}
     319             : 
     320             : void
     321           0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
     322           0 :   fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
     323           0 :   if( FD_LIKELY( active ) ) {
     324           0 :     active->req_cnt++;
     325           0 :     active->last_req_ts = fd_tickcount();
     326           0 :     if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
     327           0 :   }
     328           0 : }
     329             : 
     330             : void
     331           0 : fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt /* ns */ ) {
     332           0 :   fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
     333           0 :   if( FD_LIKELY( peer ) ) {
     334           0 :     long now = fd_tickcount();
     335           0 :     fd_peer_dlist_t * prev_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt );
     336           0 :     peer->res_cnt++;
     337           0 :     if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
     338           0 :     peer->last_resp_ts = now;
     339           0 :     peer->total_lat   += rtt;
     340           0 :     fd_peer_dlist_t * new_bucket = fd_policy_peer_latency_bucket( policy, peer->total_lat, peer->res_cnt  );
     341             : 
     342           0 :     if( prev_bucket != new_bucket ) {
     343           0 :       fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
     344           0 :       fd_peer_dlist_ele_remove   ( prev_bucket, peer_ele, policy->peers.pool );
     345           0 :       fd_peer_dlist_ele_push_tail( new_bucket,  peer_ele, policy->peers.pool );
     346           0 :     }
     347           0 :   }
     348           0 : }
     349             : 
     350             : void
     351           0 : fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
     352           0 :   policy->turbine_slot0 = slot;
     353           0 : }
     354             : 

Generated by: LCOV version 1.14