Line data Source code
1 : #include "../../flamenco/types/fd_types.h"
2 : #include "../../flamenco/runtime/fd_rocksdb.h"
3 : #include "../../flamenco/runtime/context/fd_capture_ctx.h"
4 : #include <unistd.h>
5 : #include <sys/stat.h>
6 :
7 : struct fd_ledger_args {
8 : fd_wksp_t * wksp; /* wksp for blockstore */
9 : char const * cmd; /* user passed command to fd_ledger */
10 : ulong start_slot; /* start slot for offline replay */
11 : ulong end_slot; /* end slot for offline replay */
12 : uint hashseed; /* hashseed */
13 : char const * restore; /* wksp restore */
14 : ulong shred_max; /* maximum number of shreds*/
15 : ulong slot_history_max; /* number of slots stored by blockstore*/
16 : char const * mini_db_dir; /* path to minifed rocksdb that's to be created */
17 : int copy_txn_status; /* determine if txns should be copied to the blockstore during minify/replay */
18 : ulong trash_hash; /* trash hash to be used for negative cases*/
19 : char const * rocksdb_path; /* path to rocksdb directory */
20 : };
21 : typedef struct fd_ledger_args fd_ledger_args_t;
22 :
23 : void
24 : ingest_rocksdb( char const * file,
25 : ulong start_slot,
26 : ulong end_slot,
27 0 : FD_PARAM_UNUSED ulong trash_hash ) {
28 :
29 0 : fd_rocksdb_t rocks_db;
30 0 : char * err = fd_rocksdb_init( &rocks_db, file );
31 0 : if( FD_UNLIKELY( err!=NULL ) ) {
32 0 : FD_LOG_ERR(( "fd_rocksdb_init returned %s", err ));
33 0 : }
34 :
35 0 : ulong last_slot = fd_rocksdb_last_slot( &rocks_db, &err );
36 0 : if( FD_UNLIKELY( err!=NULL ) ) {
37 0 : FD_LOG_ERR(( "fd_rocksdb_last_slot returned %s", err ));
38 0 : }
39 :
40 0 : if( last_slot < start_slot ) {
41 0 : FD_LOG_ERR(( "rocksdb blocks are older than snapshot. first=%lu last=%lu wanted=%lu",
42 0 : fd_rocksdb_first_slot(&rocks_db, &err), last_slot, start_slot ));
43 0 : }
44 :
45 0 : FD_LOG_NOTICE(( "ingesting rocksdb from start=%lu to end=%lu", start_slot, end_slot ));
46 :
47 0 : fd_rocksdb_root_iter_t iter = {0};
48 0 : fd_rocksdb_root_iter_new( &iter );
49 :
50 0 : fd_slot_meta_t * slot_meta = NULL;
51 :
52 0 : while( !slot_meta && start_slot<=end_slot ) {
53 0 : slot_meta = fd_rocksdb_root_iter_seek( &iter, &rocks_db, start_slot );
54 0 : if( !slot_meta ) { /* what is this logic??? */
55 0 : start_slot++;
56 0 : }
57 0 : }
58 0 : if( FD_UNLIKELY( !slot_meta ) ) {
59 0 : FD_LOG_ERR(( "unable to seek to any slot" ));
60 0 : }
61 :
62 0 : uchar trash_hash_buf[32];
63 0 : memset( trash_hash_buf, 0xFE, sizeof(trash_hash_buf) );
64 :
65 0 : ulong blk_cnt = 0;
66 0 : do {
67 0 : ulong slot = slot_meta->slot;
68 0 : if( slot > end_slot ) {
69 0 : break;
70 0 : }
71 :
72 : /* Read and deshred block from RocksDB */
73 0 : if( blk_cnt % 100 == 0 ) {
74 0 : FD_LOG_WARNING(( "imported %lu blocks", blk_cnt ));
75 0 : }
76 :
77 0 : if( FD_UNLIKELY( err ) ) {
78 0 : FD_LOG_ERR(( "fd_rocksdb_get_block failed" ));
79 0 : }
80 :
81 0 : ++blk_cnt;
82 :
83 0 : free( slot_meta ); slot_meta = NULL;
84 0 : slot_meta = fd_rocksdb_root_iter_next( &iter );
85 0 : if( !slot_meta ) {
86 : // FD_LOG_WARNING(("Failed for slot %lu", slot + 1));
87 0 : slot_meta = fd_rocksdb_get_meta( &rocks_db, slot + 1 );
88 0 : if( !slot_meta ) break;
89 0 : }
90 : // FD_LOG_ERR(("fd_rocksdb_root_iter_seek returned %d", ret));
91 0 : } while (1);
92 :
93 0 : free( slot_meta ); slot_meta = NULL;
94 0 : fd_rocksdb_root_iter_destroy( &iter );
95 0 : fd_rocksdb_destroy( &rocks_db );
96 :
97 0 : FD_LOG_NOTICE(( "ingested %lu blocks", blk_cnt ));
98 0 : }
99 :
100 : void
101 0 : wksp_restore( fd_ledger_args_t * args ) {
102 0 : if( args->restore != NULL ) {
103 0 : FD_LOG_NOTICE(( "restoring wksp %s", args->restore ));
104 0 : fd_wksp_restore( args->wksp, args->restore, args->hashseed );
105 0 : }
106 0 : }
107 :
108 : /********************* Main Command Functions and Setup ***********************/
109 : void
110 0 : minify( fd_ledger_args_t * args ) {
111 : /* Example commmand:
112 : fd_ledger --cmd minify --rocksdb <LARGE_ROCKSDB> --minified-rocksdb <MINI_ROCKSDB>
113 : --start-slot <START_SLOT> --end-slot <END_SLOT> --copy-txn-status 1
114 : */
115 0 : if( args->rocksdb_path == NULL ) {
116 0 : FD_LOG_ERR(( "rocksdb path is NULL" ));
117 0 : }
118 0 : if( args->mini_db_dir == NULL ) {
119 0 : FD_LOG_ERR(( "minified rocksdb path is NULL" ));
120 0 : }
121 :
122 0 : fd_rocksdb_t big_rocksdb;
123 0 : char * err = fd_rocksdb_init( &big_rocksdb, args->rocksdb_path );
124 0 : if( FD_UNLIKELY( err!=NULL ) ) {
125 0 : FD_LOG_ERR(( "fd_rocksdb_init at path=%s returned error=%s", args->rocksdb_path, err ));
126 0 : }
127 :
128 : /* If the directory for the minified rocksdb already exists, error out */
129 0 : struct stat statbuf;
130 0 : if( stat( args->mini_db_dir, &statbuf ) == 0 ) {
131 0 : FD_LOG_ERR(( "path for mini_db_dir=%s already exists", args->mini_db_dir ));
132 0 : }
133 :
134 : /* Create a new smaller rocksdb */
135 0 : fd_rocksdb_t mini_rocksdb;
136 0 : fd_rocksdb_new( &mini_rocksdb, args->mini_db_dir );
137 :
138 : /* Correctly bound off start and end slot */
139 0 : ulong first_slot = fd_rocksdb_first_slot( &big_rocksdb, &err );
140 0 : ulong last_slot = fd_rocksdb_last_slot( &big_rocksdb, &err );
141 0 : if( args->start_slot < first_slot ) { args->start_slot = first_slot; }
142 0 : if( args->end_slot > last_slot ) { args->end_slot = last_slot; }
143 :
144 0 : FD_LOG_NOTICE(( "copying over rocks db for range [%lu, %lu]", args->start_slot, args->end_slot ));
145 :
146 : /* Copy over all slot indexed columns */
147 0 : for( ulong cf_idx = 1; cf_idx < FD_ROCKSDB_CF_CNT; ++cf_idx ) {
148 0 : fd_rocksdb_copy_over_slot_indexed_range( &big_rocksdb, &mini_rocksdb, cf_idx,
149 0 : args->start_slot, args->end_slot );
150 0 : }
151 0 : FD_LOG_NOTICE(("copied over all slot indexed columns"));
152 :
153 : /* Copy over transactions. This is more complicated because first, a temporary
154 : blockstore will be populated. This will be used to look up transactions
155 : which can be quickly queried */
156 0 : if( args->copy_txn_status ) {
157 : // /* Ingest block range into blockstore */
158 : // ingest_rocksdb( args->rocksdb_path,
159 : // args->start_slot,
160 : // args->end_slot,
161 : // args->blockstore,
162 : // ULONG_MAX );
163 :
164 0 : } else {
165 0 : FD_LOG_NOTICE(( "skipping copying of transaction statuses" ));
166 0 : }
167 :
168 : /* TODO: Currently, the address signatures column family isn't copied as it
169 : is indexed on the pubkey. */
170 :
171 0 : fd_rocksdb_destroy( &big_rocksdb );
172 0 : fd_rocksdb_destroy( &mini_rocksdb );
173 0 : }
174 :
175 : /* Parse user arguments and setup shared data structures used across commands */
176 : int
177 0 : initial_setup( int argc, char ** argv, fd_ledger_args_t * args ) {
178 0 : if( FD_UNLIKELY( argc==1 ) ) {
179 0 : return 1;
180 0 : }
181 :
182 0 : fd_boot( &argc, &argv );
183 :
184 0 : char const * wksp_name = fd_env_strip_cmdline_cstr ( &argc, &argv, "--wksp-name", NULL, NULL );
185 0 : ulong page_cnt = fd_env_strip_cmdline_ulong ( &argc, &argv, "--page-cnt", NULL, 5 );
186 0 : int reset = fd_env_strip_cmdline_int ( &argc, &argv, "--reset", NULL, 0 );
187 0 : char const * cmd = fd_env_strip_cmdline_cstr ( &argc, &argv, "--cmd", NULL, NULL );
188 0 : int copy_txn_status = fd_env_strip_cmdline_int ( &argc, &argv, "--copy-txn-status", NULL, 0 );
189 0 : ulong slot_history_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--slot-history", NULL, 100UL );
190 0 : ulong shred_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--shred-max", NULL, 1UL << 17 );
191 0 : ulong start_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--start-slot", NULL, 0UL );
192 0 : ulong end_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--end-slot", NULL, ULONG_MAX );
193 0 : char const * restore = fd_env_strip_cmdline_cstr ( &argc, &argv, "--restore", NULL, NULL );
194 0 : ulong trash_hash = fd_env_strip_cmdline_ulong ( &argc, &argv, "--trash-hash", NULL, ULONG_MAX );
195 0 : char const * mini_db_dir = fd_env_strip_cmdline_cstr ( &argc, &argv, "--minified-rocksdb", NULL, NULL );
196 0 : char const * rocksdb_path = fd_env_strip_cmdline_cstr ( &argc, &argv, "--rocksdb", NULL, NULL );
197 :
198 : // TODO: Add argument validation. Make sure that we aren't including any arguments that aren't parsed for
199 :
200 0 : char hostname[64];
201 0 : gethostname( hostname, sizeof(hostname) );
202 0 : ulong hashseed = fd_hash( 0, hostname, strnlen( hostname, sizeof(hostname) ) );
203 0 : args->hashseed = (uint)hashseed;
204 :
205 : /* Setup workspace */
206 0 : fd_wksp_t * wksp;
207 0 : if( wksp_name == NULL ) {
208 0 : FD_LOG_NOTICE(( "--wksp not specified, using an anonymous local workspace" ));
209 0 : wksp = fd_wksp_new_anonymous( FD_SHMEM_GIGANTIC_PAGE_SZ, page_cnt, 0, "wksp", 0UL );
210 0 : } else {
211 0 : fd_shmem_info_t shmem_info[1];
212 0 : if( FD_UNLIKELY( fd_shmem_info( wksp_name, 0UL, shmem_info ) ) )
213 0 : FD_LOG_ERR(( "unable to query region \"%s\"\n\tprobably does not exist or bad permissions", wksp_name ));
214 0 : wksp = fd_wksp_attach( wksp_name );
215 0 : }
216 :
217 0 : if( wksp == NULL ) {
218 0 : FD_LOG_ERR(( "failed to attach to workspace %s", wksp_name ));
219 0 : }
220 0 : if( reset ) {
221 0 : fd_wksp_reset( wksp, args->hashseed );
222 0 : }
223 0 : args->wksp = wksp;
224 :
225 : /* Copy over arguments */
226 0 : args->cmd = cmd;
227 0 : args->start_slot = start_slot;
228 0 : args->end_slot = end_slot;
229 0 : args->shred_max = shred_max;
230 0 : args->slot_history_max = slot_history_max;
231 0 : args->restore = restore;
232 0 : args->mini_db_dir = mini_db_dir;
233 0 : args->copy_txn_status = copy_txn_status;
234 0 : args->trash_hash = trash_hash;
235 0 : args->rocksdb_path = rocksdb_path;
236 :
237 0 : if( args->rocksdb_path != NULL ) {
238 0 : FD_LOG_NOTICE(( "rocksdb=%s", args->rocksdb_path ));
239 0 : }
240 :
241 0 : return 0;
242 0 : }
243 :
244 : int main( int argc, char ** argv ) {
245 : /* Declaring this on the stack gets the alignment wrong when using asan */
246 : fd_ledger_args_t * args = fd_alloca( alignof(fd_ledger_args_t), sizeof(fd_ledger_args_t) );
247 : memset( args, 0, sizeof(fd_ledger_args_t) );
248 : initial_setup( argc, argv, args );
249 :
250 : if( args->cmd == NULL ) {
251 : FD_LOG_ERR(( "no command specified" ));
252 : } else if( strcmp( args->cmd, "minify" ) == 0 ) {
253 : minify( args );
254 : } else {
255 : FD_LOG_ERR(( "unknown command=%s", args->cmd ));
256 : }
257 :
258 : return 0;
259 : }
|