Line data Source code
1 : #include "fd_snapin_tile_private.h"
2 : #include "utils/fd_ssctrl.h"
3 : #include "utils/fd_ssmsg.h"
4 : #include "utils/fd_vinyl_io_wd.h"
5 :
6 : #include "../../disco/topo/fd_topo.h"
7 : #include "../../disco/metrics/fd_metrics.h"
8 : #include "../../disco/gui/fd_gui_config_parse.h"
9 : #include "../../flamenco/accdb/fd_accdb_admin_v1.h"
10 : #include "../../flamenco/accdb/fd_accdb_impl_v1.h"
11 : #include "../../flamenco/runtime/fd_txncache.h"
12 : #include "../../flamenco/runtime/fd_system_ids.h"
13 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
14 : #include "../../flamenco/types/fd_types.h"
15 :
16 : #include "generated/fd_snapin_tile_seccomp.h"
17 :
18 : #define NAME "snapin"
19 :
20 : /* The snapin tile is a state machine that parses and loads a full
21 : and optionally an incremental snapshot. It is currently responsible
22 : for loading accounts into an in-memory database, though this may
23 : change. */
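/* Rough state flow, as implemented by handle_data_frag and
   handle_control_frag below (a sketch for orientation only):

     IDLE       --CTRL_INIT_FULL / CTRL_INIT_INCR--> PROCESSING
     PROCESSING --FD_SSPARSE_ADVANCE_DONE---------->  FINISHING
     FINISHING  --CTRL_NEXT / CTRL_DONE------------>  IDLE
     IDLE       --CTRL_SHUTDOWN------------------->   SHUTDOWN
     any non-SHUTDOWN state --malformed data / CTRL_ERROR--> ERROR
     any non-SHUTDOWN state --CTRL_FAIL----------->   IDLE */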
24 :
25 : /* The slot deltas array holds up to 300 root slots, and each one can
26 : reference up to all 151 prior blockhashes. */
27 : #define FD_SNAPIN_MAX_SLOT_DELTA_GROUPS (300UL*151UL)
28 :
29 : struct fd_blockhash_entry {
30 : fd_hash_t blockhash;
31 :
32 : struct {
33 : ulong prev;
34 : ulong next;
35 : } map;
36 : };
37 :
38 : typedef struct fd_blockhash_entry fd_blockhash_entry_t;
39 :
40 : #define MAP_NAME blockhash_map
41 0 : #define MAP_KEY blockhash
42 : #define MAP_KEY_T fd_hash_t
43 : #define MAP_ELE_T fd_blockhash_entry_t
44 0 : #define MAP_KEY_EQ(k0,k1) (!memcmp((k0),(k1), sizeof(fd_hash_t)))
45 0 : #define MAP_KEY_HASH(key,seed) (fd_hash((seed),(key),sizeof(fd_hash_t)))
46 0 : #define MAP_PREV map.prev
47 0 : #define MAP_NEXT map.next
48 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
49 : #include "../../util/tmpl/fd_map_chain.c"
50 :
51 : static inline int
52 0 : should_shutdown( fd_snapin_tile_t * ctx ) {
53 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN && !ctx->use_vinyl ) ) {
54 : /* This only needs to be logged under funk. When vinyl is enabled,
55 : snapwm will log instead. */
56 0 : ulong accounts_dup = ctx->metrics.accounts_ignored + ctx->metrics.accounts_replaced;
57 0 : ulong accounts = ctx->metrics.accounts_loaded - accounts_dup;
58 0 : long elapsed_ns = fd_log_wallclock() - ctx->boot_timestamp;
59 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts (%.1fM dups) from snapshot in %.3f seconds",
60 0 : (double)accounts/1e6,
61 0 : (double)accounts_dup/1e6,
62 0 : (double)elapsed_ns/1e9 ));
63 0 : }
64 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
65 0 : }
66 :
67 : static ulong
68 0 : scratch_align( void ) {
69 0 : return 512UL;
70 0 : }
71 :
72 : static ulong
73 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
74 0 : (void)tile;
75 0 : ulong l = FD_LAYOUT_INIT;
76 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
77 0 : l = FD_LAYOUT_APPEND( l, fd_ssparse_align(), fd_ssparse_footprint( 1UL<<24UL ) );
78 0 : l = FD_LAYOUT_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
79 0 : l = FD_LAYOUT_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
80 0 : l = FD_LAYOUT_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
81 0 : l = FD_LAYOUT_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
82 0 : if( !tile->snapin.use_vinyl ) {
83 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
84 0 : }
85 0 : return FD_LAYOUT_FINI( l, scratch_align() );
86 0 : }
87 :
88 : static void
89 0 : metrics_write( fd_snapin_tile_t * ctx ) {
90 0 : FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
91 0 : FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
92 0 : FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
93 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_LOADED, ctx->metrics.accounts_loaded );
94 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_REPLACED, ctx->metrics.accounts_replaced );
95 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_IGNORED, ctx->metrics.accounts_ignored );
96 0 : }
97 :
98 : /* verify_slot_deltas_with_slot_history verifies the 'SlotHistory'
99 : sysvar account after loading a snapshot. The full database
100 : architecture is only instantiated after snapshot loading, so this
101 : function uses a primitive/cache-free mechanism to query the parts of
102 : the account database that are available.
103 :
104 : Returns 0 if verification passed, -1 if not. */
105 :
106 : static int
107 0 : verify_slot_deltas_with_slot_history( fd_snapin_tile_t * ctx ) {
108 : /* Do a raw read of the slot history sysvar account from the database.
109 : Requires approx 500kB stack space. */
110 :
111 0 : fd_account_meta_t meta;
112 0 : uchar data[ FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ];
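/* The union reserves FD_SYSVAR_SLOT_HISTORY_FOOTPRINT bytes of stack
   while letting the front of that buffer be viewed as the decoded
   fd_slot_history_global_t; the static assert below confirms both
   members start at offset 0. */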
113 0 : union {
114 0 : uchar buf[ FD_SYSVAR_SLOT_HISTORY_FOOTPRINT ];
115 0 : fd_slot_history_global_t o;
116 0 : } decoded;
117 0 : FD_STATIC_ASSERT( offsetof( __typeof__(decoded), buf)==offsetof( __typeof__(decoded), o ), memory_layout );
118 0 : fd_snapin_read_account( ctx, &fd_sysvar_slot_history_id, &meta, data, sizeof(data) );
119 :
120 0 : if( FD_UNLIKELY( !meta.lamports || !meta.dlen ) ) {
121 0 : FD_LOG_WARNING(( "SlotHistory sysvar account missing or empty" ));
122 0 : return -1;
123 0 : }
124 0 : if( FD_UNLIKELY( meta.dlen > FD_SYSVAR_SLOT_HISTORY_BINCODE_SZ ) ) {
125 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data too large: %u bytes", meta.dlen ));
126 0 : return -1;
127 0 : }
128 0 : if( FD_UNLIKELY( !fd_memeq( meta.owner, fd_sysvar_owner_id.uc, sizeof(fd_pubkey_t) ) ) ) {
129 0 : FD_BASE58_ENCODE_32_BYTES( meta.owner, owner_b58 );
130 0 : FD_LOG_WARNING(( "SlotHistory sysvar owner is invalid: %s != sysvar_owner_id", owner_b58 ));
131 0 : return -1;
132 0 : }
133 :
134 0 : if( FD_UNLIKELY(
135 0 : !fd_bincode_decode_static_global(
136 0 : slot_history,
137 0 : &decoded.o,
138 0 : data,
139 0 : meta.dlen,
140 0 : NULL )
141 0 : ) ) {
142 0 : FD_LOG_WARNING(( "SlotHistory sysvar account data is corrupt" ));
143 0 : return -1;
144 0 : }
145 :
146 : /* Sanity checks for slot history:
147 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L586 */
148 :
149 0 : ulong newest_slot = fd_sysvar_slot_history_newest( &decoded.o );
150 0 : if( FD_UNLIKELY( newest_slot!=ctx->bank_slot ) ) {
151 : /* VerifySlotHistoryError::InvalidNewestSlot
152 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L621 */
153 0 : FD_LOG_WARNING(( "SlotHistory sysvar has an invalid newest slot: %lu != bank slot: %lu", newest_slot, ctx->bank_slot ));
154 0 : return -1;
155 0 : }
156 :
157 0 : ulong slot_history_len = fd_sysvar_slot_history_len( &decoded.o );
158 0 : if( FD_UNLIKELY( slot_history_len!=FD_SLOT_HISTORY_MAX_ENTRIES ) ) {
159 : /* VerifySlotHistoryError::InvalidNumEntries
160 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L625 */
161 0 : FD_LOG_WARNING(( "SlotHistory sysvar has invalid number of entries: %lu != expected: %lu", slot_history_len, FD_SLOT_HISTORY_MAX_ENTRIES ));
162 0 : return -1;
163 0 : }
164 :
165 : /* All slots in the txncache should be present in the slot history */
166 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
167 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
168 0 : if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( &decoded.o, entry->slot )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) {
169 : /* VerifySlotDeltasError::SlotNotFoundInHistory
170 : https://github.com/anza-xyz/agave/blob/v3.1.8/snapshots/src/error.rs#L144
171 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L593 */
172 0 : FD_LOG_WARNING(( "slot %lu missing from SlotHistory sysvar account", entry->slot ));
173 0 : return -1;
174 0 : }
175 0 : }
176 :
177 : /* The most recent slots (up to the number of slots in the txncache)
178 : in the SlotHistory should be present in the txncache. */
179 0 : fd_slot_delta_slot_set_t slot_set = fd_slot_delta_parser_slot_set( ctx->slot_delta_parser );
180 0 : for( ulong i=newest_slot; i>newest_slot-slot_set.ele_cnt; i-- ) {
181 0 : if( FD_LIKELY( fd_sysvar_slot_history_find_slot( &decoded.o, i )==FD_SLOT_HISTORY_SLOT_FOUND ) ) {
182 0 : if( FD_UNLIKELY( slot_set_ele_query( slot_set.map, &i, NULL, slot_set.pool )==NULL ) ) {
183 : /* VerifySlotDeltasError::SlotNotFoundInDeltas
184 : https://github.com/anza-xyz/agave/blob/v3.1.8/snapshots/src/error.rs#L147
185 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L609 */
186 0 : FD_LOG_WARNING(( "slot %lu missing from slot deltas but present in SlotHistory", i ));
187 0 : return -1;
188 0 : }
189 0 : }
190 0 : }
191 0 : return 0;
192 0 : }
193 :
194 : static int
195 : verify_slot_deltas_with_bank_slot( fd_snapin_tile_t * ctx,
196 0 : ulong bank_slot ) {
197 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
198 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
199 : /* VerifySlotDeltasError::SlotGreaterThanMaxRoot
200 : https://github.com/anza-xyz/agave/blob/v3.1.8/snapshots/src/error.rs#L138
201 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L550 */
202 0 : if( FD_UNLIKELY( entry->slot>bank_slot ) ) return -1;
203 0 : }
204 0 : return 0;
205 0 : }
206 :
207 : static void
208 : transition_malformed( fd_snapin_tile_t * ctx,
209 0 : fd_stem_context_t * stem ) {
210 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) return;
211 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
212 0 : fd_stem_publish( stem, ctx->out_ct_idx, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
213 0 : }
214 :
215 : static int
216 : populate_txncache( fd_snapin_tile_t * ctx,
217 : fd_snapshot_manifest_blockhash_t const blockhashes[ static 301UL ],
218 0 : ulong blockhashes_len ) {
219 : /* Our txncache internally contains the fork structure for the chain,
220 : which we need to recreate here. Because snapshots are only served
221 : for rooted slots, there is actually no forking, and the bank forks
222 : are just a single bank, the root, like
223 :
224 : _root
225 :
226 : But the txncache also must contain the 150 more recent banks prior
227 : to the root (151 rooted banks total), looking like,
228 :
229 :
230 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
231 :
232 : Our txncache is "slot agnostic" meaning there is no concept of a
233 : slot number in it. It just has a fork tree structure. So long as
234 : the fork tree is isomorphic to the actual bank forks, and each bank
235 : has the correct blockhash, it works.
236 :
237 : So the challenge is simply to create this chain of 151 forks in the
238 : txncache, with correct blockhashes, and then insert all the
239 : transactions into it.
240 :
241 : Constructing the chain of blockhashes is easy. It is just the
242 : BLOCKHASH_QUEUE array in the manifest. This array is unfortunately
243 : not sorted and appears in random order, but it has a hash_index
244 : field which is a gapless index, starting at some arbitrary offset,
245 : so we can back out the 151 blockhashes we need from this, by first
246 : finding the max hash_index as _max and then collecting hash entries
247 : via,
248 :
249 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
250 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
251 :
252 : Now the remaining problem is inserting transactions into this
253 : chain. Remember each transaction needs to be inserted with:
254 :
255 : (a) The fork ID (position of the bank in the chain) it was executed in.
256 : (b) The blockhash of the bank it referenced.
257 :
258 : (b) is trivial to retrieve, as it's in the actual slot_deltas entry
259 : in the manifest served by Agave. But (a) is mildly annoying. Agave
260 : serves slot_deltas based on slot, so we need an additional mapping
261 : from slot to position in our banks chain. It turns out we have to
262 : go to yet another structure in the manifest to retrieve this, the
263 : ancestors array. This is just an array of slot values, so we need
264 : to sort it, and line it up against our banks chain like so,
265 :
266 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
267 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
268 : _slots_150 -> _slots_149 -> ... -> _slots_2 -> _slots_1 -> _slots
269 :
270 : From there we are done.
271 :
272 : Well almost ... if you were paying attention you might have noticed
273 : this is a lot of work and we are lazy. Why don't we just ignore the
274 : slot mapping and assume everything executed at the root slot
275 : exactly? The only invariant we should maintain from a memory
276 : perspective is that at most, across all active banks,
277 : FD_MAX_TXN_PER_SLOT transactions are stored per slot, but we
278 : have preserved that. It is not true "per slot" technically, but
279 : it's true across all slots, and the memory is aggregated. It will
280 : also always be true, even as slots are garbage collected, because
281 : entries are collected by referece blockhash, not executed slot.
282 :
283 : ... actually we can't do this. There's more broken things here.
284 : The Agave status decided to only store 20 bytes for 32 byte
285 : transaction hashes to save on memory. That's OK, but they didn't
286 : just take the first 20 bytes. They instead, for each blockhash,
287 : take a random offset between 0 and 12, and store bytes
288 : [ offset, offset+20 ) of the transaction hash. We need to know this
289 : offset to be able to query the txncache later, so we need to
290 : retrieve it from the slot_deltas entry in the manifest, and key it
291 : into our txncache. Unfortunately this offset is stored per slot in
292 : the slot_deltas entry. So we need to first go and retrieve the
293 : ancestors array, sort it, and line it up against our banks chain as
294 : described above, and then go through slot deltas, to retrieve the
295 : offset for each slot, and stick it into the appropriate bank in
296 : our chain. */
297 :
298 0 : FD_TEST( blockhashes_len<=301UL );
299 0 : FD_TEST( blockhashes_len>0UL );
300 :
301 0 : ulong seq_min = ULONG_MAX;
302 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) seq_min = fd_ulong_min( seq_min, blockhashes[ i ].hash_index );
303 :
304 0 : ulong seq_max;
305 0 : if( FD_UNLIKELY( __builtin_uaddl_overflow( seq_min, blockhashes_len, &seq_max ) ) ) {
306 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue sequence number wraparound (seq_min=%lu age_cnt=%lu)", seq_min, blockhashes_len ));
307 0 : transition_malformed( ctx, ctx->stem );
308 0 : return 1;
309 0 : }
310 :
311 : /* First let's construct the chain array as described above. But
312 : index 0 will be the root, index 1 the root's parent, etc. */
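   /* Illustrative example (hypothetical values): with blockhashes_len=4
      and hash_index values {7,5,6,4}, seq_min=4, so idx=hash_index-seq_min
      gives {3,1,2,0} and the entries land at banks[ blockhashes_len-1UL-idx ],
      i.e. banks[0], banks[2], banks[1], banks[3] respectively.  The newest
      blockhash (hash_index 7, the root's) ends up at banks[ 0 ] and the
      oldest (hash_index 4) at banks[ 3 ]. */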
313 :
314 0 : struct {
315 0 : int exists;
316 0 : uchar blockhash[ 32UL ];
317 0 : fd_txncache_fork_id_t fork_id;
318 0 : ulong txnhash_offset;
319 0 : } banks[ 301UL ] = {0};
320 :
321 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) {
322 0 : fd_snapshot_manifest_blockhash_t const * elem = &blockhashes[ i ];
323 0 : ulong idx;
324 0 : if( FD_UNLIKELY( __builtin_usubl_overflow( elem->hash_index, seq_min, &idx ) ) ) {
325 0 : FD_LOG_WARNING(( "corrupt snapshot: gap in blockhash queue (seq=[%lu,%lu) idx=%lu)", seq_min, seq_max, blockhashes[ i ].hash_index ));
326 0 : transition_malformed( ctx, ctx->stem );
327 0 : return 1;
328 0 : }
329 :
330 0 : if( FD_UNLIKELY( idx>=blockhashes_len ) ) {
331 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue index out of range (seq_min=%lu age_cnt=%lu idx=%lu)", seq_min, blockhashes_len, idx ));
332 0 : transition_malformed( ctx, ctx->stem );
333 0 : return 1;
334 0 : }
335 :
336 0 : if( FD_UNLIKELY( banks[ blockhashes_len-1UL-idx ].exists ) ) {
337 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash hash_index %lu", elem->hash_index ));
338 0 : transition_malformed( ctx, ctx->stem );
339 0 : return 1;
340 0 : }
341 :
342 0 : banks[ blockhashes_len-1UL-idx ].fork_id.val = USHORT_MAX;
343 0 : banks[ blockhashes_len-1UL-idx ].txnhash_offset = ULONG_MAX;
344 0 : memcpy( banks[ blockhashes_len-1UL-idx ].blockhash, elem->hash, 32UL );
345 0 : banks[ blockhashes_len-1UL-idx ].exists = 1;
346 0 : }
347 :
348 0 : ulong chain_len = fd_ulong_min( blockhashes_len, 151UL );
349 :
350 : /* Now we need a hashset of just the 151 most recent blockhashes,
351 : anything else is a nonce transaction which we do not insert, or an
352 : already expired transaction which can also be discarded. */
353 :
354 0 : uchar * _map = fd_alloca_check( alignof(blockhash_map_t), blockhash_map_footprint( 1024UL ) );
355 0 : blockhash_map_t * blockhash_map = blockhash_map_join( blockhash_map_new( _map, 1024UL, ctx->seed ) );
356 0 : FD_TEST( blockhash_map );
357 :
358 0 : fd_blockhash_entry_t blockhash_pool[ 151UL ];
359 0 : for( ulong i=0UL; i<chain_len; i++ ) {
360 0 : fd_memcpy( blockhash_pool[ i ].blockhash.uc, banks[ i ].blockhash, 32UL );
361 :
362 0 : if( FD_UNLIKELY( blockhash_map_ele_query_const( blockhash_map, &blockhash_pool[ i ].blockhash, NULL, blockhash_pool ) ) ) {
363 0 : FD_BASE58_ENCODE_32_BYTES( banks[ i ].blockhash, blockhash_b58 );
364 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash %s in 151 most recent blockhashes", blockhash_b58 ));
365 0 : transition_malformed( ctx, ctx->stem );
366 0 : return 1;
367 0 : }
368 :
369 0 : blockhash_map_ele_insert( blockhash_map, &blockhash_pool[ i ], blockhash_pool );
370 0 : }
371 :
372 : /* Now load the blockhash offsets for these blockhashes ... */
373 0 : FD_TEST( ctx->blockhash_offsets_len ); /* Must be at least one else nothing would be rooted */
374 0 : for( ulong i=0UL; i<ctx->blockhash_offsets_len; i++ ) {
375 0 : fd_hash_t key;
376 0 : fd_memcpy( key.uc, ctx->blockhash_offsets[ i ].blockhash, 32UL );
377 0 : fd_blockhash_entry_t * entry = blockhash_map_ele_query( blockhash_map, &key, NULL, blockhash_pool );
378 0 : if( FD_UNLIKELY( !entry ) ) continue; /* Not in the most recent 151 blockhashes */
379 :
380 0 : ulong chain_idx = (ulong)(entry - blockhash_pool);
381 :
382 0 : if( FD_UNLIKELY( banks[ chain_idx ].txnhash_offset!=ULONG_MAX && banks[ chain_idx ].txnhash_offset!=ctx->blockhash_offsets[ i ].txnhash_offset ) ) {
383 0 : FD_BASE58_ENCODE_32_BYTES( entry->blockhash.uc, blockhash_b58 );
384 0 : FD_LOG_WARNING(( "corrupt snapshot: conflicting txnhash offsets for blockhash %s", blockhash_b58 ));
385 0 : transition_malformed( ctx, ctx->stem );
386 0 : return 1;
387 0 : }
388 :
389 0 : banks[ chain_idx ].txnhash_offset = ctx->blockhash_offsets[ i ].txnhash_offset;
390 0 : }
391 :
392 : /* Construct the linear fork chain in the txncache. */
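   /* Children are attached oldest-first: the first iteration attaches
      banks[ chain_len-1 ] (the oldest bank) and the last attaches
      banks[ 0 ] (the root), so after the loop `parent` holds the root's
      fork id, which is later saved as ctx->txncache_root_fork_id. */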
393 :
394 0 : fd_txncache_fork_id_t parent = { .val = USHORT_MAX };
395 0 : for( ulong i=0UL; i<chain_len; i++ ) banks[ chain_len-1UL-i ].fork_id = parent = fd_txncache_attach_child( ctx->txncache, parent );
396 0 : for( ulong i=0UL; i<chain_len; i++ ) fd_txncache_attach_blockhash( ctx->txncache, banks[ i ].fork_id, banks[ i ].blockhash );
397 :
398 : /* Now insert all transactions as if they executed at the current
399 : root, per above. */
400 :
401 0 : ulong insert_cnt = 0UL;
402 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
403 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[ i ];
404 0 : fd_hash_t key;
405 0 : fd_memcpy( key.uc, entry->blockhash, 32UL );
406 0 : if( FD_UNLIKELY( !blockhash_map_ele_query_const( blockhash_map, &key, NULL, blockhash_pool ) ) ) continue;
407 :
408 0 : insert_cnt++;
409 0 : fd_txncache_insert( ctx->txncache, banks[ 0UL ].fork_id, entry->blockhash, entry->txnhash );
410 0 : }
411 :
412 0 : if( !!ctx->use_vinyl && !!ctx->txncache_entries_len_vinyl_ptr ) {
413 0 : *ctx->txncache_entries_len_vinyl_ptr = ctx->txncache_entries_len;
414 0 : }
415 :
416 0 : FD_LOG_INFO(( "inserted %lu/%lu transactions into the txncache", insert_cnt, ctx->txncache_entries_len ));
417 :
418 : /* Then finalize all the banks (freezing them) and set the txnhash
419 : offset so future queries use the correct offset. An offset of
420 : ULONG_MAX is valid here: it means the blockhash had no transactions
421 : in it, so there's nothing in the status cache under that blockhash.
422 :
423 : Just set the offset to 0 in this case; the value doesn't matter,
424 : but it must be in the valid range of 0 to 12 inclusive. */
425 0 : for( ulong i=0UL; i<chain_len; i++ ) {
426 0 : ulong txnhash_offset = banks[ chain_len-1UL-i ].txnhash_offset==ULONG_MAX ? 0UL : banks[ chain_len-1UL-i ].txnhash_offset;
427 0 : fd_txncache_finalize_fork( ctx->txncache, banks[ chain_len-1UL-i ].fork_id, txnhash_offset, banks[ chain_len-1UL-i ].blockhash );
428 0 : }
429 :
430 0 : for( ulong i=1UL; i<chain_len; i++ ) fd_txncache_advance_root( ctx->txncache, banks[ chain_len-1UL-i ].fork_id );
431 :
432 0 : ctx->txncache_root_fork_id = parent;
433 :
434 0 : return 0;
435 0 : }
436 :
437 : static void
438 0 : process_manifest( fd_snapin_tile_t * ctx ) {
439 0 : fd_snapshot_manifest_t * manifest = fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk );
440 :
441 0 : if( FD_UNLIKELY( ctx->advertised_slot!=manifest->slot ) ) {
442 : /* SnapshotError::MismatchedSlot:
443 : https://github.com/anza-xyz/agave/blob/v3.1.8/runtime/src/snapshot_bank_utils.rs#L472 */
444 0 : FD_LOG_WARNING(( "snapshot manifest bank slot %lu does not match advertised slot %lu from snapshot peer",
445 0 : manifest->slot, ctx->advertised_slot ));
446 0 : transition_malformed( ctx, ctx->stem );
447 0 : return;
448 0 : }
449 :
450 0 : ctx->bank_slot = manifest->slot;
451 0 : if( FD_UNLIKELY( verify_slot_deltas_with_bank_slot( ctx, manifest->slot ) ) ) {
452 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
453 0 : transition_malformed( ctx, ctx->stem );
454 0 : return;
455 0 : }
456 :
457 0 : if( FD_UNLIKELY( populate_txncache( ctx, manifest->blockhashes, manifest->blockhashes_len ) ) ) {
458 0 : FD_LOG_WARNING(( "populating txncache failed" ));
459 0 : transition_malformed( ctx, ctx->stem );
460 0 : return;
461 0 : }
462 :
463 0 : if( manifest->has_accounts_lthash ) {
464 0 : uchar const * sum = manifest->accounts_lthash;
465 0 : uchar hash32[32]; fd_blake3_hash( sum, FD_LTHASH_LEN_BYTES, hash32 );
466 0 : FD_BASE58_ENCODE_32_BYTES( sum, sum_enc );
467 0 : FD_BASE58_ENCODE_32_BYTES( hash32, hash32_enc );
468 0 : FD_LOG_INFO(( "snapshot manifest slot=%lu indicates lthash[..32]=%s blake3(lthash)=%s",
469 0 : manifest->slot, sum_enc, hash32_enc ));
470 0 : }
471 :
472 0 : manifest->txncache_fork_id = ctx->txncache_root_fork_id.val;
473 :
474 0 : if( FD_LIKELY( !ctx->lthash_disabled ) ) {
475 0 : if( FD_UNLIKELY( !manifest->has_accounts_lthash ) ) {
476 0 : FD_LOG_WARNING(( "snapshot manifest missing accounts lthash field" ));
477 0 : transition_malformed( ctx, ctx->stem );
478 0 : return;
479 0 : }
480 :
481 0 : fd_lthash_value_t * expected_lthash = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
482 0 : fd_memcpy( expected_lthash, manifest->accounts_lthash, sizeof(fd_lthash_value_t) );
483 0 : fd_stem_publish( ctx->stem, ctx->out_ct_idx, FD_SNAPSHOT_HASH_MSG_EXPECTED, ctx->hash_out.chunk, sizeof(fd_lthash_value_t), 0UL, 0UL, 0UL );
484 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, sizeof(fd_lthash_value_t), ctx->hash_out.chunk0, ctx->hash_out.wmark );
485 0 : }
486 :
487 0 : ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
488 0 : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
489 0 : fd_stem_publish( ctx->stem, ctx->manifest_out.idx, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
490 0 : ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
491 0 : }
492 :
493 :
494 : static int
495 : handle_data_frag( fd_snapin_tile_t * ctx,
496 : ulong chunk,
497 : ulong sz,
498 0 : fd_stem_context_t * stem ) {
499 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) {
500 0 : transition_malformed( ctx, stem );
501 0 : return 0;
502 0 : }
503 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
504 : /* Ignore all data frags after observing an error in the stream until
505 : we receive fail & init control messages to restart processing. */
506 0 : return 0;
507 0 : }
508 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
509 0 : FD_LOG_ERR(( "invalid state for data frag %d", ctx->state ));
510 0 : }
511 :
512 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
513 :
514 0 : if( FD_UNLIKELY( !ctx->lthash_disabled && ctx->buffered_batch.batch_cnt>0UL ) ) {
515 0 : fd_snapin_process_account_batch( ctx, NULL, &ctx->buffered_batch );
516 0 : return 1;
517 0 : }
518 :
519 0 : for(;;) {
520 0 : if( FD_UNLIKELY( sz-ctx->in.pos==0UL ) ) break;
521 :
522 0 : uchar const * data = (uchar const *)fd_chunk_to_laddr_const( ctx->in.wksp, chunk ) + ctx->in.pos;
523 :
524 0 : int early_exit = 0;
525 0 : fd_ssparse_advance_result_t result[1];
526 0 : int res = fd_ssparse_advance( ctx->ssparse, data, sz-ctx->in.pos, result );
527 0 : switch( res ) {
528 0 : case FD_SSPARSE_ADVANCE_ERROR:
529 0 : transition_malformed( ctx, stem );
530 0 : return 0;
531 0 : case FD_SSPARSE_ADVANCE_AGAIN:
532 0 : break;
533 0 : case FD_SSPARSE_ADVANCE_MANIFEST: {
534 0 : int res = fd_ssmanifest_parser_consume( ctx->manifest_parser,
535 0 : result->manifest.data,
536 0 : result->manifest.data_sz,
537 0 : result->manifest.acc_vec_map,
538 0 : result->manifest.acc_vec_pool );
539 0 : if( FD_UNLIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_ERROR ) ) {
540 0 : transition_malformed( ctx, stem );
541 0 : return 0;
542 0 : } else if( FD_LIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_DONE ) ) {
543 0 : ctx->flags.manifest_done = 1;
544 0 : }
545 0 : break;
546 0 : }
547 0 : case FD_SSPARSE_ADVANCE_STATUS_CACHE: {
548 0 : fd_slot_delta_parser_advance_result_t sd_result[1];
549 0 : ulong bytes_remaining = result->status_cache.data_sz;
550 :
551 0 : while( bytes_remaining ) {
552 0 : int res = fd_slot_delta_parser_consume( ctx->slot_delta_parser,
553 0 : result->status_cache.data,
554 0 : bytes_remaining,
555 0 : sd_result );
556 0 : if( FD_UNLIKELY( res<0 ) ) {
557 0 : transition_malformed( ctx, stem );
558 0 : return 0;
559 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_GROUP ) ) {
560 0 : if( FD_UNLIKELY( ctx->blockhash_offsets_len>=FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ) ) FD_LOG_ERR(( "blockhash offsets overflow, max is %lu", FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ));
561 :
562 0 : memcpy( ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].blockhash, sd_result->group.blockhash, 32UL );
563 0 : ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].txnhash_offset = sd_result->group.txnhash_offset;
564 0 : ctx->blockhash_offsets_len++;
565 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_ENTRY ) ) {
566 0 : if( FD_UNLIKELY( ctx->txncache_entries_len>=FD_SNAPIN_TXNCACHE_MAX_ENTRIES ) ) FD_LOG_ERR(( "txncache entries overflow, max is %lu", FD_SNAPIN_TXNCACHE_MAX_ENTRIES ));
567 0 : ctx->txncache_entries[ ctx->txncache_entries_len++ ] = *sd_result->entry;
568 0 : }
569 :
570 0 : bytes_remaining -= sd_result->bytes_consumed;
571 0 : result->status_cache.data += sd_result->bytes_consumed;
572 0 : }
573 :
574 0 : ctx->flags.status_cache_done = fd_slot_delta_parser_consume( ctx->slot_delta_parser, result->status_cache.data, 0UL, sd_result )==FD_SLOT_DELTA_PARSER_ADVANCE_DONE;
575 0 : break;
576 0 : }
577 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_HEADER:
578 0 : early_exit = fd_snapin_process_account_header( ctx, result );
579 0 : break;
580 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_DATA:
581 0 : early_exit = fd_snapin_process_account_data( ctx, result );
582 :
583 : /* We expect the ConfigKeys Vec to be length 2. We expect the size
584 : of ConfigProgram-owned accounts to be at most
585 : FD_GUI_CONFIG_PARSE_MAX_VALID_ACCT_SZ, since this is the size
586 : that the Solana CLI allocates for them. Although the Config
587 : program itself does not enforce this limit, the vast majority
588 : of accounts (with a tiny number of exceptions on devnet) are
589 : maintained with the Solana CLI. */
590 0 : if( FD_UNLIKELY( ctx->gui_out.idx!=ULONG_MAX && !memcmp( result->account_data.owner, fd_solana_config_program_id.key, sizeof(fd_hash_t) ) && result->account_data.data_sz && *(uchar *)result->account_data.data==2UL && result->account_data.data_sz<=FD_GUI_CONFIG_PARSE_MAX_VALID_ACCT_SZ ) ) {
591 0 : uchar * acct = fd_chunk_to_laddr( ctx->gui_out.mem, ctx->gui_out.chunk );
592 0 : fd_memcpy( acct, result->account_data.data, result->account_data.data_sz );
593 :
594 0 : fd_stem_publish( stem, ctx->gui_out.idx, 0UL, ctx->gui_out.chunk, result->account_data.data_sz, 0UL, 0UL, 0UL );
595 0 : ctx->gui_out.chunk = fd_dcache_compact_next( ctx->gui_out.chunk, result->account_data.data_sz, ctx->gui_out.chunk0, ctx->gui_out.wmark );
596 0 : }
597 0 : break;
598 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_BATCH:
599 0 : early_exit = fd_snapin_process_account_batch( ctx, result, NULL );
600 0 : break;
601 0 : case FD_SSPARSE_ADVANCE_DONE:
602 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
603 0 : break;
604 0 : default:
605 0 : FD_LOG_ERR(( "unexpected fd_ssparse_advance result %d", res ));
606 0 : break;
607 0 : }
608 :
609 0 : if( FD_UNLIKELY( !ctx->flags.manifest_processed && ctx->flags.manifest_done && ctx->flags.status_cache_done ) ) {
610 0 : process_manifest( ctx );
611 0 : ctx->flags.manifest_processed = 1;
612 0 : }
613 :
614 0 : ctx->in.pos += result->bytes_consumed;
615 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += result->bytes_consumed;
616 0 : else ctx->metrics.incremental_bytes_read += result->bytes_consumed;
617 :
618 0 : if( FD_UNLIKELY( early_exit ) ) break;
619 0 : }
620 :
621 0 : int reprocess_frag = ctx->in.pos<sz;
622 0 : if( FD_LIKELY( !reprocess_frag ) ) ctx->in.pos = 0UL;
623 0 : return reprocess_frag;
624 0 : }
625 :
626 : static void
627 : handle_control_frag( fd_snapin_tile_t * ctx,
628 : fd_stem_context_t * stem,
629 : ulong sig,
630 0 : ulong chunk ) {
631 0 : if( ctx->state==FD_SNAPSHOT_STATE_ERROR && sig!=FD_SNAPSHOT_MSG_CTRL_FAIL ) {
632 : /* Control messages move along the snapshot load pipeline. Since
633 : error conditions can be triggered by any tile in the pipeline,
634 : it is possible to be in error state and still receive otherwise
635 : valid messages. Only a fail message can revert this. */
636 0 : return;
637 0 : }
638 :
639 0 : int forward_msg = 1;
640 :
641 0 : switch( sig ) {
642 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
643 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR: {
644 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
645 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
646 0 : fd_ssparse_batch_enable( ctx->ssparse, ctx->use_vinyl || sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL );
647 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
648 0 : ctx->txncache_entries_len = 0UL;
649 0 : ctx->blockhash_offsets_len = 0UL;
650 0 : fd_txncache_reset( ctx->txncache );
651 0 : fd_ssparse_reset( ctx->ssparse );
652 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk ) );
653 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
654 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
655 0 : fd_memset( &ctx->vinyl_op, 0, sizeof(ctx->vinyl_op) );
656 :
657 : /* Rewind metric counters (no-op unless recovering from a fail) */
658 0 : if( sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL ) {
659 0 : ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded = 0;
660 0 : ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced = 0;
661 0 : ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored = 0;
662 0 : ctx->metrics.full_bytes_read = 0UL;
663 0 : ctx->metrics.incremental_bytes_read = 0UL;
664 0 : } else {
665 0 : ctx->metrics.accounts_loaded = ctx->metrics.full_accounts_loaded;
666 0 : ctx->metrics.accounts_replaced = ctx->metrics.full_accounts_replaced;
667 0 : ctx->metrics.accounts_ignored = ctx->metrics.full_accounts_ignored;
668 0 : ctx->metrics.incremental_bytes_read = 0UL;
669 :
670 0 : fd_funk_txn_xid_t incremental_xid = { .ul={ LONG_MAX, LONG_MAX } };
671 0 : fd_accdb_attach_child( ctx->accdb_admin, ctx->xid, &incremental_xid );
672 0 : fd_funk_txn_xid_copy( ctx->xid, &incremental_xid );
673 0 : }
674 :
675 : /* Save the slot advertised by the snapshot peer and verify it
676 : against the slot in the snapshot manifest. For downloaded
677 : snapshots, this is simply a best estimate. The actual
678 : advertised slot for downloaded snapshots is received in a
679 : separate fd_ssctrl_meta_t message below. */
680 0 : fd_ssctrl_init_t const * msg = fd_chunk_to_laddr_const( ctx->in.wksp, chunk );
681 0 : ctx->advertised_slot = msg->slot;
682 0 : break;
683 0 : }
684 :
685 0 : case FD_SNAPSHOT_MSG_CTRL_FINI: {
686 : /* This is a special case: handle_data_frag must have already
687 : processed FD_SSPARSE_ADVANCE_DONE and moved the state into
688 : FD_SNAPSHOT_STATE_FINISHING. */
689 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
690 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
691 0 : break;
692 0 : }
693 :
694 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT: {
695 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
696 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
697 :
698 0 : if( !ctx->use_vinyl ) {
699 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
700 0 : FD_LOG_WARNING(( "slot deltas verification failed for full snapshot" ));
701 0 : transition_malformed( ctx, stem );
702 0 : forward_msg = 0;
703 0 : break;
704 0 : }
705 0 : }
706 :
707 : /* Backup metric counters */
708 0 : ctx->metrics.full_accounts_loaded = ctx->metrics.accounts_loaded;
709 0 : ctx->metrics.full_accounts_replaced = ctx->metrics.accounts_replaced;
710 0 : ctx->metrics.full_accounts_ignored = ctx->metrics.accounts_ignored;
711 0 : break;
712 0 : }
713 :
714 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
715 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_FINISHING );
716 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
717 :
718 0 : if( !ctx->use_vinyl ) {
719 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx ) ) ) {
720 0 : if( ctx->full ) FD_LOG_WARNING(( "slot deltas verification failed for full snapshot" ));
721 0 : else FD_LOG_WARNING(( "slot deltas verification failed for incremental snapshot" ));
722 0 : transition_malformed( ctx, stem );
723 0 : forward_msg = 0;
724 0 : break;
725 0 : }
726 0 : }
727 :
728 : /* Publish any remaining funk txn */
729 0 : if( FD_LIKELY( fd_funk_last_publish_is_frozen( ctx->funk ) ) ) {
730 0 : ctx->accdb_admin->base.gc_root_cnt = 0UL;
731 0 : ctx->accdb_admin->base.reclaim_cnt = 0UL;
732 0 : fd_accdb_advance_root( ctx->accdb_admin, ctx->xid );
733 0 : ctx->metrics.accounts_replaced += ctx->accdb_admin->base.gc_root_cnt;
734 : /* If an incremental snapshot 'reclaims' (deletes) an account,
735 : this removes both the full snapshot's original account, and
736 : the incremental snapshot's tombstone record. Thus it is
737 : accounted for twice in the metrics. */
738 0 : ctx->metrics.accounts_loaded -= ctx->accdb_admin->base.reclaim_cnt;
739 0 : ctx->metrics.accounts_replaced += ctx->accdb_admin->base.reclaim_cnt;
740 0 : }
741 0 : FD_TEST( !fd_funk_last_publish_is_frozen( ctx->funk ) );
742 :
743 : /* Make 'Last published' XID equal the restored slot number */
744 0 : fd_funk_txn_xid_t target_xid = { .ul = { ctx->bank_slot, 0UL } };
745 0 : fd_accdb_attach_child( ctx->accdb_admin, ctx->xid, &target_xid );
746 0 : fd_accdb_advance_root( ctx->accdb_admin, &target_xid );
747 0 : fd_funk_txn_xid_copy( ctx->xid, &target_xid );
748 0 : break;
749 0 : }
750 :
751 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR: {
752 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
753 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
754 0 : break;
755 0 : }
756 :
757 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL: {
758 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
759 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
760 0 : if( ctx->full ) {
761 0 : fd_accdb_v1_clear( ctx->accdb_admin );
762 0 : }
763 :
764 0 : if( !ctx->full ) {
765 0 : fd_accdb_cancel( ctx->accdb_admin, ctx->xid );
766 0 : fd_funk_txn_xid_copy( ctx->xid, fd_funk_last_publish( ctx->funk ) );
767 0 : }
768 0 : break;
769 0 : }
770 :
771 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN: {
772 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
773 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
774 : /* Notify replay when snapshot is fully loaded and verified. This
775 : occurs when we receive the shutdown control message, indicating
776 : snapshot load is successful. */
777 0 : fd_stem_publish( stem, ctx->manifest_out.idx, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
778 0 : break;
779 0 : }
780 :
781 0 : default: {
782 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
783 0 : break;
784 0 : }
785 0 : }
786 :
787 : /* Forward the control message down the pipeline */
788 0 : if( FD_LIKELY( forward_msg ) ) {
789 0 : fd_stem_publish( stem, ctx->out_ct_idx, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
790 0 : }
791 0 : }
792 :
793 : static inline int
794 : returnable_frag( fd_snapin_tile_t * ctx,
795 : ulong in_idx FD_PARAM_UNUSED,
796 : ulong seq FD_PARAM_UNUSED,
797 : ulong sig,
798 : ulong chunk,
799 : ulong sz,
800 : ulong ctl FD_PARAM_UNUSED,
801 : ulong tsorig FD_PARAM_UNUSED,
802 : ulong tspub FD_PARAM_UNUSED,
803 0 : fd_stem_context_t * stem ) {
804 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
805 :
806 0 : ctx->stem = stem;
807 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem );
808 0 : else handle_control_frag( ctx, stem, sig, chunk );
809 0 : ctx->stem = NULL;
810 :
811 0 : return 0;
812 0 : }
813 :
814 : static ulong
815 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
816 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
817 : ulong out_fds_cnt,
818 0 : int * out_fds ) {
819 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
820 :
821 0 : ulong out_cnt = 0;
822 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
823 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
824 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
825 0 : }
826 :
827 0 : return out_cnt;
828 0 : }
829 :
830 : static ulong
831 : populate_allowed_seccomp( fd_topo_t const * topo,
832 : fd_topo_tile_t const * tile,
833 : ulong out_cnt,
834 0 : struct sock_filter * out ) {
835 0 : (void)topo; (void)tile;
836 0 : populate_sock_filter_policy_fd_snapin_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
837 0 : return sock_filter_policy_fd_snapin_tile_instr_cnt;
838 0 : }
839 :
840 : static void
841 : privileged_init( fd_topo_t * topo,
842 0 : fd_topo_tile_t * tile ) {
843 0 : fd_snapin_tile_t * ctx = fd_topo_obj_laddr( topo, tile->tile_obj_id );
844 0 : memset( ctx, 0, sizeof(fd_snapin_tile_t) );
845 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
846 :
847 0 : if( tile->snapin.use_vinyl ) {
848 0 : ctx->use_vinyl = 1;
849 0 : }
850 0 : }
851 :
852 : static inline fd_snapin_out_link_t
853 : out1( fd_topo_t const * topo,
854 : fd_topo_tile_t const * tile,
855 0 : char const * name ) {
856 0 : ulong idx = fd_topo_find_tile_out_link( topo, tile, name, 0UL );
857 :
858 0 : if( FD_UNLIKELY( idx==ULONG_MAX ) ) return (fd_snapin_out_link_t){ .idx = ULONG_MAX, .mem = NULL, .chunk0 = 0, .wmark = 0, .chunk = 0, .mtu = 0 };
859 :
860 0 : ulong mtu = topo->links[ tile->out_link_id[ idx ] ].mtu;
861 0 : if( FD_UNLIKELY( mtu==0UL ) ) return (fd_snapin_out_link_t){ .idx = idx, .mem = NULL, .chunk0 = ULONG_MAX, .wmark = ULONG_MAX, .chunk = ULONG_MAX, .mtu = mtu };
862 :
863 0 : void * mem = topo->workspaces[ topo->objs[ topo->links[ tile->out_link_id[ idx ] ].dcache_obj_id ].wksp_id ].wksp;
864 0 : ulong chunk0 = fd_dcache_compact_chunk0( mem, topo->links[ tile->out_link_id[ idx ] ].dcache );
865 0 : ulong wmark = fd_dcache_compact_wmark ( mem, topo->links[ tile->out_link_id[ idx ] ].dcache, mtu );
866 0 : return (fd_snapin_out_link_t){ .idx = idx, .mem = mem, .chunk0 = chunk0, .wmark = wmark, .chunk = chunk0, .mtu = mtu };
867 0 : }
868 :
869 : FD_FN_UNUSED static void
870 : unprivileged_init( fd_topo_t * topo,
871 0 : fd_topo_tile_t * tile ) {
872 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
873 :
874 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
875 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
876 0 : void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_ssparse_align(), fd_ssparse_footprint( 1UL<<24UL ) );
877 0 : void * _txncache = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
878 0 : void * _manifest_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
879 0 : void * _sd_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
880 0 : ctx->blockhash_offsets = FD_SCRATCH_ALLOC_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
881 :
882 0 : if( tile->snapin.use_vinyl ) {
883 : /* snapwm needs all txn_cache data in order to verify the slot
884 : deltas with the slot history. To make this possible, snapin
885 : uses the dcache of the snapin_txn link as the scratch memory.
886 : The app field of the dcache is used to communicate the
887 : txncache_entries_len value. */
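   /* Note that the dcache is treated as one flat array starting at
      chunk0 rather than as a ring of frags; the FD_TEST below checks
      that depth*mtu exactly matches
      sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES. */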
888 0 : fd_snapin_out_link_t snapin_txn = out1( topo, tile, "snapin_txn" );
889 0 : FD_TEST( !!snapin_txn.mem );
890 0 : fd_topo_link_t const * out_link_txn = &topo->links[ tile->out_link_id[ snapin_txn.idx ] ];
891 0 : ulong depth = out_link_txn->depth;
892 0 : FD_TEST( ( depth*snapin_txn.mtu )==( sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES ) );
893 0 : ctx->txncache_entries = fd_chunk_to_laddr( snapin_txn.mem, snapin_txn.chunk0 );
894 0 : ctx->txncache_entries_len_vinyl_ptr = (ulong*)fd_dcache_app_laddr( out_link_txn->dcache );
895 0 : memset( ctx->txncache_entries_len_vinyl_ptr, 0, sizeof(ulong) );
896 0 : } else {
897 0 : ctx->txncache_entries = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
898 0 : ctx->txncache_entries_len_vinyl_ptr = NULL;
899 0 : }
900 :
901 0 : ctx->full = 1;
902 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
903 0 : ctx->lthash_disabled = tile->snapin.lthash_disabled;
904 :
905 0 : ctx->boot_timestamp = fd_log_wallclock();
906 :
907 0 : FD_TEST( fd_accdb_admin_v1_init( ctx->accdb_admin, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
908 0 : FD_TEST( fd_accdb_user_v1_init ( ctx->accdb, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
909 0 : ctx->funk = fd_accdb_user_v1_funk( ctx->accdb );
910 0 : fd_funk_txn_xid_copy( ctx->xid, fd_funk_root( ctx->funk ) );
911 :
912 0 : void * _txncache_shmem = fd_topo_obj_laddr( topo, tile->snapin.txncache_obj_id );
913 0 : fd_txncache_shmem_t * txncache_shmem = fd_txncache_shmem_join( _txncache_shmem );
914 0 : FD_TEST( txncache_shmem );
915 0 : ctx->txncache = fd_txncache_join( fd_txncache_new( _txncache, txncache_shmem ) );
916 0 : FD_TEST( ctx->txncache );
917 :
918 0 : ctx->txncache_entries_len = 0UL;
919 0 : ctx->blockhash_offsets_len = 0UL;
920 :
921 0 : ctx->ssparse = fd_ssparse_new( _ssparse, 1UL<<24UL, ctx->seed );
922 0 : FD_TEST( ctx->ssparse );
923 :
924 0 : ctx->manifest_parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( _manifest_parser ) );
925 0 : FD_TEST( ctx->manifest_parser );
926 :
927 0 : ctx->slot_delta_parser = fd_slot_delta_parser_join( fd_slot_delta_parser_new( _sd_parser ) );
928 0 : FD_TEST( ctx->slot_delta_parser );
929 :
930 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
931 :
932 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
933 0 : if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
934 :
935 0 : ctx->manifest_out = out1( topo, tile, "snapin_manif" );
936 0 : ctx->gui_out = out1( topo, tile, "snapin_gui" );
937 0 : ulong out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapin_ct", 0UL );
938 0 : if( out_link_ct_idx==ULONG_MAX ) out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapin_ls", 0UL );
939 0 : if( out_link_ct_idx==ULONG_MAX ) out_link_ct_idx = fd_topo_find_tile_out_link( topo, tile, "snapin_wm", 0UL );
940 0 : if( FD_UNLIKELY( out_link_ct_idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapin_ct` or `snapin_ls` or `snapin_wm`" ));
941 0 : fd_topo_link_t * snapin_out_link = &topo->links[ tile->out_link_id[ out_link_ct_idx ] ];
942 0 : ctx->out_ct_idx = out_link_ct_idx;
943 :
944 0 : if( FD_UNLIKELY( ctx->out_ct_idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapin_ct` or `snapin_ls` or `snapin_wm`" ));
945 0 : if( FD_UNLIKELY( ctx->manifest_out.idx==ULONG_MAX ) ) FD_LOG_ERR(( "tile `" NAME "` missing required out link `snapin_manif`" ));
946 :
947 0 : if( ( 0==strcmp( snapin_out_link->name, "snapin_ls" ) ) ||
948 0 : ( 0==strcmp( snapin_out_link->name, "snapin_wm" ) ) ) {
949 0 : ctx->hash_out = out1( topo, tile, snapin_out_link->name );
950 0 : }
951 :
952 0 : fd_ssparse_reset( ctx->ssparse );
953 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.mem, ctx->manifest_out.chunk ) );
954 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
955 :
956 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
957 0 : FD_TEST( 0==strcmp( in_link->name, "snapdc_in" ) );
958 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
959 0 : ctx->in.wksp = in_wksp->wksp;
960 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
961 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
962 0 : ctx->in.mtu = in_link->mtu;
963 0 : ctx->in.pos = 0UL;
964 :
965 0 : ctx->buffered_batch.batch_cnt = 0UL;
966 0 : ctx->buffered_batch.remaining_idx = 0UL;
967 :
968 0 : ctx->advertised_slot = 0UL;
969 0 : ctx->bank_slot = 0UL;
970 :
971 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
972 :
973 0 : if( tile->snapin.use_vinyl ) {
974 0 : ctx->use_vinyl = 1;
975 0 : fd_snapin_vinyl_unprivileged_init( ctx, topo, tile, NULL, NULL );
976 0 : }
977 0 : }
978 :
979 : /* Control fragments can result in one extra publish to forward the
980 : message down the pipeline, in addition to the result / malformed
981 : message. Can send one duplicate account message as well. */
982 0 : #define STEM_BURST 3UL
983 :
984 0 : #define STEM_LAZY 1000L
985 :
986 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t
987 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)
988 :
989 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
990 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
991 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
992 :
993 : #include "../../disco/stem/fd_stem.c"
994 :
995 : fd_topo_run_tile_t fd_tile_snapin = {
996 : .name = NAME,
997 : .populate_allowed_fds = populate_allowed_fds,
998 : .populate_allowed_seccomp = populate_allowed_seccomp,
999 : .scratch_align = scratch_align,
1000 : .scratch_footprint = scratch_footprint,
1001 : .privileged_init = privileged_init,
1002 : .unprivileged_init = unprivileged_init,
1003 : .run = stem_run,
1004 : };
1005 :
1006 : #undef NAME
|