#include "fd_vinyl_io_ur.h"

#if FD_HAS_LIBURING

#include <unistd.h> /* lseek */
#include <liburing.h>

static inline void
bd_read( int    fd,
         ulong  off,
         void * buf,
         ulong  sz ) {
  ssize_t ssz = pread( fd, buf, sz, (off_t)off );
  if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
  if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
  /**/                 FD_LOG_CRIT(( "pread(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
}

static inline void
bd_write( int          fd,
          ulong        off,
          void const * buf,
          ulong        sz ) {
  ssize_t ssz = pwrite( fd, buf, sz, (off_t)off );
  if( FD_LIKELY( ssz==(ssize_t)sz ) ) return;
  if( ssz<(ssize_t)0 ) FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (%i-%s)", fd, off, sz, errno, fd_io_strerror( errno ) ));
  else                 FD_LOG_CRIT(( "pwrite(fd %i,off %lu,sz %lu) failed (unexpected sz %li)", fd, off, sz, (long)ssz ));
}

/* fd_vinyl_io_ur_rd_t extends fd_vinyl_io_rd_t.  Describes an inflight
   read request.  Each object gets created with a fd_vinyl_io_read()
   call, lives at least as long as an io_uring SQE/CQE transaction,
   and gets destroyed with fd_vinyl_io_poll().

   Each fd_vinyl_io_rd_t describes a contiguous read in bstream seq
   space.  When mapped to the device, this typically results in a single
   contiguous read. */

struct fd_vinyl_io_ur_rd;
typedef struct fd_vinyl_io_ur_rd fd_vinyl_io_ur_rd_t;

struct fd_vinyl_io_ur_rd {
  ulong                 ctx;  /* Must mirror fd_vinyl_io_rd_t */
  ulong                 seq;  /* " */
  void *                dst;  /* " */
  ulong                 sz;   /* " */

  uint                  tsz;  /* Tail read size */
  fd_vinyl_io_ur_rd_t * next; /* Next element in ur rd queue */
};

FD_STATIC_ASSERT( sizeof(fd_vinyl_io_ur_rd_t)<=sizeof(fd_vinyl_io_rd_t), layout );

/* fd_vinyl_io_ur_t extends fd_vinyl_io_t. */

struct fd_vinyl_io_ur {
  fd_vinyl_io_t            base[1];
  int                      dev_fd;       /* File descriptor of block device */
  ulong                    dev_sync;     /* Offset to block that holds bstream sync (BLOCK_SZ multiple) */
  ulong                    dev_base;     /* Offset to first block (BLOCK_SZ multiple) */
  ulong                    dev_sz;       /* Block store byte size (BLOCK_SZ multiple) */
  fd_vinyl_io_ur_rd_t *    rd_head;      /* Pointer to queue head */
  fd_vinyl_io_ur_rd_t **   rd_tail_next; /* Pointer to queue &tail->next or &rd_head if empty */
  fd_vinyl_bstream_block_t sync[1];

  struct io_uring * ring;

  ulong sq_prep_cnt;  /* io_uring SQEs prepared */
  ulong sq_sent_cnt;  /* io_uring SQEs submitted */
  ulong cq_cnt;       /* io_uring CQEs received */

  /* spad_max bytes follow */
};

typedef struct fd_vinyl_io_ur fd_vinyl_io_ur_t;
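
/* Memory layout sketch (for orientation; follows from
   fd_vinyl_io_ur_footprint and fd_vinyl_io_ur_alloc below):

     [ fd_vinyl_io_ur_t header | spad_max byte scratch pad ]

   Alloc hands out BLOCK_SZ multiple sized regions of the scratch pad
   starting at (uchar *)(ur+1) + spad_used. */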

/* fd_vinyl_io_ur_read_imm is identical to fd_vinyl_io_bd_read_imm. */

static void
fd_vinyl_io_ur_read_imm( fd_vinyl_io_t * io,
                         ulong           seq0,
                         void *          _dst,
                         ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io;  /* Note: io must be non-NULL to have even been called */

  /* If this is a request to read nothing, succeed immediately.  If
     this is a request to read outside the bstream's past, fail. */

  if( FD_UNLIKELY( !sz ) ) return;

  uchar * dst  = (uchar *)_dst;
  ulong   seq1 = seq0 + sz;

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  int bad_seq  = !fd_ulong_is_aligned( seq0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dst  = (!fd_ulong_is_aligned( (ulong)dst, FD_VINYL_BSTREAM_BLOCK_SZ )) | !dst;
  int bad_sz   = !fd_ulong_is_aligned( sz,   FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_past = !(fd_vinyl_seq_le( seq_past, seq0 ) & fd_vinyl_seq_lt( seq0, seq1 ) & fd_vinyl_seq_le( seq1, seq_present ));

  if( FD_UNLIKELY( bad_seq | bad_dst | bad_sz | bad_past ) )
    FD_LOG_CRIT(( "bstream read_imm [%016lx,%016lx)/%lu failed (past [%016lx,%016lx)/%lu, %s)",
                  seq0, seq1, sz, seq_past, seq_present, seq_present-seq_past,
                  bad_seq ? "misaligned seq"         :
                  bad_dst ? "misaligned or NULL dst" :
                  bad_sz  ? "misaligned sz"          :
                            "not in past" ));

  /* At this point, we have a valid read request.  Map seq0 into the
     bstream store.  Read the lesser of sz bytes or until the store end.
     If we hit the store end with more to go, wrap around and finish the
     read at the store start. */

  int   dev_fd   = ur->dev_fd;
  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong dev_off = seq0 % dev_sz;

  ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
  bd_read( dev_fd, dev_base + dev_off, dst, rsz );
  sz -= rsz;

  if( FD_UNLIKELY( sz ) ) bd_read( dev_fd, dev_base, dst + rsz, sz );
}
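
/* Worked wraparound example (hypothetical numbers; assume a 512 byte
   FD_VINYL_BSTREAM_BLOCK_SZ for illustration): with dev_base 512,
   dev_sz 4096, seq0 3584 and sz 1024, dev_off = 3584 % 4096 = 3584 and
   rsz = min( 1024, 4096-3584 ) = 512.  The first bd_read covers device
   bytes [4096,4608); the remaining 512 bytes wrap around to a second
   bd_read starting at device byte 512 (dev_base). */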

/* ### Read pipeline explainer

   vinyl_io clients submit read jobs using vinyl_io_read, and poll for
   completions using vinyl_io_poll.  Reads may complete in arbitrary
   order.  At first sight, this cleanly translates to io_uring.

   Read job descriptors are user-allocated.  The client is not aware of
   any job queue depth limits in the vinyl_io backend's internals.  The
   vinyl_io backend is expected to queue up an arbitrarily deep backlog
   of read jobs.  However, the io_uring submission queue has a hard
   depth limit.

   The read job lifecycle therefore is as follows:
   - io_ur_read adds a read job to the 'staged' queue.  This is a linked
     list weaving through all user-submitted jobs.
   - io_ur_read/io_ur_poll move jobs from the 'staged' queue to the
     'wait' heap.  Each wait heap entry is shadowed by an io_uring
     submission queue entry.
   - io_ur_poll matches io_uring completions with corresponding 'wait'
     heap entries.  Each entry is removed from the 'wait' heap and
     returned to the user.

   In rare cases, a bstream read may wrap around the end of the bstream.
   In this case, two linked SQEs are generated.

   ### Polling

   fd_vinyl_io_read registers work in userspace only and makes no
   syscalls.  fd_vinyl_io_poll submits read jobs (calls the kernel
   io_uring syscall) if there is any work pending, then polls for
   completions. */
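
/* Client usage sketch (hypothetical caller; assumes io was obtained
   from fd_vinyl_io_ur_init and that the fd_vinyl_io_read /
   fd_vinyl_io_poll wrappers dispatch through the impl table below):

     fd_vinyl_io_rd_t rd[1];
     rd->ctx = my_ctx; rd->seq = seq0; rd->dst = dst; rd->sz = sz;
     fd_vinyl_io_read( io, rd );

     fd_vinyl_io_rd_t * done;
     int err = fd_vinyl_io_poll( io, &done, FD_VINYL_IO_FLAG_BLOCKING );
     if(      err==FD_VINYL_SUCCESS   ) { ... done is a completed read ... }
     else if( err==FD_VINYL_ERR_EMPTY ) { ... no reads were in flight ... } */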

/* ur_staged_push adds a read job to the staged queue. */

static void
ur_staged_push( fd_vinyl_io_ur_t *    ur,
                fd_vinyl_io_ur_rd_t * rd ) {
  rd->next          = NULL;
  *ur->rd_tail_next = rd;
  ur->rd_tail_next  = &rd->next;
}
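
/* Queue invariant sketch (follows from the push above and the pop in
   ur_staged_clean below): an empty queue has rd_head==NULL and
   rd_tail_next==&rd_head, so the first push lands in rd_head itself.
   After pushing A then B: rd_head==A, A->next==B, B->next==NULL and
   rd_tail_next==&B->next. */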

/* ur_prep_read translates a staged read job into one (or rarely two)
   io_uring SQEs.  SQEs are allocated off the io_uring instance.
   Returns the number of SQEs prepared on success, and moves rd onto the
   wait heap.  On failure to allocate SQEs, behaves like a no-op (safe
   to retry) and returns 0. */

static uint
ur_prep_read( fd_vinyl_io_ur_t *    ur,
              fd_vinyl_io_ur_rd_t * rd,
              ulong                 seq0,
              ulong                 sz ) {
  struct io_uring * ring = ur->ring;
  if( FD_UNLIKELY( sz>INT_MAX ) ) {
    FD_LOG_CRIT(( "Invalid read size 0x%lx bytes (exceeds max)", sz ));
  }
  if( FD_UNLIKELY( io_uring_sq_space_left( ring )<2U ) ) return 0U;

  /* Map seq0 into the bstream store. */

  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;

  ulong dev_off = seq0 % dev_sz;

  ulong rsz = fd_ulong_min( sz, dev_sz - dev_off );
  sz -= rsz;

  /* Prepare the head SQE */
  rd->next = NULL;
  rd->tsz  = (uint)rsz;
  struct io_uring_sqe * sqe = io_uring_get_sqe( ring );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "io_uring_get_sqe() returned NULL despite io_uring_sq_space_left()>=2" ));
  io_uring_prep_read( sqe, 0, rd->dst, (uint)rsz, dev_base+dev_off );
  io_uring_sqe_set_flags( sqe, IOSQE_FIXED_FILE );
  io_uring_sqe_set_data( sqe, rd );
  ur->sq_prep_cnt++;
  if( FD_LIKELY( !sz ) ) return 1U; /* optimize for the unfragmented case */

  /* Tail wraparound occurred.  Amend the head SQE to be linked to the
     tail SQE, detach it from the io_ur descriptor, and suppress the CQE
     for the head.  If we get a CQE for the tail read job, we know that
     the head read job also succeeded.  Also, set the low bit of the
     userdata to 1 (usually guaranteed to be 0 due to alignment), to
     indicate that this SQE is a head frag. */
  io_uring_sqe_set_flags( sqe, IOSQE_FIXED_FILE | IOSQE_IO_LINK | IOSQE_CQE_SKIP_SUCCESS );
  io_uring_sqe_set_data64( sqe, (ulong)rd+1UL );
  ur->cq_cnt++;  /* Treat as already-completed in metrics */

  /* Prepare the tail SQE */
  rd->tsz  = (uint)sz;
  sqe = io_uring_get_sqe( ring );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "io_uring_get_sqe() returned NULL despite io_uring_sq_space_left()>=2" ));
  io_uring_prep_read( sqe, 0, (uchar *)rd->dst + rsz, (uint)sz, dev_base );
  io_uring_sqe_set_flags( sqe, IOSQE_FIXED_FILE );
  io_uring_sqe_set_data( sqe, rd );
  ur->sq_prep_cnt++;
  return 2U;
}
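
/* Accounting sketch for a wrapped read (as prepared above): two SQEs
   are prepared (sq_prep_cnt += 2) but at most one CQE is delivered on
   success (the head's CQE is skipped), hence the cq_cnt bump at prep
   time.  On the poll side, a CQE whose userdata has the low bit set
   can only be a failed head frag, since success CQEs for head frags
   are suppressed. */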

/* ur_staged_clean moves as many read jobs from the staged queue to the
   submission queue as possible. */

static void
ur_staged_clean( fd_vinyl_io_ur_t * ur ) {
  for(;;) {
    fd_vinyl_io_ur_rd_t * rd = ur->rd_head;
    if( !rd ) break;

    fd_vinyl_io_ur_rd_t ** rd_tail_next = ur->rd_tail_next;
    fd_vinyl_io_ur_rd_t *  rd_next      = rd->next;

    uint sqe_cnt = ur_prep_read( ur, rd, rd->seq, rd->sz );
    if( FD_UNLIKELY( !sqe_cnt ) ) break;

    ur->rd_head      = rd_next;
    ur->rd_tail_next = fd_ptr_if( !!rd_next, rd_tail_next, &ur->rd_head );
  }
}

static void
fd_vinyl_io_ur_read( fd_vinyl_io_t *    io,
                     fd_vinyl_io_rd_t * _rd ) {
  fd_vinyl_io_ur_t *    ur = (fd_vinyl_io_ur_t *)io;
  fd_vinyl_io_ur_rd_t * rd = (fd_vinyl_io_ur_rd_t *)_rd;
  ur_staged_push( ur, rd );
  ur_staged_clean( ur );
}

static int
fd_vinyl_io_ur_poll( fd_vinyl_io_t *     io,
                     fd_vinyl_io_rd_t ** _rd,
                     int                 flags ) {
  fd_vinyl_io_ur_t * ur       = (fd_vinyl_io_ur_t *)io;
  struct io_uring *  ring     = ur->ring;
  int                blocking = !!( flags & FD_VINYL_IO_FLAG_BLOCKING );
  *_rd = NULL;

  uint cq_cnt = io_uring_cq_ready( ring );
  if( FD_UNLIKELY( !cq_cnt ) ) {  /* no CQEs ready */
    /* Move staged work to submission queue */
    ur_staged_clean( ur );

    /* If no work is available to schedule, bail to avoid deadlock */
    int have_pending = ur->sq_prep_cnt > ur->sq_sent_cnt;
    int have_waiting = ur->sq_sent_cnt > ur->cq_cnt;
    if( FD_UNLIKELY( !have_pending && !have_waiting ) ) {
      return FD_VINYL_ERR_EMPTY;
    }

    /* Issue syscall to drive kernel */
    int submit_cnt;
    if( blocking ) {
      submit_cnt = io_uring_submit_and_wait( ring, 1U );
    } else {
      submit_cnt = io_uring_submit_and_get_events( ring );
    }
    if( FD_UNLIKELY( submit_cnt<0 ) ) {
      FD_LOG_ERR(( "%s failed (%i-%s)", blocking ? "io_uring_submit_and_wait" : "io_uring_submit_and_get_events", -submit_cnt, fd_io_strerror( -submit_cnt ) ));
    }
    ur->sq_sent_cnt += (ulong)submit_cnt;

    cq_cnt = io_uring_cq_ready( ring );
    if( !cq_cnt ) {
      if( FD_UNLIKELY( blocking ) ) FD_LOG_CRIT(( "io_uring_submit_and_wait() returned but no CQEs ready" ));
      return FD_VINYL_ERR_AGAIN;
    }
  }

  /* At this point, we have at least one CQE ready.
     It could come in one of these shapes:
     - Success (full read): implies that all fragments of a ur_rd read
       have been completed; only generated for the last frag
     - Short read: crash the app
     - Zero byte read: unexpected EOF reached, crash the app
     - Error (cancelled): a short read of the head frag broke the SQE
       chain, the tail got cancelled.  Crash the app
     - Error (other): crash the app */

  struct io_uring_cqe * cqe = NULL;
  io_uring_peek_cqe( ring, &cqe );
  if( FD_UNLIKELY( !cqe ) ) FD_LOG_CRIT(( "io_uring_peek_cqe() yielded NULL despite io_uring_cq_ready()=%u", cq_cnt ));
  ulong                 udata     = io_uring_cqe_get_data64( cqe );
  int                   last_frag = !fd_ulong_extract_bit( udata, 0 );
  fd_vinyl_io_ur_rd_t * rd        = (void *)fd_ulong_clear_bit( udata, 0 );
  if( FD_UNLIKELY( !rd ) ) FD_LOG_CRIT(( "io_uring_peek_cqe() yielded invalid user data" ));
  int cqe_res = cqe->res;
  if( cqe_res<0 ) {
    FD_LOG_ERR(( "io_uring read failed (%i-%s)", -cqe_res, fd_io_strerror( -cqe_res ) ));
  }
  if( FD_UNLIKELY( !last_frag ) ) {
    FD_LOG_ERR(( "io_uring read failed (short read or EOF)" ));
  }
  if( FD_UNLIKELY( rd->tsz!=(uint)cqe_res ) ) {
    FD_LOG_ERR(( "io_uring read failed (expected %u bytes, got %i bytes)", rd->tsz, cqe_res ));
  }
  io_uring_cq_advance( ring, 1U );
  ur->cq_cnt++;

  *_rd = (fd_vinyl_io_rd_t *)rd;
  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_append is identical to fd_vinyl_io_bd_append. */

static ulong
fd_vinyl_io_ur_append( fd_vinyl_io_t * io,
                       void const *    _src,
                       ulong           sz ) {
  fd_vinyl_io_ur_t * ur  = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  uchar const *      src = (uchar const *)_src;

  /* Validate the input args. */

  ulong seq_future  = ur->base->seq_future;  if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong seq_ancient = ur->base->seq_ancient;
  int   dev_fd      = ur->dev_fd;
  ulong dev_base    = ur->dev_base;
  ulong dev_sz      = ur->dev_sz;

  int bad_src      = !src;
  int bad_align    = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz       = !fd_ulong_is_aligned( sz,         FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
    FD_LOG_CRIT(( bad_src   ? "NULL src"       :
                  bad_align ? "misaligned src" :
                  bad_sz    ? "misaligned sz"  :
                              "device full" ));

  /* At this point, we appear to have a valid append request.  Map it to
     the bstream (updating seq_future) and map it to the device.  Then
     write the lesser of sz bytes or until the store end.  If we hit the
     store end with more to go, wrap around and finish the write at the
     store start. */

  ulong seq = seq_future;
  ur->base->seq_future = seq + sz;

  ulong dev_off = seq % dev_sz;

  ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
  bd_write( dev_fd, dev_base + dev_off, src, wsz );
  sz -= wsz;
  if( sz ) bd_write( dev_fd, dev_base, src + wsz, sz );

  return seq;
}

/* fd_vinyl_io_ur_commit is identical to fd_vinyl_io_bd_commit. */

static int
fd_vinyl_io_ur_commit( fd_vinyl_io_t * io,
                       int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  ur->base->seq_present = ur->base->seq_future;
  ur->base->spad_used   = 0UL;

  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_hint is identical to fd_vinyl_io_bd_hint. */

static ulong
fd_vinyl_io_ur_hint( fd_vinyl_io_t * io,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  ulong seq_future  = ur->base->seq_future;  if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong seq_ancient = ur->base->seq_ancient;
  ulong dev_sz      = ur->dev_sz;

  int bad_sz       = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_sz | bad_capacity ) ) FD_LOG_CRIT(( bad_sz ? "misaligned sz" : "device full" ));

  return ur->base->seq_future;
}

/* fd_vinyl_io_ur_alloc is identical to fd_vinyl_io_bd_alloc. */

static void *
fd_vinyl_io_ur_alloc( fd_vinyl_io_t * io,
                      ulong           sz,
                      int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  ulong spad_max  = ur->base->spad_max;
  ulong spad_used = ur->base->spad_used; if( FD_UNLIKELY( !sz ) ) return ((uchar *)(ur+1)) + spad_used;

  int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz    = sz > spad_max;

  if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));

  if( FD_UNLIKELY( sz > (spad_max - spad_used) ) ) {
    if( FD_UNLIKELY( fd_vinyl_io_ur_commit( io, flags ) ) ) return NULL;
    spad_used = 0UL;
  }

  ur->base->spad_used = spad_used + sz;

  return ((uchar *)(ur+1)) + spad_used;
}
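
/* Alloc behavior sketch (summarizing the above): alloc is a bump
   allocator over the scratch pad.  If a request does not fit in the
   remaining spad space, the io is committed first (advancing
   seq_present to seq_future and resetting spad_used) and the
   allocation restarts from the pad base. */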

/* fd_vinyl_io_ur_copy is identical to fd_vinyl_io_bd_copy. */

static ulong
fd_vinyl_io_ur_copy( fd_vinyl_io_t * io,
                     ulong           seq_src0,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate the input args */

  ulong seq_ancient = ur->base->seq_ancient;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;   if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong spad_max    = ur->base->spad_max;
  ulong spad_used   = ur->base->spad_used;
  int   dev_fd      = ur->dev_fd;
  ulong dev_base    = ur->dev_base;
  ulong dev_sz      = ur->dev_sz;

  ulong seq_src1 = seq_src0 + sz;

  int bad_past     = !( fd_vinyl_seq_le( seq_past, seq_src0    ) &
                        fd_vinyl_seq_lt( seq_src0, seq_src1    ) &
                        fd_vinyl_seq_le( seq_src1, seq_present ) );
  int bad_src      = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz       = !fd_ulong_is_aligned( sz,       FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_past | bad_src | bad_sz | bad_capacity ) )
    FD_LOG_CRIT(( bad_past ? "src is not in the past" :
                  bad_src  ? "misaligned src_seq"     :
                  bad_sz   ? "misaligned sz"          :
                             "device full" ));

  /* At this point, we appear to have a valid copy request.  Get
     buffer space from the scratch pad (committing as necessary). */

  if( FD_UNLIKELY( sz>(spad_max-spad_used) ) ) {
    fd_vinyl_io_ur_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
    spad_used = 0UL;
  }

  uchar * buf     = (uchar *)(ur+1) + spad_used;
  ulong   buf_max = spad_max - spad_used;

  /* Map the dst to the bstream (updating seq_future) and map the src
     and dst regions onto the device.  Then copy as much as we can at a
     time, handling device wrap around and copy buffering space. */

  ulong seq = seq_future;
  ur->base->seq_future = seq + sz;

  ulong seq_dst0 = seq;

  for(;;) {
    ulong src_off = seq_src0 % dev_sz;
    ulong dst_off = seq_dst0 % dev_sz;
    ulong csz     = fd_ulong_min( fd_ulong_min( sz, buf_max ), fd_ulong_min( dev_sz - src_off, dev_sz - dst_off ) );

    bd_read ( dev_fd, dev_base + src_off, buf, csz );
    bd_write( dev_fd, dev_base + dst_off, buf, csz );

    sz -= csz;
    if( !sz ) break;

    seq_src0 += csz;
    seq_dst0 += csz;
  }

  return seq;
}
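
/* Worked copy example (hypothetical numbers; assume a 512 byte
   FD_VINYL_BSTREAM_BLOCK_SZ): with dev_sz 4096, buf_max 1024, sz 2048,
   src_off 3584 and dst_off 1024, the first pass copies
   csz = min( min( 2048, 1024 ), min( 512, 3072 ) ) = 512 bytes
   (limited by the src wrapping at the store end), the second pass
   copies 1024 bytes (limited by buf_max) and the third pass copies the
   remaining 512 bytes. */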

/* fd_vinyl_io_ur_forget is identical to fd_vinyl_io_bd_forget. */

static void
fd_vinyl_io_ur_forget( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments.  Note that we don't allow forgetting into
     the future even when we have no uncommitted blocks because the
     resulting [seq_ancient,seq_future) might contain blocks that were
     never written (which might not be an issue practically but it would
     be a bit strange for something to try to scan starting from
     seq_ancient and discover unwritten blocks). */

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dir    = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
  int bad_read   = !!ur->rd_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
                  seq, seq_past, seq_present, seq_present-seq_past,
                  bad_seq  ? "misaligned seq"             :
                  bad_dir  ? "seq out of bounds"          :
                  bad_read ? "reads in progress"          :
                             "appends/copies in progress" ));

  ur->base->seq_past = seq;
}

/* fd_vinyl_io_ur_rewind is identical to fd_vinyl_io_bd_rewind. */

static void
fd_vinyl_io_ur_rewind( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments.  Unlike forget, we do allow rewinding to
     before seq_ancient as the region of sequence space reported to the
     caller as written is still accurate. */

  ulong seq_ancient = ur->base->seq_ancient;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dir    = fd_vinyl_seq_gt( seq, seq_present );
  int bad_read   = !!ur->rd_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
                  bad_seq  ? "misaligned seq"             :
                  bad_dir  ? "seq after seq_present"      :
                  bad_read ? "reads in progress"          :
                             "appends/copies in progress" ));

  ur->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
  ur->base->seq_past    = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past    ), seq_past,    seq );
  ur->base->seq_present = seq;
  ur->base->seq_future  = seq;
}

/* fd_vinyl_io_ur_sync is identical to fd_vinyl_io_bd_sync. */

static int
fd_vinyl_io_ur_sync( fd_vinyl_io_t * io,
                     int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  ulong seed        = ur->base->seed;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  int   dev_fd   = ur->dev_fd;
  ulong dev_sync = ur->dev_sync;

  fd_vinyl_bstream_block_t * block = ur->sync;

  /* block->sync.ctl     current (static) */
  block->sync.seq_past    = seq_past;
  block->sync.seq_present = seq_present;
  /* block->sync.info_sz current (static) */
  /* block->sync.info    current (static) */

  block->sync.hash_trail  = 0UL;
  block->sync.hash_blocks = 0UL;
  fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */

  bd_write( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );

  ur->base->seq_ancient = seq_past;

  return FD_VINYL_SUCCESS;
}
/* fd_vinyl_io_ur_fini is identical to fd_vinyl_io_bd_fini. */

static void *
fd_vinyl_io_ur_fini( fd_vinyl_io_t * io ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  if( FD_UNLIKELY( ur->rd_head                                ) ) FD_LOG_WARNING(( "fini abandoning outstanding reads" ));
  if( FD_UNLIKELY( fd_vinyl_seq_ne( seq_present, seq_future ) ) ) FD_LOG_WARNING(( "fini discarding uncommitted blocks" ));

  return io;
}

static fd_vinyl_io_impl_t fd_vinyl_io_ur_impl[1] = { {
  fd_vinyl_io_ur_read_imm,
  fd_vinyl_io_ur_read,
  fd_vinyl_io_ur_poll,
  fd_vinyl_io_ur_append,
  fd_vinyl_io_ur_commit,
  fd_vinyl_io_ur_hint,
  fd_vinyl_io_ur_alloc,
  fd_vinyl_io_ur_copy,
  fd_vinyl_io_ur_forget,
  fd_vinyl_io_ur_rewind,
  fd_vinyl_io_ur_sync,
  fd_vinyl_io_ur_fini
} };

FD_STATIC_ASSERT( alignof(fd_vinyl_io_ur_t)==FD_VINYL_BSTREAM_BLOCK_SZ, layout );

ulong
fd_vinyl_io_ur_align( void ) {
  return alignof(fd_vinyl_io_ur_t);
}

ulong
fd_vinyl_io_ur_footprint( ulong spad_max ) {
  if( FD_UNLIKELY( !((0UL<spad_max) & (spad_max<(1UL<<63)) & fd_ulong_is_aligned( spad_max, FD_VINYL_BSTREAM_BLOCK_SZ )) ) )
    return 0UL;
  return sizeof(fd_vinyl_io_ur_t) + spad_max;
}

fd_vinyl_io_t *
fd_vinyl_io_ur_init( void *            mem,
                     ulong             spad_max,
                     int               dev_fd,
                     struct io_uring * ring ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)mem;

  if( FD_UNLIKELY( !ur ) ) {
    FD_LOG_WARNING(( "NULL mem" ));
    return NULL;
  }

  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)ur, fd_vinyl_io_ur_align() ) ) ) {
    FD_LOG_WARNING(( "misaligned mem" ));
    return NULL;
  }

  ulong footprint = fd_vinyl_io_ur_footprint( spad_max );
  if( FD_UNLIKELY( !footprint ) ) {
    FD_LOG_WARNING(( "bad spad_max" ));
    return NULL;
  }

  off_t _dev_sz = lseek( dev_fd, (off_t)0, SEEK_END );
  if( FD_UNLIKELY( _dev_sz<(off_t)0 ) ) {
    FD_LOG_WARNING(( "lseek failed, bstream must be seekable (%i-%s)", errno, fd_io_strerror( errno ) ));
    return NULL;
  }
  ulong dev_sz = (ulong)_dev_sz;

  ulong dev_sz_min = 3UL*FD_VINYL_BSTREAM_BLOCK_SZ /* sync block, move block, closing partition */
                   + fd_vinyl_bstream_pair_sz( FD_VINYL_VAL_MAX ); /* worst case pair (FIXME: LZ4_COMPRESSBOUND?) */

  int too_small  = dev_sz < dev_sz_min;
  int too_large  = dev_sz > (ulong)LONG_MAX;
  int misaligned = !fd_ulong_is_aligned( dev_sz, FD_VINYL_BSTREAM_BLOCK_SZ );

  if( FD_UNLIKELY( too_small | too_large | misaligned ) ) {
    FD_LOG_WARNING(( "bstream size %s", too_small ? "too small" :
                                        too_large ? "too large" :
                                                    "not a block size multiple" ));
    return NULL;
  }

  memset( ur, 0, footprint );

  ur->base->type = FD_VINYL_IO_TYPE_UR;

  /* io_seed, seq_ancient, seq_past, seq_present, seq_future are init
     below */

  ur->base->spad_max  = spad_max;
  ur->base->spad_used = 0UL;
  ur->base->impl      = fd_vinyl_io_ur_impl;

  ur->dev_fd   = dev_fd;
  ur->dev_sync = 0UL;                            /* Use the beginning of the file for the sync block */
  ur->dev_base = FD_VINYL_BSTREAM_BLOCK_SZ;      /* Use the rest for the actual bstream store (at least 3.5 KiB) */
  ur->dev_sz   = dev_sz - FD_VINYL_BSTREAM_BLOCK_SZ;

  ur->rd_head      = NULL;
  ur->rd_tail_next = &ur->rd_head;

  ur->ring = ring;

  /* FIXME: Consider having the sync block on a completely separate
     device (to reduce seeking when syncing). */

  fd_vinyl_bstream_block_t * block = ur->sync;

  bd_read( dev_fd, ur->dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ ); /* logs details */

  int          type        = fd_vinyl_bstream_ctl_type ( block->sync.ctl );
  int          version     = fd_vinyl_bstream_ctl_style( block->sync.ctl );
  ulong        val_max     = fd_vinyl_bstream_ctl_sz   ( block->sync.ctl );
  ulong        seq_past    = block->sync.seq_past;
  ulong        seq_present = block->sync.seq_present;
  ulong        info_sz     = block->sync.info_sz;    // overrides user info_sz
  void const * info        = block->sync.info;       // overrides user info
  ulong        io_seed     = block->sync.hash_trail; // overrides user io_seed

  int bad_type        = (type != FD_VINYL_BSTREAM_CTL_TYPE_SYNC);
  int bad_version     = (version != 0);
  int bad_val_max     = (val_max != FD_VINYL_VAL_MAX);
  int bad_seq_past    = !fd_ulong_is_aligned( seq_past,    FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_seq_present = !fd_ulong_is_aligned( seq_present, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_info_sz     = (info_sz > FD_VINYL_BSTREAM_SYNC_INFO_MAX);
  int bad_past_order  = fd_vinyl_seq_gt( seq_past, seq_present );
  int bad_past_sz     = ((seq_present-seq_past) > ur->dev_sz);

  if( FD_UNLIKELY( bad_type | bad_version | bad_val_max | bad_seq_past | bad_seq_present | bad_info_sz |
                   bad_past_order | bad_past_sz ) ) {
    FD_LOG_WARNING(( "bad sync block when recovering bstream (%s)",
                     bad_type        ? "unexpected type"                             :
                     bad_version     ? "unexpected version"                          :
                     bad_val_max     ? "unexpected max pair value decoded byte size" :
                     bad_seq_past    ? "unaligned seq_past"                          :
                     bad_seq_present ? "unaligned seq_present"                       :
                     bad_info_sz     ? "unexpected info size"                        :
                     bad_past_order  ? "unordered seq_past and seq_present"          :
                                       "past size larger than bstream store" ));
    return NULL;
  }

  if( FD_UNLIKELY( fd_vinyl_bstream_block_test( io_seed, block ) ) ) {
    FD_LOG_WARNING(( "corrupt sync block when recovering bstream" ));
    return NULL;
  }

  ur->base->seed        = io_seed;
  ur->base->seq_ancient = seq_past;
  ur->base->seq_past    = seq_past;
  ur->base->seq_present = seq_present;
  ur->base->seq_future  = seq_present;

  FD_LOG_NOTICE(( "IO config"
                  "\n\ttype     ur"
                  "\n\tspad_max %lu bytes"
                  "\n\tdev_sz   %lu bytes"
                  "\n\tinfo     \"%s\" (info_sz %lu, discovered)"
                  "\n\tio_seed  0x%016lx (discovered)",
                  spad_max, dev_sz,
                  (char const *)info, info_sz,
                  io_seed ));

  return ur->base;
}
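
/* Setup sketch (hypothetical caller; liburing setup is the caller's
   job as init only stores the ring pointer).  Note that SQEs are
   prepared with IOSQE_FIXED_FILE and fixed file index 0, so dev_fd is
   presumably expected to be registered as fixed file 0 on the ring:

     struct io_uring ring[1];
     if( io_uring_queue_init( 256U, ring, 0U ) )        { ... error ... }
     if( io_uring_register_files( ring, &dev_fd, 1U ) ) { ... error ... }

     void * mem = ... fd_vinyl_io_ur_align() aligned region of
                      fd_vinyl_io_ur_footprint( spad_max ) bytes ...;
     fd_vinyl_io_t * io = fd_vinyl_io_ur_init( mem, spad_max, dev_fd, ring ); */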

#else /* io_uring not supported */

ulong
fd_vinyl_io_ur_align( void ) {
  return 8UL;
}

ulong
fd_vinyl_io_ur_footprint( ulong spad_max ) {
  (void)spad_max;
  return 8UL;
}

fd_vinyl_io_t *
fd_vinyl_io_ur_init( void *            mem,
                     ulong             spad_max,
                     int               dev_fd,
                     struct io_uring * ring ) {
  (void)mem; (void)spad_max; (void)dev_fd; (void)ring;
  FD_LOG_WARNING(( "Sorry, this build does not support io_uring" ));
  return NULL;
}

#endif