Line data Source code
1 : #define _DEFAULT_SOURCE /* madvise */
2 : #include "fd_snapwm_tile_private.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_ssparse.h"
5 : #include "utils/fd_vinyl_io_wd.h"
6 : #include "../../ballet/lthash/fd_lthash.h"
7 : #include "../../ballet/lthash/fd_lthash_adder.h"
8 :
9 : #include <errno.h>
10 : #include <fcntl.h> /* open */
11 : #include <sys/mman.h> /* mmap, madvise */
12 : #include <sys/stat.h> /* fstat */
13 : #include <unistd.h> /* close */
14 :
15 : #include "generated/fd_snapwm_tile_vinyl_seccomp.h"
16 :
17 : FD_STATIC_ASSERT( WD_WR_FSEQ_CNT_MAX<=FD_TOPO_MAX_TILE_IN_LINKS, "WD_WR_FSEQ_CNT_MAX" );
18 :
19 : /**********************************************************************\
20 :
21 : Vinyl 101:
22 : - Vinyl is Firedancer's main account database
23 : - Vinyl is comprised of several components on-disk and in-memory
24 : - vinyl_bstream is a single file containing all vinyl records
25 : - vinyl_bstream is the source of truth
26 : - vinyl_meta indexes the latest revisions of all elements in
27 : vinyl_bstream
28 : - Vinyl has an in-memory caching layer, but snapwm does not use it
29 :
30 : The snapshot loader must:
31 : - Load the most recent version of each account into bstream
32 : - Create a full vinyl_meta index of accounts
33 : - Recover from load failures and retry
34 :
35 : Note on I/O layers:
36 : - io_mm is the slow/generic memory mapped I/O backend.
37 : - io_wd is the fast/dumb O_DIRECT backend. Can only append, thus used
38 : for hot path account writing.
39 : - io_mm and io_wd cannot be active at the same time -- snapwm will
40 : switch between them as necessary.
41 :
42 : Full snapshot logic:
43 : - Write accounts to bstream (io_wd)
44 : - Synchronously populate the vinyl_meta index while writing
45 : - On load failure, destroy and recreate the bstream (io_mm)
46 :
47 : Incremental snapshot logic:
48 : - Phase 1: while reading the incremental snapshot
49 : - Write accounts to bstream without updating the index (io_wd)
50 : - On load failure, undo writes done to bstream (io_mm)
51 : - Phase 2: once read is done
52 : - Replay all elements written to bstream (io_mm)
53 : - Populate the vinyl_meta index while replaying
54 :
55 : \**********************************************************************/
56 :
57 : void
58 : fd_snapwm_vinyl_privileged_init( fd_snapwm_tile_t * ctx,
59 : fd_topo_t * topo,
60 0 : fd_topo_tile_t * tile ) {
61 0 : void * shmap = fd_topo_obj_laddr( topo, tile->snapwm.vinyl_meta_map_obj_id );
62 0 : void * shele = fd_topo_obj_laddr( topo, tile->snapwm.vinyl_meta_pool_obj_id );
63 :
64 0 : FD_TEST( fd_vinyl_meta_join( ctx->vinyl.map, shmap, shele ) );
65 :
66 : /* Set up io_mm dependencies */
67 :
68 0 : char const * bstream_path = tile->snapwm.vinyl_path;
69 0 : int bstream_fd = open( bstream_path, O_RDWR|O_CLOEXEC, 0644 );
70 0 : if( FD_UNLIKELY( bstream_fd<0 ) ) {
71 0 : FD_LOG_ERR(( "open(%s,O_RDWR|O_CLOEXEC,0644) failed (%i-%s)",
72 0 : bstream_path, errno, fd_io_strerror( errno ) ));
73 0 : }
74 :
75 0 : struct stat st;
76 0 : if( FD_UNLIKELY( fstat( bstream_fd, &st )!=0 ) ) {
77 0 : FD_LOG_ERR(( "fstat(%s) failed (%i-%s)",
78 0 : bstream_path, errno, fd_io_strerror( errno ) ));
79 0 : }
80 0 : ulong bstream_sz = (ulong)st.st_size;
81 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
82 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
83 0 : }
84 :
85 0 : void * bstream_mem = mmap( NULL, bstream_sz, PROT_READ|PROT_WRITE, MAP_SHARED, bstream_fd, 0 );
86 0 : if( FD_UNLIKELY( bstream_mem==MAP_FAILED ) ) {
87 0 : FD_LOG_ERR(( "mmap(sz=%lu,PROT_READ|PROT_WRITE,MAP_SHARED,path=%s,off=0) failed (%i-%s)",
88 0 : bstream_sz, bstream_path, errno, fd_io_strerror( errno ) ));
89 0 : }
90 :
91 0 : if( FD_UNLIKELY( 0!=close( bstream_fd ) ) ) { /* clean up unused fd */
92 0 : FD_LOG_ERR(( "close(fd=%i) failed (%i-%s)",
93 0 : bstream_fd, errno, fd_io_strerror( errno ) ));
94 0 : }
95 :
96 0 : ctx->vinyl.bstream_mem = bstream_mem;
97 0 : ctx->vinyl.bstream_sz = bstream_sz;
98 :
99 0 : FD_TEST( fd_rng_secure( &ctx->vinyl.io_seed, 8UL ) );
100 0 : }
101 :
102 : static void
103 0 : io_mm_align_4k( fd_snapwm_tile_t * ctx ) {
104 0 : fd_vinyl_io_t * io_mm = ctx->vinyl.io_mm;
105 0 : if( FD_UNLIKELY( io_mm->seq_future!=0UL ) ) {
106 0 : FD_LOG_CRIT(( "unexpected io_mm state (seq_future=%lu)", io_mm->seq_future ));
107 0 : }
108 0 : uchar * mmio = fd_vinyl_mmio ( io_mm );
109 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io_mm );
110 :
111 0 : ulong bstream_preamble = fd_ulong_align_up( FD_VINYL_BSTREAM_BLOCK_SZ, 4096UL ) - FD_VINYL_BSTREAM_BLOCK_SZ;
112 0 : FD_CRIT( bstream_preamble<=mmio_sz, "bstream too small for 4k alignment" );
113 :
114 0 : fd_memset( mmio, 0, bstream_preamble );
115 0 : io_mm->seq_present += bstream_preamble;
116 0 : io_mm->seq_future += bstream_preamble;
117 0 : }
118 :
119 : void
120 : fd_snapwm_vinyl_unprivileged_init( fd_snapwm_tile_t * ctx,
121 : fd_topo_t * topo,
122 : fd_topo_tile_t * tile,
123 : void * io_mm_mem,
124 0 : void * io_wd_mem ) {
125 :
126 : /* Set up io_mm */
127 :
128 0 : ctx->vinyl.io_mm =
129 0 : fd_vinyl_io_mm_init( io_mm_mem,
130 0 : FD_SNAPWM_IO_SPAD_MAX,
131 0 : ctx->vinyl.bstream_mem,
132 0 : ctx->vinyl.bstream_sz,
133 0 : 1,
134 0 : "accounts-v0", 12UL,
135 0 : ctx->vinyl.io_seed );
136 0 : if( FD_UNLIKELY( !ctx->vinyl.io_mm ) ) {
137 0 : FD_LOG_ERR(( "fd_vinyl_io_mm_init failed" ));
138 0 : }
139 :
140 : /* Write out zero blocks to align the bstream by 4096 bytes
141 : (Assuming a 512 byte sync block) */
142 :
143 0 : io_mm_align_4k( ctx );
144 :
145 : /* Set up io_wd dependencies */
146 :
147 0 : ulong wr_link_id = fd_topo_find_tile_out_link( topo, tile, "snapwm_wh", 0UL );
148 0 : if( FD_UNLIKELY( wr_link_id==ULONG_MAX ) ) FD_LOG_CRIT(( "snapwm_wh link not found" ));
149 0 : fd_topo_link_t * wr_link = &topo->links[ tile->out_link_id[ wr_link_id ] ];
150 :
151 0 : if( FD_UNLIKELY( tile->snapwm.snapwr_depth != fd_mcache_depth( wr_link->mcache ) ) ) {
152 : /* FIXME TOCTOU issue ... A malicious downstream tile could
153 : theoretically corrupt mcache->depth and cause an OOB access
154 : while snapwm is still initializing. Practically not an
155 : issue because the system is not exposed to attacker-
156 : controlled input at boot time. */
157 0 : FD_LOG_CRIT(( "snapwm_wr link mcache depth %lu does not match snapwr_depth %lu",
158 0 : fd_mcache_depth( wr_link->mcache ), tile->snapwm.snapwr_depth ));
159 0 : }
160 :
161 0 : ulong expected_wr_link_consumers_cnt = fd_topo_tile_name_cnt( topo, "snapwh" );
162 0 : if( FD_UNLIKELY( fd_topo_link_reliable_consumer_cnt( topo, wr_link )!=expected_wr_link_consumers_cnt ) ) {
163 0 : FD_LOG_CRIT(( "snapwm_wr link must have exactly %lu reliable consumers", expected_wr_link_consumers_cnt ));
164 0 : }
165 :
166 0 : ulong const * wh_fseq[WD_WR_FSEQ_CNT_MAX];
167 0 : ulong wh_fseq_cnt = 0UL;
168 0 : ulong wh_fseq_cnt_expected = fd_topo_tile_name_cnt( topo, "snapwh" );
169 0 : FD_TEST( wh_fseq_cnt_expected<=WD_WR_FSEQ_CNT_MAX );
170 0 : FD_TEST( wh_fseq_cnt_expected==fd_topo_link_reliable_consumer_cnt( topo, wr_link ) );
171 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
172 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
173 0 : for( ulong in_idx=0UL; in_idx<consumer_tile->in_cnt; in_idx++ ) {
174 0 : if( consumer_tile->in_link_id[ in_idx ]==wr_link->id ) {
175 0 : FD_TEST( wh_fseq_cnt<WD_WR_FSEQ_CNT_MAX );
176 0 : wh_fseq[ wh_fseq_cnt ] = consumer_tile->in_link_fseq[ in_idx ];
177 0 : wh_fseq_cnt++;
178 0 : }
179 0 : }
180 0 : }
181 0 : if( FD_UNLIKELY( wh_fseq_cnt!=wh_fseq_cnt_expected ) ) {
182 0 : FD_LOG_ERR(( "unable to find %lu fseq(s) for output link %s:%lu",
183 0 : wh_fseq_cnt, wr_link->name, wr_link->kind_id ));
184 0 : }
185 :
186 : /* Set up io_wd */
187 :
188 0 : ctx->vinyl.io_wd =
189 0 : fd_vinyl_io_wd_init( io_wd_mem,
190 0 : ctx->vinyl.bstream_sz,
191 0 : ctx->vinyl.io_mm->seed,
192 0 : wr_link->mcache,
193 0 : wr_link->dcache,
194 0 : wh_fseq,
195 0 : wh_fseq_cnt,
196 0 : wr_link->mtu );
197 0 : if( FD_UNLIKELY( !ctx->vinyl.io_wd ) ) {
198 0 : FD_LOG_ERR(( "fd_vinyl_io_wd_init failed" ));
199 0 : }
200 :
201 : /* Start by using io_mm */
202 :
203 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
204 :
205 0 : ctx->vinyl.duplicate_accounts_batch_sz = 0UL;
206 0 : ctx->vinyl.duplicate_accounts_batch_cnt = 0UL;
207 :
208 0 : ctx->vinyl.pair_cnt = 0UL;
209 :
210 0 : fd_lthash_adder_new( &ctx->vinyl.adder );
211 0 : fd_lthash_zero( &ctx->vinyl.running_lthash );
212 0 : }
213 :
214 : ulong
215 : fd_snapwm_vinyl_seccomp( ulong out_cnt,
216 0 : struct sock_filter * out ) {
217 0 : populate_sock_filter_policy_fd_snapwm_tile_vinyl( out_cnt, out, (uint)fd_log_private_logfile_fd() );
218 0 : return sock_filter_policy_fd_snapwm_tile_vinyl_instr_cnt;
219 0 : }
220 :
221 : static void
222 0 : vinyl_mm_sync( fd_snapwm_tile_t * ctx ) {
223 0 : if( FD_UNLIKELY( 0!=msync( ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz, MS_SYNC ) ) ) {
224 0 : FD_LOG_ERR(( "msync(addr=%p,sz=%lu,MS_SYNC) failed (%i-%s)",
225 0 : (void *)ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz,
226 0 : errno, fd_io_strerror( errno ) ));
227 0 : }
228 0 : }
229 :
230 : /* Faster vinyl meta accesses *****************************************/
231 :
232 : static fd_vinyl_meta_ele_t *
233 : fd_vinyl_meta_prepare_nolock( fd_vinyl_meta_t * join,
234 : fd_vinyl_key_t const * key,
235 0 : ulong memo ) {
236 0 : fd_vinyl_meta_ele_t * ele0 = join->ele;
237 0 : ulong ele_max = join->ele_max;
238 0 : ulong probe_max = join->probe_max;
239 0 : void * ctx = join->ctx;
240 :
241 0 : ulong start_idx = memo & (ele_max-1UL);
242 :
243 0 : for(;;) {
244 :
245 0 : ulong ele_idx = start_idx;
246 :
247 0 : for( ulong probe_rem=probe_max; probe_rem; probe_rem-- ) {
248 0 : fd_vinyl_meta_ele_t * ele = ele0 + ele_idx;
249 :
250 0 : if( FD_LIKELY( fd_vinyl_meta_private_ele_is_free( ctx, ele ) ) || /* opt for low collision */
251 0 : (
252 0 : FD_LIKELY( ele->memo==memo ) &&
253 0 : FD_LIKELY( fd_vinyl_key_eq( &ele->phdr.key, key ) ) /* opt for already in map */
254 0 : ) ) {
255 0 : return ele;
256 0 : }
257 :
258 0 : ele_idx = (ele_idx+1UL) & (ele_max-1UL);
259 0 : }
260 :
261 0 : return NULL;
262 :
263 0 : }
264 :
265 : /* never get here */
266 0 : }
267 :
268 : /* Transactional APIs *************************************************/
269 :
270 : void
271 0 : fd_snapwm_vinyl_txn_begin( fd_snapwm_tile_t * ctx ) {
272 0 : FD_CRIT( !ctx->vinyl.txn_active, "txn_begin called while already in txn" );
273 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
274 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
275 :
276 : /* Finish any outstanding writes */
277 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
278 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
279 :
280 0 : ctx->vinyl.txn_seq = io->seq_present;
281 0 : ctx->vinyl.txn_active = 1;
282 0 : }
283 :
284 : FD_FN_UNUSED static void
285 : streamlined_hash( fd_lthash_adder_t * restrict adder,
286 : fd_lthash_value_t * restrict running_lthash,
287 0 : uchar const * restrict _pair ) {
288 0 : uchar const * pair = _pair;
289 0 : fd_vinyl_bstream_phdr_t const * phdr = (fd_vinyl_bstream_phdr_t const *)pair;
290 0 : pair += sizeof(fd_vinyl_bstream_phdr_t);
291 0 : fd_account_meta_t const * meta = (fd_account_meta_t const *)pair;
292 0 : pair += sizeof(fd_account_meta_t);
293 0 : uchar const * data = pair;
294 :
295 0 : ulong data_len = meta->dlen;
296 0 : const char * pubkey = phdr->key.c;
297 0 : ulong lamports = meta->lamports;
298 0 : const uchar * owner = meta->owner;
299 0 : uchar executable = (uchar)( !meta->executable ? 0U : 1U) ;
300 :
301 0 : if( FD_UNLIKELY( data_len > FD_RUNTIME_ACC_SZ_MAX ) ) FD_LOG_ERR(( "Found unusually large account (data_sz=%lu), aborting", data_len ));
302 0 : if( FD_UNLIKELY( lamports==0UL ) ) return;
303 :
304 0 : fd_lthash_adder_push_solana_account( adder,
305 0 : running_lthash,
306 0 : pubkey,
307 0 : data,
308 0 : data_len,
309 0 : lamports,
310 0 : executable,
311 0 : owner );
312 0 : }
313 :
314 : void
315 : fd_snapwm_vinyl_txn_commit( fd_snapwm_tile_t * ctx,
316 0 : fd_stem_context_t * stem ) {
317 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_commit called while not in txn" );
318 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
319 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
320 :
321 0 : long dt = -fd_log_wallclock();
322 :
323 : /* Finish any outstanding writes */
324 :
325 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
326 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
327 :
328 : /* Hint to kernel to start prefetching to speed up reads */
329 :
330 0 : uchar * mmio = fd_vinyl_mmio ( io ); FD_TEST( mmio );
331 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io );
332 :
333 0 : ulong txn_seq0 = ctx->vinyl.txn_seq;
334 0 : ulong txn_seq1 = ctx->vinyl.io_mm->seq_present;
335 0 : FD_LOG_INFO(( "vinyl txn_commit starting for seq [%lu,%lu)", txn_seq0, txn_seq1 ));
336 0 : ulong txn_sz = txn_seq1-txn_seq0;
337 0 : FD_CRIT( fd_vinyl_seq_le( txn_seq0, txn_seq1 ), "invalid txn seq range" );
338 0 : FD_CRIT( txn_seq1 <= mmio_sz, "invalid txn seq range" );
339 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( txn_seq0, txn_seq1 ) ) ) return;
340 :
341 0 : void * madv_base = (void *)fd_ulong_align_dn( (ulong)mmio+txn_seq0, FD_SHMEM_NORMAL_PAGE_SZ );
342 0 : ulong madv_sz = /* */fd_ulong_align_up( txn_sz, FD_SHMEM_NORMAL_PAGE_SZ );
343 0 : if( FD_UNLIKELY( madvise( madv_base, madv_sz, MADV_SEQUENTIAL ) ) ) {
344 0 : FD_LOG_WARNING(( "madvise(addr=%p,sz=%lu,MADV_SEQUENTIAL) failed (%i-%s)",
345 0 : madv_base, madv_sz,
346 0 : errno, fd_io_strerror( errno ) ));
347 0 : }
348 :
349 : /* Replay incremental account updates */
350 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
351 0 : fd_snapwm_vinyl_duplicate_accounts_lthash_init( ctx, stem );
352 0 : ulong dup_batch_cnt = 0UL;
353 :
354 0 : fd_vinyl_meta_t * meta_map = ctx->vinyl.map;
355 0 : for( ulong seq=txn_seq0; fd_vinyl_seq_lt( seq, txn_seq1 ); ) {
356 0 : fd_vinyl_bstream_block_t * block = (void *)( mmio+seq );
357 :
358 : /* Speculatively read block info */
359 0 : ulong ctl = FD_VOLATILE_CONST( block->ctl );
360 0 : fd_vinyl_bstream_phdr_t phdr = FD_VOLATILE_CONST( block->phdr );
361 :
362 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
363 0 : int block_type = fd_vinyl_bstream_ctl_type( ctl );
364 0 : ulong block_sz;
365 :
366 0 : if( FD_LIKELY( block_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
367 0 : block_sz = fd_vinyl_bstream_pair_sz( val_esz );
368 0 : ulong memo = fd_vinyl_key_memo( meta_map->seed, &phdr.key );
369 0 : fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_prepare_nolock( meta_map, &phdr.key, memo );
370 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "fd_vinyl_meta_prepare failed (full)" ));
371 :
372 : /* Erase value if existing is newer */
373 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) { /* key exists */
374 0 : ulong exist_slot = ele->phdr.info.ul[ 1 ];
375 0 : ulong cur_slot = phdr.info.ul[ 1 ];
376 0 : if( exist_slot > cur_slot ) {
377 0 : ctx->metrics.accounts_ignored++;
378 0 : FD_COMPILER_MFENCE();
379 0 : fd_snapwm_vinyl_duplicate_accounts_lthash_append( ctx, (uchar*)block/*pair*/ );
380 0 : FD_COMPILER_MFENCE();
381 0 : fd_memset( block, 0, block_sz );
382 0 : goto next;
383 0 : } else {
384 0 : dup_batch_cnt += (ulong)fd_snapwm_vinyl_duplicate_accounts_batch_append( ctx, &ele->phdr, ele->seq );
385 0 : }
386 0 : ctx->metrics.accounts_replaced++;
387 0 : } else {
388 0 : ctx->vinyl.pair_cnt++;
389 0 : }
390 :
391 : /* Overwrite map entry */
392 0 : ele->memo = memo;
393 0 : ele->phdr = phdr;
394 0 : ele->seq = seq;
395 0 : ele->line_idx = ULONG_MAX;
396 0 : } else if( block_type==FD_VINYL_BSTREAM_CTL_TYPE_ZPAD ) {
397 0 : block_sz = FD_VINYL_BSTREAM_BLOCK_SZ;
398 0 : } else {
399 0 : FD_LOG_CRIT(( "unexpected block type %d", block_type ));
400 0 : }
401 :
402 0 : if( FD_UNLIKELY( !block_sz ) ) {
403 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx (zero block_sz)", seq, ctl ));
404 0 : }
405 0 : if( FD_UNLIKELY( block_sz > 64UL<<20 ) ) {
406 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx, block_sz=%lu (unreasonably large block size)", seq, ctl, block_sz ));
407 0 : }
408 :
409 0 : next:
410 0 : seq += block_sz;
411 :
412 0 : if( FD_UNLIKELY( dup_batch_cnt >= FD_SNAPWM_DUP_META_BATCH_CNT_MAX ) ) {
413 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
414 0 : FD_COMPILER_MFENCE();
415 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
416 0 : dup_batch_cnt = 0UL;
417 0 : }
418 0 : }
419 :
420 : /* Batch fini must be invoked before lthash fini for two reasons:
421 : the batch still needs to be processed downstream and there should
422 : be no fd_stem_publish between batch init and fini. */
423 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
424 0 : FD_COMPILER_MFENCE();
425 0 : fd_snapwm_vinyl_duplicate_accounts_lthash_fini( ctx, stem );
426 :
427 : /* Persist above erases to disk */
428 :
429 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
430 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
431 0 : vinyl_mm_sync( ctx );
432 :
433 0 : dt += fd_log_wallclock();
434 0 : FD_LOG_INFO(( "vinyl txn_commit took %g seconds", (double)dt/1e9 ));
435 0 : }
436 :
437 : void
438 0 : fd_snapwm_vinyl_txn_cancel( fd_snapwm_tile_t * ctx ) {
439 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_cancel called while not in txn" );
440 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
441 :
442 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
443 0 : fd_vinyl_io_rewind( io, ctx->vinyl.txn_seq );
444 0 : fd_vinyl_io_sync ( io, FD_VINYL_IO_FLAG_BLOCKING );
445 0 : }
446 :
447 : /* Fast writer ********************************************************/
448 :
449 : void
450 0 : fd_snapwm_vinyl_wd_init( fd_snapwm_tile_t * ctx ) {
451 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
452 :
453 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
454 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_mm) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
455 :
456 : /* Flush io_mm */
457 :
458 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
459 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
460 0 : vinyl_mm_sync( ctx );
461 :
462 : /* Synchronize sequence numbers */
463 :
464 0 : ctx->vinyl.io_wd->seq_ancient = ctx->vinyl.io_mm->seq_ancient;
465 0 : ctx->vinyl.io_wd->seq_past = ctx->vinyl.io_mm->seq_past;
466 0 : ctx->vinyl.io_wd->seq_present = ctx->vinyl.io_mm->seq_present;
467 0 : ctx->vinyl.io_wd->seq_future = ctx->vinyl.io_mm->seq_future;
468 0 : ctx->vinyl.io_wd->spad_used = 0UL;
469 :
470 0 : ctx->vinyl.io = ctx->vinyl.io_wd;
471 0 : }
472 :
473 : void
474 0 : fd_snapwm_vinyl_wd_fini( fd_snapwm_tile_t * ctx ) {
475 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_wd ) ) return;
476 :
477 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_wd, FD_VINYL_IO_FLAG_BLOCKING );
478 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_wd) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
479 :
480 : /* Synchronize sequence numbers */
481 :
482 0 : ctx->vinyl.io_mm->seq_ancient = ctx->vinyl.io_wd->seq_ancient;
483 0 : ctx->vinyl.io_mm->seq_past = ctx->vinyl.io_wd->seq_past;
484 0 : ctx->vinyl.io_mm->seq_present = ctx->vinyl.io_wd->seq_present;
485 0 : ctx->vinyl.io_mm->seq_future = ctx->vinyl.io_wd->seq_future;
486 0 : ctx->vinyl.io_mm->spad_used = 0UL;
487 :
488 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
489 0 : }
490 :
491 : /* bstream_alloc is a faster version of fd_vinyl_io_alloc. Indirect
492 : calls have significant overhead on Zen 5. */
493 :
494 : static uchar *
495 : bstream_alloc( fd_vinyl_io_t * io,
496 : ulong sz,
497 0 : int flags ) {
498 0 : if( FD_LIKELY( io->impl==&fd_vinyl_io_wd_impl ) )
499 0 : return fd_vinyl_io_wd_alloc( io, sz, flags );
500 0 : return fd_vinyl_io_alloc( io, sz, flags );
501 0 : }
502 :
503 : /* fd_snapwm_vinyl_process_account reads and processes a batch of
504 : pre-generated bstream pairs, handles the meta_map, and determines
505 : whether to forward each of the accounts (pairs) to the database. */
506 :
507 : void
508 : fd_snapwm_vinyl_process_account( fd_snapwm_tile_t * ctx,
509 : ulong chunk,
510 : ulong acc_cnt,
511 0 : fd_stem_context_t * stem ) {
512 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
513 0 : fd_vinyl_meta_t * map = ctx->vinyl.map;
514 :
515 0 : uchar * src = fd_chunk_to_laddr( ctx->in.wksp, chunk );
516 :
517 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
518 :
519 0 : for( ulong acc_i=0UL; acc_i<acc_cnt; acc_i++ ) {
520 :
521 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)src;
522 :
523 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( phdr->ctl );
524 :
525 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
526 0 : uchar * pair = bstream_alloc( io, pair_sz, FD_VINYL_IO_FLAG_BLOCKING );
527 0 : uchar * dst = pair;
528 :
529 0 : ulong const account_header_slot = phdr->info.ul[1];
530 :
531 0 : ctx->metrics.accounts_loaded++;
532 :
533 0 : fd_vinyl_meta_ele_t * ele = NULL;
534 0 : if( ctx->full ) {
535 0 : ulong memo = fd_vinyl_key_memo( map->seed, &phdr->key );
536 0 : ele = fd_vinyl_meta_prepare_nolock( map, &phdr->key, memo );
537 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "Failed to update vinyl index (full)" ));
538 :
539 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) {
540 : /* Drop current value if existing is newer */
541 0 : ulong const exist_slot = ele->phdr.info.ul[ 1 ];
542 0 : if( FD_UNLIKELY( exist_slot > account_header_slot ) ) {
543 0 : ctx->metrics.accounts_ignored++;
544 0 : src += pair_sz;
545 0 : continue;
546 0 : } else {
547 0 : fd_snapwm_vinyl_duplicate_accounts_batch_append( ctx, &ele->phdr, ele->seq );
548 0 : }
549 0 : ctx->metrics.accounts_replaced++;
550 0 : } else {
551 0 : ctx->vinyl.pair_cnt++;
552 0 : }
553 :
554 0 : ele->memo = memo;
555 0 : ele->phdr.ctl = phdr->ctl;
556 0 : ele->phdr.key = phdr->key;
557 0 : ele->phdr.info = phdr->info;
558 0 : ele->seq = ULONG_MAX; /* later init */
559 0 : ele->line_idx = ULONG_MAX;
560 0 : }
561 :
562 0 : fd_memcpy( dst, src, pair_sz );
563 0 : src += pair_sz;
564 :
565 0 : ulong seq_after = fd_vinyl_io_append( io, pair, pair_sz );
566 0 : if( ctx->full ) ele->seq = seq_after;
567 0 : }
568 :
569 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
570 0 : }
571 :
572 : void
573 0 : fd_snapwm_vinyl_shutdown( fd_snapwm_tile_t * ctx ) {
574 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io, FD_VINYL_IO_FLAG_BLOCKING );
575 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
576 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
577 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
578 0 : vinyl_mm_sync( ctx );
579 :
580 0 : fd_vinyl_io_wd_ctrl( ctx->vinyl.io_wd, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL );
581 0 : }
582 :
583 : void
584 : fd_snapwm_vinyl_read_account( fd_snapwm_tile_t * ctx,
585 : void const * acct_addr,
586 : fd_account_meta_t * meta,
587 : uchar * data,
588 0 : ulong data_max ) {
589 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_mm ) ) {
590 0 : FD_LOG_CRIT(( "vinyl not in io_mm mode" ));
591 0 : }
592 :
593 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
594 :
595 : /* Query database index */
596 :
597 0 : fd_vinyl_key_t key[1];
598 0 : fd_vinyl_key_init( key, acct_addr, 32UL );
599 0 : ulong memo = fd_vinyl_key_memo( ctx->vinyl.map->seed, key );
600 0 : fd_vinyl_meta_ele_t const * ele = fd_vinyl_meta_prepare_nolock( ctx->vinyl.map, key, memo );
601 0 : if( FD_UNLIKELY( !ele || !fd_vinyl_meta_ele_in_use( ele ) ) ) {
602 : /* account not found */
603 0 : return;
604 0 : }
605 :
606 0 : uchar * mmio = fd_vinyl_mmio ( ctx->vinyl.io_mm );
607 0 : ulong mmio_sz = fd_vinyl_mmio_sz( ctx->vinyl.io_mm );
608 :
609 : /* Validate index record */
610 :
611 0 : ulong const seq0 = ele->seq;
612 0 : ulong const ctl = ele->phdr.ctl;
613 0 : int const ctl_type = fd_vinyl_bstream_ctl_type( ctl );
614 0 : ulong const val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
615 0 : ulong const pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
616 0 : ulong const seq1 = seq0 + pair_sz;
617 0 : ulong const seq_past = ctx->vinyl.io->seq_past;
618 0 : ulong const seq_present = ctx->vinyl.io->seq_present;
619 0 : if( FD_UNLIKELY( ctl_type!=FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
620 0 : FD_LOG_CRIT(( "corrupt bstream record in index: ctl=%016lx", ctl ));
621 0 : }
622 0 : if( FD_UNLIKELY( val_esz<sizeof(fd_account_meta_t) ||
623 0 : val_esz>sizeof(fd_account_meta_t)+FD_RUNTIME_ACC_SZ_MAX ) ) {
624 0 : FD_LOG_CRIT(( "corrupt bstream record in index: val_esz=%lu", val_esz ));
625 0 : }
626 0 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
627 0 : if( FD_UNLIKELY( bad_past ) ) {
628 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) not in [seq_past=%lu,seq_present=%lu)",
629 0 : seq0, seq1, seq_past, seq_present ));
630 0 : }
631 :
632 : /* Map seq range to underlying device
633 : In the snapshot loader, it is safe to assume that bstream reads
634 : do not wrap around. */
635 :
636 0 : if( FD_UNLIKELY( seq1>mmio_sz ) ) {
637 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) exceeds bstream addressable range [0,%lu)",
638 0 : seq0, seq1, mmio_sz ));
639 0 : }
640 :
641 : /* Read from bstream */
642 :
643 0 : ulong seq_meta = seq0 + sizeof(fd_vinyl_bstream_phdr_t);
644 0 : ulong seq_data = seq_meta + sizeof(fd_account_meta_t);
645 :
646 0 : memcpy( meta, mmio+seq_meta, sizeof(fd_account_meta_t) );
647 0 : if( FD_UNLIKELY( sizeof(fd_account_meta_t)+(ulong)meta->dlen > val_esz ) ) {
648 0 : FD_LOG_CRIT(( "corrupt bstream record: seq0=%lu val_esz=%lu dlen=%u", seq0, val_esz, meta->dlen ));
649 0 : }
650 0 : if( FD_UNLIKELY( meta->dlen > data_max ) ) {
651 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
652 0 : FD_LOG_CRIT(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
653 0 : acct_addr_b58, (ulong)meta->dlen, data_max ));
654 0 : }
655 0 : memcpy( data, mmio+seq_data, meta->dlen );
656 0 : }
657 :
658 : /* handle_hash_out_fseq_check is a blocking operation */
659 : static inline void
660 : handle_hash_out_fseq_check( fd_snapwm_tile_t * ctx,
661 : fd_stem_context_t * stem,
662 0 : ulong min_credit ) {
663 0 : ulong producer_fseq = fd_fseq_query( &stem->seqs[ ctx->hash_out.idx ] );
664 0 : ulong consumer_fseq = fd_fseq_query( ctx->hash_out.consumer_fseq );
665 0 : for(;;) {
666 0 : ulong avail = ctx->hash_out.depth - ( producer_fseq - consumer_fseq );
667 0 : if( FD_LIKELY( avail > min_credit ) ) break;
668 0 : FD_SPIN_PAUSE();
669 0 : consumer_fseq = fd_fseq_query( ctx->hash_out.consumer_fseq );
670 0 : }
671 0 : }
672 :
673 : int
674 : fd_snapwm_vinyl_duplicate_accounts_batch_init( fd_snapwm_tile_t * ctx,
675 0 : fd_stem_context_t * stem ) {
676 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
677 0 : ctx->vinyl.duplicate_accounts_batch_sz = 0UL;
678 0 : ctx->vinyl.duplicate_accounts_batch_cnt = 0UL;
679 :
680 : /* fseq check is mandatory here, since append writes directly to
681 : the dcache. */
682 0 : handle_hash_out_fseq_check( ctx, stem, FD_SNAPWM_DUP_BATCH_CREDIT_MIN );
683 0 : return 1;
684 0 : }
685 :
686 : int
687 : fd_snapwm_vinyl_duplicate_accounts_batch_append( fd_snapwm_tile_t * ctx,
688 : fd_vinyl_bstream_phdr_t * phdr,
689 0 : ulong seq ) {
690 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
691 0 : uchar * data = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
692 0 : data += ctx->vinyl.duplicate_accounts_batch_sz; /* offset into the chunk */
693 0 : memcpy( data, &seq, sizeof(ulong) );
694 0 : memcpy( data + sizeof(ulong), phdr, sizeof(fd_vinyl_bstream_phdr_t) );
695 0 : ctx->vinyl.duplicate_accounts_batch_sz += FD_SNAPWM_DUP_META_SZ;
696 0 : ctx->vinyl.duplicate_accounts_batch_cnt +=1UL;
697 0 : return 1;
698 0 : }
699 :
700 : int
701 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( fd_snapwm_tile_t * ctx,
702 0 : fd_stem_context_t * stem ) {
703 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
704 :
705 : /* There is no fseq check in batch_fini. This is a performance
706 : optimization, which requires no other fd_stem_publish on the
707 : output link in between init and fini. */
708 :
709 0 : ulong batch_sz = ctx->vinyl.duplicate_accounts_batch_sz;
710 0 : ulong batch_cnt = ctx->vinyl.duplicate_accounts_batch_cnt;
711 0 : if( FD_UNLIKELY( batch_cnt>FD_SSPARSE_ACC_BATCH_MAX ) ) {
712 0 : FD_LOG_CRIT(( "batch_cnt %lu exceeds FD_SSPARSE_ACC_BATCH_MAX %lu", batch_cnt, FD_SSPARSE_ACC_BATCH_MAX ));
713 0 : }
714 0 : if( FD_UNLIKELY( !batch_sz ) ) return 0;
715 0 : fd_stem_publish( stem, ctx->hash_out.idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, ctx->hash_out.chunk, batch_sz, 0UL, 0UL, batch_cnt/*tspub*/ );
716 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, batch_sz, ctx->hash_out.chunk0, ctx->hash_out.wmark );
717 0 : return 1;
718 0 : }
719 :
720 : int
721 : fd_snapwm_vinyl_duplicate_accounts_lthash_init( fd_snapwm_tile_t * ctx,
722 0 : fd_stem_context_t * stem ) {
723 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
724 0 : fd_lthash_zero( &ctx->vinyl.running_lthash );
725 :
726 0 : (void)stem;
727 : /* There is no fseq check in lthash_init, since append uses internal
728 : adder and running_lthash, without accessing the dcache. */
729 0 : return 1;
730 0 : }
731 :
732 : int
733 : fd_snapwm_vinyl_duplicate_accounts_lthash_append( fd_snapwm_tile_t * ctx,
734 0 : uchar * pair ) {
735 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
736 0 : streamlined_hash( &ctx->vinyl.adder, &ctx->vinyl.running_lthash, pair );
737 0 : return 1;
738 0 : }
739 :
740 : int
741 : fd_snapwm_vinyl_duplicate_accounts_lthash_fini( fd_snapwm_tile_t * ctx,
742 0 : fd_stem_context_t * stem ) {
743 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
744 :
745 : /* fseq check is mandatory here. */
746 0 : handle_hash_out_fseq_check( ctx, stem, FD_SNAPWM_DUP_LTHASH_CREDIT_MIN );
747 :
748 0 : fd_lthash_adder_flush( &ctx->vinyl.adder, &ctx->vinyl.running_lthash );
749 0 : uchar * data = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
750 0 : fd_memcpy( data, &ctx->vinyl.running_lthash, FD_LTHASH_LEN_BYTES );
751 0 : fd_stem_publish( stem, ctx->hash_out.idx, FD_SNAPSHOT_HASH_MSG_RESULT_SUB, ctx->hash_out.chunk, FD_LTHASH_LEN_BYTES, 0UL, 0UL, 0UL );
752 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, FD_LTHASH_LEN_BYTES, ctx->hash_out.chunk0, ctx->hash_out.wmark );
753 0 : return 1;
754 0 : }
|