Line data Source code
1 : #include "fd_funk.h"
2 :
3 : /* Provide the actual record map implementation */
4 :
5 : #define POOL_NAME fd_funk_rec_pool
6 411368359 : #define POOL_ELE_T fd_funk_rec_t
7 : #define POOL_IDX_T uint
8 54293749 : #define POOL_NEXT map_next
9 : #define POOL_IMPL_STYLE 2
10 : #include "../util/tmpl/fd_pool_para.c"
11 :
12 : #define MAP_NAME fd_funk_rec_map
13 389648752 : #define MAP_ELE_T fd_funk_rec_t
14 0 : #define MAP_KEY_T fd_funk_xid_key_pair_t
15 23902083 : #define MAP_KEY pair
16 : #define MAP_KEY_EQ(k0,k1) fd_funk_xid_key_pair_eq((k0),(k1))
17 : #define MAP_KEY_HASH(k0,seed) fd_funk_xid_key_pair_hash((k0),(seed))
18 389648683 : #define MAP_IDX_T uint
19 442114433 : #define MAP_NEXT map_next
20 23902083 : #define MAP_MEMO map_hash
21 33 : #define MAP_MAGIC (0xf173da2ce77ecdb0UL) /* Firedancer rec db version 0 */
22 : #define MAP_MEMOIZE 1
23 : #define MAP_IMPL_STYLE 2
24 : #include "../util/tmpl/fd_map_chain_para.c"
25 :
26 : static void
27 : fd_funk_rec_key_set_pair( fd_funk_xid_key_pair_t * key_pair,
28 : fd_funk_txn_t const * txn,
29 477517690 : fd_funk_rec_key_t const * key ) {
30 477517690 : if( !txn ) {
31 214585797 : fd_funk_txn_xid_set_root( key_pair->xid );
32 262931893 : } else {
33 262931893 : fd_funk_txn_xid_copy( key_pair->xid, &txn->xid );
34 262931893 : }
35 477517690 : fd_funk_rec_key_copy( key_pair->key, key );
36 477517690 : }
37 :
38 : fd_funk_rec_t const *
39 : fd_funk_rec_query_try( fd_funk_t * funk,
40 : fd_funk_txn_t const * txn,
41 : fd_funk_rec_key_t const * key,
42 342797874 : fd_funk_rec_query_t * query ) {
43 : #ifdef FD_FUNK_HANDHOLDING
44 : if( FD_UNLIKELY( funk==NULL || key==NULL || query==NULL ) ) {
45 : return NULL;
46 : }
47 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
48 : return NULL;
49 : }
50 : #endif
51 :
52 342797874 : fd_funk_xid_key_pair_t pair[1];
53 342797874 : fd_funk_rec_key_set_pair( pair, txn, key );
54 :
55 342797874 : for(;;) {
56 342797874 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
57 342797874 : if( err == FD_MAP_SUCCESS ) break;
58 48 : if( err == FD_MAP_ERR_KEY ) return NULL;
59 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
60 0 : FD_LOG_CRIT(( "query returned err %d", err ));
61 0 : }
62 342797826 : return fd_funk_rec_map_query_ele_const( query );
63 342797874 : }
64 :
65 :
66 : fd_funk_rec_t *
67 : fd_funk_rec_modify( fd_funk_t * funk,
68 : fd_funk_txn_t const * txn,
69 : fd_funk_rec_key_t const * key,
70 23740419 : fd_funk_rec_query_t * query ) {
71 23740419 : fd_funk_rec_map_t * rec_map = fd_funk_rec_map( funk );
72 23740419 : fd_funk_xid_key_pair_t pair[1];
73 23740419 : fd_funk_rec_key_set_pair( pair, txn, key );
74 :
75 23740419 : int err = fd_funk_rec_map_modify_try( rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
76 23740419 : if( err==FD_MAP_ERR_KEY ) return NULL;
77 23740419 : if( err!=FD_MAP_SUCCESS ) FD_LOG_CRIT(( "query returned err %d", err ));
78 :
79 23740419 : fd_funk_rec_t * rec = fd_funk_rec_map_query_ele( query );
80 23740419 : return rec;
81 23740419 : }
82 :
83 : void
84 23740419 : fd_funk_rec_modify_publish( fd_funk_rec_query_t * query ) {
85 23740419 : fd_funk_rec_map_modify_test( query );
86 23740419 : }
87 :
88 : fd_funk_rec_t const *
89 : fd_funk_rec_query_try_global( fd_funk_t const * funk,
90 : fd_funk_txn_t const * txn,
91 : fd_funk_rec_key_t const * key,
92 : fd_funk_txn_t const ** txn_out,
93 110398651 : fd_funk_rec_query_t * query ) {
94 : #ifdef FD_FUNK_HANDHOLDING
95 : if( FD_UNLIKELY( funk==NULL || key==NULL || query==NULL ) ) {
96 : return NULL;
97 : }
98 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
99 : return NULL;
100 : }
101 : #endif
102 :
103 : /* Look for the first element in the hash chain with the right
104 : record key. This takes advantage of the fact that elements with
105 : the same record key appear on the same hash chain in order of
106 : newest to oldest. */
107 :
108 110398651 : fd_funk_xid_key_pair_t pair[1];
109 110398651 : fd_funk_rec_key_set_pair( pair, txn, key );
110 :
111 110398651 : fd_funk_rec_map_shmem_t * rec_map = funk->rec_map->map;
112 110398651 : ulong hash = fd_funk_rec_map_key_hash( pair, rec_map->seed );
113 110398651 : ulong chain_idx = (hash & (rec_map->chain_cnt-1UL) );
114 :
115 110398651 : fd_funk_rec_map_shmem_private_chain_t * chain = fd_funk_rec_map_shmem_private_chain( rec_map, hash );
116 110398651 : query->ele = NULL;
117 110398651 : query->chain = chain;
118 110398651 : query->ver_cnt = chain->ver_cnt; /* After unlock */
119 :
120 110398651 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( funk->rec_map, chain_idx );
121 200542338 : !fd_funk_rec_map_iter_done( iter );
122 200510448 : iter = fd_funk_rec_map_iter_next( iter ) ) {
123 200510448 : fd_funk_rec_t const * ele = fd_funk_rec_map_iter_ele_const( iter );
124 200510448 : if( FD_LIKELY( hash == ele->map_hash ) && FD_LIKELY( fd_funk_rec_key_eq( key, ele->pair.key ) ) ) {
125 :
126 : /* For cur_txn in path from [txn] to [root] where root is NULL */
127 :
128 348755917 : for( fd_funk_txn_t const * cur_txn = txn; ; cur_txn = fd_funk_txn_parent( cur_txn, funk->txn_pool ) ) {
129 : /* If record ele is part of transaction cur_txn, we have a
130 : match. According to the property above, this will be the
131 : youngest descendent in the transaction stack. */
132 :
133 348755917 : int match = FD_UNLIKELY( cur_txn ) ? /* opt for root find (FIXME: eliminate branch with cmov into txn_xid_eq?) */
134 254923080 : fd_funk_txn_xid_eq( &cur_txn->xid, ele->pair.xid ) :
135 348755917 : fd_funk_txn_xid_eq_root( ele->pair.xid );
136 :
137 348755917 : if( FD_LIKELY( match ) ) {
138 110366761 : if( txn_out ) *txn_out = cur_txn;
139 110366761 : query->ele = ( FD_UNLIKELY( ele->flags & FD_FUNK_REC_FLAG_ERASE ) ? NULL :
140 110366761 : (fd_funk_rec_t *)ele );
141 110366761 : return query->ele;
142 110366761 : }
143 :
144 238389156 : if( cur_txn == NULL ) break;
145 238389156 : }
146 153776086 : }
147 200510448 : }
148 31890 : return NULL;
149 110398651 : }
150 :
151 : fd_funk_rec_t const *
152 : fd_funk_rec_query_copy( fd_funk_t * funk,
153 : fd_funk_txn_t const * txn,
154 : fd_funk_rec_key_t const * key,
155 : fd_valloc_t valloc,
156 0 : ulong * sz_out ) {
157 0 : *sz_out = ULONG_MAX;
158 0 : fd_funk_xid_key_pair_t pair[1];
159 0 : fd_funk_rec_key_set_pair( pair, txn, key );
160 :
161 0 : void * last_copy = NULL;
162 0 : ulong last_copy_sz = 0;
163 0 : for(;;) {
164 0 : fd_funk_rec_query_t query[1];
165 0 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
166 0 : if( err == FD_MAP_ERR_KEY ) {
167 0 : if( last_copy ) fd_valloc_free( valloc, last_copy );
168 0 : return NULL;
169 0 : }
170 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
171 0 : if( err != FD_MAP_SUCCESS ) FD_LOG_CRIT(( "query returned err %d", err ));
172 0 : fd_funk_rec_t const * rec = fd_funk_rec_map_query_ele_const( query );
173 0 : ulong sz = fd_funk_val_sz( rec );
174 0 : void * copy;
175 0 : if( sz <= last_copy_sz ) {
176 0 : copy = last_copy;
177 0 : } else {
178 0 : if( last_copy ) fd_valloc_free( valloc, last_copy );
179 0 : copy = last_copy = fd_valloc_malloc( valloc, 1, sz );
180 0 : last_copy_sz = sz;
181 0 : }
182 0 : memcpy( copy, fd_funk_val( rec, fd_funk_wksp( funk ) ), sz );
183 0 : *sz_out = sz;
184 0 : if( !fd_funk_rec_query_test( query ) ) return copy;
185 0 : }
186 0 : }
187 :
188 : int
189 405715597 : fd_funk_rec_query_test( fd_funk_rec_query_t * query ) {
190 405715597 : return fd_funk_rec_map_query_test( query );
191 405715597 : }
192 :
193 : fd_funk_rec_t *
194 : fd_funk_rec_prepare( fd_funk_t * funk,
195 : fd_funk_txn_t * txn,
196 : fd_funk_rec_key_t const * key,
197 : fd_funk_rec_prepare_t * prepare,
198 251524166 : int * opt_err ) {
199 : #ifdef FD_FUNK_HANDHOLDING
200 : if( FD_UNLIKELY( funk==NULL || key==NULL || prepare==NULL ) ) {
201 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
202 : return NULL;
203 : }
204 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
205 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
206 : return NULL;
207 : }
208 : #endif
209 :
210 251524166 : if( !txn ) { /* Modifying last published */
211 6060348 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
212 6060120 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_FROZEN );
213 6060120 : return NULL;
214 6060120 : }
215 245463818 : } else {
216 245463818 : if( FD_UNLIKELY( fd_funk_txn_is_frozen( txn ) ) ) {
217 39779922 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_FROZEN );
218 39779922 : return NULL;
219 39779922 : }
220 245463818 : }
221 :
222 205684124 : fd_funk_rec_t * rec = prepare->rec = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 1, opt_err );
223 205684124 : if( opt_err && *opt_err == FD_POOL_ERR_CORRUPT ) {
224 0 : FD_LOG_ERR(( "corrupt element returned from funk rec pool" ));
225 0 : }
226 :
227 205684124 : if( rec != NULL ) {
228 23902104 : if( txn == NULL ) {
229 228 : fd_funk_txn_xid_set_root( rec->pair.xid );
230 228 : rec->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
231 228 : prepare->rec_head_idx = &funk->shmem->rec_head_idx;
232 228 : prepare->rec_tail_idx = &funk->shmem->rec_tail_idx;
233 228 : prepare->txn_lock = &funk->shmem->lock;
234 23901876 : } else {
235 23901876 : fd_funk_txn_xid_copy( rec->pair.xid, &txn->xid );
236 23901876 : rec->txn_cidx = fd_funk_txn_cidx( (ulong)( txn - funk->txn_pool->ele ) );
237 23901876 : prepare->rec_head_idx = &txn->rec_head_idx;
238 23901876 : prepare->rec_tail_idx = &txn->rec_tail_idx;
239 23901876 : prepare->txn_lock = &txn->lock;
240 23901876 : }
241 23902104 : fd_funk_rec_key_copy( rec->pair.key, key );
242 23902104 : fd_funk_val_init( rec );
243 23902104 : rec->tag = 0;
244 23902104 : rec->flags = 0;
245 23902104 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
246 23902104 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
247 181782020 : } else {
248 181782020 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_REC );
249 181782020 : }
250 205684124 : return rec;
251 205684124 : }
252 :
253 : void
254 : fd_funk_rec_publish( fd_funk_t * funk,
255 23899185 : fd_funk_rec_prepare_t * prepare ) {
256 23899185 : fd_funk_rec_t * rec = prepare->rec;
257 23899185 : uint * rec_head_idx = prepare->rec_head_idx;
258 23899185 : uint * rec_tail_idx = prepare->rec_tail_idx;
259 :
260 : /* Lock the txn */
261 1102088365 : while( FD_ATOMIC_CAS( prepare->txn_lock, 0, 1 ) ) FD_SPIN_PAUSE();
262 :
263 23899185 : uint rec_prev_idx;
264 23899185 : uint rec_idx = (uint)( rec - funk->rec_pool->ele );
265 23899185 : rec_prev_idx = *rec_tail_idx;
266 23899185 : *rec_tail_idx = rec_idx;
267 23899185 : rec->prev_idx = rec_prev_idx;
268 23899185 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
269 23899185 : if( fd_funk_rec_idx_is_null( rec_prev_idx ) ) {
270 281052 : *rec_head_idx = rec_idx;
271 23618133 : } else {
272 23618133 : funk->rec_pool->ele[ rec_prev_idx ].next_idx = rec_idx;
273 23618133 : }
274 :
275 23899185 : if( fd_funk_rec_map_insert( funk->rec_map, rec, FD_MAP_FLAG_BLOCKING ) ) {
276 0 : FD_LOG_CRIT(( "fd_funk_rec_map_insert failed" ));
277 0 : }
278 :
279 23899185 : FD_VOLATILE( *prepare->txn_lock ) = 0;
280 23899185 : }
281 :
282 : void
283 : fd_funk_rec_cancel( fd_funk_t * funk,
284 21 : fd_funk_rec_prepare_t * prepare ) {
285 21 : fd_funk_val_flush( prepare->rec, funk->alloc, funk->wksp );
286 21 : fd_funk_rec_pool_release( funk->rec_pool, prepare->rec, 1 );
287 21 : }
288 :
289 : static void
290 : fd_funk_rec_txn_publish( fd_funk_t * funk,
291 2898 : fd_funk_rec_prepare_t * prepare ) {
292 2898 : fd_funk_rec_t * rec = prepare->rec;
293 2898 : uint * rec_head_idx = prepare->rec_head_idx;
294 2898 : uint * rec_tail_idx = prepare->rec_tail_idx;
295 :
296 2898 : uint rec_prev_idx;
297 2898 : uint rec_idx = (uint)( rec - funk->rec_pool->ele );
298 2898 : rec_prev_idx = *rec_tail_idx;
299 2898 : *rec_tail_idx = rec_idx;
300 2898 : rec->prev_idx = rec_prev_idx;
301 2898 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
302 2898 : if( fd_funk_rec_idx_is_null( rec_prev_idx ) ) {
303 1449 : *rec_head_idx = rec_idx;
304 1449 : } else {
305 1449 : funk->rec_pool->ele[ rec_prev_idx ].next_idx = rec_idx;
306 1449 : }
307 :
308 2898 : if( FD_UNLIKELY( fd_funk_rec_map_txn_insert( funk->rec_map, rec ) ) ) {
309 0 : FD_LOG_CRIT(( "fd_funk_rec_map_insert failed" ));
310 0 : }
311 2898 : }
312 :
313 : void
314 : fd_funk_rec_try_clone_safe( fd_funk_t * funk,
315 : fd_funk_txn_t * txn,
316 : fd_funk_rec_key_t const * key,
317 : ulong align,
318 23740419 : ulong min_sz ) {
319 :
320 : /* TODO: There is probably a cleaner way to allocate the txn memory. */
321 :
322 : /* See the header comment for why the max is 2. */
323 23743317 : #define MAX_TXN_KEY_CNT (2UL)
324 23740419 : uchar txn_mem[ fd_funk_rec_map_txn_footprint( MAX_TXN_KEY_CNT ) ] __attribute__((aligned(alignof(fd_funk_rec_map_txn_t))));
325 :
326 : /* First, we will do a global query to find a version of the record
327 : from either the current transaction or one of its ancestors. */
328 :
329 23740419 : fd_funk_rec_t const * rec_glob = NULL;
330 23740419 : fd_funk_txn_t const * txn_glob = NULL;
331 :
332 23740419 : for(;;) {
333 23740419 : fd_funk_rec_query_t query_glob[1];
334 23740419 : txn_glob = NULL;
335 23740419 : rec_glob = fd_funk_rec_query_try_global(
336 23740419 : funk, txn,key, &txn_glob, query_glob );
337 :
338 : /* If the record exists and already exists in the specified funk
339 : txn, we can return successfully. */
340 23740419 : if( rec_glob && txn==txn_glob ) {
341 23737521 : return;
342 23737521 : }
343 :
344 2898 : if( fd_funk_rec_query_test( query_glob )==FD_FUNK_SUCCESS ) {
345 2898 : break;
346 2898 : }
347 2898 : }
348 :
349 : /* At this point, we need to atomically clone the record and copy
350 : in the contents of the global record. We at most will be trying to
351 : create a txn with two record keys. If the key exists in some
352 : ancestor txn, than we need to add the ancestor key to the txn. We
353 : will always need to add the current key to the txn. */
354 : /* TODO: Turn key_max into a const. */
355 :
356 2898 : fd_funk_rec_map_txn_t * map_txn = fd_funk_rec_map_txn_init( txn_mem, funk->rec_map, MAX_TXN_KEY_CNT );
357 :
358 2898 : fd_funk_xid_key_pair_t pair[1];
359 2898 : fd_funk_rec_key_set_pair( pair, txn, key );
360 :
361 2898 : fd_funk_rec_map_txn_add( map_txn, pair, 1 );
362 :
363 2898 : fd_funk_xid_key_pair_t pair_glob[1];
364 2898 : if( rec_glob ) {
365 2898 : fd_funk_rec_key_set_pair( pair_glob, txn_glob, key );
366 2898 : fd_funk_rec_map_txn_add( map_txn, pair_glob, 1 );
367 2898 : }
368 :
369 : /* Now that the keys are in the txn, we can try to start the
370 : transaction on the record_map. */
371 :
372 2898 : int err = fd_funk_rec_map_txn_try( map_txn, FD_MAP_FLAG_BLOCKING );
373 2898 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
374 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_try returned err %d", err ));
375 0 : }
376 :
377 : /* We are now in a txn try with a lock on both record keys. */
378 :
379 : /* First we need to make sure that the record hasn't been created yet. */
380 2898 : fd_funk_rec_map_query_t query[1];
381 2898 : err = fd_funk_rec_map_txn_query( funk->rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
382 2898 : if( FD_UNLIKELY( err==FD_MAP_SUCCESS ) ) {
383 : /* The key has been inserted. We need to gracefully exit the txn. */
384 0 : err = fd_funk_rec_map_txn_test( map_txn );
385 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
386 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_test returned err %d", err ));
387 0 : }
388 0 : fd_funk_rec_map_txn_fini( map_txn );
389 0 : return;
390 0 : }
391 :
392 : /* If we are at this point, we know for certain that the record hasn't
393 : been created yet. We will copy in the record from the global txn
394 : (if one exists). */
395 :
396 2898 : fd_funk_rec_prepare_t prepare[1];
397 2898 : fd_funk_rec_t * new_rec = fd_funk_rec_prepare( funk, txn, key, prepare, &err );
398 2898 : if( FD_UNLIKELY( err ) ) {
399 0 : FD_LOG_CRIT(( "fd_funk_rec_prepare returned err=%d", err ));
400 0 : }
401 :
402 : /* It is fine to use the old version of the record because we can
403 : assume that it comes from a frozen txn. */
404 2898 : ulong old_val_sz = !!rec_glob ? rec_glob->val_sz : 0UL;
405 2898 : ulong new_val_sz = fd_ulong_max( old_val_sz, min_sz );
406 :
407 2898 : uchar * new_val = fd_funk_val_truncate(
408 2898 : new_rec,
409 2898 : fd_funk_alloc( funk ),
410 2898 : fd_funk_wksp( funk ),
411 2898 : align,
412 2898 : new_val_sz,
413 2898 : &err );
414 2898 : if( FD_UNLIKELY( err ) ) {
415 0 : FD_LOG_CRIT(( "fd_funk_val_truncate returned err=%d", err ));
416 0 : }
417 2898 : if( FD_UNLIKELY( !new_val ) ) {
418 0 : FD_LOG_CRIT(( "fd_funk_val_truncate returned NULL" ));
419 0 : }
420 :
421 2898 : if( rec_glob ) {
422 : /* If we have a global copy of the record, copy it in. */
423 2898 : uchar * old_data = fd_funk_val( rec_glob, fd_funk_wksp( funk ) );
424 2898 : memcpy( new_val, old_data, old_val_sz );
425 2898 : }
426 :
427 2898 : fd_funk_rec_txn_publish( funk, prepare );
428 :
429 2898 : err = fd_funk_rec_map_txn_test( map_txn );
430 2898 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
431 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_test returned err %d", err ));
432 0 : }
433 :
434 2898 : fd_funk_rec_map_txn_fini( map_txn );
435 :
436 2898 : #undef MAX_TXN_KEY_CNT
437 2898 : }
438 :
439 : fd_funk_rec_t *
440 : fd_funk_rec_clone( fd_funk_t * funk,
441 : fd_funk_txn_t * txn,
442 : fd_funk_rec_key_t const * key,
443 : fd_funk_rec_prepare_t * prepare,
444 24 : int * opt_err ) {
445 24 : fd_funk_rec_t * new_rec = fd_funk_rec_prepare( funk, txn, key, prepare, opt_err );
446 24 : if( !new_rec ) return NULL;
447 :
448 24 : for(;;) {
449 24 : fd_funk_rec_query_t query[1];
450 24 : fd_funk_rec_t const * old_rec = fd_funk_rec_query_try_global( funk, txn, key, NULL, query );
451 24 : if( !old_rec ) {
452 21 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_KEY );
453 21 : fd_funk_rec_cancel( funk, prepare );
454 21 : return NULL;
455 21 : }
456 :
457 3 : fd_wksp_t * wksp = fd_funk_wksp( funk );
458 3 : ulong val_sz = old_rec->val_sz;
459 3 : void * buf = fd_funk_val_truncate(
460 3 : new_rec,
461 3 : fd_funk_alloc( funk ),
462 3 : wksp,
463 3 : 0UL,
464 3 : val_sz,
465 3 : opt_err );
466 3 : if( !buf ) {
467 0 : fd_funk_rec_cancel( funk, prepare );
468 0 : return NULL;
469 0 : }
470 3 : memcpy( buf, fd_funk_val( old_rec, wksp ), val_sz );
471 :
472 3 : if( !fd_funk_rec_query_test( query ) ) {
473 3 : return new_rec;
474 3 : }
475 3 : }
476 24 : }
477 :
478 : void
479 : fd_funk_rec_hard_remove( fd_funk_t * funk,
480 : fd_funk_txn_t * txn,
481 0 : fd_funk_rec_key_t const * key ) {
482 0 : fd_funk_xid_key_pair_t pair[1];
483 0 : fd_funk_rec_key_set_pair( pair, txn, key );
484 :
485 0 : uchar * lock = NULL;
486 0 : if( txn==NULL ) {
487 0 : lock = &funk->shmem->lock;
488 0 : } else {
489 0 : lock = &txn->lock;
490 0 : }
491 :
492 0 : while( FD_ATOMIC_CAS( lock, 0, 1 ) ) FD_SPIN_PAUSE();
493 :
494 0 : fd_funk_rec_t * rec = NULL;
495 0 : for(;;) {
496 0 : fd_funk_rec_map_query_t rec_query[1];
497 0 : int err = fd_funk_rec_map_remove( funk->rec_map, pair, NULL, rec_query, FD_MAP_FLAG_BLOCKING );
498 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
499 0 : if( err == FD_MAP_ERR_KEY ) {
500 0 : FD_VOLATILE( *lock ) = 0;
501 0 : return;
502 0 : }
503 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
504 0 : rec = fd_funk_rec_map_query_ele( rec_query );
505 0 : break;
506 0 : }
507 :
508 0 : uint prev_idx = rec->prev_idx;
509 0 : uint next_idx = rec->next_idx;
510 0 : if( txn == NULL ) {
511 0 : if( fd_funk_rec_idx_is_null( prev_idx ) ) funk->shmem->rec_head_idx = next_idx;
512 0 : else funk->rec_pool->ele[ prev_idx ].next_idx = next_idx;
513 0 : if( fd_funk_rec_idx_is_null( next_idx ) ) funk->shmem->rec_tail_idx = prev_idx;
514 0 : else funk->rec_pool->ele[ next_idx ].prev_idx = prev_idx;
515 0 : } else {
516 0 : if( fd_funk_rec_idx_is_null( prev_idx ) ) txn->rec_head_idx = next_idx;
517 0 : else funk->rec_pool->ele[ prev_idx ].next_idx = next_idx;
518 0 : if( fd_funk_rec_idx_is_null( next_idx ) ) txn->rec_tail_idx = prev_idx;
519 0 : else funk->rec_pool->ele[ next_idx ].prev_idx = prev_idx;
520 0 : }
521 :
522 0 : FD_VOLATILE( *lock ) = 0;
523 :
524 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
525 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
526 0 : }
527 :
528 : int
529 : fd_funk_rec_remove( fd_funk_t * funk,
530 : fd_funk_txn_t * txn,
531 : fd_funk_rec_key_t const * key,
532 : fd_funk_rec_t ** rec_out,
533 12211854 : ulong erase_data ) {
534 : #ifdef FD_FUNK_HANDHOLDING
535 : if( FD_UNLIKELY( funk==NULL || key==NULL ) ) {
536 : return FD_FUNK_ERR_INVAL;
537 : }
538 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
539 : return FD_FUNK_ERR_INVAL;
540 : }
541 : #endif
542 :
543 12211854 : if( !txn ) { /* Modifying last published */
544 1491216 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
545 1437087 : return FD_FUNK_ERR_FROZEN;
546 1437087 : }
547 10720638 : } else {
548 10720638 : if( FD_UNLIKELY( fd_funk_txn_is_frozen( txn ) ) ) {
549 10199817 : return FD_FUNK_ERR_FROZEN;
550 10199817 : }
551 10720638 : }
552 :
553 574950 : fd_funk_xid_key_pair_t pair[1];
554 574950 : fd_funk_rec_key_set_pair( pair, txn, key );
555 :
556 574950 : fd_funk_rec_query_t query[ 1 ];
557 574950 : for(;;) {
558 574950 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
559 574950 : if( err == FD_MAP_SUCCESS ) break;
560 0 : if( err == FD_MAP_ERR_KEY ) return FD_FUNK_ERR_KEY;
561 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
562 0 : FD_LOG_CRIT(( "query returned err %d", err ));
563 0 : }
564 :
565 574950 : fd_funk_rec_t * rec = fd_funk_rec_map_query_ele( query );
566 574950 : if( rec_out ) *rec_out = rec;
567 :
568 : /* Access the flags atomically */
569 574950 : ulong old_flags;
570 574950 : for(;;) {
571 574950 : old_flags = rec->flags;
572 574950 : if( FD_UNLIKELY( old_flags & FD_FUNK_REC_FLAG_ERASE ) ) return FD_FUNK_SUCCESS;
573 459390 : if( FD_ATOMIC_CAS( &rec->flags, old_flags, old_flags | FD_FUNK_REC_FLAG_ERASE ) == old_flags ) break;
574 459390 : }
575 :
576 : /* Flush the value and leave a tombstone behind. In theory, this can
577 : lead to an unbounded number of records, but for application
578 : reasons, we need to remember what was deleted. */
579 :
580 459390 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
581 :
582 : /* At this point, the 5 most significant bytes should store data about the
583 : transaction that the record was updated in. */
584 :
585 459390 : fd_funk_rec_set_erase_data( rec, erase_data );
586 :
587 459390 : return FD_FUNK_SUCCESS;
588 574950 : }
589 :
590 : void
591 459390 : fd_funk_rec_set_erase_data( fd_funk_rec_t * rec, ulong erase_data ) {
592 459390 : rec->flags |= ((erase_data & 0xFFFFFFFFFFUL) << (sizeof(unsigned long) * 8 - 40));
593 459390 : }
594 :
595 : ulong
596 0 : fd_funk_rec_get_erase_data( fd_funk_rec_t const * rec ) {
597 0 : return (rec->flags >> (sizeof(unsigned long) * 8 - 40)) & 0xFFFFFFFFFFUL;
598 0 : }
599 :
600 : int
601 : fd_funk_rec_forget( fd_funk_t * funk,
602 : fd_funk_rec_t ** recs,
603 0 : ulong recs_cnt ) {
604 : #ifdef FD_FUNK_HANDHOLDING
605 : if( FD_UNLIKELY( !funk ) ) return FD_FUNK_ERR_INVAL;
606 : #endif
607 :
608 : #ifdef FD_FUNK_HANDHOLDING
609 : ulong rec_max = funk->shmem->rec_max;
610 : #endif
611 :
612 0 : for( ulong i = 0; i < recs_cnt; ++i ) {
613 0 : fd_funk_rec_t * rec = recs[i];
614 :
615 : #ifdef FD_FUNK_HANDHOLDING
616 : ulong rec_idx = (ulong)(rec - funk->rec_pool->ele);
617 : if( FD_UNLIKELY( (rec_idx>=rec_max) /* Out of map (incl NULL) */ | (rec!=(funk->rec_pool->ele+rec_idx)) /* Bad alignment */ ) )
618 : return FD_FUNK_ERR_INVAL;
619 : #endif
620 :
621 0 : ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );
622 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( txn_idx ) || /* Must be published */
623 0 : !( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) ) { /* Must be removed */
624 0 : return FD_FUNK_ERR_KEY;
625 0 : }
626 :
627 0 : for(;;) {
628 0 : fd_funk_rec_map_query_t rec_query[1];
629 0 : int err = fd_funk_rec_map_remove( funk->rec_map, fd_funk_rec_pair( rec ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
630 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
631 0 : if( err == FD_MAP_ERR_KEY ) return FD_FUNK_ERR_KEY;
632 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
633 0 : if( rec != fd_funk_rec_map_query_ele( rec_query ) ) FD_LOG_CRIT(( "map corruption" ));
634 0 : break;
635 0 : }
636 :
637 0 : uint prev_idx = rec->prev_idx;
638 0 : uint next_idx = rec->next_idx;
639 0 : if( fd_funk_rec_idx_is_null( prev_idx ) ) funk->shmem->rec_head_idx = next_idx;
640 0 : else funk->rec_pool->ele[ prev_idx ].next_idx = next_idx;
641 0 : if( fd_funk_rec_idx_is_null( next_idx ) ) funk->shmem->rec_tail_idx = prev_idx;
642 0 : else funk->rec_pool->ele[ next_idx ].prev_idx = prev_idx;
643 :
644 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
645 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
646 0 : }
647 :
648 0 : return FD_FUNK_SUCCESS;
649 0 : }
650 :
651 : static void
652 18901185 : fd_funk_all_iter_skip_nulls( fd_funk_all_iter_t * iter ) {
653 18901185 : if( iter->chain_idx == iter->chain_cnt ) return;
654 3362019912 : while( fd_funk_rec_map_iter_done( iter->rec_map_iter ) ) {
655 3343220742 : if( ++(iter->chain_idx) == iter->chain_cnt ) break;
656 3343118727 : iter->rec_map_iter = fd_funk_rec_map_iter( &iter->rec_map, iter->chain_idx );
657 3343118727 : }
658 18901185 : }
659 :
660 : void
661 102015 : fd_funk_all_iter_new( fd_funk_t * funk, fd_funk_all_iter_t * iter ) {
662 102015 : iter->rec_map = *funk->rec_map;
663 102015 : iter->chain_cnt = fd_funk_rec_map_chain_cnt( &iter->rec_map );
664 102015 : iter->chain_idx = 0;
665 102015 : iter->rec_map_iter = fd_funk_rec_map_iter( &iter->rec_map, 0 );
666 102015 : fd_funk_all_iter_skip_nulls( iter );
667 102015 : }
668 :
669 : int
670 18901185 : fd_funk_all_iter_done( fd_funk_all_iter_t * iter ) {
671 18901185 : return ( iter->chain_idx == iter->chain_cnt );
672 18901185 : }
673 :
674 : void
675 18799170 : fd_funk_all_iter_next( fd_funk_all_iter_t * iter ) {
676 18799170 : iter->rec_map_iter = fd_funk_rec_map_iter_next( iter->rec_map_iter );
677 18799170 : fd_funk_all_iter_skip_nulls( iter );
678 18799170 : }
679 :
680 : fd_funk_rec_t const *
681 18799170 : fd_funk_all_iter_ele_const( fd_funk_all_iter_t * iter ) {
682 18799170 : return fd_funk_rec_map_iter_ele_const( iter->rec_map_iter );
683 18799170 : }
684 :
685 : fd_funk_rec_t *
686 0 : fd_funk_all_iter_ele( fd_funk_all_iter_t * iter ) {
687 0 : return fd_funk_rec_map_iter_ele( iter->rec_map_iter );
688 0 : }
689 :
690 : int
691 12 : fd_funk_rec_verify( fd_funk_t * funk ) {
692 12 : fd_funk_rec_map_t * rec_map = funk->rec_map;
693 12 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
694 12 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
695 12 : ulong txn_max = fd_funk_txn_pool_ele_max( txn_pool );
696 12 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
697 :
698 24 : # define TEST(c) do { \
699 24 : if( FD_UNLIKELY( !(c) ) ) { FD_LOG_WARNING(( "FAIL: %s", #c )); return FD_FUNK_ERR_INVAL; } \
700 24 : } while(0)
701 :
702 12 : TEST( !fd_funk_rec_map_verify( rec_map ) );
703 12 : TEST( !fd_funk_rec_pool_verify( rec_pool ) );
704 :
705 : /* Iterate (again) over all records in use */
706 :
707 12 : ulong chain_cnt = fd_funk_rec_map_chain_cnt( rec_map );
708 786450 : for( ulong chain_idx=0UL; chain_idx<chain_cnt; chain_idx++ ) {
709 786438 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( rec_map, chain_idx );
710 786438 : !fd_funk_rec_map_iter_done( iter );
711 786438 : iter = fd_funk_rec_map_iter_next( iter ) ) {
712 0 : fd_funk_rec_t const * rec = fd_funk_rec_map_iter_ele_const( iter );
713 :
714 : /* Make sure every record either links up with the last published
715 : transaction or an in-prep transaction and the flags are sane. */
716 :
717 0 : fd_funk_txn_xid_t const * txn_xid = fd_funk_rec_xid( rec );
718 0 : ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );
719 :
720 0 : if( fd_funk_txn_idx_is_null( txn_idx ) ) { /* This is a record from the last published transaction */
721 :
722 0 : TEST( fd_funk_txn_xid_eq_root( txn_xid ) );
723 :
724 0 : } else { /* This is a record from an in-prep transaction */
725 :
726 0 : TEST( txn_idx<txn_max );
727 0 : fd_funk_txn_t const * txn = fd_funk_txn_query( txn_xid, funk->txn_map );
728 0 : TEST( txn );
729 0 : TEST( txn==(funk->txn_pool->ele+txn_idx) );
730 :
731 0 : }
732 0 : }
733 786438 : }
734 :
735 : /* Clear record tags and then verify the forward and reverse linkage */
736 :
737 1572876 : for( ulong rec_idx=0UL; rec_idx<rec_max; rec_idx++ ) rec_pool->ele[ rec_idx ].tag = 0U;
738 :
739 12 : do {
740 12 : ulong txn_idx = FD_FUNK_TXN_IDX_NULL;
741 12 : uint rec_idx = funk->shmem->rec_head_idx;
742 12 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
743 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==0U );
744 0 : rec_pool->ele[ rec_idx ].tag = 1U;
745 0 : fd_funk_rec_query_t query[1];
746 0 : fd_funk_rec_t const * rec2 = fd_funk_rec_query_try_global( funk, NULL, rec_pool->ele[ rec_idx ].pair.key, NULL, query );
747 0 : if( FD_UNLIKELY( rec_pool->ele[ rec_idx ].flags & FD_FUNK_REC_FLAG_ERASE ) )
748 0 : TEST( rec2 == NULL );
749 0 : else
750 0 : TEST( rec2 == rec_pool->ele + rec_idx );
751 0 : uint next_idx = rec_pool->ele[ rec_idx ].next_idx;
752 0 : if( !fd_funk_rec_idx_is_null( next_idx ) ) TEST( rec_pool->ele[ next_idx ].prev_idx==rec_idx );
753 0 : rec_idx = next_idx;
754 0 : }
755 12 : fd_funk_txn_all_iter_t txn_iter[1];
756 12 : for( fd_funk_txn_all_iter_new( funk, txn_iter ); !fd_funk_txn_all_iter_done( txn_iter ); fd_funk_txn_all_iter_next( txn_iter ) ) {
757 0 : fd_funk_txn_t const * txn = fd_funk_txn_all_iter_ele_const( txn_iter );
758 :
759 0 : ulong txn_idx = (ulong)(txn-txn_pool->ele);
760 0 : uint rec_idx = txn->rec_head_idx;
761 0 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
762 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==0U );
763 0 : rec_pool->ele[ rec_idx ].tag = 1U;
764 0 : fd_funk_rec_query_t query[1];
765 0 : fd_funk_rec_t const * rec2 = fd_funk_rec_query_try_global( funk, txn, rec_pool->ele[ rec_idx ].pair.key, NULL, query );
766 0 : if( FD_UNLIKELY( rec_pool->ele[ rec_idx ].flags & FD_FUNK_REC_FLAG_ERASE ) )
767 0 : TEST( rec2 == NULL );
768 0 : else
769 0 : TEST( rec2 == rec_pool->ele + rec_idx );
770 0 : uint next_idx = rec_pool->ele[ rec_idx ].next_idx;
771 0 : if( !fd_funk_rec_idx_is_null( next_idx ) ) TEST( rec_pool->ele[ next_idx ].prev_idx==rec_idx );
772 0 : rec_idx = next_idx;
773 0 : }
774 0 : }
775 12 : } while(0);
776 :
777 12 : do {
778 12 : ulong txn_idx = FD_FUNK_TXN_IDX_NULL;
779 12 : uint rec_idx = funk->shmem->rec_tail_idx;
780 12 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
781 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==1U );
782 0 : rec_pool->ele[ rec_idx ].tag = 2U;
783 0 : uint prev_idx = rec_pool->ele[ rec_idx ].prev_idx;
784 0 : if( !fd_funk_rec_idx_is_null( prev_idx ) ) TEST( rec_pool->ele[ prev_idx ].next_idx==rec_idx );
785 0 : rec_idx = prev_idx;
786 0 : }
787 :
788 12 : fd_funk_txn_all_iter_t txn_iter[1];
789 12 : for( fd_funk_txn_all_iter_new( funk, txn_iter ); !fd_funk_txn_all_iter_done( txn_iter ); fd_funk_txn_all_iter_next( txn_iter ) ) {
790 0 : fd_funk_txn_t const * txn = fd_funk_txn_all_iter_ele_const( txn_iter );
791 :
792 0 : ulong txn_idx = (ulong)(txn-txn_pool->ele);
793 0 : uint rec_idx = txn->rec_tail_idx;
794 0 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
795 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==1U );
796 0 : rec_pool->ele[ rec_idx ].tag = 2U;
797 0 : uint prev_idx = rec_pool->ele[ rec_idx ].prev_idx;
798 0 : if( !fd_funk_rec_idx_is_null( prev_idx ) ) TEST( rec_pool->ele[ prev_idx ].next_idx==rec_idx );
799 0 : rec_idx = prev_idx;
800 0 : }
801 0 : }
802 12 : } while(0);
803 :
804 12 : # undef TEST
805 :
806 12 : return FD_FUNK_SUCCESS;
807 12 : }
|