Line data Source code
1 : #include "fd_rocksdb.h"
2 : #include "fd_blockstore.h"
3 : #include "../shredcap/fd_shredcap.h"
4 : #include <stdbool.h>
5 : #include <stdlib.h>
6 : #include <stdio.h>
7 : #include <unistd.h>
8 : #include "../../util/bits/fd_bits.h"
9 :
10 : char *
11 : fd_rocksdb_init( fd_rocksdb_t * db,
12 0 : char const * db_name ) {
13 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
14 :
15 0 : db->opts = rocksdb_options_create();
16 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
17 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
18 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
19 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
20 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
21 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
22 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
23 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
24 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
25 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
26 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
27 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
28 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
29 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
30 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
31 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
32 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
33 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
34 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
35 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PROGRAM_COSTS ] = "program_costs";
36 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
37 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
38 :
39 :
40 0 : rocksdb_options_t const * cf_options[ FD_ROCKSDB_CF_CNT ];
41 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ )
42 0 : cf_options[ i ] = db->opts;
43 :
44 0 : char *err = NULL;
45 :
46 0 : db->db = rocksdb_open_for_read_only_column_families(
47 0 : db->opts,
48 0 : db_name,
49 0 : FD_ROCKSDB_CF_CNT,
50 0 : (char const * const *)db->cfgs,
51 0 : (rocksdb_options_t const * const *)cf_options,
52 0 : db->cf_handles,
53 0 : false,
54 0 : &err );
55 :
56 0 : if( FD_UNLIKELY( err ) ) return err;
57 :
58 0 : db->ro = rocksdb_readoptions_create();
59 :
60 0 : return NULL;
61 0 : }
62 :
63 : void
64 : fd_rocksdb_new( fd_rocksdb_t * db,
65 0 : char const * db_name ) {
66 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
67 :
68 0 : db->opts = rocksdb_options_create();
69 : /* Create the db*/
70 0 : rocksdb_options_set_create_if_missing(db->opts, 1);
71 :
72 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
73 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
74 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
75 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
76 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
77 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
78 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
79 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
80 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
81 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
82 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
83 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
84 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
85 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
86 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
87 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
88 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
89 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
90 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
91 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PROGRAM_COSTS ] = "program_costs";
92 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
93 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
94 :
95 : /* Create the rocksdb */
96 0 : char * err = NULL;
97 0 : db->db = rocksdb_open(db->opts, db_name, &err);
98 0 : if ( err != NULL ) {
99 0 : FD_LOG_ERR(("rocksdb creation failed: %s", err));
100 0 : }
101 :
102 0 : db->wo = rocksdb_writeoptions_create();
103 :
104 : /* Create column families, default already exists at index 0 */
105 0 : for ( ulong i = 1; i < FD_ROCKSDB_CF_CNT; ++i ) {
106 0 : db->cf_handles[i] = rocksdb_create_column_family(db->db, db->opts, db->cfgs[i], &err);
107 0 : }
108 0 : rocksdb_options_set_compression( db->opts, rocksdb_lz4_compression );
109 0 : }
110 :
111 0 : void fd_rocksdb_destroy(fd_rocksdb_t *db) {
112 :
113 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ ) {
114 0 : if( db->cf_handles[i] ) {
115 0 : rocksdb_column_family_handle_destroy( db->cf_handles[i] );
116 0 : db->cf_handles[i] = NULL;
117 0 : }
118 0 : }
119 :
120 0 : if( db->ro ) {
121 0 : rocksdb_readoptions_destroy( db->ro );
122 0 : db->ro = NULL;
123 0 : }
124 :
125 0 : if( db->opts ) {
126 0 : rocksdb_options_destroy( db->opts );
127 0 : db->opts = NULL;
128 0 : }
129 :
130 0 : if( db->db ) {
131 0 : rocksdb_close( db->db );
132 0 : db->db = NULL;
133 0 : }
134 :
135 0 : if( db->wo ) {
136 0 : rocksdb_writeoptions_destroy( db->wo );
137 0 : }
138 0 : }
139 :
140 0 : ulong fd_rocksdb_last_slot(fd_rocksdb_t *db, char **err) {
141 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
142 0 : rocksdb_iter_seek_to_last(iter);
143 0 : if (!rocksdb_iter_valid(iter)) {
144 0 : rocksdb_iter_destroy(iter);
145 0 : *err = "db column for root is empty";
146 0 : return 0;
147 0 : }
148 :
149 0 : size_t klen = 0;
150 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
151 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
152 0 : rocksdb_iter_destroy(iter);
153 0 : return slot;
154 0 : }
155 :
156 0 : ulong fd_rocksdb_find_last_slot(fd_rocksdb_t *db, char **err) {
157 0 : ulong max_slot = 0;
158 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
159 0 : rocksdb_iter_seek_to_first(iter);
160 0 : if (!rocksdb_iter_valid(iter)) {
161 0 : rocksdb_iter_destroy(iter);
162 0 : *err = "db column for root is empty";
163 0 : return 0;
164 0 : }
165 :
166 0 : for( ; rocksdb_iter_valid(iter); rocksdb_iter_next(iter) ) {
167 0 : size_t klen = 0;
168 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
169 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
170 :
171 0 : if( slot > max_slot ) {
172 0 : max_slot = slot;
173 0 : FD_LOG_WARNING(("new max_slot: %lu", max_slot));
174 0 : }
175 0 : }
176 :
177 0 : rocksdb_iter_destroy(iter);
178 0 : return max_slot;
179 0 : }
180 :
181 :
182 : ulong
183 : fd_rocksdb_first_slot( fd_rocksdb_t * db,
184 0 : char ** err ) {
185 :
186 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
187 0 : rocksdb_iter_seek_to_first(iter);
188 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(iter) ) ) {
189 0 : rocksdb_iter_destroy(iter);
190 0 : *err = "db column for root is empty";
191 0 : return 0;
192 0 : }
193 :
194 0 : ulong klen = 0;
195 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
196 0 : ulong slot = fd_ulong_bswap( *((ulong *)key));
197 0 : rocksdb_iter_destroy(iter);
198 0 : return slot;
199 0 : }
200 :
201 : int
202 : fd_rocksdb_get_meta( fd_rocksdb_t * db,
203 : ulong slot,
204 : fd_slot_meta_t * m,
205 0 : fd_valloc_t valloc ) {
206 0 : ulong ks = fd_ulong_bswap(slot);
207 0 : size_t vallen = 0;
208 :
209 0 : char *err = NULL;
210 0 : char *meta = rocksdb_get_cf(
211 0 : db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_META], (const char *) &ks, sizeof(ks), &vallen, &err);
212 :
213 0 : if (NULL != err) {
214 0 : FD_LOG_WARNING(( "%s", err ));
215 0 : free (err);
216 0 : return -2;
217 0 : }
218 :
219 0 : if (0 == vallen)
220 0 : return -1;
221 :
222 0 : fd_bincode_decode_ctx_t ctx;
223 0 : ctx.data = meta;
224 0 : ctx.dataend = &meta[vallen];
225 0 : ctx.valloc = valloc;
226 0 : if ( fd_slot_meta_decode(m, &ctx) )
227 0 : FD_LOG_ERR(("fd_slot_meta_decode failed"));
228 :
229 0 : free(meta);
230 :
231 0 : return 0;
232 0 : }
233 :
234 : void *
235 0 : fd_rocksdb_root_iter_new ( void * ptr ) {
236 0 : fd_memset(ptr, 0, sizeof(fd_rocksdb_root_iter_t));
237 0 : return ptr;
238 0 : }
239 :
240 : fd_rocksdb_root_iter_t *
241 0 : fd_rocksdb_root_iter_join ( void * ptr ) {
242 0 : return (fd_rocksdb_root_iter_t *) ptr;
243 0 : }
244 :
245 : void *
246 0 : fd_rocksdb_root_iter_leave ( fd_rocksdb_root_iter_t * ptr ) {
247 0 : return ptr;
248 0 : }
249 :
250 : int
251 : fd_rocksdb_root_iter_seek( fd_rocksdb_root_iter_t * self,
252 : fd_rocksdb_t * db,
253 : ulong slot,
254 : fd_slot_meta_t * m,
255 0 : fd_valloc_t valloc ) {
256 0 : self->db = db;
257 :
258 0 : if( FD_UNLIKELY( !self->iter ) )
259 0 : self->iter = rocksdb_create_iterator_cf(self->db->db, self->db->ro, self->db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
260 :
261 0 : ulong ks = fd_ulong_bswap( slot );
262 :
263 0 : rocksdb_iter_seek( self->iter, (char const *)&ks, sizeof(ulong) );
264 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(self->iter) ) )
265 0 : return -1;
266 :
267 0 : size_t klen = 0;
268 0 : char const * key = rocksdb_iter_key( self->iter, &klen ); // There is no need to free key
269 0 : ulong kslot = fd_ulong_bswap( *((ulong *)key) );
270 :
271 0 : if( FD_UNLIKELY( kslot != slot ) ) {
272 0 : FD_LOG_WARNING(( "fd_rocksdb_root_iter_seek: wanted slot %lu, found %lu",
273 0 : slot, kslot ));
274 0 : return -2;
275 0 : }
276 :
277 0 : return fd_rocksdb_get_meta( self->db, slot, m, valloc );
278 0 : }
279 :
280 : int
281 0 : fd_rocksdb_root_iter_slot ( fd_rocksdb_root_iter_t * self, ulong *slot ) {
282 0 : if ((NULL == self->db) || (NULL == self->iter))
283 0 : return -1;
284 :
285 0 : if (!rocksdb_iter_valid(self->iter))
286 0 : return -2;
287 :
288 0 : size_t klen = 0;
289 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
290 0 : *slot = fd_ulong_bswap(*((unsigned long *) key));
291 0 : return 0;
292 0 : }
293 :
294 : int
295 : fd_rocksdb_root_iter_next( fd_rocksdb_root_iter_t * self,
296 : fd_slot_meta_t * m,
297 0 : fd_valloc_t valloc ) {
298 0 : if ((NULL == self->db) || (NULL == self->iter))
299 0 : return -1;
300 :
301 0 : if (!rocksdb_iter_valid(self->iter))
302 0 : return -2;
303 :
304 0 : rocksdb_iter_next(self->iter);
305 :
306 0 : if (!rocksdb_iter_valid(self->iter))
307 0 : return -3;
308 :
309 0 : size_t klen = 0;
310 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
311 :
312 0 : return fd_rocksdb_get_meta( self->db, fd_ulong_bswap(*((unsigned long *) key)), m, valloc );
313 0 : }
314 :
315 : void
316 0 : fd_rocksdb_root_iter_destroy ( fd_rocksdb_root_iter_t * self ) {
317 0 : if (NULL != self->iter) {
318 0 : rocksdb_iter_destroy(self->iter);
319 0 : self->iter = 0;
320 0 : }
321 0 : self->db = NULL;
322 0 : }
323 :
324 : void *
325 : fd_rocksdb_get_txn_status_raw( fd_rocksdb_t * self,
326 : ulong slot,
327 : void const * sig,
328 0 : ulong * psz ) {
329 :
330 0 : ulong slot_be = fd_ulong_bswap( slot );
331 :
332 : /* Construct RocksDB query key */
333 0 : char key[72];
334 0 : memcpy( key, sig, 64UL );
335 0 : memcpy( key+64UL, &slot_be, 8UL );
336 :
337 : /* Query record */
338 0 : char * err = NULL;
339 0 : char * res = rocksdb_get_cf(
340 0 : self->db, self->ro,
341 0 : self->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
342 0 : key, 72UL,
343 0 : psz,
344 0 : &err );
345 :
346 0 : if( FD_UNLIKELY( err ) ) {
347 0 : FD_LOG_WARNING(("err=%s", err));
348 0 : free( err );
349 0 : return NULL;
350 0 : }
351 0 : return res;
352 0 : }
353 :
354 : ulong
355 0 : fd_rocksdb_get_slot( ulong cf_idx, char const * key ) {
356 0 : switch (cf_idx) {
357 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
358 0 : return fd_ulong_bswap(*((ulong *) &key[72])); /* (signature,slot)*/
359 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
360 0 : return fd_ulong_bswap(*((ulong *) &key[40])); /* (pubkey,slot,u32,signature) */
361 0 : default: /* all other cfs have the slot at the start */
362 0 : return fd_ulong_bswap( *((ulong *)&key[0]) ); /* The key is just the slot number */
363 0 : }
364 :
365 0 : return fd_ulong_bswap( *((ulong *)key) );
366 0 : }
367 :
368 : void
369 0 : fd_rocksdb_iter_seek_to_slot_if_possible( rocksdb_iterator_t * iter, const ulong cf_idx, const ulong slot ) {
370 0 : ulong k = fd_ulong_bswap(slot);
371 0 : switch (cf_idx) {
372 : /* These cfs do not have the slot at the start, we can't seek based on slot prefix */
373 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
374 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
375 0 : rocksdb_iter_seek_to_first( iter );
376 0 : break;
377 0 : default: /* all other cfs have the slot at the start, seek based on slot prefix */
378 0 : rocksdb_iter_seek( iter, (const char *)&k, 8);
379 0 : break;
380 0 : }
381 0 : }
382 :
383 : int
384 : fd_rocksdb_copy_over_slot_indexed_range( fd_rocksdb_t * src,
385 : fd_rocksdb_t * dst,
386 : ulong cf_idx,
387 : ulong start_slot,
388 0 : ulong end_slot ) {
389 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_slot_indexed_range: %lu", cf_idx ));
390 :
391 0 : if ( cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ||
392 0 : cf_idx == FD_ROCKSDB_CFIDX_PROGRAM_COSTS ||
393 0 : cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ||
394 0 : cf_idx == FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ) {
395 0 : FD_LOG_NOTICE(("fd_rocksdb_copy_over_range: skipping cf_idx=%lu because not slot indexed", cf_idx));
396 0 : return 0;
397 0 : }
398 :
399 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf( src->db, src->ro, src->cf_handles[cf_idx] );
400 0 : if ( FD_UNLIKELY( iter == NULL ) ) {
401 0 : FD_LOG_ERR(("rocksdb_create_iterator_cf failed for cf_idx=%lu", cf_idx));
402 0 : }
403 :
404 0 : for ( fd_rocksdb_iter_seek_to_slot_if_possible( iter, cf_idx, start_slot ); rocksdb_iter_valid( iter ); rocksdb_iter_next( iter ) ) {
405 0 : ulong klen = 0;
406 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
407 :
408 0 : ulong slot = fd_rocksdb_get_slot( cf_idx, key );
409 0 : if ( slot < start_slot ) {
410 0 : continue;
411 0 : }
412 0 : else if ( slot > end_slot ) {
413 0 : break;
414 0 : }
415 :
416 0 : ulong vlen = 0;
417 0 : char const * value = rocksdb_iter_value( iter, &vlen );
418 :
419 0 : fd_rocksdb_insert_entry( dst, cf_idx, key, klen, value, vlen );
420 0 : }
421 0 : rocksdb_iter_destroy( iter );
422 0 : return 0;
423 0 : }
424 :
425 : int
426 : fd_rocksdb_copy_over_txn_status_range( fd_rocksdb_t * src,
427 : fd_rocksdb_t * dst,
428 : fd_blockstore_t * blockstore,
429 : ulong start_slot,
430 0 : ulong end_slot ) {
431 : /* Look up the blocks data and iterate through its transactions */
432 0 : fd_block_map_t * block_map = fd_blockstore_block_map( blockstore );
433 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
434 :
435 0 : for ( ulong slot = start_slot; slot <= end_slot; ++slot ) {
436 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_txn_status_range: %lu", slot ));
437 0 : fd_block_map_t * block_entry = fd_block_map_query( block_map, &slot, NULL );
438 0 : if( FD_LIKELY( block_entry && block_entry->block_gaddr ) ) {
439 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, block_entry->block_gaddr );
440 0 : uchar * data = fd_wksp_laddr_fast( wksp, blk->data_gaddr );
441 0 : fd_block_txn_ref_t * txns = fd_wksp_laddr_fast( wksp, blk->txns_gaddr );
442 0 : ulong last_txn_off = ULONG_MAX;
443 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
444 0 : fd_blockstore_txn_key_t sig;
445 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof(sig) );
446 0 : if( txns[j].txn_off != last_txn_off ) {
447 0 : last_txn_off = txns[j].txn_off;
448 0 : fd_rocksdb_copy_over_txn_status( src, dst, slot, &sig );
449 0 : }
450 0 : }
451 0 : }
452 0 : }
453 0 : return 0;
454 0 : }
455 :
456 : void
457 : fd_rocksdb_copy_over_txn_status( fd_rocksdb_t * src,
458 : fd_rocksdb_t * dst,
459 : ulong slot,
460 0 : void const * sig ) {
461 0 : ulong slot_be = fd_ulong_bswap( slot );
462 :
463 : /* Construct RocksDB query key */
464 : /* TODO: Replace with constants */
465 0 : char key[ 72 ];
466 0 : memcpy( key, sig, 64UL );
467 0 : memcpy( key+64UL, &slot_be, 8UL );
468 :
469 : /* Query record */
470 0 : ulong sz;
471 0 : char * err = NULL;
472 0 : char * res = rocksdb_get_cf(
473 0 : src->db, src->ro, src->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
474 0 : key, 72UL, &sz, &err );
475 :
476 0 : if( FD_UNLIKELY( err ) ) {
477 0 : FD_LOG_WARNING(("err=%s", err));
478 0 : free( err );
479 0 : return;
480 0 : }
481 :
482 0 : fd_rocksdb_insert_entry( dst, FD_ROCKSDB_CFIDX_TRANSACTION_STATUS, key, 72UL, res, sz );
483 0 : }
484 :
485 : int
486 : fd_rocksdb_insert_entry( fd_rocksdb_t * db,
487 : ulong cf_idx,
488 : const char * key,
489 : ulong klen,
490 : const char * value,
491 : ulong vlen )
492 0 : {
493 0 : char * err = NULL;
494 0 : rocksdb_put_cf( db->db, db->wo, db->cf_handles[cf_idx],
495 0 : key, klen, value, vlen, &err );
496 0 : if( FD_UNLIKELY( err != NULL ) ) {
497 0 : FD_LOG_WARNING(( "rocksdb_put_cf failed with error %s", err ));
498 0 : return -1;
499 0 : }
500 0 : return 0;
501 0 : }
502 :
503 : int
504 : fd_rocksdb_import_block_blockstore( fd_rocksdb_t * db,
505 : fd_slot_meta_t * m,
506 : fd_blockstore_t * blockstore,
507 : int txnstatus,
508 : const uchar *hash_override ) // How much effort should we go to here to confirm the size of the hash override?
509 0 : {
510 0 : fd_blockstore_start_write( blockstore );
511 :
512 0 : ulong slot = m->slot;
513 0 : ulong start_idx = 0;
514 0 : ulong end_idx = m->received;
515 :
516 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED]);
517 :
518 0 : char k[16];
519 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap(slot);
520 0 : *((ulong *) &k[8]) = fd_ulong_bswap(start_idx);
521 :
522 0 : rocksdb_iter_seek(iter, (const char *) k, sizeof(k));
523 :
524 0 : for (ulong i = start_idx; i < end_idx; i++) {
525 0 : ulong cur_slot, index;
526 0 : uchar valid = rocksdb_iter_valid(iter);
527 :
528 0 : if (valid) {
529 0 : size_t klen = 0;
530 0 : const char* key = rocksdb_iter_key(iter, &klen); // There is no need to free key
531 0 : if (klen != 16) // invalid key
532 0 : continue;
533 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
534 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
535 0 : }
536 :
537 0 : if (!valid || cur_slot != slot) {
538 0 : FD_LOG_WARNING(("missing shreds for slot %lu", slot));
539 0 : rocksdb_iter_destroy(iter);
540 0 : fd_blockstore_end_write(blockstore);
541 0 : return -1;
542 0 : }
543 :
544 0 : if (index != i) {
545 0 : FD_LOG_WARNING(("missing shred %lu at index %lu for slot %lu", i, index, slot));
546 0 : rocksdb_iter_destroy(iter);
547 0 : fd_blockstore_end_write(blockstore);
548 0 : return -1;
549 0 : }
550 :
551 0 : size_t dlen = 0;
552 : // Data was first copied from disk into memory to make it available to this API
553 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value(iter, &dlen);
554 0 : if (data == NULL) {
555 0 : FD_LOG_WARNING(("failed to read shred %lu/%lu", slot, i));
556 0 : rocksdb_iter_destroy(iter);
557 0 : fd_blockstore_end_write(blockstore);
558 0 : return -1;
559 0 : }
560 :
561 : // This just correctly selects from inside the data pointer to the
562 : // actual data without a memory copy
563 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
564 0 : if (shred == NULL) {
565 0 : FD_LOG_WARNING(("failed to parse shred %lu/%lu", slot, i));
566 0 : rocksdb_iter_destroy(iter);
567 0 : fd_blockstore_end_write(blockstore);
568 0 : return -1;
569 0 : }
570 0 : int rc = fd_buf_shred_insert( blockstore, shred );
571 0 : if (rc != FD_BLOCKSTORE_OK_SLOT_COMPLETE && rc != FD_BLOCKSTORE_OK) {
572 0 : FD_LOG_WARNING(("failed to store shred %lu/%lu", slot, i));
573 0 : rocksdb_iter_destroy(iter);
574 0 : fd_blockstore_end_write(blockstore);
575 0 : return -1;
576 0 : }
577 :
578 0 : rocksdb_iter_next(iter);
579 0 : }
580 :
581 0 : rocksdb_iter_destroy(iter);
582 :
583 0 : fd_wksp_t * wksp = fd_blockstore_wksp( blockstore );
584 0 : fd_block_map_t * block_map_entry = fd_blockstore_block_map_query( blockstore, slot );
585 0 : if( FD_LIKELY( block_map_entry && block_map_entry->block_gaddr ) ) {
586 0 : size_t vallen = 0;
587 0 : char * err = NULL;
588 0 : char * res = rocksdb_get_cf(
589 0 : db->db,
590 0 : db->ro,
591 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCKTIME ],
592 0 : (char const *)&slot_be, sizeof(ulong),
593 0 : &vallen,
594 0 : &err );
595 0 : if( FD_UNLIKELY( err ) ) {
596 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
597 0 : free( err );
598 0 : } else if(vallen == sizeof(ulong)) {
599 0 : block_map_entry->ts = (*(long*)res)*((long)1e9); /* Convert to nanos */
600 0 : free(res);
601 0 : }
602 :
603 0 : vallen = 0;
604 0 : err = NULL;
605 0 : res = rocksdb_get_cf(
606 0 : db->db,
607 0 : db->ro,
608 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ],
609 0 : (char const *)&slot_be, sizeof(ulong),
610 0 : &vallen,
611 0 : &err );
612 0 : block_map_entry->height = 0;
613 0 : if( FD_UNLIKELY( err ) ) {
614 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
615 0 : free( err );
616 0 : } else if(vallen == sizeof(ulong)) {
617 0 : block_map_entry->height = *(ulong*)res;
618 0 : free(res);
619 0 : }
620 :
621 0 : vallen = 0;
622 0 : err = NULL;
623 0 : if (NULL != hash_override)
624 0 : fd_memcpy( block_map_entry->bank_hash.hash, hash_override, 32UL );
625 0 : else {
626 0 : res = rocksdb_get_cf(
627 0 : db->db,
628 0 : db->ro,
629 0 : db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
630 0 : (char const *)&slot_be, sizeof(ulong),
631 0 : &vallen,
632 0 : &err );
633 0 : if( FD_UNLIKELY( err ) ) {
634 0 : FD_LOG_WARNING(( "rocksdb: %s", err ));
635 0 : free( err );
636 0 : } else {
637 0 : fd_scratch_push();
638 0 : fd_bincode_decode_ctx_t decode = {
639 0 : .data = res,
640 0 : .dataend = res + vallen,
641 0 : .valloc = fd_scratch_virtual(),
642 0 : };
643 0 : fd_frozen_hash_versioned_t versioned;
644 0 : int decode_err = fd_frozen_hash_versioned_decode( &versioned, &decode );
645 0 : if( FD_UNLIKELY( decode_err!=FD_BINCODE_SUCCESS ) ) goto cleanup;
646 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
647 0 : if( FD_UNLIKELY( versioned.discriminant !=fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
648 : /* Success */
649 0 : fd_memcpy( block_map_entry->bank_hash.hash, versioned.inner.current.frozen_hash.hash, 32UL );
650 0 : cleanup:
651 0 : free( res );
652 0 : fd_scratch_pop();
653 0 : }
654 0 : }
655 0 : }
656 :
657 0 : if( txnstatus && FD_LIKELY( block_map_entry && block_map_entry->block_gaddr ) ) {
658 0 : fd_block_t * blk = fd_wksp_laddr_fast( wksp, block_map_entry->block_gaddr );
659 0 : uchar * data = fd_wksp_laddr_fast( wksp, blk->data_gaddr );
660 0 : fd_block_txn_ref_t * txns = fd_wksp_laddr_fast( wksp, blk->txns_gaddr );
661 :
662 : /* Compute the total size of the logs */
663 0 : ulong tot_meta_sz = 2*sizeof(ulong);
664 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
665 0 : if( j == 0 || txns[j].txn_off != txns[j-1].txn_off ) {
666 0 : fd_blockstore_txn_key_t sig;
667 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
668 0 : ulong sz;
669 0 : void * raw = fd_rocksdb_get_txn_status_raw( db, slot, &sig, &sz );
670 0 : if( raw != NULL ) {
671 0 : free(raw);
672 0 : tot_meta_sz += sz;
673 0 : }
674 0 : }
675 0 : }
676 0 : fd_alloc_t * alloc = fd_wksp_laddr_fast( wksp, blockstore->alloc_gaddr );
677 0 : uchar * cur_laddr = fd_alloc_malloc( alloc, 1, tot_meta_sz );
678 0 : if( cur_laddr == NULL ) {
679 0 : fd_blockstore_end_write(blockstore);
680 0 : return 0;
681 0 : }
682 0 : ((ulong*)cur_laddr)[0] = blk->txns_meta_gaddr; /* Link to previous allocation */
683 0 : ((ulong*)cur_laddr)[1] = blk->txns_meta_sz;
684 0 : blk->txns_meta_gaddr = fd_wksp_gaddr_fast( wksp, cur_laddr );
685 0 : blk->txns_meta_sz = tot_meta_sz;
686 0 : cur_laddr += 2*sizeof(ulong);
687 :
688 : /* Copy over the logs */
689 0 : fd_blockstore_txn_map_t * txn_map = fd_wksp_laddr_fast( wksp, blockstore->txn_map_gaddr );
690 0 : ulong meta_gaddr = 0;
691 0 : ulong meta_sz = 0;
692 0 : fd_blockstore_txn_key_t sig = { 0 };
693 0 : for ( ulong j = 0; j < blk->txns_cnt; ++j ) {
694 0 : if( j == 0 || txns[j].txn_off != txns[j-1].txn_off ) {
695 0 : fd_memcpy( &sig, data + txns[j].id_off, sizeof( sig ) );
696 0 : ulong sz;
697 0 : void * raw = fd_rocksdb_get_txn_status_raw( db, slot, &sig, &sz );
698 0 : if( raw == NULL ) {
699 0 : meta_gaddr = 0;
700 0 : meta_sz = 0;
701 0 : } else {
702 0 : fd_memcpy(cur_laddr, raw, sz);
703 0 : free(raw);
704 0 : meta_gaddr = fd_wksp_gaddr_fast( wksp, cur_laddr );
705 0 : meta_sz = sz;
706 0 : cur_laddr += sz;
707 0 : }
708 0 : }
709 0 : fd_blockstore_txn_map_t * txn_map_entry = fd_blockstore_txn_map_query( txn_map, &sig, NULL );
710 0 : if( FD_UNLIKELY( !txn_map_entry ) ) {
711 0 : char sig_str[ FD_BASE58_ENCODED_64_SZ ];
712 0 : fd_base58_encode_64( fd_type_pun_const( sig.v ), NULL, sig_str );
713 0 : FD_LOG_WARNING(( "missing transaction %s", sig_str ));
714 0 : continue;
715 0 : }
716 0 : txn_map_entry->meta_gaddr = meta_gaddr;
717 0 : txn_map_entry->meta_sz = meta_sz;
718 0 : }
719 :
720 0 : FD_TEST( blk->txns_meta_gaddr + blk->txns_meta_sz == fd_wksp_gaddr_fast( wksp, cur_laddr ) );
721 0 : }
722 :
723 0 : if( slot > blockstore->max ) {
724 0 : blockstore->max = blockstore->hcs = slot;
725 0 : }
726 0 : if( FD_LIKELY( block_map_entry ) ) {
727 0 : block_map_entry->flags = fd_uchar_set_bit( block_map_entry->flags, FD_BLOCK_FLAG_COMPLETED );
728 0 : }
729 :
730 0 : fd_blockstore_end_write(blockstore);
731 0 : return 0;
732 0 : }
733 :
734 : int
735 : fd_rocksdb_import_block_shredcap( fd_rocksdb_t * db,
736 : fd_slot_meta_t * metadata,
737 : fd_io_buffered_ostream_t * ostream,
738 0 : fd_io_buffered_ostream_t * bank_hash_ostream ) {
739 0 : ulong slot = metadata->slot;
740 :
741 : /* pre_slot_hdr_file_offset is the current offset within the file, but
742 : pre_slot_hdr_file_offset_real accounts for the size of the buffer that has
743 : been filled but not flushed. This value is used to jump back into the file to
744 : populate the payload_sz for the slot header */
745 0 : long pre_slot_hdr_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
746 0 : long pre_slot_hdr_file_offset_real = pre_slot_hdr_file_offset + (long)ostream->wbuf_used;
747 0 : if ( FD_UNLIKELY( pre_slot_hdr_file_offset == -1 ) ) {
748 0 : FD_LOG_ERR(( "lseek error while seeking to current location" ));
749 0 : }
750 :
751 : /* Write slot specific header */
752 0 : fd_shredcap_slot_hdr_t slot_hdr;
753 0 : slot_hdr.magic = FD_SHREDCAP_SLOT_HDR_MAGIC;
754 0 : slot_hdr.version = FD_SHREDCAP_SLOT_HDR_VERSION;
755 0 : slot_hdr.payload_sz = ULONG_MAX; /* This value is populated after slot is processed */
756 0 : slot_hdr.slot = metadata->slot;
757 0 : slot_hdr.consumed = metadata->consumed;
758 0 : slot_hdr.received = metadata->received;
759 0 : slot_hdr.first_shred_timestamp = metadata->first_shred_timestamp;
760 0 : slot_hdr.last_index = metadata->last_index;
761 0 : slot_hdr.parent_slot = metadata->parent_slot;
762 0 : fd_io_buffered_ostream_write( ostream, &slot_hdr, FD_SHREDCAP_SLOT_HDR_FOOTPRINT );
763 :
764 : /* We need to track the payload size */
765 0 : ulong payload_sz = 0;
766 :
767 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf( db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_DATA_SHRED] );
768 :
769 0 : char k[16];
770 0 : ulong slot_be = *((ulong *) &k[0]) = fd_ulong_bswap( slot );
771 0 : *((ulong *) &k[8]) = fd_ulong_bswap( 0 );
772 :
773 0 : rocksdb_iter_seek( iter, (const char *) k, sizeof(k) );
774 :
775 0 : ulong start_idx = 0;
776 0 : ulong end_idx = metadata->received;
777 0 : for ( ulong i = start_idx; i < end_idx; i++ ) {
778 0 : ulong cur_slot, index;
779 0 : uchar valid = rocksdb_iter_valid( iter );
780 :
781 0 : if ( valid ) {
782 0 : size_t klen = 0;
783 0 : const char* key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
784 0 : if ( klen != 16 ) { // invalid key
785 0 : continue;
786 0 : }
787 0 : cur_slot = fd_ulong_bswap(*((ulong *) &key[0]));
788 0 : index = fd_ulong_bswap(*((ulong *) &key[8]));
789 0 : }
790 :
791 0 : if ( !valid || cur_slot != slot ) {
792 0 : FD_LOG_WARNING(( "missing shreds for slot %lu", slot ));
793 0 : rocksdb_iter_destroy( iter );
794 0 : return -1;
795 0 : }
796 :
797 0 : if ( index != i ) {
798 0 : FD_LOG_WARNING(( "missing shred %lu at index %lu for slot %lu", i, index, slot ));
799 0 : rocksdb_iter_destroy( iter );
800 0 : return -1;
801 0 : }
802 :
803 0 : size_t dlen = 0;
804 : // Data was first copied from disk into memory to make it available to this API
805 0 : const unsigned char *data = (const unsigned char *) rocksdb_iter_value( iter, &dlen );
806 0 : if ( data == NULL ) {
807 0 : FD_LOG_WARNING(( "failed to read shred %lu/%lu", slot, i ));
808 0 : rocksdb_iter_destroy( iter );
809 0 : return -1;
810 0 : }
811 :
812 0 : fd_shred_t const * shred = fd_shred_parse( data, (ulong) dlen );
813 0 : if ( shred == NULL ) {
814 0 : FD_LOG_WARNING(( "failed to parse shred %lu/%lu", slot, i ));
815 0 : rocksdb_iter_destroy( iter );
816 0 : return -1;
817 0 : }
818 :
819 : /* Write a shred header and shred. Each shred and it's header will be aligned */
820 0 : char shred_buf[ FD_SHREDCAP_SHRED_MAX ];
821 0 : char * shred_buf_ptr = shred_buf;
822 0 : ushort shred_sz = (ushort)fd_shred_sz( shred );
823 0 : uint shred_boundary_sz = (uint)fd_uint_align_up( shred_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT,
824 0 : FD_SHREDCAP_ALIGN ) - FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
825 :
826 0 : fd_memset( shred_buf_ptr, 0, shred_boundary_sz );
827 : /* Populate start of buffer with header */
828 0 : fd_shredcap_shred_hdr_t * shred_hdr = (fd_shredcap_shred_hdr_t*)shred_buf_ptr;
829 0 : shred_hdr->hdr_sz = FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
830 0 : shred_hdr->shred_sz = shred_sz;
831 0 : shred_hdr->shred_boundary_sz = shred_boundary_sz;
832 :
833 : /* Skip ahead and populate rest of buffer with shred and write out */
834 0 : fd_memcpy( shred_buf_ptr + FD_SHREDCAP_SHRED_HDR_FOOTPRINT, shred, shred_boundary_sz );
835 0 : fd_io_buffered_ostream_write( ostream, shred_buf_ptr,
836 0 : shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT );
837 :
838 0 : payload_sz += shred_boundary_sz + FD_SHREDCAP_SHRED_HDR_FOOTPRINT;
839 0 : rocksdb_iter_next( iter );
840 0 : }
841 :
842 : /* Update file size */
843 0 : long pre_slot_processed_file_offset = lseek( ostream->fd, 0, SEEK_CUR );
844 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == -1 ) ) {
845 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
846 0 : }
847 :
848 0 : if ( FD_UNLIKELY( pre_slot_processed_file_offset == pre_slot_hdr_file_offset ) ) {
849 : /* This case is when the payload from the shreds is smaller than the free
850 : space from the write buffer. This means that the buffer was not flushed
851 : at any point. This case is highly unlikely */
852 0 : fd_io_buffered_ostream_flush( ostream );
853 0 : }
854 :
855 : /* Safely assume that the buffer was flushed to the file at least once. Store
856 : original seek position, skip to position with payload_sz in header, write
857 : updated payload sz, and then reset seek position. */
858 0 : long original_offset = lseek( ostream->fd, 0, SEEK_CUR );
859 0 : if ( FD_UNLIKELY( original_offset == -1 ) ) {
860 0 : FD_LOG_ERR(( "lseek error when seeking to current position" ));
861 0 : }
862 0 : long payload_sz_file_offset = pre_slot_hdr_file_offset_real +
863 0 : (long)FD_SHREDCAP_SLOT_HDR_PAYLOAD_SZ_OFFSET;
864 :
865 0 : long offset;
866 0 : offset = lseek( ostream->fd, payload_sz_file_offset, SEEK_SET );
867 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
868 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", payload_sz_file_offset ));
869 0 : }
870 0 : ulong to_write;
871 0 : fd_io_write( ostream->fd, &payload_sz, sizeof(ulong), sizeof(ulong), &to_write );
872 :
873 0 : offset = lseek( ostream->fd, original_offset, SEEK_SET );
874 0 : if ( FD_UNLIKELY( offset == -1 ) ) {
875 0 : FD_LOG_ERR(( "lseek error when seeking to offset=%ld", original_offset ));
876 0 : }
877 :
878 : /* Write slot footer */
879 0 : fd_shredcap_slot_ftr_t slot_ftr;
880 0 : slot_ftr.magic = FD_SHREDCAP_SLOT_FTR_MAGIC;
881 0 : slot_ftr.payload_sz = payload_sz;
882 0 : fd_io_buffered_ostream_write( ostream, &slot_ftr, FD_SHREDCAP_SLOT_FTR_FOOTPRINT );
883 0 : rocksdb_iter_destroy( iter );
884 :
885 : /* Get and write bank hash information to respective file */
886 0 : size_t vallen = 0;
887 0 : char * err = NULL;
888 0 : char * res = rocksdb_get_cf( db->db, db->ro, db->cf_handles[ FD_ROCKSDB_CFIDX_BANK_HASHES ],
889 0 : (char const *)&slot_be, sizeof(ulong), &vallen, &err );
890 0 : if( FD_UNLIKELY( err ) ) {
891 0 : FD_LOG_WARNING((" Could not get bank hash data due to err=%s",err ));
892 0 : free( err );
893 0 : } else {
894 0 : fd_scratch_push();
895 0 : fd_bincode_decode_ctx_t decode = {
896 0 : .data = res,
897 0 : .dataend = res + vallen,
898 0 : .valloc = fd_scratch_virtual(),
899 0 : };
900 0 : fd_frozen_hash_versioned_t versioned;
901 0 : int decode_err = fd_frozen_hash_versioned_decode( &versioned, &decode );
902 0 : if( FD_UNLIKELY( decode_err != FD_BINCODE_SUCCESS ) ) goto cleanup;
903 0 : if( FD_UNLIKELY( decode.data!=decode.dataend ) ) goto cleanup;
904 0 : if( FD_UNLIKELY( versioned.discriminant != fd_frozen_hash_versioned_enum_current ) ) goto cleanup;
905 :
906 0 : fd_shredcap_bank_hash_entry_t bank_hash_entry;
907 0 : bank_hash_entry.slot = slot;
908 0 : fd_memcpy( &bank_hash_entry.bank_hash, versioned.inner.current.frozen_hash.hash, 32UL );
909 0 : fd_io_buffered_ostream_write( bank_hash_ostream, &bank_hash_entry, FD_SHREDCAP_BANK_HASH_ENTRY_FOOTPRINT );
910 0 : cleanup:
911 0 : free( res );
912 0 : fd_scratch_pop();
913 0 : }
914 0 : return 0;
915 0 : }
|