Line data Source code
1 : #include "utils/fd_ssctrl.h"
2 : #include "utils/fd_ssparse.h"
3 : #include "utils/fd_ssmanifest_parser.h"
4 : #include "utils/fd_slot_delta_parser.h"
5 : #include "utils/fd_ssmsg.h"
6 :
7 : #include "../../disco/topo/fd_topo.h"
8 : #include "../../disco/metrics/fd_metrics.h"
9 : #include "../../flamenco/accdb/fd_accdb_admin.h"
10 : #include "../../flamenco/accdb/fd_accdb_user.h"
11 : #include "../../flamenco/runtime/fd_acc_mgr.h"
12 : #include "../../flamenco/runtime/fd_txncache.h"
13 : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_history.h"
14 :
15 : #include "generated/fd_snapin_tile_seccomp.h"
16 :
17 : #define NAME "snapin"
18 :
19 : /* The snapin tile is a state machine that parses and loads a full
20 : and optionally an incremental snapshot. It is currently responsible
21 : for loading accounts into an in-memory database, though this may
22 : change. */
23 :
24 : /* 300 here is from status_cache.rs::MAX_CACHE_ENTRIES which is the most
25 : root slots Agave could possibly serve in a snapshot. */
26 : #define FD_SNAPIN_TXNCACHE_MAX_ENTRIES (300UL*FD_PACK_MAX_TXNCACHE_TXN_PER_SLOT)
27 :
28 : /* 300 root slots in the slot deltas array, and in the worst case each
29 : one references all 151 prior blockhashes. */
30 : #define FD_SNAPIN_MAX_SLOT_DELTA_GROUPS (300UL*151UL)
31 :
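   : /* Output link indices.  FD_SNAPIN_OUT_SNAPCT carries forwarded control
   :    frags and malformed-snapshot errors out of the tile, while
   :    FD_SNAPIN_OUT_MANIFEST carries the decoded manifest and the final
   :    done message to downstream consumers. */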
32 0 : #define FD_SNAPIN_OUT_SNAPCT 0UL
33 0 : #define FD_SNAPIN_OUT_MANIFEST 1UL
34 :
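   : /* fd_blockhash_entry and the blockhash_map below form a small hash set
   :    over the 151 most recent blockhashes.  populate_txncache uses it to
   :    filter status cache entries and to look up per-blockhash txnhash
   :    offsets. */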
35 : struct fd_blockhash_entry {
36 : fd_hash_t blockhash;
37 :
38 : struct {
39 : ulong prev;
40 : ulong next;
41 : } map;
42 : };
43 :
44 : typedef struct fd_blockhash_entry fd_blockhash_entry_t;
45 :
46 : #define MAP_NAME blockhash_map
47 0 : #define MAP_KEY blockhash
48 : #define MAP_KEY_T fd_hash_t
49 : #define MAP_ELE_T fd_blockhash_entry_t
50 0 : #define MAP_KEY_EQ(k0,k1) (!memcmp((k0),(k1), sizeof(fd_hash_t)))
51 0 : #define MAP_KEY_HASH(key,seed) (fd_hash((seed),(key),sizeof(fd_hash_t)))
52 0 : #define MAP_PREV map.prev
53 0 : #define MAP_NEXT map.next
54 : #define MAP_OPTIMIZE_RANDOM_ACCESS_REMOVAL 1
55 : #include "../../util/tmpl/fd_map_chain.c"
56 :
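   : /* A (blockhash, txnhash_offset) pair recovered from one slot deltas
   :    group while parsing the status cache. */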
57 : struct blockhash_group {
58 : uchar blockhash[ 32UL ];
59 : ulong txnhash_offset;
60 : };
61 :
62 : typedef struct blockhash_group blockhash_group_t;
63 :
64 : struct fd_snapin_tile {
65 : int full;
66 : int state;
67 :
68 : ulong seed;
69 : long boot_timestamp;
70 :
71 : fd_accdb_admin_t accdb_admin[1];
72 : fd_accdb_user_t accdb[1];
73 :
74 : fd_txncache_t * txncache;
75 : uchar * acc_data;
76 :
77 : fd_funk_txn_xid_t xid[1]; /* txn XID */
78 :
79 : fd_stem_context_t * stem;
80 : fd_ssparse_t * ssparse;
81 : fd_ssmanifest_parser_t * manifest_parser;
82 : fd_slot_delta_parser_t * slot_delta_parser;
83 :
84 : struct {
85 : int manifest_done;
86 : int status_cache_done;
87 : int manifest_processed;
88 : } flags;
89 :
90 : ulong bank_slot;
91 :
92 : ulong blockhash_offsets_len;
93 : blockhash_group_t * blockhash_offsets;
94 :
95 : ulong txncache_entries_len;
96 : fd_sstxncache_entry_t * txncache_entries;
97 :
98 : fd_txncache_fork_id_t txncache_root_fork_id;
99 :
100 : struct {
101 : ulong full_bytes_read;
102 : ulong incremental_bytes_read;
103 : ulong accounts_inserted;
104 : } metrics;
105 :
106 : struct {
107 : fd_wksp_t * wksp;
108 : ulong chunk0;
109 : ulong wmark;
110 : ulong mtu;
111 : ulong pos;
112 : } in;
113 :
114 : struct {
115 : fd_wksp_t * wksp;
116 : ulong chunk0;
117 : ulong wmark;
118 : ulong chunk;
119 : ulong mtu;
120 : } manifest_out;
121 : };
122 :
123 : typedef struct fd_snapin_tile fd_snapin_tile_t;
124 :
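   : /* should_shutdown reports whether the tile has reached the shutdown
   :    state, logging a summary of the snapshot load when it has. */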
125 : static inline int
126 0 : should_shutdown( fd_snapin_tile_t * ctx ) {
127 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN ) ) {
128 0 : FD_LOG_NOTICE(( "loaded %.1fM accounts from snapshot in %.3f seconds", (double)ctx->metrics.accounts_inserted/1e6, (double)(fd_log_wallclock()-ctx->boot_timestamp)/1e9 ));
129 0 : }
130 0 : return ctx->state==FD_SNAPSHOT_STATE_SHUTDOWN;
131 0 : }
132 :
133 : static ulong
134 0 : scratch_align( void ) {
135 0 : return 128UL;
136 0 : }
137 :
138 : static ulong
139 0 : scratch_footprint( fd_topo_tile_t const * tile ) {
140 0 : (void)tile;
141 0 : ulong l = FD_LAYOUT_INIT;
142 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
143 0 : l = FD_LAYOUT_APPEND( l, fd_ssparse_align(), fd_ssparse_footprint( 1UL<<24UL ) );
144 0 : l = FD_LAYOUT_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
145 0 : l = FD_LAYOUT_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
146 0 : l = FD_LAYOUT_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
147 0 : l = FD_LAYOUT_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
148 0 : l = FD_LAYOUT_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
149 0 : return FD_LAYOUT_FINI( l, alignof(fd_snapin_tile_t) );
150 0 : }
151 :
152 : static void
153 0 : metrics_write( fd_snapin_tile_t * ctx ) {
154 0 : FD_MGAUGE_SET( SNAPIN, FULL_BYTES_READ, ctx->metrics.full_bytes_read );
155 0 : FD_MGAUGE_SET( SNAPIN, INCREMENTAL_BYTES_READ, ctx->metrics.incremental_bytes_read );
156 0 : FD_MGAUGE_SET( SNAPIN, ACCOUNTS_INSERTED, ctx->metrics.accounts_inserted );
157 0 : FD_MGAUGE_SET( SNAPIN, STATE, (ulong)ctx->state );
158 0 : }
159 :
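   : /* verify_slot_deltas_with_slot_history checks that every status cache
   :    entry refers to a slot present in the slot history sysvar.  Returns
   :    0 on success and -1 on the first missing slot. */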
160 : static int
161 : verify_slot_deltas_with_slot_history( fd_snapin_tile_t * ctx,
162 0 : fd_slot_history_global_t * slot_history ) {
163 :
164 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
165 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
166 0 : if( FD_UNLIKELY( fd_sysvar_slot_history_find_slot( slot_history, entry->slot )!=FD_SLOT_HISTORY_SLOT_FOUND ) ) return -1;
167 0 : }
168 0 : return 0;
169 0 : }
170 :
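   : /* verify_slot_deltas_with_bank_slot checks that no status cache entry
   :    refers to a slot newer than the snapshot's bank slot.  Returns 0 on
   :    success and -1 otherwise. */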
171 : static int
172 : verify_slot_deltas_with_bank_slot( fd_snapin_tile_t * ctx,
173 0 : ulong bank_slot ) {
174 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
175 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[i];
176 0 : if( FD_UNLIKELY( entry->slot>bank_slot ) ) return -1;
177 0 : }
178 0 : return 0;
179 0 : }
180 :
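   : /* transition_malformed moves the tile into the error state and
   :    publishes an error control message so the rest of the snapshot
   :    pipeline knows the current stream is malformed. */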
181 : static void
182 : transition_malformed( fd_snapin_tile_t * ctx,
183 0 : fd_stem_context_t * stem ) {
184 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
185 0 : fd_stem_publish( stem, FD_SNAPIN_OUT_SNAPCT, FD_SNAPSHOT_MSG_CTRL_ERROR, 0UL, 0UL, 0UL, 0UL, 0UL );
186 0 : }
187 :
188 : static int
189 : populate_txncache( fd_snapin_tile_t * ctx,
190 : fd_snapshot_manifest_blockhash_t const blockhashes[ static 301UL ],
191 0 : ulong blockhashes_len ) {
192 : /* Our txncache internally contains the fork structure for the chain,
193 : which we need to recreate here. Because snapshots are only served
194 : for rooted slots, there is actually no forking, and the bank forks
195 : are just a single bank, the root, like
196 :
197 : _root
198 :
199 : But the txncache also must contain the 150 most recent banks prior
200 : to the root (151 rooted banks total), looking like,
201 :
202 :
203 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
204 :
205 : Our txncache is "slot agnostic" meaning there is no concept of a
206 : slot number in it. It just has a fork tree structure. So long as
207 : the fork tree is isomorphic to the actual bank forks, and each bank
208 : has the correct blockhash, it works.
209 :
210 : So the challenge is simply to create this chain of 151 forks in the
211 : txncache, with correct blockhashes, and then insert all the
212 : transactions into it.
213 :
214 : Constructing the chain of blockhashes is easy. It is just the
215 : BLOCKHASH_QUEUE array in the manifest. This array is unfortunately
216 : not sorted and appears in random order, but it has a hash_index
217 : field which is a gapless index, starting at some arbitrary offset,
218 : so we can back out the 151 blockhashes we need from this, by first
219 : finding the max hash_index as _max and then collecting hash entries
220 : via,
221 :
222 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
223 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
224 :
225 : Now the remaining problem is inserting transactions into this
226 : chain. Remember each transaction needs to be inserted with:
227 :
228 : (a) The fork ID (position of the bank in the chain) it was executed in.
229 : (b) The blockhash of the bank it referenced.
230 :
231 : (b) is trivial to retrieve, as it's in the actual slot_deltas entry
232 : in the manifest served by Agave. But (a) is mildly annoying. Agave
233 : serves slot_deltas based on slot, so we need an additional mapping
234 : from slot to position in our banks chain. It turns out we have to
235 : go to yet another structure in the manifest to retrieve this, the
236 : ancestors array. This is just an array of slot values, so we need
237 : to sort it, and line it up against our banks chain like so,
238 :
239 : _root_150 -> _root_149 -> ... -> _root_2 -> _root_1 -> _root
240 : _max-150 -> _max-149 -> ... -> _max-2 -> _max-1 -> _max
241 : _slots_150 -> _slots_149 -> ... -> _slots_2 -> _slots_1 -> _slots
242 :
243 : From there we are done.
244 :
245 : Well almost ... if you were paying attention you might have noticed
246 : this is a lot of work and we are lazy. Why don't we just ignore the
247 : slot mapping and assume everything executed at the root slot
248 : exactly? The only invariant we need to maintain from a memory
249 : perspective is that, across all active banks, at most
250 : FD_MAX_TXN_PER_SLOT transactions are stored per slot, and we
251 : have preserved that. It is not true "per slot" technically, but
252 : it's true across all slots, and the memory is aggregated. It will
253 : also always be true, even as slots are garbage collected, because
254 : entries are collected by referenced blockhash, not executed slot.
255 :
256 : ... actually we can't do this. There are more broken things here.
257 : The Agave status cache decided to only store 20 bytes of each 32
258 : byte transaction hash to save on memory. That's OK, but they didn't
259 : just take the first 20 bytes. They instead, for each blockhash,
260 : take a random offset between 0 and 12, and store bytes
261 : [ offset, offset+20 ) of the transaction hash. We need to know this
262 : offset to be able to query the txncache later, so we need to
263 : retrieve it from the slot_deltas entry in the manifest, and key it
264 : into our txncache. Unfortunately this offset is stored per slot in
265 : the slot_deltas entry. So we need to first go and retrieve the
266 : ancestors array, sort it, and line it up against our banks chain as
267 : described above, and then go through slot deltas, to retrieve the
268 : offset for each slot, and stick it into the appropriate bank in
269 : our chain. */
270 :
271 0 : FD_TEST( blockhashes_len<=301UL );
272 0 : FD_TEST( blockhashes_len>0UL );
273 :
274 0 : ulong seq_min = ULONG_MAX;
275 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) seq_min = fd_ulong_min( seq_min, blockhashes[ i ].hash_index );
276 :
277 0 : ulong seq_max;
278 0 : if( FD_UNLIKELY( __builtin_uaddl_overflow( seq_min, blockhashes_len, &seq_max ) ) ) {
279 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue sequence number wraparound (seq_min=%lu age_cnt=%lu)", seq_min, blockhashes_len ));
280 0 : transition_malformed( ctx, ctx->stem );
281 0 : return 1;
282 0 : }
283 :
284 : /* First let's construct the chain array as described above. But
285 : index 0 will be the root, index 1 the root's parent, etc. */
286 :
287 0 : struct {
288 0 : int exists;
289 0 : uchar blockhash[ 32UL ];
290 0 : fd_txncache_fork_id_t fork_id;
291 0 : ulong txnhash_offset;
292 0 : } banks[ 301UL ] = {0};
293 :
294 0 : for( ulong i=0UL; i<blockhashes_len; i++ ) {
295 0 : fd_snapshot_manifest_blockhash_t const * elem = &blockhashes[ i ];
296 0 : ulong idx;
297 0 : if( FD_UNLIKELY( __builtin_usubl_overflow( elem->hash_index, seq_min, &idx ) ) ) {
298 0 : FD_LOG_WARNING(( "corrupt snapshot: gap in blockhash queue (seq=[%lu,%lu) idx=%lu)", seq_min, seq_max, blockhashes[ i ].hash_index ));
299 0 : transition_malformed( ctx, ctx->stem );
300 0 : return 1;
301 0 : }
302 :
303 0 : if( FD_UNLIKELY( idx>=blockhashes_len ) ) {
304 0 : FD_LOG_WARNING(( "corrupt snapshot: blockhash queue index out of range (seq_min=%lu age_cnt=%lu idx=%lu)", seq_min, blockhashes_len, idx ));
305 0 : transition_malformed( ctx, ctx->stem );
306 0 : return 1;
307 0 : }
308 :
309 0 : if( FD_UNLIKELY( banks[ blockhashes_len-1UL-idx ].exists ) ) {
310 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash hash_index %lu", elem->hash_index ));
311 0 : transition_malformed( ctx, ctx->stem );
312 0 : return 1;
313 0 : }
314 :
315 0 : banks[ blockhashes_len-1UL-idx ].fork_id.val = USHORT_MAX;
316 0 : banks[ blockhashes_len-1UL-idx ].txnhash_offset = ULONG_MAX;
317 0 : memcpy( banks[ blockhashes_len-1UL-idx ].blockhash, elem->hash, 32UL );
318 0 : banks[ blockhashes_len-1UL-idx ].exists = 1;
319 0 : }
320 :
321 0 : ulong chain_len = fd_ulong_min( blockhashes_len, 151UL );
322 :
323 : /* Now we need a hashset of just the 151 most recent blockhashes;
324 : anything else is a nonce transaction which we do not insert, or an
325 : already expired transaction which can also be discarded. */
326 :
327 0 : uchar * _map = fd_alloca_check( alignof(blockhash_map_t), blockhash_map_footprint( 1024UL ) );
328 0 : blockhash_map_t * blockhash_map = blockhash_map_join( blockhash_map_new( _map, 1024UL, ctx->seed ) );
329 0 : FD_TEST( blockhash_map );
330 :
331 0 : fd_blockhash_entry_t blockhash_pool[ 151UL ];
332 0 : for( ulong i=0UL; i<chain_len; i++ ) {
333 0 : fd_memcpy( blockhash_pool[ i ].blockhash.uc, banks[ i ].blockhash, 32UL );
334 :
335 0 : if( FD_UNLIKELY( blockhash_map_ele_query_const( blockhash_map, &blockhash_pool[ i ].blockhash, NULL, blockhash_pool ) ) ) {
336 0 : FD_LOG_WARNING(( "corrupt snapshot: duplicate blockhash %s in 151 most recent blockhashes", FD_BASE58_ENC_32_ALLOCA( banks[ i ].blockhash ) ));
337 0 : transition_malformed( ctx, ctx->stem );
338 0 : return 1;
339 0 : }
340 :
341 0 : blockhash_map_ele_insert( blockhash_map, &blockhash_pool[ i ], blockhash_pool );
342 0 : }
343 :
344 : /* Now load the blockhash offsets for these blockhashes ... */
345 0 : FD_TEST( ctx->blockhash_offsets_len ); /* Must be at least one else nothing would be rooted */
346 0 : for( ulong i=0UL; i<ctx->blockhash_offsets_len; i++ ) {
347 0 : fd_hash_t key;
348 0 : fd_memcpy( key.uc, ctx->blockhash_offsets[ i ].blockhash, 32UL );
349 0 : fd_blockhash_entry_t * entry = blockhash_map_ele_query( blockhash_map, &key, NULL, blockhash_pool );
350 0 : if( FD_UNLIKELY( !entry ) ) continue; /* Not in the most recent 151 blockhashes */
351 :
352 0 : ulong chain_idx = (ulong)(entry - blockhash_pool);
353 :
354 0 : if( FD_UNLIKELY( banks[ chain_idx ].txnhash_offset!=ULONG_MAX && banks[ chain_idx ].txnhash_offset!=ctx->blockhash_offsets[ i ].txnhash_offset ) ) {
355 0 : FD_LOG_WARNING(( "corrupt snapshot: conflicting txnhash offsets for blockhash %s", FD_BASE58_ENC_32_ALLOCA( entry->blockhash.uc ) ));
356 0 : transition_malformed( ctx, ctx->stem );
357 0 : return 1;
358 0 : }
359 :
360 0 : banks[ chain_idx ].txnhash_offset = ctx->blockhash_offsets[ i ].txnhash_offset;
361 0 : }
362 :
363 : /* Construct the linear fork chain in the txncache. */
364 :
365 0 : fd_txncache_fork_id_t parent = { .val = USHORT_MAX };
366 0 : for( ulong i=0UL; i<chain_len; i++ ) banks[ chain_len-1UL-i ].fork_id = parent = fd_txncache_attach_child( ctx->txncache, parent );
367 0 : for( ulong i=0UL; i<chain_len; i++ ) fd_txncache_attach_blockhash( ctx->txncache, banks[ i ].fork_id, banks[ i ].blockhash );
368 :
369 : /* Now insert all transactions as if they executed at the current
370 : root, per above. */
371 :
372 0 : ulong insert_cnt = 0UL;
373 0 : for( ulong i=0UL; i<ctx->txncache_entries_len; i++ ) {
374 0 : fd_sstxncache_entry_t const * entry = &ctx->txncache_entries[ i ];
375 0 : fd_hash_t key;
376 0 : fd_memcpy( key.uc, entry->blockhash, 32UL );
377 0 : if( FD_UNLIKELY( !blockhash_map_ele_query_const( blockhash_map, &key, NULL, blockhash_pool ) ) ) continue;
378 :
379 0 : insert_cnt++;
380 0 : fd_txncache_insert( ctx->txncache, banks[ 0UL ].fork_id, entry->blockhash, entry->txnhash );
381 0 : }
382 :
383 0 : FD_LOG_INFO(( "inserted %lu/%lu transactions into the txncache", insert_cnt, ctx->txncache_entries_len ));
384 :
385 : /* Then finalize all the banks (freezing them) and set the txnhash
386 : offset so future queries use the correct offset. An offset of
387 : ULONG_MAX is valid, it means the blockhash had no transactions
388 : in it, so there's nothing in the status cache under that blockhash.
389 :
390 : Just set the offset to 0 in this case; it doesn't matter, but it
391 : must be in the valid range of 0 to 12 inclusive. */
392 0 : for( ulong i=0UL; i<chain_len; i++ ) {
393 0 : ulong txnhash_offset = banks[ chain_len-1UL-i ].txnhash_offset==ULONG_MAX ? 0UL : banks[ chain_len-1UL-i ].txnhash_offset;
394 0 : fd_txncache_finalize_fork( ctx->txncache, banks[ chain_len-1UL-i ].fork_id, txnhash_offset, banks[ chain_len-1UL-i ].blockhash );
395 0 : }
396 :
397 0 : for( ulong i=1UL; i<chain_len; i++ ) fd_txncache_advance_root( ctx->txncache, banks[ chain_len-1UL-i ].fork_id );
398 :
399 0 : ctx->txncache_root_fork_id = parent;
400 :
401 0 : return 0;
402 0 : }
403 :
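   : /* process_manifest runs once the manifest and the status cache have
   :    both been fully parsed.  It validates the slot deltas against the
   :    bank slot, populates the txncache, and then publishes the decoded
   :    manifest downstream. */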
404 : static void
405 0 : process_manifest( fd_snapin_tile_t * ctx ) {
406 0 : fd_snapshot_manifest_t * manifest = fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk );
407 :
408 0 : ctx->bank_slot = manifest->slot;
409 0 : if( FD_UNLIKELY( verify_slot_deltas_with_bank_slot( ctx, manifest->slot ) ) ) {
410 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
411 0 : transition_malformed( ctx, ctx->stem );
412 0 : return;
413 0 : }
414 :
415 0 : if( FD_UNLIKELY( populate_txncache( ctx, manifest->blockhashes, manifest->blockhashes_len ) ) ) {
416 0 : FD_LOG_WARNING(( "populating txncache failed" ));
417 0 : transition_malformed( ctx, ctx->stem );
418 0 : return;
419 0 : }
420 :
421 0 : manifest->txncache_fork_id = ctx->txncache_root_fork_id.val;
422 :
423 0 : ulong sig = ctx->full ? fd_ssmsg_sig( FD_SSMSG_MANIFEST_FULL ) :
424 0 : fd_ssmsg_sig( FD_SSMSG_MANIFEST_INCREMENTAL );
425 0 : fd_stem_publish( ctx->stem, FD_SNAPIN_OUT_MANIFEST, sig, ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), 0UL, 0UL, 0UL );
426 0 : ctx->manifest_out.chunk = fd_dcache_compact_next( ctx->manifest_out.chunk, sizeof(fd_snapshot_manifest_t), ctx->manifest_out.chunk0, ctx->manifest_out.wmark );
427 0 : }
428 :
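   : /* process_account_header begins insertion of a single account parsed
   :    from the snapshot.  If the account already exists with a newer slot
   :    the incoming copy is skipped, otherwise a funk record is allocated
   :    (or reused), its metadata is written, and ctx->acc_data is left
   :    pointing at where subsequent account data frags should be copied. */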
429 : static void
430 : process_account_header( fd_snapin_tile_t * ctx,
431 0 : fd_ssparse_advance_result_t * result ) {
432 0 : fd_funk_t * funk = ctx->accdb->funk;
433 :
434 0 : fd_funk_rec_key_t id = fd_funk_acc_key( (fd_pubkey_t const*)result->account_header.pubkey );
435 0 : fd_funk_rec_query_t query[1];
436 0 : fd_funk_rec_t * rec = fd_funk_rec_query_try( funk, ctx->xid, &id, query );
437 :
438 0 : int should_publish = 0;
439 0 : fd_funk_rec_prepare_t prepare[1];
440 0 : if( FD_LIKELY( !rec ) ) {
441 0 : should_publish = 1;
442 0 : rec = fd_funk_rec_prepare( funk, ctx->xid, &id, prepare, NULL );
443 0 : FD_TEST( rec );
444 0 : }
445 :
446 0 : fd_account_meta_t * meta = fd_funk_val( rec, funk->wksp );
447 0 : if( FD_UNLIKELY( meta ) ) {
448 0 : if( FD_LIKELY( meta->slot>result->account_header.slot ) ) {
449 0 : ctx->acc_data = NULL;
450 0 : return;
451 0 : }
452 :
453 : /* TODO: Reaching here means the existing value is a duplicate
454 : account. We need to hash the existing account and subtract that
455 : hash from the running lthash. */
456 0 : }
457 :
458 : /* Allocate data space from heap, free old value (if any) */
459 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
460 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+result->account_header.data_len;
461 0 : ulong alloc_max;
462 0 : meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
463 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
464 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
465 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
466 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
467 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
468 :
469 0 : meta->dlen = (uint)result->account_header.data_len;
470 0 : meta->slot = result->account_header.slot;
471 0 : memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
472 0 : meta->lamports = result->account_header.lamports;
473 0 : meta->executable = (uchar)result->account_header.executable;
474 :
475 0 : ctx->acc_data = (uchar*)meta + sizeof(fd_account_meta_t);
476 0 : ctx->metrics.accounts_inserted++;
477 :
478 0 : if( FD_LIKELY( should_publish ) ) fd_funk_rec_publish( funk, prepare );
479 0 : }
480 :
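   : /* process_account_data appends a chunk of account data to the record
   :    set up by process_account_header.  A NULL acc_data means the account
   :    was skipped because a newer version already exists. */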
481 : static void
482 : process_account_data( fd_snapin_tile_t * ctx,
483 0 : fd_ssparse_advance_result_t * result ) {
484 0 : if( FD_UNLIKELY( !ctx->acc_data ) ) return;
485 :
486 0 : fd_memcpy( ctx->acc_data, result->account_data.data, result->account_data.data_sz );
487 0 : ctx->acc_data += result->account_data.data_sz;
488 0 : }
489 :
490 : /* streamlined_insert inserts an unfragmented account.
491 : Only used while loading a full snapshot, not an incremental. */
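   : /* The hardcoded offsets used below correspond to the account frame
   :    layout handed to this function by fd_ssparse: 0x08 data_len, 0x10
   :    pubkey, 0x30 lamports, 0x38 rent_epoch, 0x40 owner, 0x60 executable
   :    flag, and 0x88 the start of the account data. */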
492 :
493 : static void
494 : streamlined_insert( fd_snapin_tile_t * ctx,
495 : fd_funk_rec_t * rec,
496 : uchar const * frame,
497 0 : ulong slot ) {
498 0 : ulong data_len = fd_ulong_load_8_fast( frame+0x08UL );
499 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
500 0 : ulong rent_epoch = fd_ulong_load_8_fast( frame+0x38UL ); (void)rent_epoch;
501 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
502 0 : _Bool executable = !!frame[ 0x60UL ];
503 :
504 0 : fd_funk_t * funk = ctx->accdb->funk;
505 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
506 0 : fd_funk_val_flush( rec, funk->alloc, funk->wksp );
507 0 : ulong const alloc_sz = sizeof(fd_account_meta_t)+data_len;
508 0 : ulong alloc_max;
509 0 : fd_account_meta_t * meta = fd_alloc_malloc_at_least( funk->alloc, 16UL, alloc_sz, &alloc_max );
510 0 : if( FD_UNLIKELY( !meta ) ) FD_LOG_ERR(( "Ran out of heap memory while loading snapshot (increase [funk.heap_size_gib])" ));
511 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
512 0 : rec->val_gaddr = fd_wksp_gaddr_fast( funk->wksp, meta );
513 0 : rec->val_max = (uint)( fd_ulong_min( alloc_max, FD_FUNK_REC_VAL_MAX ) & FD_FUNK_REC_VAL_MAX );
514 0 : rec->val_sz = (uint)( alloc_sz & FD_FUNK_REC_VAL_MAX );
515 :
516 : /* Write metadata */
517 0 : meta->dlen = (uint)data_len;
518 0 : meta->slot = slot;
519 0 : memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
520 0 : meta->lamports = lamports;
521 0 : meta->executable = (uchar)executable;
522 :
523 : /* Write data */
524 0 : uchar * acc_data = (uchar *)( meta+1 );
525 0 : fd_memcpy( acc_data, frame+0x88UL, data_len );
526 :
527 0 : ctx->metrics.accounts_inserted++;
528 0 : }
529 :
530 : /* process_account_batch is a happy-path performance optimization
531 : that handles insertion of lots of small accounts.
532 :
533 : The main optimization implemented for funk is prefetching hash map
534 : accesses. */
535 :
536 : __attribute__((noinline)) static void
537 : process_account_batch( fd_snapin_tile_t * ctx,
538 0 : fd_ssparse_advance_result_t * result ) {
539 0 : fd_funk_t * funk = ctx->accdb->funk;
540 0 : fd_funk_rec_map_t * rec_map = funk->rec_map;
541 0 : fd_funk_rec_t * rec_tbl = funk->rec_pool->ele;
542 0 : fd_funk_rec_map_shmem_private_chain_t * chain_tbl = fd_funk_rec_map_shmem_private_chain( rec_map->map, 0UL );
543 :
544 : /* Derive map chains */
545 0 : uint chain_idx[ FD_SSPARSE_ACC_BATCH_MAX ];
546 0 : ulong chain_mask = rec_map->map->chain_cnt-1UL;
547 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
548 0 : uchar const * frame = result->account_batch.batch[ i ];
549 0 : uchar const * pubkey = frame+0x10UL;
550 0 : ulong memo = fd_funk_rec_key_hash1( pubkey, 0UL, rec_map->map->seed );
551 0 : chain_idx[ i ] = (uint)( memo&chain_mask );
552 0 : }
553 :
554 : /* Parallel load hash chain heads */
555 0 : uint map_node [ FD_SSPARSE_ACC_BATCH_MAX ];
556 0 : uint chain_cnt[ FD_SSPARSE_ACC_BATCH_MAX ];
557 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
558 0 : map_node [ i ] = chain_tbl[ chain_idx[ i ] ].head_cidx;
559 0 : chain_cnt[ i ] = (uint)chain_tbl[ chain_idx[ i ] ].ver_cnt;
560 0 : }
561 0 : uint chain_max = 0U;
562 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
563 0 : chain_max = fd_uint_max( chain_max, chain_cnt[ i ] );
564 0 : }
565 :
566 : /* Parallel walk hash chains */
567 0 : static fd_funk_rec_t dummy_rec = { .map_next = UINT_MAX };
568 0 : fd_funk_rec_t * rec[ FD_SSPARSE_ACC_BATCH_MAX ] = {0};
569 0 : for( ulong j=0UL; j<chain_max; j++ ) {
570 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
571 0 : uchar const * frame = result->account_batch.batch[ i ];
572 0 : uchar const * pubkey = frame+0x10UL;
573 0 : int const has_node = j<chain_cnt[ i ];
574 0 : fd_funk_rec_t * node = has_node ? rec_tbl+map_node[ i ] : &dummy_rec;
575 0 : int const key_match = 0==memcmp( node->pair.key, pubkey, sizeof(fd_funk_rec_key_t) );
576 0 : if( has_node && key_match ) rec[ i ] = node;
577 0 : map_node[ i ] = node->map_next;
578 0 : }
579 0 : }
580 :
581 : /* Create map entries */
582 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
583 0 : uchar const * frame = result->account_batch.batch[ i ];
584 0 : uchar const * pubkey = frame+0x10UL;
585 0 : fd_funk_rec_key_t key = FD_LOAD( fd_funk_rec_key_t, pubkey );
586 :
587 0 : fd_funk_rec_t * r = rec[ i ];
588 0 : if( FD_LIKELY( !r ) ) { /* optimize for new account */
589 0 : r = fd_funk_rec_pool_acquire( funk->rec_pool, NULL, 0, NULL );
590 0 : FD_TEST( r );
591 0 : memset( r, 0, sizeof(fd_funk_rec_t) );
592 0 : fd_funk_txn_xid_copy( r->pair.xid, ctx->xid );
593 0 : fd_funk_rec_key_copy( r->pair.key, &key );
594 :
595 : /* Insert to hash map. In theory, a key could appear twice in the
596 : same batch. All accounts in a batch are guaranteed to be from
597 : the same slot though, so this is fine, assuming that accdb code
598 : gracefully handles duplicate hash map entries. */
599 0 : fd_funk_rec_map_shmem_private_chain_t * chain = &chain_tbl[ chain_idx[ i ] ];
600 0 : ulong ver_cnt = chain->ver_cnt;
601 0 : uint head_cidx = chain->head_cidx;
602 0 : chain->ver_cnt = fd_funk_rec_map_private_vcnt( fd_funk_rec_map_private_vcnt_ver( ver_cnt ), fd_funk_rec_map_private_vcnt_cnt( ver_cnt )+1UL );
603 0 : chain->head_cidx = (uint)( r-rec_tbl );
604 0 : r->map_next = head_cidx;
605 0 : rec[ i ] = r;
606 0 : } else { /* existing record for key found */
607 0 : fd_account_meta_t const * existing = fd_funk_val( r, funk->wksp );
608 0 : if( FD_UNLIKELY( !existing ) ) FD_LOG_HEXDUMP_NOTICE(( "r", r, sizeof(fd_funk_rec_t) ));
609 0 : FD_TEST( existing );
610 0 : if( existing->slot > result->account_batch.slot ) {
611 0 : rec[ i ] = NULL; /* skip record if existing value is newer */
612 0 : }
613 0 : }
614 0 : }
615 :
616 : /* Actually insert accounts */
617 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
618 0 : if( rec[ i ] ) {
619 0 : streamlined_insert( ctx, rec[ i ], result->account_batch.batch[ i ], result->account_batch.slot );
620 0 : }
621 0 : }
622 0 : }
623 :
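   : /* handle_data_frag feeds a snapshot data fragment through the ssparse
   :    state machine, dispatching manifest bytes, status cache bytes, and
   :    account records as they are recognized.  Once both the manifest and
   :    the status cache are complete the manifest is processed and
   :    published.  Returns non-zero if the frag was only partially consumed
   :    and must be reprocessed. */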
624 : static int
625 : handle_data_frag( fd_snapin_tile_t * ctx,
626 : ulong chunk,
627 : ulong sz,
628 0 : fd_stem_context_t * stem ) {
629 0 : if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_FINISHING ) ) {
630 0 : transition_malformed( ctx, stem );
631 0 : return 0;
632 0 : }
633 0 : else if( FD_UNLIKELY( ctx->state==FD_SNAPSHOT_STATE_ERROR ) ) {
634 : /* Ignore all data frags after observing an error in the stream until
635 : we receive fail & init control messages to restart processing. */
636 0 : return 0;
637 0 : }
638 0 : else if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_PROCESSING ) ) {
639 0 : FD_LOG_ERR(( "invalid state for data frag %d", ctx->state ));
640 0 : }
641 :
642 0 : FD_TEST( chunk>=ctx->in.chunk0 && chunk<=ctx->in.wmark && sz<=ctx->in.mtu );
643 :
644 0 : for(;;) {
645 0 : if( FD_UNLIKELY( sz-ctx->in.pos==0UL ) ) break;
646 :
647 0 : uchar const * data = (uchar const *)fd_chunk_to_laddr_const( ctx->in.wksp, chunk ) + ctx->in.pos;
648 :
649 0 : fd_ssparse_advance_result_t result[1];
650 0 : int res = fd_ssparse_advance( ctx->ssparse, data, sz-ctx->in.pos, result );
651 0 : switch( res ) {
652 0 : case FD_SSPARSE_ADVANCE_ERROR:
653 0 : transition_malformed( ctx, stem );
654 0 : return 0;
655 0 : case FD_SSPARSE_ADVANCE_AGAIN:
656 0 : break;
657 0 : case FD_SSPARSE_ADVANCE_MANIFEST: {
658 0 : int res = fd_ssmanifest_parser_consume( ctx->manifest_parser,
659 0 : result->manifest.data,
660 0 : result->manifest.data_sz,
661 0 : result->manifest.acc_vec_map,
662 0 : result->manifest.acc_vec_pool );
663 0 : if( FD_UNLIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_ERROR ) ) {
664 0 : transition_malformed( ctx, stem );
665 0 : return 0;
666 0 : } else if( FD_LIKELY( res==FD_SSMANIFEST_PARSER_ADVANCE_DONE ) ) {
667 0 : ctx->flags.manifest_done = 1;
668 0 : }
669 0 : break;
670 0 : }
671 0 : case FD_SSPARSE_ADVANCE_STATUS_CACHE: {
672 0 : fd_slot_delta_parser_advance_result_t sd_result[1];
673 0 : ulong bytes_remaining = result->status_cache.data_sz;
674 :
675 0 : while( bytes_remaining ) {
676 0 : int res = fd_slot_delta_parser_consume( ctx->slot_delta_parser,
677 0 : result->status_cache.data,
678 0 : bytes_remaining,
679 0 : sd_result );
680 0 : if( FD_UNLIKELY( res<0 ) ) {
681 0 : transition_malformed( ctx, stem );
682 0 : return 0;
683 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_GROUP ) ) {
684 0 : if( FD_UNLIKELY( ctx->blockhash_offsets_len>=FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ) ) FD_LOG_ERR(( "blockhash offsets overflow, max is %lu", FD_SNAPIN_MAX_SLOT_DELTA_GROUPS ));
685 :
686 0 : memcpy( ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].blockhash, sd_result->group.blockhash, 32UL );
687 0 : ctx->blockhash_offsets[ ctx->blockhash_offsets_len ].txnhash_offset = sd_result->group.txnhash_offset;
688 0 : ctx->blockhash_offsets_len++;
689 0 : } else if( FD_LIKELY( res==FD_SLOT_DELTA_PARSER_ADVANCE_ENTRY ) ) {
690 0 : if( FD_UNLIKELY( ctx->txncache_entries_len>=FD_SNAPIN_TXNCACHE_MAX_ENTRIES ) ) FD_LOG_ERR(( "txncache entries overflow, max is %lu", FD_SNAPIN_TXNCACHE_MAX_ENTRIES ));
691 0 : ctx->txncache_entries[ ctx->txncache_entries_len++ ] = *sd_result->entry;
692 0 : }
693 :
694 0 : bytes_remaining -= sd_result->bytes_consumed;
695 0 : result->status_cache.data += sd_result->bytes_consumed;
696 0 : }
697 :
698 0 : ctx->flags.status_cache_done = fd_slot_delta_parser_consume( ctx->slot_delta_parser, result->status_cache.data, 0UL, sd_result )==FD_SLOT_DELTA_PARSER_ADVANCE_DONE;
699 0 : break;
700 0 : }
701 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_HEADER:
702 0 : process_account_header( ctx, result );
703 0 : break;
704 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_DATA:
705 0 : process_account_data( ctx, result );
706 0 : break;
707 0 : case FD_SSPARSE_ADVANCE_ACCOUNT_BATCH:
708 0 : process_account_batch( ctx, result );
709 0 : break;
710 0 : case FD_SSPARSE_ADVANCE_DONE:
711 0 : ctx->state = FD_SNAPSHOT_STATE_FINISHING;
712 0 : break;
713 0 : default:
714 0 : FD_LOG_ERR(( "unexpected fd_ssparse_advance result %d", res ));
715 0 : break;
716 0 : }
717 :
718 0 : if( FD_UNLIKELY( !ctx->flags.manifest_processed && ctx->flags.manifest_done && ctx->flags.status_cache_done ) ) {
719 0 : process_manifest( ctx );
720 0 : ctx->flags.manifest_processed = 1;
721 0 : }
722 :
723 0 : ctx->in.pos += result->bytes_consumed;
724 0 : if( FD_LIKELY( ctx->full ) ) ctx->metrics.full_bytes_read += result->bytes_consumed;
725 0 : else ctx->metrics.incremental_bytes_read += result->bytes_consumed;
726 0 : }
727 :
728 0 : int reprocess_frag = ctx->in.pos<sz;
729 0 : if( FD_LIKELY( !reprocess_frag ) ) ctx->in.pos = 0UL;
730 0 : return reprocess_frag;
731 0 : }
732 :
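   : /* handle_control_frag drives the tile state machine from control
   :    messages: INIT_FULL/INIT_INCR reset the parsers and start
   :    processing, FAIL rolls back any partially loaded accounts, NEXT
   :    prepares a child funk transaction for the incremental snapshot,
   :    DONE verifies the status cache against the slot history sysvar and
   :    roots the loaded accounts at the restored slot, and SHUTDOWN/ERROR
   :    update the tile state.  Handled control messages are then forwarded
   :    down the pipeline. */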
733 : static void
734 : handle_control_frag( fd_snapin_tile_t * ctx,
735 : fd_stem_context_t * stem,
736 0 : ulong sig ) {
737 0 : fd_funk_t * funk = ctx->accdb->funk;
738 0 : switch( sig ) {
739 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_FULL:
740 0 : case FD_SNAPSHOT_MSG_CTRL_INIT_INCR:
741 0 : fd_ssparse_batch_enable( ctx->ssparse, sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL );
742 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
743 0 : ctx->state = FD_SNAPSHOT_STATE_PROCESSING;
744 0 : ctx->full = sig==FD_SNAPSHOT_MSG_CTRL_INIT_FULL;
745 0 : ctx->txncache_entries_len = 0UL;
746 0 : ctx->blockhash_offsets_len = 0UL;
747 0 : fd_txncache_reset( ctx->txncache );
748 0 : fd_ssparse_reset( ctx->ssparse );
749 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
750 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
751 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
752 0 : break;
753 :
754 0 : case FD_SNAPSHOT_MSG_CTRL_FAIL:
755 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
756 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
757 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
758 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
759 :
760 0 : if( ctx->full ) {
761 0 : fd_accdb_clear( ctx->accdb_admin );
762 0 : } else {
763 0 : fd_accdb_cancel( ctx->accdb_admin, ctx->xid );
764 0 : fd_funk_txn_xid_copy( ctx->xid, fd_funk_last_publish( funk ) );
765 0 : }
766 0 : break;
767 :
768 0 : case FD_SNAPSHOT_MSG_CTRL_NEXT: {
769 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
770 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
771 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
772 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
773 0 : transition_malformed( ctx, stem );
774 0 : return;
775 0 : }
776 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
777 :
778 0 : fd_funk_txn_xid_t incremental_xid = { .ul={ LONG_MAX, LONG_MAX } };
779 0 : fd_accdb_attach_child( ctx->accdb_admin, ctx->xid, &incremental_xid );
780 0 : fd_funk_txn_xid_copy( ctx->xid, &incremental_xid );
781 0 : break;
782 0 : }
783 :
784 0 : case FD_SNAPSHOT_MSG_CTRL_DONE: {
785 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_PROCESSING ||
786 0 : ctx->state==FD_SNAPSHOT_STATE_FINISHING ||
787 0 : ctx->state==FD_SNAPSHOT_STATE_ERROR );
788 0 : if( FD_UNLIKELY( ctx->state!=FD_SNAPSHOT_STATE_FINISHING ) ) {
789 0 : transition_malformed( ctx, stem );
790 0 : return;
791 0 : }
792 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
793 :
794 0 : uchar slot_history_mem[ FD_SYSVAR_SLOT_HISTORY_FOOTPRINT ];
795 0 : fd_slot_history_global_t * slot_history = fd_sysvar_slot_history_read( funk, ctx->xid, slot_history_mem );
796 0 : if( FD_UNLIKELY( verify_slot_deltas_with_slot_history( ctx, slot_history ) ) ) {
797 0 : FD_LOG_WARNING(( "slot deltas verification failed" ));
798 0 : transition_malformed( ctx, stem );
799 0 : break;
800 0 : }
801 :
802 : /* Publish any remaining funk txn */
803 0 : if( FD_LIKELY( fd_funk_last_publish_is_frozen( funk ) ) ) {
804 0 : fd_accdb_advance_root( ctx->accdb_admin, ctx->xid );
805 0 : }
806 0 : FD_TEST( !fd_funk_last_publish_is_frozen( funk ) );
807 :
808 : /* Make 'Last published' XID equal the restored slot number */
809 0 : fd_funk_txn_xid_t target_xid = { .ul = { ctx->bank_slot, 0UL } };
810 0 : fd_accdb_attach_child( ctx->accdb_admin, ctx->xid, &target_xid );
811 0 : fd_accdb_advance_root( ctx->accdb_admin, &target_xid );
812 0 : fd_funk_txn_xid_copy( ctx->xid, &target_xid );
813 :
814 0 : fd_stem_publish( stem, FD_SNAPIN_OUT_MANIFEST, fd_ssmsg_sig( FD_SSMSG_DONE ), 0UL, 0UL, 0UL, 0UL, 0UL );
815 0 : break;
816 0 : }
817 :
818 0 : case FD_SNAPSHOT_MSG_CTRL_SHUTDOWN:
819 0 : FD_TEST( ctx->state==FD_SNAPSHOT_STATE_IDLE );
820 0 : ctx->state = FD_SNAPSHOT_STATE_SHUTDOWN;
821 0 : metrics_write( ctx ); /* ensures that shutdown state is written to metrics workspace before the tile actually shuts down */
822 0 : break;
823 :
824 0 : case FD_SNAPSHOT_MSG_CTRL_ERROR:
825 0 : ctx->state = FD_SNAPSHOT_STATE_ERROR;
826 0 : break;
827 :
828 0 : default:
829 0 : FD_LOG_ERR(( "unexpected control sig %lu", sig ));
830 0 : return;
831 0 : }
832 :
833 : /* Forward the control message down the pipeline */
834 0 : fd_stem_publish( stem, FD_SNAPIN_OUT_SNAPCT, sig, 0UL, 0UL, 0UL, 0UL, 0UL );
835 0 : }
836 :
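   : /* returnable_frag is the stem callback for incoming frags: data frags
   :    are handled by handle_data_frag (which may request that the frag be
   :    reprocessed), everything else is treated as a control message. */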
837 : static inline int
838 : returnable_frag( fd_snapin_tile_t * ctx,
839 : ulong in_idx FD_PARAM_UNUSED,
840 : ulong seq FD_PARAM_UNUSED,
841 : ulong sig,
842 : ulong chunk,
843 : ulong sz,
844 : ulong ctl FD_PARAM_UNUSED,
845 : ulong tsorig FD_PARAM_UNUSED,
846 : ulong tspub FD_PARAM_UNUSED,
847 0 : fd_stem_context_t * stem ) {
848 0 : FD_TEST( ctx->state!=FD_SNAPSHOT_STATE_SHUTDOWN );
849 :
850 0 : ctx->stem = stem;
851 0 : if( FD_UNLIKELY( sig==FD_SNAPSHOT_MSG_DATA ) ) return handle_data_frag( ctx, chunk, sz, stem );
852 0 : else handle_control_frag( ctx, stem, sig );
853 0 : ctx->stem = NULL;
854 :
855 0 : return 0;
856 0 : }
857 :
858 : static ulong
859 : populate_allowed_fds( fd_topo_t const * topo FD_PARAM_UNUSED,
860 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
861 : ulong out_fds_cnt,
862 0 : int * out_fds ) {
863 0 : if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
864 :
865 0 : ulong out_cnt = 0;
866 0 : out_fds[ out_cnt++ ] = 2UL; /* stderr */
867 0 : if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) ) {
868 0 : out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
869 0 : }
870 :
871 0 : return out_cnt;
872 0 : }
873 :
874 : static ulong
875 : populate_allowed_seccomp( fd_topo_t const * topo FD_PARAM_UNUSED,
876 : fd_topo_tile_t const * tile FD_PARAM_UNUSED,
877 : ulong out_cnt,
878 0 : struct sock_filter * out ) {
879 :
880 0 : populate_sock_filter_policy_fd_snapin_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
881 0 : return sock_filter_policy_fd_snapin_tile_instr_cnt;
882 0 : }
883 :
884 :
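   : /* The only privileged setup required is sourcing a secure random seed
   :    used for hashing. */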
885 : static void
886 : privileged_init( fd_topo_t * topo,
887 0 : fd_topo_tile_t * tile ) {
888 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
889 :
890 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
891 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
892 :
893 0 : FD_TEST( fd_rng_secure( &ctx->seed, 8UL ) );
894 0 : }
895 :
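   : /* unprivileged_init lays out the tile's scratch memory, joins the
   :    account database, txncache and snapshot parsers, and wires up the
   :    single input link and the two output links. */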
896 : FD_FN_UNUSED static void
897 : unprivileged_init( fd_topo_t * topo,
898 0 : fd_topo_tile_t * tile ) {
899 0 : void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
900 :
901 0 : FD_SCRATCH_ALLOC_INIT( l, scratch );
902 0 : fd_snapin_tile_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_snapin_tile_t), sizeof(fd_snapin_tile_t) );
903 0 : void * _ssparse = FD_SCRATCH_ALLOC_APPEND( l, fd_ssparse_align(), fd_ssparse_footprint( 1UL<<24UL ) );
904 0 : void * _txncache = FD_SCRATCH_ALLOC_APPEND( l, fd_txncache_align(), fd_txncache_footprint( tile->snapin.max_live_slots ) );
905 0 : void * _manifest_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_ssmanifest_parser_align(), fd_ssmanifest_parser_footprint() );
906 0 : void * _sd_parser = FD_SCRATCH_ALLOC_APPEND( l, fd_slot_delta_parser_align(), fd_slot_delta_parser_footprint() );
907 0 : ctx->txncache_entries = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_sstxncache_entry_t), sizeof(fd_sstxncache_entry_t)*FD_SNAPIN_TXNCACHE_MAX_ENTRIES );
908 0 : ctx->blockhash_offsets = FD_SCRATCH_ALLOC_APPEND( l, alignof(blockhash_group_t), sizeof(blockhash_group_t)*FD_SNAPIN_MAX_SLOT_DELTA_GROUPS );
909 :
910 0 : ctx->full = 1;
911 0 : ctx->state = FD_SNAPSHOT_STATE_IDLE;
912 :
913 0 : ctx->boot_timestamp = fd_log_wallclock();
914 :
915 0 : FD_TEST( fd_accdb_admin_join( ctx->accdb_admin, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
916 0 : FD_TEST( fd_accdb_user_join ( ctx->accdb, fd_topo_obj_laddr( topo, tile->snapin.funk_obj_id ) ) );
917 0 : fd_funk_txn_xid_copy( ctx->xid, fd_funk_root( ctx->accdb_admin->funk ) );
918 :
919 0 : void * _txncache_shmem = fd_topo_obj_laddr( topo, tile->snapin.txncache_obj_id );
920 0 : fd_txncache_shmem_t * txncache_shmem = fd_txncache_shmem_join( _txncache_shmem );
921 0 : FD_TEST( txncache_shmem );
922 0 : ctx->txncache = fd_txncache_join( fd_txncache_new( _txncache, txncache_shmem ) );
923 0 : FD_TEST( ctx->txncache );
924 :
925 0 : ctx->txncache_entries_len = 0UL;
926 0 : ctx->blockhash_offsets_len = 0UL;
927 :
928 0 : ctx->ssparse = fd_ssparse_new( _ssparse, 1UL<<24UL, ctx->seed );
929 0 : FD_TEST( ctx->ssparse );
930 :
931 0 : ctx->manifest_parser = fd_ssmanifest_parser_join( fd_ssmanifest_parser_new( _manifest_parser ) );
932 0 : FD_TEST( ctx->manifest_parser );
933 :
934 0 : ctx->slot_delta_parser = fd_slot_delta_parser_join( fd_slot_delta_parser_new( _sd_parser ) );
935 0 : FD_TEST( ctx->slot_delta_parser );
936 :
937 0 : fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
938 :
939 0 : if( FD_UNLIKELY( tile->kind_id ) ) FD_LOG_ERR(( "There can only be one `" NAME "` tile" ));
940 0 : if( FD_UNLIKELY( tile->in_cnt!=1UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu ins, expected 1", tile->in_cnt ));
941 0 : if( FD_UNLIKELY( tile->out_cnt!=2UL ) ) FD_LOG_ERR(( "tile `" NAME "` has %lu outs, expected 2", tile->out_cnt ));
942 :
943 0 : fd_topo_link_t * snapct_link = &topo->links[ tile->out_link_id[ FD_SNAPIN_OUT_SNAPCT ] ];
944 0 : FD_TEST( 0==strcmp( snapct_link->name, "snapin_rd" ) );
945 :
946 0 : fd_topo_link_t * writer_link = &topo->links[ tile->out_link_id[ FD_SNAPIN_OUT_MANIFEST ] ];
947 0 : FD_TEST( 0==strcmp( writer_link->name, "snapin_manif" ) );
948 0 : ctx->manifest_out.wksp = topo->workspaces[ topo->objs[ writer_link->dcache_obj_id ].wksp_id ].wksp;
949 0 : ctx->manifest_out.chunk0 = fd_dcache_compact_chunk0( fd_wksp_containing( writer_link->dcache ), writer_link->dcache );
950 0 : ctx->manifest_out.wmark = fd_dcache_compact_wmark ( ctx->manifest_out.wksp, writer_link->dcache, writer_link->mtu );
951 0 : ctx->manifest_out.chunk = ctx->manifest_out.chunk0;
952 0 : ctx->manifest_out.mtu = writer_link->mtu;
953 :
954 0 : fd_ssparse_reset( ctx->ssparse );
955 0 : fd_ssmanifest_parser_init( ctx->manifest_parser, fd_chunk_to_laddr( ctx->manifest_out.wksp, ctx->manifest_out.chunk ) );
956 0 : fd_slot_delta_parser_init( ctx->slot_delta_parser );
957 :
958 0 : fd_topo_link_t const * in_link = &topo->links[ tile->in_link_id[ 0UL ] ];
959 0 : fd_topo_wksp_t const * in_wksp = &topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ];
960 0 : ctx->in.wksp = in_wksp->wksp;
961 0 : ctx->in.chunk0 = fd_dcache_compact_chunk0( ctx->in.wksp, in_link->dcache );
962 0 : ctx->in.wmark = fd_dcache_compact_wmark( ctx->in.wksp, in_link->dcache, in_link->mtu );
963 0 : ctx->in.mtu = in_link->mtu;
964 0 : ctx->in.pos = 0UL;
965 :
966 0 : fd_memset( &ctx->flags, 0, sizeof(ctx->flags) );
967 0 : }
968 :
969 : /* Control fragments can result in one extra publish to forward the
970 : message down the pipeline, in addition to the result / malformed
971 : message / etc. */
972 0 : #define STEM_BURST 2UL
973 :
974 0 : #define STEM_LAZY 1000L
975 :
976 0 : #define STEM_CALLBACK_CONTEXT_TYPE fd_snapin_tile_t
977 0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_snapin_tile_t)
978 :
979 : #define STEM_CALLBACK_SHOULD_SHUTDOWN should_shutdown
980 0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
981 0 : #define STEM_CALLBACK_RETURNABLE_FRAG returnable_frag
982 :
983 : #include "../../disco/stem/fd_stem.c"
984 :
985 : fd_topo_run_tile_t fd_tile_snapin = {
986 : .name = NAME,
987 : .populate_allowed_fds = populate_allowed_fds,
988 : .populate_allowed_seccomp = populate_allowed_seccomp,
989 : .scratch_align = scratch_align,
990 : .scratch_footprint = scratch_footprint,
991 : .privileged_init = privileged_init,
992 : .unprivileged_init = unprivileged_init,
993 : .run = stem_run,
994 : };
995 :
996 : #undef NAME
|