LCOV - code coverage report
Current view: top level - app/firedancer-dev/commands - snapshot_load.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 164 0.0 %
Date: 2025-10-13 04:42:14 Functions: 0 3 0.0 %

          Line data    Source code
       1             : #include "../../firedancer/topology.h"
       2             : #include "../../platform/fd_sys_util.h"
       3             : #include "../../shared/commands/configure/configure.h"
       4             : #include "../../shared/commands/run/run.h"
       5             : #include "../../shared_dev/commands/dev.h"
       6             : #include "../../../disco/metrics/fd_metrics.h"
       7             : #include "../../../disco/topo/fd_topob.h"
       8             : #include "../../../disco/pack/fd_pack.h"
       9             : #include "../../../disco/pack/fd_pack_cost.h"
      10             : #include "../../../util/tile/fd_tile_private.h"
      11             : #include "../../../util/pod/fd_pod_format.h"
      12             : #include "../../../discof/restore/utils/fd_ssmsg.h"
      13             : 
      14             : #include <sys/resource.h>
      15             : #include <linux/capability.h>
      16             : #include <unistd.h>
      17             : #include <stdio.h>
      18             : 
      19             : #define NAME "snapshot-load" /* command name; used as .name of fd_action_snapshot_load in this file */
      20             : /* Callback table handed to fd_topob_finish when the topology is finalized; defined in another translation unit. */
      21             : extern fd_topo_obj_callbacks_t * CALLBACKS[];
      22             : /* Passed as the last argument of fd_topo_run_single_process; only declared here, defined in another translation unit. */
      23             : fd_topo_run_tile_t
      24             : fdctl_tile_run( fd_topo_tile_t const * tile );
      25             : /* Builds config->topo for this command: a metric tile plus the snaprd -> snapdc -> snapin chain, backed by txncache and funk objects. */
      26             : static void
      27             : snapshot_load_topo( config_t *     config,
      28           0 :                     args_t const * args ) { /* args->snapshot_load.tile_cpus optionally supplies a CPU list */
      29           0 :   fd_topo_t * topo = &config->topo;
      30           0 :   fd_topob_new( &config->topo, config->name );
      31           0 :   topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
      32             :   /* txncache object in its own workspace; its object id is also recorded in topo->props under "txncache" */
      33           0 :   fd_topob_wksp( topo, "txncache" );
      34           0 :   fd_topo_obj_t * txncache_obj = setup_topo_txncache( topo, "txncache",
      35           0 :       config->firedancer.runtime.max_live_slots,
      36           0 :       fd_ulong_pow2_up( FD_PACK_MAX_TXNCACHE_TXN_PER_SLOT ) );
      37           0 :   FD_TEST( fd_pod_insertf_ulong( topo->props, txncache_obj->id, "txncache" ) );
      38             :   /* funk object in its own workspace, parameterized from config->firedancer.funk.* */
      39           0 :   fd_topob_wksp( topo, "funk" );
      40           0 :   fd_topo_obj_t * funk_obj = setup_topo_funk( topo, "funk",
      41           0 :       config->firedancer.funk.max_account_records,
      42           0 :       config->firedancer.funk.max_database_transactions,
      43           0 :       config->firedancer.funk.heap_size_gib,
      44           0 :       config->firedancer.funk.lock_pages );
      45             :   /* tile_to_cpu is static and zero-initialized: without --tile-cpus every tile below is created with cpu index 0 and fd_topob_auto_layout is called near the end of this function */
      46           0 :   static ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
      47           0 :   if( args->snapshot_load.tile_cpus[0] ) {
      48           0 :     ulong cpu_cnt = fd_tile_private_cpus_parse( args->snapshot_load.tile_cpus, tile_to_cpu );
      49           0 :     if( FD_UNLIKELY( cpu_cnt<4UL ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least 4", cpu_cnt )); /* one CPU per tile: metric, snaprd, snapdc, snapin */
      50           0 :   }
      51             : 
      52             :   /* metrics tile *****************************************************/
      53           0 :   fd_topob_wksp( topo, "metric_in" );
      54           0 :   fd_topob_wksp( topo, "metric" );
      55           0 :   fd_topob_tile( topo, "metric",  "metric", "metric_in", tile_to_cpu[0], 0, 0 );
      56             : 
      57             :   /* read() tile */
      58           0 :   fd_topob_wksp( topo, "snaprd" );
      59           0 :   fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "snaprd", tile_to_cpu[1], 0, 0 );
      60           0 :   snaprd_tile->allow_shutdown = 1;
      61             : 
      62             :   /* "snapdc": Zstandard decompress tile */
      63           0 :   fd_topob_wksp( topo, "snapdc" );
      64           0 :   fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "snapdc", tile_to_cpu[2], 0, 0 );
      65           0 :   snapdc_tile->allow_shutdown = 1;
      66             : 
      67             :   /* Compressed data stream */
      68           0 :   fd_topob_wksp( topo, "snap_zstd" );
      69           0 :   fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384, 1UL );
      70             :   /* "snaprd_rp": second output of snaprd (wired below); no tile in this function consumes it, hence permit_no_consumers */
      71           0 :   fd_topob_wksp( topo, "snaprd_rp" );
      72           0 :   fd_topo_link_t * snaprd_rp_link = fd_topob_link( topo, "snaprd_rp", "snaprd_rp", 128UL, 0UL, 1UL );
      73           0 :   snaprd_rp_link->permit_no_consumers = 1;
      74             : 
      75             :   /* Uncompressed data stream */
      76           0 :   fd_topob_wksp( topo, "snap_stream" );
      77           0 :   fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL );
      78             : 
      79             :   /* snaprd tile -> compressed stream */
      80           0 :   fd_topob_tile_out( topo, "snaprd", 0UL, "snap_zstd", 0UL );
      81           0 :   fd_topob_tile_out( topo, "snaprd", 0UL, "snaprd_rp", 0UL );
      82             : 
      83             :   /* compressed stream -> snapdc tile */
      84           0 :   fd_topob_tile_in( topo, "snapdc", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
      85             : 
      86             :   /* snapdc tile -> uncompressed stream */
      87           0 :   fd_topob_tile_out( topo, "snapdc", 0UL, "snap_stream", 0UL );
      88             : 
      89             :   /* "snapin": Snapshot parser tile */
      90           0 :   fd_topob_wksp( topo, "snapin" );
      91           0 :   fd_topo_tile_t * snapin_tile = fd_topob_tile( topo, "snapin", "snapin", "snapin", tile_to_cpu[3], 0, 0 );
      92           0 :   snapin_tile->allow_shutdown = 1;
      93             : 
      94             :   /* uncompressed stream -> snapin tile */
      95           0 :   fd_topob_tile_in  ( topo, "snapin", 0UL, "metric_in", "snap_stream", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
      96             : 
      97             :   /* snapin funk access (and txncache access), both joined read-write; the object ids are handed to the tile config */
      98           0 :   fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
      99           0 :   fd_topob_tile_uses( topo, snapin_tile, txncache_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
     100           0 :   snapin_tile->snapin.funk_obj_id     = funk_obj->id;
     101           0 :   snapin_tile->snapin.txncache_obj_id = txncache_obj->id;
     102             :   /* same max_live_slots value that was used to create the txncache object above */
     103           0 :   snapin_tile->snapin.max_live_slots  = config->firedancer.runtime.max_live_slots;
     104             : 
     105             :   /* snapshot manifest out link; produced by snapin, no consumer is attached in this function */
     106           0 :   fd_topob_wksp( topo, "snap_out" );
     107           0 :   fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 2UL, sizeof(fd_snapshot_manifest_t), 1UL );
     108           0 :   snap_out_link->permit_no_consumers = 1;
     109           0 :   fd_topob_tile_out( topo, "snapin", 0UL, "snap_out", 0UL );
     110             :   /* "snapdc_rd": link from snapdc back to snaprd. NOTE(review): placed in the "snap_zstd" workspace rather than one of its own, unlike the other links here -- confirm intended */
     111           0 :   fd_topob_link( topo, "snapdc_rd", "snap_zstd", 128UL, 0UL, 1UL );
     112           0 :   fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapdc_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
     113           0 :   fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_rd", 0UL );
     114             :   /* "snapin_rd": link from snapin back to snaprd, in its own workspace */
     115           0 :   fd_topob_wksp( topo, "snapin_rd" );
     116           0 :   fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
     117           0 :   fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
     118           0 :   fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL );
     119             :   /* run fd_topo_configure_tile with config on every tile created above */
     120           0 :   for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
     121           0 :     fd_topo_tile_t * tile = &topo->tiles[ i ];
     122           0 :     fd_topo_configure_tile( tile, config );
     123           0 :   }
     124             :   /* auto layout is only requested when the user gave no --tile-cpus list */
     125           0 :   if( !args->snapshot_load.tile_cpus[0] ) {
     126           0 :     fd_topob_auto_layout( topo, 0 );
     127           0 :   }
     128           0 :   fd_topob_finish( topo, CALLBACKS );
     129           0 : }
     130             : /* Parses this command's arguments: an optional --tile-cpus value is copied into args->snapshot_load.tile_cpus. */
     131             : static void
     132             : snapshot_load_cmd_args( int *    pargc,
     133             :                         char *** pargv,
     134           0 :                         args_t * args ) {
     135           0 :   char const * tile_cpus                = fd_env_strip_cmdline_cstr( pargc, pargv,  "--tile-cpus",     "FD_TILE_CPUS", NULL ); /* presumably falls back to env FD_TILE_CPUS, then to NULL -- confirm against fd_env */
     136             :   /* when no value was found, args->snapshot_load.tile_cpus is left untouched */
     137           0 :   if( tile_cpus ) {
     138           0 :     ulong tile_cpus_strlen = strlen( tile_cpus );
     139           0 :     if( FD_UNLIKELY( tile_cpus_strlen>=sizeof(args->snapshot_load.tile_cpus) ) ) FD_LOG_ERR(( "--tile-cpus: flag too long" )); /* >= : the value must fit together with a terminating NUL */
     140           0 :     fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.tile_cpus ), tile_cpus, tile_cpus_strlen ) );
     141           0 :   }
     142           0 : }
     143             : 
     144             : extern int * fd_log_private_shared_lock;
     145             : /* Command entry point: builds the topology, runs the configure stages, runs the tiles via fd_topo_run_single_process, then prints one stats line per second until snaprd, snapdc and snapin all report status 2. */
     146             : static void
     147             : snapshot_load_cmd_fn( args_t *   args,
     148           0 :                       config_t * config ) {
     149           0 :   if( FD_UNLIKELY( config->firedancer.snapshots.sources.gossip.enabled || config->firedancer.snapshots.sources.entrypoints.enabled ) ) {
     150           0 :     FD_LOG_ERR(( "snapshot-load command is incompatible with gossip or entrypoint snapshot sources" ));
     151           0 :   }
     152           0 :   snapshot_load_topo( config, args );
     153           0 :   fd_topo_t * topo = &config->topo;
     154             :   /* run every stage listed in STAGES with the init command before starting the tiles */
     155           0 :   args_t configure_args = {
     156           0 :     .configure.command = CONFIGURE_CMD_INIT,
     157           0 :   };
     158             :   /* STAGES is NULL-terminated; configure_args is otherwise zero-initialized, so the copied list stays terminated (assumes stages[] has room for it -- TODO confirm) */
     159           0 :   for( ulong i=0UL; STAGES[ i ]; i++ )
     160           0 :     configure_args.configure.stages[ i ] = STAGES[ i ];
     161           0 :   configure_cmd_fn( &configure_args, config );
     162             : 
     163           0 :   run_firedancer_init( config, 1, 0 );
     164             : 
     165           0 :   fd_log_private_shared_lock[ 1 ] = 0; /* NOTE(review): meaning of slot 1 of the log lock is not visible in this file */
     166           0 :   fd_topo_join_workspaces( topo, FD_SHMEM_JOIN_MODE_READ_WRITE );
     167           0 :   fd_topo_fill( topo );
     168             :   /* the regime-duration counters read below are scaled by ns_per_tick, i.e. they are treated as tick counts */
     169           0 :   double tick_per_ns = fd_tempo_tick_per_ns( NULL );
     170           0 :   double ns_per_tick = 1.0/tick_per_ns;
     171             : 
     172           0 :   long start = fd_log_wallclock();
     173           0 :   fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run );
     174             :   /* locate the three pipeline tiles and their metrics regions for polling */
     175           0 :   fd_topo_tile_t * snaprd_tile = &topo->tiles[ fd_topo_find_tile( topo, "snaprd", 0UL ) ];
     176           0 :   fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
     177           0 :   fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
     178             : 
     179           0 :   ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
     180           0 :   ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
     181           0 :   ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
     182             :   /* previous-sample values; every printed figure is the delta since the previous report */
     183           0 :   ulong total_off_old    = 0UL;
     184           0 :   ulong snaprd_backp_old = 0UL;
     185           0 :   ulong snaprd_wait_old  = 0UL;
     186           0 :   ulong snapdc_backp_old = 0UL;
     187           0 :   ulong snapdc_wait_old  = 0UL;
     188           0 :   ulong snapin_backp_old = 0UL;
     189           0 :   ulong snapin_wait_old  = 0UL;
     190           0 :   ulong acc_cnt_old      = 0UL;
     191           0 :   sleep( 1 );
     192           0 :   puts( "" );
     193           0 :   puts( "Columns:" );
     194           0 :   puts( "- bw:    Uncompressed bandwidth" );
     195           0 :   puts( "- backp: Backpressured by downstream tile" );
     196           0 :   puts( "- busy:  Neither backpressured nor waiting on upstream tile" ); /* was "stall", a column that is never printed; the table shows busy = 100 - (caught-up + backp) */
     197           0 :   puts( "- acc:   Number of accounts"               );
     198           0 :   puts( "" );
     199           0 :   puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" );
     200           0 :   long next = start+1000L*1000L*1000L; /* first report is due one second after start */
     201           0 :   for(;;) {
     202           0 :     ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
     203           0 :     ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
     204           0 :     ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
     205             :     /* finished once all three tiles report status 2 (presumably the shut-down state -- confirm against the tile status gauge definition) */
     206           0 :     if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break;
     207             :     /* until the next report is due, sleep at most 1ms at a time so the status check above stays responsive */
     208           0 :     long cur = fd_log_wallclock();
     209           0 :     if( FD_UNLIKELY( cur<next ) ) {
     210           0 :       long sleep_nanos = fd_long_min( 1000L*1000L, next-cur );
     211           0 :       FD_TEST( !fd_sys_util_nanosleep(  (uint)(sleep_nanos/(1000L*1000L*1000L)), (uint)(sleep_nanos%(1000L*1000L*1000L)) ) );
     212           0 :       continue;
     213           0 :     }
     214             :     /* sample counters; *_wait = caught-up + backpressure, so 100 - wait% is printed as "busy" */
     215           0 :     ulong total_off    = snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ] +
     216           0 :                          snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_BYTES_READ ) ];
     217           0 :     ulong snaprd_backp = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
     218           0 :     ulong snaprd_wait  = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG   ) ] + snaprd_backp;
     219           0 :     ulong snapdc_backp = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
     220           0 :     ulong snapdc_wait  = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG   ) ] + snapdc_backp;
     221           0 :     ulong snapin_backp = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
     222           0 :     ulong snapin_wait  = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG   ) ] + snapin_backp;
     223             :     /* the total-size gauge can still be 0 when the first reports are printed; report 0.0 instead of dividing by zero (nan/inf) */
     224           0 :     ulong  full_total = snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_TOTAL ) ]; /* volatile: read once so the check and the divisor agree */
     225           0 :     double progress   = full_total ? 100.0 * (double)snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ] / (double)full_total : 0.0;
     226           0 :     ulong acc_cnt      = snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED    ) ];
     227           0 :     printf( "%.1f %% bw=%4.0f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s\n",
     228           0 :             progress,
     229           0 :             (double)( total_off-total_off_old )/1e6,
     230           0 :             ( (double)( snaprd_backp-snaprd_backp_old )*ns_per_tick )/1e7, /* ns per nominal 1s interval -> percent */
     231           0 :             ( (double)( snapdc_backp-snapdc_backp_old )*ns_per_tick )/1e7,
     232           0 :             ( (double)( snapin_backp-snapin_backp_old )*ns_per_tick )/1e7,
     233           0 :             100-( ( (double)( snaprd_wait-snaprd_wait_old  )*ns_per_tick )/1e7 ),
     234           0 :             100-( ( (double)( snapdc_wait-snapdc_wait_old  )*ns_per_tick )/1e7 ),
     235           0 :             100-( ( (double)( snapin_wait-snapin_wait_old  )*ns_per_tick )/1e7 ),
     236           0 :             (double)( acc_cnt-acc_cnt_old  )/1e6 );
     237           0 :     fflush( stdout );
     238           0 :     total_off_old    = total_off;
     239           0 :     snaprd_backp_old = snaprd_backp;
     240           0 :     snaprd_wait_old  = snaprd_wait;
     241           0 :     snapdc_backp_old = snapdc_backp;
     242           0 :     snapdc_wait_old  = snapdc_wait;
     243           0 :     snapin_backp_old = snapin_backp;
     244           0 :     snapin_wait_old  = snapin_wait;
     245           0 :     acc_cnt_old      = acc_cnt;
     246             :     /* schedule the next report exactly one second after the previous deadline */
     247           0 :     next+=1000L*1000L*1000L;
     248           0 :   }
     249           0 : }
     250             : /* Action descriptor for the "snapshot-load" command: wires up the argument parser, the dev_cmd_perm hook and the entry point defined in this file. */
     251             : action_t fd_action_snapshot_load = {
     252             :   .name = NAME,
     253             :   .args = snapshot_load_cmd_args,
     254             :   .perm = dev_cmd_perm,
     255             :   .fn   = snapshot_load_cmd_fn
     256             : };

Generated by: LCOV version 1.14