Line data Source code
1 : #include "fd_funk_txn.h"
2 : #include "fd_funk.h"
3 :
4 : /* Provide the actual transaction map implementation */
5 :
6 : #define POOL_NAME fd_funk_txn_pool
7 2437713 : #define POOL_ELE_T fd_funk_txn_t
8 : #define POOL_IDX_T uint
9 5589987 : #define POOL_NEXT map_next
10 : #define POOL_IMPL_STYLE 2
11 : #include "../util/tmpl/fd_pool_para.c"
12 :
13 : #define MAP_NAME fd_funk_txn_map
14 315918837 : #define MAP_ELE_T fd_funk_txn_t
15 0 : #define MAP_KEY_T fd_funk_txn_xid_t
16 1218789 : #define MAP_KEY xid
17 : #define MAP_KEY_EQ(k0,k1) fd_funk_txn_xid_eq((k0),(k1))
18 : #define MAP_KEY_HASH(k0,seed) fd_funk_txn_xid_hash((k0),(seed))
19 123722602 : #define MAP_NEXT map_next
20 : #define MAP_HASH map_hash
21 42 : #define MAP_MAGIC (0xf173da2ce7172db0UL) /* Firedancer txn db version 0 */
22 : #define MAP_IMPL_STYLE 2
23 : #include "../util/tmpl/fd_map_chain_para.c"
24 :
/* fd_funk_txn_state_transition moves txn (a fd_funk_txn_t *) from
   state before to state after.  The intended change is logged at debug
   level first.  When FD_HAS_ATOMIC is set the move is done with a
   compare-and-swap on txn->state; otherwise it is a plain
   compare-then-store.  If txn was not in state before, the state
   actually found is reported with FD_LOG_CRIT as a data race.

   Each macro argument is evaluated exactly once, so arguments with
   side effects are safe to pass. */

#define fd_funk_txn_state_transition(txn, before, after) do {                                                       \
    fd_funk_txn_t * txn_    = (txn);                                                                                 \
    uint            before_ = (before);                                                                              \
    uint            after_  = (after);                                                                               \
    FD_LOG_DEBUG(( "funk_txn laddr=%p xid=%lu:%lu state change (%u-%s) -> (%u-%s)",                                   \
                   (void *)txn_,                                                                                     \
                   txn_->xid.ul[0], txn_->xid.ul[1],                                                                 \
                   before_, fd_funk_txn_state_str( before_ ),                                                        \
                   after_,  fd_funk_txn_state_str( after_  ) ));                                                     \
    if( FD_HAS_ATOMIC ) {                                                                                            \
      if( FD_LIKELY( __sync_bool_compare_and_swap( &txn_->state, before_, after_ ) ) ) break;                        \
    } else {                                                                                                         \
      if( FD_LIKELY( txn_->state == before_ ) ) {                                                                    \
        txn_->state = after_;                                                                                        \
        break;                                                                                                       \
      }                                                                                                              \
    }                                                                                                                \
    uint have_ = FD_VOLATILE_CONST( txn_->state );                                                                   \
    FD_LOG_CRIT(( "Detected data race on funk txn %p: expected state %u-%s, found %u-%s, while transitioning to %u-%s", \
                  (void *)txn_,                                                                                      \
                  before_, fd_funk_txn_state_str( before_ ),                                                         \
                  have_,   fd_funk_txn_state_str( have_   ),                                                         \
                  after_,  fd_funk_txn_state_str( after_  ) ));                                                      \
  } while(0)
46 :
/* fd_funk_last_publish_transition overwrites funk_shmem->last_publish
   with a copy of the xid pointed to by after, then logs the previous
   and the new value at debug level. */

#define fd_funk_last_publish_transition(funk_shmem, after) do {        \
    fd_funk_shmem_t *   shmem_ = (funk_shmem);                         \
    fd_funk_txn_xid_t * cur_   = shmem_->last_publish;                 \
    fd_funk_txn_xid_t   old_[1];                                       \
    fd_funk_txn_xid_copy( old_, cur_    );                             \
    fd_funk_txn_xid_copy( cur_, (after) );                             \
    FD_LOG_DEBUG(( "funk last_publish (%lu:%lu) -> (%lu:%lu)",         \
                   old_->ul[0], old_->ul[1],                           \
                   cur_->ul[0], cur_->ul[1] ));                        \
  } while(0)
56 :
57 : void
58 : fd_funk_txn_prepare( fd_funk_t * funk,
59 : fd_funk_txn_xid_t const * parent_xid,
60 1218789 : fd_funk_txn_xid_t const * xid ) {
61 :
62 1218789 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
63 1218789 : if( FD_UNLIKELY( !parent_xid ) ) FD_LOG_CRIT(( "NULL parent_xid" ));
64 1218789 : if( FD_UNLIKELY( !xid ) ) FD_LOG_CRIT(( "NULL xid" ));
65 1218789 : if( FD_UNLIKELY( fd_funk_txn_xid_eq_root( xid ) ) ) FD_LOG_CRIT(( "xid is root" ));
66 :
67 1218789 : if( FD_UNLIKELY( fd_funk_txn_xid_eq( xid, funk->shmem->last_publish ) ) ) {
68 0 : FD_LOG_ERR(( "fd_funk_txn_prepare failed: xid %lu:%lu is the last published",
69 0 : xid->ul[0], xid->ul[1] ));
70 0 : }
71 :
72 1218789 : fd_funk_txn_map_query_t query[1];
73 1218789 : if( FD_UNLIKELY( fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 ) != FD_MAP_ERR_KEY ) ) {
74 0 : FD_LOG_ERR(( "fd_funk_txn_prepare failed: xid %lu:%lu already in use",
75 0 : xid->ul[0], xid->ul[1] ));
76 0 : }
77 :
78 1218789 : ulong parent_idx;
79 1218789 : uint * _child_head_cidx;
80 1218789 : uint * _child_tail_cidx;
81 :
82 1218789 : if( FD_UNLIKELY( fd_funk_txn_xid_eq( parent_xid, funk->shmem->last_publish ) ) ) {
83 :
84 473652 : parent_idx = FD_FUNK_TXN_IDX_NULL;
85 :
86 473652 : _child_head_cidx = &funk->shmem->child_head_cidx;
87 473652 : _child_tail_cidx = &funk->shmem->child_tail_cidx;
88 :
89 745137 : } else {
90 :
91 745137 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, parent_xid, NULL, query, 0 );
92 745137 : if( FD_UNLIKELY( query_err!=FD_MAP_SUCCESS ) ) {
93 0 : FD_LOG_CRIT(( "fd_funk_txn_prepare failed: user provided invalid parent XID %lu:%lu (err %i-%s)",
94 0 : parent_xid->ul[0], parent_xid->ul[1],
95 0 : query_err, fd_map_strerror( query_err ) ));
96 0 : }
97 :
98 745137 : fd_funk_txn_t * parent = fd_funk_txn_map_query_ele( query );
99 745137 : fd_funk_txn_state_assert( parent, FD_FUNK_TXN_STATE_ACTIVE );
100 745137 : parent_idx = (ulong)(parent - funk->txn_pool->ele);
101 :
102 745137 : _child_head_cidx = &parent->child_head_cidx;
103 745137 : _child_tail_cidx = &parent->child_tail_cidx;
104 :
105 745137 : }
106 :
107 : /* Get a new transaction from the map */
108 :
109 1218789 : fd_funk_txn_t * txn = fd_funk_txn_pool_acquire( funk->txn_pool, NULL, 1, NULL );
110 1218789 : if( FD_UNLIKELY( !txn ) ) FD_LOG_ERR(( "fd_funk_txn_prepare failed: transaction object pool out of memory" ));
111 1218789 : fd_funk_txn_xid_copy( &txn->xid, xid );
112 1218789 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
113 :
114 : /* Join the family */
115 :
116 1218789 : ulong sibling_prev_idx = fd_funk_txn_idx( *_child_tail_cidx );
117 :
118 1218789 : int first_born = fd_funk_txn_idx_is_null( sibling_prev_idx );
119 :
120 1218789 : txn->parent_cidx = fd_funk_txn_cidx( parent_idx );
121 1218789 : txn->child_head_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
122 1218789 : txn->child_tail_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
123 1218789 : txn->sibling_prev_cidx = fd_funk_txn_cidx( sibling_prev_idx );
124 1218789 : txn->sibling_next_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
125 1218789 : txn->stack_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
126 1218789 : txn->tag = 0UL;
127 :
128 1218789 : fd_funk_txn_state_transition( txn, FD_FUNK_TXN_STATE_FREE, FD_FUNK_TXN_STATE_ACTIVE );
129 1218789 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
130 1218789 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
131 :
132 : /* TODO: consider branchless impl */
133 1218789 : if( FD_LIKELY( first_born ) ) *_child_head_cidx = fd_funk_txn_cidx( txn_idx ); /* opt for non-compete */
134 615570 : else funk->txn_pool->ele[ sibling_prev_idx ].sibling_next_cidx = fd_funk_txn_cidx( txn_idx );
135 :
136 1218789 : *_child_tail_cidx = fd_funk_txn_cidx( txn_idx );
137 :
138 1218789 : if( parent_xid ) {
139 1218789 : if( FD_UNLIKELY( fd_funk_txn_map_query_test( query )!=FD_MAP_SUCCESS ) ) {
140 0 : FD_LOG_CRIT(( "Detected data race while preparing a funk txn" ));
141 0 : }
142 1218789 : }
143 :
144 1218789 : fd_funk_txn_map_insert( funk->txn_map, txn, FD_MAP_FLAG_BLOCKING );
145 1218789 : }
146 :
147 : /* fd_funk_txn_cancel_childless cancels a transaction that is known
148 : to be childless. Callers have already validated our input arguments.
149 : Assumes that cancelling in the app can't fail but that could be
150 : straightforward to support by giving this an error and plumbing
151 : through to abort the overall cancel operation when it hits a error. */
152 :
153 : static void
154 : fd_funk_txn_cancel_childless( fd_funk_t * funk,
155 977703 : ulong txn_idx ) {
156 :
157 : /* Remove all records used by this transaction. Note that we don't
158 : need to bother doing all the individual removal operations as we
159 : are removing the whole list. We do reset the record transaction
160 : idx with NULL though we can detect cycles as soon as possible
161 : and abort. */
162 :
163 977703 : fd_wksp_t * wksp = funk->wksp;
164 977703 : fd_alloc_t * alloc = funk->alloc;
165 977703 : fd_funk_rec_map_t * rec_map = funk->rec_map;
166 977703 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
167 977703 : ulong rec_max = fd_funk_rec_pool_ele_max( rec_pool );
168 977703 : fd_funk_txn_map_t * txn_map = funk->txn_map;
169 977703 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
170 :
171 977703 : fd_funk_txn_t * txn = &txn_pool->ele[ txn_idx ];
172 977703 : fd_funk_txn_state_assert( txn, FD_FUNK_TXN_STATE_CANCEL );
173 :
174 977703 : uint rec_idx = txn->rec_head_idx;
175 2899068 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
176 :
177 1921365 : if( FD_UNLIKELY( rec_idx>=rec_max ) ) FD_LOG_CRIT(( "memory corruption detected (bad idx)" ));
178 1921365 : if( FD_UNLIKELY( !fd_funk_txn_xid_eq( rec_pool->ele[ rec_idx ].pair.xid, &txn->xid ) ) )
179 0 : FD_LOG_CRIT(( "memory corruption detected (cycle or bad idx)" ));
180 :
181 1921365 : fd_funk_rec_t * rec = &rec_pool->ele[ rec_idx ];
182 1921365 : uint next_idx = rec->next_idx;
183 :
184 1921365 : for(;;) {
185 1921365 : fd_funk_rec_map_query_t rec_query[1];
186 1921365 : int err = fd_funk_rec_map_remove( rec_map, fd_funk_rec_pair( rec ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
187 1921365 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
188 1921365 : if( err == FD_MAP_ERR_KEY ) break;
189 1921365 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
190 1921365 : if( rec != fd_funk_rec_map_query_ele( rec_query ) ) break;
191 1921365 : fd_funk_val_flush( rec, alloc, wksp );
192 1921365 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
193 1921365 : break;
194 1921365 : }
195 :
196 1921365 : rec_idx = next_idx;
197 1921365 : }
198 :
199 : /* Leave the family */
200 :
201 977703 : ulong sibling_prev_idx = fd_funk_txn_idx( txn->sibling_prev_cidx );
202 977703 : ulong sibling_next_idx = fd_funk_txn_idx( txn->sibling_next_cidx );
203 :
204 : /* TODO: Consider branchless impl */
205 : /* FIXME use compare-and-swap here */
206 :
207 977703 : if( FD_LIKELY( fd_funk_txn_idx_is_null( sibling_prev_idx ) ) ) { /* opt for non-compete */
208 480876 : ulong parent_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].parent_cidx );
209 480876 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) { /* No older sib and is a funk child, opt for incr pub */
210 147987 : funk->shmem->child_head_cidx = fd_funk_txn_cidx( sibling_next_idx );
211 332889 : } else { /* No older sib and has parent */
212 332889 : txn_pool->ele[ parent_idx ].child_head_cidx = fd_funk_txn_cidx( sibling_next_idx );
213 332889 : }
214 496827 : } else { /* Has older sib */
215 496827 : txn_pool->ele[ sibling_prev_idx ].sibling_next_cidx = fd_funk_txn_cidx( sibling_next_idx );
216 496827 : }
217 :
218 977703 : if( FD_LIKELY( fd_funk_txn_idx_is_null( sibling_next_idx ) ) ) { /* opt for non-compete */
219 781683 : ulong parent_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].parent_cidx );
220 781683 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) { /* No younger sib and is a funk child, opt for incr pub */
221 289740 : funk->shmem->child_tail_cidx = fd_funk_txn_cidx( sibling_prev_idx );
222 491943 : } else { /* No younger sib and has parent */
223 491943 : txn_pool->ele[ parent_idx ].child_tail_cidx = fd_funk_txn_cidx( sibling_prev_idx );
224 491943 : }
225 781683 : } else { /* Has younger sib */
226 196020 : txn_pool->ele[ sibling_next_idx ].sibling_prev_cidx = fd_funk_txn_cidx( sibling_prev_idx );
227 196020 : }
228 :
229 977703 : fd_funk_txn_map_query_t query[1];
230 977703 : if( fd_funk_txn_map_remove( txn_map, fd_funk_txn_xid( txn ), NULL, query, FD_MAP_FLAG_BLOCKING ) == FD_MAP_SUCCESS ) {
231 977703 : fd_funk_txn_state_transition( txn, FD_FUNK_TXN_STATE_CANCEL, FD_FUNK_TXN_STATE_FREE );
232 977703 : fd_funk_txn_pool_release( txn_pool, txn, 1 );
233 977703 : }
234 977703 : }
235 :
236 : /* fd_funk_txn_cancel_family cancels a transaction and all its
237 : descendants in a tree-depth-first-ordered sense from youngest to
238 : oldest. Callers have already validated our input arguments. Returns
239 : the number of transactions canceled. */
240 :
241 : static ulong
242 : fd_funk_txn_cancel_family( fd_funk_t * funk,
243 : ulong tag,
244 525273 : ulong txn_idx ) {
245 525273 : ulong cancel_cnt = 0UL;
246 :
247 525273 : ulong parent_stack_idx = FD_FUNK_TXN_IDX_NULL;
248 :
249 525273 : fd_funk_txn_t * txn = &funk->txn_pool->ele[ txn_idx ];
250 525273 : fd_funk_txn_state_assert( txn, FD_FUNK_TXN_STATE_CANCEL );
251 :
252 1430133 : for(;;) {
253 :
254 1430133 : txn = &funk->txn_pool->ele[ txn_idx ];
255 1430133 : txn->tag = tag;
256 :
257 1430133 : ulong youngest_idx = fd_funk_txn_idx( txn->child_tail_cidx );
258 1430133 : if( FD_LIKELY( fd_funk_txn_idx_is_null( youngest_idx ) ) ) { /* txn is is childless, opt for incr pub */
259 :
260 977703 : fd_funk_txn_cancel_childless( funk, txn_idx ); /* If this can fail, return cancel_cnt here on fail */
261 977703 : cancel_cnt++;
262 :
263 977703 : txn_idx = parent_stack_idx; /* Pop the parent stack */
264 977703 : if( FD_LIKELY( fd_funk_txn_idx_is_null( txn_idx ) ) ) break; /* If stack is empty, we are done, opt for incr pub */
265 452430 : parent_stack_idx = fd_funk_txn_idx( funk->txn_pool->ele[ txn_idx ].stack_cidx );
266 452430 : continue;
267 977703 : }
268 :
269 : /* txn has at least one child and the youngest is youngest_idx. Tag
270 : the youngest child, push txn onto the parent stack and recurse
271 : into the youngest child. */
272 :
273 452430 : txn->stack_cidx = fd_funk_txn_cidx( parent_stack_idx );
274 452430 : parent_stack_idx = txn_idx;
275 452430 : fd_funk_txn_state_transition( &funk->txn_pool->ele[ youngest_idx ], FD_FUNK_TXN_STATE_ACTIVE, FD_FUNK_TXN_STATE_CANCEL );
276 :
277 452430 : txn_idx = youngest_idx;
278 452430 : }
279 :
280 525273 : return cancel_cnt;
281 525273 : }
282 :
283 : ulong
284 : fd_funk_txn_cancel( fd_funk_t * funk,
285 158094 : fd_funk_txn_xid_t const * xid ) {
286 :
287 158094 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
288 158094 : if( FD_UNLIKELY( !xid ) ) FD_LOG_CRIT(( "NULL xid" ));
289 :
290 158094 : fd_funk_txn_map_query_t query[1];
291 158094 : int map_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
292 158094 : if( FD_UNLIKELY( map_err!=FD_MAP_SUCCESS ) ) {
293 0 : FD_LOG_CRIT(( "Failed to cancel txn %lu:%lu (%i-%s)",
294 0 : xid->ul[0], xid->ul[1],
295 0 : map_err, fd_map_strerror( map_err ) ));
296 0 : }
297 158094 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
298 158094 : fd_funk_txn_state_transition( txn, FD_FUNK_TXN_STATE_ACTIVE, FD_FUNK_TXN_STATE_CANCEL );
299 158094 : if( FD_UNLIKELY( fd_funk_txn_map_query_test( query )!=FD_MAP_SUCCESS ) ) {
300 0 : FD_LOG_CRIT(( "Detected data race while cancelling a funk txn" ));
301 0 : }
302 :
303 158094 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
304 158094 : return fd_funk_txn_cancel_family( funk, funk->shmem->cycle_tag++, txn_idx );
305 158094 : }
306 :
307 : /* fd_funk_txn_oldest_sibling returns the index of the oldest sibling
308 : in txn_idx's family. Callers have already validated our input
309 : arguments. The caller should validate the return index. */
310 :
311 : static inline ulong
312 : fd_funk_txn_oldest_sibling( fd_funk_t * funk,
313 240885 : ulong txn_idx ) {
314 240885 : ulong parent_idx = fd_funk_txn_idx( funk->txn_pool->ele[ txn_idx ].parent_cidx );
315 :
316 240885 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) return fd_funk_txn_idx( funk->shmem->child_head_cidx ); /* opt for incr pub */
317 :
318 9408 : return fd_funk_txn_idx( funk->txn_pool->ele[ parent_idx ].child_head_cidx );
319 240885 : }
320 :
321 : /* fd_funk_txn_cancel_sibling_list cancels siblings from sibling_idx down
322 : to the youngest sibling inclusive in the order from youngest to
323 : sibling_idx. Callers have already validated our input arguments
324 : except sibling_idx. Returns the number of cancelled transactions
325 : (should be at least 1). If any sibling is skip_idx, it will be not
326 : be cancelled but still tagged as visited. Passing
327 : FD_FUNK_TXN_IDX_NULL for skip_idx will cancel all siblings from
328 : sibling_idx to the youngest inclusive. */
329 :
330 : static ulong
331 : fd_funk_txn_cancel_sibling_list( fd_funk_t * funk,
332 : ulong tag,
333 : ulong sibling_idx,
334 240885 : ulong skip_idx ) {
335 :
336 240885 : ulong cancel_stack_idx = FD_FUNK_TXN_IDX_NULL;
337 :
338 : /* Push siblings_idx and its younger siblings inclusive (skipping
339 : sibling skip_idx if encounter) onto the cancel stack from oldest to
340 : youngest (such that we cancel youngest to oldest). */
341 :
342 608064 : for(;;) {
343 :
344 : /* At this point, sibling_idx is a sibling we might want to add to
345 : the sibling stack. Validate and tag it. */
346 :
347 608064 : fd_funk_txn_t * sibling = &funk->txn_pool->ele[ sibling_idx ];
348 608064 : sibling->tag = tag;
349 :
350 608064 : if( FD_UNLIKELY( sibling_idx!=skip_idx ) ) { /* Not skip_idx so push onto the cancel stack, opt for non-compete */
351 367179 : fd_funk_txn_state_transition( sibling, FD_FUNK_TXN_STATE_ACTIVE, FD_FUNK_TXN_STATE_CANCEL );
352 367179 : sibling->stack_cidx = fd_funk_txn_cidx( cancel_stack_idx );
353 367179 : cancel_stack_idx = sibling_idx;
354 367179 : }
355 :
356 608064 : ulong younger_idx = fd_funk_txn_idx( sibling->sibling_next_cidx );
357 608064 : if( FD_LIKELY( fd_funk_txn_idx_is_null( younger_idx ) ) ) break; /* opt for non-compete */
358 367179 : sibling_idx = younger_idx;
359 :
360 367179 : }
361 :
362 : /* Cancel all transactions and their descendants on the cancel stack */
363 :
364 240885 : ulong cancel_cnt = 0UL;
365 :
366 608064 : while( !fd_funk_txn_idx_is_null( cancel_stack_idx ) ) { /* TODO: peel first iter to make more predictable? */
367 367179 : ulong sibling_idx = cancel_stack_idx;
368 367179 : cancel_stack_idx = fd_funk_txn_idx( funk->txn_pool->ele[ sibling_idx ].stack_cidx );
369 367179 : cancel_cnt += fd_funk_txn_cancel_family( funk, tag, sibling_idx );
370 367179 : }
371 :
372 240885 : return cancel_cnt;
373 240885 : }
374 :
375 : ulong
376 : fd_funk_txn_cancel_children( fd_funk_t * funk,
377 0 : fd_funk_txn_xid_t const * xid ) {
378 :
379 0 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
380 :
381 0 : fd_funk_txn_t * txn = NULL;
382 :
383 0 : ulong oldest_idx;
384 :
385 0 : if( FD_LIKELY( !xid ) ) {
386 0 : oldest_idx = fd_funk_txn_idx( funk->shmem->child_head_cidx ); /* opt for non-compete */
387 0 : } else {
388 0 : fd_funk_txn_map_query_t query[1];
389 0 : int map_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
390 0 : if( FD_UNLIKELY( map_err!=FD_MAP_SUCCESS ) ) {
391 0 : FD_LOG_ERR(( "Failed to publish txn %lu:%lu: err %i", xid->ul[0], xid->ul[1], map_err ));
392 0 : }
393 0 : txn = fd_funk_txn_map_query_ele( query );
394 0 : oldest_idx = fd_funk_txn_idx( txn->child_head_cidx );
395 0 : }
396 :
397 0 : if( fd_funk_txn_idx_is_null( oldest_idx ) ) {
398 0 : return 0UL;
399 0 : }
400 :
401 0 : ulong res = fd_funk_txn_cancel_sibling_list( funk, funk->shmem->cycle_tag++, oldest_idx, FD_FUNK_TXN_IDX_NULL );
402 0 : if( txn ) fd_funk_txn_state_assert( txn, FD_FUNK_TXN_STATE_ACTIVE );
403 0 : return res;
404 0 : }
405 :
406 : void
407 3 : fd_funk_txn_remove_published( fd_funk_t * funk ) {
408 : /* Prevent new funk txn objects from spawning */
409 3 : if( FD_UNLIKELY( fd_funk_txn_pool_lock( funk->txn_pool, 1 )!=FD_POOL_SUCCESS ) ) {
410 0 : FD_LOG_CRIT(( "Failed to remove published txns: fd_funk_txn_pool_lock failed" ));
411 0 : }
412 3 : if( FD_UNLIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
413 0 : fd_funk_txn_pool_unlock( funk->txn_pool );
414 0 : FD_LOG_ERR(( "Failed to remove published txns: there are still txns in preparation" ));
415 0 : }
416 :
417 3 : fd_wksp_t * wksp = funk->wksp;
418 3 : fd_alloc_t * alloc = funk->alloc;
419 3 : fd_funk_rec_map_t * rec_map = funk->rec_map;
420 3 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
421 :
422 : /* Iterate over all funk records and remove them */
423 3 : ulong chain_cnt = fd_funk_rec_map_chain_cnt( rec_map );
424 9 : for( ulong chain_idx=0UL; chain_idx<chain_cnt; chain_idx++ ) {
425 : /* FIXME: Chains could be locked while iterating if the remove API
426 : supported */
427 : //ulong lock_seq[1] = {chain_idx};
428 : //int lock_err = fd_funk_rec_map_iter_lock( rec_map, lock_seq, 1UL, FD_MAP_FLAG_BLOCKING );
429 : //if( FD_UNLIKELY( lock_err!=FD_MAP_SUCCESS ) ) {
430 : // FD_LOG_CRIT(( "fd_funk_rec_map_iter_lock failed (%i-%s)", lock_err, fd_map_strerror( lock_err ) ));
431 : //}
432 6 : for(
433 6 : fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( rec_map, chain_idx );
434 3078 : !fd_funk_rec_map_iter_done( iter );
435 3072 : ) {
436 3072 : fd_funk_rec_t * rec = fd_funk_rec_map_iter_ele( iter );
437 3072 : ulong next = fd_funk_rec_map_private_idx( rec->map_next );;
438 :
439 : /* Remove rec object from map */
440 3072 : fd_funk_rec_map_query_t rec_query[1];
441 3072 : int err = fd_funk_rec_map_remove( rec_map, fd_funk_rec_pair( rec ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
442 3072 : fd_funk_rec_key_t key; fd_funk_rec_key_copy( &key, rec->pair.key );
443 3072 : if( FD_UNLIKELY( err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", err, fd_map_strerror( err ) ));
444 :
445 : /* Sanity check: Record belongs to last published XID */
446 3072 : if( FD_UNLIKELY( !fd_funk_txn_xid_eq_root( rec->pair.xid ) ) ) {
447 0 : FD_LOG_ERR(( "Failed to remove published txns: concurrent in-prep record detected" ));
448 0 : }
449 :
450 : /* Free rec resources */
451 3072 : fd_funk_val_flush( rec, alloc, wksp );
452 3072 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
453 3072 : iter.ele_idx = next;
454 3072 : }
455 : //fd_funk_rec_map_iter_unlock( rec_map, lock_seq, 1UL );
456 6 : }
457 :
458 : /* Reset 'last published' XID to 'root' XID */
459 3 : fd_funk_txn_xid_t root_xid; fd_funk_txn_xid_set_root( &root_xid );
460 3 : fd_funk_last_publish_transition( funk->shmem, &root_xid );
461 :
462 3 : fd_funk_txn_pool_unlock( funk->txn_pool );
463 3 : }
464 :
465 : /* Cancel all outstanding transactions */
466 :
467 : ulong
468 0 : fd_funk_txn_cancel_all( fd_funk_t * funk ) {
469 0 : return fd_funk_txn_cancel_children( funk, NULL );
470 0 : }
471 :
472 : /* fd_funk_txn_update applies the record updates in transaction txn_idx
473 : to another transaction or the parent transaction. Callers have
474 : already validated our input arguments.
475 :
476 : On entry, the head/tail of the destination records are at
477 : *_dst_rec_head_idx / *_dst_rec_tail_idx. All transactions on this
478 : list will have transaction id dst_xid and vice versa. That is, this
479 : is the record list the last published transaction or txn_idx's
480 : in-prep parent transaction.
481 :
482 : On exit, the head/tail of the updated records is at
483 : *_dst_rec_head_idx / *_dst_rec_tail_idx. As before, all transactions
484 : on this list will have transaction id dst_xid and vice versa.
485 : Transaction txn_idx will have an _empty_ record list.
486 :
487 : Updates in the transaction txn_idx are processed from oldest to
488 : youngest. If an update is to update an existing record, the
489 : destination record value is updated and the order of the destination
490 : records is unchanged. If an update is to create a new record, the
491 : record is appended to the list of existing values as youngest without
492 : changing the order of existing values. If an update erases a record
493 : in an in-prep parent, the erasure will be moved into the parent as
494 : the youngest without changing the order of existing values. */
495 :
496 : static void
497 : fd_funk_txn_update( fd_funk_t * funk,
498 : uint * _dst_rec_head_idx, /* Pointer to the dst list head */
499 : uint * _dst_rec_tail_idx, /* Pointer to the dst list tail */
500 : fd_funk_txn_xid_t const * dst_xid, /* dst xid */
501 240885 : ulong txn_idx ) { /* Transaction index of the records to merge */
502 240885 : fd_wksp_t * wksp = funk->wksp;
503 240885 : fd_alloc_t * alloc = funk->alloc;
504 240885 : fd_funk_rec_map_t * rec_map = funk->rec_map;
505 240885 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
506 240885 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
507 :
508 240885 : fd_funk_txn_t * txn = &txn_pool->ele[ txn_idx ];
509 240885 : fd_funk_txn_state_assert( txn, FD_FUNK_TXN_STATE_PUBLISH );
510 240885 : uint rec_idx = txn->rec_head_idx;
511 546312 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
512 305427 : fd_funk_rec_t * rec = &rec_pool->ele[ rec_idx ];
513 305427 : uint next_rec_idx = rec->next_idx;
514 :
515 : /* See if (dst_xid,key) already exists. */
516 305427 : fd_funk_xid_key_pair_t pair[1];
517 305427 : fd_funk_xid_key_pair_init( pair, dst_xid, rec->pair.key );
518 305427 : for(;;) {
519 305427 : fd_funk_rec_map_query_t rec_query[1];
520 305427 : int err = fd_funk_rec_map_remove( rec_map, pair, NULL, rec_query, FD_MAP_FLAG_BLOCKING );
521 305427 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
522 305427 : if( err == FD_MAP_ERR_KEY ) break;
523 253122 : if( FD_UNLIKELY( err != FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "map corruption" ));
524 :
525 : /* Remove from the transaction */
526 253122 : fd_funk_rec_t * rec2 = fd_funk_rec_map_query_ele( rec_query );
527 253122 : uint prev_idx = rec2->prev_idx;
528 253122 : uint next_idx = rec2->next_idx;
529 253122 : if( fd_funk_rec_idx_is_null( prev_idx ) ) {
530 252270 : if( _dst_rec_head_idx ) *_dst_rec_head_idx = next_idx;
531 252270 : } else {
532 852 : rec_pool->ele[ prev_idx ].next_idx = next_idx;
533 852 : }
534 253122 : if( fd_funk_rec_idx_is_null( next_idx ) ) {
535 252111 : if( _dst_rec_tail_idx ) *_dst_rec_tail_idx = prev_idx;
536 252111 : } else {
537 1011 : rec_pool->ele[ next_idx ].prev_idx = prev_idx;
538 1011 : }
539 : /* Clean up value */
540 253122 : fd_funk_val_flush( rec2, alloc, wksp );
541 253122 : fd_funk_rec_pool_release( rec_pool, rec2, 1 );
542 253122 : break;
543 253122 : }
544 :
545 : /* Add the new record to the transaction. We can update the xid in
546 : place because it is not used for hashing the element. We have
547 : to preserve the original element to preserve the
548 : newest-to-oldest ordering in the hash
549 : chain. fd_funk_rec_query_global relies on this subtle
550 : property. */
551 :
552 305427 : rec->pair.xid[0] = *dst_xid;
553 :
554 305427 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
555 305427 : if( _dst_rec_head_idx ) {
556 49860 : if( fd_funk_rec_idx_is_null( *_dst_rec_head_idx ) ) {
557 4584 : *_dst_rec_head_idx = rec_idx;
558 45276 : } else {
559 45276 : rec_pool->ele[ *_dst_rec_tail_idx ].next_idx = rec_idx;
560 45276 : rec->prev_idx = *_dst_rec_tail_idx;
561 45276 : }
562 49860 : *_dst_rec_tail_idx = rec_idx;
563 49860 : }
564 305427 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
565 :
566 305427 : rec_idx = next_rec_idx;
567 305427 : }
568 :
569 240885 : txn_pool->ele[ txn_idx ].rec_head_idx = FD_FUNK_REC_IDX_NULL;
570 240885 : txn_pool->ele[ txn_idx ].rec_tail_idx = FD_FUNK_REC_IDX_NULL;
571 240885 : }
572 :
573 : /* fd_funk_txn_publish_funk_child publishes a transaction that is known
574 : to be a child of funk. Callers have already validated our input
575 : arguments. Returns FD_FUNK_SUCCESS on success and an FD_FUNK_ERR_*
576 : code on failure. (There are currently no failure cases but the
577 : plumbing is there if value handling requires it at some point.) */
578 :
579 : static int
580 : fd_funk_txn_publish_funk_child( fd_funk_t * const funk,
581 : ulong const tag,
582 228885 : ulong const txn_idx ) {
583 :
584 : /* Apply the updates in txn to the last published transactions */
585 :
586 228885 : fd_funk_txn_update( funk, NULL, NULL, fd_funk_root( funk ), txn_idx );
587 :
588 : /* Cancel all competing transaction histories */
589 :
590 228885 : ulong oldest_idx = fd_funk_txn_oldest_sibling( funk, txn_idx );
591 228885 : fd_funk_txn_cancel_sibling_list( funk, tag, oldest_idx, txn_idx );
592 :
593 : /* Make all the children children of funk */
594 :
595 228885 : fd_funk_txn_t * txn = fd_funk_txn_pool_ele( funk->txn_pool, txn_idx );
596 228885 : ulong child_head_idx = fd_funk_txn_idx( txn->child_head_cidx );
597 228885 : ulong child_tail_idx = fd_funk_txn_idx( txn->child_tail_cidx );
598 :
599 228885 : ulong child_idx = child_head_idx;
600 450750 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) { /* opt for incr pub */
601 221865 : fd_funk_txn_t * child_txn = fd_funk_txn_pool_ele( funk->txn_pool, child_idx );
602 221865 : child_txn->tag = tag;
603 221865 : child_txn->parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
604 221865 : child_idx = fd_funk_txn_idx( child_txn->sibling_next_cidx );
605 221865 : }
606 :
607 228885 : funk->shmem->child_head_cidx = fd_funk_txn_cidx( child_head_idx );
608 228885 : funk->shmem->child_tail_cidx = fd_funk_txn_cidx( child_tail_idx );
609 :
610 228885 : fd_funk_last_publish_transition( funk->shmem, fd_funk_txn_xid( txn ) );
611 :
612 : /* Remove the mapping */
613 :
614 228885 : fd_funk_txn_map_query_t query[1];
615 228885 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, funk->shmem->last_publish, NULL, query, FD_MAP_FLAG_BLOCKING );
616 228885 : if( remove_err!=FD_MAP_SUCCESS ) {
617 0 : FD_LOG_ERR(( "Failed to remove published txn %lu:%lu from map: error %i",
618 0 : funk->shmem->last_publish->ul[0], funk->shmem->last_publish->ul[1],
619 0 : remove_err ));
620 0 : }
621 228885 : fd_funk_txn_state_transition( txn, FD_FUNK_TXN_STATE_PUBLISH, FD_FUNK_TXN_STATE_FREE );
622 228885 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
623 :
624 228885 : return FD_FUNK_SUCCESS;
625 228885 : }
626 :
627 : ulong
628 : fd_funk_txn_publish( fd_funk_t * funk,
629 158559 : fd_funk_txn_xid_t const * xid ) {
630 :
631 158559 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
632 158559 : if( FD_UNLIKELY( !xid ) ) FD_LOG_CRIT(( "NULL xid" ));
633 :
634 158559 : fd_funk_txn_map_query_t query[1];
635 158559 : int map_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
636 158559 : if( FD_UNLIKELY( map_err!=FD_MAP_SUCCESS ) ) {
637 0 : FD_LOG_ERR(( "Failed to publish txn %lu:%lu (%i-%s)",
638 0 : xid->ul[0], xid->ul[1],
639 0 : map_err, fd_map_strerror( map_err ) ));
640 0 : }
641 158559 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
642 158559 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
643 :
644 158559 : ulong tag = funk->shmem->cycle_tag++; /* FIXME use atomic add */
645 :
646 158559 : ulong publish_stack_idx = FD_FUNK_TXN_IDX_NULL;
647 :
648 228885 : for(;;) {
649 228885 : fd_funk_txn_t * txn2 = &funk->txn_pool->ele[ txn_idx ];
650 228885 : fd_funk_txn_state_transition( txn2, FD_FUNK_TXN_STATE_ACTIVE, FD_FUNK_TXN_STATE_PUBLISH );
651 228885 : txn2->tag = tag;
652 :
653 : /* At this point, txn_idx is a transaction that needs to be
654 : published and has been tagged. If txn is a child of funk, we are
655 : ready to publish txn and everything on the publish stack. */
656 :
657 228885 : ulong parent_idx = fd_funk_txn_idx( txn2->parent_cidx );
658 228885 : if( FD_LIKELY( fd_funk_txn_idx_is_null( parent_idx ) ) ) break; /* opt for incr pub */
659 :
660 : /* txn_idx has a parent. Validate and tag it. Push txn to the
661 : publish stack and recurse into the parent. */
662 :
663 70326 : txn2->stack_cidx = fd_funk_txn_cidx( publish_stack_idx );
664 70326 : publish_stack_idx = txn_idx;
665 :
666 70326 : txn_idx = parent_idx;
667 70326 : }
668 :
669 158559 : ulong publish_cnt = 0UL;
670 :
671 228885 : for(;;) {
672 :
673 : /* At this point, all the transactions we need to publish are
674 : tagged, txn is the next up publish funk and publish_stack has the
675 : transactions to follow in order by pop. We use a new tag for
676 : each publish as txn and its siblings we potentially visited in a
677 : previous iteration of this loop. */
678 :
679 228885 : if( FD_UNLIKELY( fd_funk_txn_publish_funk_child( funk, funk->shmem->cycle_tag++, txn_idx ) ) ) break;
680 228885 : publish_cnt++;
681 :
682 228885 : txn_idx = publish_stack_idx;
683 228885 : if( FD_LIKELY( fd_funk_txn_idx_is_null( txn_idx ) ) ) break; /* opt for incr pub */
684 70326 : publish_stack_idx = fd_funk_txn_idx( funk->txn_pool->ele[ txn_idx ].stack_cidx );
685 70326 : }
686 :
687 158559 : return publish_cnt;
688 158559 : }
689 :
690 : void
691 : fd_funk_txn_publish_into_parent( fd_funk_t * funk,
692 12000 : fd_funk_txn_xid_t const * xid ) {
693 12000 : if( FD_UNLIKELY( !funk ) ) FD_LOG_CRIT(( "NULL funk" ));
694 12000 : if( FD_UNLIKELY( !xid ) ) FD_LOG_CRIT(( "NULL xid" ));
695 :
696 12000 : fd_funk_txn_map_query_t query[1];
697 12000 : int map_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
698 12000 : if( FD_UNLIKELY( map_err!=FD_MAP_SUCCESS ) ) {
699 0 : FD_LOG_ERR(( "Failed to publish txn %lu:%lu: err %i", xid->ul[0], xid->ul[1], map_err ));
700 0 : }
701 12000 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
702 12000 : fd_funk_txn_state_transition( txn, FD_FUNK_TXN_STATE_ACTIVE, FD_FUNK_TXN_STATE_PUBLISH );
703 12000 : if( FD_UNLIKELY( fd_funk_txn_map_query_test( query )!=FD_MAP_SUCCESS ) ) {
704 0 : FD_LOG_CRIT(( "Detected data race while publishing a funk txn" ));
705 0 : }
706 :
707 12000 : fd_funk_txn_map_t * txn_map = funk->txn_map;
708 12000 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
709 12000 : ulong txn_idx = (ulong)(txn - txn_pool->ele);
710 :
711 12000 : ulong oldest_idx = fd_funk_txn_oldest_sibling( funk, txn_idx );
712 12000 : fd_funk_txn_cancel_sibling_list( funk, funk->shmem->cycle_tag++, oldest_idx, txn_idx );
713 :
714 12000 : ulong parent_idx = fd_funk_txn_idx( txn->parent_cidx );
715 12000 : if( fd_funk_txn_idx_is_null( parent_idx ) ) {
716 : /* Publish to root */
717 2592 : fd_funk_txn_update( funk, NULL, NULL, fd_funk_root( funk ), txn_idx );
718 : /* Inherit the children */
719 2592 : funk->shmem->child_head_cidx = txn->child_head_cidx;
720 2592 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
721 9408 : } else {
722 9408 : fd_funk_txn_t * parent_txn = &txn_pool->ele[ parent_idx ];
723 9408 : fd_funk_txn_update( funk, &parent_txn->rec_head_idx, &parent_txn->rec_tail_idx, &parent_txn->xid, txn_idx );
724 : /* Inherit the children */
725 9408 : parent_txn->child_head_cidx = txn->child_head_cidx;
726 9408 : parent_txn->child_tail_cidx = txn->child_tail_cidx;
727 9408 : }
728 :
729 : /* Adjust the parent pointers of the children to point to their grandparent */
730 12000 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
731 21747 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
732 9747 : txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( parent_idx );
733 9747 : child_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_next_cidx );
734 9747 : }
735 :
736 12000 : if( fd_funk_txn_idx_is_null( parent_idx ) ) {
737 2592 : fd_funk_last_publish_transition( funk->shmem, xid );
738 2592 : }
739 :
740 12000 : if( fd_funk_txn_map_remove( txn_map, xid, NULL, query, FD_MAP_FLAG_BLOCKING )!=FD_MAP_SUCCESS ) {
741 0 : FD_LOG_ERR(( "Failed to remove published txn %lu:%lu from map: error %i",
742 0 : xid->ul[0], xid->ul[1],
743 0 : map_err ));
744 0 : }
745 12000 : fd_funk_txn_state_transition( txn, FD_FUNK_TXN_STATE_PUBLISH, FD_FUNK_TXN_STATE_FREE );
746 12000 : fd_funk_txn_pool_release( txn_pool, txn, 1 );
747 12000 : }
748 :
749 : fd_funk_rec_t const *
750 : fd_funk_txn_first_rec( fd_funk_t * funk,
751 47096265 : fd_funk_txn_t const * txn ) {
752 47096265 : if( FD_UNLIKELY( !txn ) ) return NULL;
753 43872534 : uint rec_idx = txn->rec_head_idx;
754 43872534 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
755 28145136 : return funk->rec_pool->ele + rec_idx;
756 43872534 : }
757 :
758 : fd_funk_rec_t const *
759 : fd_funk_txn_last_rec( fd_funk_t * funk,
760 0 : fd_funk_txn_t const * txn ) {
761 0 : if( FD_UNLIKELY( !txn ) ) return NULL;
762 0 : uint rec_idx = txn->rec_tail_idx;
763 0 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
764 0 : return funk->rec_pool->ele + rec_idx;
765 0 : }
766 :
767 : /* Return the next record in a transaction. Returns NULL if the
768 : transaction has no more records. */
769 :
770 : fd_funk_rec_t const *
771 : fd_funk_txn_next_rec( fd_funk_t * funk,
772 442326531 : fd_funk_rec_t const * rec ) {
773 442326531 : uint rec_idx = rec->next_idx;
774 442326531 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
775 185678880 : return funk->rec_pool->ele + rec_idx;
776 442326531 : }
777 :
778 : fd_funk_rec_t const *
779 : fd_funk_txn_prev_rec( fd_funk_t * funk,
780 318768039 : fd_funk_rec_t const * rec ) {
781 318768039 : uint rec_idx = rec->prev_idx;
782 318768039 : if( fd_funk_rec_idx_is_null( rec_idx ) ) return NULL;
783 90265524 : return funk->rec_pool->ele + rec_idx;
784 318768039 : }
785 :
786 : fd_funk_txn_xid_t
787 18918897 : fd_funk_generate_xid(void) {
788 18918897 : fd_funk_txn_xid_t xid;
789 18918897 : static FD_TL ulong seq = 0;
790 18918897 : xid.ul[0] =
791 18918897 : (fd_log_cpu_id() + 1U)*3138831853UL +
792 18918897 : (fd_log_thread_id() + 1U)*9180195821UL +
793 18918897 : (++seq)*6208101967UL;
794 18918897 : xid.ul[1] = ((ulong)fd_tickcount())*2810745731UL;
795 18918897 : return xid;
796 18918897 : }
797 :
798 : int
799 12 : fd_funk_txn_verify( fd_funk_t * funk ) {
800 12 : fd_funk_txn_map_t * txn_map = funk->txn_map;
801 12 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
802 12 : ulong txn_max = fd_funk_txn_pool_ele_max( txn_pool );
803 :
804 12 : ulong funk_child_head_idx = fd_funk_txn_idx( funk->shmem->child_head_cidx ); /* Previously verified */
805 12 : ulong funk_child_tail_idx = fd_funk_txn_idx( funk->shmem->child_tail_cidx ); /* Previously verified */
806 :
807 12 : fd_funk_txn_xid_t const * last_publish = funk->shmem->last_publish; /* Previously verified */
808 :
809 24 : # define TEST(c) do { \
810 24 : if( FD_UNLIKELY( !(c) ) ) { FD_LOG_WARNING(( "FAIL: %s", #c )); return FD_FUNK_ERR_INVAL; } \
811 24 : } while(0)
812 :
813 12 : # define IS_VALID( idx ) ( (idx==FD_FUNK_TXN_IDX_NULL) || \
814 12 : ((idx<txn_max) && (!fd_funk_txn_xid_eq( fd_funk_txn_xid( &txn_pool->ele[idx] ), last_publish ))) )
815 :
816 12 : TEST( !fd_funk_txn_map_verify( txn_map ) );
817 12 : TEST( !fd_funk_txn_pool_verify( txn_pool ) );
818 :
819 : /* Tag all transactions as not visited yet */
820 :
821 1572876 : for( ulong txn_idx=0UL; txn_idx<txn_max; txn_idx++ ) txn_pool->ele[ txn_idx ].tag = 0UL;
822 :
823 : /* Visit all transactions in preparation, traversing from oldest to
824 : youngest. */
825 :
826 12 : do {
827 :
828 : /* Push all children of funk to the stack */
829 :
830 12 : ulong stack_idx = FD_FUNK_TXN_IDX_NULL;
831 12 : ulong child_idx = funk_child_head_idx;
832 12 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
833 :
834 : /* Make sure valid idx, not tagged (detects cycles) and child
835 : knows it is a child of funk. Then tag as visited / in-prep,
836 : push to stack and update prep_cnt */
837 :
838 0 : TEST( IS_VALID( child_idx ) );
839 0 : TEST( !txn_pool->ele[ child_idx ].tag );
840 0 : TEST( fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx ) ) );
841 0 : txn_pool->ele[ child_idx ].tag = 1UL;
842 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
843 0 : stack_idx = child_idx;
844 :
845 0 : ulong next_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_next_cidx );
846 0 : if( !fd_funk_txn_idx_is_null( next_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ next_idx ].sibling_prev_cidx )==child_idx );
847 0 : child_idx = next_idx;
848 0 : }
849 :
850 12 : while( !fd_funk_txn_idx_is_null( stack_idx ) ) {
851 :
852 : /* Pop the next transaction to traverse */
853 :
854 0 : ulong txn_idx = stack_idx;
855 0 : stack_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].stack_cidx );
856 :
857 : /* Push all children of txn to the stack */
858 :
859 0 : ulong child_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].child_head_cidx );
860 0 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
861 :
862 : /* Make sure valid idx, not tagged (detects cycles) and child
863 : knows it is a child of txn_idx. Then tag as visited /
864 : in-prep, push to stack and update prep_cnt */
865 :
866 0 : TEST( IS_VALID( child_idx ) );
867 0 : TEST( !txn_pool->ele[ child_idx ].tag );
868 0 : TEST( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx )==txn_idx );
869 0 : txn_pool->ele[ child_idx ].tag = 1UL;
870 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
871 0 : stack_idx = child_idx;
872 :
873 0 : ulong next_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_next_cidx );
874 0 : if( !fd_funk_txn_idx_is_null( next_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ next_idx ].sibling_prev_cidx )==child_idx );
875 0 : child_idx = next_idx;
876 0 : }
877 0 : }
878 :
879 12 : } while(0);
880 :
881 : /* Do it again with a youngest to oldest traversal to test reverse
882 : link integrity */
883 :
884 12 : do {
885 :
886 : /* Push all children of funk to the stack */
887 :
888 12 : ulong stack_idx = FD_FUNK_TXN_IDX_NULL;
889 12 : ulong child_idx = funk_child_tail_idx;
890 12 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
891 :
892 : /* Make sure valid idx, tagged as visited above (detects cycles)
893 : and child knows it is a child of funk. Then tag as visited /
894 : in-prep, push to stack and update prep_cnt */
895 :
896 0 : TEST( IS_VALID( child_idx ) );
897 0 : TEST( txn_pool->ele[ child_idx ].tag==1UL );
898 0 : TEST( fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx ) ) );
899 0 : txn_pool->ele[ child_idx ].tag = 2UL;
900 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
901 0 : stack_idx = child_idx;
902 :
903 0 : ulong prev_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_prev_cidx );
904 0 : if( !fd_funk_txn_idx_is_null( prev_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ prev_idx ].sibling_next_cidx )==child_idx );
905 0 : child_idx = prev_idx;
906 0 : }
907 :
908 12 : while( !fd_funk_txn_idx_is_null( stack_idx ) ) {
909 :
910 : /* Pop the next transaction to traverse */
911 :
912 0 : ulong txn_idx = stack_idx;
913 0 : stack_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].stack_cidx );
914 :
915 : /* Push all children of txn to the stack */
916 :
917 0 : ulong child_idx = fd_funk_txn_idx( txn_pool->ele[ txn_idx ].child_tail_cidx );
918 0 : while( !fd_funk_txn_idx_is_null( child_idx ) ) {
919 :
920 : /* Make sure valid idx, tagged as visited above (detects cycles)
921 : and child knows it is a child of txn_idx. Then, tag as
922 : visited / in-prep, push to stack and update prep_cnt */
923 :
924 0 : TEST( IS_VALID( child_idx ) );
925 0 : TEST( txn_pool->ele[ child_idx ].tag==1UL );
926 0 : TEST( fd_funk_txn_idx( txn_pool->ele[ child_idx ].parent_cidx )==txn_idx );
927 0 : txn_pool->ele[ child_idx ].tag = 2UL;
928 0 : txn_pool->ele[ child_idx ].stack_cidx = fd_funk_txn_cidx( stack_idx );
929 0 : stack_idx = child_idx;
930 :
931 0 : ulong prev_idx = fd_funk_txn_idx( txn_pool->ele[ child_idx ].sibling_prev_cidx );
932 0 : if( !fd_funk_txn_idx_is_null( prev_idx ) ) TEST( fd_funk_txn_idx( txn_pool->ele[ prev_idx ].sibling_next_cidx )==child_idx );
933 0 : child_idx = prev_idx;
934 0 : }
935 0 : }
936 12 : } while(0);
937 :
938 12 : # undef IS_VALID
939 12 : # undef TEST
940 :
941 12 : return FD_FUNK_SUCCESS;
942 12 : }
943 :
944 : int
945 0 : fd_funk_txn_valid( fd_funk_t const * funk, fd_funk_txn_t const * txn ) {
946 0 : ulong txn_idx = (ulong)(txn - funk->txn_pool->ele);
947 0 : ulong txn_max = fd_funk_txn_pool_ele_max( funk->txn_pool );
948 0 : if( txn_idx>=txn_max || txn != txn_idx + funk->txn_pool->ele ) return 0;
949 0 : fd_funk_txn_map_query_t query[1];
950 0 : int err = FD_MAP_ERR_AGAIN;
951 0 : while( err == FD_MAP_ERR_AGAIN ) {
952 0 : err = fd_funk_txn_map_query_try( funk->txn_map, &txn->xid, NULL, query, 0 );
953 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) {
954 0 : FD_LOG_DEBUG(( "fd_funk_txn_map_query_try() failed: %d on chain %lu", err, fd_funk_txn_map_iter_chain_idx( funk->txn_map, &txn->xid ) ));
955 0 : return 0;
956 0 : }
957 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) {
958 0 : FD_LOG_WARNING(( "fd_funk_txn_map_query_try() failed: %d on chain %lu", err, fd_funk_txn_map_iter_chain_idx( funk->txn_map, &txn->xid ) ));
959 0 : return 0;
960 0 : }
961 0 : if( FD_LIKELY( err == FD_MAP_SUCCESS ) ) {
962 0 : if( fd_funk_txn_map_query_ele( query ) != txn ) {
963 0 : FD_LOG_WARNING(( "fd_funk_txn_map_query_ele() failed: %p != %p", (void *)fd_funk_txn_map_query_ele( query ), (void *)txn ));
964 0 : return 0;
965 0 : }
966 0 : break;
967 0 : }
968 : /* Normally we'd do this, but we didn't really do any non-atomic reads of element fields.
969 : err = fd_funk_txn_map_query_test( query ); */
970 0 : }
971 0 : return 1;
972 0 : }
|