Line data Source code
#include "../../flamenco/types/fd_types.h"
#include "../../flamenco/runtime/fd_rocksdb.h"
#include "../../flamenco/runtime/context/fd_capture_ctx.h"
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>
6 :
/* fd_ledger_args holds the parsed command-line options of fd_ledger together
   with state shared across commands (the workspace and a valloc carved out
   of it).  It is zero-filled in main and populated by initial_setup; minify
   additionally sets valloc. */
struct fd_ledger_args {
  fd_wksp_t *  wksp;             /* wksp for blockstore; attached by name (--wksp-name) or created anonymously in initial_setup */
  char const * cmd;              /* user passed command to fd_ledger (--cmd), e.g. "minify" */
  ulong        start_slot;       /* first slot of the requested range (--start-slot); minify clamps it to the rocksdb's first slot */
  ulong        end_slot;         /* last slot of the requested range (--end-slot); minify clamps it to the rocksdb's last slot */
  uint         hashseed;         /* hash of this host's name; used for wksp reset and restore */
  char const * restore;          /* path passed to fd_wksp_restore (--restore), or NULL */
  ulong        shred_max;        /* maximum number of shreds (--shred-max) */
  ulong        slot_history_max; /* number of slots stored by blockstore (--slot-history) */
  char const * mini_db_dir;      /* path to the minified rocksdb to be created (--minified-rocksdb); must not already exist */
  int          copy_txn_status;  /* determine if txns should be copied to the blockstore during minify/replay */
  ulong        trash_hash;       /* trash hash to be used for negative cases (--trash-hash) */
  char const * rocksdb_path;     /* path to the source rocksdb directory (--rocksdb) */
  fd_valloc_t  valloc;           /* wksp valloc that should NOT be used for runtime allocations; set by minify via allocator_setup */
};
typedef struct fd_ledger_args fd_ledger_args_t;
23 :
24 : /***************************** Helpers ****************************************/
25 : static fd_valloc_t
26 0 : allocator_setup( fd_wksp_t * wksp ) {
27 :
28 0 : if( FD_UNLIKELY( !wksp ) ) {
29 0 : FD_LOG_ERR(( "workspace is NULL" ));
30 0 : }
31 :
32 0 : void * alloc_shmem = fd_wksp_alloc_laddr( wksp, fd_alloc_align(), fd_alloc_footprint(), 3UL );
33 0 : if( FD_UNLIKELY( !alloc_shmem ) ) { FD_LOG_ERR( ( "fd_alloc too large for workspace" ) ); }
34 0 : void * alloc_shalloc = fd_alloc_new( alloc_shmem, 3UL );
35 0 : if( FD_UNLIKELY( !alloc_shalloc ) ) { FD_LOG_ERR( ( "fd_alloc_new failed" ) ); }
36 0 : fd_alloc_t * alloc = fd_alloc_join( alloc_shalloc, 3UL );
37 0 : if( FD_UNLIKELY( !alloc ) ) { FD_LOG_ERR( ( "fd_alloc_join failed" ) ); }
38 0 : fd_valloc_t valloc = fd_alloc_virtual( alloc );
39 0 : return valloc;
40 :
41 : /* NOTE: Enable this if leak hunting */
42 : //return fd_backtracing_alloc_virtual( &valloc );
43 :
44 0 : }
45 :
46 : void
47 : ingest_rocksdb( char const * file,
48 : ulong start_slot,
49 : ulong end_slot,
50 : FD_PARAM_UNUSED ulong trash_hash,
51 0 : fd_valloc_t valloc ) {
52 :
53 0 : fd_rocksdb_t rocks_db;
54 0 : char * err = fd_rocksdb_init( &rocks_db, file );
55 0 : if( FD_UNLIKELY( err!=NULL ) ) {
56 0 : FD_LOG_ERR(( "fd_rocksdb_init returned %s", err ));
57 0 : }
58 :
59 0 : ulong last_slot = fd_rocksdb_last_slot( &rocks_db, &err );
60 0 : if( FD_UNLIKELY( err!=NULL ) ) {
61 0 : FD_LOG_ERR(( "fd_rocksdb_last_slot returned %s", err ));
62 0 : }
63 :
64 0 : if( last_slot < start_slot ) {
65 0 : FD_LOG_ERR(( "rocksdb blocks are older than snapshot. first=%lu last=%lu wanted=%lu",
66 0 : fd_rocksdb_first_slot(&rocks_db, &err), last_slot, start_slot ));
67 0 : }
68 :
69 0 : FD_LOG_NOTICE(( "ingesting rocksdb from start=%lu to end=%lu", start_slot, end_slot ));
70 :
71 0 : fd_rocksdb_root_iter_t iter = {0};
72 0 : fd_rocksdb_root_iter_new( &iter );
73 :
74 0 : fd_slot_meta_t slot_meta = {0};
75 0 : fd_memset( &slot_meta, 0, sizeof(slot_meta) );
76 :
77 0 : int block_found = -1;
78 0 : while ( block_found!=0 && start_slot<=end_slot ) {
79 0 : block_found = fd_rocksdb_root_iter_seek( &iter, &rocks_db, start_slot, &slot_meta, valloc );
80 0 : if ( block_found!=0 ) {
81 0 : start_slot++;
82 0 : }
83 0 : }
84 0 : if( FD_UNLIKELY( block_found!=0 ) ) {
85 0 : FD_LOG_ERR(( "unable to seek to any slot" ));
86 0 : }
87 :
88 0 : uchar trash_hash_buf[32];
89 0 : memset( trash_hash_buf, 0xFE, sizeof(trash_hash_buf) );
90 :
91 0 : ulong blk_cnt = 0;
92 0 : do {
93 0 : ulong slot = slot_meta.slot;
94 0 : if( slot > end_slot ) {
95 0 : break;
96 0 : }
97 :
98 : /* Read and deshred block from RocksDB */
99 0 : if( blk_cnt % 100 == 0 ) {
100 0 : FD_LOG_WARNING(( "imported %lu blocks", blk_cnt ));
101 0 : }
102 :
103 0 : if( FD_UNLIKELY( err ) ) {
104 0 : FD_LOG_ERR(( "fd_rocksdb_get_block failed" ));
105 0 : }
106 :
107 0 : ++blk_cnt;
108 :
109 0 : memset( &slot_meta, 0, sizeof(fd_slot_meta_t) );
110 :
111 0 : int ret = fd_rocksdb_root_iter_next( &iter, &slot_meta, valloc );
112 0 : if( ret < 0 ) {
113 : // FD_LOG_WARNING(("Failed for slot %lu", slot + 1));
114 0 : ret = fd_rocksdb_get_meta( &rocks_db, slot + 1, &slot_meta, valloc );
115 0 : if( ret < 0 ) {
116 0 : break;
117 0 : }
118 0 : }
119 : // FD_LOG_ERR(("fd_rocksdb_root_iter_seek returned %d", ret));
120 0 : } while (1);
121 :
122 0 : fd_rocksdb_root_iter_destroy( &iter );
123 0 : fd_rocksdb_destroy( &rocks_db );
124 :
125 0 : FD_LOG_NOTICE(( "ingested %lu blocks", blk_cnt ));
126 0 : }
127 :
128 : void
129 0 : wksp_restore( fd_ledger_args_t * args ) {
130 0 : if( args->restore != NULL ) {
131 0 : FD_LOG_NOTICE(( "restoring wksp %s", args->restore ));
132 0 : fd_wksp_restore( args->wksp, args->restore, args->hashseed );
133 0 : }
134 0 : }
135 :
136 : /********************* Main Command Functions and Setup ***********************/
137 : void
138 0 : minify( fd_ledger_args_t * args ) {
139 : /* Example commmand:
140 : fd_ledger --cmd minify --rocksdb <LARGE_ROCKSDB> --minified-rocksdb <MINI_ROCKSDB>
141 : --start-slot <START_SLOT> --end-slot <END_SLOT> --copy-txn-status 1
142 : */
143 0 : if( args->rocksdb_path == NULL ) {
144 0 : FD_LOG_ERR(( "rocksdb path is NULL" ));
145 0 : }
146 0 : if( args->mini_db_dir == NULL ) {
147 0 : FD_LOG_ERR(( "minified rocksdb path is NULL" ));
148 0 : }
149 :
150 0 : args->valloc = allocator_setup( args->wksp );
151 :
152 0 : fd_rocksdb_t big_rocksdb;
153 0 : char * err = fd_rocksdb_init( &big_rocksdb, args->rocksdb_path );
154 0 : if( FD_UNLIKELY( err!=NULL ) ) {
155 0 : FD_LOG_ERR(( "fd_rocksdb_init at path=%s returned error=%s", args->rocksdb_path, err ));
156 0 : }
157 :
158 : /* If the directory for the minified rocksdb already exists, error out */
159 0 : struct stat statbuf;
160 0 : if( stat( args->mini_db_dir, &statbuf ) == 0 ) {
161 0 : FD_LOG_ERR(( "path for mini_db_dir=%s already exists", args->mini_db_dir ));
162 0 : }
163 :
164 : /* Create a new smaller rocksdb */
165 0 : fd_rocksdb_t mini_rocksdb;
166 0 : fd_rocksdb_new( &mini_rocksdb, args->mini_db_dir );
167 :
168 : /* Correctly bound off start and end slot */
169 0 : ulong first_slot = fd_rocksdb_first_slot( &big_rocksdb, &err );
170 0 : ulong last_slot = fd_rocksdb_last_slot( &big_rocksdb, &err );
171 0 : if( args->start_slot < first_slot ) { args->start_slot = first_slot; }
172 0 : if( args->end_slot > last_slot ) { args->end_slot = last_slot; }
173 :
174 0 : FD_LOG_NOTICE(( "copying over rocks db for range [%lu, %lu]", args->start_slot, args->end_slot ));
175 :
176 : /* Copy over all slot indexed columns */
177 0 : for( ulong cf_idx = 1; cf_idx < FD_ROCKSDB_CF_CNT; ++cf_idx ) {
178 0 : fd_rocksdb_copy_over_slot_indexed_range( &big_rocksdb, &mini_rocksdb, cf_idx,
179 0 : args->start_slot, args->end_slot );
180 0 : }
181 0 : FD_LOG_NOTICE(("copied over all slot indexed columns"));
182 :
183 : /* Copy over transactions. This is more complicated because first, a temporary
184 : blockstore will be populated. This will be used to look up transactions
185 : which can be quickly queried */
186 0 : if( args->copy_txn_status ) {
187 : // /* Ingest block range into blockstore */
188 : // ingest_rocksdb( args->rocksdb_path,
189 : // args->start_slot,
190 : // args->end_slot,
191 : // args->blockstore,
192 : // ULONG_MAX,
193 : // args->valloc );
194 :
195 0 : } else {
196 0 : FD_LOG_NOTICE(( "skipping copying of transaction statuses" ));
197 0 : }
198 :
199 : /* TODO: Currently, the address signatures column family isn't copied as it
200 : is indexed on the pubkey. */
201 :
202 0 : fd_rocksdb_destroy( &big_rocksdb );
203 0 : fd_rocksdb_destroy( &mini_rocksdb );
204 0 : }
205 :
206 : /* Parse user arguments and setup shared data structures used across commands */
207 : int
208 0 : initial_setup( int argc, char ** argv, fd_ledger_args_t * args ) {
209 0 : if( FD_UNLIKELY( argc==1 ) ) {
210 0 : return 1;
211 0 : }
212 :
213 0 : fd_boot( &argc, &argv );
214 :
215 0 : char const * wksp_name = fd_env_strip_cmdline_cstr ( &argc, &argv, "--wksp-name", NULL, NULL );
216 0 : ulong page_cnt = fd_env_strip_cmdline_ulong ( &argc, &argv, "--page-cnt", NULL, 5 );
217 0 : int reset = fd_env_strip_cmdline_int ( &argc, &argv, "--reset", NULL, 0 );
218 0 : char const * cmd = fd_env_strip_cmdline_cstr ( &argc, &argv, "--cmd", NULL, NULL );
219 0 : int copy_txn_status = fd_env_strip_cmdline_int ( &argc, &argv, "--copy-txn-status", NULL, 0 );
220 0 : ulong slot_history_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--slot-history", NULL, 100UL );
221 0 : ulong shred_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--shred-max", NULL, 1UL << 17 );
222 0 : ulong start_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--start-slot", NULL, 0UL );
223 0 : ulong end_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--end-slot", NULL, ULONG_MAX );
224 0 : char const * restore = fd_env_strip_cmdline_cstr ( &argc, &argv, "--restore", NULL, NULL );
225 0 : ulong trash_hash = fd_env_strip_cmdline_ulong ( &argc, &argv, "--trash-hash", NULL, ULONG_MAX );
226 0 : char const * mini_db_dir = fd_env_strip_cmdline_cstr ( &argc, &argv, "--minified-rocksdb", NULL, NULL );
227 0 : char const * rocksdb_path = fd_env_strip_cmdline_cstr ( &argc, &argv, "--rocksdb", NULL, NULL );
228 :
229 : // TODO: Add argument validation. Make sure that we aren't including any arguments that aren't parsed for
230 :
231 0 : char hostname[64];
232 0 : gethostname( hostname, sizeof(hostname) );
233 0 : ulong hashseed = fd_hash( 0, hostname, strnlen( hostname, sizeof(hostname) ) );
234 0 : args->hashseed = (uint)hashseed;
235 :
236 : /* Setup workspace */
237 0 : fd_wksp_t * wksp;
238 0 : if( wksp_name == NULL ) {
239 0 : FD_LOG_NOTICE(( "--wksp not specified, using an anonymous local workspace" ));
240 0 : wksp = fd_wksp_new_anonymous( FD_SHMEM_GIGANTIC_PAGE_SZ, page_cnt, 0, "wksp", 0UL );
241 0 : } else {
242 0 : fd_shmem_info_t shmem_info[1];
243 0 : if( FD_UNLIKELY( fd_shmem_info( wksp_name, 0UL, shmem_info ) ) )
244 0 : FD_LOG_ERR(( "unable to query region \"%s\"\n\tprobably does not exist or bad permissions", wksp_name ));
245 0 : wksp = fd_wksp_attach( wksp_name );
246 0 : }
247 :
248 0 : if( wksp == NULL ) {
249 0 : FD_LOG_ERR(( "failed to attach to workspace %s", wksp_name ));
250 0 : }
251 0 : if( reset ) {
252 0 : fd_wksp_reset( wksp, args->hashseed );
253 0 : }
254 0 : args->wksp = wksp;
255 :
256 : /* Copy over arguments */
257 0 : args->cmd = cmd;
258 0 : args->start_slot = start_slot;
259 0 : args->end_slot = end_slot;
260 0 : args->shred_max = shred_max;
261 0 : args->slot_history_max = slot_history_max;
262 0 : args->restore = restore;
263 0 : args->mini_db_dir = mini_db_dir;
264 0 : args->copy_txn_status = copy_txn_status;
265 0 : args->trash_hash = trash_hash;
266 0 : args->rocksdb_path = rocksdb_path;
267 :
268 0 : if( args->rocksdb_path != NULL ) {
269 0 : FD_LOG_NOTICE(( "rocksdb=%s", args->rocksdb_path ));
270 0 : }
271 :
272 0 : return 0;
273 0 : }
274 :
275 : int main( int argc, char ** argv ) {
276 : /* Declaring this on the stack gets the alignment wrong when using asan */
277 : fd_ledger_args_t * args = fd_alloca( alignof(fd_ledger_args_t), sizeof(fd_ledger_args_t) );
278 : memset( args, 0, sizeof(fd_ledger_args_t) );
279 : initial_setup( argc, argv, args );
280 :
281 : if( args->cmd == NULL ) {
282 : FD_LOG_ERR(( "no command specified" ));
283 : } else if( strcmp( args->cmd, "minify" ) == 0 ) {
284 : minify( args );
285 : } else {
286 : FD_LOG_ERR(( "unknown command=%s", args->cmd ));
287 : }
288 :
289 : return 0;
290 : }
|