Line data Source code
1 : #include "fd_accdb_admin_v1.h"
2 : #include "../fd_flamenco_base.h"
3 :
4 : int fd_accdb_log_enabled = 1;
5 :
6 : FD_STATIC_ASSERT( alignof(fd_accdb_admin_v1_t)<=alignof(fd_accdb_admin_t), layout );
7 : FD_STATIC_ASSERT( sizeof (fd_accdb_admin_v1_t)<=sizeof(fd_accdb_admin_t), layout );
8 :
9 : fd_accdb_admin_t *
10 : fd_accdb_admin_v1_init( fd_accdb_admin_t * admin_,
11 : void * shfunk,
12 120 : void * shlocks ) {
13 120 : if( FD_UNLIKELY( !admin_ ) ) {
14 0 : FD_LOG_WARNING(( "NULL ljoin" ));
15 0 : return NULL;
16 0 : }
17 :
18 120 : fd_accdb_admin_v1_t * admin = (fd_accdb_admin_v1_t *)admin_;
19 120 : memset( admin, 0, sizeof(fd_accdb_admin_t) );
20 120 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
21 120 : admin->base.vt = &fd_accdb_admin_v1_vt;
22 :
23 120 : if( FD_UNLIKELY( !fd_funk_join( admin->funk, shfunk, shlocks ) ) ) {
24 0 : FD_LOG_CRIT(( "fd_funk_join failed" ));
25 0 : }
26 :
27 120 : return admin_;
28 120 : }
29 :
30 : static fd_accdb_admin_v1_t *
31 797202 : downcast( fd_accdb_admin_t * admin ) {
32 797202 : if( FD_UNLIKELY( !admin ) ) {
33 0 : FD_LOG_CRIT(( "NULL admin" ));
34 0 : }
35 797202 : if( FD_UNLIKELY( admin->base.accdb_type!=FD_ACCDB_TYPE_V1 ) ) {
36 0 : FD_LOG_CRIT(( "corrupt accdb_admin handle" ));
37 0 : }
38 797202 : return (fd_accdb_admin_v1_t *)admin;
39 797202 : }
40 :
41 : void
42 117 : fd_accdb_admin_v1_fini( fd_accdb_admin_t * admin_ ) {
43 117 : fd_accdb_admin_v1_t * admin = downcast( admin_ );
44 117 : if( FD_UNLIKELY( !fd_funk_leave( admin->funk, NULL, NULL ) ) ) FD_LOG_CRIT(( "fd_funk_leave failed" ));
45 117 : memset( admin, 0, sizeof(fd_accdb_admin_base_t) );
46 117 : }
47 :
48 : fd_funk_t *
49 105 : fd_accdb_admin_v1_funk( fd_accdb_admin_t * admin ) {
50 105 : fd_accdb_admin_v1_t * a = downcast( admin );
51 105 : return a->funk;
52 105 : }
53 :
54 : fd_funk_txn_xid_t
55 24 : fd_accdb_v1_root_get( fd_accdb_admin_t const * admin_ ) {
56 24 : fd_accdb_admin_v1_t const * admin = (fd_accdb_admin_v1_t const *)admin_;
57 24 : return *fd_funk_last_publish( admin->funk );
58 24 : }
59 :
60 : /* Begin transaction-level operations. It is assumed that funk_txn data
61 : structures are not concurrently modified. This includes txn_pool and
62 : txn_map. */
63 :
64 : void
65 : fd_accdb_v1_attach_child( fd_accdb_admin_t * db_,
66 : fd_funk_txn_xid_t const * xid_parent,
67 571950 : fd_funk_txn_xid_t const * xid_new ) {
68 571950 : fd_accdb_admin_v1_t * db = downcast( db_ );
69 571950 : if( FD_LIKELY( fd_accdb_log_enabled ) )
70 571950 : FD_LOG_INFO(( "accdb txn xid %lu:%lu: created with parent %lu:%lu",
71 571950 : xid_new ->ul[0], xid_new ->ul[1],
72 571950 : xid_parent->ul[0], xid_parent->ul[1] ));
73 571950 : fd_funk_txn_prepare( db->funk, xid_parent, xid_new );
74 571950 : }
75 :
76 : static void
77 : fd_accdb_txn_cancel_one( fd_accdb_admin_v1_t * admin,
78 347136 : fd_funk_txn_t * txn ) {
79 347136 : if( FD_LIKELY( fd_accdb_log_enabled ) )
80 347136 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: cancel", (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
81 :
82 347136 : if( FD_UNLIKELY( txn->state!=FD_FUNK_TXN_STATE_ACTIVE ) ) {
83 0 : FD_LOG_CRIT(( "cannot cancel xid %lu:%lu: unxpected state %u-%s",
84 0 : txn->xid.ul[0], txn->xid.ul[1],
85 0 : txn->state, fd_funk_txn_state_str( txn->state ) ));
86 0 : }
87 347136 : fd_funk_t * funk = admin->funk;
88 347136 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( txn->child_head_cidx ) ||
89 347136 : !fd_funk_txn_idx_is_null( txn->child_tail_cidx ) ) ) {
90 0 : FD_LOG_CRIT(( "fd_accdb_txn_cancel failed: txn at %p with xid %lu:%lu has children (data corruption?)",
91 0 : (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
92 0 : }
93 :
94 : /* Phase 1: Drain users from transaction */
95 :
96 347136 : ulong txn_idx = (ulong)( txn - funk->txn_pool->ele );
97 347136 : fd_rwlock_write( &funk->txn_lock[ txn_idx ] );
98 347136 : FD_COMPILER_MFENCE();
99 347136 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_CANCEL;
100 :
101 : /* Phase 2: Detach all records */
102 :
103 347136 : FD_COMPILER_MFENCE();
104 347136 : uint const rec_head_idx = txn->rec_head_idx;
105 347136 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
106 347136 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
107 347136 : FD_COMPILER_MFENCE();
108 :
109 : /* Phase 3: Remove records */
110 :
111 347136 : ulong rec_cnt = 0UL;
112 347136 : uint rec_idx = rec_head_idx;
113 1031445 : while( !fd_funk_rec_idx_is_null( rec_idx ) ) {
114 684309 : fd_funk_rec_t * rec = &funk->rec_pool->ele[ rec_idx ];
115 684309 : fd_funk_xid_key_pair_t pair = FD_VOLATILE_CONST( rec->pair );
116 :
117 684309 : uint next_idx = rec->next_idx;
118 684309 : if( FD_UNLIKELY( !fd_funk_txn_xid_eq( pair.xid, &txn->xid ) ) ) {
119 0 : FD_LOG_CRIT(( "Record does not belong to txn being cancelled (data corruption?): rec_idx=%u", rec_idx ));
120 0 : }
121 :
122 : /* Phase 3.1: Hide record */
123 :
124 684309 : fd_funk_rec_query_t query[1];
125 684309 : int remove_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
126 684309 : if( FD_UNLIKELY( remove_err ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed: %i-%s", remove_err, fd_map_strerror( remove_err ) ));
127 684309 : if( FD_UNLIKELY( query->ele!=rec ) ) FD_LOG_CRIT(( "Found duplicate record in map idx[0]=%p idx[1]=%p", (void *)query->ele, (void *)rec ));
128 :
129 : /* Phase 3.2: Mark record as invalid */
130 :
131 684309 : FD_COMPILER_MFENCE();
132 684309 : memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
133 684309 : FD_COMPILER_MFENCE();
134 :
135 : /* Phase 3.3: Free record */
136 :
137 684309 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
138 684309 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
139 684309 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
140 684309 : funk->rec_lock[ rec_idx ] = fd_funk_rec_ver_lock( fd_funk_rec_ver_inc( fd_funk_rec_ver_bits( funk->rec_lock[ rec_idx ] ) ), 0UL );
141 684309 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
142 684309 : rec_idx = next_idx;
143 684309 : rec_cnt++;
144 684309 : }
145 347136 : admin->base.revert_cnt += rec_cnt;
146 347136 : if( FD_LIKELY( fd_accdb_log_enabled ) )
147 347136 : FD_LOG_INFO(( "accdb freed %lu records while cancelling txn %lu:%lu",
148 347136 : rec_cnt, txn->xid.ul[0], txn->xid.ul[1] ));
149 :
150 : /* Phase 4: Remove transaction from fork graph */
151 :
152 347136 : uint self_cidx = fd_funk_txn_cidx( (ulong)( txn-funk->txn_pool->ele ) );
153 347136 : uint prev_cidx = txn->sibling_prev_cidx; ulong prev_idx = fd_funk_txn_idx( prev_cidx );
154 347136 : uint next_cidx = txn->sibling_next_cidx; ulong next_idx = fd_funk_txn_idx( next_cidx );
155 347136 : if( !fd_funk_txn_idx_is_null( next_idx ) ) {
156 156813 : funk->txn_pool->ele[ next_idx ].sibling_prev_cidx = prev_cidx;
157 156813 : }
158 347136 : if( !fd_funk_txn_idx_is_null( prev_idx ) ) {
159 109587 : funk->txn_pool->ele[ prev_idx ].sibling_next_cidx = next_cidx;
160 109587 : }
161 347136 : if( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) {
162 169860 : fd_funk_txn_t * parent = &funk->txn_pool->ele[ fd_funk_txn_idx( txn->parent_cidx ) ];
163 169860 : if( parent->child_head_cidx==self_cidx ) parent->child_head_cidx = next_cidx;
164 169860 : if( parent->child_tail_cidx==self_cidx ) parent->child_tail_cidx = prev_cidx;
165 177276 : } else {
166 177276 : if( funk->shmem->child_head_cidx==self_cidx ) funk->shmem->child_head_cidx = next_cidx;
167 177276 : if( funk->shmem->child_tail_cidx==self_cidx ) funk->shmem->child_tail_cidx = prev_cidx;
168 177276 : }
169 :
170 : /* Phase 5: Remove transaction from index */
171 :
172 347136 : fd_funk_txn_map_query_t query[1];
173 347136 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, &txn->xid, NULL, query, FD_MAP_FLAG_BLOCKING );
174 347136 : if( FD_UNLIKELY( remove_err!=FD_MAP_SUCCESS ) ) {
175 0 : FD_LOG_CRIT(( "fd_accdb_txn_cancel failed: fd_funk_txn_map_remove(%lu:%lu) failed: %i-%s",
176 0 : txn->xid.ul[0], txn->xid.ul[1], remove_err, fd_map_strerror( remove_err ) ));
177 0 : }
178 :
179 : /* Phase 6: Free transaction object */
180 :
181 347136 : txn->parent_cidx = UINT_MAX;
182 347136 : txn->sibling_prev_cidx = UINT_MAX;
183 347136 : txn->sibling_next_cidx = UINT_MAX;
184 347136 : fd_rwlock_unwrite( &funk->txn_lock[ txn_idx ] );
185 347136 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_FREE;
186 347136 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
187 347136 : }
188 :
189 : /* Cancels txn and all children */
190 :
191 : static void
192 : fd_accdb_txn_cancel_tree( fd_accdb_admin_v1_t * accdb,
193 347136 : fd_funk_txn_t * txn ) {
194 516915 : for(;;) {
195 516915 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
196 516915 : if( fd_funk_txn_idx_is_null( child_idx ) ) break;
197 169779 : fd_funk_txn_t * child = &accdb->funk->txn_pool->ele[ child_idx ];
198 169779 : fd_accdb_txn_cancel_tree( accdb, child );
199 169779 : }
200 347136 : fd_accdb_txn_cancel_one( accdb, txn );
201 347136 : }
202 :
203 : /* Cancels all left/right siblings */
204 :
205 : static void
206 : fd_accdb_txn_cancel_prev_list( fd_accdb_admin_v1_t * accdb,
207 224709 : fd_funk_txn_t * txn ) {
208 224709 : ulong self_idx = (ulong)( txn - accdb->funk->txn_pool->ele );
209 313776 : for(;;) {
210 313776 : ulong prev_idx = fd_funk_txn_idx( txn->sibling_prev_cidx );
211 313776 : if( FD_UNLIKELY( prev_idx==self_idx ) ) FD_LOG_CRIT(( "detected cycle in fork graph" ));
212 313776 : if( fd_funk_txn_idx_is_null( prev_idx ) ) break;
213 89067 : fd_funk_txn_t * sibling = &accdb->funk->txn_pool->ele[ prev_idx ];
214 89067 : fd_accdb_txn_cancel_tree( accdb, sibling );
215 89067 : }
216 224709 : }
217 :
218 : static void
219 : fd_accdb_txn_cancel_next_list( fd_accdb_admin_v1_t * accdb,
220 224709 : fd_funk_txn_t * txn ) {
221 224709 : ulong self_idx = (ulong)( txn - accdb->funk->txn_pool->ele );
222 312897 : for(;;) {
223 312897 : ulong next_idx = fd_funk_txn_idx( txn->sibling_next_cidx );
224 312897 : if( FD_UNLIKELY( next_idx==self_idx ) ) FD_LOG_CRIT(( "detected cycle in fork graph" ));
225 312897 : if( fd_funk_txn_idx_is_null( next_idx ) ) break;
226 88188 : fd_funk_txn_t * sibling = &accdb->funk->txn_pool->ele[ next_idx ];
227 88188 : fd_accdb_txn_cancel_tree( accdb, sibling );
228 88188 : }
229 224709 : }
230 :
231 : void
232 : fd_accdb_txn_cancel_siblings( fd_accdb_admin_v1_t * accdb,
233 224709 : fd_funk_txn_t * txn ) {
234 224709 : fd_accdb_txn_cancel_prev_list( accdb, txn );
235 224709 : fd_accdb_txn_cancel_next_list( accdb, txn );
236 224709 : txn->sibling_prev_cidx = UINT_MAX;
237 224709 : txn->sibling_next_cidx = UINT_MAX;
238 224709 : }
239 :
240 : void
241 : fd_accdb_v1_cancel( fd_accdb_admin_t * accdb_,
242 102 : fd_funk_txn_xid_t const * xid ) {
243 102 : fd_accdb_admin_v1_t * accdb = downcast( accdb_ );
244 102 : fd_funk_t * funk = accdb->funk;
245 :
246 : /* Assume no concurrent access to txn_map */
247 :
248 102 : fd_funk_txn_map_query_t query[1];
249 102 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
250 102 : if( FD_UNLIKELY( query_err ) ) {
251 0 : FD_LOG_CRIT(( "fd_accdb_cancel failed: fd_funk_txn_map_query_try(xid=%lu:%lu) returned (%i-%s)",
252 0 : xid->ul[0], xid->ul[1], query_err, fd_map_strerror( query_err ) ));
253 0 : }
254 102 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
255 :
256 102 : fd_accdb_txn_cancel_tree( accdb, txn );
257 102 : }
258 :
259 : /* fd_accdb_chain_reclaim "reclaims" a zero-lamport account by removing
260 : its underlying record. */
261 :
262 : static void
263 : fd_accdb_chain_reclaim( fd_accdb_admin_v1_t * accdb,
264 3 : fd_funk_rec_t * rec ) {
265 3 : fd_funk_t * funk = accdb->funk;
266 :
267 : /* Phase 1: Remove record from map */
268 :
269 3 : fd_funk_xid_key_pair_t pair = rec->pair;
270 3 : fd_funk_rec_query_t query[1];
271 3 : int rm_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
272 3 : if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
273 3 : FD_COMPILER_MFENCE();
274 :
275 : /* Phase 2: Invalidate record */
276 :
277 3 : fd_funk_rec_t * old_rec = query->ele;
278 3 : memset( &old_rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
279 3 : FD_COMPILER_MFENCE();
280 :
281 : /* Phase 3: Free record */
282 :
283 3 : old_rec->map_next = FD_FUNK_REC_IDX_NULL;
284 3 : fd_funk_val_flush( old_rec, funk->alloc, funk->wksp );
285 3 : fd_funk_rec_pool_release( funk->rec_pool, old_rec, 1 );
286 3 : accdb->base.reclaim_cnt++;
287 3 : }
288 :
289 : /* fd_accdb_chain_gc_root cleans up a stale "rooted" version of a
290 : record. */
291 :
292 : static void
293 : fd_accdb_chain_gc_root( fd_accdb_admin_v1_t * accdb,
294 593463 : fd_funk_xid_key_pair_t const * pair ) {
295 593463 : fd_funk_t * funk = accdb->funk;
296 :
297 : /* Phase 1: Remove record from map if found */
298 :
299 593463 : fd_funk_rec_query_t query[1];
300 593463 : int rm_err = fd_funk_rec_map_remove( funk->rec_map, pair, NULL, query, FD_MAP_FLAG_BLOCKING );
301 593463 : if( rm_err==FD_MAP_ERR_KEY ) return;
302 593226 : if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
303 593226 : FD_COMPILER_MFENCE();
304 :
305 : /* Phase 2: Invalidate record */
306 :
307 593226 : fd_funk_rec_t * old_rec = query->ele;
308 593226 : memset( &old_rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
309 593226 : FD_COMPILER_MFENCE();
310 :
311 : /* Phase 3: Free record */
312 :
313 593226 : old_rec->map_next = FD_FUNK_REC_IDX_NULL;
314 593226 : fd_funk_val_flush( old_rec, funk->alloc, funk->wksp );
315 593226 : fd_funk_rec_pool_release( funk->rec_pool, old_rec, 1 );
316 593226 : accdb->base.gc_root_cnt++;
317 593226 : }
318 :
319 : /* fd_accdb_publish_recs moves all records in a transaction to the DB
320 : root. Currently, the DB root is stored by funk, which might change
321 : in the future.
322 :
323 : It is assumed at this point that the txn has no more concurrent
324 : users. */
325 :
326 : static void
327 : fd_accdb_publish_recs( fd_accdb_admin_v1_t * accdb,
328 224709 : fd_funk_txn_t * txn ) {
329 : /* Iterate record list */
330 224709 : uint head = txn->rec_head_idx;
331 224709 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
332 224709 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
333 224709 : fd_wksp_t * funk_wksp = accdb->funk->wksp;
334 818172 : while( !fd_funk_rec_idx_is_null( head ) ) {
335 593463 : fd_funk_rec_t * rec = &accdb->funk->rec_pool->ele[ head ];
336 :
337 : /* Evict previous value from hash chain */
338 593463 : fd_funk_xid_key_pair_t pair[1];
339 593463 : fd_funk_rec_key_copy( pair->key, rec->pair.key );
340 593463 : fd_funk_txn_xid_set_root( pair->xid );
341 593463 : fd_accdb_chain_gc_root( accdb, pair );
342 :
343 : /* Root or reclaim record */
344 593463 : uint next = rec->next_idx;
345 593463 : fd_account_meta_t const * meta = fd_funk_val( rec, funk_wksp );
346 593463 : FD_CRIT( meta && rec->val_sz>=sizeof(fd_account_meta_t), "invalid funk record value" );
347 593463 : if( !meta->lamports ) {
348 : /* Remove record */
349 3 : fd_accdb_chain_reclaim( accdb, rec );
350 593460 : } else {
351 : /* Migrate record to root */
352 593460 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
353 593460 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
354 593460 : fd_funk_txn_xid_t const root = { .ul = { ULONG_MAX, ULONG_MAX } };
355 593460 : fd_funk_txn_xid_st_atomic( rec->pair.xid, &root );
356 593460 : accdb->base.root_cnt++;
357 593460 : accdb->base.root_tot_sz += rec->val_sz;
358 593460 : }
359 :
360 593463 : head = next; /* next record */
361 593463 : }
362 224709 : }
363 :
364 : /* fd_accdb_txn_publish_one merges an in-prep transaction whose
365 : parent is the last published, into the parent. */
366 :
367 : static void
368 : fd_accdb_txn_publish_one( fd_accdb_admin_v1_t * accdb,
369 224709 : fd_funk_txn_t * txn ) {
370 224709 : fd_funk_t * funk = accdb->funk;
371 :
372 : /* Phase 1: Mark transaction as "last published" */
373 :
374 224709 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
375 224709 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
376 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: txn with xid %lu:%lu is not a child of the last published txn", xid->ul[0], xid->ul[1] ));
377 0 : }
378 224709 : fd_funk_txn_xid_st_atomic( funk->shmem->last_publish, xid );
379 224709 : if( FD_LIKELY( fd_accdb_log_enabled ) )
380 224709 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: publish", (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
381 :
382 : /* Phase 2: Drain users from transaction */
383 :
384 224709 : ulong txn_idx = (ulong)( txn - funk->txn_pool->ele );
385 224709 : fd_rwlock_write( &funk->txn_lock[ txn_idx ] );
386 224709 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_PUBLISH;
387 :
388 : /* Phase 3: Migrate records */
389 :
390 224709 : fd_accdb_publish_recs( accdb, txn );
391 :
392 : /* Phase 4: Remove transaction from fork graph
393 :
394 : Because the transaction has no more records, removing it from the
395 : fork graph has no visible side effects to concurrent query ops
396 : (always return "no found") or insert ops (refuse to write to a
397 : "publish" state txn). */
398 :
399 224709 : { /* Adjust the parent pointers of the children to point to "last published" */
400 224709 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
401 372258 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
402 147549 : funk->txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
403 147549 : child_idx = fd_funk_txn_idx( funk->txn_pool->ele[ child_idx ].sibling_next_cidx );
404 147549 : }
405 224709 : }
406 :
407 : /* Phase 5: Remove transaction from index
408 :
409 : The transaction is now an orphan and won't get any new records. */
410 :
411 224709 : fd_funk_txn_map_query_t query[1];
412 224709 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, xid, NULL, query, 0 );
413 224709 : if( FD_UNLIKELY( remove_err!=FD_MAP_SUCCESS ) ) {
414 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: fd_funk_txn_map_remove failed: %i-%s", remove_err, fd_map_strerror( remove_err ) ));
415 0 : }
416 :
417 : /* Phase 6: Free transaction object */
418 :
419 224709 : fd_rwlock_unwrite( &funk->txn_lock[ txn_idx ] );
420 224709 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_FREE;
421 224709 : txn->parent_cidx = UINT_MAX;
422 224709 : txn->sibling_prev_cidx = UINT_MAX;
423 224709 : txn->sibling_next_cidx = UINT_MAX;
424 224709 : txn->child_head_cidx = UINT_MAX;
425 224709 : txn->child_tail_cidx = UINT_MAX;
426 224709 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
427 224709 : }
428 :
429 : void
430 : fd_accdb_v1_advance_root( fd_accdb_admin_t * accdb_,
431 224709 : fd_funk_txn_xid_t const * xid ) {
432 224709 : fd_accdb_admin_v1_t * accdb = downcast( accdb_ );
433 224709 : fd_funk_t * funk = accdb->funk;
434 :
435 : /* Assume no concurrent access to txn_map */
436 :
437 224709 : long dt = -fd_tickcount();
438 224709 : fd_funk_txn_map_query_t query[1];
439 224709 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
440 224709 : if( FD_UNLIKELY( query_err ) ) {
441 0 : FD_LOG_CRIT(( "fd_accdb_advance_root failed: fd_funk_txn_map_query_try(xid=%lu:%lu) returned (%i-%s)",
442 0 : xid->ul[0], xid->ul[1], query_err, fd_map_strerror( query_err ) ));
443 0 : }
444 224709 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
445 :
446 224709 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
447 0 : FD_LOG_CRIT(( "fd_accdb_txn_advance_root: parent of txn %lu:%lu is not root", xid->ul[0], xid->ul[1] ));
448 0 : }
449 :
450 224709 : if( FD_LIKELY( fd_accdb_log_enabled ) )
451 224709 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: advancing root",
452 224709 : (void *)txn,
453 224709 : xid->ul[0], xid->ul[1] ));
454 :
455 224709 : fd_accdb_txn_cancel_siblings( accdb, txn );
456 :
457 : /* Children of transaction are now children of root */
458 224709 : funk->shmem->child_head_cidx = txn->child_head_cidx;
459 224709 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
460 :
461 224709 : fd_accdb_txn_publish_one( accdb, txn );
462 :
463 224709 : dt += fd_tickcount();
464 224709 : accdb->base.dt_gc += dt;
465 224709 : }
466 :
467 : /* reset_rec_map frees all records in a funk instance. */
468 :
469 : static void
470 21 : reset_rec_map( fd_funk_t * funk ) {
471 21 : fd_wksp_t * wksp = funk->wksp;
472 21 : fd_alloc_t * alloc = funk->alloc;
473 21 : fd_funk_rec_map_t * rec_map = funk->rec_map;
474 21 : fd_funk_rec_pool_t * rec_pool = funk->rec_pool;
475 :
476 21 : ulong chain_cnt = fd_funk_rec_map_chain_cnt( rec_map );
477 1077 : for( ulong chain_idx=0UL; chain_idx<chain_cnt; chain_idx++ ) {
478 1056 : for(
479 1056 : fd_funk_rec_map_iter_t iter = fd_funk_rec_map_iter( rec_map, chain_idx );
480 1311 : !fd_funk_rec_map_iter_done( iter );
481 1056 : ) {
482 255 : fd_funk_rec_t * rec = fd_funk_rec_map_iter_ele( iter );
483 255 : ulong next = fd_funk_rec_map_private_idx( rec->map_next );;
484 :
485 : /* Remove rec object from map */
486 255 : fd_funk_rec_map_query_t rec_query[1];
487 255 : int err = fd_funk_rec_map_remove( rec_map, fd_funk_rec_pair( rec ), NULL, rec_query, FD_MAP_FLAG_BLOCKING );
488 255 : fd_funk_rec_key_t key; fd_funk_rec_key_copy( &key, rec->pair.key );
489 255 : if( FD_UNLIKELY( err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", err, fd_map_strerror( err ) ));
490 :
491 : /* Free rec resources */
492 255 : rec->map_next = FD_FUNK_REC_IDX_NULL;
493 255 : rec->next_idx = FD_FUNK_REC_IDX_NULL;
494 255 : rec->prev_idx = FD_FUNK_REC_IDX_NULL;
495 255 : memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
496 255 : fd_funk_val_flush( rec, alloc, wksp );
497 255 : fd_funk_rec_pool_release( rec_pool, rec, 1 );
498 255 : iter.ele_idx = next;
499 255 : }
500 1056 : }
501 21 : }
502 :
503 : /* clear_txn_list does a depth-first traversal of the txn tree.
504 : Removes all txns. */
505 :
506 : static void
507 : clear_txn_list( fd_funk_t * funk,
508 33 : ulong txn_head_idx ) {
509 33 : fd_funk_txn_pool_t * txn_pool = funk->txn_pool;
510 33 : fd_funk_txn_map_t * txn_map = funk->txn_map;
511 33 : for( ulong idx = txn_head_idx;
512 45 : !fd_funk_txn_idx_is_null( idx );
513 33 : ) {
514 12 : fd_funk_txn_t * txn = &txn_pool->ele[ idx ];
515 12 : fd_funk_txn_state_assert( txn, FD_FUNK_TXN_STATE_ACTIVE );
516 12 : ulong next_idx = fd_funk_txn_idx( txn->sibling_next_cidx );
517 12 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
518 12 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
519 12 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
520 12 : txn->child_head_cidx = UINT_MAX;
521 12 : txn->child_tail_cidx = UINT_MAX;
522 12 : txn->parent_cidx = UINT_MAX;
523 12 : txn->sibling_prev_cidx = UINT_MAX;
524 12 : txn->sibling_next_cidx = UINT_MAX;
525 12 : clear_txn_list( funk, child_idx );
526 12 : fd_funk_txn_map_query_t query[1];
527 12 : int rm_err = fd_funk_txn_map_remove( txn_map, &txn->xid, NULL, query, FD_MAP_FLAG_BLOCKING );
528 12 : if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_txn_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
529 12 : txn->state = FD_FUNK_TXN_STATE_FREE;
530 12 : int free_err = fd_funk_txn_pool_release( txn_pool, txn, 1 );
531 12 : if( FD_UNLIKELY( free_err!=FD_POOL_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_txn_pool_release failed (%i)", free_err ));
532 12 : idx = next_idx;
533 12 : }
534 33 : }
535 :
536 : void
537 21 : fd_accdb_v1_clear( fd_accdb_admin_t * accdb_ ) {
538 21 : fd_accdb_admin_v1_t * accdb = downcast( accdb_ );
539 21 : fd_funk_t * funk = accdb->funk;
540 21 : clear_txn_list( funk, fd_funk_txn_idx( funk->shmem->child_head_cidx ) );
541 21 : funk->shmem->child_head_cidx = UINT_MAX;
542 21 : funk->shmem->child_tail_cidx = UINT_MAX;
543 21 : reset_rec_map( funk );
544 21 : }
545 :
546 : void
547 198 : fd_accdb_v1_verify( fd_accdb_admin_t * accdb_ ) {
548 198 : fd_accdb_admin_v1_t * accdb = downcast( accdb_ );
549 198 : FD_TEST( fd_funk_verify( accdb->funk )==FD_FUNK_SUCCESS );
550 198 : }
551 :
552 : fd_accdb_admin_vt_t const fd_accdb_admin_v1_vt = {
553 : .fini = fd_accdb_admin_v1_fini,
554 : .root_get = fd_accdb_v1_root_get,
555 : .attach_child = fd_accdb_v1_attach_child,
556 : .advance_root = fd_accdb_v1_advance_root,
557 : .cancel = fd_accdb_v1_cancel
558 : };
|