/* fd_vinyl_io_ur_rd.c provides io_ur read methods. */

#include "fd_vinyl_io_ur_private.h"

/* io_ur reads are either served by the write-back cache or by disk,
   see the explanation in fd_vinyl_io_ur_wb.c.

   For a given request, the low part of the request is served by disk
   and the high part is served by the write-back cache. wb_read handles
   the high part.

   [seq0,seq0+sz) is the requested bstream range (assumed to be in
   [seq_past,seq_present)). wb_read identifies the pivot point
   seq0+retval, where 0<=retval<=sz, and copies the bstream range
   [seq0+retval,seq0+sz) to [dst0+retval,dst0+sz).

   Returns retval (the number of bytes that were not read). I.e.
   [seq0,seq0+retval) gives the bstream range that should be fetched
   from the underlying store. */
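
/* For example (illustrative numbers only): with seq0=0x10000,
   sz=0x8000 and seq_cache=0x14000, wb_read copies the bstream range
   [0x14000,0x18000) out of the write-back cache into
   [dst0+0x4000,dst0+0x8000) and returns retval=0x4000; the caller is
   then expected to read [0x10000,0x14000) from the underlying store
   into [dst0,dst0+0x4000). */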

static ulong
wb_read( fd_vinyl_io_ur_t * io,
         uchar *            dst0,  /* destination buffer */
         ulong              seq0,  /* request lower bound */
         ulong              sz ) { /* request size */
  if( FD_UNLIKELY( !sz ) ) return 0UL;

  /* validate that this request spans committed data */
  ulong seq_cache = io->seq_cache;
  ulong seq1      = seq0 + sz;

  if( FD_UNLIKELY( fd_vinyl_seq_lt( seq0, io->base->seq_past    ) |
                   fd_vinyl_seq_gt( seq1, io->base->seq_present ) ) ) {
    FD_LOG_CRIT(( "bstream write-back cache read [%016lx,%016lx)/%lu is out-of-bounds (past [%016lx,%016lx))",
                  seq0, seq1, sz,
                  io->base->seq_past, io->base->seq_present ));
  }

  /* expect most reads to hit disk (seq_cache marks the first seq that
     is present in cache) */
  if( FD_LIKELY( fd_vinyl_seq_le( seq1, seq_cache ) ) ) return sz;

  /* request seq range served by cache */
  ulong req0 = fd_ulong_max( seq0, seq_cache );
  ulong req1 = seq1;
  FD_CRIT( fd_vinyl_seq_ge( req0, wb_ring_seq0( &io->wb ) ), "invariant violation" );
  FD_CRIT( fd_vinyl_seq_le( req1, wb_ring_seq1( &io->wb ) ), "invariant violation" );

  /* output range */
  uchar * dst = dst0 + (req0-seq0);
  ulong   rsz = req1 - req0;
  FD_CRIT( dst>=dst0 && dst<dst0+sz && rsz<=sz, "invariant violation" );

  /* find source range, copy into output */
  uchar const *  spad = fd_vinyl_io_ur_wb_buf( io );
  wb_ring_span_t span = wb_ring_translate( &io->wb, req0, rsz );
  if( span.sz0 ) fd_memcpy( dst,          spad+span.off0, span.sz0 );
  if( span.sz1 ) fd_memcpy( dst+span.sz0, spad+span.off1, span.sz1 );
  io->base->cache_read_cnt    += (ulong)( !!span.sz0 + !!span.sz1 );
  io->base->cache_read_tot_sz += span.sz0 + span.sz1;

  return sz-rsz;
}

/* fd_vinyl_io_ur_read_imm (vinyl_io interface) does a blocking read. */

void
fd_vinyl_io_ur_read_imm( fd_vinyl_io_t * io,
                         ulong           seq0,
                         void *          _dst,
                         ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* If this is a request to read nothing, succeed immediately. If
     this is a request to read outside the bstream's past, fail. */

  if( FD_UNLIKELY( !sz ) ) return;

  uchar * dst  = (uchar *)_dst;
  ulong   seq1 = seq0 + sz;

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  int bad_seq  = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dst  = (!fd_ulong_is_aligned( (ulong)dst, FD_VINYL_BSTREAM_BLOCK_SZ )) | !dst;
  int bad_sz   = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));

  if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
    FD_LOG_CRIT(( "bstream read_imm [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
                  seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
                  bad_seq ? "misaligned seq"         :
                  bad_dst ? "misaligned or NULL dst" :
                  bad_sz  ? "misaligned sz"          :
                            "not in past" ));

  /* At this point, we have a valid read request. Serve tail from
     write-back cache. */

  sz = wb_read( ur, dst, seq0, sz );
  if( !sz ) return;
  seq1 = seq0 + sz; /* unused */

  /* Read the rest from disk. Map seq0 into the bstream store. Read sz
     bytes or up to the store end, whichever comes first. If we hit the
     store end with more to go, wrap around and finish the read at the
     store start. */

  int   dev_fd   = ur->dev_fd;
  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong dev_off = seq0 % dev_sz;

  ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
  bd_read( dev_fd, dev_base + dev_off, dst, rsz );
  sz -= rsz;
  ur->base->file_read_cnt++;
  ur->base->file_read_tot_sz += rsz;

  if( FD_UNLIKELY( sz ) ) {
    bd_read( dev_fd, dev_base, dst + rsz, sz );
    ur->base->file_read_cnt++;
    ur->base->file_read_tot_sz += sz;
  }

}

/* ### Read pipeline explainer

   vinyl_io clients submit read jobs using vinyl_io_read, and poll for
   completions using vinyl_io_poll. Reads may complete in arbitrary
   order. At first sight, this translates cleanly to io_uring.

   Read job descriptors are user-allocated. The client is not aware of
   any job queue depth limits in the vinyl_io backend's internals. The
   vinyl_io backend is expected to queue up an arbitrarily deep backlog
   of read jobs. However, the io_uring submission queue has a hard
   depth limit.

   The read job lifecycle is therefore as follows:
   - io_ur_read adds a read job to the 'staged' queue. This is a linked
     list weaving through all user-submitted jobs.
   - io_ur_read/io_ur_poll move jobs from the 'staged' queue to the
     'wait' heap. Each wait heap entry is shadowed by an io_uring
     submission queue entry.
   - io_ur_poll matches io_uring completions with the corresponding
     'wait' heap entries. Each completed entry is removed from the
     'wait' heap and returned to the user.

   In rare cases, a bstream read may wrap around the end of the bstream
   store. In this case, two SQEs are generated for the same job.

   ### Polling

   fd_vinyl_io_read registers work in userspace only; it does not make
   any syscalls. fd_vinyl_io_poll submits read jobs (calls the kernel
   io_uring syscall) if there is any work pending, then polls for
   completions. */
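
/* A hypothetical client-side sketch of the above lifecycle (rough
   illustration only; the dst/seq/sz job descriptor fields are inferred
   from their use in this file rather than taken from the public
   vinyl_io header, and dst_buf/seq0/sz are assumed to be suitably
   aligned and within the bstream's past):

     fd_vinyl_io_rd_t job[1];
     job->dst = dst_buf;
     job->seq = seq0;
     job->sz  = sz;
     fd_vinyl_io_ur_read( io, job );

     fd_vinyl_io_rd_t * done = NULL;
     for(;;) {
       int err = fd_vinyl_io_ur_poll( io, &done, FD_VINYL_IO_FLAG_BLOCKING );
       if( err==FD_VINYL_SUCCESS   ) break;
       if( err==FD_VINYL_ERR_EMPTY ) break;
     }

   FD_VINYL_SUCCESS hands back a completed job in done,
   FD_VINYL_ERR_EMPTY means nothing is staged or in flight, and
   FD_VINYL_ERR_AGAIN (only returned to non-blocking polls) means try
   again later. */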

/* rq_push adds a read job to the staged queue. */

static void
rq_push( fd_vinyl_io_ur_t *    ur,
         fd_vinyl_io_ur_rd_t * rd ) {
  rd->next = NULL;
  *ur->rq_tail_next = rd;
  ur->rq_tail_next  = &rd->next;
}

/* rc_push adds a read job to the early-complete queue. */

static void
rc_push( fd_vinyl_io_ur_t *    ur,
         fd_vinyl_io_ur_rd_t * rd ) {
  rd->next = NULL;
  *ur->rc_tail_next = rd;
  ur->rc_tail_next  = &rd->next;
}

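/* track_sqe_read updates read accounting (file read counters and
   in-flight SQE/CQE counts) for a newly prepared read SQE covering sz
   bytes. */
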
static void
track_sqe_read( fd_vinyl_io_ur_t * ur,
                ulong              sz ) {
  ur->base->file_read_cnt++;
  ur->base->file_read_tot_sz += sz;
  ur->sqe_prep_cnt++;
  ur->cqe_pending++;
  ur->cqe_read_pending++;
}

/* rq_prep translates a staged read job into one (or rarely two)
   io_uring SQEs. SQEs are allocated off the io_uring instance.
   Returns the number of SQEs prepared on success and moves rd onto the
   wait heap. Prepares no SQEs (and returns 0) if the read request was
   served entirely from cache; in that case rd is instead moved onto
   the early-complete queue. On failure to allocate SQEs, behaves like
   a no-op (safe to retry) and returns -1. */

static int
rq_prep( fd_vinyl_io_ur_t *    ur,
         fd_vinyl_io_ur_rd_t * rd,
         ulong                 seq0,
         ulong                 sz ) {

  fd_io_uring_t * ring = ur->ring;
  if( FD_UNLIKELY( sz>INT_MAX ) ) {
    FD_LOG_CRIT(( "Invalid read size 0x%lx bytes (exceeds max)", sz ));
  }
  if( FD_UNLIKELY( fd_io_uring_sq_space_left( ring->sq )<2U ) ) return -1;

  /* Serve tail from write-back cache */

  sz = wb_read( ur, rd->dst, seq0, sz );
  if( !sz ) {
    rc_push( ur, rd );
    return 0;
  }

  /* sz>0: at least part of the request must be read from the store.
     (Requests served entirely from the cache were parked on the
     early-complete queue above and generate no SQEs.) */

  /* Map seq0 into the bstream store. */

  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong dev_off = seq0 % dev_sz;

  ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
  sz -= rsz;

  /* Prepare the head SQE */
  rd->next     = NULL;
  rd->head_off = 0U;
  rd->head_sz  = (uint)rsz;
  rd->tail_off = 0U;
  rd->tail_sz  = 0U;
  struct io_uring_sqe * sqe = fd_io_uring_get_sqe( ring->sq );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "fd_io_uring_get_sqe() returned NULL despite io_uring_sq_space_left()>=2" ));
  *sqe = (struct io_uring_sqe) {
    .opcode    = IORING_OP_READ,
    .fd        = 0, /* fixed file index 0 */
    .off       = dev_base + dev_off,
    .addr      = (ulong)rd->dst,
    .len       = (uint)rsz,
    .flags     = IOSQE_FIXED_FILE,
    .user_data = ur_udata_pack_ptr( UR_REQ_READ, rd ),
  };
  track_sqe_read( ur, rsz );
  if( FD_LIKELY( !sz ) ) return 1; /* optimize for the unfragmented case */

  /* Prepare the tail SQE */
  rd->tail_sz = (uint)sz;
  sqe = fd_io_uring_get_sqe( ring->sq );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "fd_io_uring_get_sqe() returned NULL despite io_uring_sq_space_left()>=2" ));
  *sqe = (struct io_uring_sqe) {
    .opcode    = IORING_OP_READ,
    .fd        = 0, /* fixed file index 0 */
    .off       = dev_base,
    .addr      = (ulong)rd->dst + rsz,
    .len       = (uint)sz,
    .flags     = IOSQE_FIXED_FILE,
    .user_data = ur_udata_pack_ptr( UR_REQ_READ_TAIL, rd ),
  };
  track_sqe_read( ur, sz );
  return 2;
}

/* rq_clean moves as many read jobs from the staged queue to the
   submission queue as possible. */

static void
rq_clean( fd_vinyl_io_ur_t * ur ) {
  for(;;) {
    fd_vinyl_io_ur_rd_t * rd = ur->rq_head;
    if( !rd ) break;

    fd_vinyl_io_ur_rd_t ** rq_tail_next = ur->rq_tail_next;
    fd_vinyl_io_ur_rd_t *  rq_next      = rd->next;

    if( FD_UNLIKELY( rq_prep( ur, rd, rd->seq, rd->sz )<0 ) ) break;

    ur->rq_head      = rq_next;
    ur->rq_tail_next = fd_ptr_if( !!rq_next, rq_tail_next, &ur->rq_head );
  }
}

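/* fd_vinyl_io_ur_read (vinyl_io interface) stages an asynchronous read
   job and opportunistically moves staged jobs onto the io_uring
   submission queue. Completed jobs are later returned by
   fd_vinyl_io_ur_poll. */
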
void
fd_vinyl_io_ur_read( fd_vinyl_io_t *    io,
                     fd_vinyl_io_rd_t * _rd ) {
  fd_vinyl_io_ur_t *    ur = (fd_vinyl_io_ur_t *)io;
  fd_vinyl_io_ur_rd_t * rd = (fd_vinyl_io_ur_rd_t *)_rd;
  rq_push( ur, rd );
  rq_clean( ur );
}

/* rq_prep_retry re-enqueues a SQE after a short read */

static void
rq_prep_retry( fd_vinyl_io_ur_t *    ur,
               fd_vinyl_io_ur_rd_t * rd,
               ulong                 req_type ) {
  fd_io_uring_t * ring = ur->ring;

  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong   frag_dev_off;
  uchar * frag_dst;
  ulong   frag_sz;
  if( FD_LIKELY( req_type==UR_REQ_READ ) ) {
    frag_dev_off = dev_base + (rd->seq % dev_sz) + rd->head_off;
    frag_dst     = (uchar *)rd->dst + rd->head_off;
    frag_sz      = rd->head_sz - rd->head_off;
  } else { /* tail read */
    frag_dev_off = dev_base + rd->tail_off;
    frag_dst     = (uchar *)rd->dst + rd->head_sz + rd->tail_off;
    frag_sz      = rd->tail_sz - rd->tail_off;
  }

  struct io_uring_sqe * sqe = fd_io_uring_get_sqe( ring->sq );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "no SQ space available; unbalanced SQ and CQ?" ));
  *sqe = (struct io_uring_sqe) {
    .opcode    = IORING_OP_READ,
    .fd        = 0, /* fixed file index 0 */
    .off       = frag_dev_off,
    .addr      = (ulong)frag_dst,
    .len       = (uint)frag_sz,
    .flags     = IOSQE_FIXED_FILE,
    .user_data = ur_udata_pack_ptr( req_type, rd ),
  };
  track_sqe_read( ur, frag_sz );
}

/* rq_completion consumes an io_uring CQE. Returns io_rd if a read job
   completed, otherwise returns NULL. */

static fd_vinyl_io_rd_t *
rq_completion( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;

  /* The CQE could come in one of these shapes:
     - Success (full read)
     - Short read: re-enqueue
     - Zero byte read: unexpected EOF reached, crash the app
     - Error: crash the app */

  FD_CRIT( ur->cqe_pending     >0, "stray completion"      );
  FD_CRIT( ur->cqe_read_pending>0, "stray read completion" );

  struct io_uring_cqe * cqe      = fd_io_uring_cq_head( ring->cq );
  ulong                 req_type = ur_udata_req_type( cqe->user_data );
  fd_vinyl_io_ur_rd_t * rd       = ur_udata_ptr     ( cqe->user_data );

  if( FD_UNLIKELY( !rd ) ) FD_LOG_CRIT(( "fd_io_uring_cq_head() yielded invalid user data" ));
  int cqe_res = cqe->res;
  if( cqe_res<0 ) {
    FD_LOG_ERR(( "io_uring read failed (%i-%s)", -cqe_res, fd_io_strerror( -cqe_res ) ));
  }
  if( FD_UNLIKELY( cqe_res==0 ) ) {
    FD_LOG_ERR(( "io_uring read failed (unexpected EOF)" ));
  }

  /* interpret CQE user data */
  uint * poff;
  uint * psz;
  if( FD_LIKELY( req_type==UR_REQ_READ ) ) {
    poff = &rd->head_off; psz = &rd->head_sz;
  } else if( req_type==UR_REQ_READ_TAIL ) {
    poff = &rd->tail_off; psz = &rd->tail_sz;
  } else {
    FD_LOG_CRIT(( "unexpected CQE (user_data=0x%016llx)", cqe->user_data ));
  }
  FD_CRIT( *poff < *psz, "invariant violation" );
  ur->cqe_read_pending--;
  ur->cqe_pending--;
  ur->cqe_cnt++;
  fd_io_uring_cq_advance( ring->cq, 1U );

  /* sanity check: the kernel should never return more bytes than were
     requested for this fragment */
  if( FD_UNLIKELY( (uint)cqe_res > *psz-*poff ) ) {
    FD_LOG_CRIT(( "io_uring read returned excess data (seq=%lu expected_sz=%u cqe_res=%d part=%s)",
                  rd->seq, *psz-*poff, cqe_res, req_type==UR_REQ_READ?"head":"tail" ));
  }

  /* was this a short read?  If so, re-enqueue the remainder. */
  *poff += (uint)cqe_res;
  if( FD_UNLIKELY( *poff < *psz ) ) {
    ur->cqe_read_short_cnt++;
    rq_prep_retry( ur, rd, req_type );
    return NULL;
  }

  /* did all reads complete? */
  if( FD_UNLIKELY( ( rd->head_off < rd->head_sz ) |
                   ( rd->tail_off < rd->tail_sz ) ) ) {
    /* need another CQE to make progress */
    return NULL;
  }

  return (fd_vinyl_io_rd_t *)rd;
}

/* fd_vinyl_io_ur_poll pops the next read completion. May block if
   flags includes FD_VINYL_IO_FLAG_BLOCKING. */

int
fd_vinyl_io_ur_poll( fd_vinyl_io_t *     io,
                     fd_vinyl_io_rd_t ** _rd,
                     int                 flags ) {
  fd_vinyl_io_ur_t * ur   = (fd_vinyl_io_ur_t *)io;
  fd_io_uring_t *    ring = ur->ring;
  int blocking = !!( flags & FD_VINYL_IO_FLAG_BLOCKING );
  *_rd = NULL;

  /* Pop early completions */
  if( ur->rc_head ) {
    fd_vinyl_io_ur_rd_t *  rd           = ur->rc_head;
    fd_vinyl_io_ur_rd_t ** rc_tail_next = ur->rc_tail_next;
    fd_vinyl_io_ur_rd_t *  rc_next      = rd->next;
    ur->rc_head      = rc_next;
    ur->rc_tail_next = fd_ptr_if( !!rc_next, rc_tail_next, &ur->rc_head );
    *_rd = (fd_vinyl_io_rd_t *)rd;
    return FD_VINYL_SUCCESS;
  }

  /* Drain completions until we find a read (skip writes). */
  for(;;) {
    if( FD_UNLIKELY( fd_io_uring_sq_dropped( ring->sq ) ) ) {
      FD_LOG_CRIT(( "io_uring submission queue overflowed" ));
    }

    uint cq_cnt = fd_io_uring_cq_ready( ring->cq );
    if( FD_UNLIKELY( !cq_cnt ) ) { /* no CQEs ready */
      /* Move staged work to submission queue */
      rq_clean( ur );

      /* If no work is available to schedule, bail to avoid deadlock */
      int have_pending = ur->sqe_prep_cnt > ur->sqe_sent_cnt;
      int have_waiting = ur->sqe_sent_cnt > ur->cqe_cnt;
      if( FD_UNLIKELY( !have_pending && !have_waiting ) ) {
        return FD_VINYL_ERR_EMPTY;
      }

      /* Issue syscall to drive kernel */
      uint enter_flags = blocking ? IORING_ENTER_GETEVENTS : 0U;
      int  submit_cnt  = fd_io_uring_submit( ring->sq, ring->ioring_fd, !!blocking, enter_flags );
      if( FD_UNLIKELY( submit_cnt<0 ) ) {
        FD_LOG_ERR(( "io_uring_enter failed (%i-%s)", -submit_cnt, fd_io_strerror( -submit_cnt ) ));
      }
      ur->sqe_sent_cnt += (ulong)submit_cnt;

      cq_cnt = fd_io_uring_cq_ready( ring->cq );
      if( !cq_cnt ) {
        if( FD_UNLIKELY( blocking ) ) FD_LOG_CRIT(( "io_uring_submit_and_wait() returned but no CQEs ready" ));
        return FD_VINYL_ERR_AGAIN;
      }
    }

    struct io_uring_cqe * cqe = fd_io_uring_cq_head( ring->cq );
    if( FD_UNLIKELY( !cqe ) ) FD_LOG_CRIT(( "fd_io_uring_cq_head() returned NULL despite io_uring_cq_ready()>=1" ));
    if( ur_udata_req_type( cqe->user_data )==UR_REQ_WRITE ) {
      fd_vinyl_io_wq_completion( ur );
      continue;
    }

    fd_vinyl_io_rd_t * rd = rq_completion( ur );
    if( FD_UNLIKELY( !rd ) ) continue;
    *_rd = rd;
    return FD_VINYL_SUCCESS;
  }
}