#include "fd_vinyl_io_ur_private.h"

/* This backend uses scratch pad memory as a write-back cache.  Every
   byte in the scratch pad has one of the following states:

   - clean:  Written out to bstream
   - write:  Write job in progress
   - wait:   To be written out in the future
   - future: Written with append(), awaiting commit()
   - used:   Reserved with alloc(), awaiting trim()/append()
   - free:   Unused

   The write-back cache has a logical address space (bstream_seq),
   and a physical address space (offset).

   In logical space, each state covers a contiguous range of bytes in
   the following order.  Various bstream seq numbers mark the bounds
   between regions.

         [clean] [write] [wait] [future] [used] [free]
         ^       ^       ^      ^        ^      ^
      seq_cache  |   seq_write  |    seq_future |
             seq_clean     seq_present       wb.seq1

   The region [seq_past,seq_cache) is outside of the write cache.

   The region [seq_cache,seq_present), i.e. spanning 'clean', 'write',
   and 'wait', is part of the write cache.  Read requests that fall in
   this range are served from the write cache.

   The logical address space maps to the physical address space as a
   ring.  The ring mapping is not modulo, see wb_ring for details. */
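
/* For illustration, consider a hypothetical modulo-style ring of
   capacity 1024 bytes (the real mapping in wb_ring is not modulo, so
   the exact offsets differ).  A logical range [seq0,seq0+sz) that
   crosses the physical end of the buffer splits into two spans:

     seq0 = 1000, sz = 100, capacity = 1024
       off0 = 1000 % 1024 = 1000, sz0 = 1024-1000 = 24   (tail of buf)
       off1 = 0,                  sz1 = 100-24    = 76   (head of buf)

   wb_ring_translate returns such a two-span description so that
   callers can issue at most two contiguous copies or writes. */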

void
fd_vinyl_io_wq_completion( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;

  FD_CRIT( ur->cqe_pending      >0, "stray completion"       );
  FD_CRIT( ur->cqe_write_pending>0, "stray write completion" );

  /* interpret CQE */
  struct io_uring_cqe * cqe = fd_io_uring_cq_head( ring->cq );
  if( FD_UNLIKELY( !cqe ) ) FD_LOG_CRIT(( "no write completion found" ));
  if( ur_udata_req_type( cqe->user_data )!=UR_REQ_WRITE ) {
    FD_LOG_CRIT(( "unexpected CQE type while flushing write queue" ));
  }
  int cqe_res = cqe->res;
  if( cqe_res<0 ) {
    FD_LOG_ERR(( "io_uring write failed (%i-%s)", -cqe_res, fd_io_strerror( -cqe_res ) ));
  }

  /* advance write cursor */
  ulong       wq_idx  = ur_udata_idx( cqe->user_data );
  wq_desc_t * wq_desc = wq_ring_desc( &ur->wq, wq_idx );
  ulong       req_sz  = wq_desc->sz;
  if( FD_UNLIKELY( (uint)cqe_res != req_sz ) ) {
    FD_LOG_ERR(( "database write failed (short write): requested to write %lu bytes, but only wrote %i bytes (at bstream seq 0x%016lx)",
                 req_sz, cqe_res,
                 wq_desc->seq - req_sz ));
  }
  ur->seq_clean = wq_ring_complete( &ur->wq, wq_idx );

  fd_io_uring_cq_advance( ring->cq, 1U );
  ur->cqe_write_pending--;
  ur->cqe_pending--;
  ur->cqe_cnt++;
}

/* wq_clean polls for write completions. */

static void
wq_clean( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;
  while( fd_io_uring_cq_ready( ring->cq ) ) {
    fd_vinyl_io_wq_completion( ur );
  }
}

/* wq_wait waits for one write completion. */

static void
wq_wait( fd_vinyl_io_ur_t * ur ) {
  fd_io_uring_t * ring = ur->ring;
  FD_CRIT( ur->cqe_write_pending>0, "stray write completion wait" );
  FD_CRIT( ur->cqe_pending      >0, "stray completion wait" );
  int err = fd_io_uring_enter( ring->ioring_fd, 0U, 1U, IORING_ENTER_GETEVENTS, NULL, 0UL );
  if( FD_UNLIKELY( err<0 ) ) {
    FD_LOG_ERR(( "io_uring_enter failed (%i-%s)", -err, fd_io_strerror( -err ) ));
  }
  FD_CRIT( fd_io_uring_cq_ready( ring->cq )>0, "io_uring_enter returned but no CQEs ready" );
}

/* track_sqe_write updates accounting for a newly prepared write SQE. */

static void
track_sqe_write( fd_vinyl_io_ur_t * ur,
                 ulong              sz ) {
  ur->base->file_write_cnt++;
  ur->base->file_write_tot_sz += sz;
  ur->sqe_prep_cnt++;
  ur->cqe_pending++;
  ur->cqe_write_pending++;
}

/* wq_enqueue adds a buffer to the write queue. */

static void
wq_enqueue( fd_vinyl_io_ur_t * ur,
            ulong              seq,
            uchar const *      src,
            ulong              sz ) {

  fd_io_uring_t * ring     = ur->ring;
  ulong           cq_depth = ring->cq->depth;
  wq_ring_t *     wq       = &ur->wq;
  for(;;) {
    wq_clean( ur );
    if( wq_ring_free_cnt( wq )>=2 && ur->cqe_pending+2 <= cq_depth ) break;
    wq_wait( ur );
  }

  /* Map seq into the bstream store.  If we hit the store end with more
     to go, wrap around and finish the write at the store start. */

  ulong dev_base = ur->dev_base;
  ulong dev_sz   = ur->dev_sz;
  ulong dev_off  = seq % dev_sz;

  ulong wsz = fd_ulong_min( sz, dev_sz - dev_off );
  ulong wq0 = wq_ring_enqueue( wq, seq+wsz );
  wq_ring_desc( wq, wq0 )->sz = (uint)wsz; /* remember request size for CQE */
  struct io_uring_sqe * sqe = fd_io_uring_get_sqe( ring->sq );
  if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "unbalanced SQ" ));
  *sqe = (struct io_uring_sqe) {
    .opcode    = IORING_OP_WRITE,
    .fd        = 0, /* fixed file index 0 */
    .off       = dev_base + dev_off,
    .addr      = (ulong)src,
    .len       = (uint)wsz,
    .flags     = IOSQE_FIXED_FILE,
    .user_data = ur_udata_pack_idx( UR_REQ_WRITE, wq0 ),
  };
  track_sqe_write( ur, wsz );

  sz -= wsz;
  if( sz ) {
    ulong wq1 = wq_ring_enqueue( wq, seq+wsz+sz );
    wq_ring_desc( wq, wq1 )->sz = (uint)sz;
    sqe = fd_io_uring_get_sqe( ring->sq );
    if( FD_UNLIKELY( !sqe ) ) FD_LOG_CRIT(( "unbalanced SQ" ));
    *sqe = (struct io_uring_sqe) {
      .opcode    = IORING_OP_WRITE,
      .fd        = 0, /* fixed file index 0 */
      .off       = dev_base,
      .addr      = (ulong)( src + wsz ),
      .len       = (uint)sz,
      .flags     = IOSQE_FIXED_FILE,
      .user_data = ur_udata_pack_idx( UR_REQ_WRITE, wq1 ),
    };
    track_sqe_write( ur, sz );
  }
}
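
/* Worked example of the wrap-around split (hypothetical numbers):
   with dev_sz = 1 MiB and dev_off = seq % dev_sz = 1 MiB - 4 KiB, a
   16 KiB request becomes two SQEs:

     SQE 0: off = dev_base + dev_off, len =  4 KiB  (up to store end)
     SQE 1: off = dev_base,           len = 12 KiB  (wrapped to start)

   Each SQE carries its own wq_ring descriptor index packed into
   user_data, so each completion can be matched back to its request. */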

/* wq_enqueue_seq adds jobs to the write queue until the given sequence
   number. */

static void
wq_enqueue_seq( fd_vinyl_io_ur_t * ur,
                ulong              seq1 ) {
  ulong seq0 = ur->seq_write;
  FD_CRIT( fd_vinyl_seq_lt( seq0, seq1 ), "invalid wq_enqueue_seq call" );
  ulong const   sz   = seq1 - seq0;
  uchar const * spad = fd_vinyl_io_ur_wb_buf( ur );

  wb_ring_span_t span = wb_ring_translate( &ur->wb, seq0, sz );
  if( span.sz0 ) wq_enqueue( ur, ur->seq_write,          spad+span.off0, span.sz0 );
  if( span.sz1 ) wq_enqueue( ur, ur->seq_write+span.sz0, spad+span.off1, span.sz1 );
  FD_CRIT( ur->seq_write+span.sz0+span.sz1==seq1, "invariant violation" );
  ur->seq_write = seq1;

  fd_io_uring_t * ring = ur->ring;
  int submit_cnt = fd_io_uring_submit( ring->sq, ring->ioring_fd, 0, 0U );
  if( FD_UNLIKELY( submit_cnt<0 ) ) FD_LOG_ERR(( "io_uring_submit failed (%i-%s)", -submit_cnt, fd_io_strerror( -submit_cnt ) ));
  ur->sqe_sent_cnt += (ulong)submit_cnt;
}

/* wb_flush_seq does a blocking flush of the write cache until the
   given sequence number. */

static void
wb_flush_seq( fd_vinyl_io_ur_t * ur,
              ulong              seq ) {
  if( FD_UNLIKELY( fd_vinyl_seq_gt( seq, ur->base->seq_present ) ) ) {
    FD_LOG_CRIT(( "pipeline depth exceeded (seq=%lu seq_present=%lu)", seq, ur->base->seq_present ));
  }
  if( fd_vinyl_seq_gt( seq, ur->seq_write ) ) {
    ulong write_sz = seq - ur->seq_write;
    if( FD_UNLIKELY( write_sz < WQ_BLOCK_SZ ) ) {
      /* The required write is small.  Eagerly write more to reduce
         the number of write ops. */
      ulong write_max = ur->base->seq_present - ur->seq_write;
      write_sz = fd_ulong_min( write_max, WQ_BLOCK_SZ );
    }
    ulong seq_write_new = ur->seq_write + write_sz;
    FD_CRIT( fd_vinyl_seq_ge( seq_write_new, seq ), "invariant violation" );
    wq_enqueue_seq( ur, seq_write_new );
  }
  for(;;) {
    wq_clean( ur );
    if( fd_vinyl_seq_ge( ur->seq_clean, seq ) ) break;
    wq_wait( ur );
  }
}
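
/* For example (hypothetical sizes): with WQ_BLOCK_SZ at 1 MiB, seq
   64 KiB past seq_write, and seq_present 3 MiB past seq_write, the
   flush above rounds the request up and enqueues a full 1 MiB write
   rather than a 64 KiB one.  If less than WQ_BLOCK_SZ of committed
   data is available, it enqueues whatever exists (write_max).  Either
   way, the completion loop only returns once seq_clean has reached
   seq. */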

/* fd_vinyl_io_ur_alloc allocates space in the write-back ring buffer. */

void *
fd_vinyl_io_ur_alloc( fd_vinyl_io_t * io,
                      ulong           sz,
                      int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  int flag_blocking = !!( flags & FD_VINYL_IO_FLAG_BLOCKING );

  int bad_align = !fd_ulong_is_aligned( sz, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz    = sz > ur->wb.max;

  if( FD_UNLIKELY( bad_align | bad_sz ) ) FD_LOG_CRIT(( bad_align ? "misaligned sz" : "sz too large" ));

  /* An alloc op discards a previous alloc */

  wb_ring_trim( &ur->wb, ur->base->seq_future );
  ur->base->spad_used = 0UL;

  /* Ensure that this alloc does not evict anything from the write cache
     that we cannot recover from the bstream file descriptor. */

  ulong seq_clean_new = wb_ring_alloc_seq0( &ur->wb, sz );
  if( FD_UNLIKELY( fd_vinyl_seq_gt( seq_clean_new, ur->seq_clean ) ) ) {
    /* This alloc cannot proceed until some writes happen */
    if( !flag_blocking ) return NULL;
    wb_flush_seq( ur, seq_clean_new );
  }

  /* At this point, we have enough clean space to alloc sz bytes. */

  ulong seq = ur->base->seq_future;
  if( FD_UNLIKELY( fd_vinyl_seq_ne( seq, wb_ring_seq1( &ur->wb ) ) ) ) {
    FD_LOG_CRIT(( "seq_future (%lu) and write-back buffer (%lu) out of sync", seq, wb_ring_seq1( &ur->wb ) ));
  }
  wb_ring_alloc( &ur->wb, sz );
  if( FD_UNLIKELY( fd_vinyl_seq_ne( seq+sz, wb_ring_seq1( &ur->wb ) ) ) ) {
    FD_LOG_CRIT(( "seq_future (%lu) and write-back buffer (%lu) out of sync", seq+sz, wb_ring_seq1( &ur->wb ) ));
  }
  ur->base->spad_used = sz;
  if( fd_vinyl_seq_gt( ur->wb.seq0, ur->seq_cache ) ) ur->seq_cache = ur->wb.seq0;
  if( FD_UNLIKELY( !sz ) ) {
    ur->last_alloc = NULL;
    return NULL;
  }

  ulong off = wb_ring_seq_to_off( &ur->wb, seq );
  void * alloc = fd_vinyl_io_ur_wb_buf( ur ) + off;
  ur->last_alloc = alloc;
  return alloc;
}
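
/* Typical usage (illustrative sketch; actual call sites live elsewhere
   in the code base): reserve space, fill it in place, then publish it
   with append, which detects the in-place case via last_alloc:

     void * buf = fd_vinyl_io_ur_alloc( io, sz, FD_VINYL_IO_FLAG_BLOCKING );
     ... fill buf with sz block-aligned bytes ...
     ulong seq = fd_vinyl_io_ur_append( io, buf, sz );

   A subsequent alloc (or trim) discards any reservation that was never
   appended. */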

/* fd_vinyl_io_ur_append appends to the write-back cache. */

ulong
fd_vinyl_io_ur_append( fd_vinyl_io_t * io,
                       void const *    _src,
                       ulong           sz ) {
  fd_vinyl_io_ur_t * ur   = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  uchar const *      src  = (uchar const *)_src;
  uchar *            spad = fd_vinyl_io_ur_wb_buf( ur );

  if( FD_UNLIKELY( ur->cqe_read_pending ) ) {
    FD_LOG_CRIT(( "attempted to enqueue a write while there are still inflight reads" ));
  }

  /* Validate the input args. */

  ulong seq_future  = ur->base->seq_future;  if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong seq_ancient = ur->base->seq_ancient;
  ulong dev_sz      = ur->dev_sz;

  int bad_src      = !src;
  int bad_align    = !fd_ulong_is_aligned( (ulong)src, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz       = !fd_ulong_is_aligned( sz,         FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_capacity = sz > (dev_sz - (seq_future-seq_ancient));

  if( FD_UNLIKELY( bad_src | bad_align | bad_sz | bad_capacity ) )
    FD_LOG_CRIT(( bad_src   ? "NULL src"       :
                  bad_align ? "misaligned src" :
                  bad_sz    ? "misaligned sz"  :
                              "device full" ));

  /* Is the request in-place?  If so, trim the allocation */

  ulong seq = seq_future;

  ur->base->spad_used = 0UL;
  if( src==ur->last_alloc ) {
    FD_CRIT( seq+sz<=ur->wb.seq1+ur->wb.sz1, "seq_future and write-back buffer out of sync" );
    wb_ring_trim( &ur->wb, seq+sz );
  } else {
    /* src must not be in spad */
    if( FD_UNLIKELY( ( (ulong)src>=(ulong)(spad           ) ) &
                     ( (ulong)src<=(ulong)(spad+ur->wb.max) ) ) ) {
      FD_LOG_CRIT(( "src buffer overlaps write-back cache" ));
    }
    /* copy allocation into spad */
    wb_ring_trim( &ur->wb, seq );
    void * alloc = fd_vinyl_io_ur_alloc( io, sz, FD_VINYL_IO_FLAG_BLOCKING );
    fd_memcpy( alloc, src, sz );
    ur->base->cache_write_cnt++;
    ur->base->cache_write_tot_sz += sz;
  }

  /* At this point, the append request is at the correct position in the
     write-back cache.  Commit the allocation. */

  ur->base->seq_future = seq + sz;
  ur->base->spad_used  = 0UL;

  return seq;
}

/* fd_vinyl_io_ur_commit 'commits' previous appends, making them visible
   to subsequent read calls. */

int
fd_vinyl_io_ur_commit( fd_vinyl_io_t * io,
                       int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  wb_ring_trim( &ur->wb, ur->base->seq_future );
  ur->base->seq_present = ur->base->seq_future;
  ur->base->spad_used   = 0UL;

  /* Keep io_uring pipeline fed */

  if( ur->base->seq_present - ur->seq_write >= WQ_BLOCK_SZ ) {
    ulong seq_until = fd_ulong_align_dn( ur->base->seq_present, WQ_BLOCK_SZ );
    if( fd_vinyl_seq_gt( seq_until, ur->seq_write ) ) {
      wq_enqueue_seq( ur, seq_until );
    }
  }

  return FD_VINYL_SUCCESS;
}
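
/* Worked example of the pipeline-feed heuristic (hypothetical
   WQ_BLOCK_SZ of 1 MiB): if seq_write is at 0x100000 and commit moves
   seq_present to 0x2F0000, then seq_present - seq_write = 0x1F0000 >=
   1 MiB, and seq_until = align_dn( 0x2F0000, 1 MiB ) = 0x200000, so
   exactly one full block is enqueued and the 0xF0000 remainder stays
   in the 'wait' region until more data is committed or a flush forces
   it out. */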

/* fd_vinyl_io_ur_copy does a blocking read of an old block, then copies
   it out to the tail.  This is not ideal.  Instead, we should enqueue
   async background read/write jobs. */

ulong
fd_vinyl_io_ur_copy( fd_vinyl_io_t * io,
                     ulong           seq_src0,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate the input args */

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;   if( FD_UNLIKELY( !sz ) ) return seq_future;
  ulong spad_max    = ur->wb.max;

  ulong seq_src1 = seq_src0 + sz;

  int bad_past = !( fd_vinyl_seq_le( seq_past, seq_src0    ) &
                    fd_vinyl_seq_lt( seq_src0, seq_src1    ) &
                    fd_vinyl_seq_le( seq_src1, seq_present ) );
  int bad_src  = !fd_ulong_is_aligned( seq_src0, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_sz   = !fd_ulong_is_aligned( sz,       FD_VINYL_BSTREAM_BLOCK_SZ );

  if( FD_UNLIKELY( bad_past | bad_src | bad_sz ) )
    FD_LOG_CRIT(( bad_past ? "src is not in the past" :
                  bad_src  ? "misaligned src_seq"     :
                             "misaligned sz" ));

  /* Map the dst to the bstream (updating seq_future) and map the src
     and dst regions onto the device.  Then copy as much as we can at a
     time, handling device wrap around and copy buffering space. */

  ulong seq = seq_future;

  for(;;) {
    ulong csz = fd_ulong_min( sz, spad_max );

    void * buf = fd_vinyl_io_ur_alloc( io, csz, FD_VINYL_IO_FLAG_BLOCKING );
    fd_vinyl_io_ur_read_imm( io, seq_src0, buf, csz );
    fd_vinyl_io_ur_append  ( io,           buf, csz );

    sz -= csz;
    if( !sz ) break;

    seq_src0 += csz;
  }

  return seq;
}
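
/* E.g. (hypothetical sizes): copying 10 MiB with a 4 MiB scratch pad
   takes three iterations (4 MiB + 4 MiB + 2 MiB).  Each iteration
   blocks on the read and may also block in alloc if the write-back
   cache needs flushing, which is why the comment above suggests async
   background jobs as a future improvement. */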

ulong
fd_vinyl_io_ur_hint( fd_vinyl_io_t * io,
                     ulong           sz ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  fd_vinyl_io_ur_alloc( io, sz, FD_VINYL_IO_FLAG_BLOCKING );
  fd_vinyl_io_trim( io, ur->base->seq_future );
  return io->seq_future;
}

int
fd_vinyl_io_ur_sync( fd_vinyl_io_t * io,
                     int             flags ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */
  (void)flags;

  ulong seed        = ur->base->seed;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;

  wb_flush_seq( ur, seq_present );

  int   dev_fd       = ur->dev_fd;
  ulong dev_sync     = ur->dev_sync;

  fd_vinyl_bstream_block_t * block = ur->sync;

  /* block->sync.ctl     current (static) */
  block->sync.seq_past    = seq_past;
  block->sync.seq_present = seq_present;
  /* block->sync.info_sz current (static) */
  /* block->sync.info    current (static) */

  block->sync.hash_trail  = 0UL;
  block->sync.hash_blocks = 0UL;
  fd_vinyl_bstream_block_hash( seed, block ); /* sets hash_trail back to seed */

  bd_write( dev_fd, dev_sync, block, FD_VINYL_BSTREAM_BLOCK_SZ );
  ur->base->file_write_cnt++;
  ur->base->file_write_tot_sz += FD_VINYL_BSTREAM_BLOCK_SZ;

  ur->base->seq_ancient = seq_past;

  return FD_VINYL_SUCCESS;
}

/* fd_vinyl_io_ur_forget "forgets" old data, making it available to
   future allocations. */

void
fd_vinyl_io_ur_forget( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments.  Note that we don't allow forgetting into
     the future even when we have no uncommitted blocks because the
     resulting [seq_ancient,seq_future) might contain blocks that were
     never written (which might not be an issue practically but it would
     be a bit strange for something to try to scan starting from
     seq_ancient and discover unwritten blocks). */

  ulong seq_past    = ur->base->seq_past;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_dir    = !(fd_vinyl_seq_le( seq_past, seq ) & fd_vinyl_seq_le( seq, seq_present ));
  int bad_read   = !!ur->rq_head || !!ur->rc_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "forget to seq %016lx failed (past [%016lx,%016lx)/%lu, %s)",
                  seq, seq_past, seq_present, seq_present-seq_past,
                  bad_seq  ? "misaligned seq"             :
                  bad_dir  ? "seq out of bounds"          :
                  bad_read ? "reads in progress"          :
                             "appends/copies in progress" ));

  /* Before we can forget data, we must have finished writing it first.
     Usually, this happens long after data has already been flushed, so
     this should be a no-op for practical purposes. */

  wb_flush_seq( ur, seq );

  ur->base->seq_past = seq;
}

/* fd_vinyl_io_ur_rewind "undoes" recent writes, allowing the user to
   overwrite a bstream range. */

void
fd_vinyl_io_ur_rewind( fd_vinyl_io_t * io,
                       ulong           seq ) {
  fd_vinyl_io_ur_t * ur = (fd_vinyl_io_ur_t *)io; /* Note: io must be non-NULL to have even been called */

  /* Validate input arguments.  Unlike forget, we do allow rewinding to
     before seq_ancient as the region of sequence space reported to the
     caller as written is still accurate. */

  ulong seq_ancient = ur->base->seq_ancient;
  ulong seq_past    = ur->base->seq_past;
  ulong seq_cache   = ur->      seq_cache;
  ulong seq_clean   = ur->      seq_clean;
  ulong seq_write   = ur->      seq_write;
  ulong seq_present = ur->base->seq_present;
  ulong seq_future  = ur->base->seq_future;

  int bad_seq    = !fd_ulong_is_aligned( seq, FD_VINYL_BSTREAM_BLOCK_SZ );
  int bad_past   = fd_vinyl_seq_lt( seq, seq_past    );
  int bad_dir    = fd_vinyl_seq_gt( seq, seq_present );
  int bad_read   = !!ur->rq_head || !!ur->rc_head;
  int bad_append = fd_vinyl_seq_ne( seq_present, seq_future );

  if( FD_UNLIKELY( bad_seq | bad_past | bad_dir | bad_read | bad_append ) )
    FD_LOG_CRIT(( "rewind to seq %016lx failed (present %016lx, %s)", seq, seq_present,
                  bad_seq  ? "misaligned seq"             :
                  bad_past ? "seq before seq_past"        :
                  bad_dir  ? "seq after seq_present"      :
                  bad_read ? "reads in progress"          :
                             "appends/copies in progress" ));

  /* Need to awkwardly unwind the write pipeline.  At this instant,
     there might be inflight writes for the range that is being
     rewound.  So, we start at the right and walk to the left until we
     reach seq: unwind allocs, unwind uncommitted data, unwind dirty
     cache, unwind inflight writes, unwind clean cache, and finally free
     up log file space.

     See top of this file for a description of the various regions. */

  if( fd_vinyl_seq_lt( seq, seq_write ) ) {
    wb_ring_trim( &ur->wb, seq_write ); /* unwind "used", "future", "wait" */
    wb_flush_seq( ur,      seq_write ); /* wait for I/O completion (moves "write" to "clean") */
  }
  wb_ring_trim( &ur->wb, seq ); /* unwind rest */

  ur->base->seq_ancient = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_ancient ), seq_ancient, seq );
  ur->base->seq_past    = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_past    ), seq_past,    seq );
  ur->      seq_cache   = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_cache   ), seq_cache,   seq );
  ur->      seq_clean   = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_clean   ), seq_clean,   seq );
  ur->      seq_write   = fd_ulong_if( fd_vinyl_seq_ge( seq, seq_write   ), seq_write,   seq );
  ur->base->seq_present = seq;
  ur->base->seq_future  = seq;
}
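
/* The fd_ulong_if clamps above implement a wrap-safe min( seq, seq_x )
   for each cursor.  E.g. (hypothetical values): rewinding to seq =
   0x3000 with seq_clean = 0x5000 and seq_ancient = 0x1000 moves
   seq_clean back to 0x3000 but leaves seq_ancient at 0x1000, since
   only cursors ahead of the rewind point move. */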
