Line data Source code
1 : #include "fd_rocksdb.h"
2 : #include "fd_blockstore.h"
3 : #include "../shredcap/fd_shredcap.h"
4 : #include <stdbool.h>
5 : #include <stdlib.h>
6 : #include <stdio.h>
7 : #include <unistd.h>
8 : #include "../../util/bits/fd_bits.h"
9 :
10 : char *
11 : fd_rocksdb_init( fd_rocksdb_t * db,
12 0 : char const * db_name ) {
13 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
14 :
15 0 : db->opts = rocksdb_options_create();
16 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
17 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
18 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
19 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
20 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
21 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
22 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
23 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
24 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
25 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
26 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
27 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
28 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
29 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
30 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
31 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
32 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
33 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
34 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
35 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
36 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
37 :
38 0 : rocksdb_options_t const * cf_options[ FD_ROCKSDB_CF_CNT ];
39 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ )
40 0 : cf_options[ i ] = db->opts;
41 :
42 0 : char *err = NULL;
43 :
44 0 : db->db = rocksdb_open_for_read_only_column_families(
45 0 : db->opts,
46 0 : db_name,
47 0 : FD_ROCKSDB_CF_CNT,
48 0 : (char const * const *)db->cfgs,
49 0 : (rocksdb_options_t const * const *)cf_options,
50 0 : db->cf_handles,
51 0 : false,
52 0 : &err );
53 :
54 0 : if( FD_UNLIKELY( err ) ) return err;
55 :
56 0 : db->ro = rocksdb_readoptions_create();
57 :
58 0 : return NULL;
59 0 : }
60 :
61 : void
62 : fd_rocksdb_new( fd_rocksdb_t * db,
63 0 : char const * db_name ) {
64 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
65 :
66 0 : db->opts = rocksdb_options_create();
67 : /* Create the db*/
68 0 : rocksdb_options_set_create_if_missing(db->opts, 1);
69 :
70 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
71 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
72 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
73 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
74 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
75 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
76 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
77 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
78 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
79 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
80 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
81 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
82 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
83 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
84 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
85 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
86 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
87 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
88 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
89 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
90 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
91 :
92 : /* Create the rocksdb */
93 0 : char * err = NULL;
94 0 : db->db = rocksdb_open(db->opts, db_name, &err);
95 0 : if ( err != NULL ) {
96 0 : FD_LOG_ERR(("rocksdb creation failed: %s", err));
97 0 : }
98 :
99 0 : db->wo = rocksdb_writeoptions_create();
100 :
101 : /* Create column families, default already exists at index 0 */
102 0 : for ( ulong i = 1; i < FD_ROCKSDB_CF_CNT; ++i ) {
103 0 : db->cf_handles[i] = rocksdb_create_column_family(db->db, db->opts, db->cfgs[i], &err);
104 0 : }
105 0 : rocksdb_options_set_compression( db->opts, rocksdb_lz4_compression );
106 0 : }
107 :
108 0 : void fd_rocksdb_destroy(fd_rocksdb_t *db) {
109 :
110 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ ) {
111 0 : if( db->cf_handles[i] ) {
112 0 : rocksdb_column_family_handle_destroy( db->cf_handles[i] );
113 0 : db->cf_handles[i] = NULL;
114 0 : }
115 0 : }
116 :
117 0 : if( db->ro ) {
118 0 : rocksdb_readoptions_destroy( db->ro );
119 0 : db->ro = NULL;
120 0 : }
121 :
122 0 : if( db->opts ) {
123 0 : rocksdb_options_destroy( db->opts );
124 0 : db->opts = NULL;
125 0 : }
126 :
127 0 : if( db->db ) {
128 0 : rocksdb_close( db->db );
129 0 : db->db = NULL;
130 0 : }
131 :
132 0 : if( db->wo ) {
133 0 : rocksdb_writeoptions_destroy( db->wo );
134 0 : }
135 0 : }
136 :
137 0 : ulong fd_rocksdb_last_slot(fd_rocksdb_t *db, char **err) {
138 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
139 0 : rocksdb_iter_seek_to_last(iter);
140 0 : if (!rocksdb_iter_valid(iter)) {
141 0 : rocksdb_iter_destroy(iter);
142 0 : *err = "db column for root is empty";
143 0 : return 0;
144 0 : }
145 :
146 0 : size_t klen = 0;
147 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
148 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
149 0 : rocksdb_iter_destroy(iter);
150 0 : return slot;
151 0 : }
152 :
153 0 : ulong fd_rocksdb_find_last_slot(fd_rocksdb_t *db, char **err) {
154 0 : ulong max_slot = 0;
155 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
156 0 : rocksdb_iter_seek_to_first(iter);
157 0 : if (!rocksdb_iter_valid(iter)) {
158 0 : rocksdb_iter_destroy(iter);
159 0 : *err = "db column for root is empty";
160 0 : return 0;
161 0 : }
162 :
163 0 : for( ; rocksdb_iter_valid(iter); rocksdb_iter_next(iter) ) {
164 0 : size_t klen = 0;
165 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
166 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
167 :
168 0 : if( slot > max_slot ) {
169 0 : max_slot = slot;
170 0 : FD_LOG_WARNING(("new max_slot: %lu", max_slot));
171 0 : }
172 0 : }
173 :
174 0 : rocksdb_iter_destroy(iter);
175 0 : return max_slot;
176 0 : }
177 :
178 : ulong
179 : fd_rocksdb_first_slot( fd_rocksdb_t * db,
180 0 : char ** err ) {
181 :
182 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
183 0 : rocksdb_iter_seek_to_first(iter);
184 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(iter) ) ) {
185 0 : rocksdb_iter_destroy(iter);
186 0 : *err = "db column for root is empty";
187 0 : return 0;
188 0 : }
189 :
190 0 : ulong klen = 0;
191 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
192 0 : ulong slot = fd_ulong_bswap( *((ulong *)key));
193 0 : rocksdb_iter_destroy(iter);
194 0 : return slot;
195 0 : }
196 :
197 : int
198 : fd_rocksdb_get_meta( fd_rocksdb_t * db,
199 : ulong slot,
200 : fd_slot_meta_t * m,
201 0 : fd_valloc_t valloc ) {
202 0 : ulong ks = fd_ulong_bswap(slot);
203 0 : size_t vallen = 0;
204 :
205 0 : char * err = NULL;
206 0 : char * meta = rocksdb_get_cf( db->db,
207 0 : db->ro,
208 0 : db->cf_handles[FD_ROCKSDB_CFIDX_META],
209 0 : (const char *) &ks,
210 0 : sizeof(ks),
211 0 : &vallen,
212 0 : &err );
213 :
214 0 : if( NULL != err ) {
215 0 : FD_LOG_WARNING(( "%s", err ));
216 0 : free( err );
217 0 : return -2;
218 0 : }
219 :
220 0 : if (0 == vallen)
221 0 : return -1;
222 :
223 0 : fd_bincode_decode_ctx_t ctx;
224 0 : ctx.data = meta;
225 0 : ctx.dataend = &meta[vallen];
226 :
227 0 : ulong total_sz = 0UL;
228 0 : if( fd_slot_meta_decode_footprint( &ctx, &total_sz ) ) {
229 0 : FD_LOG_ERR(( "fd_slot_meta_decode failed" ));
230 0 : }
231 :
232 0 : uchar * mem = fd_valloc_malloc( valloc, fd_slot_meta_align(), total_sz );
233 0 : if( NULL == mem ) {
234 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
235 0 : }
236 :
237 0 : fd_slot_meta_decode( mem, &ctx );
238 :
239 0 : fd_memcpy( m, mem, sizeof(fd_slot_meta_t) );
240 :
241 0 : free(meta);
242 :
243 0 : return 0;
244 0 : }
245 :
246 : void *
247 0 : fd_rocksdb_root_iter_new ( void * ptr ) {
248 0 : fd_memset(ptr, 0, sizeof(fd_rocksdb_root_iter_t));
249 0 : return ptr;
250 0 : }
251 :
252 : fd_rocksdb_root_iter_t *
253 0 : fd_rocksdb_root_iter_join ( void * ptr ) {
254 0 : return (fd_rocksdb_root_iter_t *) ptr;
255 0 : }
256 :
257 : void *
258 0 : fd_rocksdb_root_iter_leave ( fd_rocksdb_root_iter_t * ptr ) {
259 0 : return ptr;
260 0 : }
261 :
262 : int
263 : fd_rocksdb_root_iter_seek( fd_rocksdb_root_iter_t * self,
264 : fd_rocksdb_t * db,
265 : ulong slot,
266 : fd_slot_meta_t * m,
267 0 : fd_valloc_t valloc ) {
268 0 : self->db = db;
269 :
270 0 : if( FD_UNLIKELY( !self->iter ) )
271 0 : self->iter = rocksdb_create_iterator_cf(self->db->db, self->db->ro, self->db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
272 :
273 0 : ulong ks = fd_ulong_bswap( slot );
274 :
275 0 : rocksdb_iter_seek( self->iter, (char const *)&ks, sizeof(ulong) );
276 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(self->iter) ) )
277 0 : return -1;
278 :
279 0 : size_t klen = 0;
280 0 : char const * key = rocksdb_iter_key( self->iter, &klen ); // There is no need to free key
281 0 : ulong kslot = fd_ulong_bswap( *((ulong *)key) );
282 :
283 0 : if( FD_UNLIKELY( kslot != slot ) ) {
284 0 : FD_LOG_WARNING(( "fd_rocksdb_root_iter_seek: wanted slot %lu, found %lu",
285 0 : slot, kslot ));
286 0 : return -2;
287 0 : }
288 :
289 0 : return fd_rocksdb_get_meta( self->db, slot, m, valloc );
290 0 : }
291 :
292 : int
293 0 : fd_rocksdb_root_iter_slot ( fd_rocksdb_root_iter_t * self, ulong *slot ) {
294 0 : if ((NULL == self->db) || (NULL == self->iter))
295 0 : return -1;
296 :
297 0 : if (!rocksdb_iter_valid(self->iter))
298 0 : return -2;
299 :
300 0 : size_t klen = 0;
301 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
302 0 : *slot = fd_ulong_bswap(*((unsigned long *) key));
303 0 : return 0;
304 0 : }
305 :
306 : int
307 : fd_rocksdb_root_iter_next( fd_rocksdb_root_iter_t * self,
308 : fd_slot_meta_t * m,
309 0 : fd_valloc_t valloc ) {
310 0 : if ((NULL == self->db) || (NULL == self->iter))
311 0 : return -1;
312 :
313 0 : if (!rocksdb_iter_valid(self->iter))
314 0 : return -2;
315 :
316 0 : rocksdb_iter_next(self->iter);
317 :
318 0 : if (!rocksdb_iter_valid(self->iter))
319 0 : return -3;
320 :
321 0 : size_t klen = 0;
322 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
323 :
324 0 : return fd_rocksdb_get_meta( self->db, fd_ulong_bswap(*((unsigned long *) key)), m, valloc );
325 0 : }
326 :
327 : void
328 0 : fd_rocksdb_root_iter_destroy ( fd_rocksdb_root_iter_t * self ) {
329 0 : if (NULL != self->iter) {
330 0 : rocksdb_iter_destroy(self->iter);
331 0 : self->iter = 0;
332 0 : }
333 0 : self->db = NULL;
334 0 : }
335 :
336 : void *
337 : fd_rocksdb_get_txn_status_raw( fd_rocksdb_t * self,
338 : ulong slot,
339 : void const * sig,
340 0 : ulong * psz ) {
341 :
342 0 : ulong slot_be = fd_ulong_bswap( slot );
343 :
344 : /* Construct RocksDB query key */
345 0 : char key[72];
346 0 : memcpy( key, sig, 64UL );
347 0 : memcpy( key+64UL, &slot_be, 8UL );
348 :
349 : /* Query record */
350 0 : char * err = NULL;
351 0 : char * res = rocksdb_get_cf(
352 0 : self->db, self->ro,
353 0 : self->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
354 0 : key, 72UL,
355 0 : psz,
356 0 : &err );
357 :
358 0 : if( FD_UNLIKELY( err ) ) {
359 0 : FD_LOG_WARNING(("err=%s", err));
360 0 : free( err );
361 0 : return NULL;
362 0 : }
363 0 : return res;
364 0 : }
365 :
366 : ulong
367 0 : fd_rocksdb_get_slot( ulong cf_idx, char const * key ) {
368 0 : switch (cf_idx) {
369 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
370 0 : return fd_ulong_bswap(*((ulong *) &key[72])); /* (signature,slot)*/
371 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
372 0 : return fd_ulong_bswap(*((ulong *) &key[40])); /* (pubkey,slot,u32,signature) */
373 0 : default: /* all other cfs have the slot at the start */
374 0 : return fd_ulong_bswap( *((ulong *)&key[0]) ); /* The key is just the slot number */
375 0 : }
376 :
377 0 : return fd_ulong_bswap( *((ulong *)key) );
378 0 : }
379 :
380 : void
381 0 : fd_rocksdb_iter_seek_to_slot_if_possible( rocksdb_iterator_t * iter, const ulong cf_idx, const ulong slot ) {
382 0 : ulong k = fd_ulong_bswap(slot);
383 0 : switch (cf_idx) {
384 : /* These cfs do not have the slot at the start, we can't seek based on slot prefix */
385 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
386 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
387 0 : rocksdb_iter_seek_to_first( iter );
388 0 : break;
389 0 : default: /* all other cfs have the slot at the start, seek based on slot prefix */
390 0 : rocksdb_iter_seek( iter, (const char *)&k, 8);
391 0 : break;
392 0 : }
393 0 : }
394 :
395 : int
396 : fd_rocksdb_copy_over_slot_indexed_range( fd_rocksdb_t * src,
397 : fd_rocksdb_t * dst,
398 : ulong cf_idx,
399 : ulong start_slot,
400 0 : ulong end_slot ) {
401 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_slot_indexed_range: %lu", cf_idx ));
402 :
403 0 : if ( cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ||
404 0 : cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ||
405 0 : cf_idx == FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ) {
406 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_range: skipping cf_idx=%lu because not slot indexed", cf_idx ));
407 0 : return 0;
408 0 : }
409 :
410 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf( src->db, src->ro, src->cf_handles[cf_idx] );
411 0 : if ( FD_UNLIKELY( iter == NULL ) ) {
412 0 : FD_LOG_ERR(( "rocksdb_create_iterator_cf failed for cf_idx=%lu", cf_idx ));
413 0 : }
414 :
415 0 : for ( fd_rocksdb_iter_seek_to_slot_if_possible( iter, cf_idx, start_slot ); rocksdb_iter_valid( iter ); rocksdb_iter_next( iter ) ) {
416 0 : ulong klen = 0;
417 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
418 :
419 0 : ulong slot = fd_rocksdb_get_slot( cf_idx, key );
420 0 : if ( slot < start_slot ) {
421 0 : continue;
422 0 : }
423 0 : else if ( slot > end_slot ) {
424 0 : break;
425 0 : }
426 :
427 0 : ulong vlen = 0;
428 0 : char const * value = rocksdb_iter_value( iter, &vlen );
429 :
430 0 : fd_rocksdb_insert_entry( dst, cf_idx, key, klen, value, vlen );
431 0 : }
432 0 : rocksdb_iter_destroy( iter );
433 0 : return 0;
434 0 : }
435 :
436 : int
437 : fd_rocksdb_copy_over_txn_status_range( fd_rocksdb_t * src,
438 : fd_rocksdb_t * dst,
439 : fd_blockstore_t * blockstore,
440 : ulong start_slot,
441 0 : ulong end_slot ) {
442 : /* Look up the blocks data and iterate through its transactions */
443 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
444 :
445 0 : for ( ulong slot = start_slot; slot <= end_slot; ++slot ) {
446 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_txn_status_range: %lu", slot ));
447 0 : fd_block_info_t * block_entry = fd_blockstore_block_map_query( blockstore, slot );
448 0 : if( FD_LIKELY( block_entry && fd_blockstore_shreds_complete( blockstore, slot) ) ) {
449 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, block_entry->block_gaddr );
450 0 : uchar * data = fd_wksp_laddr_fast( wksp, blk->data_gaddr );
451 0 : fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, blk->txns_gaddr );
452 : /* TODO: change txn indexing after blockstore refactor */
453 0 : ulong last_txn_off = ULONG_MAX;
454 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
455 0 : fd_txn_key_t sig;
456 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof(sig) );
457 0 : if( txns[j].txn_off != last_txn_off ) {
458 0 : last_txn_off = txns[j].txn_off;
459 0 : fd_rocksdb_copy_over_txn_status( src, dst, slot, &sig );
460 0 : }
461 0 : }
462 0 : }
463 0 : }
464 0 : return 0;
465 0 : }
466 :
467 : void
468 : fd_rocksdb_copy_over_txn_status( fd_rocksdb_t * src,
469 : fd_rocksdb_t * dst,
470 : ulong slot,
471 0 : void const * sig ) {
472 0 : ulong slot_be = fd_ulong_bswap( slot );
473 :
474 : /* Construct RocksDB query key */
475 : /* TODO: Replace with constants */
476 0 : char key[ 72 ];
477 0 : memcpy( key, sig, 64UL );
478 0 : memcpy( key+64UL, &slot_be, 8UL );
479 :
480 : /* Query record */
481 0 : ulong sz;
482 0 : char * err = NULL;
483 0 : char * res = rocksdb_get_cf(
484 0 : src->db, src->ro, src->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
485 0 : key, 72UL, &sz, &err );
486 :
487 0 : if( FD_UNLIKELY( err ) ) {
488 0 : FD_LOG_WARNING(("err=%s", err));
489 0 : free( err );
490 0 : return;
491 0 : }
492 :
493 0 : fd_rocksdb_insert_entry( dst, FD_ROCKSDB_CFIDX_TRANSACTION_STATUS, key, 72UL, res, sz );
494 0 : }
495 :
496 : int
497 : fd_rocksdb_insert_entry( fd_rocksdb_t * db,
498 : ulong cf_idx,
499 : const char * key,
500 : ulong klen,
501 : const char * value,
502 : ulong vlen )
503 0 : {
504 0 : char * err = NULL;
505 0 : rocksdb_put_cf( db->db, db->wo, db->cf_handles[cf_idx],
506 0 : key, klen, value, vlen, &err );
507 0 : if( FD_UNLIKELY( err != NULL ) ) {
508 0 : FD_LOG_WARNING(( "rocksdb_put_cf failed with error %s", err ));
509 0 : return -1;
510 0 : }
511 0 : return 0;
512 0 : }
513 :
514 : static void
515 0 : fd_blockstore_scan_block( fd_blockstore_t * blockstore, ulong slot, fd_block_t * block ) {
516 :
517 0 : fd_block_micro_t * micros = fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
518 0 : alignof( fd_block_micro_t ),
519 0 : sizeof( *micros ) * FD_MICROBLOCK_MAX_PER_SLOT );
520 0 : fd_block_txn_t * txns = fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
521 0 : alignof( fd_block_txn_t ),
522 0 : sizeof( *txns ) * FD_TXN_MAX_PER_SLOT );
523 :
524 : /*
525 : * Agave decodes precisely one array of microblocks from each batch.
526 : * As of bincode version 1.3.3, the default deserializer used when
527 : * decoding a batch in the blockstore allows for trailing bytes to be
528 : * ignored.
529 : * https://github.com/anza-xyz/agave/blob/v2.1.0/ledger/src/blockstore.rs#L3764
530 : */
531 0 : uchar allow_trailing = 1UL;
532 :
533 0 : uchar const * data = fd_blockstore_block_data_laddr( blockstore, block );
534 0 : FD_LOG_DEBUG(( "scanning slot %lu, ptr %p, sz %lu", slot, (void *)data, block->data_sz ));
535 :
536 0 : fd_block_entry_batch_t const * batch_laddr = fd_blockstore_block_batch_laddr( blockstore, block );
537 0 : ulong const batch_cnt = block->batch_cnt;
538 :
539 0 : ulong micros_cnt = 0UL;
540 0 : ulong txns_cnt = 0UL;
541 0 : ulong blockoff = 0UL;
542 0 : for( ulong batch_i = 0UL; batch_i < batch_cnt; batch_i++ ) {
543 0 : ulong const batch_end_off = batch_laddr[ batch_i ].end_off;
544 0 : if( blockoff + sizeof( ulong ) > batch_end_off ) FD_LOG_ERR(( "premature end of batch" ));
545 0 : ulong mcount = FD_LOAD( ulong, data + blockoff );
546 0 : blockoff += sizeof( ulong );
547 :
548 : /* Loop across microblocks */
549 0 : for( ulong mblk = 0; mblk < mcount; ++mblk ) {
550 0 : if( blockoff + sizeof( fd_microblock_hdr_t ) > batch_end_off )
551 0 : FD_LOG_ERR(( "premature end of batch" ));
552 0 : if( micros_cnt < FD_MICROBLOCK_MAX_PER_SLOT ) {
553 0 : fd_block_micro_t * m = micros + ( micros_cnt++ );
554 0 : m->off = blockoff;
555 0 : }
556 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( data + blockoff );
557 0 : blockoff += sizeof( fd_microblock_hdr_t );
558 :
559 : /* Loop across transactions */
560 0 : for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
561 0 : uchar txn_out[FD_TXN_MAX_SZ];
562 0 : uchar const * raw = data + blockoff;
563 0 : ulong pay_sz = 0;
564 0 : ulong txn_sz = fd_txn_parse_core( (uchar const *)raw,
565 0 : fd_ulong_min( batch_end_off - blockoff, FD_TXN_MTU ),
566 0 : txn_out,
567 0 : NULL,
568 0 : &pay_sz );
569 0 : if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
570 0 : FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
571 0 : txn_idx,
572 0 : mblk,
573 0 : slot,
574 0 : txn_sz ));
575 0 : }
576 0 : fd_txn_t const * txn = (fd_txn_t const *)txn_out;
577 :
578 0 : if( pay_sz == 0UL )
579 0 : FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu",
580 0 : txn_idx,
581 0 : mblk,
582 0 : slot ));
583 :
584 0 : fd_txn_key_t const * sigs =
585 0 : (fd_txn_key_t const *)( (ulong)raw + (ulong)txn->signature_off );
586 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
587 0 : for( ulong j = 0; j < txn->signature_cnt; j++ ) {
588 0 : if( FD_UNLIKELY( fd_txn_map_key_cnt( txn_map ) ==
589 0 : fd_txn_map_key_max( txn_map ) ) ) {
590 0 : break;
591 0 : }
592 0 : fd_txn_key_t sig;
593 0 : fd_memcpy( &sig, sigs + j, sizeof( sig ) );
594 0 : if( fd_txn_map_query( txn_map, &sig, NULL ) != NULL ) continue;
595 0 : fd_txn_map_t * elem = fd_txn_map_insert( txn_map, &sig );
596 0 : if( elem == NULL ) { break; }
597 0 : elem->slot = slot;
598 0 : elem->offset = blockoff;
599 0 : elem->sz = pay_sz;
600 0 : elem->meta_gaddr = 0;
601 0 : elem->meta_sz = 0;
602 0 : if( txns_cnt < FD_TXN_MAX_PER_SLOT ) {
603 0 : fd_block_txn_t * ref = &txns[txns_cnt++];
604 0 : ref->txn_off = blockoff;
605 0 : ref->id_off = (ulong)( sigs + j ) - (ulong)data;
606 0 : ref->sz = pay_sz;
607 0 : }
608 0 : }
609 :
610 0 : blockoff += pay_sz;
611 0 : }
612 0 : }
613 0 : if( FD_UNLIKELY( blockoff > batch_end_off ) ) {
614 0 : FD_LOG_ERR(( "parser error: shouldn't have been allowed to read past batch boundary" ));
615 0 : }
616 0 : if( FD_UNLIKELY( blockoff < batch_end_off ) ) {
617 0 : if( FD_LIKELY( allow_trailing ) ) {
618 0 : FD_LOG_DEBUG(( "ignoring %lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
619 0 : }
620 0 : if( FD_UNLIKELY( !allow_trailing ) ) {
621 0 : FD_LOG_ERR(( "%lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
622 0 : }
623 0 : }
624 0 : blockoff = batch_end_off;
625 0 : }
626 :
627 0 : fd_block_micro_t * micros_laddr =
628 0 : fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
629 0 : alignof( fd_block_micro_t ),
630 0 : sizeof( fd_block_micro_t ) * micros_cnt );
631 0 : fd_memcpy( micros_laddr, micros, sizeof( fd_block_micro_t ) * micros_cnt );
632 0 : block->micros_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), micros_laddr );
633 0 : block->micros_cnt = micros_cnt;
634 :
635 0 : fd_block_txn_t * txns_laddr =
636 0 : fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
637 0 : alignof( fd_block_txn_t ),
638 0 : sizeof( fd_block_txn_t ) * txns_cnt );
639 0 : fd_memcpy( txns_laddr, txns, sizeof( fd_block_txn_t ) * txns_cnt );
640 0 : block->txns_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), txns_laddr );
641 0 : block->txns_cnt = txns_cnt;
642 :
643 0 : fd_alloc_free( fd_blockstore_alloc( blockstore ), micros );
644 0 : fd_alloc_free( fd_blockstore_alloc( blockstore ), txns );
645 0 : }
646 :
647 : static int
648 0 : deshred( fd_blockstore_t * blockstore, ulong slot ) {
649 0 : FD_LOG_NOTICE(( "[%s] slot %lu", __func__, slot ));
650 :
651 : // TODO make this update non blocking
652 0 : fd_block_map_query_t query[1];
653 0 : int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
654 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
655 0 : FD_TEST( err == FD_MAP_SUCCESS && block_info->slot == slot && block_info->block_gaddr == 0 );
656 : /* FIXME duplicate blocks are not supported */
657 :
658 0 : block_info->ts = fd_log_wallclock();
659 0 : ulong shred_cnt = block_info->slot_complete_idx + 1;
660 0 : fd_block_map_publish( query );
661 :
662 0 : ulong block_sz = 0UL;
663 0 : ulong batch_cnt = 0UL;
664 0 : fd_shred_t shred_hdr;
665 0 : for( uint idx = 0; idx < shred_cnt; idx++ ) {
666 0 : fd_shred_key_t key = { slot, idx };
667 0 : int err = FD_MAP_ERR_AGAIN;
668 0 : while( err == FD_MAP_ERR_AGAIN ) {
669 0 : fd_buf_shred_map_query_t query[1] = { 0 };
670 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
671 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "[%s] map missing shred %lu %u while deshredding", __func__, slot, idx ));
672 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
673 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
674 0 : fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
675 0 : shred_hdr = shred->hdr;
676 0 : err = fd_buf_shred_map_query_test( query );
677 0 : }
678 0 : FD_TEST( !err );
679 0 : block_sz += fd_shred_payload_sz( &shred_hdr );
680 0 : if( FD_LIKELY( ( shred_hdr.data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ||
681 0 : shred_hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
682 0 : batch_cnt++;
683 0 : }
684 0 : }
685 :
686 : // alloc mem for the block
687 0 : ulong data_off = fd_ulong_align_up( sizeof(fd_block_t), 128UL );
688 0 : ulong shred_off = fd_ulong_align_up( data_off + block_sz, alignof(fd_block_shred_t) );
689 0 : ulong batch_off = fd_ulong_align_up( shred_off + (sizeof(fd_block_shred_t) * shred_cnt), alignof(fd_block_entry_batch_t) );
690 0 : ulong tot_sz = batch_off + (sizeof(fd_block_entry_batch_t) * batch_cnt);
691 :
692 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
693 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
694 0 : fd_block_t * block = fd_alloc_malloc( alloc, 128UL, tot_sz );
695 0 : if( FD_UNLIKELY( !block ) ) {
696 0 : FD_LOG_ERR(( "[%s] OOM: failed to alloc block. blockstore needs to hold in memory all blocks for slots >= SMR, so either increase memory or check for issues with publishing new SMRs.", __func__ ));
697 0 : }
698 :
699 0 : fd_memset( block, 0, sizeof(fd_block_t) );
700 :
701 0 : uchar * data_laddr = (uchar *)((ulong)block + data_off);
702 0 : block->data_gaddr = fd_wksp_gaddr_fast( wksp, data_laddr );
703 0 : block->data_sz = block_sz;
704 0 : fd_block_shred_t * shreds_laddr = (fd_block_shred_t *)((ulong)block + shred_off);
705 0 : block->shreds_gaddr = fd_wksp_gaddr_fast( wksp, shreds_laddr );
706 0 : block->shreds_cnt = shred_cnt;
707 0 : fd_block_entry_batch_t * batch_laddr = (fd_block_entry_batch_t *)((ulong)block + batch_off);
708 0 : block->batch_gaddr = fd_wksp_gaddr_fast( wksp, batch_laddr );
709 0 : block->batch_cnt = batch_cnt;
710 :
711 0 : ulong off = 0UL;
712 0 : ulong batch_i = 0UL;
713 0 : for( uint idx = 0; idx < shred_cnt; idx++ ) {
714 : // TODO can do this in one iteration with block sz loop... massage with deshredder API
715 0 : fd_shred_key_t key = { slot, idx };
716 0 : ulong payload_sz = 0UL;
717 0 : uchar flags = 0;
718 0 : int err = FD_MAP_ERR_AGAIN;
719 0 : while( err == FD_MAP_ERR_AGAIN ) {
720 0 : fd_buf_shred_map_query_t query[1] = { 0 };;
721 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
722 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
723 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "[%s] map missing shred %lu %u while deshredding", __func__, slot, idx ));
724 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
725 0 : fd_shred_t const * shred = &fd_buf_shred_map_query_ele_const( query )->hdr;
726 0 : memcpy( data_laddr + off, fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) );
727 :
728 0 : shreds_laddr[idx].hdr = *shred;
729 0 : shreds_laddr[idx].off = off;
730 0 : FD_TEST( 0 == memcmp( &shreds_laddr[idx].hdr, shred, sizeof( fd_shred_t ) ) );
731 0 : FD_TEST( 0 == memcmp( data_laddr + shreds_laddr[idx].off, fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) ) );
732 :
733 0 : payload_sz = fd_shred_payload_sz( shred );
734 0 : flags = shred->data.flags;
735 :
736 0 : err = fd_buf_shred_map_query_test( query );
737 0 : }
738 0 : FD_TEST( !err );
739 0 : off += payload_sz;
740 0 : if( FD_LIKELY( (flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) || flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
741 0 : batch_laddr[ batch_i++ ].end_off = off;
742 0 : }
743 : // fd_blockstore_shred_remove( blockstore, slot, idx );
744 0 : }
745 0 : if( FD_UNLIKELY( batch_cnt != batch_i ) ) {
746 0 : FD_LOG_ERR(( "batch_cnt(%lu)!=batch_i(%lu) potential memory corruption", batch_cnt, batch_i ));
747 0 : }
748 :
749 0 : fd_blockstore_scan_block( blockstore, slot, block );
750 :
751 : /* Do this last when it's safe */
752 0 : FD_COMPILER_MFENCE();
753 :
754 : // TODO make this non blocking
755 0 : err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
756 0 : block_info = fd_block_map_query_ele( query );
757 0 : FD_TEST( err == FD_MAP_SUCCESS && block_info->slot == slot );
758 :
759 0 : block_info->block_gaddr = fd_wksp_gaddr_fast( wksp, block );
760 0 : fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
761 0 : uchar * data = fd_wksp_laddr_fast( wksp, block->data_gaddr );
762 0 : fd_microblock_hdr_t * last_micro = (fd_microblock_hdr_t *)( data + micros[block->micros_cnt - 1].off );
763 0 : memcpy( &block_info->block_hash, last_micro->hash, sizeof( fd_hash_t ) );
764 :
765 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_RECEIVING );
766 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_COMPLETED );
767 0 : fd_block_map_publish( query );
768 :
769 0 : return FD_BLOCKSTORE_SUCCESS;
770 0 : }
771 :
772 : void
773 : fd_blockstore_block_allocs_remove( fd_blockstore_t * blockstore,
774 0 : ulong slot ){
775 0 : fd_block_map_query_t query[1] = { 0 };
776 0 : ulong block_gaddr = 0;
777 0 : int err = FD_MAP_ERR_AGAIN;
778 0 : while( err == FD_MAP_ERR_AGAIN ) {
779 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
780 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
781 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return; /* slot not found */
782 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
783 0 : if( FD_UNLIKELY( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
784 0 : FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
785 0 : return;
786 0 : }
787 0 : block_gaddr = block_info->block_gaddr;
788 0 : err = fd_block_map_query_test( query );
789 0 : }
790 :
791 : /* Remove all the allocations relating to a block. */
792 :
793 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
794 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
795 :
796 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
797 0 : fd_block_t * block = fd_wksp_laddr_fast( wksp, block_gaddr );
798 :
799 : /* DO THIS FIRST FOR THREAD SAFETY */
800 0 : FD_COMPILER_MFENCE();
801 : //block_info->block_gaddr = 0;
802 :
803 0 : uchar * data = fd_wksp_laddr_fast( wksp, block->data_gaddr );
804 0 : fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, block->txns_gaddr );
805 0 : for( ulong j = 0; j < block->txns_cnt; ++j ) {
806 0 : fd_txn_key_t sig;
807 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
808 0 : fd_txn_map_remove( txn_map, &sig );
809 0 : }
810 0 : if( block->micros_gaddr ) fd_alloc_free( alloc, fd_wksp_laddr_fast( wksp, block->micros_gaddr ) );
811 0 : if( block->txns_gaddr ) fd_alloc_free( alloc, txns );
812 0 : ulong mgaddr = block->txns_meta_gaddr;
813 0 : while( mgaddr ) {
814 0 : ulong * laddr = fd_wksp_laddr_fast( wksp, mgaddr );
815 0 : ulong mgaddr2 = laddr[0]; /* link to next allocation */
816 0 : fd_alloc_free( alloc, laddr );
817 0 : mgaddr = mgaddr2;
818 0 : }
819 0 : fd_alloc_free( alloc, block );
820 0 : }
821 :
822 : int
823 : fd_rocksdb_import_block_blockstore( fd_rocksdb_t * db,
824 : fd_slot_meta_t * m,
825 : fd_blockstore_t * blockstore,
826 : int txnstatus,
827 : const uchar * hash_override,
828 0 : fd_valloc_t valloc ) {
829 0 : ulong slot = m->slot;
830 0 : ulong start_idx = 0;
831 0 : ulong end_idx = m->received;
832 :
833 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED]);
834 :
835 0 : char k[16];
836 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap(slot);
837 0 : *((ulong *) &k[8]) = fd_ulong_bswap(start_idx);
838 :
839 0 : rocksdb_iter_seek(iter, (const char *) k, sizeof(k));
840 :
841 0 : for (ulong i = start_idx; i < end_idx; i++) {
842 0 : ulong cur_slot, index;
843 0 : uchar valid = rocksdb_iter_valid(iter);
844 :
845 0 : if (valid) {
846 0 : size_t klen = 0;
847 0 : const char* key = rocksdb_iter_key(iter, &klen); // There is no need to free key
848 0 : if (klen != 16) // invalid key
849 0 : continue;
850 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
851 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
852 0 : }
853 :
854 0 : if (!valid || cur_slot != slot) {
855 0 : FD_LOG_WARNING(("missing shreds for slot %lu", slot));
856 0 : rocksdb_iter_destroy(iter);
857 0 : return -1;
858 0 : }
859 :
860 0 : if (index != i) {
861 0 : FD_LOG_WARNING(("missing shred %lu at index %lu for slot %lu", i, index, slot));
862 0 : rocksdb_iter_destroy(iter);
863 0 : return -1;
864 0 : }
865 :
866 0 : size_t dlen = 0;
867 : // Data was first copied from disk into memory to make it available to this API
868 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value(iter, &dlen);
869 0 : if (data == NULL) {
870 0 : FD_LOG_WARNING(("failed to read shred %lu/%lu", slot, i));
871 0 : rocksdb_iter_destroy(iter);
872 0 : return -1;
873 0 : }
874 :
875 : // This just correctly selects from inside the data pointer to the
876 : // actual data without a memory copy
877 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
878 0 : if (shred == NULL) {
879 0 : FD_LOG_WARNING(("failed to parse shred %lu/%lu", slot, i));
880 0 : rocksdb_iter_destroy(iter);
881 0 : return -1;
882 0 : }
883 0 : fd_blockstore_shred_insert( blockstore, shred );
884 : // if (rc != FD_BLOCKSTORE_SUCCESS_SLOT_COMPLETE && rc != FD_BLOCKSTORE_SUCCESS) {
885 : // FD_LOG_WARNING(("failed to store shred %lu/%lu", slot, i));
886 : // rocksdb_iter_destroy(iter);
887 : // return -1;
888 : // }
889 :
890 0 : rocksdb_iter_next(iter);
891 0 : }
892 :
893 0 : rocksdb_iter_destroy(iter);
894 :
895 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
896 0 : fd_block_info_t * block_info = fd_blockstore_block_map_query( blockstore, slot );
897 0 : if( FD_LIKELY( block_info && fd_blockstore_shreds_complete( blockstore, slot ) ) ) {
898 0 : deshred( blockstore, slot );
899 :
900 0 : size_t vallen = 0;
901 0 : char * err = NULL;
902 0 : char * res = rocksdb_get_cf(
903 0 : db->db,
904 0 : db->ro,
905 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCKTIME ],
906 0 : (char const *)&slot_be, sizeof(ulong),
907 0 : &vallen,
908 0 : &err );
909 0 : if( FD_UNLIKELY( err ) ) {
910 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
911 0 : free( err );
912 0 : } else if(vallen == sizeof(ulong)) {
913 0 : block_info->ts = (*(long*)res)*((long)1e9); /* Convert to nanos */
914 0 : free(res);
915 0 : }
916 :
917 0 : vallen = 0;
918 0 : err = NULL;
919 0 : res = rocksdb_get_cf(
920 0 : db->db,
921 0 : db->ro,
922 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ],
923 0 : (char const *)&slot_be, sizeof(ulong),
924 0 : &vallen,
925 0 : &err );
926 0 : block_info->block_height = 0;
927 0 : if( FD_UNLIKELY( err ) ) {
928 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
929 0 : free( err );
930 0 : } else if(vallen == sizeof(ulong)) {
931 0 : block_info->block_height = *(ulong*)res;
932 0 : free(res);
933 0 : }
934 :
935 0 : vallen = 0;
936 0 : err = NULL;
937 0 : if (NULL != hash_override)
938 0 : fd_memcpy( block_info->bank_hash.hash, hash_override, 32UL );
939 0 : else {
940 0 : res = rocksdb_get_cf(
941 0 : db->db,
942 0 : db->ro,
943 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
944 0 : (char const *)&slot_be, sizeof(ulong),
945 0 : &vallen,
946 0 : &err );
947 0 : if( FD_UNLIKELY( err ) ) {
948 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
949 0 : free( err );
950 0 : } else {
951 0 : fd_bincode_decode_ctx_t decode = {
952 0 : .data = res,
953 0 : .dataend = res + vallen
954 0 : };
955 0 : ulong total_sz = 0UL;
956 0 : int decode_err = fd_frozen_hash_versioned_decode_footprint( &decode, &total_sz );
957 :
958 0 : uchar * mem = fd_valloc_malloc( valloc, fd_frozen_hash_versioned_align(), total_sz );
959 0 : if( NULL == mem ) {
960 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
961 0 : }
962 :
963 0 : fd_frozen_hash_versioned_t * versioned = fd_frozen_hash_versioned_decode( mem, &decode );
964 0 : if( FD_UNLIKELY( decode_err!=FD_BINCODE_SUCCESS ) ) goto cleanup;
965 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
966 0 : if( FD_UNLIKELY( versioned->discriminant !=fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
967 : /* Success */
968 0 : fd_memcpy( block_info->bank_hash.hash, versioned->inner.current.frozen_hash.hash, 32UL );
969 0 : cleanup:
970 0 : free( res );
971 0 : }
972 0 : }
973 0 : }
974 :
975 0 : if( txnstatus && FD_LIKELY( block_info && fd_blockstore_shreds_complete( blockstore, slot ) ) ) {
976 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, block_info->block_gaddr );
977 0 : uchar * data = fd_wksp_laddr_fast( wksp, blk->data_gaddr );
978 0 : fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, blk->txns_gaddr );
979 :
980 : /* TODO: change txn indexing after blockstore refactor */
981 : /* Compute the total size of the logs */
982 0 : ulong tot_meta_sz = 2*sizeof(ulong);
983 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
984 0 : if( j == 0 || txns[j].txn_off != txns[j-1].txn_off ) {
985 0 : fd_txn_key_t sig;
986 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
987 0 : ulong sz;
988 0 : void * raw = fd_rocksdb_get_txn_status_raw( db, slot, &sig, &sz );
989 0 : if( raw != NULL ) {
990 0 : free(raw);
991 0 : tot_meta_sz += sz;
992 0 : }
993 0 : }
994 0 : }
995 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
996 0 : uchar * cur_laddr = fd_alloc_malloc( alloc, 1, tot_meta_sz );
997 0 : if( cur_laddr == NULL ) {
998 0 : return 0;
999 0 : }
1000 0 : ((ulong*)cur_laddr)[0] = blk->txns_meta_gaddr; /* Link to previous allocation */
1001 0 : ((ulong*)cur_laddr)[1] = blk->txns_meta_sz;
1002 0 : blk->txns_meta_gaddr = fd_wksp_gaddr_fast( wksp, cur_laddr );
1003 0 : blk->txns_meta_sz = tot_meta_sz;
1004 0 : cur_laddr += 2*sizeof(ulong);
1005 :
1006 : /* Copy over the logs */
1007 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
1008 0 : ulong meta_gaddr = 0;
1009 0 : ulong meta_sz = 0;
1010 0 : fd_txn_key_t sig = { 0 };
1011 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
1012 0 : if( j == 0 || txns[j].txn_off != txns[j-1].txn_off ) {
1013 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
1014 0 : ulong sz;
1015 0 : void * raw = fd_rocksdb_get_txn_status_raw( db, slot, &sig, &sz );
1016 0 : if( raw == NULL ) {
1017 0 : meta_gaddr = 0;
1018 0 : meta_sz = 0;
1019 0 : } else {
1020 0 : fd_memcpy(cur_laddr, raw, sz);
1021 0 : free(raw);
1022 0 : meta_gaddr = fd_wksp_gaddr_fast( wksp, cur_laddr );
1023 0 : meta_sz = sz;
1024 0 : cur_laddr += sz;
1025 0 : }
1026 0 : }
1027 0 : fd_txn_map_t * txn_map_entry = fd_txn_map_query( txn_map, &sig, NULL );
1028 0 : if( FD_UNLIKELY( !txn_map_entry ) ) {
1029 0 : char sig_str[ FD_BASE58_ENCODED_64_SZ ];
1030 0 : fd_base58_encode_64( fd_type_pun_const( sig.v ), NULL, sig_str );
1031 0 : FD_LOG_WARNING(( "missing transaction %s", sig_str ));
1032 0 : continue;
1033 0 : }
1034 0 : txn_map_entry->meta_gaddr = meta_gaddr;
1035 0 : txn_map_entry->meta_sz = meta_sz;
1036 0 : }
1037 :
1038 0 : FD_TEST( blk->txns_meta_gaddr + blk->txns_meta_sz == fd_wksp_gaddr_fast( wksp, cur_laddr ) );
1039 0 : }
1040 :
1041 0 : blockstore->shmem->lps = slot;
1042 0 : blockstore->shmem->hcs = slot;
1043 0 : blockstore->shmem->wmk = slot;
1044 :
1045 0 : if( FD_LIKELY( block_info ) ) {
1046 0 : block_info->flags =
1047 0 : fd_uchar_set_bit(
1048 0 : fd_uchar_set_bit(
1049 0 : fd_uchar_set_bit(
1050 0 : fd_uchar_set_bit(
1051 0 : fd_uchar_set_bit(
1052 0 : block_info->flags,
1053 0 : FD_BLOCK_FLAG_COMPLETED ),
1054 0 : FD_BLOCK_FLAG_PROCESSED ),
1055 0 : FD_BLOCK_FLAG_EQVOCSAFE ),
1056 0 : FD_BLOCK_FLAG_CONFIRMED ),
1057 0 : FD_BLOCK_FLAG_FINALIZED );
1058 0 : }
1059 :
1060 0 : return 0;
1061 0 : }
1062 :
1063 : int
1064 : fd_rocksdb_import_block_shredcap( fd_rocksdb_t * db,
1065 : fd_slot_meta_t * metadata,
1066 : fd_io_buffered_ostream_t * ostream,
1067 : fd_io_buffered_ostream_t * bank_hash_ostream,
1068 0 : fd_valloc_t valloc ) {
1069 0 : ulong slot = metadata->slot;
1070 :
1071 : /* pre_slot_hdr_file_offset is the current offset within the file, but
1072 : pre_slot_hdr_file_offset_real accounts for the size of the buffer that has
1073 : been filled but not flushed. This value is used to jump back into the file to
1074 : populate the payload_sz for the slot header */
1075 0 : long pre_slot_hdr_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
1076 0 : long pre_slot_hdr_file_offset_real = pre_slot_hdr_file_offset + (long)ostream->wbuf_used;
1077 0 : if ( FD_UNLIKELY( pre_slot_hdr_file_offset == -1 ) ) {
1078 0 : FD_LOG_ERR(( "lseek error while seeking to current location" ));
1079 0 : }
1080 :
1081 : /* Write slot specific header */
1082 0 : fd_shredcap_slot_hdr_t slot_hdr;
1083 0 : slot_hdr.magic = FD_SHREDCAP_SLOT_HDR_MAGIC;
1084 0 : slot_hdr.version = FD_SHREDCAP_SLOT_HDR_VERSION;
1085 0 : slot_hdr.payload_sz = ULONG_MAX; /* This value is populated after slot is processed */
1086 0 : slot_hdr.slot = metadata->slot;
1087 0 : slot_hdr.consumed = metadata->consumed;
1088 0 : slot_hdr.received = metadata->received;
1089 0 : slot_hdr.first_shred_timestamp = metadata->first_shred_timestamp;
1090 0 : slot_hdr.last_index = metadata->last_index;
1091 0 : slot_hdr.parent_slot = metadata->parent_slot;
1092 0 : fd_io_buffered_ostream_write( ostream, &slot_hdr, FD_SHREDCAP_SLOT_HDR_FOOTPRINT );
1093 :
1094 : /* We need to track the payload size */
1095 0 : ulong payload_sz = 0;
1096 :
1097 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf( db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED] );
1098 :
1099 0 : char k[16];
1100 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap( slot );
1101 0 : *((ulong *) &k[8]) = fd_ulong_bswap( 0 );
1102 :
1103 0 : rocksdb_iter_seek( iter, (const char *) k, sizeof(k) );
1104 :
1105 0 : ulong start_idx = 0;
1106 0 : ulong end_idx = metadata->received;
1107 0 : for ( ulong i = start_idx; i < end_idx; i++ ) {
1108 0 : ulong cur_slot, index;
1109 0 : uchar valid = rocksdb_iter_valid( iter );
1110 :
1111 0 : if ( valid ) {
1112 0 : size_t klen = 0;
1113 0 : const char* key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
1114 0 : if ( klen != 16 ) { // invalid key
1115 0 : continue;
1116 0 : }
1117 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
1118 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
1119 0 : }
1120 :
1121 0 : if ( !valid || cur_slot != slot ) {
1122 0 : FD_LOG_WARNING(( "missing shreds for slot %lu", slot ));
1123 0 : rocksdb_iter_destroy( iter );
1124 0 : return -1;
1125 0 : }
1126 :
1127 0 : if ( index != i ) {
1128 0 : FD_LOG_WARNING(( "missing shred %lu at index %lu for slot %lu", i, index, slot ));
1129 0 : rocksdb_iter_destroy( iter );
1130 0 : return -1;
1131 0 : }
1132 :
1133 0 : size_t dlen = 0;
1134 : // Data was first copied from disk into memory to make it available to this API
1135 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value( iter, &dlen );
1136 0 : if ( data == NULL ) {
1137 0 : FD_LOG_WARNING(( "failed to read shred %lu/%lu", slot, i ));
1138 0 : rocksdb_iter_destroy( iter );
1139 0 : return -1;
1140 0 : }
1141 :
1142 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
1143 0 : if ( shred == NULL ) {
1144 0 : FD_LOG_WARNING(( "failed to parse shred %lu/%lu", slot, i ));
1145 0 : rocksdb_iter_destroy( iter );
1146 0 : return -1;
1147 0 : }
1148 :
1149 : /* Write a shred header and shred. Each shred and it's header will be aligned */
1150 0 : char shred_buf[ FD_SHREDCAP_SHRED_MAX ];
1151 0 : char * shred_buf_ptr = shred_buf;
1152 0 : ushort shred_sz = (ushort)fd_shred_sz( shred );
1153 0 : uint shred_boundary_sz = (uint)fd_uint_align_up( shred_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT,
1154 0 : FD_SHREDCAP_ALIGN ) - FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
1155 :
1156 0 : fd_memset( shred_buf_ptr, 0, shred_boundary_sz );
1157 : /* Populate start of buffer with header */
1158 0 : fd_shredcap_shred_hdr_t * shred_hdr = (fd_shredcap_shred_hdr_t*)shred_buf_ptr;
1159 0 : shred_hdr->hdr_sz = FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
1160 0 : shred_hdr->shred_sz = shred_sz;
1161 0 : shred_hdr->shred_boundary_sz = shred_boundary_sz;
1162 :
1163 : /* Skip ahead and populate rest of buffer with shred and write out */
1164 0 : fd_memcpy( shred_buf_ptr + FD_SHREDCAP_SHRED_HDR_FOOTPRINT, shred, shred_boundary_sz );
1165 0 : fd_io_buffered_ostream_write( ostream, shred_buf_ptr,
1166 0 : shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT );
1167 :
1168 0 : payload_sz += shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
1169 0 : rocksdb_iter_next( iter );
1170 0 : }
1171 :
1172 : /* Update file size */
1173 0 : long pre_slot_processed_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
1174 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == -1 ) ) {
1175 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
1176 0 : }
1177 :
1178 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == pre_slot_hdr_file_offset ) ) {
1179 : /* This case is when the payload from the shreds is smaller than the free
1180 : space from the write buffer. This means that the buffer was not flushed
1181 : at any point. This case is highly unlikely */
1182 0 : fd_io_buffered_ostream_flush( ostream );
1183 0 : }
1184 :
1185 : /* Safely assume that the buffer was flushed to the file at least once. Store
1186 : original seek position, skip to position with payload_sz in header, write
1187 : updated payload sz, and then reset seek position. */
1188 0 : long original_offset = lseek( ostream->fd, 0, SEEK_CUR );
1189 0 : if ( FD_UNLIKELY( original_offset == -1 ) ) {
1190 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
1191 0 : }
1192 0 : long payload_sz_file_offset = pre_slot_hdr_file_offset_real +
1193 0 : (long)FD_SHREDCAP_SLOT_HDR_PAYLOAD_SZ_OFFSET;
1194 :
1195 0 : long offset;
1196 0 : offset = lseek( ostream->fd, payload_sz_file_offset, SEEK_SET );
1197 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
1198 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", payload_sz_file_offset ));
1199 0 : }
1200 0 : ulong to_write;
1201 0 : fd_io_write( ostream->fd, &payload_sz, sizeof(ulong), sizeof(ulong), &to_write );
1202 :
1203 0 : offset = lseek( ostream->fd, original_offset, SEEK_SET );
1204 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
1205 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", original_offset ));
1206 0 : }
1207 :
1208 : /* Write slot footer */
1209 0 : fd_shredcap_slot_ftr_t slot_ftr;
1210 0 : slot_ftr.magic = FD_SHREDCAP_SLOT_FTR_MAGIC;
1211 0 : slot_ftr.payload_sz = payload_sz;
1212 0 : fd_io_buffered_ostream_write( ostream, &slot_ftr, FD_SHREDCAP_SLOT_FTR_FOOTPRINT );
1213 0 : rocksdb_iter_destroy( iter );
1214 :
1215 : /* Get and write bank hash information to respective file */
1216 0 : size_t vallen = 0;
1217 0 : char * err = NULL;
1218 0 : char * res = rocksdb_get_cf( db->db, db->ro, db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
1219 0 : (char const *)&slot_be, sizeof(ulong), &vallen, &err );
1220 0 : if( FD_UNLIKELY( err ) ) {
1221 0 : FD_LOG_WARNING((" Could not get bank hash data due to err=%s",err ));
1222 0 : free( err );
1223 0 : } else {
1224 0 : fd_bincode_decode_ctx_t decode = {
1225 0 : .data = res,
1226 0 : .dataend = res + vallen,
1227 0 : };
1228 0 : ulong total_sz = 0UL;
1229 0 : int decode_err = fd_frozen_hash_versioned_decode_footprint( &decode, &total_sz );
1230 :
1231 0 : uchar * mem = fd_valloc_malloc( valloc, fd_frozen_hash_versioned_align(), total_sz );
1232 :
1233 0 : fd_frozen_hash_versioned_t * versioned = fd_frozen_hash_versioned_decode( mem, &decode );
1234 :
1235 0 : if( FD_UNLIKELY( decode_err != FD_BINCODE_SUCCESS ) ) goto cleanup;
1236 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
1237 0 : if( FD_UNLIKELY( versioned->discriminant != fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
1238 0 : fd_shredcap_bank_hash_entry_t bank_hash_entry;
1239 0 : bank_hash_entry.slot = slot;
1240 0 : fd_memcpy( &bank_hash_entry.bank_hash, versioned->inner.current.frozen_hash.hash, 32UL );
1241 0 : fd_io_buffered_ostream_write( bank_hash_ostream, &bank_hash_entry, FD_SHREDCAP_BANK_HASH_ENTRY_FOOTPRINT );
1242 0 : cleanup:
1243 0 : free( res );
1244 0 : }
1245 0 : return 0;
1246 0 : }
|