Line data Source code
1 : #include "fd_accdb_impl_v2.h"
2 : #include "fd_accdb_funk.h"
3 : #include "fd_vinyl_req_pool.h"
4 :
5 : FD_STATIC_ASSERT( alignof(fd_accdb_user_v2_t)<=alignof(fd_accdb_user_t), layout );
6 : FD_STATIC_ASSERT( sizeof (fd_accdb_user_v2_t)<=sizeof(fd_accdb_user_t), layout );
7 :
8 : /* Record synchronization *********************************************/
9 :
10 : /* fd_funk_rec_write_{lock,unlock} acquire/release a record write lock.
11 :
12 : These are assumed to never fail because writes to accdb records are
13 : externally coordinated to be non-conflicting at the record level.
14 : In other words, it is assumed that, when the caller attempts to
15 : write to a record:
16 : - no admin attempts to root or cancel the DB txn this write targets
17 : (admin may only root/cancel txns after they are frozen, and users
18 : may only write to txns before they are frozen)
19 : - no other user attempts to read/write to the same record until the
20 : current thread is done writing. (Other threads wait for the
21 : current thread to signal completion before attempting to access) */
22 :
23 : static void
24 0 : fd_funk_rec_write_lock( fd_funk_rec_t * rec ) {
25 0 : ulong volatile * vl = &rec->ver_lock;
26 0 : ulong val = FD_VOLATILE_CONST( *vl );
27 0 : if( FD_UNLIKELY( fd_funk_rec_lock_bits( val ) ) ) {
28 0 : FD_LOG_CRIT(( "fd_funk_rec_write_lock(" FD_FUNK_REC_PAIR_FMT ") failed: record has active readers",
29 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
30 0 : }
31 0 : ulong val_new = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( val ), FD_FUNK_REC_LOCK_MASK );
32 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, val, val_new )!=val ) ) {
33 0 : FD_LOG_CRIT(( "fd_funk_rec_write_lock(" FD_FUNK_REC_PAIR_FMT ") failed: data race detected",
34 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
35 0 : }
36 0 : }
37 :
38 : static void
39 0 : fd_funk_rec_write_lock_uncontended( fd_funk_rec_t * rec ) {
40 0 : ulong val = rec->ver_lock;
41 0 : ulong val_ver = fd_funk_rec_ver_bits( val );
42 0 : rec->ver_lock = fd_funk_rec_ver_lock( val_ver, FD_FUNK_REC_LOCK_MASK );
43 0 : }
44 :
45 : static void
46 0 : fd_funk_rec_write_unlock( fd_funk_rec_t * rec ) {
47 0 : ulong volatile * vl = &rec->ver_lock;
48 0 : ulong val = FD_VOLATILE_CONST( *vl );
49 0 : if( FD_UNLIKELY( fd_funk_rec_lock_bits( val )!=FD_FUNK_REC_LOCK_MASK ) ) {
50 0 : FD_LOG_CRIT(( "fd_funk_rec_write_unlock(" FD_FUNK_REC_PAIR_FMT ") failed: record is not write locked",
51 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
52 0 : }
53 0 : ulong val_new = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( val ), 0UL );
54 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, val, val_new )!=val ) ) {
55 0 : FD_LOG_CRIT(( "fd_funk_rec_write_unlock(" FD_FUNK_REC_PAIR_FMT ") failed: data race detected",
56 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ) ));
57 0 : }
58 0 : }
59 :
/* fd_funk_rec_read_{lock,unlock} try-acquire/release a record read
   lock.

   Read lock acquisition may fail for a frozen txn, because an admin
   may concurrently root the txn while we are querying. */
65 :
66 : static int
67 0 : fd_funk_rec_read_lock( fd_funk_rec_t * rec ) {
68 0 : ulong volatile * vl = &rec->ver_lock;
69 0 : for(;;) {
70 0 : ulong val = FD_VOLATILE_CONST( *vl );
71 0 : ulong val_ver = fd_funk_rec_ver_bits ( val );
72 0 : ulong val_lock = fd_funk_rec_lock_bits( val );
73 0 : if( FD_UNLIKELY( !fd_funk_rec_ver_alive( val_ver ) ) ) {
74 0 : return FD_MAP_ERR_AGAIN;
75 0 : }
76 0 : if( FD_UNLIKELY( val_lock>=FD_FUNK_REC_LOCK_MASK-1 ) ) {
77 0 : FD_LOG_CRIT(( "fd_funk_rec_read_lock(" FD_FUNK_REC_PAIR_FMT ") failed: val_lock=%#lx (too many readers or already write locked)",
78 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ), val_lock ));
79 0 : }
80 0 : ulong val_new = fd_funk_rec_ver_lock( val_ver, val_lock+1UL );
81 0 : if( FD_LIKELY( FD_ATOMIC_CAS( vl, val, val_new )==val ) ) {
82 0 : return FD_MAP_SUCCESS;
83 0 : }
84 0 : }
85 0 : }
86 :
87 : static void
88 0 : fd_funk_rec_read_unlock( fd_funk_rec_t * rec ) {
89 0 : ulong volatile * vl = &rec->ver_lock;
90 0 : for(;;) {
91 0 : ulong val = FD_VOLATILE_CONST( *vl );
92 0 : ulong val_ver = fd_funk_rec_ver_bits ( val );
93 0 : ulong val_lock = fd_funk_rec_lock_bits( val );
94 0 : if( FD_UNLIKELY( val_lock==0UL || val_lock==FD_FUNK_REC_LOCK_MASK ) ) {
95 0 : FD_LOG_CRIT(( "fd_funk_rec_read_unlock(" FD_FUNK_REC_PAIR_FMT ") failed: val_lock=%#lx (cannot unlock)",
96 0 : FD_FUNK_REC_PAIR_FMT_ARGS( rec->pair ), val_lock ));
97 0 : }
98 0 : ulong val_new = fd_funk_rec_ver_lock( val_ver, val_lock-1UL );
99 0 : if( FD_LIKELY( FD_ATOMIC_CAS( vl, val, val_new )==val ) ) {
100 0 : return;
101 0 : }
102 0 : }
103 0 : }
104 :
105 : /* Record acquisition *************************************************/
106 :
107 : /* funk_rec_acquire finds the newest revision of 'key' in the funk hash
108 : chain at index chain_idx. Only considers records on the current fork
109 : (fork nodes stored in accdb).
110 :
111 : On success, returns ACQUIRE_{READ,WRITE} and points *out_rec to the
112 : record found. If is_write, a write-lock is acquired for this record,
113 : otherwise a read lock. It is the caller's responsibility to release
114 : this lock.
115 :
116 : If no record was found, returns ACQUIRE_NOT_FOUND.
117 :
118 : Returns ACQUIRE_FAILED on index contention (hash map locks) or
119 : record contention (admin started deleting the record just as we were
120 : about to start the access). The caller should retry in this case. */
121 :
/* Result codes of funk_rec_acquire / funk_open_ref */
#define ACQUIRE_READ      0 /* record found, read lock held */
#define ACQUIRE_WRITE     1 /* record found, write lock held */
#define ACQUIRE_NOT_FOUND 2 /* no matching record, no lock held */
#define ACQUIRE_FAILED    3 /* contention detected, no lock held, retry */
126 :
127 : static int
128 : funk_rec_acquire( fd_accdb_user_v2_t const * accdb,
129 : ulong chain_idx,
130 : fd_funk_rec_key_t const * key,
131 : fd_funk_rec_t ** out_rec,
132 0 : _Bool is_write ) {
133 0 : *out_rec = NULL;
134 :
135 0 : fd_funk_rec_map_shmem_t const * shmap = accdb->funk->rec_map->map;
136 0 : fd_funk_rec_map_shmem_private_chain_t const * chain_tbl = fd_funk_rec_map_shmem_private_chain_const( shmap, 0UL );
137 0 : fd_funk_rec_map_shmem_private_chain_t const * chain = chain_tbl + chain_idx;
138 0 : fd_funk_rec_t * rec_tbl = accdb->funk->rec_pool->ele;
139 0 : ulong rec_max = fd_funk_rec_pool_ele_max( accdb->funk->rec_pool );
140 0 : ulong ver_cnt = FD_VOLATILE_CONST( chain->ver_cnt );
141 :
142 : /* Start a speculative transaction for the chain containing revisions
143 : of the account key we are looking for. */
144 0 : ulong cnt = fd_funk_rec_map_private_vcnt_cnt( ver_cnt );
145 0 : if( FD_UNLIKELY( fd_funk_rec_map_private_vcnt_ver( ver_cnt )&1 ) ) {
146 0 : return ACQUIRE_FAILED; /* chain is locked */
147 0 : }
148 0 : FD_COMPILER_MFENCE();
149 0 : uint ele_idx = chain->head_cidx;
150 :
151 : /* Walk the map chain, bail at the first entry
152 : (Each chain is sorted newest-to-oldest) */
153 0 : fd_funk_rec_t * best = NULL;
154 0 : for( ulong i=0UL; i<cnt; i++, ele_idx=rec_tbl[ ele_idx ].map_next ) {
155 0 : fd_funk_rec_t * rec = &rec_tbl[ ele_idx ];
156 :
157 : /* Skip over unrelated records (hash collision) */
158 0 : if( FD_UNLIKELY( !fd_funk_rec_key_eq( rec->pair.key, key ) ) ) continue;
159 :
160 : /* Confirm that record is part of the current fork
161 : FIXME this has bad performance / pointer-chasing */
162 0 : if( FD_UNLIKELY( !fd_accdb_lineage_has_xid( accdb->lineage, rec->pair.xid ) ) ) continue;
163 :
164 0 : if( FD_UNLIKELY( rec->map_next==ele_idx ) ) {
165 0 : FD_LOG_CRIT(( "fd_accdb_search_chain detected cycle" ));
166 0 : }
167 0 : if( rec->map_next > rec_max ) {
168 0 : if( FD_UNLIKELY( !fd_funk_rec_map_private_idx_is_null( rec->map_next ) ) ) {
169 0 : FD_LOG_CRIT(( "fd_accdb_search_chain detected memory corruption: rec->map_next %u is out of bounds (rec_max %lu)",
170 0 : rec->map_next, rec_max ));
171 0 : }
172 0 : }
173 0 : best = rec;
174 0 : break;
175 0 : }
176 :
177 : /* Found a record, acquire a lock */
178 0 : if( best ) {
179 : /* If the write does not target the current transaction, demote it */
180 0 : if( is_write && !fd_accdb_lineage_is_tip( accdb->lineage, best->pair.xid ) ) {
181 0 : is_write = 0;
182 0 : }
183 0 : if( is_write ) {
184 0 : fd_funk_rec_write_lock( best );
185 0 : } else {
186 0 : if( fd_funk_rec_read_lock( best )!=FD_MAP_SUCCESS ) {
187 0 : return ACQUIRE_FAILED; /* record about to be moved to vinyl */
188 0 : }
189 0 : }
190 0 : }
191 :
192 : /* Retry if there was contention at the hash map */
193 0 : if( FD_UNLIKELY( FD_VOLATILE_CONST( chain->ver_cnt )!=ver_cnt ) ) {
194 0 : if( best ) {
195 0 : if( is_write ) fd_funk_rec_write_unlock( best );
196 0 : else fd_funk_rec_read_unlock ( best );
197 0 : }
198 0 : return ACQUIRE_FAILED;
199 0 : }
200 :
201 0 : *out_rec = best;
202 0 : return best ? ( is_write ? ACQUIRE_WRITE : ACQUIRE_READ ) : ACQUIRE_NOT_FOUND;
203 0 : }
204 :
205 : static int
206 : funk_open_ref( fd_accdb_user_v2_t * accdb,
207 : fd_accdb_ref_t * ref,
208 : fd_funk_txn_xid_t const * xid,
209 : void const * address,
210 0 : _Bool is_write ) {
211 0 : fd_funk_t const * funk = accdb->funk;
212 0 : fd_funk_rec_key_t key[1]; memcpy( key->uc, address, 32UL );
213 :
214 : /* Hash key to chain */
215 0 : fd_funk_xid_key_pair_t pair[1];
216 0 : fd_funk_txn_xid_copy( pair->xid, xid );
217 0 : fd_funk_rec_key_copy( pair->key, key );
218 0 : fd_funk_rec_map_t const * rec_map = funk->rec_map;
219 0 : ulong hash = fd_funk_rec_map_key_hash( pair, rec_map->map->seed );
220 0 : ulong chain_idx = (hash & (rec_map->map->chain_cnt-1UL) );
221 :
222 : /* Traverse chain for candidate */
223 0 : fd_funk_rec_t * rec = NULL;
224 0 : int err;
225 0 : for(;;) {
226 0 : err = funk_rec_acquire( accdb, chain_idx, key, &rec, is_write );
227 0 : if( FD_LIKELY( err!=ACQUIRE_FAILED ) ) break;
228 0 : FD_SPIN_PAUSE();
229 : /* FIXME backoff */
230 0 : }
231 0 : if( rec ) {
232 0 : memcpy( ref->address, address, 32UL );
233 0 : ref->accdb_type = FD_ACCDB_TYPE_V1;
234 0 : ref->ref_type = err==ACQUIRE_WRITE ? FD_ACCDB_REF_RW : FD_ACCDB_REF_RO;
235 0 : ref->user_data = (ulong)rec;
236 0 : ref->user_data2 = 0UL;
237 0 : ref->meta_laddr = (ulong)fd_funk_val( rec, funk->wksp );
238 0 : } else {
239 0 : memset( ref, 0, sizeof(fd_accdb_rw_t) );
240 0 : }
241 0 : return err;
242 0 : }
243 :
244 : /* Read method ********************************************************/
245 :
246 : void
247 : fd_accdb_user_v2_open_ro_multi( fd_accdb_user_t * accdb,
248 : fd_accdb_ro_t * ro0,
249 : fd_funk_txn_xid_t const * xid,
250 : void const * addr0,
251 0 : ulong cnt ) {
252 0 : fd_accdb_user_v2_t * v2 = (fd_accdb_user_v2_t *)accdb;
253 0 : fd_vinyl_rq_t * rq = v2->vinyl_rq; /* "request queue "*/
254 0 : fd_vinyl_req_pool_t * req_pool = v2->vinyl_req_pool; /* "request pool" */
255 0 : fd_wksp_t * req_wksp = v2->vinyl_req_wksp; /* shm workspace containing request buffer */
256 0 : fd_wksp_t * data_wksp = v2->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
257 0 : ulong link_id = v2->vinyl_link_id; /* vinyl client ID */
258 :
259 0 : if( FD_UNLIKELY( cnt>fd_vinyl_req_batch_key_max( req_pool ) ) ) {
260 0 : FD_LOG_CRIT(( "open_ro_multi cnt %lu exceeds vinyl request batch max %lu",
261 0 : cnt, fd_vinyl_req_batch_key_max( req_pool ) ));
262 0 : }
263 :
264 : /* Open accounts from funk
265 :
266 : (FIXME this is a potentially slow operation, might want to fire off
267 : a 'prefetch' instruction to vinyl asynchronously before doing this,
268 : so that the vinyl data is in cache by the time v1_open_rw_multi
269 : finishes) */
270 :
271 0 : fd_accdb_lineage_set_fork( v2->lineage, v2->funk, xid );
272 0 : ulong addr_laddr = (ulong)addr0;
273 0 : for( ulong i=0UL; i<cnt; i++ ) {
274 0 : void const * addr_i = (void const *)( (ulong)addr_laddr + i*32UL );
275 0 : if( funk_open_ref( v2, ro0[i].ref, xid, addr_i, 0 )==ACQUIRE_READ ) {
276 0 : v2->base.ro_active++;
277 0 : } else {
278 0 : fd_accdb_ro_init_empty( &ro0[i], addr_i );
279 0 : }
280 0 : }
281 :
282 : /* For the accounts that were not found in funk, open vinyl records */
283 :
284 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
285 : /* req_pool_release called before returning */
286 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
287 0 : fd_vinyl_key_t * req_key0 = fd_vinyl_req_batch_key ( req_pool, batch_idx );
288 0 : schar * req_err0 = fd_vinyl_req_batch_err ( req_pool, batch_idx );
289 0 : ulong * req_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
290 :
291 : /* Create a read-only vinyl "ACQUIRE" batch */
292 :
293 0 : ulong req_cnt = 0UL;
294 0 : for( ulong i=0UL; i<cnt; i++ ) {
295 0 : if( ro0[i].ref->accdb_type!=FD_ACCDB_TYPE_NONE ) continue;
296 : /* At this point, addr0[i] not found in funk, load from vinyl */
297 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
298 :
299 0 : fd_vinyl_key_init( req_key0+req_cnt, addr_i, 32UL );
300 0 : req_err0 [ req_cnt ] = 0;
301 0 : req_val_gaddr0[ req_cnt ] = 0UL;
302 0 : req_cnt++;
303 0 : }
304 0 : if( !req_cnt ) {
305 : /* All records were found in funk, bail early */
306 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
307 0 : return;
308 0 : }
309 :
310 : /* Send read-only "ACQUIRE" batch to vinyl and wait for response */
311 :
312 0 : ulong req_id = v2->vinyl_req_id++;
313 0 : memset( comp, 0, sizeof(fd_vinyl_comp_t) );
314 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_ACQUIRE, 0UL, batch_idx, req_cnt, 0UL );
315 :
316 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
317 0 : FD_COMPILER_MFENCE();
318 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
319 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
320 0 : FD_LOG_CRIT(( "vinyl tile rejected my ACQUIRE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
321 0 : }
322 :
323 : /* For the accounts that were newly found in vinyl, create accdb
324 : handles */
325 :
326 0 : req_cnt = 0UL;
327 0 : for( ulong i=0UL; i<cnt; i++ ) {
328 0 : if( ro0[i].ref->accdb_type!=FD_ACCDB_TYPE_NONE ) continue;
329 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
330 :
331 0 : int req_err = FD_VOLATILE_CONST( req_err0 [ req_cnt ] );
332 0 : ulong val_gaddr = FD_VOLATILE_CONST( req_val_gaddr0[ req_cnt ] );
333 :
334 0 : fd_accdb_ro_t * ro = &ro0[ i ];
335 0 : if( req_err==0 ) {
336 : /* Record found in vinyl, create reference */
337 0 : fd_account_meta_t const * meta = fd_wksp_laddr_fast( data_wksp, val_gaddr );
338 :
339 0 : accdb->base.ro_active++;
340 0 : *ro = (fd_accdb_ro_t) {0};
341 0 : memcpy( ro->ref->address, addr_i, 32UL );
342 0 : ro->ref->accdb_type = FD_ACCDB_TYPE_V2;
343 0 : ro->ref->ref_type = FD_ACCDB_REF_RO;
344 0 : ro->meta = meta;
345 0 : } else if( FD_UNLIKELY( req_err!=FD_VINYL_ERR_KEY ) ) {
346 0 : FD_LOG_CRIT(( "vinyl tile ACQUIRE request failed: %i-%s", req_err, fd_vinyl_strerror( req_err ) ));
347 0 : }
348 0 : req_cnt++;
349 0 : }
350 :
351 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
352 :
353 : /* At this point, ownership of vinyl records transitions to caller.
354 : (Released using close_ro_multi) */
355 0 : }
356 :
357 : /* Write method *******************************************************/
358 :
359 : void
360 : fd_accdb_user_v2_open_rw_multi( fd_accdb_user_t * accdb,
361 : fd_accdb_rw_t * rw0,
362 : fd_funk_txn_xid_t const * xid,
363 : void const * addr0,
364 : ulong const * data_max0,
365 : int flags,
366 0 : ulong cnt ) {
367 0 : fd_accdb_user_v2_t * v2 = (fd_accdb_user_v2_t *)accdb;
368 0 : fd_funk_t * funk = v2->funk;
369 0 : fd_vinyl_rq_t * rq = v2->vinyl_rq; /* "request queue "*/
370 0 : fd_vinyl_req_pool_t * req_pool = v2->vinyl_req_pool; /* "request pool" */
371 0 : fd_wksp_t * req_wksp = v2->vinyl_req_wksp; /* shm workspace containing request buffer */
372 0 : fd_wksp_t * data_wksp = v2->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
373 0 : ulong link_id = v2->vinyl_link_id; /* vinyl client ID */
374 :
375 0 : fd_accdb_lineage_set_fork( v2->lineage, v2->funk, xid );
376 0 : fd_funk_txn_t * txn = fd_accdb_lineage_write_check( v2->lineage, v2->funk );
377 :
378 0 : int const flag_truncate = !!( flags & FD_ACCDB_FLAG_TRUNCATE );
379 0 : int const flag_create = !!( flags & FD_ACCDB_FLAG_CREATE );
380 0 : if( FD_UNLIKELY( flags & ~(FD_ACCDB_FLAG_CREATE|FD_ACCDB_FLAG_TRUNCATE) ) ) {
381 0 : FD_LOG_CRIT(( "invalid flags for open_rw: %#02x", (uint)flags ));
382 0 : }
383 :
384 0 : if( FD_UNLIKELY( cnt>fd_vinyl_req_batch_key_max( req_pool ) ) ) {
385 0 : FD_LOG_CRIT(( "open_rw_multi cnt %lu exceeds vinyl request batch max %lu",
386 0 : cnt, fd_vinyl_req_batch_key_max( req_pool ) ));
387 0 : }
388 :
389 : /* Query for existing funk records
390 :
391 : (FIXME this is a potentially slow operation, might want to fire off
392 : a 'prefetch' instruction to vinyl asynchronously before doing this,
393 : so that the vinyl data is in cache by the time v1_open_rw_multi
394 : finishes) */
395 :
396 0 : ulong addr_laddr = (ulong)addr0;
397 0 : for( ulong i=0UL; i<cnt; i++ ) {
398 0 : void const * addr_i = (void const *)( (ulong)addr_laddr + i*32UL );
399 0 : funk_open_ref( v2, rw0[ i ].ref, xid, addr_i, 1 );
400 0 : }
401 :
402 : /* For the accounts that were not found in funk, create writable funk
403 : records from elements in vinyl. */
404 :
405 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
406 : /* req_pool_release called before returning */
407 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
408 0 : fd_vinyl_key_t * req_key0 = fd_vinyl_req_batch_key ( req_pool, batch_idx );
409 0 : schar * req_err0 = fd_vinyl_req_batch_err ( req_pool, batch_idx );
410 0 : ulong * req_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
411 :
412 : /* Create a read-only vinyl "ACQUIRE" batch */
413 :
414 0 : ulong req_cnt = 0UL;
415 0 : for( ulong i=0UL; i<cnt; i++ ) {
416 0 : if( rw0[i].ref->ref_type!=FD_ACCDB_REF_INVAL ) continue;
417 : /* At this point, addr0[i] not found in funk, load from vinyl */
418 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
419 :
420 0 : fd_vinyl_key_init( req_key0+req_cnt, addr_i, 32UL );
421 0 : req_err0 [ req_cnt ] = 0;
422 0 : req_val_gaddr0[ req_cnt ] = 0UL;
423 0 : req_cnt++;
424 0 : }
425 :
426 : /* Send read-only "ACQUIRE" batch to vinyl and wait for response */
427 :
428 0 : if( req_cnt ) {
429 0 : ulong req_id = v2->vinyl_req_id++;
430 0 : memset( fd_vinyl_req_batch_comp( req_pool, batch_idx ), 0, sizeof(fd_vinyl_comp_t) );
431 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_ACQUIRE, 0UL, batch_idx, req_cnt, 0UL );
432 :
433 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
434 0 : FD_COMPILER_MFENCE();
435 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
436 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
437 0 : FD_LOG_CRIT(( "vinyl tile rejected my ACQUIRE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
438 0 : }
439 0 : }
440 :
441 : /* Promote any found accounts to writable accounts */
442 :
443 0 : req_cnt = 0UL;
444 0 : for( ulong i=0UL; i<cnt; i++ ) {
445 0 : void const * addr_i = (void const *)( (ulong)addr0 + i*32UL );
446 0 : fd_accdb_rw_t * rw = &rw0[ i ];
447 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)rw->ref->user_data;
448 0 : ulong data_max = data_max0[ i ];
449 :
450 0 : if( rw->ref->ref_type==FD_ACCDB_REF_RW ) {
451 : /* Mutable record found, modify in-place */
452 :
453 0 : if( FD_UNLIKELY( !flag_create && fd_accdb_ref_lamports( rw->ro )==0UL ) ) {
454 : /* Tombstone */
455 0 : fd_funk_rec_write_unlock( rec );
456 0 : goto not_found;
457 0 : }
458 :
459 0 : ulong acc_orig_sz = fd_accdb_ref_data_sz( rw->ro );
460 0 : ulong val_sz_min = sizeof(fd_account_meta_t)+fd_ulong_max( data_max, acc_orig_sz );
461 0 : void * val = fd_funk_val_truncate( rec, funk->alloc, funk->wksp, 16UL, val_sz_min, NULL );
462 0 : if( FD_UNLIKELY( !val ) ) {
463 0 : FD_LOG_CRIT(( "Failed to modify account: out of memory allocating %lu bytes", acc_orig_sz ));
464 0 : }
465 0 : fd_accdb_funk_prep_inplace( rw, funk, rec );
466 0 : if( flag_truncate ) {
467 0 : rec->val_sz = sizeof(fd_account_meta_t);
468 0 : rw->meta->dlen = 0;
469 0 : }
470 0 : accdb->base.rw_active++;
471 : /* Retain write lock */
472 :
473 0 : continue; /* next account */
474 0 : }
475 :
476 0 : if( rw->ref->ref_type==FD_ACCDB_REF_RO ) {
477 : /* Frozen record found, copy out to new object */
478 :
479 0 : fd_accdb_ro_t * ro = rw->ro;
480 0 : if( FD_UNLIKELY( !flag_create && fd_accdb_ref_lamports( ro )==0UL ) ) {
481 : /* Tombstone */
482 0 : fd_funk_rec_read_unlock( rec );
483 0 : goto not_found;
484 0 : }
485 :
486 0 : ulong acc_orig_sz = fd_accdb_ref_data_sz( ro );
487 0 : ulong val_sz_min = sizeof(fd_account_meta_t)+fd_ulong_max( data_max, acc_orig_sz );
488 0 : ulong val_sz = flag_truncate ? sizeof(fd_account_meta_t) : rec->val_sz;
489 0 : ulong val_max = 0UL;
490 0 : void * val = fd_alloc_malloc_at_least( funk->alloc, 16UL, val_sz_min, &val_max );
491 0 : if( FD_UNLIKELY( !val ) ) {
492 0 : FD_LOG_CRIT(( "Failed to modify account: out of memory allocating %lu bytes", acc_orig_sz ));
493 0 : }
494 :
495 0 : fd_account_meta_t * meta = val;
496 0 : uchar * data = (uchar *)( meta+1 );
497 0 : ulong data_max_actual = val_max - sizeof(fd_account_meta_t);
498 0 : if( flag_truncate ) fd_accdb_funk_copy_truncated( meta, ro->meta );
499 0 : else fd_accdb_funk_copy_account ( meta, data, ro->meta, fd_account_data( ro->meta ) );
500 0 : if( acc_orig_sz<data_max_actual ) {
501 : /* Zero out trailing data */
502 0 : uchar * tail = data +acc_orig_sz;
503 0 : ulong tail_sz = data_max_actual-acc_orig_sz;
504 0 : fd_memset( tail, 0, tail_sz );
505 0 : }
506 :
507 0 : fd_accdb_funk_prep_create( rw, funk, txn, addr_i, val, val_sz, val_max );
508 0 : accdb->base.rw_active++;
509 :
510 0 : FD_COMPILER_MFENCE();
511 0 : fd_funk_rec_read_unlock( rec );
512 :
513 0 : continue; /* next account */
514 0 : }
515 :
516 : /* Record not found in funk */
517 :
518 0 : int req_err = req_err0[ req_cnt ];
519 0 : if( req_err ) {
520 : /* Record not found in vinyl either (truly does not exist)
521 : If CREATE flag was requested, create it in funk */
522 0 : if( FD_UNLIKELY( req_err!=FD_VINYL_ERR_KEY ) ) {
523 0 : FD_LOG_CRIT(( "vinyl tile ACQUIRE request failed: %i-%s", req_err, fd_vinyl_strerror( req_err ) ));
524 0 : }
525 0 : not_found:
526 0 : if( flag_create ) {
527 0 : fd_accdb_funk_create( v2->funk, rw, txn, addr_i, data_max0[ i ] );
528 0 : fd_funk_rec_write_lock_uncontended( (fd_funk_rec_t *)rw->ref->user_data );
529 0 : accdb->base.rw_active++;
530 0 : } else {
531 0 : memset( rw, 0, sizeof(fd_accdb_ref_t) );
532 0 : }
533 0 : req_cnt++;
534 0 : continue;
535 0 : }
536 :
537 : /* Record found in vinyl */
538 :
539 0 : ulong req_val_gaddr = req_val_gaddr0[ req_cnt ];
540 0 : fd_account_meta_t * src_meta = fd_wksp_laddr_fast( data_wksp, req_val_gaddr );
541 0 : uchar const * src_data = (uchar *)( src_meta+1 );
542 :
543 0 : if( FD_UNLIKELY( src_meta->lamports==0UL ) ) goto not_found; /* tombstone */
544 :
545 0 : ulong acc_orig_sz = src_meta->dlen;
546 0 : ulong val_sz_min = sizeof(fd_account_meta_t)+fd_ulong_max( data_max0[ i ], acc_orig_sz );
547 0 : ulong acc_sz = flag_truncate ? 0UL : acc_orig_sz;
548 0 : ulong val_sz = sizeof(fd_account_meta_t)+acc_sz;
549 0 : ulong val_max = 0UL;
550 0 : void * val = fd_alloc_malloc_at_least( v2->funk->alloc, 16UL, val_sz_min, &val_max );
551 0 : if( FD_UNLIKELY( !val ) ) {
552 0 : FD_LOG_CRIT(( "Failed to modify account: out of memory allocating %lu bytes", acc_orig_sz ));
553 0 : }
554 :
555 0 : fd_account_meta_t * dst_meta = val;
556 0 : uchar * dst_data = (uchar *)( dst_meta+1 );
557 0 : ulong data_max_actual = val_max - sizeof(fd_account_meta_t);
558 0 : if( flag_truncate ) fd_accdb_funk_copy_truncated( dst_meta, src_meta );
559 0 : else fd_accdb_funk_copy_account ( dst_meta, dst_data, src_meta, src_data );
560 0 : if( acc_orig_sz<data_max_actual ) {
561 : /* Zero out trailing data */
562 0 : uchar * tail = dst_data +acc_orig_sz;
563 0 : ulong tail_sz = data_max_actual-acc_orig_sz;
564 0 : fd_memset( tail, 0, tail_sz );
565 0 : }
566 :
567 0 : fd_accdb_funk_prep_create( rw, v2->funk, txn, addr_i, val, val_sz, val_max );
568 0 : fd_funk_rec_write_lock_uncontended( (fd_funk_rec_t *)rw->ref->user_data );
569 :
570 0 : req_cnt++;
571 0 : accdb->base.rw_active++;
572 0 : }
573 :
574 : /* Send "RELEASE" batch (reuse val_gaddr values),
575 : and wait for response */
576 :
577 0 : if( req_cnt ) {
578 0 : ulong req_id = v2->vinyl_req_id++;
579 0 : memset( fd_vinyl_req_batch_comp( req_pool, batch_idx ), 0, sizeof(fd_vinyl_comp_t) );
580 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_RELEASE, 0UL, batch_idx, req_cnt, 0UL );
581 :
582 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
583 0 : FD_COMPILER_MFENCE();
584 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
585 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
586 0 : FD_LOG_CRIT(( "vinyl tile rejected my RELEASE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
587 0 : }
588 0 : }
589 :
590 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
591 0 : }
592 :
593 : void
594 : funk_close_rw( fd_accdb_user_v2_t * accdb,
595 0 : fd_accdb_rw_t * write ) {
596 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)write->ref->user_data;
597 :
598 0 : if( FD_UNLIKELY( !accdb->base.rw_active ) ) {
599 0 : FD_LOG_CRIT(( "Failed to modify account: ref count underflow" ));
600 0 : }
601 :
602 0 : if( write->ref->user_data2 ) {
603 0 : fd_funk_txn_t * txn = (fd_funk_txn_t *)write->ref->user_data2;
604 0 : fd_funk_rec_prepare_t prepare = {
605 0 : .rec = rec,
606 0 : .rec_head_idx = &txn->rec_head_idx,
607 0 : .rec_tail_idx = &txn->rec_tail_idx
608 0 : };
609 0 : fd_funk_rec_publish( accdb->funk, &prepare );
610 0 : }
611 :
612 0 : fd_funk_rec_write_unlock( rec );
613 0 : accdb->base.rw_active--;
614 0 : }
615 :
616 : void
617 : fd_accdb_user_v2_close_ref_multi( fd_accdb_user_t * accdb,
618 : fd_accdb_ref_t * ref0,
619 0 : ulong cnt ) {
620 0 : fd_accdb_user_v2_t * v2 = (fd_accdb_user_v2_t *)accdb;
621 0 : fd_vinyl_rq_t * rq = v2->vinyl_rq; /* "request queue "*/
622 0 : fd_vinyl_req_pool_t * req_pool = v2->vinyl_req_pool; /* "request pool" */
623 0 : fd_wksp_t * req_wksp = v2->vinyl_req_wksp; /* shm workspace containing request buffer */
624 0 : fd_wksp_t * data_wksp = v2->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
625 0 : ulong link_id = v2->vinyl_link_id; /* vinyl client ID */
626 :
627 0 : if( FD_UNLIKELY( cnt>fd_vinyl_req_batch_key_max( req_pool ) ) ) {
628 0 : FD_LOG_CRIT(( "close_ref_multi cnt %lu exceeds vinyl request batch max %lu",
629 0 : cnt, fd_vinyl_req_batch_key_max( req_pool ) ));
630 0 : }
631 :
632 : /* First, release all references to vinyl records
633 : (This is a prefetch friendly / fast loop) */
634 :
635 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
636 : /* req_pool_release called before returning */
637 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
638 0 : schar * req_err0 = fd_vinyl_req_batch_err ( req_pool, batch_idx );
639 0 : ulong * req_val_gaddr0 = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
640 :
641 0 : ulong ro_close_cnt = 0UL;
642 0 : ulong rw_close_cnt = 0UL;
643 0 : ulong req_cnt = 0UL;
644 0 : for( ulong i=0UL; i<cnt; i++ ) {
645 0 : fd_accdb_ref_t * ref = &ref0[ i ];
646 0 : if( ref->accdb_type!=FD_ACCDB_TYPE_V2 ) continue;
647 0 : ref->ref_type==FD_ACCDB_REF_RO ? ro_close_cnt++ : rw_close_cnt++;
648 0 : req_err0 [ req_cnt ] = 0;
649 0 : req_val_gaddr0[ req_cnt ] = fd_wksp_gaddr_fast( data_wksp, (void *)ref->meta_laddr );
650 0 : memset( ref, 0, sizeof(fd_accdb_ref_t) );
651 0 : req_cnt++;
652 0 : }
653 0 : if( req_cnt ) {
654 0 : if( FD_UNLIKELY( ro_close_cnt > accdb->base.ro_active ) ) {
655 0 : FD_LOG_CRIT(( "attempted to close more accdb_ro (%lu) than are open (%lu)",
656 0 : ro_close_cnt, accdb->base.ro_active ));
657 0 : }
658 0 : if( FD_UNLIKELY( rw_close_cnt > accdb->base.rw_active ) ) {
659 0 : FD_LOG_CRIT(( "attempted to close more accdb_rw (%lu) than are open (%lu)",
660 0 : rw_close_cnt, accdb->base.rw_active ));
661 0 : }
662 0 : ulong req_id = v2->vinyl_req_id++;
663 0 : memset( fd_vinyl_req_batch_comp( req_pool, batch_idx ), 0, sizeof(fd_vinyl_comp_t) );
664 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_RELEASE, 0UL, batch_idx, req_cnt, 0UL );
665 0 : }
666 :
667 : /* While our vinyl request is inflight, release funk records
668 : (This does expensive DRAM accesses, which are convenient to do when
669 : we are waiting for the database to asynchronously respond) */
670 :
671 0 : for( ulong i=0UL; i<cnt; i++ ) {
672 0 : fd_accdb_ref_t * ref = &ref0[ i ];
673 0 : if( ref->accdb_type!=FD_ACCDB_TYPE_V1 ) continue;
674 0 : switch( ref0[ i ].ref_type ) {
675 0 : case FD_ACCDB_REF_RO:
676 0 : accdb->base.ro_active--;
677 0 : fd_funk_rec_read_unlock( (fd_funk_rec_t *)ref->user_data );
678 0 : break;
679 0 : case FD_ACCDB_REF_RW:
680 0 : funk_close_rw( v2, (fd_accdb_rw_t *)ref );
681 0 : break;
682 0 : default:
683 0 : FD_LOG_CRIT(( "invalid ref_type %u in fd_accdb_user_v1_close_ref", (uint)ref->ref_type ));
684 0 : }
685 0 : memset( ref, 0, sizeof(fd_accdb_ref_t) );
686 0 : }
687 :
688 : /* Wait for response from vinyl */
689 :
690 0 : if( req_cnt ) {
691 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
692 0 : FD_COMPILER_MFENCE();
693 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
694 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
695 0 : FD_LOG_CRIT(( "vinyl tile rejected my RELEASE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
696 0 : }
697 0 : for( ulong i=0UL; i<req_cnt; i++ ) {
698 0 : int req_err = req_err0[ i ];
699 0 : if( FD_UNLIKELY( req_err!=FD_VINYL_SUCCESS ) ) {
700 0 : FD_LOG_CRIT(( "vinyl tile RELEASE request failed: %i-%s", req_err, fd_vinyl_strerror( req_err ) ));
701 0 : }
702 0 : }
703 :
704 0 : accdb->base.ro_active -= ro_close_cnt;
705 0 : accdb->base.rw_active -= rw_close_cnt;
706 0 : }
707 :
708 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
709 0 : }
710 :
711 : ulong
712 : fd_accdb_user_v2_rw_data_max( fd_accdb_user_t * accdb,
713 0 : fd_accdb_rw_t const * rw ) {
714 0 : (void)accdb;
715 0 : if( rw->ref->accdb_type==FD_ACCDB_TYPE_NONE ) {
716 0 : return rw->ref->user_data; /* data_max */
717 0 : }
718 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)rw->ref->user_data;
719 0 : return (ulong)( rec->val_max - sizeof(fd_account_meta_t) );
720 0 : }
721 :
722 : void
723 : fd_accdb_user_v2_rw_data_sz_set( fd_accdb_user_t * accdb,
724 : fd_accdb_rw_t * rw,
725 : ulong data_sz,
726 0 : int flags ) {
727 0 : int flag_dontzero = !!( flags & FD_ACCDB_FLAG_DONTZERO );
728 0 : if( FD_UNLIKELY( flags & ~(FD_ACCDB_FLAG_DONTZERO) ) ) {
729 0 : FD_LOG_CRIT(( "invalid flags for rw_data_sz_set: %#02x", (uint)flags ));
730 0 : }
731 :
732 0 : ulong prev_sz = rw->meta->dlen;
733 0 : if( data_sz>prev_sz ) {
734 0 : ulong data_max = fd_accdb_user_v2_rw_data_max( accdb, rw );
735 0 : if( FD_UNLIKELY( data_sz>data_max ) ) {
736 0 : FD_LOG_CRIT(( "attempted to write %lu bytes into a rec with only %lu bytes of data space",
737 0 : data_sz, data_max ));
738 0 : }
739 0 : if( !flag_dontzero ) {
740 0 : void * tail = (uchar *)fd_accdb_ref_data( rw ) + prev_sz;
741 0 : fd_memset( tail, 0, data_sz-prev_sz );
742 0 : }
743 0 : }
744 0 : rw->meta->dlen = (uint)data_sz;
745 :
746 0 : if( rw->ref->accdb_type==FD_ACCDB_TYPE_V1 ) {
747 0 : fd_funk_rec_t * rec = (fd_funk_rec_t *)rw->ref->user_data;
748 0 : rec->val_sz = (uint)( sizeof(fd_account_meta_t)+data_sz ) & FD_FUNK_REC_VAL_MAX;
749 0 : }
750 0 : }
751 :
752 : fd_accdb_user_t *
753 : fd_accdb_user_v2_init( fd_accdb_user_t * accdb_,
754 : void * shfunk,
755 : void * vinyl_rq,
756 : void * vinyl_data,
757 : void * vinyl_req_pool,
758 0 : ulong vinyl_link_id ) {
759 0 : if( FD_UNLIKELY( !accdb_ ) ) {
760 0 : FD_LOG_WARNING(( "NULL ljoin" ));
761 0 : return NULL;
762 0 : }
763 0 : if( FD_UNLIKELY( !shfunk ) ) {
764 0 : FD_LOG_WARNING(( "NULL shfunk" ));
765 0 : return NULL;
766 0 : }
767 0 : if( FD_UNLIKELY( !vinyl_data ) ) {
768 0 : FD_LOG_WARNING(( "NULL vinyl_data" ));
769 0 : return NULL;
770 0 : }
771 :
772 0 : fd_accdb_user_v2_t * accdb = fd_type_pun( accdb_ );
773 0 : memset( accdb, 0, sizeof(fd_accdb_user_v2_t) );
774 :
775 0 : if( FD_UNLIKELY( !fd_funk_join( accdb->funk, shfunk ) ) ) {
776 0 : FD_LOG_CRIT(( "fd_funk_join failed" ));
777 0 : }
778 :
779 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( vinyl_rq );
780 0 : fd_vinyl_req_pool_t * req_pool = fd_vinyl_req_pool_join( vinyl_req_pool );
781 0 : if( FD_UNLIKELY( !rq || !req_pool ) ) {
782 : /* component joins log warning if this is reached */
783 0 : FD_LOG_WARNING(( "Failed to initialize database client" ));
784 0 : return NULL;
785 0 : }
786 :
787 0 : accdb->vinyl_req_id = 0UL;
788 0 : accdb->vinyl_rq = rq;
789 0 : accdb->vinyl_link_id = vinyl_link_id;
790 0 : accdb->vinyl_data_wksp = vinyl_data;
791 0 : accdb->vinyl_req_wksp = fd_wksp_containing( req_pool );
792 0 : accdb->vinyl_req_pool = req_pool;
793 0 : accdb->base.accdb_type = FD_ACCDB_TYPE_V2;
794 0 : accdb->base.vt = &fd_accdb_user_v2_vt;
795 0 : return accdb_;
796 0 : }
797 :
798 : void
799 0 : fd_accdb_user_v2_fini( fd_accdb_user_t * accdb ) {
800 0 : fd_accdb_user_v2_t * user = (fd_accdb_user_v2_t *)accdb;
801 :
802 0 : fd_vinyl_rq_leave( user->vinyl_rq );
803 :
804 0 : if( FD_UNLIKELY( !fd_funk_leave( user->funk, NULL ) ) ) FD_LOG_CRIT(( "fd_funk_leave failed" ));
805 0 : }
806 :
807 : ulong
808 0 : fd_accdb_user_v2_batch_max( fd_accdb_user_t * accdb ) {
809 0 : fd_accdb_user_v2_t * user = (fd_accdb_user_v2_t *)accdb;
810 0 : return fd_vinyl_req_batch_key_max( user->vinyl_req_pool );
811 0 : }
812 :
813 :
814 : fd_accdb_user_vt_t const fd_accdb_user_v2_vt = {
815 : .fini = fd_accdb_user_v2_fini,
816 : .batch_max = fd_accdb_user_v2_batch_max,
817 : .open_ro_multi = fd_accdb_user_v2_open_ro_multi,
818 : .open_rw_multi = fd_accdb_user_v2_open_rw_multi,
819 : .close_ref_multi = fd_accdb_user_v2_close_ref_multi,
820 : .rw_data_max = fd_accdb_user_v2_rw_data_max,
821 : .rw_data_sz_set = fd_accdb_user_v2_rw_data_sz_set
822 : };
|