Line data Source code
1 : #include "fd_accdb_admin_v2_private.h"
2 :
3 : FD_STATIC_ASSERT( alignof(fd_accdb_admin_v2_t)<=alignof(fd_accdb_admin_t), layout );
4 : FD_STATIC_ASSERT( sizeof (fd_accdb_admin_v2_t)<=sizeof(fd_accdb_admin_t), layout );
5 :
6 : fd_accdb_admin_t *
7 : fd_accdb_admin_v2_init( fd_accdb_admin_t * accdb_,
8 : void * shfunk,
9 : void * shlocks,
10 : void * vinyl_rq,
11 : void * vinyl_data,
12 : void * vinyl_req_pool,
13 : ulong vinyl_link_id,
14 0 : ulong max_depth ) {
15 : /* Call superclass constructor */
16 0 : if( FD_UNLIKELY( !fd_accdb_admin_v1_init( accdb_, shfunk, shlocks ) ) ) {
17 0 : return NULL;
18 0 : }
19 0 : if( FD_UNLIKELY( !vinyl_data ) ) {
20 0 : FD_LOG_WARNING(( "NULL vinyl_data" ));
21 0 : return NULL;
22 0 : }
23 :
24 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( vinyl_rq );
25 0 : fd_vinyl_req_pool_t * req_pool = fd_vinyl_req_pool_join( vinyl_req_pool );
26 0 : if( FD_UNLIKELY( !rq || !req_pool ) ) {
27 : /* component joins log warning if this is reached */
28 0 : FD_LOG_WARNING(( "Failed to initialize database client" ));
29 0 : return NULL;
30 0 : }
31 :
32 0 : fd_accdb_admin_v2_t * accdb = fd_type_pun( accdb_ );
33 0 : accdb->root_lineage->max_depth = max_depth;
34 0 : accdb->vinyl_req_id = 0UL;
35 0 : accdb->vinyl_rq = rq;
36 0 : accdb->vinyl_link_id = vinyl_link_id;
37 0 : accdb->vinyl_data_wksp = vinyl_data;
38 0 : accdb->vinyl_req_wksp = fd_wksp_containing( req_pool );
39 0 : accdb->vinyl_req_pool = req_pool;
40 0 : accdb->base.accdb_type = FD_ACCDB_TYPE_V2;
41 0 : accdb->base.vt = &fd_accdb_admin_v2_vt;
42 0 : return accdb_;
43 0 : }
44 :
45 : static fd_accdb_admin_v2_t *
46 0 : downcast( fd_accdb_admin_t * admin ) {
47 0 : if( FD_UNLIKELY( !admin ) ) {
48 0 : FD_LOG_CRIT(( "NULL admin" ));
49 0 : }
50 0 : if( FD_UNLIKELY( admin->base.accdb_type!=FD_ACCDB_TYPE_V2 ) ) {
51 0 : FD_LOG_CRIT(( "corrupt accdb_admin handle" ));
52 0 : }
53 0 : return (fd_accdb_admin_v2_t *)admin;
54 0 : }
55 :
56 : void
57 0 : fd_accdb_admin_v2_fini( fd_accdb_admin_t * admin_ ) {
58 0 : fd_accdb_admin_v2_t * admin = downcast( admin_ );
59 :
60 0 : fd_vinyl_rq_leave( admin->vinyl_rq );
61 :
62 : /* superclass destructor */
63 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
64 0 : fd_accdb_admin_v1_fini( admin_ );
65 0 : }
66 :
67 : fd_funk_txn_xid_t
68 0 : fd_accdb_v2_root_get( fd_accdb_admin_t const * admin ) {
69 0 : return fd_accdb_v1_root_get( admin );
70 0 : }
71 :
72 : void
73 : fd_accdb_v2_attach_child( fd_accdb_admin_t * admin_,
74 : fd_funk_txn_xid_t const * xid_parent,
75 0 : fd_funk_txn_xid_t const * xid_new ) {
76 0 : fd_accdb_admin_v1_t * db = downcast( admin_ )->v1;
77 0 : if( FD_LIKELY( fd_accdb_log_enabled ) )
78 0 : FD_LOG_INFO(( "accdb txn xid %lu:%lu: created with parent %lu:%lu",
79 0 : xid_new ->ul[0], xid_new ->ul[1],
80 0 : xid_parent->ul[0], xid_parent->ul[1] ));
81 0 : fd_funk_txn_prepare( db->funk, xid_parent, xid_new );
82 0 : }
83 :
84 : void
85 : fd_accdb_v2_cancel( fd_accdb_admin_t * admin_,
86 0 : fd_funk_txn_xid_t const * xid ) {
87 0 : fd_accdb_admin_v2_t * admin = downcast( admin_ );
88 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
89 0 : fd_accdb_v1_cancel( admin_, xid );
90 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V2;
91 0 : }
92 :
93 : static void
94 : publish_recs( fd_accdb_admin_v2_t * admin,
95 0 : fd_funk_txn_t * txn ) {
96 0 : fd_funk_rec_t * rec_pool = admin->v1->funk->rec_pool->ele;
97 0 : fd_funk_rec_t * head = !fd_funk_rec_idx_is_null( txn->rec_head_idx ) ?
98 0 : &rec_pool[ txn->rec_head_idx ] : NULL;
99 0 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
100 0 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
101 0 : while( head ) {
102 0 : head = fd_accdb_v2_root_batch( admin, head );
103 0 : }
104 0 : }
105 :
106 : static void
107 : txn_unregister( fd_funk_t * funk,
108 0 : fd_funk_txn_t * txn ) {
109 0 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
110 0 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
111 0 : funk->txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
112 0 : child_idx = fd_funk_txn_idx( funk->txn_pool->ele[ child_idx ].sibling_next_cidx );
113 0 : }
114 :
115 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
116 0 : fd_funk_txn_map_query_t query[1];
117 0 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, xid, NULL, query, 0 );
118 0 : if( FD_UNLIKELY( remove_err!=FD_MAP_SUCCESS ) ) {
119 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: fd_funk_txn_map_remove failed: %i-%s", remove_err, fd_map_strerror( remove_err ) ));
120 0 : }
121 0 : }
122 :
123 : static void
124 : txn_free( fd_funk_t * funk,
125 0 : fd_funk_txn_t * txn ) {
126 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_FREE;
127 0 : txn->parent_cidx = UINT_MAX;
128 0 : txn->sibling_prev_cidx = UINT_MAX;
129 0 : txn->sibling_next_cidx = UINT_MAX;
130 0 : txn->child_head_cidx = UINT_MAX;
131 0 : txn->child_tail_cidx = UINT_MAX;
132 0 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
133 0 : }
134 :
135 : static void
136 : fd_accdb_txn_publish_one( fd_accdb_admin_v2_t * accdb,
137 0 : fd_funk_txn_t * txn ) {
138 0 : fd_funk_t * funk = accdb->v1->funk;
139 :
140 : /* Children of transaction are now children of root */
141 0 : funk->shmem->child_head_cidx = txn->child_head_cidx;
142 0 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
143 :
144 : /* Phase 1: Mark transaction as "last published" */
145 :
146 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
147 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
148 0 : FD_LOG_CRIT(( "fd_accdb_txn_advance_root: parent of txn %lu:%lu is not root", xid->ul[0], xid->ul[1] ));
149 0 : }
150 0 : fd_funk_txn_xid_st_atomic( funk->shmem->last_publish, xid );
151 0 : if( FD_LIKELY( fd_accdb_log_enabled ) )
152 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: publish", (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
153 :
154 : /* Phase 2: Drain users from transaction */
155 :
156 0 : ulong txn_idx = (ulong)( txn - funk->txn_pool->ele );
157 0 : fd_rwlock_write( &funk->txn_lock[ txn_idx ] );
158 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_PUBLISH;
159 :
160 : /* Phase 3: Move records from funk to vinyl */
161 :
162 0 : publish_recs( accdb, txn );
163 :
164 : /* Phase 4: Unregister transaction */
165 :
166 0 : txn_unregister( funk, txn );
167 :
168 : /* Phase 5: Free transaction object */
169 :
170 0 : fd_rwlock_unwrite( &funk->txn_lock[ txn_idx ] );
171 0 : txn_free( funk, txn );
172 0 : }
173 :
174 : void
175 : fd_accdb_v2_advance_root( fd_accdb_admin_t * accdb_,
176 0 : fd_funk_txn_xid_t const * xid ) {
177 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
178 0 : fd_funk_t * funk = accdb->v1->funk;
179 :
180 0 : fd_accdb_lineage_set_fork( accdb->root_lineage, funk, xid );
181 :
182 : /* Assume no concurrent access to txn_map */
183 :
184 0 : fd_funk_txn_map_query_t query[1];
185 0 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
186 0 : if( FD_UNLIKELY( query_err ) ) {
187 0 : FD_LOG_CRIT(( "fd_accdb_advance_root failed: fd_funk_txn_map_query_try(xid=%lu:%lu) returned (%i-%s)",
188 0 : xid->ul[0], xid->ul[1], query_err, fd_map_strerror( query_err ) ));
189 0 : }
190 0 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
191 :
192 0 : if( FD_LIKELY( fd_accdb_log_enabled ) )
193 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: advancing root",
194 0 : (void *)txn,
195 0 : xid->ul[0], xid->ul[1] ));
196 :
197 0 : fd_accdb_txn_cancel_siblings( accdb->v1, txn );
198 :
199 0 : fd_accdb_lineage_t * lineage = accdb->root_lineage;
200 0 : fd_funk_txn_xid_t oldest_xid = lineage->fork[ lineage->fork_depth-1UL ];
201 0 : if( fd_funk_txn_xid_eq_root( &oldest_xid ) && lineage->fork_depth>1UL ) {
202 0 : oldest_xid = lineage->fork[ lineage->fork_depth-2UL ];
203 0 : }
204 :
205 0 : ulong delay = xid->ul[0] - oldest_xid.ul[0];
206 : /* genesis_override is necessary when bootstrapping from genesis,
207 : without requiring fd_accdb_admin_v2_delay_set to accept 0. */
208 0 : int genesis_override = !xid->ul[0];
209 0 : if( delay >= accdb->slot_delay || genesis_override ) {
210 0 : if( FD_LIKELY( fd_accdb_log_enabled ) )
211 0 : FD_LOG_INFO(( "accdb xid %lu:%lu: pruning",
212 0 : oldest_xid.ul[0], oldest_xid.ul[1] ));
213 0 : fd_funk_txn_t * oldest = &funk->txn_pool->ele[ funk->shmem->child_head_cidx ];
214 0 : FD_TEST( fd_funk_txn_xid_eq( &oldest_xid, &oldest->xid ) );
215 0 : fd_accdb_txn_publish_one( accdb, oldest );
216 0 : }
217 0 : }
218 :
219 : void
220 : fd_accdb_admin_v2_delay_set( fd_accdb_admin_t * accdb_,
221 0 : ulong slot_delay ) {
222 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
223 0 : if( FD_UNLIKELY( !slot_delay ) ) FD_LOG_CRIT(( "invalid slot_delay (%lu)", slot_delay ));
224 0 : accdb->slot_delay = slot_delay;
225 0 : }
226 :
227 : fd_accdb_admin_vt_t const fd_accdb_admin_v2_vt = {
228 : .fini = fd_accdb_admin_v2_fini,
229 : .root_get = fd_accdb_v2_root_get,
230 : .attach_child = fd_accdb_v2_attach_child,
231 : .advance_root = fd_accdb_v2_advance_root,
232 : .cancel = fd_accdb_v2_cancel
233 : };
|