Line data Source code
1 : #include "fd_rocksdb.h"
2 : #include <stdbool.h>
3 : #include <stdlib.h>
4 : #include <stdio.h>
5 : #include <unistd.h>
6 : #include "../../util/bits/fd_bits.h"
7 :
8 : char *
9 : fd_rocksdb_init( fd_rocksdb_t * db,
10 0 : char const * db_name ) {
11 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
12 :
13 0 : db->opts = rocksdb_options_create();
14 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
15 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
16 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
17 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
18 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
19 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
20 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
21 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
22 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
23 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
24 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
25 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
26 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
27 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
28 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
29 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
30 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
31 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
32 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
33 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
34 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
35 :
36 0 : rocksdb_options_t const * cf_options[ FD_ROCKSDB_CF_CNT ];
37 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ )
38 0 : cf_options[ i ] = db->opts;
39 :
40 0 : char *err = NULL;
41 :
42 0 : db->db = rocksdb_open_for_read_only_column_families(
43 0 : db->opts,
44 0 : db_name,
45 0 : FD_ROCKSDB_CF_CNT,
46 0 : (char const * const *)db->cfgs,
47 0 : (rocksdb_options_t const * const *)cf_options,
48 0 : db->cf_handles,
49 0 : false,
50 0 : &err );
51 :
52 0 : if( FD_UNLIKELY( err ) ) return err;
53 :
54 0 : db->ro = rocksdb_readoptions_create();
55 :
56 0 : return NULL;
57 0 : }
58 :
59 : void
60 : fd_rocksdb_new( fd_rocksdb_t * db,
61 0 : char const * db_name ) {
62 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
63 :
64 0 : db->opts = rocksdb_options_create();
65 : /* Create the db*/
66 0 : rocksdb_options_set_create_if_missing(db->opts, 1);
67 :
68 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
69 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
70 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
71 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
72 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
73 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
74 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
75 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
76 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
77 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
78 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
79 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
80 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
81 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
82 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
83 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
84 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
85 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
86 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
87 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
88 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
89 :
90 : /* Create the rocksdb */
91 0 : char * err = NULL;
92 0 : db->db = rocksdb_open(db->opts, db_name, &err);
93 0 : if ( err != NULL ) {
94 0 : FD_LOG_ERR(("rocksdb creation failed: %s", err));
95 0 : }
96 :
97 0 : db->wo = rocksdb_writeoptions_create();
98 :
99 : /* Create column families, default already exists at index 0 */
100 0 : for ( ulong i = 1; i < FD_ROCKSDB_CF_CNT; ++i ) {
101 0 : db->cf_handles[i] = rocksdb_create_column_family(db->db, db->opts, db->cfgs[i], &err);
102 0 : }
103 0 : rocksdb_options_set_compression( db->opts, rocksdb_lz4_compression );
104 0 : }
105 :
106 0 : void fd_rocksdb_destroy(fd_rocksdb_t *db) {
107 :
108 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ ) {
109 0 : if( db->cf_handles[i] ) {
110 0 : rocksdb_column_family_handle_destroy( db->cf_handles[i] );
111 0 : db->cf_handles[i] = NULL;
112 0 : }
113 0 : }
114 :
115 0 : if( db->ro ) {
116 0 : rocksdb_readoptions_destroy( db->ro );
117 0 : db->ro = NULL;
118 0 : }
119 :
120 0 : if( db->opts ) {
121 0 : rocksdb_options_destroy( db->opts );
122 0 : db->opts = NULL;
123 0 : }
124 :
125 0 : if( db->db ) {
126 0 : rocksdb_close( db->db );
127 0 : db->db = NULL;
128 0 : }
129 :
130 0 : if( db->wo ) {
131 0 : rocksdb_writeoptions_destroy( db->wo );
132 0 : }
133 0 : }
134 :
135 0 : ulong fd_rocksdb_last_slot(fd_rocksdb_t *db, char **err) {
136 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
137 0 : rocksdb_iter_seek_to_last(iter);
138 0 : if (!rocksdb_iter_valid(iter)) {
139 0 : rocksdb_iter_destroy(iter);
140 0 : *err = "db column for root is empty";
141 0 : return 0;
142 0 : }
143 :
144 0 : size_t klen = 0;
145 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
146 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
147 0 : rocksdb_iter_destroy(iter);
148 0 : return slot;
149 0 : }
150 :
151 0 : ulong fd_rocksdb_find_last_slot(fd_rocksdb_t *db, char **err) {
152 0 : ulong max_slot = 0;
153 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
154 0 : rocksdb_iter_seek_to_first(iter);
155 0 : if (!rocksdb_iter_valid(iter)) {
156 0 : rocksdb_iter_destroy(iter);
157 0 : *err = "db column for root is empty";
158 0 : return 0;
159 0 : }
160 :
161 0 : for( ; rocksdb_iter_valid(iter); rocksdb_iter_next(iter) ) {
162 0 : size_t klen = 0;
163 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
164 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
165 :
166 0 : if( slot > max_slot ) {
167 0 : max_slot = slot;
168 0 : FD_LOG_WARNING(("new max_slot: %lu", max_slot));
169 0 : }
170 0 : }
171 :
172 0 : rocksdb_iter_destroy(iter);
173 0 : return max_slot;
174 0 : }
175 :
176 : ulong
177 : fd_rocksdb_first_slot( fd_rocksdb_t * db,
178 0 : char ** err ) {
179 :
180 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
181 0 : rocksdb_iter_seek_to_first(iter);
182 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(iter) ) ) {
183 0 : rocksdb_iter_destroy(iter);
184 0 : *err = "db column for root is empty";
185 0 : return 0;
186 0 : }
187 :
188 0 : ulong klen = 0;
189 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
190 0 : ulong slot = fd_ulong_bswap( *((ulong *)key));
191 0 : rocksdb_iter_destroy(iter);
192 0 : return slot;
193 0 : }
194 :
195 : int
196 : fd_rocksdb_get_meta( fd_rocksdb_t * db,
197 : ulong slot,
198 : fd_slot_meta_t * m,
199 0 : fd_valloc_t valloc ) {
200 0 : ulong ks = fd_ulong_bswap(slot);
201 0 : size_t vallen = 0;
202 :
203 0 : char * err = NULL;
204 0 : char * meta = rocksdb_get_cf( db->db,
205 0 : db->ro,
206 0 : db->cf_handles[FD_ROCKSDB_CFIDX_META],
207 0 : (const char *) &ks,
208 0 : sizeof(ks),
209 0 : &vallen,
210 0 : &err );
211 :
212 0 : if( NULL != err ) {
213 0 : FD_LOG_WARNING(( "%s", err ));
214 0 : free( err );
215 0 : return -2;
216 0 : }
217 :
218 0 : if (0 == vallen)
219 0 : return -1;
220 :
221 0 : fd_bincode_decode_ctx_t ctx;
222 0 : ctx.data = meta;
223 0 : ctx.dataend = &meta[vallen];
224 :
225 0 : ulong total_sz = 0UL;
226 0 : if( fd_slot_meta_decode_footprint( &ctx, &total_sz ) ) {
227 0 : FD_LOG_ERR(( "fd_slot_meta_decode failed" ));
228 0 : }
229 :
230 0 : uchar * mem = fd_valloc_malloc( valloc, fd_slot_meta_align(), total_sz );
231 0 : if( NULL == mem ) {
232 0 : FD_LOG_ERR(( "fd_valloc_malloc failed" ));
233 0 : }
234 :
235 0 : fd_slot_meta_decode( mem, &ctx );
236 :
237 0 : fd_memcpy( m, mem, sizeof(fd_slot_meta_t) );
238 :
239 0 : free(meta);
240 :
241 0 : return 0;
242 0 : }
243 :
244 : void *
245 0 : fd_rocksdb_root_iter_new ( void * ptr ) {
246 0 : fd_memset(ptr, 0, sizeof(fd_rocksdb_root_iter_t));
247 0 : return ptr;
248 0 : }
249 :
250 : fd_rocksdb_root_iter_t *
251 0 : fd_rocksdb_root_iter_join ( void * ptr ) {
252 0 : return (fd_rocksdb_root_iter_t *) ptr;
253 0 : }
254 :
255 : void *
256 0 : fd_rocksdb_root_iter_leave ( fd_rocksdb_root_iter_t * ptr ) {
257 0 : return ptr;
258 0 : }
259 :
260 : int
261 : fd_rocksdb_root_iter_seek( fd_rocksdb_root_iter_t * self,
262 : fd_rocksdb_t * db,
263 : ulong slot,
264 : fd_slot_meta_t * m,
265 0 : fd_valloc_t valloc ) {
266 0 : self->db = db;
267 :
268 0 : if( FD_UNLIKELY( !self->iter ) )
269 0 : self->iter = rocksdb_create_iterator_cf(self->db->db, self->db->ro, self->db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
270 :
271 0 : ulong ks = fd_ulong_bswap( slot );
272 :
273 0 : rocksdb_iter_seek( self->iter, (char const *)&ks, sizeof(ulong) );
274 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(self->iter) ) )
275 0 : return -1;
276 :
277 0 : size_t klen = 0;
278 0 : char const * key = rocksdb_iter_key( self->iter, &klen ); // There is no need to free key
279 0 : ulong kslot = fd_ulong_bswap( *((ulong *)key) );
280 :
281 0 : if( FD_UNLIKELY( kslot != slot ) ) {
282 0 : FD_LOG_WARNING(( "fd_rocksdb_root_iter_seek: wanted slot %lu, found %lu",
283 0 : slot, kslot ));
284 0 : return -2;
285 0 : }
286 :
287 0 : return fd_rocksdb_get_meta( self->db, slot, m, valloc );
288 0 : }
289 :
290 : int
291 0 : fd_rocksdb_root_iter_slot ( fd_rocksdb_root_iter_t * self, ulong *slot ) {
292 0 : if ((NULL == self->db) || (NULL == self->iter))
293 0 : return -1;
294 :
295 0 : if (!rocksdb_iter_valid(self->iter))
296 0 : return -2;
297 :
298 0 : size_t klen = 0;
299 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
300 0 : *slot = fd_ulong_bswap(*((unsigned long *) key));
301 0 : return 0;
302 0 : }
303 :
304 : int
305 : fd_rocksdb_root_iter_next( fd_rocksdb_root_iter_t * self,
306 : fd_slot_meta_t * m,
307 0 : fd_valloc_t valloc ) {
308 0 : if ((NULL == self->db) || (NULL == self->iter))
309 0 : return -1;
310 :
311 0 : if (!rocksdb_iter_valid(self->iter))
312 0 : return -2;
313 :
314 0 : rocksdb_iter_next(self->iter);
315 :
316 0 : if (!rocksdb_iter_valid(self->iter))
317 0 : return -3;
318 :
319 0 : size_t klen = 0;
320 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
321 :
322 0 : return fd_rocksdb_get_meta( self->db, fd_ulong_bswap(*((unsigned long *) key)), m, valloc );
323 0 : }
324 :
325 : void
326 0 : fd_rocksdb_root_iter_destroy ( fd_rocksdb_root_iter_t * self ) {
327 0 : if (NULL != self->iter) {
328 0 : rocksdb_iter_destroy(self->iter);
329 0 : self->iter = 0;
330 0 : }
331 0 : self->db = NULL;
332 0 : }
333 :
334 : void *
335 : fd_rocksdb_get_txn_status_raw( fd_rocksdb_t * self,
336 : ulong slot,
337 : void const * sig,
338 0 : ulong * psz ) {
339 :
340 0 : ulong slot_be = fd_ulong_bswap( slot );
341 :
342 : /* Construct RocksDB query key */
343 0 : char key[72];
344 0 : memcpy( key, sig, 64UL );
345 0 : memcpy( key+64UL, &slot_be, 8UL );
346 :
347 : /* Query record */
348 0 : char * err = NULL;
349 0 : char * res = rocksdb_get_cf(
350 0 : self->db, self->ro,
351 0 : self->cf_handles[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ],
352 0 : key, 72UL,
353 0 : psz,
354 0 : &err );
355 :
356 0 : if( FD_UNLIKELY( err ) ) {
357 0 : FD_LOG_WARNING(("err=%s", err));
358 0 : free( err );
359 0 : return NULL;
360 0 : }
361 0 : return res;
362 0 : }
363 :
364 : ulong
365 0 : fd_rocksdb_get_slot( ulong cf_idx, char const * key ) {
366 0 : switch (cf_idx) {
367 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
368 0 : return fd_ulong_bswap(*((ulong *) &key[72])); /* (signature,slot)*/
369 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
370 0 : return fd_ulong_bswap(*((ulong *) &key[40])); /* (pubkey,slot,u32,signature) */
371 0 : default: /* all other cfs have the slot at the start */
372 0 : return fd_ulong_bswap( *((ulong *)&key[0]) ); /* The key is just the slot number */
373 0 : }
374 :
375 0 : return fd_ulong_bswap( *((ulong *)key) );
376 0 : }
377 :
378 : void
379 0 : fd_rocksdb_iter_seek_to_slot_if_possible( rocksdb_iterator_t * iter, const ulong cf_idx, const ulong slot ) {
380 0 : ulong k = fd_ulong_bswap(slot);
381 0 : switch (cf_idx) {
382 : /* These cfs do not have the slot at the start, we can't seek based on slot prefix */
383 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
384 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
385 0 : rocksdb_iter_seek_to_first( iter );
386 0 : break;
387 0 : default: /* all other cfs have the slot at the start, seek based on slot prefix */
388 0 : rocksdb_iter_seek( iter, (const char *)&k, 8);
389 0 : break;
390 0 : }
391 0 : }
392 :
393 : int
394 : fd_rocksdb_copy_over_slot_indexed_range( fd_rocksdb_t * src,
395 : fd_rocksdb_t * dst,
396 : ulong cf_idx,
397 : ulong start_slot,
398 0 : ulong end_slot ) {
399 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_slot_indexed_range: %lu", cf_idx ));
400 :
401 0 : if ( cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ||
402 0 : cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ||
403 0 : cf_idx == FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ) {
404 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_range: skipping cf_idx=%lu because not slot indexed", cf_idx ));
405 0 : return 0;
406 0 : }
407 :
408 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf( src->db, src->ro, src->cf_handles[cf_idx] );
409 0 : if ( FD_UNLIKELY( iter == NULL ) ) {
410 0 : FD_LOG_ERR(( "rocksdb_create_iterator_cf failed for cf_idx=%lu", cf_idx ));
411 0 : }
412 :
413 0 : for ( fd_rocksdb_iter_seek_to_slot_if_possible( iter, cf_idx, start_slot ); rocksdb_iter_valid( iter ); rocksdb_iter_next( iter ) ) {
414 0 : ulong klen = 0;
415 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
416 :
417 0 : ulong slot = fd_rocksdb_get_slot( cf_idx, key );
418 0 : if ( slot < start_slot ) {
419 0 : continue;
420 0 : }
421 0 : else if ( slot > end_slot ) {
422 0 : break;
423 0 : }
424 :
425 0 : ulong vlen = 0;
426 0 : char const * value = rocksdb_iter_value( iter, &vlen );
427 :
428 0 : fd_rocksdb_insert_entry( dst, cf_idx, key, klen, value, vlen );
429 0 : }
430 0 : rocksdb_iter_destroy( iter );
431 0 : return 0;
432 0 : }
433 :
434 : int
435 : fd_rocksdb_insert_entry( fd_rocksdb_t * db,
436 : ulong cf_idx,
437 : const char * key,
438 : ulong klen,
439 : const char * value,
440 : ulong vlen )
441 0 : {
442 0 : char * err = NULL;
443 0 : rocksdb_put_cf( db->db, db->wo, db->cf_handles[cf_idx],
444 0 : key, klen, value, vlen, &err );
445 0 : if( FD_UNLIKELY( err != NULL ) ) {
446 0 : FD_LOG_WARNING(( "rocksdb_put_cf failed with error %s", err ));
447 0 : return -1;
448 0 : }
449 0 : return 0;
450 0 : }
|