Line data Source code
1 : #include "utils/fd_ssctrl.h"
2 : #include "utils/fd_snapshot_parser.h"
3 : #include "utils/fd_ssmsg.h"
4 :
5 : #include "../../disco/topo/fd_topo.h"
6 : #include "../../disco/metrics/fd_metrics.h"
7 : #include "../../flamenco/runtime/fd_acc_mgr.h"
8 : #include "../../flamenco/types/fd_types.h"
9 : #include "../../funk/fd_funk.h"
10 :
11 : #define NAME "snapin"
12 :
13 : /* The snapin tile is a state machine that parses and loads a full
14 : and optionally an incremental snapshot. It is currently responsible
15 : for loading accounts into an in-memory database, though this may
16 : change. */
17 :
/* Tile states.  Each state must have a distinct value: the state
   machine compares ctx->state against these constants to decide how
   to treat data and control frags, and the raw value is exported via
   the STATE gauge in metrics_write.  In particular MALFORMED must not
   alias DONE, otherwise a malformed snapshot would pass the
   "state==DONE" checks that guard EOF / DONE control handling.

   LOADING, DONE and SHUTDOWN keep the values 0, 1 and 2; MALFORMED
   takes the next free value.
   NOTE(review): confirm any external enumeration of the STATE gauge
   lists MALFORMED as 3. */

#define FD_SNAPIN_STATE_LOADING   (0) /* We are inserting accounts from a snapshot */
#define FD_SNAPIN_STATE_DONE      (1) /* We are done inserting accounts from a snapshot */
#define FD_SNAPIN_STATE_MALFORMED (3) /* The snapshot is malformed, we are waiting for a reset notification */
#define FD_SNAPIN_STATE_SHUTDOWN  (2) /* The tile is done, been told to shut down, and has likely already exited */
22 :
23 : struct fd_snapin_tile {
24 : int full;
25 : int state;
26 :
27 : ulong seed;
28 : long boot_timestamp;
29 :
30 : fd_funk_t funk[1];
31 : fd_funk_txn_t * funk_txn;
32 : uchar * acc_data;
33 :
34 : fd_stem_context_t * stem;
35 : fd_snapshot_parser_t * ssparse;
36 :
37 : struct {
38 : ulong full_bytes_read;
39 : ulong incremental_bytes_read;
40 : ulong accounts_inserted;
41 : } metrics;
42 :
43 : struct {
44 : fd_wksp_t * wksp;
45 : ulong chunk0;
46 : ulong wmark;
47 : ulong mtu;
48 : } in;
49 :
50 : struct {
51 : fd_wksp_t * wksp;
52 : ulong chunk0;
53 : ulong wmark;
54 : ulong chunk;
55 : ulong mtu;
56 : } manifest_out;
57 : };
58 :
59 : typedef struct fd_snapin_tile fd_snapin_tile_t;
60 :
61 : static inline int
62 0 : should_shutdown( fd_snapin_tile_t * ctx ) {
63 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_SHUTDOWN ) ) {
64 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.1f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
65 0 : }
66 0 : return ctx->state==FD_SNAPIN_STATE_SHUTDOWN;
67 0 : }
68 :
69 : static ulong
70 0 : scratch_align( void ) {
71 0 : return 128UL;
72 0 : }
73 :
74 : static ulong
75 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
76 0 : (void)tile;
77 0 : ulong l = FD_LAYOUT_INIT;
78 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
79 0 : l = FD_LAYOUT_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
80 0 : return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
81 0 : }
82 :
83 : static void
84 0 : metrics_write( fd_snapin_tile_t * ctx ) {
85 0 : FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
86 0 : FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
87 :
88 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted );
89 0 : FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
90 0 : }
91 :
92 : static void
93 : manifest_cb( void * _ctx,
94 0 : ulong manifest_sz ) {
95 0 : fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
96 :
97 0 : ulong sz = sizeof(fd_snapshot_manifest_t)+manifest_sz;
98 0 : FD_TEST( sz<=ctx->manifest_out.mtu );
99 0 : ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL, manifest_sz ) :
100 0 : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL, manifest_sz );
101 0 : fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sz, 0UL, 0UL, 0UL );
102 0 : ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sz, ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
103 0 : }
104 :
105 : static int
106 : is_duplicate_account( fd_snapin_tile_t * ctx,
107 0 : uchar const * account_pubkey ) {
108 0 : fd_account_meta_t const * rec_meta = fd_funk_get_acc_meta_readonly( ctx->funk,
109 0 : ctx->funk_txn,
110 0 : (fd_pubkey_t*)account_pubkey,
111 0 : NULL,
112 0 : NULL,
113 0 : NULL );
114 0 : if( FD_UNLIKELY( rec_meta ) ) {
115 0 : if( FD_LIKELY( rec_meta->slot>ctx->ssparse->accv_slot ) ) return 1;
116 :
117 : /* TODO: Reaching here means the existing value is a duplicate
118 : account. We need to hash the existing account and subtract that
119 : hash from the running lthash. */
120 0 : }
121 :
122 0 : return 0;
123 0 : }
124 :
125 : static void
126 : account_cb( void * _ctx,
127 0 : fd_solana_account_hdr_t const * hdr ) {
128 0 : fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
129 :
130 0 : if( FD_UNLIKELY( is_duplicate_account( ctx, hdr->meta.pubkey ) ) ) {
131 0 : ctx->acc_data = NULL;
132 0 : return;
133 0 : }
134 :
135 0 : FD_TXN_ACCOUNT_DECL( rec );
136 0 : int err = fd_txn_account_init_from_funk_mutable( rec,
137 0 : (fd_pubkey_t*)hdr->meta.pubkey,
138 0 : ctx->funk,
139 0 : ctx->funk_txn,
140 0 : /* do_create */ 1,
141 0 : hdr->meta.data_len );
142 0 : if( FD_UNLIKELY( err!=FD_ACC_MGR_SUCCESS ) ) FD_LOG_ERR(( "fd_txn_account_init_from_funk_mutable failed (%d)", err ));
143 :
144 0 : rec->vt->set_data_len( rec, hdr->meta.data_len );
145 0 : rec->vt->set_slot( rec, ctx->ssparse->accv_slot );
146 0 : rec->vt->set_hash( rec, &hdr->hash );
147 0 : rec->vt->set_info( rec, &hdr->info );
148 :
149 0 : ctx->acc_data = rec->vt->get_data_mut( rec );
150 0 : ctx->metrics.accounts_inserted++;
151 0 : fd_txn_account_mutable_fini( rec, ctx->funk, ctx->funk_txn );
152 0 : }
153 :
154 : static void
155 : account_data_cb( void * _ctx,
156 : uchar const * buf,
157 0 : ulong data_sz ) {
158 0 : fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
159 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) return;
160 :
161 0 : fd_memcpy( ctx->acc_data, buf, data_sz );
162 0 : ctx->acc_data += data_sz;
163 0 : }
164 :
165 : static void
166 : transition_malformed( fd_snapin_tile_t * ctx,
167 0 : fd_stem_context_t * stem ) {
168 0 : ctx->state = FD_SNAPIN_STATE_MALFORMED;
169 0 : fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
170 0 : }
171 :
172 : static void
173 : handle_data_frag( fd_snapin_tile_t * ctx,
174 : ulong chunk,
175 : ulong sz,
176 0 : fd_stem_context_t * stem ) {
177 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return;
178 :
179 0 : FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE );
180 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
181 :
182 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) {
183 0 : FD_LOG_WARNING(( "received data fragment while in done state" ));
184 0 : transition_malformed( ctx, stem );
185 0 : return;
186 0 : }
187 :
188 0 : uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
189 0 : uchar const * const chunk_end = chunk_start + sz;
190 0 : uchar const * cur = chunk_start;
191 :
192 0 : for(;;) {
193 0 : if( FD_UNLIKELY( cur>=chunk_end ) ) {
194 0 : break;
195 0 : }
196 :
197 0 : cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) );
198 0 : if( FD_UNLIKELY( ctx->ssparse->flags ) ) {
199 0 : if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) {
200 0 : transition_malformed( ctx, stem );
201 0 : return;
202 0 : }
203 0 : }
204 0 : }
205 :
206 0 : if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_DONE ) ) ctx->state = FD_SNAPIN_STATE_DONE;
207 :
208 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += sz;
209 0 : else ctx->metrics.incremental_bytes_read += sz;
210 0 : }
211 :
212 : static void
213 : handle_control_frag( fd_snapin_tile_t * ctx,
214 : fd_stem_context_t * stem,
215 0 : ulong sig ) {
216 0 : switch( sig ) {
217 0 : case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
218 0 : ctx->full = 1;
219 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
220 0 : fd_funk_txn_cancel_root( ctx->funk );
221 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
222 0 : break;
223 0 : case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
224 0 : ctx->full = 0;
225 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
226 0 : if( FD_UNLIKELY( !ctx->funk_txn ) ) fd_funk_txn_cancel_root( ctx->funk );
227 0 : else fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
228 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
229 0 : break;
230 0 : case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
231 0 : FD_TEST( ctx->full );
232 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
233 0 : FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
234 0 : transition_malformed( ctx, stem );
235 0 : break;
236 0 : }
237 :
238 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
239 :
240 0 : fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
241 0 : ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
242 0 : ctx->full = 0;
243 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
244 0 : break;
245 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:
246 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
247 0 : FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
248 0 : transition_malformed( ctx, stem );
249 0 : break;
250 0 : }
251 :
252 0 : if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
253 0 : fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE, 0UL ), 0UL, 0UL, 0UL, 0UL, 0UL );
254 0 : break;
255 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
256 0 : ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
257 0 : break;
258 0 : default:
259 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
260 0 : return;
261 0 : }
262 :
263 : /* We must acknowledge after handling the control frag, because if it
264 : causes us to generate a malformed transition, that must be sent
265 : back to the snaprd controller before the acknowledgement. */
266 0 : fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
267 0 : }
268 :
269 : static inline int
270 : returnable_frag( fd_snapin_tile_t * ctx,
271 : ulong in_idx,
272 : ulong seq,
273 : ulong sig,
274 : ulong chunk,
275 : ulong sz,
276 : ulong tsorig,
277 : ulong tspub,
278 0 : fd_stem_context_t * stem ) {
279 0 : (void)in_idx;
280 0 : (void)seq;
281 0 : (void)sig;
282 0 : (void)tsorig;
283 0 : (void)tspub;
284 :
285 0 : ctx->stem = stem;
286 :
287 0 : FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
288 :
289 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
290 0 : else handle_control_frag( ctx, stem, sig );
291 :
292 0 : return 0;
293 0 : }
294 :
295 : static void
296 : privileged_init( fd_topo_t * topo,
297 0 : fd_topo_tile_t * tile ) {
298 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
299 :
300 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
301 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
302 :
303 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
304 0 : }
305 :
306 : FD_FN_UNUSED static void
307 : unprivileged_init( fd_topo_t * topo,
308 0 : fd_topo_tile_t * tile ) {
309 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
310 :
311 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
312 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
313 0 : void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
314 :
315 0 : ctx->full = 1;
316 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
317 :
318 0 : ctx->boot_timestamp = fd_log_wallclock();
319 :
320 0 : FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
321 0 : ctx->funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map );
322 :
323 0 : ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb );
324 :
325 0 : FD_TEST( ctx->ssparse );
326 :
327 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
328 :
329 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
330 0 : if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
331 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
332 :
333 0 : fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
334 0 : ctx->manifest_out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
335 0 : ctx->manifest_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
336 0 : ctx->manifest_out.wmark = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
337 0 : ctx->manifest_out.chunk = ctx->manifest_out.chunk0;
338 0 : ctx->manifest_out.mtu = writer_link->mtu;
339 :
340 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
341 :
342 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
343 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
344 0 : ctx->in.wksp = in_wksp->wksp;;
345 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
346 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
347 0 : ctx->in.mtu = in_link->mtu;
348 0 : }
349 :
/* Stem configuration consumed by the fd_stem.c template included
   below. */

/* A control fragment can produce at most two publishes: one
   acknowledgement and one malformed message. */
#define STEM_BURST 2UL
#define STEM_LAZY  1000L

#define STEM_CALLBACK_CONTEXT_TYPE  fd_snapin_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)

#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE   metrics_write
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
359 :
360 : #include "../../disco/stem/fd_stem.c"
361 :
362 : fd_topo_run_tile_t fd_tile_snapin = {
363 : .name = NAME,
364 : .scratch_align = scratch_align,
365 : .scratch_footprint = scratch_footprint,
366 : .privileged_init = privileged_init,
367 : .unprivileged_init = unprivileged_init,
368 : .run = stem_run,
369 : };
370 :
371 : #undef NAME
|