LCOV - code coverage report
Current view: top level - app/ledger - main.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 166 0.0 %
Date: 2025-08-05 05:04:49 Functions: 0 5 0.0 %

          Line data    Source code
       1             : #include "../../flamenco/types/fd_types.h"
       2             : #include "../../flamenco/runtime/fd_rocksdb.h"
       3             : #include "../../flamenco/runtime/context/fd_capture_ctx.h"
       4             : #include <unistd.h>
       5             : #include <sys/stat.h>
       6             : 
       7             : struct fd_ledger_args {
       8             :   fd_wksp_t *           wksp;                    /* wksp for blockstore */
       9             :   char const *          cmd;                     /* user passed command to fd_ledger */
      10             :   ulong                 start_slot;              /* start slot for offline replay */
      11             :   ulong                 end_slot;                /* end slot for offline replay */
      12             :   uint                  hashseed;                /* hashseed */
      13             :   char const *          restore;                 /* wksp restore */
      14             :   ulong                 shred_max;               /* maximum number of shreds*/
      15             :   ulong                 slot_history_max;        /* number of slots stored by blockstore*/
      16             :   char const *          mini_db_dir;             /* path to minifed rocksdb that's to be created */
      17             :   int                   copy_txn_status;         /* determine if txns should be copied to the blockstore during minify/replay */
      18             :   ulong                 trash_hash;              /* trash hash to be used for negative cases*/
      19             :   char const *          rocksdb_path;            /* path to rocksdb directory */
      20             :   fd_valloc_t           valloc; /* wksp valloc that should NOT be used for runtime allocations */
      21             : };
      22             : typedef struct fd_ledger_args fd_ledger_args_t;
      23             : 
      24             : /***************************** Helpers ****************************************/
      25             : static fd_valloc_t
      26           0 : allocator_setup( fd_wksp_t * wksp ) {
      27             : 
      28           0 :   if( FD_UNLIKELY( !wksp ) ) {
      29           0 :     FD_LOG_ERR(( "workspace is NULL" ));
      30           0 :   }
      31             : 
      32           0 :   void * alloc_shmem = fd_wksp_alloc_laddr( wksp, fd_alloc_align(), fd_alloc_footprint(), 3UL );
      33           0 :   if( FD_UNLIKELY( !alloc_shmem ) ) { FD_LOG_ERR( ( "fd_alloc too large for workspace" ) ); }
      34           0 :   void * alloc_shalloc = fd_alloc_new( alloc_shmem, 3UL );
      35           0 :   if( FD_UNLIKELY( !alloc_shalloc ) ) { FD_LOG_ERR( ( "fd_alloc_new failed" ) ); }
      36           0 :   fd_alloc_t * alloc = fd_alloc_join( alloc_shalloc, 3UL );
      37           0 :   if( FD_UNLIKELY( !alloc ) ) { FD_LOG_ERR( ( "fd_alloc_join failed" ) ); }
      38           0 :   fd_valloc_t valloc = fd_alloc_virtual( alloc );
      39           0 :   return valloc;
      40             : 
      41             :   /* NOTE: Enable this if leak hunting */
      42             :   //return fd_backtracing_alloc_virtual( &valloc );
      43             : 
      44           0 : }
      45             : 
      46             : void
      47             : ingest_rocksdb( char const *      file,
      48             :                 ulong             start_slot,
      49             :                 ulong             end_slot,
      50             :                 FD_PARAM_UNUSED ulong             trash_hash,
      51           0 :                 fd_valloc_t       valloc ) {
      52             : 
      53           0 :   fd_rocksdb_t rocks_db;
      54           0 :   char * err = fd_rocksdb_init( &rocks_db, file );
      55           0 :   if( FD_UNLIKELY( err!=NULL ) ) {
      56           0 :     FD_LOG_ERR(( "fd_rocksdb_init returned %s", err ));
      57           0 :   }
      58             : 
      59           0 :   ulong last_slot = fd_rocksdb_last_slot( &rocks_db, &err );
      60           0 :   if( FD_UNLIKELY( err!=NULL ) ) {
      61           0 :     FD_LOG_ERR(( "fd_rocksdb_last_slot returned %s", err ));
      62           0 :   }
      63             : 
      64           0 :   if( last_slot < start_slot ) {
      65           0 :     FD_LOG_ERR(( "rocksdb blocks are older than snapshot. first=%lu last=%lu wanted=%lu",
      66           0 :                  fd_rocksdb_first_slot(&rocks_db, &err), last_slot, start_slot ));
      67           0 :   }
      68             : 
      69           0 :   FD_LOG_NOTICE(( "ingesting rocksdb from start=%lu to end=%lu", start_slot, end_slot ));
      70             : 
      71           0 :   fd_rocksdb_root_iter_t iter = {0};
      72           0 :   fd_rocksdb_root_iter_new( &iter );
      73             : 
      74           0 :   fd_slot_meta_t slot_meta = {0};
      75           0 :   fd_memset( &slot_meta, 0, sizeof(slot_meta) );
      76             : 
      77           0 :   int block_found = -1;
      78           0 :   while ( block_found!=0 && start_slot<=end_slot ) {
      79           0 :     block_found = fd_rocksdb_root_iter_seek( &iter, &rocks_db, start_slot, &slot_meta, valloc );
      80           0 :     if ( block_found!=0 ) {
      81           0 :       start_slot++;
      82           0 :     }
      83           0 :   }
      84           0 :   if( FD_UNLIKELY( block_found!=0 ) ) {
      85           0 :     FD_LOG_ERR(( "unable to seek to any slot" ));
      86           0 :   }
      87             : 
      88           0 :   uchar trash_hash_buf[32];
      89           0 :   memset( trash_hash_buf, 0xFE, sizeof(trash_hash_buf) );
      90             : 
      91           0 :   ulong blk_cnt = 0;
      92           0 :   do {
      93           0 :     ulong slot = slot_meta.slot;
      94           0 :     if( slot > end_slot ) {
      95           0 :       break;
      96           0 :     }
      97             : 
      98             :     /* Read and deshred block from RocksDB */
      99           0 :     if( blk_cnt % 100 == 0 ) {
     100           0 :       FD_LOG_WARNING(( "imported %lu blocks", blk_cnt ));
     101           0 :     }
     102             : 
     103           0 :     if( FD_UNLIKELY( err ) ) {
     104           0 :       FD_LOG_ERR(( "fd_rocksdb_get_block failed" ));
     105           0 :     }
     106             : 
     107           0 :     ++blk_cnt;
     108             : 
     109           0 :     memset( &slot_meta, 0, sizeof(fd_slot_meta_t) );
     110             : 
     111           0 :     int ret = fd_rocksdb_root_iter_next( &iter, &slot_meta, valloc );
     112           0 :     if( ret < 0 ) {
     113             :       // FD_LOG_WARNING(("Failed for slot %lu", slot + 1));
     114           0 :       ret = fd_rocksdb_get_meta( &rocks_db, slot + 1, &slot_meta, valloc );
     115           0 :       if( ret < 0 ) {
     116           0 :         break;
     117           0 :       }
     118           0 :     }
     119             :       // FD_LOG_ERR(("fd_rocksdb_root_iter_seek returned %d", ret));
     120           0 :   } while (1);
     121             : 
     122           0 :   fd_rocksdb_root_iter_destroy( &iter );
     123           0 :   fd_rocksdb_destroy( &rocks_db );
     124             : 
     125           0 :   FD_LOG_NOTICE(( "ingested %lu blocks", blk_cnt ));
     126           0 : }
     127             : 
     128             : // void
     129             : // init_blockstore( fd_ledger_args_t * args ) {
     130             : //   fd_wksp_tag_query_info_t info;
     131             : //   ulong blockstore_tag = FD_BLOCKSTORE_MAGIC;
     132             : //   void * shmem;
     133             : //   if( fd_wksp_tag_query( args->wksp, &blockstore_tag, 1, &info, 1 ) > 0 ) {
     134             : //     shmem = fd_wksp_laddr_fast( args->wksp, info.gaddr_lo );
     135             : //     args->blockstore = fd_blockstore_join( &args->blockstore_ljoin, shmem );
     136             : //     if( args->blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) {
     137             : //       FD_LOG_ERR(( "failed to join a blockstore" ));
     138             : //     }
     139             : //     FD_LOG_NOTICE(( "joined blockstore" ));
     140             : //   } else {
     141             : //     shmem = fd_wksp_alloc_laddr( args->wksp, fd_blockstore_align(), fd_blockstore_footprint( args->shred_max, args->slot_history_max, 16 ), blockstore_tag );
     142             : //     if( shmem == NULL ) {
     143             : //       FD_LOG_ERR(( "failed to allocate a blockstore" ));
     144             : //     }
     145             : //     args->blockstore = fd_blockstore_join( &args->blockstore_ljoin, fd_blockstore_new( shmem, 1, args->hashseed, args->shred_max, args->slot_history_max, 16 ) );
     146             : //     if( args->blockstore->shmem->magic != FD_BLOCKSTORE_MAGIC ) {
     147             : //       fd_wksp_free_laddr( shmem );
     148             : //       FD_LOG_ERR(( "failed to allocate a blockstore" ));
     149             : //     }
     150             : //     FD_LOG_NOTICE(( "allocating a new blockstore" ));
     151             : //   }
     152             : // }
     153             : 
     154             : void
     155           0 : wksp_restore( fd_ledger_args_t * args ) {
     156           0 :   if( args->restore != NULL ) {
     157           0 :     FD_LOG_NOTICE(( "restoring wksp %s", args->restore ));
     158           0 :     fd_wksp_restore( args->wksp, args->restore, args->hashseed );
     159           0 :   }
     160           0 : }
     161             : 
     162             : /********************* Main Command Functions and Setup ***********************/
     163             : void
     164           0 : minify( fd_ledger_args_t * args ) {
     165             :     /* Example commmand:
     166             :     fd_ledger --cmd minify --rocksdb <LARGE_ROCKSDB> --minified-rocksdb <MINI_ROCKSDB>
     167             :               --start-slot <START_SLOT> --end-slot <END_SLOT> --copy-txn-status 1
     168             :   */
     169           0 :   if( args->rocksdb_path == NULL ) {
     170           0 :     FD_LOG_ERR(( "rocksdb path is NULL" ));
     171           0 :   }
     172           0 :   if( args->mini_db_dir == NULL ) {
     173           0 :     FD_LOG_ERR(( "minified rocksdb path is NULL" ));
     174           0 :   }
     175             : 
     176           0 :   args->valloc = allocator_setup( args->wksp );
     177             : 
     178           0 :   fd_rocksdb_t big_rocksdb;
     179           0 :   char * err = fd_rocksdb_init( &big_rocksdb, args->rocksdb_path );
     180           0 :   if( FD_UNLIKELY( err!=NULL ) ) {
     181           0 :     FD_LOG_ERR(( "fd_rocksdb_init at path=%s returned error=%s", args->rocksdb_path, err ));
     182           0 :   }
     183             : 
     184             :   /* If the directory for the minified rocksdb already exists, error out */
     185           0 :   struct stat statbuf;
     186           0 :   if( stat( args->mini_db_dir, &statbuf ) == 0 ) {
     187           0 :     FD_LOG_ERR(( "path for mini_db_dir=%s already exists", args->mini_db_dir ));
     188           0 :   }
     189             : 
     190             :   /* Create a new smaller rocksdb */
     191           0 :   fd_rocksdb_t mini_rocksdb;
     192           0 :   fd_rocksdb_new( &mini_rocksdb, args->mini_db_dir );
     193             : 
     194             :   /* Correctly bound off start and end slot */
     195           0 :   ulong first_slot = fd_rocksdb_first_slot( &big_rocksdb, &err );
     196           0 :   ulong last_slot  = fd_rocksdb_last_slot( &big_rocksdb, &err );
     197           0 :   if( args->start_slot < first_slot ) { args->start_slot = first_slot; }
     198           0 :   if( args->end_slot > last_slot )    { args->end_slot = last_slot; }
     199             : 
     200           0 :   FD_LOG_NOTICE(( "copying over rocks db for range [%lu, %lu]", args->start_slot, args->end_slot ));
     201             : 
     202             :   /* Copy over all slot indexed columns */
     203           0 :   for( ulong cf_idx = 1; cf_idx < FD_ROCKSDB_CF_CNT; ++cf_idx ) {
     204           0 :     fd_rocksdb_copy_over_slot_indexed_range( &big_rocksdb, &mini_rocksdb, cf_idx,
     205           0 :                                               args->start_slot, args->end_slot );
     206           0 :   }
     207           0 :   FD_LOG_NOTICE(("copied over all slot indexed columns"));
     208             : 
     209             :   /* Copy over transactions. This is more complicated because first, a temporary
     210             :       blockstore will be populated. This will be used to look up transactions
     211             :       which can be quickly queried */
     212           0 :   if( args->copy_txn_status ) {
     213             :     // /* Ingest block range into blockstore */
     214             :     // ingest_rocksdb( args->rocksdb_path,
     215             :     //                 args->start_slot,
     216             :     //                 args->end_slot,
     217             :     //                 args->blockstore,
     218             :     //                 ULONG_MAX,
     219             :     //                 args->valloc );
     220             : 
     221           0 :   } else {
     222           0 :     FD_LOG_NOTICE(( "skipping copying of transaction statuses" ));
     223           0 :   }
     224             : 
     225             :   /* TODO: Currently, the address signatures column family isn't copied as it
     226             :            is indexed on the pubkey. */
     227             : 
     228           0 :   fd_rocksdb_destroy( &big_rocksdb );
     229           0 :   fd_rocksdb_destroy( &mini_rocksdb );
     230           0 : }
     231             : 
     232             : /* Parse user arguments and setup shared data structures used across commands */
     233             : int
     234           0 : initial_setup( int argc, char ** argv, fd_ledger_args_t * args ) {
     235           0 :   if( FD_UNLIKELY( argc==1 ) ) {
     236           0 :     return 1;
     237           0 :   }
     238             : 
     239           0 :   fd_boot( &argc, &argv );
     240             : 
     241           0 :   char const * wksp_name             = fd_env_strip_cmdline_cstr  ( &argc, &argv, "--wksp-name",             NULL, NULL                                               );
     242           0 :   ulong        page_cnt              = fd_env_strip_cmdline_ulong ( &argc, &argv, "--page-cnt",              NULL, 5                                                  );
     243           0 :   int          reset                 = fd_env_strip_cmdline_int   ( &argc, &argv, "--reset",                 NULL, 0                                                  );
     244           0 :   char const * cmd                   = fd_env_strip_cmdline_cstr  ( &argc, &argv, "--cmd",                   NULL, NULL                                               );
     245           0 :   int          copy_txn_status       = fd_env_strip_cmdline_int   ( &argc, &argv, "--copy-txn-status",       NULL, 0                                                  );
     246           0 :   ulong        slot_history_max      = fd_env_strip_cmdline_ulong ( &argc, &argv, "--slot-history",          NULL, 100UL                                              );
     247           0 :   ulong        shred_max             = fd_env_strip_cmdline_ulong ( &argc, &argv, "--shred-max",             NULL, 1UL << 17                                          );
     248           0 :   ulong        start_slot            = fd_env_strip_cmdline_ulong ( &argc, &argv, "--start-slot",            NULL, 0UL                                                );
     249           0 :   ulong        end_slot              = fd_env_strip_cmdline_ulong ( &argc, &argv, "--end-slot",              NULL, ULONG_MAX                                          );
     250           0 :   char const * restore               = fd_env_strip_cmdline_cstr  ( &argc, &argv, "--restore",               NULL, NULL                                               );
     251           0 :   ulong        trash_hash            = fd_env_strip_cmdline_ulong ( &argc, &argv, "--trash-hash",            NULL, ULONG_MAX                                          );
     252           0 :   char const * mini_db_dir           = fd_env_strip_cmdline_cstr  ( &argc, &argv, "--minified-rocksdb",      NULL, NULL                                               );
     253           0 :   char const * rocksdb_path          = fd_env_strip_cmdline_cstr  ( &argc, &argv, "--rocksdb",               NULL, NULL                                               );
     254             : 
     255             :   // TODO: Add argument validation. Make sure that we aren't including any arguments that aren't parsed for
     256             : 
     257           0 :   char hostname[64];
     258           0 :   gethostname( hostname, sizeof(hostname) );
     259           0 :   ulong hashseed = fd_hash( 0, hostname, strnlen( hostname, sizeof(hostname) ) );
     260           0 :   args->hashseed = (uint)hashseed;
     261             : 
     262             :   /* Setup workspace */
     263           0 :   fd_wksp_t * wksp;
     264           0 :   if( wksp_name == NULL ) {
     265           0 :     FD_LOG_NOTICE(( "--wksp not specified, using an anonymous local workspace" ));
     266           0 :     wksp = fd_wksp_new_anonymous( FD_SHMEM_GIGANTIC_PAGE_SZ, page_cnt, 0, "wksp", 0UL );
     267           0 :   } else {
     268           0 :     fd_shmem_info_t shmem_info[1];
     269           0 :     if( FD_UNLIKELY( fd_shmem_info( wksp_name, 0UL, shmem_info ) ) )
     270           0 :       FD_LOG_ERR(( "unable to query region \"%s\"\n\tprobably does not exist or bad permissions", wksp_name ));
     271           0 :     wksp = fd_wksp_attach( wksp_name );
     272           0 :   }
     273             : 
     274           0 :   if( wksp == NULL ) {
     275           0 :     FD_LOG_ERR(( "failed to attach to workspace %s", wksp_name ));
     276           0 :   }
     277           0 :   if( reset ) {
     278           0 :     fd_wksp_reset( wksp, args->hashseed );
     279           0 :   }
     280           0 :   args->wksp = wksp;
     281             : 
     282             :   /* Copy over arguments */
     283           0 :   args->cmd                     = cmd;
     284           0 :   args->start_slot              = start_slot;
     285           0 :   args->end_slot                = end_slot;
     286           0 :   args->shred_max               = shred_max;
     287           0 :   args->slot_history_max        = slot_history_max;
     288           0 :   args->restore                 = restore;
     289           0 :   args->mini_db_dir             = mini_db_dir;
     290           0 :   args->copy_txn_status         = copy_txn_status;
     291           0 :   args->trash_hash              = trash_hash;
     292           0 :   args->rocksdb_path            = rocksdb_path;
     293             : 
     294           0 :   if( args->rocksdb_path != NULL ) {
     295           0 :     FD_LOG_NOTICE(( "rocksdb=%s", args->rocksdb_path ));
     296           0 :   }
     297             : 
     298           0 :   return 0;
     299           0 : }
     300             : 
     301             : int main( int argc, char ** argv ) {
     302             :   /* Declaring this on the stack gets the alignment wrong when using asan */
     303             :   fd_ledger_args_t * args = fd_alloca( alignof(fd_ledger_args_t), sizeof(fd_ledger_args_t) );
     304             :   memset( args, 0, sizeof(fd_ledger_args_t) );
     305             :   initial_setup( argc, argv, args );
     306             : 
     307             :   if( args->cmd == NULL ) {
     308             :     FD_LOG_ERR(( "no command specified" ));
     309             :   } else if( strcmp( args->cmd, "minify" ) == 0 ) {
     310             :     minify( args );
     311             :   } else {
     312             :     FD_LOG_ERR(( "unknown command=%s", args->cmd ));
     313             :   }
     314             : 
     315             :   return 0;
     316             : }

Generated by: LCOV version 1.14