Line data Source code
1 : #include "fd_funk.h"
2 : #include "fd_funk_txn.h"
3 :
4 : /* Provide the actual record map implementation */
5 :
6 3486291 : static void fd_funk_rec_pool_mark_in_pool( fd_funk_rec_t * ele ) {
7 3486291 : ele->val_gaddr = ULONG_MAX;
8 3486291 : ele->val_sz = UINT_MAX;
9 3486291 : ele->val_max = UINT_MAX;
10 3486291 : }
11 1714080 : static void fd_funk_rec_pool_mark_not_in_pool( fd_funk_rec_t * ele ) {
12 1714080 : ele->val_gaddr = 0;
13 1714080 : ele->val_sz = 0;
14 1714080 : ele->val_max = 0;
15 1714080 : }
16 1572864 : static int fd_funk_rec_pool_is_in_pool( fd_funk_rec_t const * ele ) {
17 1572864 : return (ele->val_sz == UINT_MAX);
18 1572864 : }
19 :
20 : #define POOL_NAME fd_funk_rec_pool
21 54118263 : #define POOL_ELE_T fd_funk_rec_t
22 : #define POOL_IDX_T uint
23 6773235 : #define POOL_NEXT map_next
24 3486291 : #define POOL_MARK_IN_POOL(ele) fd_funk_rec_pool_mark_in_pool(ele)
25 1714080 : #define POOL_MARK_NOT_IN_POOL(ele) fd_funk_rec_pool_mark_not_in_pool(ele)
26 : #define POOL_IMPL_STYLE 2
27 : #include "../util/tmpl/fd_pool_para.c"
28 :
29 : #define MAP_NAME fd_funk_rec_map
30 321393546 : #define MAP_ELE_T fd_funk_rec_t
31 0 : #define MAP_KEY_T fd_funk_xid_key_pair_t
32 1714041 : #define MAP_KEY pair
33 : #define MAP_KEY_EQ(k0,k1) fd_funk_xid_key_pair_eq((k0),(k1))
34 : #define MAP_KEY_HASH(k0,seed) fd_funk_xid_key_pair_hash((k0),(seed))
35 321393483 : #define MAP_IDX_T uint
36 387579207 : #define MAP_NEXT map_next
37 1714041 : #define MAP_MEMO map_hash
38 33 : #define MAP_MAGIC (0xf173da2ce77ecdb0UL) /* Firedancer rec db version 0 */
39 : #define MAP_MEMOIZE 1
40 : #define MAP_IMPL_STYLE 2
41 : #include "../util/tmpl/fd_map_chain_para.c"
42 :
43 : static void
44 : fd_funk_rec_key_set_pair( fd_funk_xid_key_pair_t * key_pair,
45 : fd_funk_txn_t const * txn,
46 384115854 : fd_funk_rec_key_t const * key ) {
47 384115854 : if( !txn ) {
48 214585530 : fd_funk_txn_xid_set_root( key_pair->xid );
49 214585530 : } else {
50 169530324 : fd_funk_txn_xid_copy( key_pair->xid, &txn->xid );
51 169530324 : }
52 384115854 : fd_funk_rec_key_copy( key_pair->key, key );
53 384115854 : }
54 :
55 : fd_funk_rec_t const *
56 : fd_funk_rec_query_try( fd_funk_t * funk,
57 : fd_funk_txn_t const * txn,
58 : fd_funk_rec_key_t const * key,
59 319054995 : fd_funk_rec_query_t * query ) {
60 : #ifdef FD_FUNK_HANDHOLDING
61 : if( FD_UNLIKELY( funk==NULL || key==NULL || query==NULL ) ) {
62 : return NULL;
63 : }
64 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
65 : return NULL;
66 : }
67 : #endif
68 :
69 319054995 : fd_funk_xid_key_pair_t pair[1];
70 319054995 : fd_funk_rec_key_set_pair( pair, txn, key );
71 :
72 319054995 : for(;;) {
73 319054995 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
74 319054995 : if( err == FD_MAP_SUCCESS ) break;
75 39 : if( err == FD_MAP_ERR_KEY ) return NULL;
76 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
77 0 : FD_LOG_CRIT(( "query returned err %d", err ));
78 0 : }
79 319054956 : return fd_funk_rec_map_query_ele_const( query );
80 319054995 : }
81 :
82 :
83 : fd_funk_rec_t *
84 : fd_funk_rec_modify( fd_funk_t * funk,
85 : fd_funk_txn_t const * txn,
86 : fd_funk_rec_key_t const * key,
87 27 : fd_funk_rec_query_t * query ) {
88 27 : fd_funk_rec_map_t * rec_map = fd_funk_rec_map( funk );
89 27 : fd_funk_xid_key_pair_t pair[1];
90 27 : fd_funk_rec_key_set_pair( pair, txn, key );
91 :
92 27 : int err = fd_funk_rec_map_modify_try( rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
93 27 : if( err==FD_MAP_ERR_KEY ) return NULL;
94 27 : if( err!=FD_MAP_SUCCESS ) FD_LOG_CRIT(( "query returned err %d", err ));
95 :
96 27 : fd_funk_rec_t * rec = fd_funk_rec_map_query_ele( query );
97 27 : return rec;
98 27 : }
99 :
100 : void
101 27 : fd_funk_rec_modify_publish( fd_funk_rec_query_t * query ) {
102 27 : fd_funk_rec_map_modify_test( query );
103 27 : }
104 :
105 : fd_funk_rec_t const *
106 : fd_funk_rec_query_try_global( fd_funk_t const * funk,
107 : fd_funk_txn_t const * txn,
108 : fd_funk_rec_key_t const * key,
109 : fd_funk_txn_t const ** txn_out,
110 64485471 : fd_funk_rec_query_t * query ) {
111 : #ifdef FD_FUNK_HANDHOLDING
112 : if( FD_UNLIKELY( funk==NULL || key==NULL || query==NULL ) ) {
113 : return NULL;
114 : }
115 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
116 : return NULL;
117 : }
118 : #endif
119 :
120 : /* Look for the first element in the hash chain with the right
121 : record key. This takes advantage of the fact that elements with
122 : the same record key appear on the same hash chain in order of
123 : newest to oldest. */
124 :
125 64485471 : fd_funk_xid_key_pair_t pair[1];
126 64485471 : fd_funk_rec_key_set_pair( pair, txn, key );
127 :
128 64485471 : fd_funk_rec_map_shmem_t * rec_map = funk->rec_map->map;
129 64485471 : ulong hash = fd_funk_rec_map_key_hash( pair, rec_map->seed );
130 64485471 : ulong chain_idx = (hash & (rec_map->chain_cnt-1UL) );
131 :
132 64485471 : fd_funk_rec_map_shmem_private_chain_t * chain = fd_funk_rec_map_shmem_private_chain( rec_map, hash );
133 64485471 : query->ele = NULL;
134 64485471 : query->chain = chain;
135 64485471 : query->ver_cnt = chain->ver_cnt; /* After unlock */
136 :
137 64485471 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( funk->rec_map, chain_idx );
138 154629999 : !fd_funk_rec_map_iter_done( iter );
139 154594407 : iter = fd_funk_rec_map_iter_next( iter ) ) {
140 154594407 : fd_funk_rec_t const * ele = fd_funk_rec_map_iter_ele_const( iter );
141 154594407 : if( FD_LIKELY( hash == ele->map_hash ) && FD_LIKELY( fd_funk_rec_key_eq( key, ele->pair.key ) ) ) {
142 :
143 : /* For cur_txn in path from [txn] to [root] where root is NULL */
144 :
145 302780883 : for( fd_funk_txn_t const * cur_txn = txn; ; cur_txn = fd_funk_txn_parent( cur_txn, funk->txn_pool ) ) {
146 : /* If record ele is part of transaction cur_txn, we have a
147 : match. According to the property above, this will be the
148 : youngest descendent in the transaction stack. */
149 :
150 302780883 : int match = FD_UNLIKELY( cur_txn ) ? /* opt for root find (FIXME: eliminate branch with cmov into txn_xid_eq?) */
151 209127204 : fd_funk_txn_xid_eq( &cur_txn->xid, ele->pair.xid ) :
152 302780883 : fd_funk_txn_xid_eq_root( ele->pair.xid );
153 :
154 302780883 : if( FD_LIKELY( match ) ) {
155 64449879 : if( txn_out ) *txn_out = cur_txn;
156 64449879 : query->ele = ( FD_UNLIKELY( ele->flags & FD_FUNK_REC_FLAG_ERASE ) ? NULL :
157 64449879 : (fd_funk_rec_t *)ele );
158 64449879 : return query->ele;
159 64449879 : }
160 :
161 238331004 : if( cur_txn == NULL ) break;
162 238331004 : }
163 107860047 : }
164 154594407 : }
165 35592 : return NULL;
166 64485471 : }
167 :
168 : fd_funk_rec_t const *
169 : fd_funk_rec_query_copy( fd_funk_t * funk,
170 : fd_funk_txn_t const * txn,
171 : fd_funk_rec_key_t const * key,
172 : fd_valloc_t valloc,
173 0 : ulong * sz_out ) {
174 0 : *sz_out = ULONG_MAX;
175 0 : fd_funk_xid_key_pair_t pair[1];
176 0 : fd_funk_rec_key_set_pair( pair, txn, key );
177 :
178 0 : void * last_copy = NULL;
179 0 : ulong last_copy_sz = 0;
180 0 : for(;;) {
181 0 : fd_funk_rec_query_t query[1];
182 0 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
183 0 : if( err == FD_MAP_ERR_KEY ) {
184 0 : if( last_copy ) fd_valloc_free( valloc, last_copy );
185 0 : return NULL;
186 0 : }
187 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
188 0 : if( err != FD_MAP_SUCCESS ) FD_LOG_CRIT(( "query returned err %d", err ));
189 0 : fd_funk_rec_t const * rec = fd_funk_rec_map_query_ele_const( query );
190 0 : ulong sz = fd_funk_val_sz( rec );
191 0 : void * copy;
192 0 : if( sz <= last_copy_sz ) {
193 0 : copy = last_copy;
194 0 : } else {
195 0 : if( last_copy ) fd_valloc_free( valloc, last_copy );
196 0 : copy = last_copy = fd_valloc_malloc( valloc, 1, sz );
197 0 : last_copy_sz = sz;
198 0 : }
199 0 : memcpy( copy, fd_funk_val( rec, fd_funk_wksp( funk ) ), sz );
200 0 : *sz_out = sz;
201 0 : if( !fd_funk_rec_query_test( query ) ) return copy;
202 0 : }
203 0 : }
204 :
205 : int
206 383536194 : fd_funk_rec_query_test( fd_funk_rec_query_t * query ) {
207 383536194 : return fd_funk_rec_map_query_test( query );
208 383536194 : }
209 :
210 : fd_funk_rec_t *
211 : fd_funk_rec_prepare( fd_funk_t * funk,
212 : fd_funk_txn_t * txn,
213 : fd_funk_rec_key_t const * key,
214 : fd_funk_rec_prepare_t * prepare,
215 72899118 : int * opt_err ) {
216 : #ifdef FD_FUNK_HANDHOLDING
217 : if( FD_UNLIKELY( funk==NULL || key==NULL || prepare==NULL ) ) {
218 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
219 : return NULL;
220 : }
221 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
222 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
223 : return NULL;
224 : }
225 : #endif
226 72899118 : memset( prepare, 0, sizeof(fd_funk_rec_prepare_t) );
227 :
228 72899118 : if( !txn ) { /* Modifying last published */
229 6060321 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
230 6060120 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_FROZEN );
231 6060120 : return NULL;
232 6060120 : }
233 66838797 : } else {
234 66838797 : if( FD_UNLIKELY( fd_funk_txn_is_frozen( txn ) ) ) {
235 39779922 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_FROZEN );
236 39779922 : return NULL;
237 39779922 : }
238 66838797 : }
239 :
240 27059076 : fd_funk_rec_t * rec = prepare->rec = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 1, opt_err );
241 27059076 : if( opt_err && *opt_err == FD_POOL_ERR_CORRUPT ) {
242 0 : FD_LOG_ERR(( "corrupt element returned from funk rec pool" ));
243 0 : }
244 27059076 : if( FD_UNLIKELY( !rec ) ) {
245 25344996 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_REC );
246 25344996 : return rec;
247 25344996 : }
248 :
249 1714080 : fd_funk_val_init( rec );
250 1714080 : if( txn == NULL ) {
251 201 : fd_funk_txn_xid_set_root( rec->pair.xid );
252 201 : rec->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
253 201 : prepare->txn_lock = &funk->shmem->lock;
254 1713879 : } else {
255 1713879 : fd_funk_txn_xid_copy( rec->pair.xid, &txn->xid );
256 1713879 : rec->txn_cidx = fd_funk_txn_cidx( (ulong)( txn - funk->txn_pool->ele ) );
257 1713879 : prepare->rec_head_idx = &txn->rec_head_idx;
258 1713879 : prepare->rec_tail_idx = &txn->rec_tail_idx;
259 1713879 : prepare->txn_lock = &txn->lock;
260 1713879 : }
261 1714080 : fd_funk_rec_key_copy( rec->pair.key, key );
262 1714080 : rec->tag = 0;
263 1714080 : rec->flags = 0;
264 1714080 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
265 1714080 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
266 1714080 : return rec;
267 27059076 : }
268 :
269 : void
270 : fd_funk_rec_publish( fd_funk_t * funk,
271 1714041 : fd_funk_rec_prepare_t * prepare ) {
272 1714041 : fd_funk_rec_t * rec = prepare->rec;
273 1714041 : uint * rec_head_idx = prepare->rec_head_idx;
274 1714041 : uint * rec_tail_idx = prepare->rec_tail_idx;
275 :
276 : /* Lock the txn */
277 1714041 : while( FD_ATOMIC_CAS( prepare->txn_lock, 0, 1 ) ) FD_SPIN_PAUSE();
278 :
279 1714041 : if( rec_head_idx ) {
280 1713849 : uint rec_idx = (uint)( rec - funk->rec_pool->ele );
281 1713849 : uint rec_prev_idx = *rec_tail_idx;
282 1713849 : *rec_tail_idx = rec_idx;
283 1713849 : rec->prev_idx = rec_prev_idx;
284 1713849 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
285 1713849 : if( fd_funk_rec_idx_is_null( rec_prev_idx ) ) {
286 280914 : *rec_head_idx = rec_idx;
287 1432935 : } else {
288 1432935 : funk->rec_pool->ele[ rec_prev_idx ].next_idx = rec_idx;
289 1432935 : }
290 1713849 : }
291 :
292 1714041 : if( fd_funk_rec_map_insert( funk->rec_map, rec, FD_MAP_FLAG_BLOCKING ) ) {
293 0 : FD_LOG_CRIT(( "fd_funk_rec_map_insert failed" ));
294 0 : }
295 :
296 1714041 : FD_VOLATILE( *prepare->txn_lock ) = 0;
297 1714041 : }
298 :
299 : void
300 : fd_funk_rec_cancel( fd_funk_t * funk,
301 39 : fd_funk_rec_prepare_t * prepare ) {
302 39 : fd_funk_val_flush( prepare->rec, funk->alloc, funk->wksp );
303 39 : fd_funk_rec_pool_release( funk->rec_pool, prepare->rec, 1 );
304 39 : }
305 :
306 : static void
307 : fd_funk_rec_txn_publish( fd_funk_t * funk,
308 0 : fd_funk_rec_prepare_t * prepare ) {
309 0 : fd_funk_rec_t * rec = prepare->rec;
310 0 : uint * rec_head_idx = prepare->rec_head_idx;
311 0 : uint * rec_tail_idx = prepare->rec_tail_idx;
312 :
313 0 : uint rec_prev_idx;
314 0 : uint rec_idx = (uint)( rec - funk->rec_pool->ele );
315 0 : rec_prev_idx = *rec_tail_idx;
316 0 : *rec_tail_idx = rec_idx;
317 0 : rec->prev_idx = rec_prev_idx;
318 0 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
319 0 : if( fd_funk_rec_idx_is_null( rec_prev_idx ) ) {
320 0 : *rec_head_idx = rec_idx;
321 0 : } else {
322 0 : funk->rec_pool->ele[ rec_prev_idx ].next_idx = rec_idx;
323 0 : }
324 :
325 0 : if( FD_UNLIKELY( fd_funk_rec_map_txn_insert( funk->rec_map, rec ) ) ) {
326 0 : FD_LOG_CRIT(( "fd_funk_rec_map_insert failed" ));
327 0 : }
328 0 : }
329 :
330 : void
331 : fd_funk_rec_insert_para( fd_funk_t * funk,
332 : fd_funk_txn_t * txn,
333 27 : fd_funk_rec_key_t const * key ) {
334 :
335 : /* TODO: There is probably a cleaner way to allocate the txn memory. */
336 :
337 : /* See the header comment for why the max is 2. */
338 27 : #define MAX_TXN_KEY_CNT (2UL)
339 27 : uchar txn_mem[ fd_funk_rec_map_txn_footprint( MAX_TXN_KEY_CNT ) ] __attribute__((aligned(alignof(fd_funk_rec_map_txn_t))));
340 :
341 : /* First, we will do a global query to find a version of the record
342 : from either the current transaction or one of its ancestors. */
343 :
344 27 : fd_funk_rec_t const * rec_glob = NULL;
345 27 : fd_funk_txn_t const * txn_glob = NULL;
346 :
347 27 : for(;;) {
348 27 : fd_funk_rec_query_t query_glob[1];
349 27 : txn_glob = NULL;
350 27 : rec_glob = fd_funk_rec_query_try_global(
351 27 : funk, txn,key, &txn_glob, query_glob );
352 :
353 : /* If the record exists and already exists in the specified funk
354 : txn, we can return successfully.
355 :
356 : TODO: This should probably also check that the record has a large
357 : enough size, i.e. rec_glob >= min_sz. */
358 27 : if( rec_glob && txn==txn_glob ) {
359 27 : return;
360 27 : }
361 :
362 0 : if( rec_glob && fd_funk_rec_query_test( query_glob )==FD_FUNK_SUCCESS ) {
363 0 : break;
364 0 : }
365 0 : }
366 :
367 : /* At this point, we need to atomically clone the record and copy
368 : in the contents of the global record. We at most will be trying to
369 : create a txn with two record keys. If the key exists in some
370 : ancestor txn, than we need to add the ancestor key to the txn. We
371 : will always need to add the current key to the txn. */
372 : /* TODO: Turn key_max into a const. */
373 :
374 0 : fd_funk_rec_map_txn_t * map_txn = fd_funk_rec_map_txn_init( txn_mem, funk->rec_map, MAX_TXN_KEY_CNT );
375 :
376 0 : fd_funk_xid_key_pair_t pair[1];
377 0 : fd_funk_rec_key_set_pair( pair, txn, key );
378 :
379 0 : fd_funk_rec_map_txn_add( map_txn, pair, 1 );
380 :
381 0 : fd_funk_xid_key_pair_t pair_glob[1];
382 0 : if( rec_glob ) {
383 0 : fd_funk_rec_key_set_pair( pair_glob, txn_glob, key );
384 0 : fd_funk_rec_map_txn_add( map_txn, pair_glob, 1 );
385 0 : }
386 :
387 : /* Now that the keys are in the txn, we can try to start the
388 : transaction on the record_map. */
389 :
390 0 : int err = fd_funk_rec_map_txn_try( map_txn, FD_MAP_FLAG_BLOCKING );
391 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
392 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_try returned err %d", err ));
393 0 : }
394 :
395 : /* We are now in a txn try with a lock on both record keys. */
396 :
397 : /* First we need to make sure that the record hasn't been created yet. */
398 0 : fd_funk_rec_map_query_t query[1];
399 0 : err = fd_funk_rec_map_txn_query( funk->rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
400 0 : if( FD_UNLIKELY( err==FD_MAP_SUCCESS ) ) {
401 : /* The key has been inserted. We need to gracefully exit the txn. */
402 0 : err = fd_funk_rec_map_txn_test( map_txn );
403 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
404 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_test returned err %d", err ));
405 0 : }
406 0 : fd_funk_rec_map_txn_fini( map_txn );
407 0 : return;
408 0 : }
409 :
410 : /* If we are at this point, we know for certain that the record hasn't
411 : been created yet. We will copy in the record from the global txn
412 : (if one exists). */
413 :
414 0 : fd_funk_rec_prepare_t prepare[1];
415 0 : fd_funk_rec_prepare( funk, txn, key, prepare, &err );
416 0 : if( FD_UNLIKELY( err ) ) {
417 0 : FD_LOG_CRIT(( "fd_funk_rec_prepare returned err=%d", err ));
418 0 : }
419 0 : fd_funk_rec_txn_publish( funk, prepare );
420 :
421 0 : err = fd_funk_rec_map_txn_test( map_txn );
422 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
423 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_test returned err %d", err ));
424 0 : }
425 :
426 0 : fd_funk_rec_map_txn_fini( map_txn );
427 :
428 0 : #undef MAX_TXN_KEY_CNT
429 0 : }
430 :
431 : fd_funk_rec_t *
432 : fd_funk_rec_clone( fd_funk_t * funk,
433 : fd_funk_txn_t * txn,
434 : fd_funk_rec_key_t const * key,
435 : fd_funk_rec_prepare_t * prepare,
436 39 : int * opt_err ) {
437 39 : fd_funk_rec_t * new_rec = fd_funk_rec_prepare( funk, txn, key, prepare, opt_err );
438 39 : if( !new_rec ) return NULL;
439 :
440 39 : for(;;) {
441 39 : fd_funk_rec_query_t query[1];
442 39 : fd_funk_rec_t const * old_rec = fd_funk_rec_query_try_global( funk, txn, key, NULL, query );
443 39 : if( !old_rec ) {
444 39 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_KEY );
445 39 : fd_funk_rec_cancel( funk, prepare );
446 39 : return NULL;
447 39 : }
448 :
449 0 : fd_wksp_t * wksp = fd_funk_wksp( funk );
450 0 : ulong val_sz = old_rec->val_sz;
451 0 : void * buf = fd_funk_val_truncate(
452 0 : new_rec,
453 0 : fd_funk_alloc( funk ),
454 0 : wksp,
455 0 : 0UL,
456 0 : val_sz,
457 0 : opt_err );
458 0 : if( !buf ) {
459 0 : fd_funk_rec_cancel( funk, prepare );
460 0 : return NULL;
461 0 : }
462 0 : memcpy( buf, fd_funk_val( old_rec, wksp ), val_sz );
463 :
464 0 : if( !fd_funk_rec_query_test( query ) ) {
465 0 : return new_rec;
466 0 : }
467 0 : }
468 39 : }
469 :
470 : int
471 : fd_funk_rec_remove( fd_funk_t * funk,
472 : fd_funk_txn_t * txn,
473 : fd_funk_rec_key_t const * key,
474 12212265 : fd_funk_rec_t ** rec_out ) {
475 : #ifdef FD_FUNK_HANDHOLDING
476 : if( FD_UNLIKELY( funk==NULL || key==NULL ) ) {
477 : return FD_FUNK_ERR_INVAL;
478 : }
479 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
480 : return FD_FUNK_ERR_INVAL;
481 : }
482 : #endif
483 :
484 12212265 : if( !txn ) { /* Modifying last published */
485 1491216 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
486 1437087 : return FD_FUNK_ERR_FROZEN;
487 1437087 : }
488 10721049 : } else {
489 10721049 : if( FD_UNLIKELY( fd_funk_txn_is_frozen( txn ) ) ) {
490 10199817 : return FD_FUNK_ERR_FROZEN;
491 10199817 : }
492 10721049 : }
493 :
494 575361 : fd_funk_xid_key_pair_t pair[1];
495 575361 : fd_funk_rec_key_set_pair( pair, txn, key );
496 :
497 575361 : fd_funk_rec_query_t query[ 1 ];
498 575361 : for(;;) {
499 575361 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
500 575361 : if( err == FD_MAP_SUCCESS ) break;
501 0 : if( err == FD_MAP_ERR_KEY ) return FD_FUNK_ERR_KEY;
502 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
503 0 : FD_LOG_CRIT(( "query returned err %d", err ));
504 0 : }
505 :
506 575361 : fd_funk_rec_t * rec = fd_funk_rec_map_query_ele( query );
507 575361 : if( rec_out ) *rec_out = rec;
508 :
509 : /* Access the flags atomically */
510 575361 : ulong old_flags;
511 575361 : for(;;) {
512 575361 : old_flags = rec->flags;
513 575361 : if( FD_UNLIKELY( old_flags & FD_FUNK_REC_FLAG_ERASE ) ) return FD_FUNK_SUCCESS;
514 459801 : if( FD_ATOMIC_CAS( &rec->flags, old_flags, old_flags | FD_FUNK_REC_FLAG_ERASE ) == old_flags ) break;
515 459801 : }
516 :
517 : /* Flush the value and leave a tombstone behind. In theory, this can
518 : lead to an unbounded number of records, but for application
519 : reasons, we need to remember what was deleted. */
520 :
521 459801 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
522 :
523 459801 : return FD_FUNK_SUCCESS;
524 575361 : }
525 :
526 : static void
527 18908736 : fd_funk_all_iter_skip_nulls( fd_funk_all_iter_t * iter ) {
528 18908736 : if( iter->chain_idx == iter->chain_cnt ) return;
529 3362813889 : while( fd_funk_rec_map_iter_done( iter->rec_map_iter ) ) {
530 3344007180 : if( ++(iter->chain_idx) == iter->chain_cnt ) break;
531 3343905153 : iter->rec_map_iter = fd_funk_rec_map_iter( &iter->rec_map, iter->chain_idx );
532 3343905153 : }
533 18908736 : }
534 :
535 : void
536 102027 : fd_funk_all_iter_new( fd_funk_t * funk, fd_funk_all_iter_t * iter ) {
537 102027 : iter->rec_map = *funk->rec_map;
538 102027 : iter->chain_cnt = fd_funk_rec_map_chain_cnt( &iter->rec_map );
539 102027 : iter->chain_idx = 0;
540 102027 : iter->rec_map_iter = fd_funk_rec_map_iter( &iter->rec_map, 0 );
541 102027 : fd_funk_all_iter_skip_nulls( iter );
542 102027 : }
543 :
544 : int
545 18908736 : fd_funk_all_iter_done( fd_funk_all_iter_t * iter ) {
546 18908736 : return ( iter->chain_idx == iter->chain_cnt );
547 18908736 : }
548 :
549 : void
550 18806709 : fd_funk_all_iter_next( fd_funk_all_iter_t * iter ) {
551 18806709 : iter->rec_map_iter = fd_funk_rec_map_iter_next( iter->rec_map_iter );
552 18806709 : fd_funk_all_iter_skip_nulls( iter );
553 18806709 : }
554 :
555 : fd_funk_rec_t const *
556 18806709 : fd_funk_all_iter_ele_const( fd_funk_all_iter_t * iter ) {
557 18806709 : return fd_funk_rec_map_iter_ele_const( iter->rec_map_iter );
558 18806709 : }
559 :
560 : fd_funk_rec_t *
561 0 : fd_funk_all_iter_ele( fd_funk_all_iter_t * iter ) {
562 0 : return fd_funk_rec_map_iter_ele( iter->rec_map_iter );
563 0 : }
564 :
565 : int
566 0 : fd_funk_rec_purify( fd_funk_t * funk ) {
567 0 : uint rec_used_cnt = 0;
568 0 : uint rec_free_cnt = 0;
569 :
570 0 : fd_funk_rec_map_t * rec_map = funk->rec_map;
571 0 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
572 0 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
573 :
574 0 : fd_funk_rec_map_reset( rec_map );
575 0 : rec_pool->pool->ver_top = fd_funk_rec_pool_idx_null();;
576 :
577 0 : for( ulong i = 0; i < rec_max; ++i ) {
578 0 : fd_funk_rec_t * rec = rec_pool->ele + i;
579 0 : if( fd_funk_rec_pool_is_in_pool( rec ) ||
580 0 : (rec->flags & FD_FUNK_REC_FLAG_ERASE) ) {
581 0 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
582 0 : rec_free_cnt++;
583 0 : continue;
584 0 : }
585 0 : if( !fd_funk_txn_xid_eq_root( rec->pair.xid ) ) {
586 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
587 0 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
588 0 : rec_free_cnt++;
589 0 : continue;
590 0 : }
591 0 : rec_used_cnt++;
592 :
593 0 : fd_funk_rec_map_insert( rec_map, rec, FD_MAP_FLAG_BLOCKING );
594 0 : }
595 :
596 0 : FD_LOG_NOTICE(( "funk records used after purify: %u, free: %u", rec_used_cnt, rec_free_cnt ));
597 :
598 0 : return FD_FUNK_SUCCESS;
599 0 : }
600 :
601 : int
602 12 : fd_funk_rec_verify( fd_funk_t * funk ) {
603 12 : fd_funk_rec_map_t * rec_map = funk->rec_map;
604 12 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
605 12 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
606 12 : ulong txn_max = fd_funk_txn_pool_ele_max( txn_pool );
607 12 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
608 :
609 24 : # define TEST(c) do { \
610 24 : if( FD_UNLIKELY( !(c) ) ) { FD_LOG_WARNING(( "FAIL: %s", #c )); return FD_FUNK_ERR_INVAL; } \
611 24 : } while(0)
612 :
613 12 : TEST( !fd_funk_rec_map_verify( rec_map ) );
614 12 : TEST( !fd_funk_rec_pool_verify( rec_pool ) );
615 :
616 : /* Iterate (again) over all records in use */
617 :
618 12 : ulong chain_cnt = fd_funk_rec_map_chain_cnt( rec_map );
619 786450 : for( ulong chain_idx=0UL; chain_idx<chain_cnt; chain_idx++ ) {
620 786438 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( rec_map, chain_idx );
621 786438 : !fd_funk_rec_map_iter_done( iter );
622 786438 : iter = fd_funk_rec_map_iter_next( iter ) ) {
623 0 : fd_funk_rec_t const * rec = fd_funk_rec_map_iter_ele_const( iter );
624 0 : TEST( !fd_funk_rec_pool_is_in_pool( rec ) );
625 :
626 : /* Make sure every record either links up with the last published
627 : transaction or an in-prep transaction and the flags are sane. */
628 :
629 0 : fd_funk_txn_xid_t const * txn_xid = fd_funk_rec_xid( rec );
630 0 : ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );
631 :
632 0 : if( fd_funk_txn_idx_is_null( txn_idx ) ) { /* This is a record from the last published transaction */
633 :
634 0 : TEST( fd_funk_txn_xid_eq_root( txn_xid ) );
635 : /* No record linked list at the root txn */
636 0 : TEST( fd_funk_rec_idx_is_null( rec->prev_idx ) );
637 0 : TEST( fd_funk_rec_idx_is_null( rec->next_idx ) );
638 :
639 0 : } else { /* This is a record from an in-prep transaction */
640 :
641 0 : TEST( txn_idx<txn_max );
642 0 : fd_funk_txn_t const * txn = fd_funk_txn_query( txn_xid, funk->txn_map );
643 0 : TEST( txn );
644 0 : TEST( txn==(funk->txn_pool->ele+txn_idx) );
645 :
646 0 : }
647 0 : }
648 786438 : }
649 :
650 : /* Clear record tags and then verify membership */
651 :
652 1572876 : for( ulong rec_idx=0UL; rec_idx<rec_max; rec_idx++ ) rec_pool->ele[ rec_idx ].tag = 0U;
653 :
654 12 : do {
655 12 : fd_funk_all_iter_t iter[1];
656 12 : for( fd_funk_all_iter_new( funk, iter ); !fd_funk_all_iter_done( iter ); fd_funk_all_iter_next( iter ) ) {
657 0 : fd_funk_rec_t * rec = fd_funk_all_iter_ele( iter );
658 0 : if( fd_funk_txn_xid_eq_root( rec->pair.xid ) ) {
659 0 : TEST( rec->tag==0U );
660 0 : rec->tag = 1U;
661 0 : }
662 0 : }
663 :
664 12 : fd_funk_txn_all_iter_t txn_iter[1];
665 12 : for( fd_funk_txn_all_iter_new( funk, txn_iter ); !fd_funk_txn_all_iter_done( txn_iter ); fd_funk_txn_all_iter_next( txn_iter ) ) {
666 0 : fd_funk_txn_t const * txn = fd_funk_txn_all_iter_ele_const( txn_iter );
667 :
668 0 : ulong txn_idx = (ulong)(txn-txn_pool->ele);
669 0 : uint rec_idx = txn->rec_head_idx;
670 0 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
671 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==0U );
672 0 : rec_pool->ele[ rec_idx ].tag = 1U;
673 0 : fd_funk_rec_query_t query[1];
674 0 : fd_funk_rec_t const * rec2 = fd_funk_rec_query_try_global( funk, txn, rec_pool->ele[ rec_idx ].pair.key, NULL, query );
675 0 : if( FD_UNLIKELY( rec_pool->ele[ rec_idx ].flags & FD_FUNK_REC_FLAG_ERASE ) )
676 0 : TEST( rec2 == NULL );
677 0 : else
678 0 : TEST( rec2 == rec_pool->ele + rec_idx );
679 0 : uint next_idx = rec_pool->ele[ rec_idx ].next_idx;
680 0 : if( !fd_funk_rec_idx_is_null( next_idx ) ) TEST( rec_pool->ele[ next_idx ].prev_idx==rec_idx );
681 0 : rec_idx = next_idx;
682 0 : }
683 0 : }
684 12 : } while(0);
685 :
686 1572876 : for( ulong rec_idx=0UL; rec_idx<rec_max; rec_idx++ ) {
687 1572864 : FD_TEST( fd_funk_rec_pool_is_in_pool( &rec_pool->ele[ rec_idx ] ) ==
688 1572864 : (rec_pool->ele[ rec_idx ].tag == 0UL) );
689 1572864 : }
690 :
691 12 : do {
692 12 : fd_funk_txn_all_iter_t txn_iter[1];
693 12 : for( fd_funk_txn_all_iter_new( funk, txn_iter ); !fd_funk_txn_all_iter_done( txn_iter ); fd_funk_txn_all_iter_next( txn_iter ) ) {
694 0 : fd_funk_txn_t const * txn = fd_funk_txn_all_iter_ele_const( txn_iter );
695 :
696 0 : ulong txn_idx = (ulong)(txn-txn_pool->ele);
697 0 : uint rec_idx = txn->rec_tail_idx;
698 0 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
699 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==1U );
700 0 : rec_pool->ele[ rec_idx ].tag = 2U;
701 0 : uint prev_idx = rec_pool->ele[ rec_idx ].prev_idx;
702 0 : if( !fd_funk_rec_idx_is_null( prev_idx ) ) TEST( rec_pool->ele[ prev_idx ].next_idx==rec_idx );
703 0 : rec_idx = prev_idx;
704 0 : }
705 0 : }
706 12 : } while(0);
707 :
708 12 : # undef TEST
709 :
710 12 : return FD_FUNK_SUCCESS;
711 12 : }
|