Line data Source code
1 : #include "fd_funk.h"
2 :
3 : /* Provide the actual record map implementation */
4 :
5 27434842 : static void fd_funk_rec_pool_mark_in_pool( fd_funk_rec_t * ele ) {
6 27434842 : ele->val_gaddr = ULONG_MAX;
7 27434842 : ele->val_sz = UINT_MAX;
8 27434842 : ele->val_max = UINT_MAX;
9 27434842 : }
10 22518331 : static void fd_funk_rec_pool_mark_not_in_pool( fd_funk_rec_t * ele ) {
11 22518331 : ele->val_gaddr = 0;
12 22518331 : ele->val_sz = 0;
13 22518331 : ele->val_max = 0;
14 22518331 : }
15 1572864 : static int fd_funk_rec_pool_is_in_pool( fd_funk_rec_t const * ele ) {
16 1572864 : return (ele->val_sz == UINT_MAX);
17 1572864 : }
18 :
/* Instantiate the fd_pool_para.c template as fd_funk_rec_pool: a pool
   of fd_funk_rec_t elements addressed by uint indices and linked
   through the map_next field.  The mark hooks route to the helpers
   defined above so that pool membership can be read back from the
   record itself.  NOTE(review): POOL_IMPL_STYLE 2 presumably selects
   which declarations / definitions the template emits; confirm against
   fd_pool_para.c. */
19 : #define POOL_NAME fd_funk_rec_pool
20 465331675 : #define POOL_ELE_T fd_funk_rec_t
21 : #define POOL_IDX_T uint
22 51526220 : #define POOL_NEXT map_next
23 27434842 : #define POOL_MARK_IN_POOL(ele) fd_funk_rec_pool_mark_in_pool(ele)
24 22518331 : #define POOL_MARK_NOT_IN_POOL(ele) fd_funk_rec_pool_mark_not_in_pool(ele)
25 : #define POOL_IMPL_STYLE 2
26 : #include "../util/tmpl/fd_pool_para.c"
27 :
/* Instantiate the fd_map_chain_para.c template as fd_funk_rec_map: a
   chained map of fd_funk_rec_t elements keyed by the (xid,key) pair
   stored in the pair field, with uint indices, chains linked through
   map_next and the key hash memoized in map_hash (MAP_MEMOIZE 1).
   Key comparison and hashing are delegated to
   fd_funk_xid_key_pair_eq / fd_funk_xid_key_pair_hash.
   NOTE(review): MAP_IMPL_STYLE 2 presumably selects which declarations
   / definitions the template emits; confirm against
   fd_map_chain_para.c. */
28 : #define MAP_NAME fd_funk_rec_map
29 388304756 : #define MAP_ELE_T fd_funk_rec_t
30 0 : #define MAP_KEY_T fd_funk_xid_key_pair_t
31 22518310 : #define MAP_KEY pair
32 : #define MAP_KEY_EQ(k0,k1) fd_funk_xid_key_pair_eq((k0),(k1))
33 : #define MAP_KEY_HASH(k0,seed) fd_funk_xid_key_pair_hash((k0),(seed))
34 388304687 : #define MAP_IDX_T uint
35 438403066 : #define MAP_NEXT map_next
36 22518310 : #define MAP_MEMO map_hash
37 33 : #define MAP_MAGIC (0xf173da2ce77ecdb0UL) /* Firedancer rec db version 0 */
38 : #define MAP_MEMOIZE 1
39 : #define MAP_IMPL_STYLE 2
40 : #include "../util/tmpl/fd_map_chain_para.c"
41 :
42 : static void
43 : fd_funk_rec_key_set_pair( fd_funk_xid_key_pair_t * key_pair,
44 : fd_funk_txn_t const * txn,
45 476133916 : fd_funk_rec_key_t const * key ) {
46 476133916 : if( !txn ) {
47 214585797 : fd_funk_txn_xid_set_root( key_pair->xid );
48 261548119 : } else {
49 261548119 : fd_funk_txn_xid_copy( key_pair->xid, &txn->xid );
50 261548119 : }
51 476133916 : fd_funk_rec_key_copy( key_pair->key, key );
52 476133916 : }
53 :
54 : fd_funk_rec_t const *
55 : fd_funk_rec_query_try( fd_funk_t * funk,
56 : fd_funk_txn_t const * txn,
57 : fd_funk_rec_key_t const * key,
58 342797874 : fd_funk_rec_query_t * query ) {
59 : #ifdef FD_FUNK_HANDHOLDING
60 : if( FD_UNLIKELY( funk==NULL || key==NULL || query==NULL ) ) {
61 : return NULL;
62 : }
63 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
64 : return NULL;
65 : }
66 : #endif
67 :
68 342797874 : fd_funk_xid_key_pair_t pair[1];
69 342797874 : fd_funk_rec_key_set_pair( pair, txn, key );
70 :
71 342797874 : for(;;) {
72 342797874 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
73 342797874 : if( err == FD_MAP_SUCCESS ) break;
74 48 : if( err == FD_MAP_ERR_KEY ) return NULL;
75 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
76 0 : FD_LOG_CRIT(( "query returned err %d", err ));
77 0 : }
78 342797826 : return fd_funk_rec_map_query_ele_const( query );
79 342797874 : }
80 :
81 :
82 : fd_funk_rec_t *
83 : fd_funk_rec_modify( fd_funk_t * funk,
84 : fd_funk_txn_t const * txn,
85 : fd_funk_rec_key_t const * key,
86 23740419 : fd_funk_rec_query_t * query ) {
87 23740419 : fd_funk_rec_map_t * rec_map = fd_funk_rec_map( funk );
88 23740419 : fd_funk_xid_key_pair_t pair[1];
89 23740419 : fd_funk_rec_key_set_pair( pair, txn, key );
90 :
91 23740419 : int err = fd_funk_rec_map_modify_try( rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
92 23740419 : if( err==FD_MAP_ERR_KEY ) return NULL;
93 23740419 : if( err!=FD_MAP_SUCCESS ) FD_LOG_CRIT(( "query returned err %d", err ));
94 :
95 23740419 : fd_funk_rec_t * rec = fd_funk_rec_map_query_ele( query );
96 23740419 : return rec;
97 23740419 : }
98 :
99 : void
100 23740419 : fd_funk_rec_modify_publish( fd_funk_rec_query_t * query ) {
101 23740419 : fd_funk_rec_map_modify_test( query );
102 23740419 : }
103 :
104 : fd_funk_rec_t const *
105 : fd_funk_rec_query_try_global( fd_funk_t const * funk,
106 : fd_funk_txn_t const * txn,
107 : fd_funk_rec_key_t const * key,
108 : fd_funk_txn_t const ** txn_out,
109 109014877 : fd_funk_rec_query_t * query ) {
110 : #ifdef FD_FUNK_HANDHOLDING
111 : if( FD_UNLIKELY( funk==NULL || key==NULL || query==NULL ) ) {
112 : return NULL;
113 : }
114 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
115 : return NULL;
116 : }
117 : #endif
118 :
119 : /* Look for the first element in the hash chain with the right
120 : record key. This takes advantage of the fact that elements with
121 : the same record key appear on the same hash chain in order of
122 : newest to oldest. */
123 :
124 109014877 : fd_funk_xid_key_pair_t pair[1];
125 109014877 : fd_funk_rec_key_set_pair( pair, txn, key );
126 :
127 109014877 : fd_funk_rec_map_shmem_t * rec_map = funk->rec_map->map;
128 109014877 : ulong hash = fd_funk_rec_map_key_hash( pair, rec_map->seed );
129 109014877 : ulong chain_idx = (hash & (rec_map->chain_cnt-1UL) );
130 :
131 109014877 : fd_funk_rec_map_shmem_private_chain_t * chain = fd_funk_rec_map_shmem_private_chain( rec_map, hash );
132 109014877 : query->ele = NULL;
133 109014877 : query->chain = chain;
134 109014877 : query->ver_cnt = chain->ver_cnt; /* After unlock */
135 :
136 109014877 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( funk->rec_map, chain_idx );
137 199158565 : !fd_funk_rec_map_iter_done( iter );
138 199126675 : iter = fd_funk_rec_map_iter_next( iter ) ) {
139 199126675 : fd_funk_rec_t const * ele = fd_funk_rec_map_iter_ele_const( iter );
140 199126675 : if( FD_LIKELY( hash == ele->map_hash ) && FD_LIKELY( fd_funk_rec_key_eq( key, ele->pair.key ) ) ) {
141 :
142 : /* For cur_txn in path from [txn] to [root] where root is NULL */
143 :
144 347372143 : for( fd_funk_txn_t const * cur_txn = txn; ; cur_txn = fd_funk_txn_parent( cur_txn, funk->txn_pool ) ) {
145 : /* If record ele is part of transaction cur_txn, we have a
146 : match. According to the property above, this will be the
147 : youngest descendent in the transaction stack. */
148 :
149 347372143 : int match = FD_UNLIKELY( cur_txn ) ? /* opt for root find (FIXME: eliminate branch with cmov into txn_xid_eq?) */
150 253464538 : fd_funk_txn_xid_eq( &cur_txn->xid, ele->pair.xid ) :
151 347372143 : fd_funk_txn_xid_eq_root( ele->pair.xid );
152 :
153 347372143 : if( FD_LIKELY( match ) ) {
154 108982987 : if( txn_out ) *txn_out = cur_txn;
155 108982987 : query->ele = ( FD_UNLIKELY( ele->flags & FD_FUNK_REC_FLAG_ERASE ) ? NULL :
156 108982987 : (fd_funk_rec_t *)ele );
157 108982987 : return query->ele;
158 108982987 : }
159 :
160 238389156 : if( cur_txn == NULL ) break;
161 238389156 : }
162 152392312 : }
163 199126675 : }
164 31890 : return NULL;
165 109014877 : }
166 :
167 : fd_funk_rec_t const *
168 : fd_funk_rec_query_copy( fd_funk_t * funk,
169 : fd_funk_txn_t const * txn,
170 : fd_funk_rec_key_t const * key,
171 : fd_valloc_t valloc,
172 0 : ulong * sz_out ) {
173 0 : *sz_out = ULONG_MAX;
174 0 : fd_funk_xid_key_pair_t pair[1];
175 0 : fd_funk_rec_key_set_pair( pair, txn, key );
176 :
177 0 : void * last_copy = NULL;
178 0 : ulong last_copy_sz = 0;
179 0 : for(;;) {
180 0 : fd_funk_rec_query_t query[1];
181 0 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
182 0 : if( err == FD_MAP_ERR_KEY ) {
183 0 : if( last_copy ) fd_valloc_free( valloc, last_copy );
184 0 : return NULL;
185 0 : }
186 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
187 0 : if( err != FD_MAP_SUCCESS ) FD_LOG_CRIT(( "query returned err %d", err ));
188 0 : fd_funk_rec_t const * rec = fd_funk_rec_map_query_ele_const( query );
189 0 : ulong sz = fd_funk_val_sz( rec );
190 0 : void * copy;
191 0 : if( sz <= last_copy_sz ) {
192 0 : copy = last_copy;
193 0 : } else {
194 0 : if( last_copy ) fd_valloc_free( valloc, last_copy );
195 0 : copy = last_copy = fd_valloc_malloc( valloc, 1, sz );
196 0 : last_copy_sz = sz;
197 0 : }
198 0 : memcpy( copy, fd_funk_val( rec, fd_funk_wksp( funk ) ), sz );
199 0 : *sz_out = sz;
200 0 : if( !fd_funk_rec_query_test( query ) ) return copy;
201 0 : }
202 0 : }
203 :
204 : int
205 404331823 : fd_funk_rec_query_test( fd_funk_rec_query_t * query ) {
206 404331823 : return fd_funk_rec_map_query_test( query );
207 404331823 : }
208 :
209 : fd_funk_rec_t *
210 : fd_funk_rec_prepare( fd_funk_t * funk,
211 : fd_funk_txn_t * txn,
212 : fd_funk_rec_key_t const * key,
213 : fd_funk_rec_prepare_t * prepare,
214 278505824 : int * opt_err ) {
215 : #ifdef FD_FUNK_HANDHOLDING
216 : if( FD_UNLIKELY( funk==NULL || key==NULL || prepare==NULL ) ) {
217 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
218 : return NULL;
219 : }
220 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
221 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_INVAL );
222 : return NULL;
223 : }
224 : #endif
225 :
226 278505824 : if( !txn ) { /* Modifying last published */
227 6060348 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
228 6060120 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_FROZEN );
229 6060120 : return NULL;
230 6060120 : }
231 272445476 : } else {
232 272445476 : if( FD_UNLIKELY( fd_funk_txn_is_frozen( txn ) ) ) {
233 39779922 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_FROZEN );
234 39779922 : return NULL;
235 39779922 : }
236 272445476 : }
237 :
238 232665782 : fd_funk_rec_t * rec = prepare->rec = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 1, opt_err );
239 232665782 : if( opt_err && *opt_err == FD_POOL_ERR_CORRUPT ) {
240 0 : FD_LOG_ERR(( "corrupt element returned from funk rec pool" ));
241 0 : }
242 :
243 232665782 : if( rec != NULL ) {
244 22518331 : fd_funk_val_init( rec );
245 22518331 : if( txn == NULL ) {
246 228 : fd_funk_txn_xid_set_root( rec->pair.xid );
247 228 : rec->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
248 228 : prepare->rec_head_idx = &funk->shmem->rec_head_idx;
249 228 : prepare->rec_tail_idx = &funk->shmem->rec_tail_idx;
250 228 : prepare->txn_lock = &funk->shmem->lock;
251 22518103 : } else {
252 22518103 : fd_funk_txn_xid_copy( rec->pair.xid, &txn->xid );
253 22518103 : rec->txn_cidx = fd_funk_txn_cidx( (ulong)( txn - funk->txn_pool->ele ) );
254 22518103 : prepare->rec_head_idx = &txn->rec_head_idx;
255 22518103 : prepare->rec_tail_idx = &txn->rec_tail_idx;
256 22518103 : prepare->txn_lock = &txn->lock;
257 22518103 : }
258 22518331 : fd_funk_rec_key_copy( rec->pair.key, key );
259 22518331 : rec->tag = 0;
260 22518331 : rec->flags = 0;
261 22518331 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
262 22518331 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
263 210147451 : } else {
264 210147451 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_REC );
265 210147451 : }
266 232665782 : return rec;
267 232665782 : }
268 :
269 : void
270 : fd_funk_rec_publish( fd_funk_t * funk,
271 22515412 : fd_funk_rec_prepare_t * prepare ) {
272 22515412 : fd_funk_rec_t * rec = prepare->rec;
273 22515412 : uint * rec_head_idx = prepare->rec_head_idx;
274 22515412 : uint * rec_tail_idx = prepare->rec_tail_idx;
275 :
276 : /* Lock the txn */
277 546228805 : while( FD_ATOMIC_CAS( prepare->txn_lock, 0, 1 ) ) FD_SPIN_PAUSE();
278 :
279 22515412 : uint rec_prev_idx;
280 22515412 : uint rec_idx = (uint)( rec - funk->rec_pool->ele );
281 22515412 : rec_prev_idx = *rec_tail_idx;
282 22515412 : *rec_tail_idx = rec_idx;
283 22515412 : rec->prev_idx = rec_prev_idx;
284 22515412 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
285 22515412 : if( fd_funk_rec_idx_is_null( rec_prev_idx ) ) {
286 281053 : *rec_head_idx = rec_idx;
287 22234359 : } else {
288 22234359 : funk->rec_pool->ele[ rec_prev_idx ].next_idx = rec_idx;
289 22234359 : }
290 :
291 22515412 : if( fd_funk_rec_map_insert( funk->rec_map, rec, FD_MAP_FLAG_BLOCKING ) ) {
292 0 : FD_LOG_CRIT(( "fd_funk_rec_map_insert failed" ));
293 0 : }
294 :
295 22515412 : FD_VOLATILE( *prepare->txn_lock ) = 0;
296 22515412 : }
297 :
298 : void
299 : fd_funk_rec_cancel( fd_funk_t * funk,
300 21 : fd_funk_rec_prepare_t * prepare ) {
301 21 : fd_funk_val_flush( prepare->rec, funk->alloc, funk->wksp );
302 21 : fd_funk_rec_pool_release( funk->rec_pool, prepare->rec, 1 );
303 21 : }
304 :
305 : static void
306 : fd_funk_rec_txn_publish( fd_funk_t * funk,
307 2898 : fd_funk_rec_prepare_t * prepare ) {
308 2898 : fd_funk_rec_t * rec = prepare->rec;
309 2898 : uint * rec_head_idx = prepare->rec_head_idx;
310 2898 : uint * rec_tail_idx = prepare->rec_tail_idx;
311 :
312 2898 : uint rec_prev_idx;
313 2898 : uint rec_idx = (uint)( rec - funk->rec_pool->ele );
314 2898 : rec_prev_idx = *rec_tail_idx;
315 2898 : *rec_tail_idx = rec_idx;
316 2898 : rec->prev_idx = rec_prev_idx;
317 2898 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
318 2898 : if( fd_funk_rec_idx_is_null( rec_prev_idx ) ) {
319 1449 : *rec_head_idx = rec_idx;
320 1449 : } else {
321 1449 : funk->rec_pool->ele[ rec_prev_idx ].next_idx = rec_idx;
322 1449 : }
323 :
324 2898 : if( FD_UNLIKELY( fd_funk_rec_map_txn_insert( funk->rec_map, rec ) ) ) {
325 0 : FD_LOG_CRIT(( "fd_funk_rec_map_insert failed" ));
326 0 : }
327 2898 : }
328 :
329 : void
330 : fd_funk_rec_try_clone_safe( fd_funk_t * funk,
331 : fd_funk_txn_t * txn,
332 : fd_funk_rec_key_t const * key,
333 : ulong align,
334 23740419 : ulong min_sz ) {
335 :
336 : /* TODO: There is probably a cleaner way to allocate the txn memory. */
337 :
338 : /* See the header comment for why the max is 2. */
339 23743317 : #define MAX_TXN_KEY_CNT (2UL)
340 23740419 : uchar txn_mem[ fd_funk_rec_map_txn_footprint( MAX_TXN_KEY_CNT ) ] __attribute__((aligned(alignof(fd_funk_rec_map_txn_t))));
341 :
342 : /* First, we will do a global query to find a version of the record
343 : from either the current transaction or one of its ancestors. */
344 :
345 23740419 : fd_funk_rec_t const * rec_glob = NULL;
346 23740419 : fd_funk_txn_t const * txn_glob = NULL;
347 :
348 23740419 : for(;;) {
349 23740419 : fd_funk_rec_query_t query_glob[1];
350 23740419 : txn_glob = NULL;
351 23740419 : rec_glob = fd_funk_rec_query_try_global(
352 23740419 : funk, txn,key, &txn_glob, query_glob );
353 :
354 : /* If the record exists and already exists in the specified funk
355 : txn, we can return successfully. */
356 23740419 : if( rec_glob && txn==txn_glob ) {
357 23737521 : return;
358 23737521 : }
359 :
360 2898 : if( fd_funk_rec_query_test( query_glob )==FD_FUNK_SUCCESS ) {
361 2898 : break;
362 2898 : }
363 2898 : }
364 :
365 : /* At this point, we need to atomically clone the record and copy
366 : in the contents of the global record. We at most will be trying to
367 : create a txn with two record keys. If the key exists in some
368 : ancestor txn, than we need to add the ancestor key to the txn. We
369 : will always need to add the current key to the txn. */
370 : /* TODO: Turn key_max into a const. */
371 :
372 2898 : fd_funk_rec_map_txn_t * map_txn = fd_funk_rec_map_txn_init( txn_mem, funk->rec_map, MAX_TXN_KEY_CNT );
373 :
374 2898 : fd_funk_xid_key_pair_t pair[1];
375 2898 : fd_funk_rec_key_set_pair( pair, txn, key );
376 :
377 2898 : fd_funk_rec_map_txn_add( map_txn, pair, 1 );
378 :
379 2898 : fd_funk_xid_key_pair_t pair_glob[1];
380 2898 : if( rec_glob ) {
381 2898 : fd_funk_rec_key_set_pair( pair_glob, txn_glob, key );
382 2898 : fd_funk_rec_map_txn_add( map_txn, pair_glob, 1 );
383 2898 : }
384 :
385 : /* Now that the keys are in the txn, we can try to start the
386 : transaction on the record_map. */
387 :
388 2898 : int err = fd_funk_rec_map_txn_try( map_txn, FD_MAP_FLAG_BLOCKING );
389 2898 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
390 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_try returned err %d", err ));
391 0 : }
392 :
393 : /* We are now in a txn try with a lock on both record keys. */
394 :
395 : /* First we need to make sure that the record hasn't been created yet. */
396 2898 : fd_funk_rec_map_query_t query[1];
397 2898 : err = fd_funk_rec_map_txn_query( funk->rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
398 2898 : if( FD_UNLIKELY( err==FD_MAP_SUCCESS ) ) {
399 : /* The key has been inserted. We need to gracefully exit the txn. */
400 0 : err = fd_funk_rec_map_txn_test( map_txn );
401 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
402 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_test returned err %d", err ));
403 0 : }
404 0 : fd_funk_rec_map_txn_fini( map_txn );
405 0 : return;
406 0 : }
407 :
408 : /* If we are at this point, we know for certain that the record hasn't
409 : been created yet. We will copy in the record from the global txn
410 : (if one exists). */
411 :
412 2898 : fd_funk_rec_prepare_t prepare[1];
413 2898 : fd_funk_rec_t * new_rec = fd_funk_rec_prepare( funk, txn, key, prepare, &err );
414 2898 : if( FD_UNLIKELY( err ) ) {
415 0 : FD_LOG_CRIT(( "fd_funk_rec_prepare returned err=%d", err ));
416 0 : }
417 :
418 : /* It is fine to use the old version of the record because we can
419 : assume that it comes from a frozen txn. */
420 2898 : ulong old_val_sz = !!rec_glob ? rec_glob->val_sz : 0UL;
421 2898 : ulong new_val_sz = fd_ulong_max( old_val_sz, min_sz );
422 :
423 2898 : uchar * new_val = fd_funk_val_truncate(
424 2898 : new_rec,
425 2898 : fd_funk_alloc( funk ),
426 2898 : fd_funk_wksp( funk ),
427 2898 : align,
428 2898 : new_val_sz,
429 2898 : &err );
430 2898 : if( FD_UNLIKELY( err ) ) {
431 0 : FD_LOG_CRIT(( "fd_funk_val_truncate returned err=%d", err ));
432 0 : }
433 2898 : if( FD_UNLIKELY( !new_val ) ) {
434 0 : FD_LOG_CRIT(( "fd_funk_val_truncate returned NULL" ));
435 0 : }
436 :
437 2898 : if( rec_glob ) {
438 : /* If we have a global copy of the record, copy it in. */
439 2898 : uchar * old_data = fd_funk_val( rec_glob, fd_funk_wksp( funk ) );
440 2898 : memcpy( new_val, old_data, old_val_sz );
441 2898 : }
442 :
443 2898 : fd_funk_rec_txn_publish( funk, prepare );
444 :
445 2898 : err = fd_funk_rec_map_txn_test( map_txn );
446 2898 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) {
447 0 : FD_LOG_CRIT(( "fd_funk_rec_map_txn_test returned err %d", err ));
448 0 : }
449 :
450 2898 : fd_funk_rec_map_txn_fini( map_txn );
451 :
452 2898 : #undef MAX_TXN_KEY_CNT
453 2898 : }
454 :
455 : fd_funk_rec_t *
456 : fd_funk_rec_clone( fd_funk_t * funk,
457 : fd_funk_txn_t * txn,
458 : fd_funk_rec_key_t const * key,
459 : fd_funk_rec_prepare_t * prepare,
460 24 : int * opt_err ) {
461 24 : fd_funk_rec_t * new_rec = fd_funk_rec_prepare( funk, txn, key, prepare, opt_err );
462 24 : if( !new_rec ) return NULL;
463 :
464 24 : for(;;) {
465 24 : fd_funk_rec_query_t query[1];
466 24 : fd_funk_rec_t const * old_rec = fd_funk_rec_query_try_global( funk, txn, key, NULL, query );
467 24 : if( !old_rec ) {
468 21 : fd_int_store_if( !!opt_err, opt_err, FD_FUNK_ERR_KEY );
469 21 : fd_funk_rec_cancel( funk, prepare );
470 21 : return NULL;
471 21 : }
472 :
473 3 : fd_wksp_t * wksp = fd_funk_wksp( funk );
474 3 : ulong val_sz = old_rec->val_sz;
475 3 : void * buf = fd_funk_val_truncate(
476 3 : new_rec,
477 3 : fd_funk_alloc( funk ),
478 3 : wksp,
479 3 : 0UL,
480 3 : val_sz,
481 3 : opt_err );
482 3 : if( !buf ) {
483 0 : fd_funk_rec_cancel( funk, prepare );
484 0 : return NULL;
485 0 : }
486 3 : memcpy( buf, fd_funk_val( old_rec, wksp ), val_sz );
487 :
488 3 : if( !fd_funk_rec_query_test( query ) ) {
489 3 : return new_rec;
490 3 : }
491 3 : }
492 24 : }
493 :
494 : void
495 : fd_funk_rec_hard_remove( fd_funk_t * funk,
496 : fd_funk_txn_t * txn,
497 0 : fd_funk_rec_key_t const * key ) {
498 0 : fd_funk_xid_key_pair_t pair[1];
499 0 : fd_funk_rec_key_set_pair( pair, txn, key );
500 :
501 0 : uchar * lock = NULL;
502 0 : if( txn==NULL ) {
503 0 : lock = &funk->shmem->lock;
504 0 : } else {
505 0 : lock = &txn->lock;
506 0 : }
507 :
508 0 : while( FD_ATOMIC_CAS( lock, 0, 1 ) ) FD_SPIN_PAUSE();
509 :
510 0 : fd_funk_rec_t * rec = NULL;
511 0 : for(;;) {
512 0 : fd_funk_rec_map_query_t rec_query[1];
513 0 : int err = fd_funk_rec_map_remove( funk->rec_map, pair, NULL, rec_query, FD_MAP_FLAG_BLOCKING );
514 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
515 0 : if( err == FD_MAP_ERR_KEY ) {
516 0 : FD_VOLATILE( *lock ) = 0;
517 0 : return;
518 0 : }
519 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
520 0 : rec = fd_funk_rec_map_query_ele( rec_query );
521 0 : break;
522 0 : }
523 :
524 0 : uint prev_idx = rec->prev_idx;
525 0 : uint next_idx = rec->next_idx;
526 0 : if( txn == NULL ) {
527 0 : if( fd_funk_rec_idx_is_null( prev_idx ) ) funk->shmem->rec_head_idx = next_idx;
528 0 : else funk->rec_pool->ele[ prev_idx ].next_idx = next_idx;
529 0 : if( fd_funk_rec_idx_is_null( next_idx ) ) funk->shmem->rec_tail_idx = prev_idx;
530 0 : else funk->rec_pool->ele[ next_idx ].prev_idx = prev_idx;
531 0 : } else {
532 0 : if( fd_funk_rec_idx_is_null( prev_idx ) ) txn->rec_head_idx = next_idx;
533 0 : else funk->rec_pool->ele[ prev_idx ].next_idx = next_idx;
534 0 : if( fd_funk_rec_idx_is_null( next_idx ) ) txn->rec_tail_idx = prev_idx;
535 0 : else funk->rec_pool->ele[ next_idx ].prev_idx = prev_idx;
536 0 : }
537 :
538 0 : FD_VOLATILE( *lock ) = 0;
539 :
540 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
541 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
542 0 : }
543 :
544 : int
545 : fd_funk_rec_remove( fd_funk_t * funk,
546 : fd_funk_txn_t * txn,
547 : fd_funk_rec_key_t const * key,
548 : fd_funk_rec_t ** rec_out,
549 12211854 : ulong erase_data ) {
550 : #ifdef FD_FUNK_HANDHOLDING
551 : if( FD_UNLIKELY( funk==NULL || key==NULL ) ) {
552 : return FD_FUNK_ERR_INVAL;
553 : }
554 : if( FD_UNLIKELY( txn && !fd_funk_txn_valid( funk, txn ) ) ) {
555 : return FD_FUNK_ERR_INVAL;
556 : }
557 : #endif
558 :
559 12211854 : if( !txn ) { /* Modifying last published */
560 1491216 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
561 1437087 : return FD_FUNK_ERR_FROZEN;
562 1437087 : }
563 10720638 : } else {
564 10720638 : if( FD_UNLIKELY( fd_funk_txn_is_frozen( txn ) ) ) {
565 10199817 : return FD_FUNK_ERR_FROZEN;
566 10199817 : }
567 10720638 : }
568 :
569 574950 : fd_funk_xid_key_pair_t pair[1];
570 574950 : fd_funk_rec_key_set_pair( pair, txn, key );
571 :
572 574950 : fd_funk_rec_query_t query[ 1 ];
573 574950 : for(;;) {
574 574950 : int err = fd_funk_rec_map_query_try( funk->rec_map, pair, NULL, query, 0 );
575 574950 : if( err == FD_MAP_SUCCESS ) break;
576 0 : if( err == FD_MAP_ERR_KEY ) return FD_FUNK_ERR_KEY;
577 0 : if( err == FD_MAP_ERR_AGAIN ) continue;
578 0 : FD_LOG_CRIT(( "query returned err %d", err ));
579 0 : }
580 :
581 574950 : fd_funk_rec_t * rec = fd_funk_rec_map_query_ele( query );
582 574950 : if( rec_out ) *rec_out = rec;
583 :
584 : /* Access the flags atomically */
585 574950 : ulong old_flags;
586 574950 : for(;;) {
587 574950 : old_flags = rec->flags;
588 574950 : if( FD_UNLIKELY( old_flags & FD_FUNK_REC_FLAG_ERASE ) ) return FD_FUNK_SUCCESS;
589 459390 : if( FD_ATOMIC_CAS( &rec->flags, old_flags, old_flags | FD_FUNK_REC_FLAG_ERASE ) == old_flags ) break;
590 459390 : }
591 :
592 : /* Flush the value and leave a tombstone behind. In theory, this can
593 : lead to an unbounded number of records, but for application
594 : reasons, we need to remember what was deleted. */
595 :
596 459390 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
597 :
598 : /* At this point, the 5 most significant bytes should store data about the
599 : transaction that the record was updated in. */
600 :
601 459390 : fd_funk_rec_set_erase_data( rec, erase_data );
602 :
603 459390 : return FD_FUNK_SUCCESS;
604 574950 : }
605 :
606 : void
607 459390 : fd_funk_rec_set_erase_data( fd_funk_rec_t * rec, ulong erase_data ) {
608 459390 : rec->flags |= ((erase_data & 0xFFFFFFFFFFUL) << (sizeof(unsigned long) * 8 - 40));
609 459390 : }
610 :
611 : ulong
612 0 : fd_funk_rec_get_erase_data( fd_funk_rec_t const * rec ) {
613 0 : return (rec->flags >> (sizeof(unsigned long) * 8 - 40)) & 0xFFFFFFFFFFUL;
614 0 : }
615 :
616 : int
617 : fd_funk_rec_forget( fd_funk_t * funk,
618 : fd_funk_rec_t ** recs,
619 0 : ulong recs_cnt ) {
620 : #ifdef FD_FUNK_HANDHOLDING
621 : if( FD_UNLIKELY( !funk ) ) return FD_FUNK_ERR_INVAL;
622 : #endif
623 :
624 : #ifdef FD_FUNK_HANDHOLDING
625 : ulong rec_max = funk->shmem->rec_max;
626 : #endif
627 :
628 0 : for( ulong i = 0; i < recs_cnt; ++i ) {
629 0 : fd_funk_rec_t * rec = recs[i];
630 :
631 : #ifdef FD_FUNK_HANDHOLDING
632 : ulong rec_idx = (ulong)(rec - funk->rec_pool->ele);
633 : if( FD_UNLIKELY( (rec_idx>=rec_max) /* Out of map (incl NULL) */ | (rec!=(funk->rec_pool->ele+rec_idx)) /* Bad alignment */ ) )
634 : return FD_FUNK_ERR_INVAL;
635 : #endif
636 :
637 0 : ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );
638 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( txn_idx ) || /* Must be published */
639 0 : !( rec->flags & FD_FUNK_REC_FLAG_ERASE ) ) ) { /* Must be removed */
640 0 : return FD_FUNK_ERR_KEY;
641 0 : }
642 :
643 0 : for(;;) {
644 0 : fd_funk_rec_map_query_t rec_query[1];
645 0 : int err = fd_funk_rec_map_remove( funk->rec_map, fd_funk_rec_pair( rec ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
646 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
647 0 : if( err == FD_MAP_ERR_KEY ) return FD_FUNK_ERR_KEY;
648 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
649 0 : if( rec != fd_funk_rec_map_query_ele( rec_query ) ) FD_LOG_CRIT(( "map corruption" ));
650 0 : break;
651 0 : }
652 :
653 0 : uint prev_idx = rec->prev_idx;
654 0 : uint next_idx = rec->next_idx;
655 0 : if( fd_funk_rec_idx_is_null( prev_idx ) ) funk->shmem->rec_head_idx = next_idx;
656 0 : else funk->rec_pool->ele[ prev_idx ].next_idx = next_idx;
657 0 : if( fd_funk_rec_idx_is_null( next_idx ) ) funk->shmem->rec_tail_idx = prev_idx;
658 0 : else funk->rec_pool->ele[ next_idx ].prev_idx = prev_idx;
659 :
660 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
661 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
662 0 : }
663 :
664 0 : return FD_FUNK_SUCCESS;
665 0 : }
666 :
667 : static void
668 18901185 : fd_funk_all_iter_skip_nulls( fd_funk_all_iter_t * iter ) {
669 18901185 : if( iter->chain_idx == iter->chain_cnt ) return;
670 3362019912 : while( fd_funk_rec_map_iter_done( iter->rec_map_iter ) ) {
671 3343220742 : if( ++(iter->chain_idx) == iter->chain_cnt ) break;
672 3343118727 : iter->rec_map_iter = fd_funk_rec_map_iter( &iter->rec_map, iter->chain_idx );
673 3343118727 : }
674 18901185 : }
675 :
676 : void
677 102015 : fd_funk_all_iter_new( fd_funk_t * funk, fd_funk_all_iter_t * iter ) {
678 102015 : iter->rec_map = *funk->rec_map;
679 102015 : iter->chain_cnt = fd_funk_rec_map_chain_cnt( &iter->rec_map );
680 102015 : iter->chain_idx = 0;
681 102015 : iter->rec_map_iter = fd_funk_rec_map_iter( &iter->rec_map, 0 );
682 102015 : fd_funk_all_iter_skip_nulls( iter );
683 102015 : }
684 :
685 : int
686 18901185 : fd_funk_all_iter_done( fd_funk_all_iter_t * iter ) {
687 18901185 : return ( iter->chain_idx == iter->chain_cnt );
688 18901185 : }
689 :
690 : void
691 18799170 : fd_funk_all_iter_next( fd_funk_all_iter_t * iter ) {
692 18799170 : iter->rec_map_iter = fd_funk_rec_map_iter_next( iter->rec_map_iter );
693 18799170 : fd_funk_all_iter_skip_nulls( iter );
694 18799170 : }
695 :
696 : fd_funk_rec_t const *
697 18799170 : fd_funk_all_iter_ele_const( fd_funk_all_iter_t * iter ) {
698 18799170 : return fd_funk_rec_map_iter_ele_const( iter->rec_map_iter );
699 18799170 : }
700 :
701 : fd_funk_rec_t *
702 0 : fd_funk_all_iter_ele( fd_funk_all_iter_t * iter ) {
703 0 : return fd_funk_rec_map_iter_ele( iter->rec_map_iter );
704 0 : }
705 :
706 : int
707 0 : fd_funk_rec_purify( fd_funk_t * funk ) {
708 0 : uint rec_used_cnt = 0;
709 0 : uint rec_free_cnt = 0;
710 :
711 0 : fd_funk_rec_map_t * rec_map = funk->rec_map;
712 0 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
713 0 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
714 :
715 0 : fd_funk_rec_map_reset( rec_map );
716 0 : rec_pool->pool->ver_top = fd_funk_rec_pool_idx_null();;
717 :
718 0 : uint prev_idx = FD_FUNK_REC_IDX_NULL;
719 0 : for( ulong i = 0; i < rec_max; ++i ) {
720 0 : fd_funk_rec_t * rec = rec_pool->ele + i;
721 0 : if( fd_funk_rec_pool_is_in_pool( rec ) ||
722 0 : (rec->flags & FD_FUNK_REC_FLAG_ERASE) ) {
723 0 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
724 0 : rec_free_cnt++;
725 0 : continue;
726 0 : }
727 0 : if( !fd_funk_txn_xid_eq_root( rec->pair.xid ) ) {
728 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
729 0 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
730 0 : rec_free_cnt++;
731 0 : continue;
732 0 : }
733 0 : rec_used_cnt++;
734 :
735 0 : fd_funk_rec_map_insert( rec_map, rec, FD_MAP_FLAG_BLOCKING );
736 :
737 0 : rec->prev_idx = prev_idx;
738 0 : if( prev_idx != FD_FUNK_REC_IDX_NULL ) {
739 0 : (rec_pool->ele + prev_idx)->next_idx = fd_funk_rec_map_private_cidx( i );
740 0 : } else {
741 0 : funk->shmem->rec_head_idx = fd_funk_rec_map_private_cidx( i );
742 0 : }
743 0 : prev_idx = fd_funk_rec_map_private_cidx( i );
744 0 : }
745 :
746 0 : funk->shmem->rec_tail_idx = prev_idx;
747 0 : if( prev_idx != FD_FUNK_REC_IDX_NULL ) {
748 0 : (rec_pool->ele + prev_idx)->next_idx = FD_FUNK_REC_IDX_NULL;
749 0 : } else {
750 0 : funk->shmem->rec_head_idx = FD_FUNK_REC_IDX_NULL;
751 0 : }
752 :
753 0 : FD_LOG_NOTICE(( "funk records used after purify: %u, free: %u", rec_used_cnt, rec_free_cnt ));
754 :
755 0 : return FD_FUNK_SUCCESS;
756 0 : }
757 :
758 : int
759 12 : fd_funk_rec_verify( fd_funk_t * funk ) {
760 12 : fd_funk_rec_map_t * rec_map = funk->rec_map;
761 12 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
762 12 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
763 12 : ulong txn_max = fd_funk_txn_pool_ele_max( txn_pool );
764 12 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
765 :
766 24 : # define TEST(c) do { \
767 24 : if( FD_UNLIKELY( !(c) ) ) { FD_LOG_WARNING(( "FAIL: %s", #c )); return FD_FUNK_ERR_INVAL; } \
768 24 : } while(0)
769 :
770 12 : TEST( !fd_funk_rec_map_verify( rec_map ) );
771 12 : TEST( !fd_funk_rec_pool_verify( rec_pool ) );
772 :
773 : /* Iterate (again) over all records in use */
774 :
775 12 : ulong chain_cnt = fd_funk_rec_map_chain_cnt( rec_map );
776 786450 : for( ulong chain_idx=0UL; chain_idx<chain_cnt; chain_idx++ ) {
777 786438 : for( fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( rec_map, chain_idx );
778 786438 : !fd_funk_rec_map_iter_done( iter );
779 786438 : iter = fd_funk_rec_map_iter_next( iter ) ) {
780 0 : fd_funk_rec_t const * rec = fd_funk_rec_map_iter_ele_const( iter );
781 0 : TEST( !fd_funk_rec_pool_is_in_pool( rec ) );
782 :
783 : /* Make sure every record either links up with the last published
784 : transaction or an in-prep transaction and the flags are sane. */
785 :
786 0 : fd_funk_txn_xid_t const * txn_xid = fd_funk_rec_xid( rec );
787 0 : ulong txn_idx = fd_funk_txn_idx( rec->txn_cidx );
788 :
789 0 : if( fd_funk_txn_idx_is_null( txn_idx ) ) { /* This is a record from the last published transaction */
790 :
791 0 : TEST( fd_funk_txn_xid_eq_root( txn_xid ) );
792 :
793 0 : } else { /* This is a record from an in-prep transaction */
794 :
795 0 : TEST( txn_idx<txn_max );
796 0 : fd_funk_txn_t const * txn = fd_funk_txn_query( txn_xid, funk->txn_map );
797 0 : TEST( txn );
798 0 : TEST( txn==(funk->txn_pool->ele+txn_idx) );
799 :
800 0 : }
801 0 : }
802 786438 : }
803 :
804 : /* Clear record tags and then verify the forward and reverse linkage */
805 :
806 1572876 : for( ulong rec_idx=0UL; rec_idx<rec_max; rec_idx++ ) rec_pool->ele[ rec_idx ].tag = 0U;
807 :
808 12 : do {
809 12 : ulong txn_idx = FD_FUNK_TXN_IDX_NULL;
810 12 : uint rec_idx = funk->shmem->rec_head_idx;
811 12 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
812 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==0U );
813 0 : rec_pool->ele[ rec_idx ].tag = 1U;
814 0 : fd_funk_rec_query_t query[1];
815 0 : fd_funk_rec_t const * rec2 = fd_funk_rec_query_try_global( funk, NULL, rec_pool->ele[ rec_idx ].pair.key, NULL, query );
816 0 : if( FD_UNLIKELY( rec_pool->ele[ rec_idx ].flags & FD_FUNK_REC_FLAG_ERASE ) )
817 0 : TEST( rec2 == NULL );
818 0 : else
819 0 : TEST( rec2 == rec_pool->ele + rec_idx );
820 0 : uint next_idx = rec_pool->ele[ rec_idx ].next_idx;
821 0 : if( !fd_funk_rec_idx_is_null( next_idx ) ) TEST( rec_pool->ele[ next_idx ].prev_idx==rec_idx );
822 0 : rec_idx = next_idx;
823 0 : }
824 12 : fd_funk_txn_all_iter_t txn_iter[1];
825 12 : for( fd_funk_txn_all_iter_new( funk, txn_iter ); !fd_funk_txn_all_iter_done( txn_iter ); fd_funk_txn_all_iter_next( txn_iter ) ) {
826 0 : fd_funk_txn_t const * txn = fd_funk_txn_all_iter_ele_const( txn_iter );
827 :
828 0 : ulong txn_idx = (ulong)(txn-txn_pool->ele);
829 0 : uint rec_idx = txn->rec_head_idx;
830 0 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
831 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==0U );
832 0 : rec_pool->ele[ rec_idx ].tag = 1U;
833 0 : fd_funk_rec_query_t query[1];
834 0 : fd_funk_rec_t const * rec2 = fd_funk_rec_query_try_global( funk, txn, rec_pool->ele[ rec_idx ].pair.key, NULL, query );
835 0 : if( FD_UNLIKELY( rec_pool->ele[ rec_idx ].flags & FD_FUNK_REC_FLAG_ERASE ) )
836 0 : TEST( rec2 == NULL );
837 0 : else
838 0 : TEST( rec2 == rec_pool->ele + rec_idx );
839 0 : uint next_idx = rec_pool->ele[ rec_idx ].next_idx;
840 0 : if( !fd_funk_rec_idx_is_null( next_idx ) ) TEST( rec_pool->ele[ next_idx ].prev_idx==rec_idx );
841 0 : rec_idx = next_idx;
842 0 : }
843 0 : }
844 12 : } while(0);
845 :
846 1572876 : for( ulong rec_idx=0UL; rec_idx<rec_max; rec_idx++ ) {
847 1572864 : FD_TEST( fd_funk_rec_pool_is_in_pool( &rec_pool->ele[ rec_idx ] ) ==
848 1572864 : (rec_pool->ele[ rec_idx ].tag == 0UL) );
849 1572864 : }
850 :
851 12 : do {
852 12 : ulong txn_idx = FD_FUNK_TXN_IDX_NULL;
853 12 : uint rec_idx = funk->shmem->rec_tail_idx;
854 12 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
855 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==1U );
856 0 : rec_pool->ele[ rec_idx ].tag = 2U;
857 0 : uint prev_idx = rec_pool->ele[ rec_idx ].prev_idx;
858 0 : if( !fd_funk_rec_idx_is_null( prev_idx ) ) TEST( rec_pool->ele[ prev_idx ].next_idx==rec_idx );
859 0 : rec_idx = prev_idx;
860 0 : }
861 :
862 12 : fd_funk_txn_all_iter_t txn_iter[1];
863 12 : for( fd_funk_txn_all_iter_new( funk, txn_iter ); !fd_funk_txn_all_iter_done( txn_iter ); fd_funk_txn_all_iter_next( txn_iter ) ) {
864 0 : fd_funk_txn_t const * txn = fd_funk_txn_all_iter_ele_const( txn_iter );
865 :
866 0 : ulong txn_idx = (ulong)(txn-txn_pool->ele);
867 0 : uint rec_idx = txn->rec_tail_idx;
868 0 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
869 0 : TEST( (rec_idx<rec_max) && (fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )==txn_idx) && rec_pool->ele[ rec_idx ].tag==1U );
870 0 : rec_pool->ele[ rec_idx ].tag = 2U;
871 0 : uint prev_idx = rec_pool->ele[ rec_idx ].prev_idx;
872 0 : if( !fd_funk_rec_idx_is_null( prev_idx ) ) TEST( rec_pool->ele[ prev_idx ].next_idx==rec_idx );
873 0 : rec_idx = prev_idx;
874 0 : }
875 0 : }
876 12 : } while(0);
877 :
878 12 : # undef TEST
879 :
880 12 : return FD_FUNK_SUCCESS;
881 12 : }
|