Line data Source code
1 : #include "../../firedancer/topology.h"
2 : #include "../../platform/fd_sys_util.h"
3 : #include "../../shared/commands/configure/configure.h"
4 : #include "../../shared/commands/run/run.h"
5 : #include "../../shared_dev/commands/dev.h"
6 : #include "../../../disco/metrics/fd_metrics.h"
7 : #include "../../../disco/topo/fd_topob.h"
8 : #include "../../../disco/pack/fd_pack.h"
9 : #include "../../../disco/pack/fd_pack_cost.h"
10 : #include "../../../util/tile/fd_tile_private.h"
11 : #include "../../../util/pod/fd_pod_format.h"
12 : #include "../../../discof/restore/utils/fd_ssmsg.h"
13 :
14 : #include <sys/resource.h>
15 : #include <linux/capability.h>
16 : #include <unistd.h>
17 : #include <stdio.h>
18 :
/* Command-line name of this action; used as fd_action_snapshot_load.name. */
#define NAME "snapshot-load"

/* Object callback table defined in another translation unit (extern);
   handed to fd_topob_finish at the end of snapshot_load_topo. */
extern fd_topo_obj_callbacks_t * CALLBACKS[];

/* Defined elsewhere; passed to fd_topo_run_single_process by
   snapshot_load_cmd_fn.  NOTE(review): presumably maps a tile to its
   run descriptor (fd_topo_run_tile_t) — confirm against its definition. */
fd_topo_run_tile_t
fdctl_tile_run( fd_topo_tile_t const * tile );
25 :
/* snapshot_load_topo builds the topology used by the snapshot-load
   command into config->topo.

   Objects: a txncache object (workspace "txncache", registered in
   topo->props under "txncache") and a funk object (workspace "funk"),
   both sized from config->firedancer.

   Tiles (tile_to_cpu index in brackets):
     metric [0], snaprd [1] (read), snapdc [2] (zstd decompress),
     snapin [3] (snapshot parser).
   snaprd, snapdc and snapin set allow_shutdown = 1.

   Data path:
     snaprd --snap_zstd--> snapdc --snap_stream--> snapin --snap_out-->
   Feedback links back to snaprd:
     snapdc --snapdc_rd--> snaprd,  snapin --snapin_rd--> snaprd
   snaprd_rp and snap_out get no consumer here, so both set
   permit_no_consumers = 1.

   If args->snapshot_load.tile_cpus is non-empty it is parsed into
   tile_to_cpu and must name at least 4 CPUs (one per tile above).
   Otherwise fd_topob_auto_layout picks the placement.

   NOTE(review): the tile_in/tile_out calls are made in a fixed order.
   Tiles presumably address their links by that position, so keep the
   order when editing. */
static void
snapshot_load_topo( config_t *     config,
                    args_t const * args ) {
  fd_topo_t * topo = &config->topo;
  fd_topob_new( &config->topo, config->name );
  topo->max_page_size = fd_cstr_to_shmem_page_sz( config->hugetlbfs.max_page_size );

  /* Shared txncache object, published in topo->props under "txncache". */
  fd_topob_wksp( topo, "txncache" );
  fd_topo_obj_t * txncache_obj = setup_topo_txncache( topo, "txncache",
      config->firedancer.runtime.max_live_slots,
      fd_ulong_pow2_up( FD_PACK_MAX_TXNCACHE_TXN_PER_SLOT ) );
  FD_TEST( fd_pod_insertf_ulong( topo->props, txncache_obj->id, "txncache" ) );

  /* Funk (accounts database) object, sized from the [funk] config section. */
  fd_topob_wksp( topo, "funk" );
  fd_topo_obj_t * funk_obj = setup_topo_funk( topo, "funk",
      config->firedancer.funk.max_account_records,
      config->firedancer.funk.max_database_transactions,
      config->firedancer.funk.heap_size_gib,
      config->firedancer.funk.lock_pages );

  /* CPU assignment for the four tiles created below.  All zeros unless
     --tile-cpus was given; in that case auto layout runs at the end.
     NOTE(review): static storage, presumably to keep an FD_TILE_MAX-sized
     array off the stack — confirm. */
  static ushort tile_to_cpu[ FD_TILE_MAX ] = {0};
  if( args->snapshot_load.tile_cpus[0] ) {
    ulong cpu_cnt = fd_tile_private_cpus_parse( args->snapshot_load.tile_cpus, tile_to_cpu );
    if( FD_UNLIKELY( cpu_cnt<4UL ) ) FD_LOG_ERR(( "--tile-cpus specifies %lu CPUs, but need at least 4", cpu_cnt ));
  }

  /* metrics tile *****************************************************/
  fd_topob_wksp( topo, "metric_in" );
  fd_topob_wksp( topo, "metric" );
  fd_topob_tile( topo, "metric", "metric", "metric_in", tile_to_cpu[0], 0, 0 );

  /* read() tile */
  fd_topob_wksp( topo, "snaprd" );
  fd_topo_tile_t * snaprd_tile = fd_topob_tile( topo, "snaprd", "snaprd", "snaprd", tile_to_cpu[1], 0, 0 );
  snaprd_tile->allow_shutdown = 1;

  /* "snapdc": Zstandard decompress tile */
  fd_topob_wksp( topo, "snapdc" );
  fd_topo_tile_t * snapdc_tile = fd_topob_tile( topo, "snapdc", "snapdc", "snapdc", tile_to_cpu[2], 0, 0 );
  snapdc_tile->allow_shutdown = 1;

  /* Compressed data stream */
  fd_topob_wksp( topo, "snap_zstd" );
  fd_topob_link( topo, "snap_zstd", "snap_zstd", 8192UL, 16384, 1UL );

  /* snaprd_rp: produced by snaprd; no tile consumes it in this topology. */
  fd_topob_wksp( topo, "snaprd_rp" );
  fd_topo_link_t * snaprd_rp_link = fd_topob_link( topo, "snaprd_rp", "snaprd_rp", 128UL, 0UL, 1UL );
  snaprd_rp_link->permit_no_consumers = 1;

  /* Uncompressed data stream */
  fd_topob_wksp( topo, "snap_stream" );
  fd_topob_link( topo, "snap_stream", "snap_stream", 2048UL, USHORT_MAX, 1UL );

  /* snaprd tile -> compressed stream */
  fd_topob_tile_out( topo, "snaprd", 0UL, "snap_zstd", 0UL );
  fd_topob_tile_out( topo, "snaprd", 0UL, "snaprd_rp", 0UL );

  /* compressed stream -> snapdc tile */
  fd_topob_tile_in( topo, "snapdc", 0UL, "metric_in", "snap_zstd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

  /* snapdc tile -> uncompressed stream */
  fd_topob_tile_out( topo, "snapdc", 0UL, "snap_stream", 0UL );

  /* "snapin": Snapshot parser tile */
  fd_topob_wksp( topo, "snapin" );
  fd_topo_tile_t * snapin_tile = fd_topob_tile( topo, "snapin", "snapin", "snapin", tile_to_cpu[3], 0, 0 );
  snapin_tile->allow_shutdown = 1;

  /* uncompressed stream -> snapin tile */
  fd_topob_tile_in  ( topo, "snapin", 0UL, "metric_in", "snap_stream", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

  /* snapin gets read-write access to both the funk and txncache objects
     and is told their object ids. */
  fd_topob_tile_uses( topo, snapin_tile, funk_obj,     FD_SHMEM_JOIN_MODE_READ_WRITE );
  fd_topob_tile_uses( topo, snapin_tile, txncache_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
  snapin_tile->snapin.funk_obj_id     = funk_obj->id;
  snapin_tile->snapin.txncache_obj_id = txncache_obj->id;

  snapin_tile->snapin.max_live_slots = config->firedancer.runtime.max_live_slots;

  /* snapshot manifest out link: sized to carry one fd_snapshot_manifest_t
     per frag.  No consumer is attached here. */
  fd_topob_wksp( topo, "snap_out" );
  fd_topo_link_t * snap_out_link = fd_topob_link( topo, "snap_out", "snap_out", 2UL, sizeof(fd_snapshot_manifest_t), 1UL );
  snap_out_link->permit_no_consumers = 1;
  fd_topob_tile_out( topo, "snapin", 0UL, "snap_out", 0UL );

  /* Feedback link snapdc -> snaprd.  It lives in the existing "snap_zstd"
     workspace rather than a dedicated one (contrast with snapin_rd below). */
  fd_topob_link( topo, "snapdc_rd", "snap_zstd", 128UL, 0UL, 1UL );
  fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapdc_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
  fd_topob_tile_out( topo, "snapdc", 0UL, "snapdc_rd", 0UL );

  /* Feedback link snapin -> snaprd, in its own workspace. */
  fd_topob_wksp( topo, "snapin_rd" );
  fd_topob_link( topo, "snapin_rd", "snapin_rd", 128UL, 0UL, 1UL );
  fd_topob_tile_in( topo, "snaprd", 0UL, "metric_in", "snapin_rd", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
  fd_topob_tile_out( topo, "snapin", 0UL, "snapin_rd", 0UL );

  /* Apply per-tile settings from config to every tile created above. */
  for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
    fd_topo_tile_t * tile = &topo->tiles[ i ];
    fd_topo_configure_tile( tile, config );
  }

  /* Without an explicit --tile-cpus list, let the builder place tiles. */
  if( !args->snapshot_load.tile_cpus[0] ) {
    fd_topob_auto_layout( topo, 0 );
  }
  fd_topob_finish( topo, CALLBACKS );
}
130 :
131 : static void
132 : snapshot_load_cmd_args( int * pargc,
133 : char *** pargv,
134 0 : args_t * args ) {
135 0 : char const * tile_cpus = fd_env_strip_cmdline_cstr( pargc, pargv, "--tile-cpus", "FD_TILE_CPUS", NULL );
136 :
137 0 : if( tile_cpus ) {
138 0 : ulong tile_cpus_strlen = strlen( tile_cpus );
139 0 : if( FD_UNLIKELY( tile_cpus_strlen>=sizeof(args->snapshot_load.tile_cpus) ) ) FD_LOG_ERR(( "--tile-cpus: flag too long" ));
140 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( args->snapshot_load.tile_cpus ), tile_cpus, tile_cpus_strlen ) );
141 0 : }
142 0 : }
143 :
/* Defined in another translation unit.  snapshot_load_cmd_fn sets
   element [1] to 0 right after run_firedancer_init.  NOTE(review): the
   reason is not visible in this file; presumably it releases a logging
   lock left held by init — confirm against the logging implementation. */
extern int * fd_log_private_shared_lock;
145 :
146 : static void
147 : snapshot_load_cmd_fn( args_t * args,
148 0 : config_t * config ) {
149 0 : if( FD_UNLIKELY( config->firedancer.snapshots.sources.gossip.enabled || config->firedancer.snapshots.sources.entrypoints.enabled ) ) {
150 0 : FD_LOG_ERR(( "snapshot-load command is incompatible with gossip or entrypoint snapshot sources" ));
151 0 : }
152 0 : snapshot_load_topo( config, args );
153 0 : fd_topo_t * topo = &config->topo;
154 :
155 0 : args_t configure_args = {
156 0 : .configure.command = CONFIGURE_CMD_INIT,
157 0 : };
158 :
159 0 : for( ulong i=0UL; STAGES[ i ]; i++ )
160 0 : configure_args.configure.stages[ i ] = STAGES[ i ];
161 0 : configure_cmd_fn( &configure_args, config );
162 :
163 0 : run_firedancer_init( config, 1, 0 );
164 :
165 0 : fd_log_private_shared_lock[ 1 ] = 0;
166 0 : fd_topo_join_workspaces( topo, FD_SHMEM_JOIN_MODE_READ_WRITE );
167 0 : fd_topo_fill( topo );
168 :
169 0 : double tick_per_ns = fd_tempo_tick_per_ns( NULL );
170 0 : double ns_per_tick = 1.0/tick_per_ns;
171 :
172 0 : long start = fd_log_wallclock();
173 0 : fd_topo_run_single_process( topo, 2, config->uid, config->gid, fdctl_tile_run );
174 :
175 0 : fd_topo_tile_t * snaprd_tile = &topo->tiles[ fd_topo_find_tile( topo, "snaprd", 0UL ) ];
176 0 : fd_topo_tile_t * snapdc_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapdc", 0UL ) ];
177 0 : fd_topo_tile_t * snapin_tile = &topo->tiles[ fd_topo_find_tile( topo, "snapin", 0UL ) ];
178 :
179 0 : ulong volatile * const snaprd_metrics = fd_metrics_tile( snaprd_tile->metrics );
180 0 : ulong volatile * const snapdc_metrics = fd_metrics_tile( snapdc_tile->metrics );
181 0 : ulong volatile * const snapin_metrics = fd_metrics_tile( snapin_tile->metrics );
182 :
183 0 : ulong total_off_old = 0UL;
184 0 : ulong snaprd_backp_old = 0UL;
185 0 : ulong snaprd_wait_old = 0UL;
186 0 : ulong snapdc_backp_old = 0UL;
187 0 : ulong snapdc_wait_old = 0UL;
188 0 : ulong snapin_backp_old = 0UL;
189 0 : ulong snapin_wait_old = 0UL;
190 0 : ulong acc_cnt_old = 0UL;
191 0 : sleep( 1 );
192 0 : puts( "" );
193 0 : puts( "Columns:" );
194 0 : puts( "- bw: Uncompressed bandwidth" );
195 0 : puts( "- backp: Backpressured by downstream tile" );
196 0 : puts( "- stall: Waiting on upstream tile" );
197 0 : puts( "- acc: Number of accounts" );
198 0 : puts( "" );
199 0 : puts( "-------------backp=(snaprd,snapdc,snapin) busy=(snaprd,snapdc,snapin)---------------" );
200 0 : long next = start+1000L*1000L*1000L;
201 0 : for(;;) {
202 0 : ulong snaprd_status = FD_VOLATILE_CONST( snaprd_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
203 0 : ulong snapdc_status = FD_VOLATILE_CONST( snapdc_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
204 0 : ulong snapin_status = FD_VOLATILE_CONST( snapin_metrics[ MIDX( GAUGE, TILE, STATUS ) ] );
205 :
206 0 : if( FD_UNLIKELY( snaprd_status==2UL && snapdc_status==2UL && snapin_status == 2UL ) ) break;
207 :
208 0 : long cur = fd_log_wallclock();
209 0 : if( FD_UNLIKELY( cur<next ) ) {
210 0 : long sleep_nanos = fd_long_min( 1000L*1000L, next-cur );
211 0 : FD_TEST( !fd_sys_util_nanosleep( (uint)(sleep_nanos/(1000L*1000L*1000L)), (uint)(sleep_nanos%(1000L*1000L*1000L)) ) );
212 0 : continue;
213 0 : }
214 :
215 0 : ulong total_off = snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ] +
216 0 : snaprd_metrics[ MIDX( GAUGE, SNAPRD, INCREMENTAL_BYTES_READ ) ];
217 0 : ulong snaprd_backp = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
218 0 : ulong snaprd_wait = snaprd_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snaprd_backp;
219 0 : ulong snapdc_backp = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
220 0 : ulong snapdc_wait = snapdc_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snapdc_backp;
221 0 : ulong snapin_backp = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_BACKPRESSURE_PREFRAG ) ];
222 0 : ulong snapin_wait = snapin_metrics[ MIDX( COUNTER, TILE, REGIME_DURATION_NANOS_CAUGHT_UP_POSTFRAG ) ] + snapin_backp;
223 :
224 0 : double progress = 100.0 * (double)snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_READ ) ] / (double)snaprd_metrics[ MIDX( GAUGE, SNAPRD, FULL_BYTES_TOTAL ) ];
225 :
226 0 : ulong acc_cnt = snapin_metrics[ MIDX( GAUGE, SNAPIN, ACCOUNTS_INSERTED ) ];
227 0 : printf( "%.1f %% bw=%4.0f MB/s backp=(%3.0f%%,%3.0f%%,%3.0f%%) busy=(%3.0f%%,%3.0f%%,%3.0f%%) acc=%3.1f M/s\n",
228 0 : progress,
229 0 : (double)( total_off-total_off_old )/1e6,
230 0 : ( (double)( snaprd_backp-snaprd_backp_old )*ns_per_tick )/1e7,
231 0 : ( (double)( snapdc_backp-snapdc_backp_old )*ns_per_tick )/1e7,
232 0 : ( (double)( snapin_backp-snapin_backp_old )*ns_per_tick )/1e7,
233 0 : 100-( ( (double)( snaprd_wait-snaprd_wait_old )*ns_per_tick )/1e7 ),
234 0 : 100-( ( (double)( snapdc_wait-snapdc_wait_old )*ns_per_tick )/1e7 ),
235 0 : 100-( ( (double)( snapin_wait-snapin_wait_old )*ns_per_tick )/1e7 ),
236 0 : (double)( acc_cnt-acc_cnt_old )/1e6 );
237 0 : fflush( stdout );
238 0 : total_off_old = total_off;
239 0 : snaprd_backp_old = snaprd_backp;
240 0 : snaprd_wait_old = snaprd_wait;
241 0 : snapdc_backp_old = snapdc_backp;
242 0 : snapdc_wait_old = snapdc_wait;
243 0 : snapin_backp_old = snapin_backp;
244 0 : snapin_wait_old = snapin_wait;
245 0 : acc_cnt_old = acc_cnt;
246 :
247 0 : next+=1000L*1000L*1000L;
248 0 : }
249 0 : }
250 :
251 : action_t fd_action_snapshot_load = {
252 : .name = NAME,
253 : .args = snapshot_load_cmd_args,
254 : .perm = dev_cmd_perm,
255 : .fn = snapshot_load_cmd_fn
256 : };
|