LCOV - code coverage report
Current view: top level - flamenco/accdb - fd_accdb_admin_v2_root.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 231 0.0 %
Date: 2026-02-13 06:06:24 Functions: 0 6 0.0 %

          Line data    Source code
       1             : #include "fd_accdb_admin_v2_private.h"
       2             : #include "../fd_flamenco_base.h"
       3             : #include "../runtime/fd_runtime_const.h" /* FD_RUNTIME_ACC_SZ_MAX */
       4             : #include "../../vinyl/data/fd_vinyl_data.h"
       5             : 
       6             : /***********************************************************************
       7             : 
       8             :   fd_accdb_admin_v2_root.c contains the account rooting algorithm.
       9             : 
      10             :    This algorithm is designed to amortize vinyl I/O latency by
      11             :    processing accounts in batches.
      12             : 
      13             :    For each batch of accounts, it does the following logic:
      14             : 
      15             :    - ACQUIRE batch request for account updates
      16             :    - ERASE   batch request for account deletions
      17             :    - Spin wait for ACQUIRE completion
      18             :    - Copy back modified accounts
      19             :    - RELEASE batch request for account updates
      20             :    - Spin wait for ERASE, RELEASE completions
      21             :    - Free records from funk
      22             : 
      23             : ***********************************************************************/
      24             : 
      25             : /* vinyl_spin_wait waits for completion of a vinyl request and asserts
      26             :    that all requests completed successfully. */
      27             : 
      28             : static void
      29             : vinyl_spin_wait( fd_vinyl_comp_t const * comp,
      30             :                  fd_vinyl_key_t const *  key0,
      31             :                  schar const *           err0,
      32             :                  ulong                   cnt,
      33           0 :                  char const *            req_type_cstr ) {
      34             : 
      35             :   /* FIXME use a load-acquire here, such that later loads are ordered
      36             :            past this load */
      37           0 :   while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
      38           0 :   FD_COMPILER_MFENCE();
      39           0 :   int comp_err = FD_VOLATILE_CONST( comp->err );
      40           0 :   if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
      41           0 :     FD_LOG_CRIT(( "vinyl tile rejected my %s request (%i-%s)",
      42           0 :                   req_type_cstr, comp_err, fd_vinyl_strerror( comp_err ) ));
      43           0 :   }
      44             : 
      45           0 :   for( ulong i=0UL; i<cnt; i++ ) {
      46           0 :     int err = err0[ i ];
      47           0 :     if( FD_UNLIKELY( err!=FD_VINYL_SUCCESS && err!=FD_VINYL_ERR_KEY ) ) {
      48           0 :       FD_BASE58_ENCODE_32_BYTES( key0[i].uc, key_b58 );
      49           0 :       FD_LOG_CRIT(( "vinyl %s request failed for %s (%i-%s)",
      50           0 :                     req_type_cstr, key_b58, err, fd_vinyl_strerror( err ) ));
      51           0 :     }
      52           0 :   }
      53           0 : }
      54             : 
      55             : /* funk_rec_write_lock spins until it gains a write lock for a record,
      56             :    increments the version number, and returns the updated ver_lock
      57             :    value. */
      58             : 
      59             : static ulong
      60           0 : fd_funk_rec_admin_lock( fd_funk_rec_t * rec ) {
      61           0 :   ulong * vl = &rec->ver_lock;
      62           0 :   for(;;) {
      63           0 :     ulong const ver_lock = FD_VOLATILE_CONST( *vl );
      64           0 :     ulong const ver      = fd_funk_rec_ver_bits ( ver_lock );
      65           0 :     ulong const lock     = fd_funk_rec_lock_bits( ver_lock );
      66           0 :     if( FD_UNLIKELY( lock ) ) {
      67             :       /* Spin while there are active readers */
      68             :       /* FIXME kill client after spinning for 30 seconds to prevent silent deadlock */
      69           0 :       FD_SPIN_PAUSE();
      70           0 :       continue;
      71           0 :     }
      72           0 :     ulong const new_ver = fd_funk_rec_ver_inc( ver );
      73           0 :     ulong const new_vl  = fd_funk_rec_ver_lock( new_ver, FD_FUNK_REC_LOCK_MASK );
      74           0 :     if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, ver_lock, new_vl )!=ver_lock ) ) {
      75           0 :       FD_SPIN_PAUSE();
      76           0 :       continue;
      77           0 :     }
      78           0 :     return new_vl;
      79           0 :   }
      80           0 : }
      81             : 
      82             : static void
      83             : fd_funk_rec_admin_unlock( fd_funk_rec_t * rec,
      84           0 :                           ulong           ver_lock ) {
      85           0 :   FD_VOLATILE( rec->ver_lock ) = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( ver_lock ), 0UL );
      86           0 : }
      87             : 
      88             : static void
      89             : funk_free_rec( fd_funk_t *     funk,
      90           0 :                fd_funk_rec_t * rec ) {
      91             :   /* Acquire admin lock (kick out readers)
      92             : 
      93             :      Note: At this point, well-behaving external readers will abandon a
      94             :      read-lock attempt if they observe this active write lock.  (An
      95             :      admin lock always implies the record is about to die) */
      96             : 
      97           0 :   FD_COMPILER_MFENCE();
      98           0 :   ulong ver_lock = fd_funk_rec_admin_lock( rec );
      99             : 
     100             :   /* Free record */
     101             : 
     102           0 :   memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
     103           0 :   FD_COMPILER_MFENCE();
     104           0 :   rec->map_next = FD_FUNK_REC_IDX_NULL;
     105           0 :   fd_funk_val_flush( rec, funk->alloc, funk->wksp );
     106           0 :   fd_funk_rec_admin_unlock( rec, ver_lock );
     107           0 :   fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
     108           0 : }
     109             : 
     110             : /* funk_gc_chain optimistically deletes all but the newest rooted
     111             :    revisions of rec.  This possibly deletes 'rec'.  Returns rec if rec
     112             :    is the only known rooted revision, otherwise returns NULL (if rec was
     113             :    deleted).  Note that due to edge cases, revisions that are not in the
     114             :    oldest tracked slot, may not reliably get cleaned up.  (The oldest
     115             :    tracked slot always gets cleaned up, though.) */
     116             : 
     117             : static fd_funk_rec_t *
     118             : funk_gc_chain( fd_accdb_admin_v2_t * const admin,
     119           0 :                fd_funk_rec_t *       const rec ) {
     120             : 
     121           0 :   fd_accdb_lineage_t * lineage   = admin->root_lineage;
     122           0 :   fd_funk_t *          funk      = admin->v1->funk;
     123           0 :   fd_funk_rec_t *      rec_pool  = funk->rec_pool->ele;
     124           0 :   ulong                rec_max   = funk->rec_pool->ele_max;
     125           0 :   ulong                seed      = funk->rec_map->map->seed;
     126           0 :   ulong                chain_cnt = funk->rec_map->map->chain_cnt;
     127           0 :   ulong                root_slot = lineage->fork[0].ul[0];
     128             : 
     129           0 :   ulong hash      = fd_funk_rec_map_key_hash( &rec->pair, seed );
     130           0 :   ulong chain_idx = (hash & (chain_cnt-1UL) );
     131             : 
     132             :   /* Lock rec_map chain */
     133             : 
     134           0 :   int lock_err = fd_funk_rec_map_iter_lock( funk->rec_map, &chain_idx, 1UL, FD_MAP_FLAG_BLOCKING );
     135           0 :   if( FD_UNLIKELY( lock_err!=FD_MAP_SUCCESS ) ) {
     136           0 :     FD_LOG_CRIT(( "fd_funk_rec_map_iter_lock failed (%i-%s)", lock_err, fd_map_strerror( lock_err ) ));
     137           0 :   }
     138             : 
     139           0 :   fd_funk_rec_map_shmem_private_chain_t * chain =
     140           0 :       fd_funk_rec_map_shmem_private_chain( funk->rec_map->map, 0UL ) + chain_idx;
     141           0 :   ulong ver =
     142           0 :       fd_funk_rec_map_private_vcnt_ver( FD_VOLATILE_CONST( chain->ver_cnt ) );
     143           0 :   FD_CRIT( ver&1UL, "chain is not locked" );
     144             : 
     145             :   /* Walk map chain */
     146             : 
     147           0 :   fd_funk_rec_t * found_rec = NULL;
     148           0 :   uint *          pnext     = &chain->head_cidx;
     149           0 :   uint            cur       = *pnext;
     150           0 :   ulong           chain_len = 0UL;
     151           0 :   ulong           iter      = 0UL;
     152           0 :   while( cur!=FD_FUNK_REC_IDX_NULL ) {
     153           0 :     if( FD_UNLIKELY( iter++ > rec_max ) ) FD_LOG_CRIT(( "cycle detected in rec_map chain %lu", chain_idx ));
     154             : 
     155             :     /* Is this node garbage? */
     156             : 
     157           0 :     fd_funk_rec_t * node = &funk->rec_pool->ele[ cur ];
     158           0 :     if( FD_UNLIKELY( cur==node->map_next ) ) FD_LOG_CRIT(( "accdb corruption detected: cycle in rec_map chain %lu", chain_idx ));
     159           0 :     cur = node->map_next;
     160           0 :     if( !fd_funk_rec_key_eq( rec->pair.key, node->pair.key ) ) goto retain;
     161           0 :     if( node->pair.xid->ul[0]>root_slot ) goto retain;
     162           0 :     if( !found_rec ) {
     163           0 :       found_rec = node;
     164           0 :       goto retain;
     165           0 :     }
     166             : 
     167             :     /* No longer need this node */
     168             : 
     169           0 :     if( node->pair.xid->ul[0] > rec->pair.xid->ul[0] ) {
     170             :       /* If this node is newer than the to-be-deleted slot, need to
     171             :          remove it from the transaction's record list. */
     172           0 :       uint neigh_prev = node->prev_idx;
     173           0 :       uint neigh_next = node->next_idx;
     174           0 :       if( neigh_prev==FD_FUNK_REC_IDX_NULL ||
     175           0 :           neigh_next==FD_FUNK_REC_IDX_NULL ) {
     176             :         /* Node is first or last of transaction -- too bothersome to
     177             :            remove it from the transaction's record list */
     178           0 :         goto retain;
     179           0 :       }
     180           0 :       rec_pool[ neigh_next ].prev_idx = neigh_prev;
     181           0 :       rec_pool[ neigh_prev ].next_idx = neigh_next;
     182           0 :     }
     183             : 
     184             :     /* Destroy this node */
     185             : 
     186           0 :     funk_free_rec( funk, node );
     187           0 :     *pnext = cur;
     188           0 :     continue;
     189             : 
     190           0 :   retain:
     191           0 :     pnext = &node->map_next;
     192           0 :     chain_len++;
     193           0 :   }
     194             : 
     195             :   /* Unlock rec_map chain */
     196             : 
     197           0 :   FD_COMPILER_MFENCE();
     198           0 :   FD_VOLATILE( chain->ver_cnt ) =
     199           0 :       fd_funk_rec_map_private_vcnt( ver+1UL, chain_len );
     200           0 :   FD_COMPILER_MFENCE();
     201           0 :   return found_rec==rec ? found_rec : NULL;
     202           0 : }
     203             : 
     204             : /* Main algorithm */
     205             : 
     206             : fd_funk_rec_t *
     207             : fd_accdb_v2_root_batch( fd_accdb_admin_v2_t * admin,
     208           0 :                         fd_funk_rec_t *       rec0 ) {
     209           0 :   long t_start = fd_tickcount();
     210             : 
     211           0 :   fd_funk_t *           funk      = admin->v1->funk;        /* unrooted DB */
     212           0 :   fd_wksp_t *           funk_wksp = funk->wksp;             /* shm workspace containing unrooted accounts */
     213           0 :   fd_funk_rec_t *       rec_pool  = funk->rec_pool->ele;    /* funk rec arena */
     214           0 :   fd_vinyl_rq_t *       rq        = admin->vinyl_rq;        /* "request queue "*/
     215           0 :   fd_vinyl_req_pool_t * req_pool  = admin->vinyl_req_pool;  /* "request pool" */
     216           0 :   fd_wksp_t *           req_wksp  = admin->vinyl_req_wksp;  /* shm workspace containing request buffer */
     217           0 :   fd_wksp_t *           data_wksp = admin->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
     218           0 :   ulong                 link_id   = admin->vinyl_link_id;   /* vinyl client ID */
     219             : 
     220             :   /* Collect funk request batch */
     221             : 
     222           0 :   fd_funk_rec_t * recs[ FD_ACCDB_ROOT_BATCH_MAX ];
     223           0 :   ulong           rec_cnt;
     224             : 
     225           0 :   fd_funk_rec_t * next = rec0;
     226           0 :   for( rec_cnt=0UL; next && rec_cnt<FD_ACCDB_ROOT_BATCH_MAX; ) {
     227           0 :     fd_funk_rec_t * cur = next;
     228           0 :     if( fd_funk_rec_idx_is_null( cur->next_idx ) ) {
     229           0 :       next = NULL;
     230           0 :     } else {
     231           0 :       next = &rec_pool[ cur->next_idx ];
     232           0 :     }
     233           0 :     cur->prev_idx = FD_FUNK_REC_IDX_NULL;
     234           0 :     cur->next_idx = FD_FUNK_REC_IDX_NULL;
     235             : 
     236           0 :     if( funk_gc_chain( admin, cur ) ) {
     237           0 :       recs[ rec_cnt++ ] = cur;
     238           0 :     }
     239           0 :   }
     240             : 
     241             :   /* Partition batch into ACQUIRE (updates) and ERASE (deletions) */
     242             : 
     243           0 :   ulong acq_cnt = 0UL;
     244           0 :   ulong del_cnt;
     245           0 :   for( ulong i=0UL; i<rec_cnt; i++ ) {
     246           0 :     fd_account_meta_t const * meta = fd_funk_val( recs[ i ], funk_wksp );
     247           0 :     FD_CRIT( meta && recs[ i ]->val_sz>=sizeof(fd_account_meta_t), "corrupt funk_rec" );
     248           0 :     if( meta->lamports ) {
     249           0 :       fd_funk_rec_t * tmp = recs[ i ];
     250           0 :       recs[ i ]       = recs[ acq_cnt ];
     251           0 :       recs[ acq_cnt ] = tmp;
     252           0 :       acq_cnt++;
     253           0 :     }
     254           0 :   }
     255           0 :   del_cnt = rec_cnt - acq_cnt;
     256             : 
     257             :   /* Create ACQUIRE and ERASE batch requests */
     258             : 
     259           0 :   ulong            del_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ERASE */
     260           0 :   ulong            acq_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ACQUIRE */
     261           0 :   fd_vinyl_key_t * acq_key0  = fd_vinyl_req_batch_key( req_pool, acq_batch );
     262           0 :   fd_vinyl_key_t * del_key0  = fd_vinyl_req_batch_key( req_pool, del_batch );
     263             : 
     264           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) {
     265           0 :     fd_vinyl_key_init( &acq_key0[ i ], recs[ i         ]->pair.key, 32UL );
     266           0 :   }
     267           0 :   for( ulong i=0UL; i<del_cnt; i++ ) {
     268           0 :     fd_vinyl_key_init( &del_key0[ i ], recs[ acq_cnt+i ]->pair.key, 32UL );
     269           0 :   }
     270             : 
     271             :   /* Send off ACQUIRE and ERASE requests */
     272             : 
     273           0 :   fd_vinyl_comp_t * acq_comp       = fd_vinyl_req_batch_comp     ( req_pool, acq_batch );
     274           0 :   fd_vinyl_comp_t * del_comp       = fd_vinyl_req_batch_comp     ( req_pool, del_batch );
     275           0 :   schar *           acq_err0       = fd_vinyl_req_batch_err      ( req_pool, acq_batch );
     276           0 :   schar *           del_err0       = fd_vinyl_req_batch_err      ( req_pool, del_batch );
     277           0 :   ulong *           acq_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, acq_batch );
     278             : 
     279           0 :   memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
     280           0 :   memset( del_comp, 0, sizeof(fd_vinyl_comp_t) );
     281           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
     282           0 :   for( ulong i=0UL; i<del_cnt; i++ ) del_err0[ i ] = 0;
     283           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) {
     284           0 :     fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
     285             : 
     286           0 :     ulong data_sz = src_meta->dlen;
     287           0 :     FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
     288             : 
     289           0 :     ulong val_sz = sizeof(fd_account_meta_t) + data_sz;
     290           0 :     acq_val_gaddr0[ i ]      = val_sz;
     291           0 :     admin->base.root_tot_sz += val_sz;
     292           0 :   }
     293             : 
     294           0 :   fd_vinyl_req_send_batch(
     295           0 :       rq, req_pool, req_wksp,
     296           0 :       admin->vinyl_req_id++, link_id,
     297           0 :       FD_VINYL_REQ_TYPE_ACQUIRE,
     298           0 :       FD_VINYL_REQ_FLAG_MODIFY |
     299           0 :       FD_VINYL_REQ_FLAG_IGNORE |
     300           0 :       FD_VINYL_REQ_FLAG_CREATE,
     301           0 :       acq_batch, acq_cnt
     302           0 :   );
     303           0 :   fd_vinyl_req_send_batch(
     304           0 :       rq, req_pool, req_wksp,
     305           0 :       admin->vinyl_req_id++, link_id,
     306           0 :       FD_VINYL_REQ_TYPE_ERASE,
     307           0 :       0UL,
     308           0 :       del_batch, del_cnt
     309           0 :   );
     310             : 
     311             :   /* Spin for ACQUIRE completion */
     312             : 
     313           0 :   vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "ACQUIRE" );
     314           0 :   long t_acquire = fd_tickcount();
     315             : 
     316             :   /* Copy back modified accounts */
     317             : 
     318           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) {
     319           0 :     fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
     320             : 
     321           0 :     ulong data_sz = src_meta->dlen;
     322           0 :     ulong val_sz  = sizeof(fd_account_meta_t) + data_sz;
     323           0 :     FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
     324             : 
     325           0 :     fd_account_meta_t * dst_meta = fd_wksp_laddr_fast( data_wksp, acq_val_gaddr0[ i ] );
     326           0 :     fd_vinyl_info_t *   val_info = fd_vinyl_data_info( dst_meta );
     327             : 
     328           0 :     fd_memcpy( dst_meta, src_meta, val_sz );
     329           0 :     val_info->val_sz = (uint)val_sz;
     330           0 :   }
     331             : 
     332             :   /* Send off RELEASE batch request (reuse acq_batch) */
     333             : 
     334           0 :   memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
     335           0 :   for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
     336           0 :   fd_vinyl_req_send_batch(
     337           0 :       rq, req_pool, req_wksp,
     338           0 :       admin->vinyl_req_id++, link_id,
     339           0 :       FD_VINYL_REQ_TYPE_RELEASE,
     340           0 :       FD_VINYL_REQ_FLAG_MODIFY,
     341           0 :       acq_batch, acq_cnt
     342           0 :   );
     343           0 :   long t_copy = fd_tickcount();
     344             : 
     345             :   /* Spin for ERASE, RELEASE completions */
     346             : 
     347           0 :   vinyl_spin_wait( del_comp, del_key0, del_err0, del_cnt, "ERASE" );
     348           0 :   fd_vinyl_req_pool_release( req_pool, del_batch );
     349             : 
     350           0 :   vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "RELEASE" );
     351           0 :   fd_vinyl_req_pool_release( req_pool, acq_batch );
     352           0 :   long t_release = fd_tickcount();
     353             : 
     354             :   /* Remove funk records */
     355             : 
     356           0 :   for( ulong i=0UL; i<rec_cnt; i++ ) {
     357           0 :     fd_funk_xid_key_pair_t pair = recs[ i ]->pair;
     358           0 :     fd_funk_rec_query_t query[1];
     359           0 :     int rm_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
     360           0 :     if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
     361           0 :     funk_free_rec( funk, recs[ i ] );
     362           0 :   }
     363           0 :   long t_gc = fd_tickcount();
     364             : 
     365             :   /* Update metrics */
     366             : 
     367           0 :   admin->base.root_cnt    += (uint)acq_cnt;
     368           0 :   admin->base.reclaim_cnt += (uint)del_cnt;
     369           0 :   admin->base.dt_vinyl    += ( t_acquire - t_start ) + ( t_release - t_copy );
     370           0 :   admin->base.dt_copy     += ( t_copy - t_acquire );
     371           0 :   admin->base.dt_gc       += ( t_gc - t_release );
     372             : 
     373           0 :   return next;
     374           0 : }

Generated by: LCOV version 1.14