LCOV - code coverage report
Current view: top level - app/firedancer-dev/commands - snapshot_load.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 155 0.0 %
Date: 2025-08-05 05:04:49 Functions: 0 3 0.0 %

          Line data    Source code
       1             : #include "../../firedancer/topology.h"
       2             : #include "../../platform/fd_sys_util.h"
       3             : #include "../../shared/commands/configure/configure.h"
       4             : #include "../../shared/commands/run/run.h"
       5             : #include "../../shared_dev/commands/dev.h"
       6             : #include "../../../disco/metrics/fd_metrics.h"
       7             : #include "../../../disco/topo/fd_topob.h"
       8             : #include "../../../util/tile/fd_tile_private.h"
       9             : #include "../../../discof/restore/utils/fd_ssmsg.h"
      10             : 
      11             : #include <sys/resource.h>
      12             : #include <linux/capability.h>
      13             : #include <unistd.h>
      14             : #include <stdio.h>
      15             : 
      16             : #define NAME "snapshot-load"
      17             : 
      18             : extern fd_topo_obj_callbacks_t * CALLBACKS[];
      19             : 
      20             : fd_topo_run_tile_t
      21             : fdctl_tile_run( fd_topo_tile_t const * tile );
      22             : 
      23             : static void
      24             : snapshot_load_topo( config_t *     config,
      25           0 :                     args_t const * args ) {
      26           0 :   fd_topo_t * topo = &config->topo;
      27           0 :   fd_topob_new( &config->topo, config->name );
      28           0 :   topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
      29             : 
      30           0 :   fd_topob_wksp( topo, "funk" );
      31           0 :   fd_topo_obj_t * funk_obj = setup_topo_funk( topo, "funk",
      32           0 :       config->firedancer.funk.max_account_records,
      33           0 :       config->firedancer.funk.max_database_transactions,
      34           0 :       config->firedancer.funk.heap_size_gib,
      35           0 :       config->firedancer.funk.lock_pages );
      36             : 
      37           0 :   static ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
      38           0 :   if( args->snapshot_load.tile_cpus[0] ) {
      39           0 :     ulong cpu_cnt = fd_tile_private_cpus_parse( args->snapshot_load.tile_cpus, tile_to_cpu );
      40           0 :     if( FD_UNLIKELY( cpu_cnt<4UL ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least 4", cpu_cnt ));
      41           0 :   }
      42             : 
      43             :   /* metrics tile *****************************************************/
      44           0 :   fd_topob_wksp( topo, "metric_in" );
      45           0 :   fd_topob_wksp( topo, "metric" );
      46           0 :   fd_topob_tile( topo, "metric",  "metric", "metric_in", tile_to_cpu[0], 0, 0 );
      47             : 
      48             :   /* read() tile */
      49           0 :   fd_topob_wksp( topo, "snaprd" );
      50           0 :   fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "snaprd", tile_to_cpu[1], 0, 0 );
      51           0 :   snaprd_tile->allow_shutdown = 1;
      52             : 
      53             :   /* "snapdc": Zstandard decompress tile */
      54           0 :   fd_topob_wksp( topo, "snapdc" );
      55           0 :   fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "snapdc", tile_to_cpu[2], 0, 0 );
      56           0 :   snapdc_tile->allow_shutdown = 1;
      57             : 
      58             :   /* Compressed data stream */
      59           0 :   fd_topob_wksp( topo, "snap_zstd" );
      60           0 :   fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384, 1UL );
      61             : 
      62             :   /* Uncompressed data stream */
      63           0 :   fd_topob_wksp( topo, "snap_stream" );
      64           0 :   fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL );
      65             : 
      66             :   /* snaprd tile -> compressed stream */
      67           0 :   fd_topob_tile_out( topo, "snaprd", 0UL, "snap_zstd", 0UL );
      68             : 
      69             :   /* compressed stream -> snapdc tile */
      70           0 :   fd_topob_tile_in( topo, "snapdc", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
      71             : 
      72             :   /* snapdc tile -> uncompressed stream */
      73           0 :   fd_topob_tile_out( topo, "snapdc", 0UL, "snap_stream", 0UL );
      74             : 
      75             :   /* "snapin": Snapshot parser tile */
      76           0 :   fd_topob_wksp( topo, "snapin" );
      77           0 :   fd_topo_tile_t * snapin_tile = fd_topob_tile( topo, "snapin", "snapin", "snapin", tile_to_cpu[3], 0, 0 );
      78           0 :   snapin_tile->allow_shutdown = 1;
      79             : 
      80             :   /* uncompressed stream -> snapin tile */
      81           0 :   fd_topob_tile_in  ( topo, "snapin", 0UL, "metric_in", "snap_stream", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
      82             : 
      83             :   /* snapin funk access */
      84           0 :   fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
      85           0 :   snapin_tile->snapin.funk_obj_id = funk_obj->id;
      86             : 
      87             :   /* snapshot manifest out link */
      88           0 :   fd_topob_wksp( topo, "snap_out" );
      89           0 :   FD_TEST( sizeof(fd_snapshot_manifest_t)<(5UL*(1UL<<30UL)) );
      90           0 :   fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 2UL, 5UL*(1UL<<30UL), 1UL );
      91           0 :   snap_out_link->permit_no_consumers = 1;
      92           0 :   fd_topob_tile_out( topo, "snapin", 0UL, "snap_out", 0UL );
      93             : 
      94           0 :   fd_topob_link( topo, "snapdc_rd", "snap_zstd", 128UL, 0UL, 1UL );
      95           0 :   fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapdc_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
      96           0 :   fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_rd", 0UL );
      97             : 
      98           0 :   fd_topob_wksp( topo, "snapin_rd" );
      99           0 :   fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
     100           0 :   fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
     101           0 :   fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL );
     102             : 
     103           0 :   for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
     104           0 :     fd_topo_tile_t * tile = &topo->tiles[ i ];
     105           0 :     if( !fd_topo_configure_tile( tile, config ) ) {
     106           0 :       FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
     107           0 :     }
     108           0 :   }
     109             : 
     110           0 :   if( !args->snapshot_load.tile_cpus[0] ) {
     111           0 :     fd_topob_auto_layout( topo, 0 );
     112           0 :   }
     113           0 :   fd_topob_finish( topo, CALLBACKS );
     114           0 : }
     115             : 
     116             : static void
     117             : snapshot_load_cmd_args( int *    pargc,
     118             :                         char *** pargv,
     119           0 :                         args_t * args ) {
     120           0 :   char const * tile_cpus                = fd_env_strip_cmdline_cstr( pargc, pargv,  "--tile-cpus",     "FD_TILE_CPUS", NULL );
     121             : 
     122           0 :   if( tile_cpus ) {
     123           0 :     ulong tile_cpus_strlen = strlen( tile_cpus );
     124           0 :     if( FD_UNLIKELY( tile_cpus_strlen>=sizeof(args->snapshot_load.tile_cpus) ) ) FD_LOG_ERR(( "--tile-cpus: flag too long" ));
     125           0 :     fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.tile_cpus ), tile_cpus, tile_cpus_strlen ) );
     126           0 :   }
     127           0 : }
     128             : 
     129             : extern int * fd_log_private_shared_lock;
     130             : 
     131             : static void
     132             : snapshot_load_cmd_fn( args_t *   args,
     133           0 :                       config_t * config ) {
     134           0 :   snapshot_load_topo( config, args );
     135           0 :   fd_topo_t * topo = &config->topo;
     136             : 
     137           0 :   args_t configure_args = {
     138           0 :     .configure.command = CONFIGURE_CMD_INIT,
     139           0 :   };
     140             : 
     141           0 :   for( ulong i=0UL; STAGES[ i ]; i++ )
     142           0 :     configure_args.configure.stages[ i ] = STAGES[ i ];
     143           0 :   configure_cmd_fn( &configure_args, config );
     144             : 
     145           0 :   run_firedancer_init( config, 1 );
     146             : 
     147           0 :   fd_log_private_shared_lock[ 1 ] = 0;
     148           0 :   fd_topo_join_workspaces( topo, FD_SHMEM_JOIN_MODE_READ_WRITE );
     149           0 :   fd_topo_fill( topo );
     150             : 
     151           0 :   double tick_per_ns = fd_tempo_tick_per_ns( NULL );
     152           0 :   double ns_per_tick = 1.0/tick_per_ns;
     153             : 
     154           0 :   long start = fd_log_wallclock();
     155           0 :   fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run );
     156             : 
     157           0 :   fd_topo_tile_t * snaprd_tile = &topo->tiles[ fd_topo_find_tile( topo, "snaprd", 0UL ) ];
     158           0 :   fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
     159           0 :   fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
     160             : 
     161           0 :   ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
     162           0 :   ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
     163           0 :   ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
     164             : 
     165           0 :   ulong total_off_old    = 0UL;
     166           0 :   ulong snaprd_backp_old = 0UL;
     167           0 :   ulong snaprd_wait_old  = 0UL;
     168           0 :   ulong snapdc_backp_old = 0UL;
     169           0 :   ulong snapdc_wait_old  = 0UL;
     170           0 :   ulong snapin_backp_old = 0UL;
     171           0 :   ulong snapin_wait_old  = 0UL;
     172           0 :   ulong acc_cnt_old      = 0UL;
     173           0 :   sleep( 1 );
     174           0 :   puts( "" );
     175           0 :   puts( "Columns:" );
     176           0 :   puts( "- bw:    Uncompressed bandwidth" );
     177           0 :   puts( "- backp: Backpressured by downstream tile" );
     178           0 :   puts( "- stall: Waiting on upstream tile"         );
     179           0 :   puts( "- acc:   Number of accounts"               );
     180           0 :   puts( "" );
     181           0 :   puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" );
     182           0 :   long next = start+1000L*1000L*1000L;
     183           0 :   for(;;) {
     184           0 :     ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
     185           0 :     ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
     186           0 :     ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
     187             : 
     188           0 :     if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break;
     189             : 
     190           0 :     long cur = fd_log_wallclock();
     191           0 :     if( FD_UNLIKELY( cur<next ) ) {
     192           0 :       long sleep_nanos = fd_long_min( 1000L*1000L, next-cur );
     193           0 :       FD_TEST( !fd_sys_util_nanosleep(  (uint)(sleep_nanos/(1000L*1000L*1000L)), (uint)(sleep_nanos%(1000L*1000L*1000L)) ) );
     194           0 :       continue;
     195           0 :     }
     196             : 
     197           0 :     ulong total_off    = snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ] +
     198           0 :                          snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_BYTES_READ ) ];
     199           0 :     ulong snaprd_backp = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
     200           0 :     ulong snaprd_wait  = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG    ) ] +
     201           0 :                          snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG   ) ] + snaprd_backp;
     202           0 :     ulong snapdc_backp = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
     203           0 :     ulong snapdc_wait  = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG    ) ] +
     204           0 :                          snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG   ) ] + snapdc_backp;
     205           0 :     ulong snapin_backp = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
     206           0 :     ulong snapin_wait  = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG    ) ] +
     207           0 :                          snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG   ) ] + snapin_backp;
     208             : 
     209           0 :     ulong acc_cnt      = snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED    ) ];
     210           0 :     printf( "bw=%4.0f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s\n",
     211           0 :             (double)( total_off-total_off_old )/1e6,
     212           0 :             ( (double)( snaprd_backp-snaprd_backp_old )*ns_per_tick )/1e7,
     213           0 :             ( (double)( snapdc_backp-snapdc_backp_old )*ns_per_tick )/1e7,
     214           0 :             ( (double)( snapin_backp-snapin_backp_old )*ns_per_tick )/1e7,
     215           0 :             100-( ( (double)( snaprd_wait-snaprd_wait_old  )*ns_per_tick )/1e7 ),
     216           0 :             100-( ( (double)( snapdc_wait-snapdc_wait_old  )*ns_per_tick )/1e7 ),
     217           0 :             100-( ( (double)( snapin_wait-snapin_wait_old  )*ns_per_tick )/1e7 ),
     218           0 :             (double)( acc_cnt-acc_cnt_old  )/1e6 );
     219           0 :     fflush( stdout );
     220           0 :     total_off_old    = total_off;
     221           0 :     snaprd_backp_old = snaprd_backp;
     222           0 :     snaprd_wait_old  = snaprd_wait;
     223           0 :     snapdc_backp_old = snapdc_backp;
     224           0 :     snapdc_wait_old  = snapdc_wait;
     225           0 :     snapin_backp_old = snapin_backp;
     226           0 :     snapin_wait_old  = snapin_wait;
     227           0 :     acc_cnt_old      = acc_cnt;
     228             : 
     229           0 :     next+=1000L*1000L*1000L;
     230           0 :   }
     231             : 
     232           0 :   long end = fd_log_wallclock();
     233           0 :   FD_LOG_NOTICE(( "Loaded %.1fM accounts in %.1f seconds", (double)snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED ) ]/1e6, ((double)(end-start))/(1e9)));
     234           0 : }
     235             : 
     236             : action_t fd_action_snapshot_load = {
     237             :   .name = NAME,
     238             :   .args = snapshot_load_cmd_args,
     239             :   .perm = dev_cmd_perm,
     240             :   .fn   = snapshot_load_cmd_fn
     241             : };

Generated by: LCOV version 1.14