Line data Source code
1 : #include "fd_rocksdb.h"
2 : #include "fd_blockstore.h"
3 : #include "../shredcap/fd_shredcap.h"
4 : #include <stdbool.h>
5 : #include <stdlib.h>
6 : #include <stdio.h>
7 : #include <unistd.h>
8 : #include "../../util/bits/fd_bits.h"
9 :
10 : char *
11 : fd_rocksdb_init( fd_rocksdb_t * db,
12 0 : char const * db_name ) {
13 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
14 :
15 0 : db->opts = rocksdb_options_create();
16 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
17 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
18 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
19 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
20 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
21 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
22 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
23 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
24 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
25 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
26 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
27 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
28 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
29 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
30 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
31 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
32 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
33 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
34 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
35 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PROGRAM_COSTS ] = "program_costs";
36 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
37 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
38 :
39 :
40 0 : rocksdb_options_t const * cf_options[ FD_ROCKSDB_CF_CNT ];
41 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ )
42 0 : cf_options[ i ] = db->opts;
43 :
44 0 : char *err = NULL;
45 :
46 0 : db->db = rocksdb_open_for_read_only_column_families(
47 0 : db->opts,
48 0 : db_name,
49 0 : FD_ROCKSDB_CF_CNT,
50 0 : (char const * const *)db->cfgs,
51 0 : (rocksdb_options_t const * const *)cf_options,
52 0 : db->cf_handles,
53 0 : false,
54 0 : &err );
55 :
56 0 : if( FD_UNLIKELY( err ) ) return err;
57 :
58 0 : db->ro = rocksdb_readoptions_create();
59 :
60 0 : return NULL;
61 0 : }
62 :
63 : void
64 : fd_rocksdb_new( fd_rocksdb_t * db,
65 0 : char const * db_name ) {
66 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
67 :
68 0 : db->opts = rocksdb_options_create();
69 : /* Create the db*/
70 0 : rocksdb_options_set_create_if_missing(db->opts, 1);
71 :
72 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
73 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
74 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
75 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
76 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
77 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
78 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
79 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
80 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
81 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
82 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
83 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
84 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
85 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
86 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
87 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
88 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
89 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
90 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
91 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PROGRAM_COSTS ] = "program_costs";
92 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
93 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
94 :
95 : /* Create the rocksdb */
96 0 : char * err = NULL;
97 0 : db->db = rocksdb_open(db->opts, db_name, &err);
98 0 : if ( err != NULL ) {
99 0 : FD_LOG_ERR(("rocksdb creation failed: %s", err));
100 0 : }
101 :
102 0 : db->wo = rocksdb_writeoptions_create();
103 :
104 : /* Create column families, default already exists at index 0 */
105 0 : for ( ulong i = 1; i < FD_ROCKSDB_CF_CNT; ++i ) {
106 0 : db->cf_handles[i] = rocksdb_create_column_family(db->db, db->opts, db->cfgs[i], &err);
107 0 : }
108 0 : rocksdb_options_set_compression( db->opts, rocksdb_lz4_compression );
109 0 : }
110 :
111 0 : void fd_rocksdb_destroy(fd_rocksdb_t *db) {
112 :
113 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ ) {
114 0 : if( db->cf_handles[i] ) {
115 0 : rocksdb_column_family_handle_destroy( db->cf_handles[i] );
116 0 : db->cf_handles[i] = NULL;
117 0 : }
118 0 : }
119 :
120 0 : if( db->ro ) {
121 0 : rocksdb_readoptions_destroy( db->ro );
122 0 : db->ro = NULL;
123 0 : }
124 :
125 0 : if( db->opts ) {
126 0 : rocksdb_options_destroy( db->opts );
127 0 : db->opts = NULL;
128 0 : }
129 :
130 0 : if( db->db ) {
131 0 : rocksdb_close( db->db );
132 0 : db->db = NULL;
133 0 : }
134 :
135 0 : if( db->wo ) {
136 0 : rocksdb_writeoptions_destroy( db->wo );
137 0 : }
138 0 : }
139 :
140 0 : ulong fd_rocksdb_last_slot(fd_rocksdb_t *db, char **err) {
141 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
142 0 : rocksdb_iter_seek_to_last(iter);
143 0 : if (!rocksdb_iter_valid(iter)) {
144 0 : rocksdb_iter_destroy(iter);
145 0 : *err = "db column for root is empty";
146 0 : return 0;
147 0 : }
148 :
149 0 : size_t klen = 0;
150 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
151 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
152 0 : rocksdb_iter_destroy(iter);
153 0 : return slot;
154 0 : }
155 :
156 0 : ulong fd_rocksdb_find_last_slot(fd_rocksdb_t *db, char **err) {
157 0 : ulong max_slot = 0;
158 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
159 0 : rocksdb_iter_seek_to_first(iter);
160 0 : if (!rocksdb_iter_valid(iter)) {
161 0 : rocksdb_iter_destroy(iter);
162 0 : *err = "db column for root is empty";
163 0 : return 0;
164 0 : }
165 :
166 0 : for( ; rocksdb_iter_valid(iter); rocksdb_iter_next(iter) ) {
167 0 : size_t klen = 0;
168 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
169 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
170 :
171 0 : if( slot > max_slot ) {
172 0 : max_slot = slot;
173 0 : FD_LOG_WARNING(("new max_slot: %lu", max_slot));
174 0 : }
175 0 : }
176 :
177 0 : rocksdb_iter_destroy(iter);
178 0 : return max_slot;
179 0 : }
180 :
181 :
182 : ulong
183 : fd_rocksdb_first_slot( fd_rocksdb_t * db,
184 0 : char ** err ) {
185 :
186 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
187 0 : rocksdb_iter_seek_to_first(iter);
188 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(iter) ) ) {
189 0 : rocksdb_iter_destroy(iter);
190 0 : *err = "db column for root is empty";
191 0 : return 0;
192 0 : }
193 :
194 0 : ulong klen = 0;
195 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
196 0 : ulong slot = fd_ulong_bswap( *((ulong *)key));
197 0 : rocksdb_iter_destroy(iter);
198 0 : return slot;
199 0 : }
200 :
201 : int
202 : fd_rocksdb_get_meta( fd_rocksdb_t * db,
203 : ulong slot,
204 : fd_slot_meta_t * m,
205 0 : fd_valloc_t valloc ) {
206 0 : ulong ks = fd_ulong_bswap(slot);
207 0 : size_t vallen = 0;
208 :
209 0 : char * err = NULL;
210 0 : char * meta = rocksdb_get_cf( db->db,
211 0 : db->ro,
212 0 : db->cf_handles[FD_ROCKSDB_CFIDX_META],
213 0 : (const char *) &ks,
214 0 : sizeof(ks),
215 0 : &vallen,
216 0 : &err );
217 :
218 0 : if( NULL != err ) {
219 0 : FD_LOG_WARNING(( "%s", err ));
220 0 : free( err );
221 0 : return -2;
222 0 : }
223 :
224 0 : if (0 == vallen)
225 0 : return -1;
226 :
227 0 : fd_bincode_decode_ctx_t ctx;
228 0 : ctx.data = meta;
229 0 : ctx.dataend = &meta[vallen];
230 :
231 0 : ulong total_sz = 0UL;
232 0 : if( fd_slot_meta_decode_footprint( &ctx, &total_sz ) ) {
233 0 : FD_LOG_ERR(( "fd_slot_meta_decode failed" ));
234 0 : }
235 :
236 0 : uchar * mem = fd_valloc_malloc( valloc, fd_slot_meta_align(), total_sz );
237 0 : if( NULL == mem ) {
238 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
239 0 : }
240 :
241 0 : fd_slot_meta_decode( mem, &ctx );
242 :
243 0 : fd_memcpy( m, mem, sizeof(fd_slot_meta_t) );
244 :
245 0 : free(meta);
246 :
247 0 : return 0;
248 0 : }
249 :
250 : void *
251 0 : fd_rocksdb_root_iter_new ( void * ptr ) {
252 0 : fd_memset(ptr, 0, sizeof(fd_rocksdb_root_iter_t));
253 0 : return ptr;
254 0 : }
255 :
256 : fd_rocksdb_root_iter_t *
257 0 : fd_rocksdb_root_iter_join ( void * ptr ) {
258 0 : return (fd_rocksdb_root_iter_t *) ptr;
259 0 : }
260 :
261 : void *
262 0 : fd_rocksdb_root_iter_leave ( fd_rocksdb_root_iter_t * ptr ) {
263 0 : return ptr;
264 0 : }
265 :
266 : int
267 : fd_rocksdb_root_iter_seek( fd_rocksdb_root_iter_t * self,
268 : fd_rocksdb_t * db,
269 : ulong slot,
270 : fd_slot_meta_t * m,
271 0 : fd_valloc_t valloc ) {
272 0 : self->db = db;
273 :
274 0 : if( FD_UNLIKELY( !self->iter ) )
275 0 : self->iter = rocksdb_create_iterator_cf(self->db->db, self->db->ro, self->db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
276 :
277 0 : ulong ks = fd_ulong_bswap( slot );
278 :
279 0 : rocksdb_iter_seek( self->iter, (char const *)&ks, sizeof(ulong) );
280 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(self->iter) ) )
281 0 : return -1;
282 :
283 0 : size_t klen = 0;
284 0 : char const * key = rocksdb_iter_key( self->iter, &klen ); // There is no need to free key
285 0 : ulong kslot = fd_ulong_bswap( *((ulong *)key) );
286 :
287 0 : if( FD_UNLIKELY( kslot != slot ) ) {
288 0 : FD_LOG_WARNING(( "fd_rocksdb_root_iter_seek: wanted slot %lu, found %lu",
289 0 : slot, kslot ));
290 0 : return -2;
291 0 : }
292 :
293 0 : return fd_rocksdb_get_meta( self->db, slot, m, valloc );
294 0 : }
295 :
296 : int
297 0 : fd_rocksdb_root_iter_slot ( fd_rocksdb_root_iter_t * self, ulong *slot ) {
298 0 : if ((NULL == self->db) || (NULL == self->iter))
299 0 : return -1;
300 :
301 0 : if (!rocksdb_iter_valid(self->iter))
302 0 : return -2;
303 :
304 0 : size_t klen = 0;
305 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
306 0 : *slot = fd_ulong_bswap(*((unsigned long *) key));
307 0 : return 0;
308 0 : }
309 :
310 : int
311 : fd_rocksdb_root_iter_next( fd_rocksdb_root_iter_t * self,
312 : fd_slot_meta_t * m,
313 0 : fd_valloc_t valloc ) {
314 0 : if ((NULL == self->db) || (NULL == self->iter))
315 0 : return -1;
316 :
317 0 : if (!rocksdb_iter_valid(self->iter))
318 0 : return -2;
319 :
320 0 : rocksdb_iter_next(self->iter);
321 :
322 0 : if (!rocksdb_iter_valid(self->iter))
323 0 : return -3;
324 :
325 0 : size_t klen = 0;
326 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
327 :
328 0 : return fd_rocksdb_get_meta( self->db, fd_ulong_bswap(*((unsigned long *) key)), m, valloc );
329 0 : }
330 :
331 : void
332 0 : fd_rocksdb_root_iter_destroy ( fd_rocksdb_root_iter_t * self ) {
333 0 : if (NULL != self->iter) {
334 0 : rocksdb_iter_destroy(self->iter);
335 0 : self->iter = 0;
336 0 : }
337 0 : self->db = NULL;
338 0 : }
339 :
340 : void *
341 : fd_rocksdb_get_txn_status_raw( fd_rocksdb_t * self,
342 : ulong slot,
343 : void const * sig,
344 0 : ulong * psz ) {
345 :
346 0 : ulong slot_be = fd_ulong_bswap( slot );
347 :
348 : /* Construct RocksDB query key */
349 0 : char key[72];
350 0 : memcpy( key, sig, 64UL );
351 0 : memcpy( key+64UL, &slot_be, 8UL );
352 :
353 : /* Query record */
354 0 : char * err = NULL;
355 0 : char * res = rocksdb_get_cf(
356 0 : self->db, self->ro,
357 0 : self->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
358 0 : key, 72UL,
359 0 : psz,
360 0 : &err );
361 :
362 0 : if( FD_UNLIKELY( err ) ) {
363 0 : FD_LOG_WARNING(("err=%s", err));
364 0 : free( err );
365 0 : return NULL;
366 0 : }
367 0 : return res;
368 0 : }
369 :
370 : ulong
371 0 : fd_rocksdb_get_slot( ulong cf_idx, char const * key ) {
372 0 : switch (cf_idx) {
373 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
374 0 : return fd_ulong_bswap(*((ulong *) &key[72])); /* (signature,slot)*/
375 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
376 0 : return fd_ulong_bswap(*((ulong *) &key[40])); /* (pubkey,slot,u32,signature) */
377 0 : default: /* all other cfs have the slot at the start */
378 0 : return fd_ulong_bswap( *((ulong *)&key[0]) ); /* The key is just the slot number */
379 0 : }
380 :
381 0 : return fd_ulong_bswap( *((ulong *)key) );
382 0 : }
383 :
384 : void
385 0 : fd_rocksdb_iter_seek_to_slot_if_possible( rocksdb_iterator_t * iter, const ulong cf_idx, const ulong slot ) {
386 0 : ulong k = fd_ulong_bswap(slot);
387 0 : switch (cf_idx) {
388 : /* These cfs do not have the slot at the start, we can't seek based on slot prefix */
389 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
390 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
391 0 : rocksdb_iter_seek_to_first( iter );
392 0 : break;
393 0 : default: /* all other cfs have the slot at the start, seek based on slot prefix */
394 0 : rocksdb_iter_seek( iter, (const char *)&k, 8);
395 0 : break;
396 0 : }
397 0 : }
398 :
399 : int
400 : fd_rocksdb_copy_over_slot_indexed_range( fd_rocksdb_t * src,
401 : fd_rocksdb_t * dst,
402 : ulong cf_idx,
403 : ulong start_slot,
404 0 : ulong end_slot ) {
405 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_slot_indexed_range: %lu", cf_idx ));
406 :
407 0 : if ( cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ||
408 0 : cf_idx == FD_ROCKSDB_CFIDX_PROGRAM_COSTS ||
409 0 : cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ||
410 0 : cf_idx == FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ) {
411 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_range: skipping cf_idx=%lu because not slot indexed", cf_idx ));
412 0 : return 0;
413 0 : }
414 :
415 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf( src->db, src->ro, src->cf_handles[cf_idx] );
416 0 : if ( FD_UNLIKELY( iter == NULL ) ) {
417 0 : FD_LOG_ERR(( "rocksdb_create_iterator_cf failed for cf_idx=%lu", cf_idx ));
418 0 : }
419 :
420 0 : for ( fd_rocksdb_iter_seek_to_slot_if_possible( iter, cf_idx, start_slot ); rocksdb_iter_valid( iter ); rocksdb_iter_next( iter ) ) {
421 0 : ulong klen = 0;
422 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
423 :
424 0 : ulong slot = fd_rocksdb_get_slot( cf_idx, key );
425 0 : if ( slot < start_slot ) {
426 0 : continue;
427 0 : }
428 0 : else if ( slot > end_slot ) {
429 0 : break;
430 0 : }
431 :
432 0 : ulong vlen = 0;
433 0 : char const * value = rocksdb_iter_value( iter, &vlen );
434 :
435 0 : fd_rocksdb_insert_entry( dst, cf_idx, key, klen, value, vlen );
436 0 : }
437 0 : rocksdb_iter_destroy( iter );
438 0 : return 0;
439 0 : }
440 :
441 : int
442 : fd_rocksdb_copy_over_txn_status_range( fd_rocksdb_t * src,
443 : fd_rocksdb_t * dst,
444 : fd_blockstore_t * blockstore,
445 : ulong start_slot,
446 0 : ulong end_slot ) {
447 : /* Look up the blocks data and iterate through its transactions */
448 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
449 :
450 0 : for ( ulong slot = start_slot; slot <= end_slot; ++slot ) {
451 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_txn_status_range: %lu", slot ));
452 0 : fd_block_info_t * block_entry = fd_blockstore_block_map_query( blockstore, slot );
453 0 : if( FD_LIKELY( block_entry && fd_blockstore_shreds_complete( blockstore, slot) ) ) {
454 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, block_entry->block_gaddr );
455 0 : uchar * data = fd_wksp_laddr_fast( wksp, blk->data_gaddr );
456 0 : fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, blk->txns_gaddr );
457 : /* TODO: change txn indexing after blockstore refactor */
458 0 : ulong last_txn_off = ULONG_MAX;
459 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
460 0 : fd_txn_key_t sig;
461 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof(sig) );
462 0 : if( txns[j].txn_off != last_txn_off ) {
463 0 : last_txn_off = txns[j].txn_off;
464 0 : fd_rocksdb_copy_over_txn_status( src, dst, slot, &sig );
465 0 : }
466 0 : }
467 0 : }
468 0 : }
469 0 : return 0;
470 0 : }
471 :
472 : void
473 : fd_rocksdb_copy_over_txn_status( fd_rocksdb_t * src,
474 : fd_rocksdb_t * dst,
475 : ulong slot,
476 0 : void const * sig ) {
477 0 : ulong slot_be = fd_ulong_bswap( slot );
478 :
479 : /* Construct RocksDB query key */
480 : /* TODO: Replace with constants */
481 0 : char key[ 72 ];
482 0 : memcpy( key, sig, 64UL );
483 0 : memcpy( key+64UL, &slot_be, 8UL );
484 :
485 : /* Query record */
486 0 : ulong sz;
487 0 : char * err = NULL;
488 0 : char * res = rocksdb_get_cf(
489 0 : src->db, src->ro, src->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
490 0 : key, 72UL, &sz, &err );
491 :
492 0 : if( FD_UNLIKELY( err ) ) {
493 0 : FD_LOG_WARNING(("err=%s", err));
494 0 : free( err );
495 0 : return;
496 0 : }
497 :
498 0 : fd_rocksdb_insert_entry( dst, FD_ROCKSDB_CFIDX_TRANSACTION_STATUS, key, 72UL, res, sz );
499 0 : }
500 :
501 : int
502 : fd_rocksdb_insert_entry( fd_rocksdb_t * db,
503 : ulong cf_idx,
504 : const char * key,
505 : ulong klen,
506 : const char * value,
507 : ulong vlen )
508 0 : {
509 0 : char * err = NULL;
510 0 : rocksdb_put_cf( db->db, db->wo, db->cf_handles[cf_idx],
511 0 : key, klen, value, vlen, &err );
512 0 : if( FD_UNLIKELY( err != NULL ) ) {
513 0 : FD_LOG_WARNING(( "rocksdb_put_cf failed with error %s", err ));
514 0 : return -1;
515 0 : }
516 0 : return 0;
517 0 : }
518 :
519 : static void
520 0 : fd_blockstore_scan_block( fd_blockstore_t * blockstore, ulong slot, fd_block_t * block ) {
521 :
522 0 : fd_block_micro_t * micros = fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
523 0 : alignof( fd_block_micro_t ),
524 0 : sizeof( *micros ) * FD_MICROBLOCK_MAX_PER_SLOT );
525 0 : fd_block_txn_t * txns = fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
526 0 : alignof( fd_block_txn_t ),
527 0 : sizeof( *txns ) * FD_TXN_MAX_PER_SLOT );
528 :
529 : /*
530 : * Agave decodes precisely one array of microblocks from each batch.
531 : * As of bincode version 1.3.3, the default deserializer used when
532 : * decoding a batch in the blockstore allows for trailing bytes to be
533 : * ignored.
534 : * https://github.com/anza-xyz/agave/blob/v2.1.0/ledger/src/blockstore.rs#L3764
535 : */
536 0 : uchar allow_trailing = 1UL;
537 :
538 0 : uchar const * data = fd_blockstore_block_data_laddr( blockstore, block );
539 0 : FD_LOG_DEBUG(( "scanning slot %lu, ptr %p, sz %lu", slot, (void *)data, block->data_sz ));
540 :
541 0 : fd_block_entry_batch_t const * batch_laddr = fd_blockstore_block_batch_laddr( blockstore, block );
542 0 : ulong const batch_cnt = block->batch_cnt;
543 :
544 0 : ulong micros_cnt = 0UL;
545 0 : ulong txns_cnt = 0UL;
546 0 : ulong blockoff = 0UL;
547 0 : for( ulong batch_i = 0UL; batch_i < batch_cnt; batch_i++ ) {
548 0 : ulong const batch_end_off = batch_laddr[ batch_i ].end_off;
549 0 : if( blockoff + sizeof( ulong ) > batch_end_off ) FD_LOG_ERR(( "premature end of batch" ));
550 0 : ulong mcount = FD_LOAD( ulong, data + blockoff );
551 0 : blockoff += sizeof( ulong );
552 :
553 : /* Loop across microblocks */
554 0 : for( ulong mblk = 0; mblk < mcount; ++mblk ) {
555 0 : if( blockoff + sizeof( fd_microblock_hdr_t ) > batch_end_off )
556 0 : FD_LOG_ERR(( "premature end of batch" ));
557 0 : if( micros_cnt < FD_MICROBLOCK_MAX_PER_SLOT ) {
558 0 : fd_block_micro_t * m = micros + ( micros_cnt++ );
559 0 : m->off = blockoff;
560 0 : }
561 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( data + blockoff );
562 0 : blockoff += sizeof( fd_microblock_hdr_t );
563 :
564 : /* Loop across transactions */
565 0 : for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
566 0 : uchar txn_out[FD_TXN_MAX_SZ];
567 0 : uchar const * raw = data + blockoff;
568 0 : ulong pay_sz = 0;
569 0 : ulong txn_sz = fd_txn_parse_core( (uchar const *)raw,
570 0 : fd_ulong_min( batch_end_off - blockoff, FD_TXN_MTU ),
571 0 : txn_out,
572 0 : NULL,
573 0 : &pay_sz );
574 0 : if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
575 0 : FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
576 0 : txn_idx,
577 0 : mblk,
578 0 : slot,
579 0 : txn_sz ));
580 0 : }
581 0 : fd_txn_t const * txn = (fd_txn_t const *)txn_out;
582 :
583 0 : if( pay_sz == 0UL )
584 0 : FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu",
585 0 : txn_idx,
586 0 : mblk,
587 0 : slot ));
588 :
589 0 : fd_txn_key_t const * sigs =
590 0 : (fd_txn_key_t const *)( (ulong)raw + (ulong)txn->signature_off );
591 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
592 0 : for( ulong j = 0; j < txn->signature_cnt; j++ ) {
593 0 : if( FD_UNLIKELY( fd_txn_map_key_cnt( txn_map ) ==
594 0 : fd_txn_map_key_max( txn_map ) ) ) {
595 0 : break;
596 0 : }
597 0 : fd_txn_key_t sig;
598 0 : fd_memcpy( &sig, sigs + j, sizeof( sig ) );
599 0 : if( fd_txn_map_query( txn_map, &sig, NULL ) != NULL ) continue;
600 0 : fd_txn_map_t * elem = fd_txn_map_insert( txn_map, &sig );
601 0 : if( elem == NULL ) { break; }
602 0 : elem->slot = slot;
603 0 : elem->offset = blockoff;
604 0 : elem->sz = pay_sz;
605 0 : elem->meta_gaddr = 0;
606 0 : elem->meta_sz = 0;
607 0 : if( txns_cnt < FD_TXN_MAX_PER_SLOT ) {
608 0 : fd_block_txn_t * ref = &txns[txns_cnt++];
609 0 : ref->txn_off = blockoff;
610 0 : ref->id_off = (ulong)( sigs + j ) - (ulong)data;
611 0 : ref->sz = pay_sz;
612 0 : }
613 0 : }
614 :
615 0 : blockoff += pay_sz;
616 0 : }
617 0 : }
618 0 : if( FD_UNLIKELY( blockoff > batch_end_off ) ) {
619 0 : FD_LOG_ERR(( "parser error: shouldn't have been allowed to read past batch boundary" ));
620 0 : }
621 0 : if( FD_UNLIKELY( blockoff < batch_end_off ) ) {
622 0 : if( FD_LIKELY( allow_trailing ) ) {
623 0 : FD_LOG_DEBUG(( "ignoring %lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
624 0 : }
625 0 : if( FD_UNLIKELY( !allow_trailing ) ) {
626 0 : FD_LOG_ERR(( "%lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
627 0 : }
628 0 : }
629 0 : blockoff = batch_end_off;
630 0 : }
631 :
632 0 : fd_block_micro_t * micros_laddr =
633 0 : fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
634 0 : alignof( fd_block_micro_t ),
635 0 : sizeof( fd_block_micro_t ) * micros_cnt );
636 0 : fd_memcpy( micros_laddr, micros, sizeof( fd_block_micro_t ) * micros_cnt );
637 0 : block->micros_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), micros_laddr );
638 0 : block->micros_cnt = micros_cnt;
639 :
640 0 : fd_block_txn_t * txns_laddr =
641 0 : fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
642 0 : alignof( fd_block_txn_t ),
643 0 : sizeof( fd_block_txn_t ) * txns_cnt );
644 0 : fd_memcpy( txns_laddr, txns, sizeof( fd_block_txn_t ) * txns_cnt );
645 0 : block->txns_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), txns_laddr );
646 0 : block->txns_cnt = txns_cnt;
647 :
648 0 : fd_alloc_free( fd_blockstore_alloc( blockstore ), micros );
649 0 : fd_alloc_free( fd_blockstore_alloc( blockstore ), txns );
650 0 : }
651 :
652 : static int
653 0 : deshred( fd_blockstore_t * blockstore, ulong slot ) {
654 0 : FD_LOG_NOTICE(( "[%s] slot %lu", __func__, slot ));
655 :
656 : // TODO make this update non blocking
657 0 : fd_block_map_query_t query[1];
658 0 : int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
659 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
660 0 : FD_TEST( err == FD_MAP_SUCCESS && block_info->slot == slot && block_info->block_gaddr == 0 );
661 : /* FIXME duplicate blocks are not supported */
662 :
663 0 : block_info->ts = fd_log_wallclock();
664 0 : ulong shred_cnt = block_info->slot_complete_idx + 1;
665 0 : fd_block_map_publish( query );
666 :
667 0 : ulong block_sz = 0UL;
668 0 : ulong batch_cnt = 0UL;
669 0 : fd_shred_t shred_hdr;
670 0 : for( uint idx = 0; idx < shred_cnt; idx++ ) {
671 0 : fd_shred_key_t key = { slot, idx };
672 0 : int err = FD_MAP_ERR_AGAIN;
673 0 : while( err == FD_MAP_ERR_AGAIN ) {
674 0 : fd_buf_shred_map_query_t query[1] = { 0 };
675 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query );
676 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "[%s] map missing shred %lu %u while deshredding", __func__, slot, idx ));
677 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
678 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
679 0 : fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
680 0 : shred_hdr = shred->hdr;
681 0 : err = fd_buf_shred_map_query_test( query );
682 0 : }
683 0 : FD_TEST( !err );
684 0 : block_sz += fd_shred_payload_sz( &shred_hdr );
685 0 : if( FD_LIKELY( ( shred_hdr.data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ||
686 0 : shred_hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
687 0 : batch_cnt++;
688 0 : }
689 0 : }
690 :
691 : // alloc mem for the block
692 0 : ulong data_off = fd_ulong_align_up( sizeof(fd_block_t), 128UL );
693 0 : ulong shred_off = fd_ulong_align_up( data_off + block_sz, alignof(fd_block_shred_t) );
694 0 : ulong batch_off = fd_ulong_align_up( shred_off + (sizeof(fd_block_shred_t) * shred_cnt), alignof(fd_block_entry_batch_t) );
695 0 : ulong tot_sz = batch_off + (sizeof(fd_block_entry_batch_t) * batch_cnt);
696 :
697 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
698 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
699 0 : fd_block_t * block = fd_alloc_malloc( alloc, 128UL, tot_sz );
700 0 : if( FD_UNLIKELY( !block ) ) {
701 0 : FD_LOG_ERR(( "[%s] OOM: failed to alloc block. blockstore needs to hold in memory all blocks for slots >= SMR, so either increase memory or check for issues with publishing new SMRs.", __func__ ));
702 0 : }
703 :
704 0 : fd_memset( block, 0, sizeof(fd_block_t) );
705 :
706 0 : uchar * data_laddr = (uchar *)((ulong)block + data_off);
707 0 : block->data_gaddr = fd_wksp_gaddr_fast( wksp, data_laddr );
708 0 : block->data_sz = block_sz;
709 0 : fd_block_shred_t * shreds_laddr = (fd_block_shred_t *)((ulong)block + shred_off);
710 0 : block->shreds_gaddr = fd_wksp_gaddr_fast( wksp, shreds_laddr );
711 0 : block->shreds_cnt = shred_cnt;
712 0 : fd_block_entry_batch_t * batch_laddr = (fd_block_entry_batch_t *)((ulong)block + batch_off);
713 0 : block->batch_gaddr = fd_wksp_gaddr_fast( wksp, batch_laddr );
714 0 : block->batch_cnt = batch_cnt;
715 :
716 0 : ulong off = 0UL;
717 0 : ulong batch_i = 0UL;
718 0 : for( uint idx = 0; idx < shred_cnt; idx++ ) {
719 : // TODO can do this in one iteration with block sz loop... massage with deshredder API
720 0 : fd_shred_key_t key = { slot, idx };
721 0 : ulong payload_sz = 0UL;
722 0 : uchar flags = 0;
723 0 : int err = FD_MAP_ERR_AGAIN;
724 0 : while( err == FD_MAP_ERR_AGAIN ) {
725 0 : fd_buf_shred_map_query_t query[1] = { 0 };;
726 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query );
727 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
728 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "[%s] map missing shred %lu %u while deshredding", __func__, slot, idx ));
729 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
730 0 : fd_shred_t const * shred = &fd_buf_shred_map_query_ele_const( query )->hdr;
731 0 : memcpy( data_laddr + off, fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) );
732 :
733 0 : shreds_laddr[idx].hdr = *shred;
734 0 : shreds_laddr[idx].off = off;
735 0 : FD_TEST( 0 == memcmp( &shreds_laddr[idx].hdr, shred, sizeof( fd_shred_t ) ) );
736 0 : FD_TEST( 0 == memcmp( data_laddr + shreds_laddr[idx].off, fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) ) );
737 :
738 0 : payload_sz = fd_shred_payload_sz( shred );
739 0 : flags = shred->data.flags;
740 :
741 0 : err = fd_buf_shred_map_query_test( query );
742 0 : }
743 0 : FD_TEST( !err );
744 0 : off += payload_sz;
745 0 : if( FD_LIKELY( (flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) || flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
746 0 : batch_laddr[ batch_i++ ].end_off = off;
747 0 : }
748 : // fd_blockstore_shred_remove( blockstore, slot, idx );
749 0 : }
750 0 : if( FD_UNLIKELY( batch_cnt != batch_i ) ) {
751 0 : FD_LOG_ERR(( "batch_cnt(%lu)!=batch_i(%lu) potential memory corruption", batch_cnt, batch_i ));
752 0 : }
753 :
754 0 : fd_blockstore_scan_block( blockstore, slot, block );
755 :
756 : /* Do this last when it's safe */
757 0 : FD_COMPILER_MFENCE();
758 :
759 : // TODO make this non blocking
760 0 : err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
761 0 : block_info = fd_block_map_query_ele( query );
762 0 : FD_TEST( err == FD_MAP_SUCCESS && block_info->slot == slot );
763 :
764 0 : block_info->block_gaddr = fd_wksp_gaddr_fast( wksp, block );
765 0 : fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
766 0 : uchar * data = fd_wksp_laddr_fast( wksp, block->data_gaddr );
767 0 : fd_microblock_hdr_t * last_micro = (fd_microblock_hdr_t *)( data + micros[block->micros_cnt - 1].off );
768 0 : memcpy( &block_info->block_hash, last_micro->hash, sizeof( fd_hash_t ) );
769 :
770 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_RECEIVING );
771 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_COMPLETED );
772 0 : fd_block_map_publish( query );
773 :
774 0 : return FD_BLOCKSTORE_SUCCESS;
775 0 : }
776 :
777 : void
778 : fd_blockstore_block_allocs_remove( fd_blockstore_t * blockstore,
779 0 : ulong slot ){
780 0 : fd_block_map_query_t query[1] = { 0 };
781 0 : ulong block_gaddr = 0;
782 0 : int err = FD_MAP_ERR_AGAIN;
783 0 : while( err == FD_MAP_ERR_AGAIN ) {
784 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
785 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
786 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return; /* slot not found */
787 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
788 0 : if( FD_UNLIKELY( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
789 0 : FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
790 0 : return;
791 0 : }
792 0 : block_gaddr = block_info->block_gaddr;
793 0 : err = fd_block_map_query_test( query );
794 0 : }
795 :
796 : /* Remove all the allocations relating to a block. */
797 :
798 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
799 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
800 :
801 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
802 0 : fd_block_t * block = fd_wksp_laddr_fast( wksp, block_gaddr );
803 :
804 : /* DO THIS FIRST FOR THREAD SAFETY */
805 0 : FD_COMPILER_MFENCE();
806 : //block_info->block_gaddr = 0;
807 :
808 0 : uchar * data = fd_wksp_laddr_fast( wksp, block->data_gaddr );
809 0 : fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, block->txns_gaddr );
810 0 : for( ulong j = 0; j < block->txns_cnt; ++j ) {
811 0 : fd_txn_key_t sig;
812 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
813 0 : fd_txn_map_remove( txn_map, &sig );
814 0 : }
815 0 : if( block->micros_gaddr ) fd_alloc_free( alloc, fd_wksp_laddr_fast( wksp, block->micros_gaddr ) );
816 0 : if( block->txns_gaddr ) fd_alloc_free( alloc, txns );
817 0 : ulong mgaddr = block->txns_meta_gaddr;
818 0 : while( mgaddr ) {
819 0 : ulong * laddr = fd_wksp_laddr_fast( wksp, mgaddr );
820 0 : ulong mgaddr2 = laddr[0]; /* link to next allocation */
821 0 : fd_alloc_free( alloc, laddr );
822 0 : mgaddr = mgaddr2;
823 0 : }
824 0 : fd_alloc_free( alloc, block );
825 0 : }
826 :
827 : int
828 : fd_rocksdb_import_block_blockstore( fd_rocksdb_t * db,
829 : fd_slot_meta_t * m,
830 : fd_blockstore_t * blockstore,
831 : int txnstatus,
832 : const uchar * hash_override,
833 0 : fd_valloc_t valloc ) {
834 0 : ulong slot = m->slot;
835 0 : ulong start_idx = 0;
836 0 : ulong end_idx = m->received;
837 :
838 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED]);
839 :
840 0 : char k[16];
841 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap(slot);
842 0 : *((ulong *) &k[8]) = fd_ulong_bswap(start_idx);
843 :
844 0 : rocksdb_iter_seek(iter, (const char *) k, sizeof(k));
845 :
846 0 : for (ulong i = start_idx; i < end_idx; i++) {
847 0 : ulong cur_slot, index;
848 0 : uchar valid = rocksdb_iter_valid(iter);
849 :
850 0 : if (valid) {
851 0 : size_t klen = 0;
852 0 : const char* key = rocksdb_iter_key(iter, &klen); // There is no need to free key
853 0 : if (klen != 16) // invalid key
854 0 : continue;
855 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
856 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
857 0 : }
858 :
859 0 : if (!valid || cur_slot != slot) {
860 0 : FD_LOG_WARNING(("missing shreds for slot %lu", slot));
861 0 : rocksdb_iter_destroy(iter);
862 0 : return -1;
863 0 : }
864 :
865 0 : if (index != i) {
866 0 : FD_LOG_WARNING(("missing shred %lu at index %lu for slot %lu", i, index, slot));
867 0 : rocksdb_iter_destroy(iter);
868 0 : return -1;
869 0 : }
870 :
871 0 : size_t dlen = 0;
872 : // Data was first copied from disk into memory to make it available to this API
873 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value(iter, &dlen);
874 0 : if (data == NULL) {
875 0 : FD_LOG_WARNING(("failed to read shred %lu/%lu", slot, i));
876 0 : rocksdb_iter_destroy(iter);
877 0 : return -1;
878 0 : }
879 :
880 : // This just correctly selects from inside the data pointer to the
881 : // actual data without a memory copy
882 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
883 0 : if (shred == NULL) {
884 0 : FD_LOG_WARNING(("failed to parse shred %lu/%lu", slot, i));
885 0 : rocksdb_iter_destroy(iter);
886 0 : return -1;
887 0 : }
888 0 : fd_blockstore_shred_insert( blockstore, shred );
889 : // if (rc != FD_BLOCKSTORE_SUCCESS_SLOT_COMPLETE && rc != FD_BLOCKSTORE_SUCCESS) {
890 : // FD_LOG_WARNING(("failed to store shred %lu/%lu", slot, i));
891 : // rocksdb_iter_destroy(iter);
892 : // return -1;
893 : // }
894 :
895 0 : rocksdb_iter_next(iter);
896 0 : }
897 :
898 0 : rocksdb_iter_destroy(iter);
899 :
900 :
901 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
902 0 : fd_block_info_t * block_info = fd_blockstore_block_map_query( blockstore, slot );
903 0 : if( FD_LIKELY( block_info && fd_blockstore_shreds_complete( blockstore, slot ) ) ) {
904 0 : deshred( blockstore, slot );
905 :
906 0 : size_t vallen = 0;
907 0 : char * err = NULL;
908 0 : char * res = rocksdb_get_cf(
909 0 : db->db,
910 0 : db->ro,
911 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCKTIME ],
912 0 : (char const *)&slot_be, sizeof(ulong),
913 0 : &vallen,
914 0 : &err );
915 0 : if( FD_UNLIKELY( err ) ) {
916 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
917 0 : free( err );
918 0 : } else if(vallen == sizeof(ulong)) {
919 0 : block_info->ts = (*(long*)res)*((long)1e9); /* Convert to nanos */
920 0 : free(res);
921 0 : }
922 :
923 0 : vallen = 0;
924 0 : err = NULL;
925 0 : res = rocksdb_get_cf(
926 0 : db->db,
927 0 : db->ro,
928 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ],
929 0 : (char const *)&slot_be, sizeof(ulong),
930 0 : &vallen,
931 0 : &err );
932 0 : block_info->block_height = 0;
933 0 : if( FD_UNLIKELY( err ) ) {
934 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
935 0 : free( err );
936 0 : } else if(vallen == sizeof(ulong)) {
937 0 : block_info->block_height = *(ulong*)res;
938 0 : free(res);
939 0 : }
940 :
941 0 : vallen = 0;
942 0 : err = NULL;
943 0 : if (NULL != hash_override)
944 0 : fd_memcpy( block_info->bank_hash.hash, hash_override, 32UL );
945 0 : else {
946 0 : res = rocksdb_get_cf(
947 0 : db->db,
948 0 : db->ro,
949 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
950 0 : (char const *)&slot_be, sizeof(ulong),
951 0 : &vallen,
952 0 : &err );
953 0 : if( FD_UNLIKELY( err ) ) {
954 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
955 0 : free( err );
956 0 : } else {
957 0 : fd_bincode_decode_ctx_t decode = {
958 0 : .data = res,
959 0 : .dataend = res + vallen
960 0 : };
961 0 : ulong total_sz = 0UL;
962 0 : int decode_err = fd_frozen_hash_versioned_decode_footprint( &decode, &total_sz );
963 :
964 0 : uchar * mem = fd_valloc_malloc( valloc, fd_frozen_hash_versioned_align(), total_sz );
965 0 : if( NULL == mem ) {
966 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
967 0 : }
968 :
969 0 : fd_frozen_hash_versioned_t * versioned = fd_frozen_hash_versioned_decode( mem, &decode );
970 0 : if( FD_UNLIKELY( decode_err!=FD_BINCODE_SUCCESS ) ) goto cleanup;
971 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
972 0 : if( FD_UNLIKELY( versioned->discriminant !=fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
973 : /* Success */
974 0 : fd_memcpy( block_info->bank_hash.hash, versioned->inner.current.frozen_hash.hash, 32UL );
975 0 : cleanup:
976 0 : free( res );
977 0 : }
978 0 : }
979 0 : }
980 :
981 0 : if( txnstatus && FD_LIKELY( block_info && fd_blockstore_shreds_complete( blockstore, slot ) ) ) {
982 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, block_info->block_gaddr );
983 0 : uchar * data = fd_wksp_laddr_fast( wksp, blk->data_gaddr );
984 0 : fd_block_txn_t * txns = fd_wksp_laddr_fast( wksp, blk->txns_gaddr );
985 :
986 : /* TODO: change txn indexing after blockstore refactor */
987 : /* Compute the total size of the logs */
988 0 : ulong tot_meta_sz = 2*sizeof(ulong);
989 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
990 0 : if( j == 0 || txns[j].txn_off != txns[j-1].txn_off ) {
991 0 : fd_txn_key_t sig;
992 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
993 0 : ulong sz;
994 0 : void * raw = fd_rocksdb_get_txn_status_raw( db, slot, &sig, &sz );
995 0 : if( raw != NULL ) {
996 0 : free(raw);
997 0 : tot_meta_sz += sz;
998 0 : }
999 0 : }
1000 0 : }
1001 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
1002 0 : uchar * cur_laddr = fd_alloc_malloc( alloc, 1, tot_meta_sz );
1003 0 : if( cur_laddr == NULL ) {
1004 0 : return 0;
1005 0 : }
1006 0 : ((ulong*)cur_laddr)[0] = blk->txns_meta_gaddr; /* Link to previous allocation */
1007 0 : ((ulong*)cur_laddr)[1] = blk->txns_meta_sz;
1008 0 : blk->txns_meta_gaddr = fd_wksp_gaddr_fast( wksp, cur_laddr );
1009 0 : blk->txns_meta_sz = tot_meta_sz;
1010 0 : cur_laddr += 2*sizeof(ulong);
1011 :
1012 : /* Copy over the logs */
1013 0 : fd_txn_map_t * txn_map = fd_blockstore_txn_map( blockstore );
1014 0 : ulong meta_gaddr = 0;
1015 0 : ulong meta_sz = 0;
1016 0 : fd_txn_key_t sig = { 0 };
1017 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
1018 0 : if( j == 0 || txns[j].txn_off != txns[j-1].txn_off ) {
1019 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
1020 0 : ulong sz;
1021 0 : void * raw = fd_rocksdb_get_txn_status_raw( db, slot, &sig, &sz );
1022 0 : if( raw == NULL ) {
1023 0 : meta_gaddr = 0;
1024 0 : meta_sz = 0;
1025 0 : } else {
1026 0 : fd_memcpy(cur_laddr, raw, sz);
1027 0 : free(raw);
1028 0 : meta_gaddr = fd_wksp_gaddr_fast( wksp, cur_laddr );
1029 0 : meta_sz = sz;
1030 0 : cur_laddr += sz;
1031 0 : }
1032 0 : }
1033 0 : fd_txn_map_t * txn_map_entry = fd_txn_map_query( txn_map, &sig, NULL );
1034 0 : if( FD_UNLIKELY( !txn_map_entry ) ) {
1035 0 : char sig_str[ FD_BASE58_ENCODED_64_SZ ];
1036 0 : fd_base58_encode_64( fd_type_pun_const( sig.v ), NULL, sig_str );
1037 0 : FD_LOG_WARNING(( "missing transaction %s", sig_str ));
1038 0 : continue;
1039 0 : }
1040 0 : txn_map_entry->meta_gaddr = meta_gaddr;
1041 0 : txn_map_entry->meta_sz = meta_sz;
1042 0 : }
1043 :
1044 0 : FD_TEST( blk->txns_meta_gaddr + blk->txns_meta_sz == fd_wksp_gaddr_fast( wksp, cur_laddr ) );
1045 0 : }
1046 :
1047 0 : blockstore->shmem->lps = slot;
1048 0 : blockstore->shmem->hcs = slot;
1049 0 : blockstore->shmem->wmk = slot;
1050 :
1051 0 : if( FD_LIKELY( block_info ) ) {
1052 0 : block_info->flags =
1053 0 : fd_uchar_set_bit(
1054 0 : fd_uchar_set_bit(
1055 0 : fd_uchar_set_bit(
1056 0 : fd_uchar_set_bit(
1057 0 : fd_uchar_set_bit(
1058 0 : block_info->flags,
1059 0 : FD_BLOCK_FLAG_COMPLETED ),
1060 0 : FD_BLOCK_FLAG_PROCESSED ),
1061 0 : FD_BLOCK_FLAG_EQVOCSAFE ),
1062 0 : FD_BLOCK_FLAG_CONFIRMED ),
1063 0 : FD_BLOCK_FLAG_FINALIZED );
1064 0 : }
1065 :
1066 0 : return 0;
1067 0 : }
1068 :
1069 : int
1070 : fd_rocksdb_import_block_shredcap( fd_rocksdb_t * db,
1071 : fd_slot_meta_t * metadata,
1072 : fd_io_buffered_ostream_t * ostream,
1073 : fd_io_buffered_ostream_t * bank_hash_ostream,
1074 0 : fd_valloc_t valloc ) {
1075 0 : ulong slot = metadata->slot;
1076 :
1077 : /* pre_slot_hdr_file_offset is the current offset within the file, but
1078 : pre_slot_hdr_file_offset_real accounts for the size of the buffer that has
1079 : been filled but not flushed. This value is used to jump back into the file to
1080 : populate the payload_sz for the slot header */
1081 0 : long pre_slot_hdr_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
1082 0 : long pre_slot_hdr_file_offset_real = pre_slot_hdr_file_offset + (long)ostream->wbuf_used;
1083 0 : if ( FD_UNLIKELY( pre_slot_hdr_file_offset == -1 ) ) {
1084 0 : FD_LOG_ERR(( "lseek error while seeking to current location" ));
1085 0 : }
1086 :
1087 : /* Write slot specific header */
1088 0 : fd_shredcap_slot_hdr_t slot_hdr;
1089 0 : slot_hdr.magic = FD_SHREDCAP_SLOT_HDR_MAGIC;
1090 0 : slot_hdr.version = FD_SHREDCAP_SLOT_HDR_VERSION;
1091 0 : slot_hdr.payload_sz = ULONG_MAX; /* This value is populated after slot is processed */
1092 0 : slot_hdr.slot = metadata->slot;
1093 0 : slot_hdr.consumed = metadata->consumed;
1094 0 : slot_hdr.received = metadata->received;
1095 0 : slot_hdr.first_shred_timestamp = metadata->first_shred_timestamp;
1096 0 : slot_hdr.last_index = metadata->last_index;
1097 0 : slot_hdr.parent_slot = metadata->parent_slot;
1098 0 : fd_io_buffered_ostream_write( ostream, &slot_hdr, FD_SHREDCAP_SLOT_HDR_FOOTPRINT );
1099 :
1100 : /* We need to track the payload size */
1101 0 : ulong payload_sz = 0;
1102 :
1103 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf( db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED] );
1104 :
1105 0 : char k[16];
1106 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap( slot );
1107 0 : *((ulong *) &k[8]) = fd_ulong_bswap( 0 );
1108 :
1109 0 : rocksdb_iter_seek( iter, (const char *) k, sizeof(k) );
1110 :
1111 0 : ulong start_idx = 0;
1112 0 : ulong end_idx = metadata->received;
1113 0 : for ( ulong i = start_idx; i < end_idx; i++ ) {
1114 0 : ulong cur_slot, index;
1115 0 : uchar valid = rocksdb_iter_valid( iter );
1116 :
1117 0 : if ( valid ) {
1118 0 : size_t klen = 0;
1119 0 : const char* key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
1120 0 : if ( klen != 16 ) { // invalid key
1121 0 : continue;
1122 0 : }
1123 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
1124 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
1125 0 : }
1126 :
1127 0 : if ( !valid || cur_slot != slot ) {
1128 0 : FD_LOG_WARNING(( "missing shreds for slot %lu", slot ));
1129 0 : rocksdb_iter_destroy( iter );
1130 0 : return -1;
1131 0 : }
1132 :
1133 0 : if ( index != i ) {
1134 0 : FD_LOG_WARNING(( "missing shred %lu at index %lu for slot %lu", i, index, slot ));
1135 0 : rocksdb_iter_destroy( iter );
1136 0 : return -1;
1137 0 : }
1138 :
1139 0 : size_t dlen = 0;
1140 : // Data was first copied from disk into memory to make it available to this API
1141 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value( iter, &dlen );
1142 0 : if ( data == NULL ) {
1143 0 : FD_LOG_WARNING(( "failed to read shred %lu/%lu", slot, i ));
1144 0 : rocksdb_iter_destroy( iter );
1145 0 : return -1;
1146 0 : }
1147 :
1148 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
1149 0 : if ( shred == NULL ) {
1150 0 : FD_LOG_WARNING(( "failed to parse shred %lu/%lu", slot, i ));
1151 0 : rocksdb_iter_destroy( iter );
1152 0 : return -1;
1153 0 : }
1154 :
1155 : /* Write a shred header and shred. Each shred and it's header will be aligned */
1156 0 : char shred_buf[ FD_SHREDCAP_SHRED_MAX ];
1157 0 : char * shred_buf_ptr = shred_buf;
1158 0 : ushort shred_sz = (ushort)fd_shred_sz( shred );
1159 0 : uint shred_boundary_sz = (uint)fd_uint_align_up( shred_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT,
1160 0 : FD_SHREDCAP_ALIGN ) - FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
1161 :
1162 0 : fd_memset( shred_buf_ptr, 0, shred_boundary_sz );
1163 : /* Populate start of buffer with header */
1164 0 : fd_shredcap_shred_hdr_t * shred_hdr = (fd_shredcap_shred_hdr_t*)shred_buf_ptr;
1165 0 : shred_hdr->hdr_sz = FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
1166 0 : shred_hdr->shred_sz = shred_sz;
1167 0 : shred_hdr->shred_boundary_sz = shred_boundary_sz;
1168 :
1169 : /* Skip ahead and populate rest of buffer with shred and write out */
1170 0 : fd_memcpy( shred_buf_ptr + FD_SHREDCAP_SHRED_HDR_FOOTPRINT, shred, shred_boundary_sz );
1171 0 : fd_io_buffered_ostream_write( ostream, shred_buf_ptr,
1172 0 : shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT );
1173 :
1174 0 : payload_sz += shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
1175 0 : rocksdb_iter_next( iter );
1176 0 : }
1177 :
1178 : /* Update file size */
1179 0 : long pre_slot_processed_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
1180 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == -1 ) ) {
1181 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
1182 0 : }
1183 :
1184 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == pre_slot_hdr_file_offset ) ) {
1185 : /* This case is when the payload from the shreds is smaller than the free
1186 : space from the write buffer. This means that the buffer was not flushed
1187 : at any point. This case is highly unlikely */
1188 0 : fd_io_buffered_ostream_flush( ostream );
1189 0 : }
1190 :
1191 : /* Safely assume that the buffer was flushed to the file at least once. Store
1192 : original seek position, skip to position with payload_sz in header, write
1193 : updated payload sz, and then reset seek position. */
1194 0 : long original_offset = lseek( ostream->fd, 0, SEEK_CUR );
1195 0 : if ( FD_UNLIKELY( original_offset == -1 ) ) {
1196 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
1197 0 : }
1198 0 : long payload_sz_file_offset = pre_slot_hdr_file_offset_real +
1199 0 : (long)FD_SHREDCAP_SLOT_HDR_PAYLOAD_SZ_OFFSET;
1200 :
1201 0 : long offset;
1202 0 : offset = lseek( ostream->fd, payload_sz_file_offset, SEEK_SET );
1203 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
1204 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", payload_sz_file_offset ));
1205 0 : }
1206 0 : ulong to_write;
1207 0 : fd_io_write( ostream->fd, &payload_sz, sizeof(ulong), sizeof(ulong), &to_write );
1208 :
1209 0 : offset = lseek( ostream->fd, original_offset, SEEK_SET );
1210 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
1211 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", original_offset ));
1212 0 : }
1213 :
1214 : /* Write slot footer */
1215 0 : fd_shredcap_slot_ftr_t slot_ftr;
1216 0 : slot_ftr.magic = FD_SHREDCAP_SLOT_FTR_MAGIC;
1217 0 : slot_ftr.payload_sz = payload_sz;
1218 0 : fd_io_buffered_ostream_write( ostream, &slot_ftr, FD_SHREDCAP_SLOT_FTR_FOOTPRINT );
1219 0 : rocksdb_iter_destroy( iter );
1220 :
1221 : /* Get and write bank hash information to respective file */
1222 0 : size_t vallen = 0;
1223 0 : char * err = NULL;
1224 0 : char * res = rocksdb_get_cf( db->db, db->ro, db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
1225 0 : (char const *)&slot_be, sizeof(ulong), &vallen, &err );
1226 0 : if( FD_UNLIKELY( err ) ) {
1227 0 : FD_LOG_WARNING((" Could not get bank hash data due to err=%s",err ));
1228 0 : free( err );
1229 0 : } else {
1230 0 : fd_bincode_decode_ctx_t decode = {
1231 0 : .data = res,
1232 0 : .dataend = res + vallen,
1233 0 : };
1234 0 : ulong total_sz = 0UL;
1235 0 : int decode_err = fd_frozen_hash_versioned_decode_footprint( &decode, &total_sz );
1236 :
1237 0 : uchar * mem = fd_valloc_malloc( valloc, fd_frozen_hash_versioned_align(), total_sz );
1238 :
1239 0 : fd_frozen_hash_versioned_t * versioned = fd_frozen_hash_versioned_decode( mem, &decode );
1240 :
1241 0 : if( FD_UNLIKELY( decode_err != FD_BINCODE_SUCCESS ) ) goto cleanup;
1242 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
1243 0 : if( FD_UNLIKELY( versioned->discriminant != fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
1244 0 : fd_shredcap_bank_hash_entry_t bank_hash_entry;
1245 0 : bank_hash_entry.slot = slot;
1246 0 : fd_memcpy( &bank_hash_entry.bank_hash, versioned->inner.current.frozen_hash.hash, 32UL );
1247 0 : fd_io_buffered_ostream_write( bank_hash_ostream, &bank_hash_entry, FD_SHREDCAP_BANK_HASH_ENTRY_FOOTPRINT );
1248 0 : cleanup:
1249 0 : free( res );
1250 0 : }
1251 0 : return 0;
1252 0 : }
|