Line data Source code
1 : #define _GNU_SOURCE
2 : #include "fd_accdb.h"
3 : #include "fd_accdb_shmem.h"
4 : #define FD_ACCDB_NO_FORK_ID
5 : #include "fd_accdb_private.h"
6 : #undef FD_ACCDB_NO_FORK_ID
7 :
8 : #if FD_TMPL_USE_HANDHOLDING
9 : #include "../../ballet/txn/fd_txn.h"
10 : #include "../../ballet/base58/fd_base58.h"
11 : #endif
12 : #include "../../util/racesan/fd_racesan_target.h"
13 :
14 : FD_STATIC_ASSERT( sizeof(fd_accdb_cache_line_t)==FD_ACCDB_CACHE_META_SZ, cache_meta_sz );
15 :
16 : #if FD_HAS_RACESAN
17 : /* Test-only telemetry: background_compact publishes the pubkey + dest
18 : offset of the record it is about to relocation-CAS at the
19 : accdb_compact:pre_offset_cas hook, so test_accdb_racesan can PROVE the
20 : parked relocation is the account it set up (avoiding a vacuous test).
21 : Zero-cost / absent in production (racesan off). */
/* Pubkey (32 bytes) of the record background_compact is about to
   relocation-CAS. */
22 : uchar fd_accdb_dbg_reloc_pubkey[ 32UL ];
/* Destination offset of that pending relocation. */
23 : ulong fd_accdb_dbg_reloc_dest;
/* NOTE(review): presumably the number of times the telemetry above has
   been published; the writer is not visible here - confirm. */
24 : ulong fd_accdb_dbg_reloc_cnt;
25 : #endif
26 :
27 : #include <stddef.h>
28 : #include <unistd.h>
29 : #include <fcntl.h>
30 : #include <errno.h>
31 : #include <sys/uio.h>
32 :
/* fd_accdb_fork is the joiner-local view of one fork slot.  Entry i of
   a joiner's fork_pool array pairs shared fork pool element i with the
   local join of descends set i (both are filled in by fd_accdb_new and
   fd_accdb_join_readonly). */
33 : struct fd_accdb_fork {
/* Shared fork pool element for this slot. */
34 : fd_accdb_fork_shmem_t * shmem;
/* Local join of this slot's descends set; fd_accdb_attach_child fills
   it with the parent's set plus the parent's own id. */
35 : descends_set_t * descends;
36 : };
37 :
38 : typedef struct fd_accdb_fork fd_accdb_fork_t;
39 :
/* Values stored in fd_accdb_private.acquire_state.  IDLE is the value
   installed at join time and by fd_accdb_reset.
   NOTE(review): PHASE_A and OPEN are presumably intermediate / active
   stages of an acquire operation; their users are outside this part of
   the file - confirm. */
40 250533 : #define FD_ACCDB_ACQUIRE_STATE_IDLE (0)
41 174 : #define FD_ACCDB_ACQUIRE_STATE_PHASE_A (1)
42 246882 : #define FD_ACCDB_ACQUIRE_STATE_OPEN (2)
43 :
/* fd_accdb_private is the joiner-local handle state.  fd_accdb_new and
   fd_accdb_join_readonly place one of these at the start of the
   caller's ljoin region (as an fd_accdb_t - presumably a typedef of
   this struct declared in a header; confirm) and fill it with pointers
   into the shared fd_accdb_shmem_t region. */
44 : struct __attribute__((aligned(FD_ACCDB_ALIGN))) fd_accdb_private {
/* File descriptor handed in by the caller (fd in fd_accdb_new, fd_ro in
   fd_accdb_join_readonly). */
45 : int fd;
46 :
/* One of FD_ACCDB_ACQUIRE_STATE_*; IDLE after join and after
   fd_accdb_reset. */
47 : int acquire_state;
48 :
/* Shared region this handle is joined to. */
49 : fd_accdb_shmem_t * shmem;
50 :
/* fork_pool: local array of max_live_slots fd_accdb_fork_t, stored
   right after this struct in the ljoin region.
   fork_shmem_pool: join of the shared fork pool. */
51 : fd_accdb_fork_t * fork_pool;
52 : fork_pool_t fork_shmem_pool[1];
53 :
/* acc_pool: element array of acc_pool_join (acc_pool_join->ele).
   acc_map: hash chain heads, one uint per chain; fd_accdb_reset fills
   them with 0xFF bytes to mark every chain empty. */
54 : fd_accdb_accmeta_t * acc_pool;
55 : acc_pool_t acc_pool_join[1];
56 : uint * acc_map;
57 :
/* Base address of each cache size class region, computed as
   shmem + shmem->cache_region_off[ class ]. */
58 : uchar * cache [ FD_ACCDB_CACHE_CLASS_CNT ];
59 :
/* Partition bookkeeping.  Joined by fd_accdb_new only; read-only
   joiners leave all three NULL. */
60 : fd_accdb_partition_t * partition_pool;
61 : compaction_dlist_t * compaction_dlist[ FD_ACCDB_COMPACTION_LAYER_CNT ];
62 : deferred_free_dlist_t * deferred_free_dlist;
63 :
/* Join of the shared txn pool.  Only fd_accdb_new joins it;
   fd_accdb_join_readonly does not touch this field. */
64 : txn_pool_t txn_pool[1];
65 :
66 : /* Pointer into shmem->joiner_epochs[ my_slot ].val for writer
67 : joiners, or into a private per-tile fseq for read-only joiners.
68 : Set to the current global epoch on entry to an epoch-protected
69 : operation, and ULONG_MAX on exit. Used to determine when
70 : deferred frees are safe. */
71 : ulong * my_epoch_slot;
72 :
73 : /* Read-only pointers to external epoch slots (e.g. fseqs owned by
74 : RO consumer tiles like the rpc tile). Scanned in addition to
75 : shmem->joiner_epochs[] by compaction's deferred-free
76 : reclamation. Borrowed; the caller of fd_accdb_new owns the
77 : storage. */
78 : ulong const * const * external_epoch_slots;
79 : ulong external_epoch_cnt;
80 :
81 : /* Side buffer of acc pool indices that have been CAS-unlinked from
82 : their hash chains but cannot be released back to acc_pool yet,
83 : because concurrent readers (acquire / compact) may still be
84 : traversing the removed nodes via map.next. The batch is released
85 : once all joiner_epochs exceed shmem->deferred_acc_epoch. Indices
86 : are written here (not into pool.next) until after the epoch drain
87 : because pool.next is union-aliased to cache_idx, which a concurrent
88 : cold_load_acc may still write through a captured pointer. Backed
89 : by shmem->deferred_acc_buf_off; cnt and epoch live in shmem too. */
90 : uint * deferred_acc_buf;
91 :
92 : /* Chain of fork pool slots whose IDs are still potentially
93 : referenced by concurrent readers (via descends_set_test or
94 : root_fork_id snapshot). The chain is released back to fork_pool
95 : once all joiner_epochs exceed deferred_fork_epoch. NULL head
96 : means no deferred forks. */
97 : fd_accdb_fork_shmem_t * deferred_fork_head;
98 : fd_accdb_fork_shmem_t * deferred_fork_tail;
99 : ulong deferred_fork_epoch;
100 :
/* Joiner-local metrics; zeroed when the handle is created. */
101 : fd_accdb_metrics_t metrics[1];
102 :
103 : /* Set by fd_accdb_snapshot_load_begin/end. When non-zero, layer-0
104 : partition handoffs (in change_partition) re-tier the partitions
105 : that fell out of the snapshot-load working set: P-2 to Warm and
106 : P-3 to Cold. This backfills tiering for snapshot-loaded data
107 : that never gets a second write (and therefore would otherwise
108 : never be promoted by compaction). */
109 : int snapshot_loading;
110 : };
111 :
112 : static inline fd_accdb_cache_line_t *
113 : cache_line( fd_accdb_t * accdb,
114 : ulong cls,
115 2024214 : ulong idx ) {
116 2024214 : return (fd_accdb_cache_line_t *)( accdb->cache[ cls ] + idx * fd_accdb_cache_slot_sz[ cls ] );
117 2024214 : }
118 :
119 : /* Bump the per-partition read counters for the partition that contains
120 : file_offset. Called at preadv2 sites. Writes are counted at
121 : allocate time (see fd_accdb_partition_write_bump) so that they reflect
122 : bytes committed to a partition rather than syscalls — the snapshot
123 : loader bypasses pwritev2 entirely, but every write still goes through
124 : allocate_next_write. */
125 : static inline void
126 : fd_accdb_partition_read_bump( fd_accdb_t * accdb,
127 : ulong file_offset,
128 24 : ulong bytes ) {
129 24 : if( FD_UNLIKELY( !bytes ) ) return;
130 : /* Readonly joiners have no partition_pool join (see
131 : fd_accdb_join_readonly) and do not contribute to per-partition
132 : read telemetry today; their disk reads still show up in the
133 : joiner-local fd_accdb_metrics_t bytes_read/read_ops. */
134 24 : if( FD_UNLIKELY( !accdb->partition_pool ) ) return;
135 24 : ulong partition_idx = file_offset / accdb->shmem->partition_sz;
136 24 : fd_accdb_partition_t * p = partition_pool_ele( accdb->partition_pool, partition_idx );
137 24 : if( FD_UNLIKELY( !p ) ) return;
138 24 : FD_ATOMIC_FETCH_AND_ADD( &p->bytes_read, bytes );
139 24 : FD_ATOMIC_FETCH_AND_ADD( &p->read_ops, 1UL );
140 24 : }
141 :
142 : /* Bump the per-partition write counters at allocate time. bytes is the
143 : reserved size, which equals the bytes that will land on this
144 : partition. Called from allocate_next_write and
145 : allocate_next_compaction_write. */
146 : static inline void
147 : fd_accdb_partition_write_bump( fd_accdb_t * accdb,
148 : ulong file_offset,
149 45 : ulong bytes ) {
150 45 : if( FD_UNLIKELY( !bytes ) ) return;
151 45 : ulong partition_idx = file_offset / accdb->shmem->partition_sz;
152 45 : fd_accdb_partition_t * p = partition_pool_ele( accdb->partition_pool, partition_idx );
153 45 : if( FD_UNLIKELY( !p ) ) return;
154 45 : FD_ATOMIC_FETCH_AND_ADD( &p->bytes_written, bytes );
155 45 : FD_ATOMIC_FETCH_AND_ADD( &p->write_ops, 1UL );
156 45 : }
157 :
158 : static inline ulong
159 : cache_line_idx( fd_accdb_t * accdb,
160 : ulong cls,
161 1742382 : fd_accdb_cache_line_t const * line ) {
162 1742382 : return (ulong)( (uchar const *)line - accdb->cache[ cls ] ) / fd_accdb_cache_slot_sz[ cls ];
163 1742382 : }
164 :
165 : #if FD_TMPL_USE_HANDHOLDING
166 : static inline int
167 : fd_accdb_ptr_in_region( fd_accdb_t const * accdb,
168 : ulong cls,
169 : void const * ptr ) {
170 : if( FD_UNLIKELY( cls>=FD_ACCDB_CACHE_CLASS_CNT ) ) return 0;
171 :
172 : uchar const * base = accdb->cache[ cls ];
173 : if( FD_UNLIKELY( !base ) ) return 0;
174 :
175 : ulong slot_sz = fd_accdb_cache_slot_sz[ cls ];
176 : ulong region_sz = accdb->shmem->cache_class_max[ cls ] * slot_sz;
177 : uchar const * p = (uchar const *)ptr;
178 :
179 : if( FD_UNLIKELY( p<base || p>=base+region_sz ) ) return 0;
180 : return ( (ulong)( p - base ) % slot_sz )==FD_ACCDB_CACHE_META_SZ;
181 : }
182 : #endif
183 :
184 : FD_FN_CONST ulong
185 11523 : fd_accdb_align( void ) {
186 11523 : return FD_ACCDB_ALIGN;
187 11523 : }
188 :
189 : FD_FN_CONST ulong
190 231 : fd_accdb_footprint( ulong max_live_slots ) {
191 231 : ulong l;
192 231 : l = FD_LAYOUT_INIT;
193 231 : l = FD_LAYOUT_APPEND( l, FD_ACCDB_ALIGN, sizeof(fd_accdb_t) );
194 231 : l = FD_LAYOUT_APPEND( l, alignof(fd_accdb_fork_t), max_live_slots*sizeof(fd_accdb_fork_t) );
195 231 : return FD_LAYOUT_FINI( l, FD_ACCDB_ALIGN );
196 231 : }
197 :
198 : void *
199 : fd_accdb_new( void * ljoin,
200 : fd_accdb_shmem_t * shmem,
201 : int fd,
202 : ulong external_epoch_cnt,
203 3765 : ulong const ** external_epoch_slots ) {
204 3765 : if( FD_UNLIKELY( !ljoin ) ) {
205 0 : FD_LOG_WARNING(( "NULL ljoin" ));
206 0 : return NULL;
207 0 : }
208 :
209 3765 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ljoin, fd_accdb_align() ) ) ) {
210 0 : FD_LOG_WARNING(( "misaligned ljoin" ));
211 0 : return NULL;
212 0 : }
213 :
214 3765 : if( FD_UNLIKELY( fd<0 ) ) {
215 0 : FD_LOG_WARNING(( "fd must be a valid file descriptor" ));
216 0 : return NULL;
217 0 : }
218 :
219 3765 : ulong max_live_slots = shmem->max_live_slots;
220 3765 : ulong max_accounts = shmem->max_accounts;
221 3765 : ulong max_account_writes_per_slot = shmem->max_account_writes_per_slot;
222 3765 : ulong partition_cnt = shmem->partition_cnt;
223 :
224 3765 : ulong chain_cnt = fd_ulong_pow2_up( (max_accounts>>1) + (max_accounts&1UL) );
225 3765 : ulong txn_max = max_live_slots * max_account_writes_per_slot;
226 :
227 3765 : FD_SCRATCH_ALLOC_INIT( l, shmem );
228 3765 : FD_SCRATCH_ALLOC_APPEND( l, FD_ACCDB_SHMEM_ALIGN, sizeof(fd_accdb_shmem_t) );
229 3765 : void * _fork_pool_shmem = FD_SCRATCH_ALLOC_APPEND( l, fork_pool_align(), fork_pool_footprint() );
230 3765 : void * _fork_pool_ele = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_accdb_fork_shmem_t), max_live_slots*sizeof(fd_accdb_fork_shmem_t) );
231 3765 : void * _descends_sets = FD_SCRATCH_ALLOC_APPEND( l, descends_set_align(), max_live_slots*descends_set_footprint( max_live_slots ) );
232 3765 : void * _acc_map = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), chain_cnt*sizeof(uint) );
233 3765 : void * _acc_pool_shmem = FD_SCRATCH_ALLOC_APPEND( l, acc_pool_align(), acc_pool_footprint() );
234 3765 : void * _acc_pool_ele = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_accdb_accmeta_t), max_accounts*sizeof(fd_accdb_accmeta_t) );
235 3765 : void * _txn_pool_shmem = FD_SCRATCH_ALLOC_APPEND( l, txn_pool_align(), txn_pool_footprint() );
236 3765 : void * _txn_pool_ele = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_accdb_txn_t), txn_max*sizeof(fd_accdb_txn_t) );
237 3765 : void * _partition_pool = FD_SCRATCH_ALLOC_APPEND( l, partition_pool_align(), partition_pool_footprint( partition_cnt ) );
238 3765 : void * _compaction_dlists[ FD_ACCDB_COMPACTION_LAYER_CNT ];
239 15060 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) {
240 11295 : _compaction_dlists[ k ] = FD_SCRATCH_ALLOC_APPEND( l, compaction_dlist_align(), compaction_dlist_footprint() );
241 11295 : }
242 3765 : void * _deferred_free_dlist = FD_SCRATCH_ALLOC_APPEND( l, deferred_free_dlist_align(), deferred_free_dlist_footprint() );
243 :
244 3765 : FD_SCRATCH_ALLOC_INIT( l2, ljoin );
245 3765 : fd_accdb_t * accdb = FD_SCRATCH_ALLOC_APPEND( l2, fd_accdb_align(), sizeof(fd_accdb_t) );
246 3765 : void * _local_fork_pool = FD_SCRATCH_ALLOC_APPEND( l2, alignof(fd_accdb_fork_t), max_live_slots*sizeof(fd_accdb_fork_t) );
247 :
248 3765 : accdb->fd = fd;
249 3765 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_IDLE;
250 3765 : accdb->snapshot_loading = 0;
251 :
252 3765 : accdb->shmem = (fd_accdb_shmem_t *)shmem;
253 3765 : FD_TEST( acc_pool_join( accdb->acc_pool_join, _acc_pool_shmem, _acc_pool_ele, max_accounts ) );
254 3765 : accdb->acc_pool = accdb->acc_pool_join->ele;
255 3765 : accdb->acc_map = _acc_map;
256 3765 : FD_TEST( txn_pool_join( accdb->txn_pool, _txn_pool_shmem, _txn_pool_ele, txn_max ) );
257 33885 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) accdb->cache[ c ] = (uchar *)shmem + shmem->cache_region_off[ c ];
258 3765 : accdb->partition_pool = partition_pool_join( _partition_pool );
259 3765 : FD_TEST( accdb->partition_pool );
260 15060 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) {
261 11295 : accdb->compaction_dlist[ k ] = compaction_dlist_join( _compaction_dlists[ k ] );
262 11295 : FD_TEST( accdb->compaction_dlist[ k ] );
263 11295 : }
264 3765 : accdb->deferred_free_dlist = deferred_free_dlist_join( _deferred_free_dlist );
265 3765 : FD_TEST( accdb->deferred_free_dlist );
266 :
267 3765 : FD_TEST( fork_pool_join( accdb->fork_shmem_pool, _fork_pool_shmem, _fork_pool_ele, max_live_slots ) );
268 3765 : accdb->fork_pool = _local_fork_pool;
269 66861 : for( ulong i=0UL; i<max_live_slots; i++ ) {
270 63096 : fd_accdb_fork_t * fork = &accdb->fork_pool[ i ];
271 63096 : fork->shmem = fork_pool_ele( accdb->fork_shmem_pool, i );
272 63096 : fork->descends = descends_set_join( (uchar *)_descends_sets + i*descends_set_footprint( max_live_slots ) );
273 63096 : FD_TEST( fork->shmem );
274 63096 : FD_TEST( fork->descends );
275 63096 : }
276 :
277 3765 : ulong epoch_idx = FD_ATOMIC_FETCH_AND_ADD( &shmem->joiner_cnt, 1UL );
278 3765 : FD_TEST( epoch_idx<shmem->joiner_cnt_max );
279 3765 : accdb->my_epoch_slot = &shmem->joiner_epochs[ epoch_idx ].val;
280 :
281 3765 : accdb->external_epoch_slots = external_epoch_slots;
282 3765 : accdb->external_epoch_cnt = external_epoch_cnt;
283 :
284 3765 : accdb->deferred_acc_buf = (uint *)( (uchar *)shmem + shmem->deferred_acc_buf_off );
285 :
286 3765 : accdb->deferred_fork_head = NULL;
287 3765 : accdb->deferred_fork_tail = NULL;
288 3765 : accdb->deferred_fork_epoch = 0UL;
289 :
290 3765 : memset( accdb->metrics, 0, sizeof(fd_accdb_metrics_t) );
291 :
292 3765 : return accdb;
293 3765 : }
294 :
295 : static inline void wait_cmd( fd_accdb_t * accdb );
296 : static inline void submit_cmd( fd_accdb_t * accdb, uint op, ushort fork_id );
297 :
/* fd_accdb_reset returns the database to an empty state in place,
   working through the caller's existing join.  It first waits for any
   in-flight background command, then resets the pools, hash chains,
   dlists, descends sets, write heads, cache bookkeeping, epoch slots,
   deferred acc buffer, gauge metrics and the command slot, and finally
   submits FD_ACCDB_CMD_CLEAR_DEFERRED (without waiting for it) and
   clears the caller's own deferred-fork and state fields.  joiner_cnt
   is left untouched so existing joiners keep their epoch slot.
   NOTE(review): nothing here excludes other joiners from touching the
   shared state while it is being cleared - presumably the caller
   guarantees quiescence; confirm. */
298 : void
299 3 : fd_accdb_reset( fd_accdb_t * accdb ) {
300 3 : fd_accdb_shmem_t * shmem = accdb->shmem;
301 :
302 : /* Wait for any pending background command (advance_root / purge) on
303 : T2 to finish before clobbering shared state. */
304 3 : wait_cmd( accdb );
305 :
306 : /* Reset pools through the joiner's existing pointers. acc_pool and
307 : txn_pool use POOL_LAZY=1 so reset is O(1). fork_pool and
308 : partition_pool rebuild their free lists in O(max_live_slots) and
309 : O(partition_cnt), both small. */
310 3 : acc_pool_reset( accdb->acc_pool_join );
311 3 : txn_pool_reset( accdb->txn_pool );
312 3 : fork_pool_reset( accdb->fork_shmem_pool );
313 3 : partition_pool_reset( accdb->partition_pool );
314 :
315 : /* Clear hash chains */
316 3 : fd_memset( accdb->acc_map, 0xFF, shmem->chain_cnt*sizeof(uint) );
317 :
318 : /* Empty dlists */
319 12 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) {
320 9 : compaction_dlist_remove_all( accdb->compaction_dlist[ k ], accdb->partition_pool );
321 9 : }
322 3 : deferred_free_dlist_remove_all( accdb->deferred_free_dlist, accdb->partition_pool );
323 :
324 : /* Null descends_sets. */
325 195 : for( ulong i=0UL; i<shmem->max_live_slots; i++ ) {
326 192 : descends_set_null( accdb->fork_pool[ i ].descends );
327 192 : }
328 :
329 : /* Reset shmem scalar fields. */
330 3 : shmem->root_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
331 3 : shmem->generation = 0U;
332 3 : shmem->partition_lock = 0;
333 3 : shmem->partition_max = 0UL;
334 :
335 : /* Write heads: sentinel values that force partition-switch on first
336 : write. */
337 12 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) {
338 9 : shmem->whead[ k ] = accdb_offset( shmem->partition_cnt, shmem->partition_sz );
339 9 : shmem->has_partition[ k ] = 0;
340 9 : }
341 :
342 : /* Cache state */
343 27 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) {
344 24 : shmem->clock_hand[ c ].val = 0UL;
345 24 : shmem->cache_free[ c ].ver_top = (ulong)UINT_MAX;
346 24 : shmem->cache_free_cnt[ c ].val = 0UL;
347 24 : shmem->cache_class_init[ c ].val = 0UL;
/* NOTE(review): a class with at least cache_min_reserved slots per
   possible joiner gets cache_class_used = ULONG_MAX, otherwise 0;
   presumably ULONG_MAX disables usage accounting for that class -
   confirm against the cache allocator. */
348 24 : if( shmem->cache_class_max[ c ]>=shmem->cache_min_reserved*shmem->joiner_cnt_max )
349 24 : shmem->cache_class_used[ c ].val = ULONG_MAX;
350 0 : else
351 0 : shmem->cache_class_used[ c ].val = 0UL;
352 24 : }
353 :
354 : /* Reset every cache slot's metadata to empty sentinels. */
355 27 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) {
356 24 : ulong slot_sz = fd_accdb_cache_slot_sz[ c ];
357 82200 : for( ulong i=0UL; i<shmem->cache_class_max[ c ]; i++ ) {
358 82176 : fd_accdb_cache_line_t * line = (fd_accdb_cache_line_t *)( accdb->cache[ c ] + i*slot_sz );
359 82176 : line->key.generation = UINT_MAX;
360 82176 : line->acc_idx = UINT_MAX;
361 82176 : line->refcnt = 0U;
362 82176 : line->referenced = 0;
363 82176 : line->persisted = 1;
364 82176 : }
365 24 : }
366 :
367 : /* Epoch system: reset epoch and all slot values to idle, but
368 : preserve joiner_cnt and each tile's my_epoch_slot pointer so that
369 : tiles which joined during init keep their original slot indices. */
370 3 : shmem->epoch = 1UL;
371 771 : for( ulong i=0UL; i<FD_ACCDB_MAX_JOINERS; i++ ) shmem->joiner_epochs[ i ].val = ULONG_MAX;
372 :
373 : /* Deferred acc buffer. */
374 3 : shmem->deferred_acc_buf_cnt = 0UL;
375 3 : shmem->deferred_acc_epoch = 0UL;
376 :
377 : /* Shared metrics: zero gauges that reflect current state (now empty)
378 : but preserve counters and accounts_capacity. */
379 3 : shmem->shmetrics->accounts_total = 0UL;
380 3 : shmem->shmetrics->disk_allocated_bytes = 0UL;
381 3 : shmem->shmetrics->disk_current_bytes = 0UL;
382 3 : shmem->shmetrics->disk_used_bytes = 0UL;
383 3 : shmem->shmetrics->in_compaction = 0;
384 :
385 : /* Command slot */
386 3 : shmem->cmd_op = FD_ACCDB_CMD_IDLE;
387 3 : shmem->cmd_fork_id = USHORT_MAX;
388 :
389 3 : shmem->snapshot_loading = 0;
390 :
391 3 : FD_COMPILER_MFENCE();
392 :
393 : /* Tell the accdb tile to clear its stale deferred fork chain.
394 : Its deferred_fork_head/tail now reference recycled pool elements;
395 : it must discard them before processing any future advance_root or
396 : purge command. The command is asynchronous; the next advance_root
397 : or purge call will wait for it to complete via wait_cmd. */
398 3 : submit_cmd( accdb, FD_ACCDB_CMD_CLEAR_DEFERRED, 0 );
399 :
400 : /* Reset local state */
401 3 : accdb->deferred_fork_head = NULL;
402 3 : accdb->deferred_fork_tail = NULL;
403 3 : accdb->deferred_fork_epoch = 0UL;
404 3 : accdb->snapshot_loading = 0;
405 3 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_IDLE;
406 3 : }
407 :
408 : void
409 12 : fd_accdb_snapshot_load_begin( fd_accdb_t * accdb ) {
410 12 : accdb->snapshot_loading = 1;
411 12 : FD_VOLATILE( accdb->shmem->snapshot_loading ) = 1;
412 12 : }
413 :
414 : static inline void
415 : change_partition( fd_accdb_t * accdb,
416 : accdb_offset_t const * offset_before,
417 : accdb_offset_t * out_offset,
418 : int * has_partition,
419 : uchar layer );
420 :
421 : void
422 12 : fd_accdb_snapshot_load_end( fd_accdb_t * accdb ) {
423 12 : spin_lock_acquire( &accdb->shmem->partition_lock );
424 :
425 : /* Force the next layer-0 write onto a fresh Hot partition so we do
426 : not keep appending live execution writes to the tail of a partition
427 : that was tagged Cold during snapshot load. Must run while
428 : snapshot_loading is still set so the partition we just closed
429 : (the snapshot-tagged Cold one) is not enqueued for compaction by
430 : change_partition's tail-credit try_enqueue. change_partition will
431 : retag the newly-allocated partition as Cold (because the flag is
432 : still set), so we fix it back to Hot below. */
433 12 : if( FD_LIKELY( accdb->shmem->has_partition[ 0 ] ) ) {
434 12 : change_partition( accdb, &accdb->shmem->whead[ 0 ], &accdb->shmem->whead[ 0 ], &accdb->shmem->has_partition[ 0 ], 0 );
435 12 : ulong new_idx = packed_partition_idx( &accdb->shmem->whead[ 0 ] );
436 12 : fd_accdb_partition_t * newp = partition_pool_ele( accdb->partition_pool, new_idx );
437 12 : FD_VOLATILE( newp->layer ) = 0;
438 12 : }
439 :
440 12 : accdb->snapshot_loading = 0;
441 12 : FD_VOLATILE( accdb->shmem->snapshot_loading ) = 0;
442 :
443 : /* Sweep all partitions written during the load — any that crossed
444 : the fragmentation threshold while enqueue was suppressed are
445 : re-checked now and pushed onto the compaction queue. */
446 12 : ulong partition_max = accdb->shmem->partition_max;
447 60 : for( ulong p=0UL; p<partition_max; p++ ) {
448 48 : fd_accdb_shmem_try_enqueue_compaction( accdb->shmem, p );
449 48 : }
450 :
451 12 : spin_lock_release( &accdb->shmem->partition_lock );
452 12 : }
453 :
454 : void
455 : fd_accdb_snapshot_save_whead( fd_accdb_t * accdb,
456 6 : fd_accdb_snapshot_recovery_t * out ) {
457 6 : out->whead_val = FD_VOLATILE_CONST( accdb->shmem->whead[ 0 ].val );
458 6 : out->has_partition = FD_VOLATILE_CONST( accdb->shmem->has_partition[ 0 ] );
459 6 : out->partition_max = FD_VOLATILE_CONST( accdb->shmem->partition_max );
460 6 : out->disk_current_bytes = FD_VOLATILE_CONST( accdb->shmem->shmetrics->disk_current_bytes );
461 :
462 6 : if( out->has_partition ) {
463 6 : accdb_offset_t whead = { .val = out->whead_val };
464 6 : ulong idx = packed_partition_idx( &whead );
465 6 : fd_accdb_partition_t * part = partition_pool_ele( accdb->partition_pool, idx );
466 6 : out->savepoint_bytes_freed = FD_VOLATILE_CONST( part->bytes_freed );
467 6 : } else {
468 0 : out->savepoint_bytes_freed = 0UL;
469 0 : }
470 6 : }
471 :
/* fd_accdb_snapshot_revert_whead rolls the layer-0 write head back to
   the state captured in recover by fd_accdb_snapshot_save_whead.
   After waiting for any in-flight background command it releases the
   partitions with index in [recover->partition_max, partition_max)
   back to the partition pool (unlinking queued ones from their
   compaction dlist first), restores whead[0], has_partition[0] and
   partition_max, resets the disk_current_bytes / disk_allocated_bytes
   gauges, and restores bytes_freed on the savepoint partition. */
472 : void
473 : fd_accdb_snapshot_revert_whead( fd_accdb_t * accdb,
474 6 : fd_accdb_snapshot_recovery_t const * recover ) {
475 6 : fd_accdb_shmem_t * shmem = accdb->shmem;
476 :
477 : /* Wait for any pending background command (purge) on T2 to finish
478 : before releasing partitions. */
479 6 : wait_cmd( accdb );
480 :
481 6 : ulong cur_partition_max = shmem->partition_max;
482 :
483 : /* Materialize the active partition's write_offset from the whead
484 : before releasing. Closed partitions have write_offset set by
485 : change_partition, but the last active partition still has
486 : write_offset == 0 from its initialization. The real byte offset
487 : is encoded in whead[0]. */
488 6 : if( shmem->has_partition[ 0 ] && cur_partition_max>recover->partition_max ) {
489 6 : ulong active_idx = packed_partition_idx( &shmem->whead[ 0 ] );
490 6 : if( active_idx>=recover->partition_max && active_idx<cur_partition_max ) {
491 6 : fd_accdb_partition_t * active = partition_pool_ele( accdb->partition_pool, active_idx );
492 6 : active->write_offset = packed_partition_offset( &shmem->whead[ 0 ] );
493 6 : }
494 6 : }
495 :
496 : /* Release partitions that have been previously allocated. Must hold
497 : partition_lock because partition_pool_ele_release mutates the
498 : pool free list. Before releasing, unlink any partition that sits
499 : on a compaction dlist (queued flag).
500 :
501 : Release in descending index order so that the LIFO free list
502 : re-acquires them in ascending order (P, P+1, P+2, ...). This
503 : keeps allocate_next_write in sync with snapwr, which advances
504 : its flat file offset sequentially. */
505 6 : spin_lock_acquire( &shmem->partition_lock );
/* p runs one past the index being released, so the unsigned counter
   never has to go below recover->partition_max. */
506 18 : for( ulong p=cur_partition_max; p>recover->partition_max; p-- ) {
507 12 : fd_accdb_partition_t * part = partition_pool_ele( accdb->partition_pool, p-1UL );
508 12 : if( FD_UNLIKELY( part->queued ) ) {
509 6 : compaction_dlist_ele_remove( accdb->compaction_dlist[ part->layer ], part, accdb->partition_pool );
510 6 : }
511 12 : partition_pool_ele_release( accdb->partition_pool, part );
512 12 : }
513 :
514 6 : shmem->whead[ 0 ].val = recover->whead_val;
515 6 : shmem->has_partition[ 0 ] = recover->has_partition;
516 6 : shmem->partition_max = recover->partition_max;
517 :
518 : /* disk_used_bytes is NOT saved/restored here. It is implicitly
519 : reverted by purge_inner -> acc_unlink, which decrements
520 : disk_used_bytes for each unlinked entry. The caller must
521 : complete the purge before calling revert_whead. */
522 :
523 6 : shmem->shmetrics->disk_current_bytes = recover->disk_current_bytes;
524 6 : shmem->shmetrics->disk_allocated_bytes = recover->partition_max * shmem->partition_sz;
525 :
/* The savepoint partition becomes the active layer-0 partition again:
   restore its bytes_freed and put write_offset back to 0, the value
   an active partition carries (its real offset lives in whead[0]). */
526 6 : if( recover->has_partition ) {
527 6 : accdb_offset_t sp_off = (accdb_offset_t){ .val = recover->whead_val };
528 6 : ulong sp_idx = packed_partition_idx( &sp_off );
529 6 : fd_accdb_partition_t * sp = partition_pool_ele( accdb->partition_pool, sp_idx );
530 6 : sp->bytes_freed = recover->savepoint_bytes_freed;
531 6 : sp->write_offset = 0UL;
532 6 : }
533 :
534 6 : spin_lock_release( &shmem->partition_lock );
535 6 : }
536 :
537 : fd_accdb_t *
538 3765 : fd_accdb_join( void * shaccdb ) {
539 3765 : if( FD_UNLIKELY( !shaccdb ) ) {
540 0 : FD_LOG_WARNING(( "NULL shaccdb" ));
541 0 : return NULL;
542 0 : }
543 :
544 3765 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shaccdb, fd_accdb_align() ) ) ) {
545 0 : FD_LOG_WARNING(( "misaligned shaccdb" ));
546 0 : return NULL;
547 0 : }
548 :
549 3765 : return (fd_accdb_t*)shaccdb;
550 3765 : }
551 :
552 : fd_accdb_t *
553 : fd_accdb_join_readonly( void * ljoin,
554 : fd_accdb_shmem_t * shmem,
555 : ulong * my_epoch_slot_rw,
556 0 : int fd_ro ) {
557 0 : if( FD_UNLIKELY( !ljoin ) ) {
558 0 : FD_LOG_WARNING(( "NULL ljoin" ));
559 0 : return NULL;
560 0 : }
561 :
562 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ljoin, fd_accdb_align() ) ) ) {
563 0 : FD_LOG_WARNING(( "misaligned ljoin" ));
564 0 : return NULL;
565 0 : }
566 :
567 0 : if( FD_UNLIKELY( !my_epoch_slot_rw ) ) {
568 0 : FD_LOG_WARNING(( "NULL my_epoch_slot_rw" ));
569 0 : return NULL;
570 0 : }
571 :
572 0 : ulong max_live_slots = shmem->max_live_slots;
573 0 : ulong max_accounts = shmem->max_accounts;
574 0 : ulong max_account_writes_per_slot = shmem->max_account_writes_per_slot;
575 0 : ulong partition_cnt = shmem->partition_cnt;
576 :
577 0 : ulong chain_cnt = fd_ulong_pow2_up( (max_accounts>>1) + (max_accounts&1UL) );
578 0 : ulong txn_max = max_live_slots * max_account_writes_per_slot;
579 :
580 : /* Recompute the same shmem scratch layout that fd_accdb_shmem_new
581 : used. All FD_SCRATCH_ALLOC_APPEND calls here only compute pointer
582 : offsets — they do not write to shmem. */
583 0 : FD_SCRATCH_ALLOC_INIT( l, shmem );
584 0 : FD_SCRATCH_ALLOC_APPEND( l, FD_ACCDB_SHMEM_ALIGN, sizeof(fd_accdb_shmem_t) );
585 0 : void * _fork_pool_shmem = FD_SCRATCH_ALLOC_APPEND( l, fork_pool_align(), fork_pool_footprint() );
586 0 : void * _fork_pool_ele = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_accdb_fork_shmem_t), max_live_slots*sizeof(fd_accdb_fork_shmem_t) );
587 0 : void * _descends_sets = FD_SCRATCH_ALLOC_APPEND( l, descends_set_align(), max_live_slots*descends_set_footprint( max_live_slots ) );
588 0 : void * _acc_map = FD_SCRATCH_ALLOC_APPEND( l, alignof(uint), chain_cnt*sizeof(uint) );
589 0 : void * _acc_pool_shmem = FD_SCRATCH_ALLOC_APPEND( l, acc_pool_align(), acc_pool_footprint() );
590 0 : void * _acc_pool_ele = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_accdb_accmeta_t), max_accounts*sizeof(fd_accdb_accmeta_t) );
591 0 : FD_SCRATCH_ALLOC_APPEND( l, txn_pool_align(), txn_pool_footprint() );
592 0 : FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_accdb_txn_t), txn_max*sizeof(fd_accdb_txn_t) );
593 0 : FD_SCRATCH_ALLOC_APPEND( l, partition_pool_align(), partition_pool_footprint( partition_cnt ) );
594 0 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) {
595 0 : FD_SCRATCH_ALLOC_APPEND( l, compaction_dlist_align(), compaction_dlist_footprint() );
596 0 : }
597 0 : FD_SCRATCH_ALLOC_APPEND( l, deferred_free_dlist_align(), deferred_free_dlist_footprint() );
598 :
599 0 : FD_SCRATCH_ALLOC_INIT( l2, ljoin );
600 0 : fd_accdb_t * accdb = FD_SCRATCH_ALLOC_APPEND( l2, fd_accdb_align(), sizeof(fd_accdb_t) );
601 0 : void * _local_fork_pool = FD_SCRATCH_ALLOC_APPEND( l2, alignof(fd_accdb_fork_t), max_live_slots*sizeof(fd_accdb_fork_t) );
602 :
603 0 : accdb->fd = fd_ro;
604 0 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_IDLE;
605 0 : accdb->shmem = shmem;
606 0 : FD_TEST( acc_pool_join( accdb->acc_pool_join, _acc_pool_shmem, _acc_pool_ele, max_accounts ) );
607 0 : accdb->acc_pool = accdb->acc_pool_join->ele;
608 0 : accdb->acc_map = _acc_map;
609 0 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) accdb->cache[ c ] = (uchar *)shmem + shmem->cache_region_off[ c ];
610 :
611 : /* Writer-only structures: leave NULL so any accidental writer-path
612 : call from a readonly joiner crashes loudly rather than corrupting
613 : state. */
614 0 : accdb->partition_pool = NULL;
615 0 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) accdb->compaction_dlist[ k ] = NULL;
616 0 : accdb->deferred_free_dlist = NULL;
617 :
618 0 : FD_TEST( fork_pool_join( accdb->fork_shmem_pool, _fork_pool_shmem, _fork_pool_ele, max_live_slots ) );
619 0 : accdb->fork_pool = _local_fork_pool;
620 0 : for( ulong i=0UL; i<max_live_slots; i++ ) {
621 0 : fd_accdb_fork_t * fork = &accdb->fork_pool[ i ];
622 0 : fork->shmem = fork_pool_ele( accdb->fork_shmem_pool, i );
623 0 : fork->descends = descends_set_join( (uchar *)_descends_sets + i*descends_set_footprint( max_live_slots ) );
624 0 : FD_TEST( fork->shmem );
625 0 : FD_TEST( fork->descends );
626 0 : }
627 :
628 : /* my_epoch_slot_rw points at memory owned by this joiner (e.g. a
629 : private per-tile fseq) that the joiner can write to. The
630 : accdb tile sees it via its external_epoch_slots[] array (mapped
631 : read-only) and includes it in its compaction epoch scan.
632 : Storing through this pointer is the only side effect a readonly
633 : joiner has on shared state. */
634 0 : accdb->my_epoch_slot = my_epoch_slot_rw;
635 :
636 : /* Readonly joiners do not own external slots themselves; only the
637 : compaction tile / writer joiners do. */
638 0 : accdb->external_epoch_slots = NULL;
639 0 : accdb->external_epoch_cnt = 0UL;
640 :
641 0 : accdb->deferred_acc_buf = NULL;
642 0 : accdb->deferred_fork_head = NULL;
643 0 : accdb->deferred_fork_tail = NULL;
644 0 : accdb->deferred_fork_epoch = 0UL;
645 :
646 0 : memset( accdb->metrics, 0, sizeof(fd_accdb_metrics_t) );
647 :
648 0 : return accdb;
649 0 : }
650 :
651 : /* T1 -> T2 cmd channel. Two states on cmd_op:
652 :
653 : IDLE - no cmd in flight
654 : non-IDLE - cmd pending; T2 will process it then flip back to IDLE
655 :
656 : T1 submits by writing fork_id then cmd_op (non-IDLE). T2 processes
657 : by reading fork_id then writing cmd_op = IDLE. T1 waits for IDLE
658 : before submitting again, so T2 never sees a half-written cmd and
659 : never re-processes the same cmd. */
660 :
661 : static inline void
662 7884 : wait_cmd( fd_accdb_t * accdb ) {
663 7884 : fd_accdb_shmem_t * shmem = accdb->shmem;
664 7884 : while( FD_VOLATILE_CONST( shmem->cmd_op )!=FD_ACCDB_CMD_IDLE ) FD_SPIN_PAUSE();
665 7884 : FD_COMPILER_MFENCE();
666 7884 : }
667 :
668 : static inline void
669 : submit_cmd( fd_accdb_t * accdb,
670 : uint op,
671 198 : ushort fork_id ) {
672 198 : fd_accdb_shmem_t * shmem = accdb->shmem;
673 198 : FD_VOLATILE( shmem->cmd_fork_id ) = fork_id;
674 198 : FD_COMPILER_MFENCE();
675 198 : FD_VOLATILE( shmem->cmd_op ) = op;
676 198 : }
677 :
/* fd_accdb_attach_child creates a new fork and returns its id, which is
   the index of the fork pool element acquired for it.

   If parent_fork_id.val is USHORT_MAX the new fork has no parent or
   sibling, gets an empty descends set and becomes shmem->root_fork_id.
   Otherwise its descends set is the parent's set plus the parent
   itself, and it is pushed onto the front of the parent's child list.

   In both cases the fork starts with no children, takes the current
   shmem->generation (which is then incremented) and an empty txn list
   (txn_head UINT_MAX).  Waits for any in-flight background command
   before touching the fork pool. */
678 : fd_accdb_fork_id_t
679 : fd_accdb_attach_child( fd_accdb_t * accdb,
680 7680 : fd_accdb_fork_id_t parent_fork_id ) {
681 : /* fork_pool_acquire is not NULL-checked: replay gates attaches on
682 : fd_banks_is_full, and wait_cmd ensures the prior advance_root has
683 : fully run on T2, so live + deferred forks <= max_live_slots. */
684 7680 : wait_cmd( accdb );
685 :
686 7680 : fd_accdb_fork_shmem_t * acquired = fork_pool_acquire( accdb->fork_shmem_pool );
687 7680 : ulong idx = fork_pool_idx( accdb->fork_shmem_pool, acquired );
688 :
689 7680 : fd_accdb_fork_t * fork = &accdb->fork_pool[ idx ];
690 7680 : fd_accdb_fork_id_t fork_id = { .val = (ushort)idx };
691 :
692 7680 : fork->shmem->child_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
693 :
694 7680 : if( FD_LIKELY( parent_fork_id.val==USHORT_MAX ) ) {
695 3717 : fork->shmem->parent_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
696 3717 : fork->shmem->sibling_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
697 :
698 3717 : descends_set_null( fork->descends );
699 3717 : accdb->shmem->root_fork_id = fork_id;
700 3963 : } else {
701 3963 : fd_accdb_fork_t * parent = &accdb->fork_pool[ parent_fork_id.val ];
702 3963 : fork->shmem->parent_id = parent_fork_id;
703 :
704 3963 : descends_set_copy( fork->descends, parent->descends );
705 3963 : descends_set_insert( fork->descends, parent_fork_id.val );
706 :
707 : /* Atomically prepend to parent's child list. T2 (background_purge)
708 : may concurrently unlink a different child from the same list, so
709 : we must CAS here. */
710 3963 : FD_COMPILER_MFENCE();
711 3963 : for(;;) {
/* Point our sibling link at the head we observed before trying to
   swing the head to us; if the CAS loses, re-read and retry. */
712 3963 : ushort old_head = FD_VOLATILE_CONST( parent->shmem->child_id.val );
713 3963 : fork->shmem->sibling_id = (fd_accdb_fork_id_t){ .val = old_head };
714 3963 : FD_COMPILER_MFENCE();
715 3963 : if( FD_LIKELY( FD_ATOMIC_CAS( &parent->shmem->child_id.val, old_head, fork_id.val )==old_head ) ) break;
716 0 : FD_SPIN_PAUSE();
717 0 : }
718 3963 : }
719 :
720 7680 : fork->shmem->generation = accdb->shmem->generation++;
721 7680 : fork->shmem->txn_head = UINT_MAX;
722 :
/* Sanity check: a fork must never be recorded as its own ancestor. */
723 7680 : FD_TEST( !descends_set_test( fork->descends, fork_id.val ) );
724 :
725 7680 : return fork_id;
726 7680 : }
727 :
728 : /* evict_clear_acc_cache_ref atomically tears down acc->cache_idx and
729 : acc->executable_size.CACHE_VALID for an acc that is being evicted
730 : from cache line (size_class, line_idx). The caller must already
731 : hold an exclusive claim on the line (line->refcnt ==
732 : FD_ACCDB_EVICT_SENTINEL) so that no concurrent thread can pin the
733 : line.
734 :
735 : The naive sequence (clear cache_idx, clear VALID) lets a reader in
736 : cold_load_acc see VALID=1 and read a stale INVAL cache_idx, which
737 : decodes to an OOB cache_line pointer. The reverse sequence (clear
738 : VALID, clear cache_idx) lets a concurrent cold_load_acc observe
739 : VALID=0/CLAIM=0 and start publishing a *new* cache_idx + VALID=1
740 : between our two stores; our later cache_idx=INVAL would then
741 : stomp on the cold-loader's published idx.
742 :
743 : We close both races by acquiring CACHE_CLAIM_BIT before mutating
744 : acc->cache_idx. cold_load_acc spins while CLAIM is held, so it
745 : cannot enter the publish path concurrently. If CLAIM is already
746 : held, a cold-loader is already mid-publish; in that case
747 : acc->cache_idx is being repointed away from our line, and we must
748 : not touch it. After mutation we release CLAIM.
749 :
750 : Verifies acc->cache_idx still encodes (size_class, line_idx) before
751 : clobbering, in case the acc was concurrently re-published into a
752 : different line (e.g. by a previous cold_load_acc completing before
753 : we arrived). */
754 :
755 : static inline void
756 : evict_clear_acc_cache_ref( fd_accdb_accmeta_t * accmeta,
757 : ulong size_class,
758 333 : ulong line_idx ) {
759 333 : uint expected_cidx = FD_ACCDB_ACC_CIDX_PACK( (uint)size_class, (uint)line_idx );
760 :
761 : /* CAS-acquire CLAIM. If a cold-loader already holds CLAIM, they
762 : own the publish path; bail without touching accmeta fields (their
763 : republish is repointing accmeta->cache_idx away from our line). */
764 333 : for(;;) {
765 333 : uint cur = FD_VOLATILE_CONST( accmeta->executable_size );
766 333 : if( FD_UNLIKELY( cur & FD_ACCDB_SIZE_CACHE_CLAIM_BIT ) ) return;
767 333 : uint nxt = cur | FD_ACCDB_SIZE_CACHE_CLAIM_BIT;
768 333 : if( FD_LIKELY( FD_ATOMIC_CAS( &accmeta->executable_size, cur, nxt )==cur ) ) break;
769 0 : fd_racesan_hook( "accdb_evict_clear:claim_wait" );
770 0 : FD_SPIN_PAUSE();
771 0 : }
772 :
773 333 : fd_racesan_hook( "accdb_evict_clear:post_claim" );
774 :
775 : /* CLAIM held. If accmeta->cache_idx still points at our line, clear
776 : VALID and INVAL the cache_idx. Otherwise the accmeta was already
777 : re-published into a different line; leave it alone. */
778 333 : if( FD_LIKELY( FD_VOLATILE_CONST( accmeta->cache_idx )==expected_cidx ) ) {
779 333 : FD_ATOMIC_FETCH_AND_AND( &accmeta->executable_size, ~FD_ACCDB_SIZE_CACHE_VALID_BIT );
780 333 : FD_VOLATILE( accmeta->cache_idx ) = FD_ACCDB_ACC_CIDX_INVAL;
781 333 : }
782 :
783 : /* Release CLAIM. */
784 333 : FD_ATOMIC_FETCH_AND_AND( &accmeta->executable_size, ~FD_ACCDB_SIZE_CACHE_CLAIM_BIT );
785 333 : }
786 :
787 : /* cache_free_push pushes a fully-freed cache line onto the per-class
788 : CAS free list (Treiber stack). The caller must have already
789 : invalidated the line (key.generation==UINT_MAX) and set persisted=1
790 : before pushing. */
791 :
792 : static inline void
793 : cache_free_push( fd_accdb_t * accdb,
794 : ulong size_class,
795 729246 : fd_accdb_cache_line_t * line ) {
796 729246 : ulong line_idx = cache_line_idx( accdb, size_class, line );
797 729246 : for(;;) {
798 729246 : ulong old_vt = FD_VOLATILE_CONST( accdb->shmem->cache_free[ size_class ].ver_top );
799 729246 : uint old_top = (uint)( old_vt & (ulong)UINT_MAX );
800 729246 : uint old_ver = (uint)( old_vt >> 32 );
801 729246 : line->next = old_top;
802 729246 : FD_COMPILER_MFENCE();
803 729246 : ulong new_vt = ((ulong)(uint)( old_ver+1U ) << 32) | (ulong)(uint)line_idx;
804 729246 : if( FD_LIKELY( FD_ATOMIC_CAS( &accdb->shmem->cache_free[ size_class ].ver_top, old_vt, new_vt )==old_vt ) ) {
805 729246 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->cache_free_cnt[ size_class ].val, 1UL );
806 729246 : return;
807 729246 : }
808 0 : FD_SPIN_PAUSE();
809 0 : }
810 729246 : }
811 :
812 : /* cache_free_pop pops a line from the per-class CAS free list. Returns
813 : NULL if the list is empty. */
814 :
815 : static inline fd_accdb_cache_line_t *
816 : cache_free_pop( fd_accdb_t * accdb,
817 830712 : ulong size_class ) {
818 830712 : for(;;) {
819 830712 : ulong old_vt = FD_VOLATILE_CONST( accdb->shmem->cache_free[ size_class ].ver_top );
820 830712 : uint old_top = (uint)( old_vt & (ulong)UINT_MAX );
821 830712 : if( FD_UNLIKELY( old_top==UINT_MAX ) ) return NULL;
822 701286 : uint old_ver = (uint)( old_vt >> 32 );
823 701286 : fd_accdb_cache_line_t * top = cache_line( accdb, size_class, (ulong)old_top );
824 701286 : uint next = FD_VOLATILE_CONST( top->next );
825 701286 : ulong new_vt = ((ulong)(uint)( old_ver+1U ) << 32) | (ulong)next;
826 701286 : if( FD_LIKELY( FD_ATOMIC_CAS( &accdb->shmem->cache_free[ size_class ].ver_top, old_vt, new_vt )==old_vt ) ) {
827 701286 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->cache_free_cnt[ size_class ].val, 1UL );
828 701286 : return top;
829 701286 : }
830 0 : FD_SPIN_PAUSE();
831 0 : }
832 830712 : }
833 :
834 : /* cache_try_pin attempts a lock-free pin of a cache-hit line. Returns
835 : the line if successfully pinned, or NULL if the line is being evicted
836 : or was recycled (ABA). */
837 :
838 : static inline fd_accdb_cache_line_t *
839 : cache_try_pin( fd_accdb_cache_line_t * line,
840 : uchar const pubkey[ 32 ],
841 79194 : uint generation ) {
842 79194 : for(;;) {
843 79194 : uint old_rc = FD_VOLATILE_CONST( line->refcnt );
844 79194 : if( FD_UNLIKELY( old_rc==FD_ACCDB_EVICT_SENTINEL ) ) return NULL;
845 : /* No saturation guard needed: refcnt is a uint and at most
846 : FD_ACCDB_MAX_JOINERS (256) threads can pin concurrently,
847 : so old_rc+1 can never reach FD_ACCDB_EVICT_SENTINEL
848 : (UINT_MAX) or wrap. */
849 79194 : if( FD_LIKELY( FD_ATOMIC_CAS( &line->refcnt, old_rc, old_rc+1U )==old_rc ) ) {
850 : /* Pinned. ABA check: verify the key hasn't changed under us. */
851 79194 : fd_racesan_hook( "accdb_try_pin:post_cas" );
852 79194 : FD_COMPILER_MFENCE();
853 79194 : if( FD_UNLIKELY( line->key.generation!=generation ||
854 79194 : memcmp( line->key.pubkey, pubkey, 32UL ) ) ) {
855 0 : FD_ATOMIC_FETCH_AND_SUB( &line->refcnt, 1U );
856 0 : return NULL;
857 0 : }
858 79194 : line->referenced = 1;
859 79194 : fd_racesan_hook( "cache_try_pin:pinned" );
860 79194 : return line;
861 79194 : }
862 0 : FD_SPIN_PAUSE();
863 0 : }
864 79194 : }
865 :
866 : /* wait_for_epoch_drain spins until every joiner's published epoch
867 : exceeds tag, meaning all readers that were active at epoch=tag have
868 : since exited their critical sections. */
869 :
870 : static void
871 : wait_for_epoch_drain( fd_accdb_t * accdb,
872 273 : ulong tag ) {
873 273 : for(;;) {
874 273 : ulong min_epoch = ULONG_MAX;
875 273 : ulong joiner_cnt = FD_VOLATILE_CONST( accdb->shmem->joiner_cnt );
876 546 : for( ulong t=0UL; t<joiner_cnt; t++ ) {
877 273 : ulong e = FD_VOLATILE_CONST( accdb->shmem->joiner_epochs[ t ].val );
878 273 : if( FD_LIKELY( e<min_epoch ) ) min_epoch = e;
879 273 : }
880 273 : for( ulong t=0UL; t<accdb->external_epoch_cnt; t++ ) {
881 0 : ulong e = FD_VOLATILE_CONST( *accdb->external_epoch_slots[ t ] );
882 0 : if( FD_LIKELY( e<min_epoch ) ) min_epoch = e;
883 0 : }
884 273 : if( FD_LIKELY( tag<min_epoch ) ) break;
885 0 : fd_racesan_hook( "accdb_epoch_drain:wait" );
886 0 : FD_SPIN_PAUSE();
887 0 : }
888 273 : }
889 :
890 : /* drain_deferred_frees releases back to their respective pools any acc
891 : batch and/or fork slots that were unlinked in a prior advance_root /
892 : purge call. The resources cannot be released immediately because
893 : concurrent readers may still reference them. We wait until every
894 : joiner's published epoch exceeds the tag stamped when each resource
895 : was unlinked.
896 :
897 : Must be called before creating new deferred batches (there is at most
898 : one of each outstanding at a time). */
899 :
900 : static void
901 195 : drain_deferred_frees( fd_accdb_t * accdb ) {
902 195 : if( FD_UNLIKELY( accdb->deferred_fork_head ) ) {
903 138 : wait_for_epoch_drain( accdb, accdb->deferred_fork_epoch );
904 138 : fork_pool_release_chain( accdb->fork_shmem_pool, accdb->deferred_fork_head, accdb->deferred_fork_tail );
905 138 : accdb->deferred_fork_head = NULL;
906 138 : accdb->deferred_fork_tail = NULL;
907 138 : }
908 :
909 195 : ulong n = accdb->shmem->deferred_acc_buf_cnt;
910 195 : if( FD_LIKELY( !n ) ) return;
911 135 : wait_for_epoch_drain( accdb, accdb->shmem->deferred_acc_epoch );
912 :
913 : /* All readers that could have been holding a captured pointer to any
914 : of these accs at unlink time have now exited their epoch sections.
915 : It is safe to materialize pool.next links and hand the chain to
916 : acc_pool_release_chain. */
917 135 : uint * buf = accdb->deferred_acc_buf;
918 135 : fd_accdb_accmeta_t * acc_pool = accdb->acc_pool;
919 :
920 : /* Late-publish sweep: a concurrent acquire evictor may have published
921 : a new offset into one of these accmetas after acc_unlink's
922 : xchg-to-INVAL but before exiting its epoch. Now that the epoch has
923 : drained, any such publish is complete and visible. Free the
924 : orphaned disk bytes here, before the accmeta is released to the
925 : pool and its fields recycled. */
926 135 : ulong acc_pool_cap = acc_pool_ele_max( accdb->acc_pool_join );
927 729 : for( ulong i=0UL; i<n; i++ ) {
928 594 : FD_TEST( (ulong)buf[ i ]<acc_pool_cap );
929 : #if FD_TMPL_USE_HANDHOLDING
930 : for( ulong j=0UL; j<i; j++ ) FD_TEST( buf[ j ]!=buf[ i ] );
931 : #endif
932 594 : fd_accdb_accmeta_t * accmeta = &acc_pool[ buf[ i ] ];
933 594 : ulong off = fd_accdb_acc_offset( accmeta );
934 594 : if( FD_UNLIKELY( off!=FD_ACCDB_OFF_INVAL ) ) {
935 0 : ulong entry_sz = (ulong)FD_ACCDB_SIZE_DATA(accmeta->executable_size)+sizeof(fd_accdb_disk_meta_t);
936 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, off, entry_sz );
937 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
938 0 : }
939 594 : }
940 :
941 594 : for( ulong i=0UL; i+1UL<n; i++ ) {
942 459 : acc_pool[ buf[ i ] ].pool.next = acc_pool_private_cidx( (ulong)buf[ i+1UL ] );
943 459 : }
944 135 : fd_accdb_accmeta_t * head = &acc_pool[ buf[ 0UL ] ];
945 135 : fd_accdb_accmeta_t * tail = &acc_pool[ buf[ n-1UL ] ];
946 135 : acc_pool_release_chain( accdb->acc_pool_join, head, tail );
947 135 : accdb->shmem->deferred_acc_buf_cnt = 0UL;
948 135 : }
949 :
950 : /* deferred_acc_append records an unlinked acc index in the side buffer
951 : for later release after wait_for_epoch_drain. T2 is the sole writer.
952 : The chain link from acc->pool.next is NOT laid down here: pool.next
953 : is union-aliased to cache_idx, and a concurrent cold_load_acc may
954 : still publish through a captured pointer until the epoch drains.
955 : Materialization of the chain happens in drain_deferred_frees. */
956 :
957 : static inline void
958 : deferred_acc_append( fd_accdb_t * accdb,
959 783 : uint acc_idx ) {
960 783 : fd_accdb_shmem_t * shmem = accdb->shmem;
961 783 : FD_TEST( shmem->deferred_acc_buf_cnt<shmem->deferred_acc_buf_max );
962 783 : accdb->deferred_acc_buf[ shmem->deferred_acc_buf_cnt++ ] = acc_idx;
963 783 : }
964 :
965 : /* acc_unlink unlinks an account from its hash map chain, frees any
966 : associated disk bytes, and invalidates a stale cache reference. Does
967 : NOT release the acc pool slot — the caller is responsible for that
968 : (or for batching releases).
969 :
970 : prev is the previous element in the map chain (UINT_MAX if acc_idx is
971 : the head).
972 :
973 : CONCURRENCY: The chain link being removed is swapped out with a CAS
974 : so that a concurrent fd_accdb_release prepending to the same chain
975 : cannot lose its update. If a head-removal CAS fails (a new node was
976 : prepended since we loaded the head), we re-walk from the new head to
977 : find the target as an interior node. Interior CAS cannot fail from
978 : inserts (inserts only touch the head) and only one remover exists at
979 : a time (advance_root / purge are serialized). */
980 :
981 : static inline void
982 : acc_unlink( fd_accdb_t * accdb,
983 : uint map_idx, /* index into accdb->acc_map of the chain that holds acc_idx */
984 : uint prev, /* acc_pool index of the chain predecessor, UINT_MAX if acc_idx is the head */
985 783 : uint acc_idx ) { /* acc_pool index of the account being unlinked */
986 783 : fd_accdb_accmeta_t * accmeta = &accdb->acc_pool[ acc_idx ];
987 :
988 : /* Atomically capture and clear the offset. Two races to defuse:
989 :
990 : (1) A concurrent fd_accdb_acquire_inner that is CLOCK-evicting the
991 : cache line currently holding this acc's data may have already
992 : xchg'd the offset to INVAL in step 5-6 and freed the old disk
993 : bytes. Without atomicity we would re-read the old offset and
994 : free those same bytes a second time. The xchg here serializes:
995 : whoever wins sees the real offset and frees; the loser sees
996 : INVAL and skips.
997 :
998 : (2) That same evictor may also be mid-flight to publish a NEW
999 : offset in step 9 (after step 5-6's free but before step 9's
1000 : store). That late publish lands on an accmeta that is about
1001 : to be chain-unlinked and deferred-released. drain_deferred_
1002 : frees sweeps the deferred buffer after epoch drain to catch
1003 : the late publish and free the orphaned bytes. */
1004 783 : ulong entry_sz = (ulong)FD_ACCDB_SIZE_DATA(accmeta->executable_size)+sizeof(fd_accdb_disk_meta_t); /* on-disk size: data bytes plus the disk meta header; sampled before the offset exchange below */
1005 783 : ulong old_offset = fd_accdb_acc_xchg_offset( accmeta, FD_ACCDB_OFF_INVAL );
1006 783 : if( FD_LIKELY( old_offset!=FD_ACCDB_OFF_INVAL ) ) {
1007 21 : fd_accdb_shmem_bytes_freed( accdb->shmem, old_offset, entry_sz );
1008 21 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
1009 21 : }
1010 783 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->accounts_total, 1UL );
1011 783 : accdb->metrics->accounts_deleted++;
1012 :
1013 783 : if( FD_LIKELY( prev==UINT_MAX ) ) {
1014 : /* Head removal — CAS may fail if a concurrent insert prepended a
1015 : new node. On failure the target is now interior. */
1016 36 : for(;;) {
1017 36 : uint old_head = FD_VOLATILE_CONST( accdb->acc_map[ map_idx ] );
1018 36 : if( FD_LIKELY( old_head==acc_idx ) ) {
1019 36 : if( FD_LIKELY( FD_ATOMIC_CAS( &accdb->acc_map[ map_idx ], acc_idx, accmeta->map.next )==acc_idx ) ) break;
1020 0 : FD_SPIN_PAUSE();
1021 0 : continue;
1022 36 : }
1023 : /* Head changed — walk from new head to find prev for interior
1024 : removal. The target must still be in the chain because only
1025 : this thread removes elements. */
1026 0 : prev = old_head;
1027 0 : while( FD_VOLATILE_CONST( accdb->acc_pool[ prev ].map.next )!=acc_idx ) prev = FD_VOLATILE_CONST( accdb->acc_pool[ prev ].map.next );
1028 0 : FD_ATOMIC_CAS( &accdb->acc_pool[ prev ].map.next, acc_idx, accmeta->map.next ); /* result not checked; see the CONCURRENCY note in the function comment */
1029 0 : break;
1030 36 : }
1031 747 : } else {
1032 747 : FD_ATOMIC_CAS( &accdb->acc_pool[ prev ].map.next, acc_idx, accmeta->map.next ); /* result not checked; see the CONCURRENCY note in the function comment */
1033 747 : }
1034 :
1035 783 : fd_racesan_hook( "accdb_acc_unlink:post_splice" );
1036 :
1037 : /* If the freed acc still has a cached location, invalidate it and
1038 : try to reclaim the cache line so the eviction path does not try
1039 : to write back stale data from a recycled pool slot. Lock-free:
1040 : CAS the refcnt 0 -> EVICT_SENTINEL to claim it exclusively, then
1041 : push to the CAS free list. If the line is pinned (refcnt>0),
1042 : skip, the pinner's release will handle it.
1043 :
1044 : Acquire CACHE_CLAIM_BIT before touching acc->cache_idx /
1045 : CACHE_VALID — see evict_clear_acc_cache_ref for the protocol.
1046 : Without CLAIM, a concurrent cold_load_acc can publish a fresh
1047 : (cache_idx, VALID=1) pair into this acc between our two stores,
1048 : and our subsequent cache_idx=INVAL stomps onto the freelist
1049 : pool.next field (the union sibling of cache_idx), corrupting the
1050 : pool. Unlike evict_clear_acc_cache_ref, we cannot bail when CLAIM
1051 : is held: this acc is being permanently unlinked, so we must
1052 : spin-wait for the cold-loader to release CLAIM and then invalidate
1053 : whatever cache_idx is current. */
1054 783 : uint cur_es;
1055 783 : for(;;) {
1056 783 : cur_es = FD_VOLATILE_CONST( accmeta->executable_size );
1057 783 : if( FD_UNLIKELY( cur_es & FD_ACCDB_SIZE_CACHE_CLAIM_BIT ) ) { FD_SPIN_PAUSE(); continue; }
1058 783 : uint nxt_es = cur_es | FD_ACCDB_SIZE_CACHE_CLAIM_BIT;
1059 783 : if( FD_LIKELY( FD_ATOMIC_CAS( &accmeta->executable_size, cur_es, nxt_es )==cur_es ) ) break;
1060 0 : FD_SPIN_PAUSE();
1061 0 : }
1062 :
1063 783 : uint cidx = FD_ACCDB_ACC_CIDX_INVAL;
1064 783 : int had_valid = FD_ACCDB_SIZE_CACHE_VALID( cur_es ); /* VALID bit as it was in the value we CAS'd from when taking CLAIM */
1065 783 : if( FD_UNLIKELY( had_valid ) ) {
1066 762 : cidx = FD_VOLATILE_CONST( accmeta->cache_idx );
1067 : /* Clear VALID before INVAL'ing cache_idx — matches the order in
1068 : evict_clear_acc_cache_ref so cold_load_acc's "VALID=1 +
1069 : cidx=INVAL" spin path resolves on the next iteration when it
1070 : observes VALID=0. */
1071 762 : FD_ATOMIC_FETCH_AND_AND( &accmeta->executable_size, ~FD_ACCDB_SIZE_CACHE_VALID_BIT );
1072 762 : FD_VOLATILE( accmeta->cache_idx ) = FD_ACCDB_ACC_CIDX_INVAL;
1073 762 : }
1074 :
1075 : /* Release CLAIM. */
1076 783 : FD_ATOMIC_FETCH_AND_AND( &accmeta->executable_size, ~FD_ACCDB_SIZE_CACHE_CLAIM_BIT );
1077 :
1078 783 : if( FD_UNLIKELY( had_valid ) ) {
1079 762 : fd_accdb_cache_line_t * stale = cache_line( accdb, FD_ACCDB_ACC_CIDX_CLASS( cidx ), FD_ACCDB_ACC_CIDX_IDX( cidx ) );
1080 762 : fd_racesan_hook( "acc_unlink:pre_reclaim_cas" );
1081 762 : uint old_rc = FD_ATOMIC_CAS( &stale->refcnt, 0U, FD_ACCDB_EVICT_SENTINEL );
1082 762 : fd_racesan_hook( "acc_unlink:post_reclaim_cas" );
1083 762 : if( FD_LIKELY( !old_rc ) ) {
1084 : /* Claimed. Validate key (ABA, slot could have been recycled
1085 : between our read of cache_idx and the CAS). */
1086 762 : if( FD_LIKELY( stale->key.generation==accmeta->key.generation &&
1087 762 : !memcmp( stale->key.pubkey, accmeta->key.pubkey, 32UL ) ) ) {
1088 762 : ulong sc = FD_ACCDB_ACC_CIDX_CLASS( cidx );
1089 762 : stale->key.generation = UINT_MAX;
1090 762 : stale->persisted = 1;
1091 762 : stale->acc_idx = UINT_MAX;
1092 762 : stale->refcnt = 0; /* drop our EVICT_SENTINEL claim before the line goes onto the free list */
1093 762 : cache_free_push( accdb, sc, stale );
1094 762 : } else {
1095 : /* Wrong line (ABA). Release claim. */
1096 0 : FD_VOLATILE( stale->refcnt ) = 0;
1097 0 : }
1098 762 : }
1099 0 : else if( FD_LIKELY( old_rc!=FD_ACCDB_EVICT_SENTINEL ) ) {
1100 : /* The CAS lost to a non-sentinel refcnt, but that does not prove
1101 : `stale` is still our line. Between capturing cidx and here we
1102 : released the claim, so `stale` could have been evicted and
1103 : recycled to an unrelated account. cache_try_pin re-checks the key. */
1104 0 : fd_accdb_cache_line_t * mine = cache_try_pin( stale, accmeta->key.pubkey, accmeta->key.generation );
1105 0 : if( FD_LIKELY( mine ) ) {
1106 : /* Genuinely our line, still pinned by a reader. The accmeta
1107 : slot is about to be deferred-released and recycled; if a
1108 : later writeback of this dirty line fires, it would pair the
1109 : recycled accmeta's pubkey with the old owner/data. Set
1110 : persisted so the writeback gate never fires. */
1111 0 : FD_VOLATILE( mine->persisted ) = 1;
1112 :
1113 : /* Only the tombstone self-unlink may be pinned here;
1114 : old-version and purge unlinks are never pinned, because a
1115 : reader on a live fork resolves to the newest version, not
1116 : the ones these unlink. */
1117 0 : FD_TEST( accmeta->lamports==0UL );
1118 :
1119 0 : FD_ATOMIC_FETCH_AND_SUB( &mine->refcnt, 1U ); /* undo the pin taken by cache_try_pin above */
1120 0 : }
1121 : /* Else was recycled to a foreign account. Nothing to neutralize,
1122 : leave the line alone. */
1123 0 : } else {
1124 : /* A foreground evictor already claimed this line. It holds its
1125 : epoch acquire and writeback, so drain_deferred_frees cannot
1126 : recycle the slot before it finishes. Its writeback names the
1127 : old account correctly, no poison. */
1128 0 : }
1129 762 : }
1130 783 : }
1131 :
1132 : /* fork_slot_defer removes fork_id from every descends_set and chains
1133 : the fork pool slot onto the deferred fork chain for later release.
1134 : The slot must not be released immediately because concurrent readers
1135 : may still reference the fork ID via descends_set or stale chain
1136 : walks.
1137 :
1138 : The eager descends_set_remove here is safe despite being a
1139 : non-atomic RMW that races with concurrent descends_set_test in
1140 : fd_accdb_acquire, for two reasons:
1141 :
1142 : (a) Rooted parent forks: after advance_root publishes the new
1143 : root_fork_id, any acquire loads root_generation >=
1144 : parent->generation. Every account from the old parent has
1145 : generation <= parent->generation, so the
1146 : "generation > root_generation" gate in the chain walk is
1147 : never satisfied and the parent's bit is never tested.
1148 :
1149 : (b) Purged / pruned sibling forks: a purged fork is by
1150 : definition not an ancestor of any live fork, so its bit
1151 : was never set in any live fork's descends_set. Clearing
1152 : it is a literal no-op.
1153 :
1154 : Fork-id ABA after slot reuse is also safe: the fork pool slot
1155 : is not released until drain_deferred_frees, which waits until
1156 : all epoch-protected readers have exited. On x86 (TSO), the
1157 : synchronization chain (T2: bit clear -> epoch FAA; reader:
1158 : epoch load -> epoch_slot store -> mfence -> bit read) guarantees
1159 : that any reader entering a new epoch section after the drain
1160 : will observe the cleared bit before the slot is recycled by
1161 : attach_child. */
1162 :
1163 : static inline void
1164 : fork_slot_defer( fd_accdb_t * accdb,
1165 : fd_accdb_fork_id_t fork_id,
1166 : fd_accdb_fork_shmem_t ** fork_head,
1167 207 : fd_accdb_fork_shmem_t ** fork_tail ) {
1168 5775 : for( ulong i=0UL; i<accdb->shmem->max_live_slots; i++ ) descends_set_remove( accdb->fork_pool[ i ].descends, fork_id.val );
1169 207 : fd_accdb_fork_shmem_t * shmem = fork_pool_ele( accdb->fork_shmem_pool, (ulong)fork_id.val );
1170 207 : if( *fork_tail ) (*fork_tail)->pool.next = fork_pool_private_cidx( (ulong)fork_id.val );
1171 195 : else *fork_head = shmem;
1172 207 : *fork_tail = shmem;
1173 207 : }
1174 :
1175 : static void
1176 : purge_inner( fd_accdb_t * accdb,
1177 : fd_accdb_fork_id_t fork_id,
1178 : fd_accdb_fork_shmem_t ** fork_head,
1179 24 : fd_accdb_fork_shmem_t ** fork_tail ) {
1180 24 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
1181 :
1182 24 : fd_accdb_fork_id_t child = fork->shmem->child_id;
1183 33 : while( child.val!=USHORT_MAX ) {
1184 9 : fd_accdb_fork_id_t next = accdb->fork_pool[ child.val ].shmem->sibling_id;
1185 9 : purge_inner( accdb, child, fork_head, fork_tail );
1186 9 : child = next;
1187 9 : }
1188 :
1189 24 : uint txn = fork->shmem->txn_head;
1190 24 : if( txn!=UINT_MAX ) {
1191 24 : fd_accdb_txn_t * txn_head = txn_pool_ele( accdb->txn_pool, (ulong)txn );
1192 24 : fd_accdb_txn_t * txn_tail = NULL;
1193 63 : while( txn!=UINT_MAX ) {
1194 39 : fd_accdb_txn_t * txne = txn_pool_ele( accdb->txn_pool, (ulong)txn );
1195 :
1196 39 : uint acc_idx = txne->acc_pool_idx;
1197 :
1198 39 : uint prev = UINT_MAX;
1199 39 : uint cur = FD_VOLATILE_CONST( accdb->acc_map[ txne->acc_map_idx ] );
1200 42 : while( cur!=acc_idx ) {
1201 3 : prev = cur;
1202 3 : cur = FD_VOLATILE_CONST( accdb->acc_pool[ cur ].map.next );
1203 3 : }
1204 :
1205 39 : fd_racesan_hook( "accdb_purge:pre_unlink" );
1206 39 : acc_unlink( accdb, txne->acc_map_idx, prev, acc_idx );
1207 39 : deferred_acc_append( accdb, acc_idx );
1208 :
1209 39 : txn_tail = txne;
1210 39 : txn = txne->fork.next;
1211 39 : }
1212 24 : txn_pool_release_chain( accdb->txn_pool, txn_head, txn_tail );
1213 24 : }
1214 :
1215 24 : fork_slot_defer( accdb, fork_id, fork_head, fork_tail );
1216 24 : }
1217 :
1218 : static inline void
1219 : remove_children( fd_accdb_t * accdb,
1220 : fd_accdb_fork_t * fork,
1221 : fd_accdb_fork_t * except,
1222 : fd_accdb_fork_shmem_t ** fork_head,
1223 183 : fd_accdb_fork_shmem_t ** fork_tail ) {
1224 183 : fd_accdb_fork_id_t sibling_idx = fork->shmem->child_id;
1225 369 : while( sibling_idx.val!=USHORT_MAX ) {
1226 186 : fd_accdb_fork_t * sibling = &accdb->fork_pool[ sibling_idx.val ];
1227 186 : fd_accdb_fork_id_t cur_idx = sibling_idx;
1228 :
1229 186 : sibling_idx = sibling->shmem->sibling_id;
1230 186 : if( FD_UNLIKELY( sibling==except ) ) continue;
1231 :
1232 3 : purge_inner( accdb, cur_idx, fork_head, fork_tail );
1233 3 : }
1234 183 : }
1235 :
/* background_advance_root makes fork_id the new root.  fork_id must be
   a direct child of the current root (both checked with FD_TEST).
   In order, it:

   - calls drain_deferred_frees to release whatever a previous call
     left deferred;
   - purges every other child subtree of the old root via
     remove_children;
   - for each txn recorded on fork_id, walks the account's map chain
     and unlinks every entry with the same pubkey whose generation is
     <= the old root's generation or whose fork is in fork_id's
     descends set; the new version itself is unlinked too when it is a
     tombstone (lamports==0);
   - releases the txn lists of fork_id and of the old root;
   - puts the old root's fork slot on the deferred chain and resets
     fork_id's parent / sibling / txn_head / descends set;
   - publishes root_fork_id, then bumps shmem->epoch and stamps the
     deferred acc batch and fork chain with the value returned by the
     fetch-and-add. */
1236 : static void
1237 : background_advance_root( fd_accdb_t * accdb,
1238 183 : fd_accdb_fork_id_t fork_id ) {
1239 183 : drain_deferred_frees( accdb );
1240 :
1241 : /* The caller guarantees that rooting is sequential: each call
1242 : advances the root by exactly one slot (the immediate child of the
1243 : current root). Skipping levels is not supported. */
1244 183 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
1245 183 : FD_TEST( fork->shmem->parent_id.val==accdb->shmem->root_fork_id.val );
1246 183 : FD_TEST( fork->shmem->parent_id.val!=USHORT_MAX );
1247 :
1248 183 : fd_accdb_fork_t * parent_fork = &accdb->fork_pool[ fork->shmem->parent_id.val ];
1249 :
1250 : /* Accumulate freed fork pool slots across remove_children and the
1251 : old-version cleanup below into a chain that will be deferred-
1252 : released after the epoch bump. Freed acc pool slots are recorded
1253 : in the shmem side buffer via deferred_acc_append (they cannot be
1254 : chained via pool.next yet — see comment on the side buffer). */
1255 183 : fd_accdb_fork_shmem_t * fork_head = NULL;
1256 183 : fd_accdb_fork_shmem_t * fork_tail = NULL;
1257 :
1258 : /* When a fork is rooted, any competing forks can be immediately
1259 : removed as they will not be needed again. This includes child
1260 : forks of the pruned siblings as well. */
1261 183 : remove_children( accdb, parent_fork, fork, &fork_head, &fork_tail );
1262 :
1263 : /* And for any accounts which were updated in the newly rooted slot,
1264 : we will now never need to access any older version, so we can
1265 : discard any slots earlier than the one we are rooting. */
1266 183 : uint txn = fork->shmem->txn_head;
1267 183 : if( txn!=UINT_MAX ) {
1268 183 : fd_accdb_txn_t * txn_head = txn_pool_ele( accdb->txn_pool, (ulong)txn );
1269 183 : fd_accdb_txn_t * txn_tail = NULL;
1270 960 : while( txn!=UINT_MAX ) {
1271 777 : fd_accdb_txn_t * txne = txn_pool_ele( accdb->txn_pool, (ulong)txn );
1272 :
1273 777 : fd_accdb_accmeta_t const * new_acc = &accdb->acc_pool[ txne->acc_pool_idx ];
1274 :
1275 777 : uint prev = UINT_MAX; /* last chain element we walked past and kept (UINT_MAX while none) */
1276 777 : uint new_acc_prev = UINT_MAX; /* prev of new_acc on the chain when we encounter it (UINT_MAX if head or never seen) */
1277 777 : int new_acc_seen = 0;
1278 777 : uint acc = FD_VOLATILE_CONST( accdb->acc_map[ txne->acc_map_idx ] );
1279 777 : FD_TEST( acc!=UINT_MAX );
1280 2457 : while( acc!=UINT_MAX ) {
1281 1680 : fd_accdb_accmeta_t const * cur_acc = &accdb->acc_pool[ acc ];
1282 1680 : uint cur_next = FD_VOLATILE_CONST( cur_acc->map.next ); /* successor is captured before cur_acc may be unlinked below */
1283 :
1284 1680 : if( FD_LIKELY( acc==txne->acc_pool_idx ) ) {
1285 777 : new_acc_prev = prev;
1286 777 : new_acc_seen = 1;
1287 777 : prev = acc;
1288 777 : acc = cur_next;
1289 777 : continue;
1290 777 : }
1291 :
/* Unlink cur_acc when it has the same pubkey as new_acc and is either
   at or below the old root's generation or on a fork in fork's
   descends set.  prev is left unchanged after an unlink. */
1292 903 : if( FD_LIKELY( (cur_acc->key.generation<=parent_fork->shmem->generation || descends_set_test( fork->descends, fd_accdb_acc_fork_id(cur_acc) ) ) && !memcmp( new_acc->key.pubkey, cur_acc->key.pubkey, 32UL ) ) ) {
1293 744 : uint next = cur_next;
1294 744 : fd_racesan_hook( "accdb_advance:pre_unlink" );
1295 744 : acc_unlink( accdb, txne->acc_map_idx, prev, acc );
1296 744 : deferred_acc_append( accdb, acc );
1297 744 : acc = next;
1298 744 : } else {
1299 159 : prev = acc;
1300 159 : acc = cur_next;
1301 159 : }
1302 903 : }
1303 :
1304 : /* If the newly rooted version is a tombstone (lamports==0, e.g.
1305 : account was closed), drop it from the index too: no fork can
1306 : reach it anymore, and keeping it around just wastes a hash
1307 : slot and the disk bytes it occupies.
1308 :
1309 : If a later txn on this same fork wrote the same pubkey, that
1310 : txn's inner walk above would have already unlinked this txn's
1311 : new_acc as an "older version" - in that case new_acc_seen=0
1312 : and we skip, since the freelist cleanup is already done. */
1313 777 : if( FD_UNLIKELY( new_acc_seen && new_acc->lamports==0UL ) ) {
1314 0 : uint new_acc_idx = (uint)txne->acc_pool_idx;
1315 0 : acc_unlink( accdb, txne->acc_map_idx, new_acc_prev, new_acc_idx );
1316 0 : deferred_acc_append( accdb, new_acc_idx );
1317 0 : }
1318 :
1319 777 : txn_tail = txne;
1320 777 : txn = txne->fork.next;
1321 777 : }
1322 183 : txn_pool_release_chain( accdb->txn_pool, txn_head, txn_tail );
1323 183 : }
1324 :
/* The old root's txn list is walked only to find its tail; the whole
   list is then released to the txn pool without touching any acc. */
1325 183 : uint parent_txn = parent_fork->shmem->txn_head;
1326 183 : if( parent_txn!=UINT_MAX ) {
1327 39 : fd_accdb_txn_t * parent_head = txn_pool_ele( accdb->txn_pool, (ulong)parent_txn );
1328 39 : fd_accdb_txn_t * parent_tail = NULL;
1329 708 : while( parent_txn!=UINT_MAX ) {
1330 669 : fd_accdb_txn_t * t = txn_pool_ele( accdb->txn_pool, (ulong)parent_txn );
1331 669 : parent_tail = t;
1332 669 : parent_txn = t->fork.next;
1333 669 : }
1334 39 : txn_pool_release_chain( accdb->txn_pool, parent_head, parent_tail );
1335 39 : }
1336 :
1337 : /* Remove the parent from all descends_sets and chain it for deferred
1338 : release, so that when the slot is eventually recycled to a new
1339 : fork, no concurrent reader can mistake the new fork for the old
1340 : ancestor. Entries from the freed parent are still visible via the
1341 : generation <= root_generation fast path in reads. */
1342 183 : fd_accdb_fork_id_t old_parent_id = fork->shmem->parent_id;
1343 183 : fork_slot_defer( accdb, old_parent_id, &fork_head, &fork_tail );
1344 :
1345 183 : fork->shmem->parent_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
1346 183 : fork->shmem->sibling_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
1347 183 : fork->shmem->txn_head = UINT_MAX;
1348 183 : descends_set_null( fork->descends );
1349 :
1350 : /* Publish the new root_fork_id BEFORE bumping the epoch and deferring
1351 : the parent slot. On x86-64 (TSO) a concurrent reader that still
1352 : loads the old root_fork_id is guaranteed to see the parent shmem in
1353 : its original (not-yet-recycled) state because the slot has not been
1354 : released yet. A reader that loads the new root_fork_id uses the
1355 : new fork. */
1356 183 : fd_racesan_hook( "accdb_advance:pre_publish_root" );
1357 183 : accdb->shmem->root_fork_id = fork_id;
1358 183 : FD_COMPILER_MFENCE();
1359 183 : fd_racesan_hook( "accdb_advance:post_publish_root" );
1360 :
1361 : /* Bump epoch and defer both the acc batch and parent fork slot. They
1362 : will be released at the next drain_deferred_frees call once all
1363 : concurrent readers have exited. The acc batch lives in the shmem
1364 : side buffer; only its epoch tag needs setting here. */
1365 183 : ulong tag = FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->epoch, 1UL );
1366 183 : if( FD_LIKELY( accdb->shmem->deferred_acc_buf_cnt ) ) {
1367 180 : accdb->shmem->deferred_acc_epoch = tag;
1368 180 : }
1369 183 : if( FD_LIKELY( fork_head ) ) {
1370 183 : accdb->deferred_fork_head = fork_head;
1371 183 : accdb->deferred_fork_tail = fork_tail;
1372 183 : accdb->deferred_fork_epoch = tag;
1373 183 : }
1374 183 : }
1375 :
/* fd_accdb_advance_root queues an advance-root request for fork_id.
   It calls wait_cmd and then hands FD_ACCDB_CMD_ADVANCE_ROOT together
   with fork_id.val to submit_cmd.  No root state is modified by this
   function itself.  NOTE(review): wait_cmd presumably blocks until the
   previously submitted command has been consumed -- confirm against
   its definition. */
1376 : void
1377 : fd_accdb_advance_root( fd_accdb_t * accdb,
1378 183 : fd_accdb_fork_id_t fork_id ) {
1379 183 : wait_cmd( accdb );
1380 183 : submit_cmd( accdb, FD_ACCDB_CMD_ADVANCE_ROOT, fork_id.val );
1381 183 : }
1382 :
1383 : /* background_purge does the heavy lifting of purge on T2: unlink the
1384 : fork from the parent's child list, drain deferred frees, recursively
1385 : purge the fork subtree, and defer-release the freed acc pool
1386 : elements. The sibling-list unlink is done here (not on T1) because
1387 : advance_root / remove_children also mutate sibling lists on T2, and
1388 : T2 is single-threaded so plain stores are safe. */
1389 :
1390 : static void
1391 : background_purge( fd_accdb_t * accdb,
1392 12 : fd_accdb_fork_id_t fork_id ) {
1393 : /* Unlink fork_id from its parent's child list. This runs on T2
1394 : which is the sole mutator of sibling lists (advance_root and
1395 : remove_children also run on T2), so plain stores are safe. */
1396 12 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
1397 12 : fd_accdb_fork_id_t parent_id = fork->shmem->parent_id;
1398 12 : if( FD_LIKELY( parent_id.val!=USHORT_MAX ) ) {
1399 12 : fd_accdb_fork_t * parent = &accdb->fork_pool[ parent_id.val ];
1400 12 : if( FD_UNLIKELY( parent->shmem->child_id.val==fork_id.val ) ) {
1401 12 : parent->shmem->child_id = fork->shmem->sibling_id;
1402 12 : } else {
1403 0 : fd_accdb_fork_id_t prev_id = parent->shmem->child_id;
1404 0 : while( prev_id.val!=USHORT_MAX ) {
1405 0 : fd_accdb_fork_t * prev = &accdb->fork_pool[ prev_id.val ];
1406 0 : if( prev->shmem->sibling_id.val==fork_id.val ) {
1407 0 : prev->shmem->sibling_id = fork->shmem->sibling_id;
1408 0 : break;
1409 0 : }
1410 0 : prev_id = prev->shmem->sibling_id;
1411 0 : }
1412 0 : }
1413 12 : }
1414 :
1415 12 : drain_deferred_frees( accdb );
1416 :
1417 12 : fd_accdb_fork_shmem_t * fork_head = NULL;
1418 12 : fd_accdb_fork_shmem_t * fork_tail = NULL;
1419 12 : purge_inner( accdb, fork_id, &fork_head, &fork_tail );
1420 :
1421 12 : ulong tag = FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->epoch, 1UL );
1422 12 : if( FD_LIKELY( accdb->shmem->deferred_acc_buf_cnt ) ) {
1423 12 : accdb->shmem->deferred_acc_epoch = tag;
1424 12 : }
1425 12 : if( FD_LIKELY( fork_head ) ) {
1426 12 : accdb->deferred_fork_head = fork_head;
1427 12 : accdb->deferred_fork_tail = fork_tail;
1428 12 : accdb->deferred_fork_epoch = tag;
1429 12 : }
1430 12 : }
1431 :
/* fd_accdb_purge queues a purge request for fork_id.  The FD_TEST
   rejects a request to purge the fork that is currently published as
   accdb->shmem->root_fork_id.  After that check it calls wait_cmd and
   then passes FD_ACCDB_CMD_PURGE with fork_id.val to submit_cmd.
   NOTE(review): wait_cmd presumably blocks until the previously
   submitted command has been consumed -- confirm against its
   definition. */
1432 : void
1433 : fd_accdb_purge( fd_accdb_t * accdb,
1434 12 : fd_accdb_fork_id_t fork_id ) {
1435 12 : FD_TEST( fork_id.val!=accdb->shmem->root_fork_id.val );
1436 :
1437 12 : wait_cmd( accdb );
1438 12 : submit_cmd( accdb, FD_ACCDB_CMD_PURGE, fork_id.val );
1439 12 : }
1440 :
/* acquire_cache_line hands back a cache line of the requested
   size_class with refcnt set to 1 and referenced cleared.  Three
   sources are tried in order:

     1. cache_free_pop for this size class,
     2. the next never-used slot of the class (cache_class_init is
        advanced with a fetch-and-add and rolled back when the class
        is already at cache_class_max),
     3. a CLOCK sweep that evicts an unpinned line.

   *out_evicted_acc_idx is written on every return.  It is UINT_MAX for
   sources 1 and 2.  For an eviction it is UINT_MAX when the victim's
   persisted flag is set and the victim's acc_idx otherwise.

   The sweep has no exit other than a successful claim, so the
   FD_TEST( 0 ) / return NULL after the loop is not reached by the
   control flow visible here. */
1441 : static inline fd_accdb_cache_line_t *
1442 : acquire_cache_line( fd_accdb_t * accdb,
1443 : ulong size_class,
1444 830712 : uint * out_evicted_acc_idx ) {
1445 : /* Priority 1: CAS free list — already invalidated,
1446 : persisted==1, generation==UINT_MAX. Cheapest path. */
1447 830712 : fd_accdb_cache_line_t * result = cache_free_pop( accdb, size_class );
1448 830712 : if( FD_LIKELY( result ) ) {
1449 701286 : result->refcnt = 1;
1450 701286 : result->referenced = 0;
1451 701286 : *out_evicted_acc_idx = UINT_MAX;
1452 701286 : return result;
1453 701286 : }
1454 :
1455 : /* Priority 2: Lazy initial allocation — atomic FAA with undo on
1456 : overflow. Safe for concurrent callers. */
1457 129426 : ulong old_init = FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->cache_class_init[ size_class ].val, 1UL );
1458 129426 : if( FD_LIKELY( old_init<accdb->shmem->cache_class_max[ size_class ] ) ) {
/* old_init is the index of a slot nobody has used yet; initialize
   every field that the other paths rely on. */
1459 129414 : result = cache_line( accdb, size_class, old_init );
1460 129414 : result->refcnt = 1;
1461 129414 : result->persisted = 1;
1462 129414 : result->referenced = 0;
1463 129414 : result->acc_idx = UINT_MAX;
1464 129414 : result->key.generation = UINT_MAX;
1465 129414 : *out_evicted_acc_idx = UINT_MAX;
1466 129414 : return result;
1467 129414 : }
/* The class was already fully initialized: undo the speculative
   increment taken above. */
1468 12 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->cache_class_init[ size_class ].val, 1UL );
1469 :
1470 : /* Priority 3: CLOCK sweep ... scan forward giving second chances. */
1471 30 : for(;;) {
/* The hand is a free-running counter shared by all callers; it is
   reduced modulo the class capacity to pick the line to inspect. */
1472 30 : ulong hand = FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->clock_hand[ size_class ].val, 1UL ) % accdb->shmem->cache_class_max[ size_class ];
1473 30 : fd_accdb_cache_line_t * line = cache_line( accdb, size_class, hand );
1474 :
/* generation==UINT_MAX together with acc_idx==UINT_MAX is the state
   the lazy-init path above leaves a line in; such lines are skipped
   rather than evicted. */
1475 30 : if( FD_UNLIKELY( line->key.generation==UINT_MAX && line->acc_idx==UINT_MAX ) ) continue;
1476 :
1477 30 : uint rc = FD_VOLATILE_CONST( line->refcnt );
1478 30 : if( FD_UNLIKELY( rc!=0U ) ) continue; /* Pinned or being evicted */
1479 :
1480 30 : if( FD_UNLIKELY( line->referenced ) ) {
1481 18 : line->referenced = 0;
1482 18 : continue; /* Second chance */
1483 18 : }
1484 :
/* Claim the line by moving refcnt 0 -> FD_ACCDB_EVICT_SENTINEL.  If the
   CAS does not observe 0, another thread changed refcnt first; move on
   to the next line. */
1485 12 : if( FD_UNLIKELY( FD_ATOMIC_CAS( &line->refcnt, 0U, FD_ACCDB_EVICT_SENTINEL )!=0U ) ) continue;
1486 :
1487 : /* The line is now claimed for eviction (refcnt==EVICT_SENTINEL). A
1488 : concurrent acc_unlink that targets this same line's accmeta will
1489 : observe the sentinel here and take its do-nothing branch — see the
1490 : test_accdb_racesan SENTINEL case. */
1491 12 : fd_racesan_hook( "clock_evict:post_sentinel" );
1492 :
1493 12 : if( FD_LIKELY( line->acc_idx!=UINT_MAX ) ) {
1494 12 : evict_clear_acc_cache_ref( &accdb->acc_pool[ line->acc_idx ], size_class, hand );
1495 12 : }
/* Report the victim's acc_idx to the caller only when the line is not
   marked persisted.  NOTE(review): presumably so the caller can write
   the evicted contents back -- confirm at the call sites. */
1496 12 : *out_evicted_acc_idx = line->persisted ? UINT_MAX : line->acc_idx;
1497 12 : line->key.generation = UINT_MAX;
1498 12 : line->refcnt = 1;
1499 12 : line->referenced = 0;
1500 12 : return line;
1501 12 : }
1502 :
1503 0 : FD_TEST( 0 );
1504 0 : return NULL;
1505 0 : }
1506 :
1507 : static inline void
1508 : change_partition( fd_accdb_t * accdb,
1509 : accdb_offset_t const * offset_before,
1510 : accdb_offset_t * out_offset,
1511 : int * has_partition,
1512 30 : uchar layer ) {
1513 : /* New data will not fit in the current partition, so we need to
1514 : move to the next one. */
1515 30 : ulong partition_idx_before = packed_partition_idx( offset_before );
1516 30 : ulong partition_offset_before = packed_partition_offset( offset_before );
1517 30 : if( FD_LIKELY( *has_partition ) ) {
1518 24 : fd_accdb_partition_t * before = partition_pool_ele( accdb->partition_pool, partition_idx_before );
1519 24 : before->write_offset = partition_offset_before;
1520 24 : }
1521 :
1522 : /* Single rdtsc per partition lifecycle event: stamp the closing
1523 : partition's filled time and the new partition's created time off
1524 : the same sample. */
1525 30 : long now_ticks = (long)fd_tickcount();
1526 :
1527 30 : ulong free_size = accdb->shmem->partition_sz - partition_offset_before;
1528 30 : if( FD_LIKELY( *has_partition ) ) {
1529 24 : fd_accdb_partition_t * old = partition_pool_ele( accdb->partition_pool, partition_idx_before );
1530 24 : FD_ATOMIC_FETCH_AND_ADD( &old->bytes_freed, free_size );
1531 24 : FD_VOLATILE( old->filled_ticks ) = now_ticks;
1532 : /* The tail slack is now committed dead — count it as current
1533 : (written-through) so fragmentation reflects it. */
1534 24 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->disk_current_bytes, free_size );
1535 24 : }
1536 :
1537 30 : if( FD_UNLIKELY( !partition_pool_free( accdb->partition_pool ) ) ) FD_LOG_ERR(( "accounts database file is at capacity" ));
1538 30 : fd_accdb_partition_t * partition = partition_pool_ele_acquire( accdb->partition_pool );
1539 30 : partition->bytes_freed = 0UL;
1540 30 : partition->marked_compaction = 0;
1541 30 : partition->layer = layer;
1542 30 : partition->read_ops = 0UL;
1543 30 : partition->bytes_read = 0UL;
1544 30 : partition->write_ops = 0UL;
1545 30 : partition->bytes_written = 0UL;
1546 30 : partition->write_offset = 0UL;
1547 30 : partition->compaction_offset = 0UL;
1548 30 : partition->created_ticks = now_ticks;
1549 30 : partition->filled_ticks = 0L;
1550 30 : partition->queued = 0;
1551 30 : partition->compacting_now = 0;
1552 :
1553 30 : ulong new_partition_idx = partition_pool_idx( accdb->partition_pool, partition );
1554 30 : int had_partition = *has_partition;
1555 30 : *out_offset = accdb_offset( new_partition_idx, 0UL );
1556 30 : *has_partition = 1;
1557 :
1558 : /* Now that the write head has been rotated away from the old
1559 : partition, check if it should be enqueued for compaction. We call
1560 : try_enqueue directly because the caller already holds
1561 : partition_lock (calling fd_accdb_shmem_bytes_freed here would
1562 : deadlock on the non-reentrant lock). Skip when
1563 : has_partition was 0, because the sentinel partition_idx is
1564 : not a valid pool element. */
1565 30 : if( FD_LIKELY( had_partition && partition_idx_before!=new_partition_idx ) ) {
1566 24 : fd_accdb_shmem_try_enqueue_compaction( accdb->shmem, partition_idx_before );
1567 24 : }
1568 :
1569 : /* Snapshot-load tiering: accounts loaded from a snapshot never get
1570 : a second write, so compaction-driven promotion never fires and
1571 : they would otherwise live in Hot forever. When snapshot_loading
1572 : is set, tag the new partition as Cold up front. We do not set
1573 : has_partition[Cold] / whead[Cold] — those are owned by the
1574 : compaction tile and represent the live Cold write head, which is
1575 : independent of snapshot-loaded partitions that happen to be
1576 : labeled Cold. */
1577 30 : if( FD_UNLIKELY( accdb->snapshot_loading && layer==0 ) ) {
1578 30 : FD_VOLATILE( partition->layer ) = FD_ACCDB_COMPACTION_LAYER_CNT-1UL;
1579 30 : }
1580 :
1581 30 : if( FD_UNLIKELY( new_partition_idx>=accdb->shmem->partition_max ) ) {
1582 30 : FD_LOG_INFO(( "growing accounts database from %lu GiB to %lu GiB", accdb->shmem->partition_max*accdb->shmem->partition_sz/(1UL<<30UL), (new_partition_idx+1UL)*accdb->shmem->partition_sz/(1UL<<30UL) ));
1583 :
1584 30 : int result = fallocate( accdb->fd, 0, (long)(new_partition_idx*accdb->shmem->partition_sz), (long)accdb->shmem->partition_sz );
1585 30 : if( FD_UNLIKELY( -1==result ) ) {
1586 0 : if( FD_LIKELY( errno==ENOSPC ) ) FD_LOG_ERR(( "fallocate() failed (%d-%s). The accounts database filled "
1587 0 : "the disk it is on, trying to grow from %lu GiB to %lu GiB. Please "
1588 0 : "free up disk space and restart the validator.",
1589 0 : errno, fd_io_strerror( errno ), accdb->shmem->partition_max*accdb->shmem->partition_sz/(1UL<<30UL), (new_partition_idx+1UL)*accdb->shmem->partition_sz/(1UL<<30UL) ));
1590 0 : else FD_LOG_ERR(( "fallocate() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
1591 0 : }
1592 :
1593 : /* CAS loop: the compaction tile may also be growing the file
1594 : concurrently, so neither path may clobber the other. */
1595 30 : for(;;) {
1596 30 : ulong cur = accdb->shmem->partition_max;
1597 30 : if( FD_LIKELY( new_partition_idx+1UL<=cur ) ) break;
1598 30 : if( FD_LIKELY( FD_ATOMIC_CAS( &accdb->shmem->partition_max, cur, new_partition_idx+1UL )==cur ) ) break;
1599 30 : }
1600 30 : accdb->shmem->shmetrics->disk_allocated_bytes = accdb->shmem->partition_max*accdb->shmem->partition_sz;
1601 30 : }
1602 30 : }
1603 :
/* allocate_next_write reserves sz bytes at the layer 0 write head and
   returns the file offset of the reservation.  The head is advanced
   with an atomic fetch-and-add and the pre-add value decides what
   happens next:

     - offset+sz fits in the partition: the reservation succeeded; the
       disk_current_bytes metric and the partition write counters are
       bumped and the file offset is returned.
     - offset is already past partition_sz: an earlier caller crossed
       the boundary first and is switching partitions; spin until the
       head changes, then retry.
     - otherwise this caller is the one that crossed the boundary: take
       partition_lock, rotate the head with change_partition, retry.

   The loop only exits through the successful return. */
1604 : static inline ulong
1605 : allocate_next_write( fd_accdb_t * accdb,
1606 45 : ulong sz ) {
1607 63 : for(;;) {
/* offset holds the head value from before this caller's add. */
1608 63 : accdb_offset_t offset = { .val = FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->whead[ 0 ].val, sz ) };
1609 63 : if( FD_LIKELY( packed_partition_offset( &offset )+sz<=accdb->shmem->partition_sz ) ) {
1610 45 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->disk_current_bytes, sz );
1611 45 : ulong file_offset = packed_partition_file_offset( &offset, accdb->shmem->partition_sz );
1612 45 : fd_accdb_partition_write_bump( accdb, file_offset, sz );
1613 45 : return file_offset;
1614 45 : }
1615 :
1616 18 : if( FD_UNLIKELY( packed_partition_offset( &offset )>accdb->shmem->partition_sz ) ) {
1617 : /* This can happen if another thread also raced to allocate the
1618 : next write and won. Wait for the partition switch to finish
1619 : before retrying, so we do not keep doing fetch-and-adds that
1620 : advance the offset further past the boundary.
1621 :
1622 : A switch is detected by the head moving to a different
1623 : partition index OR its offset dropping back to a valid position
1624 : (a switch resets the offset to 0). We must not key the wait
1625 : solely on the index changing: the initial write head is a
1626 : sentinel whose packed index can coincide with a real pool
1627 : index. */
1628 0 : ulong stale_partition = packed_partition_idx( &offset );
1629 0 : for(;;) {
1630 0 : accdb_offset_t cur = { .val = FD_VOLATILE_CONST( accdb->shmem->whead[ 0 ].val ) };
1631 0 : if( packed_partition_idx( &cur )!=stale_partition ) break;
1632 0 : if( packed_partition_offset( &cur )<=accdb->shmem->partition_sz ) break;
1633 0 : FD_SPIN_PAUSE();
1634 0 : }
1635 0 : continue;
1636 0 : }
1637 :
/* Reaching here means offset<=partition_sz but offset+sz does not fit:
   this caller performs the switch.  The pre-add offset is passed as
   offset_before, so the closing partition's write_offset is set to
   the last position that was successfully reserved. */
1638 18 : spin_lock_acquire( &accdb->shmem->partition_lock );
1639 18 : change_partition( accdb, &offset, &accdb->shmem->whead[ 0 ], &accdb->shmem->has_partition[ 0 ], 0 );
1640 18 : spin_lock_release( &accdb->shmem->partition_lock );
1641 18 : }
1642 45 : }
1643 :
1644 : /* Compaction write allocation. Single-threaded: only the compaction
1645 : tile calls these, so the compaction write heads do not need atomic
1646 : fetch-and-add. dest_layer is the target layer (1..N-1). */
1647 :
1648 : static inline ulong
1649 : allocate_next_compaction_write( fd_accdb_t * accdb,
1650 : ulong sz,
1651 0 : ulong dest_layer ) {
1652 0 : accdb_offset_t offset = accdb->shmem->whead[ dest_layer ];
1653 0 : if( FD_UNLIKELY( !accdb->shmem->has_partition[ dest_layer ] ||
1654 0 : packed_partition_offset( &offset )+sz>accdb->shmem->partition_sz ) ) {
1655 0 : spin_lock_acquire( &accdb->shmem->partition_lock );
1656 0 : change_partition( accdb, &offset, &accdb->shmem->whead[ dest_layer ], &accdb->shmem->has_partition[ dest_layer ], (uchar)dest_layer );
1657 0 : spin_lock_release( &accdb->shmem->partition_lock );
1658 0 : offset = accdb->shmem->whead[ dest_layer ];
1659 0 : }
1660 0 : accdb->shmem->whead[ dest_layer ].val += sz;
1661 0 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->disk_current_bytes, sz );
1662 0 : ulong file_offset = packed_partition_file_offset( &offset, accdb->shmem->partition_sz );
1663 0 : fd_accdb_partition_write_bump( accdb, file_offset, sz );
1664 0 : return file_offset;
1665 0 : }
1666 :
1667 : /* background_compact relocates one record from the oldest partition
1668 : queued for compaction at src_layer into the write head for the
1669 : next colder tier, or the same tier for the deepest layer. It is
1670 : designed to be called repeatedly from a dedicated compaction tile.
1671 : If there is work to do, *charge_busy is set to 1; otherwise
1672 : *charge_busy is left unchanged and the call returns immediately.
1673 :
1674 : src_layer must be in 0..FD_ACCDB_COMPACTION_LAYER_CNT-1. */
/* Outline of one call:

     1. Publish the current shared epoch into this joiner's epoch slot.
        The slot is reset to ULONG_MAX on every return path.
     2. Compute min_epoch over the writer slots and the external epoch
        slots, and release deferred-free partitions whose epoch_tag is
        below it.
     3. Return early if the src_layer compaction list is empty or if
        its head partition's compaction_ready_epoch is not yet below
        min_epoch.
     4. Read the record header at the partition's compaction cursor and
        look for an index entry whose offset still names that record.
     5. If one is found, copy the record to the destination layer and
        CAS the index entry over to the new offset.
     6. Advance the cursor.  Once it reaches write_offset, move the
        partition from the compaction list to the deferred-free list.

   The only value ever written to *charge_busy is 1, and that happens
   only when step 4 is reached. */
1675 :
1676 : static void
1677 : background_compact( fd_accdb_t * accdb,
1678 : ulong src_layer,
1679 9 : int * charge_busy ) {
1680 9 : FD_COMPILER_MFENCE();
1681 9 : FD_VOLATILE( *accdb->my_epoch_slot ) = FD_VOLATILE_CONST( accdb->shmem->epoch );
1682 9 : FD_HW_MFENCE(); /* StoreLoad: epoch store must be globally visible
1683 : before any subsequent loads so the deferred
1684 : reclamation scan does not miss us. */
1685 :
1686 : /* Reclaim any deferred-free partitions whose epoch has been observed
1687 : by all joiners (i.e. no epoch-publishing joiner could still be
1688 : referencing data in them). Scan writer slots [0, joiner_cnt)
1689 : plus each external (read-only) joiner's private epoch fseq. */
1690 9 : ulong min_epoch = ULONG_MAX;
1691 9 : ulong joiner_cnt = FD_VOLATILE_CONST( accdb->shmem->joiner_cnt );
1692 18 : for( ulong t=0UL; t<joiner_cnt; t++ ) {
1693 9 : ulong e = FD_VOLATILE_CONST( accdb->shmem->joiner_epochs[ t ].val );
1694 9 : if( FD_LIKELY( e<min_epoch ) ) min_epoch = e;
1695 9 : }
1696 9 : for( ulong t=0UL; t<accdb->external_epoch_cnt; t++ ) {
1697 0 : ulong e = FD_VOLATILE_CONST( *accdb->external_epoch_slots[ t ] );
1698 0 : if( FD_LIKELY( e<min_epoch ) ) min_epoch = e;
1699 0 : }
/* Free from the head of the deferred-free list and stop at the first
   partition whose tag is not yet below min_epoch. */
1700 9 : for(;;) {
1701 9 : if( FD_LIKELY( deferred_free_dlist_is_empty( accdb->deferred_free_dlist, accdb->partition_pool ) ) ) break;
1702 0 : fd_accdb_partition_t * p = deferred_free_dlist_ele_peek_head( accdb->deferred_free_dlist, accdb->partition_pool );
1703 0 : if( FD_LIKELY( p->epoch_tag>=min_epoch ) ) break;
1704 :
1705 0 : fd_racesan_hook( "accdb_reclaim:pre_free_partition" );
1706 :
1707 0 : spin_lock_acquire( &accdb->shmem->partition_lock );
1708 0 : deferred_free_dlist_ele_pop_head( accdb->deferred_free_dlist, accdb->partition_pool );
1709 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_current_bytes, accdb->shmem->partition_sz );
1710 0 : partition_pool_ele_release( accdb->partition_pool, p );
1711 0 : spin_lock_release( &accdb->shmem->partition_lock );
1712 0 : }
1713 :
1714 9 : if( FD_LIKELY( compaction_dlist_is_empty( accdb->compaction_dlist[ src_layer ], accdb->partition_pool ) ) ) {
1715 9 : FD_COMPILER_MFENCE();
1716 9 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
1717 9 : return;
1718 9 : }
1719 0 : fd_accdb_partition_t * compact = compaction_dlist_ele_peek_head( accdb->compaction_dlist[ src_layer ], accdb->partition_pool );
1720 0 : if( FD_UNLIKELY( !compact ) ) {
1721 0 : FD_COMPILER_MFENCE();
1722 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
1723 0 : return;
1724 0 : }
1725 :
1726 : /* Wait until all epoch-publishing joiners that were active when this
1727 : partition was enqueued for compaction have exited, ensuring any
1728 : in-flight pwritev2 to this partition has completed before we start
1729 : reading from it. */
1730 0 : if( FD_UNLIKELY( compact->compaction_ready_epoch>=min_epoch ) ) {
1731 0 : FD_COMPILER_MFENCE();
1732 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
1733 0 : return;
1734 0 : }
1735 :
1736 0 : *charge_busy = 1;
1737 :
1738 : /* Mark the head partition as actively compacting. */
1739 0 : FD_VOLATILE( compact->queued ) = 0;
1740 0 : FD_VOLATILE( compact->compacting_now ) = 1;
1741 :
1742 0 : fd_accdb_disk_meta_t meta[1];
1743 :
/* File offset of the first byte of the partition being compacted. */
1744 0 : ulong compact_base = partition_pool_idx( accdb->partition_pool, compact )*accdb->shmem->partition_sz;
1745 :
1746 : /* Read the on-disk metadata header at the current compaction
1747 : cursor within the partition being compacted. */
1748 0 : ulong bytes_read = 0UL;
1749 0 : while( FD_UNLIKELY( bytes_read<sizeof(fd_accdb_disk_meta_t) ) ) {
1750 0 : long result = pread( accdb->fd, ((uchar *)meta)+bytes_read, sizeof(fd_accdb_disk_meta_t)-bytes_read, (long)(compact_base+compact->compaction_offset+bytes_read) );
1751 0 : if( FD_UNLIKELY( -1==result && (errno==EINTR || errno==EAGAIN || errno==EWOULDBLOCK ) ) ) continue;
1752 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "pread() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
1753 0 : else if( FD_UNLIKELY( !result ) ) FD_LOG_ERR(( "accounts database is corrupt, data expected at offset %lu with size %lu exceeded file extents",
1754 0 : compact_base+compact->compaction_offset+bytes_read, sizeof(fd_accdb_disk_meta_t) ));
/* NOTE(review): this bump passes the record start offset (without
   +bytes_read), whereas the copy loop below passes the offset including
   bytes_copied.  Confirm whether the difference matters to
   fd_accdb_partition_read_bump. */
1755 0 : fd_accdb_partition_read_bump( accdb, compact_base+compact->compaction_offset, (ulong)result );
1756 0 : bytes_read += (ulong)result;
1757 0 : }
1758 :
1759 : /* Walk the hash chain to find a live index entry whose on-disk
1760 : offset matches the record we are compacting. */
1761 0 : fd_accdb_accmeta_t * accmeta = NULL;
1762 0 : ulong source_packed = 0UL;
1763 0 : uint acc_idx = FD_VOLATILE_CONST( accdb->acc_map[ fd_accdb_hash( meta->pubkey, accdb->shmem->seed )&(accdb->shmem->chain_cnt-1UL) ] );
1764 0 : while( acc_idx!=UINT_MAX ) {
1765 0 : fd_accdb_accmeta_t * candidate = &accdb->acc_pool[ acc_idx ];
1766 0 : uint next_idx = FD_VOLATILE_CONST( candidate->map.next );
1767 0 : ulong candidate_packed = FD_VOLATILE_CONST( candidate->offset_fork );
1768 0 : if( FD_LIKELY( (candidate_packed & FD_ACCDB_OFF_MASK)==compact_base+compact->compaction_offset ) ) {
1769 0 : accmeta = candidate;
/* Remember the exact packed value that was observed; it is the
   expected value of the relocation CAS further down. */
1770 0 : source_packed = candidate_packed;
1771 0 : break;
1772 0 : }
1773 0 : acc_idx = next_idx;
1774 0 : }
1775 :
/* The full on-disk extent of the record is the header followed by
   meta->size bytes. */
1776 0 : ulong record_sz = sizeof(fd_accdb_disk_meta_t) + (ulong)meta->size;
1777 0 : ulong bytes_copied = 0UL;
1778 0 : if( FD_UNLIKELY( !accmeta ) ) {
1779 : /* Dead record — the index entry was already removed, so this
1780 : on-disk extent is garbage. Nothing to relocate. */
1781 0 : } else {
1782 0 : ulong dest_layer = fd_ulong_min( src_layer+1UL, FD_ACCDB_COMPACTION_LAYER_CNT-1UL );
1783 0 : ulong dest_offset = allocate_next_compaction_write( accdb, record_sz, dest_layer );
1784 :
1785 0 : while( FD_UNLIKELY( bytes_copied<record_sz ) ) {
1786 0 : long in_off = (long)(compact_base + compact->compaction_offset + bytes_copied);
1787 0 : long out_off = (long)(dest_offset + bytes_copied);
1788 :
1789 0 : long result = copy_file_range( accdb->fd, &in_off, accdb->fd, &out_off, record_sz-bytes_copied, 0 );
1790 0 : if( FD_UNLIKELY( -1==result && (errno==EINTR || errno==EAGAIN || errno==EWOULDBLOCK ) ) ) continue;
1791 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "copy_file_range() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
1792 0 : else if( FD_UNLIKELY( !result ) ) FD_LOG_ERR(( "accounts database is corrupt, data expected at offset %lu with size %lu exceeded file extents",
1793 0 : compact_base+compact->compaction_offset+bytes_copied, record_sz ));
1794 0 : fd_accdb_partition_read_bump( accdb, compact_base+compact->compaction_offset+bytes_copied, (ulong)result );
1795 0 : bytes_copied += (ulong)result;
1796 0 : accdb->metrics->copy_ops++;
1797 0 : }
1798 :
/* NOTE(review): these counters are bumped before the CAS below, so a
   relocation that loses the CAS is still counted as relocated.  Confirm
   that this is the intended meaning of the metric. */
1799 0 : accdb->shmem->shmetrics->accounts_relocated++;
1800 0 : accdb->shmem->shmetrics->accounts_relocated_bytes += bytes_copied;
1801 :
1802 : /* Ensure the data is on disk before publishing the new offset,
1803 : so concurrent acquire threads do not preadv2 from a location
1804 : that hasn't been written yet. */
1805 0 : FD_COMPILER_MFENCE();
1806 :
1807 : /* CAS the offset from the exact source record we copied to the new
1808 : destination. If a concurrent release overwrote the offset to
1809 : FD_ACCDB_OFF_INVAL (dirty sentinel for a new commit), or later
1810 : published a newer on-disk location, the CAS fails and we treat
1811 : the relocated copy as stale. We CAS the full packed
1812 : offset_fork so the fork_id is preserved and so we only publish
1813 : the relocation if the copied source record is still current. */
1814 0 : ulong new_packed = ( source_packed & ~FD_ACCDB_OFF_MASK ) | ( dest_offset & FD_ACCDB_OFF_MASK );
1815 :
1816 : #if FD_HAS_RACESAN
1817 : fd_memcpy( fd_accdb_dbg_reloc_pubkey, accmeta->key.pubkey, 32UL );
1818 : fd_accdb_dbg_reloc_dest = dest_offset;
1819 : fd_accdb_dbg_reloc_cnt++;
1820 : #endif
1821 :
1822 0 : fd_racesan_hook( "accdb_compact:pre_offset_cas" );
1823 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( &accmeta->offset_fork, source_packed, new_packed )!=source_packed ) ) {
1824 : /* Record was superseded by a concurrent overwrite commit.
1825 : The disk space we just wrote is dead on arrival — account
1826 : it as freed so compaction can reclaim it later. */
1827 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, dest_offset, record_sz );
/* Zeroing bytes_copied keeps the discarded copy out of the per-joiner
   bytes_read / bytes_written totals updated at the end of the
   function. */
1828 0 : bytes_copied = 0UL;
1829 0 : }
1830 0 : }
1831 :
1832 0 : fd_racesan_hook( "accdb_compact:post_relocate" );
1833 :
/* Step the cursor past this record whether it was relocated, dead, or
   superseded. */
1834 0 : compact->compaction_offset += record_sz;
1835 :
1836 0 : if( FD_UNLIKELY( compact->compaction_offset>=compact->write_offset ) ) {
1837 0 : FD_LOG_NOTICE(( "compaction of partition %lu completed", partition_pool_idx( accdb->partition_pool, compact ) ));
1838 :
1839 : /* Ensure the new acc->offset_fork stores above are visible to other
1840 : cores before the source partition is moved to the deferred-free
1841 : list. On x86 (TSO) hardware store ordering already guarantees
1842 : this, but the compiler fence prevents the compiler from sinking
1843 : the offset store past the inlined pool/dlist mutations below. */
1844 0 : FD_COMPILER_MFENCE();
1845 :
1846 : /* Bump the global epoch and tag this partition so the reclamation
1847 : scan knows when all epoch-publishing joiners that could reference
1848 : data in this partition have exited. */
1849 0 : ulong tag = FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->epoch, 1UL );
1850 0 : compact->epoch_tag = tag;
1851 :
1852 : /* partition_lock serializes these dlist/pool mutations with
1853 : concurrent push_tail in fd_accdb_shmem_bytes_freed and
1854 : partition_pool_ele_acquire in change_partition. Neither fd_dlist
1855 : nor fd_pool are thread-safe, so all mutations must be under the
1856 : same lock. */
1857 0 : spin_lock_acquire( &accdb->shmem->partition_lock );
1858 :
1859 0 : accdb->shmem->shmetrics->partitions_freed++;
1860 0 : compaction_dlist_ele_pop_head( accdb->compaction_dlist[ src_layer ], accdb->partition_pool );
1861 0 : FD_VOLATILE( compact->compacting_now ) = 0;
1862 0 : FD_VOLATILE( compact->queued ) = 0;
1863 0 : deferred_free_dlist_ele_push_tail( accdb->deferred_free_dlist, compact, accdb->partition_pool );
1864 :
1865 0 : accdb->shmem->shmetrics->compactions_completed++;
/* NOTE(review): in_compaction is cleared based on the src_layer list
   only; other layers' lists are not consulted here. */
1866 0 : if( FD_LIKELY( compaction_dlist_is_empty( accdb->compaction_dlist[ src_layer ], accdb->partition_pool ) ) ) {
1867 0 : accdb->shmem->shmetrics->in_compaction = 0;
1868 0 : } else {
1869 0 : fd_accdb_partition_t * next = compaction_dlist_ele_peek_head( accdb->compaction_dlist[ src_layer ], accdb->partition_pool );
1870 0 : FD_LOG_NOTICE(( "compaction of layer %lu partition %lu started", src_layer, partition_pool_idx( accdb->partition_pool, next ) ));
1871 0 : }
1872 :
1873 0 : spin_lock_release( &accdb->shmem->partition_lock );
1874 0 : }
1875 :
1876 0 : accdb->metrics->bytes_read += bytes_read + bytes_copied;
1877 0 : accdb->metrics->bytes_written += bytes_copied;
1878 :
/* Leave the epoch-protected section. */
1879 0 : FD_COMPILER_MFENCE();
1880 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
1881 0 : }
1882 :
1883 : /* cold_load_acc resolves the cache slot for `acc` when STEP 1's
1884 : cache_try_pin failed. It uses bit 29 of executable_size as a
1885 : single-claimer lock so that two concurrent acquirers cannot each
1886 : install their own cache slot for the same acc (which would orphan
1887 : one slot with a dangling line->acc_idx and eventually corrupt
1888 : acc->cache_valid via CLOCK).
1889 :
1890 : Protocol per acc:
1891 : - If cache_valid is set, retry cache_try_pin (another thread
1892 : finished the cold-load while we were here). On success, mark
1893 : exists_in_cache so STEP 4 will not write back the slot.
1894 : - If claim is set, spin (another thread is mid-cold-load).
1895 : - Otherwise CAS-set the claim bit. Winner allocates a cache
1896 : line, populates the placeholder (acc_idx=UINT_MAX), publishes
1897 : cache_idx, then atomically (CAS-loop) sets cache_valid and
1898 : clears claim.
1899 :
1900 : The eviction sites that clear cache_valid must use FETCH_AND with
1901 : ~CACHE_VALID_BIT (preserving the claim bit) to interact correctly
1902 : with this protocol. */
/* Arguments and results:

     accmeta              index entry whose cache slot is being resolved
     pubkey               key handed to cache_try_pin when re-pinning an
                          already installed line
     out_exists_in_cache  set to 1 when an existing line was pinned and
                          to 0 when this call installed a new line
     out_evicted_acc_idx  set to UINT_MAX on the pin path; on the
                          install path it is whatever acquire_cache_line
                          reported

   The function loops until one of the two outcomes happens and returns
   the pinned or newly installed line. */
1903 :
1904 : static fd_accdb_cache_line_t *
1905 : cold_load_acc( fd_accdb_t * accdb,
1906 : fd_accdb_accmeta_t * accmeta,
1907 : uchar const * pubkey,
1908 : int * out_exists_in_cache,
1909 24 : uint * out_evicted_acc_idx ) {
1910 24 : for(;;) {
/* One snapshot of executable_size supplies both the valid and claim
   flags and is also the expected value for the claim CAS below. */
1911 24 : uint old_es = FD_VOLATILE_CONST( accmeta->executable_size );
1912 24 : int valid = FD_ACCDB_SIZE_CACHE_VALID( old_es );
1913 24 : int claimed = FD_ACCDB_SIZE_CACHE_CLAIM( old_es );
1914 :
1915 24 : if( FD_UNLIKELY( valid ) ) {
1916 : /* old_es snapshot saw VALID=1 but a concurrent
1917 : evict_clear_acc_cache_ref may have cleared VALID and stored
1918 : cache_idx=INVAL between our snapshot and this load. Decoding
1919 : INVAL would yield a wild cache_line pointer; retry the loop
1920 : instead (next iteration will see VALID=0). */
1921 0 : uint cidx = FD_VOLATILE_CONST( accmeta->cache_idx );
1922 0 : if( FD_UNLIKELY( cidx==FD_ACCDB_ACC_CIDX_INVAL ) ) { FD_SPIN_PAUSE(); continue; }
1923 0 : fd_accdb_cache_line_t * hit = cache_line( accdb, FD_ACCDB_ACC_CIDX_CLASS( cidx ), FD_ACCDB_ACC_CIDX_IDX( cidx ) );
1924 0 : fd_racesan_hook( "accdb_cold_load:pre_try_pin" );
1925 0 : fd_accdb_cache_line_t * pinned = cache_try_pin( hit, pubkey, accmeta->key.generation );
1926 0 : if( FD_LIKELY( pinned ) ) {
1927 0 : *out_exists_in_cache = 1;
1928 0 : *out_evicted_acc_idx = UINT_MAX;
1929 0 : return pinned;
1930 0 : }
/* The pin failed; start over from a fresh snapshot. */
1931 0 : FD_SPIN_PAUSE();
1932 0 : continue;
1933 0 : }
1934 :
1935 24 : if( FD_UNLIKELY( claimed ) ) {
1936 0 : fd_racesan_hook( "accdb_cold_load:claim_wait" );
1937 0 : FD_SPIN_PAUSE();
1938 0 : continue;
1939 0 : }
1940 :
/* Try to take the claim.  The CAS only succeeds if executable_size
   is still exactly the snapshot value; any concurrent change sends us
   back to the top of the loop. */
1941 24 : if( FD_UNLIKELY( FD_ATOMIC_CAS( &accmeta->executable_size, old_es, old_es | FD_ACCDB_SIZE_CACHE_CLAIM_BIT )!=old_es ) ) {
1942 0 : FD_SPIN_PAUSE();
1943 0 : continue;
1944 0 : }
1945 :
1946 : /* We hold the claim. Allocate a cache line and publish. */
1947 24 : ulong size_class = fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( old_es ) );
1948 24 : fd_accdb_cache_line_t * line = acquire_cache_line( accdb, size_class, out_evicted_acc_idx );
1949 24 : fd_memcpy( line->key.pubkey, accmeta->key.pubkey, 32UL );
1950 24 : line->key.generation = accmeta->key.generation;
1951 : /* Leave acc_idx at UINT_MAX (the "loading" sentinel) until step 12
1952 : publishes it after the preadv2 fence. Concurrent threads that
1953 : pin via cache_idx will spin on this in step 13. */
1954 24 : line->acc_idx = UINT_MAX;
/* The fences keep the line's key and acc_idx stores ahead of the
   cache_idx store, and the cache_idx store ahead of the VALID
   publication below, in program order. */
1955 24 : FD_COMPILER_MFENCE();
1956 24 : FD_VOLATILE( accmeta->cache_idx ) = FD_ACCDB_ACC_CIDX_PACK( (uint)size_class, (uint)cache_line_idx( accdb, size_class, line ) );
1957 24 : FD_COMPILER_MFENCE();
1958 :
1959 24 : fd_racesan_hook( "accdb_cold_load:pre_valid" );
1960 :
1961 : /* Atomically set CACHE_VALID_BIT and clear CACHE_CLAIM_BIT.
1962 : Eviction may have flipped CACHE_VALID_BIT on us between our
1963 : claim and now (it preserves CLAIM but can clear VALID); the
1964 : CAS loop tolerates that. The data length and exec bits stay
1965 : unchanged. */
1966 24 : for(;;) {
1967 24 : uint cur = FD_VOLATILE_CONST( accmeta->executable_size );
1968 24 : uint nxt = (cur & ~FD_ACCDB_SIZE_CACHE_CLAIM_BIT) | FD_ACCDB_SIZE_CACHE_VALID_BIT;
1969 24 : if( FD_LIKELY( FD_ATOMIC_CAS( &accmeta->executable_size, cur, nxt )==cur ) ) break;
1970 0 : FD_SPIN_PAUSE();
1971 0 : }
1972 :
1973 24 : *out_exists_in_cache = 0;
1974 24 : return line;
1975 24 : }
1976 24 : }
1977 :
/* Presumably the values accepted by the reservation_type argument of
   fd_accdb_acquire_inner.  NOTE(review): how each value changes the
   acquire is decided in code outside this excerpt -- confirm there
   before relying on the names alone. */
1978 246708 : #define RESERVATION_TYPE_SIMPLE (0)
1979 174 : #define RESERVATION_TYPE_MAYBE_PROGRAMDATA (1)
1980 174 : #define RESERVATION_TYPE_ALREADY_RESERVED (2)
1981 :
1982 : static void
1983 : fd_accdb_acquire_inner( fd_accdb_t * accdb,
1984 : fd_accdb_fork_id_t fork_id,
1985 : int reservation_type,
1986 : ulong reserved_cnt,
1987 : ulong pubkeys_cnt,
1988 : uchar const * const * pubkeys,
1989 : int * writable,
1990 247056 : fd_acc_t * out_accs ) {
1991 247056 : accdb->metrics->acquire_calls++;
1992 :
1993 247056 : ulong max_acquire_cnt = accdb->shmem->bundle_enabled ? FD_ACCDB_MAX_ACQUIRE_CNT : FD_ACCDB_MAX_TX_ACCOUNT_LOCKS;
1994 247056 : FD_TEST( pubkeys_cnt<=max_acquire_cnt );
1995 :
1996 247056 : FD_TEST( FD_VOLATILE_CONST( *accdb->my_epoch_slot )==ULONG_MAX );
1997 :
1998 247056 : FD_COMPILER_MFENCE();
1999 247056 : FD_VOLATILE( *accdb->my_epoch_slot ) = FD_VOLATILE_CONST( accdb->shmem->epoch );
2000 247056 : FD_HW_MFENCE(); /* StoreLoad: epoch store must be globally visible
2001 : before any subsequent loads so the deferred
2002 : reclamation scan does not miss us */
2003 :
2004 : // STEP 1.
2005 : // Locate each account in the fork and index structure, to determine
2006 : // if it already exists, its size and other metadata, and which
2007 : // specific slot (generation) it was last written in.
2008 :
2009 247056 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
2010 247056 : uint root_generation = accdb->fork_pool[ accdb->shmem->root_fork_id.val ].shmem->generation;
2011 :
2012 247056 : fd_racesan_hook( "accdb_acquire:post_root_gen" );
2013 :
2014 247056 : fd_accdb_accmeta_t * accmetas[ FD_ACCDB_MAX_ACQUIRE_CNT ];
2015 247056 : ulong acc_map_idxs[ FD_ACCDB_MAX_ACQUIRE_CNT ];
2016 :
2017 : /* Walk the hash chain for each pubkey and take the first visible
2018 : match. Correctness relies on newer entries always being prepended
2019 : to the chain head, which is guaranteed because replay processes
2020 : writes in slot order and release always inserts at the head.
2021 :
2022 : CONCURRENCY: This chain walk runs epoch-protected. A concurrent
2023 : fd_accdb_release may prepend a new node to the same chain while
2024 : we walk it. This is safe on x86-64 (TSO): the releasing thread
2025 : stores all acc fields (pubkey, generation, map.next, ...) before
2026 : publishing the new head via a CAS on acc_map[idx], and TSO
2027 : guarantees a reading core that observes the new head also observes
2028 : all prior stores to the node. A reader that does not yet see the
2029 : new head simply sees an older (still valid) version of the chain.
2030 : On weakly-ordered architectures an explicit acquire fence would be
2031 : needed before the chain walk and a release fence in
2032 : fd_accdb_release before the head-pointer store. Multiple
2033 : concurrent releases serialize on the CAS of the chain head. */
2034 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2035 247542 : acc_map_idxs[ i ] = fd_accdb_hash( pubkeys[ i ], accdb->shmem->seed )&(accdb->shmem->chain_cnt-1UL);
2036 247542 : uint acc = FD_VOLATILE_CONST( accdb->acc_map[ acc_map_idxs[ i ] ] );
2037 304986 : while( acc!=UINT_MAX ) {
2038 136662 : fd_accdb_accmeta_t const * candidate_acc = &accdb->acc_pool[ acc ];
2039 136662 : uint next_acc = FD_VOLATILE_CONST( candidate_acc->map.next );
2040 :
2041 136662 : fd_racesan_hook( "accdb_acquire:post_next" );
2042 :
2043 136662 : if( FD_UNLIKELY( (candidate_acc->key.generation>root_generation &&
2044 136662 : fd_accdb_acc_fork_id(candidate_acc)!=fork_id.val &&
2045 136662 : !descends_set_test( fork->descends, fd_accdb_acc_fork_id(candidate_acc) )) ) ||
2046 136662 : memcmp( pubkeys[ i ], candidate_acc->key.pubkey, 32UL ) ) {
2047 57444 : acc = next_acc;
2048 57444 : continue;
2049 57444 : }
2050 :
2051 79218 : break;
2052 136662 : }
2053 247542 : if( FD_UNLIKELY( acc==UINT_MAX ) ) accmetas[ i ] = NULL;
2054 79218 : else accmetas[ i ] = &accdb->acc_pool[ acc ];
2055 :
2056 : #if FD_TMPL_USE_HANDHOLDING
2057 : if( FD_UNLIKELY( accmetas[ i ] ) ) {
2058 : fd_accdb_accmeta_t const * sel = accmetas[ i ];
2059 : FD_TEST( !memcmp( sel->key.pubkey, pubkeys[ i ], 32UL ) );
2060 : FD_TEST( sel->key.generation<=root_generation ||
2061 : fd_accdb_acc_fork_id( sel )==fork_id.val ||
2062 : descends_set_test( fork->descends, fd_accdb_acc_fork_id( sel ) ) );
2063 : FD_TEST( sel->key.generation<=FD_VOLATILE_CONST( accdb->shmem->generation ) );
2064 : }
2065 : #endif
2066 :
2067 247542 : if( FD_UNLIKELY( accmetas[ i ] && !writable[ i ] && !accmetas[ i ]->lamports ) ) accmetas[ i ] = NULL;
2068 :
2069 : /* Attribute this acquired account to a size class for per-class
2070 : rate metrics. Use the account's current size class when known;
2071 : otherwise (new account) bucket as class 0. */
2072 247542 : ulong acq_class = 0UL;
2073 247542 : if( FD_LIKELY( accmetas[ i ] ) ) acq_class = fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) );
2074 247542 : if( FD_LIKELY( writable[ i ] ) ) accdb->metrics->writable_accounts_acquired_per_class[ acq_class ]++;
2075 143706 : else accdb->metrics->accounts_acquired_per_class[ acq_class ]++;
2076 247542 : }
2077 :
2078 : // STEP 2.
2079 : // The two-phase programdata acquire (acquire_a then acquire_b)
2080 : // works as follows: acquire_a (RESERVATION_TYPE_MAYBE_PROGRAMDATA)
2081 : // over-reserves one slot in every live size class per candidate
2082 : // account (reserved_cnt total per class), because it does not yet
2083 : // know which accounts have programdata or what size class it lands
2084 : // in. acquire_b then resolves the actual programdata pubkeys and
2085 : // re-enters here with RESERVATION_TYPE_ALREADY_RESERVED to refund
2086 : // the surplus. Keep one reservation per found programdata account
2087 : // in its own size class (consumed later by release) and give the
2088 : // rest back.
2089 247056 : if( FD_UNLIKELY( reservation_type==RESERVATION_TYPE_ALREADY_RESERVED ) ) {
2090 174 : ulong refund[ FD_ACCDB_CACHE_CLASS_CNT ] = {0};
2091 1566 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2092 1392 : if( FD_LIKELY( accdb->shmem->cache_class_used[ j ].val!=ULONG_MAX ) ) refund[ j ] = reserved_cnt;
2093 1392 : }
2094 180 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2095 6 : if( FD_LIKELY( accmetas[ i ] ) ) {
2096 3 : ulong cls = fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) );
2097 3 : if( FD_LIKELY( accdb->shmem->cache_class_used[ cls ].val!=ULONG_MAX ) ) {
2098 3 : FD_TEST( refund[ cls ]>0UL );
2099 3 : refund[ cls ]--;
2100 3 : }
2101 3 : }
2102 6 : }
2103 1566 : for( ulong k=0UL; k<FD_ACCDB_CACHE_CLASS_CNT; k++ ) {
2104 1392 : if( FD_UNLIKELY( refund[ k ] ) ) FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->cache_class_used[ k ].val, refund[ k ] );
2105 1392 : }
2106 174 : }
2107 :
2108 : // STEP 3.
2109 : // We are potentially going to need to read the account data off of
2110 : // disk into the cache, if the account(s) are not in the cache so
2111 : // reserve the necessary cache space. This is done with an "atomic
2112 : // add" spin loop on the cache class used counters, which is
2113 : // actually faster than doing a real CAS on a packed ulong.
2114 : //
2115 : // For reads, we only need space to copy the account data into a
2116 : // single right-sized cache line, but for writes ... we need to
2117 : // reserve one of every size class. The reason is we are going to
2118 : // need a 10MiB staging buffer for the executor to write to (it may
2119 : // grow the account, so needs the max size class). Even if the
2120 : // account is already in the 10MiB cache class, we need another one
2121 : // because a transaction can fail half way, so we need scratch space
2122 : // to be able to unwind.
2123 : //
2124 : // So we acquire one of each size class. Then when the transaction
2125 : // finishes, if it succeeded, we will copy the data back to
2126 : // whichever size-class is now right-sized post execution.
2127 247056 : if( FD_LIKELY( reservation_type==RESERVATION_TYPE_SIMPLE || reservation_type==RESERVATION_TYPE_MAYBE_PROGRAMDATA ) ) {
2128 246882 : ulong requested_buckets[ FD_ACCDB_CACHE_CLASS_CNT ] = {0};
2129 494418 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2130 247536 : if( FD_LIKELY( accmetas[ i ] || writable[ i ] ) ) {
2131 171339 : if( FD_LIKELY( accmetas[ i ] ) ) {
2132 79215 : if( FD_UNLIKELY( accdb->shmem->cache_class_used[ fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) ) ].val!=ULONG_MAX ) ) {
2133 0 : requested_buckets[ fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) ) ]++;
2134 0 : }
2135 79215 : }
2136 171339 : if( FD_UNLIKELY( writable[ i ] ) ) {
2137 934524 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2138 830688 : if( FD_UNLIKELY( accdb->shmem->cache_class_used[ j ].val!=ULONG_MAX ) ) {
2139 54 : requested_buckets[ j ]++;
2140 54 : }
2141 830688 : }
2142 103836 : }
2143 171339 : }
2144 :
2145 247536 : if( FD_LIKELY( reservation_type==RESERVATION_TYPE_MAYBE_PROGRAMDATA ) ) {
2146 : /* Any account could also have an implied reference to a
2147 : programdata account, which we don't know yet ... so we need to
2148 : reserve worst case space if they all went to the same size
2149 : class. This reservation runs unconditionally per pubkey (not
2150 : gated on accmetas/writable) so that acquire_b can refund based on
2151 : pubkeys_cnt without needing to re-derive the live-account set. */
2152 7452 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2153 6624 : if( FD_UNLIKELY( accdb->shmem->cache_class_used[ j ].val!=ULONG_MAX ) ) {
2154 36 : requested_buckets[ j ]++;
2155 36 : }
2156 6624 : }
2157 828 : }
2158 247536 : }
2159 :
2160 : /* TODO: This over-reserves cache slots for writable accounts that
2161 : already exist. For each such account we reserve one line in the
2162 : account's size class (for the read into cache) AND one line in
2163 : every size class (for the write destination buffers). But if the
2164 : account is already resident in cache (which is the common case
2165 : for hot accounts), the read-into-cache line is unnecessary — we
2166 : will get a cache hit in step 4 and never use it. The fix is to
2167 : probe acc->cache_idx here and skip the per-account size class
2168 : reservation when a hit is
2169 : found. This would reduce peak reservation by up to one line per
2170 : writable account per acquire batch, lowering contention on the
2171 : cache class counters and allowing smaller cache provisioning. */
2172 :
2173 : /* Reserve cache slots by atomically incrementing the shared used
2174 : counters. If any class exceeds its max, the reservation
2175 : overflowed — subtract back partial grabs and retry. */
2176 246882 : for(;;) {
2177 246882 : int acquire_failed = 0;
2178 246882 : ulong grabbed[ FD_ACCDB_CACHE_CLASS_CNT ] = {0};
2179 2221938 : for( ulong i=0UL; i<FD_ACCDB_CACHE_CLASS_CNT; i++ ) {
2180 1975056 : if( FD_LIKELY( !requested_buckets[ i ] ) ) continue;
2181 72 : ulong new_used = FD_ATOMIC_ADD_AND_FETCH( &accdb->shmem->cache_class_used[ i ].val, requested_buckets[ i ] );
2182 72 : if( FD_UNLIKELY( new_used>accdb->shmem->cache_class_max[ i ] ) ) {
2183 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->cache_class_used[ i ].val, requested_buckets[ i ] );
2184 0 : acquire_failed = 1;
2185 72 : } else {
2186 72 : grabbed[ i ] = requested_buckets[ i ];
2187 72 : }
2188 72 : if( FD_UNLIKELY( acquire_failed ) ) {
2189 0 : accdb->metrics->acquire_failed++;
2190 0 : for( ulong j=0UL; j<i; j++ ) {
2191 0 : if( grabbed[ j ] ) FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->cache_class_used[ j ].val, grabbed[ j ] );
2192 0 : }
2193 0 : FD_SPIN_PAUSE();
2194 0 : break;
2195 0 : }
2196 72 : }
2197 246882 : if( FD_LIKELY( !acquire_failed ) ) break;
2198 246882 : }
2199 246882 : }
2200 :
2201 : // STEP 4.
2202 : // For any accounts that are not in cache, we now need to actually
2203 : // retrieve the cache pointers from our structures. Space has been
2204 : // reserved already, so this step is guaranteed to succeed, and is
2205 : // just pulling the cache lines out of the free lists and marking
2206 : // them as in-use.
2207 : //
2208 : // This step is fully lock-free. Cache hits are pinned with an
2209 : // atomic CAS on refcnt (cache_try_pin). Eviction uses the CLOCK
2210 : // algorithm. The CAS free list provides immediate recycling of
2211 : // fully-freed lines.
2212 :
2213 247056 : int exists_in_cache[ FD_ACCDB_MAX_ACQUIRE_CNT ];
2214 247056 : fd_accdb_cache_line_t * original_cache_line[ FD_ACCDB_MAX_ACQUIRE_CNT ];
2215 247056 : fd_accdb_cache_line_t * destination_cache_lines[ FD_ACCDB_MAX_ACQUIRE_CNT ][ FD_ACCDB_CACHE_CLASS_CNT ];
2216 :
2217 : /* Saved acc_pool indices of evicted dirty cache lines. These are
2218 : captured before clearing acc_idx to UINT_MAX on the line struct, so
2219 : that the sentinel protocol (step 14) works correctly while the
2220 : evicted account metadata is still available for writeback in steps
2221 : 4 and 6. */
2222 247056 : uint evicted_dest_acc[ FD_ACCDB_MAX_ACQUIRE_CNT ][ FD_ACCDB_CACHE_CLASS_CNT ];
2223 247056 : uint evicted_orig_acc[ FD_ACCDB_MAX_ACQUIRE_CNT ];
2224 :
2225 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2226 247542 : if( FD_UNLIKELY( !accmetas[ i ] && !writable[ i ] ) ) continue;
2227 :
2228 171342 : original_cache_line[ i ] = NULL;
2229 171342 : if( FD_LIKELY( accmetas[ i ] ) ) {
2230 79218 : if( FD_LIKELY( FD_ACCDB_SIZE_CACHE_VALID( FD_VOLATILE_CONST( accmetas[ i ]->executable_size ) ) ) ) {
2231 : /* Concurrent evict_clear_acc_cache_ref clears VALID then stores
2232 : cache_idx=INVAL. We may have observed VALID=1 just before the
2233 : writer cleared it, so cidx can read as INVAL here; decoding it
2234 : would yield a wild cache_line pointer. Skip on INVAL. Any
2235 : other stale cidx is harmless: cache_try_pin's ABA generation
2236 : check rejects a recycled line. */
2237 79194 : uint cidx = FD_VOLATILE_CONST( accmetas[ i ]->cache_idx );
2238 79194 : if( FD_LIKELY( cidx!=FD_ACCDB_ACC_CIDX_INVAL ) ) {
2239 79194 : fd_accdb_cache_line_t * hit = cache_line( accdb, FD_ACCDB_ACC_CIDX_CLASS( cidx ), FD_ACCDB_ACC_CIDX_IDX( cidx ) );
2240 79194 : fd_racesan_hook( "accdb_acquire:pre_try_pin" );
2241 79194 : original_cache_line[ i ] = cache_try_pin( hit, pubkeys[ i ], accmetas[ i ]->key.generation );
2242 : #if FD_TMPL_USE_HANDHOLDING
2243 : if( FD_LIKELY( original_cache_line[ i ] ) ) {
2244 : FD_TEST( original_cache_line[ i ]->key.generation==accmetas[ i ]->key.generation &&
2245 : !memcmp( original_cache_line[ i ]->key.pubkey, pubkeys[ i ], 32UL ) );
2246 : uint rc = FD_VOLATILE_CONST( original_cache_line[ i ]->refcnt );
2247 : FD_TEST( rc>0U && rc!=FD_ACCDB_EVICT_SENTINEL );
2248 : }
2249 : #endif
2250 79194 : }
2251 79194 : }
2252 79218 : }
2253 171342 : exists_in_cache[ i ] = original_cache_line[ i ]!=NULL;
2254 :
2255 171342 : if( FD_UNLIKELY( writable[ i ] ) ) {
2256 934524 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) destination_cache_lines[ i ][ j ] = acquire_cache_line( accdb, j, &evicted_dest_acc[ i ][ j ] );
2257 103836 : if( FD_UNLIKELY( accmetas[ i ] && !original_cache_line[ i ] ) ) {
2258 0 : original_cache_line[ i ] = cold_load_acc( accdb, accmetas[ i ], pubkeys[ i ], &exists_in_cache[ i ], &evicted_orig_acc[ i ] );
2259 0 : }
2260 103836 : } else {
2261 67506 : if( FD_UNLIKELY( !original_cache_line[ i ] ) ) {
2262 24 : original_cache_line[ i ] = cold_load_acc( accdb, accmetas[ i ], pubkeys[ i ], &exists_in_cache[ i ], &evicted_orig_acc[ i ] );
2263 24 : }
2264 67506 : }
2265 171342 : }
2266 :
2267 : // STEP 5.
2268 : // For any cache lines we have retrieved, which we might potentially
2269 : // be about to trash (by writing stuff in there), we need to write
2270 : // them back to disk first if they are dirty. This is the process of
2271 : // "persisting" (a/k/a evicting) whatever was previously in the
2272 : // cache line we are about to use.
2273 : //
2274 : // This step does not actually persist the data to disk, it just
2275 : // constructs a series of iovecs (write instructions) which will be
2276 : // used later to do the actual write. The reason is that we want to
2277 : // batch all the writes together into a single writev call, to
2278 : // minimize overhead, and also keep the actual writes at the end of
2279 : // the function and independent of the specific control flow, so
2280 : // that they could be offloaded to another thread of made
2281 : // asynchronous (e.g. with io_uring) in the future without needing
2282 : // to change the rest of the logic.
2283 :
2284 247056 : int write_ops_cnt = 0;
2285 247056 : int write_meta_cnt = 0;
2286 247056 : ulong total_write_sz = 0UL;
2287 247056 : fd_accdb_disk_meta_t write_metas[ (FD_ACCDB_CACHE_CLASS_CNT+1UL)*FD_ACCDB_MAX_ACQUIRE_CNT ];
2288 247056 : struct iovec write_ops[ 2UL*(FD_ACCDB_CACHE_CLASS_CNT+1UL)*FD_ACCDB_MAX_ACQUIRE_CNT ];
2289 :
2290 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2291 247542 : if( FD_UNLIKELY( !accmetas[ i ] && !writable[ i ] ) ) continue;
2292 :
2293 171342 : if( FD_UNLIKELY( writable[ i ] ) ) {
2294 934524 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2295 830688 : if( FD_LIKELY( evicted_dest_acc[ i ][ j ]==UINT_MAX ) ) continue;
2296 0 : accdb->metrics->accounts_evicted++;
2297 0 : accdb->metrics->accounts_evicted_per_class[ j ]++;
2298 :
2299 0 : fd_accdb_accmeta_t const * evicted = &accdb->acc_pool[ evicted_dest_acc[ i ][ j ] ];
2300 0 : fd_racesan_hook( "writeback:pre_synth" );
2301 0 : total_write_sz += sizeof(fd_accdb_disk_meta_t) + FD_ACCDB_SIZE_DATA( evicted->executable_size );
2302 0 : FD_TEST( write_meta_cnt<(int)(sizeof(write_metas)/sizeof(write_metas[0])) );
2303 0 : fd_memcpy( write_metas[ write_meta_cnt ].pubkey, evicted->key.pubkey, 32UL );
2304 0 : write_metas[ write_meta_cnt ].size = FD_ACCDB_SIZE_DATA( evicted->executable_size );
2305 0 : fd_memcpy( write_metas[ write_meta_cnt ].owner, destination_cache_lines[ i ][ j ]->owner, 32UL );
2306 0 : write_ops[ write_ops_cnt++ ] = (struct iovec){ .iov_base = &write_metas[ write_meta_cnt ], .iov_len = sizeof(fd_accdb_disk_meta_t) };
2307 0 : write_meta_cnt++;
2308 0 : write_ops[ write_ops_cnt++ ] = (struct iovec){ .iov_base = destination_cache_lines[ i ][ j ]+1UL, .iov_len = FD_ACCDB_SIZE_DATA( evicted->executable_size ) };
2309 0 : }
2310 103836 : if( FD_UNLIKELY( accmetas[ i ] && !exists_in_cache[ i ] && evicted_orig_acc[ i ]!=UINT_MAX ) ) {
2311 0 : fd_accdb_accmeta_t const * evicted = &accdb->acc_pool[ evicted_orig_acc[ i ] ];
2312 0 : accdb->metrics->accounts_evicted++;
2313 0 : accdb->metrics->accounts_evicted_per_class[ fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( evicted->executable_size ) ) ]++;
2314 :
2315 0 : total_write_sz += sizeof(fd_accdb_disk_meta_t) + FD_ACCDB_SIZE_DATA( evicted->executable_size );
2316 0 : FD_TEST( write_meta_cnt<(int)(sizeof(write_metas)/sizeof(write_metas[0])) );
2317 0 : fd_memcpy( write_metas[ write_meta_cnt ].pubkey, evicted->key.pubkey, 32UL );
2318 0 : write_metas[ write_meta_cnt ].size = FD_ACCDB_SIZE_DATA( evicted->executable_size );
2319 0 : fd_memcpy( write_metas[ write_meta_cnt ].owner, original_cache_line[ i ]->owner, 32UL );
2320 0 : write_ops[ write_ops_cnt++ ] = (struct iovec){ .iov_base = &write_metas[ write_meta_cnt ], .iov_len = sizeof(fd_accdb_disk_meta_t) };
2321 0 : write_meta_cnt++;
2322 0 : write_ops[ write_ops_cnt++ ] = (struct iovec){ .iov_base = original_cache_line[ i ]+1UL, .iov_len = FD_ACCDB_SIZE_DATA( evicted->executable_size ) };
2323 0 : }
2324 103836 : } else {
2325 67506 : if( FD_LIKELY( exists_in_cache[ i ] || evicted_orig_acc[ i ]==UINT_MAX ) ) continue;
2326 0 : fd_accdb_accmeta_t const * evicted = &accdb->acc_pool[ evicted_orig_acc[ i ] ];
2327 0 : accdb->metrics->accounts_evicted++;
2328 0 : accdb->metrics->accounts_evicted_per_class[ fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( evicted->executable_size ) ) ]++;
2329 0 : total_write_sz += sizeof(fd_accdb_disk_meta_t) + FD_ACCDB_SIZE_DATA( evicted->executable_size );
2330 0 : FD_TEST( write_meta_cnt<(int)(sizeof(write_metas)/sizeof(write_metas[0])) );
2331 0 : fd_memcpy( write_metas[ write_meta_cnt ].pubkey, evicted->key.pubkey, 32UL );
2332 0 : write_metas[ write_meta_cnt ].size = FD_ACCDB_SIZE_DATA( evicted->executable_size );
2333 0 : fd_memcpy( write_metas[ write_meta_cnt ].owner, original_cache_line[ i ]->owner, 32UL );
2334 0 : write_ops[ write_ops_cnt++ ] = (struct iovec){ .iov_base = &write_metas[ write_meta_cnt ], .iov_len = sizeof(fd_accdb_disk_meta_t) };
2335 0 : write_meta_cnt++;
2336 0 : write_ops[ write_ops_cnt++ ] = (struct iovec){ .iov_base = original_cache_line[ i ]+1UL, .iov_len = FD_ACCDB_SIZE_DATA( evicted->executable_size ) };
2337 0 : }
2338 171342 : }
2339 :
2340 : // STEP 6-7.
2341 : // Compute the file offset for the writes we are about to do and
2342 : // build the pending offset table. The common case is a single
2343 : // atomic fetch-add on the write head, reserving a contiguous
2344 : // region. If the total eviction batch is too large to fit in one
2345 : // partition (extremely unlikely — requires many dirty 10MiB
2346 : // evictions), fall back to per-entry allocation so that each
2347 : // individual write fits in a single partition.
2348 : //
2349 : // The actual stores to evicted->offset_fork and line->persisted
2350 : // are deferred until after pwritev2 completes (Step 9-10), so
2351 : // a concurrent acquire spinning on offset==FD_ACCDB_OFF_INVAL
2352 : // does not proceed to preadv2 from a location that hasn't been
2353 : // written.
2354 247056 : int pending_cnt = 0;
2355 247056 : fd_accdb_accmeta_t * pending_accs [ (FD_ACCDB_CACHE_CLASS_CNT+1UL)*FD_ACCDB_MAX_ACQUIRE_CNT ];
2356 247056 : ulong pending_offs [ (FD_ACCDB_CACHE_CLASS_CNT+1UL)*FD_ACCDB_MAX_ACQUIRE_CNT ];
2357 247056 : fd_accdb_cache_line_t * pending_lines[ (FD_ACCDB_CACHE_CLASS_CNT+1UL)*FD_ACCDB_MAX_ACQUIRE_CNT ];
2358 :
2359 247056 : ulong file_offset;
2360 247056 : int batch_contiguous;
2361 247056 : if( FD_LIKELY( total_write_sz && total_write_sz<=accdb->shmem->partition_sz ) ) {
2362 0 : file_offset = allocate_next_write( accdb, total_write_sz );
2363 0 : batch_contiguous = 1;
2364 247056 : } else {
2365 247056 : file_offset = 0UL;
2366 247056 : batch_contiguous = 0;
2367 247056 : }
2368 :
2369 247056 : ulong cumulative_offset = 0UL;
2370 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2371 247542 : if( FD_UNLIKELY( !accmetas[ i ] && !writable[ i ] ) ) continue;
2372 :
2373 171342 : if( FD_UNLIKELY( writable[ i ] ) ) {
2374 934524 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2375 830688 : if( FD_LIKELY( evicted_dest_acc[ i ][ j ]==UINT_MAX ) ) continue;
2376 :
2377 0 : fd_accdb_accmeta_t * evicted = &accdb->acc_pool[ evicted_dest_acc[ i ][ j ] ];
2378 0 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t) + (ulong)FD_ACCDB_SIZE_DATA( evicted->executable_size );
2379 : /* xchg-to-INVAL atomically captures the old offset and prevents
2380 : a concurrent acc_unlink from also reading and freeing it (the
2381 : xchg there will see INVAL and skip). Step 10 republishes the
2382 : new offset; the spinner at line ~2082 tolerates the transient
2383 : INVAL. Same pattern as the overwrite path at line ~2388. */
2384 0 : ulong old_off = fd_accdb_acc_xchg_offset( evicted, FD_ACCDB_OFF_INVAL );
2385 0 : if( FD_LIKELY( old_off!=FD_ACCDB_OFF_INVAL ) ) {
2386 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, old_off, entry_sz );
2387 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
2388 0 : }
2389 0 : FD_TEST( pending_cnt<(int)(sizeof(pending_accs)/sizeof(pending_accs[0])) );
2390 0 : pending_accs [ pending_cnt ] = evicted;
2391 0 : if( FD_LIKELY( batch_contiguous ) ) pending_offs[ pending_cnt ] = file_offset + cumulative_offset;
2392 0 : else pending_offs[ pending_cnt ] = allocate_next_write( accdb, entry_sz );
2393 0 : pending_lines[ pending_cnt ] = destination_cache_lines[ i ][ j ];
2394 0 : pending_cnt++;
2395 0 : cumulative_offset += entry_sz;
2396 0 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
2397 0 : }
2398 103836 : if( FD_UNLIKELY( accmetas[ i ] && !exists_in_cache[ i ] && evicted_orig_acc[ i ]!=UINT_MAX ) ) {
2399 0 : fd_accdb_accmeta_t * evicted = &accdb->acc_pool[ evicted_orig_acc[ i ] ];
2400 0 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t) + (ulong)FD_ACCDB_SIZE_DATA( evicted->executable_size );
2401 0 : ulong old_off = fd_accdb_acc_xchg_offset( evicted, FD_ACCDB_OFF_INVAL );
2402 0 : if( FD_LIKELY( old_off!=FD_ACCDB_OFF_INVAL ) ) {
2403 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, old_off, entry_sz );
2404 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
2405 0 : }
2406 0 : FD_TEST( pending_cnt<(int)(sizeof(pending_accs)/sizeof(pending_accs[0])) );
2407 0 : pending_accs [ pending_cnt ] = evicted;
2408 0 : if( FD_LIKELY( batch_contiguous ) ) pending_offs[ pending_cnt ] = file_offset + cumulative_offset;
2409 0 : else pending_offs[ pending_cnt ] = allocate_next_write( accdb, entry_sz );
2410 0 : pending_lines[ pending_cnt ] = original_cache_line[ i ];
2411 0 : pending_cnt++;
2412 0 : cumulative_offset += entry_sz;
2413 0 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
2414 0 : }
2415 103836 : } else {
2416 67506 : if( FD_LIKELY( exists_in_cache[ i ] || evicted_orig_acc[ i ]==UINT_MAX ) ) continue;
2417 :
2418 0 : fd_accdb_accmeta_t * evicted = &accdb->acc_pool[ evicted_orig_acc[ i ] ];
2419 0 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t) + (ulong)FD_ACCDB_SIZE_DATA( evicted->executable_size );
2420 0 : ulong old_off = fd_accdb_acc_xchg_offset( evicted, FD_ACCDB_OFF_INVAL );
2421 0 : if( FD_LIKELY( old_off!=FD_ACCDB_OFF_INVAL ) ) {
2422 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, old_off, entry_sz );
2423 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
2424 0 : }
2425 0 : FD_TEST( pending_cnt<(int)(sizeof(pending_accs)/sizeof(pending_accs[0])) );
2426 0 : pending_accs [ pending_cnt ] = evicted;
2427 0 : if( FD_LIKELY( batch_contiguous ) ) pending_offs[ pending_cnt ] = file_offset + cumulative_offset;
2428 0 : else pending_offs[ pending_cnt ] = allocate_next_write( accdb, entry_sz );
2429 0 : pending_lines[ pending_cnt ] = original_cache_line[ i ];
2430 0 : pending_cnt++;
2431 0 : cumulative_offset += entry_sz;
2432 0 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->disk_used_bytes, entry_sz );
2433 0 : }
2434 171342 : }
2435 :
2436 : // STEP 8.
2437 : // Fill the output entries with cache pointers and metadata based on
2438 : // the accounts we have located and the cache lines we have
2439 : // reserved.
2440 :
2441 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2442 247542 : if( FD_UNLIKELY( !accmetas[ i ] && !writable[ i ] ) ) {
2443 76200 : out_accs[ i ].data = NULL;
2444 76200 : out_accs[ i ].data_len = 0UL;
2445 76200 : out_accs[ i ].lamports = 0UL;
2446 76200 : out_accs[ i ].executable = 0;
2447 76200 : memset( out_accs[ i ].owner, 0, 32UL );
2448 76200 : fd_memcpy( out_accs[ i ].pubkey, pubkeys[ i ], 32UL );
2449 76200 : out_accs[ i ].prior_lamports = 0UL;
2450 76200 : out_accs[ i ].prior_data_len = 0UL;
2451 76200 : out_accs[ i ].prior_executable = 0;
2452 76200 : memset( out_accs[ i ].prior_owner, 0, 32UL );
2453 76200 : out_accs[ i ].prior_data = NULL;
2454 76200 : out_accs[ i ].commit = 0;
2455 76200 : out_accs[ i ]._writable = 0;
2456 76200 : out_accs[ i ]._original_size_class = ULONG_MAX;
2457 76200 : out_accs[ i ]._original_cache_idx = ULONG_MAX;
2458 76200 : continue;
2459 76200 : }
2460 :
2461 171342 : if( FD_LIKELY( !writable[ i ] ) ) out_accs[ i ].data = (uchar *)(original_cache_line[ i ]+1UL);
2462 103836 : else out_accs[ i ].data = (uchar *)(destination_cache_lines[ i ][ 7UL ]+1UL);
2463 : /* Tombstone reset: agave's account loader returns AccountSharedData::default()
2464 : (System owner, empty data, exec=0) for any account with lamports==0.
2465 : https://github.com/anza-xyz/agave/blob/v2.3.1/svm/src/account_loader.rs#L199-L228 */
2466 171342 : fd_racesan_hook( "accdb_acquire:pre_step7_meta" );
2467 171342 : int tombstone = accmetas[ i ] && accmetas[ i ]->lamports==0UL;
2468 171342 : out_accs[ i ].data_len = ( accmetas[ i ] && !tombstone ) ? FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) : 0UL;
2469 171342 : out_accs[ i ].executable = ( accmetas[ i ] && !tombstone ) ? FD_ACCDB_SIZE_EXEC( accmetas[ i ]->executable_size ) : 0;
2470 171342 : fd_racesan_hook( "accdb_acquire:mid_step7_meta" );
2471 171342 : out_accs[ i ].lamports = accmetas[ i ] ? accmetas[ i ]->lamports : 0UL;
2472 171342 : if( FD_UNLIKELY( !accmetas[ i ] ) ) memset( out_accs[ i ].owner, 0, 32UL );
2473 : /* For accmetas[i] != NULL, the owner is copied from the cache line
2474 : below in step 15, after step 12 has populated it from disk for
2475 : cold loads. */
2476 :
2477 171342 : out_accs[ i ].prior_lamports = out_accs[ i ].lamports;
2478 171342 : out_accs[ i ].prior_data_len = out_accs[ i ].data_len;
2479 171342 : out_accs[ i ].prior_executable = out_accs[ i ].executable;
2480 171342 : out_accs[ i ].prior_data = (uchar *)(original_cache_line[ i ] ? (original_cache_line[ i ]+1UL) : NULL);
2481 :
2482 171342 : out_accs[ i ].commit = 0;
2483 171342 : out_accs[ i ]._writable = writable[ i ];
2484 171342 : if( FD_UNLIKELY( writable[ i ] && accmetas[ i ] ) ) out_accs[ i ]._overwrite = accdb->fork_pool[ fork_id.val ].shmem->generation==accmetas[ i ]->key.generation;
2485 159630 : else out_accs[ i ]._overwrite = 0;
2486 :
2487 171342 : FD_TEST( out_accs[ i ].data_len<=(10UL<<20) );
2488 171342 : FD_TEST( !out_accs[ i ]._overwrite || accdb->fork_pool[ fork_id.val ].shmem->generation==accmetas[ i ]->key.generation );
2489 :
2490 : #if FD_TMPL_USE_HANDHOLDING
2491 : if( FD_UNLIKELY( !writable[ i ] && accmetas[ i ] && !tombstone ) ) {
2492 : ulong cls = fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) );
2493 : FD_TEST( fd_accdb_ptr_in_region( accdb, cls, out_accs[ i ].data ) );
2494 : }
2495 : #endif
2496 :
2497 171342 : if( FD_UNLIKELY( writable[ i ] ) ) {
2498 103836 : out_accs[ i ]._fork_id = fork_id.val;
2499 103836 : out_accs[ i ]._generation = fork->shmem->generation;
2500 103836 : out_accs[ i ]._acc_map_idx = acc_map_idxs[ i ];
2501 103836 : }
2502 171342 : fd_memcpy( out_accs[ i ].pubkey, pubkeys[ i ], 32UL );
2503 :
2504 171342 : if( FD_UNLIKELY( !accmetas[ i ] ) ) {
2505 92124 : out_accs[ i ]._original_size_class = ULONG_MAX;
2506 92124 : out_accs[ i ]._original_cache_idx = ULONG_MAX;
2507 92124 : } else {
2508 79218 : out_accs[ i ]._original_size_class = fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) );
2509 79218 : out_accs[ i ]._original_cache_idx = cache_line_idx( accdb, out_accs[ i ]._original_size_class, original_cache_line[ i ] );
2510 79218 : }
2511 :
2512 171342 : if( FD_UNLIKELY( writable[ i ] ) ) {
2513 934524 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2514 830688 : out_accs[ i ]._write.destination_cache_idx[ j ] = cache_line_idx( accdb, j, destination_cache_lines[ i ][ j ] );
2515 830688 : }
2516 103836 : }
2517 171342 : }
2518 :
2519 : // STEP 9.
2520 : // Write the dirty eviction data to disk and publish the new offsets
2521 : // BEFORE constructing read iovecs. This is critical: step 4 may
2522 : // have evicted a dirty cache line belonging to another account in
2523 : // the same batch whose acc->offset is still FD_ACCDB_OFF_INVAL.
2524 : // The read-iovec loop below spin-waits on
2525 : // offset!=FD_ACCDB_OFF_INVAL, so publishing evicted offsets first
2526 : // prevents an intra-batch deadlock where the thread waits on an
2527 : // offset that only it can resolve.
2528 247056 : if( FD_LIKELY( batch_contiguous ) ) {
2529 : /* Fast path: all evictions fit in one contiguous region. Use the
2530 : pre-built iovec array for a single batched pwritev2 call. */
2531 0 : ulong bytes_written = 0UL;
2532 0 : struct iovec * write_ptr = write_ops;
2533 0 : while( FD_LIKELY( bytes_written<total_write_sz ) ) {
2534 0 : long result = pwritev2( accdb->fd, write_ptr, fd_int_min( write_ops_cnt, IOV_MAX ), (long)(file_offset+bytes_written), 0 );
2535 0 : if( FD_UNLIKELY( -1==result && (errno==EINTR || errno==EAGAIN || errno==EWOULDBLOCK ) ) ) continue;
2536 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "pwritev2() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
2537 0 : else if( FD_UNLIKELY( !result ) ) FD_LOG_ERR(( "accounts database is corrupt, pwritev2() returned 0 at offset %lu with %lu bytes remaining",
2538 0 : file_offset+bytes_written, total_write_sz-bytes_written ));
2539 0 : bytes_written += (ulong)result;
2540 0 : accdb->metrics->bytes_written += (ulong)result;
2541 0 : accdb->metrics->write_ops++;
2542 :
2543 0 : while( write_ops_cnt && (ulong)result>=(ulong)write_ptr[ 0 ].iov_len ) {
2544 0 : result -= (long)write_ptr[ 0 ].iov_len;
2545 0 : write_ptr++;
2546 0 : write_ops_cnt--;
2547 0 : }
2548 0 : if( FD_LIKELY( write_ops_cnt ) ) {
2549 0 : write_ptr[ 0 ].iov_base = (uchar *)write_ptr[ 0 ].iov_base + result;
2550 0 : write_ptr[ 0 ].iov_len -= (ulong)result;
2551 0 : }
2552 0 : }
2553 247056 : } else {
2554 : /* Slow path: total eviction batch exceeds a single partition.
2555 : Write each entry individually using its own allocated offset.
2556 : This path is only taken in extreme edge cases (many concurrent
2557 : dirty 10 MiB evictions). */
2558 247056 : struct iovec * wp = write_ops;
2559 247056 : for( int k=0; k<pending_cnt; k++ ) {
2560 0 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t) + (ulong)FD_ACCDB_SIZE_DATA( pending_accs[ k ]->executable_size );
2561 0 : ulong entry_off = pending_offs[ k ];
2562 0 : struct iovec entry_iovs[2] = { wp[0], wp[1] };
2563 0 : wp += 2;
2564 :
2565 0 : ulong written = 0UL;
2566 0 : while( FD_LIKELY( written<entry_sz ) ) {
2567 0 : long result = pwritev2( accdb->fd, entry_iovs, 2, (long)(entry_off+written), 0 );
2568 0 : if( FD_UNLIKELY( -1==result && (errno==EINTR || errno==EAGAIN || errno==EWOULDBLOCK ) ) ) continue;
2569 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "pwritev2() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
2570 0 : else if( FD_UNLIKELY( !result ) ) FD_LOG_ERR(( "accounts database is corrupt, pwritev2() returned 0 at offset %lu with %lu bytes remaining", entry_off+written, entry_sz-written ));
2571 0 : written += (ulong)result;
2572 0 : accdb->metrics->bytes_written += (ulong)result;
2573 0 : accdb->metrics->write_ops++;
2574 :
2575 0 : for( int v=0; v<2; v++ ) {
2576 0 : if( (ulong)result>=(ulong)entry_iovs[ v ].iov_len ) {
2577 0 : result -= (long)entry_iovs[ v ].iov_len;
2578 0 : entry_iovs[ v ].iov_len = 0UL;
2579 0 : } else {
2580 0 : entry_iovs[ v ].iov_base = (uchar *)entry_iovs[ v ].iov_base + result;
2581 0 : entry_iovs[ v ].iov_len -= (ulong)result;
2582 0 : break;
2583 0 : }
2584 0 : }
2585 0 : }
2586 0 : }
2587 247056 : }
2588 :
2589 : // STEP 10.
2590 : // Now that the data is on disk, publish the evicted account offsets
2591 : // so concurrent acquire threads spinning on
2592 : // offset==FD_ACCDB_OFF_INVAL can proceed. The fence ensures
2593 : // pwritev2 data is globally visible before the offset stores.
2594 247056 : FD_COMPILER_MFENCE();
2595 247056 : for( int k=0; k<pending_cnt; k++ ) {
2596 0 : pending_accs[ k ]->offset_fork = fd_accdb_acc_pack_offset_fork( pending_offs[ k ], fd_accdb_acc_fork_id(pending_accs[ k ]) );
2597 0 : pending_lines[ k ]->persisted = 1;
2598 0 : }
2599 :
2600 : // STEP 11.
2601 : // Now construct iovecs for any reads we need to do of accounts into
2602 : // the cache. For reading accounts, we read them directly into the
2603 : // sole cache line we took (and maybe just evicted). For writing
2604 : // accounts, we read them into the right sized cache line, and later
2605 : // it will be copied to the staging buffer. This is to prevent
2606 : // repeatedly reading the same account off disk into cache, if it is
2607 : // being written cold multiple times and every write fails.
2608 :
2609 247056 : ulong read_ops_cnt = 0UL;
2610 247056 : ulong read_offsets[ FD_ACCDB_CACHE_CLASS_CNT*FD_ACCDB_MAX_ACQUIRE_CNT ];
2611 247056 : uchar * read_bases[ FD_ACCDB_CACHE_CLASS_CNT*FD_ACCDB_MAX_ACQUIRE_CNT ];
2612 247056 : ulong read_sizes[ FD_ACCDB_CACHE_CLASS_CNT*FD_ACCDB_MAX_ACQUIRE_CNT ];
2613 247056 : struct iovec read_ops[ FD_ACCDB_CACHE_CLASS_CNT*FD_ACCDB_MAX_ACQUIRE_CNT ];
2614 :
2615 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2616 247542 : if( FD_UNLIKELY( !accmetas[ i ] || exists_in_cache[ i ] ) ) continue;
2617 :
2618 24 : accdb->metrics->accounts_not_found_per_class[ fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) ) ]++;
2619 :
2620 : /* Tombstones (lamports==0) have no on-disk payload to read, and
2621 : background_advance_root may unlink the acc and never assign it a
2622 : disk offset, so the offset_fork spin below would hang forever.
2623 : Step 15's tombstone reset zeros the owner for these accounts. */
2624 24 : if( FD_UNLIKELY( !accmetas[ i ]->lamports ) ) continue;
2625 :
2626 : /* We are guaranteed that if an account is in the cache, the bytes
2627 : are available (all cache operations are atomic via refcnt CAS),
2628 : but we are not guaranteed that if something is _not_ in the cache
2629 : that it has been written back to disk yet. In particular, if we
2630 : are trying to read an account that another thread is in the
2631 : process of evicting, we know they removed it from the cache, but
2632 : we don't know exactly when they will have written it back fully
2633 : to disk, so we may need to wait for that here.
2634 :
2635 : Compaction may concurrently relocate this record, but
2636 : epoch-based safe reclamation guarantees the source partition
2637 : is not freed until all epoch-protected operations that could
2638 : have snapshotted the old offset have exited. So the data at the
2639 : snapshotted offset remains stable for the duration of our
2640 : read and no post-read validation is needed. */
2641 24 : ulong off_packed = FD_VOLATILE_CONST( accmetas[ i ]->offset_fork );
2642 24 : while( FD_UNLIKELY( (off_packed & FD_ACCDB_OFF_MASK)==FD_ACCDB_OFF_INVAL ) ) {
2643 0 : FD_SPIN_PAUSE();
2644 0 : off_packed = FD_VOLATILE_CONST( accmetas[ i ]->offset_fork );
2645 0 : }
2646 24 : fd_racesan_hook( "accdb_coldload:pre_iovec" );
2647 :
2648 24 : read_offsets[ read_ops_cnt ] = fd_accdb_acc_offset(accmetas[ i ]) + offsetof(fd_accdb_disk_meta_t, owner);
2649 24 : read_bases[ read_ops_cnt ] = original_cache_line[ i ]->owner;
2650 24 : read_sizes[ read_ops_cnt ] = 32UL + FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size );
2651 24 : read_ops[ read_ops_cnt++ ] = (struct iovec){ .iov_base = original_cache_line[ i ]->owner, .iov_len = 32UL + FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size ) };
2652 24 : }
2653 :
2654 : // STEP 12.
2655 : // Almost done... now do the actual reads of accounts into cache,
2656 : // using the iovecs we constructed. This is basically the same loop
2657 : // as the writes, but with preadv2 instead of pwritev2, and that the
2658 : // reads are not necessarily all contiguous, but occur at random
2659 : // offsets.
2660 : //
2661 : // CONCURRENCY: The compaction tile may concurrently relocate a
2662 : // record we are about to read (both are epoch-protected). Epoch-
2663 : // based safe reclamation guarantees the source partition is not
2664 : // freed until all epoch-protected operations that could have
2665 : // snapshotted the old offset have exited, so the data at the
2666 : // snapshotted offset remains stable for the duration of this read
2667 : // — no post-read validation or retry is needed.
2668 247080 : for( ulong i=0UL; i<read_ops_cnt; i++ ) {
2669 24 : ulong bytes_read = 0UL;
2670 48 : while( FD_LIKELY( bytes_read<read_sizes[ i ] ) ) {
2671 24 : long result = preadv2( accdb->fd, &read_ops[ i ], 1, (long)(read_offsets[ i ]+bytes_read), 0 );
2672 24 : if( FD_UNLIKELY( -1==result && (errno==EINTR || errno==EAGAIN || errno==EWOULDBLOCK ) ) ) continue;
2673 24 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "preadv2() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
2674 24 : else if( FD_UNLIKELY( !result ) ) FD_LOG_ERR(( "accounts database is corrupt, data expected at offset %lu with size %lu exceeded file extents",
2675 24 : read_offsets[ i ]+bytes_read, read_sizes[ i ] ));
2676 24 : fd_accdb_partition_read_bump( accdb, read_offsets[ i ]+bytes_read, (ulong)result );
2677 24 : bytes_read += (ulong)result;
2678 24 : accdb->metrics->bytes_read += (ulong)result;
2679 24 : accdb->metrics->read_ops++;
2680 :
2681 24 : read_ops[ i ].iov_base = read_bases[ i ] + bytes_read;
2682 24 : read_ops[ i ].iov_len = read_sizes[ i ] - bytes_read;
2683 24 : }
2684 24 : }
2685 :
2686 : // STEP 13.
2687 : // Publish the real acc index for any cache lines we just loaded
2688 : // from disk, so concurrent threads spinning on acc_idx==UINT_MAX
2689 : // can proceed. The fence ensures all preadv2 data is visible
2690 : // before the sentinel is cleared.
2691 247056 : FD_COMPILER_MFENCE();
2692 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2693 247542 : if( FD_UNLIKELY( !accmetas[ i ] || exists_in_cache[ i ] ) ) continue;
2694 24 : FD_VOLATILE( original_cache_line[ i ]->acc_idx ) = (uint)( accmetas[ i ] - accdb->acc_pool );
2695 24 : FD_TEST( FD_VOLATILE_CONST( original_cache_line[ i ]->acc_idx )==(uint)( accmetas[ i ] - accdb->acc_pool ) );
2696 24 : }
2697 :
2698 : // STEP 14.
2699 : // Spin-wait for any cache lines found via acc->cache_idx that are
2700 : // still being loaded by another thread's preadv2. The loading
2701 : // thread sets acc_idx to UINT_MAX before publishing cache_idx
2702 : // and publishes the real acc index after its read completes.
2703 : // This step is placed as late as possible to give the loading
2704 : // thread maximum time to finish before we need to spin.
2705 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2706 247542 : if( FD_UNLIKELY( !accmetas[ i ] && !writable[ i ] ) ) continue;
2707 :
2708 171342 : if( FD_UNLIKELY( !original_cache_line[ i ] ) ) continue;
2709 79218 : if( FD_LIKELY( FD_VOLATILE_CONST( original_cache_line[ i ]->acc_idx )!=UINT_MAX ) ) goto step13_check;
2710 0 : accdb->metrics->accounts_waited++;
2711 0 : while( FD_UNLIKELY( FD_VOLATILE_CONST( original_cache_line[ i ]->acc_idx )==UINT_MAX ) ) {
2712 0 : fd_racesan_hook( "accdb_acquire:step14_load_wait" );
2713 0 : FD_SPIN_PAUSE();
2714 0 : }
2715 79218 : step13_check:;
2716 : #if FD_TMPL_USE_HANDHOLDING
2717 : FD_TEST( original_cache_line[ i ]->key.generation==accmetas[ i ]->key.generation &&
2718 : !memcmp( original_cache_line[ i ]->key.pubkey, pubkeys[ i ], 32UL ) );
2719 : #endif
2720 79218 : }
2721 :
2722 : // STEP 15.
2723 : // Now that all reads from disk into original_cache_line have
2724 : // completed (and any concurrent loaders have published their
2725 : // acc_idx in step 14), copy the owner into the output entries.
2726 : // This must happen here rather than in step 8 because the cache
2727 : // line owner is only valid post-read for cold loads.
2728 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2729 247542 : if( FD_UNLIKELY( !accmetas[ i ] ) ) continue;
2730 79218 : fd_racesan_hook( "accdb_acquire:pre_step14_owner" );
2731 : /* Tombstone reset: see STEP 7 comment. */
2732 79218 : if( FD_UNLIKELY( accmetas[ i ]->lamports==0UL ) ) {
2733 3 : memset( out_accs[ i ].owner, 0, 32UL );
2734 3 : memset( out_accs[ i ].prior_owner, 0, 32UL );
2735 79215 : } else {
2736 79215 : fd_memcpy( out_accs[ i ].owner, original_cache_line[ i ]->owner, 32UL );
2737 79215 : fd_memcpy( out_accs[ i ].prior_owner, original_cache_line[ i ]->owner, 32UL );
2738 79215 : }
2739 79218 : }
2740 :
2741 : // STEP 16.
2742 : // Finally, copy any accounts we are writing into the staging
2743 : // buffers, so they occupy a 10MiB cache line for the execution
2744 : // system.
2745 494598 : for( ulong i=0UL; i<pubkeys_cnt; i++ ) {
2746 247542 : if( FD_UNLIKELY( !accmetas[ i ] || !writable[ i ] ) ) continue;
2747 :
2748 11712 : ulong copy_sz = (ulong)FD_ACCDB_SIZE_DATA( accmetas[ i ]->executable_size );
2749 11712 : fd_memcpy( destination_cache_lines[ i ][ 7UL ]+1UL, original_cache_line[ i ]+1UL, copy_sz );
2750 11712 : accdb->metrics->bytes_copied += copy_sz;
2751 11712 : }
2752 :
2753 247056 : FD_COMPILER_MFENCE();
2754 247056 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
2755 247056 : }
2756 :
2757 : void
2758 : fd_accdb_acquire( fd_accdb_t * accdb,
2759 : fd_accdb_fork_id_t fork_id,
2760 : ulong pubkeys_cnt,
2761 : uchar const * const * pubkeys,
2762 : int * writable,
2763 246708 : fd_acc_t * out_accs ) {
2764 246708 : FD_TEST( accdb->acquire_state==FD_ACCDB_ACQUIRE_STATE_IDLE );
2765 246708 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_OPEN;
2766 246708 : fd_accdb_acquire_inner( accdb, fork_id, RESERVATION_TYPE_SIMPLE, 0UL, pubkeys_cnt, pubkeys, writable, out_accs );
2767 246708 : }
2768 :
2769 : void
2770 : fd_accdb_acquire_a( fd_accdb_t * accdb,
2771 : fd_accdb_fork_id_t fork_id,
2772 : ulong pubkeys_cnt,
2773 : uchar const * const * pubkeys,
2774 : int * writable,
2775 174 : fd_acc_t * out_accs ) {
2776 174 : FD_TEST( accdb->acquire_state==FD_ACCDB_ACQUIRE_STATE_IDLE );
2777 174 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_PHASE_A;
2778 174 : fd_accdb_acquire_inner( accdb, fork_id, RESERVATION_TYPE_MAYBE_PROGRAMDATA, 0UL, pubkeys_cnt, pubkeys, writable, out_accs );
2779 174 : }
2780 :
2781 : void
2782 : fd_accdb_acquire_b( fd_accdb_t * accdb,
2783 : fd_accdb_fork_id_t fork_id,
2784 : ulong reserved_cnt,
2785 : ulong pubkeys_cnt,
2786 : uchar const * const * pubkeys,
2787 : int * writable,
2788 174 : fd_acc_t * out_accs ) {
2789 174 : FD_TEST( accdb->acquire_state==FD_ACCDB_ACQUIRE_STATE_PHASE_A );
2790 174 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_OPEN;
2791 174 : fd_accdb_acquire_inner( accdb, fork_id, RESERVATION_TYPE_ALREADY_RESERVED, reserved_cnt, pubkeys_cnt, pubkeys, writable, out_accs );
2792 174 : }
2793 :
2794 : /* release_inner drains one group of acquired accs but does NOT change the
2795 : handle's acquire_state. The public fd_accdb_release / fd_accdb_release_ab
2796 : wrappers below own the state transition (a single-phase release closes
2797 : the bracket; release_ab drains both phase groups then closes). */
2798 : static void
2799 : release_inner( fd_accdb_t * accdb,
2800 : ulong accs_cnt,
2801 246768 : fd_acc_t * accs ) {
2802 246768 : FD_TEST( accdb->acquire_state==FD_ACCDB_ACQUIRE_STATE_OPEN );
2803 :
2804 246768 : {
2805 246768 : ulong prev = FD_VOLATILE_CONST( *accdb->my_epoch_slot );
2806 246768 : FD_TEST( prev==ULONG_MAX || prev<=FD_VOLATILE_CONST( accdb->shmem->epoch ) );
2807 246768 : }
2808 :
2809 246768 : FD_COMPILER_MFENCE();
2810 246768 : FD_VOLATILE( *accdb->my_epoch_slot ) = FD_VOLATILE_CONST( accdb->shmem->epoch );
2811 246768 : FD_HW_MFENCE(); /* StoreLoad: epoch store must be globally visible
2812 : before any subsequent loads so the deferred
2813 : reclamation scan does not miss us. */
2814 :
2815 : // STEP 1.
2816 : // For each cache line which was written to in the 10MiB staging
2817 : // buffer, we may need to copy to the data out to a right sized
2818 : // cache line. Figuring out the target cache line is non-obvious,
2819 : // but follows the more complete logic below this, we just pull the
2820 : // memcpy out so they are not done inside the cache lock.
2821 :
2822 493893 : for( ulong i=0UL; i<accs_cnt; i++ ) {
2823 247125 : if( FD_UNLIKELY( accs[ i ]._original_size_class==ULONG_MAX && !accs[ i ]._writable ) ) continue;
2824 :
2825 : #if FD_TMPL_USE_HANDHOLDING
2826 : if( FD_LIKELY( accs[ i ]._original_size_class!=ULONG_MAX ) ) {
2827 : FD_TEST( accs[ i ]._original_cache_idx<accdb->shmem->cache_class_max[ accs[ i ]._original_size_class ] );
2828 : }
2829 : if( FD_UNLIKELY( accs[ i ].commit ) ) FD_TEST( accs[ i ]._writable );
2830 : #endif
2831 :
2832 170952 : if( FD_LIKELY( !accs[ i ]._writable || !accs[ i ].commit ) ) continue;
2833 : #if FD_TMPL_USE_HANDHOLDING
2834 : if( FD_UNLIKELY( accs[ i ]._overwrite ) ) {
2835 : FD_TEST( accs[ i ]._writable );
2836 : FD_TEST( accs[ i ]._original_cache_idx!=ULONG_MAX );
2837 : FD_TEST( accs[ i ]._original_size_class!=ULONG_MAX );
2838 : }
2839 : #endif
2840 :
2841 103206 : ulong original_size_class = accs[ i ]._original_size_class;
2842 103206 : ulong new_size_class = fd_accdb_cache_class( accs[ i ].data_len );
2843 103206 : if( FD_UNLIKELY( new_size_class==7UL ) ) continue;
2844 :
2845 102894 : fd_accdb_cache_line_t * target_cache_line;
2846 102894 : if( FD_LIKELY( original_size_class==new_size_class && accs[ i ]._overwrite ) ) target_cache_line = cache_line( accdb, original_size_class, accs[ i ]._original_cache_idx );
2847 100437 : else target_cache_line = cache_line( accdb, new_size_class, accs[ i ]._write.destination_cache_idx[ new_size_class ] );
2848 :
2849 102894 : fd_accdb_cache_line_t * staging_line = cache_line( accdb, 7UL, accs[ i ]._write.destination_cache_idx[ 7UL ] );
2850 :
2851 102894 : fd_racesan_hook( "accdb_commit:pre_owner_write" );
2852 :
2853 : #if FD_TMPL_USE_HANDHOLDING
2854 : if( FD_UNLIKELY( original_size_class==new_size_class && accs[ i ]._overwrite ) ) {
2855 : uint rc = FD_VOLATILE_CONST( target_cache_line->refcnt );
2856 : FD_TEST( target_cache_line->key.generation==accs[ i ]._generation &&
2857 : !memcmp( target_cache_line->key.pubkey, accs[ i ].pubkey, 32UL ) &&
2858 : rc>0U &&
2859 : rc!=FD_ACCDB_EVICT_SENTINEL );
2860 : }
2861 : #endif
2862 :
2863 102894 : fd_memcpy( target_cache_line->owner, accs[ i ].owner, 32UL );
2864 102894 : fd_memcpy( target_cache_line+1UL, staging_line+1UL, accs[ i ].data_len );
2865 102894 : accdb->metrics->bytes_copied += accs[ i ].data_len;
2866 102894 : }
2867 :
2868 : // STEP 2.
2869 : // Now update the metadata structures and free lists to reflect the
2870 : // fact that we are done with these cache lines. This is fully
2871 : // atomic with CLOCK.
2872 :
2873 493893 : for( ulong i=0UL; i<accs_cnt; i++ ) {
2874 247125 : if( FD_UNLIKELY( accs[ i ]._original_size_class==ULONG_MAX && !accs[ i ]._writable ) ) continue;
2875 :
2876 170952 : ulong original_size_class = accs[ i ]._original_size_class;
2877 170952 : fd_accdb_cache_line_t * original_cache_line = accs[ i ]._original_cache_idx==ULONG_MAX ? NULL : cache_line( accdb, original_size_class, accs[ i ]._original_cache_idx );
2878 : /* For overwrite commits, defer the refcnt decrement on
2879 : original_cache_line until after invalidation completes. If
2880 : we dropped refcnt to 0 here, a concurrent CLOCK sweep could
2881 : CAS(refcnt, 0, EVICT_SENTINEL) and steal the line before we
2882 : get to invalidate it, causing data corruption.
2883 : Non-overwrite and non-commit paths unpin
2884 : immediately because they never invalidate the original line. */
2885 170952 : if( FD_LIKELY( original_cache_line ) ) {
2886 : #if FD_TMPL_USE_HANDHOLDING
2887 : FD_TEST( original_cache_line->refcnt>0U );
2888 : #endif
2889 78828 : if( FD_LIKELY( !accs[ i ]._writable || !accs[ i ].commit || !accs[ i ]._overwrite ) ) {
2890 76050 : FD_ATOMIC_FETCH_AND_SUB( &original_cache_line->refcnt, 1U );
2891 76050 : }
2892 78828 : }
2893 :
2894 170952 : if( FD_LIKELY( !accs[ i ]._writable ) ) {
2895 : /* For readonly accounts, mark as recently used so the CLOCK
2896 : algorithm gives it a second chance before eviction. */
2897 : #if FD_TMPL_USE_HANDHOLDING
2898 : FD_TEST( original_cache_line );
2899 : #endif
2900 67338 : original_cache_line->referenced = 1;
2901 67338 : continue;
2902 67338 : }
2903 :
2904 103614 : fd_accdb_cache_line_t * destination_cache_lines[ FD_ACCDB_CACHE_CLASS_CNT ];
2905 932526 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) destination_cache_lines[ j ] = cache_line( accdb, j, accs[ i ]._write.destination_cache_idx[ j ] );
2906 103614 : int destination_committed[ FD_ACCDB_CACHE_CLASS_CNT ] = {0};
2907 :
2908 103614 : if( FD_LIKELY( !accs[ i ].commit ) ) {
2909 : /* If it's writable but it didn't commit, all of the destination
2910 : cache lines (including the staging buffer which is trashed) are
2911 : unused and can be pushed to the CAS free list for immediate
2912 : reuse. Whatever buffer it was accessing also gets marked as
2913 : recently used. */
2914 408 : if( FD_LIKELY( original_cache_line ) ) original_cache_line->referenced = 1;
2915 3672 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
2916 : /* acquire_cache_line via CLOCK leaves line->acc_idx pointing
2917 : at the prior owner. cache_free_push consumers (CLOCK,
2918 : background_preevict) skip lines only when acc_idx==UINT_MAX
2919 : AND gen==UINT_MAX; if we leave the stale acc_idx, a future
2920 : CLOCK pick would call line 849/853 against the wrong acc
2921 : and corrupt its cache_idx/valid. */
2922 3264 : destination_cache_lines[ j ]->acc_idx = UINT_MAX;
2923 3264 : destination_cache_lines[ j ]->key.generation = UINT_MAX;
2924 3264 : destination_cache_lines[ j ]->refcnt = 0;
2925 3264 : destination_cache_lines[ j ]->persisted = 1;
2926 3264 : cache_free_push( accdb, j, destination_cache_lines[ j ] );
2927 3264 : }
2928 408 : continue;
2929 408 : }
2930 :
2931 103206 : ulong new_size_class = fd_accdb_cache_class( accs[ i ].data_len );
2932 103206 : uint original_acc_idx = original_cache_line ? original_cache_line->acc_idx : UINT_MAX;
2933 103206 : fd_accdb_cache_line_t * committed_line;
2934 :
2935 : /* For overwrites, invalidate the on-disk offset BEFORE removing
2936 : the cache acc. This ensures a concurrent acquire that misses
2937 : the cache will see offset==FD_ACCDB_OFF_INVAL and spin-wait,
2938 : rather than reading stale on-disk bytes from the old location.
2939 : The CAS-loop exchange also serializes with a concurrent
2940 : compaction CAS (old_offset -> dest_offset). */
2941 103206 : ulong old_offset = FD_ACCDB_OFF_INVAL;
2942 103206 : if( FD_LIKELY( accs[ i ]._overwrite ) ) {
2943 2778 : fd_accdb_accmeta_t * ow_accmeta = &accdb->acc_pool[ original_acc_idx ];
2944 2778 : fd_racesan_hook( "accdb_overwrite:pre_xchg_offset" );
2945 2778 : old_offset = fd_accdb_acc_xchg_offset( ow_accmeta, FD_ACCDB_OFF_INVAL );
2946 2778 : if( FD_LIKELY( old_offset!=FD_ACCDB_OFF_INVAL ) ) {
2947 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, old_offset, (ulong)FD_ACCDB_SIZE_DATA(ow_accmeta->executable_size)+sizeof(fd_accdb_disk_meta_t) );
2948 0 : FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->shmetrics->disk_used_bytes, (ulong)FD_ACCDB_SIZE_DATA(ow_accmeta->executable_size)+sizeof(fd_accdb_disk_meta_t) );
2949 0 : }
2950 2778 : }
2951 :
2952 103206 : if( FD_UNLIKELY( new_size_class==7UL ) ) {
2953 : /* The account belongs in the largest size class, and we already
2954 : have it resident in a 10MiB buffer anyway, so no need to copy
2955 : back. If we are "overwriting" (same generation as the account
2956 : came from), then the original can be discarded (pushed to
2957 : the CAS free list) and removed from the cache. */
2958 312 : destination_cache_lines[ 7UL ]->persisted = 0;
2959 312 : destination_committed[ 7UL ] = 1;
2960 312 : if( FD_LIKELY( accs[ i ]._overwrite ) ) {
2961 : /* Atomically clear acc.VALID and acc.cache_idx BEFORE freeing
2962 : the line, so a reader cannot observe acc.VALID=1 with
2963 : acc.cache_idx pointing at a line that has been recycled to
2964 : another acc. evict_clear_acc_cache_ref uses the CLAIM
2965 : protocol to serialize with cold_load_acc. */
2966 303 : evict_clear_acc_cache_ref( &accdb->acc_pool[ original_acc_idx ], original_size_class, accs[ i ]._original_cache_idx );
2967 :
2968 : /* Drop our pin, then try to claim the line exclusively for
2969 : freeing. A concurrent reader that pinned the line via
2970 : cache_try_pin BEFORE evict_clear_acc_cache_ref completed
2971 : may still hold a reference here (its ABA check on
2972 : line->key.generation is not synchronized with our writes
2973 : to that field). CAS(refcnt, 0, EVICT_SENTINEL) succeeds
2974 : only when no such reader is outstanding; on failure we
2975 : must NOT free the line — leave acc_idx/key.generation
2976 : intact so CLOCK can reclaim it once the reader unpins.
2977 : At that point CLOCK's call to evict_clear_acc_cache_ref
2978 : is a no-op (acc.cache_idx no longer matches expected_cidx)
2979 : and the line is safely repurposed. */
2980 303 : FD_ATOMIC_FETCH_AND_SUB( &original_cache_line->refcnt, 1U );
2981 303 : if( FD_LIKELY( FD_ATOMIC_CAS( &original_cache_line->refcnt, 0U, FD_ACCDB_EVICT_SENTINEL )==0U ) ) {
2982 303 : original_cache_line->persisted = 1;
2983 303 : original_cache_line->acc_idx = UINT_MAX;
2984 303 : original_cache_line->key.generation = UINT_MAX;
2985 303 : original_cache_line->refcnt = 0;
2986 303 : cache_free_push( accdb, original_size_class, original_cache_line );
2987 303 : }
2988 303 : }
2989 312 : committed_line = destination_cache_lines[ 7UL ];
2990 102894 : } else {
2991 : /* The account started in some arbitrary size class, transited
2992 : through a 10MiB staging buffer, and is now being written back
2993 : to some arbitrary (non-10MiB) size class, so we need to copy it
2994 : there. The staging buffer is discarded. If we are going to
2995 : a different size class, and we are "overwriting" (same
2996 : generation), then the original can also be discarded, but if
2997 : we are staying in the same size class, we can reuse the cache
2998 : line in place. */
2999 102894 : fd_accdb_cache_line_t * target_cache_line;
3000 102894 : if( FD_LIKELY( original_size_class==new_size_class ) ) {
3001 10878 : if( FD_LIKELY( accs[ i ]._overwrite ) ) {
3002 2457 : FD_TEST( FD_VOLATILE_CONST( original_cache_line->refcnt )==1U );
3003 2457 : original_cache_line->key.generation = UINT_MAX;
3004 : /* Keep refcnt>=1 through the reuse window so CLOCK cannot
3005 : steal the line between invalidation and re-publish. The
3006 : pin is released in the destination cleanup loop after
3007 : acc->cache_idx has been republished. */
3008 2457 : original_cache_line->acc_idx = UINT_MAX;
3009 2457 : target_cache_line = original_cache_line;
3010 8421 : } else {
3011 8421 : target_cache_line = destination_cache_lines[ new_size_class ];
3012 8421 : destination_committed[ new_size_class ] = 1;
3013 8421 : }
3014 92016 : } else {
3015 92016 : if( FD_LIKELY( accs[ i ]._overwrite ) ) {
3016 : /* Atomically clear acc.VALID and acc.cache_idx BEFORE freeing
3017 : the line, so a reader cannot observe acc.VALID=1 with
3018 : acc.cache_idx pointing at a line that has been recycled to
3019 : another acc. evict_clear_acc_cache_ref uses the CLAIM
3020 : protocol to serialize with cold_load_acc. See the
3021 : size_class==7 path above for the refcnt CAS rationale. */
3022 18 : evict_clear_acc_cache_ref( &accdb->acc_pool[ original_acc_idx ], original_size_class, accs[ i ]._original_cache_idx );
3023 18 : FD_ATOMIC_FETCH_AND_SUB( &original_cache_line->refcnt, 1U );
3024 18 : if( FD_LIKELY( FD_ATOMIC_CAS( &original_cache_line->refcnt, 0U, FD_ACCDB_EVICT_SENTINEL )==0U ) ) {
3025 18 : original_cache_line->persisted = 1;
3026 18 : original_cache_line->acc_idx = UINT_MAX;
3027 18 : original_cache_line->key.generation = UINT_MAX;
3028 18 : original_cache_line->refcnt = 0;
3029 18 : cache_free_push( accdb, original_size_class, original_cache_line );
3030 18 : }
3031 18 : }
3032 :
3033 92016 : destination_committed[ new_size_class ] = 1;
3034 92016 : target_cache_line = destination_cache_lines[ new_size_class ];
3035 92016 : }
3036 :
3037 102894 : target_cache_line->persisted = 0;
3038 : /* If target is the original cache line (overwrite, same size
3039 : class), mark as referenced directly since the cleanup loop
3040 : only handles destination lines. */
3041 102894 : if( FD_LIKELY( !destination_committed[ new_size_class ] ) ) target_cache_line->referenced = 1;
3042 102894 : committed_line = target_cache_line;
3043 102894 : }
3044 :
3045 : /* For non-overwrite commits, the original cache line (if any) still
3046 : holds valid ancestor data but is no longer pinned. Mark it as
3047 : recently used so the CLOCK algorithm retains it. */
3048 103206 : if( FD_UNLIKELY( !accs[ i ]._overwrite && original_cache_line ) ) {
3049 8421 : original_cache_line->referenced = 1;
3050 8421 : }
3051 :
3052 : /* Handle every destination cache line: committed ones keep
3053 : refcnt>=1 until acc->cache_idx is published (the deferred
3054 : unpin happens after the publish below), uncommitted ones are
3055 : fully freed to the CAS free list. */
3056 928854 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
3057 825648 : if( destination_committed[ j ] ) {
3058 100749 : destination_cache_lines[ j ]->referenced = 1;
3059 724899 : } else {
3060 : /* See note above (no-commit path): clear stale acc_idx/gen
3061 : before pushing, otherwise CLOCK can pick this line and
3062 : stomp the prior owner's cache_idx/valid. */
3063 724899 : destination_cache_lines[ j ]->acc_idx = UINT_MAX;
3064 724899 : destination_cache_lines[ j ]->key.generation = UINT_MAX;
3065 724899 : destination_cache_lines[ j ]->refcnt = 0;
3066 724899 : destination_cache_lines[ j ]->persisted = 1;
3067 724899 : cache_free_push( accdb, j, destination_cache_lines[ j ] );
3068 724899 : }
3069 825648 : }
3070 :
3071 : /* Update the accounts index for this committed write. For an
3072 : overwrite (same fork+generation), update the existing acc
3073 : acc in place. Otherwise allocate a new acc, prepend it
3074 : to the hash chain, and record the write in a txn linked to
3075 : the fork so advance_root can clean up old versions. */
3076 103206 : if( FD_LIKELY( accs[ i ]._overwrite ) ) {
3077 2778 : accdb->metrics->accounts_committed_overwrite_per_class[ new_size_class ]++;
3078 2778 : committed_line->acc_idx = original_acc_idx;
3079 :
3080 2778 : fd_accdb_accmeta_t * accmeta = &accdb->acc_pool[ original_acc_idx ];
3081 : /* The offset was already atomically swapped to FD_ACCDB_OFF_INVAL
3082 : and bytes freed above, so just update the metadata and
3083 : re-publish the cache location. CAS-loop preserves CLAIM bit
3084 : (a concurrent evict_clear_acc_cache_ref or acc_unlink may
3085 : hold it) and clears VALID; a plain store would clobber CLAIM
3086 : and break those protocols. */
3087 2778 : for(;;) {
3088 2778 : uint cur = FD_VOLATILE_CONST( accmeta->executable_size );
3089 2778 : uint nxt = (cur & FD_ACCDB_SIZE_CACHE_CLAIM_BIT) | FD_ACCDB_SIZE_PACK( (uint)accs[ i ].data_len, accs[ i ].executable );
3090 2778 : if( FD_LIKELY( FD_ATOMIC_CAS( &accmeta->executable_size, cur, nxt )==cur ) ) break;
3091 0 : FD_SPIN_PAUSE();
3092 0 : }
3093 2778 : accmeta->lamports = accs[ i ].lamports;
3094 2778 : fd_racesan_hook( "accdb_overwrite:mid_inplace" );
3095 :
3096 2778 : fd_memcpy( committed_line->owner, accs[ i ].owner, 32UL );
3097 2778 : fd_memcpy( committed_line->key.pubkey, accmeta->key.pubkey, 32UL );
3098 2778 : committed_line->key.generation = accmeta->key.generation;
3099 2778 : committed_line->acc_idx = original_acc_idx;
3100 2778 : FD_VOLATILE( accmeta->cache_idx ) = FD_ACCDB_ACC_CIDX_PACK( (uint)new_size_class, (uint)cache_line_idx( accdb, new_size_class, committed_line ) );
3101 : /* Atomic OR so a concurrent evict_clear_acc_cache_ref's CLAIM
3102 : clear (FETCH_AND_AND with ~CLAIM) cannot be lost by an RMW
3103 : race with a plain |= store. */
3104 2778 : FD_ATOMIC_FETCH_AND_OR( &accmeta->executable_size, FD_ACCDB_SIZE_CACHE_VALID_BIT );
3105 :
3106 : /* Now that acc->cache_idx is published, unpin so CLOCK can
3107 : eventually evict it. For same-size overwrites, committed_line
3108 : IS the reused original_cache_line. For cross-size overwrites,
3109 : committed_line is a destination line whose refcnt decrement was
3110 : deferred from the cleanup loop. */
3111 2778 : FD_ATOMIC_FETCH_AND_SUB( &committed_line->refcnt, 1U );
3112 2778 : committed_line->referenced = 1;
3113 100428 : } else {
3114 100428 : accdb->metrics->accounts_committed_new_per_class[ new_size_class ]++;
3115 100428 : fd_accdb_accmeta_t * accmeta = acc_pool_acquire( accdb->acc_pool_join );
3116 100428 : FD_TEST( accmeta );
3117 100428 : ulong acc_idx = acc_pool_idx( accdb->acc_pool_join, accmeta );
3118 100428 : fd_memcpy( accmeta->key.pubkey, accs[ i ].pubkey, 32UL );
3119 100428 : accmeta->lamports = accs[ i ].lamports;
3120 100428 : accmeta->executable_size = FD_ACCDB_SIZE_PACK( (uint)accs[ i ].data_len, accs[ i ].executable );
3121 100428 : accmeta->key.generation = accs[ i ]._generation;
3122 100428 : accmeta->offset_fork = fd_accdb_acc_pack_offset_fork( FD_ACCDB_OFF_INVAL, accs[ i ]._fork_id );
3123 :
3124 : /* Publish in the cache BEFORE the acc_map head so that a
3125 : concurrent acquire that finds this acc in the hash chain will
3126 : also find a cache hit, rather than inserting a conflicting
3127 : placeholder cache acc. */
3128 100428 : committed_line->acc_idx = (uint)acc_idx;
3129 100428 : fd_memcpy( committed_line->owner, accs[ i ].owner, 32UL );
3130 100428 : fd_memcpy( committed_line->key.pubkey, accmeta->key.pubkey, 32UL );
3131 100428 : committed_line->key.generation = accmeta->key.generation;
3132 100428 : FD_VOLATILE( accmeta->cache_idx ) = FD_ACCDB_ACC_CIDX_PACK( (uint)new_size_class, (uint)cache_line_idx( accdb, new_size_class, committed_line ) );
3133 : /* Atomic OR so a concurrent evict_clear_acc_cache_ref's CLAIM
3134 : clear (FETCH_AND_AND with ~CLAIM) cannot be lost by an RMW
3135 : race with a plain |= store. */
3136 100428 : FD_ATOMIC_FETCH_AND_OR( &accmeta->executable_size, FD_ACCDB_SIZE_CACHE_VALID_BIT );
3137 :
3138 : /* Now that acc->cache_idx is published, unpin it so
3139 : CLOCK can eventually evict it. */
3140 100428 : FD_ATOMIC_FETCH_AND_SUB( &committed_line->refcnt, 1U );
3141 100428 : committed_line->referenced = 1;
3142 :
3143 : /* CAS loop to prepend to the hash chain. Succeeds on the first
3144 : try in most cases, but a concurrent acc_unlink CAS removing
3145 : the old head can change acc_map[idx] between our load and
3146 : CAS. Multiple concurrent releases may also race on the head
3147 : pointer — the CAS retry handles this. */
3148 100428 : for(;;) {
3149 100428 : uint old_head = FD_VOLATILE_CONST( accdb->acc_map[ accs[ i ]._acc_map_idx ] );
3150 100428 : accmeta->map.next = old_head;
3151 100428 : FD_COMPILER_MFENCE();
3152 100428 : fd_racesan_hook( "accdb_release:pre_chain_cas" );
3153 100428 : if( FD_LIKELY( FD_ATOMIC_CAS( &accdb->acc_map[ accs[ i ]._acc_map_idx ], old_head, (uint)acc_idx )==old_head ) ) break;
3154 0 : FD_SPIN_PAUSE();
3155 0 : }
3156 :
3157 : /* CONCURRENCY: The cache acc is published before the acc_map
3158 : head so that a concurrent fd_accdb_acquire reader that
3159 : observes the new head also finds a cache hit, preventing
3160 : duplicate cache insertion.
3161 :
3162 : (1) The CAS on acc_map[idx] serializes head-pointer mutations
3163 : from concurrent releases onto the same chain without any
3164 : external lock.
3165 :
3166 : (2) The FD_COMPILER_MFENCE above ensures stores to the acc node
3167 : fields (pubkey, lamports, size, generation, fork_id,
3168 : offset, map.next) are ordered before the CAS that publishes
3169 : the new head. On x86-64 (TSO), hardware also guarantees
3170 : this, but the compiler fence is needed to prevent the
3171 : compiler from reordering the stores. A reader that
3172 : observes the new head is guaranteed to see a fully
3173 : initialized node. A reader that has not yet seen the new
3174 : head simply traverses the previous (still valid) chain.
3175 :
3176 : (3) A concurrent acc_unlink (advance_root / purge) may CAS the
3177 : head away between our load and CAS here. The CAS retry
3178 : loop handles this. */
3179 :
3180 100428 : fd_accdb_txn_t * txn = txn_pool_acquire( accdb->txn_pool );
3181 100428 : FD_TEST( txn ); /* Sized so it always succeeds */
3182 100428 : txn->acc_map_idx = (uint)accs[ i ]._acc_map_idx;
3183 100428 : txn->acc_pool_idx = (uint)acc_idx;
3184 100428 : uint txn_idx = (uint)txn_pool_idx( accdb->txn_pool, txn );
3185 100428 : for(;;) {
3186 100428 : uint old_head = FD_VOLATILE_CONST( accdb->fork_pool[ accs[ i ]._fork_id ].shmem->txn_head );
3187 100428 : txn->fork.next = old_head;
3188 100428 : if( FD_LIKELY( FD_ATOMIC_CAS( &accdb->fork_pool[ accs[ i ]._fork_id ].shmem->txn_head, old_head, txn_idx )==old_head ) ) break;
3189 0 : FD_SPIN_PAUSE();
3190 0 : }
3191 :
3192 100428 : FD_ATOMIC_FETCH_AND_ADD( &accdb->shmem->shmetrics->accounts_total, 1UL );
3193 100428 : }
3194 103206 : }
3195 :
3196 : // STEP 3.
3197 : // Finally, we release the cache class reservations we took at the
3198 : // beginning when we acquired these cache lines. Credits return
3199 : // directly to the shared pool so other threads can use them
3200 : // immediately.
3201 :
3202 246768 : ulong refund[ FD_ACCDB_CACHE_CLASS_CNT ] = {0};
3203 493893 : for( ulong i=0UL; i<accs_cnt; i++ ) {
3204 247125 : if( FD_LIKELY( accs[ i ]._original_size_class!=ULONG_MAX ) ) {
3205 78828 : if( FD_UNLIKELY( accdb->shmem->cache_class_used[ accs[ i ]._original_size_class ].val!=ULONG_MAX ) ) {
3206 3 : refund[ accs[ i ]._original_size_class ]++;
3207 3 : }
3208 78828 : }
3209 247125 : if( FD_UNLIKELY( accs[ i ]._writable ) ) {
3210 932526 : for( ulong j=0UL; j<FD_ACCDB_CACHE_CLASS_CNT; j++ ) {
3211 828912 : if( FD_UNLIKELY( accdb->shmem->cache_class_used[ j ].val!=ULONG_MAX ) ) {
3212 54 : refund[ j ]++;
3213 54 : }
3214 828912 : }
3215 103614 : }
3216 247125 : }
3217 2220912 : for( ulong k=0UL; k<FD_ACCDB_CACHE_CLASS_CNT; k++ ) {
3218 1974144 : if( FD_UNLIKELY( refund[ k ] ) ) FD_ATOMIC_FETCH_AND_SUB( &accdb->shmem->cache_class_used[ k ].val, refund[ k ] );
3219 1974144 : }
3220 :
3221 246768 : FD_COMPILER_MFENCE();
3222 246768 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3223 246768 : }
3224 :
3225 : void
3226 : fd_accdb_release( fd_accdb_t * accdb,
3227 : ulong accs_cnt,
3228 246708 : fd_acc_t * accs ) {
3229 246708 : FD_TEST( accdb->acquire_state==FD_ACCDB_ACQUIRE_STATE_OPEN );
3230 246708 : release_inner( accdb, accs_cnt, accs );
3231 246708 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_IDLE;
3232 246708 : }
3233 :
3234 : void
3235 : fd_accdb_release_ab( fd_accdb_t * accdb,
3236 : ulong accs_cnt,
3237 : fd_acc_t * accs,
3238 : ulong execs_cnt,
3239 57 : fd_acc_t * execs ) {
3240 57 : FD_TEST( accdb->acquire_state==FD_ACCDB_ACQUIRE_STATE_OPEN );
3241 57 : release_inner( accdb, accs_cnt, accs );
3242 57 : if( FD_LIKELY( execs_cnt ) ) release_inner( accdb, execs_cnt, execs );
3243 57 : accdb->acquire_state = FD_ACCDB_ACQUIRE_STATE_IDLE;
3244 57 : }
3245 :
3246 : fd_acc_t
3247 : fd_accdb_read_one( fd_accdb_t * accdb,
3248 : fd_accdb_fork_id_t fork_id,
3249 142029 : uchar const * pubkey ) {
3250 142029 : fd_acc_t acc;
3251 142029 : fd_accdb_acquire( accdb, fork_id, 1UL, &pubkey, (int[]){0}, &acc );
3252 142029 : return acc;
3253 142029 : }
3254 :
3255 : void
3256 : fd_accdb_unread_one( fd_accdb_t * accdb,
3257 142029 : fd_acc_t * acc ) {
3258 142029 : fd_accdb_release( accdb, 1UL, acc );
3259 142029 : }
3260 :
3261 : fd_acc_t
3262 : fd_accdb_write_one( fd_accdb_t * accdb,
3263 : fd_accdb_fork_id_t fork_id,
3264 101997 : uchar const * pubkey ) {
3265 101997 : fd_acc_t acc;
3266 101997 : fd_accdb_acquire( accdb, fork_id, 1UL, &pubkey, (int[]){1}, &acc );
3267 101997 : return acc;
3268 101997 : }
3269 :
3270 : void
3271 : fd_accdb_unwrite_one( fd_accdb_t * accdb,
3272 101997 : fd_acc_t * acc ) {
3273 101997 : fd_accdb_release( accdb, 1UL, acc );
3274 101997 : }
3275 :
3276 : void
3277 : fd_accdb_read_one_nocache( fd_accdb_t * accdb,
3278 : fd_accdb_fork_id_t fork_id,
3279 : uchar const * pubkey,
3280 : ulong * out_lamports,
3281 : int * out_executable,
3282 : uchar * out_owner,
3283 : uchar * out_data,
3284 0 : ulong * out_data_len ) {
3285 : /* Publish epoch — protects against compaction freeing the partition
3286 : under us during the preadv2 path. This is the only write the
3287 : readonly joiner makes into accdb shmem (and the pointer it stores
3288 : through is mapped through a separately-mmap'd writable page that
3289 : aliases shmem->joiner_epochs[idx]). */
3290 0 : FD_COMPILER_MFENCE();
3291 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = FD_VOLATILE_CONST( accdb->shmem->epoch );
3292 0 : FD_HW_MFENCE();
3293 :
3294 : /// STEP 1.
3295 : /// Walk the hash chain at acc_map[hash(pubkey)] using the same
3296 : // visibility test as fd_accdb_acquire_inner. See that function
3297 : // for the detailed safety argument under concurrent prepend.
3298 0 : uint root_generation = accdb->fork_pool[ accdb->shmem->root_fork_id.val ].shmem->generation;
3299 0 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
3300 0 : ulong hash = fd_accdb_hash( pubkey, accdb->shmem->seed )&(accdb->shmem->chain_cnt-1UL);
3301 0 : uint acc_idx = FD_VOLATILE_CONST( accdb->acc_map[ hash ] );
3302 0 : fd_accdb_accmeta_t const * accmeta = NULL;
3303 0 : while( acc_idx!=UINT_MAX ) {
3304 0 : fd_accdb_accmeta_t const * candidate = &accdb->acc_pool[ acc_idx ];
3305 0 : uint next_idx = FD_VOLATILE_CONST( candidate->map.next );
3306 0 : if( FD_UNLIKELY( (candidate->key.generation>root_generation &&
3307 0 : fd_accdb_acc_fork_id(candidate)!=fork_id.val &&
3308 0 : !descends_set_test( fork->descends, fd_accdb_acc_fork_id(candidate) )) ) ||
3309 0 : memcmp( pubkey, candidate->key.pubkey, 32UL ) ) {
3310 0 : acc_idx = next_idx;
3311 0 : continue;
3312 0 : }
3313 0 : accmeta = candidate;
3314 0 : break;
3315 0 : }
3316 :
3317 0 : if( FD_UNLIKELY( !accmeta ) ) {
3318 0 : accdb->metrics->accounts_acquired_per_class[ 0 ]++;
3319 0 : *out_lamports = 0UL;
3320 0 : FD_COMPILER_MFENCE();
3321 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3322 0 : return;
3323 0 : }
3324 :
3325 : /// STEP 2.
3326 : /// Snapshot acc fields. The acc element's metadata is effectively
3327 : /// immutable from the perspective of cross-fork readers (see the
3328 : /// comment block in fd_accdb.h about cross-fork reads). */
3329 0 : uint snap_es = FD_VOLATILE_CONST( accmeta->executable_size );
3330 0 : uint snap_gen = accmeta->key.generation;
3331 0 : ulong snap_lamports = accmeta->lamports;
3332 0 : uint snap_cidx = FD_VOLATILE_CONST( accmeta->cache_idx );
3333 0 : ulong data_len = (ulong)FD_ACCDB_SIZE_DATA( snap_es );
3334 0 : int executable = FD_ACCDB_SIZE_EXEC( snap_es );
3335 :
3336 0 : accdb->metrics->accounts_acquired_per_class[ fd_accdb_cache_class( data_len ) ]++;
3337 :
3338 0 : if( FD_UNLIKELY( !snap_lamports ) ) {
3339 0 : *out_lamports = 0UL;
3340 0 : FD_COMPILER_MFENCE();
3341 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3342 0 : return;
3343 0 : }
3344 :
3345 : /// STEP 3.
3346 : /// Cache hit fast path with try-read-test (ABA) loop. Same
3347 : /// primitives as cache_try_pin: re-check key.generation + pubkey
3348 : /// before and after the bulk copy, and bail to the disk path if the
3349 : /// line was claimed for eviction (refcnt ==
3350 : /// FD_ACCDB_EVICT_SENTINEL). No CAS on refcnt, we never pin the
3351 : /// line.
3352 0 : if( FD_LIKELY( FD_ACCDB_SIZE_CACHE_VALID( snap_es ) && snap_cidx!=FD_ACCDB_ACC_CIDX_INVAL ) ) {
3353 0 : ulong cls = FD_ACCDB_ACC_CIDX_CLASS( snap_cidx );
3354 0 : ulong idx = FD_ACCDB_ACC_CIDX_IDX ( snap_cidx );
3355 0 : fd_accdb_cache_line_t * line = cache_line( accdb, cls, idx );
3356 :
3357 0 : for(;;) {
3358 0 : uint gen0 = FD_VOLATILE_CONST( line->key.generation );
3359 0 : uint rc0 = FD_VOLATILE_CONST( line->refcnt );
3360 0 : uint ai0 = FD_VOLATILE_CONST( line->acc_idx );
3361 0 : if( FD_UNLIKELY( rc0==FD_ACCDB_EVICT_SENTINEL ) ) goto miss;
3362 0 : if( FD_UNLIKELY( gen0!=snap_gen ) ) goto miss;
3363 0 : if( FD_UNLIKELY( memcmp( line->key.pubkey, pubkey, 32UL ) ) ) goto miss;
3364 : /* acc_idx==UINT_MAX is the "loading" sentinel set by cold_load_acc
3365 : before the preadv2 fills the line. CACHE_VALID can be observed
3366 : set while the bytes are still stale, so fall to the disk path
3367 : (which spins on offset_fork and reads from the file) rather
3368 : than copying garbage. */
3369 0 : if( FD_UNLIKELY( ai0==UINT_MAX ) ) goto miss;
3370 :
3371 0 : FD_COMPILER_MFENCE();
3372 0 : memcpy( out_owner, line->owner, 32UL );
3373 0 : memcpy( out_data, (uchar const *)(line+1UL), data_len );
3374 0 : FD_COMPILER_MFENCE();
3375 :
3376 0 : uint gen1 = FD_VOLATILE_CONST( line->key.generation );
3377 0 : uint rc1 = FD_VOLATILE_CONST( line->refcnt );
3378 0 : uint ai1 = FD_VOLATILE_CONST( line->acc_idx );
3379 0 : if( FD_UNLIKELY( rc1==FD_ACCDB_EVICT_SENTINEL ) ) goto miss;
3380 0 : if( FD_UNLIKELY( gen1!=snap_gen ) ) goto miss;
3381 0 : if( FD_UNLIKELY( memcmp( line->key.pubkey, pubkey, 32UL ) ) ) goto miss;
3382 0 : if( FD_UNLIKELY( ai1==UINT_MAX ) ) goto miss;
3383 :
3384 0 : *out_lamports = snap_lamports;
3385 0 : *out_executable = executable;
3386 0 : *out_data_len = data_len;
3387 0 : accdb->metrics->bytes_copied += data_len;
3388 0 : FD_COMPILER_MFENCE();
3389 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3390 0 : return;
3391 0 : }
3392 0 : }
3393 :
3394 0 : miss:;
3395 0 : accdb->metrics->accounts_not_found_per_class[ fd_accdb_cache_class( FD_ACCDB_SIZE_DATA( snap_es ) ) ]++;
3396 :
3397 : /// STEP 4.
3398 : /// Disk path. Spin until the writer publishes a real offset
3399 : /// (matches STEP 10 of fd_accdb_acquire_inner). Compaction may
3400 : /// concurrently relocate the record, but our published epoch
3401 : /// prevents the source partition from being freed until we exit
3402 : /// our critical section, so the bytes at the snapshotted offset
3403 : /// remain stable for the duration of the read.
3404 0 : fd_racesan_hook( "accdb_nocache:pre_offset" );
3405 0 : ulong off_packed = FD_VOLATILE_CONST( accmeta->offset_fork );
3406 0 : if( FD_UNLIKELY( (off_packed & FD_ACCDB_OFF_MASK)==FD_ACCDB_OFF_INVAL ) ) {
3407 0 : accdb->metrics->accounts_waited++;
3408 0 : while( FD_UNLIKELY( ((off_packed=FD_VOLATILE_CONST( accmeta->offset_fork )) & FD_ACCDB_OFF_MASK)==FD_ACCDB_OFF_INVAL ) ) FD_SPIN_PAUSE();
3409 0 : }
3410 0 : ulong off = off_packed & FD_ACCDB_OFF_MASK;
3411 0 : fd_racesan_hook( "accdb_nocache:pre_preadv2" );
3412 :
3413 0 : struct iovec iovs[ 2 ] = {
3414 0 : { .iov_base = out_owner, .iov_len = 32UL },
3415 0 : { .iov_base = out_data, .iov_len = data_len },
3416 0 : };
3417 0 : ulong total = 32UL+data_len;
3418 0 : ulong start = off+offsetof( fd_accdb_disk_meta_t, owner );
3419 0 : ulong got = 0UL;
3420 0 : int nio = data_len ? 2 : 1;
3421 0 : while( FD_LIKELY( got<total ) ) {
3422 0 : long result = preadv2( accdb->fd, iovs, nio, (long)(start+got), 0 );
3423 0 : if( FD_UNLIKELY( -1==result && (errno==EINTR || errno==EAGAIN || errno==EWOULDBLOCK) ) ) continue;
3424 0 : else if( FD_UNLIKELY( -1==result ) ) FD_LOG_ERR(( "preadv2() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
3425 0 : else if( FD_UNLIKELY( !result ) ) FD_LOG_ERR(( "accounts database is corrupt, data expected at offset %lu with size %lu exceeded file extents", start+got, total ));
3426 0 : fd_accdb_partition_read_bump( accdb, start+got, (ulong)result );
3427 0 : got += (ulong)result;
3428 0 : accdb->metrics->bytes_read += (ulong)result;
3429 0 : accdb->metrics->read_ops++;
3430 :
3431 0 : long r = result;
3432 0 : for( int v=0; v<nio; v++ ) {
3433 0 : if( (ulong)r>=iovs[ v ].iov_len ) {
3434 0 : r -= (long)iovs[ v ].iov_len;
3435 0 : iovs[ v ].iov_len = 0UL;
3436 0 : } else {
3437 0 : iovs[ v ].iov_base = (uchar *)iovs[ v ].iov_base + r;
3438 0 : iovs[ v ].iov_len -= (ulong)r;
3439 0 : break;
3440 0 : }
3441 0 : }
3442 0 : }
3443 :
3444 0 : *out_lamports = snap_lamports;
3445 0 : *out_executable = executable;
3446 0 : *out_data_len = data_len;
3447 :
3448 0 : FD_COMPILER_MFENCE();
3449 0 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3450 0 : }
3451 :
3452 : int
3453 : fd_accdb_exists( fd_accdb_t * accdb,
3454 : fd_accdb_fork_id_t fork_id,
3455 3 : uchar const * pubkey ) {
3456 3 : FD_COMPILER_MFENCE();
3457 3 : FD_VOLATILE( *accdb->my_epoch_slot ) = FD_VOLATILE_CONST( accdb->shmem->epoch );
3458 3 : FD_HW_MFENCE();
3459 :
3460 3 : uint root_generation = accdb->fork_pool[ accdb->shmem->root_fork_id.val ].shmem->generation;
3461 3 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
3462 3 : ulong hash = fd_accdb_hash( pubkey, accdb->shmem->seed )&(accdb->shmem->chain_cnt-1UL);
3463 3 : uint acc = FD_VOLATILE_CONST( accdb->acc_map[ hash ] );
3464 3 : while( acc!=UINT_MAX ) {
3465 3 : fd_accdb_accmeta_t const * candidate_acc = &accdb->acc_pool[ acc ];
3466 3 : uint next_acc = FD_VOLATILE_CONST( candidate_acc->map.next );
3467 :
3468 3 : if( FD_UNLIKELY( (candidate_acc->key.generation>root_generation && fd_accdb_acc_fork_id(candidate_acc)!=fork_id.val && !descends_set_test( fork->descends, fd_accdb_acc_fork_id(candidate_acc) )) ) || memcmp( pubkey, candidate_acc->key.pubkey, 32UL ) ) {
3469 0 : acc = next_acc;
3470 0 : continue;
3471 0 : }
3472 :
3473 3 : break;
3474 3 : }
3475 :
3476 3 : int result;
3477 3 : if( FD_UNLIKELY( acc==UINT_MAX ) ) result = 0;
3478 3 : else result = !!FD_VOLATILE_CONST( accdb->acc_pool[ acc ].lamports );
3479 :
3480 3 : FD_COMPILER_MFENCE();
3481 3 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3482 3 : return result;
3483 3 : }
3484 :
3485 : ulong
3486 : fd_accdb_lamports( fd_accdb_t * accdb,
3487 : fd_accdb_fork_id_t fork_id,
3488 7827 : uchar const * pubkey ) {
3489 7827 : FD_COMPILER_MFENCE();
3490 7827 : FD_VOLATILE( *accdb->my_epoch_slot ) = FD_VOLATILE_CONST( accdb->shmem->epoch );
3491 7827 : FD_HW_MFENCE();
3492 :
3493 7827 : uint root_generation = accdb->fork_pool[ accdb->shmem->root_fork_id.val ].shmem->generation;
3494 7827 : fd_accdb_fork_t * fork = &accdb->fork_pool[ fork_id.val ];
3495 7827 : ulong hash = fd_accdb_hash( pubkey, accdb->shmem->seed )&(accdb->shmem->chain_cnt-1UL);
3496 7827 : uint acc = FD_VOLATILE_CONST( accdb->acc_map[ hash ] );
3497 8865 : while( acc!=UINT_MAX ) {
3498 1275 : fd_accdb_accmeta_t const * candidate_acc = &accdb->acc_pool[ acc ];
3499 1275 : uint next_acc = FD_VOLATILE_CONST( candidate_acc->map.next );
3500 :
3501 1275 : if( FD_UNLIKELY( (candidate_acc->key.generation>root_generation && fd_accdb_acc_fork_id(candidate_acc)!=fork_id.val && !descends_set_test( fork->descends, fd_accdb_acc_fork_id(candidate_acc) )) ) || memcmp( pubkey, candidate_acc->key.pubkey, 32UL ) ) {
3502 1038 : acc = next_acc;
3503 1038 : continue;
3504 1038 : }
3505 :
3506 237 : break;
3507 1275 : }
3508 :
3509 7827 : ulong result;
3510 7827 : if( FD_UNLIKELY( acc==UINT_MAX ) ) result = 0UL;
3511 237 : else result = FD_VOLATILE_CONST( accdb->acc_pool[ acc ].lamports );
3512 :
3513 7827 : FD_COMPILER_MFENCE();
3514 7827 : FD_VOLATILE( *accdb->my_epoch_slot ) = ULONG_MAX;
3515 7827 : return result;
3516 7827 : }
3517 :
3518 : /* cache_bg_evict pre-evicts cache lines in the background to keep the
3519 : per-class CAS free lists populated ahead of demand. For each class
3520 : whose immediately available capacity has dropped below low_water,
3521 : a bounded CLOCK sweep claims lines, writes dirty ones to disk, and
3522 : pushes them onto the free list until available capacity reaches
3523 : target. Immediately available capacity includes both the CAS free
3524 : list and the never-initialized tail of the class, since foreground
3525 : allocators can consume either path without evicting resident data.
3526 :
3527 : Budget: at most 256 CLOCK ticks per class per invocation to keep the
3528 : background loop responsive. The function is called every tick of
3529 : fd_accdb_background, so large refills happen across several ticks
3530 : rather than blocking. The low_water / target thresholds are static
3531 : per-class watermarks computed at initialization; pre-eviction only
3532 : converts resident lines into free-list entries and does not consume
3533 : cache-slot reservations.
3534 :
3535 : force: when non-zero, ignore the watermark and sweep every line in
3536 : every class. Always 0 in normal operation; used only by
3537 : test_accdb_racesan to deterministically exercise the writeback path
3538 : without manufacturing real cache pressure. */
3539 :
3540 : static void
3541 : background_preevict( fd_accdb_t * accdb,
3542 : int * charge_busy,
3543 3 : int force ) {
3544 3 : fd_accdb_shmem_t * shmem = accdb->shmem;
3545 :
3546 27 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) {
3547 24 : ulong target = shmem->cache_free_target[ c ];
3548 24 : ulong max_c = shmem->cache_class_max[ c ];
3549 24 : ulong init = fd_ulong_min( FD_VOLATILE_CONST( shmem->cache_class_init[ c ].val ), max_c );
3550 24 : ulong freec = FD_VOLATILE_CONST( shmem->cache_free_cnt[ c ].val );
3551 24 : ulong live = init>freec ? init-freec : 0UL;
3552 24 : ulong avail = max_c-live;
3553 24 : if( FD_LIKELY( !force && avail>=shmem->cache_free_low_water[ c ] ) ) continue;
3554 :
3555 0 : *charge_busy = 1;
3556 :
3557 0 : ulong budget = force ? init : 256UL;
3558 0 : ulong evicted = 0UL;
3559 0 : if( FD_UNLIKELY( force ) ) target = max_c; /* sweep everything */
3560 :
3561 0 : for( ulong tick=0UL; tick<budget && avail+evicted<target; tick++ ) {
3562 : /* Only sweep the lazily initialized prefix. cache_class_init
3563 : may transiently exceed max_c during the acquire_cache_line
3564 : overflow/undo path, so clamp it before using it as the wrap
3565 : bound. */
3566 0 : init = fd_ulong_min( FD_VOLATILE_CONST( shmem->cache_class_init[ c ].val ), max_c );
3567 0 : if( FD_UNLIKELY( !init ) ) break;
3568 :
3569 0 : ulong hand = FD_ATOMIC_FETCH_AND_ADD( &shmem->clock_hand[ c ].val, 1UL ) % init;
3570 :
3571 0 : fd_accdb_cache_line_t * line = cache_line( accdb, c, hand );
3572 :
3573 0 : if( FD_UNLIKELY( line->key.generation==UINT_MAX && line->acc_idx==UINT_MAX ) ) continue;
3574 :
3575 0 : uint rc = FD_VOLATILE_CONST( line->refcnt );
3576 0 : if( FD_UNLIKELY( rc ) ) continue;
3577 :
3578 0 : if( FD_UNLIKELY( line->referenced ) ) {
3579 0 : line->referenced = 0;
3580 0 : continue;
3581 0 : }
3582 :
3583 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( &line->refcnt, 0U, FD_ACCDB_EVICT_SENTINEL )!=0U ) ) continue;
3584 :
3585 0 : uint acc_idx = line->acc_idx;
3586 : #if FD_TMPL_USE_HANDHOLDING
3587 : uint line_gen FD_FN_UNUSED = line->key.generation;
3588 : #endif
3589 0 : if( FD_LIKELY( acc_idx!=UINT_MAX ) ) {
3590 0 : evict_clear_acc_cache_ref( &accdb->acc_pool[ acc_idx ], c, hand );
3591 0 : }
3592 0 : line->key.generation = UINT_MAX;
3593 0 : if( FD_UNLIKELY( !line->persisted && acc_idx!=UINT_MAX ) ) {
3594 0 : fd_accdb_accmeta_t * accmeta = &accdb->acc_pool[ acc_idx ];
3595 0 : fd_racesan_hook( "preevict:pre_synth" );
3596 : #if FD_TMPL_USE_HANDHOLDING
3597 : FD_TEST( line_gen==accmeta->key.generation &&
3598 : !memcmp( line->key.pubkey, accmeta->key.pubkey, 32UL ) );
3599 : #endif
3600 0 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t)+(ulong)FD_ACCDB_SIZE_DATA( accmeta->executable_size );
3601 :
3602 : /* Atomically swap the old offset to FD_ACCDB_OFF_INVAL so that
3603 : a concurrent compaction CAS (old_offset -> dest_offset)
3604 : cannot succeed between our read and our later store of
3605 : the new file_off. Without the exchange, compaction could
3606 : relocate the record, then our plain store would overwrite
3607 : the relocated offset, leaving the compaction destination
3608 : as unreachable dead space whose bytes are never freed. */
3609 0 : ulong old_offset = fd_accdb_acc_xchg_offset( accmeta, FD_ACCDB_OFF_INVAL );
3610 0 : if( FD_LIKELY( old_offset!=FD_ACCDB_OFF_INVAL ) ) {
3611 0 : fd_accdb_shmem_bytes_freed( shmem, old_offset, entry_sz );
3612 0 : FD_ATOMIC_FETCH_AND_SUB( &shmem->shmetrics->disk_used_bytes, entry_sz );
3613 0 : }
3614 :
3615 0 : fd_accdb_disk_meta_t meta;
3616 0 : fd_memcpy( meta.pubkey, accmeta->key.pubkey, 32UL );
3617 0 : meta.size = FD_ACCDB_SIZE_DATA( accmeta->executable_size );
3618 0 : fd_memcpy( meta.owner, line->owner, 32UL );
3619 :
3620 0 : struct iovec iovs[ 2UL ] = {
3621 0 : { .iov_base = &meta, .iov_len = sizeof(fd_accdb_disk_meta_t) },
3622 0 : { .iov_base = (void *)(line+1UL), .iov_len = FD_ACCDB_SIZE_DATA( accmeta->executable_size ) }
3623 0 : };
3624 :
3625 0 : ulong file_off = allocate_next_write( accdb, entry_sz );
3626 0 : ulong written = 0UL;
3627 0 : while( written<entry_sz ) {
3628 0 : long result = pwritev2( accdb->fd, iovs, 2, (long)(file_off+written), 0 );
3629 0 : if( FD_UNLIKELY( result==-1 && errno==EINTR ) ) continue;
3630 0 : else if( FD_UNLIKELY( result<=0 ) ) FD_LOG_ERR(( "pwritev2() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
3631 0 : written += (ulong)result;
3632 0 : accdb->metrics->bytes_written += (ulong)result;
3633 0 : accdb->metrics->write_ops++;
3634 :
3635 0 : for( int v=0; v<2; v++ ) {
3636 0 : if( (ulong)result>=iovs[ v ].iov_len ) {
3637 0 : result -= (long)iovs[ v ].iov_len;
3638 0 : iovs[ v ].iov_len = 0UL;
3639 0 : } else {
3640 0 : iovs[ v ].iov_base = (uchar *)iovs[ v ].iov_base + result;
3641 0 : iovs[ v ].iov_len -= (ulong)result;
3642 0 : break;
3643 0 : }
3644 0 : }
3645 0 : }
3646 :
3647 0 : FD_COMPILER_MFENCE();
3648 0 : accmeta->offset_fork = fd_accdb_acc_pack_offset_fork( file_off, fd_accdb_acc_fork_id(accmeta) );
3649 0 : FD_ATOMIC_FETCH_AND_ADD( &shmem->shmetrics->disk_used_bytes, entry_sz );
3650 :
3651 0 : accdb->metrics->accounts_preevicted++;
3652 0 : accdb->metrics->accounts_preevicted_per_class[ c ]++;
3653 0 : }
3654 :
3655 0 : line->persisted = 1;
3656 0 : line->acc_idx = UINT_MAX;
3657 0 : line->key.generation = UINT_MAX;
3658 0 : line->refcnt = 0;
3659 0 : cache_free_push( accdb, c, line );
3660 0 : evicted++;
3661 0 : }
3662 0 : }
3663 3 : }
3664 :
3665 : int
3666 : fd_accdb_snapshot_write_one( fd_accdb_t * accdb,
3667 : fd_accdb_fork_id_t fork_id,
3668 : uchar const * pubkey,
3669 : ulong slot,
3670 : ulong lamports,
3671 : ulong data_len,
3672 : int executable,
3673 45 : ulong * out_replaced_lamports ) {
3674 : /* Snapshot slots are stored in the 32-bit cache_idx scratch field
3675 : during loading. Reject anything that would truncate. */
3676 45 : if( FD_UNLIKELY( slot>UINT_MAX ) ) FD_LOG_ERR(( "snapshot slot %lu exceeds 2^32-1, accdb format must be widened", slot ));
3677 :
3678 45 : int incremental = fork_id.val!=USHORT_MAX;
3679 :
3680 45 : fd_accdb_fork_t * fork = NULL;
3681 45 : uint fork_gen = 0U;
3682 45 : if( FD_UNLIKELY( incremental ) ) {
3683 21 : fork = &accdb->fork_pool[ fork_id.val ];
3684 21 : fork_gen = fork->shmem->generation;
3685 21 : }
3686 :
3687 45 : ulong hash = fd_accdb_hash( pubkey, accdb->shmem->seed )&(accdb->shmem->chain_cnt-1UL);
3688 :
3689 45 : *out_replaced_lamports = 0UL;
3690 :
3691 45 : fd_accdb_accmeta_t * accmeta = NULL;
3692 45 : int cross_fork = 0; /* incremental only: existing entry from different fork */
3693 :
3694 45 : ulong next_acc = accdb->acc_map[ hash ];
3695 45 : while( next_acc!=UINT_MAX ) {
3696 6 : fd_accdb_accmeta_t * candidate_acc = &accdb->acc_pool[ next_acc ];
3697 6 : if( FD_UNLIKELY( !memcmp( pubkey, candidate_acc->key.pubkey, 32UL ) ) ) {
3698 6 : if( FD_LIKELY( (ulong)candidate_acc->cache_idx>slot ) ) {
3699 : /* Still advance the write head so snapwr and snapin stay in
3700 : sync — snapwr unconditionally writes every account to disk.
3701 : Mark the space as immediately freed since it is dead on
3702 : arrival. */
3703 0 : ulong dead_sz = sizeof(fd_accdb_disk_meta_t)+data_len;
3704 0 : ulong dead_off = allocate_next_write( accdb, dead_sz );
3705 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, dead_off, dead_sz );
3706 0 : return -1;
3707 0 : }
3708 6 : if( FD_UNLIKELY( incremental ) && candidate_acc->key.generation!=fork_gen ) {
3709 : /* Cross-snapshot override: don't replace in-place; insert a
3710 : new entry alongside the old one so purge can revert. */
3711 6 : cross_fork = 1;
3712 6 : *out_replaced_lamports = candidate_acc->lamports;
3713 6 : } else {
3714 : /* Same-fork duplicate (or full-snapshot mode): replace in-place */
3715 0 : accmeta = candidate_acc;
3716 0 : }
3717 6 : break;
3718 6 : }
3719 0 : next_acc = candidate_acc->map.next;
3720 0 : }
3721 :
3722 45 : int replace = !!accmeta;
3723 :
3724 45 : if( FD_UNLIKELY( !accmeta ) ) {
3725 45 : accmeta = acc_pool_acquire( accdb->acc_pool_join );
3726 45 : if( FD_UNLIKELY( !accmeta ) ) FD_LOG_ERR(( "accounts database ran out of space during snapshot loading, increase [accounts.max_accounts], current value is %lu", acc_pool_ele_max( accdb->acc_pool_join ) ));
3727 :
3728 45 : uint acc_idx = (uint)acc_pool_idx( accdb->acc_pool_join, accmeta );
3729 :
3730 45 : fd_memcpy( accmeta->key.pubkey, pubkey, 32UL );
3731 45 : if( FD_UNLIKELY( !incremental && accdb->shmem->root_fork_id.val==USHORT_MAX ) ) {
3732 0 : FD_LOG_ERR(( "snapshot_write_one called without a root fork attached" ));
3733 0 : }
3734 45 : accmeta->key.generation = incremental ? fork_gen : accdb->fork_pool[ accdb->shmem->root_fork_id.val ].shmem->generation;
3735 45 : accmeta->map.next = accdb->acc_map[ hash ];
3736 45 : accdb->acc_map[ hash ] = acc_idx;
3737 :
3738 : /* In incremental mode, record this insert in the fork's txn list
3739 : so purge can find and unlink it on failure. */
3740 45 : if( FD_UNLIKELY( incremental ) ) {
3741 21 : fd_accdb_txn_t * txn = txn_pool_acquire( accdb->txn_pool );
3742 21 : if( FD_UNLIKELY( !txn ) ) FD_LOG_ERR(( "txn pool exhausted during incremental snapshot loading" ));
3743 21 : txn->acc_map_idx = (uint)hash;
3744 21 : txn->acc_pool_idx = acc_idx;
3745 21 : uint txn_idx = (uint)txn_pool_idx( accdb->txn_pool, txn );
3746 21 : txn->fork.next = fork->shmem->txn_head;
3747 21 : fork->shmem->txn_head = txn_idx;
3748 21 : }
3749 45 : }
3750 :
3751 45 : if( FD_UNLIKELY( replace ) ) {
3752 : /* The old version's disk space is now dead. */
3753 0 : ulong old_sz = sizeof(fd_accdb_disk_meta_t) + FD_ACCDB_SIZE_DATA( accmeta->executable_size );
3754 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, fd_accdb_acc_offset( accmeta ), old_sz );
3755 0 : accdb->shmem->shmetrics->disk_used_bytes -= old_sz;
3756 0 : *out_replaced_lamports = accmeta->lamports;
3757 0 : }
3758 :
3759 45 : accmeta->cache_idx = (uint)slot;
3760 45 : accmeta->lamports = lamports;
3761 45 : accmeta->executable_size = FD_ACCDB_SIZE_PACK( (uint)data_len, executable );
3762 45 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t)+data_len;
3763 45 : ulong file_off = allocate_next_write( accdb, entry_sz );
3764 45 : accmeta->offset_fork = incremental ? fd_accdb_acc_pack_offset_fork( file_off, fork_id.val ) : file_off;
3765 45 : accdb->shmem->shmetrics->disk_used_bytes += entry_sz;
3766 45 : if( !replace ) accdb->shmem->shmetrics->accounts_total++;
3767 :
3768 45 : return ( replace || cross_fork ) ? 2 : 1;
3769 45 : }
3770 :
3771 : int
3772 : fd_accdb_snapshot_write_batch( fd_accdb_t * accdb,
3773 : fd_accdb_fork_id_t fork_id,
3774 : ulong cnt,
3775 : uchar const * const pubkeys[],
3776 : ulong const slots[],
3777 : ulong const lamports[],
3778 : ulong const data_lens[],
3779 : int const executables[],
3780 : ulong * accounts_ignored,
3781 : ulong * accounts_replaced,
3782 : ulong * accounts_loaded,
3783 : ulong * out_replaced_lamports,
3784 0 : ulong * out_ignored_lamports ) {
3785 0 : int incremental = fork_id.val!=USHORT_MAX;
3786 :
3787 0 : fd_accdb_fork_t * fork = NULL;
3788 0 : uint fork_gen = 0U;
3789 0 : if( FD_UNLIKELY( incremental ) ) {
3790 0 : fork = &accdb->fork_pool[ fork_id.val ];
3791 0 : fork_gen = fork->shmem->generation;
3792 0 : }
3793 :
3794 0 : ulong seed = accdb->shmem->seed;
3795 0 : ulong chain_msk = accdb->shmem->chain_cnt - 1UL;
3796 0 : if( FD_UNLIKELY( !incremental && accdb->shmem->root_fork_id.val==USHORT_MAX ) ) {
3797 0 : FD_LOG_ERR(( "snapshot_write_batch called without a root fork attached" ));
3798 0 : }
3799 0 : uint gen = incremental ? 0U : accdb->fork_pool[ accdb->shmem->root_fork_id.val ].shmem->generation;
3800 :
3801 0 : ulong ignored = 0UL;
3802 0 : ulong replaced = 0UL;
3803 0 : ulong loaded = 0UL;
3804 0 : ulong cross_replaced = 0UL; /* cross-fork overrides (subset of replaced) */
3805 0 : ulong replaced_lamports = 0UL;
3806 0 : ulong ignored_lamports = 0UL;
3807 :
3808 : /* Snapshot slots are stored in the 32-bit cache_idx scratch field
3809 : during loading. Reject anything that would truncate. */
3810 0 : for( ulong i=0UL; i<cnt; i++ ) {
3811 0 : if( FD_UNLIKELY( slots[ i ]>UINT_MAX ) ) FD_LOG_ERR(( "snapshot slot %lu exceeds 2^32-1, accdb format must be widened", slots[ i ] ));
3812 0 : }
3813 :
3814 : /* Phase 1: compute hashes and prefetch chain heads. */
3815 :
3816 0 : ulong hashes[ 8 ];
3817 0 : fd_accdb_accmeta_t * existing[ 8 ]; /* same-fork dup or full-snapshot replace */
3818 0 : fd_accdb_accmeta_t * cross_existing[ 8 ]; /* cross-fork dup (incremental only) */
3819 0 : int skip[ 8 ];
3820 :
3821 0 : for( ulong i=0UL; i<cnt; i++ ) {
3822 0 : hashes[ i ] = fd_accdb_hash( pubkeys[ i ], seed ) & chain_msk;
3823 0 : existing[ i ] = NULL;
3824 0 : cross_existing[ i ] = NULL;
3825 0 : skip[ i ] = 0;
3826 :
3827 : /* Prefetch the chain head and first pool element on the chain */
3828 0 : __builtin_prefetch( &accdb->acc_map[ hashes[ i ] ], 1, 1 );
3829 0 : }
3830 :
3831 : /* Phase 2: walk chains looking for duplicates. By now the chain
3832 : heads prefetched above should be warm in L1/L2. If the existing
3833 : entry has a higher slot, mark skip. Otherwise, save the existing
3834 : entry pointer for in-place update (matching write_one semantics).
3835 : In incremental mode, cross-fork entries are saved separately so
3836 : they can be left in place while a new entry is inserted. */
3837 :
3838 0 : for( ulong i=0UL; i<cnt; i++ ) {
3839 0 : ulong next_acc = accdb->acc_map[ hashes[ i ] ];
3840 :
3841 0 : if( FD_LIKELY( next_acc!=UINT_MAX ) ) {
3842 0 : __builtin_prefetch( &accdb->acc_pool[ next_acc ], 0, 1 );
3843 0 : }
3844 :
3845 0 : while( next_acc!=UINT_MAX ) {
3846 0 : fd_accdb_accmeta_t * candidate = &accdb->acc_pool[ next_acc ];
3847 :
3848 0 : if( FD_LIKELY( candidate->map.next!=UINT_MAX ) ) {
3849 0 : __builtin_prefetch( &accdb->acc_pool[ candidate->map.next ], 0, 1 );
3850 0 : }
3851 :
3852 0 : if( FD_UNLIKELY( !memcmp( pubkeys[ i ], candidate->key.pubkey, 32UL ) ) ) {
3853 0 : if( FD_LIKELY( (ulong)candidate->cache_idx>slots[ i ] ) ) {
3854 0 : skip[ i ] = 1;
3855 0 : } else if( FD_UNLIKELY( incremental ) && candidate->key.generation!=fork_gen ) {
3856 0 : cross_existing[ i ] = candidate;
3857 0 : } else {
3858 0 : existing[ i ] = candidate;
3859 0 : }
3860 0 : break;
3861 0 : }
3862 0 : next_acc = candidate->map.next;
3863 0 : }
3864 0 : }
3865 :
3866 : /* Phase 2b: reject intra-batch duplicate pubkeys. Snapin always
3867 : populates a batch from a single AppendVec, so every slot in the
3868 : batch is identical and a duplicate pubkey means the same account
3869 : appears twice at the same slot — i.e. a corrupt snapshot per the
3870 : Agave spec. We have no principled way to pick a winner; return
3871 : -1 so the caller can flag the snapshot malformed. Batches are
3872 : bounded (<=8) so the O(n^2) scan is trivial. */
3873 :
3874 0 : for( ulong i=1UL; i<cnt; i++ ) {
3875 0 : for( ulong j=0UL; j<i; j++ ) {
3876 0 : if( hashes[ j ]!=hashes[ i ] ) continue;
3877 0 : if( FD_UNLIKELY( !memcmp( pubkeys[ j ], pubkeys[ i ], 32UL ) ) ) {
3878 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate pubkey within a single batch (entries %lu and %lu, slots %lu and %lu)", j, i, slots[ j ], slots[ i ] ));
3879 0 : return -1;
3880 0 : }
3881 0 : }
3882 0 : }
3883 :
3884 : /* Phase 3: commit. For each account either update the existing
3885 : entry in-place (replace), allocate and insert at the chain head
3886 : (new), or skip entirely (ignore). This matches the
3887 : insert/replace/ignore semantics of write_one. */
3888 :
3889 0 : for( ulong i=0UL; i<cnt; i++ ) {
3890 0 : if( FD_UNLIKELY( skip[ i ] ) ) {
3891 : /* Still advance the write head so snapwr and snapin stay in
3892 : sync — snapwr unconditionally writes every account to disk.
3893 : Mark the space as immediately freed since it is dead on
3894 : arrival. */
3895 0 : ulong dead_sz = sizeof(fd_accdb_disk_meta_t)+data_lens[ i ];
3896 0 : ulong dead_off = allocate_next_write( accdb, dead_sz );
3897 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, dead_off, dead_sz );
3898 0 : ignored_lamports += lamports[ i ];
3899 0 : ignored++;
3900 0 : continue;
3901 0 : }
3902 :
3903 0 : fd_accdb_accmeta_t * accmeta;
3904 :
3905 0 : if( FD_UNLIKELY( existing[ i ] ) ) {
3906 0 : accmeta = existing[ i ];
3907 : /* The old version's disk space is now dead. */
3908 0 : ulong old_sz = sizeof(fd_accdb_disk_meta_t) + FD_ACCDB_SIZE_DATA( accmeta->executable_size );
3909 0 : fd_accdb_shmem_bytes_freed( accdb->shmem, fd_accdb_acc_offset( accmeta ), old_sz );
3910 0 : accdb->shmem->shmetrics->disk_used_bytes -= old_sz;
3911 0 : replaced_lamports += accmeta->lamports;
3912 0 : replaced++;
3913 0 : } else {
3914 0 : accmeta = acc_pool_acquire( accdb->acc_pool_join );
3915 0 : if( FD_UNLIKELY( !accmeta ) ) FD_LOG_ERR(( "accounts database ran out of space during snapshot loading" ));
3916 :
3917 0 : uint acc_idx = (uint)acc_pool_idx( accdb->acc_pool_join, accmeta );
3918 :
3919 0 : fd_memcpy( accmeta->key.pubkey, pubkeys[ i ], 32UL );
3920 0 : accmeta->key.generation = incremental ? fork_gen : gen;
3921 0 : accmeta->map.next = accdb->acc_map[ hashes[ i ] ];
3922 0 : accdb->acc_map[ hashes[ i ] ] = acc_idx;
3923 :
3924 0 : if( FD_UNLIKELY( incremental ) ) {
3925 0 : fd_accdb_txn_t * txn = txn_pool_acquire( accdb->txn_pool );
3926 0 : if( FD_UNLIKELY( !txn ) ) FD_LOG_ERR(( "txn pool exhausted during incremental snapshot loading" ));
3927 0 : txn->acc_map_idx = (uint)hashes[ i ];
3928 0 : txn->acc_pool_idx = acc_idx;
3929 0 : uint txn_idx = (uint)txn_pool_idx( accdb->txn_pool, txn );
3930 0 : txn->fork.next = fork->shmem->txn_head;
3931 0 : fork->shmem->txn_head = txn_idx;
3932 0 : }
3933 :
3934 0 : if( cross_existing[ i ] ) {
3935 0 : replaced_lamports += cross_existing[ i ]->lamports;
3936 0 : replaced++;
3937 0 : cross_replaced++;
3938 0 : } else {
3939 0 : loaded++;
3940 0 : }
3941 0 : }
3942 :
3943 0 : accmeta->cache_idx = (uint)slots[ i ];
3944 0 : accmeta->lamports = lamports[ i ];
3945 0 : accmeta->executable_size = FD_ACCDB_SIZE_PACK( (uint)data_lens[ i ], executables[ i ] );
3946 0 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t)+data_lens[ i ];
3947 0 : ulong file_off = allocate_next_write( accdb, entry_sz );
3948 0 : accmeta->offset_fork = incremental ? fd_accdb_acc_pack_offset_fork( file_off, fork_id.val ) : file_off;
3949 0 : accdb->shmem->shmetrics->disk_used_bytes += entry_sz;
3950 0 : }
3951 :
3952 : /* accounts_total tracks acc_pool entries: increment for every new
3953 : allocation (both genuinely new accounts and cross-fork overrides
3954 : that insert a second pool entry). The output counter
3955 : *accounts_loaded excludes cross-fork overrides to match
3956 : snapshot_write_one semantics (cross-fork returns 2 = replaced). */
3957 0 : accdb->shmem->shmetrics->accounts_total += loaded + cross_replaced;
3958 :
3959 0 : *accounts_ignored = ignored;
3960 0 : *accounts_replaced = replaced;
3961 0 : *accounts_loaded = loaded;
3962 0 : *out_replaced_lamports = replaced_lamports;
3963 0 : *out_ignored_lamports = ignored_lamports;
3964 :
3965 0 : return 0;
3966 0 : }
3967 :
3968 : void
3969 : fd_accdb_background( fd_accdb_t * accdb,
3970 201 : int * charge_busy ) {
3971 201 : fd_accdb_shmem_t * shmem = accdb->shmem;
3972 201 : uint op = FD_VOLATILE_CONST( shmem->cmd_op );
3973 201 : if( FD_UNLIKELY( op!=FD_ACCDB_CMD_IDLE ) ) {
3974 198 : fd_accdb_fork_id_t fork_id = { .val = FD_VOLATILE_CONST( shmem->cmd_fork_id ) };
3975 :
3976 198 : switch( op ) {
3977 183 : case FD_ACCDB_CMD_ADVANCE_ROOT:
3978 183 : background_advance_root( accdb, fork_id );
3979 183 : break;
3980 12 : case FD_ACCDB_CMD_PURGE:
3981 12 : background_purge( accdb, fork_id );
3982 12 : break;
3983 3 : case FD_ACCDB_CMD_CLEAR_DEFERRED: {
3984 : /* Posted by fd_accdb_reset after it clobbers shared pools.
3985 : T2's deferred fork chain now points at recycled elements;
3986 : discard the stale pointers. Epoch slots are preserved
3987 : across reset so no re-join is needed. */
3988 3 : accdb->deferred_fork_head = NULL;
3989 3 : accdb->deferred_fork_tail = NULL;
3990 3 : accdb->deferred_fork_epoch = 0UL;
3991 3 : break;
3992 0 : }
3993 0 : default:
3994 0 : FD_LOG_ERR(( "unexpected accdb cmd_op %u", op ));
3995 198 : }
3996 :
3997 198 : FD_COMPILER_MFENCE();
3998 198 : FD_VOLATILE( shmem->cmd_op ) = FD_ACCDB_CMD_IDLE;
3999 198 : *charge_busy = 1;
4000 198 : return;
4001 198 : }
4002 :
4003 3 : background_preevict( accdb, charge_busy, 0 );
4004 :
4005 12 : for( ulong k=0UL; k<FD_ACCDB_COMPACTION_LAYER_CNT; k++ ) {
4006 9 : background_compact( accdb, k, charge_busy );
4007 9 : }
4008 3 : }
4009 :
4010 : fd_accdb_shmem_metrics_t const *
4011 48 : fd_accdb_shmetrics( fd_accdb_t * accdb ) {
4012 48 : return accdb->shmem->shmetrics;
4013 48 : }
4014 :
4015 : fd_accdb_metrics_t const *
4016 9 : fd_accdb_metrics( fd_accdb_t * accdb ) {
4017 9 : return accdb->metrics;
4018 9 : }
4019 :
4020 : void
4021 : fd_accdb_cache_class_occupancy( fd_accdb_t * accdb,
4022 : ulong * used,
4023 : ulong * max,
4024 12 : ulong * reserved ) {
4025 108 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) {
4026 96 : ulong cap = accdb->shmem->cache_class_max[ c ];
4027 96 : ulong init = FD_VOLATILE_CONST( accdb->shmem->cache_class_init[ c ].val );
4028 96 : ulong freec = FD_VOLATILE_CONST( accdb->shmem->cache_free_cnt [ c ].val );
4029 96 : ulong live = init>freec ? init-freec : 0UL;
4030 96 : if( live>cap ) live = cap;
4031 96 : max [ c ] = cap;
4032 96 : used [ c ] = live;
4033 96 : reserved[ c ] = FD_VOLATILE_CONST( accdb->shmem->cache_class_used[ c ].val );
4034 96 : }
4035 12 : }
4036 :
4037 : void
4038 : fd_accdb_cache_class_thresholds( fd_accdb_t * accdb,
4039 : ulong * target_used,
4040 0 : ulong * low_water_used ) {
4041 0 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) {
4042 0 : ulong max_c = accdb->shmem->cache_class_max [ c ];
4043 0 : ulong free_tgt = accdb->shmem->cache_free_target [ c ];
4044 0 : ulong free_lwm = accdb->shmem->cache_free_low_water[ c ];
4045 0 : target_used [ c ] = max_c>free_tgt ? max_c-free_tgt : 0UL;
4046 0 : low_water_used[ c ] = max_c>free_lwm ? max_c-free_lwm : 0UL;
4047 0 : }
4048 0 : }
4049 :
4050 : #if FD_HAS_RACESAN
4051 :
4052 : /* Force pre-eviction (ignore the watermark) so a deterministic
4053 : single-threaded test can exercise the writeback path without
4054 : manufacturing real cache pressure. Sweeps several times: CLOCK needs
4055 : two visits to evict a recently-touched line (clear the "referenced"
4056 : bit, then evict), and the clock hand position carries across calls, so
4057 : one or two sweeps is not enough to guarantee every eligible line is
4058 : flushed back. */
4059 : void
4060 : fd_accdb_debug_force_preevict( fd_accdb_t * accdb ) {
4061 : for( ulong iter=0UL; iter<8UL; iter++ ) {
4062 : int charge_busy = 0;
4063 : background_preevict( accdb, &charge_busy, 1 );
4064 : }
4065 : }
4066 :
4067 : /* Locate the resident cache line currently holding `pubkey` (most recent
4068 : generation if multiple). Returns 1 and fills out_class/out_idx on a
4069 : hit, 0 if no resident line matches. Test-only helper so the test can
4070 : target a specific line without seeing the opaque fd_accdb struct. */
4071 :
4072 : int
4073 : fd_accdb_debug_find_line( fd_accdb_t * accdb,
4074 : uchar const * pubkey,
4075 : ulong * out_class,
4076 : ulong * out_idx ) {
4077 : int found = 0;
4078 : uint best_gen = 0U;
4079 : for( ulong c=0UL; c<FD_ACCDB_CACHE_CLASS_CNT; c++ ) {
4080 : ulong init = FD_VOLATILE_CONST( accdb->shmem->cache_class_init[ c ].val );
4081 : ulong max_c = accdb->shmem->cache_class_max[ c ];
4082 : if( init>max_c ) init = max_c;
4083 : for( ulong idx=0UL; idx<init; idx++ ) {
4084 : fd_accdb_cache_line_t * line = cache_line( accdb, c, idx );
4085 : if( line->key.generation==UINT_MAX ) continue;
4086 : if( memcmp( line->key.pubkey, pubkey, 32UL ) ) continue;
4087 : if( !found || line->key.generation>=best_gen ) {
4088 : best_gen = line->key.generation;
4089 : *out_class = c;
4090 : *out_idx = idx;
4091 : found = 1;
4092 : }
4093 : }
4094 : }
4095 : return found;
4096 : }
4097 :
4098 : /* Deterministically evict a single specified cache line via the
4099 : foreground evictor's claim sequence (CAS refcnt 0->EVICT_SENTINEL),
4100 : then write the dirty line back exactly as fd_accdb_acquire_inner's
4101 : STEP-4 / background_ preevict do (pubkey from accmeta, owner+data
4102 : from the line). Mirrors acquire_cache_line's CLOCK-claim path
4103 : (fd_accdb.c) so a racesan test can reproduce, without a 640+-slot
4104 : cache-pressure rig, the interleaving where acc_unlink observes
4105 : EVICT_SENTINEL on the line it is unlinking.
4106 :
4107 : The fd_racesan_hook("clock_evict:post_sentinel") fires right after
4108 : the sentinel is installed (matching the production foreground path),
4109 : so the test can suspend this fiber holding the sentinel while another
4110 : fiber drives acc_unlink to its reclaim CAS. Returns the captured
4111 : evicted acc_idx (UINT_MAX if the line was clean / unbound). */
4112 :
4113 : uint
4114 : fd_accdb_debug_clock_evict_line( fd_accdb_t * accdb,
4115 : ulong size_class,
4116 : ulong line_idx ) {
4117 : fd_accdb_shmem_t * shmem = accdb->shmem;
4118 : fd_accdb_cache_line_t * line = cache_line( accdb, size_class, line_idx );
4119 :
4120 : /* Claim for eviction, same as acquire_cache_line's CLOCK path. */
4121 : if( FD_UNLIKELY( FD_ATOMIC_CAS( &line->refcnt, 0U, FD_ACCDB_EVICT_SENTINEL )!=0U ) ) return UINT_MAX;
4122 :
4123 : fd_racesan_hook( "clock_evict:post_sentinel" );
4124 :
4125 : uint acc_idx = line->acc_idx;
4126 : if( FD_LIKELY( acc_idx!=UINT_MAX ) ) {
4127 : evict_clear_acc_cache_ref( &accdb->acc_pool[ acc_idx ], size_class, line_idx );
4128 : }
4129 : uint evicted_acc_idx = line->persisted ? UINT_MAX : acc_idx;
4130 : line->key.generation = UINT_MAX;
4131 :
4132 : /* Write back the dirty line, exactly like the production writeback
4133 : sites: this is the synthesis that would emit a pubkey=NEW/owner=OLD
4134 : poison record if the accmeta slot had been recycled out from under
4135 : us. In the SENTINEL-vs-acc_unlink race this proves no poison: the
4136 : epoch the evictor holds blocks drain_deferred_frees, so the slot is
4137 : never recycled while we are here. */
4138 : if( FD_UNLIKELY( !line->persisted && acc_idx!=UINT_MAX ) ) {
4139 : fd_accdb_accmeta_t * accmeta = &accdb->acc_pool[ acc_idx ];
4140 : ulong entry_sz = sizeof(fd_accdb_disk_meta_t)+(ulong)FD_ACCDB_SIZE_DATA( accmeta->executable_size );
4141 :
4142 : ulong old_offset = fd_accdb_acc_xchg_offset( accmeta, FD_ACCDB_OFF_INVAL );
4143 : if( FD_LIKELY( old_offset!=FD_ACCDB_OFF_INVAL ) ) {
4144 : fd_accdb_shmem_bytes_freed( shmem, old_offset, entry_sz );
4145 : FD_ATOMIC_FETCH_AND_SUB( &shmem->shmetrics->disk_used_bytes, entry_sz );
4146 : }
4147 :
4148 : fd_accdb_disk_meta_t meta;
4149 : fd_memcpy( meta.pubkey, accmeta->key.pubkey, 32UL );
4150 : meta.size = FD_ACCDB_SIZE_DATA( accmeta->executable_size );
4151 : fd_memcpy( meta.owner, line->owner, 32UL );
4152 :
4153 : struct iovec iovs[ 2UL ] = {
4154 : { .iov_base = &meta, .iov_len = sizeof(fd_accdb_disk_meta_t) },
4155 : { .iov_base = (void *)(line+1UL), .iov_len = FD_ACCDB_SIZE_DATA( accmeta->executable_size ) }
4156 : };
4157 : ulong file_off = allocate_next_write( accdb, entry_sz );
4158 : ulong written = 0UL;
4159 : while( written<entry_sz ) {
4160 : long result = pwritev2( accdb->fd, iovs, 2, (long)(file_off+written), 0 );
4161 : if( FD_UNLIKELY( result==-1 && errno==EINTR ) ) continue;
4162 : else if( FD_UNLIKELY( result<=0 ) ) FD_LOG_ERR(( "pwritev2() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
4163 : written += (ulong)result;
4164 : for( int v=0; v<2; v++ ) {
4165 : if( (ulong)result>=iovs[ v ].iov_len ) { result -= (long)iovs[ v ].iov_len; iovs[ v ].iov_len = 0UL; }
4166 : else { iovs[ v ].iov_base = (uchar *)iovs[ v ].iov_base + result; iovs[ v ].iov_len -= (ulong)result; break; }
4167 : }
4168 : }
4169 : FD_COMPILER_MFENCE();
4170 : accmeta->offset_fork = fd_accdb_acc_pack_offset_fork( file_off, fd_accdb_acc_fork_id(accmeta) );
4171 : FD_ATOMIC_FETCH_AND_ADD( &shmem->shmetrics->disk_used_bytes, entry_sz );
4172 : }
4173 :
4174 : line->persisted = 1;
4175 : line->acc_idx = UINT_MAX;
4176 : line->key.generation = UINT_MAX;
4177 : line->refcnt = 0;
4178 : cache_free_push( accdb, size_class, line );
4179 : return evicted_acc_idx;
4180 : }
4181 :
4182 : #endif
|