Line data Source code
1 : #define _DEFAULT_SOURCE /* madvise */
2 : #include "fd_snapwm_tile_private.h"
3 : #include "utils/fd_ssctrl.h"
4 : #include "utils/fd_ssparse.h"
5 : #include "utils/fd_vinyl_io_wd.h"
6 : #include "../../ballet/lthash/fd_lthash.h"
7 : #include "../../ballet/lthash/fd_lthash_adder.h"
8 : #include "../../util/pod/fd_pod.h"
9 :
10 : #include <errno.h>
11 : #include <fcntl.h> /* open */
12 : #include <sys/mman.h> /* mmap, madvise */
13 : #include <sys/stat.h> /* fstat */
14 : #include <unistd.h> /* close */
15 :
16 : #include "generated/fd_snapwm_tile_vinyl_seccomp.h"
17 :
18 : FD_STATIC_ASSERT( WD_WR_FSEQ_CNT_MAX<=FD_TOPO_MAX_TILE_IN_LINKS, "WD_WR_FSEQ_CNT_MAX" );
19 :
20 : /**********************************************************************\
21 :
22 : Vinyl 101:
23 : - Vinyl is Firedancer's main account database
24 : - Vinyl is comprised of several components on-disk and in-memory
25 : - vinyl_bstream is a single file containing all vinyl records
26 : - vinyl_bstream is the source of truth
27 : - vinyl_meta indexes the latest revisions of all elements in
28 : vinyl_bstream
29 : - Vinyl has an in-memory caching layer, but snapwm does not use it
30 :
31 : The snapshot loader must:
32 : - Load the most recent version of each account into bstream
33 : - Create a full vinyl_meta index of accounts
34 : - Recover from load failures and retry
35 :
36 : Note on I/O layers:
37 : - io_mm is the slow/generic memory mapped I/O backend.
38 : - io_wd is the fast/dumb O_DIRECT backend. Can only append, thus used
39 : for hot path account writing.
40 : - io_mm and io_wd cannot be active at the same time -- snapwm will
41 : switch between them as necessary.
42 :
43 : Full snapshot logic:
44 : - Write accounts to bstream (io_wd)
45 : - Synchronously populate the vinyl_meta index while writing
46 : - On load failure, destroy and recreate the bstream (io_mm)
47 :
48 : Incremental snapshot logic:
49 : - Phase 1: while reading the incremental snapshot
50 : - Write accounts to bstream without updating the index (io_wd)
51 : - On load failure, undo writes done to bstream (io_mm)
52 : - Phase 2: once read is done
53 : - Replay all elements written to bstream (io_mm)
54 : - Populate the vinyl_meta index while replaying
55 :
56 : \**********************************************************************/
57 :
58 : void
59 : fd_snapwm_vinyl_privileged_init( fd_snapwm_tile_t * ctx,
60 : fd_topo_t * topo,
61 0 : fd_topo_tile_t * tile ) {
62 0 : void * shmap = fd_topo_obj_laddr( topo, tile->snapwm.vinyl_meta_map_obj_id );
63 0 : void * shele = fd_topo_obj_laddr( topo, tile->snapwm.vinyl_meta_pool_obj_id );
64 :
65 0 : FD_TEST( fd_vinyl_meta_join( ctx->vinyl.map, shmap, shele ) );
66 :
67 : /* Set up io_mm dependencies */
68 :
69 0 : char const * bstream_path = tile->snapwm.vinyl_path;
70 0 : int bstream_fd = open( bstream_path, O_RDWR|O_CLOEXEC, 0644 );
71 0 : if( FD_UNLIKELY( bstream_fd<0 ) ) {
72 0 : FD_LOG_ERR(( "open(%s,O_RDWR|O_CLOEXEC,0644) failed (%i-%s)",
73 0 : bstream_path, errno, fd_io_strerror( errno ) ));
74 0 : }
75 :
76 0 : struct stat st;
77 0 : if( FD_UNLIKELY( fstat( bstream_fd, &st )!=0 ) ) {
78 0 : FD_LOG_ERR(( "fstat(%s) failed (%i-%s)",
79 0 : bstream_path, errno, fd_io_strerror( errno ) ));
80 0 : }
81 0 : ulong bstream_sz = (ulong)st.st_size;
82 0 : if( FD_UNLIKELY( !fd_ulong_is_aligned( bstream_sz, FD_VINYL_BSTREAM_BLOCK_SZ ) ) ) {
83 0 : FD_LOG_ERR(( "vinyl file %s has misaligned size (%lu bytes)", bstream_path, bstream_sz ));
84 0 : }
85 :
86 0 : void * bstream_mem = mmap( NULL, bstream_sz, PROT_READ|PROT_WRITE, MAP_SHARED, bstream_fd, 0 );
87 0 : if( FD_UNLIKELY( bstream_mem==MAP_FAILED ) ) {
88 0 : FD_LOG_ERR(( "mmap(sz=%lu,PROT_READ|PROT_WRITE,MAP_SHARED,path=%s,off=0) failed (%i-%s)",
89 0 : bstream_sz, bstream_path, errno, fd_io_strerror( errno ) ));
90 0 : }
91 :
92 0 : if( FD_UNLIKELY( 0!=close( bstream_fd ) ) ) { /* clean up unused fd */
93 0 : FD_LOG_ERR(( "close(fd=%i) failed (%i-%s)",
94 0 : bstream_fd, errno, fd_io_strerror( errno ) ));
95 0 : }
96 :
97 0 : ctx->vinyl.bstream_mem = bstream_mem;
98 0 : ctx->vinyl.bstream_sz = bstream_sz;
99 :
100 0 : FD_TEST( fd_rng_secure( &ctx->vinyl.io_seed, 8UL ) );
101 0 : }
102 :
103 : static void
104 0 : io_mm_align_4k( fd_snapwm_tile_t * ctx ) {
105 0 : fd_vinyl_io_t * io_mm = ctx->vinyl.io_mm;
106 0 : if( FD_UNLIKELY( io_mm->seq_future!=0UL ) ) {
107 0 : FD_LOG_CRIT(( "unexpected io_mm state (seq_future=%lu)", io_mm->seq_future ));
108 0 : }
109 0 : uchar * mmio = fd_vinyl_mmio ( io_mm );
110 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io_mm );
111 :
112 0 : ulong bstream_preamble = fd_ulong_align_up( FD_VINYL_BSTREAM_BLOCK_SZ, 4096UL ) - FD_VINYL_BSTREAM_BLOCK_SZ;
113 0 : FD_CRIT( bstream_preamble<=mmio_sz, "bstream too small for 4k alignment" );
114 :
115 0 : fd_memset( mmio, 0, bstream_preamble );
116 0 : io_mm->seq_present += bstream_preamble;
117 0 : io_mm->seq_future += bstream_preamble;
118 0 : }
119 :
120 : void
121 : fd_snapwm_vinyl_unprivileged_init( fd_snapwm_tile_t * ctx,
122 : fd_topo_t * topo,
123 : fd_topo_tile_t * tile,
124 : void * io_mm_mem,
125 0 : void * io_wd_mem ) {
126 :
127 : /* Set up io_mm */
128 :
129 0 : ctx->vinyl.io_mm =
130 0 : fd_vinyl_io_mm_init( io_mm_mem,
131 0 : FD_SNAPWM_IO_SPAD_MAX,
132 0 : ctx->vinyl.bstream_mem,
133 0 : ctx->vinyl.bstream_sz,
134 0 : 1,
135 0 : "accounts-v0", 12UL,
136 0 : ctx->vinyl.io_seed );
137 0 : if( FD_UNLIKELY( !ctx->vinyl.io_mm ) ) {
138 0 : FD_LOG_ERR(( "fd_vinyl_io_mm_init failed" ));
139 0 : }
140 :
141 : /* Write out zero blocks to align the bstream by 4096 bytes
142 : (Assuming a 128 byte sync block) */
143 :
144 0 : io_mm_align_4k( ctx );
145 :
146 : /* Set up io_wd dependencies */
147 :
148 0 : ulong wr_link_id = fd_topo_find_tile_out_link( topo, tile, "snapwm_wh", 0UL );
149 0 : if( FD_UNLIKELY( wr_link_id==ULONG_MAX ) ) FD_LOG_CRIT(( "snapwm_wh link not found" ));
150 0 : fd_topo_link_t * wr_link = &topo->links[ tile->out_link_id[ wr_link_id ] ];
151 :
152 0 : if( FD_UNLIKELY( tile->snapwm.snapwr_depth != fd_mcache_depth( wr_link->mcache ) ) ) {
153 : /* FIXME TOCTOU issue ... A malicious downstream tile could
154 : theoretically corrupt mcache->depth and cause an OOB access
155 : while snapwm is still initializing. Practically not an
156 : issue because the system is not exposed to attacker-
157 : controlled input at boot time. */
158 0 : FD_LOG_CRIT(( "snapwm_wr link mcache depth %lu does not match snapwr_depth %lu",
159 0 : fd_mcache_depth( wr_link->mcache ), tile->snapwm.snapwr_depth ));
160 0 : }
161 :
162 0 : ulong expected_wr_link_consumers_cnt = fd_topo_tile_name_cnt( topo, "snapwh" );
163 0 : if( FD_UNLIKELY( fd_topo_link_reliable_consumer_cnt( topo, wr_link )!=expected_wr_link_consumers_cnt ) ) {
164 0 : FD_LOG_CRIT(( "snapwm_wr link must have exactly %lu reliable consumers", expected_wr_link_consumers_cnt ));
165 0 : }
166 :
167 0 : ulong const * wh_fseq[WD_WR_FSEQ_CNT_MAX];
168 0 : ulong wh_fseq_cnt = 0UL;
169 0 : ulong wh_fseq_cnt_expected = fd_topo_tile_name_cnt( topo, "snapwh" );
170 0 : FD_TEST( wh_fseq_cnt_expected<=WD_WR_FSEQ_CNT_MAX );
171 0 : FD_TEST( wh_fseq_cnt_expected==fd_topo_link_reliable_consumer_cnt( topo, wr_link ) );
172 0 : for( ulong tile_idx=0UL; tile_idx<topo->tile_cnt; tile_idx++ ) {
173 0 : fd_topo_tile_t const * consumer_tile = &topo->tiles[ tile_idx ];
174 0 : for( ulong in_idx=0UL; in_idx<consumer_tile->in_cnt; in_idx++ ) {
175 0 : if( consumer_tile->in_link_id[ in_idx ]==wr_link->id ) {
176 0 : FD_TEST( wh_fseq_cnt<WD_WR_FSEQ_CNT_MAX );
177 0 : wh_fseq[ wh_fseq_cnt ] = consumer_tile->in_link_fseq[ in_idx ];
178 0 : wh_fseq_cnt++;
179 0 : }
180 0 : }
181 0 : }
182 0 : if( FD_UNLIKELY( wh_fseq_cnt!=wh_fseq_cnt_expected ) ) {
183 0 : FD_LOG_ERR(( "unable to find %lu fseq(s) for output link %s:%lu",
184 0 : wh_fseq_cnt, wr_link->name, wr_link->kind_id ));
185 0 : }
186 :
187 : /* Set up io_wd */
188 :
189 0 : ctx->vinyl.io_wd =
190 0 : fd_vinyl_io_wd_init( io_wd_mem,
191 0 : ctx->vinyl.bstream_sz,
192 0 : ctx->vinyl.io_mm->seed,
193 0 : wr_link->mcache,
194 0 : wr_link->dcache,
195 0 : wh_fseq,
196 0 : wh_fseq_cnt,
197 0 : wr_link->mtu,
198 0 : tile->snapwm.vinyl_path );
199 0 : if( FD_UNLIKELY( !ctx->vinyl.io_wd ) ) {
200 0 : FD_LOG_ERR(( "fd_vinyl_io_wd_init failed" ));
201 0 : }
202 :
203 : /* Start by using io_mm */
204 :
205 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
206 :
207 0 : ctx->vinyl.duplicate_accounts_batch_sz = 0UL;
208 0 : ctx->vinyl.duplicate_accounts_batch_cnt = 0UL;
209 :
210 0 : ctx->vinyl.pair_cnt = 0UL;
211 0 : ctx->vinyl.pair_cnt_max = tile->snapwm.max_accounts;
212 :
213 0 : fd_lthash_adder_new( &ctx->vinyl.adder );
214 0 : fd_lthash_zero( &ctx->vinyl.running_lthash );
215 :
216 0 : ulong wr_cnt = fd_topo_tile_name_cnt( topo, "snapwr" );
217 0 : ctx->vinyl.wr_cnt = wr_cnt;
218 :
219 0 : ctx->vinyl.admin = NULL;
220 0 : if( FD_LIKELY( !ctx->lthash_disabled ) ) {
221 0 : ulong vinyl_admin_obj_id = fd_pod_query_ulong( topo->props, "vinyl_admin", ULONG_MAX );
222 0 : FD_TEST( vinyl_admin_obj_id!=ULONG_MAX );
223 0 : fd_vinyl_admin_t * vinyl_admin = fd_vinyl_admin_join( fd_topo_obj_laddr( topo, vinyl_admin_obj_id ) );
224 0 : FD_TEST( vinyl_admin );
225 0 : ctx->vinyl.admin = vinyl_admin;
226 :
227 : /* There is no need for rw_lock here, since every other consumer
228 : is waiting for the completion of this initialization step and
229 : this can be done without a lock. */
230 0 : FD_TEST( fd_snapwm_vinyl_init_admin( ctx, 0/*do_rwlock*/ ) );
231 0 : }
232 :
233 0 : ctx->vinyl.txn_active = 0;
234 0 : ctx->vinyl.txn_commit = 0;
235 0 : }
236 :
237 : ulong
238 : fd_snapwm_vinyl_seccomp( ulong out_cnt,
239 0 : struct sock_filter * out ) {
240 0 : populate_sock_filter_policy_fd_snapwm_tile_vinyl( out_cnt, out, (uint)fd_log_private_logfile_fd() );
241 0 : return sock_filter_policy_fd_snapwm_tile_vinyl_instr_cnt;
242 0 : }
243 :
244 : static void
245 0 : vinyl_mm_sync( fd_snapwm_tile_t * ctx ) {
246 0 : if( FD_UNLIKELY( 0!=msync( ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz, MS_SYNC|MS_INVALIDATE ) ) ) {
247 0 : FD_LOG_ERR(( "msync(addr=%p,sz=%lu,MS_SYNC|MS_INVALIDATE) failed (%i-%s)",
248 0 : (void *)ctx->vinyl.bstream_mem, ctx->vinyl.bstream_sz,
249 0 : errno, fd_io_strerror( errno ) ));
250 0 : }
251 0 : }
252 :
253 : /* Faster vinyl meta accesses *****************************************/
254 :
255 : static fd_vinyl_meta_ele_t *
256 : fd_vinyl_meta_prepare_nolock( fd_vinyl_meta_t * join,
257 : fd_vinyl_key_t const * key,
258 0 : ulong memo ) {
259 0 : fd_vinyl_meta_ele_t * ele0 = join->ele;
260 0 : ulong ele_max = join->ele_max;
261 0 : ulong probe_max = join->probe_max;
262 0 : void * ctx = join->ctx;
263 :
264 0 : ulong start_idx = memo & (ele_max-1UL);
265 :
266 0 : for(;;) {
267 :
268 0 : ulong ele_idx = start_idx;
269 :
270 0 : for( ulong probe_rem=probe_max; probe_rem; probe_rem-- ) {
271 0 : fd_vinyl_meta_ele_t * ele = ele0 + ele_idx;
272 :
273 0 : if( FD_LIKELY( fd_vinyl_meta_private_ele_is_free( ctx, ele ) ) || /* opt for low collision */
274 0 : (
275 0 : FD_LIKELY( ele->memo==memo ) &&
276 0 : FD_LIKELY( fd_vinyl_key_eq( &ele->phdr.key, key ) ) /* opt for already in map */
277 0 : ) ) {
278 0 : return ele;
279 0 : }
280 :
281 0 : ele_idx = (ele_idx+1UL) & (ele_max-1UL);
282 0 : }
283 :
284 0 : return NULL;
285 :
286 0 : }
287 :
288 : /* never get here */
289 0 : }
290 :
291 : /* Transactional APIs *************************************************/
292 :
293 : void
294 0 : fd_snapwm_vinyl_txn_begin( fd_snapwm_tile_t * ctx ) {
295 0 : FD_CRIT( !ctx->vinyl.txn_active, "txn_begin called while already in txn" );
296 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
297 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
298 :
299 0 : if( FD_UNLIKELY( ctx->vinyl.txn_commit ) ) {
300 0 : FD_LOG_CRIT(( "unable to perform txn_begin after a completed txn_commit" ));
301 0 : return;
302 0 : }
303 :
304 0 : if( FD_UNLIKELY( !ctx->lthash_disabled ) ) {
305 0 : FD_LOG_CRIT(( "vinyl txn cannot be initialized when lthash verification is enabled" ));
306 0 : return;
307 0 : }
308 :
309 : /* Finish any outstanding writes */
310 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
311 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
312 :
313 0 : ctx->vinyl.txn_seq = io->seq_present;
314 0 : ctx->vinyl.txn_active = 1;
315 0 : }
316 :
317 : void
318 0 : fd_snapwm_vinyl_txn_commit( fd_snapwm_tile_t * ctx ) {
319 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_commit called while not in txn" );
320 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
321 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
322 :
323 0 : long dt = -fd_log_wallclock();
324 :
325 : /* Finish any outstanding writes */
326 :
327 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
328 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
329 :
330 : /* Zero out txn_active before any return path. */
331 0 : ctx->vinyl.txn_active = 0;
332 :
333 : /* Hint to kernel to start prefetching to speed up reads */
334 :
335 0 : uchar * mmio = fd_vinyl_mmio ( io ); FD_TEST( mmio );
336 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io );
337 :
338 0 : ulong txn_seq0 = ctx->vinyl.txn_seq;
339 0 : ulong txn_seq1 = ctx->vinyl.io_mm->seq_present;
340 0 : FD_LOG_INFO(( "vinyl txn_commit starting for seq [%lu,%lu)", txn_seq0, txn_seq1 ));
341 0 : ulong txn_sz = txn_seq1-txn_seq0;
342 0 : FD_CRIT( fd_vinyl_seq_le( txn_seq0, txn_seq1 ), "invalid txn seq range" );
343 0 : FD_CRIT( txn_seq1 <= mmio_sz, "invalid txn seq range" );
344 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( txn_seq0, txn_seq1 ) ) ) return;
345 :
346 0 : void * madv_base = (void *)fd_ulong_align_dn( (ulong)mmio+txn_seq0, FD_SHMEM_NORMAL_PAGE_SZ );
347 0 : ulong madv_sz = /* */fd_ulong_align_up( txn_sz, FD_SHMEM_NORMAL_PAGE_SZ );
348 0 : if( FD_UNLIKELY( madvise( madv_base, madv_sz, MADV_SEQUENTIAL ) ) ) {
349 0 : FD_LOG_WARNING(( "madvise(addr=%p,sz=%lu,MADV_SEQUENTIAL) failed (%i-%s)",
350 0 : madv_base, madv_sz,
351 0 : errno, fd_io_strerror( errno ) ));
352 0 : }
353 :
354 : /* Replay incremental account updates */
355 :
356 0 : fd_vinyl_meta_t * meta_map = ctx->vinyl.map;
357 0 : for( ulong seq=txn_seq0; fd_vinyl_seq_lt( seq, txn_seq1 ); ) {
358 0 : fd_vinyl_bstream_block_t * block = (void *)( mmio+seq );
359 :
360 : /* Speculatively read block info */
361 0 : ulong ctl = FD_VOLATILE_CONST( block->ctl );
362 0 : fd_vinyl_bstream_phdr_t phdr = FD_VOLATILE_CONST( block->phdr );
363 :
364 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
365 0 : int block_type = fd_vinyl_bstream_ctl_type( ctl );
366 0 : ulong block_sz;
367 :
368 0 : if( FD_LIKELY( block_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
369 0 : block_sz = fd_vinyl_bstream_pair_sz( val_esz );
370 0 : ulong memo = fd_vinyl_key_memo( meta_map->seed, &phdr.key );
371 0 : fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_prepare_nolock( meta_map, &phdr.key, memo );
372 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "fd_vinyl_meta_prepare failed (full)" ));
373 :
374 : /* Erase value if existing is newer */
375 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) { /* key exists */
376 0 : ulong exist_slot = fd_snapin_vinyl_pair_info_slot( &ele->phdr.info );
377 0 : ulong cur_slot = fd_snapin_vinyl_pair_info_slot( &phdr.info );
378 0 : if( exist_slot > cur_slot ) {
379 0 : ctx->metrics.accounts_ignored++;
380 0 : fd_memset( block, 0, block_sz );
381 0 : goto next;
382 0 : }
383 0 : ctx->metrics.accounts_replaced++;
384 0 : } else {
385 0 : if( FD_UNLIKELY( ctx->vinyl.pair_cnt++ > ctx->vinyl.pair_cnt_max ) ) {
386 0 : FD_LOG_ERR(( "failed to load snapshot: exceeded [accounts.max_accounts] (%lu)", ctx->vinyl.pair_cnt_max ));
387 0 : }
388 0 : }
389 :
390 : /* Overwrite map entry */
391 0 : ele->memo = memo;
392 0 : ele->phdr = phdr;
393 0 : ele->seq = seq;
394 0 : ele->line_idx = ULONG_MAX;
395 0 : } else if( block_type==FD_VINYL_BSTREAM_CTL_TYPE_ZPAD ) {
396 0 : block_sz = FD_VINYL_BSTREAM_BLOCK_SZ;
397 0 : } else {
398 0 : FD_LOG_CRIT(( "unexpected block type %d", block_type ));
399 0 : }
400 :
401 0 : if( FD_UNLIKELY( !block_sz ) ) {
402 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx (zero block_sz)", seq, ctl ));
403 0 : }
404 0 : if( FD_UNLIKELY( block_sz > 64UL<<20 ) ) {
405 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx, block_sz=%lu (unreasonably large block size)", seq, ctl, block_sz ));
406 0 : }
407 :
408 0 : next:
409 0 : seq += block_sz;
410 0 : }
411 :
412 : /* Persist above erases to disk */
413 :
414 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
415 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
416 0 : vinyl_mm_sync( ctx );
417 :
418 0 : ctx->vinyl.txn_commit = 1;
419 :
420 0 : dt += fd_log_wallclock();
421 0 : FD_LOG_INFO(( "vinyl txn_commit took %g seconds", (double)dt/1e9 ));
422 0 : }
423 :
424 : void
425 0 : fd_snapwm_vinyl_txn_cancel( fd_snapwm_tile_t * ctx ) {
426 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_cancel called while not in txn" );
427 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
428 :
429 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
430 0 : fd_vinyl_io_rewind( io, ctx->vinyl.txn_seq );
431 0 : fd_vinyl_io_sync ( io, FD_VINYL_IO_FLAG_BLOCKING );
432 0 : ctx->vinyl.txn_active = 0;
433 0 : }
434 :
435 : /* Fast writer ********************************************************/
436 :
437 : void
438 0 : fd_snapwm_vinyl_wd_init( fd_snapwm_tile_t * ctx ) {
439 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
440 :
441 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
442 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_mm) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
443 :
444 : /* Flush io_mm */
445 :
446 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
447 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
448 0 : vinyl_mm_sync( ctx );
449 :
450 : /* Synchronize sequence numbers */
451 :
452 0 : ctx->vinyl.io_wd->seq_ancient = ctx->vinyl.io_mm->seq_ancient;
453 0 : ctx->vinyl.io_wd->seq_past = ctx->vinyl.io_mm->seq_past;
454 0 : ctx->vinyl.io_wd->seq_present = ctx->vinyl.io_mm->seq_present;
455 0 : ctx->vinyl.io_wd->seq_future = ctx->vinyl.io_mm->seq_future;
456 0 : ctx->vinyl.io_wd->spad_used = 0UL;
457 :
458 0 : ctx->vinyl.io = ctx->vinyl.io_wd;
459 0 : }
460 :
461 : void
462 0 : fd_snapwm_vinyl_wd_fini( fd_snapwm_tile_t * ctx ) {
463 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_wd ) ) return;
464 :
465 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io_wd, FD_VINYL_IO_FLAG_BLOCKING );
466 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io_wd) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
467 :
468 : /* Synchronize sequence numbers */
469 :
470 0 : ctx->vinyl.io_mm->seq_ancient = ctx->vinyl.io_wd->seq_ancient;
471 0 : ctx->vinyl.io_mm->seq_past = ctx->vinyl.io_wd->seq_past;
472 0 : ctx->vinyl.io_mm->seq_present = ctx->vinyl.io_wd->seq_present;
473 0 : ctx->vinyl.io_mm->seq_future = ctx->vinyl.io_wd->seq_future;
474 0 : ctx->vinyl.io_mm->spad_used = 0UL;
475 :
476 0 : ctx->vinyl.io = ctx->vinyl.io_mm;
477 :
478 0 : vinyl_mm_sync( ctx );
479 0 : }
480 :
481 : /* bstream_alloc is a faster version of fd_vinyl_io_alloc. Indirect
482 : calls have significant overhead on Zen 5. */
483 :
484 : static uchar *
485 : bstream_alloc( fd_vinyl_io_t * io,
486 : ulong sz,
487 0 : int flags ) {
488 0 : if( FD_LIKELY( io->impl==&fd_vinyl_io_wd_impl ) )
489 0 : return fd_vinyl_io_wd_alloc( io, sz, flags );
490 0 : return fd_vinyl_io_alloc( io, sz, flags );
491 0 : }
492 :
493 : /* fd_snapwm_vinyl_process_account reads and processes a batch of
494 : pre-generated bstream pairs, handles the meta_map, and determines
495 : whether to forward each of the accounts (pairs) to the database. */
496 :
497 : void
498 : fd_snapwm_vinyl_process_account( fd_snapwm_tile_t * ctx,
499 : ulong chunk,
500 : ulong acc_cnt,
501 0 : fd_stem_context_t * stem ) {
502 0 : fd_vinyl_io_t * io = ctx->vinyl.io;
503 0 : fd_vinyl_meta_t * map = ctx->vinyl.map;
504 :
505 0 : uchar * src = fd_chunk_to_laddr( ctx->in.wksp, chunk );
506 :
507 0 : fd_snapwm_vinyl_duplicate_accounts_batch_init( ctx, stem );
508 :
509 0 : for( ulong acc_i=0UL; acc_i<acc_cnt; acc_i++ ) {
510 :
511 0 : fd_vinyl_bstream_phdr_t * src_phdr = (fd_vinyl_bstream_phdr_t*)src;
512 : /* phdr's recovery_seq may need to be updated, and this cannot
513 : happen on the src dcache. */
514 0 : fd_vinyl_bstream_phdr_t phdr = *src_phdr;
515 :
516 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( phdr.ctl );
517 :
518 0 : ulong pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
519 0 : uchar * pair = bstream_alloc( io, pair_sz, FD_VINYL_IO_FLAG_BLOCKING );
520 0 : uchar * dst = pair;
521 :
522 0 : ulong const account_header_slot = fd_snapin_vinyl_pair_info_slot( &phdr.info );
523 :
524 0 : ctx->metrics.accounts_loaded++;
525 :
526 0 : int do_meta_update = ctx->full || !ctx->lthash_disabled;
527 :
528 0 : ulong recovery_seq = 0UL;
529 :
530 0 : fd_vinyl_meta_ele_t * ele = NULL;
531 0 : if( FD_LIKELY( do_meta_update ) ) {
532 0 : ulong memo = fd_vinyl_key_memo( map->seed, &phdr.key );
533 0 : ele = fd_vinyl_meta_prepare_nolock( map, &phdr.key, memo );
534 0 : if( FD_UNLIKELY( !ele ) ) FD_LOG_CRIT(( "Failed to update vinyl index (full)" ));
535 :
536 0 : if( FD_UNLIKELY( fd_vinyl_meta_ele_in_use( ele ) ) ) {
537 : /* Drop current value if existing is newer */
538 0 : ulong const exist_slot = fd_snapin_vinyl_pair_info_slot( &ele->phdr.info );
539 0 : if( FD_UNLIKELY( exist_slot > account_header_slot ) ) {
540 0 : ctx->metrics.accounts_ignored++;
541 0 : src += pair_sz;
542 0 : continue;
543 0 : } else {
544 0 : fd_snapwm_vinyl_duplicate_accounts_batch_append( ctx, &ele->phdr, ele->seq );
545 0 : recovery_seq = ele->seq;
546 0 : ctx->metrics.accounts_replaced++;
547 0 : }
548 0 : } else {
549 0 : if( FD_UNLIKELY( ctx->vinyl.pair_cnt++ > ctx->vinyl.pair_cnt_max ) ) {
550 0 : FD_LOG_ERR(( "failed to load snapshot: exceeded [accounts.max_accounts] (%lu)", ctx->vinyl.pair_cnt_max ));
551 0 : }
552 0 : }
553 :
554 0 : fd_snapin_vinyl_pair_info_update_recovery_seq( &phdr.info, recovery_seq );
555 0 : ele->memo = memo;
556 0 : ele->phdr.ctl = phdr.ctl;
557 0 : ele->phdr.key = phdr.key;
558 0 : ele->phdr.info = phdr.info;
559 0 : ele->seq = ULONG_MAX; /* later init */
560 0 : ele->line_idx = ULONG_MAX;
561 0 : }
562 :
563 : /* sizeof(fd_vinyl_bstream_phdr_t) is less than the minimum
564 : pair_sz==FD_VINYL_BSTREAM_BLOCK_SZ. */
565 0 : ulong off = sizeof(fd_vinyl_bstream_phdr_t);
566 0 : fd_memcpy( dst, &phdr, off );
567 0 : fd_memcpy( dst+off, src+off, pair_sz-off );
568 0 : src += pair_sz;
569 :
570 0 : ulong seq_after = fd_vinyl_io_append( io, pair, pair_sz );
571 0 : if( FD_LIKELY( do_meta_update ) ) ele->seq = seq_after;
572 0 : }
573 :
574 0 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( ctx, stem );
575 0 : }
576 :
577 : void
578 0 : fd_snapwm_vinyl_shutdown( fd_snapwm_tile_t * ctx ) {
579 0 : int commit_err = fd_vinyl_io_commit( ctx->vinyl.io, FD_VINYL_IO_FLAG_BLOCKING );
580 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit(io) failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
581 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
582 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
583 0 : vinyl_mm_sync( ctx );
584 :
585 0 : fd_vinyl_io_wd_ctrl( ctx->vinyl.io_wd, FD_SNAPSHOT_MSG_CTRL_SHUTDOWN, 0UL );
586 0 : }
587 :
588 : void
589 : fd_snapwm_vinyl_read_account( fd_snapwm_tile_t * ctx,
590 : void const * acct_addr,
591 : fd_account_meta_t * meta,
592 : uchar * data,
593 0 : ulong data_max ) {
594 0 : if( FD_UNLIKELY( ctx->vinyl.io!=ctx->vinyl.io_mm ) ) {
595 0 : FD_LOG_CRIT(( "vinyl not in io_mm mode" ));
596 0 : }
597 :
598 0 : memset( meta, 0, sizeof(fd_account_meta_t) );
599 :
600 : /* Query database index */
601 :
602 0 : fd_vinyl_key_t key[1];
603 0 : fd_vinyl_key_init( key, acct_addr, 32UL );
604 0 : ulong memo = fd_vinyl_key_memo( ctx->vinyl.map->seed, key );
605 0 : fd_vinyl_meta_ele_t const * ele = fd_vinyl_meta_prepare_nolock( ctx->vinyl.map, key, memo );
606 0 : if( FD_UNLIKELY( !ele || !fd_vinyl_meta_ele_in_use( ele ) ) ) {
607 : /* account not found */
608 0 : return;
609 0 : }
610 :
611 0 : uchar * mmio = fd_vinyl_mmio ( ctx->vinyl.io_mm );
612 0 : ulong mmio_sz = fd_vinyl_mmio_sz( ctx->vinyl.io_mm );
613 :
614 : /* Validate index record */
615 :
616 0 : ulong const seq0 = ele->seq;
617 0 : ulong const ctl = ele->phdr.ctl;
618 0 : int const ctl_type = fd_vinyl_bstream_ctl_type( ctl );
619 0 : ulong const val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
620 0 : ulong const pair_sz = fd_vinyl_bstream_pair_sz( val_esz );
621 0 : ulong const seq1 = seq0 + pair_sz;
622 0 : ulong const seq_past = ctx->vinyl.io->seq_past;
623 0 : ulong const seq_present = ctx->vinyl.io->seq_present;
624 0 : if( FD_UNLIKELY( ctl_type!=FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
625 0 : FD_LOG_CRIT(( "corrupt bstream record in index: ctl=%016lx", ctl ));
626 0 : }
627 0 : if( FD_UNLIKELY( val_esz<sizeof(fd_account_meta_t) ||
628 0 : val_esz>sizeof(fd_account_meta_t)+FD_RUNTIME_ACC_SZ_MAX ) ) {
629 0 : FD_LOG_CRIT(( "corrupt bstream record in index: val_esz=%lu", val_esz ));
630 0 : }
631 0 : int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));
632 0 : if( FD_UNLIKELY( bad_past ) ) {
633 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) not in [seq_past=%lu,seq_present=%lu)",
634 0 : seq0, seq1, seq_past, seq_present ));
635 0 : }
636 :
637 : /* Map seq range to underlying device
638 : In the snapshot loader, it is safe to assume that bstream reads
639 : do not wrap around. */
640 :
641 0 : if( FD_UNLIKELY( seq1>mmio_sz ) ) {
642 0 : FD_LOG_CRIT(( "corrupt bstream record in index: seq[%lu,%lu) exceeds bstream addressable range [0,%lu)",
643 0 : seq0, seq1, mmio_sz ));
644 0 : }
645 :
646 : /* Read from bstream */
647 :
648 0 : ulong seq_meta = seq0 + sizeof(fd_vinyl_bstream_phdr_t);
649 0 : ulong seq_data = seq_meta + sizeof(fd_account_meta_t);
650 :
651 0 : memcpy( meta, mmio+seq_meta, sizeof(fd_account_meta_t) );
652 0 : if( FD_UNLIKELY( sizeof(fd_account_meta_t)+(ulong)meta->dlen > val_esz ) ) {
653 0 : FD_LOG_CRIT(( "corrupt bstream record: seq0=%lu val_esz=%lu dlen=%u", seq0, val_esz, meta->dlen ));
654 0 : }
655 0 : if( FD_UNLIKELY( meta->dlen > data_max ) ) {
656 0 : FD_BASE58_ENCODE_32_BYTES( acct_addr, acct_addr_b58 );
657 0 : FD_LOG_CRIT(( "failed to read account %s: account data size (%lu bytes) exceeds buffer size (%lu bytes)",
658 0 : acct_addr_b58, (ulong)meta->dlen, data_max ));
659 0 : }
660 0 : memcpy( data, mmio+seq_data, meta->dlen );
661 0 : }
662 :
663 : /* handle_hash_out_fseq_check is a blocking operation */
664 : static inline void
665 : handle_hash_out_fseq_check( fd_snapwm_tile_t * ctx,
666 : fd_stem_context_t * stem,
667 0 : ulong min_credit ) {
668 0 : ulong producer_fseq = fd_fseq_query( &stem->seqs[ ctx->hash_out.idx ] );
669 0 : ulong consumer_fseq = fd_fseq_query( ctx->hash_out.consumer_fseq );
670 0 : for(;;) {
671 0 : ulong avail = ctx->hash_out.depth - ( producer_fseq - consumer_fseq );
672 0 : if( FD_LIKELY( avail > min_credit ) ) break;
673 0 : FD_SPIN_PAUSE();
674 0 : consumer_fseq = fd_fseq_query( ctx->hash_out.consumer_fseq );
675 0 : }
676 0 : }
677 :
678 : int
679 : fd_snapwm_vinyl_duplicate_accounts_batch_init( fd_snapwm_tile_t * ctx,
680 0 : fd_stem_context_t * stem ) {
681 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
682 0 : ctx->vinyl.duplicate_accounts_batch_sz = 0UL;
683 0 : ctx->vinyl.duplicate_accounts_batch_cnt = 0UL;
684 :
685 : /* fseq check is mandatory here, since append writes directly to
686 : the dcache. */
687 0 : handle_hash_out_fseq_check( ctx, stem, FD_SNAPWM_DUP_BATCH_CREDIT_MIN );
688 0 : return 1;
689 0 : }
690 :
691 : int
692 : fd_snapwm_vinyl_duplicate_accounts_batch_append( fd_snapwm_tile_t * ctx,
693 : fd_vinyl_bstream_phdr_t * phdr,
694 0 : ulong seq ) {
695 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
696 0 : uchar * data = fd_chunk_to_laddr( ctx->hash_out.mem, ctx->hash_out.chunk );
697 0 : data += ctx->vinyl.duplicate_accounts_batch_sz; /* offset into the chunk */
698 0 : memcpy( data, &seq, sizeof(ulong) );
699 0 : memcpy( data + sizeof(ulong), phdr, sizeof(fd_vinyl_bstream_phdr_t) );
700 0 : ctx->vinyl.duplicate_accounts_batch_sz += FD_SNAPWM_DUP_META_SZ;
701 0 : ctx->vinyl.duplicate_accounts_batch_cnt +=1UL;
702 0 : return 1;
703 0 : }
704 :
705 : int
706 : fd_snapwm_vinyl_duplicate_accounts_batch_fini( fd_snapwm_tile_t * ctx,
707 0 : fd_stem_context_t * stem ) {
708 0 : if( FD_UNLIKELY( ctx->lthash_disabled ) ) return 0;
709 :
710 : /* There is no fseq check in batch_fini. This is a performance
711 : optimization, which requires no other fd_stem_publish on the
712 : output link in between init and fini. */
713 :
714 0 : ulong batch_sz = ctx->vinyl.duplicate_accounts_batch_sz;
715 0 : ulong batch_cnt = ctx->vinyl.duplicate_accounts_batch_cnt;
716 0 : if( FD_UNLIKELY( batch_cnt>FD_SSPARSE_ACC_BATCH_MAX ) ) {
717 0 : FD_LOG_CRIT(( "batch_cnt %lu exceeds FD_SSPARSE_ACC_BATCH_MAX %lu", batch_cnt, FD_SSPARSE_ACC_BATCH_MAX ));
718 0 : }
719 0 : if( FD_UNLIKELY( !batch_sz ) ) return 0;
720 0 : fd_stem_publish( stem, ctx->hash_out.idx, FD_SNAPSHOT_HASH_MSG_SUB_META_BATCH, ctx->hash_out.chunk, batch_sz, 0UL, 0UL, batch_cnt/*tspub*/ );
721 0 : ctx->hash_out.chunk = fd_dcache_compact_next( ctx->hash_out.chunk, batch_sz, ctx->hash_out.chunk0, ctx->hash_out.wmark );
722 0 : return 1;
723 0 : }
724 :
725 : int
726 : fd_snapwm_vinyl_init_admin( fd_snapwm_tile_t * ctx,
727 0 : int do_rwlock ) {
728 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_write( &ctx->vinyl.admin->lock );
729 :
730 0 : ulong status = fd_vinyl_admin_ulong_query( &ctx->vinyl.admin->status );
731 0 : if( FD_UNLIKELY( status!=FD_VINYL_ADMIN_STATUS_INIT_PENDING ) ) {
732 0 : FD_LOG_WARNING(( "vinyl admin unexpected status %s (%lu) during initialization",
733 0 : fd_vinyl_admin_status_str( status ), status ));
734 0 : goto init_admin_error;
735 0 : }
736 :
737 0 : if( FD_UNLIKELY( !ctx->vinyl.wr_cnt ) ) {
738 0 : FD_LOG_WARNING(( "vinyl admin sees unexpected write tile count %lu", ctx->vinyl.wr_cnt ));
739 0 : goto init_admin_error;
740 0 : }
741 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->wr_cnt, ctx->vinyl.wr_cnt );
742 :
743 0 : for( ulong i=0UL; i<ctx->vinyl.wr_cnt; i++ ) {
744 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->wr_seq[ i ], 0UL );
745 0 : }
746 :
747 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->status, FD_VINYL_ADMIN_STATUS_INIT_DONE );
748 :
749 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_unwrite( &ctx->vinyl.admin->lock );
750 0 : return 1;
751 :
752 0 : init_admin_error:
753 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_unwrite( &ctx->vinyl.admin->lock );
754 0 : return 0;
755 0 : }
756 :
757 : int
758 : fd_snapwm_vinyl_update_admin( fd_snapwm_tile_t * ctx,
759 0 : int do_rwlock ) {
760 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_write( &ctx->vinyl.admin->lock );
761 :
762 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->status, FD_VINYL_ADMIN_STATUS_UPDATING );
763 :
764 0 : for( ulong i=0UL; i<ctx->vinyl.wr_cnt; i++ ) {
765 : /* This may cause a wr_seq[ i ] regression, which is expected e.g.
766 : if the snapshot load pipeline aborts the current snapshot and
767 : resets to load a new one. */
768 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->wr_seq[ i ], ctx->vinyl.io_wd->seq_present );
769 0 : }
770 :
771 0 : ulong status = fd_ulong_if( ctx->full, FD_VINYL_ADMIN_STATUS_SNAPSHOT_FULL, FD_VINYL_ADMIN_STATUS_SNAPSHOT_INCR );
772 0 : fd_vinyl_admin_ulong_update( &ctx->vinyl.admin->status, status );
773 :
774 0 : if( FD_UNLIKELY( !!do_rwlock ) ) fd_rwlock_unwrite( &ctx->vinyl.admin->lock );
775 0 : return 1;
776 0 : }
777 :
778 : void
779 0 : fd_snapwm_vinyl_recovery_seq_backup( fd_snapwm_tile_t * ctx ) {
780 0 : ctx->vinyl.recovery.seq_ancient = ctx->vinyl.io_mm->seq_ancient;
781 0 : ctx->vinyl.recovery.seq_past = ctx->vinyl.io_mm->seq_past;
782 0 : ctx->vinyl.recovery.seq_present = ctx->vinyl.io_mm->seq_present;
783 0 : ctx->vinyl.recovery.seq_future = ctx->vinyl.io_mm->seq_future;
784 0 : }
785 :
786 : void
787 0 : fd_snapwm_vinyl_recovery_seq_apply( fd_snapwm_tile_t * ctx ) {
788 0 : ctx->vinyl.io_mm->seq_ancient = ctx->vinyl.recovery.seq_ancient;
789 0 : ctx->vinyl.io_mm->seq_past = ctx->vinyl.recovery.seq_past;
790 0 : ctx->vinyl.io_mm->seq_present = ctx->vinyl.recovery.seq_present;
791 0 : ctx->vinyl.io_mm->seq_future = ctx->vinyl.recovery.seq_future;
792 0 : }
793 :
794 : void
795 0 : fd_snapwm_vinyl_revert_full( fd_snapwm_tile_t * ctx ) {
796 0 : fd_vinyl_meta_t * map = ctx->vinyl.map;
797 0 : fd_vinyl_meta_ele_t * ele0 = map->ele;
798 0 : ulong ele_max = map->ele_max;
799 0 : void * map_ctx = map->ctx;
800 :
801 0 : long dt = -fd_log_wallclock();
802 0 : for( ulong ele_idx=0; ele_idx<ele_max; ele_idx++ ) {
803 0 : fd_vinyl_meta_ele_t * ele = ele0 + ele_idx;
804 0 : fd_vinyl_meta_private_ele_free( map_ctx, ele );
805 0 : }
806 :
807 : /* Apply changes and resync */
808 0 : fd_snapwm_vinyl_recovery_seq_apply( ctx );
809 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
810 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
811 0 : vinyl_mm_sync( ctx );
812 :
813 0 : dt += fd_log_wallclock();
814 0 : FD_LOG_INFO(( "vinyl revert_full took %g seconds", (double)dt/1e9 ));
815 0 : }
816 :
817 : void
818 0 : fd_snapwm_vinyl_revert_incr( fd_snapwm_tile_t * ctx ) {
819 0 : FD_CRIT( ctx->vinyl.txn_active, "txn_commit called while not in txn" );
820 0 : FD_CRIT( ctx->vinyl.io==ctx->vinyl.io_mm, "vinyl not in io_mm mode" );
821 0 : fd_vinyl_io_t * io = ctx->vinyl.io_mm;
822 :
823 0 : long dt = -fd_log_wallclock();
824 :
825 : /* Finish any outstanding writes */
826 :
827 0 : int commit_err = fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
828 0 : if( FD_UNLIKELY( commit_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_commit failed (%i-%s)", commit_err, fd_vinyl_strerror( commit_err ) ));
829 :
830 : /* Hint to kernel to start prefetching to speed up reads */
831 :
832 0 : uchar * mmio = fd_vinyl_mmio ( io ); FD_TEST( mmio );
833 0 : ulong mmio_sz = fd_vinyl_mmio_sz( io );
834 :
835 0 : ulong txn_seq0 = ctx->vinyl.recovery.seq_present;
836 0 : ulong txn_seq1 = ctx->vinyl.io_mm->seq_present;
837 0 : FD_LOG_INFO(( "vinyl meta_recovery starting for seq [%lu,%lu)", txn_seq0, txn_seq1 ));
838 0 : ulong txn_sz = txn_seq1-txn_seq0;
839 0 : FD_CRIT( fd_vinyl_seq_le( txn_seq0, txn_seq1 ), "invalid txn seq range" );
840 0 : FD_CRIT( txn_seq1 <= mmio_sz, "invalid txn seq range" );
841 0 : if( FD_UNLIKELY( fd_vinyl_seq_eq( txn_seq0, txn_seq1 ) ) ) return;
842 :
843 0 : void * madv_base = (void *)fd_ulong_align_dn( (ulong)mmio+txn_seq0, FD_SHMEM_NORMAL_PAGE_SZ );
844 0 : ulong madv_sz = /* */fd_ulong_align_up( txn_sz, FD_SHMEM_NORMAL_PAGE_SZ );
845 0 : if( FD_UNLIKELY( madvise( madv_base, madv_sz, MADV_SEQUENTIAL ) ) ) {
846 0 : FD_LOG_WARNING(( "madvise(addr=%p,sz=%lu,MADV_SEQUENTIAL) failed (%i-%s)",
847 0 : madv_base, madv_sz,
848 0 : errno, fd_io_strerror( errno ) ));
849 0 : }
850 :
851 0 : fd_vinyl_meta_t * meta_map = ctx->vinyl.map;
852 0 : for( ulong seq=txn_seq0; fd_vinyl_seq_lt( seq, txn_seq1 ); ) {
853 :
854 0 : fd_vinyl_bstream_block_t * incr_block = (void *)( mmio+seq );
855 :
856 : /* Speculatively read block info */
857 0 : ulong ctl = FD_VOLATILE_CONST( incr_block->ctl );
858 0 : fd_vinyl_bstream_phdr_t incr_phdr = FD_VOLATILE_CONST( incr_block->phdr );
859 :
860 0 : ulong val_esz = fd_vinyl_bstream_ctl_sz ( ctl );
861 0 : int block_type = fd_vinyl_bstream_ctl_type( ctl );
862 0 : ulong block_sz;
863 :
864 0 : if( FD_LIKELY( block_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR ) ) {
865 0 : block_sz = fd_vinyl_bstream_pair_sz( val_esz );
866 0 : ulong memo = fd_vinyl_key_memo( meta_map->seed, &incr_phdr.key );
867 :
868 : /* recovery_seq must be read from the bstream pair, and not from
869 : the meta map ele, because ele->hdr.info and phdr.info may
870 : start disagreeing on this value as the recovery proceeds.
871 : Consider what happens when there are multiple duplicates for
872 : the same account in the incremental snapshot. */
873 0 : ulong recovery_seq = fd_snapin_vinyl_pair_info_recovery_seq( &incr_phdr.info );
874 :
875 : /* query the meta map element. */
876 0 : ulong found_ele_idx = 0UL;
877 0 : int found_ele = !fd_vinyl_meta_query_fast( meta_map->ele /*ele0*/,
878 0 : meta_map->ele_max,
879 0 : &incr_phdr.key,
880 0 : memo,
881 0 : &found_ele_idx );
882 :
883 : /* Consider these two generic cases, labeled A and B:
884 :
885 : bstream: [ full | incr | free )
886 : revert: (*)->.......)
887 : case A : [ A0 | A1 A2 | )
888 : case B : [ | B1 B2 | )
889 :
890 : with these pair -> recovery_seq:
891 : A0 -> 0 (sentinel)
892 : A1 -> A0
893 : A2 -> A1
894 :
895 : B1 -> 0 (sentinel)
896 : B2 -> B1
897 :
898 : Cases A1 and B1 have a recovery_seq in the full snapshot range,
899 : and are processed below in the "if" branch. Cases A2 and B2
900 : have a recovery_seq in the incr range, and are processed in the
901 : "else" branch. In these 4 cases, the corresponding bstream
902 : pair will be cleared.
903 :
904 : Note that bstream pairs are read/processed from left to right,
905 : i.e. A1 then A2, or B1 then B2.
906 :
907 : Case A1: the meta map element needs to be updated with bstream
908 : seq A0.
909 :
910 : Case B1: this account (bstream pair) was introduced during incr
911 : snapshot load, and should be discarded. Its meta map
912 : element is therefore freed.
913 :
914 : Case A2: its recovery_seq in the bstream pair's info points to
915 : A1, but the meta map element has already been updated
916 : to A0. In this case, the meta map element exists,
917 : and it is necessary to verify that the meta map
918 : element's seq points to a bstream seq in the full
919 : snapshot range. A2 is then discarded.
920 :
921 : Case B2: its recovery_seq in the bstream pair's info points to
922 : B1, but the meta map element has already been freed.
923 : In this case there is nothing else to do.
924 : */
925 0 : if( FD_LIKELY( recovery_seq<ctx->vinyl.recovery.seq_present ) ) {
926 : /* The meta map element must exist. */
927 0 : if( FD_UNLIKELY( !found_ele ) ) {
928 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
929 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx not found", seq, phdr_key_b58, memo ));
930 0 : }
931 :
932 0 : fd_vinyl_meta_ele_t * ele = meta_map->ele + found_ele_idx;
933 :
934 : /* The meta map element must be in use. */
935 0 : if( !fd_vinyl_meta_ele_in_use( ele ) ) {
936 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
937 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx not in use", seq, phdr_key_b58, memo ));
938 0 : }
939 :
940 : /* Either free the meta map element or update it. */
941 0 : if( FD_UNLIKELY( !recovery_seq ) ) {
942 0 : fd_vinyl_meta_private_ele_free( meta_map->ctx, ele );
943 0 : } else {
944 0 : fd_vinyl_bstream_block_t * full_block = (void *)( mmio+recovery_seq );
945 0 : fd_vinyl_bstream_phdr_t full_phdr = FD_VOLATILE_CONST( full_block->phdr );
946 0 : ulong incr_slot = fd_snapin_vinyl_pair_info_slot( &incr_phdr.info );
947 0 : ulong full_slot = fd_snapin_vinyl_pair_info_slot( &full_phdr.info );
948 :
949 0 : if( FD_UNLIKELY( full_slot>=incr_slot ) ) {
950 0 : FD_LOG_CRIT(( "revert incremental snapshot full_slot %lu >= incr_slot %lu", full_slot, incr_slot ));
951 0 : }
952 :
953 : /* Update meta map element. */
954 0 : ele->memo = fd_vinyl_key_memo( meta_map->seed, &full_phdr.key );
955 0 : ele->phdr = full_phdr;
956 0 : ele->seq = recovery_seq;
957 0 : ele->line_idx = ULONG_MAX;
958 0 : }
959 0 : } else{
960 : /* Only if the meta map element exists, verify that its
961 : recovery_seq points to a seq inside the full snapshot range. */
962 0 : if( FD_UNLIKELY( found_ele ) ) {
963 0 : fd_vinyl_meta_ele_t * ele = meta_map->ele + found_ele_idx;
964 0 : if( !fd_vinyl_meta_ele_in_use( ele ) ) {
965 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
966 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx not in use", seq, phdr_key_b58, memo ));
967 0 : }
968 0 : ulong ele_recovery_seq = fd_snapin_vinyl_pair_info_recovery_seq( &ele->phdr.info );
969 :
970 0 : if( FD_UNLIKELY( ele_recovery_seq>=ctx->vinyl.recovery.seq_present ) ) {
971 0 : FD_BASE58_ENCODE_32_BYTES( incr_phdr.key.uc, phdr_key_b58 );
972 0 : FD_LOG_CRIT(( "element seq %lu for key %s memo %016lx recovery_seq %lu with ele_recovery_seq %lu in the incr region", seq, phdr_key_b58, memo, recovery_seq, ele_recovery_seq ));
973 0 : }
974 0 : }
975 0 : }
976 : /* Verbose bstream cleanup. Even though the sync block will be
977 : updated as well before returning, it is preferred to zero out
978 : all incremental pairs that are being deprecated. */
979 0 : fd_memset( incr_block, 0, block_sz );
980 0 : } else if( block_type==FD_VINYL_BSTREAM_CTL_TYPE_ZPAD ) {
981 0 : block_sz = FD_VINYL_BSTREAM_BLOCK_SZ;
982 0 : } else {
983 0 : FD_LOG_CRIT(( "unexpected block type %d", block_type ));
984 0 : }
985 :
986 0 : if( FD_UNLIKELY( !block_sz ) ) {
987 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx (zero block_sz)", seq, ctl ));
988 0 : }
989 0 : if( FD_UNLIKELY( block_sz > 64UL<<20 ) ) {
990 0 : FD_LOG_CRIT(( "Invalid block header at vinyl seq %lu, ctl=%016lx, block_sz=%lu (unreasonably large block size)", seq, ctl, block_sz ));
991 0 : }
992 :
993 0 : seq += block_sz;
994 0 : }
995 :
996 : /* Apply changes and resync */
997 0 : fd_snapwm_vinyl_recovery_seq_apply( ctx );
998 0 : int sync_err = fd_vinyl_io_sync( ctx->vinyl.io_mm, FD_VINYL_IO_FLAG_BLOCKING );
999 0 : if( FD_UNLIKELY( sync_err ) ) FD_LOG_CRIT(( "fd_vinyl_io_sync(io_mm) failed (%i-%s)", sync_err, fd_vinyl_strerror( sync_err ) ));
1000 0 : vinyl_mm_sync( ctx );
1001 :
1002 0 : dt += fd_log_wallclock();
1003 0 : FD_LOG_INFO(( "vinyl revert_incr took %g seconds", (double)dt/1e9 ));
1004 0 : }
1005 :
1006 : void
1007 : fd_snapin_vinyl_pair_info_from_parts( fd_vinyl_info_t * info,
1008 : ulong val_sz,
1009 : ulong recovery_seq,
1010 0 : ulong slot ) {
1011 0 : ulong enc_seq = recovery_seq >> FD_VINYL_BSTREAM_BLOCK_LG_SZ;
1012 0 : ulong ul0 = ( ( enc_seq<<32 ) ) | ( ( val_sz<<32 )>>32);
1013 0 : ulong ul1 = ( ( enc_seq>>32 )<<48 ) | ( ( slot<<16 )>>16);
1014 0 : info->ul[ 0 ] = ul0;
1015 0 : info->ul[ 1 ] = ul1;
1016 0 : }
1017 :
1018 : void
1019 : fd_snapin_vinyl_pair_info_update_recovery_seq( fd_vinyl_info_t * info,
1020 0 : ulong recovery_seq ) {
1021 0 : fd_snapin_vinyl_pair_info_from_parts( info,
1022 0 : fd_snapin_vinyl_pair_info_val_sz( info ),
1023 0 : recovery_seq,
1024 0 : fd_snapin_vinyl_pair_info_slot( info ) );
1025 0 : }
1026 :
1027 : ulong
1028 0 : fd_snapin_vinyl_pair_info_val_sz ( fd_vinyl_info_t const * info ) {
1029 0 : return (ulong)info->ui[0];
1030 0 : }
1031 :
1032 : ulong
1033 0 : fd_snapin_vinyl_pair_info_recovery_seq( fd_vinyl_info_t const * info ) {
1034 0 : ulong enc_seq0 = info->ul[ 0 ];
1035 0 : ulong enc_seq1 = info->ul[ 1 ];
1036 0 : ulong enc_seq = ( ( enc_seq1>>48 )<<32 ) | ( enc_seq0>>32 );
1037 0 : ulong recovery_seq = enc_seq << FD_VINYL_BSTREAM_BLOCK_LG_SZ;
1038 0 : return recovery_seq;
1039 0 : }
1040 :
1041 : ulong
1042 0 : fd_snapin_vinyl_pair_info_slot( fd_vinyl_info_t const * info ) {
1043 0 : ulong slot = info->ul[ 1 ];
1044 0 : slot = ( slot<<16 )>>16;
1045 0 : return slot;
1046 0 : }
|