Line data Source code
1 : #include "fd_accdb_admin_v2.h"
2 : #include "fd_accdb_admin_v1.h"
3 : #include "fd_vinyl_req_pool.h"
4 : #include "../fd_flamenco_base.h"
5 : #include "../../vinyl/data/fd_vinyl_data.h"
6 :
/* FD_ACCDB_ROOT_BATCH_MAX controls how many accounts to write in
   batches to the vinyl DB server.

   NOTE(review): this macro is not referenced anywhere in this file --
   publish_recs currently migrates records to vinyl one at a time.  It
   appears to be reserved for a future batched implementation. */

#define FD_ACCDB_ROOT_BATCH_MAX (128UL)

/* fd_accdb_admin_v2 extends the v1 (funk-only) admin handle with a
   vinyl client.  The v1 object is overlaid at the very start of the
   struct (via the anonymous union), so the v1 state of a v2 handle is
   reachable as ->v1 and the common header as ->base. */

struct fd_accdb_admin_v2 {
  union {
    fd_accdb_admin_base_t base;
    fd_accdb_admin_v1_t v1[1];
  };

  /* Vinyl client */
  ulong vinyl_req_id;                   /* ID for the next request sent (post-incremented per request) */
  fd_vinyl_rq_t * vinyl_rq;             /* request queue, joined in fd_accdb_admin_v2_init */
  ulong vinyl_link_id;                  /* client link ID passed with every request */
  fd_wksp_t * vinyl_data_wksp;          /* workspace used to translate value gaddrs returned by the server */
  fd_wksp_t * vinyl_req_wksp;           /* workspace containing vinyl_req_pool */
  fd_vinyl_req_pool_t * vinyl_req_pool; /* pool of request batches */
};

typedef struct fd_accdb_admin_v2 fd_accdb_admin_v2_t;

/* A v2 handle is constructed in place inside caller-provided
   fd_accdb_admin_t storage (see fd_accdb_admin_v2_init), so it must
   fit within that storage and not require stricter alignment. */

FD_STATIC_ASSERT( alignof(fd_accdb_admin_v2_t)<=alignof(fd_accdb_admin_t), layout );
FD_STATIC_ASSERT( sizeof (fd_accdb_admin_v2_t)<=sizeof(fd_accdb_admin_t), layout );
31 :
32 : fd_accdb_admin_t *
33 : fd_accdb_admin_v2_init( fd_accdb_admin_t * accdb_,
34 : void * shfunk,
35 : void * vinyl_rq,
36 : void * vinyl_data,
37 : void * vinyl_req_pool,
38 0 : ulong vinyl_link_id ) {
39 : /* Call superclass constructor */
40 0 : if( FD_UNLIKELY( !fd_accdb_admin_v1_init( accdb_, shfunk ) ) ) {
41 0 : return NULL;
42 0 : }
43 0 : if( FD_UNLIKELY( !vinyl_data ) ) {
44 0 : FD_LOG_WARNING(( "NULL vinyl_data" ));
45 0 : return NULL;
46 0 : }
47 :
48 0 : fd_vinyl_rq_t * rq = fd_vinyl_rq_join( vinyl_rq );
49 0 : fd_vinyl_req_pool_t * req_pool = fd_vinyl_req_pool_join( vinyl_req_pool );
50 0 : if( FD_UNLIKELY( !rq || !req_pool ) ) {
51 : /* component joins log warning if this is reached */
52 0 : FD_LOG_WARNING(( "Failed to initialize database client" ));
53 0 : return NULL;
54 0 : }
55 :
56 0 : fd_accdb_admin_v2_t * accdb = fd_type_pun( accdb_ );
57 0 : accdb->vinyl_req_id = 0UL;
58 0 : accdb->vinyl_rq = rq;
59 0 : accdb->vinyl_link_id = vinyl_link_id;
60 0 : accdb->vinyl_data_wksp = vinyl_data;
61 0 : accdb->vinyl_req_wksp = fd_wksp_containing( req_pool );
62 0 : accdb->vinyl_req_pool = req_pool;
63 0 : accdb->base.accdb_type = FD_ACCDB_TYPE_V2;
64 0 : accdb->base.vt = &fd_accdb_admin_v2_vt;
65 0 : return accdb_;
66 0 : }
67 :
68 : static fd_accdb_admin_v2_t *
69 0 : downcast( fd_accdb_admin_t * admin ) {
70 0 : if( FD_UNLIKELY( !admin ) ) {
71 0 : FD_LOG_CRIT(( "NULL admin" ));
72 0 : }
73 0 : if( FD_UNLIKELY( admin->base.accdb_type!=FD_ACCDB_TYPE_V2 ) ) {
74 0 : FD_LOG_CRIT(( "corrupt accdb_admin handle" ));
75 0 : }
76 0 : return (fd_accdb_admin_v2_t *)admin;
77 0 : }
78 :
79 : void
80 0 : fd_accdb_admin_v2_fini( fd_accdb_admin_t * admin_ ) {
81 0 : fd_accdb_admin_v2_t * admin = downcast( admin_ );
82 :
83 0 : fd_vinyl_rq_leave( admin->vinyl_rq );
84 :
85 : /* superclass destructor */
86 0 : admin->base.accdb_type = FD_ACCDB_TYPE_V1;
87 0 : fd_accdb_admin_v1_fini( admin_ );
88 0 : }
89 :
90 : fd_funk_txn_xid_t
91 0 : fd_accdb_v2_root_get( fd_accdb_admin_t const * admin ) {
92 0 : return fd_accdb_v1_root_get( admin );
93 0 : }
94 :
95 : void
96 : fd_accdb_v2_attach_child( fd_accdb_admin_t * admin_,
97 : fd_funk_txn_xid_t const * xid_parent,
98 0 : fd_funk_txn_xid_t const * xid_new ) {
99 0 : fd_accdb_admin_v1_t * db = downcast( admin_ )->v1;
100 0 : FD_LOG_INFO(( "accdb txn xid %lu:%lu: created with parent %lu:%lu",
101 0 : xid_new ->ul[0], xid_new ->ul[1],
102 0 : xid_parent->ul[0], xid_parent->ul[1] ));
103 0 : fd_funk_txn_prepare( db->funk, xid_parent, xid_new );
104 0 : }
105 :
106 : void
107 : fd_accdb_v2_cancel( fd_accdb_admin_t * admin,
108 0 : fd_funk_txn_xid_t const * xid ) {
109 0 : fd_accdb_v1_cancel( admin, xid );
110 0 : }
111 :
112 : static void
113 : vinyl_push_rec( fd_accdb_admin_v2_t * admin,
114 : void const * addr,
115 0 : fd_account_meta_t const * src_meta ) {
116 0 : fd_vinyl_rq_t * rq = admin->vinyl_rq; /* "request queue "*/
117 0 : fd_vinyl_req_pool_t * req_pool = admin->vinyl_req_pool; /* "request pool" */
118 0 : fd_wksp_t * req_wksp = admin->vinyl_req_wksp; /* shm workspace containing request buffer */
119 0 : fd_wksp_t * data_wksp = admin->vinyl_data_wksp; /* shm workspace containing vinyl data cache */
120 0 : ulong link_id = admin->vinyl_link_id; /* vinyl client ID */
121 :
122 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
123 : /* req_pool_release called before returning */
124 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp ( req_pool, batch_idx );
125 0 : fd_vinyl_key_t * req_key = fd_vinyl_req_batch_key ( req_pool, batch_idx );
126 0 : schar * req_err = fd_vinyl_req_batch_err ( req_pool, batch_idx );
127 0 : ulong * req_val_gaddr = fd_vinyl_req_batch_val_gaddr( req_pool, batch_idx );
128 :
129 0 : memset( comp, 0, sizeof(fd_vinyl_comp_t) );
130 0 : fd_vinyl_key_init( req_key, addr, 32UL );
131 0 : *req_err = 0;
132 0 : *req_val_gaddr = 0UL;
133 :
134 0 : ulong val_sz = sizeof(fd_account_meta_t)+src_meta->dlen;
135 0 : ulong flags = FD_VINYL_REQ_FLAG_MODIFY | FD_VINYL_REQ_FLAG_CREATE;
136 0 : ulong req_id = admin->vinyl_req_id++;
137 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_ACQUIRE, flags, batch_idx, 1UL, val_sz );
138 :
139 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
140 0 : FD_COMPILER_MFENCE();
141 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
142 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
143 0 : FD_LOG_CRIT(( "vinyl tile rejected my ACQUIRE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
144 0 : }
145 0 : int err = FD_VOLATILE_CONST( req_err[0] );
146 0 : if( FD_UNLIKELY( err!=FD_VINYL_SUCCESS ) ) {
147 0 : err = FD_VOLATILE_CONST( req_err[0] );
148 0 : FD_LOG_CRIT(( "vinyl tile ACQUIRE request failed: %i-%s", err, fd_vinyl_strerror( err ) ));
149 0 : }
150 :
151 0 : fd_account_meta_t * dst_meta = fd_wksp_laddr_fast( data_wksp, req_val_gaddr[0] );
152 0 : fd_vinyl_info_t * val_info = fd_vinyl_data_info( dst_meta );
153 0 : fd_memcpy( dst_meta, src_meta, val_sz );
154 0 : val_info->val_sz = (uint)val_sz;
155 :
156 0 : memset( comp, 0, sizeof(fd_vinyl_comp_t) );
157 0 : *req_err = 0;
158 0 : flags = FD_VINYL_REQ_FLAG_MODIFY;
159 0 : req_id = admin->vinyl_req_id++;
160 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_RELEASE, FD_VINYL_REQ_FLAG_MODIFY, batch_idx, 1UL, val_sz );
161 :
162 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
163 0 : FD_COMPILER_MFENCE();
164 0 : comp_err = FD_VOLATILE_CONST( comp->err );
165 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
166 0 : FD_LOG_CRIT(( "vinyl tile rejected my RELEASE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
167 0 : }
168 0 : if( FD_UNLIKELY( req_err[0]!=FD_VINYL_SUCCESS ) ) {
169 0 : FD_LOG_CRIT(( "vinyl tile RELEASE request failed: %i-%s", req_err[0], fd_vinyl_strerror( req_err[0] ) ));
170 0 : }
171 :
172 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
173 0 : }
174 :
175 : static void
176 : vinyl_remove_rec( fd_accdb_admin_v2_t * admin,
177 0 : void const * addr ) {
178 0 : fd_vinyl_rq_t * rq = admin->vinyl_rq; /* "request queue "*/
179 0 : fd_vinyl_req_pool_t * req_pool = admin->vinyl_req_pool; /* "request pool" */
180 0 : fd_wksp_t * req_wksp = admin->vinyl_req_wksp; /* shm workspace containing request buffer */
181 0 : ulong link_id = admin->vinyl_link_id; /* vinyl client ID */
182 :
183 0 : ulong batch_idx = fd_vinyl_req_pool_acquire( req_pool );
184 : /* req_pool_release called before returning */
185 0 : fd_vinyl_comp_t * comp = fd_vinyl_req_batch_comp( req_pool, batch_idx );
186 0 : fd_vinyl_key_t * req_key = fd_vinyl_req_batch_key ( req_pool, batch_idx );
187 0 : schar * req_err = fd_vinyl_req_batch_err ( req_pool, batch_idx );
188 :
189 0 : memset( comp, 0, sizeof(fd_vinyl_comp_t) );
190 0 : fd_vinyl_key_init( req_key, addr, 32UL );
191 0 : *req_err = 0;
192 :
193 0 : ulong req_id = admin->vinyl_req_id++;
194 0 : fd_vinyl_req_send_batch( rq, req_pool, req_wksp, req_id, link_id, FD_VINYL_REQ_TYPE_ERASE, 0UL, batch_idx, 1UL, 0UL );
195 :
196 0 : while( FD_VOLATILE_CONST( comp->seq )!=1UL ) FD_SPIN_PAUSE();
197 0 : FD_COMPILER_MFENCE();
198 0 : int comp_err = FD_VOLATILE_CONST( comp->err );
199 0 : if( FD_UNLIKELY( comp_err!=FD_VINYL_SUCCESS ) ) {
200 0 : FD_LOG_CRIT(( "vinyl tile rejected my ERASE request: %i-%s", comp_err, fd_vinyl_strerror( comp_err ) ));
201 0 : }
202 0 : int err = FD_VOLATILE_CONST( req_err[0] );
203 0 : if( FD_UNLIKELY( err!=FD_VINYL_SUCCESS && err!=FD_VINYL_ERR_KEY ) ) {
204 0 : err = FD_VOLATILE_CONST( req_err[0] );
205 0 : FD_LOG_CRIT(( "vinyl tile ERASE request failed: %i-%s", err, fd_vinyl_strerror( err ) ));
206 0 : }
207 :
208 0 : fd_vinyl_req_pool_release( req_pool, batch_idx );
209 0 : }
210 :
211 : /* funk_rec_write_lock spins until it gains a write lock for a record,
212 : increments the version number, and returns the updated ver_lock
213 : value. */
214 :
215 : static ulong
216 0 : fd_funk_rec_admin_lock( fd_funk_rec_t * rec ) {
217 0 : ulong * vl = &rec->ver_lock;
218 0 : for(;;) {
219 0 : ulong const ver_lock = FD_VOLATILE_CONST( *vl );
220 0 : ulong const ver = fd_funk_rec_ver_bits ( ver_lock );
221 0 : ulong const lock = fd_funk_rec_lock_bits( ver_lock );
222 0 : if( FD_UNLIKELY( lock ) ) {
223 : /* Spin while there are active readers */
224 : /* FIXME kill client after spinning for 30 seconds to prevent silent deadlock */
225 0 : FD_SPIN_PAUSE();
226 0 : continue;
227 0 : }
228 0 : ulong const new_ver = fd_funk_rec_ver_inc( ver );
229 0 : ulong const new_vl = fd_funk_rec_ver_lock( new_ver, FD_FUNK_REC_LOCK_MASK );
230 0 : if( FD_UNLIKELY( FD_ATOMIC_CAS( vl, ver_lock, new_vl )!=ver_lock ) ) {
231 0 : FD_SPIN_PAUSE();
232 0 : continue;
233 0 : }
234 0 : return new_vl;
235 0 : }
236 0 : }
237 :
238 : static void
239 : fd_funk_rec_admin_unlock( fd_funk_rec_t * rec,
240 0 : ulong ver_lock ) {
241 0 : FD_VOLATILE( rec->ver_lock ) = fd_funk_rec_ver_lock( fd_funk_rec_ver_bits( ver_lock ), 0UL );
242 0 : }
243 :
244 : static void
245 : funk_remove_rec( fd_funk_t * funk,
246 0 : fd_funk_rec_t * rec ) {
247 :
248 : /* Step 1: Remove record from map */
249 :
250 0 : fd_funk_xid_key_pair_t pair = rec->pair;
251 0 : fd_funk_rec_query_t query[1];
252 0 : int rm_err = fd_funk_rec_map_remove( funk->rec_map, &pair, NULL, query, FD_MAP_FLAG_BLOCKING );
253 0 : if( FD_UNLIKELY( rm_err!=FD_MAP_SUCCESS ) ) FD_LOG_CRIT(( "fd_funk_rec_map_remove failed (%i-%s)", rm_err, fd_map_strerror( rm_err ) ));
254 0 : FD_COMPILER_MFENCE();
255 :
256 : /* Step 2: Acquire admin lock (kick out readers)
257 :
258 : Note: At this point, well-behaving external readers will abandon a
259 : read-lock attempt if they observe this active write lock. (An
260 : admin lock always implies the record is about to die) */
261 :
262 0 : ulong ver_lock = fd_funk_rec_admin_lock( rec );
263 :
264 : /* Step 3: Free record */
265 :
266 0 : memset( &rec->pair, 0, sizeof(fd_funk_xid_key_pair_t) );
267 0 : FD_COMPILER_MFENCE();
268 0 : rec->map_next = FD_FUNK_REC_IDX_NULL;
269 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
270 0 : fd_funk_rec_admin_unlock( rec, ver_lock );
271 0 : fd_funk_rec_pool_release( funk->rec_pool, rec, 1 );
272 0 : }
273 :
274 : static void
275 : publish_recs( fd_accdb_admin_v2_t * admin,
276 0 : fd_funk_txn_t * txn ) {
277 0 : fd_funk_t * funk = admin->v1->funk;
278 0 : fd_wksp_t * funk_wksp = funk->wksp;
279 :
280 : /* Iterate record list */
281 0 : uint head = txn->rec_head_idx;
282 0 : txn->rec_head_idx = FD_FUNK_REC_IDX_NULL;
283 0 : txn->rec_tail_idx = FD_FUNK_REC_IDX_NULL;
284 0 : while( !fd_funk_rec_idx_is_null( head ) ) {
285 0 : fd_funk_rec_t * rec = &funk->rec_pool->ele[ head ];
286 0 : uint next = rec->next_idx;
287 0 : fd_account_meta_t const * meta = fd_funk_val( rec, funk_wksp );
288 0 : FD_CRIT( meta && rec->val_sz>=sizeof(fd_account_meta_t), "invalid funk record value" );
289 :
290 : /* Migrate records one-by-one. This is slow and should be done in
291 : batches instead. But it's simple and shippable for now. */
292 0 : if( meta->lamports ) {
293 0 : vinyl_push_rec( admin, rec->pair.key, meta );
294 0 : } else {
295 0 : vinyl_remove_rec( admin, rec->pair.key );
296 0 : }
297 0 : funk_remove_rec( funk, rec );
298 :
299 0 : admin->base.root_cnt++;
300 0 : head = next; /* next record */
301 0 : }
302 0 : }
303 :
304 : static void
305 : txn_unregister( fd_funk_t * funk,
306 0 : fd_funk_txn_t * txn ) {
307 0 : ulong child_idx = fd_funk_txn_idx( txn->child_head_cidx );
308 0 : while( FD_UNLIKELY( !fd_funk_txn_idx_is_null( child_idx ) ) ) {
309 0 : funk->txn_pool->ele[ child_idx ].parent_cidx = fd_funk_txn_cidx( FD_FUNK_TXN_IDX_NULL );
310 0 : child_idx = fd_funk_txn_idx( funk->txn_pool->ele[ child_idx ].sibling_next_cidx );
311 0 : }
312 :
313 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
314 0 : fd_funk_txn_map_query_t query[1];
315 0 : int remove_err = fd_funk_txn_map_remove( funk->txn_map, xid, NULL, query, 0 );
316 0 : if( FD_UNLIKELY( remove_err!=FD_MAP_SUCCESS ) ) {
317 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: fd_funk_txn_map_remove failed: %i-%s", remove_err, fd_map_strerror( remove_err ) ));
318 0 : }
319 0 : }
320 :
321 : static void
322 : txn_free( fd_funk_t * funk,
323 0 : fd_funk_txn_t * txn ) {
324 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_FREE;
325 0 : txn->parent_cidx = UINT_MAX;
326 0 : txn->sibling_prev_cidx = UINT_MAX;
327 0 : txn->sibling_next_cidx = UINT_MAX;
328 0 : txn->child_head_cidx = UINT_MAX;
329 0 : txn->child_tail_cidx = UINT_MAX;
330 0 : fd_funk_txn_pool_release( funk->txn_pool, txn, 1 );
331 0 : }
332 :
333 : static void
334 : fd_accdb_txn_publish_one( fd_accdb_admin_v2_t * accdb,
335 0 : fd_funk_txn_t * txn ) {
336 0 : fd_funk_t * funk = accdb->v1->funk;
337 :
338 : /* Phase 1: Mark transaction as "last published" */
339 :
340 0 : fd_funk_txn_xid_t xid[1]; fd_funk_txn_xid_copy( xid, fd_funk_txn_xid( txn ) );
341 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
342 0 : FD_LOG_CRIT(( "fd_accdb_publish failed: txn with xid %lu:%lu is not a child of the last published txn", xid->ul[0], xid->ul[1] ));
343 0 : }
344 0 : fd_funk_txn_xid_st_atomic( funk->shmem->last_publish, xid );
345 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: publish", (void *)txn, txn->xid.ul[0], txn->xid.ul[1] ));
346 :
347 : /* Phase 2: Drain users from transaction */
348 :
349 0 : fd_rwlock_write( txn->lock );
350 0 : FD_VOLATILE( txn->state ) = FD_FUNK_TXN_STATE_PUBLISH;
351 :
352 : /* Phase 3: Move records from funk to vinyl */
353 :
354 0 : publish_recs( accdb, txn );
355 :
356 : /* Phase 4: Unregister transaction */
357 :
358 0 : txn_unregister( funk, txn );
359 :
360 : /* Phase 5: Free transaction object */
361 :
362 0 : fd_rwlock_unwrite( txn->lock );
363 0 : txn_free( funk, txn );
364 0 : }
365 :
366 : void
367 : fd_accdb_v2_advance_root( fd_accdb_admin_t * accdb_,
368 0 : fd_funk_txn_xid_t const * xid ) {
369 0 : fd_accdb_admin_v2_t * accdb = downcast( accdb_ );
370 0 : fd_funk_t * funk = accdb->v1->funk;
371 :
372 : /* Assume no concurrent access to txn_map */
373 :
374 0 : fd_funk_txn_map_query_t query[1];
375 0 : int query_err = fd_funk_txn_map_query_try( funk->txn_map, xid, NULL, query, 0 );
376 0 : if( FD_UNLIKELY( query_err ) ) {
377 0 : FD_LOG_CRIT(( "fd_accdb_advance_root failed: fd_funk_txn_map_query_try(xid=%lu:%lu) returned (%i-%s)",
378 0 : xid->ul[0], xid->ul[1], query_err, fd_map_strerror( query_err ) ));
379 0 : }
380 0 : fd_funk_txn_t * txn = fd_funk_txn_map_query_ele( query );
381 :
382 0 : if( FD_UNLIKELY( !fd_funk_txn_idx_is_null( fd_funk_txn_idx( txn->parent_cidx ) ) ) ) {
383 0 : FD_LOG_CRIT(( "fd_accdb_txn_advance_root: parent of txn %lu:%lu is not root", xid->ul[0], xid->ul[1] ));
384 0 : }
385 :
386 0 : FD_LOG_INFO(( "accdb txn laddr=%p xid %lu:%lu: advancing root",
387 0 : (void *)txn,
388 0 : xid->ul[0], xid->ul[1] ));
389 :
390 0 : fd_accdb_txn_cancel_siblings( accdb->v1, txn );
391 :
392 : /* Children of transaction are now children of root */
393 0 : funk->shmem->child_head_cidx = txn->child_head_cidx;
394 0 : funk->shmem->child_tail_cidx = txn->child_tail_cidx;
395 :
396 0 : fd_accdb_txn_publish_one( accdb, txn );
397 0 : }
398 :
399 : fd_accdb_admin_vt_t const fd_accdb_admin_v2_vt = {
400 : .fini = fd_accdb_admin_v2_fini,
401 : .root_get = fd_accdb_v2_root_get,
402 : .attach_child = fd_accdb_v2_attach_child,
403 : .advance_root = fd_accdb_v2_advance_root,
404 : .cancel = fd_accdb_v2_cancel
405 : };
|