#include "fd_vinyl_io_ur.h"

#if FD_HAS_LIBURING

#include <unistd.h> /* lseek */
#include <liburing.h>

static inline void
bd_read( int    fd,
         ulong  off,
         void * buf,
         ulong  sz ) {
  ssize_t ssz = pread( fd, buf, sz, (off_t)off );
  if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
  if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
  /**/                 FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
}

static inline void
bd_write( int          fd,
          ulong        off,
          void const * buf,
          ulong        sz ) {
  ssize_t ssz = pwrite( fd, buf, sz, (off_t)off );
  if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
  if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
  else                 FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
}

/* fd_vinyl_io_ur_rd_t extends fd_vinyl_io_rd_t. Describes an inflight
   read request. Each object gets created with a fd_vinyl_io_read()
   call, has at least the lifetime of an io_uring SQE/CQE transaction,
   and gets destroyed with fd_vinyl_io_poll().

   Each fd_vinyl_io_rd_t describes a contiguous read in bstream seq
   space. When mapped to the device, this typically results in a single
   contiguous read. */

struct fd_vinyl_io_ur_rd;
typedef struct fd_vinyl_io_ur_rd fd_vinyl_io_ur_rd_t;

struct fd_vinyl_io_ur_rd {
  ulong                 ctx;  /* Must mirror fd_vinyl_io_rd_t */
  ulong                 seq;  /* " */
  void *                dst;  /* " */
  ulong                 sz;   /* " */

  uint                  tsz;  /* Tail read size */
  fd_vinyl_io_ur_rd_t * next; /* Next element in ur rd queue */
};

FD_STATIC_ASSERT( sizeof(fd_vinyl_io_ur_rd_t)<=sizeof(fd_vinyl_io_rd_t), layout );

/* fd_vinyl_io_ur_t extends fd_vinyl_io_t. */

struct fd_vinyl_io_ur {
  fd_vinyl_io_t            base[1];
  int                      dev_fd;       /* File descriptor of block device */
  ulong                    dev_sync;     /* Offset to block that holds bstream sync (BLOCK_SZ multiple) */
  ulong                    dev_base;     /* Offset to first block (BLOCK_SZ multiple) */
  ulong                    dev_sz;       /* Block store byte size (BLOCK_SZ multiple) */
  fd_vinyl_io_ur_rd_t *    rd_head;      /* Pointer to queue head */
  fd_vinyl_io_ur_rd_t **   rd_tail_next; /* Pointer to queue &tail->next or &rd_head if empty. */
  fd_vinyl_bstream_block_t sync[1];

  struct io_uring * ring;

  ulong sq_prep_cnt; /* io_uring SQEs prepared */
  ulong sq_sent_cnt; /* io_uring SQEs submitted */
  ulong cq_cnt;      /* io_uring CQEs received (head frags with suppressed CQEs counted at prep) */

  /* spad_max bytes follow */
};

typedef struct fd_vinyl_io_ur fd_vinyl_io_ur_t;
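
/* Illustrative memory layout of an fd_vinyl_io_ur_t region (sizes
   depend on the build's FD_VINYL_BSTREAM_BLOCK_SZ and the caller's
   spad_max):

     [ fd_vinyl_io_ur_t header | spad_max bytes of scratch pad ]
     ^ BLOCK_SZ aligned          ^ (uchar *)(ur+1)

   fd_vinyl_io_ur_alloc below hands out block-multiple slices of the
   scratch pad region that immediately follows the header. */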

/* fd_vinyl_io_ur_read_imm is identical to fd_vinyl_io_bd_read_imm. */

static void
fd_vinyl_io_ur_read_imm( fd_vinyl_io_t * io,
                         ulong           seq0,
                         void *          _dst,
                         ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* If this is a request to read nothing, succeed immediately. If
     this is a request to read outside the bstream's past, fail. */

  if( FD_UNLIKELY( !sz ) ) return;

  uchar * dst  = (uchar *)_dst;
  ulong   seq1 = seq0 + sz;

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  int bad_seq  = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dst  = (!fd_ulong_is_aligned( (ulong)dst, FD_VINYL_BSTREAM_BLOCK_SZ )) | !dst;
  int bad_sz   = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));

  if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
    FD_LOG_CRIT(( "bstream read_imm [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
                  seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
                  bad_seq ? "misaligned seq"         :
                  bad_dst ? "misaligned or NULL dst" :
                  bad_sz  ? "misaligned sz"          :
                            "not in past" ));

  /* At this point, we have a valid read request. Map seq0 into the
     bstream store. Read the lesser of sz bytes or until the store end.
     If we hit the store end with more to go, wrap around and finish the
     read at the store start. */

  int   dev_fd   = ur->dev_fd;
  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong dev_off = seq0 % dev_sz;

  ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
  bd_read( dev_fd, dev_base + dev_off, dst, rsz );
  sz -= rsz;

  if( FD_UNLIKELY( sz ) ) bd_read( dev_fd, dev_base, dst + rsz, sz );
}
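
/* Worked example of the wraparound mapping above (illustrative numbers
   only): with dev_sz = 8*BLOCK_SZ, a request for sz = 3*BLOCK_SZ at a
   seq0 with seq0 % dev_sz = 6*BLOCK_SZ yields rsz = min( 3, 8-6 )*
   BLOCK_SZ = 2*BLOCK_SZ read from dev_base + 6*BLOCK_SZ, and the
   remaining 1*BLOCK_SZ wraps around and is read from dev_base.
   ur_prep_read below performs the same split when generating SQEs. */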

/* ### Read pipeline explainer

   vinyl_io clients submit read jobs using vinyl_io_read, and poll for
   completions using vinyl_io_poll. Reads may complete in arbitrary
   order. At first glance, this translates cleanly to io_uring.

   Read job descriptors are user-allocated. The client is not aware of
   any job queue depth limits in the vinyl_io backend's internals. The
   vinyl_io backend is expected to queue up an arbitrarily deep backlog
   of read jobs. However, the io_uring submission queue has a hard
   depth limit.

   The vinyl_io read lifecycle therefore is as follows:
   - io_ur_read adds a read job to the 'staged' queue. This is a linked
     list weaving through all user-submitted jobs.
   - io_ur_read/io_ur_poll move jobs from the 'staged' queue to the
     'wait' heap. Each wait heap entry is shadowed by an io_uring
     submission queue entry.
   - io_ur_poll matches io_uring completions with corresponding 'wait'
     heap entries. Each matched entry is removed from the 'wait' heap
     and returned to the user.

   In rare cases, a bstream read may wrap around the end of the bstream
   store. In this case, two linked SQEs are generated.

   ### Polling

   fd_vinyl_io_read registers work in userspace only and does not make
   any syscalls. fd_vinyl_io_poll submits read jobs (via the io_uring
   submit syscall) if there is any work pending, then polls for
   completions (see the usage sketch below). */
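
/* Rough client-side usage sketch of the lifecycle above (illustrative
   only; the exact caller-facing descriptor setup and flag values are
   assumptions, not part of this backend):

     fd_vinyl_io_rd_t rd[1];
     rd->seq = seq;                       // block aligned seq in the bstream past
     rd->dst = dst;                       // block aligned destination buffer
     rd->sz  = sz;                        // block multiple byte count
     fd_vinyl_io_read( io, rd );          // stage the job, no syscall
     for(;;) {
       fd_vinyl_io_rd_t * done;
       int err = fd_vinyl_io_poll( io, &done, 0 );  // non-blocking poll
       if( err==FD_VINYL_SUCCESS ) break; // done==rd, data landed in rd->dst
       // FD_VINYL_ERR_AGAIN: reads still in flight, poll again later
       // FD_VINYL_ERR_EMPTY: nothing staged and nothing in flight
     }
*/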

/* ur_staged_push adds a read job to the staged queue. */

static void
ur_staged_push( fd_vinyl_io_ur_t *    ur,
                fd_vinyl_io_ur_rd_t * rd ) {
  rd->next          = NULL;
  *ur->rd_tail_next = rd;
  ur->rd_tail_next  = &rd->next;
}

/* ur_prep_read translates a staged read job into one (or rarely two)
   io_uring SQEs. SQEs are allocated off the io_uring instance.
   Returns the number of SQEs prepared on success, and moves rd onto
   the wait heap. On failure to allocate SQEs, behaves like a no-op
   (safe to retry) and returns 0. */

static uint
ur_prep_read( fd_vinyl_io_ur_t *    ur,
              fd_vinyl_io_ur_rd_t * rd,
              ulong                 seq0,
              ulong                 sz ) {
  struct io_uring * ring = ur->ring;
  if( FD_UNLIKELY( sz>INT_MAX ) ) {
    FD_LOG_CRIT(( "Invalid read size 0x%lx bytes (exceeds max)", sz ));
  }
  if( FD_UNLIKELY( io_uring_sq_space_left( ring )<2U ) ) return 0U;

  /* Map seq0 into the bstream store. */

  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong dev_off = seq0 % dev_sz;

  ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
  sz -= rsz;

  /* Prepare the head SQE */
  rd->next = NULL;
  rd->tsz  = (uint)rsz;
  struct io_uring_sqe * sqe = io_uring_get_sqe( ring );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "io_uring_get_sqe() returned NULL despite io_uring_sq_space_left()>=2" ));
  io_uring_prep_read( sqe, 0, rd->dst, (uint)rsz, dev_base+dev_off );
  io_uring_sqe_set_flags( sqe, IOSQE_FIXED_FILE );
  io_uring_sqe_set_data( sqe, rd );
  ur->sq_prep_cnt++;
  if( FD_LIKELY( !sz ) ) return 1U; /* optimize for the unfragmented case */

  /* Tail wraparound occurred. Amend the head SQE to be linked to the
     tail SQE, detach it from the io_ur descriptor, and suppress the CQE
     for the head. If we get a CQE for the tail read job, we know that
     the head read job also succeeded. Also, set the low bit of the
     userdata to 1 (otherwise guaranteed to be 0 due to alignment), to
     indicate that this SQE is a head frag. */
  io_uring_sqe_set_flags( sqe, IOSQE_FIXED_FILE | IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS );
  io_uring_sqe_set_data64( sqe, (ulong)rd+1UL );
  ur->cq_cnt++; /* The head CQE is suppressed on success, so account for it now */

  /* Prepare the tail SQE */
  rd->tsz = (uint)sz;
  sqe = io_uring_get_sqe( ring );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "io_uring_get_sqe() returned NULL despite io_uring_sq_space_left()>=2" ));
  io_uring_prep_read( sqe, 0, (uchar *)rd->dst + rsz, (uint)sz, dev_base );
  io_uring_sqe_set_flags( sqe, IOSQE_FIXED_FILE );
  io_uring_sqe_set_data( sqe, rd );
  ur->sq_prep_cnt++;
  return 2U;
}

/* ur_staged_clean moves as many read jobs from the staged queue to the
   submission queue as possible. */

static void
ur_staged_clean( fd_vinyl_io_ur_t * ur ) {
  for(;;) {
    fd_vinyl_io_ur_rd_t * rd = ur->rd_head;
    if( !rd ) break;

    fd_vinyl_io_ur_rd_t ** rd_tail_next = ur->rd_tail_next;
    fd_vinyl_io_ur_rd_t *  rd_next      = rd->next;

    uint sqe_cnt = ur_prep_read( ur, rd, rd->seq, rd->sz );
    if( FD_UNLIKELY( !sqe_cnt ) ) break;

    ur->rd_head      = rd_next;
    ur->rd_tail_next = fd_ptr_if( !!rd_next, rd_tail_next, &ur->rd_head );
  }
}

static void
fd_vinyl_io_ur_read( fd_vinyl_io_t *    io,
                     fd_vinyl_io_rd_t * _rd ) {
  fd_vinyl_io_ur_t *    ur = (fd_vinyl_io_ur_t *)io;
  fd_vinyl_io_ur_rd_t * rd = (fd_vinyl_io_ur_rd_t *)_rd;
  ur_staged_push( ur, rd );
  ur_staged_clean( ur );
}

static int
fd_vinyl_io_ur_poll( fd_vinyl_io_t *     io,
                     fd_vinyl_io_rd_t ** _rd,
                     int                 flags ) {
  fd_vinyl_io_ur_t * ur   = (fd_vinyl_io_ur_t *)io;
  struct io_uring *  ring = ur->ring;
  int blocking = !!( flags & FD_VINYL_IO_FLAG_BLOCKING );
  *_rd = NULL;

  uint cq_cnt = io_uring_cq_ready( ring );
  if( FD_UNLIKELY( !cq_cnt ) ) { /* no CQEs ready */
    /* Move staged work to submission queue */
    ur_staged_clean( ur );

    /* If no work is available to schedule, bail to avoid deadlock */
    int have_pending = ur->sq_prep_cnt > ur->sq_sent_cnt;
    int have_waiting = ur->sq_sent_cnt > ur->cq_cnt;
    if( FD_UNLIKELY( !have_pending && !have_waiting ) ) {
      return FD_VINYL_ERR_EMPTY;
    }

    /* Issue syscall to drive kernel */
    int submit_cnt;
    if( blocking ) {
      submit_cnt = io_uring_submit_and_wait( ring, 1U );
    } else {
      submit_cnt = io_uring_submit_and_get_events( ring );
    }
    if( FD_UNLIKELY( submit_cnt<0 ) ) {
      FD_LOG_ERR(( "%s failed (%i-%s)", blocking ? "io_uring_submit_and_wait" : "io_uring_submit_and_get_events", -submit_cnt, fd_io_strerror( -submit_cnt ) ));
    }
    ur->sq_sent_cnt += (ulong)submit_cnt;

    cq_cnt = io_uring_cq_ready( ring );
    if( !cq_cnt ) {
      if( FD_UNLIKELY( blocking ) ) FD_LOG_CRIT(( "io_uring_submit_and_wait() returned but no CQEs ready" ));
      return FD_VINYL_ERR_AGAIN;
    }
  }

  /* At this point, we have at least one CQE ready.
     It could come in one of these shapes:
     - Success (full read): implies that all fragments of a ur_rd read
       have been completed; only generated for the last frag
     - Short read: crash the app
     - Zero byte read: unexpected EOF reached, crash the app
     - Error (cancelled): a short read of the head frag broke the SQE
       chain, the tail got cancelled. Crash the app
     - Error (other): crash the app */

  struct io_uring_cqe * cqe = NULL;
  io_uring_peek_cqe( ring, &cqe );
  if( FD_UNLIKELY( !cqe ) ) FD_LOG_CRIT(( "io_uring_peek_cqe() yielded NULL despite io_uring_cq_ready()=%u", cq_cnt ));
  ulong udata = io_uring_cqe_get_data64( cqe );
  int last_frag = !fd_ulong_extract_bit( udata, 0 );
  fd_vinyl_io_ur_rd_t * rd = (void *)fd_ulong_clear_bit( udata, 0 );
  if( FD_UNLIKELY( !rd ) ) FD_LOG_CRIT(( "io_uring_peek_cqe() yielded invalid user data" ));
  int cqe_res = cqe->res;
  if( cqe_res<0 ) {
    FD_LOG_ERR(( "io_uring read failed (%i-%s)", -cqe_res, fd_io_strerror( -cqe_res ) ));
  }
  if( FD_UNLIKELY( !last_frag ) ) {
    FD_LOG_ERR(( "io_uring read failed (short read or EOF)" ));
  }
  if( FD_UNLIKELY( rd->tsz!=(uint)cqe_res ) ) {
    FD_LOG_ERR(( "io_uring read failed (expected %u bytes, got %i bytes)", rd->tsz, cqe_res ));
  }
  io_uring_cq_advance( ring, 1U );
  ur->cq_cnt++;

  *_rd = (fd_vinyl_io_rd_t *)rd;
  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_append is identical to fd_vinyl_io_bd_append. */

static ulong
fd_vinyl_io_ur_append( fd_vinyl_io_t * io,
                       void const *    _src,
                       ulong           sz ) {
  fd_vinyl_io_ur_t * ur  = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  uchar const *      src = (uchar const *)_src;

  /* Validate the input args. */

  ulong seq_future  = ur->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong seq_ancient = ur->base->seq_ancient;
  int   dev_fd      = ur->dev_fd;
  ulong dev_base    = ur->dev_base;
  ulong dev_sz      = ur->dev_sz;

  int bad_src      = !src;
  int bad_align    = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz       = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
    FD_LOG_CRIT(( bad_src   ? "NULL src"       :
                  bad_align ? "misaligned src" :
                  bad_sz    ? "misaligned sz"  :
                              "device full" ));

  /* At this point, we appear to have a valid append request. Map it to
     the bstream (updating seq_future) and map it to the device. Then
     write the lesser of sz bytes or until the store end. If we hit the
     store end with more to go, wrap around and finish the write at the
     store start. */

  ulong seq = seq_future;
  ur->base->seq_future = seq + sz;

  ulong dev_off = seq % dev_sz;

  ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
  bd_write( dev_fd, dev_base + dev_off, src, wsz );
  sz -= wsz;
  if( sz ) bd_write( dev_fd, dev_base, src + wsz, sz );

  return seq;
}

/* fd_vinyl_io_ur_commit is identical to fd_vinyl_io_bd_commit. */

static int
fd_vinyl_io_ur_commit( fd_vinyl_io_t * io,
                       int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  ur->base->seq_present = ur->base->seq_future;
  ur->base->spad_used   = 0UL;

  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_hint is identical to fd_vinyl_io_bd_hint. */

static ulong
fd_vinyl_io_ur_hint( fd_vinyl_io_t * io,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  ulong seq_future  = ur->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong seq_ancient = ur->base->seq_ancient;
  ulong dev_sz      = ur->dev_sz;

  int bad_sz       = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_sz | bad_capacity ) ) FD_LOG_CRIT(( bad_sz ? "misaligned sz" : "device full" ));

  return ur->base->seq_future;
}

/* fd_vinyl_io_ur_alloc is identical to fd_vinyl_io_bd_alloc. */

static void *
fd_vinyl_io_ur_alloc( fd_vinyl_io_t * io,
                      ulong           sz,
                      int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  ulong spad_max  = ur->base->spad_max;
  ulong spad_used = ur->base->spad_used; if( FD_UNLIKELY( !sz ) ) return ((uchar *)(ur+1)) + spad_used;

  int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz    = sz > spad_max;

  if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));

  if( FD_UNLIKELY( sz > (spad_max - spad_used) ) ) {
    if( FD_UNLIKELY( fd_vinyl_io_ur_commit( io, flags ) ) ) return NULL;
    spad_used = 0UL;
  }

  ur->base->spad_used = spad_used + sz;

  return ((uchar *)(ur+1)) + spad_used;
}

/* fd_vinyl_io_ur_copy is identical to fd_vinyl_io_bd_copy. */

static ulong
fd_vinyl_io_ur_copy( fd_vinyl_io_t * io,
                     ulong           seq_src0,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate the input args */

  ulong seq_ancient = ur->base->seq_ancient;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future; if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong spad_max    = ur->base->spad_max;
  ulong spad_used   = ur->base->spad_used;
  int   dev_fd      = ur->dev_fd;
  ulong dev_base    = ur->dev_base;
  ulong dev_sz      = ur->dev_sz;

  ulong seq_src1 = seq_src0 + sz;

  int bad_past     = !( fd_vinyl_seq_le( seq_past, seq_src0 ) &
                        fd_vinyl_seq_lt( seq_src0, seq_src1 ) &
                        fd_vinyl_seq_le( seq_src1, seq_present ) );
  int bad_src      = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz       = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_past | bad_src | bad_sz | bad_capacity ) )
    FD_LOG_CRIT(( bad_past ? "src is not in the past" :
                  bad_src  ? "misaligned src_seq"     :
                  bad_sz   ? "misaligned sz"          :
                             "device full" ));

  /* At this point, we appear to have a valid copy request. Get
     buffer space from the scratch pad (committing as necessary). */

  if( FD_UNLIKELY( sz>(spad_max-spad_used) ) ) {
    fd_vinyl_io_ur_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
    spad_used = 0UL;
  }

  uchar * buf     = (uchar *)(ur+1) + spad_used;
  ulong   buf_max = spad_max - spad_used;

  /* Map the dst to the bstream (updating seq_future) and map the src
     and dst regions onto the device. Then copy as much as we can at a
     time, handling device wrap around and copy buffering space. */

  ulong seq = seq_future;
  ur->base->seq_future = seq + sz;

  ulong seq_dst0 = seq;

  for(;;) {
    ulong src_off = seq_src0 % dev_sz;
    ulong dst_off = seq_dst0 % dev_sz;
    ulong csz     = fd_ulong_min( fd_ulong_min( sz, buf_max ), fd_ulong_min( dev_sz - src_off, dev_sz - dst_off ) );

    bd_read ( dev_fd, dev_base + src_off, buf, csz );
    bd_write( dev_fd, dev_base + dst_off, buf, csz );

    sz -= csz;
    if( !sz ) break;

    seq_src0 += csz;
    seq_dst0 += csz;
  }

  return seq;
}

/* fd_vinyl_io_ur_forget is identical to fd_vinyl_io_bd_forget. */

static void
fd_vinyl_io_ur_forget( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments. Note that we don't allow forgetting into
     the future even when we have no uncommitted blocks because the
     resulting [seq_ancient,seq_future) might contain blocks that were
     never written (which might not be an issue practically but it would
     be a bit strange for something to try to scan starting from
     seq_ancient and discover unwritten blocks). */

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dir    = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
  int bad_read   = !!ur->rd_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
                  seq, seq_past, seq_present, seq_present-seq_past,
                  bad_seq  ? "misaligned seq"    :
                  bad_dir  ? "seq out of bounds" :
                  bad_read ? "reads in progress" :
                             "appends/copies in progress" ));

  ur->base->seq_past = seq;
}

/* fd_vinyl_io_ur_rewind is identical to fd_vinyl_io_bd_rewind. */

static void
fd_vinyl_io_ur_rewind( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments. Unlike forget, we do allow rewinding to
     before seq_ancient as the region of sequence space reported to the
     caller as written is still accurate. */

  ulong seq_ancient = ur->base->seq_ancient;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dir    = fd_vinyl_seq_gt( seq, seq_present );
  int bad_read   = !!ur->rd_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
                  bad_seq  ? "misaligned seq"        :
                  bad_dir  ? "seq after seq_present" :
                  bad_read ? "reads in progress"     :
                             "appends/copies in progress" ));

  ur->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
  ur->base->seq_past    = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past    ), seq_past,    seq );
  ur->base->seq_present = seq;
  ur->base->seq_future  = seq;
}

/* fd_vinyl_io_ur_sync is identical to fd_vinyl_io_bd_sync. */

static int
fd_vinyl_io_ur_sync( fd_vinyl_io_t * io,
                     int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  ulong seed        = ur->base->seed;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  int   dev_fd   = ur->dev_fd;
  ulong dev_sync = ur->dev_sync;

  fd_vinyl_bstream_block_t * block = ur->sync;

  /* block->sync.ctl current (static) */
  block->sync.seq_past    = seq_past;
  block->sync.seq_present = seq_present;
  /* block->sync.info_sz current (static) */
  /* block->sync.info current (static) */

  block->sync.hash_trail  = 0UL;
  block->sync.hash_blocks = 0UL;
  fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */

  bd_write( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );

  ur->base->seq_ancient = seq_past;

  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_fini is identical to fd_vinyl_io_bd_fini. */

static void *
fd_vinyl_io_ur_fini( fd_vinyl_io_t * io ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  if( FD_UNLIKELY( ur->rd_head ) ) FD_LOG_WARNING(( "fini completing outstanding reads" ));
  if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommitted blocks" ));

  return io;
}

static fd_vinyl_io_impl_t fd_vinyl_io_ur_impl[1] = { {
  fd_vinyl_io_ur_read_imm,
  fd_vinyl_io_ur_read,
  fd_vinyl_io_ur_poll,
  fd_vinyl_io_ur_append,
  fd_vinyl_io_ur_commit,
  fd_vinyl_io_ur_hint,
  fd_vinyl_io_ur_alloc,
  fd_vinyl_io_ur_copy,
  fd_vinyl_io_ur_forget,
  fd_vinyl_io_ur_rewind,
  fd_vinyl_io_ur_sync,
  fd_vinyl_io_ur_fini
} };

FD_STATIC_ASSERT( alignof(fd_vinyl_io_ur_t)==FD_VINYL_BSTREAM_BLOCK_SZ, layout );

ulong
fd_vinyl_io_ur_align( void ) {
  return alignof(fd_vinyl_io_ur_t);
}

ulong
fd_vinyl_io_ur_footprint( ulong spad_max ) {
  if( FD_UNLIKELY( !((0UL<spad_max) & (spad_max<(1UL<<63)) & fd_ulong_is_aligned( spad_max, FD_VINYL_BSTREAM_BLOCK_SZ )) ) )
    return 0UL;
  return sizeof(fd_vinyl_io_ur_t) + spad_max;
}

fd_vinyl_io_t *
fd_vinyl_io_ur_init( void *            mem,
                     ulong             spad_max,
                     int               dev_fd,
                     struct io_uring * ring ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)mem;

  if( FD_UNLIKELY( !ur ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ur, fd_vinyl_io_ur_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_vinyl_io_ur_footprint( spad_max );
  if( FD_UNLIKELY( !footprint ) ) {
    FD_LOG_WARNING(( "bad spad_max" ));
    return NULL;
  }

  off_t _dev_sz = lseek( dev_fd, (off_t)0, SEEK_END );
  if( FD_UNLIKELY( _dev_sz<(off_t)0 ) ) {
    FD_LOG_WARNING(( "lseek failed, bstream must be seekable (%i-%s)", errno, fd_io_strerror( errno ) ));
    return NULL;
  }
  ulong dev_sz = (ulong)_dev_sz;

  ulong dev_sz_min = 3UL*FD_VINYL_BSTREAM_BLOCK_SZ                 /* sync block, move block, closing partition */
                   + fd_vinyl_bstream_pair_sz( FD_VINYL_VAL_MAX ); /* worst case pair (FIXME: LZ4_COMPRESSBOUND?) */

  int too_small  = dev_sz < dev_sz_min;
  int too_large  = dev_sz > (ulong)LONG_MAX;
  int misaligned = !fd_ulong_is_aligned( dev_sz, FD_VINYL_BSTREAM_BLOCK_SZ );

  if( FD_UNLIKELY( too_small | too_large | misaligned ) ) {
    FD_LOG_WARNING(( "bstream size %s", too_small ? "too small" :
                                        too_large ? "too large" :
                                                    "not a block size multiple" ));
    return NULL;
  }

  memset( ur, 0, footprint );

  ur->base->type = FD_VINYL_IO_TYPE_UR;

  /* io_seed, seq_ancient, seq_past, seq_present, seq_future are init
     below */

  ur->base->spad_max  = spad_max;
  ur->base->spad_used = 0UL;
  ur->base->impl      = fd_vinyl_io_ur_impl;

  ur->dev_fd   = dev_fd;
  ur->dev_sync = 0UL;                       /* Use the beginning of the file for the sync block */
  ur->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ; /* Use the rest for the actual bstream store (at least 3.5 KiB) */
  ur->dev_sz   = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;
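
  /* Resulting device layout (byte offsets into dev_fd):

       [ sync block ][ circular bstream store            ]
       ^ dev_sync=0   ^ dev_base=BLOCK_SZ, dev_sz bytes

     A bstream seq s maps to device offset dev_base + (s % dev_sz),
     wrapping around the store end as needed. */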

  ur->rd_head      = NULL;
  ur->rd_tail_next = &ur->rd_head;

  ur->ring = ring;

  /* FIXME: Consider having the sync block on a completely separate
     device (to reduce seeking when syncing). */

  fd_vinyl_bstream_block_t * block = ur->sync;

  bd_read( dev_fd, ur->dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ ); /* logs details */

  int          type        = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
  int          version     = fd_vinyl_bstream_ctl_style( block->sync.ctl );
  ulong        val_max     = fd_vinyl_bstream_ctl_sz   ( block->sync.ctl );
  ulong        seq_past    = block->sync.seq_past;
  ulong        seq_present = block->sync.seq_present;
  ulong        info_sz     = block->sync.info_sz;    // overrides user info_sz
  void const * info        = block->sync.info;       // overrides user info
  ulong        io_seed     = block->sync.hash_trail; // overrides user io_seed

  int bad_type        = (type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC);
  int bad_version     = (version != 0);
  int bad_val_max     = (val_max != FD_VINYL_VAL_MAX);
  int bad_seq_past    = !fd_ulong_is_aligned( seq_past, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_seq_present = !fd_ulong_is_aligned( seq_present, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_info_sz     = (info_sz > FD_VINYL_BSTREAM_SYNC_INFO_MAX);
  int bad_past_order  = fd_vinyl_seq_gt( seq_past, seq_present );
  int bad_past_sz     = ((seq_present-seq_past) > ur->dev_sz);

  if( FD_UNLIKELY( bad_type | bad_version | bad_val_max | bad_seq_past | bad_seq_present | bad_info_sz |
                   bad_past_order | bad_past_sz ) ) {
    FD_LOG_WARNING(( "bad sync block when recovering bstream (%s)",
                     bad_type        ? "unexpected type"                             :
                     bad_version     ? "unexpected version"                          :
                     bad_val_max     ? "unexpected max pair value decoded byte size" :
                     bad_seq_past    ? "unaligned seq_past"                          :
                     bad_seq_present ? "unaligned seq_present"                       :
                     bad_info_sz     ? "unexpected info size"                        :
                     bad_past_order  ? "unordered seq_past and seq_present"          :
                                       "past size larger than bstream store" ));
    return NULL;
  }

  if( FD_UNLIKELY( fd_vinyl_bstream_block_test( io_seed, block ) ) ) {
    FD_LOG_WARNING(( "corrupt sync block when recovering bstream" ));
    return NULL;
  }

  ur->base->seed        = io_seed;
  ur->base->seq_ancient = seq_past;
  ur->base->seq_past    = seq_past;
  ur->base->seq_present = seq_present;
  ur->base->seq_future  = seq_present;

  FD_LOG_NOTICE(( "IO config"
                  "\n\ttype ur"
                  "\n\tspad_max %lu bytes"
                  "\n\tdev_sz %lu bytes"
                  "\n\tinfo \"%s\" (info_sz %lu, discovered)"
                  "\n\tio_seed 0x%016lx (discovered)",
                  spad_max, dev_sz,
                  (char const *)info, info_sz,
                  io_seed ));

  return ur->base;
}
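
/* Rough setup sketch (illustrative, error handling omitted). The read
   path issues SQEs with IOSQE_FIXED_FILE and fixed file index 0, so
   the caller is assumed to have registered dev_fd as the ring's first
   fixed file before any reads are submitted, e.g.:

     struct io_uring ring[1];
     io_uring_queue_init( depth, ring, 0U );       // depth: SQ/CQ entry count
     io_uring_register_files( ring, &dev_fd, 1U ); // dev_fd -> fixed index 0
     fd_vinyl_io_t * io = fd_vinyl_io_ur_init( mem, spad_max, dev_fd, ring );
*/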

#else /* io_uring not supported */

ulong
fd_vinyl_io_ur_align( void ) {
  return 8UL;
}

ulong
fd_vinyl_io_ur_footprint( ulong spad_max ) {
  (void)spad_max;
  return 8UL;
}

fd_vinyl_io_t *
fd_vinyl_io_ur_init( void *            mem,
                     ulong             spad_max,
                     int               dev_fd,
                     struct io_uring * ring ) {
  (void)mem; (void)spad_max; (void)dev_fd; (void)ring;
  FD_LOG_WARNING(( "Sorry, this build does not support io_uring" ));
  return NULL;
}

#endif