#include "fd_stem.h"

/* fd_stem provides services to multiplex multiple streams of input
   fragments and present them to a mix of reliable and unreliable
   consumers as though they were generated by multiple different
   multi-stream producers.  The code can be included to generate a
   definition of stem_run which can be called as a tile main run loop.

   The template supports various callback functions which can be
   defined like #define STEM_CALLBACK_BEFORE_FRAG before_frag to
   tune the behavior of the stem_run loop.  A complete instantiation
   example follows this list.  The callbacks are:

     SHOULD_SHUTDOWN
   Is called at the beginning of each iteration of the stem run loop.
   If it returns non-zero, the stem will exit the run loop and return
   from the stem_run function.  This is useful for shutting down the
   tile.
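
   A minimal sketch of a matching callback (the example_ctx_t type and
   its field are hypothetical, not part of this file):

     static inline int
     example_should_shutdown( example_ctx_t * ctx ) {
       return ctx->shutdown_requested;
     }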

     DURING_HOUSEKEEPING
   Is called during the housekeeping routine, which happens infrequently
   on a schedule determined by the stem (based on the lazy parameter,
   see fd_tempo.h for more information).  It is appropriate to do
   slightly expensive things here that wouldn't be OK to do in the main
   loop, like updating sequence numbers that are shared with other tiles
   (e.g. synchronization information), or sending batched information
   somewhere.  The ctx is a user-provided context object from when the
   stem was initialized.
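
   A minimal sketch, assuming a hypothetical ctx that caches a shared
   fseq and a locally tracked sequence number:

     static inline void
     example_during_housekeeping( example_ctx_t * ctx ) {
       fd_fseq_update( ctx->sync_fseq, ctx->local_seq );
     }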

     METRICS_WRITE
   By convention, tiles may wish to accumulate high traffic metrics
   locally so they don't cause a lot of cache coherency traffic, and
   then periodically publish them to external observers.  This callback
   is here to support that use case.  It occurs infrequently during the
   housekeeping loop, and is called inside a compiler fence to ensure
   the writes do not get reordered, which may be important for observers
   or monitoring tools.  The ctx is a user-provided context object from
   when the stem tile was initialized.
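
   A minimal sketch (the metric name and ctx counter are hypothetical):

     static inline void
     example_metrics_write( example_ctx_t * ctx ) {
       FD_MCNT_SET( EXAMPLE, FRAGS_PROCESSED, ctx->frags_processed );
     }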

     BEFORE_CREDIT
   Is called every iteration of the stem run loop, whether there is a
   new frag ready to receive or not.  This callback is also still
   invoked even if the stem is backpressured and cannot read any new
   fragments while waiting for downstream consumers to catch up.  This
   callback is useful for things that need to occur even if no new frags
   are being handled.  For example, servicing network connections could
   happen here.  The ctx is a user-provided context object from when the
   stem tile was initialized.  The stem argument is the stem invoking
   this callback; it should only be used for calling fd_stem_publish to
   publish a fragment to downstream consumers.

   The charge_busy argument is 0 by default, and should be set to 1 if
   the before_credit function is doing work that should be accounted for
   as part of the tile's busy indicator.
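
   A minimal sketch which services sockets every loop iteration and
   charges busy time when any were ready (example_poll_sockets is a
   hypothetical helper):

     static inline void
     example_before_credit( example_ctx_t *     ctx,
                            fd_stem_context_t * stem,
                            int *               charge_busy ) {
       (void)stem;
       if( example_poll_sockets( ctx ) ) *charge_busy = 1;
     }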

     AFTER_CREDIT
   Is called every iteration of the stem run loop, whether there is a
   new frag ready to receive or not, except in cases where the stem is
   backpressured by a downstream consumer and would not be able to
   publish.  The callback might be used for publishing new fragments to
   downstream consumers in the main loop which are not in response to an
   incoming fragment.  For example, code that collects incoming
   fragments over a period of 1 second and joins them together before
   publishing a large block fragment downstream would publish the block
   here.  The ctx is a user-provided context object from when the stem
   tile was initialized.  The stem argument is the stem invoking this
   callback; it should only be used for calling fd_stem_publish to
   publish a fragment to downstream consumers.

   The opt_poll_in argument determines if the stem should proceed with
   checking for new fragments to consume, or should `continue` the main
   stem loop to do credit checking again.  This could be used if the
   after_credit function publishes, and the flow control needs to be
   checked again.  By default, opt_poll_in is true and the stem will
   poll for fragments right away without rerunning the loop or checking
   for credits.

   The charge_busy argument is 0 by default, and should be set to 1 if
   the after_credit function is doing work that should be accounted for
   as part of the tile's busy indicator.
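
   A minimal sketch which publishes a completed batch and then forces
   another credit check before polling the ins (the batching state and
   example_batch_ready helper are hypothetical):

     static inline void
     example_after_credit( example_ctx_t *     ctx,
                           fd_stem_context_t * stem,
                           int *               opt_poll_in,
                           int *               charge_busy ) {
       if( FD_UNLIKELY( example_batch_ready( ctx ) ) ) {
         fd_stem_publish( stem, 0UL, ctx->batch_sig, ctx->batch_chunk,
                          ctx->batch_sz, 0UL, ctx->batch_tsorig,
                          fd_frag_meta_ts_comp( fd_tickcount() ) );
         *opt_poll_in = 0;
         *charge_busy = 1;
       }
     }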

     BEFORE_FRAG
   Is called immediately whenever a new fragment has been detected that
   was published by an upstream producer.  The signature and sequence
   number (sig and seq) provided as arguments are read atomically from
   shared memory, so must both match each other from the published
   fragment (i.e. they will not be torn or partially overwritten).
   in_idx is an index in [0, num_ins) indicating which producer
   published the fragment.  No fragment data has been read yet here, nor
   has other metadata, for example the size or timestamps of the
   fragment.  Mainly this callback is useful for deciding whether to
   filter the fragment based on its signature.  If the return value is
   positive, the frag will be skipped completely, no fragment data will
   be read, and the in will be advanced so that we now wait for the next
   fragment.  If the return value is negative, the frag is returned to
   the message queue and will be reprocessed.  The ctx is a
   user-provided context object from when the stem tile was initialized.
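
   A minimal sketch which skips every fragment whose signature is not
   of interest (EXAMPLE_SIG_OF_INTEREST is a hypothetical constant):

     static inline int
     example_before_frag( example_ctx_t * ctx,
                          ulong           in_idx,
                          ulong           seq,
                          ulong           sig ) {
       (void)ctx; (void)in_idx; (void)seq;
       return sig!=EXAMPLE_SIG_OF_INTEREST;
     }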

     DURING_FRAG
   Is called after the stem has received a new frag from an in, but
   before the stem has checked that it was overrun.  This callback is
   not invoked if the stem is backpressured, as it would not try to
   read a frag from an in in the first place (instead, leaving it on the
   in mcache to backpressure the upstream producer).  in_idx will be the
   index of the in that the frag was received from, skipping any unpolled
   links.  If the producer of the frags is respecting flow control, it is
   safe to read frag data in any of the callbacks, but it is suggested to
   copy or read frag data within this callback, as if the producer does
   not respect flow control, the frag may be torn or corrupt due to the
   reader being overrun.  If the frag being read from has been
   overwritten while this callback is running, the frag will be ignored
   and the stem will not call the after_frag function.  Instead it will
   recover from the overrun and continue with new frags.  This function
   cannot fail.  The ctx is a user-provided context object from when the
   stem tile was initialized.  seq, sig, chunk, and sz are the respective
   fields from the mcache fragment that was received.  If the producer
   is not respecting flow control, these may be corrupt or torn and
   should not be trusted, except for seq which is read atomically.
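
   A minimal sketch which speculatively copies the payload out of the
   in's dcache (the ctx fields are hypothetical, and real code would
   also validate chunk and sz against the dcache bounds):

     static inline void
     example_during_frag( example_ctx_t * ctx,
                          ulong           in_idx,
                          ulong           seq,
                          ulong           sig,
                          ulong           chunk,
                          ulong           sz,
                          ulong           ctl ) {
       (void)seq; (void)sig; (void)ctl;
       uchar const * src = fd_chunk_to_laddr_const( ctx->in_mem[ in_idx ], chunk );
       fd_memcpy( ctx->buf, src, sz );
     }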

     RETURNABLE_FRAG
   Is called after the stem has received a new frag from an in, and
   assumes that the stem cannot be overrun.  This special callback can
   instruct the stem not to advance the input sequence number, and
   instead return the fragment to the stem to be processed again.  This
   is useful for processing partial data from fragments without copying
   it.  This callback is unsafe in general contexts, since it assumes
   that the frag will not be overwritten while the callback is running,
   and that the frag data is valid throughout the function call.  It
   should only be used when the stem is guaranteed not to be overrun.
   This callback is not invoked if the stem is backpressured, as it
   would not try to read a frag from an in in the first place (instead,
   leaving it on the in mcache to backpressure the upstream producer).
   in_idx will be the index of the in that the frag was received from.
   seq, sig, chunk, and sz are the respective fields from the mcache
   fragment that was received.  tsorig and tspub are the timestamps of
   the fragment that was received, and are read atomically from shared
   memory, so must both match each other from the published fragment
   (i.e. they will not be torn or partially overwritten).  The ctx is a
   user-provided context object from when the stem tile was initialized.
   The callback should return 1 if the fragment was not fully processed
   and should be returned to the stem for further processing, or 0 if
   the fragment was fully processed and the in should be advanced.
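
   A minimal sketch which drains a fragment in pieces, returning 1
   until the whole payload has been handled (example_process_some and
   the consumed_sz field are hypothetical):

     static inline int
     example_returnable_frag( example_ctx_t *     ctx,
                              ulong               in_idx,
                              ulong               seq,
                              ulong               sig,
                              ulong               chunk,
                              ulong               sz,
                              ulong               ctl,
                              ulong               tsorig,
                              ulong               tspub,
                              fd_stem_context_t * stem ) {
       (void)in_idx; (void)seq; (void)sig; (void)ctl;
       (void)tsorig; (void)tspub; (void)stem;
       ctx->consumed_sz += example_process_some( ctx, chunk, ctx->consumed_sz, sz );
       if( ctx->consumed_sz<sz ) return 1;
       ctx->consumed_sz = 0UL;
       return 0;
     }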

     AFTER_FRAG
   Is called immediately after DURING_FRAG, along with an additional
   check that the reader was not overrun while handling the frag.  If
   the reader was overrun, the frag is abandoned and this function is
   not called.  This callback is not invoked if the stem is
   backpressured, as it would not read a frag in the first place.
   in_idx will be the index of the in that the frag was received from,
   skipping any unpolled links.  You should not read the frag data
   directly here, as it might still get overrun; instead, it should be
   copied out of the frag during the during_frag callback if needed
   later.  This function cannot fail.  The ctx is a user-provided
   context object from when the stem tile was initialized.  stem should
   only be used for calling fd_stem_publish to publish a fragment to
   downstream consumers.  seq is the sequence number of the fragment
   that was read from the input mcache.  sig, sz, tsorig, and tspub are
   the respective fields from the mcache fragment that was received.
   If the producer is not respecting flow control, these may be corrupt
   or torn and should not be trusted.
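
   A minimal sketch which forwards the fragment's metadata downstream
   after a successful (non-overrun) read, assuming the payload was
   already copied to ctx->out_chunk in during_frag (out index 0 and the
   ctx field are illustrative):

     static inline void
     example_after_frag( example_ctx_t *     ctx,
                         ulong               in_idx,
                         ulong               seq,
                         ulong               sig,
                         ulong               sz,
                         ulong               tsorig,
                         ulong               tspub,
                         fd_stem_context_t * stem ) {
       (void)in_idx; (void)seq; (void)tspub;
       fd_stem_publish( stem, 0UL, sig, ctx->out_chunk, sz, 0UL, tsorig,
                        fd_frag_meta_ts_comp( fd_tickcount() ) );
     }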

     AFTER_POLL_OVERRUN
   Is called when an overrun is detected while polling for new frags.
   This callback is not called when an overrun is detected in
   during_frag. */
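
/* As noted above, a complete instantiation might look like the
   following (a minimal sketch; the tile name, context type and
   callback names are illustrative, not part of this file):

     #define STEM_NAME                   example
     #define STEM_BURST                  (1UL)
     #define STEM_CALLBACK_CONTEXT_TYPE  example_ctx_t
     #define STEM_CALLBACK_CONTEXT_ALIGN alignof(example_ctx_t)
     #define STEM_CALLBACK_BEFORE_FRAG   example_before_frag
     #define STEM_CALLBACK_DURING_FRAG   example_during_frag
     #define STEM_CALLBACK_AFTER_FRAG    example_after_frag
     #include "../stem/fd_stem.c"

   This generates example_run( topo, tile ), suitable for use as the
   tile's main run loop. */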

#if !FD_HAS_ALLOCA
#error "fd_stem requires alloca"
#endif

#include "../topo/fd_topo.h"
#include "../metrics/fd_metrics.h"
#include "../../tango/fd_tango.h"

#ifndef STEM_NAME
#define STEM_NAME stem
#endif
#define STEM_(n) FD_EXPAND_THEN_CONCAT3(STEM_NAME,_,n)

#ifndef STEM_BURST
#error "STEM_BURST must be defined"
#endif

#ifndef STEM_CALLBACK_CONTEXT_TYPE
#error "STEM_CALLBACK_CONTEXT_TYPE must be defined"
#endif

#ifndef STEM_CALLBACK_CONTEXT_ALIGN
#error "STEM_CALLBACK_CONTEXT_ALIGN must be defined"
#endif

#ifndef STEM_LAZY
#define STEM_LAZY (0L)
#endif

#define STEM_SHUTDOWN_SEQ (ULONG_MAX-1UL)

static inline void
STEM_(in_update)( fd_stem_tile_in_t * in ) {
  fd_fseq_update( in->fseq, in->seq );

  volatile ulong * metrics = fd_metrics_link_in( fd_metrics_base_tl, in->idx );

  uint *  accum = in->accum;
  ulong a0 = (ulong)accum[0]; ulong a1 = (ulong)accum[1]; ulong a2 = (ulong)accum[2];
  ulong a3 = (ulong)accum[3]; ulong a4 = (ulong)accum[4]; ulong a5 = (ulong)accum[5];
  FD_COMPILER_MFENCE();
  metrics[0] += a0;           metrics[1] += a1;           metrics[2] += a2;
  metrics[3] += a3;           metrics[4] += a4;           metrics[5] += a5;
  FD_COMPILER_MFENCE();
  accum[0] = 0U;              accum[1] = 0U;              accum[2] = 0U;
  accum[3] = 0U;              accum[4] = 0U;              accum[5] = 0U;
}

FD_FN_PURE static inline ulong
STEM_(scratch_align)( void ) {
  return FD_STEM_SCRATCH_ALIGN;
}

FD_FN_PURE static inline ulong
STEM_(scratch_footprint)( ulong in_cnt,
                          ulong out_cnt,
                          ulong cons_cnt ) {
  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(fd_stem_tile_in_t), in_cnt*sizeof(fd_stem_tile_in_t) );  /* in */
  l = FD_LAYOUT_APPEND( l, alignof(ulong),             out_cnt*sizeof(ulong)            );  /* cr_avail */
  l = FD_LAYOUT_APPEND( l, alignof(ulong),             out_cnt*sizeof(ulong)            );  /* out_depth */
  l = FD_LAYOUT_APPEND( l, alignof(ulong),             out_cnt*sizeof(ulong)            );  /* out_seq */
  l = FD_LAYOUT_APPEND( l, alignof(ulong const *),     cons_cnt*sizeof(ulong const *)   );  /* cons_fseq */
  l = FD_LAYOUT_APPEND( l, alignof(ulong *),           cons_cnt*sizeof(ulong *)         );  /* cons_slow */
  l = FD_LAYOUT_APPEND( l, alignof(ulong),             cons_cnt*sizeof(ulong)           );  /* cons_out */
  l = FD_LAYOUT_APPEND( l, alignof(ulong),             cons_cnt*sizeof(ulong)           );  /* cons_seq */
  const ulong event_cnt = in_cnt + 1UL + cons_cnt;
  l = FD_LAYOUT_APPEND( l, alignof(ushort),            event_cnt*sizeof(ushort)         );  /* event_map */
  return FD_LAYOUT_FINI( l, STEM_(scratch_align)() );
}

static inline void
STEM_(run1)( ulong                        in_cnt,
             fd_frag_meta_t const **      in_mcache,
             ulong **                     in_fseq,
             ulong                        out_cnt,
             fd_frag_meta_t **            out_mcache,
             ulong                        cons_cnt,
             ulong *                      _cons_out,
             ulong **                     _cons_fseq,
             volatile ulong **            _cons_slow,
             ulong                        burst,
             long                         lazy,
             fd_rng_t *                   rng,
             void *                       scratch,
             STEM_CALLBACK_CONTEXT_TYPE * ctx ) {
  /* in frag stream state */
  ulong               in_seq; /* current position in input poll sequence, in [0,in_cnt) */
  fd_stem_tile_in_t * in;     /* in[in_seq] for in_seq in [0,in_cnt) has information about input fragment stream currently at
                                 position in_seq in the in_idx polling sequence.  The ordering of this array is continuously
                                 shuffled to avoid lighthousing effects in the output fragment stream at extreme fan-in and load */

  /* out frag stream state */
  ulong *        out_depth; /* ==fd_mcache_depth( out_mcache[out_idx] ) for out_idx in [0,out_cnt) */
  ulong *        out_seq;   /* next mux frag sequence number to publish for out_idx in [0,out_cnt) */

  /* out flow control state */
  ulong *        cr_avail;     /* number of flow control credits available to publish downstream across all outs */
  ulong          min_cr_avail; /* minimum number of flow control credits available to publish downstream */
  ulong const ** cons_fseq;    /* cons_fseq[cons_idx] for cons_idx in [0,cons_cnt) is where to receive fctl credits from consumers */
  volatile ulong ** cons_slow; /* cons_slow[cons_idx] for cons_idx in [0,cons_cnt) is where to accumulate slow events */
  ulong *        cons_out;     /* cons_out[cons_idx] for cons_idx in [0,cons_cnt) is which out the consumer consumes from */
  ulong *        cons_seq;     /* cons_seq[cons_idx] is the most recent observation of cons_fseq[cons_idx] */

  /* housekeeping state */
  ulong    event_cnt; /* ==in_cnt+cons_cnt+1, total number of housekeeping events */
  ulong    event_seq; /* current position in housekeeping event sequence, in [0,event_cnt) */
  ushort * event_map; /* current mapping of event_seq to event idx, event_map[ event_seq ] is next event to process */
  ulong    async_min; /* minimum number of ticks between processing a housekeeping event, positive integer power of 2 */

  /* performance metrics */
  ulong metric_in_backp;  /* is the run loop currently backpressured by one or more of the outs, in [0,1] */
  ulong metric_backp_cnt; /* accumulates number of transitions of the tile to backpressured between housekeeping events */

  ulong metric_regime_ticks[ FD_METRICS_ENUM_TILE_REGIME_CNT ]; /* how many ticks the tile has spent in each regime */

  if( FD_UNLIKELY( !scratch ) ) FD_LOG_ERR(( "NULL scratch" ));
  if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)scratch, STEM_(scratch_align)() ) ) ) FD_LOG_ERR(( "misaligned scratch" ));

  /* in_backp==1, backp_cnt==0 indicates waiting for initial credits,
     cleared during first housekeeping if credits available */
  metric_in_backp  = 1UL;
  metric_backp_cnt = 0UL;
  memset( metric_regime_ticks, 0, sizeof( metric_regime_ticks ) );

  /* in frag stream init */

  in_seq = 0UL; /* First in to poll */

  FD_SCRATCH_ALLOC_INIT( l, scratch );
  in = (fd_stem_tile_in_t *)FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_stem_tile_in_t), in_cnt*sizeof(fd_stem_tile_in_t) );

  if( FD_UNLIKELY( !!in_cnt && !in_mcache ) ) FD_LOG_ERR(( "NULL in_mcache" ));
  if( FD_UNLIKELY( !!in_cnt && !in_fseq   ) ) FD_LOG_ERR(( "NULL in_fseq"   ));
  if( FD_UNLIKELY( in_cnt > UINT_MAX ) )      FD_LOG_ERR(( "in_cnt too large" ));
  for( ulong in_idx=0UL; in_idx<in_cnt; in_idx++ ) {

    if( FD_UNLIKELY( !in_mcache[ in_idx ] ) ) FD_LOG_ERR(( "NULL in_mcache[%lu]", in_idx ));
    if( FD_UNLIKELY( !in_fseq  [ in_idx ] ) ) FD_LOG_ERR(( "NULL in_fseq[%lu]",   in_idx ));

    fd_stem_tile_in_t * this_in = &in[ in_idx ];

    this_in->mcache = in_mcache[ in_idx ];
    this_in->fseq   = in_fseq  [ in_idx ];

    ulong depth    = fd_mcache_depth( this_in->mcache );
    if( FD_UNLIKELY( depth > UINT_MAX ) ) FD_LOG_ERR(( "in_mcache[%lu] too deep", in_idx ));
    this_in->depth = (uint)depth;
    this_in->idx   = (uint)in_idx;
    this_in->seq   = 0UL;
    this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in->seq, this_in->depth );

    this_in->accum[0] = 0U; this_in->accum[1] = 0U; this_in->accum[2] = 0U;
    this_in->accum[3] = 0U; this_in->accum[4] = 0U; this_in->accum[5] = 0U;
  }

  /* out frag stream init */

  cr_avail     = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );
  min_cr_avail = 0UL;

  out_depth  = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );
  out_seq    = (ulong *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong), out_cnt*sizeof(ulong) );

  ulong cr_max = fd_ulong_if( !out_cnt, 128UL, ULONG_MAX );

  for( ulong out_idx=0UL; out_idx<out_cnt; out_idx++ ) {

    if( FD_UNLIKELY( !out_mcache[ out_idx ] ) ) FD_LOG_ERR(( "NULL out_mcache[%lu]", out_idx ));

    out_depth[ out_idx ] = fd_mcache_depth( out_mcache[ out_idx ] );
    out_seq[ out_idx ] = 0UL;

    cr_avail[ out_idx ] = out_depth[ out_idx ];
  }

  cons_fseq = (ulong const **)   FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong const *), cons_cnt*sizeof(ulong const *) );
  cons_slow = (volatile ulong **)FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong *),       cons_cnt*sizeof(ulong *)       );
  cons_out  = (ulong *)          FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong),         cons_cnt*sizeof(ulong)         );
  cons_seq  = (ulong *)          FD_SCRATCH_ALLOC_APPEND( l, alignof(ulong),         cons_cnt*sizeof(ulong)         );

  if( FD_UNLIKELY( !!cons_cnt && !_cons_fseq ) ) FD_LOG_ERR(( "NULL cons_fseq" ));
  if( FD_UNLIKELY( !!cons_cnt && !_cons_slow ) ) FD_LOG_ERR(( "NULL cons_slow" ));
  for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
    if( FD_UNLIKELY( !_cons_fseq[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_fseq[%lu]", cons_idx ));
    if( FD_UNLIKELY( !_cons_slow[ cons_idx ] ) ) FD_LOG_ERR(( "NULL cons_slow[%lu]", cons_idx ));
    cons_fseq[ cons_idx ] = _cons_fseq[ cons_idx ];
    cons_out [ cons_idx ] = _cons_out [ cons_idx ];
    cons_slow[ cons_idx ] = _cons_slow[ cons_idx ];
    cons_seq [ cons_idx ] = fd_fseq_query( _cons_fseq[ cons_idx ] );

    cr_max = fd_ulong_min( cr_max, out_depth[ cons_out[ cons_idx ] ] );
  }

  if( FD_UNLIKELY( burst>cr_max ) ) FD_LOG_ERR(( "one or more out links have insufficient depth for STEM_BURST %lu. cr_max is %lu", burst, cr_max ));

  /* housekeeping init */

  if( lazy<=0L ) lazy = fd_tempo_lazy_default( cr_max );
  if( FD_UNLIKELY( lazy>(long)1e9 ) ) FD_LOG_ERR(( "excessive stem lazy value: %li", lazy ));
  FD_LOG_INFO(( "Configuring housekeeping (lazy %li ns)", lazy ));

  /* Initialize the initial event sequence to immediately update
     cr_avail on the first run loop iteration and then update all the
     ins accordingly. */

  event_cnt = in_cnt + 1UL + cons_cnt;
  event_map = (ushort *)FD_SCRATCH_ALLOC_APPEND( l, alignof(ushort), event_cnt*sizeof(ushort) );
  event_seq = 0UL;                                         event_map[ event_seq++ ] = (ushort)cons_cnt;
  for( ulong   in_idx=0UL;   in_idx< in_cnt;  in_idx++   ) event_map[ event_seq++ ] = (ushort)(in_idx+cons_cnt+1UL);
  for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) event_map[ event_seq++ ] = (ushort)cons_idx;
  event_seq = 0UL;
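
  /* For example, with in_cnt==2 and cons_cnt==3 the initial map is
     { 3, 4, 5, 0, 1, 2 }: the housekeeping event (event_idx
     cons_cnt==3) runs first, then credits are sent to the two ins
     (event_idx 4 and 5), then credits are received from the three
     consumers (event_idx 0, 1 and 2). */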

  async_min = fd_tempo_async_min( lazy, event_cnt, (float)fd_tempo_tick_per_ns( NULL ) );
  if( FD_UNLIKELY( !async_min ) ) FD_LOG_ERR(( "bad lazy %lu %lu", (ulong)lazy, event_cnt ));

  FD_LOG_INFO(( "Running stem, cr_max = %lu", cr_max ));
  FD_MGAUGE_SET( TILE, STATUS, 1UL );
  long then = fd_tickcount();
  long now  = then;
  for(;;) {

#ifdef STEM_CALLBACK_SHOULD_SHUTDOWN
    if( FD_UNLIKELY( STEM_CALLBACK_SHOULD_SHUTDOWN( ctx ) ) ) break;
#endif

    /* Do housekeeping at a low rate in the background */

    ulong housekeeping_ticks = 0UL;
    if( FD_UNLIKELY( (now-then)>=0L ) ) {
      ulong event_idx = (ulong)event_map[ event_seq ];

      /* Do the next async event.  event_idx:
            <cons_cnt - receive credits from consumer event_idx
           ==cons_cnt - housekeeping
            >cons_cnt - send credits to in event_idx - cons_cnt - 1.
         Branch hints and order are optimized for the case:
           cons_cnt >~ in_cnt >~ 1. */

      if( FD_LIKELY( event_idx<cons_cnt ) ) { /* cons fctl for cons cons_idx */
        ulong cons_idx = event_idx;

        /* Receive flow control credits from this consumer. */
        cons_seq[ cons_idx ] = fd_fseq_query( cons_fseq[ cons_idx ] );

      } else if( FD_LIKELY( event_idx>cons_cnt ) ) { /* in fctl for in in_idx */
        ulong in_idx = event_idx - cons_cnt - 1UL;

        /* Send flow control credits and drain flow control diagnostics
           for in_idx. */

        STEM_(in_update)( &in[ in_idx ] );

      } else { /* event_idx==cons_cnt, housekeeping event */

        /* Update metrics counters to external viewers */
        FD_COMPILER_MFENCE();
        FD_MGAUGE_SET( TILE, HEARTBEAT,                 (ulong)now );
        FD_MGAUGE_SET( TILE, IN_BACKPRESSURE,           metric_in_backp );
        FD_MCNT_INC  ( TILE, BACKPRESSURE_COUNT,        metric_backp_cnt );
        FD_MCNT_ENUM_COPY( TILE, REGIME_DURATION_NANOS, metric_regime_ticks );
#ifdef STEM_CALLBACK_METRICS_WRITE
        STEM_CALLBACK_METRICS_WRITE( ctx );
#endif
        FD_COMPILER_MFENCE();
        metric_backp_cnt = 0UL;

        /* Receive flow control credits */
        if( FD_LIKELY( min_cr_avail<cr_max ) ) {
          ulong slowest_cons = ULONG_MAX;
          min_cr_avail = cr_max;
          for( ulong out_idx=0; out_idx<out_cnt; out_idx++ ) {
            cr_avail[ out_idx ] = out_depth[ out_idx ];
          }

          for( ulong cons_idx=0UL; cons_idx<cons_cnt; cons_idx++ ) {
            ulong out_idx = cons_out[ cons_idx ];
            ulong cons_cr_avail = (ulong)fd_long_max( (long)out_depth[ out_idx ]-fd_long_max( fd_seq_diff( out_seq[ out_idx ], cons_seq[ cons_idx ] ), 0L ), 0L );
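
            /* For example, with out_depth 128, out_seq 1000 and
               cons_seq 950, the consumer still has 50 published frags
               left to consume, so we can publish 128-50==78 more
               before we would overrun it. */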

            /* If a reliable consumer exits, they can set the credit
               return fseq to STEM_SHUTDOWN_SEQ to indicate they are no
               longer actively consuming. */
            cons_cr_avail = fd_ulong_if( cons_seq[ cons_idx ]==STEM_SHUTDOWN_SEQ, out_depth[ out_idx ], cons_cr_avail );
            slowest_cons = fd_ulong_if( cons_cr_avail<min_cr_avail, cons_idx, slowest_cons );

            cr_avail[ out_idx ] = fd_ulong_min( cr_avail[ out_idx ], cons_cr_avail );
            min_cr_avail        = fd_ulong_min( cons_cr_avail, min_cr_avail );
          }

          /* See notes above about use of quasi-atomic diagnostic accum */
          if( FD_LIKELY( slowest_cons!=ULONG_MAX ) ) {
            FD_COMPILER_MFENCE();
            (*cons_slow[ slowest_cons ]) += metric_in_backp;
            FD_COMPILER_MFENCE();
          }
        }

#ifdef STEM_CALLBACK_DURING_HOUSEKEEPING
        STEM_CALLBACK_DURING_HOUSEKEEPING( ctx );
#else
        (void)ctx;
#endif
      }

      /* Select which event to do next (randomized round robin) and
         reload the housekeeping timer. */

      event_seq++;
      if( FD_UNLIKELY( event_seq>=event_cnt ) ) {
        event_seq = 0UL;

        /* Randomize the order of event processing for the next
           event_cnt events to avoid lighthousing effects causing input
           credit starvation at extreme fan in/fan out, extreme in load
           and high credit return laziness. */

        ulong  swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)event_cnt );
        ushort map_tmp        = event_map[ swap_idx ];
        event_map[ swap_idx ] = event_map[ 0        ];
        event_map[ 0        ] = map_tmp;

        /* We also do the same with the ins to prevent a correlated
           ordering of frag origins from different inputs downstream at
           extreme fan in and extreme in load. */

        if( FD_LIKELY( in_cnt>1UL ) ) {
          swap_idx = (ulong)fd_rng_uint_roll( rng, (uint)in_cnt );
          fd_stem_tile_in_t in_tmp;
          in_tmp         = in[ swap_idx ];
          in[ swap_idx ] = in[ 0        ];
          in[ 0        ] = in_tmp;
        }
      }

      /* Reload housekeeping timer */
      then = now + (long)fd_tempo_async_reload( rng, async_min );
      long next = fd_tickcount();
      housekeeping_ticks = (ulong)(next - now);
      now = next;
    }

#if defined(STEM_CALLBACK_BEFORE_CREDIT) || defined(STEM_CALLBACK_AFTER_CREDIT) || defined(STEM_CALLBACK_AFTER_FRAG) || defined(STEM_CALLBACK_RETURNABLE_FRAG)
    fd_stem_context_t stem = {
      .mcaches             = out_mcache,
      .depths              = out_depth,
      .seqs                = out_seq,

      .cr_avail            = cr_avail,
      .min_cr_avail        = &min_cr_avail,
      .cr_decrement_amount = fd_ulong_if( out_cnt>0UL, 1UL, 0UL ),
    };
#endif

    int charge_busy_before = 0;
#ifdef STEM_CALLBACK_BEFORE_CREDIT
    STEM_CALLBACK_BEFORE_CREDIT( ctx, &stem, &charge_busy_before );
#endif

    /* Check if we are backpressured.  If so, count any transition into
       a backpressured regime and spin to wait for flow control credits
       to return.  We don't do a fully atomic update here as it is only
       diagnostic and it will still be correct in the usual case where
       individual diagnostic counters aren't used by writers in
       different threads of execution.  We only count the transition
       from not backpressured to backpressured. */

    if( FD_UNLIKELY( min_cr_avail<burst ) ) {
      metric_backp_cnt += (ulong)!metric_in_backp;
      metric_in_backp   = 1UL;
      FD_SPIN_PAUSE();
      metric_regime_ticks[2] += housekeeping_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[5] += (ulong)(next - now);
      now = next;
      continue;
    }
    metric_in_backp = 0UL;

    int charge_busy_after = 0;
#ifdef STEM_CALLBACK_AFTER_CREDIT
    int poll_in = 1;
    STEM_CALLBACK_AFTER_CREDIT( ctx, &stem, &poll_in, &charge_busy_after );
    if( FD_UNLIKELY( !poll_in ) ) {
      metric_regime_ticks[1] += housekeeping_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[4] += (ulong)(next - now);
      now = next;
      continue;
    }
#endif

    /* Select which in to poll next (randomized round robin) */

    if( FD_UNLIKELY( !in_cnt ) ) {
      int was_busy = charge_busy_before+charge_busy_after;
      metric_regime_ticks[0] += housekeeping_ticks;
      long next = fd_tickcount();
      if( FD_UNLIKELY( was_busy ) ) metric_regime_ticks[3] += (ulong)(next - now);
      else                          metric_regime_ticks[6] += (ulong)(next - now);
      now = next;
      continue;
    }

    ulong prefrag_ticks = 0UL;
#if defined(STEM_CALLBACK_BEFORE_CREDIT) && defined(STEM_CALLBACK_AFTER_CREDIT)
    if( FD_LIKELY( charge_busy_before || charge_busy_after ) ) {
#elif defined(STEM_CALLBACK_BEFORE_CREDIT)
    if( FD_LIKELY( charge_busy_before ) ) {
#elif defined(STEM_CALLBACK_AFTER_CREDIT)
    if( FD_LIKELY( charge_busy_after ) ) {
#endif

#if defined(STEM_CALLBACK_BEFORE_CREDIT) || defined(STEM_CALLBACK_AFTER_CREDIT)
      long prefrag_next = fd_tickcount();
      prefrag_ticks = (ulong)(prefrag_next - now);
      now = prefrag_next;
    }
#endif

    fd_stem_tile_in_t * this_in = &in[ in_seq ];
    in_seq++;
    if( in_seq>=in_cnt ) in_seq = 0UL; /* cmov */

    /* Check if this in has any new fragments to mux */

    ulong                  this_in_seq   = this_in->seq;
    fd_frag_meta_t const * this_in_mline = this_in->mline; /* Already at appropriate line for this_in_seq */

#if FD_HAS_SSE
    __m128i seq_sig = fd_frag_meta_seq_sig_query( this_in_mline );
    ulong seq_found = fd_frag_meta_sse0_seq( seq_sig );
    ulong sig       = fd_frag_meta_sse0_sig( seq_sig );
#else
    /* Without SSE, seq and sig might be read from different frags (due
       to overrun), which results in a before_frag and during_frag being
       issued with incorrect arguments, but not after_frag. */
    ulong seq_found = FD_VOLATILE_CONST( this_in_mline->seq );
    ulong sig       = FD_VOLATILE_CONST( this_in_mline->sig );
#endif
    (void)sig;

    long diff = fd_seq_diff( this_in_seq, seq_found );
    if( FD_UNLIKELY( diff ) ) { /* Caught up or overrun, optimize for new frag case */
      ulong * housekeeping_regime = &metric_regime_ticks[0];
      ulong * prefrag_regime      = &metric_regime_ticks[3];
      ulong * finish_regime       = &metric_regime_ticks[6];
      if( FD_UNLIKELY( diff<0L ) ) { /* Overrun (impossible if in is honoring our flow control) */
        this_in->seq = seq_found; /* Resume from here (probably reasonably current, could query in mcache sync directly instead) */
        housekeeping_regime = &metric_regime_ticks[1];
        prefrag_regime      = &metric_regime_ticks[4];
        finish_regime       = &metric_regime_ticks[7];
        this_in->accum[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_COUNT_OFF ]++;
        this_in->accum[ FD_METRICS_COUNTER_LINK_OVERRUN_POLLING_FRAG_COUNT_OFF ] += (uint)(-diff);

#ifdef STEM_CALLBACK_AFTER_POLL_OVERRUN
        STEM_CALLBACK_AFTER_POLL_OVERRUN( ctx );
#endif
      }

      /* Don't bother with spin as polling multiple locations */
      *housekeeping_regime += housekeeping_ticks;
      *prefrag_regime      += prefrag_ticks;
      long next = fd_tickcount();
      *finish_regime += (ulong)(next - now);
      now = next;
      continue;
    }

#ifdef STEM_CALLBACK_BEFORE_FRAG
    int filter = STEM_CALLBACK_BEFORE_FRAG( ctx, (ulong)this_in->idx, seq_found, sig );
    if( FD_UNLIKELY( filter<0 ) ) {
      metric_regime_ticks[1] += housekeeping_ticks;
      metric_regime_ticks[4] += prefrag_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[7] += (ulong)(next - now);
      now = next;
      continue;
    } else if( FD_UNLIKELY( filter>0 ) ) {
      this_in->accum[ FD_METRICS_COUNTER_LINK_FILTERED_COUNT_OFF ]++;
      this_in->accum[ FD_METRICS_COUNTER_LINK_FILTERED_SIZE_BYTES_OFF ] += (uint)this_in_mline->sz; /* TODO: This might be overrun ... ? Not loaded atomically */

      this_in_seq    = fd_seq_inc( this_in_seq, 1UL );
      this_in->seq   = this_in_seq;
      this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in_seq, this_in->depth );

      metric_regime_ticks[1] += housekeeping_ticks;
      metric_regime_ticks[4] += prefrag_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[7] += (ulong)(next - now);
      now = next;
      continue;
    }
#endif

    /* We have a new fragment to mux.  Try to load it.  This attempt
       should always be successful if in producers are honoring our flow
       control.  Since we can cheaply detect if there are
       misconfigurations (should be an L1 cache hit / predictable branch
       in the properly configured case), we do so anyway.  Note that if
       we are on a platform where AVX is atomic, this could be replaced
       by a flat AVX load of the metadata and an extraction of the found
       sequence number for higher performance. */
    FD_COMPILER_MFENCE();
    ulong chunk    = (ulong)this_in_mline->chunk;  (void)chunk;
    ulong sz       = (ulong)this_in_mline->sz;     (void)sz;
    ulong ctl      = (ulong)this_in_mline->ctl;    (void)ctl;
    ulong tsorig   = (ulong)this_in_mline->tsorig; (void)tsorig;
    ulong tspub    = (ulong)this_in_mline->tspub;  (void)tspub;

#ifdef STEM_CALLBACK_DURING_FRAG1
    STEM_CALLBACK_DURING_FRAG1( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz, ctl, tsorig, tspub );
#endif
#ifdef STEM_CALLBACK_DURING_FRAG
    STEM_CALLBACK_DURING_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz, ctl );
#endif

    FD_COMPILER_MFENCE();
    ulong seq_test = this_in_mline->seq;
    FD_COMPILER_MFENCE();

    if( FD_UNLIKELY( fd_seq_ne( seq_test, seq_found ) ) ) { /* Overrun while reading (impossible if this_in honoring our fctl) */
      this_in->seq = seq_test; /* Resume from here (probably reasonably current, could query in mcache sync instead) */
      fd_metrics_link_in( fd_metrics_base_tl, this_in->idx )[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_COUNT_OFF ]++; /* No local accum since extremely rare, faster to use smaller cache line */
      fd_metrics_link_in( fd_metrics_base_tl, this_in->idx )[ FD_METRICS_COUNTER_LINK_OVERRUN_READING_FRAG_COUNT_OFF ] += (uint)fd_seq_diff( seq_test, seq_found ); /* No local accum since extremely rare, faster to use smaller cache line */
      /* Don't bother with spin as polling multiple locations */
      metric_regime_ticks[1] += housekeeping_ticks;
      metric_regime_ticks[4] += prefrag_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[7] += (ulong)(next - now);
      now = next;
      continue;
    }

#ifdef STEM_CALLBACK_RETURNABLE_FRAG
    int return_frag = STEM_CALLBACK_RETURNABLE_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, chunk, sz, ctl, tsorig, tspub, &stem );
    if( FD_UNLIKELY( return_frag ) ) {
      metric_regime_ticks[1] += housekeeping_ticks;
      long next = fd_tickcount();
      metric_regime_ticks[4] += (ulong)(next - now);
      now = next;
      continue;
    }
#endif

#ifdef STEM_CALLBACK_AFTER_FRAG
    STEM_CALLBACK_AFTER_FRAG( ctx, (ulong)this_in->idx, seq_found, sig, sz, tsorig, tspub, &stem );
#endif

    /* Windup for the next in poll and accumulate diagnostics */

    this_in_seq    = fd_seq_inc( this_in_seq, 1UL );
    this_in->seq   = this_in_seq;
    this_in->mline = this_in->mcache + fd_mcache_line_idx( this_in_seq, this_in->depth );

    this_in->accum[ FD_METRICS_COUNTER_LINK_CONSUMED_COUNT_OFF ]++;
    this_in->accum[ FD_METRICS_COUNTER_LINK_CONSUMED_SIZE_BYTES_OFF ] += (uint)sz;

    metric_regime_ticks[1] += housekeeping_ticks;
    metric_regime_ticks[4] += prefrag_ticks;
    long next = fd_tickcount();
    metric_regime_ticks[7] += (ulong)(next - now);
    now = next;
  }
}

FD_FN_UNUSED static void
STEM_(run)( fd_topo_t *      topo,
            fd_topo_tile_t * tile ) {
  const fd_frag_meta_t * in_mcache[ FD_TOPO_MAX_LINKS ];
  ulong * in_fseq[ FD_TOPO_MAX_TILE_IN_LINKS ];

  ulong polled_in_cnt = 0UL;
  for( ulong i=0UL; i<tile->in_cnt; i++ ) {
    if( FD_UNLIKELY( !tile->in_link_poll[ i ] ) ) continue;

    in_mcache[ polled_in_cnt ] = topo->links[ tile->in_link_id[ i ] ].mcache;
    FD_TEST( in_mcache[ polled_in_cnt ] );
    in_fseq[ polled_in_cnt ]   = tile->in_link_fseq[ i ];
    FD_TEST( in_fseq[ polled_in_cnt ] );
    polled_in_cnt += 1;
  }

  fd_frag_meta_t * out_mcache[ FD_TOPO_MAX_LINKS ];
  for( ulong i=0UL; i<tile->out_cnt; i++ ) {
    out_mcache[ i ] = topo->links[ tile->out_link_id[ i ] ].mcache;
    FD_TEST( out_mcache[ i ] );
  }

  ulong   reliable_cons_cnt = 0UL;
  ulong   cons_out[ FD_TOPO_MAX_LINKS ];
  ulong * cons_fseq[ FD_TOPO_MAX_LINKS ];
  volatile ulong * cons_slow[ FD_TOPO_MAX_LINKS ];
  for( ulong i=0UL; i<topo->tile_cnt; i++ ) {
    fd_topo_tile_t * consumer_tile = &topo->tiles[ i ];
    ulong polled_in_idx = 0UL;
    for( ulong j=0UL; j<consumer_tile->in_cnt; j++ ) {
      int is_polled = consumer_tile->in_link_poll[ j ];
      for( ulong k=0UL; k<tile->out_cnt; k++ ) {
        if( FD_UNLIKELY( consumer_tile->in_link_id[ j ]==tile->out_link_id[ k ] && consumer_tile->in_link_reliable[ j ] ) ) {
          cons_out[ reliable_cons_cnt ] = k;
          cons_fseq[ reliable_cons_cnt ] = consumer_tile->in_link_fseq[ j ];
          FD_TEST( cons_fseq[ reliable_cons_cnt ] );
          cons_slow[ reliable_cons_cnt ] = fd_metrics_link_in( consumer_tile->metrics, polled_in_idx ) + FD_METRICS_COUNTER_LINK_SLOW_COUNT_OFF;
          reliable_cons_cnt++;
          /* Need to test this, since each out link may connect to many
             consumers, so a topology could be constructed with more
             reliable consumers than FD_TOPO_MAX_LINKS. */
          FD_TEST( reliable_cons_cnt<FD_TOPO_MAX_LINKS );
        }
      }
      if( FD_LIKELY( is_polled ) ) polled_in_idx++;
    }
  }

  fd_rng_t rng[1];
  FD_TEST( fd_rng_join( fd_rng_new( rng, 0, 0UL ) ) );

  STEM_CALLBACK_CONTEXT_TYPE * ctx = (STEM_CALLBACK_CONTEXT_TYPE*)fd_ulong_align_up( (ulong)fd_topo_obj_laddr( topo, tile->tile_obj_id ), STEM_CALLBACK_CONTEXT_ALIGN );

  STEM_(run1)( polled_in_cnt,
               in_mcache,
               in_fseq,
               tile->out_cnt,
               out_mcache,
               reliable_cons_cnt,
               cons_out,
               cons_fseq,
               cons_slow,
               STEM_BURST,
               STEM_LAZY,
               rng,
               fd_alloca( FD_STEM_SCRATCH_ALIGN, STEM_(scratch_footprint)( polled_in_cnt, tile->out_cnt, reliable_cons_cnt ) ),
               ctx );

#ifdef STEM_CALLBACK_METRICS_WRITE
  /* Write final metrics state before shutting down */
  FD_COMPILER_MFENCE();
  STEM_CALLBACK_METRICS_WRITE( ctx );
  FD_COMPILER_MFENCE();
#endif

  if( FD_LIKELY( tile->allow_shutdown ) ) {
    for( ulong i=0UL; i<tile->in_cnt; i++ ) {
      if( FD_UNLIKELY( !tile->in_link_poll[ i ] || !tile->in_link_reliable[ i ] ) ) continue;

      /* Return infinite credits on any reliable consumer links so that
         producers no longer expect us to consume. */
      ulong fseq_id = tile->in_link_fseq_obj_id[ i ];
      ulong * fseq = fd_fseq_join( fd_topo_obj_laddr( topo, fseq_id ) );
      FD_TEST( fseq );
      fd_fseq_update( fseq, STEM_SHUTDOWN_SEQ );
    }
  }
}

#undef STEM_NAME
#undef STEM_
#undef STEM_BURST
#undef STEM_CALLBACK_CONTEXT_TYPE
#undef STEM_CALLBACK_CONTEXT_ALIGN
#undef STEM_LAZY
#undef STEM_SHUTDOWN_SEQ
#undef STEM_CALLBACK_SHOULD_SHUTDOWN
#undef STEM_CALLBACK_DURING_HOUSEKEEPING
#undef STEM_CALLBACK_METRICS_WRITE
#undef STEM_CALLBACK_BEFORE_CREDIT
#undef STEM_CALLBACK_AFTER_CREDIT
#undef STEM_CALLBACK_BEFORE_FRAG
#undef STEM_CALLBACK_DURING_FRAG
#undef STEM_CALLBACK_DURING_FRAG1
#undef STEM_CALLBACK_RETURNABLE_FRAG
#undef STEM_CALLBACK_AFTER_FRAG
#undef STEM_CALLBACK_AFTER_POLL_OVERRUN