Line data Source code
1 : #include "fd_funk.h"
2 :
3 : /* Provide the actual transaction map implementation */
4 :
5 : #define POOL_NAME fd_funk_txn_pool
6 2107161 : #define POOL_ELE_T fd_funk_txn_t
7 : #define POOL_IDX_T uint
8 5256294 : #define POOL_NEXT map_next
9 : #define POOL_IMPL_STYLE 2
10 : #include "../util/tmpl/fd_pool_para.c"
11 :
12 : #define MAP_NAME fd_funk_txn_map
13 588585870 : #define MAP_ELE_T fd_funk_txn_t
14 0 : #define MAP_KEY_T fd_funk_txn_xid_t
15 1053513 : #define MAP_KEY xid
16 : #define MAP_KEY_EQ(k0,k1) fd_funk_txn_xid_eq((k0),(k1))
17 : #define MAP_KEY_HASH(k0,seed) fd_funk_txn_xid_hash((k0),(seed))
18 394080410 : #define MAP_NEXT map_next
19 : #define MAP_HASH map_hash
20 33 : #define MAP_MAGIC (0xf173da2ce7172db0UL) /* Firedancer txn db version 0 */
21 : #define MAP_IMPL_STYLE 2
22 : #include "../util/tmpl/fd_map_chain_para.c"
23 :
24 : /* TODO: remove this lock */
25 : #include "../flamenco/fd_rwlock.h"
26 : static fd_rwlock_t funk_txn_lock[ 1 ] = {0};
27 :
28 : void
29 0 : fd_funk_txn_start_read( fd_funk_t * funk FD_PARAM_UNUSED ) {
30 0 : fd_rwlock_read( funk_txn_lock );
31 0 : }
32 :
33 : void
34 0 : fd_funk_txn_end_read( fd_funk_t * funk FD_PARAM_UNUSED ) {
35 0 : fd_rwlock_unread( funk_txn_lock );
36 0 : }
37 :
38 : void
39 33 : fd_funk_txn_start_write( fd_funk_t * funk FD_PARAM_UNUSED ) {
40 33 : fd_rwlock_write( funk_txn_lock );
41 33 : }
42 :
43 : void
44 33 : fd_funk_txn_end_write( fd_funk_t * funk FD_PARAM_UNUSED ) {
45 33 : fd_rwlock_unwrite( funk_txn_lock );
46 33 : }
47 :
48 : fd_funk_txn_t *
49 : fd_funk_txn_prepare( fd_funk_t * funk,
50 : fd_funk_txn_t * parent,
51 : fd_funk_txn_xid_t const * xid,
52 1315650 : int verbose ) {
53 :
54 1315650 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
55 1315650 : if( FD_UNLIKELY( !xid ) ) FD_LOG_CRIT(( "NULL xid" ));
56 1315650 : if( FD_UNLIKELY( fd_funk_txn_xid_eq_root( xid ) ) ) FD_LOG_CRIT(( "xid is root" ));
57 :
58 : #ifdef FD_FUNK_HANDHOLDING
59 : if( FD_UNLIKELY( parent && !fd_funk_txn_valid( funk, parent ) ) ) {
60 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "bad txn" ));
61 : return NULL;
62 : }
63 : if( FD_UNLIKELY( fd_funk_txn_xid_eq( xid, funk->shmem->last_publish ) ) ) {
64 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "xid is the last published" ));
65 : return NULL;
66 : }
67 : #endif
68 :
69 1315650 : fd_funk_txn_map_query_t query[1];
70 1315650 : if( FD_UNLIKELY( fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 ) != FD_MAP_ERR_KEY ) ) {
71 262125 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "xid in use" ));
72 262125 : return NULL;
73 262125 : }
74 :
75 1053525 : ulong parent_idx;
76 1053525 : uint * _child_head_cidx;
77 1053525 : uint * _child_tail_cidx;
78 :
79 1053525 : if( FD_LIKELY( !parent ) ) { /* opt for incr pub */
80 :
81 445488 : parent_idx = FD_FUNK_TXN_IDX_NULL;
82 :
83 445488 : _child_head_cidx = &funk->shmem->child_head_cidx;
84 445488 : _child_tail_cidx = &funk->shmem->child_tail_cidx;
85 :
86 608037 : } else {
87 :
88 608037 : parent_idx = (ulong)(parent - funk->txn_pool->ele);
89 :
90 608037 : _child_head_cidx = &parent->child_head_cidx;
91 608037 : _child_tail_cidx = &parent->child_tail_cidx;
92 :
93 608037 : }
94 :
95 : /* Get a new transaction from the map */
96 :
97 1053525 : fd_funk_txn_t * txn = fd_funk_txn_pool_acquire( funk->txn_pool, NULL, 1, NULL );
98 1053525 : if( txn == NULL ) {
99 12 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "transaction pool is exhuasted" ));
100 12 : return NULL;
101 12 : }
102 1053513 : fd_funk_txn_xid_copy( &txn->xid, xid );
103 1053513 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
104 :
105 : /* Join the family */
106 :
107 1053513 : ulong sibling_prev_idx = fd_funk_txn_idx( *_child_tail_cidx );
108 :
109 1053513 : int first_born = fd_funk_txn_idx_is_null( sibling_prev_idx );
110 :
111 1053513 : txn->parent_cidx = fd_funk_txn_cidx( parent_idx );
112 1053513 : txn->child_head_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
113 1053513 : txn->child_tail_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
114 1053513 : txn->sibling_prev_cidx = fd_funk_txn_cidx( sibling_prev_idx );
115 1053513 : txn->sibling_next_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
116 1053513 : txn->stack_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
117 1053513 : txn->tag = 0UL;
118 :
119 1053513 : txn->lock = 0;
120 1053513 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
121 1053513 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
122 :
123 : /* TODO: consider branchless impl */
124 1053513 : if( FD_LIKELY( first_born ) ) *_child_head_cidx = fd_funk_txn_cidx( txn_idx ); /* opt for non-compete */
125 536700 : else funk->txn_pool->ele[ sibling_prev_idx ].sibling_next_cidx = fd_funk_txn_cidx( txn_idx );
126 :
127 1053513 : *_child_tail_cidx = fd_funk_txn_cidx( txn_idx );
128 :
129 1053513 : fd_funk_txn_map_insert( funk->txn_map, txn, FD_MAP_FLAG_BLOCKING );
130 :
131 1053513 : return txn;
132 1053525 : }
133 :
134 : /* fd_funk_txn_cancel_childless cancels a transaction that is known
135 : to be childless. Callers have already validated our input arguments.
136 : Assumes that cancelling in the app can't fail but that could be
137 : straightforward to support by giving this an error and plumbing
138 : through to abort the overall cancel operation when it hits a error. */
139 :
140 : static void
141 : fd_funk_txn_cancel_childless( fd_funk_t * funk,
142 837561 : ulong txn_idx ) {
143 :
144 : /* Remove all records used by this transaction. Note that we don't
145 : need to bother doing all the individual removal operations as we
146 : are removing the whole list. We do reset the record transaction
147 : idx with NULL though we can detect cycles as soon as possible
148 : and abort. */
149 :
150 837561 : fd_wksp_t * wksp = funk->wksp;
151 837561 : fd_alloc_t * alloc = funk->alloc;
152 837561 : fd_funk_rec_map_t * rec_map = funk->rec_map;
153 837561 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
154 837561 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
155 837561 : fd_funk_txn_map_t * txn_map = funk->txn_map;
156 837561 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
157 :
158 837561 : fd_funk_txn_t * txn = &txn_pool->ele[ txn_idx ];
159 837561 : uint rec_idx = txn->rec_head_idx;
160 2379138 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
161 :
162 1541577 : if( FD_UNLIKELY( rec_idx>=rec_max ) ) FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));
163 1541577 : if( FD_UNLIKELY( fd_funk_txn_idx( rec_pool->ele[ rec_idx ].txn_cidx )!=txn_idx ) )
164 0 : FD_LOG_CRIT(( "memory corruption detected (cycle or bad idx)" ));
165 :
166 1541577 : fd_funk_rec_t * rec = &rec_pool->ele[ rec_idx ];
167 1541577 : uint next_idx = rec->next_idx;
168 1541577 : rec->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
169 :
170 1541577 : for(;;) {
171 1541577 : fd_funk_rec_map_query_t rec_query[1];
172 1541577 : int err = fd_funk_rec_map_remove( rec_map, fd_funk_rec_pair( rec ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
173 1541577 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
174 1541577 : if( err == FD_MAP_ERR_KEY ) break;
175 1541577 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
176 1541577 : if( rec != fd_funk_rec_map_query_ele( rec_query ) ) break;
177 1541577 : fd_funk_val_flush( rec, alloc, wksp );
178 1541577 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
179 1541577 : break;
180 1541577 : }
181 :
182 1541577 : rec_idx = next_idx;
183 1541577 : }
184 :
185 : /* Leave the family */
186 :
187 837561 : ulong sibling_prev_idx = fd_funk_txn_idx( txn->sibling_prev_cidx );
188 837561 : ulong sibling_next_idx = fd_funk_txn_idx( txn->sibling_next_cidx );
189 :
190 : /* TODO: Consider branchless impl */
191 :
192 837561 : if( FD_LIKELY( fd_funk_txn_idx_is_null( sibling_prev_idx ) ) ) { /* opt for non-compete */
193 407289 : ulong parent_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].parent_cidx );
194 407289 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) { /* No older sib and is a funk child, opt for incr pub */
195 136761 : funk->shmem->child_head_cidx = fd_funk_txn_cidx( sibling_next_idx );
196 270528 : } else { /* No older sib and has parent */
197 270528 : txn_pool->ele[ parent_idx ].child_head_cidx = fd_funk_txn_cidx( sibling_next_idx );
198 270528 : }
199 430272 : } else { /* Has older sib */
200 430272 : txn_pool->ele[ sibling_prev_idx ].sibling_next_cidx = fd_funk_txn_cidx( sibling_next_idx );
201 430272 : }
202 :
203 837561 : if( FD_LIKELY( fd_funk_txn_idx_is_null( sibling_next_idx ) ) ) { /* opt for non-compete */
204 660684 : ulong parent_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].parent_cidx );
205 660684 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) { /* No younger sib and is a funk child, opt for incr pub */
206 262608 : funk->shmem->child_tail_cidx = fd_funk_txn_cidx( sibling_prev_idx );
207 398076 : } else { /* No younger sib and has parent */
208 398076 : txn_pool->ele[ parent_idx ].child_tail_cidx = fd_funk_txn_cidx( sibling_prev_idx );
209 398076 : }
210 660684 : } else { /* Has younger sib */
211 176877 : txn_pool->ele[ sibling_next_idx ].sibling_prev_cidx = fd_funk_txn_cidx( sibling_prev_idx );
212 176877 : }
213 :
214 837561 : fd_funk_txn_map_query_t query[1];
215 837561 : if( fd_funk_txn_map_remove( txn_map, fd_funk_txn_xid( txn ), NULL, query, FD_MAP_FLAG_BLOCKING ) == FD_MAP_SUCCESS ) {
216 837561 : fd_funk_txn_pool_release( txn_pool, txn, 1 );
217 837561 : }
218 837561 : }
219 :
220 : /* fd_funk_txn_cancel_family cancels a transaction and all its
221 : descendants in a tree-depth-first-ordered sense from youngest to
222 : oldest. Callers have already validated our input arguments. Returns
223 : the number of transactions canceled. */
224 :
225 : static ulong
226 : fd_funk_txn_cancel_family( fd_funk_t * funk,
227 : ulong tag,
228 474282 : ulong txn_idx ) {
229 474282 : ulong cancel_cnt = 0UL;
230 :
231 474282 : ulong parent_stack_idx = FD_FUNK_TXN_IDX_NULL;
232 :
233 1200840 : for(;;) {
234 :
235 1200840 : fd_funk_txn_t * txn = &funk->txn_pool->ele[ txn_idx ];
236 1200840 : txn->tag = tag;
237 :
238 1200840 : ulong youngest_idx = fd_funk_txn_idx( txn->child_tail_cidx );
239 1200840 : if( FD_LIKELY( fd_funk_txn_idx_is_null( youngest_idx ) ) ) { /* txn is is childless, opt for incr pub */
240 :
241 837561 : fd_funk_txn_cancel_childless( funk, txn_idx ); /* If this can fail, return cancel_cnt here on fail */
242 837561 : cancel_cnt++;
243 :
244 837561 : txn_idx = parent_stack_idx; /* Pop the parent stack */
245 837561 : if( FD_LIKELY( fd_funk_txn_idx_is_null( txn_idx ) ) ) break; /* If stack is empty, we are done, opt for incr pub */
246 363279 : parent_stack_idx = fd_funk_txn_idx( funk->txn_pool->ele[ txn_idx ].stack_cidx );
247 363279 : continue;
248 837561 : }
249 :
250 : /* txn has at least one child and the youngest is youngest_idx. Tag
251 : the youngest child, push txn onto the parent stack and recurse
252 : into the youngest child. */
253 :
254 363279 : txn->stack_cidx = fd_funk_txn_cidx( parent_stack_idx );
255 363279 : parent_stack_idx = txn_idx;
256 :
257 363279 : txn_idx = youngest_idx;
258 363279 : }
259 :
260 474282 : return cancel_cnt;
261 474282 : }
262 :
263 : ulong
264 : fd_funk_txn_cancel( fd_funk_t * funk,
265 : fd_funk_txn_t * txn,
266 147159 : int verbose ) {
267 :
268 147159 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
269 147159 : if( FD_UNLIKELY( !txn ) ) FD_LOG_CRIT(( "NULL txn" ));
270 : #ifdef FD_FUNK_HANDHOLDING
271 : if( FD_UNLIKELY( !fd_funk_txn_valid( funk, txn ) ) ) {
272 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "bad txn" ));
273 : return 0UL;
274 : }
275 : #else
276 147159 : (void)verbose;
277 147159 : #endif
278 :
279 147159 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
280 147159 : return fd_funk_txn_cancel_family( funk, funk->shmem->cycle_tag++, txn_idx );
281 147159 : }
282 :
283 : /* fd_funk_txn_oldest_sibling returns the index of the oldest sibling
284 : in txn_idx's family. Callers have already validated our input
285 : arguments. The caller should validate the return index. */
286 :
287 : static inline ulong
288 : fd_funk_txn_oldest_sibling( fd_funk_t * funk,
289 215826 : ulong txn_idx ) {
290 215826 : ulong parent_idx = fd_funk_txn_idx( funk->txn_pool->ele[ txn_idx ].parent_cidx );
291 :
292 215826 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) return fd_funk_txn_idx( funk->shmem->child_head_cidx ); /* opt for incr pub */
293 :
294 9468 : return fd_funk_txn_idx( funk->txn_pool->ele[ parent_idx ].child_head_cidx );
295 215826 : }
296 :
297 : /* fd_funk_txn_cancel_sibling_list cancels siblings from sibling_idx down
298 : to the youngest sibling inclusive in the order from youngest to
299 : sibling_idx. Callers have already validated our input arguments
300 : except sibling_idx. Returns the number of cancelled transactions
301 : (should be at least 1). If any sibling is skip_idx, it will be not
302 : be cancelled but still tagged as visited. Passing
303 : FD_FUNK_TXN_IDX_NULL for skip_idx will cancel all siblings from
304 : sibling_idx to the youngest inclusive. */
305 :
306 : static ulong
307 : fd_funk_txn_cancel_sibling_list( fd_funk_t * funk,
308 : ulong tag,
309 : ulong sibling_idx,
310 215826 : ulong skip_idx ) {
311 :
312 215826 : ulong cancel_stack_idx = FD_FUNK_TXN_IDX_NULL;
313 :
314 : /* Push siblings_idx and its younger siblings inclusive (skipping
315 : sibling skip_idx if encounter) onto the cancel stack from oldest to
316 : youngest (such that we cancel youngest to oldest). */
317 :
318 542949 : for(;;) {
319 :
320 : /* At this point, sibling_idx is a sibling we might want to add to
321 : the sibling stack. Validate and tag it. */
322 :
323 542949 : fd_funk_txn_t * sibling = &funk->txn_pool->ele[ sibling_idx ];
324 542949 : sibling->tag = tag;
325 :
326 542949 : if( FD_UNLIKELY( sibling_idx!=skip_idx ) ) { /* Not skip_idx so push onto the cancel stack, opt for non-compete */
327 327123 : sibling->stack_cidx = fd_funk_txn_cidx( cancel_stack_idx );
328 327123 : cancel_stack_idx = sibling_idx;
329 327123 : }
330 :
331 542949 : ulong younger_idx = fd_funk_txn_idx( sibling->sibling_next_cidx );
332 542949 : if( FD_LIKELY( fd_funk_txn_idx_is_null( younger_idx ) ) ) break; /* opt for non-compete */
333 327123 : sibling_idx = younger_idx;
334 :
335 327123 : }
336 :
337 : /* Cancel all transactions and their descendants on the cancel stack */
338 :
339 215826 : ulong cancel_cnt = 0UL;
340 :
341 542949 : while( !fd_funk_txn_idx_is_null( cancel_stack_idx ) ) { /* TODO: peel first iter to make more predictable? */
342 327123 : ulong sibling_idx = cancel_stack_idx;
343 327123 : cancel_stack_idx = fd_funk_txn_idx( funk->txn_pool->ele[ sibling_idx ].stack_cidx );
344 327123 : cancel_cnt += fd_funk_txn_cancel_family( funk, tag, sibling_idx );
345 327123 : }
346 :
347 215826 : return cancel_cnt;
348 215826 : }
349 :
350 : ulong
351 : fd_funk_txn_cancel_siblings( fd_funk_t * funk,
352 : fd_funk_txn_t * txn,
353 0 : int verbose ) {
354 :
355 : #ifdef FD_FUNK_HANDHOLDING
356 : if( FD_UNLIKELY( !funk ) ) {
357 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
358 : return 0UL;
359 : }
360 : if( FD_UNLIKELY( !fd_funk_txn_valid( funk, txn ) ) ) {
361 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "bad txn" ));
362 : return 0UL;
363 : }
364 : #else
365 0 : (void)verbose;
366 0 : #endif
367 :
368 0 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
369 :
370 0 : ulong oldest_idx = fd_funk_txn_oldest_sibling( funk, txn_idx );
371 :
372 0 : return fd_funk_txn_cancel_sibling_list( funk, funk->shmem->cycle_tag++, oldest_idx, txn_idx );
373 0 : }
374 :
375 : ulong
376 : fd_funk_txn_cancel_children( fd_funk_t * funk,
377 : fd_funk_txn_t * txn,
378 0 : int verbose ) {
379 :
380 : #ifdef FD_FUNK_HANDHOLDING
381 : if( FD_UNLIKELY( !funk ) ) {
382 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
383 : return 0UL;
384 : }
385 : #else
386 0 : (void)verbose;
387 0 : #endif
388 :
389 0 : ulong oldest_idx;
390 :
391 0 : if( FD_LIKELY( txn == NULL ) ) {
392 0 : oldest_idx = fd_funk_txn_idx( funk->shmem->child_head_cidx ); /* opt for non-compete */
393 0 : } else {
394 0 : if( FD_UNLIKELY( !fd_funk_txn_valid( funk, txn ) ) ) {
395 0 : if( FD_UNLIKELY( verbose ) ) FD_LOG_CRIT(( "bad txn" ));
396 0 : return 0UL;
397 0 : }
398 0 : oldest_idx = fd_funk_txn_idx( txn->child_head_cidx );
399 0 : }
400 :
401 0 : if( fd_funk_txn_idx_is_null( oldest_idx ) ) {
402 0 : return 0UL;
403 0 : }
404 :
405 0 : return fd_funk_txn_cancel_sibling_list( funk, funk->shmem->cycle_tag++, oldest_idx, FD_FUNK_TXN_IDX_NULL );
406 0 : }
407 :
408 : ulong
409 0 : fd_funk_txn_cancel_root( fd_funk_t * funk ) {
410 0 : fd_funk_rec_map_t * rec_map = funk->rec_map;
411 0 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
412 0 : ulong chain_cnt = fd_funk_rec_map_chain_cnt( rec_map );
413 0 : for( ulong chain_idx=0UL; chain_idx<chain_cnt; chain_idx++ ) {
414 0 : for(
415 0 : fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( rec_map, chain_idx );
416 0 : !fd_funk_rec_map_iter_done( iter );
417 0 : iter = fd_funk_rec_map_iter_next( iter )
418 0 : ) {
419 0 : for (;;) {
420 0 : fd_funk_rec_map_query_t rec_query[1];
421 0 : int err = fd_funk_rec_map_remove( rec_map, fd_funk_rec_pair( iter.ele ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
422 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
423 0 : if( err == FD_MAP_ERR_KEY ) break;
424 0 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
425 0 : if( iter.ele != fd_funk_rec_map_query_ele( rec_query ) ) break;
426 0 : fd_funk_val_flush( fd_funk_rec_map_iter_ele( iter ), fd_funk_alloc( funk ), fd_funk_wksp( funk ) );
427 0 : fd_funk_rec_pool_release( rec_pool, fd_funk_rec_map_query_ele( rec_query ), 1 );
428 0 : }
429 0 : }
430 0 : }
431 :
432 0 : return 0UL;
433 0 : }
434 :
435 : /* Cancel all outstanding transactions */
436 :
437 : ulong
438 : fd_funk_txn_cancel_all( fd_funk_t * funk,
439 0 : int verbose ) {
440 0 : return fd_funk_txn_cancel_children( funk, NULL, verbose );
441 0 : }
442 :
443 : /* fd_funk_txn_update applies the record updates in transaction txn_idx
444 : to another transaction or the parent transaction. Callers have
445 : already validated our input arguments.
446 :
447 : On entry, the head/tail of the destination records are at
448 : *_dst_rec_head_idx / *_dst_rec_tail_idx. All transactions on this
449 : list will have transaction id dst_xid and vice versa. That is, this
450 : is the record list the last published transaction or txn_idx's
451 : in-prep parent transaction.
452 :
453 : On exit, the head/tail of the updated records is at
454 : *_dst_rec_head_idx / *_dst_rec_tail_idx. As before, all transactions
455 : on this list will have transaction id dst_xid and vice versa.
456 : Transaction txn_idx will have an _empty_ record list.
457 :
458 : Updates in the transaction txn_idx are processed from oldest to
459 : youngest. If an update erases an existing record in dest, the record
460 : to erase is removed from the destination records without perturbing
461 : the order of remaining destination records. If an update is to
462 : update an existing record, the destination record value is updated
463 : and the order of the destination records is unchanged. If an update
464 : is to create a new record, the record is appended to the list of
465 : existing values as youngest without changing the order of existing
466 : values. If an update erases a record in an in-prep parent, the
467 : erasure will be moved into the parent as the youngest without
468 : changing the order of existing values. */
469 :
470 : static void
471 : fd_funk_txn_update( fd_funk_t * funk,
472 : uint * _dst_rec_head_idx, /* Pointer to the dst list head */
473 : uint * _dst_rec_tail_idx, /* Pointer to the dst list tail */
474 : ulong dst_txn_idx, /* Transaction index of the merge destination */
475 : fd_funk_txn_xid_t const * dst_xid, /* dst xid */
476 215826 : ulong txn_idx ) { /* Transaction index of the records to merge */
477 215826 : fd_wksp_t * wksp = funk->wksp;
478 215826 : fd_alloc_t * alloc = funk->alloc;
479 215826 : fd_funk_rec_map_t * rec_map = funk->rec_map;
480 215826 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
481 215826 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
482 :
483 215826 : int const critical = _dst_rec_head_idx==NULL;
484 215826 : if( critical ) {
485 : /* If the root transaction is being updated, we need to mark
486 : the beginning of a critical section becuase fd_funk_purify can't fix it */
487 206358 : fd_begin_crit( funk );
488 206358 : }
489 :
490 215826 : fd_funk_txn_t * txn = &txn_pool->ele[ txn_idx ];
491 215826 : uint rec_idx = txn->rec_head_idx;
492 437349 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
493 221523 : fd_funk_rec_t * rec = &rec_pool->ele[ rec_idx ];
494 221523 : uint next_rec_idx = rec->next_idx;
495 :
496 : /* See if (dst_xid,key) already exists. */
497 221523 : fd_funk_xid_key_pair_t pair[1];
498 221523 : fd_funk_xid_key_pair_init( pair, dst_xid, rec->pair.key );
499 221523 : for(;;) {
500 221523 : fd_funk_rec_map_query_t rec_query[1];
501 221523 : int err = fd_funk_rec_map_remove( rec_map, pair, NULL, rec_query, FD_MAP_FLAG_BLOCKING );
502 221523 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
503 221523 : if( err == FD_MAP_ERR_KEY ) break;
504 171363 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
505 :
506 : /* Remove from the transaction */
507 171363 : fd_funk_rec_t * rec2 = fd_funk_rec_map_query_ele( rec_query );
508 171363 : uint prev_idx = rec2->prev_idx;
509 171363 : uint next_idx = rec2->next_idx;
510 171363 : if( fd_funk_rec_idx_is_null( prev_idx ) ) {
511 170595 : if( _dst_rec_head_idx ) *_dst_rec_head_idx = next_idx;
512 170595 : } else {
513 768 : rec_pool->ele[ prev_idx ].next_idx = next_idx;
514 768 : }
515 171363 : if( fd_funk_rec_idx_is_null( next_idx ) ) {
516 170484 : if( _dst_rec_tail_idx ) *_dst_rec_tail_idx = prev_idx;
517 170484 : } else {
518 879 : rec_pool->ele[ next_idx ].prev_idx = prev_idx;
519 879 : }
520 : /* Clean up value */
521 171363 : fd_funk_val_flush( rec2, alloc, wksp );
522 171363 : rec2->txn_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
523 171363 : fd_funk_rec_pool_release( rec_pool, rec2, 1 );
524 171363 : break;
525 171363 : }
526 :
527 : /* Add the new record to the transaction. We can update the xid in
528 : place because it is not used for hashing the element. We have
529 : to preserve the original element to preserve the
530 : newest-to-oldest ordering in the hash
531 : chain. fd_funk_rec_query_global relies on this subtle
532 : property. */
533 :
534 221523 : rec->pair.xid[0] = *dst_xid;
535 221523 : rec->txn_cidx = fd_funk_txn_cidx( dst_txn_idx );
536 :
537 221523 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
538 221523 : if( _dst_rec_head_idx ) {
539 50745 : if( fd_funk_rec_idx_is_null( *_dst_rec_head_idx ) ) {
540 4842 : *_dst_rec_head_idx = rec_idx;
541 45903 : } else {
542 45903 : rec_pool->ele[ *_dst_rec_tail_idx ].next_idx = rec_idx;
543 45903 : rec->prev_idx = *_dst_rec_tail_idx;
544 45903 : }
545 50745 : *_dst_rec_tail_idx = rec_idx;
546 50745 : }
547 221523 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
548 :
549 221523 : rec_idx = next_rec_idx;
550 221523 : }
551 :
552 215826 : txn_pool->ele[ txn_idx ].rec_head_idx = FD_FUNK_REC_IDX_NULL;
553 215826 : txn_pool->ele[ txn_idx ].rec_tail_idx = FD_FUNK_REC_IDX_NULL;
554 :
555 215826 : if( critical ) {
556 206358 : fd_end_crit( funk );
557 206358 : }
558 215826 : }
559 :
560 : /* fd_funk_txn_publish_funk_child publishes a transaction that is known
561 : to be a child of funk. Callers have already validated our input
562 : arguments. Returns FD_FUNK_SUCCESS on success and an FD_FUNK_ERR_*
563 : code on failure. (There are currently no failure cases but the
564 : plumbing is there if value handling requires it at some point.) */
565 :
566 : static int
567 : fd_funk_txn_publish_funk_child( fd_funk_t * const funk,
568 : ulong const tag,
569 203826 : ulong const txn_idx ) {
570 :
571 : /* Apply the updates in txn to the last published transactions */
572 :
573 203826 : fd_funk_txn_update( funk, NULL, NULL, FD_FUNK_TXN_IDX_NULL, fd_funk_root( funk ), txn_idx );
574 :
575 : /* Cancel all competing transaction histories */
576 :
577 203826 : ulong oldest_idx = fd_funk_txn_oldest_sibling( funk, txn_idx );
578 203826 : fd_funk_txn_cancel_sibling_list( funk, tag, oldest_idx, txn_idx );
579 :
580 : /* Make all the children children of funk */
581 :
582 203826 : fd_funk_txn_t * txn = fd_funk_txn_pool_ele( funk->txn_pool, txn_idx );
583 203826 : ulong child_head_idx = fd_funk_txn_idx( txn->child_head_cidx );
584 203826 : ulong child_tail_idx = fd_funk_txn_idx( txn->child_tail_cidx );
585 :
586 203826 : ulong child_idx = child_head_idx;
587 385290 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) { /* opt for incr pub */
588 181464 : fd_funk_txn_t * child_txn = fd_funk_txn_pool_ele( funk->txn_pool, child_idx );
589 181464 : child_txn->tag = tag;
590 181464 : child_txn->parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
591 181464 : child_idx = fd_funk_txn_idx( child_txn->sibling_next_cidx );
592 181464 : }
593 :
594 203826 : funk->shmem->child_head_cidx = fd_funk_txn_cidx( child_head_idx );
595 203826 : funk->shmem->child_tail_cidx = fd_funk_txn_cidx( child_tail_idx );
596 :
597 203826 : fd_funk_txn_xid_copy( funk->shmem->last_publish, fd_funk_txn_xid( txn ) );
598 :
599 : /* Remove the mapping */
600 :
601 203826 : fd_funk_txn_map_query_t query[1];
602 203826 : if( fd_funk_txn_map_remove( funk->txn_map, funk->shmem->last_publish, NULL, query, FD_MAP_FLAG_BLOCKING ) == FD_MAP_SUCCESS ) {
603 203826 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
604 203826 : }
605 :
606 203826 : return FD_FUNK_SUCCESS;
607 203826 : }
608 :
609 : ulong
610 : fd_funk_txn_publish( fd_funk_t * funk,
611 : fd_funk_txn_t * txn,
612 147603 : int verbose ) {
613 :
614 : #ifdef FD_FUNK_HANDHOLDING
615 : if( FD_UNLIKELY( !funk ) ) {
616 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
617 : return 0UL;
618 : }
619 : if( FD_UNLIKELY( !fd_funk_txn_valid( funk, txn ) ) ) {
620 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "bad txn" ));
621 : return 0UL;
622 : }
623 : #else
624 147603 : (void)verbose;
625 147603 : #endif
626 :
627 147603 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
628 :
629 147603 : ulong tag = funk->shmem->cycle_tag++;
630 :
631 147603 : ulong publish_stack_idx = FD_FUNK_TXN_IDX_NULL;
632 :
633 203826 : for(;;) {
634 203826 : fd_funk_txn_t * txn2 = &funk->txn_pool->ele[ txn_idx ];
635 203826 : txn2->tag = tag;
636 :
637 : /* At this point, txn_idx is a transaction that needs to be
638 : published and has been tagged. If txn is a child of funk, we are
639 : ready to publish txn and everything on the publish stack. */
640 :
641 203826 : ulong parent_idx = fd_funk_txn_idx( txn2->parent_cidx );
642 203826 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) break; /* opt for incr pub */
643 :
644 : /* txn_idx has a parent. Validate and tag it. Push txn to the
645 : publish stack and recurse into the parent. */
646 :
647 56223 : txn2->stack_cidx = fd_funk_txn_cidx( publish_stack_idx );
648 56223 : publish_stack_idx = txn_idx;
649 :
650 56223 : txn_idx = parent_idx;
651 56223 : }
652 :
653 147603 : ulong publish_cnt = 0UL;
654 :
655 203826 : for(;;) {
656 :
657 : /* At this point, all the transactions we need to publish are
658 : tagged, txn is the next up publish funk and publish_stack has the
659 : transactions to follow in order by pop. We use a new tag for
660 : each publish as txn and its siblings we potentially visited in a
661 : previous iteration of this loop. */
662 :
663 203826 : if( FD_UNLIKELY( fd_funk_txn_publish_funk_child( funk, funk->shmem->cycle_tag++, txn_idx ) ) ) break;
664 203826 : publish_cnt++;
665 :
666 203826 : txn_idx = publish_stack_idx;
667 203826 : if( FD_LIKELY( fd_funk_txn_idx_is_null( txn_idx ) ) ) break; /* opt for incr pub */
668 56223 : publish_stack_idx = fd_funk_txn_idx( funk->txn_pool->ele[ txn_idx ].stack_cidx );
669 56223 : }
670 :
671 147603 : return publish_cnt;
672 147603 : }
673 :
674 : int
675 : fd_funk_txn_publish_into_parent( fd_funk_t * funk,
676 : fd_funk_txn_t * txn,
677 12000 : int verbose ) {
678 : #ifdef FD_FUNK_HANDHOLDING
679 : if( FD_UNLIKELY( !funk ) ) {
680 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "NULL funk" ));
681 : return FD_FUNK_ERR_INVAL;
682 : }
683 : if( FD_UNLIKELY( !fd_funk_txn_valid( funk, txn ) ) ) {
684 : if( FD_UNLIKELY( verbose ) ) FD_LOG_WARNING(( "bad txn" ));
685 : return 0UL;
686 : }
687 : #else
688 12000 : (void)verbose;
689 12000 : #endif
690 :
691 12000 : fd_funk_txn_map_t * txn_map = funk->txn_map;
692 12000 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
693 12000 : ulong txn_idx = (ulong)(txn - txn_pool->ele);
694 :
695 12000 : ulong oldest_idx = fd_funk_txn_oldest_sibling( funk, txn_idx );
696 12000 : fd_funk_txn_cancel_sibling_list( funk, funk->shmem->cycle_tag++, oldest_idx, txn_idx );
697 :
698 12000 : ulong parent_idx = fd_funk_txn_idx( txn->parent_cidx );
699 12000 : if( fd_funk_txn_idx_is_null( parent_idx ) ) {
700 : /* Publish to root */
701 2532 : fd_funk_txn_update( funk, NULL, NULL, FD_FUNK_TXN_IDX_NULL, fd_funk_root( funk ), txn_idx );
702 : /* Inherit the children */
703 2532 : funk->shmem->child_head_cidx = txn->child_head_cidx;
704 2532 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
705 9468 : } else {
706 9468 : fd_funk_txn_t * parent_txn = &txn_pool->ele[ parent_idx ];
707 9468 : fd_funk_txn_update( funk, &parent_txn->rec_head_idx, &parent_txn->rec_tail_idx, parent_idx, &parent_txn->xid, txn_idx );
708 : /* Inherit the children */
709 9468 : parent_txn->child_head_cidx = txn->child_head_cidx;
710 9468 : parent_txn->child_tail_cidx = txn->child_tail_cidx;
711 9468 : }
712 :
713 : /* Adjust the parent pointers of the children to point to their grandparent */
714 12000 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
715 21174 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
716 9174 : txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( parent_idx );
717 9174 : child_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_next_cidx );
718 9174 : }
719 :
720 12000 : fd_funk_txn_map_query_t query[1];
721 12000 : if( fd_funk_txn_map_remove( txn_map, fd_funk_txn_xid( txn ), NULL, query, FD_MAP_FLAG_BLOCKING ) == FD_MAP_SUCCESS ) {
722 12000 : fd_funk_txn_pool_release( txn_pool, txn, 1 );
723 12000 : }
724 :
725 12000 : return FD_FUNK_SUCCESS;
726 12000 : }
727 :
728 : fd_funk_rec_t const *
729 : fd_funk_txn_first_rec( fd_funk_t * funk,
730 47624211 : fd_funk_txn_t const * txn ) {
731 47624211 : if( FD_UNLIKELY( !txn ) ) return NULL;
732 44376480 : uint rec_idx = txn->rec_head_idx;
733 44376480 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
734 28516608 : return funk->rec_pool->ele + rec_idx;
735 44376480 : }
736 :
737 : fd_funk_rec_t const *
738 : fd_funk_txn_last_rec( fd_funk_t * funk,
739 0 : fd_funk_txn_t const * txn ) {
740 0 : if( FD_UNLIKELY( !txn ) ) return NULL;
741 0 : uint rec_idx = txn->rec_tail_idx;
742 0 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
743 0 : return funk->rec_pool->ele + rec_idx;
744 0 : }
745 :
746 : /* Return the next record in a transaction. Returns NULL if the
747 : transaction has no more records. */
748 :
749 : fd_funk_rec_t const *
750 : fd_funk_txn_next_rec( fd_funk_t * funk,
751 445015851 : fd_funk_rec_t const * rec ) {
752 445015851 : uint rec_idx = rec->next_idx;
753 445015851 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
754 187996728 : return funk->rec_pool->ele + rec_idx;
755 445015851 : }
756 :
757 : fd_funk_rec_t const *
758 : fd_funk_txn_prev_rec( fd_funk_t * funk,
759 318768039 : fd_funk_rec_t const * rec ) {
760 318768039 : uint rec_idx = rec->prev_idx;
761 318768039 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
762 90265524 : return funk->rec_pool->ele + rec_idx;
763 318768039 : }
764 :
765 : fd_funk_txn_xid_t
766 18918894 : fd_funk_generate_xid(void) {
767 18918894 : fd_funk_txn_xid_t xid;
768 18918894 : static FD_TL ulong seq = 0;
769 18918894 : xid.ul[0] =
770 18918894 : (fd_log_cpu_id() + 1U)*3138831853UL +
771 18918894 : (fd_log_thread_id() + 1U)*9180195821UL +
772 18918894 : (++seq)*6208101967UL;
773 18918894 : xid.ul[1] = ((ulong)fd_tickcount())*2810745731UL;
774 18918894 : return xid;
775 18918894 : }
776 :
777 : static void
778 1949853 : fd_funk_txn_all_iter_skip_nulls( fd_funk_txn_all_iter_t * iter ) {
779 1949853 : if( iter->chain_idx == iter->chain_cnt ) return;
780 9948894 : while( fd_funk_txn_map_iter_done( iter->txn_map_iter ) ) {
781 8101068 : if( ++(iter->chain_idx) == iter->chain_cnt ) break;
782 7999041 : iter->txn_map_iter = fd_funk_txn_map_iter( &iter->txn_map, iter->chain_idx );
783 7999041 : }
784 1949853 : }
785 :
786 : void
787 : fd_funk_txn_all_iter_new( fd_funk_t * funk,
788 102027 : fd_funk_txn_all_iter_t * iter ) {
789 102027 : iter->txn_map = *funk->txn_map;
790 102027 : iter->chain_cnt = fd_funk_txn_map_chain_cnt( funk->txn_map );
791 102027 : iter->chain_idx = 0;
792 102027 : iter->txn_map_iter = fd_funk_txn_map_iter( funk->txn_map, 0 );
793 102027 : fd_funk_txn_all_iter_skip_nulls( iter );
794 102027 : }
795 :
796 : int
797 1949853 : fd_funk_txn_all_iter_done( fd_funk_txn_all_iter_t * iter ) {
798 1949853 : return ( iter->chain_idx == iter->chain_cnt );
799 1949853 : }
800 :
801 : void
802 1847826 : fd_funk_txn_all_iter_next( fd_funk_txn_all_iter_t * iter ) {
803 1847826 : iter->txn_map_iter = fd_funk_txn_map_iter_next( iter->txn_map_iter );
804 1847826 : fd_funk_txn_all_iter_skip_nulls( iter );
805 1847826 : }
806 :
807 : fd_funk_txn_t const *
808 1847826 : fd_funk_txn_all_iter_ele_const( fd_funk_txn_all_iter_t * iter ) {
809 1847826 : return fd_funk_txn_map_iter_ele_const( iter->txn_map_iter );
810 1847826 : }
811 :
812 : fd_funk_txn_t *
813 0 : fd_funk_txn_all_iter_ele( fd_funk_txn_all_iter_t * iter ) {
814 0 : return fd_funk_txn_map_iter_ele( iter->txn_map_iter );
815 0 : }
816 :
817 : int
818 12 : fd_funk_txn_verify( fd_funk_t * funk ) {
819 12 : fd_funk_txn_map_t * txn_map = funk->txn_map;
820 12 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
821 12 : ulong txn_max = fd_funk_txn_pool_ele_max( txn_pool );
822 :
823 12 : ulong funk_child_head_idx = fd_funk_txn_idx( funk->shmem->child_head_cidx ); /* Previously verified */
824 12 : ulong funk_child_tail_idx = fd_funk_txn_idx( funk->shmem->child_tail_cidx ); /* Previously verified */
825 :
826 12 : fd_funk_txn_xid_t const * last_publish = funk->shmem->last_publish; /* Previously verified */
827 :
828 24 : # define TEST(c) do { \
829 24 : if( FD_UNLIKELY( !(c) ) ) { FD_LOG_WARNING(( "FAIL: %s", #c )); return FD_FUNK_ERR_INVAL; } \
830 24 : } while(0)
831 :
832 12 : # define IS_VALID( idx ) ( (idx==FD_FUNK_TXN_IDX_NULL) || \
833 12 : ((idx<txn_max) && (!fd_funk_txn_xid_eq( fd_funk_txn_xid( &txn_pool->ele[idx] ), last_publish ))) )
834 :
835 12 : TEST( !fd_funk_txn_map_verify( txn_map ) );
836 12 : TEST( !fd_funk_txn_pool_verify( txn_pool ) );
837 :
838 : /* Tag all transactions as not visited yet */
839 :
840 1572876 : for( ulong txn_idx=0UL; txn_idx<txn_max; txn_idx++ ) txn_pool->ele[ txn_idx ].tag = 0UL;
841 :
842 : /* Visit all transactions in preparation, traversing from oldest to
843 : youngest. */
844 :
845 12 : do {
846 :
847 : /* Push all children of funk to the stack */
848 :
849 12 : ulong stack_idx = FD_FUNK_TXN_IDX_NULL;
850 12 : ulong child_idx = funk_child_head_idx;
851 12 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
852 :
853 : /* Make sure valid idx, not tagged (detects cycles) and child
854 : knows it is a child of funk. Then tag as visited / in-prep,
855 : push to stack and update prep_cnt */
856 :
857 0 : TEST( IS_VALID( child_idx ) );
858 0 : TEST( !txn_pool->ele[ child_idx ].tag );
859 0 : TEST( fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx ) ) );
860 0 : txn_pool->ele[ child_idx ].tag = 1UL;
861 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
862 0 : stack_idx = child_idx;
863 :
864 0 : ulong next_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_next_cidx );
865 0 : if( !fd_funk_txn_idx_is_null( next_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ next_idx ].sibling_prev_cidx )==child_idx );
866 0 : child_idx = next_idx;
867 0 : }
868 :
869 12 : while( !fd_funk_txn_idx_is_null( stack_idx ) ) {
870 :
871 : /* Pop the next transaction to traverse */
872 :
873 0 : ulong txn_idx = stack_idx;
874 0 : stack_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].stack_cidx );
875 :
876 : /* Push all children of txn to the stack */
877 :
878 0 : ulong child_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].child_head_cidx );
879 0 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
880 :
881 : /* Make sure valid idx, not tagged (detects cycles) and child
882 : knows it is a child of txn_idx. Then tag as visited /
883 : in-prep, push to stack and update prep_cnt */
884 :
885 0 : TEST( IS_VALID( child_idx ) );
886 0 : TEST( !txn_pool->ele[ child_idx ].tag );
887 0 : TEST( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx )==txn_idx );
888 0 : txn_pool->ele[ child_idx ].tag = 1UL;
889 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
890 0 : stack_idx = child_idx;
891 :
892 0 : ulong next_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_next_cidx );
893 0 : if( !fd_funk_txn_idx_is_null( next_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ next_idx ].sibling_prev_cidx )==child_idx );
894 0 : child_idx = next_idx;
895 0 : }
896 0 : }
897 :
898 12 : } while(0);
899 :
900 : /* Do it again with a youngest to oldest traversal to test reverse
901 : link integrity */
902 :
903 12 : do {
904 :
905 : /* Push all children of funk to the stack */
906 :
907 12 : ulong stack_idx = FD_FUNK_TXN_IDX_NULL;
908 12 : ulong child_idx = funk_child_tail_idx;
909 12 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
910 :
911 : /* Make sure valid idx, tagged as visited above (detects cycles)
912 : and child knows it is a child of funk. Then tag as visited /
913 : in-prep, push to stack and update prep_cnt */
914 :
915 0 : TEST( IS_VALID( child_idx ) );
916 0 : TEST( txn_pool->ele[ child_idx ].tag==1UL );
917 0 : TEST( fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx ) ) );
918 0 : txn_pool->ele[ child_idx ].tag = 2UL;
919 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
920 0 : stack_idx = child_idx;
921 :
922 0 : ulong prev_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_prev_cidx );
923 0 : if( !fd_funk_txn_idx_is_null( prev_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ prev_idx ].sibling_next_cidx )==child_idx );
924 0 : child_idx = prev_idx;
925 0 : }
926 :
927 12 : while( !fd_funk_txn_idx_is_null( stack_idx ) ) {
928 :
929 : /* Pop the next transaction to traverse */
930 :
931 0 : ulong txn_idx = stack_idx;
932 0 : stack_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].stack_cidx );
933 :
934 : /* Push all children of txn to the stack */
935 :
936 0 : ulong child_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].child_tail_cidx );
937 0 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
938 :
939 : /* Make sure valid idx, tagged as visited above (detects cycles)
940 : and child knows it is a child of txn_idx. Then, tag as
941 : visited / in-prep, push to stack and update prep_cnt */
942 :
943 0 : TEST( IS_VALID( child_idx ) );
944 0 : TEST( txn_pool->ele[ child_idx ].tag==1UL );
945 0 : TEST( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx )==txn_idx );
946 0 : txn_pool->ele[ child_idx ].tag = 2UL;
947 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
948 0 : stack_idx = child_idx;
949 :
950 0 : ulong prev_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_prev_cidx );
951 0 : if( !fd_funk_txn_idx_is_null( prev_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ prev_idx ].sibling_next_cidx )==child_idx );
952 0 : child_idx = prev_idx;
953 0 : }
954 0 : }
955 12 : } while(0);
956 :
957 12 : # undef IS_VALID
958 12 : # undef TEST
959 :
960 12 : return FD_FUNK_SUCCESS;
961 12 : }
962 :
963 : int
964 0 : fd_funk_txn_valid( fd_funk_t const * funk, fd_funk_txn_t const * txn ) {
965 0 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
966 0 : ulong txn_max = fd_funk_txn_pool_ele_max( funk->txn_pool );
967 0 : if( txn_idx>=txn_max || txn != txn_idx + funk->txn_pool->ele ) return 0;
968 0 : fd_funk_txn_map_query_t query[1];
969 0 : int err = FD_MAP_ERR_AGAIN;
970 0 : while( err == FD_MAP_ERR_AGAIN ) {
971 0 : err = fd_funk_txn_map_query_try( funk->txn_map, &txn->xid, NULL, query, 0 );
972 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) {
973 0 : FD_LOG_DEBUG(( "fd_funk_txn_map_query_try() failed: %d on chain %lu", err, fd_funk_txn_map_iter_chain_idx( funk->txn_map, &txn->xid ) ));
974 0 : return 0;
975 0 : }
976 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) {
977 0 : FD_LOG_WARNING(( "fd_funk_txn_map_query_try() failed: %d on chain %lu", err, fd_funk_txn_map_iter_chain_idx( funk->txn_map, &txn->xid ) ));
978 0 : return 0;
979 0 : }
980 0 : if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
981 0 : if( fd_funk_txn_map_query_ele( query ) != txn ) {
982 0 : FD_LOG_WARNING(( "fd_funk_txn_map_query_ele() failed: %p != %p", (void *)fd_funk_txn_map_query_ele( query ), (void *)txn ));
983 0 : return 0;
984 0 : }
985 0 : break;
986 0 : }
987 : /* Normally we'd do this, but we didn't really do any non-atomic reads of element fields.
988 : err = fd_funk_txn_map_query_test( query ); */
989 0 : }
990 0 : return 1;
991 0 : }
|