Line data Source code
1 : #include "fd_accdb_admin_v2_private.h"
2 :
3 : FD_STATIC_ASSERT( alignof(fd_accdb_admin_v2_t)<=alignof(fd_accdb_admin_t), layout );
4 : FD_STATIC_ASSERT( sizeof (fd_accdb_admin_v2_t)<=sizeof(fd_accdb_admin_t), layout );
5 :
6 : fd_accdb_admin_t *
7 : fd_accdb_admin_v2_init( fd_accdb_admin_t * accdb_,
8 : void * shfunk,
9 : void * vinyl_rq,
10 : void * vinyl_data,
11 : void * vinyl_req_pool,
12 0 : ulong vinyl_link_id ) {
13 : /* Call superclass constructor */
14 0 : if( FD_UNLIKELY( !fd_accdb_admin_v1_init( accdb_, shfunk ) ) ) {
15 0 : return NULL;
16 0 : }
17 0 : if( FD_UNLIKELY( !vinyl_data ) ) {
18 0 : FD_LOG_WARNING(( "NULL vinyl_data" ));
19 0 : return NULL;
20 0 : }
21 :
22 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( vinyl_rq );
23 0 : fd_vinyl_req_pool_t * req_pool = fd_vinyl_req_pool_join( vinyl_req_pool );
24 0 : if( FD_UNLIKELY( !rq || !req_pool ) ) {
25 : /* component joins log warning if this is reached */
26 0 : FD_LOG_WARNING(( "Failed to initialize database client" ));
27 0 : return NULL;
28 0 : }
29 :
30 0 : fd_accdb_admin_v2_t * accdb = fd_type_pun( accdb_ );
31 0 : accdb->vinyl_req_id = 0UL;
32 0 : accdb->vinyl_rq = rq;
33 0 : accdb->vinyl_link_id = vinyl_link_id;
34 0 : accdb->vinyl_data_wksp = vinyl_data;
35 0 : accdb->vinyl_req_wksp = fd_wksp_containing( req_pool );
36 0 : accdb->vinyl_req_pool = req_pool;
37 0 : accdb->base.accdb_type = FD_ACCDB_TYPE_V2;
38 0 : accdb->base.vt = &fd_accdb_admin_v2_vt;
39 0 : return accdb_;
40 0 : }
41 :
42 : static fd_accdb_admin_v2_t *
43 0 : downcast( fd_accdb_admin_t * admin ) {
44 0 : if( FD_UNLIKELY( !admin ) ) {
45 0 : FD_LOG_CRIT(( "NULL admin" ));
46 0 : }
47 0 : if( FD_UNLIKELY( admin->base.accdb_type!=FD_ACCDB_TYPE_V2 ) ) {
48 0 : FD_LOG_CRIT(( "corrupt accdb_admin handle" ));
49 0 : }
50 0 : return (fd_accdb_admin_v2_t *)admin;
51 0 : }
52 :
53 : void
54 0 : fd_accdb_admin_v2_fini( fd_accdb_admin_t * admin_ ) {
55 0 : fd_accdb_admin_v2_t * admin = downcast( admin_ );
56 :
57 0 : fd_vinyl_rq_leave( admin->vinyl_rq );
58 :
59 : /* superclass destructor */
60 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
61 0 : fd_accdb_admin_v1_fini( admin_ );
62 0 : }
63 :
64 : fd_funk_txn_xid_t
65 0 : fd_accdb_v2_root_get( fd_accdb_admin_t const * admin ) {
66 0 : return fd_accdb_v1_root_get( admin );
67 0 : }
68 :
69 : void
70 : fd_accdb_v2_attach_child( fd_accdb_admin_t * admin_,
71 : fd_funk_txn_xid_t const * xid_parent,
72 0 : fd_funk_txn_xid_t const * xid_new ) {
73 0 : fd_accdb_admin_v1_t * db = downcast( admin_ )->v1;
74 0 : FD_LOG_INFO(( "accdb txn xid %lu:%lu: created with parent %lu:%lu",
75 0 : xid_new ->ul[0], xid_new ->ul[1],
76 0 : xid_parent->ul[0], xid_parent->ul[1] ));
77 0 : fd_funk_txn_prepare( db->funk, xid_parent, xid_new );
78 0 : }
79 :
80 : void
81 : fd_accdb_v2_cancel( fd_accdb_admin_t * admin,
82 0 : fd_funk_txn_xid_t const * xid ) {
83 0 : fd_accdb_v1_cancel( admin, xid );
84 0 : }
85 :
86 : static void
87 : publish_recs( fd_accdb_admin_v2_t * admin,
88 0 : fd_funk_txn_t * txn ) {
89 0 : fd_funk_rec_t * rec_pool = admin->v1->funk->rec_pool->ele;
90 0 : fd_funk_rec_t * head = !fd_funk_rec_idx_is_null( txn->rec_head_idx ) ?
91 0 : &rec_pool[ txn->rec_head_idx ] : NULL;
92 0 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
93 0 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
94 0 : while( head ) {
95 0 : head = fd_accdb_v2_root_batch( admin, head );
96 0 : }
97 0 : }
98 :
99 : static void
100 : txn_unregister( fd_funk_t * funk,
101 0 : fd_funk_txn_t * txn ) {
102 0 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
103 0 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
104 0 : funk->txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
105 0 : child_idx = fd_funk_txn_idx( funk->txn_pool->ele[ child_idx ].sibling_next_cidx );
106 0 : }
107 :
108 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
109 0 : fd_funk_txn_map_query_t query[1];
110 0 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, xid, NULL, query, 0 );
111 0 : if( FD_UNLIKELY( remove_err!=FD_MAP_SUCCESS ) ) {
112 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: fd_funk_txn_map_remove failed: %i-%s", remove_err, fd_map_strerror( remove_err ) ));
113 0 : }
114 0 : }
115 :
116 : static void
117 : txn_free( fd_funk_t * funk,
118 0 : fd_funk_txn_t * txn ) {
119 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_FREE;
120 0 : txn->parent_cidx = UINT_MAX;
121 0 : txn->sibling_prev_cidx = UINT_MAX;
122 0 : txn->sibling_next_cidx = UINT_MAX;
123 0 : txn->child_head_cidx = UINT_MAX;
124 0 : txn->child_tail_cidx = UINT_MAX;
125 0 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
126 0 : }
127 :
128 : static void
129 : fd_accdb_txn_publish_one( fd_accdb_admin_v2_t * accdb,
130 0 : fd_funk_txn_t * txn ) {
131 0 : fd_funk_t * funk = accdb->v1->funk;
132 :
133 : /* Children of transaction are now children of root */
134 0 : funk->shmem->child_head_cidx = txn->child_head_cidx;
135 0 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
136 :
137 : /* Phase 1: Mark transaction as "last published" */
138 :
139 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
140 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
141 0 : FD_LOG_CRIT(( "fd_accdb_txn_advance_root: parent of txn %lu:%lu is not root", xid->ul[0], xid->ul[1] ));
142 0 : }
143 0 : fd_funk_txn_xid_st_atomic( funk->shmem->last_publish, xid );
144 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: publish", (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
145 :
146 : /* Phase 2: Drain users from transaction */
147 :
148 0 : fd_rwlock_write( txn->lock );
149 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_PUBLISH;
150 :
151 : /* Phase 3: Move records from funk to vinyl */
152 :
153 0 : publish_recs( accdb, txn );
154 :
155 : /* Phase 4: Unregister transaction */
156 :
157 0 : txn_unregister( funk, txn );
158 :
159 : /* Phase 5: Free transaction object */
160 :
161 0 : fd_rwlock_unwrite( txn->lock );
162 0 : txn_free( funk, txn );
163 0 : }
164 :
165 : void
166 : fd_accdb_v2_advance_root( fd_accdb_admin_t * accdb_,
167 0 : fd_funk_txn_xid_t const * xid ) {
168 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
169 0 : fd_funk_t * funk = accdb->v1->funk;
170 :
171 0 : fd_accdb_lineage_set_fork( accdb->root_lineage, funk, xid );
172 :
173 : /* Assume no concurrent access to txn_map */
174 :
175 0 : fd_funk_txn_map_query_t query[1];
176 0 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
177 0 : if( FD_UNLIKELY( query_err ) ) {
178 0 : FD_LOG_CRIT(( "fd_accdb_advance_root failed: fd_funk_txn_map_query_try(xid=%lu:%lu) returned (%i-%s)",
179 0 : xid->ul[0], xid->ul[1], query_err, fd_map_strerror( query_err ) ));
180 0 : }
181 0 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
182 :
183 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: advancing root",
184 0 : (void *)txn,
185 0 : xid->ul[0], xid->ul[1] ));
186 :
187 0 : fd_accdb_txn_cancel_siblings( accdb->v1, txn );
188 :
189 0 : fd_accdb_lineage_t * lineage = accdb->root_lineage;
190 0 : fd_funk_txn_xid_t oldest_xid = lineage->fork[ lineage->fork_depth-1UL ];
191 0 : if( fd_funk_txn_xid_eq_root( &oldest_xid ) && lineage->fork_depth>1UL ) {
192 0 : oldest_xid = lineage->fork[ lineage->fork_depth-2UL ];
193 0 : }
194 :
195 0 : ulong delay = xid->ul[0] - oldest_xid.ul[0];
196 0 : if( delay >= accdb->slot_delay ) {
197 0 : FD_LOG_INFO(( "accdb xid %lu:%lu: pruning",
198 0 : oldest_xid.ul[0], oldest_xid.ul[1] ));
199 0 : fd_funk_txn_t * oldest = &funk->txn_pool->ele[ funk->shmem->child_head_cidx ];
200 0 : FD_TEST( fd_funk_txn_xid_eq( &oldest_xid, &oldest->xid ) );
201 0 : fd_accdb_txn_publish_one( accdb, oldest );
202 0 : }
203 0 : }
204 :
205 : void
206 : fd_accdb_admin_v2_delay_set( fd_accdb_admin_t * accdb_,
207 0 : ulong slot_delay ) {
208 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
209 0 : if( FD_UNLIKELY( !slot_delay ) ) FD_LOG_CRIT(( "invalid slot_delay (%lu)", slot_delay ));
210 0 : accdb->slot_delay = slot_delay;
211 0 : }
212 :
213 : fd_accdb_admin_vt_t const fd_accdb_admin_v2_vt = {
214 : .fini = fd_accdb_admin_v2_fini,
215 : .root_get = fd_accdb_v2_root_get,
216 : .attach_child = fd_accdb_v2_attach_child,
217 : .advance_root = fd_accdb_v2_advance_root,
218 : .cancel = fd_accdb_v2_cancel
219 : };
|