LCOV - code coverage report
Current view: top level - flamenco/runtime - fd_blockstore_tool.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 266 0.0 %
Date: 2025-07-01 05:00:49 Functions: 0 11 0.0 %

          Line data    Source code
       1             : #include "../../flamenco/runtime/fd_blockstore.h"
       2             : #include "../../flamenco/runtime/fd_rocksdb.h"
       3             : #include <unistd.h>
       4             : #include <stdio.h>
       5             : 
       6             : /*
       7             : Example:
       8             : ./build/native/gcc/bin/fd_blockstore_tool batch --rocksdb-path /data/emwang/307987557/rocksdb/ --out out.csv st 308015636 en 308015650
       9             : 
      10             : helpful:
      11             : sudo /data/emwang/agave/release/agave-ledger-tool -l /data/emwang/rocksdb.tar.zst bounds
      12             :  - to look at slot bounds
      13             : */
      14             : 
      15             : static int
      16           0 : usage( void ) {
      17           0 :   fprintf( stderr,
      18           0 :     "Usage: fd_blockstore_tool {microblock|batch|info} [options]\n"
      19           0 :     "\n"
      20           0 :     "Reads from a rocksdb path and tries to import all slots from st to en. \n"
      21           0 :     "Will continue if the slot does not exist in the rocksdb folder. \n"
      22           0 :     "It then aggregates the data into a csv file.\n"
      23           0 :     "\n"
      24           0 :     "If microblock is specified, it will aggregate the data into a csv file with the following columns:\n"
      25           0 :     "\tslot, batch_idx, ref_tick, hash_cnt_from_slot_start, sz, txn_cnt\n"
      26           0 :     "\n"
      27           0 :     "If batch is specified, it will aggregate the data into a csv file with the following columns:\n"
      28           0 :     "\tslot, ref_tick, sz, shred_cnt\n"
      29           0 :     "\n"
      30           0 :     "If info is specified, it will print the shred payload sizes and if the shred is the last in the batch to stdout\n"
      31           0 :     "\n"
      32           0 :     "Options:\n"
      33           0 :     "  {microblock|batch|info}                  Type of aggregation         Required\n"
      34           0 :     "  --rocksdb-path {path}                    Path of rocksdb/            Required\n"
      35           0 :     "  --out          {out.csv}                 Output csv path             Required for {microblock|batch}\n"
      36           0 :     "  st             {start_slot}              Target start slot           Required\n"
      37           0 :     "  en             {end_slot}                Target end slot             Required\n"
      38           0 :     "\n" );
      39           0 :   return 0;
      40           0 : }
      41             : 
      42             : #define INITIALIZE_BLOCKSTORE( blockstore )                                              \
      43           0 :     ulong shred_max = 1 << 17;                                                           \
      44           0 :     ulong idx_max = 1 << 12;                                                             \
      45           0 :     ulong block_max = 1 << 17;                                                           \
      46           0 :     ulong txn_max = 1 << 17;                                                             \
      47           0 :     void * mem = fd_wksp_alloc_laddr( wksp,                                              \
      48           0 :                                       fd_blockstore_align(),                             \
      49           0 :                                       fd_blockstore_footprint( shred_max,                \
      50           0 :                                                                block_max,                \
      51           0 :                                                                idx_max,                  \
      52           0 :                                                                txn_max ),                \
      53           0 :                                       1UL );                                             \
      54           0 :     FD_TEST( mem );                                                                      \
      55           0 :     void * shblockstore = fd_blockstore_new( mem,                                        \
      56           0 :                                              1UL,                                        \
      57           0 :                                              0UL,                                        \
      58           0 :                                              shred_max,                                  \
      59           0 :                                              block_max,                                  \
      60           0 :                                              idx_max,                                    \
      61           0 :                                              txn_max );                                  \
      62           0 :                                                                                          \
      63           0 :     FD_TEST( shblockstore );                                                             \
      64           0 :     fd_blockstore_t   blockstore_ljoin;                                                  \
      65           0 :     fd_blockstore_t * blockstore = fd_blockstore_join( &blockstore_ljoin, shblockstore ); \
      66           0 :     fd_buf_shred_pool_reset( blockstore->shred_pool, 0 );                                \
      67           0 :     FD_TEST( blockstore );                                                               \
      68           0 :     int fd = open( "dummy.archv", O_RDWR | O_CREAT, 0666 );                              \
      69           0 :     FD_TEST( fd > 0 );
      70             : 
      71             : struct fd_batch_row {
      72             :   ulong slot;
      73             :   int   ref_tick;
      74             :   ulong sz;        /* bytes */
      75             :   ulong shred_cnt;
      76             : };
      77             : typedef struct fd_batch_row fd_batch_row_t;
      78             : 
      79             : struct fd_entry_row {
      80             :   ulong slot;
      81             :   ulong batch_idx;
      82             :   int   ref_tick;
      83             :   ulong sz;        /* bytes */
      84             :   ulong txn_cnt;
      85             :   ulong hashcnt_from_slot_start;
      86             : };
      87             : typedef struct fd_entry_row fd_entry_row_t;
      88             : 
      89             : static void
      90           0 : entry_write_header( const char *filename ) {
      91           0 :     FILE *file = fopen( filename, "w" );
      92           0 :     if ( FD_UNLIKELY( file == NULL ) ) {
      93           0 :         perror( "Error opening file" );
      94           0 :         return;
      95           0 :     }
      96           0 :     fprintf(file, "slot,batch_idx,ref_tick,hash_count_from_start,sz,txn_cnt\n");
      97           0 :     fclose(file);
      98           0 : }
      99             : 
     100             : static void
     101           0 : batch_write_header( const char *filename ) {
     102           0 :     FILE *file = fopen(filename, "w");
     103           0 :     if ( FD_UNLIKELY( file == NULL ) ) {
     104           0 :         perror("Error opening file");
     105           0 :         return;
     106           0 :     }
     107           0 :     fprintf(file, "slot,ref_tick,sz,shred_cnt\n");
     108           0 :     fclose(file);
     109           0 : }
     110             : 
     111             : static void
     112           0 : batch_append_csv( const char * filename, fd_batch_row_t * row ) {
     113           0 :     FILE *file = fopen(filename, "a");
     114           0 :     if ( FD_UNLIKELY( file == NULL ) ) {
     115           0 :         perror("Error opening file");
     116           0 :         return;
     117           0 :     }
     118             : 
     119             :     // Write the row data to the CSV file
     120           0 :     fprintf(file, "%lu,%d,%lu,%lu\n",
     121           0 :             row->slot, row->ref_tick, row->sz, row->shred_cnt);
     122             : 
     123           0 :     fclose(file);
     124           0 : }
     125             : 
     126             : static void
     127           0 : entry_append_csv( const char * filename, fd_entry_row_t * row ) {
     128           0 :     FILE *file = fopen(filename, "a");
     129           0 :     if ( FD_UNLIKELY( file == NULL ) ) {
     130           0 :         perror("Error opening file");
     131           0 :         return;
     132           0 :     }
     133             : 
     134             :     // Write the row data to the CSV file
     135           0 :     fprintf(file, "%lu,%lu,%d,%lu,%lu,%lu\n",
     136           0 :             row->slot, row->batch_idx, row->ref_tick, row->hashcnt_from_slot_start,row->sz, row->txn_cnt);
     137             : 
     138           0 :     fclose(file);
     139           0 : }
     140             : 
     141             : static ulong
     142           0 : get_next_batch_shred_off( fd_block_shred_t * shreds, ulong shreds_cnt, ulong * curr_shred_idx ) {
     143           0 :   for( ulong i = *curr_shred_idx; i < shreds_cnt; i++ ) {
     144           0 :     if( shreds[i].hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) {
     145           0 :       *curr_shred_idx = i + 1;
     146           0 :       if ( i + 1 < shreds_cnt ) return shreds[i + 1].off;
     147           0 :       else return ULONG_MAX;
     148           0 :     }
     149           0 :   }
     150           0 :   return ULONG_MAX;
     151           0 : }
     152             : 
     153             : static int
     154             : initialize_rocksdb( fd_wksp_t * wksp,
     155             :                     fd_blockstore_t * blockstore,
     156             :                     const char * folder,
     157             :                     ulong st,
     158             :                     ulong end,
     159           0 :                     ulong * populated_slots_out ) {
     160           0 :   fd_rocksdb_t           rocks_db         = {0};
     161           0 :   fd_rocksdb_root_iter_t iter             = {0};
     162             : 
     163           0 :   char * err = fd_rocksdb_init( &rocks_db, folder );
     164           0 :   if( err ) {
     165           0 :     FD_LOG_ERR(( "Failed to initialize rocksdb: %s", err ));
     166           0 :     return -1;
     167           0 :   }
     168             : 
     169           0 :   fd_rocksdb_root_iter_new( &iter );
     170           0 :   void *       alloc_mem = fd_wksp_alloc_laddr( wksp, fd_alloc_align(), fd_alloc_footprint(), 1UL );
     171           0 :   fd_alloc_t * alloc     = fd_alloc_join( fd_alloc_new( alloc_mem, 1UL ), 1UL );
     172           0 :   fd_valloc_t  valloc    = fd_alloc_virtual( alloc );
     173             : 
     174           0 :   fd_slot_meta_t slot_meta = { 0 };
     175           0 :   uchar trash_hash_buf[32];
     176           0 :   memset( trash_hash_buf, 0xFE, sizeof(trash_hash_buf) );
     177             : 
     178           0 :   int slot_idx = 0;
     179           0 :   for (ulong slot = st; slot <= end; slot++) {
     180           0 :     int err = fd_rocksdb_root_iter_seek( &iter, &rocks_db, slot, &slot_meta, valloc );
     181             : 
     182           0 :     if( err < 0 ) continue;
     183             : 
     184           0 :     err = fd_rocksdb_import_block_blockstore( &rocks_db, &slot_meta, blockstore, 1, trash_hash_buf, valloc );
     185           0 :     if( FD_UNLIKELY( err != 0) ) {
     186           0 :       FD_LOG_ERR(( "Failed to import block %lu", slot ));
     187           0 :     }
     188           0 :     populated_slots_out[slot_idx++] = slot;
     189           0 :   }
     190           0 :   return slot_idx;
     191           0 : }
     192             : 
     193             : static void
     194           0 : aggregate_entries( fd_wksp_t * wksp, const char * folder, const char * csv, ulong st, ulong end ){
     195           0 :     INITIALIZE_BLOCKSTORE( blockstore );
     196           0 :     FD_TEST( fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, 1UL ) );
     197             : 
     198           0 :     ulong populated_slots[end - st + 1];
     199           0 :     memset( populated_slots, -1, sizeof(populated_slots) );
     200           0 :     int slots_read = initialize_rocksdb( wksp, blockstore, folder, st, end, populated_slots );
     201             : 
     202           0 :     for( int i = 0; i < slots_read; i++ ) {
     203           0 :       fd_entry_row_t row = {0};
     204           0 :       ulong slot         = populated_slots[i];
     205           0 :       row.slot           = slot;
     206           0 :       fd_block_t * block = fd_blockstore_block_query( blockstore, slot );
     207           0 :       if (FD_UNLIKELY( !block ) ) {
     208           0 :         FD_LOG_WARNING(( "Block incomplete for slot %lu", slot ));
     209           0 :         continue;
     210           0 :       }
     211             : 
     212           0 :       fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, block->shreds_gaddr );
     213           0 :       fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
     214           0 :       uchar * data              = fd_wksp_laddr_fast( wksp, block->data_gaddr );
     215             : 
     216           0 :       FD_LOG_DEBUG(( "SLOT: %lu", slot ));
     217             : 
     218             :      /* prepare batch boundaries */
     219           0 :       ulong curr_shred_idx       = 0;
     220           0 :       ulong next_batch_shred_idx = curr_shred_idx;  /* not necessary to maintain both, but could be useful */
     221           0 :       int   curr_batch_tick      = shreds[curr_shred_idx].hdr.data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     222           0 :       ulong next_batch_off       = get_next_batch_shred_off( shreds, block->shreds_cnt, &next_batch_shred_idx );
     223             : 
     224           0 :       row.batch_idx                 = 0;
     225           0 :       ulong hashcnt_from_slot_start = 0;
     226           0 :       for( ulong micro_idx = 0; micro_idx < block->micros_cnt; micro_idx++ ) {
     227           0 :         fd_block_micro_t * micro = &micros[micro_idx];
     228             : 
     229             :         /* as we iterate along microblocks, advance shred ptr with us */
     230             :         /* if we have reached a new batch  */
     231           0 :         if ( FD_UNLIKELY( micro->off >= next_batch_off ) ) {
     232           0 :           row.batch_idx++;
     233           0 :           FD_TEST( next_batch_shred_idx < block->shreds_cnt );
     234           0 :           curr_batch_tick = shreds[next_batch_shred_idx].hdr.data.flags & FD_SHRED_DATA_REF_TICK_MASK;
     235           0 :           curr_shred_idx  = next_batch_shred_idx;
     236           0 :           next_batch_off  = get_next_batch_shred_off( shreds, block->shreds_cnt, &next_batch_shred_idx ); // advance shred idx to next batch
     237           0 :           FD_LOG_DEBUG(( "New Batch - shred idx start: %lu, end: %lu, ref_tick: %d, off : %lu", curr_shred_idx, next_batch_shred_idx, curr_batch_tick, shreds[curr_shred_idx].off ));
     238             : 
     239           0 :           if( FD_UNLIKELY(next_batch_off == ULONG_MAX ) ) {
     240           0 :             FD_LOG_DEBUG(( "New Batch is last batch in slot" ));
     241           0 :           }
     242           0 :         }
     243             : 
     244           0 :         row.ref_tick = curr_batch_tick;
     245             : 
     246           0 :         fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)( (uchar *)data + micro->off );
     247           0 :         ulong hashcnt             = hdr->hash_cnt;
     248           0 :         hashcnt_from_slot_start  += hashcnt;
     249             : 
     250             :         /**
     251             :          Iterate through the transactions in the microblock to calculate the total payload size
     252             :          to handle case where there's extra stuff between microblocks
     253             :          */
     254             : 
     255           0 :         ulong total_sz = sizeof(fd_microblock_hdr_t);
     256           0 :         ulong blockoff = micro->off + sizeof(fd_microblock_hdr_t);
     257           0 :         for ( ulong txn_idx = 0; txn_idx < hdr->txn_cnt; txn_idx++ ) {
     258           0 :           ulong raw_mblk = (ulong) data + blockoff;
     259           0 :           uchar txn_out[FD_TXN_MAX_SZ];
     260           0 :           ulong pay_sz = 0;
     261           0 :           fd_txn_parse_core( (uchar const *) raw_mblk,
     262           0 :                              fd_ulong_min( block->data_sz - blockoff, FD_TXN_MTU ),
     263           0 :                              txn_out,
     264           0 :                              NULL,
     265           0 :                              &pay_sz );
     266           0 :           blockoff += pay_sz;
     267           0 :           total_sz += pay_sz;
     268           0 :         }
     269             : 
     270           0 :         row.hashcnt_from_slot_start = hashcnt_from_slot_start;
     271           0 :         row.txn_cnt                 = hdr->txn_cnt;
     272           0 :         row.sz                      = total_sz;
     273             : 
     274           0 :         if ( row.txn_cnt == 0 ) { /* truncate payload sz to 48 at all times */
     275             :           /* this shouldn't be needed bc of iterating txn counts above, but here to be safe */
     276           0 :           row.sz = 48;
     277           0 :         }
     278             : 
     279           0 :         entry_append_csv( csv, &row );
     280           0 :         FD_LOG_DEBUG(( "Entry | slot: %lu, payload_sz: %lu txn_cnt: %lu, ref_tick: %d",
     281           0 :                         row.slot, row.sz, row.txn_cnt, row.ref_tick ));
     282           0 :       }
     283           0 :     }
     284           0 : }
     285             : 
     286             : static void
     287           0 : aggregate_batch_entries( fd_wksp_t * wksp, const char * folder, const char * csv, ulong st, ulong end ){
     288           0 :   INITIALIZE_BLOCKSTORE( blockstore );
     289           0 :   FD_TEST( fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, 1UL ) );
     290             : 
     291           0 :   ulong populated_slots[end - st + 1];
     292           0 :   memset( populated_slots, -1, sizeof(populated_slots) );
     293           0 :   int slots_read = initialize_rocksdb( wksp, blockstore, folder, st, end, populated_slots );
     294             : 
     295           0 :   fd_batch_row_t row = {0};
     296           0 :   fd_block_t * block = NULL;
     297           0 :   for( int i = 0; i < slots_read; i++ ) {
     298           0 :     ulong slot = populated_slots[i];
     299           0 :     row.slot   = slot;
     300           0 :     block      = fd_blockstore_block_query( blockstore, slot );
     301           0 :     if (FD_UNLIKELY( !block ) ) {
     302           0 :       FD_LOG_WARNING(( "Block incomplete for slot %lu", slot ));
     303           0 :       continue;
     304           0 :     }
     305             : 
     306           0 :     fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, block->shreds_gaddr );
     307           0 :     ulong batch_start         = 0;
     308           0 :     ulong batch_sz            = 0;
     309           0 :     for ( ulong shred_idx = 0; shred_idx < block->shreds_cnt; shred_idx++ ) {
     310           0 :       fd_block_shred_t * shred = &shreds[shred_idx];
     311           0 :       batch_sz += fd_shred_payload_sz( &shred->hdr );
     312             : 
     313             :       /* batch done */
     314             : 
     315           0 :       if( shred->hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) {
     316           0 :         row.shred_cnt = shred_idx - batch_start + 1;
     317           0 :         row.ref_tick  = ( (int)shred->hdr.data.flags &
     318           0 :                                       (int)FD_SHRED_DATA_REF_TICK_MASK );
     319             : 
     320           0 :         row.sz        = batch_sz;
     321           0 :         batch_sz      = 0;
     322           0 :         batch_start   = shred_idx + 1;
     323             : 
     324           0 :         batch_append_csv( csv, &row );
     325             : 
     326           0 :         FD_LOG_DEBUG(( "Batch | slot: %lu, ref_tick: %d, payload_sz: %lu, shred_cnt: %lu",
     327           0 :                             row.slot, row.ref_tick, row.sz, row.shred_cnt ));
     328           0 :       }
     329           0 :     }
     330           0 :   }
     331           0 : }
     332             : 
     333             : static void
     334           0 : investigate_shred( fd_wksp_t * wksp, const char * folder, ulong st, ulong end ){
     335           0 :   INITIALIZE_BLOCKSTORE( blockstore );
     336           0 :   FD_TEST( fd_blockstore_init( blockstore, fd, FD_BLOCKSTORE_ARCHIVE_MIN_SIZE, 1UL ) );
     337             : 
     338           0 :   ulong populated_slots[end - st + 1];
     339           0 :   memset( populated_slots, -1, sizeof(populated_slots) );
     340           0 :   int slots_read = initialize_rocksdb( wksp, blockstore, folder, st, end, populated_slots );
     341             : 
     342           0 :   fd_block_t * block = NULL;
     343           0 :   for( int i = 0; i < slots_read; i++ ) {
     344           0 :     ulong slot = populated_slots[i];
     345           0 :     block      = fd_blockstore_block_query( blockstore, slot );
     346           0 :     FD_TEST( block );
     347             : 
     348           0 :     fd_block_shred_t * shreds = fd_wksp_laddr_fast( wksp, block->shreds_gaddr );
     349           0 :     fd_block_micro_t * micros = fd_wksp_laddr_fast( wksp, block->micros_gaddr );
     350             : 
     351           0 :     for ( ulong shred_idx = 0; shred_idx < block->shreds_cnt; shred_idx++ ) {
     352           0 :       fd_block_shred_t * shred = &shreds[shred_idx];
     353             : 
     354           0 :       printf("Shred payload sz: %lu\n", fd_shred_payload_sz( &shred->hdr ));
     355           0 :       if( shred->hdr.data.flags & FD_SHRED_DATA_FLAG_DATA_COMPLETE ) {
     356           0 :         printf(" -- BATCH DONE -- \n");
     357           0 :       }
     358           0 :     }
     359           0 :     for ( ulong micro_idx = 0; micro_idx < block->micros_cnt; micro_idx++ ) {
     360           0 :       fd_block_micro_t * micro = &micros[micro_idx];
     361           0 :       FD_LOG_NOTICE(("Micro offset: %lu", micro->off));
     362           0 :     }
     363           0 :     printf("Slot done %lu\n\n", slot);
     364           0 :   }
     365           0 : }
     366             : 
     367             : const char *
     368           0 : prepare_csv( int argc, char ** argv ) {
     369           0 :   const char * csv = fd_env_strip_cmdline_cstr( &argc, &argv, "--out", NULL, NULL );
     370           0 :   int csv_fd = open( csv, O_RDWR | O_CREAT, 0666 );
     371           0 :   FD_TEST( csv_fd > 0 );
     372           0 :   int err = ftruncate( csv_fd, 0);
     373           0 :   FD_TEST( err == 0 );
     374           0 :   return csv;
     375           0 : }
     376             : 
     377             : int
     378             : main( int argc, char ** argv ) {
     379             :   fd_boot( &argc, &argv );
     380             : 
     381             :   ulong  page_cnt = 10;
     382             :   char * _page_sz = "gigantic";
     383             :   ulong  numa_idx = fd_shmem_numa_idx( 0 );
     384             :   FD_LOG_NOTICE( ( "Creating workspace (--page-cnt %lu, --page-sz %s, --numa-idx %lu)",
     385             :                    page_cnt,
     386             :                    _page_sz,
     387             :                    numa_idx ) );
     388             :   fd_wksp_t * wksp = fd_wksp_new_anonymous( fd_cstr_to_shmem_page_sz( _page_sz ),
     389             :                                             page_cnt,
     390             :                                             fd_shmem_cpu_idx( numa_idx ),
     391             :                                             "wksp",
     392             :                                             0UL );
     393             :   FD_TEST( wksp );
     394             : 
     395             :   if ( fd_env_strip_cmdline_contains( &argc, &argv, "--help" ) ) {
     396             :     return usage();
     397             :   }
     398             : 
     399             :   const char * folder = fd_env_strip_cmdline_cstr( &argc, &argv, "--rocksdb-path", NULL, NULL);
     400             :   int fd = open( folder, O_RDONLY | O_DIRECTORY, 0666 );
     401             :   FD_TEST( fd > 0 );
     402             : 
     403             :   ulong start = fd_env_strip_cmdline_ulong( &argc, &argv, "st", NULL, 0 );
     404             :   ulong end   = fd_env_strip_cmdline_ulong( &argc, &argv, "en", NULL, 0 );
     405             : 
     406             :   if ( fd_env_strip_cmdline_contains(&argc, &argv, "microblock") ){
     407             :     const char * csv = prepare_csv(argc, argv);
     408             :     entry_write_header(csv);
     409             :     aggregate_entries( wksp , folder, csv, start, end);
     410             :   } else if( fd_env_strip_cmdline_contains(&argc, &argv, "batch") ){
     411             :     const char * csv = prepare_csv(argc, argv);
     412             :     batch_write_header(csv);
     413             :     aggregate_batch_entries( wksp, folder, csv, start, end);
     414             :   } else if( fd_env_strip_cmdline_contains(&argc, &argv, "info") ){
     415             :     investigate_shred( wksp, folder, start, end );
     416             :   } else {
     417             :     FD_LOG_WARNING(("Please specify either microblock, batch, or info in the command line. Check --help for usage." ));
     418             :   }
     419             : 
     420             :   fd_halt();
     421             :   return 0;
     422             : }

Generated by: LCOV version 1.14