Line data Source code
#include "../../flamenco/types/fd_types.h"
#include "../../flamenco/runtime/fd_rocksdb.h"
#include "../../flamenco/runtime/context/fd_capture_ctx.h"
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>
6 :
7 : struct fd_ledger_args {
8 : fd_wksp_t * wksp; /* wksp for blockstore */
9 : char const * cmd; /* user passed command to fd_ledger */
10 : ulong start_slot; /* start slot for offline replay */
11 : ulong end_slot; /* end slot for offline replay */
12 : uint hashseed; /* hashseed */
13 : char const * restore; /* wksp restore */
14 : ulong shred_max; /* maximum number of shreds*/
15 : ulong slot_history_max; /* number of slots stored by blockstore*/
16 : char const * mini_db_dir; /* path to minifed rocksdb that's to be created */
17 : int copy_txn_status; /* determine if txns should be copied to the blockstore during minify/replay */
18 : ulong trash_hash; /* trash hash to be used for negative cases*/
19 : char const * rocksdb_path; /* path to rocksdb directory */
20 : fd_valloc_t valloc; /* wksp valloc that should NOT be used for runtime allocations */
21 : };
22 : typedef struct fd_ledger_args fd_ledger_args_t;
23 :
24 : /***************************** Helpers ****************************************/
25 : static fd_valloc_t
26 0 : allocator_setup( fd_wksp_t * wksp ) {
27 :
28 0 : if( FD_UNLIKELY( !wksp ) ) {
29 0 : FD_LOG_ERR(( "workspace is NULL" ));
30 0 : }
31 :
32 0 : void * alloc_shmem = fd_wksp_alloc_laddr( wksp, fd_alloc_align(), fd_alloc_footprint(), 3UL );
33 0 : if( FD_UNLIKELY( !alloc_shmem ) ) { FD_LOG_ERR( ( "fd_alloc too large for workspace" ) ); }
34 0 : void * alloc_shalloc = fd_alloc_new( alloc_shmem, 3UL );
35 0 : if( FD_UNLIKELY( !alloc_shalloc ) ) { FD_LOG_ERR( ( "fd_alloc_new failed" ) ); }
36 0 : fd_alloc_t * alloc = fd_alloc_join( alloc_shalloc, 3UL );
37 0 : if( FD_UNLIKELY( !alloc ) ) { FD_LOG_ERR( ( "fd_alloc_join failed" ) ); }
38 0 : fd_valloc_t valloc = fd_alloc_virtual( alloc );
39 0 : return valloc;
40 :
41 : /* NOTE: Enable this if leak hunting */
42 : //return fd_backtracing_alloc_virtual( &valloc );
43 :
44 0 : }
45 :
46 : void
47 : ingest_rocksdb( char const * file,
48 : ulong start_slot,
49 : ulong end_slot,
50 : FD_PARAM_UNUSED ulong trash_hash,
51 0 : fd_valloc_t valloc ) {
52 :
53 0 : fd_rocksdb_t rocks_db;
54 0 : char * err = fd_rocksdb_init( &rocks_db, file );
55 0 : if( FD_UNLIKELY( err!=NULL ) ) {
56 0 : FD_LOG_ERR(( "fd_rocksdb_init returned %s", err ));
57 0 : }
58 :
59 0 : ulong last_slot = fd_rocksdb_last_slot( &rocks_db, &err );
60 0 : if( FD_UNLIKELY( err!=NULL ) ) {
61 0 : FD_LOG_ERR(( "fd_rocksdb_last_slot returned %s", err ));
62 0 : }
63 :
64 0 : if( last_slot < start_slot ) {
65 0 : FD_LOG_ERR(( "rocksdb blocks are older than snapshot. first=%lu last=%lu wanted=%lu",
66 0 : fd_rocksdb_first_slot(&rocks_db, &err), last_slot, start_slot ));
67 0 : }
68 :
69 0 : FD_LOG_NOTICE(( "ingesting rocksdb from start=%lu to end=%lu", start_slot, end_slot ));
70 :
71 0 : fd_rocksdb_root_iter_t iter = {0};
72 0 : fd_rocksdb_root_iter_new( &iter );
73 :
74 0 : fd_slot_meta_t slot_meta = {0};
75 0 : fd_memset( &slot_meta, 0, sizeof(slot_meta) );
76 :
77 0 : int block_found = -1;
78 0 : while ( block_found!=0 && start_slot<=end_slot ) {
79 0 : block_found = fd_rocksdb_root_iter_seek( &iter, &rocks_db, start_slot, &slot_meta, valloc );
80 0 : if ( block_found!=0 ) {
81 0 : start_slot++;
82 0 : }
83 0 : }
84 0 : if( FD_UNLIKELY( block_found!=0 ) ) {
85 0 : FD_LOG_ERR(( "unable to seek to any slot" ));
86 0 : }
87 :
88 0 : uchar trash_hash_buf[32];
89 0 : memset( trash_hash_buf, 0xFE, sizeof(trash_hash_buf) );
90 :
91 0 : ulong blk_cnt = 0;
92 0 : do {
93 0 : ulong slot = slot_meta.slot;
94 0 : if( slot > end_slot ) {
95 0 : break;
96 0 : }
97 :
98 : /* Read and deshred block from RocksDB */
99 0 : if( blk_cnt % 100 == 0 ) {
100 0 : FD_LOG_WARNING(( "imported %lu blocks", blk_cnt ));
101 0 : }
102 :
103 0 : if( FD_UNLIKELY( err ) ) {
104 0 : FD_LOG_ERR(( "fd_rocksdb_get_block failed" ));
105 0 : }
106 :
107 0 : ++blk_cnt;
108 :
109 0 : memset( &slot_meta, 0, sizeof(fd_slot_meta_t) );
110 :
111 0 : int ret = fd_rocksdb_root_iter_next( &iter, &slot_meta, valloc );
112 0 : if( ret < 0 ) {
113 : // FD_LOG_WARNING(("Failed for slot %lu", slot + 1));
114 0 : ret = fd_rocksdb_get_meta( &rocks_db, slot + 1, &slot_meta, valloc );
115 0 : if( ret < 0 ) {
116 0 : break;
117 0 : }
118 0 : }
119 : // FD_LOG_ERR(("fd_rocksdb_root_iter_seek returned %d", ret));
120 0 : } while (1);
121 :
122 0 : fd_rocksdb_root_iter_destroy( &iter );
123 0 : fd_rocksdb_destroy( &rocks_db );
124 :
125 0 : FD_LOG_NOTICE(( "ingested %lu blocks", blk_cnt ));
126 0 : }
127 :
128 : // void
129 : // init_blockstore( fd_ledger_args_t * args ) {
130 : // fd_wksp_tag_query_info_t info;
131 : // ulong blockstore_tag = FD_BLOCKSTORE_MAGIC;
132 : // void * shmem;
133 : // if( fd_wksp_tag_query( args->wksp, &blockstore_tag, 1, &info, 1 ) > 0 ) {
134 : // shmem = fd_wksp_laddr_fast( args->wksp, info.gaddr_lo );
135 : // args->blockstore = fd_blockstore_join( &args->blockstore_ljoin, shmem );
136 : // if( args->blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) {
137 : // FD_LOG_ERR(( "failed to join a blockstore" ));
138 : // }
139 : // FD_LOG_NOTICE(( "joined blockstore" ));
140 : // } else {
141 : // shmem = fd_wksp_alloc_laddr( args->wksp, fd_blockstore_align(), fd_blockstore_footprint( args->shred_max, args->slot_history_max, 16 ), blockstore_tag );
142 : // if( shmem == NULL ) {
143 : // FD_LOG_ERR(( "failed to allocate a blockstore" ));
144 : // }
145 : // args->blockstore = fd_blockstore_join( &args->blockstore_ljoin, fd_blockstore_new( shmem, 1, args->hashseed, args->shred_max, args->slot_history_max, 16 ) );
146 : // if( args->blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) {
147 : // fd_wksp_free_laddr( shmem );
148 : // FD_LOG_ERR(( "failed to allocate a blockstore" ));
149 : // }
150 : // FD_LOG_NOTICE(( "allocating a new blockstore" ));
151 : // }
152 : // }
153 :
154 : void
155 0 : wksp_restore( fd_ledger_args_t * args ) {
156 0 : if( args->restore != NULL ) {
157 0 : FD_LOG_NOTICE(( "restoring wksp %s", args->restore ));
158 0 : fd_wksp_restore( args->wksp, args->restore, args->hashseed );
159 0 : }
160 0 : }
161 :
162 : /********************* Main Command Functions and Setup ***********************/
163 : void
164 0 : minify( fd_ledger_args_t * args ) {
165 : /* Example commmand:
166 : fd_ledger --cmd minify --rocksdb <LARGE_ROCKSDB> --minified-rocksdb <MINI_ROCKSDB>
167 : --start-slot <START_SLOT> --end-slot <END_SLOT> --copy-txn-status 1
168 : */
169 0 : if( args->rocksdb_path == NULL ) {
170 0 : FD_LOG_ERR(( "rocksdb path is NULL" ));
171 0 : }
172 0 : if( args->mini_db_dir == NULL ) {
173 0 : FD_LOG_ERR(( "minified rocksdb path is NULL" ));
174 0 : }
175 :
176 0 : args->valloc = allocator_setup( args->wksp );
177 :
178 0 : fd_rocksdb_t big_rocksdb;
179 0 : char * err = fd_rocksdb_init( &big_rocksdb, args->rocksdb_path );
180 0 : if( FD_UNLIKELY( err!=NULL ) ) {
181 0 : FD_LOG_ERR(( "fd_rocksdb_init at path=%s returned error=%s", args->rocksdb_path, err ));
182 0 : }
183 :
184 : /* If the directory for the minified rocksdb already exists, error out */
185 0 : struct stat statbuf;
186 0 : if( stat( args->mini_db_dir, &statbuf ) == 0 ) {
187 0 : FD_LOG_ERR(( "path for mini_db_dir=%s already exists", args->mini_db_dir ));
188 0 : }
189 :
190 : /* Create a new smaller rocksdb */
191 0 : fd_rocksdb_t mini_rocksdb;
192 0 : fd_rocksdb_new( &mini_rocksdb, args->mini_db_dir );
193 :
194 : /* Correctly bound off start and end slot */
195 0 : ulong first_slot = fd_rocksdb_first_slot( &big_rocksdb, &err );
196 0 : ulong last_slot = fd_rocksdb_last_slot( &big_rocksdb, &err );
197 0 : if( args->start_slot < first_slot ) { args->start_slot = first_slot; }
198 0 : if( args->end_slot > last_slot ) { args->end_slot = last_slot; }
199 :
200 0 : FD_LOG_NOTICE(( "copying over rocks db for range [%lu, %lu]", args->start_slot, args->end_slot ));
201 :
202 : /* Copy over all slot indexed columns */
203 0 : for( ulong cf_idx = 1; cf_idx < FD_ROCKSDB_CF_CNT; ++cf_idx ) {
204 0 : fd_rocksdb_copy_over_slot_indexed_range( &big_rocksdb, &mini_rocksdb, cf_idx,
205 0 : args->start_slot, args->end_slot );
206 0 : }
207 0 : FD_LOG_NOTICE(("copied over all slot indexed columns"));
208 :
209 : /* Copy over transactions. This is more complicated because first, a temporary
210 : blockstore will be populated. This will be used to look up transactions
211 : which can be quickly queried */
212 0 : if( args->copy_txn_status ) {
213 : // /* Ingest block range into blockstore */
214 : // ingest_rocksdb( args->rocksdb_path,
215 : // args->start_slot,
216 : // args->end_slot,
217 : // args->blockstore,
218 : // ULONG_MAX,
219 : // args->valloc );
220 :
221 0 : } else {
222 0 : FD_LOG_NOTICE(( "skipping copying of transaction statuses" ));
223 0 : }
224 :
225 : /* TODO: Currently, the address signatures column family isn't copied as it
226 : is indexed on the pubkey. */
227 :
228 0 : fd_rocksdb_destroy( &big_rocksdb );
229 0 : fd_rocksdb_destroy( &mini_rocksdb );
230 0 : }
231 :
232 : /* Parse user arguments and setup shared data structures used across commands */
233 : int
234 0 : initial_setup( int argc, char ** argv, fd_ledger_args_t * args ) {
235 0 : if( FD_UNLIKELY( argc==1 ) ) {
236 0 : return 1;
237 0 : }
238 :
239 0 : fd_boot( &argc, &argv );
240 :
241 0 : char const * wksp_name = fd_env_strip_cmdline_cstr ( &argc, &argv, "--wksp-name", NULL, NULL );
242 0 : ulong page_cnt = fd_env_strip_cmdline_ulong ( &argc, &argv, "--page-cnt", NULL, 5 );
243 0 : int reset = fd_env_strip_cmdline_int ( &argc, &argv, "--reset", NULL, 0 );
244 0 : char const * cmd = fd_env_strip_cmdline_cstr ( &argc, &argv, "--cmd", NULL, NULL );
245 0 : int copy_txn_status = fd_env_strip_cmdline_int ( &argc, &argv, "--copy-txn-status", NULL, 0 );
246 0 : ulong slot_history_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--slot-history", NULL, 100UL );
247 0 : ulong shred_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--shred-max", NULL, 1UL << 17 );
248 0 : ulong start_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--start-slot", NULL, 0UL );
249 0 : ulong end_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--end-slot", NULL, ULONG_MAX );
250 0 : char const * restore = fd_env_strip_cmdline_cstr ( &argc, &argv, "--restore", NULL, NULL );
251 0 : ulong trash_hash = fd_env_strip_cmdline_ulong ( &argc, &argv, "--trash-hash", NULL, ULONG_MAX );
252 0 : char const * mini_db_dir = fd_env_strip_cmdline_cstr ( &argc, &argv, "--minified-rocksdb", NULL, NULL );
253 0 : char const * rocksdb_path = fd_env_strip_cmdline_cstr ( &argc, &argv, "--rocksdb", NULL, NULL );
254 :
255 : // TODO: Add argument validation. Make sure that we aren't including any arguments that aren't parsed for
256 :
257 0 : char hostname[64];
258 0 : gethostname( hostname, sizeof(hostname) );
259 0 : ulong hashseed = fd_hash( 0, hostname, strnlen( hostname, sizeof(hostname) ) );
260 0 : args->hashseed = (uint)hashseed;
261 :
262 : /* Setup workspace */
263 0 : fd_wksp_t * wksp;
264 0 : if( wksp_name == NULL ) {
265 0 : FD_LOG_NOTICE(( "--wksp not specified, using an anonymous local workspace" ));
266 0 : wksp = fd_wksp_new_anonymous( FD_SHMEM_GIGANTIC_PAGE_SZ, page_cnt, 0, "wksp", 0UL );
267 0 : } else {
268 0 : fd_shmem_info_t shmem_info[1];
269 0 : if( FD_UNLIKELY( fd_shmem_info( wksp_name, 0UL, shmem_info ) ) )
270 0 : FD_LOG_ERR(( "unable to query region \"%s\"\n\tprobably does not exist or bad permissions", wksp_name ));
271 0 : wksp = fd_wksp_attach( wksp_name );
272 0 : }
273 :
274 0 : if( wksp == NULL ) {
275 0 : FD_LOG_ERR(( "failed to attach to workspace %s", wksp_name ));
276 0 : }
277 0 : if( reset ) {
278 0 : fd_wksp_reset( wksp, args->hashseed );
279 0 : }
280 0 : args->wksp = wksp;
281 :
282 : /* Copy over arguments */
283 0 : args->cmd = cmd;
284 0 : args->start_slot = start_slot;
285 0 : args->end_slot = end_slot;
286 0 : args->shred_max = shred_max;
287 0 : args->slot_history_max = slot_history_max;
288 0 : args->restore = restore;
289 0 : args->mini_db_dir = mini_db_dir;
290 0 : args->copy_txn_status = copy_txn_status;
291 0 : args->trash_hash = trash_hash;
292 0 : args->rocksdb_path = rocksdb_path;
293 :
294 0 : if( args->rocksdb_path != NULL ) {
295 0 : FD_LOG_NOTICE(( "rocksdb=%s", args->rocksdb_path ));
296 0 : }
297 :
298 0 : return 0;
299 0 : }
300 :
301 : int main( int argc, char ** argv ) {
302 : /* Declaring this on the stack gets the alignment wrong when using asan */
303 : fd_ledger_args_t * args = fd_alloca( alignof(fd_ledger_args_t), sizeof(fd_ledger_args_t) );
304 : memset( args, 0, sizeof(fd_ledger_args_t) );
305 : initial_setup( argc, argv, args );
306 :
307 : if( args->cmd == NULL ) {
308 : FD_LOG_ERR(( "no command specified" ));
309 : } else if( strcmp( args->cmd, "minify" ) == 0 ) {
310 : minify( args );
311 : } else {
312 : FD_LOG_ERR(( "unknown command=%s", args->cmd ));
313 : }
314 :
315 : return 0;
316 : }
|