Line data Source code
1 : #include "../../flamenco/runtime/fd_blockstore.h"
2 : #include "../../flamenco/runtime/fd_rocksdb.h"
3 : #include <unistd.h>
4 : #include <stdio.h>
5 :
6 : /*
7 : Example:
8 : ./build/native/gcc/bin/fd_blockstore_tool batch --rocksdb-path /data/emwang/307987557/rocksdb/ --out out.csv st 308015636 en 308015650
9 :
10 : helpful:
11 : sudo /data/emwang/agave/release/agave-ledger-tool -l /data/emwang/rocksdb.tar.zst bounds
12 : - to look at slot bounds
13 : */
14 :
/* Print the command-line help text to stderr.  Always returns 0 so callers
   can `return usage();` directly from main. */
static int
usage( void ) {
  static const char help_text[] =
    "Usage: fd_blockstore_tool {microblock|batch|info} [options]\n"
    "\n"
    "Reads from a rocksdb path and tries to import all slots from st to en. \n"
    "Will continue if the slot does not exist in the rocksdb folder. \n"
    "It then aggregates the data into a csv file.\n"
    "\n"
    "If microblock is specified, it will aggregate the data into a csv file with the following columns:\n"
    "\tslot, batch_idx, ref_tick, hash_cnt_from_slot_start, sz, txn_cnt\n"
    "\n"
    "If batch is specified, it will aggregate the data into a csv file with the following columns:\n"
    "\tslot, ref_tick, sz, shred_cnt\n"
    "\n"
    "If info is specified, it will print the shred payload sizes and if the shred is the last in the batch to stdout\n"
    "\n"
    "Options:\n"
    " {microblock|batch|info} Type of aggregation Required\n"
    " --rocksdb-path {path} Path of rocksdb/ Required\n"
    " --out {out.csv} Output csv path Required for {microblock|batch}\n"
    " st {start_slot} Target start slot Required\n"
    " en {end_slot} Target end slot Required\n"
    "\n";
  /* The original fprintf carried no conversion specifiers, so fputs emits
     byte-identical output. */
  fputs( help_text, stderr );
  return 0;
}
41 :
/* INITIALIZE_BLOCKSTORE( blockstore ):

   Deliberately NOT wrapped in do { } while(0): this macro injects locals
   into the calling function's scope (shred_max, idx_max, block_max,
   txn_max, mem, shblockstore, blockstore_ljoin, the joined `blockstore`
   pointer, and an archive file descriptor `fd` backed by "dummy.archv").
   It must be invoked exactly once, at the top of a function that has a
   fd_wksp_t * named `wksp` in scope.  Every caller in this file follows
   the invocation with fd_blockstore_init( blockstore, fd, ... ). */
#define INITIALIZE_BLOCKSTORE( blockstore )                                             \
  ulong shred_max = 1 << 17;                                                            \
  ulong idx_max   = 1 << 12;                                                            \
  ulong block_max = 1 << 17;                                                            \
  ulong txn_max   = 1 << 17;                                                            \
  /* allocate the blockstore footprint out of the caller's workspace */                 \
  void * mem = fd_wksp_alloc_laddr( wksp,                                               \
                                    fd_blockstore_align(),                              \
                                    fd_blockstore_footprint( shred_max,                 \
                                                             block_max,                 \
                                                             idx_max,                   \
                                                             txn_max ),                 \
                                    1UL );                                              \
  FD_TEST( mem );                                                                       \
  void * shblockstore = fd_blockstore_new( mem,                                         \
                                           1UL,                                         \
                                           0UL,                                         \
                                           shred_max,                                   \
                                           block_max,                                   \
                                           idx_max,                                     \
                                           txn_max );                                   \
                                                                                        \
  FD_TEST( shblockstore );                                                              \
  fd_blockstore_t blockstore_ljoin;                                                     \
  fd_blockstore_t * blockstore = fd_blockstore_join( &blockstore_ljoin, shblockstore ); \
  fd_buf_shred_pool_reset( blockstore->shred_pool, 0 );                                 \
  FD_TEST( blockstore );                                                                \
  /* scratch archive file handed to fd_blockstore_init by the caller */                 \
  int fd = open( "dummy.archv", O_RDWR | O_CREAT, 0666 );                               \
  FD_TEST( fd > 0 );
70 :
/* One output row of the per-batch CSV (columns written by
   batch_write_header): aggregate stats over one entry batch, i.e. the run
   of data shreds ending with a DATA_COMPLETE flag. */
struct fd_batch_row {
  ulong slot;      /* slot this batch belongs to */
  int   ref_tick;  /* ref-tick bits (FD_SHRED_DATA_REF_TICK_MASK) of the batch's completing shred */
  ulong sz;        /* bytes */
  ulong shred_cnt; /* number of shreds making up the batch */
};
typedef struct fd_batch_row fd_batch_row_t;
78 :
/* One output row of the per-microblock CSV (columns written by
   entry_write_header). */
struct fd_entry_row {
  ulong slot;                    /* slot this microblock belongs to */
  ulong batch_idx;               /* index of the containing entry batch within the slot */
  int   ref_tick;                /* ref tick of the containing batch's first shred */
  ulong sz;                      /* bytes */
  ulong txn_cnt;                 /* transactions in this microblock */
  ulong hashcnt_from_slot_start; /* running sum of microblock hdr hash_cnt since slot start */
};
typedef struct fd_entry_row fd_entry_row_t;
88 :
/* Create (or truncate) FILENAME and write the per-microblock CSV header
   row.  On open failure, report via perror and return without writing. */
static void
entry_write_header( const char *filename ) {
  FILE * out = fopen( filename, "w" );
  if( out == NULL ) {
    perror( "Error opening file" );
    return;
  }
  fputs( "slot,batch_idx,ref_tick,hash_count_from_start,sz,txn_cnt\n", out );
  fclose( out );
}
99 :
/* Create (or truncate) FILENAME and write the per-batch CSV header row.
   On open failure, report via perror and return without writing. */
static void
batch_write_header( const char *filename ) {
  FILE * out = fopen( filename, "w" );
  if( out == NULL ) {
    perror( "Error opening file" );
    return;
  }
  fputs( "slot,ref_tick,sz,shred_cnt\n", out );
  fclose( out );
}
110 :
111 : static void
112 0 : batch_append_csv( const char * filename, fd_batch_row_t * row ) {
113 0 : FILE *file = fopen(filename, "a");
114 0 : if ( FD_UNLIKELY( file == NULL ) ) {
115 0 : perror("Error opening file");
116 0 : return;
117 0 : }
118 :
119 : // Write the row data to the CSV file
120 0 : fprintf(file, "%lu,%d,%lu,%lu\n",
121 0 : row->slot, row->ref_tick, row->sz, row->shred_cnt);
122 :
123 0 : fclose(file);
124 0 : }
125 :
126 : static void
127 0 : entry_append_csv( const char * filename, fd_entry_row_t * row ) {
128 0 : FILE *file = fopen(filename, "a");
129 0 : if ( FD_UNLIKELY( file == NULL ) ) {
130 0 : perror("Error opening file");
131 0 : return;
132 0 : }
133 :
134 : // Write the row data to the CSV file
135 0 : fprintf(file, "%lu,%lu,%d,%lu,%lu,%lu\n",
136 0 : row->slot, row->batch_idx, row->ref_tick, row->hashcnt_from_slot_start,row->sz, row->txn_cnt);
137 :
138 0 : fclose(file);
139 0 : }
140 :
141 : static ulong
142 0 : get_next_batch_shred_off( fd_block_shred_t * shreds, ulong shreds_cnt, ulong * curr_shred_idx ) {
143 0 : for( ulong i = *curr_shred_idx; i < shreds_cnt; i++ ) {
144 0 : if( shreds[i].hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) {
145 0 : *curr_shred_idx = i + 1;
146 0 : if ( i + 1 < shreds_cnt ) return shreds[i + 1].off;
147 0 : else return ULONG_MAX;
148 0 : }
149 0 : }
150 0 : return ULONG_MAX;
151 0 : }
152 :
153 : static int
154 : initialize_rocksdb( fd_wksp_t * wksp,
155 : fd_blockstore_t * blockstore,
156 : const char * folder,
157 : ulong st,
158 : ulong end,
159 0 : ulong * populated_slots_out ) {
160 0 : fd_rocksdb_t rocks_db = {0};
161 0 : fd_rocksdb_root_iter_t iter = {0};
162 :
163 0 : char * err = fd_rocksdb_init( &rocks_db, folder );
164 0 : if( err ) {
165 0 : FD_LOG_ERR(( "Failed to initialize rocksdb: %s", err ));
166 0 : return -1;
167 0 : }
168 :
169 0 : fd_rocksdb_root_iter_new( &iter );
170 0 : void * alloc_mem = fd_wksp_alloc_laddr( wksp, fd_alloc_align(), fd_alloc_footprint(), 1UL );
171 0 : fd_alloc_t * alloc = fd_alloc_join( fd_alloc_new( alloc_mem, 1UL ), 1UL );
172 0 : fd_valloc_t valloc = fd_alloc_virtual( alloc );
173 :
174 0 : fd_slot_meta_t slot_meta = { 0 };
175 0 : uchar trash_hash_buf[32];
176 0 : memset( trash_hash_buf, 0xFE, sizeof(trash_hash_buf) );
177 :
178 0 : int slot_idx = 0;
179 0 : for (ulong slot = st; slot <= end; slot++) {
180 0 : int err = fd_rocksdb_root_iter_seek( &iter, &rocks_db, slot, &slot_meta, valloc );
181 :
182 0 : if( err < 0 ) continue;
183 :
184 0 : err = fd_rocksdb_import_block_blockstore( &rocks_db, &slot_meta, blockstore, 1, trash_hash_buf, valloc );
185 0 : if( FD_UNLIKELY( err != 0) ) {
186 0 : FD_LOG_ERR(( "Failed to import block %lu", slot ));
187 0 : }
188 0 : populated_slots_out[slot_idx++] = slot;
189 0 : }
190 0 : return slot_idx;
191 0 : }
192 :
/* aggregate_entries: import slots [st, end] from the rocksdb at FOLDER into
   a fresh blockstore, then walk every microblock of every imported slot and
   append one fd_entry_row_t per microblock to the CSV at CSV.  Batch
   boundaries are tracked alongside the microblock walk so each row carries
   its batch index and the batch's ref tick. */
static void
aggregate_entries( fd_wksp_t * wksp, const char * folder, const char * csv, ulong st, ulong end ){
  INITIALIZE_BLOCKSTORE( blockstore ); /* injects mem/shblockstore/fd/... into this scope */
  FD_TEST( fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, 1UL ) );

  /* NOTE(review): VLA length assumes end >= st — confirm callers never invert. */
  ulong populated_slots[end - st + 1];
  memset( populated_slots, -1, sizeof(populated_slots) );
  int slots_read = initialize_rocksdb( wksp, blockstore, folder, st, end, populated_slots );

  for( int i = 0; i < slots_read; i++ ) {
    fd_entry_row_t row = {0};
    ulong slot = populated_slots[i];
    row.slot = slot;
    fd_block_t * block = fd_blockstore_block_query( blockstore, slot );
    if (FD_UNLIKELY( !block ) ) {
      FD_LOG_WARNING(( "Block incomplete for slot %lu", slot ));
      continue;
    }

    /* Resolve the block's shred / microblock / raw data arrays out of the wksp. */
    fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, block->shreds_gaddr );
    fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
    uchar * data = fd_wksp_laddr_fast( wksp, block->data_gaddr );

    FD_LOG_DEBUG(( "SLOT: %lu", slot ));

    /* prepare batch boundaries: curr_shred_idx marks the batch we are in,
       next_batch_off is the data offset where the following batch starts. */
    ulong curr_shred_idx = 0;
    ulong next_batch_shred_idx = curr_shred_idx; /* not necessary to maintain both, but could be useful */
    int curr_batch_tick = shreds[curr_shred_idx].hdr.data.flags & FD_SHRED_DATA_REF_TICK_MASK;
    ulong next_batch_off = get_next_batch_shred_off( shreds, block->shreds_cnt, &next_batch_shred_idx );

    row.batch_idx = 0;
    ulong hashcnt_from_slot_start = 0;
    for( ulong micro_idx = 0; micro_idx < block->micros_cnt; micro_idx++ ) {
      fd_block_micro_t * micro = &micros[micro_idx];

      /* as we iterate along microblocks, advance shred ptr with us */
      /* if we have reached a new batch */
      if ( FD_UNLIKELY( micro->off >= next_batch_off ) ) {
        row.batch_idx++;
        FD_TEST( next_batch_shred_idx < block->shreds_cnt );
        curr_batch_tick = shreds[next_batch_shred_idx].hdr.data.flags & FD_SHRED_DATA_REF_TICK_MASK;
        curr_shred_idx = next_batch_shred_idx;
        next_batch_off = get_next_batch_shred_off( shreds, block->shreds_cnt, &next_batch_shred_idx ); // advance shred idx to next batch
        FD_LOG_DEBUG(( "New Batch - shred idx start: %lu, end: %lu, ref_tick: %d, off : %lu", curr_shred_idx, next_batch_shred_idx, curr_batch_tick, shreds[curr_shred_idx].off ));

        if( FD_UNLIKELY(next_batch_off == ULONG_MAX ) ) {
          FD_LOG_DEBUG(( "New Batch is last batch in slot" ));
        }
      }

      row.ref_tick = curr_batch_tick;

      /* Microblock header lives at micro->off within the raw block data. */
      fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( (uchar *)data + micro->off );
      ulong hashcnt = hdr->hash_cnt;
      hashcnt_from_slot_start += hashcnt;

      /**
       Iterate through the transactions in the microblock to calculate the total payload size
       to handle case where there's extra stuff between microblocks
      */

      ulong total_sz = sizeof(fd_microblock_hdr_t);
      ulong blockoff = micro->off + sizeof(fd_microblock_hdr_t);
      for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
        ulong raw_mblk = (ulong) data + blockoff;
        uchar txn_out[FD_TXN_MAX_SZ];
        ulong pay_sz = 0;
        /* parse only to learn the serialized payload size (pay_sz) */
        fd_txn_parse_core( (uchar const *) raw_mblk,
                           fd_ulong_min( block->data_sz - blockoff, FD_TXN_MTU ),
                           txn_out,
                           NULL,
                           &pay_sz );
        blockoff += pay_sz;
        total_sz += pay_sz;
      }

      row.hashcnt_from_slot_start = hashcnt_from_slot_start;
      row.txn_cnt = hdr->txn_cnt;
      row.sz = total_sz;

      if ( row.txn_cnt == 0 ) { /* truncate payload sz to 48 at all times */
        /* this shouldn't be needed bc of iterating txn counts above, but here to be safe */
        row.sz = 48;
      }

      entry_append_csv( csv, &row );
      FD_LOG_DEBUG(( "Entry | slot: %lu, payload_sz: %lu txn_cnt: %lu, ref_tick: %d",
                     row.slot, row.sz, row.txn_cnt, row.ref_tick ));
    }
  }
}
285 :
/* aggregate_batch_entries: import slots [st, end] from the rocksdb at
   FOLDER and append one fd_batch_row_t per entry batch (run of shreds
   ending with the DATA_COMPLETE flag) to the CSV at CSV. */
static void
aggregate_batch_entries( fd_wksp_t * wksp, const char * folder, const char * csv, ulong st, ulong end ){
  INITIALIZE_BLOCKSTORE( blockstore ); /* injects mem/shblockstore/fd/... into this scope */
  FD_TEST( fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, 1UL ) );

  /* NOTE(review): VLA length assumes end >= st — confirm callers never invert. */
  ulong populated_slots[end - st + 1];
  memset( populated_slots, -1, sizeof(populated_slots) );
  int slots_read = initialize_rocksdb( wksp, blockstore, folder, st, end, populated_slots );

  fd_batch_row_t row = {0};
  fd_block_t * block = NULL;
  for( int i = 0; i < slots_read; i++ ) {
    ulong slot = populated_slots[i];
    row.slot = slot;
    block = fd_blockstore_block_query( blockstore, slot );
    if (FD_UNLIKELY( !block ) ) {
      FD_LOG_WARNING(( "Block incomplete for slot %lu", slot ));
      continue;
    }

    fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, block->shreds_gaddr );
    ulong batch_start = 0; /* shred index where the current batch began */
    ulong batch_sz = 0;    /* payload bytes accumulated for the current batch */
    for ( ulong shred_idx = 0; shred_idx < block->shreds_cnt; shred_idx++ ) {
      fd_block_shred_t * shred = &shreds[shred_idx];
      batch_sz += fd_shred_payload_sz( &shred->hdr );

      /* batch done */

      if( shred->hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) {
        row.shred_cnt = shred_idx - batch_start + 1;
        /* ref tick lives in the low bits of the data shred flags */
        row.ref_tick = ( (int)shred->hdr.data.flags &
                         (int)FD_SHRED_DATA_REF_TICK_MASK );

        row.sz = batch_sz;
        batch_sz = 0;
        batch_start = shred_idx + 1;

        batch_append_csv( csv, &row );

        FD_LOG_DEBUG(( "Batch | slot: %lu, ref_tick: %d, payload_sz: %lu, shred_cnt: %lu",
                       row.slot, row.ref_tick, row.sz, row.shred_cnt ));
      }
    }
  }
}
332 :
333 : static void
334 0 : investigate_shred( fd_wksp_t * wksp, const char * folder, ulong st, ulong end ){
335 0 : INITIALIZE_BLOCKSTORE( blockstore );
336 0 : FD_TEST( fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, 1UL ) );
337 :
338 0 : ulong populated_slots[end - st + 1];
339 0 : memset( populated_slots, -1, sizeof(populated_slots) );
340 0 : int slots_read = initialize_rocksdb( wksp, blockstore, folder, st, end, populated_slots );
341 :
342 0 : fd_block_t * block = NULL;
343 0 : for( int i = 0; i < slots_read; i++ ) {
344 0 : ulong slot = populated_slots[i];
345 0 : block = fd_blockstore_block_query( blockstore, slot );
346 0 : FD_TEST( block );
347 :
348 0 : fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, block->shreds_gaddr );
349 0 : fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
350 :
351 0 : for ( ulong shred_idx = 0; shred_idx < block->shreds_cnt; shred_idx++ ) {
352 0 : fd_block_shred_t * shred = &shreds[shred_idx];
353 :
354 0 : printf("Shred payload sz: %lu\n", fd_shred_payload_sz( &shred->hdr ));
355 0 : if( shred->hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) {
356 0 : printf(" -- BATCH DONE -- \n");
357 0 : }
358 0 : }
359 0 : for ( ulong micro_idx = 0; micro_idx < block->micros_cnt; micro_idx++ ) {
360 0 : fd_block_micro_t * micro = µs[micro_idx];
361 0 : FD_LOG_NOTICE(("Micro offset: %lu", micro->off));
362 0 : }
363 0 : printf("Slot done %lu\n\n", slot);
364 0 : }
365 0 : }
366 :
/* prepare_csv: strip the --out path from the command line, create the file
   if needed, and truncate it to zero length so the subsequent header/append
   writers start from an empty file.  Returns the path (points into argv
   storage; do not free).  Aborts if --out is missing or the file cannot be
   created/truncated.

   Fixes over the original: a missing --out no longer passes NULL to
   open(), and the probe fd is closed instead of leaked (the CSV writers
   reopen the path with fopen()). */
const char *
prepare_csv( int argc, char ** argv ) {
  const char * csv = fd_env_strip_cmdline_cstr( &argc, &argv, "--out", NULL, NULL );
  if( FD_UNLIKELY( !csv ) ) {
    FD_LOG_ERR(( "--out is required for this mode. Check --help for usage." ));
  }
  int csv_fd = open( csv, O_RDWR | O_CREAT, 0666 );
  FD_TEST( csv_fd > 0 );
  int err = ftruncate( csv_fd, 0 );
  FD_TEST( err == 0 );
  err = close( csv_fd );
  FD_TEST( err == 0 );
  return csv;
}
376 :
/* Entry point: boot fd runtime, create an anonymous workspace, then
   dispatch on the {microblock|batch|info} mode keyword. */
int
main( int argc, char ** argv ) {
  fd_boot( &argc, &argv );

  /* Anonymous gigantic-page workspace backing the blockstore and the
     rocksdb import scratch allocator. */
  ulong page_cnt = 10;
  char * _page_sz = "gigantic";
  ulong numa_idx = fd_shmem_numa_idx( 0 );
  FD_LOG_NOTICE( ( "Creating workspace (--page-cnt %lu, --page-sz %s, --numa-idx %lu)",
                   page_cnt,
                   _page_sz,
                   numa_idx ) );
  fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ),
                                            page_cnt,
                                            fd_shmem_cpu_idx( numa_idx ),
                                            "wksp",
                                            0UL );
  FD_TEST( wksp );

  if ( fd_env_strip_cmdline_contains( &argc, &argv, "--help" ) ) {
    return usage();
  }

  /* NOTE(review): this open() only validates that --rocksdb-path points at
     a directory; the fd is never read and stays open for the tool's short
     lifetime.  If --rocksdb-path is absent, folder is NULL and the open is
     expected to fail, tripping FD_TEST — confirm on target platform. */
  const char * folder = fd_env_strip_cmdline_cstr( &argc, &argv, "--rocksdb-path", NULL, NULL);
  int fd = open( folder, O_RDONLY | O_DIRECTORY, 0666 );
  FD_TEST( fd > 0 );

  /* Inclusive slot range; both default to 0 when not supplied. */
  ulong start = fd_env_strip_cmdline_ulong( &argc, &argv, "st", NULL, 0 );
  ulong end = fd_env_strip_cmdline_ulong( &argc, &argv, "en", NULL, 0 );

  /* Mode dispatch: microblock/batch write a CSV; info prints to stdout. */
  if ( fd_env_strip_cmdline_contains(&argc, &argv, "microblock") ){
    const char * csv = prepare_csv(argc, argv);
    entry_write_header(csv);
    aggregate_entries( wksp , folder, csv, start, end);
  } else if( fd_env_strip_cmdline_contains(&argc, &argv, "batch") ){
    const char * csv = prepare_csv(argc, argv);
    batch_write_header(csv);
    aggregate_batch_entries( wksp, folder, csv, start, end);
  } else if( fd_env_strip_cmdline_contains(&argc, &argv, "info") ){
    investigate_shred( wksp, folder, start, end );
  } else {
    FD_LOG_WARNING(("Please specify either microblock, batch, or info in the command line. Check --help for usage." ));
  }

  fd_halt();
  return 0;
}
|