#include "fd_vinyl_io_ur_private.h"

/* This backend uses scratch pad memory as a write-back cache. Every
   byte in the scratch pad has one of the following states:

   - clean:  Written out to bstream
   - write:  Write job in progress
   - wait:   To be written out in the future
   - future: Written with append(), awaiting commit()
   - used:   Reserved with alloc(), awaiting trim()/append()
   - free:   Unused

   The write-back cache has a logical address space (bstream_seq),
   and a physical address space (offset).

   In logical space, each state covers a contiguous range of bytes in
   the following order. Various bstream seq numbers mark the bounds
   between regions.

       [clean]    [write]    [wait]     [future]   [used]     [free]
      ^          ^          ^          ^          ^          ^
      seq_cache  |          seq_write  |          seq_future |
                 seq_clean             seq_present           wb.seq1

   The region [seq_past,seq_cache) is outside of the write cache.

   The region [seq_cache,seq_present), i.e. spanning 'clean', 'write',
   and 'wait', is part of the write cache. Read requests that fall in
   this range are served from the write cache.

   The logical address space maps to the physical address space as a
   ring. The ring mapping is not modulo, see wb_ring for details. */
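
/* A minimal sketch (illustrative only, not used by the backend) of the
   ordering invariant implied by the diagram above, expressed with the
   helpers and fields used throughout this file:

     FD_CRIT( fd_vinyl_seq_le( ur->seq_cache,         ur->seq_clean         ), "clean"  );
     FD_CRIT( fd_vinyl_seq_le( ur->seq_clean,         ur->seq_write         ), "write"  );
     FD_CRIT( fd_vinyl_seq_le( ur->seq_write,         ur->base->seq_present ), "wait"   );
     FD_CRIT( fd_vinyl_seq_le( ur->base->seq_present, ur->base->seq_future  ), "future" );
     FD_CRIT( fd_vinyl_seq_le( ur->base->seq_future,  ur->wb.seq1           ), "used"   );

   That is, the bounds in the diagram are non-decreasing in bstream seq
   order; the 'free' region is whatever scratch pad space remains past
   wb.seq1. */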

void
fd_vinyl_io_wq_completion( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;

  FD_CRIT( ur->cqe_pending      >0, "stray completion"       );
  FD_CRIT( ur->cqe_write_pending>0, "stray write completion" );

  /* interpret CQE */
  struct io_uring_cqe * cqe = fd_io_uring_cq_head( ring->cq );
  if( FD_UNLIKELY( !cqe ) ) FD_LOG_CRIT(( "no write completion found" ));
  if( ur_udata_req_type( cqe->user_data )!=UR_REQ_WRITE ) {
    FD_LOG_CRIT(( "unexpected CQE type while flushing write queue" ));
  }
  int cqe_res = cqe->res;
  if( cqe_res<0 ) {
    FD_LOG_ERR(( "io_uring write failed (%i-%s)", -cqe_res, fd_io_strerror( -cqe_res ) ));
  }

  /* advance write cursor */
  ulong       wq_idx  = ur_udata_idx( cqe->user_data );
  wq_desc_t * wq_desc = wq_ring_desc( &ur->wq, wq_idx );
  ulong       req_sz  = wq_desc->sz;
  if( FD_UNLIKELY( (uint)cqe_res != req_sz ) ) {
    FD_LOG_ERR(( "database write failed (short write): requested to write %lu bytes, but only wrote %i bytes (at bstream seq 0x%016lx)",
                 req_sz, cqe_res,
                 wq_desc->seq - req_sz ));
  }
  ur->seq_clean = wq_ring_complete( &ur->wq, wq_idx );

  fd_io_uring_cq_advance( ring->cq, 1U );
  ur->cqe_write_pending--;
  ur->cqe_pending--;
  ur->cqe_cnt++;
}

/* wq_clean polls for write completions. */

static void
wq_clean( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;
  while( fd_io_uring_cq_ready( ring->cq ) ) {
    fd_vinyl_io_wq_completion( ur );
  }
}

/* wq_wait waits for one write completion. */

static void
wq_wait( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;
  FD_CRIT( ur->cqe_write_pending>0, "stray write completion wait" );
  FD_CRIT( ur->cqe_pending      >0, "stray completion wait"       );
  int err = fd_io_uring_enter( ring->ioring_fd, 0U, 1U, IORING_ENTER_GETEVENTS, NULL, 0UL );
  if( FD_UNLIKELY( err<0 ) ) {
    FD_LOG_ERR(( "io_uring_enter failed (%i-%s)", -err, fd_io_strerror( -err ) ));
  }
  FD_CRIT( fd_io_uring_cq_ready( ring->cq )>0, "io_uring_enter returned but no CQEs ready" );
}

/* wq_enqueue adds a buffer to the write queue. */

static void
track_sqe_write( fd_vinyl_io_ur_t * ur,
                 ulong              sz ) {
  ur->base->file_write_cnt++;
  ur->base->file_write_tot_sz += sz;
  ur->sqe_prep_cnt++;
  ur->cqe_pending++;
  ur->cqe_write_pending++;
}

static void
wq_enqueue( fd_vinyl_io_ur_t * ur,
            ulong              seq,
            uchar const *      src,
            ulong              sz ) {

  fd_io_uring_t * ring     = ur->ring;
  ulong           cq_depth = ring->cq->depth;
  wq_ring_t *     wq       = &ur->wq;
  for(;;) {
    wq_clean( ur );
    if( wq_ring_free_cnt( wq )>=2 && ur->cqe_pending+2 <= cq_depth ) break;
    wq_wait( ur );
  }

  /* Map seq into the bstream store. If we hit the store end with more
     to go, wrap around and finish the write at the store start. */
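
  /* For example (illustrative numbers only): with dev_sz=4096 and
     seq%dev_sz=3840, a sz=512 request is split into a first write of
     wsz=256 bytes at the store tail and a second write of the remaining
     256 bytes at dev_base, i.e. the store start. */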

  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;
  ulong dev_off  = seq % dev_sz;

  ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
  ulong wq0 = wq_ring_enqueue( wq, seq+wsz );
  wq_ring_desc( wq, wq0 )->sz = (uint)wsz; /* remember request size for CQE */
  struct io_uring_sqe * sqe = fd_io_uring_get_sqe( ring->sq );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "unbalanced SQ" ));
  *sqe = (struct io_uring_sqe) {
    .opcode    = IORING_OP_WRITE,
    .fd        = 0, /* fixed file index 0 */
    .off       = dev_base + (seq % dev_sz),
    .addr      = (ulong)src,
    .len       = (uint)wsz,
    .flags     = IOSQE_FIXED_FILE,
    .user_data = ur_udata_pack_idx( UR_REQ_WRITE, wq0 ),
  };
  track_sqe_write( ur, wsz );

  sz -= wsz;
  if( sz ) {
    ulong wq1 = wq_ring_enqueue( wq, seq+wsz+sz );
    wq_ring_desc( wq, wq1 )->sz = (uint)sz;
    sqe = fd_io_uring_get_sqe( ring->sq );
    if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "unbalanced SQ" ));
    *sqe = (struct io_uring_sqe) {
      .opcode    = IORING_OP_WRITE,
      .fd        = 0, /* fixed file index 0 */
      .off       = dev_base,
      .addr      = (ulong)( src + wsz ),
      .len       = (uint)sz,
      .flags     = IOSQE_FIXED_FILE,
      .user_data = ur_udata_pack_idx( UR_REQ_WRITE, wq1 ),
    };
    track_sqe_write( ur, sz );
  }
}

/* wq_enqueue_seq adds jobs to the write queue until the given sequence
   number. */

static void
wq_enqueue_seq( fd_vinyl_io_ur_t * ur,
                ulong              seq1 ) {
  ulong seq0 = ur->seq_write;
  FD_CRIT( fd_vinyl_seq_lt( seq0, seq1 ), "invalid wq_enqueue_seq call" );
  ulong const   sz   = seq1 - seq0;
  uchar const * spad = fd_vinyl_io_ur_wb_buf( ur );

  wb_ring_span_t span = wb_ring_translate( &ur->wb, seq0, sz );
  if( span.sz0 ) wq_enqueue( ur, ur->seq_write,          spad+span.off0, span.sz0 );
  if( span.sz1 ) wq_enqueue( ur, ur->seq_write+span.sz0, spad+span.off1, span.sz1 );
  FD_CRIT( ur->seq_write+span.sz0+span.sz1==seq1, "invariant violation" );
  ur->seq_write = seq1;

  fd_io_uring_t * ring = ur->ring;
  int submit_cnt = fd_io_uring_submit( ring->sq, ring->ioring_fd, 0, 0U );
  if( FD_UNLIKELY( submit_cnt<0 ) ) FD_LOG_ERR(( "io_uring_submit failed (%i-%s)", -submit_cnt, fd_io_strerror( -submit_cnt ) ));
  ur->sqe_sent_cnt += (ulong)submit_cnt;
}

/* wb_flush_seq does a blocking flush of the write cache until the
   given sequence number. */

static void
wb_flush_seq( fd_vinyl_io_ur_t * ur,
              ulong              seq ) {
  if( FD_UNLIKELY( fd_vinyl_seq_gt( seq, ur->base->seq_present ) ) ) {
    FD_LOG_CRIT(( "pipeline depth exceeded (seq=%lu seq_present=%lu)", seq, ur->base->seq_present ));
  }
  if( fd_vinyl_seq_gt( seq, ur->seq_write ) ) {
    ulong write_sz = seq - ur->seq_write;
    if( FD_UNLIKELY( write_sz < WQ_BLOCK_SZ ) ) {
      /* The required write is a bit small. Try to eagerly write more
         in order to reduce the number of write ops. */
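      /* For example (illustrative numbers only): if WQ_BLOCK_SZ were
         1 MiB and only 64 KiB were strictly required, we would enqueue
         up to min( seq_present - seq_write, 1 MiB ) instead, so the
         device sees fewer, larger writes. */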
      ulong write_max = ur->base->seq_present - ur->seq_write;
      write_sz = fd_ulong_min( write_max, WQ_BLOCK_SZ );
    }
    ulong seq_write_new = ur->seq_write + write_sz;
    FD_CRIT( fd_vinyl_seq_ge( seq_write_new, seq ), "invariant violation" );
    wq_enqueue_seq( ur, seq_write_new );
  }
  for(;;) {
    wq_clean( ur );
    if( fd_vinyl_seq_ge( ur->seq_clean, seq ) ) break;
    wq_wait( ur );
  }
}

/* fd_vinyl_io_ur_alloc allocates space in the write-back ring buffer. */

void *
fd_vinyl_io_ur_alloc( fd_vinyl_io_t * io,
                      ulong           sz,
                      int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  int flag_blocking = !!( flags & FD_VINYL_IO_FLAG_BLOCKING );

  int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz    = sz > ur->wb.max;

  if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));

  /* An alloc op discards a previous alloc */

  wb_ring_trim( &ur->wb, ur->base->seq_future );
  ur->base->spad_used = 0UL;

  /* Ensure that this alloc does not evict anything from the write cache
     that we cannot recover from the bstream file descriptor. */
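  /* Concretely (an illustrative reading of the check below):
     wb_ring_alloc_seq0 reports what wb.seq0 would become if sz bytes
     were allocated now. If that new seq0 would move past seq_clean, the
     allocation would reuse scratch pad space still holding 'write'/'wait'
     bytes that exist nowhere else, so we either fail (non-blocking) or
     flush the device up to that point first. */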

  ulong seq_clean_new = wb_ring_alloc_seq0( &ur->wb, sz );
  if( FD_UNLIKELY( fd_vinyl_seq_gt( seq_clean_new, ur->seq_clean ) ) ) {
    /* This alloc cannot proceed until some writes happen */
    if( !flag_blocking ) return NULL;
    wb_flush_seq( ur, seq_clean_new );
  }

  /* At this point, we have enough clean space to alloc sz bytes. */

  ulong seq = ur->base->seq_future;
  if( FD_UNLIKELY( fd_vinyl_seq_ne( seq, wb_ring_seq1( &ur->wb ) ) ) ) {
    FD_LOG_CRIT(( "seq_future (%lu) and write-back buffer (%lu) out of sync", seq, wb_ring_seq1( &ur->wb ) ));
  }
  wb_ring_alloc( &ur->wb, sz );
  if( FD_UNLIKELY( fd_vinyl_seq_ne( seq+sz, wb_ring_seq1( &ur->wb ) ) ) ) {
    FD_LOG_CRIT(( "seq_future (%lu) and write-back buffer (%lu) out of sync", seq, wb_ring_seq1( &ur->wb ) ));
  }
  ur->base->spad_used = sz;
  if( fd_vinyl_seq_gt( ur->wb.seq0, ur->seq_cache ) ) ur->seq_cache = ur->wb.seq0;
  if( FD_UNLIKELY( !sz ) ) {
    ur->last_alloc = NULL;
    return NULL;
  }

  ulong off = wb_ring_seq_to_off( &ur->wb, seq );
  void * alloc = fd_vinyl_io_ur_wb_buf( ur ) + off;
  ur->last_alloc = alloc;
  return alloc;
}

/* fd_vinyl_io_ur_append appends to the write-back cache. */

ulong
fd_vinyl_io_ur_append( fd_vinyl_io_t * io,
                       void const *    _src,
                       ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  uchar const * src  = (uchar const *)_src;
  uchar *       spad = fd_vinyl_io_ur_wb_buf( ur );

  if( FD_UNLIKELY( ur->cqe_read_pending ) ) {
    FD_LOG_CRIT(( "attempted to enqueue a write while there are still inflight reads" ));
  }

  /* Validate the input args. */

  ulong seq_future  = ur->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong seq_ancient = ur->base->seq_ancient;
  ulong dev_sz      = ur->dev_sz;

  int bad_src      = !src;
  int bad_align    = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz       = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
    FD_LOG_CRIT(( bad_src   ? "NULL src"       :
                  bad_align ? "misaligned src" :
                  bad_sz    ? "misaligned sz"  :
                              "device full" ));

  /* Is the request in-place? If so, trim the allocation */

  ulong seq = seq_future;

  ur->base->spad_used = 0UL;
  if( src==ur->last_alloc ) {
    FD_CRIT( seq+sz<=ur->wb.seq1+ur->wb.sz1, "seq_future and write-back buffer out of sync" );
    wb_ring_trim( &ur->wb, seq+sz );
  } else {
    /* src must not be in spad */
    if( FD_UNLIKELY( ( (ulong)src>=(ulong)(spad           ) ) &
                     ( (ulong)src<=(ulong)(spad+ur->wb.max) ) ) ) {
      FD_LOG_CRIT(( "src buffer overlaps write-back cache" ));
    }
    /* copy allocation into spad */
    wb_ring_trim( &ur->wb, seq );
    void * alloc = fd_vinyl_io_ur_alloc( io, sz, FD_VINYL_IO_FLAG_BLOCKING );
    fd_memcpy( alloc, src, sz );
    ur->base->cache_write_cnt++;
    ur->base->cache_write_tot_sz += sz;
  }

  /* At this point, the append request is at the correct position in the
     write-back cache. Commit the allocation. */

  ur->base->seq_future = seq + sz;
  ur->base->spad_used  = 0UL;

  return seq;
}

/* fd_vinyl_io_ur_commit 'commits' previous appends, making them visible
   to subsequent read calls. */

int
fd_vinyl_io_ur_commit( fd_vinyl_io_t * io,
                       int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  wb_ring_trim( &ur->wb, ur->base->seq_future );
  ur->base->seq_present = ur->base->seq_future;
  ur->base->spad_used   = 0UL;

  /* Keep io_uring pipeline fed */

  if( ur->base->seq_present - ur->seq_write >= WQ_BLOCK_SZ ) {
    ulong seq_until = fd_ulong_align_dn( ur->base->seq_present, WQ_BLOCK_SZ );
    if( fd_vinyl_seq_gt( seq_until, ur->seq_write ) ) {
      wq_enqueue_seq( ur, seq_until );
    }
  }

  return FD_VINYL_SUCCESS;
}
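
/* Illustrative usage sketch (a hedged example, not part of the backend;
   assumes io is a valid fd_vinyl_io_t handle backed by this file and sz
   is a FD_VINYL_BSTREAM_BLOCK_SZ multiple):

     void * buf = fd_vinyl_io_ur_alloc( io, sz, FD_VINYL_IO_FLAG_BLOCKING );
     ... populate buf with sz bytes of bstream blocks ...
     ulong  seq = fd_vinyl_io_ur_append( io, buf, sz );
     fd_vinyl_io_ur_commit( io, 0 );

   Because buf is the most recent alloc, the append takes the zero-copy
   path above (the reservation is merely trimmed); appending an unrelated
   buffer instead copies it into the scratch pad. After commit, reads at
   seq are served from the write cache until the data is evicted from
   it. */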

/* fd_vinyl_io_ur_copy does a blocking read of an old block, then copies
   it out to the tail. This is not ideal. Instead, we should enqueue
   async background read/write jobs. */
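
/* For example (illustrative numbers only): with spad_max bytes of
   scratch space and sz equal to 2.5x spad_max, the loop below performs
   three alloc/read_imm/append rounds (spad_max, spad_max and 0.5x
   spad_max bytes), advancing seq_src0 by csz between rounds. */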

ulong
fd_vinyl_io_ur_copy( fd_vinyl_io_t * io,
                     ulong           seq_src0,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate the input args */

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong spad_max    = ur->wb.max;

  ulong seq_src1 = seq_src0 + sz;

  int bad_past = !( fd_vinyl_seq_le( seq_past, seq_src0    ) &
                    fd_vinyl_seq_lt( seq_src0, seq_src1    ) &
                    fd_vinyl_seq_le( seq_src1, seq_present ) );
  int bad_src  = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz   = !fd_ulong_is_aligned( sz,       FD_VINYL_BSTREAM_BLOCK_SZ );

  if( FD_UNLIKELY( bad_past | bad_src | bad_sz ) )
    FD_LOG_CRIT(( bad_past ? "src is not in the past" :
                  bad_src  ? "misaligned src_seq"     :
                             "misaligned sz" ));

  /* Map the dst to the bstream (updating seq_future) and map the src
     and dst regions onto the device. Then copy as much as we can at a
     time, handling device wrap around and copy buffering space. */

  ulong seq = seq_future;

  for(;;) {
    ulong csz = fd_ulong_min( sz, spad_max );

    void * buf = fd_vinyl_io_ur_alloc( io, csz, FD_VINYL_IO_FLAG_BLOCKING );
    fd_vinyl_io_ur_read_imm( io, seq_src0, buf, csz );
    fd_vinyl_io_ur_append  ( io, buf, csz );

    sz -= csz;
    if( !sz ) break;

    seq_src0 += csz;
  }

  return seq;
}

ulong
fd_vinyl_io_ur_hint( fd_vinyl_io_t * io,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  fd_vinyl_io_ur_alloc( io, sz, FD_VINYL_IO_FLAG_BLOCKING );
  fd_vinyl_io_trim( io, ur->base->seq_future );
  return io->seq_future;
}

int
fd_vinyl_io_ur_sync( fd_vinyl_io_t * io,
                     int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  ulong seed        = ur->base->seed;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  wb_flush_seq( ur, seq_present );

  int   dev_fd   = ur->dev_fd;
  ulong dev_sync = ur->dev_sync;

  fd_vinyl_bstream_block_t * block = ur->sync;

  /* block->sync.ctl current (static) */
  block->sync.seq_past    = seq_past;
  block->sync.seq_present = seq_present;
  /* block->sync.info_sz current (static) */
  /* block->sync.info    current (static) */

  block->sync.hash_trail  = 0UL;
  block->sync.hash_blocks = 0UL;
  fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */

  bd_write( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
  ur->base->file_write_cnt++;
  ur->base->file_write_tot_sz += FD_VINYL_BSTREAM_BLOCK_SZ;

  ur->base->seq_ancient = seq_past;

  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_forget "forgets" old data, making it available to
   future allocations. */

void
fd_vinyl_io_ur_forget( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments. Note that we don't allow forgetting into
     the future even when we have no uncommitted blocks because the
     resulting [seq_ancient,seq_future) might contain blocks that were
     never written (which might not be an issue practically, but it
     would be a bit strange for something to try to scan starting from
     seq_ancient and discover unwritten blocks). */

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dir    = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
  int bad_read   = !!ur->rq_head || !!ur->rc_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
                  seq, seq_past, seq_present, seq_present-seq_past,
                  bad_seq  ? "misaligned seq"    :
                  bad_dir  ? "seq out of bounds" :
                  bad_read ? "reads in progress" :
                             "appends/copies in progress" ));

  /* Before we can forget data, we must have finished writing it first.
     Usually, this happens long after data has already been flushed, so
     this should be a no-op for practical purposes. */

  wb_flush_seq( ur, seq );

  ur->base->seq_past = seq;
}

/* fd_vinyl_io_ur_rewind "undoes" recent writes, allowing the user to
   overwrite a bstream range. */

void
fd_vinyl_io_ur_rewind( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments. Unlike forget, we do allow rewinding to
     before seq_ancient, as the region of sequence space reported to the
     caller as written is still accurate. */

  ulong seq_ancient = ur->base->seq_ancient;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_cache   = ur->      seq_cache;
  ulong seq_clean   = ur->      seq_clean;
  ulong seq_write   = ur->      seq_write;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_past   = fd_vinyl_seq_lt( seq, seq_past    );
  int bad_dir    = fd_vinyl_seq_gt( seq, seq_present );
  int bad_read   = !!ur->rq_head || !!ur->rc_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_past | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
                  bad_seq  ? "misaligned seq"        :
                  bad_past ? "seq before seq_past"   :
                  bad_dir  ? "seq after seq_present" :
                  bad_read ? "reads in progress"     :
                             "appends/copies in progress" ));

  /* Need to awkwardly unwind the write pipeline. At this instant, there
     might be inflight writes for the range that is being rewound. So,
     we start at the right and walk to the left until we reach seq:
     unwind allocs, unwind uncommitted data, unwind dirty cache, unwind
     inflight writes, unwind clean cache, and finally free up log file
     space.

     See top of this file for a description of the various regions. */
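
  /* For example (illustrative only): rewinding to a seq between
     seq_clean and seq_write first trims the ring back to seq_write and
     waits for the inflight writes covering [seq_clean,seq_write) to
     complete, then trims the rest. In the clamps below, bounds already
     at or before seq are left unchanged; seq_write (and then
     seq_present/seq_future) drop to seq. */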

  if( fd_vinyl_seq_lt( seq, seq_write ) ) {
    wb_ring_trim( &ur->wb, seq_write ); /* unwind "used", "future", "wait" */
    wb_flush_seq( ur, seq_write );      /* wait for I/O completion (moves "write" to "clean") */
  }
  wb_ring_trim( &ur->wb, seq );         /* unwind rest */

  ur->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
  ur->base->seq_past    = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past    ), seq_past,    seq );
  ur->      seq_cache   = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_cache   ), seq_cache,   seq );
  ur->      seq_clean   = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_clean   ), seq_clean,   seq );
  ur->      seq_write   = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_write   ), seq_write,   seq );
  ur->base->seq_present = seq;
  ur->base->seq_future  = seq;
}