Line data Source code
1 : #include "utils/fd_ssctrl.h"
2 : #include "utils/fd_snapshot_parser.h"
3 : #include "utils/fd_ssmsg.h"
4 :
5 : #include "../../disco/topo/fd_topo.h"
6 : #include "../../disco/metrics/fd_metrics.h"
7 : #include "../../flamenco/runtime/fd_acc_mgr.h"
8 : #include "../../flamenco/types/fd_types.h"
9 : #include "../../funk/fd_funk.h"
10 :
11 : #define NAME "snapin"
12 :
13 : /* The snapin tile is a state machine that parses and loads a full
14 : and optionally an incremental snapshot. It is currently responsible
15 : for loading accounts into an in-memory database, though this may
16 : change. */
17 :
/* Tile states.  Every state needs a distinct value: handle_data_frag and
   handle_control_frag branch on DONE vs MALFORMED, and the state is
   exported verbatim through the SNAPIN STATE gauge in metrics_write.
   SHUTDOWN keeps value 2 so existing gauge readings stay stable, and
   MALFORMED takes the next free value. */
#define FD_SNAPIN_STATE_LOADING   (0) /* We are inserting accounts from a snapshot */
#define FD_SNAPIN_STATE_DONE      (1) /* We are done inserting accounts from a snapshot */
#define FD_SNAPIN_STATE_SHUTDOWN  (2) /* The tile is done, been told to shut down, and has likely already exited */
#define FD_SNAPIN_STATE_MALFORMED (3) /* The snapshot is malformed, we are waiting for a reset notification */
22 :
/* Per-tile state of the snapin tile.  It is the first allocation in the
   tile scratch region and is followed by the snapshot parser (see
   scratch_footprint and unprivileged_init). */
struct fd_snapin_tile {
  int full;   /* 1 while the full snapshot is being loaded, 0 while the incremental one is */
  int state;  /* One of FD_SNAPIN_STATE_* */

  ulong seed;           /* Filled by fd_rng_secure in privileged_init, passed to fd_snapshot_parser_new */
  long  boot_timestamp; /* fd_log_wallclock() taken in unprivileged_init, used for the load-time log line */

  fd_funk_t       funk[1];  /* Joined funk instance that accounts are inserted into */
  fd_funk_txn_t * funk_txn; /* Funk transaction accounts are inserted into; NULL presumably selects the root -- confirm */
  uchar *         acc_data; /* Write cursor into the current account's data region, or NULL to discard its data */

  fd_stem_context_t *    stem;    /* Stem context of the fragment being processed, stashed so parser callbacks can publish */
  fd_snapshot_parser_t * ssparse; /* Snapshot stream parser, lives right after this struct in scratch */

  /* Counters mirrored into the SNAPIN gauges by metrics_write. */
  struct {
    ulong full_bytes_read;
    ulong incremental_bytes_read;
    ulong accounts_inserted;
  } metrics;

  /* Input link dcache bounds, used to validate incoming data chunks. */
  struct {
    fd_wksp_t * wksp;
    ulong chunk0;
    ulong wmark;
    ulong mtu;
  } in;

  /* Out link 0 dcache, which manifests are published on. */
  struct {
    fd_wksp_t * wksp;
    ulong chunk0;
    ulong wmark;
    ulong chunk; /* Buffer handed to the parser on reset; published by manifest_cb and then advanced */
    ulong mtu;
  } manifest_out;
};

typedef struct fd_snapin_tile fd_snapin_tile_t;
60 :
61 : static inline int
62 0 : should_shutdown( fd_snapin_tile_t * ctx ) {
63 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_SHUTDOWN ) ) {
64 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.1f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
65 0 : }
66 0 : return ctx->state==FD_SNAPIN_STATE_SHUTDOWN;
67 0 : }
68 :
69 : static ulong
70 0 : scratch_align( void ) {
71 0 : return 128UL;
72 0 : }
73 :
74 : static ulong
75 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
76 0 : (void)tile;
77 0 : ulong l = FD_LAYOUT_INIT;
78 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
79 0 : l = FD_LAYOUT_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
80 0 : return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
81 0 : }
82 :
83 : static void
84 0 : metrics_write( fd_snapin_tile_t * ctx ) {
85 0 : FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
86 0 : FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
87 :
88 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted );
89 0 : FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
90 0 : }
91 :
92 : static void
93 0 : manifest_cb( void * _ctx ) {
94 0 : fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
95 :
96 0 : ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
97 0 : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
98 0 : fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
99 0 : ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
100 0 : }
101 :
/* Parser callback fired at the start of each account (described by hdr).
   It finds or creates the funk record for the account in ctx->funk_txn,
   writes the account metadata, and points ctx->acc_data at the record's
   data region so account_data_cb can copy the account bytes in.  If the
   record already holds a version from a newer slot than the current
   account vec, the account is skipped by setting ctx->acc_data to NULL. */
static void
account_cb( void * _ctx,
            fd_solana_account_hdr_t const * hdr ) {
  fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;

  fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t*)hdr->meta.pubkey );
  fd_funk_rec_query_t query[1];
  /* NOTE(review): presumably only searches ctx->funk_txn itself, not its
     ancestors -- confirm against the funk record API. */
  fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->funk_txn, &id, query );

  /* A record that did not exist yet is prepared here and published only
     after its metadata has been filled in (end of function). */
  int should_publish = 0;
  fd_funk_rec_prepare_t prepare[1];
  if( FD_LIKELY( !rec ) ) {
    should_publish = 1;
    rec = fd_funk_rec_prepare( ctx->funk, ctx->funk_txn, &id, prepare, NULL );
    FD_TEST( rec );
  }

  fd_account_meta_t * meta = fd_funk_val( rec, ctx->funk->wksp );
  if( FD_UNLIKELY( meta ) ) {
    /* The record already has a value.  If it is from a newer slot than
       the account vec being parsed, keep it and make account_data_cb
       drop this account's bytes. */
    if( FD_LIKELY( meta->slot>ctx->ssparse->accv_slot ) ) {
      ctx->acc_data = NULL;
      return;
    }

    /* TODO: Reaching here means the existing value is a duplicate
       account. We need to hash the existing account and subtract that
       hash from the running lthash. */
  }

  /* Grow the value when it cannot hold the metadata plus the account
     data.  A larger existing value is reused as-is (it is not shrunk). */
  if( FD_LIKELY( rec->val_sz<sizeof(fd_account_meta_t)+hdr->meta.data_len ) ) {
    meta = fd_funk_val_truncate( (fd_funk_rec_t*)rec, ctx->funk->alloc, ctx->funk->wksp, 0UL, sizeof(fd_account_meta_t)+hdr->meta.data_len, NULL );
    FD_TEST( meta );
  }

  meta->dlen = (uint)hdr->meta.data_len;
  meta->slot = ctx->ssparse->accv_slot;
  memcpy( meta->owner, hdr->info.owner, sizeof(fd_pubkey_t) );
  meta->lamports = hdr->info.lamports;
  meta->executable = hdr->info.executable;

  /* Account data follows the metadata header; account_data_cb appends to it. */
  ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
  ctx->metrics.accounts_inserted++; /* also counts overwrites of older versions */

  if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( ctx->funk, prepare );
}
147 :
148 : static void
149 : account_data_cb( void * _ctx,
150 : uchar const * buf,
151 0 : ulong data_sz ) {
152 0 : fd_snapin_tile_t * ctx = (fd_snapin_tile_t*)_ctx;
153 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) return;
154 :
155 0 : fd_memcpy( ctx->acc_data, buf, data_sz );
156 0 : ctx->acc_data += data_sz;
157 0 : }
158 :
159 : static void
160 : transition_malformed( fd_snapin_tile_t * ctx,
161 0 : fd_stem_context_t * stem ) {
162 0 : ctx->state = FD_SNAPIN_STATE_MALFORMED;
163 0 : fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
164 0 : }
165 :
166 : static void
167 : handle_data_frag( fd_snapin_tile_t * ctx,
168 : ulong chunk,
169 : ulong sz,
170 0 : fd_stem_context_t * stem ) {
171 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return;
172 :
173 0 : FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE );
174 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
175 :
176 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) {
177 0 : FD_LOG_WARNING(( "received data fragment while in done state" ));
178 0 : transition_malformed( ctx, stem );
179 0 : return;
180 0 : }
181 :
182 0 : uchar const * const chunk_start = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
183 0 : uchar const * const chunk_end = chunk_start + sz;
184 0 : uchar const * cur = chunk_start;
185 :
186 0 : for(;;) {
187 0 : if( FD_UNLIKELY( cur>=chunk_end ) ) {
188 0 : break;
189 0 : }
190 :
191 0 : cur = fd_snapshot_parser_process_chunk( ctx->ssparse, cur, (ulong)( chunk_end-cur ) );
192 0 : if( FD_UNLIKELY( ctx->ssparse->flags ) ) {
193 0 : if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_FAILED ) ) {
194 0 : transition_malformed( ctx, stem );
195 0 : return;
196 0 : }
197 0 : }
198 0 : }
199 :
200 0 : if( FD_UNLIKELY( ctx->ssparse->flags & SNAP_FLAG_DONE ) ) ctx->state = FD_SNAPIN_STATE_DONE;
201 :
202 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += sz;
203 0 : else ctx->metrics.incremental_bytes_read += sz;
204 0 : }
205 :
206 : static void
207 : handle_control_frag( fd_snapin_tile_t * ctx,
208 : fd_stem_context_t * stem,
209 0 : ulong sig ) {
210 0 : switch( sig ) {
211 0 : case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
212 0 : ctx->full = 1;
213 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
214 0 : fd_funk_txn_cancel_root( ctx->funk );
215 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
216 0 : break;
217 0 : case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
218 0 : ctx->full = 0;
219 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
220 0 : if( FD_UNLIKELY( !ctx->funk_txn ) ) fd_funk_txn_cancel_root( ctx->funk );
221 0 : else fd_funk_txn_cancel( ctx->funk, ctx->funk_txn, 0 );
222 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
223 0 : break;
224 0 : case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
225 0 : FD_TEST( ctx->full );
226 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
227 0 : FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
228 0 : transition_malformed( ctx, stem );
229 0 : break;
230 0 : }
231 :
232 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
233 :
234 0 : fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
235 0 : ctx->funk_txn = fd_funk_txn_prepare( ctx->funk, ctx->funk_txn, &incremental_xid, 0 );
236 0 : FD_TEST( ctx->funk_txn );
237 :
238 0 : ctx->full = 0;
239 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
240 0 : break;
241 0 : case FD_SNAPSHOT_MSG_CTRL_DONE:
242 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
243 0 : FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
244 0 : transition_malformed( ctx, stem );
245 0 : break;
246 0 : }
247 :
248 0 : if( FD_LIKELY( ctx->funk_txn ) ) fd_funk_txn_publish_into_parent( ctx->funk, ctx->funk_txn, 0 );
249 0 : fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
250 0 : break;
251 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
252 0 : ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
253 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
254 0 : break;
255 0 : default:
256 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
257 0 : return;
258 0 : }
259 :
260 : /* We must acknowledge after handling the control frag, because if it
261 : causes us to generate a malformed transition, that must be sent
262 : back to the snaprd controller before the acknowledgement. */
263 0 : fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
264 0 : }
265 :
266 : static inline int
267 : returnable_frag( fd_snapin_tile_t * ctx,
268 : ulong in_idx,
269 : ulong seq,
270 : ulong sig,
271 : ulong chunk,
272 : ulong sz,
273 : ulong ctl,
274 : ulong tsorig,
275 : ulong tspub,
276 0 : fd_stem_context_t * stem ) {
277 0 : (void)in_idx;
278 0 : (void)seq;
279 0 : (void)ctl;
280 0 : (void)tsorig;
281 0 : (void)tspub;
282 :
283 0 : ctx->stem = stem;
284 :
285 0 : FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
286 :
287 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) handle_data_frag( ctx, chunk, sz, stem );
288 0 : else handle_control_frag( ctx, stem, sig );
289 :
290 0 : return 0;
291 0 : }
292 :
293 : static void
294 : privileged_init( fd_topo_t * topo,
295 0 : fd_topo_tile_t * tile ) {
296 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
297 :
298 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
299 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
300 :
301 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
302 0 : }
303 :
304 : FD_FN_UNUSED static void
305 : unprivileged_init( fd_topo_t * topo,
306 0 : fd_topo_tile_t * tile ) {
307 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
308 :
309 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
310 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
311 0 : void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_snapshot_parser_align(), fd_snapshot_parser_footprint( 1UL<<24UL ) );
312 :
313 0 : ctx->full = 1;
314 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
315 :
316 0 : ctx->boot_timestamp = fd_log_wallclock();
317 :
318 0 : FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
319 0 : ctx->funk_txn = fd_funk_txn_query( fd_funk_root( ctx->funk ), ctx->funk->txn_map );
320 :
321 0 : ctx->ssparse = fd_snapshot_parser_new( _ssparse, ctx, ctx->seed, 1UL<<24UL, manifest_cb, account_cb, account_data_cb );
322 :
323 0 : FD_TEST( ctx->ssparse );
324 :
325 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
326 :
327 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
328 0 : if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
329 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
330 :
331 0 : fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
332 0 : ctx->manifest_out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
333 0 : ctx->manifest_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
334 0 : ctx->manifest_out.wmark = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
335 0 : ctx->manifest_out.chunk = ctx->manifest_out.chunk0;
336 0 : ctx->manifest_out.mtu = writer_link->mtu;
337 :
338 0 : fd_snapshot_parser_reset( ctx->ssparse, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ), ctx->manifest_out.mtu );
339 :
340 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
341 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
342 0 : ctx->in.wksp = in_wksp->wksp;;
343 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
344 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
345 0 : ctx->in.mtu = in_link->mtu;
346 0 : }
347 :
/* Upper bound on fragments published per input fragment:
   - control fragment: the ACK, plus either a MALFORMED notification or,
     for DONE, an FD_SSMSG_DONE on out link 0;
   - data fragment: a manifest from manifest_cb plus a MALFORMED
     notification (assuming the parser emits at most one manifest per
     fragment -- confirm). */
#define STEM_BURST 2UL
#define STEM_LAZY 1000L

#define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)

/* Hooks implemented above that the stem run loop calls into. */
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
357 :
358 : #include "../../disco/stem/fd_stem.c"
359 :
360 : fd_topo_run_tile_t fd_tile_snapin = {
361 : .name = NAME,
362 : .scratch_align = scratch_align,
363 : .scratch_footprint = scratch_footprint,
364 : .privileged_init = privileged_init,
365 : .unprivileged_init = unprivileged_init,
366 : .run = stem_run,
367 : };
368 :
369 : #undef NAME
|