LCOV - code coverage report
Current view: top level - discof/repair - fd_policy.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 252 0.0 %
Date: 2025-10-14 04:31:44 Functions: 0 17 0.0 %

          Line data    Source code
       1             : #include "fd_policy.h"
       2             : #include "../../disco/metrics/fd_metrics.h"
       3             : 
       4             : #define NONCE_NULL        (UINT_MAX)
       5           0 : #define DEFER_REPAIR_MS   (200UL)
       6           0 : #define TARGET_TICK_PER_SLOT (64.0)
       7           0 : #define MS_PER_TICK          (400.0 / TARGET_TICK_PER_SLOT)
       8             : 
       9             : void *
      10           0 : fd_policy_new( void * shmem, ulong dedup_max, ulong peer_max, ulong seed ) {
      11             : 
      12           0 :   if( FD_UNLIKELY( !shmem ) ) {
      13           0 :     FD_LOG_WARNING(( "NULL mem" ));
      14           0 :     return NULL;
      15           0 :   }
      16             : 
      17           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_policy_align() ) ) ) {
      18           0 :     FD_LOG_WARNING(( "misaligned mem" ));
      19           0 :     return NULL;
      20           0 :   }
      21             : 
      22           0 :   ulong footprint = fd_policy_footprint( dedup_max, peer_max );
      23           0 :   fd_memset( shmem, 0, footprint );
      24             : 
      25           0 :   int lg_peer_max = fd_ulong_find_msb( fd_ulong_pow2_up( peer_max ) );
      26           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
      27           0 :   fd_policy_t * policy      = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(),            sizeof(fd_policy_t)                           );
      28           0 :   void *        dedup_map   = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_map_align(),  fd_policy_dedup_map_footprint ( dedup_max   ) );
      29           0 :   void *        dedup_pool  = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_pool_align(), fd_policy_dedup_pool_footprint( dedup_max   ) );
      30           0 :   void *        dedup_lru   = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_dedup_lru_align(),  fd_policy_dedup_lru_footprint (             ) );
      31           0 :   void *        peers       = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_peer_map_align(),   fd_policy_peer_map_footprint  ( lg_peer_max ) );
      32           0 :   void *        peers_pool  = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_pool_align(),         fd_peer_pool_footprint        ( peer_max    ) );
      33           0 :   void *        peers_dlist = FD_SCRATCH_ALLOC_APPEND( l, fd_peer_dlist_align(),        fd_peer_dlist_footprint       (             ) );
      34           0 :   FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_policy_align() ) == (ulong)shmem + footprint );
      35             : 
      36           0 :   policy->dedup.map     = fd_policy_dedup_map_new ( dedup_map,  dedup_max, seed );
      37           0 :   policy->dedup.pool    = fd_policy_dedup_pool_new( dedup_pool, dedup_max       );
      38           0 :   policy->dedup.lru     = fd_policy_dedup_lru_new ( dedup_lru                   );
      39           0 :   policy->peers.map     = fd_policy_peer_map_new  ( peers,      lg_peer_max     );
      40           0 :   policy->peers.pool    = fd_peer_pool_new        ( peers_pool, peer_max        );
      41           0 :   policy->peers.dlist   = fd_peer_dlist_new       ( peers_dlist                 );
      42           0 :   policy->iterf.ele_idx = ULONG_MAX;
      43           0 :   policy->turbine_slot0 = ULONG_MAX;
      44           0 :   policy->tsreset       = 0;
      45           0 :   policy->nonce         = 1;
      46             : 
      47           0 :   return shmem;
      48           0 : }
      49             : 
      50             : fd_policy_t *
      51           0 : fd_policy_join( void * shpolicy ) {
      52           0 :   fd_policy_t * policy = (fd_policy_t *)shpolicy;
      53             : 
      54           0 :   if( FD_UNLIKELY( !policy ) ) {
      55           0 :     FD_LOG_WARNING(( "NULL policy" ));
      56           0 :     return NULL;
      57           0 :   }
      58             : 
      59           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
      60           0 :     FD_LOG_WARNING(( "misaligned policy" ));
      61           0 :     return NULL;
      62           0 :   }
      63             : 
      64           0 :   fd_wksp_t * wksp = fd_wksp_containing( policy );
      65           0 :   if( FD_UNLIKELY( !wksp ) ) {
      66           0 :     FD_LOG_WARNING(( "policy must be part of a workspace" ));
      67           0 :     return NULL;
      68           0 :   }
      69             : 
      70           0 :   policy->dedup.map   = fd_policy_dedup_map_join ( policy->dedup.map   );
      71           0 :   policy->dedup.pool  = fd_policy_dedup_pool_join( policy->dedup.pool  );
      72           0 :   policy->dedup.lru   = fd_policy_dedup_lru_join ( policy->dedup.lru   );
      73           0 :   policy->peers.map   = fd_policy_peer_map_join  ( policy->peers.map   );
      74           0 :   policy->peers.pool  = fd_peer_pool_join        ( policy->peers.pool  );
      75           0 :   policy->peers.dlist = fd_peer_dlist_join       ( policy->peers.dlist );
      76           0 :   policy->peers.iter  = fd_peer_dlist_iter_fwd_init( policy->peers.dlist, policy->peers.pool );
      77             : 
      78           0 :   return policy;
      79           0 : }
      80             : 
      81             : void *
      82           0 : fd_policy_leave( fd_policy_t const * policy ) {
      83             : 
      84           0 :   if( FD_UNLIKELY( !policy ) ) {
      85           0 :     FD_LOG_WARNING(( "NULL policy" ));
      86           0 :     return NULL;
      87           0 :   }
      88             : 
      89           0 :   return (void *)policy;
      90           0 : }
      91             : 
      92             : void *
      93           0 : fd_policy_delete( void * policy ) {
      94             : 
      95           0 :   if( FD_UNLIKELY( !policy ) ) {
      96           0 :     FD_LOG_WARNING(( "NULL policy" ));
      97           0 :     return NULL;
      98           0 :   }
      99             : 
     100           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned((ulong)policy, fd_policy_align() ) ) ) {
     101           0 :     FD_LOG_WARNING(( "misaligned policy" ));
     102           0 :     return NULL;
     103           0 :   }
     104             : 
     105           0 :   return policy;
     106           0 : }
     107             : 
     108             : /* dedup_evict evicts the first element returned by the map iterator. */
     109             : 
     110             : static void
     111           0 : dedup_evict( fd_policy_t * policy ) {
     112           0 :   fd_policy_dedup_ele_t * ele = fd_policy_dedup_lru_ele_pop_head( policy->dedup.lru, policy->dedup.pool );
     113           0 :   fd_policy_dedup_map_ele_remove( policy->dedup.map, &ele->key, NULL, policy->dedup.pool );
     114           0 :   fd_policy_dedup_pool_ele_release( policy->dedup.pool, ele );
     115           0 : }
     116             : 
     117             : /* dedup_next returns 1 if key is deduped, 0 otherwise. */
     118             : static int
     119           0 : dedup_next( fd_policy_t * policy, ulong key, long now ) {
     120           0 :   fd_policy_dedup_t *     dedup = &policy->dedup;
     121           0 :   fd_policy_dedup_ele_t * ele   = fd_policy_dedup_map_ele_query( dedup->map, &key, NULL, dedup->pool );
     122           0 :   if( FD_UNLIKELY( !ele ) ) {
     123           0 :     if( FD_UNLIKELY( !fd_policy_dedup_pool_free( dedup->pool ) ) ) dedup_evict( policy );
     124           0 :     ele         = fd_policy_dedup_pool_ele_acquire( dedup->pool );
     125           0 :     ele->key    = key;
     126           0 :     ele->req_ts = 0;
     127           0 :     fd_policy_dedup_map_ele_insert   ( dedup->map, ele, dedup->pool );
     128           0 :     fd_policy_dedup_lru_ele_push_tail( dedup->lru, ele, dedup->pool );
     129           0 :   }
     130           0 :   if( FD_LIKELY( now < ele->req_ts + (long)80e6 ) ) {
     131           0 :     return 1;
     132           0 :   }
     133           0 :   ele->req_ts = now;
     134           0 :   return 0;
     135           0 : }
     136             : 
     137           0 : static ulong ts_ms( long wallclock ) {
     138           0 :   return (ulong)wallclock / (ulong)1e6;
     139           0 : }
     140             : 
     141             : static int
     142           0 : passes_throttle_threshold( fd_policy_t * policy, fd_forest_blk_t * ele ) {
     143           0 :   if( FD_UNLIKELY( ele->slot < policy->turbine_slot0 ) ) return 1;
     144             :   /* Essentially is checking if current duration of block ( from the
     145             :      first shred received until now ) is greater than the highest tick
     146             :      received + 200ms. */
     147           0 :   double current_duration = (double)(fd_tickcount() - ele->first_shred_ts) / fd_tempo_tick_per_ns(NULL);
     148           0 :   double tick_plus_buffer = (ele->est_buffered_tick_recv * MS_PER_TICK + DEFER_REPAIR_MS) * 1e6; // change to 400e6 for a slot duration policy
     149             : 
     150           0 :   if( current_duration >= tick_plus_buffer ){
     151           0 :     FD_MCNT_INC( REPAIR, EAGER_REPAIR_AGGRESSES, 1 );
     152           0 :     return 1;
     153           0 :   }
     154           0 :   return 0;
     155           0 : }
     156             : 
     157             : fd_pubkey_t const *
     158           0 : fd_policy_peer_select( fd_policy_t * policy ) {
     159           0 :   fd_peer_dlist_t * dlist = policy->peers.dlist;
     160           0 :   fd_peer_t       * pool  = policy->peers.pool;
     161           0 :   if( FD_UNLIKELY( fd_peer_dlist_iter_done( policy->peers.iter, dlist, pool ) ) ) {
     162           0 :     policy->peers.iter = fd_peer_dlist_iter_fwd_init( dlist, pool );
     163           0 :   }
     164           0 :   fd_peer_t * select = fd_peer_dlist_iter_ele( policy->peers.iter, dlist, pool );
     165           0 :   policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, dlist, pool );
     166           0 :   return &select->identity;
     167           0 : }
     168             : 
     169             : fd_repair_msg_t const *
     170           0 : fd_policy_next( fd_policy_t * policy, fd_forest_t * forest, fd_repair_t * repair, long now, ulong highest_known_slot ) {
     171           0 :   fd_forest_blk_t *      pool     = fd_forest_pool( forest );
     172           0 :   fd_forest_subtrees_t * subtrees = fd_forest_subtrees( forest );
     173             : 
     174           0 :   if( FD_UNLIKELY( forest->root == ULONG_MAX ) ) return NULL;
     175           0 :   if( FD_UNLIKELY( fd_peer_pool_used( policy->peers.pool ) == 0 ) ) return NULL;
     176             : 
     177           0 :   fd_repair_msg_t * out = NULL;
     178           0 :   ulong now_ms = ts_ms( now );
     179             : 
     180           0 :   if( FD_UNLIKELY( forest->subtree_cnt > 0 ) ) {
     181           0 :     for( fd_forest_subtrees_iter_t iter = fd_forest_subtrees_iter_init( subtrees, pool );
     182           0 :           !fd_forest_subtrees_iter_done( iter, subtrees, pool );
     183           0 :           iter = fd_forest_subtrees_iter_next( iter, subtrees, pool ) ) {
     184           0 :       fd_forest_blk_t * orphan = fd_forest_subtrees_iter_ele( iter, subtrees, pool );
     185           0 :       ulong key                = fd_policy_dedup_key( FD_REPAIR_KIND_ORPHAN, orphan->slot, UINT_MAX );
     186           0 :       if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
     187           0 :         out = fd_repair_orphan( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, orphan->slot );
     188           0 :         policy->nonce++;
     189           0 :         return out;
     190           0 :       }
     191           0 :     }
     192           0 :   }
     193             : 
     194             :   /* Every so often we'll need to reset the frontier iterator to the
     195             :      head of frontier, because we could end up traversing down a very
     196             :      long tree if we are far behind. */
     197             : 
     198           0 :   if( FD_UNLIKELY( now_ms - policy->tsreset > 100UL /* ms */ ||
     199           0 :                    policy->iterf.frontier_ver != fd_fseq_query( fd_forest_ver_const( forest ) ) ) ) {
     200           0 :     fd_policy_reset( policy, forest );
     201           0 :   }
     202             : 
     203           0 :   fd_forest_blk_t * ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
     204           0 :   if( FD_UNLIKELY( !ele ) ) {
     205             :     // This happens when we are fully caught up i.e. we have all the shreds of every slot we know about.
     206           0 :     return NULL;
     207           0 :   }
     208             : 
     209             :   /* When we are at the head of the turbine, we should give turbine the
     210             :      chance to complete the shreds.  Agave waits 200ms from the
     211             :      estimated "correct time" of the highest shred received to repair.
     212             :      i.e. if we've received the first 200 shreds, the 200th has a tick
     213             :      of x. Translate that to millis, and we should wait to request shred
     214             :      201 until x + 200ms.  If we have a hole, i.e. first 200 shreds
     215             :      receive except shred 100, and the 101th shred has a tick of y, we
     216             :      should wait until y + 200ms to request shred 100.
     217             : 
     218             :      At the start of the loop, the policy iterf is valid and requestable.
     219             :      At the end of the loop, the policy iterf has been advanced to the
     220             :      next valid requestable element. */
     221             : 
     222           0 :   int req_made = 0;
     223           0 :   while( !req_made ) {
     224           0 :     ele = fd_forest_pool_ele( pool, policy->iterf.ele_idx );
     225             : 
     226           0 :     if( FD_UNLIKELY( !passes_throttle_threshold( policy, ele ) ) ) {
     227             :       /* We are not ready to repair this slot yet.  But it's possible we
     228             :          have another fork that we need to repair... so we just
     229             :          should skip to the next SLOT in the consumed iterator.  The
     230             :          likelihood that this ele is the head of turbine is high, which
     231             :          means that the shred_idx of the iterf is likely to be UINT_MAX,
     232             :          which means calling fd_forest_iter_next will advance the iterf
     233             :          to the next slot. */
     234           0 :       policy->iterf.shred_idx = UINT_MAX; // heinous... i'm sorry
     235           0 :       policy->iterf = fd_forest_iter_next( policy->iterf, forest );
     236           0 :       if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
     237           0 :          policy->iterf = fd_forest_iter_init( forest );
     238           0 :          break;
     239           0 :       }
     240           0 :       continue;
     241           0 :     }
     242             : 
     243           0 :     if( FD_UNLIKELY( policy->iterf.shred_idx == UINT_MAX ) ) {
     244           0 :       ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_HIGHEST_SHRED, ele->slot, 0 );
     245           0 :       if( FD_UNLIKELY( ele->slot < highest_known_slot && !dedup_next( policy, key, now ) ) ) {
     246             :         // We'll never know the the highest shred for the current turbine slot, so there's no point in requesting it.
     247           0 :         out = fd_repair_highest_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, 0 );
     248           0 :         policy->nonce++;
     249           0 :         req_made = 1;
     250           0 :       }
     251           0 :     } else {
     252           0 :       ulong key = fd_policy_dedup_key( FD_REPAIR_KIND_SHRED, ele->slot, policy->iterf.shred_idx );
     253           0 :       if( FD_UNLIKELY( !dedup_next( policy, key, now ) ) ) {
     254           0 :         out = fd_repair_shred( repair, fd_policy_peer_select( policy ), now_ms, policy->nonce, ele->slot, policy->iterf.shred_idx );
     255           0 :         policy->nonce++;
     256           0 :         if( FD_UNLIKELY( ele->first_req_ts == 0 ) ) ele->first_req_ts = fd_tickcount();
     257           0 :         req_made = 1;
     258           0 :       }
     259           0 :     }
     260             : 
     261             :     /* Even if we have a request ready, we need to advance the iterator.
     262             :        Otherwise on the next call of policy_next, we'll try to re-request the
     263             :        same shred and it will get deduped. */
     264             : 
     265           0 :     policy->iterf = fd_forest_iter_next( policy->iterf, forest );
     266           0 :     if( FD_UNLIKELY( fd_forest_iter_done( policy->iterf, forest ) ) ) {
     267           0 :       policy->iterf = fd_forest_iter_init( forest );
     268           0 :       break;
     269           0 :     }
     270           0 :   }
     271             : 
     272           0 :   if( FD_UNLIKELY( !req_made ) ) return NULL;
     273           0 :   return out;
     274           0 : }
     275             : 
     276             : fd_policy_peer_t const *
     277           0 : fd_policy_peer_insert( fd_policy_t * policy, fd_pubkey_t const * key, fd_ip4_port_t const * addr ) {
     278           0 :   fd_policy_peer_t * peer_map = policy->peers.map;
     279           0 :   fd_policy_peer_t * peer = fd_policy_peer_map_query( peer_map, *key, NULL );
     280           0 :   if( FD_UNLIKELY( !peer && fd_policy_peer_map_key_cnt( peer_map ) < fd_policy_peer_map_key_max( peer_map ) ) ) {
     281           0 :     peer = fd_policy_peer_map_insert( policy->peers.map, *key );
     282           0 :     peer->key  = *key;
     283           0 :     peer->ip4  = addr->addr;
     284           0 :     peer->port = addr->port;
     285           0 :     peer->req_cnt       = 0;
     286           0 :     peer->res_cnt       = 0;
     287           0 :     peer->first_req_ts  = 0;
     288           0 :     peer->last_req_ts   = 0;
     289           0 :     peer->first_resp_ts = 0;
     290           0 :     peer->last_resp_ts  = 0;
     291           0 :     peer->total_lat     = 0;
     292           0 :     peer->stake         = 0;
     293             : 
     294           0 :     fd_peer_t * peer_ele = fd_peer_pool_ele_acquire( policy->peers.pool );
     295           0 :     peer->pool_idx = fd_peer_pool_idx( policy->peers.pool, peer_ele );
     296           0 :     peer_ele->identity = *key;
     297           0 :     fd_peer_dlist_ele_push_tail( policy->peers.dlist, peer_ele, policy->peers.pool );
     298           0 :     return peer;
     299           0 :   }
     300           0 :   return NULL;
     301           0 : }
     302             : 
     303             : fd_policy_peer_t *
     304           0 : fd_policy_peer_query( fd_policy_t * policy, fd_pubkey_t const * key ) {
     305           0 :   return fd_policy_peer_map_query( policy->peers.map, *key, NULL );
     306           0 : }
     307             : 
     308             : int
     309           0 : fd_policy_peer_remove( fd_policy_t * policy, fd_pubkey_t const * key ) {
     310           0 :   fd_policy_peer_t * peer = fd_policy_peer_map_query( policy->peers.map, *key, NULL );
     311           0 :   if( FD_UNLIKELY( !peer ) ) return 0;
     312           0 :   fd_peer_t * peer_ele = fd_peer_pool_ele( policy->peers.pool, peer->pool_idx );
     313           0 :   fd_policy_peer_map_remove( policy->peers.map, peer );
     314             : 
     315           0 :   if( FD_UNLIKELY( policy->peers.iter == fd_peer_pool_idx( policy->peers.pool, peer_ele ) ) ) {
     316             :     /* In general removal during iteration is safe, except when the iterator is on the peer to be removed. */
     317           0 :     policy->peers.iter = fd_peer_dlist_iter_fwd_next( policy->peers.iter, policy->peers.dlist, policy->peers.pool );
     318           0 :   }
     319           0 :   fd_peer_dlist_ele_remove( policy->peers.dlist, peer_ele, policy->peers.pool );
     320           0 :   fd_peer_pool_ele_release( policy->peers.pool, peer_ele );
     321           0 :   return 1;
     322           0 : }
     323             : 
     324             : void
     325           0 : fd_policy_peer_request_update( fd_policy_t * policy, fd_pubkey_t const * to ) {
     326           0 :   fd_policy_peer_t * active = fd_policy_peer_query( policy, to );
     327           0 :   if( FD_LIKELY( active ) ) {
     328           0 :     active->req_cnt++;
     329           0 :     active->last_req_ts = fd_tickcount();
     330           0 :     if( FD_UNLIKELY( active->first_req_ts == 0 ) ) active->first_req_ts = active->last_req_ts;
     331           0 :   }
     332           0 : }
     333             : 
     334             : void
     335           0 : fd_policy_peer_response_update( fd_policy_t * policy, fd_pubkey_t const * to, long rtt ) {
     336           0 :   fd_policy_peer_t * peer = fd_policy_peer_query( policy, to );
     337           0 :   if( FD_LIKELY( peer ) ) {
     338           0 :     long now = fd_tickcount();
     339           0 :     peer->res_cnt++;
     340           0 :     if( FD_UNLIKELY( peer->first_resp_ts == 0 ) ) peer->first_resp_ts = now;
     341           0 :     peer->last_resp_ts = now;
     342           0 :     peer->total_lat   += rtt;
     343           0 :   }
     344           0 : }
     345             : 
     346             : void
     347           0 : fd_policy_reset( fd_policy_t * policy, fd_forest_t * forest ) {
     348           0 :   policy->iterf   = fd_forest_iter_init( forest );
     349           0 :   policy->tsreset = ts_ms( fd_log_wallclock() );
     350           0 : }
     351             : 
     352             : void
     353           0 : fd_policy_set_turbine_slot0( fd_policy_t * policy, ulong slot ) {
     354           0 :   policy->turbine_slot0 = slot;
     355           0 : }
     356             : 

Generated by: LCOV version 1.14