Line data Source code
1 : #include "utils/fd_ssctrl.h"
2 : #include "utils/fd_ssload.h"
3 : #include "utils/fd_ssmsg.h"
4 : #include "utils/fd_ssparse.h"
5 : #include "utils/fd_ssmanifest_parser.h"
6 : #include "utils/fd_slot_delta_parser.h"
7 :
8 : #include "../../disco/topo/fd_topo.h"
9 : #include "../../disco/metrics/fd_metrics.h"
10 : #include "../../disco/gui/fd_gui_config_parse.h"
11 : #include "../../flamenco/runtime/fd_txncache.h"
12 : #include "../../flamenco/runtime/fd_system_ids.h"
13 : #include "../../flamenco/runtime/fd_hashes.h"
14 : #include "../../flamenco/runtime/sysvar/fd_sysvar_epoch_schedule.h"
15 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
16 :
17 : #include "../../flamenco/runtime/fd_txncache.h"
18 : #include "../../flamenco/runtime/fd_bank.h"
19 : #include "../../flamenco/features/fd_feature_snoop.h"
20 : #include "../../disco/stem/fd_stem.h"
21 : #include "../../flamenco/accdb/fd_accdb.h"
22 :
23 : #include "generated/fd_snapin_tile_seccomp.h"
24 :
25 : #define NAME "snapin"
26 :
27 : /* The snapin tile is a state machine that parses and loads a full
28 : and optionally an incremental snapshot. It is currently responsible
29 : for loading accounts into an in-memory database, though this may
30 : change. */
31 :
32 : /* 300 root slots in the slot deltas array, and each one references all
33 : 151 prior blockhashes that it's able to. */
34 : #define FD_SNAPIN_MAX_SLOT_DELTA_GROUPS (300UL*151UL)
35 :
36 : struct fd_blockhash_entry {
37 : fd_hash_t blockhash;
38 :
39 : struct {
40 : ulong prev;
41 : ulong next;
42 : } map;
43 : };
44 :
45 : typedef struct fd_blockhash_entry fd_blockhash_entry_t;
46 :
47 : #define MAP_NAME blockhash_map
48 0 : #define MAP_KEY blockhash
49 : #define MAP_KEY_T fd_hash_t
50 : #define MAP_ELE_T fd_blockhash_entry_t
51 0 : #define MAP_KEY_EQ(k0,k1) (!memcmp((k0),(k1), sizeof(fd_hash_t)))
52 0 : #define MAP_KEY_HASH(key,seed) (fd_hash((seed),(key),sizeof(fd_hash_t)))
53 0 : #define MAP_PREV map.prev
54 0 : #define MAP_NEXT map.next
55 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
56 : #include "../../util/tmpl/fd_map_chain.c"
57 :
58 : /* 300 here is from status_cache.rs::MAX_CACHE_ENTRIES which is the most
59 : root slots Agave could possibly serve in a snapshot. */
60 : #define FD_SNAPIN_TXNCACHE_MAX_ENTRIES (300UL*FD_PACK_MAX_TXNCACHE_TXN_PER_SLOT)
61 :
62 : struct blockhash_group {
63 : uchar blockhash[ 32UL ];
64 : ulong txnhash_offset;
65 : };
66 :
67 : typedef struct blockhash_group blockhash_group_t;
68 :
69 : struct fd_snapin_out_link {
70 : ulong idx;
71 : fd_wksp_t * mem;
72 : ulong chunk0;
73 : ulong wmark;
74 : ulong chunk;
75 : ulong mtu;
76 : };
77 : typedef struct fd_snapin_out_link fd_snapin_out_link_t;
78 :
79 : struct fd_snapin_tile {
80 : int state;
81 : uint full : 1; /* loading a full snapshot? */
82 :
83 : ulong seed;
84 : long boot_timestamp;
85 :
86 : fd_accdb_t * accdb;
87 : fd_txncache_t * txncache;
88 :
89 : fd_banks_t * banks;
90 : fd_bank_t * bank;
91 :
92 : fd_feature_snoop_t feature_snoop[1];
93 : struct {
94 : int capturing;
95 : fd_pubkey_t pubkey;
96 : ulong lamports;
97 : uchar owner[ 32UL ];
98 : ulong need;
99 : ulong write_pos;
100 : uchar buf[ sizeof(fd_feature_t) ];
101 : } feature_reasm;
102 :
103 : fd_ssparse_t ssparse[1];
104 : fd_ssmanifest_parser_t * manifest_parser;
105 : fd_slot_delta_parser_t * slot_delta_parser;
106 :
107 : struct {
108 : int manifest_done;
109 : int status_cache_done;
110 : int manifest_processed;
111 : } flags;
112 :
113 : ulong advertised_slot;
114 : ulong bank_slot;
115 : ulong epoch;
116 :
117 : fd_epoch_schedule_t epoch_schedule;
118 :
119 : ulong full_genesis_creation_time_seconds;
120 : uchar advertised_hash[ FD_HASH_FOOTPRINT ];
121 :
122 : ulong capitalization; /* tracks capitalization of all loaded accounts in the current snapshot */
123 : ulong dup_capitalization; /* tracks capitalization of duplicate accounts encountered during incremental snapshot loading */
124 : ulong manifest_capitalization; /* capitalization according to the current snapshot manifest */
125 :
126 : struct {
127 : ulong capitalization;
128 : fd_accdb_snapshot_recovery_t accdb_metadata;
129 : fd_feature_snoop_t feature_snoop;
130 : } recovery; /* stores state from the last full snapshot for incremental revert */
131 :
132 : ulong blockhash_offsets_len;
133 : blockhash_group_t * blockhash_offsets;
134 :
135 : ulong txncache_entries_len;
136 : fd_sstxncache_entry_t * txncache_entries;
137 :
138 : fd_accdb_fork_id_t accdb_root_fork_id;
139 : fd_accdb_fork_id_t accdb_incr_fork_id; /* child fork for incremental writes (purge on failure) */
140 : fd_txncache_fork_id_t txncache_root_fork_id;
141 :
142 : struct {
143 : ulong full_bytes_read;
144 : ulong incremental_bytes_read;
145 :
146 : /* Account counters (full + incremental) */
147 : ulong accounts_loaded;
148 : ulong accounts_replaced;
149 : ulong accounts_ignored;
150 :
151 : /* Account counters (snapshot taken for full snapshot only) */
152 : ulong full_accounts_loaded;
153 : ulong full_accounts_replaced;
154 : ulong full_accounts_ignored;
155 :
156 : /* Persistent counters */
157 : ulong total_accounts_processed;
158 : ulong total_account_batches_processed;
159 : } metrics;
160 :
161 : struct {
162 : fd_wksp_t * wksp;
163 : ulong chunk0;
164 : ulong wmark;
165 : ulong mtu;
166 : ulong pos;
167 : } in;
168 :
169 : fd_snapin_out_link_t ct_out;
170 : fd_snapin_out_link_t manifest_out;
171 : fd_snapin_out_link_t gui_out;
172 :
173 : ulong gui_config_acct_sz; /* total expected account data length (0 when not accumulating) */
174 : ulong gui_config_acct_off; /* bytes accumulated so far into the current gui_out link chunk */
175 :
176 : /* In-memory copy of the SlotHistory sysvar account, captured by
177 : snooping the account stream as the snapshot is loaded. The accdb
178 : read-back path is unsafe at the end of load because the snapwr
179 : tile may not have flushed the bytes yet; the snoop path observes
180 : the bytes directly. The captured copy is then used by
181 : verify_slot_deltas_with_slot_history.
182 :
183 : Replacement uses the same precedence as fd_accdb_snapshot_write_*:
184 : a write with slot >= captured.slot replaces the captured copy.
185 : This handles the incremental snapshot superseding the full. */
186 : struct {
187 : int captured;
188 : int capturing; /* streaming-path: currently appending data for this account */
189 : ulong slot;
190 : ulong lamports;
191 : ulong data_len;
192 : uchar owner[ 32UL ];
193 : int executable;
194 : ulong write_pos; /* bytes written into buf during the current streaming capture */
195 : uchar buf[ FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ];
196 : } slot_history;
197 : };
198 :
199 : typedef struct fd_snapin_tile fd_snapin_tile_t;
200 :
201 : static void
202 0 : format_count( char * out, ulong out_sz, ulong n ) {
203 0 : if( n>=1000000UL ) FD_TEST( fd_cstr_printf_check( out, out_sz, NULL, "%.1fM", (double)n/1e6 ) );
204 0 : else if( n>=1000UL ) FD_TEST( fd_cstr_printf_check( out, out_sz, NULL, "%.1fK", (double)n/1e3 ) );
205 0 : else FD_TEST( fd_cstr_printf_check( out, out_sz, NULL, "%lu", n ) );
206 0 : }
207 :
208 : static inline int
209 0 : should_shutdown( fd_snapin_tile_t * ctx ) {
210 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN ) ) {
211 0 : ulong accounts_dup = ctx->metrics.accounts_ignored + ctx->metrics.accounts_replaced;
212 0 : long elapsed_ns = fd_log_wallclock() - ctx->boot_timestamp;
213 0 : char loaded_buf[ 32 ];
214 0 : char dup_buf [ 32 ];
215 0 : format_count( loaded_buf, sizeof(loaded_buf), ctx->metrics.accounts_loaded );
216 0 : format_count( dup_buf, sizeof(dup_buf), accounts_dup );
217 0 : FD_LOG_NOTICE(( "loaded %s accounts (%s dups) from snapshot in %.3f seconds",
218 0 : loaded_buf, dup_buf, (double)elapsed_ns/1e9 ));
219 0 : }
220 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
221 0 : }
222 :
223 : static ulong
224 0 : scratch_align( void ) {
225 0 : return 512UL;
226 0 : }
227 :
228 : static ulong
229 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
230 0 : ulong l = FD_LAYOUT_INIT;
231 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
232 0 : l = FD_LAYOUT_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
233 0 : l = FD_LAYOUT_APPEND( l, fd_accdb_align(), fd_accdb_footprint( tile->snapin.max_live_slots ) );
234 0 : l = FD_LAYOUT_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
235 0 : l = FD_LAYOUT_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
236 0 : l = FD_LAYOUT_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
237 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
238 0 : return FD_LAYOUT_FINI( l, scratch_align() );
239 0 : }
240 :
241 : static void
242 0 : metrics_write( fd_snapin_tile_t * ctx ) {
243 0 : FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
244 0 : FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
245 0 : FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
246 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNT_LOADED, ctx->metrics.accounts_loaded );
247 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNT_REPLACED, ctx->metrics.accounts_replaced );
248 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNT_IGNORED, ctx->metrics.accounts_ignored );
249 0 : FD_MCNT_SET ( SNAPIN, ACCOUNT_PROCESSED, ctx->metrics.total_accounts_processed );
250 0 : FD_MCNT_SET ( SNAPIN, ACCOUNT_BATCH_PROCESSED, ctx->metrics.total_account_batches_processed );
251 0 : }
252 :
253 : /* verify_slot_deltas_with_slot_history verifies the 'SlotHistory'
254 : sysvar account after loading a snapshot. Uses the in-memory copy
255 : captured by snooping the account stream (process_account_batch /
256 : process_account_header / process_account_data). We cannot read
257 : from accdb at this point because the snapwr tile's pwritev2 may
258 : not have completed yet for the SlotHistory bytes.
259 :
260 : Returns 0 if verification passed, -1 if not. */
261 :
262 : static int
263 0 : verify_slot_deltas_with_slot_history( fd_snapin_tile_t * ctx ) {
264 0 : if( FD_UNLIKELY( !ctx->slot_history.captured ) ) {
265 0 : FD_LOG_WARNING(( "SlotHistory sysvar account was not present in the snapshot stream" ));
266 0 : return -1;
267 0 : }
268 0 : if( FD_UNLIKELY( !ctx->slot_history.lamports || !ctx->slot_history.data_len ) ) {
269 0 : FD_LOG_WARNING(( "SlotHistory sysvar account missing or empty" ));
270 0 : return -1;
271 0 : }
272 0 : if( FD_UNLIKELY( !fd_memeq( ctx->slot_history.owner, fd_sysvar_owner_id.uc, sizeof(fd_pubkey_t) ) ) ) {
273 0 : FD_BASE58_ENCODE_32_BYTES( ctx->slot_history.owner, owner_b58 );
274 0 : FD_LOG_WARNING(( "SlotHistory sysvar owner is invalid: %s != sysvar_owner_id", owner_b58 ));
275 0 : return -1;
276 0 : }
277 :
278 0 : fd_slot_history_view_t view[1];
279 0 : if( FD_UNLIKELY( !fd_sysvar_slot_history_view( view, ctx->slot_history.buf, ctx->slot_history.data_len ) ) ) {
280 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data is corrupt" ));
281 0 : return -1;
282 0 : }
283 :
284 : /* Sanity checks for slot history:
285 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L586 */
286 :
287 0 : ulong newest_slot = view->next_slot - 1UL;
288 0 : if( FD_UNLIKELY( newest_slot!=ctx->bank_slot ) ) {
289 : /* VerifySlotHistoryError::InvalidNewestSlot
290 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L621 */
291 0 : FD_LOG_WARNING(( "SlotHistory sysvar has an invalid newest slot: %lu != bank slot: %lu", newest_slot, ctx->bank_slot ));
292 0 : return -1;
293 0 : }
294 :
295 0 : if( FD_UNLIKELY( view->bits_len!=FD_SLOT_HISTORY_MAX_ENTRIES ) ) {
296 : /* VerifySlotHistoryError::InvalidNumEntries
297 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L625 */
298 0 : FD_LOG_WARNING(( "SlotHistory sysvar has invalid number of entries: %lu != expected: %lu", view->bits_len, FD_SLOT_HISTORY_MAX_ENTRIES ));
299 0 : return -1;
300 0 : }
301 :
302 : /* All slots in the txncache should be present in the slot history */
303 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
304 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
305 0 : if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( view, entry->slot )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) {
306 : /* VerifySlotDeltasError::SlotNotFoundInHistory
307 : https://github.com/anza-xyz/agave/blob/v3.1.8/snapshots/src/error.rs#L144
308 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L593 */
309 0 : FD_LOG_WARNING(( "slot %lu missing from SlotHistory sysvar account", entry->slot ));
310 0 : return -1;
311 0 : }
312 0 : }
313 :
314 : /* The most recent slots (up to the number of slots in the txncache)
315 : in the SlotHistory should be present in the txncache. */
316 0 : fd_slot_delta_slot_set_t slot_set = fd_slot_delta_parser_slot_set( ctx->slot_delta_parser );
317 0 : if( FD_LIKELY( slot_set.ele_cnt ) ) {
318 0 : ulong oldest = newest_slot - slot_set.ele_cnt;
319 0 : for( ulong i=newest_slot; i>oldest; i-- ) {
320 0 : if( FD_LIKELY( fd_sysvar_slot_history_find_slot( view, i )==FD_SLOT_HISTORY_SLOT_FOUND ) ) {
321 0 : if( FD_UNLIKELY( slot_set_ele_query( slot_set.map, &i, NULL, slot_set.pool )==NULL ) ) {
322 : /* VerifySlotDeltasError::SlotNotFoundInDeltas
323 : https://github.com/anza-xyz/agave/blob/v3.1.8/snapshots/src/error.rs#L147
324 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L609 */
325 0 : FD_LOG_WARNING(( "slot %lu missing from slot deltas but present in SlotHistory", i ));
326 0 : return -1;
327 0 : }
328 0 : }
329 0 : }
330 0 : }
331 :
332 0 : return 0;
333 0 : }
334 :
335 : /* verification of epoch stakes from manifest
336 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L632 */
337 : static int
338 0 : verify_epoch_stakes( fd_snapshot_manifest_t const * manifest ) {
339 0 : fd_epoch_schedule_t epoch_schedule = (fd_epoch_schedule_t){
340 0 : .slots_per_epoch = manifest->epoch_schedule_params.slots_per_epoch,
341 0 : .leader_schedule_slot_offset = manifest->epoch_schedule_params.leader_schedule_slot_offset,
342 0 : .warmup = manifest->epoch_schedule_params.warmup,
343 0 : .first_normal_epoch = manifest->epoch_schedule_params.first_normal_epoch,
344 0 : .first_normal_slot = manifest->epoch_schedule_params.first_normal_slot,
345 0 : };
346 :
347 0 : ulong min_required_epoch = fd_slot_to_epoch( &epoch_schedule, manifest->slot, NULL );
348 0 : ulong max_required_epoch = fd_slot_to_leader_schedule_epoch( &epoch_schedule, manifest->slot );
349 :
350 : /* ensure all required epochs are present in epoch stakes */
351 0 : for( ulong i=min_required_epoch; i<=max_required_epoch; i++ ) {
352 0 : int found = 0;
353 0 : for( ulong j=0UL; j<FD_EPOCH_STAKES_LEN; j++ ) {
354 0 : if( manifest->epoch_stakes[j].epoch==i ) {
355 0 : found = 1;
356 0 : break;
357 0 : }
358 0 : }
359 :
360 0 : if( FD_UNLIKELY( !found ) ) {
361 : /* VerifyEpochStakesError::StakesNotFound
362 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L667 */
363 0 : FD_LOG_WARNING(( "stakes not found for epoch %lu in manifest", i ));
364 0 : return -1;
365 0 : }
366 0 : }
367 :
368 0 : return 0;
369 0 : }
370 :
371 : static int
372 : verify_slot_deltas_with_bank_slot( fd_snapin_tile_t * ctx,
373 0 : ulong bank_slot ) {
374 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
375 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
376 : /* VerifySlotDeltasError::SlotGreaterThanMaxRoot
377 : https://github.com/anza-xyz/agave/blob/v3.1.8/snapshots/src/error.rs#L138
378 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L550 */
379 0 : if( FD_UNLIKELY( entry->slot>bank_slot ) ) {
380 0 : FD_LOG_WARNING(( "entry slot %lu is greater than bank slot %lu", entry->slot, bank_slot ));
381 0 : return -1;
382 0 : }
383 0 : }
384 0 : return 0;
385 0 : }
386 :
387 : static int
388 : verify_bank_hash( fd_snapin_tile_t const * ctx,
389 0 : fd_snapshot_manifest_t const * manifest ) {
390 0 : if( FD_UNLIKELY( manifest->blockhashes_len==0UL ) ) {
391 0 : FD_LOG_WARNING(( "%s manifest for epoch %lu and slot %lu has no blockhashes",
392 0 : ctx->full?"full":"incr", ctx->epoch, manifest->slot ));
393 0 : return -1;
394 0 : }
395 :
396 0 : if( FD_UNLIKELY( !manifest->has_accounts_lthash ) ) {
397 0 : FD_LOG_WARNING(( "%s manifest for epoch %lu and slot %lu is missing accounts lthash",
398 0 : ctx->full?"full":"incr", ctx->epoch, manifest->slot ));
399 0 : return -1;
400 0 : }
401 :
402 : /* find the last blockhash */
403 0 : ulong max_hash_idx = 0UL;
404 0 : ulong last_bh_idx = 0UL;
405 0 : for( ulong i=0UL; i<manifest->blockhashes_len; i++ ) {
406 0 : if( FD_LIKELY( manifest->blockhashes[ i ].hash_index > max_hash_idx ) ) {
407 0 : max_hash_idx = manifest->blockhashes[ i ].hash_index;
408 0 : last_bh_idx = i;
409 0 : }
410 0 : }
411 :
412 : /* fd_lthash_value_t is aligned to 64B but the accounts_lthash in the
413 : manifest may not be because its simply a uchar array. Copy is
414 : needed to avoid undefined behavior. */
415 0 : fd_lthash_value_t accounts_lthash[ 1UL ];
416 0 : fd_memcpy( accounts_lthash, manifest->accounts_lthash, sizeof(fd_lthash_value_t) );
417 :
418 0 : fd_hash_t const * parent_bank_hash = (fd_hash_t const *)fd_type_pun_const( manifest->parent_bank_hash );
419 0 : fd_hash_t const * last_blockhash = (fd_hash_t const *)fd_type_pun_const( manifest->blockhashes[ last_bh_idx ].hash );
420 0 : fd_hash_t computed_bank_hash[ 1UL ];
421 0 : fd_hashes_hash_bank( accounts_lthash, parent_bank_hash, last_blockhash, manifest->signature_count, computed_bank_hash );
422 0 : fd_hashes_apply_hard_forks(
423 0 : computed_bank_hash,
424 0 : manifest->slot,
425 0 : manifest->parent_slot,
426 0 : manifest->hard_forks,
427 0 : manifest->hard_fork_cnt );
428 :
429 0 : if( FD_UNLIKELY( memcmp( computed_bank_hash, manifest->bank_hash, FD_HASH_FOOTPRINT ) ) ) {
430 0 : FD_BASE58_ENCODE_32_BYTES( computed_bank_hash->hash, computed_bank_hash_enc );
431 0 : FD_BASE58_ENCODE_32_BYTES( manifest->bank_hash, manifest_bank_hash_enc );
432 0 : FD_LOG_WARNING(( "%s manifest for epoch %lu and slot %lu bank hash verification failed: computed %s does not match manifest %s",
433 0 : ctx->full?"full":"incr", ctx->epoch, manifest->slot,
434 0 : computed_bank_hash_enc, manifest_bank_hash_enc ));
435 0 : return -1;
436 0 : }
437 :
438 0 : return 0;
439 0 : }
440 :
441 : static void
442 : transition_malformed( fd_snapin_tile_t * ctx,
443 0 : fd_stem_context_t * stem ) {
444 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
445 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
446 0 : fd_stem_publish( stem, ctx->ct_out.idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
447 0 : }
448 :
449 : static int
450 : populate_txncache( fd_snapin_tile_t * ctx,
451 : fd_snapshot_manifest_blockhash_t const blockhashes[ static FD_BLOCKHASHES_MAX ],
452 0 : ulong blockhashes_len ) {
453 : /* Our txncache internally contains the fork structure for the chain,
454 : which we need to recreate here. Because snapshots are only served
455 : for rooted slots, there is actually no forking, and the bank forks
456 : are just a single bank, the root, like
457 :
458 : _root
459 :
460 : But the txncache also must contain the 150 more recent banks prior
461 : to the root (151 rooted banks total), looking like,
462 :
463 :
464 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
465 :
466 : Our txncache is "slot agnostic" meaning there is no concept of a
467 : slot number in it. It just has a fork tree structure. So long as
468 : the fork tree is isomorphic to the actual bank forks, and each bank
469 : has the correct blockhash, it works.
470 :
471 : So the challenge is simply to create this chain of 151 forks in the
472 : txncache, with correct blockhashes, and then insert all the
473 : transactions into it.
474 :
475 : Constructing the chain of blockhashes is easy. It is just the
476 : BLOCKHASH_QUEUE array in the manifest. This array is unfortuantely
477 : not sorted and appears in random order, but it has a hash_index
478 : field which is a gapless index, starting at some arbitrary offset,
479 : so we can back out the 151 blockhashes we need from this, by first
480 : finding the max hash_index as _max and then collecting hash entries
481 : via,
482 :
483 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
484 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
485 :
486 : Now the remaining problem is inserting transactions into this
487 : chain. Remember each transaction needs to be inserted with:
488 :
489 : (a) The fork ID (position of the bank in the chain) it was executed in.
490 : (b) The blockhash of the bank it referenced.
491 :
492 : (b) is trivial to retrieve, as it's in the actual slot_deltas entry
493 : in the manifest served by Agave. But (a) is mildly annoying. Agave
494 : serves slot_deltas based on slot, so we need an additional mapping
495 : from slot to position in our banks chain. It turns out we have to
496 : go to yet another structure in the manifest to retrieve this, the
497 : ancestors array. This is just an array of slot values, so we need
498 : to sort it, and line it up against our banks chain like so,
499 :
500 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
501 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
502 : _slots_150 -> _slots_149 -> ... -> _slots_2 -> _slots_1 -> _slots
503 :
504 : From there we are done.
505 :
506 : Well almost ... if you were paying attention you might have noticed
507 : this is a lot of work and we are lazy. Why don't we just ignore the
508 : slot mapping and assume everything executed at the root slot
509 : exactly? The only invariant we should maintain from a memory
510 : perspective is that at most, across all active banks,
511 : FD_MAX_TXN_PER_SLOT transactions are stored per slot, but we
512 : have preserved that. It is not true "per slot" technically, but
513 : it's true across all slots, and the memory is aggregated. It will
514 : also always be true, even as slots are garbage collected, because
515 : entries are collected by referece blockhash, not executed slot.
516 :
517 : ... actually we can't do this. There's more broken things here.
518 : The Agave status decided to only store 20 bytes for 32 byte
519 : transaction hashes to save on memory. That's OK, but they didn't
520 : just take the first 20 bytes. They instead, for each blockhash,
521 : take a random offset between 0 and 12, and store bytes
522 : [ offset, offset+20 ) of the transaction hash. We need to know this
523 : offset to be able to query the txncache later, so we need to
524 : retrieve it from the slot_deltas entry in the manifest, and key it
525 : into our txncache. Unfortunately this offset is stored per slot in
526 : the slot_deltas entry. So we need to first go and retrieve the
527 : ancestors array, sort it, and line it up against our banks chain as
528 : described above, and then go through slot deltas, to retrieve the
529 : offset for each slot, and stick it into the appropriate bank in
530 : our chain. */
531 :
532 0 : if( FD_UNLIKELY( blockhashes_len>FD_BLOCKHASHES_MAX ) ) {
533 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue length %lu exceeds maximum %lu", blockhashes_len, FD_BLOCKHASHES_MAX ));
534 0 : return 1;
535 0 : }
536 0 : if( FD_UNLIKELY( !blockhashes_len ) ) {
537 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue is empty" ));
538 0 : return 1;
539 0 : }
540 :
541 0 : ulong seq_min = ULONG_MAX;
542 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) seq_min = fd_ulong_min( seq_min, blockhashes[ i ].hash_index );
543 :
544 0 : ulong seq_max;
545 0 : if( FD_UNLIKELY( __builtin_uaddl_overflow( seq_min, blockhashes_len, &seq_max ) ) ) {
546 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue sequence number wraparound (seq_min=%lu age_cnt=%lu)", seq_min, blockhashes_len ));
547 0 : return 1;
548 0 : }
549 :
550 : /* First let's construct the chain array as described above. But
551 : index 0 will be the root, index 1 the root's parent, etc. */
552 :
553 0 : struct {
554 0 : int exists;
555 0 : uchar blockhash[ 32UL ];
556 0 : fd_txncache_fork_id_t fork_id;
557 0 : ulong txnhash_offset;
558 0 : } banks[ FD_BLOCKHASHES_MAX ] = {0};
559 :
560 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) {
561 0 : fd_snapshot_manifest_blockhash_t const * elem = &blockhashes[ i ];
562 0 : ulong idx;
563 0 : if( FD_UNLIKELY( __builtin_usubl_overflow( elem->hash_index, seq_min, &idx ) ) ) {
564 0 : FD_LOG_WARNING(( "corrupt snapshot: gap in blockhash queue (seq=[%lu,%lu) idx=%lu)", seq_min, seq_max, blockhashes[ i ].hash_index ));
565 0 : return 1;
566 0 : }
567 :
568 0 : if( FD_UNLIKELY( idx>=blockhashes_len ) ) {
569 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue index out of range (seq_min=%lu age_cnt=%lu idx=%lu)", seq_min, blockhashes_len, idx ));
570 0 : return 1;
571 0 : }
572 :
573 0 : if( FD_UNLIKELY( banks[ blockhashes_len-1UL-idx ].exists ) ) {
574 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash hash_index %lu", elem->hash_index ));
575 0 : return 1;
576 0 : }
577 :
578 0 : banks[ blockhashes_len-1UL-idx ].fork_id.val = USHORT_MAX;
579 0 : banks[ blockhashes_len-1UL-idx ].txnhash_offset = ULONG_MAX;
580 0 : memcpy( banks[ blockhashes_len-1UL-idx ].blockhash, elem->hash, 32UL );
581 0 : banks[ blockhashes_len-1UL-idx ].exists = 1;
582 0 : }
583 :
584 0 : ulong chain_len = fd_ulong_min( blockhashes_len, 151UL );
585 :
586 : /* Now we need a hashset of just the 151 most recent blockhashes,
587 : anything else is a nonce transaction which we do not insert, or an
588 : already expired transaction which can also be discarded. */
589 :
590 0 : uchar __attribute__((aligned(alignof(blockhash_map_t)))) _map[ blockhash_map_footprint( 1024UL ) ];
591 0 : blockhash_map_t * blockhash_map = blockhash_map_join( blockhash_map_new( _map, 1024UL, ctx->seed ) );
592 0 : if( FD_UNLIKELY( !blockhash_map ) ) FD_LOG_ERR(( "failed to create blockhash map" ));
593 :
594 0 : fd_blockhash_entry_t blockhash_pool[ 151UL ];
595 0 : for( ulong i=0UL; i<chain_len; i++ ) {
596 0 : fd_memcpy( blockhash_pool[ i ].blockhash.uc, banks[ i ].blockhash, 32UL );
597 :
598 0 : if( FD_UNLIKELY( blockhash_map_ele_query_const( blockhash_map, &blockhash_pool[ i ].blockhash, NULL, blockhash_pool ) ) ) {
599 0 : FD_BASE58_ENCODE_32_BYTES( banks[ i ].blockhash, blockhash_b58 );
600 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash %s in 151 most recent blockhashes", blockhash_b58 ));
601 0 : return 1;
602 0 : }
603 :
604 0 : blockhash_map_ele_insert( blockhash_map, &blockhash_pool[ i ], blockhash_pool );
605 0 : }
606 :
607 : /* Now load the blockhash offsets for these blockhashes ... */
608 0 : if( FD_UNLIKELY( !ctx->blockhash_offsets_len ) ) {
609 0 : FD_LOG_WARNING(( "corrupt snapshot: no blockhash offsets found (nothing is rooted)" ));
610 0 : return 1;
611 0 : }
612 0 : for( ulong i=0UL; i<ctx->blockhash_offsets_len; i++ ) {
613 0 : fd_hash_t key;
614 0 : fd_memcpy( key.uc, ctx->blockhash_offsets[ i ].blockhash, 32UL );
615 0 : fd_blockhash_entry_t * entry = blockhash_map_ele_query( blockhash_map, &key, NULL, blockhash_pool );
616 0 : if( FD_UNLIKELY( !entry ) ) continue; /* Not in the most recent 151 blockhashes */
617 :
618 0 : ulong chain_idx = (ulong)(entry - blockhash_pool);
619 :
620 0 : if( FD_UNLIKELY( banks[ chain_idx ].txnhash_offset!=ULONG_MAX && banks[ chain_idx ].txnhash_offset!=ctx->blockhash_offsets[ i ].txnhash_offset ) ) {
621 0 : FD_BASE58_ENCODE_32_BYTES( entry->blockhash.uc, blockhash_b58 );
622 0 : FD_LOG_WARNING(( "corrupt snapshot: conflicting txnhash offsets for blockhash %s", blockhash_b58 ));
623 0 : return 1;
624 0 : }
625 :
626 0 : banks[ chain_idx ].txnhash_offset = ctx->blockhash_offsets[ i ].txnhash_offset;
627 0 : }
628 :
629 : /* Construct the linear fork chain in the txncache. */
630 :
631 0 : fd_txncache_fork_id_t parent = { .val = USHORT_MAX };
632 0 : for( ulong i=0UL; i<chain_len; i++ ) banks[ chain_len-1UL-i ].fork_id = parent = fd_txncache_attach_child( ctx->txncache, parent );
633 0 : for( ulong i=0UL; i<chain_len; i++ ) fd_txncache_attach_blockhash( ctx->txncache, banks[ i ].fork_id, banks[ i ].blockhash );
634 :
635 : /* Now insert all transactions as if they executed at the current
636 : root, per above. */
637 :
638 0 : ulong insert_cnt = 0UL;
639 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
640 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[ i ];
641 0 : fd_hash_t key;
642 0 : fd_memcpy( key.uc, entry->blockhash, 32UL );
643 0 : if( FD_UNLIKELY( !blockhash_map_ele_query_const( blockhash_map, &key, NULL, blockhash_pool ) ) ) continue;
644 :
645 0 : insert_cnt++;
646 0 : fd_txncache_insert( ctx->txncache, banks[ 0UL ].fork_id, entry->blockhash, entry->txnhash );
647 0 : }
648 :
649 0 : FD_LOG_INFO(( "inserted %lu/%lu transactions into the txncache", insert_cnt, ctx->txncache_entries_len ));
650 :
651 : /* Then finalize all the banks (freezing them) and setting the txnhash
652 : offset so future queries use the correct offset. If the offset is
653 : ULONG_MAX this is valid, it means the blockhash had no transactions
654 : in it, so there's nothing in the status cache under that blockhash.
655 :
656 : Just set the offset to 0 in this case, it doesn't matter, but
657 : should be valid between 0 and 12 inclusive. */
658 0 : for( ulong i=0UL; i<chain_len; i++ ) {
659 0 : ulong txnhash_offset = banks[ chain_len-1UL-i ].txnhash_offset==ULONG_MAX ? 0UL : banks[ chain_len-1UL-i ].txnhash_offset;
660 0 : fd_txncache_finalize_fork( ctx->txncache, banks[ chain_len-1UL-i ].fork_id, txnhash_offset, banks[ chain_len-1UL-i ].blockhash );
661 0 : }
662 :
663 0 : for( ulong i=1UL; i<chain_len; i++ ) fd_txncache_advance_root( ctx->txncache, banks[ chain_len-1UL-i ].fork_id );
664 :
665 0 : ctx->txncache_root_fork_id = parent;
666 :
667 0 : return 0;
668 0 : }
669 :
670 : static void
671 : process_manifest( fd_snapin_tile_t * ctx,
672 0 : fd_stem_context_t * stem ) {
673 0 : fd_snapshot_manifest_t * manifest = fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk );
674 :
675 0 : if( FD_UNLIKELY( ctx->advertised_slot!=manifest->slot ) ) {
676 : /* SnapshotError::MismatchedSlot
677 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L472 */
678 0 : FD_LOG_WARNING(( "snapshot manifest bank slot %lu does not match advertised slot %lu from snapshot peer",
679 0 : manifest->slot, ctx->advertised_slot ));
680 0 : transition_malformed( ctx, stem );
681 0 : return;
682 0 : }
683 :
684 0 : if( FD_UNLIKELY( !manifest->has_accounts_lthash ) ) {
685 : /* The manifest must contain accounts lthash, irrespective of
686 : whether lthash verification is disabled or not.
687 : https://github.com/anza-xyz/agave/blob/v3.1.9/runtime/src/serde_snapshot.rs#L482 */
688 0 : FD_LOG_WARNING(( "snapshot manifest missing accounts lthash" ));
689 0 : transition_malformed( ctx, stem );
690 0 : return;
691 0 : }
692 :
693 0 : uchar const * sum = manifest->accounts_lthash;
694 0 : uchar hash32[32]; fd_blake3_hash( sum, FD_LTHASH_LEN_BYTES, hash32 );
695 0 : FD_BASE58_ENCODE_32_BYTES( sum, sum_enc );
696 0 : FD_BASE58_ENCODE_32_BYTES( hash32, hash32_enc );
697 0 : FD_LOG_INFO(( "snapshot manifest slot=%lu indicates lthash[..32]=%s blake3(lthash)=%s",
698 0 : manifest->slot, sum_enc, hash32_enc ));
699 :
700 0 : if( FD_UNLIKELY( memcmp( ctx->advertised_hash, hash32, FD_HASH_FOOTPRINT ) ) ) {
701 : /* SnapshotError::MismatchedHash
702 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L479 */
703 0 : FD_BASE58_ENCODE_32_BYTES( ctx->advertised_hash, advertised_hash_enc );
704 0 : FD_LOG_WARNING(( "snapshot manifest accounts lthash %s does not match advertised hash from snapshot peer %s",
705 0 : hash32_enc, advertised_hash_enc ));
706 0 : transition_malformed( ctx, stem );
707 0 : return;
708 0 : }
709 :
710 0 : ctx->bank_slot = manifest->slot;
711 0 : ctx->manifest_capitalization = manifest->capitalization;
712 0 : if( FD_UNLIKELY( ctx->manifest_capitalization>LONG_MAX ) ) {
713 : /* Calculations downstream require capitalization to be treated
714 : as long (to handle addition and subtraction). */
715 0 : FD_LOG_WARNING(( "snapshot manifest capitalization %lu exceeds LONG_MAX", ctx->manifest_capitalization ));
716 0 : transition_malformed( ctx, stem );
717 0 : return;
718 0 : }
719 :
720 0 : if( FD_UNLIKELY( fd_ssload_manifest_validate( manifest, FD_RUNTIME_MAX_VOTE_ACCOUNTS, FD_RUNTIME_MAX_STAKE_ACCOUNTS ) ) ) {
721 0 : FD_LOG_WARNING(( "snapshot manifest validation failed" ));
722 0 : transition_malformed( ctx, stem );
723 0 : return;
724 0 : }
725 :
726 0 : fd_epoch_schedule_t epoch_schedule = (fd_epoch_schedule_t){
727 0 : .slots_per_epoch = manifest->epoch_schedule_params.slots_per_epoch,
728 0 : .leader_schedule_slot_offset = manifest->epoch_schedule_params.leader_schedule_slot_offset,
729 0 : .warmup = manifest->epoch_schedule_params.warmup,
730 0 : .first_normal_epoch = manifest->epoch_schedule_params.first_normal_epoch,
731 0 : .first_normal_slot = manifest->epoch_schedule_params.first_normal_slot,
732 0 : };
733 0 : ctx->epoch = fd_slot_to_epoch( &epoch_schedule, manifest->slot, NULL );
734 0 : ctx->epoch_schedule = epoch_schedule;
735 :
736 0 : if( FD_UNLIKELY( verify_bank_hash( ctx, manifest ) ) ) {
737 : /* https://github.com/anza-xyz/agave/blob/v3.1.9/runtime/src/bank.rs#L4682 */
738 0 : transition_malformed( ctx, stem );
739 0 : return;
740 0 : }
741 :
742 0 : if( FD_UNLIKELY( verify_slot_deltas_with_bank_slot( ctx, manifest->slot ) ) ) {
743 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
744 0 : transition_malformed( ctx, stem );
745 0 : return;
746 0 : }
747 :
748 0 : if( FD_UNLIKELY( verify_epoch_stakes( manifest ) ) ) {
749 0 : FD_LOG_WARNING(( "epoch stakes verification failed" ));
750 0 : transition_malformed( ctx, stem );
751 0 : return;
752 0 : }
753 :
754 0 : if( FD_UNLIKELY( populate_txncache( ctx, manifest->blockhashes, manifest->blockhashes_len ) ) ) {
755 0 : FD_LOG_WARNING(( "populating txncache failed" ));
756 0 : transition_malformed( ctx, stem );
757 0 : return;
758 0 : }
759 :
760 0 : if( ctx->full ) {
761 0 : ctx->full_genesis_creation_time_seconds = manifest->creation_time_seconds;
762 0 : } else {
763 0 : if( FD_UNLIKELY( manifest->creation_time_seconds!=ctx->full_genesis_creation_time_seconds ) ) {
764 0 : FD_LOG_WARNING(( "snapshot manifest genesis creation time seconds %lu does not match full snapshot genesis creation time seconds %lu",
765 0 : manifest->creation_time_seconds, ctx->full_genesis_creation_time_seconds ));
766 0 : transition_malformed( ctx, stem );
767 0 : return;
768 0 : }
769 0 : }
770 :
771 0 : manifest->accdb_fork_id = ctx->accdb_root_fork_id.val;
772 0 : manifest->txncache_fork_id = ctx->txncache_root_fork_id.val;
773 :
774 0 : ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
775 0 : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
776 0 : fd_stem_publish( stem, ctx->manifest_out.idx, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
777 0 : ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
778 0 : }
779 :
780 : static int
781 : process_account_batch( fd_snapin_tile_t * ctx,
782 0 : fd_ssparse_advance_result_t * result ) {
783 0 : uchar const * const * entries = result->account_batch.batch;
784 0 : ulong cnt = result->account_batch.batch_cnt;
785 0 : ulong batch_slot = result->account_batch.slot;
786 :
787 0 : uchar const * pubkeys [ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
788 0 : ulong slots [ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
789 0 : ulong lamports [ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
790 0 : ulong data_lens [ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
791 0 : int executables[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
792 :
793 0 : for( ulong i=0UL; i<cnt; i++ ) {
794 0 : uchar const * e = entries[ i ];
795 0 : pubkeys[ i ] = e + 16UL;
796 0 : slots[ i ] = batch_slot;
797 0 : lamports[ i ] = fd_ulong_load_8_fast( e+48UL );
798 0 : data_lens[ i ] = fd_ulong_load_8_fast( e+8UL );
799 0 : executables[ i ] = e[ 96UL ];
800 :
801 : /* Snoop SlotHistory sysvar. Account body in the batch path is
802 : contiguous starting at e+136. */
803 0 : if( FD_UNLIKELY( !memcmp( pubkeys[ i ], fd_sysvar_slot_history_id.uc, 32UL ) ) &&
804 0 : ( !ctx->slot_history.captured || batch_slot>=ctx->slot_history.slot ) &&
805 0 : data_lens[ i ]<=FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ) {
806 0 : ctx->slot_history.slot = batch_slot;
807 0 : ctx->slot_history.lamports = lamports[ i ];
808 0 : ctx->slot_history.data_len = data_lens[ i ];
809 0 : ctx->slot_history.executable = executables[ i ];
810 0 : memcpy( ctx->slot_history.owner, e+64UL, 32UL );
811 0 : memcpy( ctx->slot_history.buf, e+136UL, data_lens[ i ] );
812 0 : ctx->slot_history.captured = 1;
813 0 : }
814 :
815 0 : fd_feature_snoop_account( ctx->feature_snoop, (fd_pubkey_t const *)pubkeys[ i ], lamports[ i ], e+64UL, e+136UL, data_lens[ i ] );
816 0 : }
817 :
818 0 : ulong accounts_ignored, accounts_replaced, accounts_loaded, replaced_lamports, ignored_lamports;
819 0 : fd_accdb_fork_id_t fork_id = ctx->full ? (fd_accdb_fork_id_t){ .val = USHORT_MAX } : ctx->accdb_incr_fork_id;
820 0 : if( FD_UNLIKELY( 0!=fd_accdb_snapshot_write_batch( ctx->accdb, fork_id, cnt, pubkeys, slots, lamports, data_lens,
821 0 : executables, &accounts_ignored, &accounts_replaced, &accounts_loaded,
822 0 : &replaced_lamports, &ignored_lamports ) ) ) {
823 0 : return -1;
824 0 : }
825 0 : ctx->metrics.accounts_ignored += accounts_ignored;
826 0 : ctx->metrics.accounts_replaced += accounts_replaced;
827 0 : ctx->metrics.accounts_loaded += accounts_loaded;
828 0 : ctx->metrics.total_accounts_processed += cnt;
829 0 : ctx->metrics.total_account_batches_processed++;
830 : /* Sum lamports of every accepted entry into capitalization, and
831 : accumulate the lamports of overwritten entries into
832 : dup_capitalization so the final value can be reconciled with the
833 : manifest. Ignored entries (older slot than what's already in the
834 : accdb) contribute neither to the live database nor to capitalization,
835 : so subtract their lamports back out. */
836 0 : for( ulong i=0UL; i<cnt; i++ ) ctx->capitalization = fd_ulong_sat_add( ctx->capitalization, lamports[ i ] );
837 0 : ctx->capitalization = fd_ulong_sat_sub( ctx->capitalization, ignored_lamports );
838 0 : ctx->dup_capitalization = fd_ulong_sat_add( ctx->dup_capitalization, replaced_lamports );
839 :
840 0 : return 0;
841 0 : }
842 :
843 : static int
844 : process_account_header( fd_snapin_tile_t * ctx,
845 0 : fd_ssparse_advance_result_t * result ) {
846 0 : ctx->metrics.total_account_batches_processed++;
847 0 : ctx->metrics.total_accounts_processed++;
848 0 : ulong replaced_lamports = 0UL;
849 0 : fd_accdb_fork_id_t fork_id = ctx->full ? (fd_accdb_fork_id_t){ .val = USHORT_MAX } : ctx->accdb_incr_fork_id;
850 0 : int account = fd_accdb_snapshot_write_one( ctx->accdb,
851 0 : fork_id,
852 0 : result->account_header.pubkey,
853 0 : result->account_header.slot,
854 0 : result->account_header.lamports,
855 0 : result->account_header.data_len,
856 0 : result->account_header.executable,
857 0 : &replaced_lamports );
858 0 : if( FD_UNLIKELY( -1==account ) ) {
859 0 : ctx->metrics.accounts_ignored++;
860 0 : } else {
861 0 : if( FD_UNLIKELY( 2==account ) ) {
862 0 : ctx->metrics.accounts_replaced++;
863 0 : ctx->dup_capitalization = fd_ulong_sat_add( ctx->dup_capitalization, replaced_lamports );
864 0 : } else {
865 0 : ctx->metrics.accounts_loaded++;
866 0 : }
867 0 : ctx->capitalization = fd_ulong_sat_add( ctx->capitalization, result->account_header.lamports );
868 0 : }
869 :
870 : /* Snoop SlotHistory sysvar. Streaming path: arm the capture window
871 : here; process_account_data appends bytes while armed. */
872 0 : ctx->slot_history.capturing = 0;
873 0 : if( FD_UNLIKELY( !memcmp( result->account_header.pubkey, fd_sysvar_slot_history_id.uc, 32UL ) ) &&
874 0 : ( !ctx->slot_history.captured || result->account_header.slot>=ctx->slot_history.slot ) &&
875 0 : result->account_header.data_len<=FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ) {
876 0 : ctx->slot_history.slot = result->account_header.slot;
877 0 : ctx->slot_history.lamports = result->account_header.lamports;
878 0 : ctx->slot_history.data_len = result->account_header.data_len;
879 0 : ctx->slot_history.executable = result->account_header.executable;
880 0 : memcpy( ctx->slot_history.owner, result->account_header.owner, 32UL );
881 0 : ctx->slot_history.write_pos = 0UL;
882 0 : ctx->slot_history.capturing = 1;
883 0 : }
884 0 : ctx->feature_reasm.capturing = 0;
885 0 : if( FD_UNLIKELY( !memcmp( result->account_header.owner, fd_solana_feature_program_id.uc, 32UL ) &&
886 0 : result->account_header.lamports ) ) {
887 0 : memcpy( ctx->feature_reasm.pubkey.uc, result->account_header.pubkey, 32UL );
888 0 : memcpy( ctx->feature_reasm.owner, result->account_header.owner, 32UL );
889 0 : ctx->feature_reasm.lamports = result->account_header.lamports;
890 0 : ctx->feature_reasm.need = fd_ulong_min( result->account_header.data_len, sizeof(ctx->feature_reasm.buf) );
891 0 : ctx->feature_reasm.write_pos = 0UL;
892 0 : ctx->feature_reasm.capturing = 1;
893 0 : if( FD_UNLIKELY( !ctx->feature_reasm.need ) ) {
894 0 : fd_feature_snoop_account( ctx->feature_snoop, &ctx->feature_reasm.pubkey,
895 0 : ctx->feature_reasm.lamports, ctx->feature_reasm.owner,
896 0 : ctx->feature_reasm.buf, 0UL );
897 0 : ctx->feature_reasm.capturing = 0;
898 0 : }
899 0 : }
900 :
901 0 : return 0;
902 0 : }
903 :
904 : static void
905 : process_account_data( fd_snapin_tile_t * ctx,
906 0 : fd_ssparse_advance_result_t * result ) {
907 0 : if( FD_UNLIKELY( ctx->slot_history.capturing ) ) {
908 0 : ulong remaining = ctx->slot_history.data_len - ctx->slot_history.write_pos;
909 0 : ulong copy_sz = fd_ulong_min( result->account_data.data_sz, remaining );
910 0 : memcpy( ctx->slot_history.buf + ctx->slot_history.write_pos, result->account_data.data, copy_sz );
911 0 : ctx->slot_history.write_pos += copy_sz;
912 0 : if( ctx->slot_history.write_pos==ctx->slot_history.data_len ) {
913 0 : ctx->slot_history.captured = 1;
914 0 : ctx->slot_history.capturing = 0;
915 0 : }
916 0 : }
917 :
918 0 : if( FD_UNLIKELY( ctx->feature_reasm.capturing ) ) {
919 0 : ulong remaining = ctx->feature_reasm.need - ctx->feature_reasm.write_pos;
920 0 : ulong copy_sz = fd_ulong_min( result->account_data.data_sz, remaining );
921 0 : memcpy( ctx->feature_reasm.buf + ctx->feature_reasm.write_pos, result->account_data.data, copy_sz );
922 0 : ctx->feature_reasm.write_pos += copy_sz;
923 0 : if( ctx->feature_reasm.write_pos==ctx->feature_reasm.need ) {
924 0 : fd_feature_snoop_account( ctx->feature_snoop, &ctx->feature_reasm.pubkey,
925 0 : ctx->feature_reasm.lamports, ctx->feature_reasm.owner,
926 0 : ctx->feature_reasm.buf, ctx->feature_reasm.need );
927 0 : ctx->feature_reasm.capturing = 0;
928 0 : }
929 0 : }
930 0 : }
931 :
932 : static int
933 : handle_data_frag( fd_snapin_tile_t * ctx,
934 : ulong chunk,
935 : ulong sz,
936 0 : fd_stem_context_t * stem ) {
937 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) {
938 0 : FD_LOG_WARNING(( "received unexpected data frag while in state %s (%lu)",
939 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
940 0 : transition_malformed( ctx, stem );
941 0 : return 0;
942 0 : }
943 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
944 : /* Ignore all data frags after observing an error in the stream until
945 : we receive fail & init control messages to restart processing. */
946 0 : return 0;
947 0 : }
948 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
949 0 : FD_LOG_ERR(( "received data frag during invalid state %s (%lu)",
950 0 : fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
951 0 : }
952 :
953 0 : if( FD_UNLIKELY( chunk<ctx->in.chunk0 || chunk>ctx->in.wmark || sz>ctx->in.mtu ) ) FD_LOG_ERR(( "invalid data frag bounds (chunk=%lu chunk0=%lu wmark=%lu sz=%lu mtu=%lu)", chunk, ctx->in.chunk0, ctx->in.wmark, sz, ctx->in.mtu ));
954 :
955 0 : for(;;) {
956 0 : if( FD_UNLIKELY( sz-ctx->in.pos==0UL ) ) break;
957 :
958 0 : uchar const * data = (uchar const *)fd_chunk_to_laddr_const( ctx->in.wksp, chunk ) + ctx->in.pos;
959 :
960 0 : int early_exit = 0;
961 0 : fd_ssparse_advance_result_t result[1];
962 0 : int res = fd_ssparse_advance( ctx->ssparse, data, sz-ctx->in.pos, result );
963 0 : switch( res ) {
964 0 : case FD_SSPARSE_ADVANCE_ERROR:
965 0 : FD_LOG_WARNING(( "error while parsing snapshot stream" ));
966 0 : transition_malformed( ctx, stem );
967 0 : return 0;
968 0 : case FD_SSPARSE_ADVANCE_AGAIN:
969 0 : break;
970 0 : case FD_SSPARSE_ADVANCE_MANIFEST:
971 0 : case FD_SSPARSE_ADVANCE_MANIFEST_DONE: {
972 0 : if( FD_UNLIKELY( ctx->flags.manifest_done ) ) {
973 0 : FD_LOG_WARNING(( "excess data after manifest" ));
974 0 : transition_malformed( ctx, stem );
975 0 : return 0;
976 0 : }
977 0 : int parser_res = fd_ssmanifest_parser_consume( ctx->manifest_parser,
978 0 : result->manifest.data,
979 0 : result->manifest.data_sz );
980 0 : if( FD_UNLIKELY( parser_res==FD_SSMANIFEST_PARSER_ADVANCE_ERROR ) ) {
981 0 : FD_LOG_WARNING(( "error while parsing snapshot manifest" ));
982 0 : transition_malformed( ctx, stem );
983 0 : return 0;
984 0 : }
985 0 : if( res==FD_SSPARSE_ADVANCE_MANIFEST_DONE ) {
986 0 : if( FD_UNLIKELY( fd_ssmanifest_parser_fini( ctx->manifest_parser )!=FD_SSMANIFEST_PARSER_ADVANCE_DONE ) ) {
987 0 : FD_LOG_WARNING(( "manifest stream ended before parser was done" ));
988 0 : transition_malformed( ctx, stem );
989 0 : return 0;
990 0 : }
991 0 : ctx->flags.manifest_done = 1;
992 0 : }
993 0 : break;
994 0 : }
995 0 : case FD_SSPARSE_ADVANCE_STATUS_CACHE: {
996 0 : fd_slot_delta_parser_advance_result_t sd_result[1];
997 0 : ulong bytes_remaining = result->status_cache.data_sz;
998 :
999 0 : while( bytes_remaining ) {
1000 0 : int res = fd_slot_delta_parser_consume( ctx->slot_delta_parser,
1001 0 : result->status_cache.data,
1002 0 : bytes_remaining,
1003 0 : sd_result );
1004 0 : if( FD_UNLIKELY( res<0 ) ) {
1005 0 : FD_LOG_WARNING(( "error while parsing slot deltas in status cache" ));
1006 0 : transition_malformed( ctx, stem );
1007 0 : return 0;
1008 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_GROUP ) ) {
1009 0 : if( FD_UNLIKELY( ctx->blockhash_offsets_len>=FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ) ) {
1010 0 : FD_LOG_WARNING(( "blockhash offsets overflow, max is %lu", FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ));
1011 0 : transition_malformed( ctx, stem );
1012 0 : return 0;
1013 0 : }
1014 :
1015 0 : memcpy( ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].blockhash, sd_result->group.blockhash, 32UL );
1016 0 : ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].txnhash_offset = sd_result->group.txnhash_offset;
1017 0 : ctx->blockhash_offsets_len++;
1018 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_ENTRY ) ) {
1019 0 : if( FD_UNLIKELY( ctx->txncache_entries_len>=FD_SNAPIN_TXNCACHE_MAX_ENTRIES ) ) {
1020 0 : FD_LOG_WARNING(( "txncache entries overflow, max is %lu", FD_SNAPIN_TXNCACHE_MAX_ENTRIES ));
1021 0 : transition_malformed( ctx, stem );
1022 0 : return 0;
1023 0 : }
1024 0 : ctx->txncache_entries[ ctx->txncache_entries_len++ ] = *sd_result->entry;
1025 0 : }
1026 :
1027 0 : bytes_remaining -= sd_result->bytes_consumed;
1028 0 : result->status_cache.data += sd_result->bytes_consumed;
1029 0 : }
1030 :
1031 0 : if( FD_UNLIKELY( result->status_cache.done ) ) {
1032 0 : int fini_res = fd_slot_delta_parser_consume( ctx->slot_delta_parser, result->status_cache.data, 0UL, sd_result );
1033 0 : if( FD_UNLIKELY( fini_res<0 ) ) {
1034 0 : FD_LOG_WARNING(( "error while finalizing slot deltas in status cache" ));
1035 0 : transition_malformed( ctx, stem );
1036 0 : return 0;
1037 0 : }
1038 0 : ctx->flags.status_cache_done = fini_res==FD_SLOT_DELTA_PARSER_ADVANCE_DONE;
1039 0 : }
1040 0 : break;
1041 0 : }
1042 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_HEADER:
1043 0 : early_exit = process_account_header( ctx, result );
1044 0 : if( FD_UNLIKELY( early_exit<0 ) ) {
1045 0 : transition_malformed( ctx, stem );
1046 0 : return 0;
1047 0 : }
1048 :
1049 0 : if( FD_UNLIKELY( ctx->gui_out.idx!=ULONG_MAX
1050 0 : && !memcmp( result->account_header.owner, fd_solana_config_program_id.key, sizeof(fd_hash_t) )
1051 0 : && result->account_header.data_len
1052 0 : && result->account_header.data_len<=FD_GUI_CONFIG_PARSE_MAX_VALID_ACCT_SZ ) ) {
1053 0 : ctx->gui_config_acct_sz = result->account_header.data_len;
1054 0 : ctx->gui_config_acct_off = 0UL;
1055 0 : } else {
1056 0 : ctx->gui_config_acct_sz = 0UL;
1057 0 : }
1058 0 : break;
1059 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_DATA:
1060 0 : process_account_data( ctx, result );
1061 :
1062 : /* Account data may span multiple input chunks (when an account
1063 : straddles a decompressed chunk boundary), so we copy each
1064 : piece into the gui_out dcache and only publish once the full
1065 : account has been received.
1066 :
1067 : We expect ConfigKeys Vec to be length 2 (checked via the
1068 : first byte of the accumulated data). We expect the size of
1069 : ConfigProgram-owned accounts to be at most
1070 : FD_GUI_CONFIG_PARSE_MAX_VALID_ACCT_SZ, since this is the
1071 : size that the Solana CLI allocates for them. Although the
1072 : ConfigProgram itself does not enforce these invariants, the
1073 : vast majority of accounts (with a tiny number of excpetions
1074 : on devnet) are maintained with the Solana CLI. */
1075 0 : if( FD_UNLIKELY( ctx->gui_config_acct_sz ) ) {
1076 0 : uchar * acct = fd_chunk_to_laddr( ctx->gui_out.mem, ctx->gui_out.chunk );
1077 0 : fd_memcpy( acct + ctx->gui_config_acct_off, result->account_data.data, result->account_data.data_sz );
1078 0 : ctx->gui_config_acct_off += result->account_data.data_sz;
1079 :
1080 0 : if( FD_LIKELY( ctx->gui_config_acct_off>=ctx->gui_config_acct_sz ) ) {
1081 0 : ctx->gui_config_acct_sz = 0UL;
1082 0 : if( FD_LIKELY( acct[ 0 ]==2UL ) ) {
1083 0 : fd_stem_publish( stem, ctx->gui_out.idx, 0UL, ctx->gui_out.chunk, ctx->gui_config_acct_off, 0UL, 0UL, 0UL );
1084 0 : ctx->gui_out.chunk = fd_dcache_compact_next( ctx->gui_out.chunk, ctx->gui_config_acct_off, ctx->gui_out.chunk0, ctx->gui_out.wmark );
1085 0 : early_exit = 1;
1086 0 : }
1087 0 : }
1088 0 : }
1089 0 : break;
1090 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_BATCH:
1091 0 : early_exit = process_account_batch( ctx, result );
1092 0 : if( FD_UNLIKELY( early_exit<0 ) ) {
1093 0 : transition_malformed( ctx, stem );
1094 0 : return 0;
1095 0 : }
1096 0 : break;
1097 0 : case FD_SSPARSE_ADVANCE_DONE:
1098 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
1099 0 : break;
1100 0 : default:
1101 0 : FD_LOG_ERR(( "unexpected fd_ssparse_advance result %d", res ));
1102 0 : break;
1103 0 : }
1104 :
1105 0 : if( FD_UNLIKELY( !ctx->flags.manifest_processed && ctx->flags.manifest_done && ctx->flags.status_cache_done ) ) {
1106 0 : process_manifest( ctx, stem );
1107 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) break;
1108 0 : ctx->flags.manifest_processed = 1;
1109 0 : }
1110 :
1111 0 : ctx->in.pos += result->bytes_consumed;
1112 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += result->bytes_consumed;
1113 0 : else ctx->metrics.incremental_bytes_read += result->bytes_consumed;
1114 :
1115 0 : if( FD_UNLIKELY( early_exit ) ) break;
1116 0 : }
1117 :
1118 0 : int reprocess_frag = ctx->in.pos<sz;
1119 0 : if( FD_LIKELY( !reprocess_frag ) ) ctx->in.pos = 0UL;
1120 0 : return reprocess_frag;
1121 0 : }
1122 :
1123 : static int
1124 0 : validate_capitalization( fd_snapin_tile_t * ctx ) {
1125 0 : if( FD_UNLIKELY( ctx->capitalization!=ctx->manifest_capitalization ) ) {
1126 : /* SnapshotError::MismatchedCapitalization
1127 : https://github.com/anza-xyz/agave/blob/v4.0.0-beta.2/runtime/src/snapshot_bank_utils.rs#L217 */
1128 0 : FD_LOG_WARNING(( "%s snapshot manifest capitalization %lu does not match computed capitalization %lu",
1129 0 : ctx->full?"full":"incr", ctx->manifest_capitalization, ctx->capitalization ));
1130 0 : return -1;
1131 0 : }
1132 0 : return 0;
1133 0 : }
1134 :
/* handle_control_frag processes one control message (sig) of the
   snapshot load pipeline and advances the tile's state machine:

     INIT_FULL / INIT_INCR  IDLE       -> PROCESSING
     FINI                   FINISHING  -> FINISHING  (any other state:
                                                      malformed)
     NEXT                   FINISHING  -> IDLE       (full snapshot done,
                                                      incremental follows)
     DONE                   FINISHING  -> IDLE       (load complete)
     ERROR                  !SHUTDOWN  -> ERROR
     FAIL                   !SHUTDOWN  -> IDLE       (roll back the load)
     SHUTDOWN               IDLE       -> SHUTDOWN

   While in the ERROR state every message except FAIL is dropped.
   Precondition violations abort via FD_TEST and unknown messages abort
   via FD_LOG_ERR.  A handled message is forwarded (signature only, no
   payload) on ct_out unless handling it turned the snapshot malformed.
   chunk is only read for INIT messages, where it locates an
   fd_ssctrl_init_t in the input workspace. */
static void
handle_control_frag( fd_snapin_tile_t *  ctx,
                     fd_stem_context_t * stem,
                     ulong               sig,
                     ulong               chunk ) {
  if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
    /* Control messages move along the snapshot load pipeline.  Since
       error conditions can be triggered by any tile in the pipeline,
       it is possible to be in error state and still receive otherwise
       valid messages.  Only a fail message can revert this. */
    return;
  };

  int forward_msg = 1;

  switch( sig ) {
    case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
    case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
      /* Start loading a snapshot: reset all per-snapshot parse state. */
      FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
      ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
      ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
      ctx->in.pos = 0UL;
      ctx->txncache_entries_len = 0UL;
      ctx->blockhash_offsets_len = 0UL;
      ctx->manifest_capitalization = 0UL;
      fd_txncache_reset( ctx->txncache );
      fd_ssparse_init( ctx->ssparse );
      fd_ssparse_batch_enable( ctx->ssparse, 1 );
      /* The manifest is parsed directly into the next manifest_out
         dcache chunk, from where process_manifest publishes it. */
      fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk ) );
      fd_slot_delta_parser_init( ctx->slot_delta_parser );
      fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );

      /* Rewind metric counters (no-op unless recovering from a fail) */
      if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
        /* Full load: zero every counter and accumulator, including the
           values saved for recovering a failed incremental load. */
        ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded = 0;
        ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced = 0;
        ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored = 0;
        ctx->metrics.full_bytes_read = 0UL;
        ctx->metrics.incremental_bytes_read = 0UL;
        ctx->full_genesis_creation_time_seconds = 0UL;
        ctx->capitalization = 0UL;
        ctx->dup_capitalization = 0UL;
        ctx->recovery.capitalization = 0UL;

        /* Start from an empty accounts db whose root fork is a child of
           the null fork id (USHORT_MAX). */
        fd_accdb_reset( ctx->accdb );
        fd_accdb_fork_id_t null_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
        ctx->accdb_root_fork_id = fd_accdb_attach_child( ctx->accdb, null_fork_id );

        fd_accdb_snapshot_load_begin( ctx->accdb );

        ctx->slot_history.captured = 0;
        ctx->slot_history.capturing = 0;

        fd_memset( ctx->feature_snoop, 0, sizeof(ctx->feature_snoop) );
        ctx->feature_reasm.capturing = 0;
      } else {
        /* Incremental load: restore counters and capitalization to the
           values saved when the full snapshot completed (see NEXT). */
        ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded;
        ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced;
        ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored;
        ctx->metrics.incremental_bytes_read = 0UL;

        ctx->capitalization = ctx->recovery.capitalization;
        ctx->dup_capitalization = 0UL;

        /* Discard stale capture so the retry's sysvar is snooped fresh */
        ctx->slot_history.captured = 0;
        ctx->slot_history.capturing = 0;
        ctx->feature_reasm.capturing = 0;

        /* Create a child fork for incremental writes.  On failure,
           fd_accdb_purge(child) reverts just the incremental changes.
           On success, fd_accdb_advance_root(child) promotes them. */
        ctx->accdb_incr_fork_id = fd_accdb_attach_child( ctx->accdb, ctx->accdb_root_fork_id );
      }

      /* Save the slot advertised by the snapshot peer and verify it
         against the slot in the snapshot manifest.  For downloaded
         snapshots, this is simply a best estimate.  The actual
         advertised slot for downloaded snapshots is received in a
         separate fd_ssctrl_meta_t message below. */
      fd_ssctrl_init_t const * msg = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
      ctx->advertised_slot = msg->slot;
      fd_memcpy( ctx->advertised_hash, msg->snapshot_hash, FD_HASH_FOOTPRINT );
      break;
    }

    case FD_SNAPSHOT_MSG_CTRL_FINI: {
      /* This is a special case: handle_data_frag must have already
         processed FD_SSPARSE_ADVANCE_DONE and moved the state into
         FD_SNAPSHOT_STATE_FINISHING.  Otherwise, treat this as a
         malformed snapshot so that the pipeline can retry. */
      if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
        FD_LOG_WARNING(( "received FINI while in state %s (%lu), expected FINISHING (possibly truncated tar stream)",
                         fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
        transition_malformed( ctx, stem );
        forward_msg = 0;
        break;
      }
      break;
    }

    case FD_SNAPSHOT_MSG_CTRL_NEXT: {
      /* The full snapshot finished loading and an incremental snapshot
         follows.  Verify the full snapshot, then save the state needed
         to roll back a failed incremental load. */
      FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
      ctx->state = FD_SNAPSHOT_STATE_IDLE;

      if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
        FD_LOG_WARNING(( "slot deltas verification failed for full snapshot" ));
        transition_malformed( ctx, stem );
        forward_msg = 0;
        break;
      }

      /* Back out lamports of overwritten (duplicate) account entries
         before comparing against the manifest. */
      ctx->capitalization = fd_ulong_sat_sub( ctx->capitalization, ctx->dup_capitalization );
      if( FD_UNLIKELY( validate_capitalization( ctx )!=0 ) ) {
        transition_malformed( ctx, stem );
        forward_msg = 0;
        break;
      }

      /* Recovery state consumed by INIT_INCR and by FAIL during an
         incremental load. */
      ctx->recovery.capitalization = ctx->capitalization;
      fd_accdb_snapshot_save_whead( ctx->accdb, &ctx->recovery.accdb_metadata );
      ctx->recovery.feature_snoop = *ctx->feature_snoop;

      /* Backup metric counters */
      ctx->metrics.full_accounts_loaded = ctx->metrics.accounts_loaded;
      ctx->metrics.full_accounts_replaced = ctx->metrics.accounts_replaced;
      ctx->metrics.full_accounts_ignored = ctx->metrics.accounts_ignored;
      break;
    }

    case FD_SNAPSHOT_MSG_CTRL_DONE: {
      /* The last snapshot of the load (full or incremental) finished.
         Verify it, commit the incremental fork if any, finish the
         accounts db load, and notify downstream. */
      FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
      ctx->state = FD_SNAPSHOT_STATE_IDLE;

      if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
        if( ctx->full ) FD_LOG_WARNING(( "slot deltas verification failed for full snapshot" ));
        else            FD_LOG_WARNING(( "slot deltas verification failed for incremental snapshot" ));
        transition_malformed( ctx, stem );
        forward_msg = 0;
        break;
      }

      /* Back out lamports of overwritten (duplicate) account entries
         before comparing against the manifest. */
      ctx->capitalization = fd_ulong_sat_sub( ctx->capitalization, ctx->dup_capitalization );
      if( FD_UNLIKELY( validate_capitalization( ctx )!=0 ) ) {
        transition_malformed( ctx, stem );
        forward_msg = 0;
        break;
      }

      if( !ctx->full ) {
        /* Promote the incremental child fork to be the new root. */
        fd_accdb_advance_root( ctx->accdb, ctx->accdb_incr_fork_id );
        ctx->accdb_root_fork_id = ctx->accdb_incr_fork_id;
        ctx->accdb_incr_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
      }

      fd_accdb_snapshot_load_end( ctx->accdb );

      fd_feature_snoop_finalize( &ctx->bank->f.features, ctx->bank_slot, &ctx->epoch_schedule, ctx->feature_snoop );

      /* Notify replay when snapshot is fully loaded and verified. */
      fd_stem_publish( stem, ctx->manifest_out.idx, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
      break;
    }

    case FD_SNAPSHOT_MSG_CTRL_ERROR: {
      /* Some tile in the pipeline observed an error; wait for FAIL. */
      FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
      ctx->state = FD_SNAPSHOT_STATE_ERROR;
      break;
    }

    case FD_SNAPSHOT_MSG_CTRL_FAIL: {
      /* Roll back the failed load so it can be retried.  A failed full
         load discards the accounts db entirely.  A failed incremental
         load discards only its child fork and restores the state saved
         at NEXT. */
      FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
      if( ctx->full ) {
        fd_accdb_reset( ctx->accdb );
        ctx->accdb_root_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
        ctx->accdb_incr_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
      } else {
        fd_accdb_purge( ctx->accdb, ctx->accdb_incr_fork_id ); /* this fork and subsequent children */
        fd_accdb_snapshot_revert_whead( ctx->accdb, &ctx->recovery.accdb_metadata );
        ctx->accdb_incr_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
        *ctx->feature_snoop = ctx->recovery.feature_snoop;
      }
      ctx->state = FD_SNAPSHOT_STATE_IDLE;
      break;
    }

    case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
      FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
      ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
      break;
    }

    default: {
      FD_LOG_ERR(( "unexpected control frag %s (%lu) in state %s (%lu)",
                   fd_ssctrl_msg_ctrl_str( sig ), sig,
                   fd_ssctrl_state_str( (ulong)ctx->state ), (ulong)ctx->state ));
      break;
    }
  }

  /* Forward the control message down the pipeline */
  if( FD_LIKELY( forward_msg ) ) {
    fd_stem_publish( stem, ctx->ct_out.idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
  }
}
1340 :
1341 : static inline int
1342 : returnable_frag( fd_snapin_tile_t * ctx,
1343 : ulong in_idx FD_PARAM_UNUSED,
1344 : ulong seq FD_PARAM_UNUSED,
1345 : ulong sig,
1346 : ulong chunk,
1347 : ulong sz,
1348 : ulong ctl FD_PARAM_UNUSED,
1349 : ulong tsorig FD_PARAM_UNUSED,
1350 : ulong tspub FD_PARAM_UNUSED,
1351 0 : fd_stem_context_t * stem ) {
1352 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
1353 :
1354 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem );
1355 0 : else handle_control_frag( ctx, stem, sig, chunk );
1356 :
1357 0 : return 0;
1358 0 : }
1359 :
1360 : static ulong
1361 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
1362 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
1363 : ulong out_fds_cnt,
1364 0 : int * out_fds ) {
1365 0 : if( FD_UNLIKELY( out_fds_cnt<3UL ) ) FD_LOG_ERR(( "invalid out_fds_cnt %lu", out_fds_cnt ));
1366 :
1367 0 : ulong out_cnt = 0;
1368 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
1369 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
1370 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
1371 0 : }
1372 0 : out_fds[ out_cnt++ ] = FD_ACCDB_FD_RW; /* accounts db */
1373 :
1374 0 : return out_cnt;
1375 0 : }
1376 :
1377 : static ulong
1378 : populate_allowed_seccomp( fd_topo_t const * topo,
1379 : fd_topo_tile_t const * tile,
1380 : ulong out_cnt,
1381 0 : struct sock_filter * out ) {
1382 0 : (void)topo; (void)tile;
1383 0 : populate_sock_filter_policy_fd_snapin_tile( out_cnt, out, (uint)fd_log_private_logfile_fd(), FD_ACCDB_FD_RW );
1384 0 : return sock_filter_policy_fd_snapin_tile_instr_cnt;
1385 0 : }
1386 :
1387 : static void
1388 : privileged_init( fd_topo_t const * topo,
1389 0 : fd_topo_tile_t const * tile ) {
1390 0 : fd_snapin_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1391 0 : memset( ctx, 0, sizeof(fd_snapin_tile_t) );
1392 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
1393 0 : }
1394 :
1395 : static inline fd_snapin_out_link_t
1396 : out1( fd_topo_t const * topo,
1397 : fd_topo_tile_t const * tile,
1398 0 : char const * name ) {
1399 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, name, 0UL );
1400 :
1401 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapin_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
1402 :
1403 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
1404 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapin_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
1405 :
1406 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
1407 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
1408 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
1409 0 : return (fd_snapin_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
1410 0 : }
1411 :
1412 : static void
1413 : unprivileged_init( fd_topo_t const * topo,
1414 0 : fd_topo_tile_t const * tile ) {
1415 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
1416 :
1417 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
1418 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
1419 0 : void * _txncache = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
1420 0 : void * _accdb = FD_SCRATCH_ALLOC_APPEND( l, fd_accdb_align(), fd_accdb_footprint( tile->snapin.max_live_slots ) );
1421 0 : void * _manifest_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
1422 0 : void * _sd_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
1423 0 : ctx->blockhash_offsets = FD_SCRATCH_ALLOC_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
1424 0 : ctx->txncache_entries = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
1425 :
1426 0 : ctx->full = 1;
1427 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
1428 :
1429 0 : void * _accdb_shmem = fd_topo_obj_laddr( topo, tile->snapin.accdb_obj_id );
1430 0 : fd_accdb_shmem_t * accdb_shmem = fd_accdb_shmem_join( _accdb_shmem );
1431 0 : FD_TEST( accdb_shmem );
1432 0 : ctx->accdb = fd_accdb_join( fd_accdb_new( _accdb, accdb_shmem, FD_ACCDB_FD_RW, 0UL, NULL ) );
1433 0 : FD_TEST( ctx->accdb );
1434 :
1435 0 : void * _txncache_shmem = fd_topo_obj_laddr( topo, tile->snapin.txncache_obj_id );
1436 0 : fd_txncache_shmem_t * txncache_shmem = fd_txncache_shmem_join( _txncache_shmem );
1437 0 : FD_TEST( txncache_shmem );
1438 0 : ctx->txncache = fd_txncache_join( fd_txncache_new( _txncache, txncache_shmem ) );
1439 0 : FD_TEST( ctx->txncache );
1440 :
1441 0 : ctx->banks = fd_banks_join( fd_topo_obj_laddr( topo, tile->snapin.banks_obj_id ) );
1442 0 : FD_TEST( ctx->banks );
1443 0 : ctx->bank = fd_banks_init_bank( ctx->banks );
1444 0 : FD_TEST( ctx->bank );
1445 0 : FD_TEST( ctx->bank->idx==0UL );
1446 :
1447 0 : ctx->txncache_entries_len = 0UL;
1448 0 : ctx->blockhash_offsets_len = 0UL;
1449 :
1450 0 : ctx->manifest_parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( _manifest_parser ) );
1451 0 : FD_TEST( ctx->manifest_parser );
1452 :
1453 0 : ctx->slot_delta_parser = fd_slot_delta_parser_join( fd_slot_delta_parser_new( _sd_parser ) );
1454 0 : FD_TEST( ctx->slot_delta_parser );
1455 :
1456 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
1457 :
1458 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
1459 0 : if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
1460 :
1461 0 : ctx->ct_out = out1( topo, tile, "snapin_ct" );
1462 0 : ctx->manifest_out = out1( topo, tile, "snapin_manif" );
1463 0 : ctx->gui_out = out1( topo, tile, "snapin_gui" );
1464 :
1465 0 : if( FD_UNLIKELY( ctx->ct_out.idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapin_ct`" ));
1466 0 : if( FD_UNLIKELY( ctx->manifest_out.idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapin_manif`" ));
1467 :
1468 0 : fd_ssparse_init( ctx->ssparse );
1469 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk ) );
1470 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
1471 :
1472 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
1473 0 : FD_TEST( 0==strcmp( in_link->name, "snapdc_in" ) );
1474 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
1475 0 : ctx->in.wksp = in_wksp->wksp;
1476 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
1477 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
1478 0 : ctx->in.mtu = in_link->mtu;
1479 0 : ctx->in.pos = 0UL;
1480 :
1481 0 : ctx->gui_config_acct_sz = 0UL;
1482 0 : ctx->gui_config_acct_off = 0UL;
1483 :
1484 0 : ctx->advertised_slot = 0UL;
1485 0 : ctx->bank_slot = 0UL;
1486 0 : ctx->epoch = 0UL;
1487 :
1488 0 : ctx->full_genesis_creation_time_seconds = 0UL;
1489 0 : ctx->manifest_capitalization = 0UL;
1490 0 : ctx->capitalization = 0UL;
1491 0 : ctx->dup_capitalization = 0UL;
1492 0 : ctx->recovery.capitalization = 0UL;
1493 0 : memset( &ctx->recovery.accdb_metadata, 0, sizeof(ctx->recovery.accdb_metadata) );
1494 :
1495 0 : ctx->accdb_root_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
1496 0 : ctx->accdb_incr_fork_id = (fd_accdb_fork_id_t){ .val = USHORT_MAX };
1497 :
1498 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
1499 0 : ctx->boot_timestamp = fd_log_wallclock();
1500 0 : }
1501 :
/* There are 3 output links that affect the calculation of STEM_BURST:
     1. snapin_ct
     2. snapin_manif - worst case: 1 message
     3. snapin_gui   - worst case: 1 message (config program account)
   STEM_BURST is the max value across these 3 links (not the sum).
   Note that snapin_txn is excluded from this calculation, since it is
   an unreliable link that serves only as a dcache placeholder.

   NOTE(review): no worst case is stated for snapin_ct; a STEM_BURST of
   1 implies it is at most 1 message per callback.  Confirm this when
   changing what snapin_ct publishes. */
#define STEM_BURST 1UL

/* Housekeeping laziness passed to the stem run loop.
   NOTE(review): presumably in nanoseconds; confirm against fd_stem.c. */
#define STEM_LAZY (128L*3000L)

/* Every stem callback receives a fd_snapin_tile_t context. */
#define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t
#define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)

/* Callbacks implemented earlier in this file. */
#define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
#define STEM_CALLBACK_METRICS_WRITE metrics_write
#define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag

/* Instantiates the generic stem run loop (stem_run, referenced by
   fd_tile_snapin below) from the STEM_* macros above, so those macros
   must be defined before this include. */
#include "../../disco/stem/fd_stem.c"
1521 :
1522 : fd_topo_run_tile_t fd_tile_snapin = {
1523 : .name = NAME,
1524 : .populate_allowed_fds = populate_allowed_fds,
1525 : .populate_allowed_seccomp = populate_allowed_seccomp,
1526 : .scratch_align = scratch_align,
1527 : .scratch_footprint = scratch_footprint,
1528 : .privileged_init = privileged_init,
1529 : .unprivileged_init = unprivileged_init,
1530 : .run = stem_run,
1531 : };
1532 :
1533 : #undef NAME
|