Line data Source code
1 : #include "fd_accdb_impl_v2.h"
2 : #include "fd_accdb_funk.h"
3 : #include "fd_vinyl_req_pool.h"
4 : #include <stdatomic.h>
5 :
6 : FD_STATIC_ASSERT( alignof(fd_accdb_user_v2_t)<=alignof(fd_accdb_user_t), layout );
7 : FD_STATIC_ASSERT( sizeof (fd_accdb_user_v2_t)<=sizeof(fd_accdb_user_t), layout );
8 :
9 : /* Record synchronization *********************************************/
10 :
11 : /* fd_funk_rec_write_{lock,unlock} acquire/release a record write lock.
12 :
13 : These are assumed to never fail because writes to accdb records are
14 : externally coordinated to be non-conflicting at the record level.
15 : In other words, it is assumed that, when the caller attempts to
16 : write to a record:
17 : - no admin attempts to root or cancel the DB txn this write targets
18 : (admin may only root/cancel txns after they are frozen, and users
19 : may only write to txns before they are frozen)
20 : - no other user attempts to read/write to the same record until the
21 : current thread is done writing. (Other threads wait for the
22 : current thread to signal completion before attempting to access) */
23 :
24 : static void
25 0 : fd_funk_rec_write_lock( fd_funk_rec_t * rec ) {
26 0 : ulong volatile * vl = &rec->ver_lock;
27 0 : ulong val = FD_VOLATILE_CONST( *vl );
28 0 : if( FD_UNLIKELY( fd_funk_rec_lock_bits( val ) ) ) {
29 0 : FD_LOG_CRIT(( "fd_funk_rec_write_lock(" FD_FUNK_REC_PAIR_FMT ") failed: record has active readers",
30 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
31 0 : }
32 0 : ulong val_new = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( val ), FD_FUNK_REC_LOCK_MASK );
33 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, val, val_new )!=val ) ) {
34 0 : FD_LOG_CRIT(( "fd_funk_rec_write_lock(" FD_FUNK_REC_PAIR_FMT ") failed: data race detected",
35 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
36 0 : }
37 0 : }
38 :
39 : static void
40 0 : fd_funk_rec_write_lock_uncontended( fd_funk_rec_t * rec ) {
41 0 : ulong val = rec->ver_lock;
42 0 : ulong val_ver = fd_funk_rec_ver_bits( val );
43 0 : rec->ver_lock = fd_funk_rec_ver_lock( val_ver, FD_FUNK_REC_LOCK_MASK );
44 0 : }
45 :
46 : static void
47 0 : fd_funk_rec_write_unlock( fd_funk_rec_t * rec ) {
48 0 : ulong volatile * vl = &rec->ver_lock;
49 0 : ulong val = FD_VOLATILE_CONST( *vl );
50 0 : if( FD_UNLIKELY( fd_funk_rec_lock_bits( val )!=FD_FUNK_REC_LOCK_MASK ) ) {
51 0 : FD_LOG_CRIT(( "fd_funk_rec_write_unlock(" FD_FUNK_REC_PAIR_FMT ") failed: record is not write locked",
52 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
53 0 : }
54 0 : ulong val_new = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( val ), 0UL );
55 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, val, val_new )!=val ) ) {
56 0 : FD_LOG_CRIT(( "fd_funk_rec_write_unlock(" FD_FUNK_REC_PAIR_FMT ") failed: data race detected",
57 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
58 0 : }
59 0 : }
60 :
/* fd_funk_rec_read_{lock,unlock} acquire/release a record read lock.

   fd_funk_rec_read_lock returns FD_MAP_SUCCESS once the lock is held, or
   FD_MAP_ERR_AGAIN if the record's version is no longer alive.  Read
   lock acquires may fail for records of a frozen txn because an admin
   may concurrently root that txn while we are querying. */
66 :
67 : static int
68 0 : fd_funk_rec_read_lock( fd_funk_rec_t * rec ) {
69 0 : ulong volatile * vl = &rec->ver_lock;
70 0 : for(;;) {
71 0 : ulong val = FD_VOLATILE_CONST( *vl );
72 0 : ulong val_ver = fd_funk_rec_ver_bits ( val );
73 0 : ulong val_lock = fd_funk_rec_lock_bits( val );
74 0 : if( FD_UNLIKELY( !fd_funk_rec_ver_alive( val_ver ) ) ) {
75 0 : return FD_MAP_ERR_AGAIN;
76 0 : }
77 0 : if( FD_UNLIKELY( val_lock>=FD_FUNK_REC_LOCK_MASK-1 ) ) {
78 0 : FD_LOG_CRIT(( "fd_funk_rec_read_lock(" FD_FUNK_REC_PAIR_FMT ") failed: val_lock=%#lx (too many readers or already write locked)",
79 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ), val_lock ));
80 0 : }
81 0 : ulong val_new = fd_funk_rec_ver_lock( val_ver, val_lock+1UL );
82 0 : if( FD_LIKELY( FD_ATOMIC_CAS( vl, val, val_new )==val ) ) {
83 0 : return FD_MAP_SUCCESS;
84 0 : }
85 0 : }
86 0 : }
87 :
88 : static void
89 0 : fd_funk_rec_read_unlock( fd_funk_rec_t * rec ) {
90 0 : ulong volatile * vl = &rec->ver_lock;
91 0 : for(;;) {
92 0 : ulong val = FD_VOLATILE_CONST( *vl );
93 0 : ulong val_ver = fd_funk_rec_ver_bits ( val );
94 0 : ulong val_lock = fd_funk_rec_lock_bits( val );
95 0 : if( FD_UNLIKELY( val_lock==0UL || val_lock==FD_FUNK_REC_LOCK_MASK ) ) {
96 0 : FD_LOG_CRIT(( "fd_funk_rec_read_unlock(" FD_FUNK_REC_PAIR_FMT ") failed: val_lock=%#lx (cannot unlock)",
97 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ), val_lock ));
98 0 : }
99 0 : ulong val_new = fd_funk_rec_ver_lock( val_ver, val_lock-1UL );
100 0 : if( FD_LIKELY( FD_ATOMIC_CAS( vl, val, val_new )==val ) ) {
101 0 : return;
102 0 : }
103 0 : }
104 0 : }
105 :
106 : /* Record acquisition *************************************************/
107 :
108 : /* funk_rec_acquire finds the newest revision of 'key' in the funk hash
109 : chain at index chain_idx. Only considers records on the current fork
110 : (fork nodes stored in accdb).
111 :
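   On success, points *out_rec to the record found and returns the kind
   of lock taken on it.  If is_write and the record belongs to the tip
   txn of the current lineage (fd_accdb_lineage_is_tip), a write lock is
   acquired and ACQUIRE_WRITE is returned.  Otherwise (including write
   requests for records of ancestor txns) a read lock is acquired and
   ACQUIRE_READ is returned.  It is the caller's responsibility to
   release this lock.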
116 :
117 : If no record was found, returns ACQUIRE_NOT_FOUND.
118 :
119 : Returns ACQUIRE_FAILED on index contention (hash map locks) or
120 : record contention (admin started deleting the record just as we were
121 : about to start the access). The caller should retry in this case. */
122 :
/* Result codes of funk_rec_acquire */
enum {
  ACQUIRE_READ      = 0, /* record found, read lock held  */
  ACQUIRE_WRITE     = 1, /* record found, write lock held */
  ACQUIRE_NOT_FOUND = 2, /* no revision on the current fork */
  ACQUIRE_FAILED    = 3  /* contention, caller should retry */
};
127 :
128 : static int
129 : funk_rec_acquire( fd_accdb_user_v2_t const * accdb,
130 : ulong chain_idx,
131 : fd_funk_rec_key_t const * key,
132 : fd_funk_rec_t ** out_rec,
133 0 : _Bool is_write ) {
134 0 : *out_rec = NULL;
135 :
136 0 : fd_accdb_lineage_t const * lineage = accdb->lineage;
137 0 : fd_funk_rec_map_shmem_t const * shmap = accdb->funk->rec_map->map;
138 0 : fd_funk_rec_map_shmem_private_chain_t const * chain_tbl = fd_funk_rec_map_shmem_private_chain_const( shmap, 0UL );
139 0 : fd_funk_rec_map_shmem_private_chain_t const * chain = chain_tbl + chain_idx;
140 0 : fd_funk_rec_t * rec_tbl = accdb->funk->rec_pool->ele;
141 0 : ulong rec_max = fd_funk_rec_pool_ele_max( accdb->funk->rec_pool );
142 :
143 0 : uint ele_idx = chain->head_cidx;
144 0 : ulong _Atomic * ver_cnt_p = (ulong _Atomic *)&chain->ver_cnt;
145 0 : ulong ver_cnt = atomic_load_explicit( ver_cnt_p, memory_order_acquire );
146 :
147 : /* Start a speculative transaction for the chain containing revisions
148 : of the account key we are looking for. */
149 0 : ulong cnt = fd_funk_rec_map_private_vcnt_cnt( ver_cnt );
150 0 : if( FD_UNLIKELY( fd_funk_rec_map_private_vcnt_ver( ver_cnt )&1 ) ) {
151 0 : return ACQUIRE_FAILED; /* chain is locked */
152 0 : }
153 :
154 : /* Walk the map chain, bail at the first entry
155 : (Each chain is sorted newest-to-oldest) */
156 0 : fd_funk_rec_t * best = NULL;
157 0 : for( ulong i=0UL; i<cnt; i++ ) {
158 0 : uint ele_next = rec_tbl[ ele_idx ].map_next;
159 0 : if( FD_UNLIKELY( atomic_load_explicit( ver_cnt_p, memory_order_acquire )!=ver_cnt ) ) {
160 0 : return ACQUIRE_FAILED;
161 0 : }
162 :
163 0 : if( FD_UNLIKELY( ele_idx>=rec_max ) ) {
164 0 : FD_LOG_CRIT(( "funk_rec_acquire detected memory corruption: invalid ele_idx at node %lu:%u (rec_max %lu)",
165 0 : chain_idx, ele_idx, rec_max ));
166 0 : }
167 :
168 0 : fd_funk_rec_t * rec = &rec_tbl[ ele_idx ];
169 :
170 : /* Skip over unrelated records (hash collision) */
171 0 : if( FD_UNLIKELY( !fd_funk_rec_key_eq( rec->pair.key, key ) ) ) goto next;
172 :
173 : /* Confirm that record is part of the current fork */
174 0 : if( FD_UNLIKELY( !fd_accdb_lineage_has_xid( lineage, rec->pair.xid ) ) ) goto next;
175 :
176 0 : if( FD_UNLIKELY( ele_next==ele_idx ) ) {
177 0 : FD_LOG_CRIT(( "funk_rec_acquire detected cycle" ));
178 0 : }
179 0 : best = rec;
180 0 : break;
181 :
182 0 : next:
183 0 : ele_idx = ele_next;
184 0 : }
185 0 : if( FD_UNLIKELY( !best && ele_idx!=FD_FUNK_REC_IDX_NULL ) ) {
186 0 : FD_LOG_CRIT(( "funk_rec_acquire detected malformed chain (%lu): found more nodes than chain header indicated (%lu)", chain_idx, cnt ));
187 0 : }
188 :
189 : /* Found a record, acquire a lock */
190 0 : if( best ) {
191 : /* If the write does not target the current transaction, demote it */
192 0 : if( is_write && !fd_accdb_lineage_is_tip( accdb->lineage, best->pair.xid ) ) {
193 0 : is_write = 0;
194 0 : }
195 0 : if( is_write ) {
196 0 : fd_funk_rec_write_lock( best );
197 0 : } else {
198 0 : if( fd_funk_rec_read_lock( best )!=FD_MAP_SUCCESS ) {
199 0 : return ACQUIRE_FAILED; /* record about to be moved to vinyl */
200 0 : }
201 0 : }
202 0 : }
203 :
204 : /* Retry if there was contention at the hash map */
205 0 : if( FD_UNLIKELY( atomic_load_explicit( ver_cnt_p, memory_order_acquire )!=ver_cnt ) ) {
206 0 : if( best ) {
207 0 : if( is_write ) fd_funk_rec_write_unlock( best );
208 0 : else fd_funk_rec_read_unlock ( best );
209 0 : }
210 0 : return ACQUIRE_FAILED;
211 0 : }
212 :
213 0 : *out_rec = best;
214 0 : return best ? ( is_write ? ACQUIRE_WRITE : ACQUIRE_READ ) : ACQUIRE_NOT_FOUND;
215 0 : }
216 :
217 : static int
218 : funk_open_ref( fd_accdb_user_v2_t * accdb,
219 : fd_accdb_ref_t * ref,
220 : fd_funk_txn_xid_t const * xid,
221 : void const * address,
222 0 : _Bool is_write ) {
223 0 : fd_funk_t const * funk = accdb->funk;
224 0 : fd_funk_rec_key_t key[1]; memcpy( key->uc, address, 32UL );
225 :
226 : /* Hash key to chain */
227 0 : fd_funk_xid_key_pair_t pair[1];
228 0 : fd_funk_txn_xid_copy( pair->xid, xid );
229 0 : fd_funk_rec_key_copy( pair->key, key );
230 0 : fd_funk_rec_map_t const * rec_map = funk->rec_map;
231 0 : ulong hash = fd_funk_rec_map_key_hash( pair, rec_map->map->seed );
232 0 : ulong chain_idx = (hash & (rec_map->map->chain_cnt-1UL) );
233 :
234 : /* Traverse chain for candidate */
235 0 : fd_funk_rec_t * rec = NULL;
236 0 : int err;
237 0 : for(;;) {
238 0 : err = funk_rec_acquire( accdb, chain_idx, key, &rec, is_write );
239 0 : if( FD_LIKELY( err!=ACQUIRE_FAILED ) ) break;
240 0 : FD_SPIN_PAUSE();
241 : /* FIXME backoff */
242 0 : }
243 0 : if( rec ) {
244 0 : memcpy( ref->address, address, 32UL );
245 0 : ref->accdb_type = FD_ACCDB_TYPE_V1;
246 0 : ref->ref_type = err==ACQUIRE_WRITE ? FD_ACCDB_REF_RW : FD_ACCDB_REF_RO;
247 0 : ref->user_data = (ulong)rec;
248 0 : ref->user_data2 = 0UL;
249 0 : ref->meta_laddr = (ulong)fd_funk_val( rec, funk->wksp );
250 0 : } else {
251 0 : memset( ref, 0, sizeof(fd_accdb_rw_t) );
252 0 : }
253 0 : return err;
254 0 : }
255 :
256 : /* Read method ********************************************************/
257 :
258 : void
259 : fd_accdb_user_v2_open_ro_multi( fd_accdb_user_t * accdb,
260 : fd_accdb_ro_t * ro0,
261 : fd_funk_txn_xid_t const * xid,
262 : void const * addr0,
263 0 : ulong cnt ) {
264 0 : fd_accdb_user_v2_t * v2 = (fd_accdb_user_v2_t *)accdb;
265 0 : fd_vinyl_rq_t * rq = v2->vinyl_rq; /* "request queue "*/
266 0 : fd_vinyl_req_pool_t * req_pool = v2->vinyl_req_pool; /* "request pool" */
267 0 : fd_wksp_t * req_wksp = v2->vinyl_req_wksp; /* shm workspace containing request buffer */
268 0 : fd_wksp_t * data_wksp = v2->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
269 0 : ulong link_id = v2->vinyl_link_id; /* vinyl client ID */
270 :
271 0 : if( FD_UNLIKELY( cnt>fd_vinyl_req_batch_key_max( req_pool ) ) ) {
272 0 : FD_LOG_CRIT(( "open_ro_multi cnt %lu exceeds vinyl request batch max %lu",
273 0 : cnt, fd_vinyl_req_batch_key_max( req_pool ) ));
274 0 : }
275 :
276 : /* Open accounts from funk
277 :
278 : (FIXME this is a potentially slow operation, might want to fire off
279 : a 'prefetch' instruction to vinyl asynchronously before doing this,
280 : so that the vinyl data is in cache by the time v1_open_rw_multi
281 : finishes) */
282 :
283 0 : fd_accdb_lineage_set_fork( v2->lineage, v2->funk, xid );
284 0 : ulong addr_laddr = (ulong)addr0;
285 0 : for( ulong i=0UL; i<cnt; i++ ) {
286 0 : void const * addr_i = (void const *)( (ulong)addr_laddr + i*32UL );
287 0 : if( funk_open_ref( v2, ro0[i].ref, xid, addr_i, 0 )==ACQUIRE_READ ) {
288 0 : v2->base.ro_active++;
289 0 : } else {
290 0 : fd_accdb_ro_init_empty( &ro0[i], addr_i );
291 0 : }
292 0 : }
293 :
294 : /* For the accounts that were not found in funk, open vinyl records */
295 :
296 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
297 : /* req_pool_release called before returning */
298 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
299 0 : fd_vinyl_key_t * req_key0 = fd_vinyl_req_batch_key ( req_pool, batch_idx );
300 0 : schar * req_err0 = fd_vinyl_req_batch_err ( req_pool, batch_idx );
301 0 : ulong * req_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
302 :
303 : /* Create a read-only vinyl "ACQUIRE" batch */
304 :
305 0 : ulong req_cnt = 0UL;
306 0 : for( ulong i=0UL; i<cnt; i++ ) {
307 0 : if( ro0[i].ref->accdb_type!=FD_ACCDB_TYPE_NONE ) continue;
308 : /* At this point, addr0[i] not found in funk, load from vinyl */
309 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
310 :
311 0 : fd_vinyl_key_init( req_key0+req_cnt, addr_i, 32UL );
312 0 : req_err0 [ req_cnt ] = 0;
313 0 : req_val_gaddr0[ req_cnt ] = 0UL;
314 0 : req_cnt++;
315 0 : }
316 0 : if( !req_cnt ) {
317 : /* All records were found in funk, bail early */
318 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
319 0 : return;
320 0 : }
321 :
322 : /* Send read-only "ACQUIRE" batch to vinyl and wait for response */
323 :
324 0 : ulong req_id = v2->vinyl_req_id++;
325 0 : memset( comp, 0, sizeof(fd_vinyl_comp_t) );
326 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_ACQUIRE, 0UL, batch_idx, req_cnt );
327 :
328 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
329 0 : FD_COMPILER_MFENCE();
330 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
331 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
332 0 : FD_LOG_CRIT(( "vinyl tile rejected my ACQUIRE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
333 0 : }
334 :
335 : /* For the accounts that were newly found in vinyl, create accdb
336 : handles */
337 :
338 0 : req_cnt = 0UL;
339 0 : for( ulong i=0UL; i<cnt; i++ ) {
340 0 : if( ro0[i].ref->accdb_type!=FD_ACCDB_TYPE_NONE ) continue;
341 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
342 :
343 0 : int req_err = FD_VOLATILE_CONST( req_err0 [ req_cnt ] );
344 0 : ulong val_gaddr = FD_VOLATILE_CONST( req_val_gaddr0[ req_cnt ] );
345 :
346 0 : fd_accdb_ro_t * ro = &ro0[ i ];
347 0 : if( req_err==0 ) {
348 : /* Record found in vinyl, create reference */
349 0 : fd_account_meta_t const * meta = fd_wksp_laddr_fast( data_wksp, val_gaddr );
350 :
351 0 : accdb->base.ro_active++;
352 0 : *ro = (fd_accdb_ro_t) {0};
353 0 : memcpy( ro->ref->address, addr_i, 32UL );
354 0 : ro->ref->accdb_type = FD_ACCDB_TYPE_V2;
355 0 : ro->ref->ref_type = FD_ACCDB_REF_RO;
356 0 : ro->meta = meta;
357 0 : } else if( FD_UNLIKELY( req_err!=FD_VINYL_ERR_KEY ) ) {
358 0 : FD_LOG_CRIT(( "vinyl tile ACQUIRE request failed: %i-%s", req_err, fd_vinyl_strerror( req_err ) ));
359 0 : }
360 0 : req_cnt++;
361 0 : }
362 :
363 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
364 :
365 : /* At this point, ownership of vinyl records transitions to caller.
366 : (Released using close_ro_multi) */
367 0 : }
368 :
369 : /* Write method *******************************************************/
370 :
371 : void
372 : fd_accdb_user_v2_open_rw_multi( fd_accdb_user_t * accdb,
373 : fd_accdb_rw_t * rw0,
374 : fd_funk_txn_xid_t const * xid,
375 : void const * addr0,
376 : ulong const * data_max0,
377 : int flags,
378 0 : ulong cnt ) {
379 0 : fd_accdb_user_v2_t * v2 = (fd_accdb_user_v2_t *)accdb;
380 0 : fd_funk_t * funk = v2->funk;
381 0 : fd_vinyl_rq_t * rq = v2->vinyl_rq; /* "request queue "*/
382 0 : fd_vinyl_req_pool_t * req_pool = v2->vinyl_req_pool; /* "request pool" */
383 0 : fd_wksp_t * req_wksp = v2->vinyl_req_wksp; /* shm workspace containing request buffer */
384 0 : fd_wksp_t * data_wksp = v2->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
385 0 : ulong link_id = v2->vinyl_link_id; /* vinyl client ID */
386 :
387 0 : fd_accdb_lineage_set_fork( v2->lineage, v2->funk, xid );
388 0 : fd_funk_txn_t * txn = fd_accdb_lineage_write_check( v2->lineage, v2->funk );
389 :
390 0 : int const flag_truncate = !!( flags & FD_ACCDB_FLAG_TRUNCATE );
391 0 : int const flag_create = !!( flags & FD_ACCDB_FLAG_CREATE );
392 0 : if( FD_UNLIKELY( flags & ~(FD_ACCDB_FLAG_CREATE|FD_ACCDB_FLAG_TRUNCATE) ) ) {
393 0 : FD_LOG_CRIT(( "invalid flags for open_rw: %#02x", (uint)flags ));
394 0 : }
395 :
396 0 : if( FD_UNLIKELY( cnt>fd_vinyl_req_batch_key_max( req_pool ) ) ) {
397 0 : FD_LOG_CRIT(( "open_rw_multi cnt %lu exceeds vinyl request batch max %lu",
398 0 : cnt, fd_vinyl_req_batch_key_max( req_pool ) ));
399 0 : }
400 :
401 : /* Query for existing funk records
402 :
403 : (FIXME this is a potentially slow operation, might want to fire off
404 : a 'prefetch' instruction to vinyl asynchronously before doing this,
405 : so that the vinyl data is in cache by the time v1_open_rw_multi
406 : finishes) */
407 :
408 0 : ulong addr_laddr = (ulong)addr0;
409 0 : for( ulong i=0UL; i<cnt; i++ ) {
410 0 : void const * addr_i = (void const *)( (ulong)addr_laddr + i*32UL );
411 0 : funk_open_ref( v2, rw0[ i ].ref, xid, addr_i, 1 );
412 0 : }
413 :
414 : /* For the accounts that were not found in funk, create writable funk
415 : records from elements in vinyl. */
416 :
417 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
418 : /* req_pool_release called before returning */
419 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
420 0 : fd_vinyl_key_t * req_key0 = fd_vinyl_req_batch_key ( req_pool, batch_idx );
421 0 : schar * req_err0 = fd_vinyl_req_batch_err ( req_pool, batch_idx );
422 0 : ulong * req_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
423 :
424 : /* Create a read-only vinyl "ACQUIRE" batch */
425 :
426 0 : ulong req_cnt = 0UL;
427 0 : for( ulong i=0UL; i<cnt; i++ ) {
428 0 : if( rw0[i].ref->ref_type!=FD_ACCDB_REF_INVAL ) continue;
429 : /* At this point, addr0[i] not found in funk, load from vinyl */
430 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
431 :
432 0 : fd_vinyl_key_init( req_key0+req_cnt, addr_i, 32UL );
433 0 : req_err0 [ req_cnt ] = 0;
434 0 : req_val_gaddr0[ req_cnt ] = 0UL;
435 0 : req_cnt++;
436 0 : }
437 :
438 : /* Send read-only "ACQUIRE" batch to vinyl and wait for response */
439 :
440 0 : if( req_cnt ) {
441 0 : ulong req_id = v2->vinyl_req_id++;
442 0 : memset( fd_vinyl_req_batch_comp( req_pool, batch_idx ), 0, sizeof(fd_vinyl_comp_t) );
443 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_ACQUIRE, 0UL, batch_idx, req_cnt );
444 :
445 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
446 0 : FD_COMPILER_MFENCE();
447 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
448 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
449 0 : FD_LOG_CRIT(( "vinyl tile rejected my ACQUIRE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
450 0 : }
451 0 : }
452 :
453 : /* Promote any found accounts to writable accounts */
454 :
455 0 : req_cnt = 0UL;
456 0 : for( ulong i=0UL; i<cnt; i++ ) {
457 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
458 0 : fd_accdb_rw_t * rw = &rw0[ i ];
459 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)rw->ref->user_data;
460 0 : ulong data_max = data_max0[ i ];
461 :
462 0 : if( rw->ref->ref_type==FD_ACCDB_REF_RW ) {
463 : /* Mutable record found, modify in-place */
464 :
465 0 : if( FD_UNLIKELY( !flag_create && fd_accdb_ref_lamports( rw->ro )==0UL ) ) {
466 : /* Tombstone */
467 0 : fd_funk_rec_write_unlock( rec );
468 0 : goto not_found;
469 0 : }
470 :
471 0 : ulong acc_orig_sz = fd_accdb_ref_data_sz( rw->ro );
472 0 : ulong val_sz_min = sizeof(fd_account_meta_t)+fd_ulong_max( data_max, acc_orig_sz );
473 0 : void * val = fd_funk_val_truncate( rec, funk->alloc, funk->wksp, 16UL, val_sz_min, NULL );
474 0 : if( FD_UNLIKELY( !val ) ) {
475 0 : FD_LOG_CRIT(( "Failed to modify account: out of memory allocating %lu bytes", acc_orig_sz ));
476 0 : }
477 0 : fd_accdb_funk_prep_inplace( rw, funk, rec );
478 0 : if( flag_truncate ) {
479 0 : rec->val_sz = sizeof(fd_account_meta_t);
480 0 : rw->meta->dlen = 0;
481 0 : }
482 0 : accdb->base.rw_active++;
483 : /* Retain write lock */
484 :
485 0 : continue; /* next account */
486 0 : }
487 :
488 0 : if( rw->ref->ref_type==FD_ACCDB_REF_RO ) {
489 : /* Frozen record found, copy out to new object */
490 :
491 0 : fd_accdb_ro_t * ro = rw->ro;
492 0 : if( FD_UNLIKELY( !flag_create && fd_accdb_ref_lamports( ro )==0UL ) ) {
493 : /* Tombstone */
494 0 : fd_funk_rec_read_unlock( rec );
495 0 : goto not_found;
496 0 : }
497 :
498 0 : ulong acc_orig_sz = fd_accdb_ref_data_sz( ro );
499 0 : ulong val_sz_min = sizeof(fd_account_meta_t)+fd_ulong_max( data_max, acc_orig_sz );
500 0 : ulong val_sz = flag_truncate ? sizeof(fd_account_meta_t) : rec->val_sz;
501 0 : ulong val_max = 0UL;
502 0 : void * val = fd_alloc_malloc_at_least( funk->alloc, 16UL, val_sz_min, &val_max );
503 0 : if( FD_UNLIKELY( !val ) ) {
504 0 : FD_LOG_CRIT(( "Failed to modify account: out of memory allocating %lu bytes", acc_orig_sz ));
505 0 : }
506 :
507 0 : fd_account_meta_t * meta = val;
508 0 : uchar * data = (uchar *)( meta+1 );
509 0 : ulong data_max_actual = val_max - sizeof(fd_account_meta_t);
510 0 : if( flag_truncate ) fd_accdb_funk_copy_truncated( meta, ro->meta );
511 0 : else fd_accdb_funk_copy_account ( meta, data, ro->meta, fd_account_data( ro->meta ) );
512 0 : if( acc_orig_sz<data_max_actual ) {
513 : /* Zero out trailing data */
514 0 : uchar * tail = data +acc_orig_sz;
515 0 : ulong tail_sz = data_max_actual-acc_orig_sz;
516 0 : fd_memset( tail, 0, tail_sz );
517 0 : }
518 :
519 0 : fd_accdb_funk_prep_create( rw, funk, txn, addr_i, val, val_sz, val_max );
520 0 : accdb->base.rw_active++;
521 :
522 0 : FD_COMPILER_MFENCE();
523 0 : fd_funk_rec_read_unlock( rec );
524 :
525 0 : continue; /* next account */
526 0 : }
527 :
528 : /* Record not found in funk */
529 :
530 0 : int req_err = req_err0[ req_cnt ];
531 0 : if( req_err ) {
532 : /* Record not found in vinyl either (truly does not exist)
533 : If CREATE flag was requested, create it in funk */
534 0 : if( FD_UNLIKELY( req_err!=FD_VINYL_ERR_KEY ) ) {
535 0 : FD_LOG_CRIT(( "vinyl tile ACQUIRE request failed: %i-%s", req_err, fd_vinyl_strerror( req_err ) ));
536 0 : }
537 0 : not_found:
538 0 : if( flag_create ) {
539 0 : fd_accdb_funk_create( v2->funk, rw, txn, addr_i, data_max0[ i ] );
540 0 : fd_funk_rec_write_lock_uncontended( (fd_funk_rec_t *)rw->ref->user_data );
541 0 : accdb->base.rw_active++;
542 0 : } else {
543 0 : memset( rw, 0, sizeof(fd_accdb_ref_t) );
544 0 : }
545 0 : req_cnt++;
546 0 : continue;
547 0 : }
548 :
549 : /* Record found in vinyl */
550 :
551 0 : ulong req_val_gaddr = req_val_gaddr0[ req_cnt ];
552 0 : fd_account_meta_t * src_meta = fd_wksp_laddr_fast( data_wksp, req_val_gaddr );
553 0 : uchar const * src_data = (uchar *)( src_meta+1 );
554 :
555 0 : if( FD_UNLIKELY( src_meta->lamports==0UL ) ) goto not_found; /* tombstone */
556 :
557 0 : ulong acc_orig_sz = src_meta->dlen;
558 0 : ulong val_sz_min = sizeof(fd_account_meta_t)+fd_ulong_max( data_max0[ i ], acc_orig_sz );
559 0 : ulong acc_sz = flag_truncate ? 0UL : acc_orig_sz;
560 0 : ulong val_sz = sizeof(fd_account_meta_t)+acc_sz;
561 0 : ulong val_max = 0UL;
562 0 : void * val = fd_alloc_malloc_at_least( v2->funk->alloc, 16UL, val_sz_min, &val_max );
563 0 : if( FD_UNLIKELY( !val ) ) {
564 0 : FD_LOG_CRIT(( "Failed to modify account: out of memory allocating %lu bytes", acc_orig_sz ));
565 0 : }
566 :
567 0 : fd_account_meta_t * dst_meta = val;
568 0 : uchar * dst_data = (uchar *)( dst_meta+1 );
569 0 : ulong data_max_actual = val_max - sizeof(fd_account_meta_t);
570 0 : if( flag_truncate ) fd_accdb_funk_copy_truncated( dst_meta, src_meta );
571 0 : else fd_accdb_funk_copy_account ( dst_meta, dst_data, src_meta, src_data );
572 0 : if( acc_orig_sz<data_max_actual ) {
573 : /* Zero out trailing data */
574 0 : uchar * tail = dst_data +acc_orig_sz;
575 0 : ulong tail_sz = data_max_actual-acc_orig_sz;
576 0 : fd_memset( tail, 0, tail_sz );
577 0 : }
578 :
579 0 : fd_accdb_funk_prep_create( rw, v2->funk, txn, addr_i, val, val_sz, val_max );
580 0 : fd_funk_rec_write_lock_uncontended( (fd_funk_rec_t *)rw->ref->user_data );
581 :
582 0 : req_cnt++;
583 0 : accdb->base.rw_active++;
584 0 : }
585 :
586 : /* Send "RELEASE" batch (reuse val_gaddr values),
587 : and wait for response */
588 :
589 0 : if( req_cnt ) {
590 0 : ulong req_id = v2->vinyl_req_id++;
591 0 : memset( fd_vinyl_req_batch_comp( req_pool, batch_idx ), 0, sizeof(fd_vinyl_comp_t) );
592 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_RELEASE, 0UL, batch_idx, req_cnt );
593 :
594 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
595 0 : FD_COMPILER_MFENCE();
596 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
597 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
598 0 : FD_LOG_CRIT(( "vinyl tile rejected my RELEASE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
599 0 : }
600 0 : }
601 :
602 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
603 0 : }
604 :
605 : void
606 : funk_close_rw( fd_accdb_user_v2_t * accdb,
607 0 : fd_accdb_rw_t * write ) {
608 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)write->ref->user_data;
609 :
610 0 : if( FD_UNLIKELY( !accdb->base.rw_active ) ) {
611 0 : FD_LOG_CRIT(( "Failed to modify account: ref count underflow" ));
612 0 : }
613 :
614 0 : if( write->ref->user_data2 ) {
615 0 : fd_funk_txn_t * txn = (fd_funk_txn_t *)write->ref->user_data2;
616 0 : fd_funk_rec_prepare_t prepare = {
617 0 : .rec = rec,
618 0 : .rec_head_idx = &txn->rec_head_idx,
619 0 : .rec_tail_idx = &txn->rec_tail_idx
620 0 : };
621 0 : fd_funk_rec_publish( accdb->funk, &prepare );
622 0 : }
623 :
624 0 : fd_funk_rec_write_unlock( rec );
625 0 : accdb->base.rw_active--;
626 0 : }
627 :
628 : void
629 : fd_accdb_user_v2_close_ref_multi( fd_accdb_user_t * accdb,
630 : fd_accdb_ref_t * ref0,
631 0 : ulong cnt ) {
632 0 : fd_accdb_user_v2_t * v2 = (fd_accdb_user_v2_t *)accdb;
633 0 : fd_vinyl_rq_t * rq = v2->vinyl_rq; /* "request queue "*/
634 0 : fd_vinyl_req_pool_t * req_pool = v2->vinyl_req_pool; /* "request pool" */
635 0 : fd_wksp_t * req_wksp = v2->vinyl_req_wksp; /* shm workspace containing request buffer */
636 0 : fd_wksp_t * data_wksp = v2->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
637 0 : ulong link_id = v2->vinyl_link_id; /* vinyl client ID */
638 :
639 0 : if( FD_UNLIKELY( cnt>fd_vinyl_req_batch_key_max( req_pool ) ) ) {
640 0 : FD_LOG_CRIT(( "close_ref_multi cnt %lu exceeds vinyl request batch max %lu",
641 0 : cnt, fd_vinyl_req_batch_key_max( req_pool ) ));
642 0 : }
643 :
644 : /* First, release all references to vinyl records
645 : (This is a prefetch friendly / fast loop) */
646 :
647 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
648 : /* req_pool_release called before returning */
649 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
650 0 : schar * req_err0 = fd_vinyl_req_batch_err ( req_pool, batch_idx );
651 0 : ulong * req_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
652 :
653 0 : ulong ro_close_cnt = 0UL;
654 0 : ulong rw_close_cnt = 0UL;
655 0 : ulong req_cnt = 0UL;
656 0 : for( ulong i=0UL; i<cnt; i++ ) {
657 0 : fd_accdb_ref_t * ref = &ref0[ i ];
658 0 : if( ref->accdb_type!=FD_ACCDB_TYPE_V2 ) continue;
659 0 : ref->ref_type==FD_ACCDB_REF_RO ? ro_close_cnt++ : rw_close_cnt++;
660 0 : req_err0 [ req_cnt ] = 0;
661 0 : req_val_gaddr0[ req_cnt ] = fd_wksp_gaddr_fast( data_wksp, (void *)ref->meta_laddr );
662 0 : memset( ref, 0, sizeof(fd_accdb_ref_t) );
663 0 : req_cnt++;
664 0 : }
665 0 : if( req_cnt ) {
666 0 : if( FD_UNLIKELY( ro_close_cnt > accdb->base.ro_active ) ) {
667 0 : FD_LOG_CRIT(( "attempted to close more accdb_ro (%lu) than are open (%lu)",
668 0 : ro_close_cnt, accdb->base.ro_active ));
669 0 : }
670 0 : if( FD_UNLIKELY( rw_close_cnt > accdb->base.rw_active ) ) {
671 0 : FD_LOG_CRIT(( "attempted to close more accdb_rw (%lu) than are open (%lu)",
672 0 : rw_close_cnt, accdb->base.rw_active ));
673 0 : }
674 0 : ulong req_id = v2->vinyl_req_id++;
675 0 : memset( fd_vinyl_req_batch_comp( req_pool, batch_idx ), 0, sizeof(fd_vinyl_comp_t) );
676 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_RELEASE, 0UL, batch_idx, req_cnt );
677 0 : }
678 :
679 : /* While our vinyl request is inflight, release funk records
680 : (This does expensive DRAM accesses, which are convenient to do when
681 : we are waiting for the database to asynchronously respond) */
682 :
683 0 : for( ulong i=0UL; i<cnt; i++ ) {
684 0 : fd_accdb_ref_t * ref = &ref0[ i ];
685 0 : if( ref->accdb_type!=FD_ACCDB_TYPE_V1 ) continue;
686 0 : switch( ref0[ i ].ref_type ) {
687 0 : case FD_ACCDB_REF_RO:
688 0 : accdb->base.ro_active--;
689 0 : fd_funk_rec_read_unlock( (fd_funk_rec_t *)ref->user_data );
690 0 : break;
691 0 : case FD_ACCDB_REF_RW:
692 0 : funk_close_rw( v2, (fd_accdb_rw_t *)ref );
693 0 : break;
694 0 : default:
695 0 : FD_LOG_CRIT(( "invalid ref_type %u in fd_accdb_user_v1_close_ref", (uint)ref->ref_type ));
696 0 : }
697 0 : memset( ref, 0, sizeof(fd_accdb_ref_t) );
698 0 : }
699 :
700 : /* Wait for response from vinyl */
701 :
702 0 : if( req_cnt ) {
703 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
704 0 : FD_COMPILER_MFENCE();
705 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
706 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
707 0 : FD_LOG_CRIT(( "vinyl tile rejected my RELEASE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
708 0 : }
709 0 : for( ulong i=0UL; i<req_cnt; i++ ) {
710 0 : int req_err = req_err0[ i ];
711 0 : if( FD_UNLIKELY( req_err!=FD_VINYL_SUCCESS ) ) {
712 0 : FD_LOG_CRIT(( "vinyl tile RELEASE request failed: %i-%s", req_err, fd_vinyl_strerror( req_err ) ));
713 0 : }
714 0 : }
715 :
716 0 : accdb->base.ro_active -= ro_close_cnt;
717 0 : accdb->base.rw_active -= rw_close_cnt;
718 0 : }
719 :
720 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
721 0 : }
722 :
723 : ulong
724 : fd_accdb_user_v2_rw_data_max( fd_accdb_user_t * accdb,
725 0 : fd_accdb_rw_t const * rw ) {
726 0 : (void)accdb;
727 0 : if( rw->ref->accdb_type==FD_ACCDB_TYPE_NONE ) {
728 0 : return rw->ref->user_data; /* data_max */
729 0 : }
730 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)rw->ref->user_data;
731 0 : return (ulong)( rec->val_max - sizeof(fd_account_meta_t) );
732 0 : }
733 :
734 : void
735 : fd_accdb_user_v2_rw_data_sz_set( fd_accdb_user_t * accdb,
736 : fd_accdb_rw_t * rw,
737 : ulong data_sz,
738 0 : int flags ) {
739 0 : int flag_dontzero = !!( flags & FD_ACCDB_FLAG_DONTZERO );
740 0 : if( FD_UNLIKELY( flags & ~(FD_ACCDB_FLAG_DONTZERO) ) ) {
741 0 : FD_LOG_CRIT(( "invalid flags for rw_data_sz_set: %#02x", (uint)flags ));
742 0 : }
743 :
744 0 : ulong prev_sz = rw->meta->dlen;
745 0 : if( data_sz>prev_sz ) {
746 0 : ulong data_max = fd_accdb_user_v2_rw_data_max( accdb, rw );
747 0 : if( FD_UNLIKELY( data_sz>data_max ) ) {
748 0 : FD_LOG_CRIT(( "attempted to write %lu bytes into a rec with only %lu bytes of data space",
749 0 : data_sz, data_max ));
750 0 : }
751 0 : if( !flag_dontzero ) {
752 0 : void * tail = (uchar *)fd_accdb_ref_data( rw ) + prev_sz;
753 0 : fd_memset( tail, 0, data_sz-prev_sz );
754 0 : }
755 0 : }
756 0 : rw->meta->dlen = (uint)data_sz;
757 :
758 0 : if( rw->ref->accdb_type==FD_ACCDB_TYPE_V1 ) {
759 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)rw->ref->user_data;
760 0 : rec->val_sz = (uint)( sizeof(fd_account_meta_t)+data_sz ) & FD_FUNK_REC_VAL_MAX;
761 0 : }
762 0 : }
763 :
764 : fd_accdb_user_t *
765 : fd_accdb_user_v2_init( fd_accdb_user_t * accdb_,
766 : void * shfunk,
767 : void * vinyl_rq,
768 : void * vinyl_data,
769 : void * vinyl_req_pool,
770 0 : ulong vinyl_link_id ) {
771 0 : if( FD_UNLIKELY( !accdb_ ) ) {
772 0 : FD_LOG_WARNING(( "NULL ljoin" ));
773 0 : return NULL;
774 0 : }
775 0 : if( FD_UNLIKELY( !shfunk ) ) {
776 0 : FD_LOG_WARNING(( "NULL shfunk" ));
777 0 : return NULL;
778 0 : }
779 0 : if( FD_UNLIKELY( !vinyl_data ) ) {
780 0 : FD_LOG_WARNING(( "NULL vinyl_data" ));
781 0 : return NULL;
782 0 : }
783 :
784 0 : fd_accdb_user_v2_t * accdb = fd_type_pun( accdb_ );
785 0 : memset( accdb, 0, sizeof(fd_accdb_user_v2_t) );
786 :
787 0 : if( FD_UNLIKELY( !fd_funk_join( accdb->funk, shfunk ) ) ) {
788 0 : FD_LOG_CRIT(( "fd_funk_join failed" ));
789 0 : }
790 :
791 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( vinyl_rq );
792 0 : fd_vinyl_req_pool_t * req_pool = fd_vinyl_req_pool_join( vinyl_req_pool );
793 0 : if( FD_UNLIKELY( !rq || !req_pool ) ) {
794 : /* component joins log warning if this is reached */
795 0 : FD_LOG_WARNING(( "Failed to initialize database client" ));
796 0 : return NULL;
797 0 : }
798 :
799 0 : accdb->vinyl_req_id = 0UL;
800 0 : accdb->vinyl_rq = rq;
801 0 : accdb->vinyl_link_id = vinyl_link_id;
802 0 : accdb->vinyl_data_wksp = vinyl_data;
803 0 : accdb->vinyl_req_wksp = fd_wksp_containing( req_pool );
804 0 : accdb->vinyl_req_pool = req_pool;
805 0 : accdb->base.accdb_type = FD_ACCDB_TYPE_V2;
806 0 : accdb->base.vt = &fd_accdb_user_v2_vt;
807 0 : return accdb_;
808 0 : }
809 :
810 : void
811 0 : fd_accdb_user_v2_fini( fd_accdb_user_t * accdb ) {
812 0 : fd_accdb_user_v2_t * user = (fd_accdb_user_v2_t *)accdb;
813 :
814 0 : fd_vinyl_rq_leave( user->vinyl_rq );
815 :
816 0 : if( FD_UNLIKELY( !fd_funk_leave( user->funk, NULL ) ) ) FD_LOG_CRIT(( "fd_funk_leave failed" ));
817 0 : }
818 :
819 : ulong
820 0 : fd_accdb_user_v2_batch_max( fd_accdb_user_t * accdb ) {
821 0 : fd_accdb_user_v2_t * user = (fd_accdb_user_v2_t *)accdb;
822 0 : return fd_vinyl_req_batch_key_max( user->vinyl_req_pool );
823 0 : }
824 :
825 :
826 : fd_accdb_user_vt_t const fd_accdb_user_v2_vt = {
827 : .fini = fd_accdb_user_v2_fini,
828 : .batch_max = fd_accdb_user_v2_batch_max,
829 : .open_ro_multi = fd_accdb_user_v2_open_ro_multi,
830 : .open_rw_multi = fd_accdb_user_v2_open_rw_multi,
831 : .close_ref_multi = fd_accdb_user_v2_close_ref_multi,
832 : .rw_data_max = fd_accdb_user_v2_rw_data_max,
833 : .rw_data_sz_set = fd_accdb_user_v2_rw_data_sz_set
834 : };
|