Line data Source code
1 : #include "utils/fd_ssctrl.h"
2 : #include "utils/fd_ssparse.h"
3 : #include "utils/fd_ssmanifest_parser.h"
4 : #include "utils/fd_slot_delta_parser.h"
5 : #include "utils/fd_ssmsg.h"
6 :
7 : #include "../../disco/topo/fd_topo.h"
8 : #include "../../disco/metrics/fd_metrics.h"
9 : #include "../../flamenco/runtime/fd_acc_mgr.h"
10 : #include "../../funk/fd_funk.h"
11 : #include "../../flamenco/runtime/fd_txncache.h"
12 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
13 :
14 : #include "generated/fd_snapin_tile_seccomp.h"
15 :
16 : #define NAME "snapin"
17 :
18 : /* The snapin tile is a state machine that parses and loads a full
19 : and optionally an incremental snapshot. It is currently responsible
20 : for loading accounts into an in-memory database, though this may
21 : change. */
22 :
/* Values of ctx->state.  metrics_write exports the current value via
   the SNAPIN STATE gauge. */
#define FD_SNAPIN_STATE_LOADING   (0) /* We are inserting accounts from a snapshot */
#define FD_SNAPIN_STATE_DONE      (1) /* We are done inserting accounts from a snapshot */
#define FD_SNAPIN_STATE_MALFORMED (2) /* The snapshot is malformed, we are waiting for a reset notification */
#define FD_SNAPIN_STATE_SHUTDOWN  (3) /* The tile is done, been told to shut down, and has likely already exited */

/* 300 here is from status_cache.rs::MAX_CACHE_ENTRIES which is the most
   root slots Agave could possibly serve in a snapshot.  This is the
   capacity of the ctx->txncache_entries buffer that collects status
   cache entries while the snapshot is parsed. */
#define FD_SNAPIN_TXNCACHE_MAX_ENTRIES (300UL*FD_PACK_MAX_TXNCACHE_TXN_PER_SLOT)

/* 300 root slots in the slot deltas array, and each one references all
   151 prior blockhashes that it's able to.  This is the capacity of the
   ctx->blockhash_offsets buffer of (blockhash, txnhash offset) groups. */
#define FD_SNAPIN_MAX_SLOT_DELTA_GROUPS (300UL*151UL)
35 :
36 : struct fd_blockhash_entry {
37 : fd_hash_t blockhash;
38 :
39 : struct {
40 : ulong prev;
41 : ulong next;
42 : } map;
43 : };
44 :
45 : typedef struct fd_blockhash_entry fd_blockhash_entry_t;
46 :
/* blockhash_map: a chained hash map of fd_blockhash_entry_t keyed by
   the 32-byte blockhash, generated from the fd_map_chain template.
   populate_txncache uses it to test whether a blockhash is among the
   most recent ones, and to recover an entry's index in its backing
   pool from the returned element pointer. */
#define MAP_NAME                           blockhash_map
#define MAP_KEY                            blockhash
#define MAP_KEY_T                          fd_hash_t
#define MAP_ELE_T                          fd_blockhash_entry_t
#define MAP_KEY_EQ(k0,k1)                  (!memcmp((k0),(k1), sizeof(fd_hash_t)))
#define MAP_KEY_HASH(key,seed)             (fd_hash((seed),(key),sizeof(fd_hash_t)))
#define MAP_PREV                           map.prev
#define MAP_NEXT                           map.next
#define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
#include "../../util/tmpl/fd_map_chain.c"
57 :
58 : struct blockhash_group {
59 : uchar blockhash[ 32UL ];
60 : ulong txnhash_offset;
61 : };
62 :
63 : typedef struct blockhash_group blockhash_group_t;
64 :
65 : struct fd_snapin_tile {
66 : int full;
67 : int state;
68 :
69 : ulong seed;
70 : long boot_timestamp;
71 :
72 : fd_funk_t funk[1];
73 : fd_txncache_t * txncache;
74 : uchar * acc_data;
75 :
76 : fd_funk_txn_xid_t xid[1]; /* txn XID */
77 :
78 : fd_stem_context_t * stem;
79 : fd_ssparse_t * ssparse;
80 : fd_ssmanifest_parser_t * manifest_parser;
81 : fd_slot_delta_parser_t * slot_delta_parser;
82 :
83 : struct {
84 : int manifest_done;
85 : int status_cache_done;
86 : int manifest_processed;
87 : } flags;
88 :
89 : ulong bank_slot;
90 :
91 : ulong blockhash_offsets_len;
92 : blockhash_group_t * blockhash_offsets;
93 :
94 : ulong txncache_entries_len;
95 : fd_sstxncache_entry_t * txncache_entries;
96 :
97 : fd_txncache_fork_id_t txncache_root_fork_id;
98 :
99 : struct {
100 : ulong full_bytes_read;
101 : ulong incremental_bytes_read;
102 : ulong accounts_inserted;
103 : } metrics;
104 :
105 : struct {
106 : fd_wksp_t * wksp;
107 : ulong chunk0;
108 : ulong wmark;
109 : ulong mtu;
110 : ulong pos;
111 : } in;
112 :
113 : struct {
114 : fd_wksp_t * wksp;
115 : ulong chunk0;
116 : ulong wmark;
117 : ulong chunk;
118 : ulong mtu;
119 : } manifest_out;
120 : };
121 :
122 : typedef struct fd_snapin_tile fd_snapin_tile_t;
123 :
124 : static inline int
125 0 : should_shutdown( fd_snapin_tile_t * ctx ) {
126 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_SHUTDOWN ) ) {
127 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.3f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
128 0 : }
129 0 : return ctx->state==FD_SNAPIN_STATE_SHUTDOWN;
130 0 : }
131 :
132 : static ulong
133 0 : scratch_align( void ) {
134 0 : return 128UL;
135 0 : }
136 :
137 : static ulong
138 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
139 0 : (void)tile;
140 0 : ulong l = FD_LAYOUT_INIT;
141 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
142 0 : l = FD_LAYOUT_APPEND( l, fd_ssparse_align(), fd_ssparse_footprint( 1UL<<24UL ) );
143 0 : l = FD_LAYOUT_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
144 0 : l = FD_LAYOUT_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
145 0 : l = FD_LAYOUT_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
146 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
147 0 : l = FD_LAYOUT_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
148 0 : return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
149 0 : }
150 :
151 : static void
152 0 : metrics_write( fd_snapin_tile_t * ctx ) {
153 0 : FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
154 0 : FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
155 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted );
156 0 : FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
157 0 : }
158 :
159 : static int
160 : verify_slot_deltas_with_slot_history( fd_snapin_tile_t * ctx,
161 0 : fd_slot_history_global_t * slot_history ) {
162 :
163 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
164 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
165 0 : if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( slot_history, entry->slot, NULL )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) return -1;
166 0 : }
167 0 : return 0;
168 0 : }
169 :
170 : static int
171 : verify_slot_deltas_with_bank_slot( fd_snapin_tile_t * ctx,
172 0 : ulong bank_slot ) {
173 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
174 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
175 0 : if( FD_UNLIKELY( entry->slot>bank_slot ) ) return -1;
176 0 : }
177 0 : return 0;
178 0 : }
179 :
180 : static void
181 : transition_malformed( fd_snapin_tile_t * ctx,
182 0 : fd_stem_context_t * stem ) {
183 0 : ctx->state = FD_SNAPIN_STATE_MALFORMED;
184 0 : fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_MALFORMED, 0UL, 0UL, 0UL, 0UL, 0UL );
185 0 : }
186 :
187 : static int
188 : populate_txncache( fd_snapin_tile_t * ctx,
189 : fd_snapshot_manifest_blockhash_t const blockhashes[ static 301UL ],
190 0 : ulong blockhashes_len ) {
191 : /* Our txncache internally contains the fork structure for the chain,
192 : which we need to recreate here. Because snapshots are only served
193 : for rooted slots, there is actually no forking, and the bank forks
194 : are just a single bank, the root, like
195 :
196 : _root
197 :
198 : But the txncache also must contain the 150 more recent banks prior
199 : to the root (151 rooted banks total), looking like,
200 :
201 :
202 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
203 :
204 : Our txncache is "slot agnostic" meaning there is no concept of a
205 : slot number in it. It just has a fork tree structure. So long as
206 : the fork tree is isomorphic to the actual bank forks, and each bank
207 : has the correct blockhash, it works.
208 :
209 : So the challenge is simply to create this chain of 151 forks in the
210 : txncache, with correct blockhashes, and then insert all the
211 : transactions into it.
212 :
213 : Constructing the chain of blockhashes is easy. It is just the
214 : BLOCKHASH_QUEUE array in the manifest. This array is unfortuantely
215 : not sorted and appears in random order, but it has a hash_index
216 : field which is a gapless index, starting at some arbitrary offset,
217 : so we can back out the 151 blockhashes we need from this, by first
218 : finding the max hash_index as _max and then collecting hash entries
219 : via,
220 :
221 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
222 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
223 :
224 : Now the remaining problem is inserting transactions into this
225 : chain. Remember each transaction needs to be inserted with:
226 :
227 : (a) The fork ID (position of the bank in the chain) it was executed in.
228 : (b) The blockhash of the bank it referenced.
229 :
230 : (b) is trivial to retrieve, as it's in the actual slot_deltas entry
231 : in the manifest served by Agave. But (a) is mildly annoying. Agave
232 : serves slot_deltas based on slot, so we need an additional mapping
233 : from slot to position in our banks chain. It turns out we have to
234 : go to yet another structure in the manifest to retrieve this, the
235 : ancestors array. This is just an array of slot values, so we need
236 : to sort it, and line it up against our banks chain like so,
237 :
238 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
239 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
240 : _slots_150 -> _slots_149 -> ... -> _slots_2 -> _slots_1 -> _slots
241 :
242 : From there we are done.
243 :
244 : Well almost ... if you were paying attention you might have noticed
245 : this is a lot of work and we are lazy. Why don't we just ignore the
246 : slot mapping and assume everything executed at the root slot
247 : exactly? The only invariant we should maintain from a memory
248 : perspective is that at most, across all active banks,
249 : FD_MAX_TXN_PER_SLOT transactions are stored per slot, but we
250 : have preserved that. It is not true "per slot" technically, but
251 : it's true across all slots, and the memory is aggregated. It will
252 : also always be true, even as slots are garbage collected, because
253 : entries are collected by referece blockhash, not executed slot.
254 :
255 : ... actually we can't do this. There's more broken things here.
256 : The Agave status decided to only store 20 bytes for 32 byte
257 : transaction hashes to save on memory. That's OK, but they didn't
258 : just take the first 20 bytes. They instead, for each blockhash,
259 : take a random offset between 0 and 12, and store bytes
260 : [ offset, offset+20 ) of the transaction hash. We need to know this
261 : offset to be able to query the txncache later, so we need to
262 : retrieve it from the slot_deltas entry in the manifest, and key it
263 : into our txncache. Unfortunately this offset is stored per slot in
264 : the slot_deltas entry. So we need to first go and retrieve the
265 : ancestors array, sort it, and line it up against our banks chain as
266 : described above, and then go through slot deltas, to retrieve the
267 : offset for each slot, and stick it into the appropriate bank in
268 : our chain. */
269 :
270 0 : FD_TEST( blockhashes_len<=301UL );
271 0 : FD_TEST( blockhashes_len>0UL );
272 :
273 0 : ulong seq_min = ULONG_MAX;
274 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) seq_min = fd_ulong_min( seq_min, blockhashes[ i ].hash_index );
275 :
276 0 : ulong seq_max;
277 0 : if( FD_UNLIKELY( __builtin_uaddl_overflow( seq_min, blockhashes_len, &seq_max ) ) ) {
278 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue sequence number wraparound (seq_min=%lu age_cnt=%lu)", seq_min, blockhashes_len ));
279 0 : transition_malformed( ctx, ctx->stem );
280 0 : return 1;
281 0 : }
282 :
283 : /* First let's construct the chain array as described above. But
284 : index 0 will be the root, index 1 the root's parent, etc. */
285 :
286 0 : struct {
287 0 : int exists;
288 0 : uchar blockhash[ 32UL ];
289 0 : fd_txncache_fork_id_t fork_id;
290 0 : ulong txnhash_offset;
291 0 : } banks[ 301UL ] = {0};
292 :
293 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) {
294 0 : fd_snapshot_manifest_blockhash_t const * elem = &blockhashes[ i ];
295 0 : ulong idx;
296 0 : if( FD_UNLIKELY( __builtin_usubl_overflow( elem->hash_index, seq_min, &idx ) ) ) {
297 0 : FD_LOG_WARNING(( "corrupt snapshot: gap in blockhash queue (seq=[%lu,%lu) idx=%lu)", seq_min, seq_max, blockhashes[ i ].hash_index ));
298 0 : transition_malformed( ctx, ctx->stem );
299 0 : return 1;
300 0 : }
301 :
302 0 : if( FD_UNLIKELY( idx>=blockhashes_len ) ) {
303 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue index out of range (seq_min=%lu age_cnt=%lu idx=%lu)", seq_min, blockhashes_len, idx ));
304 0 : transition_malformed( ctx, ctx->stem );
305 0 : return 1;
306 0 : }
307 :
308 0 : if( FD_UNLIKELY( banks[ blockhashes_len-1UL-idx ].exists ) ) {
309 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash hash_index %lu", elem->hash_index ));
310 0 : transition_malformed( ctx, ctx->stem );
311 0 : return 1;
312 0 : }
313 :
314 0 : banks[ blockhashes_len-1UL-idx ].fork_id.val = USHORT_MAX;
315 0 : banks[ blockhashes_len-1UL-idx ].txnhash_offset = ULONG_MAX;
316 0 : memcpy( banks[ blockhashes_len-1UL-idx ].blockhash, elem->hash, 32UL );
317 0 : banks[ blockhashes_len-1UL-idx ].exists = 1;
318 0 : }
319 :
320 0 : ulong chain_len = fd_ulong_min( blockhashes_len, 151UL );
321 :
322 : /* Now we need a hashset of just the 151 most recent blockhashes,
323 : anything else is a nonce transaction which we do not insert, or an
324 : already expired transaction which can also be discarded. */
325 :
326 0 : uchar * _map = fd_alloca_check( alignof(blockhash_map_t), blockhash_map_footprint( 1024UL ) );
327 0 : blockhash_map_t * blockhash_map = blockhash_map_join( blockhash_map_new( _map, 1024UL, ctx->seed ) );
328 0 : FD_TEST( blockhash_map );
329 :
330 0 : fd_blockhash_entry_t blockhash_pool[ 151UL ];
331 0 : for( ulong i=0UL; i<chain_len; i++ ) {
332 0 : fd_memcpy( blockhash_pool[ i ].blockhash.uc, banks[ i ].blockhash, 32UL );
333 :
334 0 : if( FD_UNLIKELY( blockhash_map_ele_query_const( blockhash_map, &blockhash_pool[ i ].blockhash, NULL, blockhash_pool ) ) ) {
335 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash %s in 151 most recent blockhashes", FD_BASE58_ENC_32_ALLOCA( banks[ i ].blockhash ) ));
336 0 : transition_malformed( ctx, ctx->stem );
337 0 : return 1;
338 0 : }
339 :
340 0 : blockhash_map_ele_insert( blockhash_map, &blockhash_pool[ i ], blockhash_pool );
341 0 : }
342 :
343 : /* Now load the blockhash offsets for these blockhashes ... */
344 0 : FD_TEST( ctx->blockhash_offsets_len ); /* Must be at least one else nothing would be rooted */
345 0 : for( ulong i=0UL; i<ctx->blockhash_offsets_len; i++ ) {
346 0 : fd_hash_t key;
347 0 : fd_memcpy( key.uc, ctx->blockhash_offsets[ i ].blockhash, 32UL );
348 0 : fd_blockhash_entry_t * entry = blockhash_map_ele_query( blockhash_map, &key, NULL, blockhash_pool );
349 0 : if( FD_UNLIKELY( !entry ) ) continue; /* Not in the most recent 151 blockhashes */
350 :
351 0 : ulong chain_idx = (ulong)(entry - blockhash_pool);
352 :
353 0 : if( FD_UNLIKELY( banks[ chain_idx ].txnhash_offset!=ULONG_MAX && banks[ chain_idx ].txnhash_offset!=ctx->blockhash_offsets[ i ].txnhash_offset ) ) {
354 0 : FD_LOG_WARNING(( "corrupt snapshot: conflicting txnhash offsets for blockhash %s", FD_BASE58_ENC_32_ALLOCA( entry->blockhash.uc ) ));
355 0 : transition_malformed( ctx, ctx->stem );
356 0 : return 1;
357 0 : }
358 :
359 0 : banks[ chain_idx ].txnhash_offset = ctx->blockhash_offsets[ i ].txnhash_offset;
360 0 : }
361 :
362 : /* Construct the linear fork chain in the txncache. */
363 :
364 0 : fd_txncache_fork_id_t parent = { .val = USHORT_MAX };
365 0 : for( ulong i=0UL; i<chain_len; i++ ) banks[ chain_len-1UL-i ].fork_id = parent = fd_txncache_attach_child( ctx->txncache, parent );
366 0 : for( ulong i=0UL; i<chain_len; i++ ) fd_txncache_attach_blockhash( ctx->txncache, banks[ i ].fork_id, banks[ i ].blockhash );
367 :
368 : /* Now insert all transactions as if they executed at the current
369 : root, per above. */
370 :
371 0 : ulong insert_cnt = 0UL;
372 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
373 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[ i ];
374 0 : fd_hash_t key;
375 0 : fd_memcpy( key.uc, entry->blockhash, 32UL );
376 0 : if( FD_UNLIKELY( !blockhash_map_ele_query_const( blockhash_map, &key, NULL, blockhash_pool ) ) ) continue;
377 :
378 0 : insert_cnt++;
379 0 : fd_txncache_insert( ctx->txncache, banks[ 0UL ].fork_id, entry->blockhash, entry->txnhash );
380 0 : }
381 :
382 0 : FD_LOG_INFO(( "inserted %lu/%lu transactions into the txncache", insert_cnt, ctx->txncache_entries_len ));
383 :
384 : /* Then finalize all the banks (freezing them) and setting the txnhash
385 : offset so future queries use the correct offset. If the offset is
386 : ULONG_MAX this is valid, it means the blockhash had no transactions
387 : in it, so there's nothing in the status cache under that blockhash.
388 :
389 : Just set the offset to 0 in this case, it doesn't matter, but
390 : should be valid between 0 and 12 inclusive. */
391 0 : for( ulong i=0UL; i<chain_len; i++ ) {
392 0 : ulong txnhash_offset = banks[ chain_len-1UL-i ].txnhash_offset==ULONG_MAX ? 0UL : banks[ chain_len-1UL-i ].txnhash_offset;
393 0 : fd_txncache_finalize_fork( ctx->txncache, banks[ chain_len-1UL-i ].fork_id, txnhash_offset, banks[ chain_len-1UL-i ].blockhash );
394 0 : }
395 :
396 0 : for( ulong i=1UL; i<chain_len; i++ ) fd_txncache_advance_root( ctx->txncache, banks[ chain_len-1UL-i ].fork_id );
397 :
398 0 : ctx->txncache_root_fork_id = parent;
399 :
400 0 : return 0;
401 0 : }
402 :
403 : static void
404 0 : process_manifest( fd_snapin_tile_t * ctx ) {
405 0 : fd_snapshot_manifest_t * manifest = fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk );
406 :
407 0 : ctx->bank_slot = manifest->slot;
408 0 : if( FD_UNLIKELY( verify_slot_deltas_with_bank_slot( ctx, manifest->slot ) ) ) {
409 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
410 0 : transition_malformed( ctx, ctx->stem );
411 0 : return;
412 0 : }
413 :
414 0 : if( FD_UNLIKELY( populate_txncache( ctx, manifest->blockhashes, manifest->blockhashes_len ) ) ) {
415 0 : FD_LOG_WARNING(( "populating txncache failed" ));
416 0 : transition_malformed( ctx, ctx->stem );
417 0 : return;
418 0 : }
419 :
420 0 : manifest->txncache_fork_id = ctx->txncache_root_fork_id.val;
421 :
422 0 : ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
423 0 : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
424 0 : fd_stem_publish( ctx->stem, 0UL, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
425 0 : ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
426 0 : }
427 :
428 : static void
429 : process_account_header( fd_snapin_tile_t * ctx,
430 0 : fd_ssparse_advance_result_t * result ) {
431 0 : fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t const*)result->account_header.pubkey );
432 0 : fd_funk_rec_query_t query[1];
433 0 : fd_funk_rec_t const * rec = fd_funk_rec_query_try( ctx->funk, ctx->xid, &id, query );
434 :
435 0 : int should_publish = 0;
436 0 : fd_funk_rec_prepare_t prepare[1];
437 0 : if( FD_LIKELY( !rec ) ) {
438 0 : should_publish = 1;
439 0 : rec = fd_funk_rec_prepare( ctx->funk, ctx->xid, &id, prepare, NULL );
440 0 : FD_TEST( rec );
441 0 : }
442 :
443 0 : fd_account_meta_t * meta = fd_funk_val( rec, ctx->funk->wksp );
444 0 : if( FD_UNLIKELY( meta ) ) {
445 0 : if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
446 0 : ctx->acc_data = NULL;
447 0 : return;
448 0 : }
449 :
450 : /* TODO: Reaching here means the existing value is a duplicate
451 : account. We need to hash the existing account and subtract that
452 : hash from the running lthash. */
453 0 : }
454 :
455 0 : if( FD_LIKELY( rec->val_sz<sizeof(fd_account_meta_t)+result->account_header.data_len ) ) {
456 0 : meta = fd_funk_val_truncate( (fd_funk_rec_t*)rec, ctx->funk->alloc, ctx->funk->wksp, 0UL, sizeof(fd_account_meta_t)+result->account_header.data_len, NULL );
457 0 : FD_TEST( meta );
458 0 : }
459 :
460 0 : meta->dlen = (uint)result->account_header.data_len;
461 0 : meta->slot = result->account_header.slot;
462 0 : memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
463 0 : meta->lamports = result->account_header.lamports;
464 0 : meta->executable = (uchar)result->account_header.executable;
465 :
466 0 : ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
467 0 : ctx->metrics.accounts_inserted++;
468 :
469 0 : if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( ctx->funk, prepare );
470 0 : }
471 :
472 : static void
473 : process_account_data( fd_snapin_tile_t * ctx,
474 0 : fd_ssparse_advance_result_t * result ) {
475 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) return;
476 :
477 0 : fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
478 0 : ctx->acc_data += result->account_data.data_sz;
479 0 : }
480 :
481 : static int
482 : handle_data_frag( fd_snapin_tile_t * ctx,
483 : ulong chunk,
484 : ulong sz,
485 0 : fd_stem_context_t * stem ) {
486 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_MALFORMED ) ) return 0;
487 :
488 0 : FD_TEST( ctx->state==FD_SNAPIN_STATE_LOADING || ctx->state==FD_SNAPIN_STATE_DONE );
489 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
490 :
491 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPIN_STATE_DONE ) ) {
492 0 : FD_LOG_WARNING(( "received data fragment while in done state" ));
493 0 : transition_malformed( ctx, stem );
494 0 : return 0;
495 0 : }
496 :
497 0 : for(;;) {
498 0 : if( FD_UNLIKELY( sz-ctx->in.pos==0UL ) ) break;
499 :
500 0 : uchar const * data = (uchar const *)fd_chunk_to_laddr_const( ctx->in.wksp, chunk ) + ctx->in.pos;
501 :
502 0 : fd_ssparse_advance_result_t result[1];
503 0 : int res = fd_ssparse_advance( ctx->ssparse, data, sz-ctx->in.pos, result );
504 0 : switch( res ) {
505 0 : case FD_SSPARSE_ADVANCE_ERROR:
506 0 : transition_malformed( ctx, stem );
507 0 : return 0;
508 0 : case FD_SSPARSE_ADVANCE_AGAIN:
509 0 : break;
510 0 : case FD_SSPARSE_ADVANCE_MANIFEST: {
511 0 : int res = fd_ssmanifest_parser_consume( ctx->manifest_parser,
512 0 : result->manifest.data,
513 0 : result->manifest.data_sz,
514 0 : result->manifest.acc_vec_map,
515 0 : result->manifest.acc_vec_pool );
516 0 : if( FD_UNLIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_ERROR ) ) {
517 0 : transition_malformed( ctx, stem );
518 0 : return 0;
519 0 : } else if( FD_LIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_DONE ) ) {
520 0 : ctx->flags.manifest_done = 1;
521 0 : }
522 0 : break;
523 0 : }
524 0 : case FD_SSPARSE_ADVANCE_STATUS_CACHE: {
525 0 : fd_slot_delta_parser_advance_result_t sd_result[1];
526 0 : ulong bytes_remaining = result->status_cache.data_sz;
527 :
528 0 : while( bytes_remaining ) {
529 0 : int res = fd_slot_delta_parser_consume( ctx->slot_delta_parser,
530 0 : result->status_cache.data,
531 0 : bytes_remaining,
532 0 : sd_result );
533 0 : if( FD_UNLIKELY( res<0 ) ) {
534 0 : transition_malformed( ctx, stem );
535 0 : return 0;
536 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_GROUP ) ) {
537 0 : if( FD_UNLIKELY( ctx->blockhash_offsets_len>=FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ) ) FD_LOG_ERR(( "blockhash offsets overflow, max is %lu", FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ));
538 :
539 0 : memcpy( ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].blockhash, sd_result->group.blockhash, 32UL );
540 0 : ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].txnhash_offset = sd_result->group.txnhash_offset;
541 0 : ctx->blockhash_offsets_len++;
542 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_ENTRY ) ) {
543 0 : if( FD_UNLIKELY( ctx->txncache_entries_len>=FD_SNAPIN_TXNCACHE_MAX_ENTRIES ) ) FD_LOG_ERR(( "txncache entries overflow, max is %lu", FD_SNAPIN_TXNCACHE_MAX_ENTRIES ));
544 0 : ctx->txncache_entries[ ctx->txncache_entries_len++ ] = *sd_result->entry;
545 0 : }
546 :
547 0 : bytes_remaining -= sd_result->bytes_consumed;
548 0 : result->status_cache.data += sd_result->bytes_consumed;
549 0 : }
550 :
551 0 : ctx->flags.status_cache_done = fd_slot_delta_parser_consume( ctx->slot_delta_parser, result->status_cache.data, 0UL, sd_result )==FD_SLOT_DELTA_PARSER_ADVANCE_DONE;
552 0 : break;
553 0 : }
554 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_HEADER:
555 0 : process_account_header( ctx, result );
556 0 : break;
557 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_DATA:
558 0 : process_account_data( ctx, result );
559 0 : break;
560 0 : case FD_SSPARSE_ADVANCE_DONE:
561 0 : ctx->state = FD_SNAPIN_STATE_DONE;
562 0 : break;
563 0 : default:
564 0 : FD_LOG_ERR(( "unexpected fd_ssparse_advance result %d", res ));
565 0 : break;
566 0 : }
567 :
568 0 : if( FD_UNLIKELY( !ctx->flags.manifest_processed && ctx->flags.manifest_done && ctx->flags.status_cache_done ) ) {
569 0 : process_manifest( ctx );
570 0 : ctx->flags.manifest_processed = 1;
571 0 : }
572 :
573 0 : ctx->in.pos += result->bytes_consumed;
574 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += result->bytes_consumed;
575 0 : else ctx->metrics.incremental_bytes_read += result->bytes_consumed;
576 0 : }
577 :
578 0 : int reprocess_frag = ctx->in.pos<sz;
579 0 : if( FD_LIKELY( !reprocess_frag ) ) ctx->in.pos = 0UL;
580 0 : return reprocess_frag;
581 0 : }
582 :
583 : static void
584 : handle_control_frag( fd_snapin_tile_t * ctx,
585 : fd_stem_context_t * stem,
586 0 : ulong sig ) {
587 0 : switch( sig ) {
588 0 : case FD_SNAPSHOT_MSG_CTRL_RESET_FULL:
589 0 : ctx->full = 1;
590 0 : ctx->txncache_entries_len = 0UL;
591 0 : ctx->blockhash_offsets_len = 0UL;
592 0 : fd_txncache_reset( ctx->txncache );
593 0 : fd_ssparse_reset( ctx->ssparse );
594 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
595 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
596 0 : fd_funk_txn_remove_published( ctx->funk );
597 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
598 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
599 0 : break;
600 0 : case FD_SNAPSHOT_MSG_CTRL_RESET_INCREMENTAL:
601 0 : ctx->full = 0;
602 0 : ctx->txncache_entries_len = 0UL;
603 0 : ctx->blockhash_offsets_len = 0UL;
604 0 : fd_txncache_reset( ctx->txncache );
605 0 : fd_ssparse_reset( ctx->ssparse );
606 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
607 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
608 0 : fd_funk_txn_cancel( ctx->funk, ctx->xid );
609 0 : fd_funk_txn_xid_copy( ctx->xid, fd_funk_last_publish( ctx->funk ) );
610 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
611 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
612 0 : break;
613 0 : case FD_SNAPSHOT_MSG_CTRL_EOF_FULL:
614 0 : FD_TEST( ctx->full );
615 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
616 0 : FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
617 0 : transition_malformed( ctx, stem );
618 0 : break;
619 0 : }
620 :
621 0 : ctx->txncache_entries_len = 0UL;
622 0 : ctx->blockhash_offsets_len = 0UL;
623 0 : fd_txncache_reset( ctx->txncache );
624 0 : fd_ssparse_reset( ctx->ssparse );
625 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
626 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
627 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
628 :
629 0 : fd_funk_txn_xid_t incremental_xid = fd_funk_generate_xid();
630 0 : fd_funk_txn_prepare( ctx->funk, ctx->xid, &incremental_xid );
631 0 : fd_funk_txn_xid_copy( ctx->xid, &incremental_xid );
632 :
633 0 : ctx->full = 0;
634 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
635 0 : break;
636 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
637 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPIN_STATE_DONE ) ) {
638 0 : FD_LOG_WARNING(( "unexpected end of snapshot when not done parsing" ));
639 0 : transition_malformed( ctx, stem );
640 0 : break;
641 0 : }
642 :
643 0 : uchar slot_history_mem[ FD_SYSVAR_SLOT_HISTORY_FOOTPRINT ];
644 0 : fd_slot_history_global_t * slot_history = fd_sysvar_slot_history_read( ctx->funk, ctx->xid, slot_history_mem );
645 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx, slot_history ) ) ) {
646 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
647 0 : transition_malformed( ctx, stem );
648 0 : break;
649 0 : }
650 :
651 : /* Publish any remaining funk txn */
652 0 : if( FD_LIKELY( fd_funk_last_publish_is_frozen( ctx->funk ) ) ) {
653 0 : fd_funk_txn_publish_into_parent( ctx->funk, ctx->xid );
654 0 : }
655 0 : FD_TEST( !fd_funk_last_publish_is_frozen( ctx->funk ) );
656 :
657 : /* Make 'Last published' XID equal the restored slot number */
658 0 : fd_funk_txn_xid_t target_xid = { .ul = { ctx->bank_slot, ctx->bank_slot } };
659 0 : fd_funk_txn_prepare( ctx->funk, ctx->xid, &target_xid );
660 0 : fd_funk_txn_publish_into_parent( ctx->funk, &target_xid );
661 0 : fd_funk_txn_xid_copy( ctx->xid, &target_xid );
662 :
663 0 : fd_stem_publish( stem, 0UL, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
664 0 : break;
665 0 : }
666 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
667 0 : ctx->state = FD_SNAPIN_STATE_SHUTDOWN;
668 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
669 0 : break;
670 0 : default:
671 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
672 0 : return;
673 0 : }
674 :
675 : /* We must acknowledge after handling the control frag, because if it
676 : causes us to generate a malformed transition, that must be sent
677 : back to the snaprd controller before the acknowledgement. */
678 0 : fd_stem_publish( stem, 1UL, FD_SNAPSHOT_MSG_CTRL_ACK, 0UL, 0UL, 0UL, 0UL, 0UL );
679 0 : }
680 :
681 : static inline int
682 : returnable_frag( fd_snapin_tile_t * ctx,
683 : ulong in_idx,
684 : ulong seq,
685 : ulong sig,
686 : ulong chunk,
687 : ulong sz,
688 : ulong ctl,
689 : ulong tsorig,
690 : ulong tspub,
691 0 : fd_stem_context_t * stem ) {
692 0 : (void)in_idx;
693 0 : (void)seq;
694 0 : (void)ctl;
695 0 : (void)tsorig;
696 0 : (void)tspub;
697 :
698 0 : ctx->stem = stem;
699 :
700 0 : FD_TEST( ctx->state!=FD_SNAPIN_STATE_SHUTDOWN );
701 :
702 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem );
703 0 : else handle_control_frag( ctx, stem, sig );
704 :
705 0 : return 0;
706 0 : }
707 :
708 : static ulong
709 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
710 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
711 : ulong out_fds_cnt,
712 0 : int * out_fds ) {
713 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
714 :
715 0 : ulong out_cnt = 0;
716 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
717 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
718 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
719 0 : }
720 :
721 0 : return out_cnt;
722 0 : }
723 :
724 : static ulong
725 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
726 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
727 : ulong out_cnt,
728 0 : struct sock_filter * out ) {
729 :
730 0 : populate_sock_filter_policy_fd_snapin_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
731 0 : return sock_filter_policy_fd_snapin_tile_instr_cnt;
732 0 : }
733 :
734 :
735 : static void
736 : privileged_init( fd_topo_t * topo,
737 0 : fd_topo_tile_t * tile ) {
738 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
739 :
740 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
741 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
742 :
743 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
744 0 : }
745 :
746 : FD_FN_UNUSED static void
747 : unprivileged_init( fd_topo_t * topo,
748 0 : fd_topo_tile_t * tile ) {
749 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
750 :
751 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
752 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
753 0 : void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_ssparse_align(), fd_ssparse_footprint( 1UL<<24UL ) );
754 0 : void * _txncache = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
755 0 : void * _manifest_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
756 0 : void * _sd_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
757 0 : ctx->txncache_entries = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
758 0 : ctx->blockhash_offsets = FD_SCRATCH_ALLOC_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
759 :
760 0 : ctx->full = 1;
761 0 : ctx->state = FD_SNAPIN_STATE_LOADING;
762 :
763 0 : ctx->boot_timestamp = fd_log_wallclock();
764 :
765 0 : FD_TEST( fd_funk_join( ctx->funk, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
766 0 : fd_funk_txn_xid_set_root( ctx->xid );
767 :
768 0 : void * _txncache_shmem = fd_topo_obj_laddr( topo, tile->snapin.txncache_obj_id );
769 0 : fd_txncache_shmem_t * txncache_shmem = fd_txncache_shmem_join( _txncache_shmem );
770 0 : FD_TEST( txncache_shmem );
771 0 : ctx->txncache = fd_txncache_join( fd_txncache_new( _txncache, txncache_shmem ) );
772 0 : FD_TEST( ctx->txncache );
773 :
774 0 : ctx->txncache_entries_len = 0UL;
775 0 : ctx->blockhash_offsets_len = 0UL;
776 :
777 0 : ctx->ssparse = fd_ssparse_new( _ssparse, 1UL<<24UL, ctx->seed );
778 0 : FD_TEST( ctx->ssparse );
779 :
780 0 : ctx->manifest_parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( _manifest_parser ) );
781 0 : FD_TEST( ctx->manifest_parser );
782 :
783 0 : ctx->slot_delta_parser = fd_slot_delta_parser_join( fd_slot_delta_parser_new( _sd_parser ) );
784 0 : FD_TEST( ctx->slot_delta_parser );
785 :
786 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
787 :
788 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
789 0 : if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
790 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
791 :
792 0 : fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ 0UL ] ];
793 0 : ctx->manifest_out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
794 0 : ctx->manifest_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
795 0 : ctx->manifest_out.wmark = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
796 0 : ctx->manifest_out.chunk = ctx->manifest_out.chunk0;
797 0 : ctx->manifest_out.mtu = writer_link->mtu;
798 :
799 0 : fd_ssparse_reset( ctx->ssparse );
800 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
801 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
802 :
803 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
804 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
805 0 : ctx->in.wksp = in_wksp->wksp;;
806 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
807 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
808 0 : ctx->in.mtu = in_link->mtu;
809 0 : ctx->in.pos = 0UL;
810 :
811 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
812 0 : }
813 :
814 0 : #define STEM_BURST 2UL /* For control fragments, one acknowledgement, and one malformed message */
815 0 : #define STEM_LAZY 1000L
816 :
817 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t
818 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)
819 :
820 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
821 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
822 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
823 :
824 : #include "../../disco/stem/fd_stem.c"
825 :
826 : fd_topo_run_tile_t fd_tile_snapin = {
827 : .name = NAME,
828 : .populate_allowed_fds = populate_allowed_fds,
829 : .populate_allowed_seccomp = populate_allowed_seccomp,
830 : .scratch_align = scratch_align,
831 : .scratch_footprint = scratch_footprint,
832 : .privileged_init = privileged_init,
833 : .unprivileged_init = unprivileged_init,
834 : .run = stem_run,
835 : };
836 :
837 : #undef NAME
|