Line data Source code
1 : #define _DEFAULT_SOURCE /* madvise */
2 : #include "fd_snapin_tile_private.h"
3 : #include "utils/fd_ssparse.h"
4 : #include "utils/fd_ssctrl.h"
5 : #include "utils/fd_vinyl_io_wd.h"
6 :
7 : #include <errno.h>
8 : #include <fcntl.h> /* open */
9 : #include <sys/mman.h> /* mmap, madvise */
10 : #include <sys/stat.h> /* fstat */
11 : #include <unistd.h> /* close */
12 :
13 : #include "generated/fd_snapin_tile_vinyl_seccomp.h"
14 :
15 : /**********************************************************************\
16 :
17 : Vinyl 101:
18 : - Vinyl is Firedancer's main account database
19 : - Vinyl is comprised of several components on-disk and in-memory
20 : - vinyl_bstream is a single file containing all vinyl records
21 : - vinyl_bstream is the source of truth
22 : - vinyl_meta indexes the latest revisions of all elements in
23 : vinyl_bstream
24 : - Vinyl has an in-memory caching layer, but snapin does not use it
25 :
26 : The snapshot loader must:
27 : - Load the most recent version of each account into bstream
28 : - Create a full vinyl_meta index of accounts
29 : - Recover from load failures and retry
30 :
31 : Note on I/O layers:
32 : - io_mm is the slow/generic memory mapped I/O backend.
33 : - io_wd is the fast/dumb O_DIRECT backend. Can only append, thus used
34 : for hot path account writing.
35 : - io_mm and io_wd cannot be active at the same time -- snapin will
36 : switch between them as necessary.
37 :
38 : Full snapshot logic:
39 : - Write accounts to bstream (io_wd)
40 : - Synchronously populate the vinyl_meta index while writing
41 : - On load failure, destroy and recreate the bstream (io_mm)
42 :
43 : Incremental snapshot logic:
44 : - Phase 1: while reading the incremental snapshot
45 : - Write accounts to bstream without updating the index (io_wd)
46 : - On load failure, undo writes done to bstream (io_mm)
47 : - Phase 2: once read is done
48 : - Replay all elements written to bstream (io_mm)
49 : - Populate the vinyl_meta index while replaying
50 :
51 : \**********************************************************************/
52 :
53 : void
54 : fd_snapin_vinyl_privileged_init( fd_snapin_tile_t * ctx,
55 : fd_topo_t * topo,
56 0 : fd_topo_tile_t * tile ) {
57 0 : void * shmap = fd_topo_obj_laddr( topo, tile->snapin.vinyl_meta_map_obj_id );
58 0 : void * shele = fd_topo_obj_laddr( topo, tile->snapin.vinyl_meta_pool_obj_id );
59 :
60 0 : FD_TEST( fd_vinyl_meta_join( ctx->vinyl.map, shmap, shele ) );
61 :
62 : /* Set up io_mm dependencies */
63 :
64 0 : char const * bstream_path = tile->snapin.vinyl_path;
65 0 : int bstream_fd = open( bstream_path, O_RDWR|O_CLOEXEC, 0644 );
66 0 : if( FD_UNLIKELY( bstream_fd<0 ) ) {
67 0 : FD_LOG_ERR(( "open(%s,O_RDWR|O_CLOEXEC,0644) failed (%i-%s)",
68 0 : bstream_path, errno, fd_io_strerror( errno ) ));
69 0 : }
70 :
71 0 : struct stat st;
72 0 : if( FD_UNLIKELY( fstat( bstream_fd, &st )!=0 ) ) {
73 0 : FD_LOG_ERR(( "fstat(%s) failed (%i-%s)",
74 0 : bstream_path, errno, fd_io_strerror( errno ) ));
75 0 : }
76 0 : ulong bstream_sz = (ulong)st.st_size;
77 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
78 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
79 0 : }
80 :
81 0 : void * bstream_mem = mmap( NULL, bstream_sz, PROT_READ|PROT_WRITE, MAP_SHARED, bstream_fd, 0 );
82 0 : if( FD_UNLIKELY( bstream_mem==MAP_FAILED ) ) {
83 0 : FD_LOG_ERR(( "mmap(sz=%lu,PROT_READ|PROT_WRITE,MAP_SHARED,path=%s,off=0) failed (%i-%s)",
84 0 : bstream_sz, bstream_path, errno, fd_io_strerror( errno ) ));
85 0 : }
86 :
87 0 : if( FD_UNLIKELY( 0!=close( bstream_fd ) ) ) { /* clean up unused fd */
88 0 : FD_LOG_ERR(( "close(fd=%i) failed (%i-%s)",
89 0 : bstream_fd, errno, fd_io_strerror( errno ) ));
90 0 : }
91 :
92 0 : ctx->vinyl.bstream_mem = bstream_mem;
93 0 : ctx->vinyl.bstream_sz = bstream_sz;
94 :
95 0 : FD_TEST( fd_rng_secure( &ctx->vinyl.io_seed, 8UL ) );
96 0 : }
97 :
98 : static void
99 0 : io_mm_align_4k( fd_snapin_tile_t * ctx ) {
100 0 : fd_vinyl_io_t * io_mm = ctx->vinyl.io_mm;
101 0 : if( FD_UNLIKELY( io_mm->seq_future!=0UL ) ) {
102 0 : FD_LOG_CRIT(( "unexpected io_mm state (seq_future=%lu)", io_mm->seq_future ));
103 0 : }
104 0 : uchar * mmio = fd_vinyl_mmio ( io_mm );
105 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io_mm );
106 :
107 0 : ulong bstream_preamble = fd_ulong_align_up( FD_VINYL_BSTREAM_BLOCK_SZ, 4096UL ) - FD_VINYL_BSTREAM_BLOCK_SZ;
108 0 : FD_CRIT( bstream_preamble<=mmio_sz, "bstream too small for 4k alignment" );
109 :
110 0 : fd_memset( mmio, 0, bstream_preamble );
111 0 : io_mm->seq_present += bstream_preamble;
112 0 : io_mm->seq_future += bstream_preamble;
113 0 : }
114 :
115 : void
116 : fd_snapin_vinyl_unprivileged_init( fd_snapin_tile_t * ctx,
117 : fd_topo_t * topo,
118 : fd_topo_tile_t * tile,
119 : void * io_mm_mem,
120 0 : void * io_wd_mem ) {
121 :
122 : /* Set up io_mm */
123 :
124 0 : ctx->vinyl.io_mm =
125 0 : fd_vinyl_io_mm_init( io_mm_mem,
126 0 : FD_SNAPIN_IO_SPAD_MAX,
127 0 : ctx->vinyl.bstream_mem,
128 0 : ctx->vinyl.bstream_sz,
129 0 : 1,
130 0 : "accounts-v0", 12UL,
131 0 : ctx->vinyl.io_seed );
132 0 : if( FD_UNLIKELY( !ctx->vinyl.io_mm ) ) {
133 0 : FD_LOG_ERR(( "fd_vinyl_io_mm_init failed" ));
134 0 : }
135 :
136 : /* Write out zero blocks to align the bstream by 4096 bytes
137 : (Assuming a 512 byte sync block) */
138 :
139 0 : io_mm_align_4k( ctx );
140 :
141 : /* Set up io_wd dependencies */
142 :
143 0 : ulong wr_link_id = fd_topo_find_tile_out_link( topo, tile, "snapin_wh", 0UL );
144 0 : if( FD_UNLIKELY( wr_link_id==ULONG_MAX ) ) FD_LOG_CRIT(( "snapin_wh link not found" ));
145 0 : fd_topo_link_t * wr_link = &topo->links[ tile->out_link_id[ wr_link_id ] ];
146 :
147 0 : if( FD_UNLIKELY( tile->snapin.snapwr_depth != fd_mcache_depth( wr_link->mcache ) ) ) {
148 : /* FIXME TOCTOU issue ... A malicious downstream tile could
149 : theoretically corrupt mcache->depth and cause an OOB access
150 : while snapin is still initializing. Practically not an
151 : issue because the system is not exposed to attacker-
152 : controlled input at boot time. */
153 0 : FD_LOG_CRIT(( "snapin_wr link mcache depth %lu does not match snapwr_depth %lu",
154 0 : fd_mcache_depth( wr_link->mcache ), tile->snapin.snapwr_depth ));
155 0 : }
156 :
157 0 : if( FD_UNLIKELY( fd_topo_link_reliable_consumer_cnt( topo, wr_link )!=1UL ) ) {
158 0 : FD_LOG_CRIT(( "snapin_wr link must have exactly one reliable consumer" ));
159 0 : }
160 :
161 0 : ulong wh_tile_id = fd_topo_find_tile( topo, "snapwh", 0UL );
162 0 : FD_TEST( wh_tile_id!=ULONG_MAX );
163 0 : fd_topo_tile_t * wh_tile = &topo->tiles[ wh_tile_id ];
164 0 : FD_TEST( wh_tile->in_cnt==1 );
165 0 : FD_TEST( wh_tile->in_link_id[0] == wr_link->id );
166 0 : FD_CRIT( 0==strcmp( topo->links[ wh_tile->in_link_id[ 0 ] ].name, "snapin_wh" ), "unexpected link found" );
167 0 : ulong const * wh_fseq = wh_tile->in_link_fseq[ 0 ];
168 0 : if( FD_UNLIKELY( !wh_fseq ) ) {
169 0 : FD_LOG_CRIT(( "snapin_wr link reliable consumer fseq not found" ));
170 0 : }
171 :
172 : /* Set up io_wd */
173 :
174 0 : ctx->vinyl.io_wd =
175 0 : fd_vinyl_io_wd_init( io_wd_mem,
176 0 : ctx->vinyl.bstream_sz,
177 0 : ctx->vinyl.io_mm->seed,
178 0 : wr_link->mcache,
179 0 : wr_link->dcache,
180 0 : wh_fseq,
181 0 : wr_link->mtu );
182 0 : if( FD_UNLIKELY( !ctx->vinyl.io_wd ) ) {
183 0 : FD_LOG_ERR(( "fd_vinyl_io_wd_init failed" ));
184 0 : }
185 :
186 : /* Start by using io_mm */
187 :
188 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
189 0 : }
190 :
191 : ulong
192 : fd_snapin_vinyl_seccomp( ulong out_cnt,
193 0 : struct sock_filter * out ) {
194 0 : populate_sock_filter_policy_fd_snapin_tile_vinyl( out_cnt, out, (uint)fd_log_private_logfile_fd() );
195 0 : return sock_filter_policy_fd_snapin_tile_vinyl_instr_cnt;
196 0 : }
197 :
198 : static void
199 0 : vinyl_mm_sync( fd_snapin_tile_t * ctx ) {
200 0 : if( FD_UNLIKELY( 0!=msync( ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz, MS_SYNC ) ) ) {
201 0 : FD_LOG_ERR(( "msync(addr=%p,sz=%lu,MS_SYNC) failed (%i-%s)",
202 0 : (void *)ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz,
203 0 : errno, fd_io_strerror( errno ) ));
204 0 : }
205 0 : }
206 :
207 : /* Faster vinyl meta accesses *****************************************/
208 :
209 : static fd_vinyl_meta_ele_t *
210 : fd_vinyl_meta_prepare_nolock( fd_vinyl_meta_t * join,
211 : fd_vinyl_key_t const * key,
212 0 : ulong memo ) {
213 0 : fd_vinyl_meta_ele_t * ele0 = join->ele;
214 0 : ulong ele_max = join->ele_max;
215 0 : ulong probe_max = join->probe_max;
216 0 : void * ctx = join->ctx;
217 :
218 0 : ulong start_idx = memo & (ele_max-1UL);
219 :
220 0 : for(;;) {
221 :
222 0 : ulong ele_idx = start_idx;
223 :
224 0 : for( ulong probe_rem=probe_max; probe_rem; probe_rem-- ) {
225 0 : fd_vinyl_meta_ele_t * ele = ele0 + ele_idx;
226 :
227 0 : if( FD_LIKELY( fd_vinyl_meta_private_ele_is_free( ctx, ele ) ) || /* opt for low collision */
228 0 : (
229 0 : FD_LIKELY( ele->memo==memo ) &&
230 0 : FD_LIKELY( fd_vinyl_key_eq( &ele->phdr.key, key ) ) /* opt for already in map */
231 0 : ) ) {
232 0 : return ele;
233 0 : }
234 :
235 0 : ele_idx = (ele_idx+1UL) & (ele_max-1UL);
236 0 : }
237 :
238 0 : return NULL;
239 :
240 0 : }
241 :
242 : /* never get here */
243 0 : }
244 :
245 : /* Transactional APIs *************************************************/
246 :
247 : void
248 0 : fd_snapin_vinyl_txn_begin( fd_snapin_tile_t * ctx ) {
249 0 : FD_CRIT( !ctx->vinyl.txn_active, "txn_begin called while already in txn" );
250 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
251 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
252 :
253 : /* Finish any outstanding writes */
254 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
255 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
256 :
257 0 : ctx->vinyl.txn_seq = io->seq_present;
258 0 : ctx->vinyl.txn_active = 1;
259 0 : }
260 :
261 : void
262 0 : fd_snapin_vinyl_txn_commit( fd_snapin_tile_t * ctx ) {
263 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_commit called while not in txn" );
264 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
265 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
266 :
267 0 : long dt = -fd_log_wallclock();
268 :
269 : /* Finish any outstanding writes */
270 :
271 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
272 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
273 :
274 : /* Hint to kernel to start prefetching to speed up reads */
275 :
276 0 : uchar * mmio = fd_vinyl_mmio ( io ); FD_TEST( mmio );
277 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io );
278 :
279 0 : ulong txn_seq0 = ctx->vinyl.txn_seq;
280 0 : ulong txn_seq1 = ctx->vinyl.io_mm->seq_present;
281 0 : FD_LOG_INFO(( "vinyl txn_commit starting for seq [%lu,%lu)", txn_seq0, txn_seq1 ));
282 0 : ulong txn_sz = txn_seq1-txn_seq0;
283 0 : FD_CRIT( fd_vinyl_seq_le( txn_seq0, txn_seq1 ), "invalid txn seq range" );
284 0 : FD_CRIT( txn_seq1 <= mmio_sz, "invalid txn seq range" );
285 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( txn_seq0, txn_seq1 ) ) ) return;
286 :
287 0 : void * madv_base = (void *)fd_ulong_align_dn( (ulong)mmio+txn_seq0, FD_SHMEM_NORMAL_PAGE_SZ );
288 0 : ulong madv_sz = /* */fd_ulong_align_up( txn_sz, FD_SHMEM_NORMAL_PAGE_SZ );
289 0 : if( FD_UNLIKELY( madvise( madv_base, madv_sz, MADV_SEQUENTIAL ) ) ) {
290 0 : FD_LOG_WARNING(( "madvise(addr=%p,sz=%lu,MADV_SEQUENTIAL) failed (%i-%s)",
291 0 : madv_base, madv_sz,
292 0 : errno, fd_io_strerror( errno ) ));
293 0 : }
294 :
295 : /* Replay incremental account updates */
296 :
297 0 : fd_vinyl_meta_t * meta_map = ctx->vinyl.map;
298 0 : for( ulong seq=txn_seq0; fd_vinyl_seq_lt( seq, txn_seq1 ); ) {
299 0 : fd_vinyl_bstream_block_t * block = (void *)( mmio+seq );
300 :
301 : /* Speculatively read block info */
302 0 : ulong ctl = FD_VOLATILE_CONST( block->ctl );
303 0 : fd_vinyl_bstream_phdr_t phdr = FD_VOLATILE_CONST( block->phdr );
304 :
305 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
306 0 : int block_type = fd_vinyl_bstream_ctl_type( ctl );
307 0 : ulong block_sz;
308 :
309 0 : if( FD_LIKELY( block_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
310 0 : block_sz = fd_vinyl_bstream_pair_sz( val_esz );
311 0 : ulong memo = fd_vinyl_key_memo( meta_map->seed, &phdr.key );
312 0 : fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_prepare_nolock( meta_map, &phdr.key, memo );
313 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "fd_vinyl_meta_prepare failed (full)" ));
314 :
315 : /* Erase value if existing is newer */
316 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) { /* key exists */
317 0 : ulong exist_slot = ele->phdr.info.ul[ 1 ];
318 0 : ulong cur_slot = phdr.info.ul[ 1 ];
319 0 : if( exist_slot > cur_slot ) {
320 0 : fd_memset( block, 0, block_sz );
321 0 : goto next;
322 0 : }
323 0 : }
324 :
325 : /* Overwrite map entry */
326 0 : ele->memo = memo;
327 0 : ele->phdr = phdr;
328 0 : ele->seq = seq;
329 0 : ele->line_idx = ULONG_MAX;
330 0 : } else if( block_type==FD_VINYL_BSTREAM_CTL_TYPE_ZPAD ) {
331 0 : block_sz = FD_VINYL_BSTREAM_BLOCK_SZ;
332 0 : } else {
333 0 : FD_LOG_CRIT(( "unexpected block type %d", block_type ));
334 0 : }
335 :
336 :
337 0 : if( FD_UNLIKELY( !block_sz ) ) {
338 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx (zero block_sz)", seq, ctl ));
339 0 : }
340 0 : if( FD_UNLIKELY( block_sz > 64UL<<20 ) ) {
341 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx, block_sz=%lu (unreasonably large block size)", seq, ctl, block_sz ));
342 0 : }
343 :
344 0 : next:
345 0 : seq += block_sz;
346 0 : }
347 :
348 : /* Persist above erases to disk */
349 :
350 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
351 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
352 0 : vinyl_mm_sync( ctx );
353 :
354 0 : dt += fd_log_wallclock();
355 0 : FD_LOG_INFO(( "vinyl txn_commit took %g seconds", (double)dt/1e9 ));
356 0 : }
357 :
358 : void
359 0 : fd_snapin_vinyl_txn_cancel( fd_snapin_tile_t * ctx ) {
360 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_cancel called while not in txn" );
361 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
362 :
363 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
364 0 : fd_vinyl_io_rewind( io, ctx->vinyl.txn_seq );
365 0 : fd_vinyl_io_sync ( io, FD_VINYL_IO_FLAG_BLOCKING );
366 0 : }
367 :
368 : /* Fast writer ********************************************************/
369 :
370 : void
371 0 : fd_snapin_vinyl_wd_init( fd_snapin_tile_t * ctx ) {
372 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
373 :
374 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
375 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_mm) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
376 :
377 : /* Flush io_mm */
378 :
379 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
380 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
381 0 : vinyl_mm_sync( ctx );
382 :
383 : /* Synchronize sequence numbers */
384 :
385 0 : ctx->vinyl.io_wd->seq_ancient = ctx->vinyl.io_mm->seq_ancient;
386 0 : ctx->vinyl.io_wd->seq_past = ctx->vinyl.io_mm->seq_past;
387 0 : ctx->vinyl.io_wd->seq_present = ctx->vinyl.io_mm->seq_present;
388 0 : ctx->vinyl.io_wd->seq_future = ctx->vinyl.io_mm->seq_future;
389 0 : ctx->vinyl.io_wd->spad_used = 0UL;
390 :
391 0 : ctx->vinyl.io = ctx->vinyl.io_wd;
392 0 : }
393 :
394 : void
395 0 : fd_snapin_vinyl_wd_fini( fd_snapin_tile_t * ctx ) {
396 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_wd ) ) return;
397 :
398 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_wd, FD_VINYL_IO_FLAG_BLOCKING );
399 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_wd) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
400 :
401 : /* Synchronize sequence numbers */
402 :
403 0 : ctx->vinyl.io_mm->seq_ancient = ctx->vinyl.io_wd->seq_ancient;
404 0 : ctx->vinyl.io_mm->seq_past = ctx->vinyl.io_wd->seq_past;
405 0 : ctx->vinyl.io_mm->seq_present = ctx->vinyl.io_wd->seq_present;
406 0 : ctx->vinyl.io_mm->seq_future = ctx->vinyl.io_wd->seq_future;
407 0 : ctx->vinyl.io_mm->spad_used = 0UL;
408 :
409 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
410 0 : }
411 :
412 : /* bstream_push_account writes a single account out to bstream. */
413 :
414 : static void
415 0 : bstream_push_account( fd_snapin_tile_t * ctx ) {
416 0 : FD_CRIT( !ctx->vinyl_op.data_rem, "incomplete account store" );
417 0 : FD_CRIT( ctx->vinyl_op.pair, "no store in progres" );
418 :
419 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
420 :
421 0 : uchar * pair = ctx->vinyl_op.pair;
422 0 : ulong pair_sz = ctx->vinyl_op.pair_sz;
423 :
424 0 : ulong seq_after = fd_vinyl_io_append( io, pair, pair_sz );
425 0 : if( ctx->full ) ctx->vinyl_op.meta_ele->seq = seq_after;
426 :
427 0 : ctx->vinyl_op.pair = NULL;
428 0 : ctx->vinyl_op.pair_sz = 0UL;
429 0 : ctx->vinyl_op.dst = NULL;
430 0 : ctx->vinyl_op.dst_rem = 0UL;
431 0 : ctx->vinyl_op.meta_ele = NULL;
432 :
433 0 : ctx->metrics.accounts_inserted++;
434 0 : }
435 :
436 : /* bstream_alloc is a faster version of fd_vinyl_io_alloc. Indirect
437 : calls have significant overhead on Zen 5. */
438 :
439 : static uchar *
440 : bstream_alloc( fd_vinyl_io_t * io,
441 : ulong sz,
442 0 : int flags ) {
443 0 : if( FD_LIKELY( io->impl==&fd_vinyl_io_wd_impl ) )
444 0 : return fd_vinyl_io_wd_alloc( io, sz, flags );
445 0 : return fd_vinyl_io_alloc( io, sz, flags );
446 0 : }
447 :
448 : /* fd_snapin_process_account_header_vinyl prepares a bstream write for
449 : one account (slow) */
450 :
451 : void
452 : fd_snapin_process_account_header_vinyl( fd_snapin_tile_t * ctx,
453 0 : fd_ssparse_advance_result_t * result ) {
454 0 : FD_CRIT( !ctx->vinyl_op.dst_rem, "incomplete account store" );
455 0 : FD_CRIT( !ctx->vinyl_op.pair, "incomplete account store" );
456 :
457 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
458 0 : fd_vinyl_meta_t * map = ctx->vinyl.map;
459 :
460 0 : ulong val_sz = sizeof(fd_account_meta_t) + result->account_header.data_len;
461 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
462 :
463 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_sz );
464 0 : uchar * pair = bstream_alloc( io, pair_sz, FD_VINYL_IO_FLAG_BLOCKING );
465 :
466 0 : uchar * dst = pair;
467 0 : ulong dst_rem = pair_sz;
468 :
469 0 : FD_CRIT( dst_rem >= sizeof(fd_vinyl_bstream_phdr_t), "corruption detected" );
470 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)dst;
471 :
472 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
473 0 : fd_vinyl_key_init( &phdr->key, result->account_header.pubkey, 32UL );
474 0 : phdr->info.val_sz = (uint)val_sz;
475 0 : phdr->info.ul[1] = result->account_header.slot;
476 :
477 0 : dst += sizeof(fd_vinyl_bstream_phdr_t);
478 0 : dst_rem -= sizeof(fd_vinyl_bstream_phdr_t);
479 :
480 0 : FD_CRIT( dst_rem >= sizeof(fd_account_meta_t), "corruption detected" );
481 0 : fd_account_meta_t * meta = (fd_account_meta_t *)dst;
482 0 : memset( meta, 0, sizeof(fd_account_meta_t) ); /* bulk zero */
483 0 : memcpy( meta->owner, result->account_header.owner, sizeof(fd_pubkey_t) );
484 0 : meta->lamports = result->account_header.lamports;
485 0 : meta->slot = result->account_header.slot;
486 0 : meta->dlen = (uint)result->account_header.data_len;
487 0 : meta->executable = (uchar)result->account_header.executable;
488 :
489 0 : dst += sizeof(fd_account_meta_t);
490 0 : dst_rem -= sizeof(fd_account_meta_t);
491 :
492 0 : FD_CRIT( dst_rem >= result->account_header.data_len, "corruption detected" );
493 0 : if( ctx->full ) { /* update index immediately */
494 0 : ulong memo = fd_vinyl_key_memo( map->seed, &phdr->key );
495 0 : fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_prepare_nolock( map, &phdr->key, memo );
496 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "Failed to update vinyl index (full)" ));
497 :
498 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) {
499 : /* Drop current value if existing is newer */
500 0 : ulong exist_slot = ele->phdr.info.ul[ 1 ];
501 0 : if( exist_slot > result->account_header.slot ) {
502 0 : ctx->vinyl_op.pair = NULL;
503 0 : return;
504 0 : }
505 0 : }
506 :
507 0 : ele->memo = memo;
508 0 : ele->phdr.ctl = phdr->ctl;
509 0 : ele->phdr.key = phdr->key;
510 0 : ele->phdr.info = phdr->info;
511 0 : ele->seq = ULONG_MAX; /* later init */
512 0 : ele->line_idx = ULONG_MAX;
513 0 : ctx->vinyl_op.meta_ele = ele;
514 0 : }
515 :
516 0 : ctx->vinyl_op.pair = pair;
517 0 : ctx->vinyl_op.pair_sz = pair_sz;
518 0 : ctx->vinyl_op.dst = dst;
519 0 : ctx->vinyl_op.dst_rem = dst_rem;
520 0 : ctx->vinyl_op.data_rem = result->account_header.data_len;
521 :
522 0 : if( !ctx->vinyl_op.data_rem ) {
523 0 : bstream_push_account( ctx );
524 0 : }
525 0 : }
526 :
527 : /* fd_snapin_process_account_data_vinyl continues a bstream write (slow) */
528 :
529 : void
530 : fd_snapin_process_account_data_vinyl( fd_snapin_tile_t * ctx,
531 0 : fd_ssparse_advance_result_t * result ) {
532 0 : if( FD_UNLIKELY( !ctx->vinyl_op.pair ) ) return; /* ignored account */
533 :
534 0 : ulong chunk_sz = result->account_data.data_sz;
535 0 : if( FD_LIKELY( chunk_sz ) ) {
536 0 : FD_CRIT( chunk_sz <= ctx->vinyl_op.dst_rem, "corruption detected" );
537 0 : FD_CRIT( chunk_sz <= ctx->vinyl_op.data_rem, "corruption detected" );
538 0 : fd_memcpy( ctx->vinyl_op.dst, result->account_data.data, chunk_sz );
539 0 : ctx->vinyl_op.dst += chunk_sz;
540 0 : ctx->vinyl_op.dst_rem -= chunk_sz;
541 0 : ctx->vinyl_op.data_rem -= chunk_sz;
542 0 : }
543 0 : if( !ctx->vinyl_op.data_rem ) { /* finish store */
544 0 : bstream_push_account( ctx );
545 0 : }
546 0 : }
547 :
548 : /* fd_snapin_process_account_batch_vinyl inserts a batch of unfragmented
549 : accounts (fast path).
550 :
551 : The main optimization implemented is prefetching hash map accesses to
552 : amortize DRAM latency. */
553 :
554 : void
555 : fd_snapin_process_account_batch_vinyl( fd_snapin_tile_t * ctx,
556 0 : fd_ssparse_advance_result_t * result ) {
557 0 : fd_vinyl_meta_t * const map = ctx->vinyl.map;
558 0 : fd_vinyl_meta_ele_t * const ele0 = ctx->vinyl.map->ele;
559 :
560 : /* Derive map slot heads */
561 :
562 0 : ulong memo[ FD_SSPARSE_ACC_BATCH_MAX ];
563 0 : ulong const slot_mask = map->ele_max-1UL;
564 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
565 0 : uchar const * frame = result->account_batch.batch[ i ];
566 0 : uchar const * pubkey = frame+0x10UL;
567 0 : fd_vinyl_key_t key[1]; fd_vinyl_key_init( key, pubkey, 32UL );
568 0 : memo[ i ] = fd_vinyl_key_memo( map->seed, key );
569 0 : }
570 :
571 : /* Prefetch slots */
572 :
573 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
574 0 : ulong slot_idx = (uint)( memo[ i ]&slot_mask );
575 0 : __builtin_prefetch( ele0+slot_idx );
576 0 : }
577 :
578 : /* Insert map entries */
579 :
580 0 : fd_vinyl_meta_ele_t * batch_ele[ FD_SSPARSE_ACC_BATCH_MAX ];
581 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
582 0 : uchar const * frame = result->account_batch.batch[ i ];
583 0 : ulong const data_len = fd_ulong_load_8_fast( frame+0x08UL );
584 0 : uchar const * pubkey = frame+0x10UL;
585 0 : fd_vinyl_key_t key[1]; fd_vinyl_key_init( key, pubkey, 32UL );
586 :
587 0 : fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_prepare_nolock( map, key, memo[ i ] );
588 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "Failed to update vinyl index (full)" ));
589 0 : batch_ele[ i ] = ele;
590 :
591 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) { /* key exists */
592 : /* Drop current value if existing is newer */
593 0 : ulong exist_slot = ele->phdr.info.ul[ 1 ];
594 0 : if( exist_slot > result->account_batch.slot ) {
595 0 : batch_ele[ i ] = NULL;
596 0 : continue;
597 0 : }
598 0 : }
599 :
600 0 : ulong val_sz = sizeof(fd_account_meta_t) + data_len;
601 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
602 :
603 0 : fd_vinyl_bstream_phdr_t * phdr = &ele->phdr;
604 0 : phdr->ctl = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
605 0 : phdr->key = *key;
606 0 : phdr->info.val_sz = (uint)val_sz;
607 0 : phdr->info.ul[1] = result->account_batch.slot;
608 :
609 0 : ele->memo = memo[ i ];
610 0 : ele->phdr.ctl = phdr->ctl;
611 0 : ele->phdr.key = *key;
612 0 : ele->phdr.info = phdr->info;
613 0 : ele->seq = ULONG_MAX; /* later init */
614 0 : ele->line_idx = ULONG_MAX;
615 0 : }
616 :
617 : /* Write out to bstream */
618 :
619 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
620 0 : for( ulong i=0UL; i<FD_SSPARSE_ACC_BATCH_MAX; i++ ) {
621 0 : fd_vinyl_meta_ele_t * ele = batch_ele[ i ];
622 0 : if( FD_UNLIKELY( !ele ) ) continue;
623 :
624 0 : uchar const * frame = result->account_batch.batch[ i ];
625 0 : ulong const data_len = fd_ulong_load_8_fast( frame+0x08UL );
626 0 : ulong lamports = fd_ulong_load_8_fast( frame+0x30UL );
627 0 : uchar owner[32]; memcpy( owner, frame+0x40UL, 32UL );
628 0 : _Bool executable = !!frame[ 0x60UL ];
629 :
630 0 : ulong val_sz = sizeof(fd_account_meta_t) + data_len;
631 0 : FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
632 :
633 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_sz );
634 0 : uchar * pair = fd_vinyl_io_wd_alloc_fast( io, pair_sz );
635 0 : if( FD_UNLIKELY( !pair ) ) {
636 0 : pair = fd_vinyl_io_wd_alloc( io, pair_sz, FD_VINYL_IO_FLAG_BLOCKING );
637 0 : }
638 :
639 0 : uchar * dst = pair;
640 0 : ulong dst_rem = pair_sz;
641 :
642 0 : FD_CRIT( dst_rem >= sizeof(fd_vinyl_bstream_phdr_t), "corruption detected" );
643 0 : fd_vinyl_bstream_phdr_t * phdr = (fd_vinyl_bstream_phdr_t *)dst;
644 0 : *phdr = ele->phdr;
645 :
646 0 : dst += sizeof(fd_vinyl_bstream_phdr_t);
647 0 : dst_rem -= sizeof(fd_vinyl_bstream_phdr_t);
648 :
649 0 : FD_CRIT( dst_rem >= sizeof(fd_account_meta_t), "corruption detected" );
650 0 : fd_account_meta_t * meta = (fd_account_meta_t *)dst;
651 0 : memset( meta, 0, sizeof(fd_account_meta_t) ); /* bulk zero */
652 0 : memcpy( meta->owner, owner, sizeof(fd_pubkey_t) );
653 0 : meta->lamports = lamports;
654 0 : meta->slot = result->account_batch.slot;
655 0 : meta->dlen = (uint)data_len;
656 0 : meta->executable = !!executable;
657 :
658 0 : dst += sizeof(fd_account_meta_t);
659 0 : dst_rem -= sizeof(fd_account_meta_t);
660 :
661 0 : FD_CRIT( dst_rem >= data_len, "corruption detected" );
662 0 : fd_memcpy( dst, frame+0x88UL, data_len );
663 :
664 0 : dst += data_len;
665 0 : dst_rem -= data_len;
666 :
667 0 : ulong seq_after = fd_vinyl_io_append( io, pair, pair_sz );
668 0 : ele->seq = seq_after;
669 :
670 0 : ctx->metrics.accounts_inserted++;
671 0 : }
672 0 : }
673 :
674 : void
675 0 : fd_snapin_vinyl_shutdown( fd_snapin_tile_t * ctx ) {
676 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io, FD_VINYL_IO_FLAG_BLOCKING );
677 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
678 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
679 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
680 0 : vinyl_mm_sync( ctx );
681 :
682 0 : fd_vinyl_io_wd_ctrl( ctx->vinyl.io_wd, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL );
683 0 : }
684 :
685 : void
686 : fd_snapin_read_account_vinyl( fd_snapin_tile_t * ctx,
687 : void const * acct_addr,
688 : fd_account_meta_t * meta,
689 : uchar * data,
690 0 : ulong data_max ) {
691 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_mm ) ) {
692 0 : FD_LOG_CRIT(( "vinyl not in io_mm mode" ));
693 0 : }
694 :
695 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
696 :
697 : /* Query database index */
698 :
699 0 : fd_vinyl_key_t key[1];
700 0 : fd_vinyl_key_init( key, acct_addr, 32UL );
701 0 : ulong memo = fd_vinyl_key_memo( ctx->vinyl.map->seed, key );
702 0 : fd_vinyl_meta_ele_t const * ele = fd_vinyl_meta_prepare_nolock( ctx->vinyl.map, key, memo );
703 0 : if( FD_UNLIKELY( !ele || !fd_vinyl_meta_ele_in_use( ele ) ) ) {
704 : /* account not found */
705 0 : return;
706 0 : }
707 :
708 0 : uchar * mmio = fd_vinyl_mmio ( ctx->vinyl.io_mm );
709 0 : ulong mmio_sz = fd_vinyl_mmio_sz( ctx->vinyl.io_mm );
710 :
711 : /* Validate index record */
712 :
713 0 : ulong const seq0 = ele->seq;
714 0 : ulong const ctl = ele->phdr.ctl;
715 0 : int const ctl_type = fd_vinyl_bstream_ctl_type( ctl );
716 0 : ulong const val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
717 0 : ulong const pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
718 0 : ulong const seq1 = seq0 + pair_sz;
719 0 : ulong const seq_past = ctx->vinyl.io->seq_past;
720 0 : ulong const seq_present = ctx->vinyl.io->seq_present;
721 0 : if( FD_UNLIKELY( ctl_type!=FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
722 0 : FD_LOG_CRIT(( "corrupt bstream record in index: ctl=%016lx", ctl ));
723 0 : }
724 0 : if( FD_UNLIKELY( val_esz<sizeof(fd_account_meta_t) ||
725 0 : val_esz>sizeof(fd_account_meta_t)+FD_RUNTIME_ACC_SZ_MAX ) ) {
726 0 : FD_LOG_CRIT(( "corrupt bstream record in index: val_esz=%lu", val_esz ));
727 0 : }
728 0 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
729 0 : if( FD_UNLIKELY( bad_past ) ) {
730 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) not in [seq_past=%lu,seq_present=%lu)",
731 0 : seq0, seq1, seq_past, seq_present ));
732 0 : }
733 :
734 : /* Map seq range to underlying device
735 : In the snapshot loader, it is safe to assume that bstream reads
736 : do not wrap around. */
737 :
738 0 : if( FD_UNLIKELY( seq1>mmio_sz ) ) {
739 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) exceeds bstream addressable range [0,%lu)",
740 0 : seq0, seq1, mmio_sz ));
741 0 : }
742 :
743 : /* Read from bstream */
744 :
745 0 : ulong seq_meta = seq0 + sizeof(fd_vinyl_bstream_phdr_t);
746 0 : ulong seq_data = seq_meta + sizeof(fd_account_meta_t);
747 :
748 0 : memcpy( meta, mmio+seq_meta, sizeof(fd_account_meta_t) );
749 0 : if( FD_UNLIKELY( sizeof(fd_account_meta_t)+(ulong)meta->dlen > val_esz ) ) {
750 0 : FD_LOG_CRIT(( "corrupt bstream record: seq0=%lu val_esz=%lu dlen=%u", seq0, val_esz, meta->dlen ));
751 0 : }
752 0 : if( FD_UNLIKELY( meta->dlen > data_max ) ) {
753 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
754 0 : FD_LOG_WARNING(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
755 0 : acct_addr_b58, (ulong)meta->dlen, data_max ));
756 0 : }
757 0 : memcpy( data, mmio+seq_data, meta->dlen );
758 0 : }
|