LCOV - code coverage report
Current view: top level - vinyl - fd_vinyl_case_move.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 163 0.0 %
Date: 2025-12-07 04:58:33 Functions: 0 0 -

          Line data    Source code
       1           0 :   case FD_VINYL_REQ_TYPE_MOVE: {
       2             : 
       3           0 :     fd_vinyl_key_t const * req_key_src = MAP_REQ_GADDR( req->key_gaddr,       fd_vinyl_key_t, batch_cnt );
       4           0 :     fd_vinyl_key_t const * req_key_dst = MAP_REQ_GADDR( req->val_gaddr_gaddr, fd_vinyl_key_t, batch_cnt );
       5           0 :     schar *                req_err     = MAP_REQ_GADDR( req->err_gaddr,       schar,          batch_cnt );
       6             : 
       7           0 :     if( FD_UNLIKELY( (!!batch_cnt) & ((!req_key_src) | (!req_key_dst) | (!req_err)) ) ) {
       8           0 :       comp_err = FD_VINYL_ERR_INVAL;
       9           0 :       break;
      10           0 :     }
      11             : 
      12           0 :     for( ulong batch_idx=0UL; batch_idx<batch_cnt; batch_idx++ ) {
      13             : 
      14           0 : #     define DONE(err) do {                                \
      15           0 :         int _err = (err);                                  \
      16           0 :         FD_COMPILER_MFENCE();                              \
      17           0 :         req_err[ batch_idx ] = (schar)_err;                \
      18           0 :         FD_COMPILER_MFENCE();                              \
      19           0 :         fail_cnt += (ulong)!!_err;                         \
      20           0 :         goto next_move;  /* sigh ... can't use continue */ \
      21           0 :       } while(0)
      22             : 
      23             :       /* If the input and output keys are the same, this is a no-op. */
      24             : 
      25           0 :       fd_vinyl_key_t const * key_src = req_key_src + batch_idx;
      26           0 :       fd_vinyl_key_t const * key_dst = req_key_dst + batch_idx;
      27             : 
      28           0 :       if( FD_UNLIKELY( fd_vinyl_key_eq( key_src, key_dst ) ) ) DONE( FD_VINYL_SUCCESS );
      29             : 
      30             :       /* At this point, key_src and key_dst are distinct.  Query meta
      31             :          for pair key_dst.  If key_dst is acquired, fail with again. */
      32             : 
      33           0 :       ulong memo_dst = fd_vinyl_key_memo( meta_seed, key_dst );
      34             : 
      35           0 :       ulong _ele_idx_dst; /* Avoid pointer escape */
      36           0 :       int   err_dst = fd_vinyl_meta_query_fast( ele0, ele_max, key_dst, memo_dst, &_ele_idx_dst );
      37           0 :       ulong ele_idx_dst = _ele_idx_dst; /* In [0,ele_max) */
      38             : 
      39           0 :       ulong line_idx_dst = ULONG_MAX; /* Fix spurious compiler warning */
      40             : 
      41           0 :       if( FD_UNLIKELY( !err_dst ) ) { /* dst exists at bstream seq_present or is being created */
      42             : 
      43           0 :         line_idx_dst = ele0[ ele_idx_dst ].line_idx;
      44             : 
      45           0 :         if( FD_UNLIKELY( line_idx_dst<line_cnt ) ) { /* dst is in cache */
      46             : 
      47           0 :           FD_CRIT( line[ line_idx_dst ].ele_idx==ele_idx_dst, "corruption detected" );
      48             : 
      49           0 :           ulong line_ctl_dst = line[ line_idx_dst ].ctl;
      50             : 
      51           0 :           long ref_dst = fd_vinyl_line_ctl_ref( line_ctl_dst );
      52             : 
      53           0 :           if( FD_UNLIKELY( ref_dst ) ) DONE( FD_VINYL_ERR_AGAIN ); /* dst is acquired */
      54             : 
      55           0 :         } else {
      56             : 
      57           0 :           FD_CRIT( line_idx_dst==ULONG_MAX, "corruption detected" );
      58             : 
      59           0 :         }
      60             : 
      61           0 :       }
      62             : 
      63             :       /* At this point, pair key dst might exist but is not acquired.
      64             :          Query meta for key_src.  If it doesn't exist (KEY) or is
      65             :          acquired (AGAIN), fail.  Otherwise, if it is not cached, cache
      66             :          it in the LRU position.
      67             : 
      68             :          (Note: if we want to overlap this IO maximally, we would do the
      69             :          caching of these lines async but then we'd need to be able to
      70             :          guarantee that at least batch_cnt lines are evictable ... this
      71             :          gets quite tricky.  So we just do blocking I/O here as we know
      72             :          at least 1 line is evictable.) */
      73             : 
      74           0 :       ulong memo_src = fd_vinyl_key_memo( meta_seed, key_src );
      75             : 
      76           0 :       ulong _ele_idx_src; /* Avoid pointer escape */
      77           0 :       int   err_src = fd_vinyl_meta_query_fast( ele0, ele_max, key_src, memo_src, &_ele_idx_src );
      78           0 :       ulong ele_idx_src = _ele_idx_src; /* In [0,ele_max) */
      79             : 
      80           0 :       if( FD_UNLIKELY( err_src ) ) DONE( FD_VINYL_ERR_KEY );
      81             : 
      82           0 :       ulong val_sz = (ulong)ele0[ ele_idx_src ].phdr.info.val_sz;
      83             : 
      84           0 :       FD_CRIT( val_sz<=FD_VINYL_VAL_MAX, "corruption detected" );
      85             : 
      86           0 :       ulong seq_src      = ele0[ ele_idx_src ].seq;
      87           0 :       ulong line_idx_src = ele0[ ele_idx_src ].line_idx;
      88             : 
      89           0 :       fd_vinyl_data_obj_t *     obj_src;
      90           0 :       fd_vinyl_bstream_phdr_t * phdr_src;
      91             : 
      92           0 :       if( FD_LIKELY( line_idx_src<line_cnt ) ) {
      93             : 
      94           0 :         ulong line_ctl_src = line[ line_idx_src ].ctl;
      95             : 
      96           0 :         long ref_src = fd_vinyl_line_ctl_ref( line_ctl_src );
      97             : 
      98           0 :         if( FD_UNLIKELY( ref_src ) ) DONE( FD_VINYL_ERR_AGAIN );
      99             : 
     100           0 :         ulong ver_src = fd_vinyl_line_ctl_ver( line_ctl_src );
     101             : 
     102           0 :         FD_CRIT( line[ line_idx_src ].ele_idx==ele_idx_src, "corruption detected" );
     103             : 
     104           0 :         obj_src = line[ line_idx_src ].obj;
     105             : 
     106           0 :         FD_ALERT( fd_vinyl_data_is_valid_obj( obj_src, vol, vol_cnt ), "corruption detected" );
     107           0 :         FD_CRIT ( obj_src->line_idx==line_idx_src,                     "corruption detected" );
     108             : 
     109           0 :         phdr_src = fd_vinyl_data_obj_phdr( obj_src );
     110             : 
     111           0 :         line[ line_idx_src ].ctl = fd_vinyl_line_ctl( ver_src+1UL, 0L );
     112             : 
     113           0 :       } else {
     114             : 
     115           0 :         FD_CRIT( line_idx_src==ULONG_MAX, "corruption detected" );
     116             : 
     117             :         /* Read the encoded pair from the bstream */
     118             : 
     119           0 :         ulong ctl = ele0[ ele_idx_src ].phdr.ctl;
     120             : 
     121           0 :         int   type    = fd_vinyl_bstream_ctl_type ( ctl );
     122           0 :         int   style   = fd_vinyl_bstream_ctl_style( ctl );
     123           0 :         ulong val_esz = fd_vinyl_bstream_ctl_sz   ( ctl );
     124             : 
     125           0 :         FD_CRIT( type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR,                                              "corruption detected" );
     126           0 :         FD_CRIT( (style==FD_VINYL_BSTREAM_CTL_STYLE_RAW) | (style==FD_VINYL_BSTREAM_CTL_STYLE_LZ4), "corruption detected" );
     127           0 :         FD_CRIT( val_esz<=FD_VINYL_VAL_MAX,                                                         "corruption detected" );
     128             : 
     129           0 :         fd_vinyl_data_obj_t * cobj = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_esz ) );
     130           0 :         if( FD_UNLIKELY( !cobj ) ) FD_LOG_CRIT(( "increase data cache size" ));
     131             : 
     132           0 :         fd_vinyl_bstream_phdr_t * cphdr    = fd_vinyl_data_obj_phdr( cobj );
     133           0 :         ulong                     cpair_sz = fd_vinyl_bstream_pair_sz( val_esz );
     134             : 
     135           0 :         fd_vinyl_io_read_imm( io, seq_src, cphdr, cpair_sz );
     136             :         /* not an async read (so no read_cnt increment) */
     137             : 
     138             :         /* Verify data integrity */
     139             : 
     140           0 :         FD_ALERT( !fd_vinyl_bstream_pair_test( io_seed, seq_src, (fd_vinyl_bstream_block_t *)cphdr, cpair_sz ),
     141           0 :                   "corruption detected" );
     142             : 
     143             :         /* Decode the pair */
     144             : 
     145           0 :         if( FD_LIKELY( style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {
     146             : 
     147           0 :           FD_CRIT( val_esz==val_sz, "corruption detected" );
     148             : 
     149           0 :           obj_src  = cobj;
     150           0 :           phdr_src = cphdr;
     151             : 
     152           0 :         } else {
     153             : 
     154           0 :           obj_src = fd_vinyl_data_alloc( data, fd_vinyl_data_szc( val_sz ) );
     155           0 :           if( FD_UNLIKELY( !obj_src ) ) FD_LOG_CRIT(( "increase data cache size" ));
     156             : 
     157           0 :           char const * cval = (char const *)fd_vinyl_data_obj_val( cobj    );
     158           0 :           char *       val  = (char *)      fd_vinyl_data_obj_val( obj_src );
     159           0 :           if( FD_UNLIKELY( (ulong)LZ4_decompress_safe( cval,  val, (int)val_esz, (int)val_sz )!=val_sz ) )
     160           0 :             FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
     161             : 
     162           0 :           phdr_src = fd_vinyl_data_obj_phdr( obj_src );
     163             : 
     164           0 :           phdr_src->ctl  = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
     165           0 :           phdr_src->key  = cphdr->key;
     166           0 :           phdr_src->info = cphdr->info;
     167             : 
     168           0 :           fd_vinyl_data_free( data, cobj );
     169             : 
     170           0 :         }
     171             : 
     172           0 :         line_idx_src = fd_vinyl_line_evict_lru( &vinyl->line_idx_lru, line, line_cnt, ele0, ele_max, data );
     173             : 
     174           0 :         ulong line_ctl_src = line[ line_idx_src ].ctl;
     175             : 
     176           0 :         ulong ver_src = fd_vinyl_line_ctl_ver( line_ctl_src );
     177             : 
     178           0 :         line[ line_idx_src ].obj     = obj_src; obj_src->line_idx = line_idx_src; obj_src->rd_active = (short)0;
     179           0 :         line[ line_idx_src ].ele_idx = ele_idx_src; ele0[ ele_idx_src ].line_idx = line_idx_src;
     180           0 :         line[ line_idx_src ].ctl     = fd_vinyl_line_ctl( ver_src+1UL, 0L );
     181             : 
     182           0 :         fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx_src, FD_VINYL_LINE_EVICT_PRIO_LRU );
     183             : 
     184           0 :         if( line_idx_src==line_idx_dst ) line_idx_dst = ULONG_MAX; /* Handle evict_lru evicting the dst */
     185             : 
     186           0 :       }
     187             : 
     188             :       /* At this point, pair key_src is cached but not acquired and pair
     189             :          key_dst is not acquired.  We are clear to move.  If pair
     190             :          key_dst exists, we are replacing pair key_dst with pair
     191             :          key_src.  In this case, we remove pair key_dst from cache and
     192             :          remove pair key_dst from the meta.  This remove might move the
     193             :          location of pair key_src's meta element.  So we reload if
     194             :          necessary. */
     195             : 
     196           0 :       FD_CRIT( fd_vinyl_bstream_ctl_type( phdr_src->ctl )==fd_vinyl_bstream_ctl_type( ele0[ ele_idx_src ].phdr.ctl ),
     197           0 :                                                                                                     "corruption detected" );
     198           0 :       FD_CRIT( fd_vinyl_key_eq( &phdr_src->key, &ele0[ ele_idx_src ].phdr.key ),                    "corruption detected" );
     199           0 :       FD_CRIT( !memcmp( &phdr_src->info, &ele0[ ele_idx_src ].phdr.info, sizeof(fd_vinyl_info_t) ), "corruption detected" );
     200             : 
     201           0 :       accum_garbage_cnt += 2UL; /* old src and new move block */
     202           0 :       accum_garbage_sz  += fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele0[ ele_idx_src ].phdr.ctl ) ) +
     203           0 :                            FD_VINYL_BSTREAM_BLOCK_SZ;
     204             : 
     205           0 :       if( FD_UNLIKELY( !err_dst ) ) {
     206             : 
     207           0 :         accum_garbage_cnt++; /* old dst */
     208           0 :         accum_garbage_sz += fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele0[ ele_idx_dst ].phdr.ctl ) );
     209             : 
     210           0 :         if( FD_UNLIKELY( line_idx_dst < line_cnt ) ) {
     211             : 
     212           0 :           FD_CRIT( line[ line_idx_dst ].ele_idx==ele_idx_dst, "corruption detected" );
     213             : 
     214           0 :           fd_vinyl_data_obj_t * obj_dst = line[ line_idx_dst ].obj;
     215             : 
     216           0 :           FD_ALERT( fd_vinyl_data_is_valid_obj( obj_dst, vol, vol_cnt ), "corruption detected" );
     217           0 :           FD_CRIT ( obj_dst->line_idx==line_idx_dst,                     "corruption detected" );
     218             : 
     219           0 :           ulong line_ctl_dst = line[ line_idx_dst ].ctl;
     220             : 
     221           0 :           ulong ver_dst = fd_vinyl_line_ctl_ver( line_ctl_dst );
     222             : 
     223           0 :           fd_vinyl_data_free( data, obj_dst );
     224             : 
     225           0 :           line[ line_idx_dst ].obj     = NULL;
     226           0 :           line[ line_idx_dst ].ele_idx = ULONG_MAX; // ele0[ ele_idx_dst ].line_idx = ULONG_MAX; /* Technically not necessary given below */
     227           0 :           line[ line_idx_dst ].ctl     = fd_vinyl_line_ctl( ver_dst+1UL, 0L );
     228             : 
     229           0 :           fd_vinyl_line_evict_prio( &vinyl->line_idx_lru, line, line_cnt, line_idx_dst, FD_VINYL_LINE_EVICT_PRIO_LRU );
     230           0 :         }
     231             : 
     232           0 :         fd_vinyl_meta_remove_fast( ele0, ele_max, lock, lock_shift, line, line_cnt, ele_idx_dst ); /* See note below about atomicity for concurrent meta readers */
     233             : 
     234           0 :         ulong pair_cnt = vinyl->pair_cnt;
     235           0 :         FD_CRIT( pair_cnt, "corruption detected" );
     236           0 :         vinyl->pair_cnt = pair_cnt - 1UL;
     237             : 
     238           0 :         err_src = fd_vinyl_meta_query_fast( ele0, ele_max, key_src, memo_src, &_ele_idx_src );
     239           0 :         ele_idx_src = _ele_idx_src; /* In [0,ele_max) */
     240           0 :         FD_CRIT( !err_src, "corruption detected" );
     241             :         /* Note: could test other fields post move too */
     242             : 
     243           0 :       }
     244             : 
     245             :       /* At this point, pair key_src is cached but not acquired and pair
     246             :          key_dst is not cached and not in the meta (the move block that
     247             :          will officially erase it, if it already exists, will be written
     248             :          below).  Update the cached phdr to reflect the move.  Remove
     249             :          the meta entry for pair key_src and insert a meta entry for
     250             :          pair key_dst.
     251             : 
     252             :          Note: this means from the point of view of concurrent meta
     253             :          queries, there will be a brief time interval when pair key_src
     254             :          and pair key_dst are both reported as not existing.
     255             : 
     256             :          As an alternative with more overhead we could instead insert
     257             :          the meta element for key_dst, remove the meta element for
     258             :          key_src and requery meta for key_dst (as the remove could move
     259             :          it).  In this case, there will be a gap where both key_src and
     260             :          key_dst are reported as available (and they will point to
     261             :          the same cache entry during this interval).
     262             : 
     263             :          With even more complexity and overhead, we could eliminate the
     264             :          gap and overhead and make this atomic from the point of view of
     265             :          concurrent meta readers.  (Would have to compute a lock set that
     266             :          covers the target key_dst insert location and the key_src probe
     267             :          sequence assuming key_dst has been inserted, lock the locks, do
     268             :          the insert, do the remove without any locking behavior, free
     269             :          the lock set and then requery where key_dst ended up.)  Also
     270             :          note that, if we are replacing pair key_dst, at this point,
     271             :          pair key_dst is already reported to concurrent meta readers as
     272             :          not existing.  Would need to extend this to the above.
     273             : 
     274             :          But it isn't clear that concurrent meta readers care at all.
     275             :          So we go with the fast simple method below (it still is atomic
     276             :          from the point of view of clients and the bstream). */
     277             : 
     278           0 :       ulong pair_sz  = fd_vinyl_bstream_pair_sz( val_sz );
     279           0 :       ulong seq_move = fd_vinyl_io_hint( io, FD_VINYL_BSTREAM_BLOCK_SZ + pair_sz );
     280           0 :       ulong seq_dst  = seq_move + FD_VINYL_BSTREAM_BLOCK_SZ;
     281             : 
     282             :     //phdr_src->ctl  = ... already init
     283           0 :       phdr_src->key = *key_dst;
     284             :     //phdr_src->info = ... already init
     285             : 
     286           0 :       fd_vinyl_meta_remove_fast( ele0, ele_max, lock, lock_shift, line, line_cnt, ele_idx_src );
     287             : 
     288           0 :       err_dst = fd_vinyl_meta_query_fast( ele0, ele_max, key_dst, memo_dst, &_ele_idx_dst );
     289           0 :       ele_idx_dst = _ele_idx_dst; /* In [0,ele_max) */
     290             : 
     291           0 :       FD_CRIT( err_dst==FD_VINYL_ERR_KEY, "corruption detected" );
     292             : 
     293           0 :       ele0[ ele_idx_dst ].memo      = memo_dst;
     294             :     //ele0[ ele_idx_dst ].phdr.ctl  = ... init below for concurrent safe insert
     295           0 :       ele0[ ele_idx_dst ].phdr.key  = phdr_src->key;
     296           0 :       ele0[ ele_idx_dst ].phdr.info = phdr_src->info;
     297           0 :       ele0[ ele_idx_dst ].line_idx  = line_idx_src;
     298           0 :       ele0[ ele_idx_dst ].seq       = seq_dst;
     299             : 
     300           0 :       FD_COMPILER_MFENCE();
     301           0 :       ele0[ ele_idx_dst ].phdr.ctl = phdr_src->ctl;
     302           0 :       FD_COMPILER_MFENCE();
     303             : 
     304           0 :       line[ line_idx_src ].ele_idx = ele_idx_dst;
     305             : 
     306           0 :       fd_vinyl_io_append_move( io, phdr_src, key_dst, NULL, 0UL );
     307           0 :       append_cnt++;
     308           0 :       accum_move_cnt++;
     309             : 
     310           0 :       fd_vinyl_bstream_pair_hash( io_seed, (fd_vinyl_bstream_block_t *)phdr_src );
     311             : 
     312           0 :       ulong seq = fd_vinyl_io_append( io, phdr_src, pair_sz );
     313           0 :       append_cnt++;
     314           0 :       FD_CRIT( fd_vinyl_seq_eq( seq, seq_dst ), "unexpected append location" );
     315             : 
     316           0 :       DONE( FD_VINYL_SUCCESS );
     317             : 
     318           0 :     next_move: /* silly language restriction */;
     319             : 
     320           0 : #     undef DONE
     321             : 
     322           0 :     }
     323             : 
     324           0 :     comp_err = FD_VINYL_SUCCESS;
     325           0 :     break;
     326           0 :   }

Generated by: LCOV version 1.14