Line data Source code
1 : #include "fd_accdb_admin_v2_private.h"
2 : #include "../fd_flamenco_base.h"
3 : #include "../runtime/fd_runtime_const.h" /* FD_RUNTIME_ACC_SZ_MAX */
4 : #include "../../vinyl/data/fd_vinyl_data.h"
5 :
6 : /***********************************************************************
7 :
8 : fd_accdb_admin_v2_root.c contains the account rooting algorithm.
9 :
10 : This algorithm is designed to amortize vinyl I/O latency by
11 : processing accounts in batches.
12 :
13 : For each batch of accounts, it does the following logic:
14 :
15 : - ACQUIRE batch request for account updates
16 : - ERASE batch request for account deletions
17 : - Spin wait for ACQUIRE completion
18 : - Copy back modified accounts
19 : - RELEASE batch request for account updates
20 : - Spin wait for RELEASE, ERASE completions
21 : - Free records from funk
22 :
23 : ***********************************************************************/
24 :
25 : /* vinyl_spin_wait waits for completion of a vinyl request and asserts
26 : that all requests completed successfully. */
27 :
28 : static void
29 : vinyl_spin_wait( fd_vinyl_comp_t const * comp,
30 : fd_vinyl_key_t const * key0,
31 : schar const * err0,
32 : ulong cnt,
33 0 : char const * req_type_cstr ) {
34 :
35 : /* FIXME use a load-acquire here, such that later loads are ordered
36 : past this load */
37 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
38 0 : FD_COMPILER_MFENCE();
39 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
40 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
41 0 : FD_LOG_CRIT(( "vinyl tile rejected my %s request (%i-%s)",
42 0 : req_type_cstr, comp_err, fd_vinyl_strerror( comp_err ) ));
43 0 : }
44 :
45 0 : for( ulong i=0UL; i<cnt; i++ ) {
46 0 : int err = err0[ i ];
47 0 : if( FD_UNLIKELY( err!=FD_VINYL_SUCCESS && err!=FD_VINYL_ERR_KEY ) ) {
48 0 : FD_BASE58_ENCODE_32_BYTES( key0[i].uc, key_b58 );
49 0 : FD_LOG_CRIT(( "vinyl %s request failed for %s (%i-%s)",
50 0 : req_type_cstr, key_b58, err, fd_vinyl_strerror( err ) ));
51 0 : }
52 0 : }
53 0 : }
54 :
55 : /* funk_rec_write_lock spins until it gains a write lock for a record,
56 : increments the version number, and returns the updated ver_lock
57 : value. */
58 :
59 : static ulong
60 0 : fd_funk_rec_admin_lock( fd_funk_rec_t * rec ) {
61 0 : ulong * vl = &rec->ver_lock;
62 0 : for(;;) {
63 0 : ulong const ver_lock = FD_VOLATILE_CONST( *vl );
64 0 : ulong const ver = fd_funk_rec_ver_bits ( ver_lock );
65 0 : ulong const lock = fd_funk_rec_lock_bits( ver_lock );
66 0 : if( FD_UNLIKELY( lock ) ) {
67 : /* Spin while there are active readers */
68 : /* FIXME kill client after spinning for 30 seconds to prevent silent deadlock */
69 0 : FD_SPIN_PAUSE();
70 0 : continue;
71 0 : }
72 0 : ulong const new_ver = fd_funk_rec_ver_inc( ver );
73 0 : ulong const new_vl = fd_funk_rec_ver_lock( new_ver, FD_FUNK_REC_LOCK_MASK );
74 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, ver_lock, new_vl )!=ver_lock ) ) {
75 0 : FD_SPIN_PAUSE();
76 0 : continue;
77 0 : }
78 0 : return new_vl;
79 0 : }
80 0 : }
81 :
82 : static void
83 : fd_funk_rec_admin_unlock( fd_funk_rec_t * rec,
84 0 : ulong ver_lock ) {
85 0 : FD_VOLATILE( rec->ver_lock ) = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( ver_lock ), 0UL );
86 0 : }
87 :
88 : static void
89 : funk_free_rec( fd_funk_t * funk,
90 0 : fd_funk_rec_t * rec ) {
91 : /* Acquire admin lock (kick out readers)
92 :
93 : Note: At this point, well-behaving external readers will abandon a
94 : read-lock attempt if they observe this active write lock. (An
95 : admin lock always implies the record is about to die) */
96 :
97 0 : FD_COMPILER_MFENCE();
98 0 : ulong ver_lock = fd_funk_rec_admin_lock( rec );
99 :
100 : /* Free record */
101 :
102 0 : memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
103 0 : FD_COMPILER_MFENCE();
104 0 : rec->map_next = FD_FUNK_REC_IDX_NULL;
105 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
106 0 : fd_funk_rec_admin_unlock( rec, ver_lock );
107 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
108 0 : }
109 :
110 : /* funk_gc_chain optimistically deletes all but the newest rooted
111 : revisions of rec. This possibly deletes 'rec'. Returns rec if rec
112 : is the only known rooted revision, otherwise returns NULL (if rec was
113 : deleted). Note that due to edge cases, revisions that are not in the
114 : oldest tracked slot, may not reliably get cleaned up. (The oldest
115 : tracked slot always gets cleaned up, though.) */
116 :
117 : static fd_funk_rec_t *
118 : funk_gc_chain( fd_accdb_admin_v2_t * const admin,
119 0 : fd_funk_rec_t * const rec ) {
120 :
121 0 : fd_accdb_lineage_t * lineage = admin->root_lineage;
122 0 : fd_funk_t * funk = admin->v1->funk;
123 0 : fd_funk_rec_t * rec_pool = funk->rec_pool->ele;
124 0 : ulong rec_max = funk->rec_pool->ele_max;
125 0 : ulong seed = funk->rec_map->map->seed;
126 0 : ulong chain_cnt = funk->rec_map->map->chain_cnt;
127 0 : ulong root_slot = lineage->fork[0].ul[0];
128 :
129 0 : ulong hash = fd_funk_rec_map_key_hash( &rec->pair, seed );
130 0 : ulong chain_idx = (hash & (chain_cnt-1UL) );
131 :
132 : /* Lock rec_map chain */
133 :
134 0 : int lock_err = fd_funk_rec_map_iter_lock( funk->rec_map, &chain_idx, 1UL, FD_MAP_FLAG_BLOCKING );
135 0 : if( FD_UNLIKELY( lock_err!=FD_MAP_SUCCESS ) ) {
136 0 : FD_LOG_CRIT(( "fd_funk_rec_map_iter_lock failed (%i-%s)", lock_err, fd_map_strerror( lock_err ) ));
137 0 : }
138 :
139 0 : fd_funk_rec_map_shmem_private_chain_t * chain =
140 0 : fd_funk_rec_map_shmem_private_chain( funk->rec_map->map, 0UL ) + chain_idx;
141 0 : ulong ver =
142 0 : fd_funk_rec_map_private_vcnt_ver( FD_VOLATILE_CONST( chain->ver_cnt ) );
143 0 : FD_CRIT( ver&1UL, "chain is not locked" );
144 :
145 : /* Walk map chain */
146 :
147 0 : fd_funk_rec_t * found_rec = NULL;
148 0 : uint * pnext = &chain->head_cidx;
149 0 : uint cur = *pnext;
150 0 : ulong chain_len = 0UL;
151 0 : ulong iter = 0UL;
152 0 : while( cur!=FD_FUNK_REC_IDX_NULL ) {
153 0 : if( FD_UNLIKELY( iter++ > rec_max ) ) FD_LOG_CRIT(( "cycle detected in rec_map chain %lu", chain_idx ));
154 :
155 : /* Is this node garbage? */
156 :
157 0 : fd_funk_rec_t * node = &funk->rec_pool->ele[ cur ];
158 0 : if( FD_UNLIKELY( cur==node->map_next ) ) FD_LOG_CRIT(( "accdb corruption detected: cycle in rec_map chain %lu", chain_idx ));
159 0 : cur = node->map_next;
160 0 : if( !fd_funk_rec_key_eq( rec->pair.key, node->pair.key ) ) goto retain;
161 0 : if( node->pair.xid->ul[0]>root_slot ) goto retain;
162 0 : if( !found_rec ) {
163 0 : found_rec = node;
164 0 : goto retain;
165 0 : }
166 :
167 : /* No longer need this node */
168 :
169 0 : if( node->pair.xid->ul[0] > rec->pair.xid->ul[0] ) {
170 : /* If this node is newer than the to-be-deleted slot, need to
171 : remove it from the transaction's record list. */
172 0 : uint neigh_prev = node->prev_idx;
173 0 : uint neigh_next = node->next_idx;
174 0 : if( neigh_prev==FD_FUNK_REC_IDX_NULL ||
175 0 : neigh_next==FD_FUNK_REC_IDX_NULL ) {
176 : /* Node is first or last of transaction -- too bothersome to
177 : remove it from the transaction's record list */
178 0 : goto retain;
179 0 : }
180 0 : rec_pool[ neigh_next ].prev_idx = neigh_prev;
181 0 : rec_pool[ neigh_prev ].next_idx = neigh_next;
182 0 : }
183 :
184 : /* Destroy this node */
185 :
186 0 : funk_free_rec( funk, node );
187 0 : *pnext = cur;
188 0 : continue;
189 :
190 0 : retain:
191 0 : pnext = &node->map_next;
192 0 : chain_len++;
193 0 : }
194 :
195 : /* Unlock rec_map chain */
196 :
197 0 : FD_COMPILER_MFENCE();
198 0 : FD_VOLATILE( chain->ver_cnt ) =
199 0 : fd_funk_rec_map_private_vcnt( ver+1UL, chain_len );
200 0 : FD_COMPILER_MFENCE();
201 0 : return found_rec==rec ? found_rec : NULL;
202 0 : }
203 :
204 : /* Main algorithm */
205 :
206 : fd_funk_rec_t *
207 : fd_accdb_v2_root_batch( fd_accdb_admin_v2_t * admin,
208 0 : fd_funk_rec_t * rec0 ) {
209 0 : long t_start = fd_tickcount();
210 :
211 0 : fd_funk_t * funk = admin->v1->funk; /* unrooted DB */
212 0 : fd_wksp_t * funk_wksp = funk->wksp; /* shm workspace containing unrooted accounts */
213 0 : fd_funk_rec_t * rec_pool = funk->rec_pool->ele; /* funk rec arena */
214 0 : fd_vinyl_rq_t * rq = admin->vinyl_rq; /* "request queue "*/
215 0 : fd_vinyl_req_pool_t * req_pool = admin->vinyl_req_pool; /* "request pool" */
216 0 : fd_wksp_t * req_wksp = admin->vinyl_req_wksp; /* shm workspace containing request buffer */
217 0 : fd_wksp_t * data_wksp = admin->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
218 0 : ulong link_id = admin->vinyl_link_id; /* vinyl client ID */
219 :
220 : /* Collect funk request batch */
221 :
222 0 : fd_funk_rec_t * recs[ FD_ACCDB_ROOT_BATCH_MAX ];
223 0 : ulong rec_cnt;
224 :
225 0 : fd_funk_rec_t * next = rec0;
226 0 : for( rec_cnt=0UL; next && rec_cnt<FD_ACCDB_ROOT_BATCH_MAX; ) {
227 0 : fd_funk_rec_t * cur = next;
228 0 : if( fd_funk_rec_idx_is_null( cur->next_idx ) ) {
229 0 : next = NULL;
230 0 : } else {
231 0 : next = &rec_pool[ cur->next_idx ];
232 0 : }
233 0 : cur->prev_idx = FD_FUNK_REC_IDX_NULL;
234 0 : cur->next_idx = FD_FUNK_REC_IDX_NULL;
235 :
236 0 : if( funk_gc_chain( admin, cur ) ) {
237 0 : recs[ rec_cnt++ ] = cur;
238 0 : }
239 0 : }
240 :
241 : /* Partition batch into ACQUIRE (updates) and ERASE (deletions) */
242 :
243 0 : ulong acq_cnt = 0UL;
244 0 : ulong del_cnt;
245 0 : for( ulong i=0UL; i<rec_cnt; i++ ) {
246 0 : fd_account_meta_t const * meta = fd_funk_val( recs[ i ], funk_wksp );
247 0 : FD_CRIT( meta && recs[ i ]->val_sz>=sizeof(fd_account_meta_t), "corrupt funk_rec" );
248 0 : if( meta->lamports ) {
249 0 : fd_funk_rec_t * tmp = recs[ i ];
250 0 : recs[ i ] = recs[ acq_cnt ];
251 0 : recs[ acq_cnt ] = tmp;
252 0 : acq_cnt++;
253 0 : }
254 0 : }
255 0 : del_cnt = rec_cnt - acq_cnt;
256 :
257 : /* Create ACQUIRE and ERASE batch requests */
258 :
259 0 : ulong del_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ERASE */
260 0 : ulong acq_batch = fd_vinyl_req_pool_acquire( req_pool ); /* ACQUIRE */
261 0 : fd_vinyl_key_t * acq_key0 = fd_vinyl_req_batch_key( req_pool, acq_batch );
262 0 : fd_vinyl_key_t * del_key0 = fd_vinyl_req_batch_key( req_pool, del_batch );
263 :
264 0 : for( ulong i=0UL; i<acq_cnt; i++ ) {
265 0 : fd_vinyl_key_init( &acq_key0[ i ], recs[ i ]->pair.key, 32UL );
266 0 : }
267 0 : for( ulong i=0UL; i<del_cnt; i++ ) {
268 0 : fd_vinyl_key_init( &del_key0[ i ], recs[ acq_cnt+i ]->pair.key, 32UL );
269 0 : }
270 :
271 : /* Send off ACQUIRE and ERASE requests */
272 :
273 0 : fd_vinyl_comp_t * acq_comp = fd_vinyl_req_batch_comp ( req_pool, acq_batch );
274 0 : fd_vinyl_comp_t * del_comp = fd_vinyl_req_batch_comp ( req_pool, del_batch );
275 0 : schar * acq_err0 = fd_vinyl_req_batch_err ( req_pool, acq_batch );
276 0 : schar * del_err0 = fd_vinyl_req_batch_err ( req_pool, del_batch );
277 0 : ulong * acq_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, acq_batch );
278 :
279 0 : memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
280 0 : memset( del_comp, 0, sizeof(fd_vinyl_comp_t) );
281 0 : for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
282 0 : for( ulong i=0UL; i<del_cnt; i++ ) del_err0[ i ] = 0;
283 0 : for( ulong i=0UL; i<acq_cnt; i++ ) {
284 0 : fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
285 :
286 0 : ulong data_sz = src_meta->dlen;
287 0 : FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
288 :
289 0 : ulong val_sz = sizeof(fd_account_meta_t) + data_sz;
290 0 : acq_val_gaddr0[ i ] = val_sz;
291 0 : admin->base.root_tot_sz += val_sz;
292 0 : }
293 :
294 0 : fd_vinyl_req_send_batch(
295 0 : rq, req_pool, req_wksp,
296 0 : admin->vinyl_req_id++, link_id,
297 0 : FD_VINYL_REQ_TYPE_ACQUIRE,
298 0 : FD_VINYL_REQ_FLAG_MODIFY |
299 0 : FD_VINYL_REQ_FLAG_IGNORE |
300 0 : FD_VINYL_REQ_FLAG_CREATE,
301 0 : acq_batch, acq_cnt
302 0 : );
303 0 : fd_vinyl_req_send_batch(
304 0 : rq, req_pool, req_wksp,
305 0 : admin->vinyl_req_id++, link_id,
306 0 : FD_VINYL_REQ_TYPE_ERASE,
307 0 : 0UL,
308 0 : del_batch, del_cnt
309 0 : );
310 :
311 : /* Spin for ACQUIRE completion */
312 :
313 0 : vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "ACQUIRE" );
314 0 : long t_acquire = fd_tickcount();
315 :
316 : /* Copy back modified accounts */
317 :
318 0 : for( ulong i=0UL; i<acq_cnt; i++ ) {
319 0 : fd_account_meta_t const * src_meta = fd_funk_val( recs[ i ], funk_wksp );
320 :
321 0 : ulong data_sz = src_meta->dlen;
322 0 : ulong val_sz = sizeof(fd_account_meta_t) + data_sz;
323 0 : FD_CRIT( data_sz<=FD_RUNTIME_ACC_SZ_MAX, "oversize account record" );
324 :
325 0 : fd_account_meta_t * dst_meta = fd_wksp_laddr_fast( data_wksp, acq_val_gaddr0[ i ] );
326 0 : fd_vinyl_info_t * val_info = fd_vinyl_data_info( dst_meta );
327 :
328 0 : fd_memcpy( dst_meta, src_meta, val_sz );
329 0 : val_info->val_sz = (uint)val_sz;
330 0 : }
331 :
332 : /* Send off RELEASE batch request (reuse acq_batch) */
333 :
334 0 : memset( acq_comp, 0, sizeof(fd_vinyl_comp_t) );
335 0 : for( ulong i=0UL; i<acq_cnt; i++ ) acq_err0[ i ] = 0;
336 0 : fd_vinyl_req_send_batch(
337 0 : rq, req_pool, req_wksp,
338 0 : admin->vinyl_req_id++, link_id,
339 0 : FD_VINYL_REQ_TYPE_RELEASE,
340 0 : FD_VINYL_REQ_FLAG_MODIFY,
341 0 : acq_batch, acq_cnt
342 0 : );
343 0 : long t_copy = fd_tickcount();
344 :
345 : /* Spin for ERASE, RELEASE completions */
346 :
347 0 : vinyl_spin_wait( del_comp, del_key0, del_err0, del_cnt, "ERASE" );
348 0 : fd_vinyl_req_pool_release( req_pool, del_batch );
349 :
350 0 : vinyl_spin_wait( acq_comp, acq_key0, acq_err0, acq_cnt, "RELEASE" );
351 0 : fd_vinyl_req_pool_release( req_pool, acq_batch );
352 0 : long t_release = fd_tickcount();
353 :
354 : /* Remove funk records */
355 :
356 0 : for( ulong i=0UL; i<rec_cnt; i++ ) {
357 0 : fd_funk_xid_key_pair_t pair = recs[ i ]->pair;
358 0 : fd_funk_rec_query_t query[1];
359 0 : int rm_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
360 0 : if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
361 0 : funk_free_rec( funk, recs[ i ] );
362 0 : }
363 0 : long t_gc = fd_tickcount();
364 :
365 : /* Update metrics */
366 :
367 0 : admin->base.root_cnt += (uint)acq_cnt;
368 0 : admin->base.reclaim_cnt += (uint)del_cnt;
369 0 : admin->base.dt_vinyl += ( t_acquire - t_start ) + ( t_release - t_copy );
370 0 : admin->base.dt_copy += ( t_copy - t_acquire );
371 0 : admin->base.dt_gc += ( t_gc - t_release );
372 :
373 0 : return next;
374 0 : }
|