Line data Source code
1 : #include "../../firedancer/topology.h"
2 : #include "../../platform/fd_sys_util.h"
3 : #include "../../shared/commands/configure/configure.h"
4 : #include "../../shared/commands/run/run.h"
5 : #include "../../shared_dev/commands/dev.h"
6 : #include "../../../disco/metrics/fd_metrics.h"
7 : #include "../../../disco/topo/fd_topob.h"
8 : #include "../../../util/tile/fd_tile_private.h"
9 : #include "../../../discof/restore/utils/fd_ssmsg.h"
10 :
11 : #include <sys/resource.h>
12 : #include <linux/capability.h>
13 : #include <unistd.h>
14 : #include <stdio.h>
15 :
/* Command name; used as the .name of fd_action_snapshot_load below. */
16 : #define NAME "snapshot-load"
17 :
/* Topology object callback table, defined elsewhere; handed to
   fd_topob_finish() when the topology is finalized. */
18 : extern fd_topo_obj_callbacks_t * CALLBACKS[];
19 :
/* Tile runner lookup, defined elsewhere; passed to
   fd_topo_run_single_process() as the per-tile run callback. */
20 : fd_topo_run_tile_t
21 : fdctl_tile_run( fd_topo_tile_t const * tile );
22 :
23 : static void
24 : snapshot_load_topo( config_t * config,
25 0 : args_t const * args ) {
26 0 : fd_topo_t * topo = &config->topo;
27 0 : fd_topob_new( &config->topo, config->name );
28 0 : topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );
29 :
30 0 : fd_topob_wksp( topo, "funk" );
31 0 : fd_topo_obj_t * funk_obj = setup_topo_funk( topo, "funk",
32 0 : config->firedancer.funk.max_account_records,
33 0 : config->firedancer.funk.max_database_transactions,
34 0 : config->firedancer.funk.heap_size_gib,
35 0 : config->firedancer.funk.lock_pages );
36 :
37 0 : static ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
38 0 : if( args->snapshot_load.tile_cpus[0] ) {
39 0 : ulong cpu_cnt = fd_tile_private_cpus_parse( args->snapshot_load.tile_cpus, tile_to_cpu );
40 0 : if( FD_UNLIKELY( cpu_cnt<4UL ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least 4", cpu_cnt ));
41 0 : }
42 :
43 : /* metrics tile *****************************************************/
44 0 : fd_topob_wksp( topo, "metric_in" );
45 0 : fd_topob_wksp( topo, "metric" );
46 0 : fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[0], 0, 0 );
47 :
48 : /* read() tile */
49 0 : fd_topob_wksp( topo, "snaprd" );
50 0 : fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "snaprd", tile_to_cpu[1], 0, 0 );
51 0 : snaprd_tile->allow_shutdown = 1;
52 :
53 : /* "snapdc": Zstandard decompress tile */
54 0 : fd_topob_wksp( topo, "snapdc" );
55 0 : fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "snapdc", tile_to_cpu[2], 0, 0 );
56 0 : snapdc_tile->allow_shutdown = 1;
57 :
58 : /* Compressed data stream */
59 0 : fd_topob_wksp( topo, "snap_zstd" );
60 0 : fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384, 1UL );
61 :
62 : /* Uncompressed data stream */
63 0 : fd_topob_wksp( topo, "snap_stream" );
64 0 : fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL );
65 :
66 : /* snaprd tile -> compressed stream */
67 0 : fd_topob_tile_out( topo, "snaprd", 0UL, "snap_zstd", 0UL );
68 :
69 : /* compressed stream -> snapdc tile */
70 0 : fd_topob_tile_in( topo, "snapdc", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
71 :
72 : /* snapdc tile -> uncompressed stream */
73 0 : fd_topob_tile_out( topo, "snapdc", 0UL, "snap_stream", 0UL );
74 :
75 : /* "snapin": Snapshot parser tile */
76 0 : fd_topob_wksp( topo, "snapin" );
77 0 : fd_topo_tile_t * snapin_tile = fd_topob_tile( topo, "snapin", "snapin", "snapin", tile_to_cpu[3], 0, 0 );
78 0 : snapin_tile->allow_shutdown = 1;
79 :
80 : /* uncompressed stream -> snapin tile */
81 0 : fd_topob_tile_in ( topo, "snapin", 0UL, "metric_in", "snap_stream", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
82 :
83 : /* snapin funk access */
84 0 : fd_topob_tile_uses( topo, snapin_tile, funk_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
85 0 : snapin_tile->snapin.funk_obj_id = funk_obj->id;
86 :
87 : /* snapshot manifest out link */
88 0 : fd_topob_wksp( topo, "snap_out" );
89 0 : FD_TEST( sizeof(fd_snapshot_manifest_t)<(5UL*(1UL<<30UL)) );
90 0 : fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 2UL, 5UL*(1UL<<30UL), 1UL );
91 0 : snap_out_link->permit_no_consumers = 1;
92 0 : fd_topob_tile_out( topo, "snapin", 0UL, "snap_out", 0UL );
93 :
94 0 : fd_topob_link( topo, "snapdc_rd", "snap_zstd", 128UL, 0UL, 1UL );
95 0 : fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapdc_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
96 0 : fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_rd", 0UL );
97 :
98 0 : fd_topob_wksp( topo, "snapin_rd" );
99 0 : fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
100 0 : fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
101 0 : fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL );
102 :
103 0 : for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
104 0 : fd_topo_tile_t * tile = &topo->tiles[ i ];
105 0 : if( !fd_topo_configure_tile( tile, config ) ) {
106 0 : FD_LOG_ERR(( "unknown tile name %lu `%s`", i, tile->name ));
107 0 : }
108 0 : }
109 :
110 0 : if( !args->snapshot_load.tile_cpus[0] ) {
111 0 : fd_topob_auto_layout( topo, 0 );
112 0 : }
113 0 : fd_topob_finish( topo, CALLBACKS );
114 0 : }
115 :
116 : static void
117 : snapshot_load_cmd_args( int * pargc,
118 : char *** pargv,
119 0 : args_t * args ) {
120 0 : char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL );
121 :
122 0 : if( tile_cpus ) {
123 0 : ulong tile_cpus_strlen = strlen( tile_cpus );
124 0 : if( FD_UNLIKELY( tile_cpus_strlen>=sizeof(args->snapshot_load.tile_cpus) ) ) FD_LOG_ERR(( "--tile-cpus: flag too long" ));
125 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.tile_cpus ), tile_cpus, tile_cpus_strlen ) );
126 0 : }
127 0 : }
128 :
/* Log lock state defined elsewhere; snapshot_load_cmd_fn clears element
   1 of it before joining workspaces. */
129 : extern int * fd_log_private_shared_lock;
130 :
131 : static void
132 : snapshot_load_cmd_fn( args_t * args,
133 0 : config_t * config ) {
134 0 : snapshot_load_topo( config, args );
135 0 : fd_topo_t * topo = &config->topo;
136 :
137 0 : args_t configure_args = {
138 0 : .configure.command = CONFIGURE_CMD_INIT,
139 0 : };
140 :
141 0 : for( ulong i=0UL; STAGES[ i ]; i++ )
142 0 : configure_args.configure.stages[ i ] = STAGES[ i ];
143 0 : configure_cmd_fn( &configure_args, config );
144 :
145 0 : run_firedancer_init( config, 1 );
146 :
147 0 : fd_log_private_shared_lock[ 1 ] = 0;
148 0 : fd_topo_join_workspaces( topo, FD_SHMEM_JOIN_MODE_READ_WRITE );
149 0 : fd_topo_fill( topo );
150 :
151 0 : double tick_per_ns = fd_tempo_tick_per_ns( NULL );
152 0 : double ns_per_tick = 1.0/tick_per_ns;
153 :
154 0 : long start = fd_log_wallclock();
155 0 : fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run );
156 :
157 0 : fd_topo_tile_t * snaprd_tile = &topo->tiles[ fd_topo_find_tile( topo, "snaprd", 0UL ) ];
158 0 : fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
159 0 : fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
160 :
161 0 : ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
162 0 : ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
163 0 : ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
164 :
165 0 : ulong total_off_old = 0UL;
166 0 : ulong snaprd_backp_old = 0UL;
167 0 : ulong snaprd_wait_old = 0UL;
168 0 : ulong snapdc_backp_old = 0UL;
169 0 : ulong snapdc_wait_old = 0UL;
170 0 : ulong snapin_backp_old = 0UL;
171 0 : ulong snapin_wait_old = 0UL;
172 0 : ulong acc_cnt_old = 0UL;
173 0 : sleep( 1 );
174 0 : puts( "" );
175 0 : puts( "Columns:" );
176 0 : puts( "- bw: Uncompressed bandwidth" );
177 0 : puts( "- backp: Backpressured by downstream tile" );
178 0 : puts( "- stall: Waiting on upstream tile" );
179 0 : puts( "- acc: Number of accounts" );
180 0 : puts( "" );
181 0 : puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" );
182 0 : long next = start+1000L*1000L*1000L;
183 0 : for(;;) {
184 0 : ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
185 0 : ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
186 0 : ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
187 :
188 0 : if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break;
189 :
190 0 : long cur = fd_log_wallclock();
191 0 : if( FD_UNLIKELY( cur<next ) ) {
192 0 : long sleep_nanos = fd_long_min( 1000L*1000L, next-cur );
193 0 : FD_TEST( !fd_sys_util_nanosleep( (uint)(sleep_nanos/(1000L*1000L*1000L)), (uint)(sleep_nanos%(1000L*1000L*1000L)) ) );
194 0 : continue;
195 0 : }
196 :
197 0 : ulong total_off = snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ] +
198 0 : snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_BYTES_READ ) ];
199 0 : ulong snaprd_backp = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
200 0 : ulong snaprd_wait = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] +
201 0 : snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snaprd_backp;
202 0 : ulong snapdc_backp = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
203 0 : ulong snapdc_wait = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] +
204 0 : snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snapdc_backp;
205 0 : ulong snapin_backp = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
206 0 : ulong snapin_wait = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_PREFRAG ) ] +
207 0 : snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snapin_backp;
208 :
209 0 : ulong acc_cnt = snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED ) ];
210 0 : printf( "bw=%4.0f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s\n",
211 0 : (double)( total_off-total_off_old )/1e6,
212 0 : ( (double)( snaprd_backp-snaprd_backp_old )*ns_per_tick )/1e7,
213 0 : ( (double)( snapdc_backp-snapdc_backp_old )*ns_per_tick )/1e7,
214 0 : ( (double)( snapin_backp-snapin_backp_old )*ns_per_tick )/1e7,
215 0 : 100-( ( (double)( snaprd_wait-snaprd_wait_old )*ns_per_tick )/1e7 ),
216 0 : 100-( ( (double)( snapdc_wait-snapdc_wait_old )*ns_per_tick )/1e7 ),
217 0 : 100-( ( (double)( snapin_wait-snapin_wait_old )*ns_per_tick )/1e7 ),
218 0 : (double)( acc_cnt-acc_cnt_old )/1e6 );
219 0 : fflush( stdout );
220 0 : total_off_old = total_off;
221 0 : snaprd_backp_old = snaprd_backp;
222 0 : snaprd_wait_old = snaprd_wait;
223 0 : snapdc_backp_old = snapdc_backp;
224 0 : snapdc_wait_old = snapdc_wait;
225 0 : snapin_backp_old = snapin_backp;
226 0 : snapin_wait_old = snapin_wait;
227 0 : acc_cnt_old = acc_cnt;
228 :
229 0 : next+=1000L*1000L*1000L;
230 0 : }
231 :
232 0 : long end = fd_log_wallclock();
233 0 : FD_LOG_NOTICE(( "Loaded %.1fM accounts in %.1f seconds", (double)snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED ) ]/1e6, ((double)(end-start))/(1e9)));
234 0 : }
235 :
236 : action_t fd_action_snapshot_load = {
237 : .name = NAME,
238 : .args = snapshot_load_cmd_args,
239 : .perm = dev_cmd_perm,
240 : .fn = snapshot_load_cmd_fn
241 : };
|