LCOV - code coverage report
Current view: top level - vinyl - fd_vinyl_recover.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 346 0.0 %
Date: 2025-12-07 04:58:33 Functions: 0 9 0.0 %

          Line data    Source code
       1             : #include "fd_vinyl.h"
       2             : #include "fd_vinyl_recover_serial.c"
       3             : 
/* PEEK( seq ) returns a pointer to the bstream block at sequence
   number seq via the memory mapped io region.  Captures mmio and
   mmio_sz from the enclosing scope; evaluates seq more than once, so
   only pass side-effect-free expressions.  NOTE(review): assumes a
   block never straddles the end of the mmio region (i.e. blocks are
   aligned such that (seq % mmio_sz) + BLOCK_SZ <= mmio_sz) -- confirm
   against the io layer's mapping guarantees. */
#define PEEK( seq ) ((fd_vinyl_bstream_block_t *)(mmio + ((seq) % mmio_sz)))
       5             : 
/* fd_vinyl_recover_test tests if parallel recovery is possible.
   Returns FD_VINYL_SUCCESS if the bstream past [seq_past,seq_present)
   is covered by a backward chain of valid partition blocks and an
   FD_VINYL_ERR code (details logged) if not, in which case the caller
   should fall back to serial recovery. */

static int
fd_vinyl_recover_test( fd_vinyl_io_t * io ) {

  /* Parallel recovery requires a memory mapped view of the bstream so
     multiple threads can peek at arbitrary sequence numbers. */

  uchar * mmio = (uchar *)fd_vinyl_mmio( io );

  if( FD_UNLIKELY( !mmio ) ) {
    FD_LOG_NOTICE(( "bstream io interface type does not support parallel memory mapped io"
                    "\n\tfalling back to serial recovery" ));
    return FD_VINYL_ERR_INVAL;
  }

  ulong mmio_sz = fd_vinyl_mmio_sz( io );

  ulong seq_past    = fd_vinyl_io_seq_past   ( io );
  ulong seq_present = fd_vinyl_io_seq_present( io );
  ulong io_seed     = fd_vinyl_io_seed       ( io );

//ulong tstone_req = 0UL;

  /* Walk the partition chain backward from seq_present toward
     seq_past, one partition per iteration. */

  ulong seq1 = seq_present;
  while( fd_vinyl_seq_gt( seq1, seq_past ) ) {

    /* At this point, we've tested [seq1,seq_present) is suitable for
       parallel recovery.  Peek at the block just before seq1.  If it is
       not a valid partition block, we can't do parallel recovery. */

    ulong part_seq = seq1 - FD_VINYL_BSTREAM_BLOCK_SZ;

    fd_vinyl_bstream_block_t block[1];

    block[0] = *PEEK( part_seq );

    char const * _err = fd_vinyl_bstream_part_test( io_seed, part_seq, block ); /* testing changes the block */
    if( FD_UNLIKELY( _err ) ) {
      FD_LOG_WARNING(( "bstream past does not have a valid partitioning"
                       "\n\tseq %016lx: %s"
                       "\n\tprevious bstream writers probably did not terminate cleanly"
                       "\n\tfalling back to serial recovery", part_seq, _err ));
      return FD_VINYL_ERR_CORRUPT;
    }

    /* We got a valid partition block.  Determine the start of this
       partition.  Clamp to seq_past in case the oldest partition
       started before the bstream's past. */

    ulong seq0 = block->part.seq0;
    seq0 = fd_vinyl_seq_gt( seq0, seq_past ) ? seq0 : seq_past;

#   if 0
    /* Compute the maximum number of deads the portion of this partition
       in the bstream's past that could produce as the lesser the number
       of deads reported in the partition and the number of blocks in
       the partition.  Similarly for move (note that each move makes two
       tombstone but also requires at least two blocks ... so moves also
       make, at most, 1 tombstone per block on average). */

    ulong part_sz  = seq1 - seq0 - FD_VINYL_BSTREAM_BLOCK_SZ; /* exclude trailing part block for below */

    ulong dead_max = fd_ulong_min( block->part.dead_cnt, part_sz );
    ulong move_max = fd_ulong_min( block->part.move_cnt, part_sz );

    tstone_req += fd_ulong_min( dead_max + 2UL*move_max, part_sz );
#   endif

    /* Move to the previous partition */

    seq1 = seq0;
  }

  /* We seem to have a valid partitioning for parallel recovery */

# if 0
  if( FD_UNLIKELY( tstone_req > tstone_max ) ) {
    FD_LOG_WARNING(( "insufficient scratch space for parallel recovery"
                     "\n\tincrease data cache size"
                     "\n\tfalling back to serial recovery" ));
    return FD_VINYL_ERR_FULL;
  }
# endif

  return FD_VINYL_SUCCESS;
}
      89             : 
      90             : /* fd_vinyl_recover_line_task tests parallel flushes all vinyl
      91             :    lines and resets the evicition priority sequence. */
      92             : 
      93           0 : static FD_FOR_ALL_BEGIN( fd_vinyl_recover_line_task, 1L ) {
      94           0 :   fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[0];
      95             : 
      96           0 :   fd_vinyl_line_t * line     = vinyl->line;
      97           0 :   ulong             line_cnt = vinyl->line_cnt;
      98             : 
      99           0 :   ulong line0 = (ulong)block_i0;
     100           0 :   ulong line1 = (ulong)block_i1;
     101             : 
     102           0 :   for( ulong line_idx=line0; line_idx<line1; line_idx++ ) {
     103           0 :     line[ line_idx ].obj            = NULL;
     104           0 :     line[ line_idx ].ele_idx        = ULONG_MAX;
     105           0 :     line[ line_idx ].ctl            = fd_vinyl_line_ctl( 0UL, 0L);
     106           0 :     line[ line_idx ].line_idx_older = (uint)fd_ulong_if( line_idx!=0UL,          line_idx-1UL, line_cnt-1UL );
     107           0 :     line[ line_idx ].line_idx_newer = (uint)fd_ulong_if( line_idx!=line_cnt-1UL, line_idx+1UL, 0UL          );
     108           0 :   }
     109             : 
     110           0 : } FD_FOR_ALL_END
     111             : 
/* fd_vinyl_recover_reclaim_task parallel locks all the meta locks,
   reclaiming any that were locked from presumably dead writers that
   terminated uncleanly.  Returns the number of locks reclaimed. */

static FD_MAP_REDUCE_BEGIN( fd_vinyl_recover_reclaim_task, 1L, alignof(ulong), sizeof(ulong), 1UL ) {
  ulong      * _reclaim_cnt = (ulong *)     arg[0]; /* out: per-thread reclaim count */
  fd_vinyl_t * vinyl        = (fd_vinyl_t *)arg[1];

  ulong * lock = vinyl->meta->lock;

  ulong reclaim_cnt = 0UL;

  for( long lock_idx=block_i0; lock_idx<block_i1; lock_idx++ ) {
#   if FD_HAS_ATOMIC
    /* Atomically set the lock bit, keeping the previous value so we can
       detect locks that were already held (i.e. reclaimed here). */
    ulong l = FD_ATOMIC_FETCH_AND_OR( &lock[ lock_idx ], 1UL );
#   else
    /* Non-atomic fallback for targets without atomic support */
    ulong l = lock[ lock_idx ];
    lock[ lock_idx ] |= 1UL;
#   endif
    reclaim_cnt += l & 1UL; /* bit 0 already set -> lock was held by a dead writer */
  }

  *_reclaim_cnt = reclaim_cnt;

} FD_MAP_END {

  /* Reduction: accumulate the remote thread's reclaim count into ours */

  *(ulong *)arg[0] += *(ulong const *)_r1;

} FD_REDUCE_END
     141             : 
     142             : /* fd_vinyl_recover_meta_flush_task tests parallel clears the meta
     143             :    element storage.  Assumes the meta is fully locked. */
     144             : 
     145           0 : static FD_FOR_ALL_BEGIN( fd_vinyl_recover_meta_flush_task, 1L ) {
     146           0 :   fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[0];
     147             : 
     148           0 :   fd_vinyl_meta_ele_t * ele0 = vinyl->meta->ele;
     149             : 
     150           0 :   fd_vinyl_meta_ele_t init_ele[1];
     151           0 :   memset( init_ele, 0, sizeof(fd_vinyl_meta_ele_t) );
     152           0 :   init_ele->line_idx = ULONG_MAX;
     153             : 
     154           0 :   for( long ele_idx=block_i0; ele_idx<block_i1; ele_idx++ ) ele0[ ele_idx ] = init_ele[0];
     155             : 
     156           0 : } FD_FOR_ALL_END
     157             : 
     158             : /* fd_vinyl_recover_unlock_task tests parallel unlocks all the meta
     159             :    locks.  Assumes the meta is fully locked. */
     160             : 
     161           0 : static FD_FOR_ALL_BEGIN( fd_vinyl_recover_unlock_task, 1L ) {
     162           0 :   fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[0];
     163             : 
     164           0 :   ulong * lock = vinyl->meta->lock;
     165             : 
     166           0 :   for( long lock_idx=block_i0; lock_idx<block_i1; lock_idx++ ) lock[ lock_idx ]++;
     167             : 
     168           0 : } FD_FOR_ALL_END
     169             : 
/* fd_vinyl_recover_tstone inserts a tstone for key at seq in the meta
   if there isn't anything beyond seq for key already.  Returns SUCCESS
   on success and FD_VINYL_ERR code on failure.  This will update the
   pair_cnt, garbage_sz and tstone_cnt counters appropriately. */

static int
fd_vinyl_recover_tstone( fd_vinyl_meta_t *      meta,
                         fd_vinyl_key_t const * key,
                         ulong                  seq,
                         ulong *                _pair_cnt,
                         ulong *                _garbage_sz,
                         ulong *                _tstone_cnt ) {

  /* Query meta for key */

  fd_vinyl_meta_query_t query[1];

  fd_vinyl_meta_prepare( meta, key, NULL, query, FD_MAP_FLAG_BLOCKING );

  fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_query_ele( query );

  /* prepare returned no slot: the meta cache is full (or corrupt) */

  if( FD_UNLIKELY( !ele ) ) {
    FD_LOG_NOTICE(( "%016lx: increase meta cache size for parallel recovery or corruption", seq ));
    return FD_VINYL_ERR_FULL;
  }

  if( FD_LIKELY( !ele->phdr.ctl ) ) {

    /* There is no version or tstone for pair key in the meta currently.
       Insert a tstone at seq for key so any versions or tstone for pair
       key encountered later in parallel recovery can tell if they are
       before or after this tstone.  Because we don't know if there will
       be a version of key after this, we need to append key to the
       tstone array. */

   //pair_cnt   unchanged
   //garbage_sz unchanged
    (*_tstone_cnt)++;

    ele->memo      = fd_vinyl_meta_query_memo( query );
    ele->phdr.ctl  = 1UL;                                 /* non-zero marks the slot occupied */
    ele->phdr.key  = *key;
  //ele->phdr.info = d/c
    ele->line_idx  = ULONG_MAX - 1UL; // tstone
    ele->seq       = seq;

    fd_vinyl_meta_publish( query );

  } else if( FD_LIKELY( fd_vinyl_seq_lt( ele->seq, seq ) ) ) {

    /* The version (or tstone) for pair key in the meta is older than
       seq.  Overwrite it with this tstone, adjusting the counters if it
       was a pair (pair becomes garbage and the entry becomes a tstone). */

    int old_ele_is_pair = (ele->line_idx==ULONG_MAX);

    (*_pair_cnt)   -= (ulong)old_ele_is_pair;
    (*_garbage_sz) +=        old_ele_is_pair ? fd_vinyl_bstream_pair_sz( fd_vinyl_bstream_ctl_sz( ele->phdr.ctl ) ) : 0UL;
    (*_tstone_cnt) += (ulong)old_ele_is_pair;

  //ele->memo      = already init
  //ele->phdr.ctl  = already init
  //ele->phdr.key  = already init
  //ele->phdr.info = d/c
    ele->line_idx  = ULONG_MAX - 1UL; // tstone
    ele->seq       = seq;

    fd_vinyl_meta_publish( query );

  } else {

    /* The meta entry (pair or tstone) for pair key in the meta is newer
       than seq.  We can skip this tstone. */

   //pair_cnt   unchanged
   //garbage_sz unchanged
   //tstone_cnt unchanged

    /* Two records for the same key at the same seq should be impossible
       in a well formed bstream -- treat as probable corruption. */

    int corrupt = fd_vinyl_seq_eq( ele->seq, seq );

    fd_vinyl_meta_cancel( query );

    if( FD_UNLIKELY( corrupt ) ) {
      FD_LOG_WARNING(( "%016lx: probable corruption detected", seq ));
      return FD_VINYL_ERR_CORRUPT;
    }

  }

  return FD_VINYL_SUCCESS;
}
     260             : 
/* fd_vinyl_recover_part_task dynamically assigns the partitions of the
   bstream's past to threads for recovery and then recovers them in
   parallel.  The bstream past partition iteration is near identical
   to bstream past iteration in serial recovery.  See
   fd_vinyl_recover_serial.c for more details. */

/* FIXME: ADD MORE EXTENSIVE DATA INTEGRITY CHECKING LIKE SERIAL IMPL */

static FD_FN_UNUSED FD_MAP_REDUCE_BEGIN( fd_vinyl_recover_part_task, 1UL, alignof(ulong), sizeof(ulong), 4UL ) {
  ulong *          _rlocal    = (ulong *)         arg[0]; /* out: { fail, pair_cnt, garbage_sz, tstone_cnt } */
  fd_vinyl_t *     vinyl      = (fd_vinyl_t *)    arg[1];
  ulong *          _lock      = (ulong *)         arg[2]; /* _lock[0] spin lock, _lock[1] assignment cursor */

  fd_vinyl_io_t *   io   = vinyl->io;
  fd_vinyl_meta_t * meta = vinyl->meta;

  ulong   io_seed  =          fd_vinyl_io_seed    ( io );
  ulong   seq_past =          fd_vinyl_io_seq_past( io );
  uchar * mmio     = (uchar *)fd_vinyl_mmio       ( io );
  ulong   mmio_sz  =          fd_vinyl_mmio_sz    ( io );

  /* Per-thread results, written to _rlocal at done and combined across
     threads in the reduction below.  fail defaults to 1 (set) and is
     only cleared on a clean walk to seq_past. */

  ulong fail       = 1UL;
  ulong pair_cnt   = 0UL;
  ulong garbage_sz = 0UL;
  ulong tstone_cnt = 0UL;

  for(;;) {

    /* Determine the range of the bstream past we should process next. */

    ulong seq0;
    ulong seq1;

    /* Lock and fetch the task assignment cursor */

    FD_COMPILER_MFENCE();
#   if FD_HAS_ATOMIC
    while( FD_ATOMIC_CAS( _lock, 0UL, 1UL ) ) FD_SPIN_PAUSE();
#   else
    *_lock = 1UL;
#   endif
    FD_COMPILER_MFENCE();

    seq1 = _lock[1];

    /* At this point, the bstream range [seq_past,seq1) has not been
       assigned.  If seq1 is at seq_past, everything has been assigned
       already.  Otherwise, the block before cursor is a valid partition
       block (as per the test above) and we claim the range:

         [ the older of part_seq0 and seq_past, seq1 )

       to process. */

    if( FD_UNLIKELY( fd_vinyl_seq_le( seq1, seq_past ) ) ) seq0 = seq_past;
    else {
      fd_vinyl_bstream_block_t const * block = PEEK( seq1 - FD_VINYL_BSTREAM_BLOCK_SZ );
      seq0 = block->part.seq0;
      if( fd_vinyl_seq_lt( seq0, seq_past ) ) seq0 = seq_past;
    }

    /* Update and unlock the task assignment cursor */

    _lock[1] = seq0;
    FD_COMPILER_MFENCE();
    _lock[0] = 0UL;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( fd_vinyl_seq_le( seq1, seq_past ) ) ) break;

    /* At this point, we need to recover the range [seq0,seq1). */

    ulong seq = seq0;
    while( fd_vinyl_seq_lt( seq, seq1 ) ) {

      /* Copy the block header out of the mmio region; the various
         *_test helpers modify their block argument. */

      fd_vinyl_bstream_block_t block[1];

      block[0] = *(fd_vinyl_bstream_block_t *)PEEK( seq ); /* testing is destructive */

      ulong ctl = block->ctl;

      int type = fd_vinyl_bstream_ctl_type( ctl );

      switch( type ) {

      case FD_VINYL_BSTREAM_CTL_TYPE_PAIR: {

        ulong pair_val_esz = fd_vinyl_bstream_ctl_sz( ctl );

        ulong pair_sz = fd_vinyl_bstream_pair_sz( pair_val_esz );

        /* The whole pair must lie within our claimed range */

        if( FD_UNLIKELY( pair_sz > (seq1-seq) ) ) { /* Wrapping safe */
          FD_LOG_WARNING(( "%016lx: truncated", seq ));
          goto done;
        }

        fd_vinyl_bstream_block_t ftr[1];

        ftr[0] = *PEEK( seq + pair_sz - FD_VINYL_BSTREAM_BLOCK_SZ );

        char const * _err = fd_vinyl_bstream_pair_test_fast( io_seed, seq, block, ftr );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        /* At this point, we appear to have valid completely written
           pair.  Prepare the meta to do an update for this key. */

        fd_vinyl_meta_query_t query[1];

        fd_vinyl_meta_prepare( meta, &block->phdr.key, NULL, query, FD_MAP_FLAG_BLOCKING );

        fd_vinyl_meta_ele_t * ele = fd_vinyl_meta_query_ele( query );

        if( FD_UNLIKELY( !ele ) ) {
          FD_LOG_WARNING(( "%016lx: corruption detected or meta cache too small for parallel recovery", seq ));
          goto done;
        }

        if( FD_LIKELY( (!ele->phdr.ctl) | fd_vinyl_seq_gt( seq, ele->seq ) ) ) {

          pair_cnt++;

          /* At this point, this is the first time any thread has seen
             pair key or this version of pair key is newer than the
             version (or tstone) of pair key that has been seen */

          ele->memo     = fd_vinyl_meta_query_memo( query );
          ele->phdr     = block->phdr;
          ele->line_idx = ULONG_MAX;   // pair
          ele->seq      = seq;

          fd_vinyl_meta_publish( query );

        } else {

          /* At this point, this version of pair key is older than the
             version (or tstone) for pair key seen by all threads so
             far. */

          fd_vinyl_meta_cancel( query );

          garbage_sz += pair_sz;

        }

        seq += pair_sz;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_DEAD: {

        char const * _err = fd_vinyl_bstream_dead_test( io_seed, seq, block );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        /* A dead record produces a tstone for its key at this seq */

        int err = fd_vinyl_recover_tstone( meta, &block->dead.phdr.key, seq, &pair_cnt, &garbage_sz, &tstone_cnt );
        if( FD_UNLIKELY( err ) ) goto done; /* logs details */

        garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
        seq        += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_MOVE: {

        /* A move involves two contiguous blocks (src header at seq, dst
           at seq+BLOCK_SZ); both must lie within our claimed range. */

        if( FD_UNLIKELY( 2UL*FD_VINYL_BSTREAM_BLOCK_SZ > (seq1-seq) ) ) { /* Wrapping safe */
          FD_LOG_WARNING(( "%016lx: truncated", seq ));
          goto done;
        }

        fd_vinyl_bstream_block_t dst[1];

        dst[0] = *PEEK( seq + FD_VINYL_BSTREAM_BLOCK_SZ );

        char const * _err = fd_vinyl_bstream_move_test( io_seed, seq, block, dst );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        /* A move tombstones both the src key and the dst key at seq */

        int  err = fd_vinyl_recover_tstone( meta, &block->move.src.key, seq, &pair_cnt, &garbage_sz, &tstone_cnt );
        if( FD_UNLIKELY( err ) ) goto done; /* logs details */

        /**/ err = fd_vinyl_recover_tstone( meta, &block->move.dst,     seq, &pair_cnt, &garbage_sz, &tstone_cnt );
        if( FD_UNLIKELY( err ) ) goto done; /* logs details */

        /* NOTE(review): two contiguous blocks were validated above but
           seq (and garbage_sz) advance by only one block, so the block
           at seq+BLOCK_SZ is re-parsed as its own record on the next
           iteration -- confirm against the serial implementation that
           this is intentional and not a missing 2UL* factor. */

        garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
        seq        += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_PART: {

        char const * _err = fd_vinyl_bstream_part_test( io_seed, seq, block );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        /* Partition blocks are bookkeeping only -> pure garbage */

        garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
        seq        += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      case FD_VINYL_BSTREAM_CTL_TYPE_ZPAD: {

        char const * _err = fd_vinyl_bstream_zpad_test( io_seed, seq, block );
        if( FD_UNLIKELY( _err ) ) {
          FD_LOG_WARNING(( "%016lx: %s", seq, _err ));
          goto done;
        }

        /* Note: zero padding is not counted as garbage */

        seq += FD_VINYL_BSTREAM_BLOCK_SZ;
        break;
      }

      default:
        FD_LOG_WARNING(( "%016lx: unknown type (%x)", seq, (uint)type ));
        goto done;

      }
    }

    /* A well formed partition ends exactly at seq1 */

    if( FD_UNLIKELY( fd_vinyl_seq_ne( seq, seq1 ) ) ) {
      FD_LOG_WARNING(( "%016lx: bad partitioning", seq ));
      goto done;
    }

  }

  fail = 0UL;

done:

  /* If we failed, tell all the other threads to not continue by
     setting the task assignment cursor to seq_past. */

  if( fail ) {
    FD_COMPILER_MFENCE();
#   if FD_HAS_ATOMIC
    while( FD_ATOMIC_CAS( _lock, 0UL, 1UL ) ) FD_SPIN_PAUSE();
#   else
    *_lock = 1UL;
#   endif
    FD_COMPILER_MFENCE();
    _lock[1]= seq_past;
    FD_COMPILER_MFENCE();
    _lock[0]= 0UL;
  }

  _rlocal[0] = fail;
  _rlocal[1] = pair_cnt;
  _rlocal[2] = garbage_sz;
  _rlocal[3] = tstone_cnt;

} FD_MAP_END {

  /* Reduction: any thread failing fails the whole task; counters sum */

  ulong       * _rlocal  = (ulong *)      arg[0];
  ulong const * _rremote = (ulong const *)_r1;

  _rlocal[0] |= _rremote[0];
  _rlocal[1] += _rremote[1];
  _rlocal[2] += _rremote[2];
  _rlocal[3] += _rremote[3];

} FD_REDUCE_END
     531             : 
/* fd_vinyl_recover_meta_cleanup_task scans, in parallel, the meta
   element storage over [block_i0,block_i1) and removes any tstone
   entries (line_idx==ULONG_MAX-1UL) left behind by parallel recovery.
   Reduces the number of entries removed into arg[0]. */

static FD_FN_UNUSED FD_MAP_REDUCE_BEGIN( fd_vinyl_recover_meta_cleanup_task, 1L, alignof(ulong), sizeof(ulong), 1UL ) {
  ulong * _rlocal = (ulong *)arg[0];

  fd_vinyl_t * vinyl = (fd_vinyl_t *)arg[1];

  fd_vinyl_meta_t * meta = vinyl->meta;

  fd_vinyl_meta_ele_t * ele0       = meta->ele;
  ulong const         * lock       = meta->lock;
  int                   lock_shift = meta->lock_shift; /* ele_idx -> lock_idx mapping */

  ulong remove_cnt = 0UL;

  for( long ele_idx=block_i0; ele_idx<block_i1; ele_idx++ ) {
    long lock_idx = ele_idx >> lock_shift;

    fd_vinyl_key_t key;
    int            try_remove;

    /* Do a non-blocking query by ele_idx (not by key).  We have to do
       this direct because there is no standard API for this.  This is
       highly unlikely to ever block (but theoretically could if the
       remove in a different thread has locked a probe chain that
       touches elements in this thread). */

    for(;;) {
      /* Seqlock-style read: sample the lock version, read the element,
         then confirm the version is unchanged (and was even, i.e. not
         locked) before trusting the read. */
      FD_COMPILER_MFENCE();
      ulong ver0 = lock[ lock_idx ];
      FD_COMPILER_MFENCE();
      if( FD_LIKELY( !(ver0 & 1UL) ) ) {

        /* Occupied slot (phdr.ctl non-zero) holding a tstone */

        try_remove = (!!ele0[ ele_idx ].phdr.ctl) & (ele0[ ele_idx ].line_idx==(ULONG_MAX-1UL));
        key        = ele0[ ele_idx ].phdr.key;

        FD_COMPILER_MFENCE();
        ulong ver1 = lock[ lock_idx ];
        FD_COMPILER_MFENCE();
        if( FD_LIKELY( ver0==ver1 ) ) break;
      }
      FD_SPIN_PAUSE();
    }

    /* If try_remove is not set, ele_idx either had no key in it or
       had a pair entry.  So we continue to the next slot. */

    if( FD_LIKELY( !try_remove ) ) continue;

    /* At this point, we observed key had a tstone in the meta above.
       So we try to remove it.  It is possible (though extremely
       unlikely for big sparse maps and the vanilla thread partitioning
       here) that a remove on another thread got key first.  So it is
       okay if this fails.  We have to use the parallel version of this
       (even if it is highly unlikely to interfere with other threads)
       for the same reason we had to use a non-blocking query above. */

    fd_vinyl_meta_query_t query[1];
    remove_cnt += (ulong)!fd_vinyl_meta_remove( meta, &key, query, FD_MAP_FLAG_BLOCKING ); /* presumably zero on success -- count successful removes */
  }

  *_rlocal = remove_cnt;

} FD_MAP_END {

  /* Reduction: sum per-thread remove counts */

  ulong       * _rlocal  = (ulong *)      arg[0];
  ulong const * _rremote = (ulong const *)_r1;

  *_rlocal += *_rremote;

} FD_REDUCE_END
     601             : 
/* fd_vinyl_recover rebuilds vinyl's meta / line state from its bstream
   using tpool threads [t0,t1).  It first flushes the line and meta
   caches in parallel (reclaiming any locks left behind by a dead
   writer), then attempts parallel recovery if more than one thread is
   available, the io backend supports memory-mapped access (per
   fd_vinyl_recover_test) and atomics are available; otherwise (or if
   parallel recovery fails) it falls back to fd_vinyl_recover_serial.
   level is forwarded to fd_vinyl_data_reset.  On a successful parallel
   recovery, returns the bstream's seq_present; otherwise returns
   whatever fd_vinyl_recover_serial returns. */

ulong
fd_vinyl_recover( fd_tpool_t * tpool, ulong t0, ulong t1, int level,
                  fd_vinyl_t * vinyl ) {

  fd_vinyl_meta_t * meta     = vinyl->meta;
  ulong             line_cnt = vinyl->line_cnt;

  ulong ele_max  = meta->ele_max;
  ulong lock_cnt = meta->lock_cnt;

  /* Using all available threads, flush the lines and meta cache.  We do
     the meta flush locked so we don't confuse any concurrent meta
     readers.  This will claim any existing locks (e.g.  the previous
     meta writer died while holding a lock and the user didn't clean it
     up before calling this). */

  ulong reclaim_cnt;

  /* Note: the launch order below matters — locks are reclaimed before
     the meta flush and only released by the final unlock task. */

  FD_FOR_ALL   ( fd_vinyl_recover_line_task,       tpool,t0,t1, 0L,(long)line_cnt,               vinyl );
  FD_MAP_REDUCE( fd_vinyl_recover_reclaim_task,    tpool,t0,t1, 0L,(long)lock_cnt, &reclaim_cnt, vinyl );
  FD_FOR_ALL   ( fd_vinyl_recover_meta_flush_task, tpool,t0,t1, 0L,(long)ele_max,                vinyl );
  FD_FOR_ALL   ( fd_vinyl_recover_unlock_task,     tpool,t0,t1, 0L,(long)lock_cnt,               vinyl );

  if( FD_UNLIKELY( reclaim_cnt ) ) FD_LOG_WARNING(( "reclaimed %lu locks (dead writer?); attempting to continue", reclaim_cnt ));

  /* FIXME: should this fail if it detects in progress io? */

  /* If there is only 1 thread provided or the bstream past doesn't
     have a valid partitioning, use the serial recovery algorithm */

t1 = t0 + 1UL; /* Turn off parallel recovery while it is untested */

  if( FD_UNLIKELY( (t1-t0)<=1UL                     ) ||
      FD_UNLIKELY( fd_vinyl_recover_test( vinyl->io ) ||
      !FD_HAS_ATOMIC ) ) {
    fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );
    return fd_vinyl_recover_serial( vinyl );
  }

# if FD_HAS_ATOMIC

  /* The parallel recovery of bstream partition may leave tstones in the
     meta elements.  To clean this up, we have two options.

     Option 1 (simplest and most robust): we parallel scan all the meta
     elements in parallel for tstones and remove them.  We might have to
     do more than one pass because the removal of elements could mean
     some elements are not placed well.  This requires no scratch (and
     thus is more robust against arbitrary erase / move patterns in the
     recovery region).  While it isn't any less algo inefficient
     (because we parallel scan all the elements already to clear them),
     it is practically less efficient for applications access patterns
     that don't generate many tombstones and/or have pair_cnt<<pair_max.

     Option 2 (fastest but trickiest): we append the keys that might
     have tstones at the end of partition processing in a scratch memory
     during the parallel recovery.  The vinyl data cache region is huge,
     well aligned, not used at this point.  So it can handle all but the
     most extreme tstone generating application patterns.  We can store
     either the key directly in the scratch or the location in the
     bstream (faster but more scratch efficient) or the bstream seq of
     the dead / move that generated the tstone (slower but more scratch
     efficient).  We further can use the aux information in the
     partition to tightly bound the worst case number of tstones required
     up front.  But this is tricky because the scratch array needs to
     have the partition processing tasks append to it in parallel.  So
     we either have to use atomic increments in the inner loop (yuck) or
     we have to partition the array up front (keeping fingers crossed
     that a uniform distribution assumption is valid) and then
     concatenate the partitions for parallel processing (yuck) or have
     the parallel cleanup processing work with non-compactly stored
     scratch (yuck).

     (There is a hybrid option where this tries to do option 2 but if
     scratch runs out on any thread, use option 1 to clean up tstones in
     meta.)

     We go with the simplest and robust implementation below.

     FIXME: regardless of the above, it is theoretically possible for
     the number of used meta elements that need to be tracked
     intermediate to exceed meta pair_max even if the final state at
     seq_present can be stored in pair_max.  We retry with a serial
     recovery if parallel recovery fails. */

  ulong seq = fd_vinyl_io_seq_present( vinyl->io );

  /* rtmp receives the reduction outputs of the partition task:
     [0]=fail flag, [1]=pair_cnt, [2]=garbage_sz, [3]=tstones remaining.
     lock passes (0,seq_present) to the partition task — presumably the
     partition range bounds; confirm against fd_vinyl_recover_part_task. */

  ulong rtmp[4];
  ulong lock[2];

  lock[0] = 0UL;
  lock[1] = seq;

  FD_MAP_REDUCE( fd_vinyl_recover_part_task, tpool,t0,t1, 0L,(long)(t1-t0), rtmp, vinyl, lock );

  ulong fail = rtmp[0];
  if( FD_UNLIKELY( fail ) ) {
    FD_LOG_WARNING(( "parallel recovery failed; attempting serial recovery" ));

    /* Reset the meta from whatever messy state failed parallel recovery
       left it */

    FD_MAP_REDUCE( fd_vinyl_recover_reclaim_task,    tpool,t0,t1, 0L,(long)lock_cnt, &reclaim_cnt, vinyl );
    FD_FOR_ALL   ( fd_vinyl_recover_meta_flush_task, tpool,t0,t1, 0L,(long)ele_max,                vinyl );
    FD_FOR_ALL   ( fd_vinyl_recover_unlock_task,     tpool,t0,t1, 0L,(long)lock_cnt,               vinyl );

    fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );

    return fd_vinyl_recover_serial( vinyl );
  }

  vinyl->pair_cnt   = rtmp[1];
  vinyl->garbage_sz = rtmp[2];

  ulong tstone_rem = rtmp[3];

  /* Multi-pass tstone cleanup (Option 1 above): each pass reports the
     number of tstones it removed in rtmp[0].  NOTE(review): this loop
     assumes every pass with tstone_rem>0 removes at least one tstone;
     if a pass could remove zero, this would spin forever — confirm the
     cleanup task guarantees forward progress. */

  while( tstone_rem ) {
    FD_FOR_ALL( fd_vinyl_recover_meta_cleanup_task, tpool,t0,t1, 0L,(long)ele_max, rtmp, vinyl );
    tstone_rem -= rtmp[0];
  }

  /* Reset the data cache to clean up any scratch usage (currently none
     but no reason to do earlier) */

  fd_vinyl_data_reset( tpool,t0,t1, level, vinyl->data );

  return seq;

# endif /* FD_HAS_ATOMIC */
}

Generated by: LCOV version 1.14