Line data Source code
1 : #include "../../flamenco/types/fd_types.h"
2 : #include "../../flamenco/runtime/fd_rocksdb.h"
3 : #include <stdlib.h>
4 : #include <unistd.h>
5 : #include <sys/stat.h>
6 :
7 : struct fd_ledger_args {
8 : fd_wksp_t * wksp; /* wksp for blockstore */
9 : char const * cmd; /* user passed command to fd_ledger */
10 : ulong start_slot; /* start slot for offline replay */
11 : ulong end_slot; /* end slot for offline replay */
12 : uint hashseed; /* hashseed */
13 : ulong shred_max; /* maximum number of shreds*/
14 : ulong slot_history_max; /* number of slots stored by blockstore*/
15 : char const * mini_db_dir; /* path to minifed rocksdb that's to be created */
16 : int copy_txn_status; /* determine if txns should be copied to the blockstore during minify/replay */
17 : char const * rocksdb_path; /* path to rocksdb directory */
18 : };
19 : typedef struct fd_ledger_args fd_ledger_args_t;
20 :
21 : void
22 : ingest_rocksdb( char const * file,
23 : ulong start_slot,
24 0 : ulong end_slot ) {
25 :
26 0 : fd_rocksdb_t rocks_db;
27 0 : char * err = fd_rocksdb_init( &rocks_db, file );
28 0 : if( FD_UNLIKELY( err!=NULL ) ) {
29 0 : FD_LOG_ERR(( "fd_rocksdb_init returned %s", err ));
30 0 : }
31 :
32 0 : ulong last_slot = fd_rocksdb_last_slot( &rocks_db, &err );
33 0 : if( FD_UNLIKELY( err!=NULL ) ) {
34 0 : FD_LOG_ERR(( "fd_rocksdb_last_slot returned %s", err ));
35 0 : }
36 :
37 0 : if( last_slot < start_slot ) {
38 0 : FD_LOG_ERR(( "rocksdb blocks are older than snapshot. first=%lu last=%lu wanted=%lu",
39 0 : fd_rocksdb_first_slot(&rocks_db, &err), last_slot, start_slot ));
40 0 : }
41 :
42 0 : FD_LOG_NOTICE(( "ingesting rocksdb from start=%lu to end=%lu", start_slot, end_slot ));
43 :
44 0 : fd_rocksdb_root_iter_t iter = {0};
45 0 : fd_rocksdb_root_iter_new( &iter );
46 :
47 0 : fd_slot_meta_t * slot_meta = NULL;
48 :
49 0 : while( !slot_meta && start_slot<=end_slot ) {
50 0 : slot_meta = fd_rocksdb_root_iter_seek( &iter, &rocks_db, start_slot );
51 0 : if( !slot_meta ) { /* what is this logic??? */
52 0 : start_slot++;
53 0 : }
54 0 : }
55 0 : if( FD_UNLIKELY( !slot_meta ) ) {
56 0 : FD_LOG_ERR(( "unable to seek to any slot" ));
57 0 : }
58 :
59 0 : ulong blk_cnt = 0;
60 0 : do {
61 0 : ulong slot = slot_meta->slot;
62 0 : if( slot > end_slot ) {
63 0 : break;
64 0 : }
65 :
66 : /* Read and deshred block from RocksDB */
67 0 : if( blk_cnt % 100 == 0 ) {
68 0 : FD_LOG_WARNING(( "imported %lu blocks", blk_cnt ));
69 0 : }
70 :
71 0 : if( FD_UNLIKELY( err ) ) {
72 0 : FD_LOG_ERR(( "fd_rocksdb_get_block failed" ));
73 0 : }
74 :
75 0 : ++blk_cnt;
76 :
77 0 : free( slot_meta ); slot_meta = NULL;
78 0 : slot_meta = fd_rocksdb_root_iter_next( &iter );
79 0 : if( !slot_meta ) {
80 : // FD_LOG_WARNING(("Failed for slot %lu", slot + 1));
81 0 : slot_meta = fd_rocksdb_get_meta( &rocks_db, slot + 1 );
82 0 : if( !slot_meta ) break;
83 0 : }
84 : // FD_LOG_ERR(("fd_rocksdb_root_iter_seek returned %d", ret));
85 0 : } while (1);
86 :
87 0 : free( slot_meta ); slot_meta = NULL;
88 0 : fd_rocksdb_root_iter_destroy( &iter );
89 0 : fd_rocksdb_destroy( &rocks_db );
90 :
91 0 : FD_LOG_NOTICE(( "ingested %lu blocks", blk_cnt ));
92 0 : }
93 :
94 : /********************* Main Command Functions and Setup ***********************/
95 : void
96 0 : minify( fd_ledger_args_t * args ) {
97 : /* Example commmand:
98 : fd_ledger --cmd minify --rocksdb <LARGE_ROCKSDB> --minified-rocksdb <MINI_ROCKSDB>
99 : --start-slot <START_SLOT> --end-slot <END_SLOT> --copy-txn-status 1
100 : */
101 0 : if( args->rocksdb_path == NULL ) {
102 0 : FD_LOG_ERR(( "rocksdb path is NULL" ));
103 0 : }
104 0 : if( args->mini_db_dir == NULL ) {
105 0 : FD_LOG_ERR(( "minified rocksdb path is NULL" ));
106 0 : }
107 :
108 0 : fd_rocksdb_t big_rocksdb;
109 0 : char * err = fd_rocksdb_init( &big_rocksdb, args->rocksdb_path );
110 0 : if( FD_UNLIKELY( err!=NULL ) ) {
111 0 : FD_LOG_ERR(( "fd_rocksdb_init at path=%s returned error=%s", args->rocksdb_path, err ));
112 0 : }
113 :
114 : /* If the directory for the minified rocksdb already exists, error out */
115 0 : struct stat statbuf;
116 0 : if( stat( args->mini_db_dir, &statbuf ) == 0 ) {
117 0 : FD_LOG_ERR(( "path for mini_db_dir=%s already exists", args->mini_db_dir ));
118 0 : }
119 :
120 : /* Create a new smaller rocksdb */
121 0 : fd_rocksdb_t mini_rocksdb;
122 0 : fd_rocksdb_new( &mini_rocksdb, args->mini_db_dir );
123 :
124 : /* Correctly bound off start and end slot */
125 0 : ulong first_slot = fd_rocksdb_first_slot( &big_rocksdb, &err );
126 0 : ulong last_slot = fd_rocksdb_last_slot( &big_rocksdb, &err );
127 0 : if( args->start_slot < first_slot ) { args->start_slot = first_slot; }
128 0 : if( args->end_slot > last_slot ) { args->end_slot = last_slot; }
129 :
130 0 : FD_LOG_NOTICE(( "copying over rocks db for range [%lu, %lu]", args->start_slot, args->end_slot ));
131 :
132 : /* Copy over all slot indexed columns */
133 0 : for( ulong cf_idx = 1; cf_idx < FD_ROCKSDB_CF_CNT; ++cf_idx ) {
134 0 : fd_rocksdb_copy_over_slot_indexed_range( &big_rocksdb, &mini_rocksdb, cf_idx,
135 0 : args->start_slot, args->end_slot );
136 0 : }
137 0 : FD_LOG_NOTICE(("copied over all slot indexed columns"));
138 :
139 : /* Copy over transactions. This is more complicated because first, a temporary
140 : blockstore will be populated. This will be used to look up transactions
141 : which can be quickly queried */
142 0 : if( args->copy_txn_status ) {
143 : // /* Ingest block range into blockstore */
144 : // ingest_rocksdb( args->rocksdb_path,
145 : // args->start_slot,
146 : // args->end_slot,
147 : // args->blockstore,
148 : // ULONG_MAX );
149 :
150 0 : } else {
151 0 : FD_LOG_NOTICE(( "skipping copying of transaction statuses" ));
152 0 : }
153 :
154 : /* TODO: Currently, the address signatures column family isn't copied as it
155 : is indexed on the pubkey. */
156 :
157 0 : fd_rocksdb_destroy( &big_rocksdb );
158 0 : fd_rocksdb_destroy( &mini_rocksdb );
159 0 : }
160 :
161 : /* Parse user arguments and setup shared data structures used across commands */
162 : int
163 0 : initial_setup( int argc, char ** argv, fd_ledger_args_t * args ) {
164 0 : if( FD_UNLIKELY( argc==1 ) ) {
165 0 : return 1;
166 0 : }
167 :
168 0 : fd_boot( &argc, &argv );
169 :
170 0 : char const * cmd = fd_env_strip_cmdline_cstr ( &argc, &argv, "--cmd", NULL, NULL );
171 0 : int copy_txn_status = fd_env_strip_cmdline_int ( &argc, &argv, "--copy-txn-status", NULL, 0 );
172 0 : ulong slot_history_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--slot-history", NULL, 100UL );
173 0 : ulong shred_max = fd_env_strip_cmdline_ulong ( &argc, &argv, "--shred-max", NULL, 1UL << 17 );
174 0 : ulong start_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--start-slot", NULL, 0UL );
175 0 : ulong end_slot = fd_env_strip_cmdline_ulong ( &argc, &argv, "--end-slot", NULL, ULONG_MAX );
176 0 : char const * mini_db_dir = fd_env_strip_cmdline_cstr ( &argc, &argv, "--minified-rocksdb", NULL, NULL );
177 0 : char const * rocksdb_path = fd_env_strip_cmdline_cstr ( &argc, &argv, "--rocksdb", NULL, NULL );
178 :
179 : // TODO: Add argument validation. Make sure that we aren't including any arguments that aren't parsed for
180 :
181 0 : char hostname[64];
182 0 : gethostname( hostname, sizeof(hostname) );
183 0 : ulong hashseed = fd_hash( 0, hostname, strnlen( hostname, sizeof(hostname) ) );
184 0 : args->hashseed = (uint)hashseed;
185 :
186 : /* Copy over arguments */
187 0 : args->cmd = cmd;
188 0 : args->start_slot = start_slot;
189 0 : args->end_slot = end_slot;
190 0 : args->shred_max = shred_max;
191 0 : args->slot_history_max = slot_history_max;
192 0 : args->mini_db_dir = mini_db_dir;
193 0 : args->copy_txn_status = copy_txn_status;
194 0 : args->rocksdb_path = rocksdb_path;
195 :
196 0 : if( args->rocksdb_path != NULL ) {
197 0 : FD_LOG_NOTICE(( "rocksdb=%s", args->rocksdb_path ));
198 0 : }
199 :
200 0 : return 0;
201 0 : }
202 :
203 : int main( int argc, char ** argv ) {
204 : /* Declaring this on the stack gets the alignment wrong when using asan */
205 : fd_ledger_args_t * args = fd_alloca( alignof(fd_ledger_args_t), sizeof(fd_ledger_args_t) );
206 : memset( args, 0, sizeof(fd_ledger_args_t) );
207 : initial_setup( argc, argv, args );
208 :
209 : if( args->cmd == NULL ) {
210 : FD_LOG_ERR(( "no command specified" ));
211 : } else if( strcmp( args->cmd, "minify" ) == 0 ) {
212 : minify( args );
213 : } else {
214 : FD_LOG_ERR(( "unknown command=%s", args->cmd ));
215 : }
216 :
217 : return 0;
218 : }
|