LCOV - code coverage report
Current view: top level - vinyl - fd_vinyl_exec.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 368 0.0 %
Date: 2026-06-29 05:51:35 Functions: 0 2 0.0 %

          Line data    Source code
       1             : #include "fd_vinyl.h"
       2             : #include "../util/pod/fd_pod.h"
       3             : #include <errno.h>
       4             : #include <unistd.h>
       5             : #include <fcntl.h>
       6             : #include <lz4.h>
       7             : 
       8             : struct fd_vinyl_client { /* State fd_vinyl_exec keeps for each joined client (one entry of its _client array) */
       9             :   fd_vinyl_rq_t * rq;        /* Channel for requests from this client (could be shared by multiple vinyl instances) */
      10             :   fd_vinyl_cq_t * cq;        /* Channel for completions from this vinyl instance to this client
      11             :                                 (could be shared by multiple receivers of completions from this vinyl instance). */
      12             :   ulong           burst_max; /* Max requests to receive from this client at a time */
      13             :   ulong           seq;       /* Sequence number of the next request to receive in the rq */
      14             :   ulong           link_id;   /* Identifies requests from this client to this vinyl instance in the rq */
      15             :   ulong           laddr0;    /* A valid non-zero gaddr from this client maps to the vinyl instance's laddr laddr0 + gaddr ... */
      16             :   ulong           laddr1;    /* ... and thus is in (laddr0,laddr1).  A zero gaddr maps to laddr NULL. */
      17             :   ulong           quota_rem; /* Num of remaining acquisitions this client is allowed on this vinyl instance */
      18             :   ulong           quota_max; /* Max quota (quota_rem==quota_max is required for the client to leave, i.e. no outstanding acquires) */
      19             : };
      20             : 
      21             : typedef struct fd_vinyl_client fd_vinyl_client_t;
      22             : 
      23             : /* MAP_REQ_GADDR maps a request global address gaddr to an array of
      24             :    n T's into the local address space as a T * pointer.  If the result
      25             :    is not properly aligned or the entire range does not completely fall
      26             :    within the shared region with the client, returns NULL.  Likewise,
      27             :    gaddr 0 maps to NULL.  Assumes sizeof(T)*(n) does not overflow (which
      28             :    is true as n is at most batch_cnt which is at most 2^32 and
      29             :    sizeof(T) is at most 40).  Uses client_laddr0 / client_laddr1 from the enclosing scope. */
      30             : 
      31           0 : #define MAP_REQ_GADDR( gaddr, T, n ) ((T *)fd_vinyl_laddr( (gaddr), alignof(T), sizeof(T)*(n), client_laddr0, client_laddr1 ))
      32             : 
      33             : FD_FN_CONST static inline void * /* Local address for req_gaddr, or NULL (0) if any of the checks below fails */
      34             : fd_vinyl_laddr( ulong req_gaddr,     /* Client global address, added to client_laddr0; 0 always yields NULL */
      35             :                 ulong align,         /* Required alignment of the resulting local address */
      36             :                 ulong footprint,     /* Byte size of the region starting at req_gaddr */
      37             :                 ulong client_laddr0, /* Local address that the client's global addresses are offset from */
      38           0 :                 ulong client_laddr1 ) { /* Largest local address the end of the region may reach */
      39           0 :   ulong req_laddr0 = client_laddr0 + req_gaddr; /* Unsigned add may wrap; a wrap is rejected by client_laddr0<=req_laddr0 below */
      40           0 :   ulong req_laddr1 = req_laddr0    + footprint; /* Unsigned add may wrap; a wrap is rejected by req_laddr0<=req_laddr1 below */
      41           0 :   return (void *)fd_ulong_if( (!!req_gaddr) & fd_ulong_is_aligned( req_laddr0, align ) & /* Bitwise & (not &&): all tests are evaluated; assumes fd_ulong_is_aligned yields 0 or 1 -- TODO confirm */
      42           0 :                               (client_laddr0<=req_laddr0) & (req_laddr0<=req_laddr1) & (req_laddr1<=client_laddr1),
      43           0 :                               req_laddr0, 0UL );
      44           0 : }
      45             : 
      46             : /* FIXME: STASH THESE IN THE VINYL TOO? */
      47             : #define FD_VINYL_CLIENT_MAX (1024UL) /* Length of the _client array in fd_vinyl_exec (max simultaneously joined clients) */
      48           0 : #define FD_VINYL_REQ_MAX    (1024UL) /* Length of the _req array in fd_vinyl_exec; indexed with & (FD_VINYL_REQ_MAX-1UL) so this must be a power of 2 */
      49             : 
      50             : void
      51           0 : fd_vinyl_exec( fd_vinyl_t * vinyl ) {
      52             : 
      53             :   /* Unpack shared objects */
      54             : 
      55           0 :   fd_cnc_t *        cnc  = vinyl->cnc;
      56           0 :   fd_vinyl_io_t *   io   = vinyl->io;
      57           0 :   fd_vinyl_line_t * line = vinyl->line;
      58           0 :   fd_vinyl_meta_t * meta = vinyl->meta;
      59           0 :   fd_vinyl_data_t * data = vinyl->data;
      60             : 
      61             :   /* Unpack config */
      62             : 
      63           0 :   ulong line_cnt  = vinyl->line_cnt;
      64           0 :   ulong pair_max  = vinyl->pair_max;
      65           0 :   ulong async_min = vinyl->async_min;
      66           0 :   ulong async_max = vinyl->async_max;
      67             : 
      68             :   /* Unpack cnc */
      69             : 
      70           0 :   if( FD_UNLIKELY( fd_cnc_signal_query( cnc )!=FD_VINYL_CNC_SIGNAL_BOOT ) ) {
      71           0 :     FD_LOG_WARNING(( "cnc not booting (restarting after an unclean termination?); forcing to boot and attempting to continue" ));
      72           0 :     fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_BOOT );
      73           0 :   }
      74             : 
      75           0 :   fd_vinyl_cmd_t * cmd  = (fd_vinyl_cmd_t *)fd_cnc_app_laddr( cnc );
      76           0 :   ulong *          diag = (ulong *)(cmd+1);
      77             : 
      78             :   /* Unpack io */
      79             : 
      80           0 :   ulong io_seed = fd_vinyl_io_seed( io );
      81             : 
      82             :   /* Unpack meta */
      83             : 
      84           0 :   fd_vinyl_meta_ele_t * ele0       =  meta->ele;
      85           0 :   ulong                 ele_max    =  meta->ele_max;
      86           0 :   ulong                 meta_seed  =  meta->seed;
      87           0 :   ulong *               lock       =  meta->lock;
      88           0 :   int                   lock_shift =  meta->lock_shift;
      89             : 
      90             :   /* Unpack data */
      91             : 
      92           0 :   ulong                       data_laddr0 = (ulong)data->laddr0;
      93           0 :   fd_vinyl_data_vol_t const * vol         =        data->vol;
      94           0 :   ulong                       vol_cnt     =        data->vol_cnt;
      95             : 
      96             :   /* Connected clients */
      97             : 
      98           0 :   fd_vinyl_client_t _client[ FD_VINYL_CLIENT_MAX ];
      99           0 :   ulong             client_cnt = 0UL;               /* In [0,client_max) */
     100           0 :   ulong             client_idx = 0UL;               /* If client_cnt>0, next client to poll for requests, d/c otherwise */
     101             : 
     102           0 :   ulong quota_free = line_cnt - 1UL;
     103             : 
     104             :   /* Received requests */
     105             : 
     106           0 :   fd_vinyl_req_t _req[ FD_VINYL_REQ_MAX ];
     107           0 :   ulong          req_head = 0UL;           /* Requests [0,req_head)         have been processed */
     108           0 :   ulong          req_tail = 0UL;           /* Requests [req_head,req_tail)  are pending */
     109             :                                            /* Requests [req_tail,ULONG_MAX) have not been received */
     110           0 :   ulong burst_free = FD_VINYL_REQ_MAX;
     111           0 :   ulong exec_max   = 0UL;
     112             : 
     113             :   /* accum_dead_cnt is the number of dead blocks that have been
     114             :      written since the last partition block.
     115             : 
     116             :      accum_move_cnt is the number of move blocks that have been
     117             :      written since the last partition block.
     118             : 
     119             :      accum_garbage_cnt / sz is the number of items / bytes garbage in
     120             :      the bstream that have accumulated since the last time we compacted
     121             :      the bstream.  We use this to estimate the number of rounds of
     122             :      compaction to do in async handling.
     123             : 
     124             :      accum_drop_link is the number of requests that were silently
     125             :      dropped because the request link_id did not match the client's
     126             :      link_id.
     127             : 
     128             :      accum_drop_comp is the number of requests that were silently
     129             :      dropped because an out-of-band completion was requested to be sent
     130             :      to an unmappable client address.
     131             : 
     132             :      accum_req_full is the number of times we detected the pending
     133             :      request queue being completely full. */
     134             : 
     135           0 :   ulong accum_dead_cnt    = 0UL;
     136           0 :   ulong accum_move_cnt    = 0UL;
     137           0 :   ulong accum_garbage_cnt = 0UL;
     138           0 :   ulong accum_garbage_sz  = 0UL;
     139           0 :   ulong accum_drop_link   = 0UL;
     140           0 :   ulong accum_drop_comp   = 0UL;
     141             : 
     142           0 :   ulong seq_part = fd_vinyl_io_seq_present( io );
     143             : 
     144             :   /* Run */
     145             : 
     146           0 :   fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_RUN );
     147             : 
     148           0 :   ulong async_rem = 1UL;
     149             : 
     150           0 :   for(;;) {
     151             : 
     152             :     /* Process background tasks this iteration if necessary */
     153             : 
     154           0 :     if( FD_UNLIKELY( !(--async_rem) ) ) {
     155           0 :       long now  = fd_log_wallclock();
     156           0 :       async_rem = async_min + (fd_ulong_hash( (ulong)now ) % (async_max-async_min+1UL)); /* FIXME: FASTER ALGO */
     157             : 
     158           0 :       fd_cnc_heartbeat( cnc, now );
     159             : 
     160             :       /* If we've written enough to justify appending a parallel
     161             :          recovery partition, append one. */
     162             : 
     163           0 :       ulong seq_future = fd_vinyl_io_seq_future( io );
     164           0 :       if( FD_UNLIKELY( (seq_future - seq_part) > vinyl->part_thresh ) ) {
     165             : 
     166           0 :         ulong seq = fd_vinyl_io_append_part( io, seq_part, accum_dead_cnt, accum_move_cnt, NULL, 0UL );
     167           0 :         FD_DCHECK_CRIT( fd_vinyl_seq_eq( seq, seq_future ), "corruption detected" );
     168           0 :         seq_part = seq + FD_VINYL_BSTREAM_BLOCK_SZ;
     169             : 
     170           0 :         accum_dead_cnt = 0UL;
     171           0 :         accum_move_cnt = 0UL;
     172             : 
     173           0 :         accum_garbage_cnt++;
     174           0 :         accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
     175             : 
     176           0 :         fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
     177             : 
     178           0 :       }
     179             : 
     180           0 :       diag[ FD_VINYL_DIAG_DROP_LINK ] += accum_drop_link; accum_drop_link = 0UL;
     181           0 :       diag[ FD_VINYL_DIAG_DROP_COMP ] += accum_drop_comp; accum_drop_comp = 0UL;
     182             : 
     183             :       /* Let the number of items of garbage generated since the last
     184             :          compaction be accum_garbage_cnt and let the steady state
     185             :          average number of live / garbage items in the bstream's past be
     186             :          L / G (i.e. L is the average value of pair_cnt).  The average
     187             :          number of pieces of garbage collected per garbage collection round
     188             :          is thus G / (L + G).  If we do compact_max rounds of garbage
     189             :          collection this async handling, we expect to collect
     190             : 
     191             :               compact_max G / (L + G)
     192             : 
     193             :          items of garbage on average.  To make sure we collect garbage
     194             :          faster than we generate it on average, we then require:
     195             : 
     196             :               accum_garbage_cnt <~ compact_max G / (L + G)
     197             :            -> compact_max >~ (L + G) accum_garbage_cnt / G
     198             : 
     199             :          Let 2^-gc_eager be the maximum fraction of items in the
     200             :          bstream's past we are willing to tolerate as garbage on average.
     201             :          We then have G = 2^-gc_eager (L + G).  This implies:
     202             : 
     203             :            -> compact_max >~ accum_garbage_cnt 2^gc_eager
     204             : 
     205             :          When accum_garbage_cnt is 0, we use a compact_max of 1 to do
     206             :          compaction rounds at a minimum rate all the time.  This allows
     207             :          transients (e.g. a sudden change to new steady state
     208             :          equilibrium, temporary disabling of garbage collection at key
     209             :          times for highest performance, etc) and unaccounted zero
     210             :          padding garbage to be absorbed when nothing else is going on. */
     211             : 
     212           0 :       int gc_eager = vinyl->gc_eager;
     213           0 :       if( FD_LIKELY( gc_eager>=0 ) ) {
     214             : 
     215             :         /* Saturating wide left shift */
     216           0 :         ulong overflow    = (accum_garbage_cnt >> (63-gc_eager) >> 1); /* sigh ... avoid wide shift UB */
     217           0 :         ulong compact_max = fd_ulong_max( fd_ulong_if( !overflow, accum_garbage_cnt << gc_eager, ULONG_MAX ), 1UL );
     218             : 
     219             :         /**/                                   accum_garbage_cnt = 0UL;
     220           0 :         vinyl->garbage_sz += accum_garbage_sz; accum_garbage_sz  = 0UL;
     221             : 
     222           0 :         fd_vinyl_compact( vinyl, compact_max );
     223             : 
     224           0 :       }
     225             : 
     226           0 :       ulong signal = fd_cnc_signal_query( cnc );
     227           0 :       if( FD_UNLIKELY( signal!=FD_VINYL_CNC_SIGNAL_RUN ) ) {
     228           0 :         if( FD_UNLIKELY( signal==FD_VINYL_CNC_SIGNAL_HALT ) ) break;
     229             : 
     230           0 :         switch( signal ) {
     231             : 
     232           0 :         case FD_VINYL_CNC_SIGNAL_SYNC: {
     233           0 :           fd_vinyl_io_sync( io, FD_VINYL_IO_FLAG_BLOCKING );
     234           0 :           break;
     235           0 :         }
     236             : 
     237           0 :         case FD_VINYL_CNC_SIGNAL_GET: {
     238           0 :           ulong old;
     239           0 :           int   err = FD_VINYL_SUCCESS;
     240           0 :           switch( cmd->get.opt ) {
     241           0 :           case FD_VINYL_OPT_PART_THRESH: old = vinyl->part_thresh;            break;
     242           0 :           case FD_VINYL_OPT_GC_THRESH:   old = vinyl->gc_thresh;              break;
     243           0 :           case FD_VINYL_OPT_GC_EAGER:    old = (ulong)(long)vinyl->gc_eager;  break;
     244           0 :           case FD_VINYL_OPT_STYLE:       old = (ulong)(uint)vinyl->style;     break;
     245           0 :           default:                       old = 0UL; err = FD_VINYL_ERR_INVAL; break;
     246           0 :           }
     247           0 :           cmd->get.val = old;
     248           0 :           cmd->get.err = err;
     249           0 :           break;
     250           0 :         }
     251             : 
     252           0 :         case FD_VINYL_CNC_SIGNAL_SET: { /* FIXME: ADD VALIDATION TO SET VALUES FOR OPT_GC_EAGER AND OPT_STYLE */
     253           0 :           ulong new = cmd->set.val;
     254           0 :           ulong old;
     255           0 :           int   err = FD_VINYL_SUCCESS;
     256           0 :           switch( cmd->set.opt ) {
     257           0 :           case FD_VINYL_OPT_PART_THRESH: old = vinyl->part_thresh;            vinyl->part_thresh =      new; break;
     258           0 :           case FD_VINYL_OPT_GC_THRESH:   old = vinyl->gc_thresh;              vinyl->gc_thresh   =      new; break;
     259           0 :           case FD_VINYL_OPT_GC_EAGER:    old = (ulong)(long)vinyl->gc_eager;  vinyl->gc_eager    = (int)new; break;
     260           0 :           case FD_VINYL_OPT_STYLE:       old = (ulong)(uint)vinyl->style;     vinyl->style       = (int)new; break;
     261           0 :           default:                       old = 0UL;                           err = FD_VINYL_ERR_INVAL;      break;
     262           0 :           }
     263           0 :           cmd->set.val = old;
     264           0 :           cmd->set.err = err;
     265           0 :           break;
     266           0 :         }
     267             : 
     268           0 :         case FD_VINYL_CNC_SIGNAL_CLIENT_JOIN: {
     269           0 :           int err;
     270             : 
     271           0 :           ulong        link_id   = cmd->join.link_id;
     272           0 :           ulong        burst_max = cmd->join.burst_max;
     273           0 :           ulong        quota_max = cmd->join.quota_max;
     274           0 :           char const * _rq       = cmd->join.rq;
     275           0 :           char const * _cq       = cmd->join.cq;
     276           0 :           char const * _wksp     = cmd->join.wksp;
     277             : 
     278           0 :           if( FD_UNLIKELY( client_cnt>=FD_VINYL_CLIENT_MAX ) ) {
     279           0 :             FD_LOG_WARNING(( "Too many clients (increase FD_VINYL_CLIENT_MAX)" ));
     280           0 :             err = FD_VINYL_ERR_FULL;
     281           0 :             goto join_done;
     282           0 :           }
     283             : 
     284           0 :           if( FD_UNLIKELY( burst_max > burst_free ) ) {
     285           0 :             FD_LOG_WARNING(( "Too large burst_max (increase FD_VINYL_REQ_MAX or decrease burst_max)" ));
     286           0 :             err = FD_VINYL_ERR_FULL;
     287           0 :             goto join_done;
     288           0 :           }
     289             : 
     290           0 :           if( FD_UNLIKELY( quota_max > fd_ulong_min( quota_free, FD_VINYL_COMP_QUOTA_MAX ) ) ) {
     291           0 :             FD_LOG_WARNING(( "Too large quota_max (increase line_cnt or decrease quota_max)" ));
     292           0 :             err = FD_VINYL_ERR_FULL;
     293           0 :             goto join_done;
     294           0 :           }
     295             : 
     296           0 :           for( ulong client_idx=0UL; client_idx<client_cnt; client_idx++ ) {
     297           0 :             if( FD_UNLIKELY( _client[ client_idx ].link_id==link_id ) ) {
     298           0 :               FD_LOG_WARNING(( "Client already joined with this link_id" ));
     299           0 :               err = FD_VINYL_ERR_FULL;
     300           0 :               goto join_done;
     301           0 :             }
     302           0 :           }
     303             : 
     304           0 :           fd_vinyl_rq_t * rq = fd_vinyl_rq_join( fd_wksp_map( _rq ) );
     305           0 :           if( FD_UNLIKELY( !rq ) ) {
     306           0 :             FD_LOG_WARNING(( "Unable to join client rq" ));
     307           0 :             err = FD_VINYL_ERR_INVAL;
     308           0 :             goto join_done;
     309           0 :           }
     310             : 
     311           0 :           fd_vinyl_cq_t * cq = fd_vinyl_cq_join( fd_wksp_map( _cq ) );
     312           0 :           if( FD_UNLIKELY( !cq ) ) {
     313           0 :             FD_LOG_WARNING(( "Unable to join client cq" ));
     314           0 :             err = FD_VINYL_ERR_INVAL;
     315           0 :             goto join_done;
     316           0 :           }
     317             : 
     318           0 :           fd_wksp_t * wksp = fd_wksp_attach( _wksp );
     319           0 :           if( FD_UNLIKELY( !wksp ) ) {
     320           0 :             FD_LOG_WARNING(( "Unable to attach to client request workspace" ));
     321           0 :             err = FD_VINYL_ERR_INVAL;
     322           0 :             goto join_done;
     323           0 :           }
     324             : 
     325           0 :           _client[ client_cnt ].rq        = rq;
     326           0 :           _client[ client_cnt ].cq        = cq;
     327           0 :           _client[ client_cnt ].burst_max = burst_max;
     328           0 :           _client[ client_cnt ].seq       = fd_vinyl_rq_seq( rq );
     329           0 :           _client[ client_cnt ].link_id   = link_id;
     330           0 :           _client[ client_cnt ].laddr0    = (ulong)wksp;
     331           0 :           _client[ client_cnt ].laddr1    = ULONG_MAX; //wksp->gaddr_hi; /* FIXME: HOW TO GET THIS CLEANLY */
     332           0 :           _client[ client_cnt ].quota_rem = quota_max;
     333           0 :           _client[ client_cnt ].quota_max = quota_max;
     334           0 :           client_cnt++;
     335             : 
     336           0 :           quota_free -= quota_max;
     337           0 :           burst_free -= burst_max;
     338             : 
     339             :           /* Every client_cnt run loop iterations we receive at most:
     340             : 
     341             :                sum_clients recv_max = FD_VINYL_REQ_MAX - burst_free
     342             : 
     343             :              requests.  To guarantee we process requests fast enough
     344             :              that we never overrun our receive queue, under maximum
     345             :              client load, we need to process:
     346             : 
     347             :                sum_clients recv_max / client_cnt
     348             : 
     349             :              requests per run loop iteration.  We thus set exec_max
     350             :              to the ceil sum_clients recv_max / client_cnt. */
     351             : 
     352           0 :           exec_max = (FD_VINYL_REQ_MAX - burst_free + client_cnt - 1UL) / client_cnt;
     353             : 
     354           0 :           err = FD_VINYL_SUCCESS;
     355             : 
     356           0 :         join_done:
     357           0 :           cmd->join.err = err;
     358           0 :           break;
     359           0 :         }
     360             : 
     361           0 :         case FD_VINYL_CNC_SIGNAL_CLIENT_LEAVE: {
     362           0 :           int err;
     363             : 
     364           0 :           ulong link_id = cmd->leave.link_id;
     365             : 
     366           0 :           for( ulong client_idx=0UL; client_idx<client_cnt; client_idx++ ) {
     367           0 :             if( _client[ client_idx ].link_id==link_id ) {
     368             : 
     369           0 :               if( FD_UNLIKELY( _client[ client_idx ].quota_rem != _client[ client_idx ].quota_max ) ) {
     370           0 :                 FD_LOG_WARNING(( "client still has outstanding acquires" ));
     371           0 :                 err = FD_VINYL_ERR_INVAL;
     372           0 :                 goto leave_done;
     373           0 :               }
     374             : 
     375             :               /* discard pending requests from this client */
     376             : 
     377           0 :               ulong req_tail_new = req_head;
     378             : 
     379           0 :               for( ulong req_id=req_head; req_id<req_tail; req_id++ ) {
     380           0 :                 ulong req_idx = req_id & (FD_VINYL_REQ_MAX-1UL);
     381           0 :                 int   keep = (_req[ req_idx ].link_id != client_idx); /* Note: link_id remapped while pending */
     382           0 :                 _req[ req_tail_new & (FD_VINYL_REQ_MAX-1UL) ] = _req[ req_idx ];
     383           0 :                 req_tail_new += (ulong)keep;
     384           0 :               }
     385             : 
     386           0 :               ulong discard_cnt = req_tail - req_tail_new;
     387           0 :               if( discard_cnt ) FD_LOG_WARNING(( "discard %lu pending requests from leaving client", discard_cnt ));
     388             : 
     389           0 :               req_tail = req_tail_new;
     390             : 
     391           0 :               fd_wksp_unmap( fd_vinyl_rq_leave( _client[ client_idx ].rq ) );
     392           0 :               fd_wksp_unmap( fd_vinyl_cq_leave( _client[ client_idx ].cq ) );
     393           0 :               fd_wksp_detach( (fd_wksp_t *)_client[ client_idx ].laddr0 );
     394             : 
     395           0 :               quota_free += _client[ client_idx ].quota_max;
     396           0 :               burst_free += _client[ client_idx ].burst_max;
     397             : 
     398           0 :               ulong last_idx = --client_cnt;
     399           0 :               _client[ client_idx ] = _client[ last_idx ];
     400             : 
     401             :               /* Remap pending requests from the swapped client to its
     402             :                  new index. */
     403             : 
     404           0 :               for( ulong req_id=req_head; req_id<req_tail; req_id++ ) {
     405           0 :                 ulong req_idx = req_id & (FD_VINYL_REQ_MAX-1UL);
     406           0 :                 if( _req[ req_idx ].link_id == last_idx ) _req[ req_idx ].link_id = client_idx;
     407           0 :               }
     408             : 
     409           0 :               exec_max = client_cnt ? ((FD_VINYL_REQ_MAX - burst_free + client_cnt - 1UL) / client_cnt) : 0UL;
     410             : 
     411           0 :               err = FD_VINYL_SUCCESS;
     412           0 :               goto leave_done;
     413           0 :             }
     414           0 :           }
     415             : 
     416           0 :           FD_LOG_WARNING(( "client not joined" ));
     417           0 :           err = FD_VINYL_ERR_EMPTY;
     418             : 
     419           0 :         leave_done:
     420           0 :           cmd->leave.err = err;
     421           0 :           break;
     422           0 :         }
     423             : 
     424           0 :         default: {
     425           0 :           FD_LOG_WARNING(( "unknown signal received (%lu); ignoring", signal ));
     426           0 :           break;
     427           0 :         }
     428             : 
     429           0 :         }
     430             : 
     431           0 :         fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_RUN );
     432           0 :       }
     433           0 :     }
     434             : 
     435             :     /* Receive requests from clients */
     436             : 
     437           0 :     if( FD_LIKELY( client_cnt ) ) {
     438             : 
     439             :       /* Select client to poll this run loop iteration */
     440             : 
     441           0 :       client_idx = fd_ulong_if( client_idx+1UL<client_cnt, client_idx+1UL, 0UL );
     442             : 
     443           0 :       fd_vinyl_client_t * client = _client + client_idx;
     444             : 
     445           0 :       fd_vinyl_rq_t * rq        = client->rq;
     446           0 :       ulong           seq       = client->seq;
     447           0 :       ulong           burst_max = client->burst_max;
     448           0 :       ulong           link_id   = client->link_id;
     449             : 
     450             :       /* Enqueue up to burst_max requests from this client into the
     451             :          local request queue.  Using burst_max << FD_VINYL_REQ_MAX
     452             :          allows applications to prevent a bursty client from starving
     453             :          other clients of resources while preserving the spatial and
     454             :          temporal locality of reasonably sized O(burst_max) bursts from
     455             :          an individual client in processing below.  Each run loop
     456             :          iteration can enqueue up to burst_max requests per iterations. */
     457             : 
     458           0 :       for( ulong recv_rem=fd_ulong_min( FD_VINYL_REQ_MAX-(req_tail-req_head), burst_max ); recv_rem; recv_rem-- ) {
     459           0 :         fd_vinyl_req_t * req = _req + (req_tail & (FD_VINYL_REQ_MAX-1UL));
     460             : 
     461           0 :         long diff = fd_vinyl_rq_recv( rq, seq, req );
     462             : 
     463           0 :         if( FD_LIKELY( diff>0L ) ) break; /* No requests waiting in rq at this time */
     464             : 
     465           0 :         if( FD_UNLIKELY( diff ) ) FD_LOG_CRIT(( "client overran request queue" ));
     466             : 
     467           0 :         seq++;
     468             : 
     469             :         /* We got the next request.  Decide if we should accept it.
     470             : 
     471             :            Specifically, we ignore requests whose link_id doesn't match
     472             :            this client's link_id (e.g. an unknown link_id or one matching a
     473             :            different client's link_id ... we don't know where or even if it
     474             :            is safe to send the completion).  Even if the request provided an
     475             :            out-of-band location to send the completion (comp_gaddr!=0),
     476             :            we have no reason to trust it given the mismatch.
     477             : 
     478             :            This also gives a mechanism for a client to use a single rq to
     479             :            send requests to multiple vinyl instances ... the client
     480             :            should use a different link_id for each vinyl instance.  Each
     481             :            vinyl instance will quickly filter out the requests not
     482             :            addressed to it.
     483             : 
     484             :            Since we know the client_idx at this point, given a matching
     485             :            link_id, we stash the client_idx in the pending req link_id
     486             :            to eliminate the need to maintain a link_id<>client_idx map
     487             :            in the execution loop below. */
     488             : 
     489           0 :         if( FD_UNLIKELY( req->link_id!=link_id ) ) {
     490           0 :           accum_drop_link++;
     491           0 :           continue;
     492           0 :         }
     493             : 
     494           0 :         req->link_id = client_idx;
     495             : 
     496           0 :         req_tail++;
     497           0 :       }
     498             : 
     499           0 :       client->seq = seq;
     500           0 :     }
     501             : 
     502             :     /* Execute received requests */
     503             : 
     504           0 :     for( ulong exec_rem=fd_ulong_min( req_tail-req_head, exec_max ); exec_rem; exec_rem-- ) {
     505           0 :       fd_vinyl_req_t * req = _req + ((req_head++) & (FD_VINYL_REQ_MAX-1UL));
     506             : 
     507             :       /* Determine the client that sent this request and unpack the
     508             :          completion fields.  We ignore requests with non-NULL but
     509             :          unmappable out-of-band completion because we can't send the
     510             :          completion in the expected manner and, in lieu of that, the
     511             :          receivers aren't expecting any completion to come via the cq
     512             :          (if any).  Note that this implies requests that don't produce a
     513             :          completion (e.g. FETCH and FLUSH) need to either provide NULL
     514             :          or a valid non-NULL location for comp_gaddr to pass this
     515             :          validation (this is not a burden practically). */
     516             : 
     517           0 :       ulong  req_id     =        req->req_id;
     518           0 :       ulong  client_idx =        req->link_id; /* See note above about link_id / client_idx conversion */
     519           0 :       ulong  batch_cnt  = (ulong)req->batch_cnt;
     520           0 :       ulong  comp_gaddr =        req->comp_gaddr;
     521             : 
     522           0 :       fd_vinyl_client_t * client = _client + client_idx;
     523             : 
     524           0 :       fd_vinyl_cq_t * cq            = client->cq;
     525           0 :       ulong           link_id       = client->link_id;
     526           0 :       ulong           client_laddr0 = client->laddr0;
     527           0 :       ulong           client_laddr1 = client->laddr1;
     528           0 :       ulong           quota_rem     = client->quota_rem;
     529             : 
     530           0 :       FD_DCHECK_CRIT( quota_rem<=client->quota_max, "corruption detected" );
     531             : 
     532           0 :       fd_vinyl_comp_t * comp = MAP_REQ_GADDR( comp_gaddr, fd_vinyl_comp_t, 1UL );
     533           0 :       if( FD_UNLIKELY( (!comp) & (!!comp_gaddr) ) ) {
     534           0 :         accum_drop_comp++;
     535           0 :         continue;
     536           0 :       }
     537             : 
     538           0 :       int   comp_err   = 1;
     539           0 :       ulong fail_cnt   = 0UL;
     540             : 
     541           0 :       ulong read_cnt   = 0UL;
     542           0 :       ulong append_cnt = 0UL;
     543             : 
     544           0 :       switch( req->type ) {
     545             : 
     546           0 : #     include "fd_vinyl_case_acquire.c"
     547           0 : #     include "fd_vinyl_case_release.c"
     548           0 : #     include "fd_vinyl_case_erase.c"
     549           0 : #     include "fd_vinyl_case_move.c"
     550           0 : #     include "fd_vinyl_case_fetch.c"
     551           0 : #     include "fd_vinyl_case_flush.c"
     552           0 : #     include "fd_vinyl_case_try.c"
     553           0 : #     include "fd_vinyl_case_test.c"
     554             : 
     555           0 :       default:
     556           0 :         comp_err = FD_VINYL_ERR_INVAL;
     557           0 :         break;
     558           0 :       }
     559             : 
     560           0 :       for( ; read_cnt; read_cnt-- ) {
     561           0 :         fd_vinyl_io_rd_t * _rd; /* avoid pointer escape */
     562           0 :         fd_vinyl_io_poll( io, &_rd, FD_VINYL_IO_FLAG_BLOCKING );
     563           0 :         fd_vinyl_io_rd_t * rd = _rd;
     564             : 
     565           0 :         fd_vinyl_data_obj_t *     obj      = (fd_vinyl_data_obj_t *)    rd->ctx;
     566           0 :         ulong                     seq      =                            rd->seq; (void)seq;
     567           0 :         fd_vinyl_bstream_phdr_t * cphdr    = (fd_vinyl_bstream_phdr_t *)rd->dst;
     568           0 :         ulong                     cpair_sz =                            rd->sz;  (void)cpair_sz;
     569             : 
     570           0 :         fd_vinyl_data_obj_t * cobj = (fd_vinyl_data_obj_t *)fd_ulong_align_dn( (ulong)rd, FD_VINYL_BSTREAM_BLOCK_SZ );
     571             : 
     572           0 :         FD_DCHECK_CRIT( cphdr==fd_vinyl_data_obj_phdr( cobj ), "corruption detected" );
     573             : 
     574           0 :         ulong cpair_ctl = cphdr->ctl;
     575             : 
     576           0 :         int   cpair_type    = fd_vinyl_bstream_ctl_type ( cpair_ctl );
     577           0 :         int   cpair_style   = fd_vinyl_bstream_ctl_style( cpair_ctl );
     578           0 :         ulong cpair_val_esz = fd_vinyl_bstream_ctl_sz   ( cpair_ctl );
     579             : 
     580           0 :         FD_DCHECK_CRIT( cpair_type==FD_VINYL_BSTREAM_CTL_TYPE_PAIR,            "corruption detected" );
     581           0 :         FD_DCHECK_CRIT( cpair_sz  ==fd_vinyl_bstream_pair_sz( cpair_val_esz ), "corruption detected" );
     582             : 
     583           0 :         schar * rd_err = cobj->rd_err;
     584             : 
     585           0 :         FD_DCHECK_CRIT ( rd_err,                                          "corruption detected" );
     586           0 :         FD_DCHECK_ALERT( fd_vinyl_data_is_valid_obj( obj, vol, vol_cnt ), "corruption detected" );
     587             : 
     588           0 :         ulong line_idx = obj->line_idx;
     589             : 
     590           0 :         FD_DCHECK_CRIT( line_idx<line_cnt,                 "corruption detected" );
     591           0 :         FD_DCHECK_CRIT( line[ line_idx ].obj==obj,         "corruption detected" );
     592             : 
     593           0 :         ulong ele_idx = line[ line_idx ].ele_idx;
     594             : 
     595           0 :         FD_DCHECK_CRIT ( ele_idx<ele_max,                                                          "corruption detected" );
     596           0 :         FD_DCHECK_ALERT( !memcmp( &ele0[ ele_idx ].phdr, cphdr, sizeof(fd_vinyl_bstream_phdr_t) ), "corruption detected" );
     597           0 :         FD_DCHECK_CRIT ( ele0[ ele_idx ].seq     ==seq,                                            "corruption detected" );
     598           0 :         FD_DCHECK_CRIT ( ele0[ ele_idx ].line_idx==line_idx,                                       "corruption detected" );
     599             : 
     600             :         /* Verify data integrity */
     601             : 
     602           0 :         FD_DCHECK_ALERT( !fd_vinyl_bstream_pair_test( io_seed, seq, (fd_vinyl_bstream_block_t *)cphdr, cpair_sz ), "corruption detected" );
     603             : 
     604             :         /* Decode the pair */
     605             : 
     606           0 :         char * val    = (char *)fd_vinyl_data_obj_val( obj );
     607           0 :         ulong  val_sz = (ulong)cphdr->info.val_sz;
     608             : 
     609           0 :         FD_DCHECK_CRIT( val_sz <= FD_VINYL_VAL_MAX,                 "corruption detected" );
     610           0 :         FD_DCHECK_CRIT( fd_vinyl_data_obj_val_max( obj ) >= val_sz, "corruption detected" );
     611             : 
     612           0 :         if( FD_LIKELY( cpair_style==FD_VINYL_BSTREAM_CTL_STYLE_RAW ) ) {
     613             : 
     614           0 :           FD_DCHECK_CRIT( obj==cobj,             "corruption detected" );
     615           0 :           FD_DCHECK_CRIT( cpair_val_esz==val_sz, "corruption detected" );
     616             : 
     617           0 :         } else {
     618             : 
     619           0 :           char const * cval    = (char const *)fd_vinyl_data_obj_val( cobj );
     620           0 :           ulong        cval_sz = fd_vinyl_bstream_ctl_sz( cpair_ctl );
     621             : 
     622           0 :           ulong _val_sz = (ulong)LZ4_decompress_safe( cval, val, (int)cval_sz, (int)val_sz );
     623           0 :           if( FD_UNLIKELY( _val_sz!=val_sz ) ) FD_LOG_CRIT(( "LZ4_decompress_safe failed" ));
     624             : 
     625           0 :           fd_vinyl_data_free( data, cobj );
     626             : 
     627           0 :           fd_vinyl_bstream_phdr_t * phdr = fd_vinyl_data_obj_phdr( obj );
     628             : 
     629           0 :           phdr->ctl  = fd_vinyl_bstream_ctl( FD_VINYL_BSTREAM_CTL_TYPE_PAIR, FD_VINYL_BSTREAM_CTL_STYLE_RAW, val_sz );
     630           0 :           phdr->key  = cphdr->key;
     631           0 :           phdr->info = cphdr->info;
     632             : 
     633           0 :         }
     634             : 
     635           0 :         obj->rd_active = (short)0;
     636             : 
     637             :         /* Fill any trailing region with zeros (there is at least
     638             :            FD_VINYL_BSTREAM_FTR_SZ) and tell the client the item was
     639             :            successfully processed. */
     640             : 
     641           0 :         memset( val + val_sz, 0, fd_vinyl_data_szc_obj_footprint( (ulong)obj->szc )
     642           0 :                                  - (sizeof(fd_vinyl_data_obj_t) + sizeof(fd_vinyl_bstream_phdr_t) + val_sz) );
     643             : 
     644           0 :         FD_COMPILER_MFENCE();
     645           0 :         *rd_err = (schar)FD_VINYL_SUCCESS;
     646           0 :         FD_COMPILER_MFENCE();
     647             : 
     648           0 :       }
     649             : 
     650           0 :       if( FD_UNLIKELY( append_cnt ) ) fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
     651             : 
     652           0 :       if( FD_LIKELY( comp_err<=0 ) ) fd_vinyl_cq_send( cq, comp, req_id, link_id, comp_err, batch_cnt, fail_cnt, quota_rem );
     653             : 
     654           0 :       client->quota_rem = quota_rem;
     655             : 
     656           0 :     }
     657             : 
     658           0 :   } /* run loop */
     659             : 
     660           0 :   ulong discard_cnt = req_tail - req_head;
     661             : 
     662             :   /* Append the final partition and sync so we can resume with a fast
     663             :      parallel recovery */
     664             : 
     665           0 :   fd_vinyl_io_append_part( io, seq_part, accum_dead_cnt, accum_move_cnt, NULL, 0UL );
     666             : 
     667           0 :   accum_dead_cnt = 0UL;
     668           0 :   accum_move_cnt = 0UL;
     669             : 
     670           0 :   accum_garbage_cnt++;
     671           0 :   accum_garbage_sz += FD_VINYL_BSTREAM_BLOCK_SZ;
     672             : 
     673           0 :   fd_vinyl_io_commit( io, FD_VINYL_IO_FLAG_BLOCKING );
     674             : 
     675           0 :   fd_vinyl_io_sync( io, FD_VINYL_IO_FLAG_BLOCKING );
     676             : 
     677             :   /* Drain outstanding accumulators */
     678             : 
     679             :   /**/                                   accum_garbage_cnt = 0UL;
     680           0 :   vinyl->garbage_sz += accum_garbage_sz; accum_garbage_sz  = 0UL;
     681             : 
     682           0 :   diag[ FD_VINYL_DIAG_DROP_LINK ] += accum_drop_link; accum_drop_link   = 0UL;
     683           0 :   diag[ FD_VINYL_DIAG_DROP_COMP ] += accum_drop_comp; accum_drop_comp   = 0UL;
     684             : 
     685             :   /* Disconnect from the clients */
     686             : 
     687           0 :   ulong released_cnt = 0UL;
     688           0 :   for( ulong client_idx=0UL; client_idx<client_cnt; client_idx++ ) {
     689           0 :     released_cnt += (_client[ client_idx ].quota_max - _client[ client_idx ].quota_rem);
     690           0 :     fd_wksp_unmap( fd_vinyl_rq_leave( _client[ client_idx ].rq ) );
     691           0 :     fd_wksp_unmap( fd_vinyl_cq_leave( _client[ client_idx ].cq ) );
     692           0 :     fd_wksp_detach( (fd_wksp_t *)_client[ client_idx ].laddr0 );
     693           0 :   }
     694             : 
     695           0 :   if( FD_UNLIKELY( discard_cnt  ) ) FD_LOG_WARNING(( "halt discarded %lu received requests",   discard_cnt  ));
     696           0 :   if( FD_UNLIKELY( released_cnt ) ) FD_LOG_WARNING(( "halt released %lu outstanding acquires", released_cnt ));
     697           0 :   if( FD_UNLIKELY( client_cnt   ) ) FD_LOG_WARNING(( "halt disconneced %lu clients",           client_cnt   ));
     698             : 
     699             :   /* Return to boot state */
     700             : 
     701           0 :   fd_cnc_signal( cnc, FD_VINYL_CNC_SIGNAL_BOOT );
     702           0 : }

Generated by: LCOV version 1.14