Line data Source code
1 : #include "fd_rocksdb.h"
2 : #include "fd_blockstore.h"
3 : #include "../shredcap/fd_shredcap.h"
4 : #include <stdbool.h>
5 : #include <stdlib.h>
6 : #include <stdio.h>
7 : #include <unistd.h>
8 : #include "../../util/bits/fd_bits.h"
9 :
10 : char *
11 : fd_rocksdb_init( fd_rocksdb_t * db,
12 0 : char const * db_name ) {
13 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
14 :
15 0 : db->opts = rocksdb_options_create();
16 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
17 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
18 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
19 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
20 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
21 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
22 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
23 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
24 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
25 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
26 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
27 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
28 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
29 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
30 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
31 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
32 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
33 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
34 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
35 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
36 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
37 :
38 0 : rocksdb_options_t const * cf_options[ FD_ROCKSDB_CF_CNT ];
39 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ )
40 0 : cf_options[ i ] = db->opts;
41 :
42 0 : char *err = NULL;
43 :
44 0 : db->db = rocksdb_open_for_read_only_column_families(
45 0 : db->opts,
46 0 : db_name,
47 0 : FD_ROCKSDB_CF_CNT,
48 0 : (char const * const *)db->cfgs,
49 0 : (rocksdb_options_t const * const *)cf_options,
50 0 : db->cf_handles,
51 0 : false,
52 0 : &err );
53 :
54 0 : if( FD_UNLIKELY( err ) ) return err;
55 :
56 0 : db->ro = rocksdb_readoptions_create();
57 :
58 0 : return NULL;
59 0 : }
60 :
61 : void
62 : fd_rocksdb_new( fd_rocksdb_t * db,
63 0 : char const * db_name ) {
64 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
65 :
66 0 : db->opts = rocksdb_options_create();
67 : /* Create the db*/
68 0 : rocksdb_options_set_create_if_missing(db->opts, 1);
69 :
70 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
71 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
72 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
73 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
74 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
75 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
76 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
77 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
78 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
79 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
80 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
81 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
82 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
83 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
84 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
85 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
86 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
87 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
88 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
89 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
90 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
91 :
92 : /* Create the rocksdb */
93 0 : char * err = NULL;
94 0 : db->db = rocksdb_open(db->opts, db_name, &err);
95 0 : if ( err != NULL ) {
96 0 : FD_LOG_ERR(("rocksdb creation failed: %s", err));
97 0 : }
98 :
99 0 : db->wo = rocksdb_writeoptions_create();
100 :
101 : /* Create column families, default already exists at index 0 */
102 0 : for ( ulong i = 1; i < FD_ROCKSDB_CF_CNT; ++i ) {
103 0 : db->cf_handles[i] = rocksdb_create_column_family(db->db, db->opts, db->cfgs[i], &err);
104 0 : }
105 0 : rocksdb_options_set_compression( db->opts, rocksdb_lz4_compression );
106 0 : }
107 :
108 0 : void fd_rocksdb_destroy(fd_rocksdb_t *db) {
109 :
110 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ ) {
111 0 : if( db->cf_handles[i] ) {
112 0 : rocksdb_column_family_handle_destroy( db->cf_handles[i] );
113 0 : db->cf_handles[i] = NULL;
114 0 : }
115 0 : }
116 :
117 0 : if( db->ro ) {
118 0 : rocksdb_readoptions_destroy( db->ro );
119 0 : db->ro = NULL;
120 0 : }
121 :
122 0 : if( db->opts ) {
123 0 : rocksdb_options_destroy( db->opts );
124 0 : db->opts = NULL;
125 0 : }
126 :
127 0 : if( db->db ) {
128 0 : rocksdb_close( db->db );
129 0 : db->db = NULL;
130 0 : }
131 :
132 0 : if( db->wo ) {
133 0 : rocksdb_writeoptions_destroy( db->wo );
134 0 : }
135 0 : }
136 :
137 0 : ulong fd_rocksdb_last_slot(fd_rocksdb_t *db, char **err) {
138 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
139 0 : rocksdb_iter_seek_to_last(iter);
140 0 : if (!rocksdb_iter_valid(iter)) {
141 0 : rocksdb_iter_destroy(iter);
142 0 : *err = "db column for root is empty";
143 0 : return 0;
144 0 : }
145 :
146 0 : size_t klen = 0;
147 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
148 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
149 0 : rocksdb_iter_destroy(iter);
150 0 : return slot;
151 0 : }
152 :
153 0 : ulong fd_rocksdb_find_last_slot(fd_rocksdb_t *db, char **err) {
154 0 : ulong max_slot = 0;
155 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
156 0 : rocksdb_iter_seek_to_first(iter);
157 0 : if (!rocksdb_iter_valid(iter)) {
158 0 : rocksdb_iter_destroy(iter);
159 0 : *err = "db column for root is empty";
160 0 : return 0;
161 0 : }
162 :
163 0 : for( ; rocksdb_iter_valid(iter); rocksdb_iter_next(iter) ) {
164 0 : size_t klen = 0;
165 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
166 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
167 :
168 0 : if( slot > max_slot ) {
169 0 : max_slot = slot;
170 0 : FD_LOG_WARNING(("new max_slot: %lu", max_slot));
171 0 : }
172 0 : }
173 :
174 0 : rocksdb_iter_destroy(iter);
175 0 : return max_slot;
176 0 : }
177 :
178 : ulong
179 : fd_rocksdb_first_slot( fd_rocksdb_t * db,
180 0 : char ** err ) {
181 :
182 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
183 0 : rocksdb_iter_seek_to_first(iter);
184 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(iter) ) ) {
185 0 : rocksdb_iter_destroy(iter);
186 0 : *err = "db column for root is empty";
187 0 : return 0;
188 0 : }
189 :
190 0 : ulong klen = 0;
191 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
192 0 : ulong slot = fd_ulong_bswap( *((ulong *)key));
193 0 : rocksdb_iter_destroy(iter);
194 0 : return slot;
195 0 : }
196 :
197 : int
198 : fd_rocksdb_get_meta( fd_rocksdb_t * db,
199 : ulong slot,
200 : fd_slot_meta_t * m,
201 0 : fd_valloc_t valloc ) {
202 0 : ulong ks = fd_ulong_bswap(slot);
203 0 : size_t vallen = 0;
204 :
205 0 : char * err = NULL;
206 0 : char * meta = rocksdb_get_cf( db->db,
207 0 : db->ro,
208 0 : db->cf_handles[FD_ROCKSDB_CFIDX_META],
209 0 : (const char *) &ks,
210 0 : sizeof(ks),
211 0 : &vallen,
212 0 : &err );
213 :
214 0 : if( NULL != err ) {
215 0 : FD_LOG_WARNING(( "%s", err ));
216 0 : free( err );
217 0 : return -2;
218 0 : }
219 :
220 0 : if (0 == vallen)
221 0 : return -1;
222 :
223 0 : fd_bincode_decode_ctx_t ctx;
224 0 : ctx.data = meta;
225 0 : ctx.dataend = &meta[vallen];
226 :
227 0 : ulong total_sz = 0UL;
228 0 : if( fd_slot_meta_decode_footprint( &ctx, &total_sz ) ) {
229 0 : FD_LOG_ERR(( "fd_slot_meta_decode failed" ));
230 0 : }
231 :
232 0 : uchar * mem = fd_valloc_malloc( valloc, fd_slot_meta_align(), total_sz );
233 0 : if( NULL == mem ) {
234 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
235 0 : }
236 :
237 0 : fd_slot_meta_decode( mem, &ctx );
238 :
239 0 : fd_memcpy( m, mem, sizeof(fd_slot_meta_t) );
240 :
241 0 : free(meta);
242 :
243 0 : return 0;
244 0 : }
245 :
246 : void *
247 0 : fd_rocksdb_root_iter_new ( void * ptr ) {
248 0 : fd_memset(ptr, 0, sizeof(fd_rocksdb_root_iter_t));
249 0 : return ptr;
250 0 : }
251 :
252 : fd_rocksdb_root_iter_t *
253 0 : fd_rocksdb_root_iter_join ( void * ptr ) {
254 0 : return (fd_rocksdb_root_iter_t *) ptr;
255 0 : }
256 :
257 : void *
258 0 : fd_rocksdb_root_iter_leave ( fd_rocksdb_root_iter_t * ptr ) {
259 0 : return ptr;
260 0 : }
261 :
262 : int
263 : fd_rocksdb_root_iter_seek( fd_rocksdb_root_iter_t * self,
264 : fd_rocksdb_t * db,
265 : ulong slot,
266 : fd_slot_meta_t * m,
267 0 : fd_valloc_t valloc ) {
268 0 : self->db = db;
269 :
270 0 : if( FD_UNLIKELY( !self->iter ) )
271 0 : self->iter = rocksdb_create_iterator_cf(self->db->db, self->db->ro, self->db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
272 :
273 0 : ulong ks = fd_ulong_bswap( slot );
274 :
275 0 : rocksdb_iter_seek( self->iter, (char const *)&ks, sizeof(ulong) );
276 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(self->iter) ) )
277 0 : return -1;
278 :
279 0 : size_t klen = 0;
280 0 : char const * key = rocksdb_iter_key( self->iter, &klen ); // There is no need to free key
281 0 : ulong kslot = fd_ulong_bswap( *((ulong *)key) );
282 :
283 0 : if( FD_UNLIKELY( kslot != slot ) ) {
284 0 : FD_LOG_WARNING(( "fd_rocksdb_root_iter_seek: wanted slot %lu, found %lu",
285 0 : slot, kslot ));
286 0 : return -2;
287 0 : }
288 :
289 0 : return fd_rocksdb_get_meta( self->db, slot, m, valloc );
290 0 : }
291 :
292 : int
293 0 : fd_rocksdb_root_iter_slot ( fd_rocksdb_root_iter_t * self, ulong *slot ) {
294 0 : if ((NULL == self->db) || (NULL == self->iter))
295 0 : return -1;
296 :
297 0 : if (!rocksdb_iter_valid(self->iter))
298 0 : return -2;
299 :
300 0 : size_t klen = 0;
301 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
302 0 : *slot = fd_ulong_bswap(*((unsigned long *) key));
303 0 : return 0;
304 0 : }
305 :
306 : int
307 : fd_rocksdb_root_iter_next( fd_rocksdb_root_iter_t * self,
308 : fd_slot_meta_t * m,
309 0 : fd_valloc_t valloc ) {
310 0 : if ((NULL == self->db) || (NULL == self->iter))
311 0 : return -1;
312 :
313 0 : if (!rocksdb_iter_valid(self->iter))
314 0 : return -2;
315 :
316 0 : rocksdb_iter_next(self->iter);
317 :
318 0 : if (!rocksdb_iter_valid(self->iter))
319 0 : return -3;
320 :
321 0 : size_t klen = 0;
322 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
323 :
324 0 : return fd_rocksdb_get_meta( self->db, fd_ulong_bswap(*((unsigned long *) key)), m, valloc );
325 0 : }
326 :
327 : void
328 0 : fd_rocksdb_root_iter_destroy ( fd_rocksdb_root_iter_t * self ) {
329 0 : if (NULL != self->iter) {
330 0 : rocksdb_iter_destroy(self->iter);
331 0 : self->iter = 0;
332 0 : }
333 0 : self->db = NULL;
334 0 : }
335 :
336 : void *
337 : fd_rocksdb_get_txn_status_raw( fd_rocksdb_t * self,
338 : ulong slot,
339 : void const * sig,
340 0 : ulong * psz ) {
341 :
342 0 : ulong slot_be = fd_ulong_bswap( slot );
343 :
344 : /* Construct RocksDB query key */
345 0 : char key[72];
346 0 : memcpy( key, sig, 64UL );
347 0 : memcpy( key+64UL, &slot_be, 8UL );
348 :
349 : /* Query record */
350 0 : char * err = NULL;
351 0 : char * res = rocksdb_get_cf(
352 0 : self->db, self->ro,
353 0 : self->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
354 0 : key, 72UL,
355 0 : psz,
356 0 : &err );
357 :
358 0 : if( FD_UNLIKELY( err ) ) {
359 0 : FD_LOG_WARNING(("err=%s", err));
360 0 : free( err );
361 0 : return NULL;
362 0 : }
363 0 : return res;
364 0 : }
365 :
366 : ulong
367 0 : fd_rocksdb_get_slot( ulong cf_idx, char const * key ) {
368 0 : switch (cf_idx) {
369 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
370 0 : return fd_ulong_bswap(*((ulong *) &key[72])); /* (signature,slot)*/
371 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
372 0 : return fd_ulong_bswap(*((ulong *) &key[40])); /* (pubkey,slot,u32,signature) */
373 0 : default: /* all other cfs have the slot at the start */
374 0 : return fd_ulong_bswap( *((ulong *)&key[0]) ); /* The key is just the slot number */
375 0 : }
376 :
377 0 : return fd_ulong_bswap( *((ulong *)key) );
378 0 : }
379 :
380 : void
381 0 : fd_rocksdb_iter_seek_to_slot_if_possible( rocksdb_iterator_t * iter, const ulong cf_idx, const ulong slot ) {
382 0 : ulong k = fd_ulong_bswap(slot);
383 0 : switch (cf_idx) {
384 : /* These cfs do not have the slot at the start, we can't seek based on slot prefix */
385 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
386 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
387 0 : rocksdb_iter_seek_to_first( iter );
388 0 : break;
389 0 : default: /* all other cfs have the slot at the start, seek based on slot prefix */
390 0 : rocksdb_iter_seek( iter, (const char *)&k, 8);
391 0 : break;
392 0 : }
393 0 : }
394 :
395 : int
396 : fd_rocksdb_copy_over_slot_indexed_range( fd_rocksdb_t * src,
397 : fd_rocksdb_t * dst,
398 : ulong cf_idx,
399 : ulong start_slot,
400 0 : ulong end_slot ) {
401 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_slot_indexed_range: %lu", cf_idx ));
402 :
403 0 : if ( cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ||
404 0 : cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ||
405 0 : cf_idx == FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ) {
406 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_range: skipping cf_idx=%lu because not slot indexed", cf_idx ));
407 0 : return 0;
408 0 : }
409 :
410 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf( src->db, src->ro, src->cf_handles[cf_idx] );
411 0 : if ( FD_UNLIKELY( iter == NULL ) ) {
412 0 : FD_LOG_ERR(( "rocksdb_create_iterator_cf failed for cf_idx=%lu", cf_idx ));
413 0 : }
414 :
415 0 : for ( fd_rocksdb_iter_seek_to_slot_if_possible( iter, cf_idx, start_slot ); rocksdb_iter_valid( iter ); rocksdb_iter_next( iter ) ) {
416 0 : ulong klen = 0;
417 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
418 :
419 0 : ulong slot = fd_rocksdb_get_slot( cf_idx, key );
420 0 : if ( slot < start_slot ) {
421 0 : continue;
422 0 : }
423 0 : else if ( slot > end_slot ) {
424 0 : break;
425 0 : }
426 :
427 0 : ulong vlen = 0;
428 0 : char const * value = rocksdb_iter_value( iter, &vlen );
429 :
430 0 : fd_rocksdb_insert_entry( dst, cf_idx, key, klen, value, vlen );
431 0 : }
432 0 : rocksdb_iter_destroy( iter );
433 0 : return 0;
434 0 : }
435 :
436 : int
437 : fd_rocksdb_insert_entry( fd_rocksdb_t * db,
438 : ulong cf_idx,
439 : const char * key,
440 : ulong klen,
441 : const char * value,
442 : ulong vlen )
443 0 : {
444 0 : char * err = NULL;
445 0 : rocksdb_put_cf( db->db, db->wo, db->cf_handles[cf_idx],
446 0 : key, klen, value, vlen, &err );
447 0 : if( FD_UNLIKELY( err != NULL ) ) {
448 0 : FD_LOG_WARNING(( "rocksdb_put_cf failed with error %s", err ));
449 0 : return -1;
450 0 : }
451 0 : return 0;
452 0 : }
453 :
454 : static void
455 0 : fd_blockstore_scan_block( fd_blockstore_t * blockstore, ulong slot, fd_block_t * block ) {
456 :
457 0 : fd_block_micro_t * micros = fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
458 0 : alignof( fd_block_micro_t ),
459 0 : sizeof( *micros ) * FD_MICROBLOCK_MAX_PER_SLOT );
460 :
461 : /*
462 : * Agave decodes precisely one array of microblocks from each batch.
463 : * As of bincode version 1.3.3, the default deserializer used when
464 : * decoding a batch in the blockstore allows for trailing bytes to be
465 : * ignored.
466 : * https://github.com/anza-xyz/agave/blob/v2.1.0/ledger/src/blockstore.rs#L3764
467 : */
468 0 : uchar allow_trailing = 1UL;
469 :
470 0 : uchar const * data = fd_blockstore_block_data_laddr( blockstore, block );
471 0 : FD_LOG_DEBUG(( "scanning slot %lu, ptr %p, sz %lu", slot, (void *)data, block->data_sz ));
472 :
473 0 : fd_block_entry_batch_t const * batch_laddr = fd_blockstore_block_batch_laddr( blockstore, block );
474 0 : ulong const batch_cnt = block->batch_cnt;
475 :
476 0 : ulong micros_cnt = 0UL;
477 0 : ulong blockoff = 0UL;
478 0 : for( ulong batch_i = 0UL; batch_i < batch_cnt; batch_i++ ) {
479 0 : ulong const batch_end_off = batch_laddr[ batch_i ].end_off;
480 0 : if( blockoff + sizeof( ulong ) > batch_end_off ) FD_LOG_ERR(( "premature end of batch" ));
481 0 : ulong mcount = FD_LOAD( ulong, data + blockoff );
482 0 : blockoff += sizeof( ulong );
483 :
484 : /* Loop across microblocks */
485 0 : for( ulong mblk = 0; mblk < mcount; ++mblk ) {
486 0 : if( blockoff + sizeof( fd_microblock_hdr_t ) > batch_end_off )
487 0 : FD_LOG_ERR(( "premature end of batch" ));
488 0 : if( micros_cnt < FD_MICROBLOCK_MAX_PER_SLOT ) {
489 0 : fd_block_micro_t * m = micros + ( micros_cnt++ );
490 0 : m->off = blockoff;
491 0 : }
492 0 : fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( data + blockoff );
493 0 : blockoff += sizeof( fd_microblock_hdr_t );
494 :
495 : /* Loop across transactions */
496 0 : for( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
497 0 : uchar txn_out[FD_TXN_MAX_SZ];
498 0 : uchar const * raw = data + blockoff;
499 0 : ulong pay_sz = 0;
500 0 : ulong txn_sz = fd_txn_parse_core( (uchar const *)raw,
501 0 : fd_ulong_min( batch_end_off - blockoff, FD_TXN_MTU ),
502 0 : txn_out,
503 0 : NULL,
504 0 : &pay_sz );
505 0 : if( txn_sz == 0 || txn_sz > FD_TXN_MTU ) {
506 0 : FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu. txn size: %lu",
507 0 : txn_idx,
508 0 : mblk,
509 0 : slot,
510 0 : txn_sz ));
511 0 : }
512 :
513 0 : if( pay_sz == 0UL )
514 0 : FD_LOG_ERR(( "failed to parse transaction %lu in microblock %lu in slot %lu",
515 0 : txn_idx,
516 0 : mblk,
517 0 : slot ));
518 :
519 0 : blockoff += pay_sz;
520 0 : }
521 0 : }
522 0 : if( FD_UNLIKELY( blockoff > batch_end_off ) ) {
523 0 : FD_LOG_ERR(( "parser error: shouldn't have been allowed to read past batch boundary" ));
524 0 : }
525 0 : if( FD_UNLIKELY( blockoff < batch_end_off ) ) {
526 0 : if( FD_LIKELY( allow_trailing ) ) {
527 0 : FD_LOG_DEBUG(( "ignoring %lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
528 0 : }
529 0 : if( FD_UNLIKELY( !allow_trailing ) ) {
530 0 : FD_LOG_ERR(( "%lu trailing bytes in slot %lu batch %lu", batch_end_off-blockoff, slot, batch_i ));
531 0 : }
532 0 : }
533 0 : blockoff = batch_end_off;
534 0 : }
535 :
536 0 : fd_block_micro_t * micros_laddr =
537 0 : fd_alloc_malloc( fd_blockstore_alloc( blockstore ),
538 0 : alignof( fd_block_micro_t ),
539 0 : sizeof( fd_block_micro_t ) * micros_cnt );
540 0 : fd_memcpy( micros_laddr, micros, sizeof( fd_block_micro_t ) * micros_cnt );
541 0 : block->micros_gaddr = fd_wksp_gaddr_fast( fd_blockstore_wksp( blockstore ), micros_laddr );
542 0 : block->micros_cnt = micros_cnt;
543 :
544 0 : fd_alloc_free( fd_blockstore_alloc( blockstore ), micros );
545 0 : }
546 :
/* deshred assembles the data shreds of slot held in
   blockstore->shred_map into a single fd_block_t allocation and
   publishes it through the slot's block map entry.

   The allocation is laid out as: fd_block_t header, payload bytes
   (starting at a 128 byte aligned offset), one fd_block_shred_t per
   shred, then one fd_block_entry_batch_t per batch.  After the copy the
   block is scanned for microblocks (fd_blockstore_scan_block) and the
   block map entry receives the block gaddr, the hash of the last
   microblock header, and has FD_BLOCK_FLAG_RECEIVING cleared and
   FD_BLOCK_FLAG_COMPLETED set.

   Always returns FD_BLOCKSTORE_SUCCESS; every failure (missing or
   corrupt map entries, allocation failure, inconsistent batch count) is
   reported through FD_TEST / FD_LOG_ERR rather than a return code. */
547 : static int
548 0 : deshred( fd_blockstore_t * blockstore, ulong slot ) {
549 0 : FD_LOG_NOTICE(( "[%s] slot %lu", __func__, slot ));
550 :
551 : // TODO make this update non blocking
/* The block map entry must already exist for this slot and must not
   have a block attached yet (block_gaddr == 0). */
552 0 : fd_block_map_query_t query[1];
553 0 : int err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
554 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
555 0 : FD_TEST( err == FD_MAP_SUCCESS && block_info->slot == slot && block_info->block_gaddr == 0 );
556 : /* FIXME duplicate blocks are not supported */
557 :
558 0 : block_info->ts = fd_log_wallclock();
559 0 : ulong shred_cnt = block_info->slot_complete_idx + 1;
560 0 : fd_block_map_publish( query );
561 :
/* Pass 1: total the payload bytes and count the batches.  A shred
   flagged SLOT_COMPLETE or DATA_COMPLETE is counted as ending a
   batch. */
562 0 : ulong block_sz = 0UL;
563 0 : ulong batch_cnt = 0UL;
564 0 : fd_shred_t shred_hdr;
565 0 : for( uint idx = 0; idx < shred_cnt; idx++ ) {
566 0 : fd_shred_key_t key = { slot, idx };
567 0 : int err = FD_MAP_ERR_AGAIN;
/* Retry the query for as long as it reports FD_MAP_ERR_AGAIN.  The
   header is copied out before fd_buf_shred_map_query_test is called on
   the query. */
568 0 : while( err == FD_MAP_ERR_AGAIN ) {
569 0 : fd_buf_shred_map_query_t query[1] = { 0 };
570 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
571 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "[%s] map missing shred %lu %u while deshredding", __func__, slot, idx ));
572 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
573 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
574 0 : fd_buf_shred_t const * shred = fd_buf_shred_map_query_ele_const( query );
575 0 : shred_hdr = shred->hdr;
576 0 : err = fd_buf_shred_map_query_test( query );
577 0 : }
578 0 : FD_TEST( !err );
579 0 : block_sz += fd_shred_payload_sz( &shred_hdr );
580 0 : if( FD_LIKELY( ( shred_hdr.data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE ) ||
581 0 : shred_hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
582 0 : batch_cnt++;
583 0 : }
584 0 : }
585 :
586 : // alloc mem for the block
/* Offsets below are relative to the start of the block allocation. */
587 0 : ulong data_off = fd_ulong_align_up( sizeof(fd_block_t), 128UL );
588 0 : ulong shred_off = fd_ulong_align_up( data_off + block_sz, alignof(fd_block_shred_t) );
589 0 : ulong batch_off = fd_ulong_align_up( shred_off + (sizeof(fd_block_shred_t) * shred_cnt), alignof(fd_block_entry_batch_t) );
590 0 : ulong tot_sz = batch_off + (sizeof(fd_block_entry_batch_t) * batch_cnt);
591 :
592 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
593 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
594 0 : fd_block_t * block = fd_alloc_malloc( alloc, 128UL, tot_sz );
595 0 : if( FD_UNLIKELY( !block ) ) {
596 0 : FD_LOG_ERR(( "[%s] OOM: failed to alloc block. blockstore needs to hold in memory all blocks for slots >= SMR, so either increase memory or check for issues with publishing new SMRs.", __func__ ));
597 0 : }
598 :
/* Only the fd_block_t header is zeroed; the regions after it are filled
   in below. */
599 0 : fd_memset( block, 0, sizeof(fd_block_t) );
600 :
601 0 : uchar * data_laddr = (uchar *)((ulong)block + data_off);
602 0 : block->data_gaddr = fd_wksp_gaddr_fast( wksp, data_laddr );
603 0 : block->data_sz = block_sz;
604 0 : fd_block_shred_t * shreds_laddr = (fd_block_shred_t *)((ulong)block + shred_off);
605 0 : block->shreds_gaddr = fd_wksp_gaddr_fast( wksp, shreds_laddr );
606 0 : block->shreds_cnt = shred_cnt;
607 0 : fd_block_entry_batch_t * batch_laddr = (fd_block_entry_batch_t *)((ulong)block + batch_off);
608 0 : block->batch_gaddr = fd_wksp_gaddr_fast( wksp, batch_laddr );
609 0 : block->batch_cnt = batch_cnt;
610 :
/* Pass 2: copy each shred payload into the data region, record its
   header and offset, and record the end offset of every batch. */
611 0 : ulong off = 0UL;
612 0 : ulong batch_i = 0UL;
613 0 : for( uint idx = 0; idx < shred_cnt; idx++ ) {
614 : // TODO can do this in one iteration with block sz loop... massage with deshredder API
615 0 : fd_shred_key_t key = { slot, idx };
616 0 : ulong payload_sz = 0UL;
617 0 : uchar flags = 0;
618 0 : int err = FD_MAP_ERR_AGAIN;
/* The copy is done inside the retry loop, so it is redone whenever the
   query has to be retried; payload_sz and flags are taken from the
   attempt that finally passes fd_buf_shred_map_query_test. */
619 0 : while( err == FD_MAP_ERR_AGAIN ) {
620 0 : fd_buf_shred_map_query_t query[1] = { 0 };;
621 0 : err = fd_buf_shred_map_query_try( blockstore->shred_map, &key, NULL, query, 0 );
622 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
623 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) FD_LOG_ERR(( "[%s] map missing shred %lu %u while deshredding", __func__, slot, idx ));
624 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_CORRUPT ) ) FD_LOG_ERR(( "[%s] map corrupt. shred %lu %u", __func__, slot, idx ));
625 0 : fd_shred_t const * shred = &fd_buf_shred_map_query_ele_const( query )->hdr;
626 0 : memcpy( data_laddr + off, fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) );
627 :
628 0 : shreds_laddr[idx].hdr = *shred;
629 0 : shreds_laddr[idx].off = off;
630 0 : FD_TEST( 0 == memcmp( &shreds_laddr[idx].hdr, shred, sizeof( fd_shred_t ) ) );
631 0 : FD_TEST( 0 == memcmp( data_laddr + shreds_laddr[idx].off, fd_shred_data_payload( shred ), fd_shred_payload_sz( shred ) ) );
632 :
633 0 : payload_sz = fd_shred_payload_sz( shred );
634 0 : flags = shred->data.flags;
635 :
636 0 : err = fd_buf_shred_map_query_test( query );
637 0 : }
638 0 : FD_TEST( !err );
639 0 : off += payload_sz;
640 0 : if( FD_LIKELY( (flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE) || flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) ) {
641 0 : batch_laddr[ batch_i++ ].end_off = off;
642 0 : }
643 : // fd_blockstore_shred_remove( blockstore, slot, idx );
644 0 : }
/* The batch table was sized from pass 1; a different count in pass 2
   means the table was under- or over-filled. */
645 0 : if( FD_UNLIKELY( batch_cnt != batch_i ) ) {
646 0 : FD_LOG_ERR(( "batch_cnt(%lu)!=batch_i(%lu) potential memory corruption", batch_cnt, batch_i ));
647 0 : }
648 :
649 0 : fd_blockstore_scan_block( blockstore, slot, block );
650 :
651 : /* Do this last when it's safe */
652 0 : FD_COMPILER_MFENCE();
653 :
654 : // TODO make this non blocking
655 0 : err = fd_block_map_prepare( blockstore->block_map, &slot, NULL, query, FD_MAP_FLAG_BLOCKING );
656 0 : block_info = fd_block_map_query_ele( query );
657 0 : FD_TEST( err == FD_MAP_SUCCESS && block_info->slot == slot );
658 :
/* The block hash is taken from the header of the last recorded
   microblock.  NOTE(review): if micros_cnt were 0 the index below would
   wrap around; confirm a complete slot always has a microblock. */
659 0 : block_info->block_gaddr = fd_wksp_gaddr_fast( wksp, block );
660 0 : fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
661 0 : uchar * data = fd_wksp_laddr_fast( wksp, block->data_gaddr );
662 0 : fd_microblock_hdr_t * last_micro = (fd_microblock_hdr_t *)( data + micros[block->micros_cnt - 1].off );
663 0 : memcpy( &block_info->block_hash, last_micro->hash, sizeof( fd_hash_t ) );
664 :
665 0 : block_info->flags = fd_uchar_clear_bit( block_info->flags, FD_BLOCK_FLAG_RECEIVING );
666 0 : block_info->flags = fd_uchar_set_bit( block_info->flags, FD_BLOCK_FLAG_COMPLETED );
667 0 : fd_block_map_publish( query );
668 :
669 0 : return FD_BLOCKSTORE_SUCCESS;
670 0 : }
671 :
672 : void
673 : fd_blockstore_block_allocs_remove( fd_blockstore_t * blockstore,
674 0 : ulong slot ){
675 0 : fd_block_map_query_t query[1] = { 0 };
676 0 : ulong block_gaddr = 0;
677 0 : int err = FD_MAP_ERR_AGAIN;
678 0 : while( err == FD_MAP_ERR_AGAIN ) {
679 0 : err = fd_block_map_query_try( blockstore->block_map, &slot, NULL, query, 0 );
680 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
681 0 : if( FD_UNLIKELY( err == FD_MAP_ERR_KEY ) ) return; /* slot not found */
682 0 : fd_block_info_t * block_info = fd_block_map_query_ele( query );
683 0 : if( FD_UNLIKELY( fd_uchar_extract_bit( block_info->flags, FD_BLOCK_FLAG_REPLAYING ) ) ) {
684 0 : FD_LOG_WARNING(( "[%s] slot %lu has replay in progress. not removing.", __func__, slot ));
685 0 : return;
686 0 : }
687 0 : block_gaddr = block_info->block_gaddr;
688 0 : err = fd_block_map_query_test( query );
689 0 : }
690 :
691 : /* Remove all the allocations relating to a block. */
692 :
693 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
694 0 : fd_alloc_t * alloc = fd_blockstore_alloc( blockstore );
695 :
696 0 : fd_block_t * block = fd_wksp_laddr_fast( wksp, block_gaddr );
697 :
698 : /* DO THIS FIRST FOR THREAD SAFETY */
699 0 : FD_COMPILER_MFENCE();
700 : //block_info->block_gaddr = 0;
701 :
702 0 : if( block->micros_gaddr ) fd_alloc_free( alloc, fd_wksp_laddr_fast( wksp, block->micros_gaddr ) );
703 :
704 0 : fd_alloc_free( alloc, block );
705 0 : }
706 :
707 : int
708 : fd_rocksdb_import_block_blockstore( fd_rocksdb_t * db,
709 : fd_slot_meta_t * m,
710 : fd_blockstore_t * blockstore,
711 : const uchar * hash_override,
712 0 : fd_valloc_t valloc ) {
713 0 : ulong slot = m->slot;
714 0 : ulong start_idx = 0;
715 0 : ulong end_idx = m->received;
716 :
717 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED]);
718 :
719 0 : char k[16];
720 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap(slot);
721 0 : *((ulong *) &k[8]) = fd_ulong_bswap(start_idx);
722 :
723 0 : rocksdb_iter_seek(iter, (const char *) k, sizeof(k));
724 :
725 0 : for (ulong i = start_idx; i < end_idx; i++) {
726 0 : ulong cur_slot, index;
727 0 : uchar valid = rocksdb_iter_valid(iter);
728 :
729 0 : if (valid) {
730 0 : size_t klen = 0;
731 0 : const char* key = rocksdb_iter_key(iter, &klen); // There is no need to free key
732 0 : if (klen != 16) // invalid key
733 0 : continue;
734 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
735 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
736 0 : }
737 :
738 0 : if (!valid || cur_slot != slot) {
739 0 : FD_LOG_WARNING(("missing shreds for slot %lu", slot));
740 0 : rocksdb_iter_destroy(iter);
741 0 : return -1;
742 0 : }
743 :
744 0 : if (index != i) {
745 0 : FD_LOG_WARNING(("missing shred %lu at index %lu for slot %lu", i, index, slot));
746 0 : rocksdb_iter_destroy(iter);
747 0 : return -1;
748 0 : }
749 :
750 0 : size_t dlen = 0;
751 : // Data was first copied from disk into memory to make it available to this API
752 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value(iter, &dlen);
753 0 : if (data == NULL) {
754 0 : FD_LOG_WARNING(("failed to read shred %lu/%lu", slot, i));
755 0 : rocksdb_iter_destroy(iter);
756 0 : return -1;
757 0 : }
758 :
759 : // This just correctly selects from inside the data pointer to the
760 : // actual data without a memory copy
761 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
762 0 : if (shred == NULL) {
763 0 : FD_LOG_WARNING(("failed to parse shred %lu/%lu", slot, i));
764 0 : rocksdb_iter_destroy(iter);
765 0 : return -1;
766 0 : }
767 0 : fd_blockstore_shred_insert( blockstore, shred );
768 : // if (rc != FD_BLOCKSTORE_SUCCESS_SLOT_COMPLETE && rc != FD_BLOCKSTORE_SUCCESS) {
769 : // FD_LOG_WARNING(("failed to store shred %lu/%lu", slot, i));
770 : // rocksdb_iter_destroy(iter);
771 : // return -1;
772 : // }
773 :
774 0 : rocksdb_iter_next(iter);
775 0 : }
776 :
777 0 : rocksdb_iter_destroy(iter);
778 :
779 0 : fd_block_info_t * block_info = fd_blockstore_block_map_query( blockstore, slot );
780 0 : if( FD_LIKELY( block_info && fd_blockstore_shreds_complete( blockstore, slot ) ) ) {
781 0 : deshred( blockstore, slot );
782 :
783 0 : size_t vallen = 0;
784 0 : char * err = NULL;
785 0 : char * res = rocksdb_get_cf(
786 0 : db->db,
787 0 : db->ro,
788 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCKTIME ],
789 0 : (char const *)&slot_be, sizeof(ulong),
790 0 : &vallen,
791 0 : &err );
792 0 : if( FD_UNLIKELY( err ) ) {
793 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
794 0 : free( err );
795 0 : } else if(vallen == sizeof(ulong)) {
796 0 : block_info->ts = (*(long*)res)*((long)1e9); /* Convert to nanos */
797 0 : free(res);
798 0 : }
799 :
800 0 : vallen = 0;
801 0 : err = NULL;
802 0 : res = rocksdb_get_cf(
803 0 : db->db,
804 0 : db->ro,
805 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ],
806 0 : (char const *)&slot_be, sizeof(ulong),
807 0 : &vallen,
808 0 : &err );
809 0 : block_info->block_height = 0;
810 0 : if( FD_UNLIKELY( err ) ) {
811 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
812 0 : free( err );
813 0 : } else if(vallen == sizeof(ulong)) {
814 0 : block_info->block_height = *(ulong*)res;
815 0 : free(res);
816 0 : }
817 :
818 0 : vallen = 0;
819 0 : err = NULL;
820 0 : if (NULL != hash_override)
821 0 : fd_memcpy( block_info->bank_hash.hash, hash_override, 32UL );
822 0 : else {
823 0 : res = rocksdb_get_cf(
824 0 : db->db,
825 0 : db->ro,
826 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
827 0 : (char const *)&slot_be, sizeof(ulong),
828 0 : &vallen,
829 0 : &err );
830 0 : if( FD_UNLIKELY( err ) ) {
831 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
832 0 : free( err );
833 0 : } else {
834 0 : fd_bincode_decode_ctx_t decode = {
835 0 : .data = res,
836 0 : .dataend = res + vallen
837 0 : };
838 0 : ulong total_sz = 0UL;
839 0 : int decode_err = fd_frozen_hash_versioned_decode_footprint( &decode, &total_sz );
840 :
841 0 : uchar * mem = fd_valloc_malloc( valloc, fd_frozen_hash_versioned_align(), total_sz );
842 0 : if( NULL == mem ) {
843 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
844 0 : }
845 :
846 0 : fd_frozen_hash_versioned_t * versioned = fd_frozen_hash_versioned_decode( mem, &decode );
847 0 : if( FD_UNLIKELY( decode_err!=FD_BINCODE_SUCCESS ) ) goto cleanup;
848 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
849 0 : if( FD_UNLIKELY( versioned->discriminant !=fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
850 : /* Success */
851 0 : fd_memcpy( block_info->bank_hash.hash, versioned->inner.current.frozen_hash.hash, 32UL );
852 0 : cleanup:
853 0 : free( res );
854 0 : }
855 0 : }
856 0 : }
857 :
858 0 : blockstore->shmem->lps = slot;
859 0 : blockstore->shmem->hcs = slot;
860 0 : blockstore->shmem->wmk = slot;
861 :
862 0 : if( FD_LIKELY( block_info ) ) {
863 0 : block_info->flags =
864 0 : fd_uchar_set_bit(
865 0 : fd_uchar_set_bit(
866 0 : fd_uchar_set_bit(
867 0 : fd_uchar_set_bit(
868 0 : fd_uchar_set_bit(
869 0 : block_info->flags,
870 0 : FD_BLOCK_FLAG_COMPLETED ),
871 0 : FD_BLOCK_FLAG_PROCESSED ),
872 0 : FD_BLOCK_FLAG_EQVOCSAFE ),
873 0 : FD_BLOCK_FLAG_CONFIRMED ),
874 0 : FD_BLOCK_FLAG_FINALIZED );
875 0 : }
876 :
877 0 : return 0;
878 0 : }
879 :
880 : int
881 : fd_rocksdb_import_block_shredcap( fd_rocksdb_t * db,
882 : fd_slot_meta_t * metadata,
883 : fd_io_buffered_ostream_t * ostream,
884 : fd_io_buffered_ostream_t * bank_hash_ostream,
885 0 : fd_valloc_t valloc ) {
886 0 : ulong slot = metadata->slot;
887 :
888 : /* pre_slot_hdr_file_offset is the current offset within the file, but
889 : pre_slot_hdr_file_offset_real accounts for the size of the buffer that has
890 : been filled but not flushed. This value is used to jump back into the file to
891 : populate the payload_sz for the slot header */
892 0 : long pre_slot_hdr_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
893 0 : long pre_slot_hdr_file_offset_real = pre_slot_hdr_file_offset + (long)ostream->wbuf_used;
894 0 : if ( FD_UNLIKELY( pre_slot_hdr_file_offset == -1 ) ) {
895 0 : FD_LOG_ERR(( "lseek error while seeking to current location" ));
896 0 : }
897 :
898 : /* Write slot specific header */
899 0 : fd_shredcap_slot_hdr_t slot_hdr;
900 0 : slot_hdr.magic = FD_SHREDCAP_SLOT_HDR_MAGIC;
901 0 : slot_hdr.version = FD_SHREDCAP_SLOT_HDR_VERSION;
902 0 : slot_hdr.payload_sz = ULONG_MAX; /* This value is populated after slot is processed */
903 0 : slot_hdr.slot = metadata->slot;
904 0 : slot_hdr.consumed = metadata->consumed;
905 0 : slot_hdr.received = metadata->received;
906 0 : slot_hdr.first_shred_timestamp = metadata->first_shred_timestamp;
907 0 : slot_hdr.last_index = metadata->last_index;
908 0 : slot_hdr.parent_slot = metadata->parent_slot;
909 0 : fd_io_buffered_ostream_write( ostream, &slot_hdr, FD_SHREDCAP_SLOT_HDR_FOOTPRINT );
910 :
911 : /* We need to track the payload size */
912 0 : ulong payload_sz = 0;
913 :
914 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf( db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED] );
915 :
916 0 : char k[16];
917 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap( slot );
918 0 : *((ulong *) &k[8]) = fd_ulong_bswap( 0 );
919 :
920 0 : rocksdb_iter_seek( iter, (const char *) k, sizeof(k) );
921 :
922 0 : ulong start_idx = 0;
923 0 : ulong end_idx = metadata->received;
924 0 : for ( ulong i = start_idx; i < end_idx; i++ ) {
925 0 : ulong cur_slot, index;
926 0 : uchar valid = rocksdb_iter_valid( iter );
927 :
928 0 : if ( valid ) {
929 0 : size_t klen = 0;
930 0 : const char* key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
931 0 : if ( klen != 16 ) { // invalid key
932 0 : continue;
933 0 : }
934 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
935 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
936 0 : }
937 :
938 0 : if ( !valid || cur_slot != slot ) {
939 0 : FD_LOG_WARNING(( "missing shreds for slot %lu", slot ));
940 0 : rocksdb_iter_destroy( iter );
941 0 : return -1;
942 0 : }
943 :
944 0 : if ( index != i ) {
945 0 : FD_LOG_WARNING(( "missing shred %lu at index %lu for slot %lu", i, index, slot ));
946 0 : rocksdb_iter_destroy( iter );
947 0 : return -1;
948 0 : }
949 :
950 0 : size_t dlen = 0;
951 : // Data was first copied from disk into memory to make it available to this API
952 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value( iter, &dlen );
953 0 : if ( data == NULL ) {
954 0 : FD_LOG_WARNING(( "failed to read shred %lu/%lu", slot, i ));
955 0 : rocksdb_iter_destroy( iter );
956 0 : return -1;
957 0 : }
958 :
959 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
960 0 : if ( shred == NULL ) {
961 0 : FD_LOG_WARNING(( "failed to parse shred %lu/%lu", slot, i ));
962 0 : rocksdb_iter_destroy( iter );
963 0 : return -1;
964 0 : }
965 :
966 : /* Write a shred header and shred. Each shred and it's header will be aligned */
967 0 : char shred_buf[ FD_SHREDCAP_SHRED_MAX ];
968 0 : char * shred_buf_ptr = shred_buf;
969 0 : ushort shred_sz = (ushort)fd_shred_sz( shred );
970 0 : uint shred_boundary_sz = (uint)fd_uint_align_up( shred_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT,
971 0 : FD_SHREDCAP_ALIGN ) - FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
972 :
973 0 : fd_memset( shred_buf_ptr, 0, shred_boundary_sz );
974 : /* Populate start of buffer with header */
975 0 : fd_shredcap_shred_hdr_t * shred_hdr = (fd_shredcap_shred_hdr_t*)shred_buf_ptr;
976 0 : shred_hdr->hdr_sz = FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
977 0 : shred_hdr->shred_sz = shred_sz;
978 0 : shred_hdr->shred_boundary_sz = shred_boundary_sz;
979 :
980 : /* Skip ahead and populate rest of buffer with shred and write out */
981 0 : fd_memcpy( shred_buf_ptr + FD_SHREDCAP_SHRED_HDR_FOOTPRINT, shred, shred_boundary_sz );
982 0 : fd_io_buffered_ostream_write( ostream, shred_buf_ptr,
983 0 : shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT );
984 :
985 0 : payload_sz += shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
986 0 : rocksdb_iter_next( iter );
987 0 : }
988 :
989 : /* Update file size */
990 0 : long pre_slot_processed_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
991 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == -1 ) ) {
992 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
993 0 : }
994 :
995 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == pre_slot_hdr_file_offset ) ) {
996 : /* This case is when the payload from the shreds is smaller than the free
997 : space from the write buffer. This means that the buffer was not flushed
998 : at any point. This case is highly unlikely */
999 0 : fd_io_buffered_ostream_flush( ostream );
1000 0 : }
1001 :
1002 : /* Safely assume that the buffer was flushed to the file at least once. Store
1003 : original seek position, skip to position with payload_sz in header, write
1004 : updated payload sz, and then reset seek position. */
1005 0 : long original_offset = lseek( ostream->fd, 0, SEEK_CUR );
1006 0 : if ( FD_UNLIKELY( original_offset == -1 ) ) {
1007 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
1008 0 : }
1009 0 : long payload_sz_file_offset = pre_slot_hdr_file_offset_real +
1010 0 : (long)FD_SHREDCAP_SLOT_HDR_PAYLOAD_SZ_OFFSET;
1011 :
1012 0 : long offset;
1013 0 : offset = lseek( ostream->fd, payload_sz_file_offset, SEEK_SET );
1014 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
1015 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", payload_sz_file_offset ));
1016 0 : }
1017 0 : ulong to_write;
1018 0 : fd_io_write( ostream->fd, &payload_sz, sizeof(ulong), sizeof(ulong), &to_write );
1019 :
1020 0 : offset = lseek( ostream->fd, original_offset, SEEK_SET );
1021 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
1022 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", original_offset ));
1023 0 : }
1024 :
1025 : /* Write slot footer */
1026 0 : fd_shredcap_slot_ftr_t slot_ftr;
1027 0 : slot_ftr.magic = FD_SHREDCAP_SLOT_FTR_MAGIC;
1028 0 : slot_ftr.payload_sz = payload_sz;
1029 0 : fd_io_buffered_ostream_write( ostream, &slot_ftr, FD_SHREDCAP_SLOT_FTR_FOOTPRINT );
1030 0 : rocksdb_iter_destroy( iter );
1031 :
1032 : /* Get and write bank hash information to respective file */
1033 0 : size_t vallen = 0;
1034 0 : char * err = NULL;
1035 0 : char * res = rocksdb_get_cf( db->db, db->ro, db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
1036 0 : (char const *)&slot_be, sizeof(ulong), &vallen, &err );
1037 0 : if( FD_UNLIKELY( err ) ) {
1038 0 : FD_LOG_WARNING((" Could not get bank hash data due to err=%s",err ));
1039 0 : free( err );
1040 0 : } else {
1041 0 : fd_bincode_decode_ctx_t decode = {
1042 0 : .data = res,
1043 0 : .dataend = res + vallen,
1044 0 : };
1045 0 : ulong total_sz = 0UL;
1046 0 : int decode_err = fd_frozen_hash_versioned_decode_footprint( &decode, &total_sz );
1047 :
1048 0 : uchar * mem = fd_valloc_malloc( valloc, fd_frozen_hash_versioned_align(), total_sz );
1049 :
1050 0 : fd_frozen_hash_versioned_t * versioned = fd_frozen_hash_versioned_decode( mem, &decode );
1051 :
1052 0 : if( FD_UNLIKELY( decode_err != FD_BINCODE_SUCCESS ) ) goto cleanup;
1053 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
1054 0 : if( FD_UNLIKELY( versioned->discriminant != fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
1055 0 : fd_shredcap_bank_hash_entry_t bank_hash_entry;
1056 0 : bank_hash_entry.slot = slot;
1057 0 : fd_memcpy( &bank_hash_entry.bank_hash, versioned->inner.current.frozen_hash.hash, 32UL );
1058 0 : fd_io_buffered_ostream_write( bank_hash_ostream, &bank_hash_entry, FD_SHREDCAP_BANK_HASH_ENTRY_FOOTPRINT );
1059 0 : cleanup:
1060 0 : free( res );
1061 0 : }
1062 0 : return 0;
1063 0 : }
|