Line data Source code
1 : #include "fd_rocksdb.h"
2 : #include <stdbool.h>
3 : #include <stdlib.h>
4 : #include <stdio.h>
5 : #include <unistd.h>
6 : #include "../../util/bits/fd_bits.h"
7 :
8 : char *
9 : fd_rocksdb_init( fd_rocksdb_t * db,
10 0 : char const * db_name ) {
11 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
12 :
13 0 : db->opts = rocksdb_options_create();
14 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
15 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
16 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
17 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
18 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
19 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
20 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
21 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
22 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
23 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
24 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
25 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
26 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
27 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
28 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
29 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
30 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
31 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
32 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
33 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
34 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
35 :
36 0 : rocksdb_options_t const * cf_options[ FD_ROCKSDB_CF_CNT ];
37 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ )
38 0 : cf_options[ i ] = db->opts;
39 :
40 0 : char *err = NULL;
41 :
42 0 : db->db = rocksdb_open_for_read_only_column_families(
43 0 : db->opts,
44 0 : db_name,
45 0 : FD_ROCKSDB_CF_CNT,
46 0 : (char const * const *)db->cfgs,
47 0 : (rocksdb_options_t const * const *)cf_options,
48 0 : db->cf_handles,
49 0 : false,
50 0 : &err );
51 :
52 0 : if( FD_UNLIKELY( err ) ) return err;
53 :
54 0 : db->ro = rocksdb_readoptions_create();
55 :
56 0 : return NULL;
57 0 : }
58 :
59 : void
60 : fd_rocksdb_new( fd_rocksdb_t * db,
61 0 : char const * db_name ) {
62 0 : fd_memset(db, 0, sizeof(fd_rocksdb_t));
63 :
64 0 : db->opts = rocksdb_options_create();
65 : /* Create the db*/
66 0 : rocksdb_options_set_create_if_missing(db->opts, 1);
67 :
68 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEFAULT ] = "default";
69 0 : db->cfgs[ FD_ROCKSDB_CFIDX_META ] = "meta";
70 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DEAD_SLOTS ] = "dead_slots";
71 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DUPLICATE_SLOTS ] = "duplicate_slots";
72 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ERASURE_META ] = "erasure_meta";
73 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ORPHANS ] = "orphans";
74 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BANK_HASHES ] = "bank_hashes";
75 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ROOT ] = "root";
76 0 : db->cfgs[ FD_ROCKSDB_CFIDX_INDEX ] = "index";
77 0 : db->cfgs[ FD_ROCKSDB_CFIDX_DATA_SHRED ] = "data_shred";
78 0 : db->cfgs[ FD_ROCKSDB_CFIDX_CODE_SHRED ] = "code_shred";
79 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ] = "transaction_status";
80 0 : db->cfgs[ FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ] = "address_signatures";
81 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ] = "transaction_memos";
82 0 : db->cfgs[ FD_ROCKSDB_CFIDX_TRANSACTION_STATUS_INDEX ] = "transaction_status_index";
83 0 : db->cfgs[ FD_ROCKSDB_CFIDX_REWARDS ] = "rewards";
84 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCKTIME ] = "blocktime";
85 0 : db->cfgs[ FD_ROCKSDB_CFIDX_PERF_SAMPLES ] = "perf_samples";
86 0 : db->cfgs[ FD_ROCKSDB_CFIDX_BLOCK_HEIGHT ] = "block_height";
87 0 : db->cfgs[ FD_ROCKSDB_CFIDX_OPTIMISTIC_SLOTS ] = "optimistic_slots";
88 0 : db->cfgs[ FD_ROCKSDB_CFIDX_MERKLE_ROOT_META ] = "merkle_root_meta";
89 :
90 : /* Create the rocksdb */
91 0 : char * err = NULL;
92 0 : db->db = rocksdb_open(db->opts, db_name, &err);
93 0 : if ( err != NULL ) {
94 0 : FD_LOG_ERR(("rocksdb creation failed: %s", err));
95 0 : }
96 :
97 0 : db->wo = rocksdb_writeoptions_create();
98 :
99 0 : rocksdb_options_set_compression( db->opts, rocksdb_lz4_compression );
100 :
101 : /* Create column families, default already exists at index 0 */
102 0 : for ( ulong i = 1; i < FD_ROCKSDB_CF_CNT; ++i ) {
103 0 : db->cf_handles[i] = rocksdb_create_column_family(db->db, db->opts, db->cfgs[i], &err);
104 0 : }
105 0 : }
106 :
107 0 : void fd_rocksdb_destroy(fd_rocksdb_t *db) {
108 :
109 0 : for( ulong i=0UL; i<FD_ROCKSDB_CF_CNT; i++ ) {
110 0 : if( db->cf_handles[i] ) {
111 0 : rocksdb_column_family_handle_destroy( db->cf_handles[i] );
112 0 : db->cf_handles[i] = NULL;
113 0 : }
114 0 : }
115 :
116 0 : if( db->ro ) {
117 0 : rocksdb_readoptions_destroy( db->ro );
118 0 : db->ro = NULL;
119 0 : }
120 :
121 0 : if( db->opts ) {
122 0 : rocksdb_options_destroy( db->opts );
123 0 : db->opts = NULL;
124 0 : }
125 :
126 0 : if( db->db ) {
127 0 : rocksdb_close( db->db );
128 0 : db->db = NULL;
129 0 : }
130 :
131 0 : if( db->wo ) {
132 0 : rocksdb_writeoptions_destroy( db->wo );
133 0 : }
134 0 : }
135 :
136 0 : ulong fd_rocksdb_last_slot(fd_rocksdb_t *db, char **err) {
137 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
138 0 : rocksdb_iter_seek_to_last(iter);
139 0 : if (!rocksdb_iter_valid(iter)) {
140 0 : rocksdb_iter_destroy(iter);
141 0 : *err = "db column for root is empty";
142 0 : return 0;
143 0 : }
144 :
145 0 : size_t klen = 0;
146 0 : const char *key = rocksdb_iter_key(iter, &klen); // There is no need to free key
147 0 : unsigned long slot = fd_ulong_bswap(*((unsigned long *) key));
148 0 : rocksdb_iter_destroy(iter);
149 0 : return slot;
150 0 : }
151 :
152 : ulong
153 : fd_rocksdb_first_slot( fd_rocksdb_t * db,
154 0 : char ** err ) {
155 :
156 0 : rocksdb_iterator_t* iter = rocksdb_create_iterator_cf(db->db, db->ro, db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
157 0 : rocksdb_iter_seek_to_first(iter);
158 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(iter) ) ) {
159 0 : rocksdb_iter_destroy(iter);
160 0 : *err = "db column for root is empty";
161 0 : return 0;
162 0 : }
163 :
164 0 : ulong klen = 0;
165 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
166 0 : ulong slot = fd_ulong_bswap( *((ulong *)key));
167 0 : rocksdb_iter_destroy(iter);
168 0 : return slot;
169 0 : }
170 :
171 : fd_slot_meta_t *
172 : fd_rocksdb_get_meta( fd_rocksdb_t * db,
173 0 : ulong slot ) {
174 0 : ulong ks = fd_ulong_bswap(slot);
175 0 : size_t vallen = 0;
176 :
177 0 : char * err = NULL;
178 0 : char * meta = rocksdb_get_cf( db->db,
179 0 : db->ro,
180 0 : db->cf_handles[FD_ROCKSDB_CFIDX_META],
181 0 : (const char *) &ks,
182 0 : sizeof(ks),
183 0 : &vallen,
184 0 : &err );
185 :
186 0 : if( NULL != err ) {
187 0 : FD_LOG_WARNING(( "%s", err ));
188 0 : free( err );
189 0 : return NULL;
190 0 : }
191 :
192 0 : if (0 == vallen)
193 0 : return NULL;
194 :
195 0 : fd_bincode_decode_ctx_t ctx;
196 0 : ctx.data = meta;
197 0 : ctx.dataend = &meta[vallen];
198 :
199 0 : ulong total_sz = 0UL;
200 0 : if( fd_slot_meta_decode_footprint( &ctx, &total_sz ) ) {
201 0 : FD_LOG_ERR(( "fd_slot_meta_decode failed" ));
202 0 : }
203 :
204 0 : fd_slot_meta_t * mem = aligned_alloc( fd_slot_meta_align(), total_sz );
205 0 : if( FD_UNLIKELY( !mem ) ) {
206 0 : FD_LOG_ERR(( "aligned_alloc failed" ));
207 0 : }
208 :
209 0 : fd_slot_meta_decode( mem, &ctx );
210 :
211 0 : free(meta);
212 :
213 0 : return mem;
214 0 : }
215 :
216 : void *
217 0 : fd_rocksdb_root_iter_new ( void * ptr ) {
218 0 : fd_memset(ptr, 0, sizeof(fd_rocksdb_root_iter_t));
219 0 : return ptr;
220 0 : }
221 :
222 : fd_rocksdb_root_iter_t *
223 0 : fd_rocksdb_root_iter_join ( void * ptr ) {
224 0 : return (fd_rocksdb_root_iter_t *) ptr;
225 0 : }
226 :
227 : void *
228 0 : fd_rocksdb_root_iter_leave ( fd_rocksdb_root_iter_t * ptr ) {
229 0 : return ptr;
230 0 : }
231 :
232 : fd_slot_meta_t *
233 : fd_rocksdb_root_iter_seek( fd_rocksdb_root_iter_t * self,
234 : fd_rocksdb_t * db,
235 0 : ulong slot ) {
236 0 : self->db = db;
237 :
238 0 : if( FD_UNLIKELY( !self->iter ) )
239 0 : self->iter = rocksdb_create_iterator_cf(self->db->db, self->db->ro, self->db->cf_handles[FD_ROCKSDB_CFIDX_ROOT]);
240 :
241 0 : ulong ks = fd_ulong_bswap( slot );
242 :
243 0 : rocksdb_iter_seek( self->iter, (char const *)&ks, sizeof(ulong) );
244 0 : if( FD_UNLIKELY( !rocksdb_iter_valid(self->iter) ) )
245 0 : return NULL;
246 :
247 0 : size_t klen = 0;
248 0 : char const * key = rocksdb_iter_key( self->iter, &klen ); // There is no need to free key
249 0 : ulong kslot = fd_ulong_bswap( *((ulong *)key) );
250 :
251 0 : if( FD_UNLIKELY( kslot != slot ) ) {
252 0 : FD_LOG_WARNING(( "fd_rocksdb_root_iter_seek: wanted slot %lu, found %lu",
253 0 : slot, kslot ));
254 0 : return NULL;
255 0 : }
256 :
257 0 : return fd_rocksdb_get_meta( self->db, slot );
258 0 : }
259 :
260 : int
261 0 : fd_rocksdb_root_iter_slot ( fd_rocksdb_root_iter_t * self, ulong *slot ) {
262 0 : if ((NULL == self->db) || (NULL == self->iter))
263 0 : return -1;
264 :
265 0 : if (!rocksdb_iter_valid(self->iter))
266 0 : return -2;
267 :
268 0 : size_t klen = 0;
269 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
270 0 : *slot = fd_ulong_bswap(*((unsigned long *) key));
271 0 : return 0;
272 0 : }
273 :
274 : fd_slot_meta_t *
275 0 : fd_rocksdb_root_iter_next( fd_rocksdb_root_iter_t * self ) {
276 0 : if ((NULL == self->db) || (NULL == self->iter))
277 0 : return NULL;
278 :
279 0 : if (!rocksdb_iter_valid(self->iter))
280 0 : return NULL;
281 :
282 0 : rocksdb_iter_next(self->iter);
283 :
284 0 : if (!rocksdb_iter_valid(self->iter))
285 0 : return NULL;
286 :
287 0 : size_t klen = 0;
288 0 : const char *key = rocksdb_iter_key(self->iter, &klen); // There is no need to free key
289 :
290 0 : return fd_rocksdb_get_meta( self->db, fd_ulong_bswap(*((unsigned long *) key)) );
291 0 : }
292 :
293 : void
294 0 : fd_rocksdb_root_iter_destroy ( fd_rocksdb_root_iter_t * self ) {
295 0 : if (NULL != self->iter) {
296 0 : rocksdb_iter_destroy(self->iter);
297 0 : self->iter = 0;
298 0 : }
299 0 : self->db = NULL;
300 0 : }
301 :
302 : ulong
303 0 : fd_rocksdb_get_slot( ulong cf_idx, char const * key ) {
304 0 : switch (cf_idx) {
305 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
306 0 : return fd_ulong_bswap(*((ulong *) &key[72])); /* (signature,slot)*/
307 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
308 0 : return fd_ulong_bswap(*((ulong *) &key[40])); /* (pubkey,slot,u32,signature) */
309 0 : default: /* all other cfs have the slot at the start */
310 0 : return fd_ulong_bswap( *((ulong *)&key[0]) ); /* The key is just the slot number */
311 0 : }
312 :
313 0 : return fd_ulong_bswap( *((ulong *)key) );
314 0 : }
315 :
316 : void
317 0 : fd_rocksdb_iter_seek_to_slot_if_possible( rocksdb_iterator_t * iter, const ulong cf_idx, const ulong slot ) {
318 0 : ulong k = fd_ulong_bswap(slot);
319 0 : switch (cf_idx) {
320 : /* These cfs do not have the slot at the start, we can't seek based on slot prefix */
321 0 : case FD_ROCKSDB_CFIDX_TRANSACTION_STATUS:
322 0 : case FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES:
323 0 : rocksdb_iter_seek_to_first( iter );
324 0 : break;
325 0 : default: /* all other cfs have the slot at the start, seek based on slot prefix */
326 0 : rocksdb_iter_seek( iter, (const char *)&k, 8);
327 0 : break;
328 0 : }
329 0 : }
330 :
331 : int
332 : fd_rocksdb_copy_over_slot_indexed_range( fd_rocksdb_t * src,
333 : fd_rocksdb_t * dst,
334 : ulong cf_idx,
335 : ulong start_slot,
336 0 : ulong end_slot ) {
337 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_slot_indexed_range: %lu", cf_idx ));
338 :
339 0 : if ( cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_MEMOS ||
340 0 : cf_idx == FD_ROCKSDB_CFIDX_TRANSACTION_STATUS ||
341 0 : cf_idx == FD_ROCKSDB_CFIDX_ADDRESS_SIGNATURES ) {
342 0 : FD_LOG_NOTICE(( "fd_rocksdb_copy_over_range: skipping cf_idx=%lu because not slot indexed", cf_idx ));
343 0 : return 0;
344 0 : }
345 :
346 0 : rocksdb_iterator_t * iter = rocksdb_create_iterator_cf( src->db, src->ro, src->cf_handles[cf_idx] );
347 0 : if ( FD_UNLIKELY( iter == NULL ) ) {
348 0 : FD_LOG_ERR(( "rocksdb_create_iterator_cf failed for cf_idx=%lu", cf_idx ));
349 0 : }
350 :
351 0 : for ( fd_rocksdb_iter_seek_to_slot_if_possible( iter, cf_idx, start_slot ); rocksdb_iter_valid( iter ); rocksdb_iter_next( iter ) ) {
352 0 : ulong klen = 0;
353 0 : char const * key = rocksdb_iter_key( iter, &klen ); // There is no need to free key
354 :
355 0 : ulong slot = fd_rocksdb_get_slot( cf_idx, key );
356 0 : if ( slot < start_slot ) {
357 0 : continue;
358 0 : }
359 0 : else if ( slot > end_slot ) {
360 0 : break;
361 0 : }
362 :
363 0 : ulong vlen = 0;
364 0 : char const * value = rocksdb_iter_value( iter, &vlen );
365 :
366 0 : fd_rocksdb_insert_entry( dst, cf_idx, key, klen, value, vlen );
367 0 : }
368 0 : rocksdb_iter_destroy( iter );
369 0 : return 0;
370 0 : }
371 :
372 : int
373 : fd_rocksdb_insert_entry( fd_rocksdb_t * db,
374 : ulong cf_idx,
375 : const char * key,
376 : ulong klen,
377 : const char * value,
378 : ulong vlen )
379 0 : {
380 0 : char * err = NULL;
381 0 : rocksdb_put_cf( db->db, db->wo, db->cf_handles[cf_idx],
382 0 : key, klen, value, vlen, &err );
383 0 : if( FD_UNLIKELY( err != NULL ) ) {
384 0 : FD_LOG_WARNING(( "rocksdb_put_cf failed with error %s", err ));
385 0 : return -1;
386 0 : }
387 0 : return 0;
388 0 : }
|