LCOV - code coverage report
Current view: top level - discof/replay - fd_sched.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 865 0.0 %
Date: 2025-09-19 04:41:14 Functions: 0 37 0.0 %

          Line data    Source code
       1             : #include "fd_sched.h"
       2             : #include "../../flamenco/runtime/fd_runtime.h" /* for fd_runtime_load_txn_address_lookup_tables */
       3             : 
       4             : #include "../../flamenco/runtime/sysvar/fd_sysvar_slot_hashes.h" /* for ALUTs */
       5             : 
       6             : 
// TODO can the bounds be tighter?
/* Capacity limits for the scheduler. */
#define FD_SCHED_MAX_TXN_PER_BLOCK         (FD_TXN_MAX_PER_SLOT)
#define FD_SCHED_MAX_BLOCK_DEPTH           (1024UL) /* max blocks tracked at once (block pool/map size) */
#define FD_SCHED_MAX_NON_EMPTY_BLOCK_DEPTH (32UL)
#define FD_SCHED_MAX_DEPTH                 (FD_RDISP_MAX_DEPTH>>2) /* txn pool capacity; a quarter of the dispatcher's max */
#define FD_SCHED_MAX_STAGING_LANES_LOG     (2)
#define FD_SCHED_MAX_STAGING_LANES         (1UL<<FD_SCHED_MAX_STAGING_LANES_LOG) /* 4 dispatcher staging lanes */

/* We size the buffer to be able to hold residual data from the previous
   FEC set that only becomes parseable after the next FEC set is
   ingested, as well as the incoming FEC set.  The largest minimally
   parseable unit of data is a transaction.  So that much data may
   straddle FEC set boundaries.  Other minimally parseable units of data
   include the microblock header and the microblock count within a
   batch. */
#define FD_SCHED_MAX_PAYLOAD_PER_FEC       (FD_STORE_DATA_MAX)
#define FD_SCHED_MAX_FEC_BUF_SZ            (FD_SCHED_MAX_PAYLOAD_PER_FEC+FD_TXN_MTU)
FD_STATIC_ASSERT( FD_TXN_MTU>=sizeof(fd_microblock_hdr_t), resize buffer for residual data );
FD_STATIC_ASSERT( FD_TXN_MTU>=sizeof(ulong),               resize buffer for residual data );

#define FD_SCHED_MAGIC (0xace8a79c181f89b6UL) /* echo -n "fd_sched_v0" | sha512sum | head -c 16 */
      28             : 
      29             : 
      30             : /* Structs. */
      31             : 
/* fd_sched_block_t is one node of the scheduler's fork tree.  It links
   into a left-child/right-sibling tree via pool indices, carries the
   per-block transaction counters, the incremental FEC-set parser state,
   and the block-level lifecycle flags consulted by the staging /
   dispatch logic below. */
struct fd_sched_block {
  fd_sched_block_id_t block_id;
  ulong               next;        /* reserved for internal use by fd_pool, fd_map_chain */
  ulong               parent_idx;  /* index of the parent in the pool */
  ulong               child_idx;   /* index of the left-child in the pool */
  ulong               sibling_idx; /* index of the right-sibling in the pool */

  /* Counters. */
  uint                txn_parsed_cnt;
  /*                  txn_queued_cnt = txn_parsed_cnt-txn_in_flight_cnt-txn_done_cnt */
  uint                txn_in_flight_cnt;
  uint                txn_done_cnt;
  uint                shred_cnt;

  /* Parser state. */
  uchar               txn[ FD_TXN_MAX_SZ ] __attribute__((aligned(alignof(fd_txn_t)))); /* scratch for the txn currently being parsed */
  fd_hash_t           poh;          /* latest PoH hash we've seen from the ingested FEC sets */
  ulong               mblks_rem;    /* number of microblocks remaining in the current batch */
  ulong               txns_rem;     /* number of transactions remaining in the current microblock */
  fd_acct_addr_t      aluts[ 256 ]; /* resolve ALUT accounts into this buffer for more parallelism */
  uint                fec_buf_sz;   /* size of the fec_buf in bytes */
  uint                fec_buf_soff; /* starting offset into fec_buf for unparsed transactions */
  uint                fec_eob:1;    /* FEC end-of-batch: set if the last FEC set in the batch is being
                                       ingested */
  uint                fec_sob:1;    /* FEC start-of-batch: set if the parser expects to be receiving a new
                                       batch */

  /* Block state. */
  uint                fec_eos:1;                          /* FEC end-of-stream: set if the last FEC set in the block has been
                                                             ingested */
  uint                rooted:1;                           /* set if the block is rooted */
  uint                dying:1;                            /* set if the block has been abandoned and no transactions should be
                                                             scheduled from it */
  uint                in_rdisp:1;                         /* set if the block is being tracked by the dispatcher, either as staged
                                                             or unstaged */
  uint                block_start_signaled:1;             /* set if the start-of-block sentinel has been dispatched */
  uint                block_end_signaled:1;               /* set if the end-of-block sentinel has been dispatched */
  uint                staged:1;                           /* set if the block is in a dispatcher staging lane; a staged block is
                                                             tracked by the dispatcher */
  ulong               staging_lane;                       /* ignored if staged==0 */
  ulong               luf_depth;                          /* depth of longest unstaged fork starting from this node; only
                                                             stageable unstaged descendants are counted */
  uchar               fec_buf[ FD_SCHED_MAX_FEC_BUF_SZ ]; /* the previous FEC set could have some residual data that only becomes
                                                             parseable after the next FEC set is ingested */
};
typedef struct fd_sched_block fd_sched_block_t;
      78             : 
/* The parser copies microblock headers' PoH hashes into block->poh; the
   two representations must be the same size. */
FD_STATIC_ASSERT( sizeof(fd_hash_t)==sizeof(((fd_microblock_hdr_t *)0)->hash), unexpected poh hash size );

/* Element pool of fork-tree nodes; free-list threaded through the
   struct's next field. */
#define POOL_NAME  block_pool
#define POOL_T     fd_sched_block_t
#define POOL_NEXT  next
#include "../../util/tmpl/fd_pool.c"

/* Chained hash map from block id to pool element, keyed on the full
   fd_sched_block_id_t (compared bytewise) and hashed on its id field.
   Chains are threaded through the same next field (see the struct's
   field comment). */
#define MAP_NAME               block_map
#define MAP_ELE_T              fd_sched_block_t
#define MAP_KEY_T              fd_sched_block_id_t
#define MAP_KEY                block_id
#define MAP_KEY_EQ(k0,k1)      (!memcmp((k0),(k1), sizeof(fd_sched_block_id_t)))
#define MAP_KEY_HASH(key,seed) (fd_ulong_hash((key)->id^seed))
#define MAP_NEXT               next
#include "../../util/tmpl/fd_map_chain.c"
      94             : 
/* fd_sched_metrics_t: monotonically increasing diagnostic counters,
   zeroed at fd_sched_new and printed by debug_print_metrics. */
struct fd_sched_metrics {
  /* Block lifecycle. */
  uint  block_added_cnt;
  uint  block_added_staged_cnt;
  uint  block_added_unstaged_cnt;
  uint  block_added_dead_ood_cnt;   /* blocks added already-dead (child of an abandoned parent) */
  uint  block_removed_cnt;
  uint  block_abandoned_cnt;
  uint  block_promoted_cnt;
  uint  block_demoted_cnt;
  /* Active-block deactivation reasons. */
  uint  deactivate_no_child_cnt;
  uint  deactivate_no_txn_cnt;
  uint  deactivate_pruned_cnt;
  uint  deactivate_abandoned_cnt;
  /* Staging-lane churn. */
  uint  lane_switch_cnt;
  uint  lane_promoted_cnt;
  uint  lane_demoted_cnt;
  /* ALUT resolution outcomes. */
  uint  alut_success_cnt;
  uint  alut_serializing_cnt;
  /* Transaction accounting. */
  uint  txn_abandoned_parsed_cnt;
  uint  txn_abandoned_done_cnt;
  uint  txn_max_in_flight_cnt;
  ulong txn_weighted_in_flight_cnt;
  ulong txn_weighted_in_flight_tickcount;
  ulong txn_none_in_flight_tickcount;
  ulong txn_parsed_cnt;
  ulong txn_done_cnt;
  /* Ingest volume. */
  ulong bytes_ingested_cnt;
  ulong bytes_ingested_unparsed_cnt;
  ulong bytes_dropped_cnt;
  ulong fec_cnt;
};
typedef struct fd_sched_metrics fd_sched_metrics_t;
     127             : 
/* fd_sched_t: top-level scheduler state.  Owns the transaction pool,
   the fork-tree block pool/map, the staging-lane bookkeeping, and a
   join to the underlying dispatcher (rdisp). */
struct fd_sched {
  fd_sched_metrics_t  metrics[ 1 ];
  long                txn_in_flight_last_tick; /* LONG_MAX when nothing is in flight (set at init) */
  ulong               root_idx;                /* pool index of the root block, or null_idx */
  fd_rdisp_t *        rdisp;
  ulong               active_block_idx; /* index of the actively replayed block, or null_idx if no block is
                                           actively replayed; has to have a transaction to dispatch; staged
                                           blocks that have no transactions to dispatch are not eligible for
                                           being active. */
  ulong               staged_bitset;    /* bit i set if staging lane i is occupied */
  ulong               staged_head_block_idx[ FD_SCHED_MAX_STAGING_LANES ]; /* head of the linear chain in each staging lane, ignored if bit i is
                                                                              not set in the bitset */
  ulong               txn_pool_free_cnt; /* free entries in txn_pool (index 0 is a dispatcher-reserved sentinel) */
  fd_txn_p_t          txn_pool[ FD_SCHED_MAX_DEPTH ];
  ulong               txn_to_block_idx[ FD_SCHED_MAX_DEPTH ]; /* index of the block that the txn belongs to */
  fd_sched_block_t *  block_pool; /* fd_pool of max_block_depth elements */
  block_map_t *       block_map;  /* map_chain */
};
typedef struct fd_sched fd_sched_t;
     147             : 
     148             : 
     149             : /* Internal helpers. */
     150             : 
     151             : static void
     152             : add_block( fd_sched_t *          sched,
     153             :            fd_sched_block_id_t * block_id,
     154             :            fd_sched_block_id_t * parent_block_id,
     155             :            fd_sched_block_t * *  out_block,
     156             :            fd_sched_block_t * *  out_parent_block );
     157             : 
     158             : static void
     159             : fd_sched_parse( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx );
     160             : 
     161             : static int
     162             : fd_sched_parse_txn( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx );
     163             : 
     164             : static void
     165             : try_activate_block( fd_sched_t * sched );
     166             : 
     167             : static void
     168             : subtree_abandon( fd_sched_t * sched, fd_sched_block_t * block );
     169             : 
     170             : FD_FN_UNUSED static ulong
     171             : find_and_stage_longest_unstaged_fork( fd_sched_t * sched, int lane_idx );
     172             : 
     173             : static ulong
     174             : compute_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx );
     175             : 
     176             : static ulong
     177             : stage_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx, int lane_idx );
     178             : 
     179             : FD_FN_UNUSED static inline int
     180           0 : block_is_void( fd_sched_block_t * block ) {
     181           0 :   /* We've seen everything in the block and no transaction got parsed
     182           0 :      out. */
     183           0 :   return block->fec_eos && block->txn_parsed_cnt==0;
     184           0 : }
     185             : 
     186             : static inline int
     187           0 : block_should_signal_end( fd_sched_block_t * block ) {
     188           0 :   ulong txn_queued_cnt = block->txn_parsed_cnt-block->txn_in_flight_cnt-block->txn_done_cnt;
     189           0 :   return block->fec_eos && txn_queued_cnt==0UL && !block->block_end_signaled;
     190           0 : }
     191             : 
     192             : static inline int
     193           0 : block_is_dispatchable( fd_sched_block_t * block ) {
     194           0 :   return block->txn_parsed_cnt>block->txn_done_cnt ||
     195           0 :          !block->block_start_signaled ||
     196           0 :          block_should_signal_end( block );
     197           0 : }
     198             : 
     199             : static inline int
     200           0 : block_is_done( fd_sched_block_t * block ) {
     201           0 :   return block->fec_eos && !block_is_dispatchable( block );
     202           0 : }
     203             : 
     204             : static inline int
     205           0 : block_is_stageable( fd_sched_block_t * block ) {
     206           0 :   int rv = !block_is_done( block ) && !block->dying;
     207           0 :   if( FD_UNLIKELY( rv && !block->in_rdisp ) ) {
     208             :     /* Invariant: stageable blocks may be currently staged or unstaged,
     209             :        but must be in the dispatcher either way.  When a block
     210             :        transitions to DONE, it will be immediately removed from the
     211             :        dispatcher.  When a block transitions to DYING, it will be
     212             :        eventually abandoned from the dispatcher. */
     213           0 :     FD_LOG_CRIT(( "invariant violation: stageable block->in_rdisp==0, slot %lu, prime %lu",
     214           0 :                   (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
     215           0 :   }
     216           0 :   return rv;
     217           0 : }
     218             : 
     219             : static inline int
     220           0 : block_is_promotable( fd_sched_block_t * block ) {
     221           0 :   return block_is_stageable( block ) && !block->staged;
     222           0 : }
     223             : 
     224             : static inline int
     225           0 : block_is_activatable( fd_sched_block_t * block ) {
     226           0 :   return block_is_stageable( block ) && block_is_dispatchable( block ) && block->staged;
     227           0 : }
     228             : 
/* Dump a single block's identity, staging status, lifecycle flags and
   transaction counters to the info log.  Debug aid only. */
FD_FN_UNUSED static void
debug_print_block( fd_sched_block_t * block ) {
  FD_LOG_INFO(( "block slot %lu, prime %lu, staged %d (lane %lu), dying %d, in_rdisp %d, fec_eos %d, rooted %d, block_start_signaled %d, block_end_signaled %d, txn_parsed_cnt %u, txn_in_flight_cnt %u, txn_done_cnt %u, shred_cnt %u",
                (ulong)block->block_id.slot, (ulong)block->block_id.prime, block->staged, block->staging_lane, block->dying, block->in_rdisp, block->fec_eos, block->rooted, block->block_start_signaled, block->block_end_signaled, block->txn_parsed_cnt, block->txn_in_flight_cnt, block->txn_done_cnt, block->shred_cnt ));
}
     234             : 
/* Dump every scheduler metric counter to the info log.  Debug aid
   only. */
FD_FN_UNUSED static void
debug_print_metrics( fd_sched_t * sched ) {
  FD_LOG_INFO(( "metrics: block_added_cnt %u, block_added_staged_cnt %u, block_added_unstaged_cnt %u, block_added_dead_ood_cnt %u, block_removed_cnt %u, block_abandoned_cnt %u, block_promoted_cnt %u, block_demoted_cnt %u, deactivate_no_child_cnt %u, deactivate_no_txn_cnt %u, deactivate_pruned_cnt %u, deactivate_abandoned_cnt %u, lane_switch_cnt %u, lane_promoted_cnt %u, lane_demoted_cnt %u, alut_success_cnt %u, alut_serializing_cnt %u, txn_abandoned_parsed_cnt %u, txn_abandoned_done_cnt %u, txn_max_in_flight_cnt %u, txn_weighted_in_flight_cnt %lu, txn_weighted_in_flight_tickcount %lu, txn_none_in_flight_tickcount %lu, txn_parsed_cnt %lu, txn_done_cnt %lu, bytes_ingested_cnt %lu, bytes_ingested_unparsed_cnt %lu, bytes_dropped_cnt %lu, fec_cnt %lu",
                sched->metrics->block_added_cnt, sched->metrics->block_added_staged_cnt, sched->metrics->block_added_unstaged_cnt, sched->metrics->block_added_dead_ood_cnt, sched->metrics->block_removed_cnt, sched->metrics->block_abandoned_cnt, sched->metrics->block_promoted_cnt, sched->metrics->block_demoted_cnt, sched->metrics->deactivate_no_child_cnt, sched->metrics->deactivate_no_txn_cnt, sched->metrics->deactivate_pruned_cnt, sched->metrics->deactivate_abandoned_cnt, sched->metrics->lane_switch_cnt, sched->metrics->lane_promoted_cnt, sched->metrics->lane_demoted_cnt, sched->metrics->alut_success_cnt, sched->metrics->alut_serializing_cnt, sched->metrics->txn_abandoned_parsed_cnt, sched->metrics->txn_abandoned_done_cnt, sched->metrics->txn_max_in_flight_cnt, sched->metrics->txn_weighted_in_flight_cnt, sched->metrics->txn_weighted_in_flight_tickcount, sched->metrics->txn_none_in_flight_tickcount, sched->metrics->txn_parsed_cnt, sched->metrics->txn_done_cnt, sched->metrics->bytes_ingested_cnt, sched->metrics->bytes_ingested_unparsed_cnt, sched->metrics->bytes_dropped_cnt, sched->metrics->fec_cnt ));
}
     240             : 
     241             : /* Public functions. */
     242             : 
     243           0 : ulong fd_sched_align( void ) {
     244           0 :   return fd_ulong_max( alignof(fd_sched_t),
     245           0 :          fd_ulong_max( fd_rdisp_align(),
     246           0 :          fd_ulong_max( block_map_align(),
     247           0 :          fd_ulong_max( block_pool_align(), 64UL )))); /* Minimally cache line aligned. */
     248           0 : }
     249             : 
     250             : ulong
     251           0 : fd_sched_footprint( void ) {
     252           0 :   ulong chain_cnt = block_map_chain_cnt_est( FD_SCHED_MAX_BLOCK_DEPTH );
     253             : 
     254           0 :   ulong l = FD_LAYOUT_INIT;
     255           0 :   l = FD_LAYOUT_APPEND( l, fd_sched_align(),   sizeof(fd_sched_t)                                                   );
     256           0 :   l = FD_LAYOUT_APPEND( l, fd_rdisp_align(),   fd_rdisp_footprint  ( FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH ) ); /* dispatcher */
     257           0 :   l = FD_LAYOUT_APPEND( l, block_map_align(),  block_map_footprint ( chain_cnt                                    ) ); /* block map  */
     258           0 :   l = FD_LAYOUT_APPEND( l, block_pool_align(), block_pool_footprint( FD_SCHED_MAX_BLOCK_DEPTH                     ) ); /* block pool */
     259           0 :   return FD_LAYOUT_FINI( l, fd_sched_align() );
     260           0 : }
     261             : 
     262             : void *
     263           0 : fd_sched_new( void * mem ) {
     264           0 :   ulong chain_cnt = block_map_chain_cnt_est( FD_SCHED_MAX_BLOCK_DEPTH );
     265             : 
     266           0 :   FD_SCRATCH_ALLOC_INIT( l, mem );
     267           0 :   fd_sched_t * sched = FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(),   sizeof(fd_sched_t)                                                   );
     268           0 :   void * _rdisp      = FD_SCRATCH_ALLOC_APPEND( l, fd_rdisp_align(),   fd_rdisp_footprint  ( FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH ) );
     269           0 :   void * _bmap       = FD_SCRATCH_ALLOC_APPEND( l, block_map_align(),  block_map_footprint ( chain_cnt                                    ) );
     270           0 :   void * _bpool      = FD_SCRATCH_ALLOC_APPEND( l, block_pool_align(), block_pool_footprint( FD_SCHED_MAX_BLOCK_DEPTH                     ) );
     271           0 :   FD_SCRATCH_ALLOC_FINI( l, fd_sched_align() );
     272             : 
     273           0 :   ulong seed = ((ulong)fd_tickcount()) ^ FD_SCHED_MAGIC;
     274           0 :   fd_rdisp_new  ( _rdisp, FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH, seed );
     275           0 :   block_map_new ( _bmap, chain_cnt, seed+1UL                                 );
     276           0 :   block_pool_new( _bpool, FD_SCHED_MAX_BLOCK_DEPTH                           );
     277             : 
     278           0 :   fd_sched_block_t * _bpool_join = block_pool_join( _bpool );
     279           0 :   ulong null_idx = block_pool_idx_null( _bpool_join );
     280           0 :   block_pool_leave( _bpool_join );
     281             : 
     282           0 :   fd_memset( sched->metrics, 0, sizeof(fd_sched_metrics_t) );
     283           0 :   sched->txn_in_flight_last_tick = LONG_MAX;
     284             : 
     285           0 :   sched->root_idx         = null_idx;
     286           0 :   sched->active_block_idx = null_idx;
     287           0 :   sched->staged_bitset    = 0UL;
     288             : 
     289           0 :   sched->txn_pool_free_cnt = FD_SCHED_MAX_DEPTH-1UL; /* -1 because index 0 is unusable as a sentinel reserved by the dispatcher */
     290             : 
     291           0 :   return sched;
     292           0 : }
     293             : 
     294             : fd_sched_t *
     295           0 : fd_sched_join( void * mem ) {
     296           0 :   fd_sched_t * sched = (fd_sched_t *)mem;
     297             : 
     298           0 :   ulong chain_cnt = block_map_chain_cnt_est( FD_SCHED_MAX_BLOCK_DEPTH );
     299             : 
     300           0 :   FD_SCRATCH_ALLOC_INIT( l, mem );
     301           0 :   /*                */ FD_SCRATCH_ALLOC_APPEND( l, fd_sched_align(),   sizeof(fd_sched_t)                                                   );
     302           0 :   void  * _rdisp     = FD_SCRATCH_ALLOC_APPEND( l, fd_rdisp_align(),   fd_rdisp_footprint  ( FD_SCHED_MAX_DEPTH, FD_SCHED_MAX_BLOCK_DEPTH ) );
     303           0 :   void  * _bmap      = FD_SCRATCH_ALLOC_APPEND( l, block_map_align(),  block_map_footprint ( chain_cnt                                    ) );
     304           0 :   void  * _bpool     = FD_SCRATCH_ALLOC_APPEND( l, block_pool_align(), block_pool_footprint( FD_SCHED_MAX_BLOCK_DEPTH                     ) );
     305           0 :   FD_SCRATCH_ALLOC_FINI( l, fd_sched_align() );
     306             : 
     307           0 :   sched->rdisp      = fd_rdisp_join( _rdisp );
     308           0 :   sched->block_map  = block_map_join( _bmap );
     309           0 :   sched->block_pool = block_pool_join( _bpool );
     310             : 
     311           0 :   return sched;
     312           0 : }
     313             : 
     314             : int
     315           0 : fd_sched_fec_can_ingest( fd_sched_t * sched, fd_sched_fec_t * fec ) {
     316           0 :   if( FD_UNLIKELY( fec->fec->data_sz>FD_SCHED_MAX_PAYLOAD_PER_FEC ) ) {
     317           0 :     FD_LOG_CRIT(( "invalid FEC set: fec->data_sz %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
     318           0 :                   fec->fec->data_sz, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
     319           0 :   }
     320             : 
     321           0 :   ulong fec_buf_sz = 0UL;
     322           0 :   fd_sched_block_t * block = block_map_ele_query( sched->block_map, &fec->block_id, NULL, sched->block_pool );
     323           0 :   if( FD_LIKELY( block ) ) {
     324           0 :     fec_buf_sz += block->fec_buf_sz-block->fec_buf_soff;
     325           0 :   } else {
     326             :     /* This FEC set will need to allocate a new block from the pool. */
     327           0 :     if( FD_UNLIKELY( !block_pool_free( sched->block_pool ) ) ) {
     328           0 :       return 0;
     329           0 :     }
     330           0 :   }
     331             :   /* Addition is safe and won't overflow because we checked the FEC set
     332             :      size above. */
     333           0 :   fec_buf_sz += fec->fec->data_sz;
     334             :   /* Assuming every transaction is min size, do we have enough free
     335             :      entries in the txn pool?  For a more precise txn count, we would
     336             :      have to do some parsing. */
     337           0 :   return sched->txn_pool_free_cnt>=fec_buf_sz/FD_TXN_MIN_SERIALIZED_SZ;
     338           0 : }
     339             : 
     340             : int
     341           0 : fd_sched_can_ingest( fd_sched_t * sched ) {
     342             :   /* Assume worst case we will need to allocate a new block from the
     343             :      pool. */
     344           0 :   if( FD_UNLIKELY( !block_pool_free( sched->block_pool ) ) ) {
     345           0 :     return 0;
     346           0 :   }
     347             : 
     348             :   /* Worst case, we need one byte from the incoming data to extract a
     349             :      transaction out of the residual data, and the rest of the incoming
     350             :      data contributes toward min sized transactions. */
     351           0 :   ulong txn_cnt = (FD_SCHED_MAX_PAYLOAD_PER_FEC-1UL)/FD_TXN_MIN_SERIALIZED_SZ+1UL; /* 478 */
     352           0 :   return sched->txn_pool_free_cnt>=txn_cnt;
     353           0 : }
     354             : 
     355             : void
     356           0 : fd_sched_fec_ingest( fd_sched_t * sched, fd_sched_fec_t * fec ) {
     357           0 :   if( FD_UNLIKELY( fec->fec->data_sz>FD_SCHED_MAX_PAYLOAD_PER_FEC ) ) {
     358           0 :     FD_LOG_CRIT(( "invalid FEC set: fec->data_sz %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
     359           0 :                   fec->fec->data_sz, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
     360           0 :   }
     361             : 
     362           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
     363             : 
     364           0 :   fd_sched_block_t * block = block_map_ele_query( sched->block_map, &fec->block_id, NULL, sched->block_pool );
     365           0 :   if( FD_UNLIKELY( !block ) ) {
     366             :     /* This is a new block. */
     367           0 :     fd_sched_block_t * parent_block = NULL;
     368           0 :     add_block( sched, &fec->block_id, &fec->parent_block_id, &block, &parent_block );
     369             : 
     370           0 :     if( FD_UNLIKELY( block->dying ) ) {
     371             :       /* The child of a dead block is also dead.  We added it to our
     372             :          fork tree just so we could track an entire lineage of dead
     373             :          children and propagate the dead property to the entire lineage,
     374             :          in case there were frags for more than one dead children
     375             :          in-flight at the time the parent was abandoned.  That being
     376             :          said, we shouldn't need to add the dead child to the
     377             :          dispatcher. */
     378           0 :       sched->metrics->block_added_dead_ood_cnt++;
     379             : 
     380             :       /* Ignore the FEC set for a dead block. */
     381           0 :       sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
     382           0 :       return;
     383           0 :     }
     384             : 
     385             :     /* Try to find a staging lane for this block. */
     386           0 :     int alloc_lane = 0;
     387           0 :     if( FD_LIKELY( parent_block->staged ) ) {
     388             :       /* Parent is staged.  So see if we can continue down the same
     389             :          staging lane. */
     390           0 :       ulong staging_lane = parent_block->staging_lane;
     391           0 :       ulong child_idx    = parent_block->child_idx;
     392           0 :       while( child_idx != null_idx ) {
     393           0 :         fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
     394           0 :         if( child->staged && child->staging_lane==staging_lane ) {
     395             :           /* Found a child on the same lane.  So we're done. */
     396           0 :           staging_lane = FD_RDISP_UNSTAGED;
     397           0 :           break;
     398           0 :         }
     399           0 :         child_idx = child->sibling_idx;
     400           0 :       }
     401             :       /* No child is staged on the same lane as the parent.  So stage
     402             :          this block.  This is the common case. */
     403           0 :       if( FD_LIKELY( staging_lane!=FD_RDISP_UNSTAGED ) ) {
     404           0 :         block->in_rdisp     = 1;
     405           0 :         block->staged       = 1;
     406           0 :         block->staging_lane = staging_lane;
     407           0 :         fd_rdisp_add_block( sched->rdisp, block->block_id.id, staging_lane );
     408           0 :         sched->metrics->block_added_cnt++;
     409           0 :         sched->metrics->block_added_staged_cnt++;
     410           0 :       } else {
     411           0 :         alloc_lane = 1;
     412           0 :       }
     413           0 :     } else {
     414           0 :       if( block_is_stageable( parent_block ) ) {
     415             :         /* Parent is unstaged but stageable.  So let's be unstaged too.
     416             :            This is a policy decision to be lazy and not promote parent
     417             :            at the moment. */
     418           0 :         block->in_rdisp = 1;
     419           0 :         block->staged   = 0;
     420           0 :         fd_rdisp_add_block( sched->rdisp, block->block_id.id, FD_RDISP_UNSTAGED );
     421           0 :         sched->metrics->block_added_cnt++;
     422           0 :         sched->metrics->block_added_unstaged_cnt++;
     423           0 :       } else {
     424           0 :         alloc_lane = 1;
     425           0 :       }
     426           0 :     }
     427           0 :     if( FD_UNLIKELY( alloc_lane ) ) {
     428             :       /* We weren't able to inherit the parent's staging lane.  So try
     429             :          to find a new staging lane. */
     430           0 :       if( FD_LIKELY( sched->staged_bitset!=fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) ) ) { /* Optimize for lane available. */
     431           0 :         int lane_idx = fd_ulong_find_lsb( ~sched->staged_bitset );
     432           0 :         if( FD_UNLIKELY( lane_idx>=(int)FD_SCHED_MAX_STAGING_LANES ) ) {
     433           0 :           FD_LOG_CRIT(( "invariant violation: lane_idx %d, sched->staged_bitset %lx",
     434           0 :                         lane_idx, sched->staged_bitset ));
     435           0 :         }
     436           0 :         sched->staged_bitset = fd_ulong_set_bit( sched->staged_bitset, lane_idx );
     437           0 :         sched->staged_head_block_idx[ lane_idx ] = block_pool_idx( sched->block_pool, block );
     438           0 :         block->in_rdisp     = 1;
     439           0 :         block->staged       = 1;
     440           0 :         block->staging_lane = (ulong)lane_idx;
     441           0 :         fd_rdisp_add_block( sched->rdisp, block->block_id.id, block->staging_lane );
     442           0 :         sched->metrics->block_added_cnt++;
     443           0 :         sched->metrics->block_added_staged_cnt++;
     444           0 :       } else {
     445             :         /* No lanes available. */
     446           0 :         block->in_rdisp = 1;
     447           0 :         block->staged   = 0;
     448           0 :         fd_rdisp_add_block( sched->rdisp, block->block_id.id, FD_RDISP_UNSTAGED );
     449           0 :         sched->metrics->block_added_cnt++;
     450           0 :         sched->metrics->block_added_unstaged_cnt++;
     451           0 :       }
     452           0 :     }
     453           0 :   }
     454             : 
     455           0 :   if( FD_UNLIKELY( block->dying ) ) {
     456             :     /* Ignore the FEC set for a dead block. */
     457           0 :     sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
     458           0 :     return;
     459           0 :   }
     460             : 
     461           0 :   if( FD_UNLIKELY( !block->in_rdisp ) ) {
     462             :     /* Invariant: block must be in the dispatcher at this point. */
     463           0 :     FD_LOG_CRIT(( "invariant violation: block->in_rdisp==0, slot %lu, prime %lu",
     464           0 :                   (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
     465           0 :   }
     466             : 
     467           0 :   if( FD_UNLIKELY( block->fec_eos ) ) {
     468             :     /* This means something is wrong upstream. */
     469           0 :     FD_LOG_CRIT(( "invariant violation: block->fec_eos set but getting more FEC sets fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
     470           0 :                   FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
     471           0 :   }
     472           0 :   if( FD_UNLIKELY( block->fec_eob && fec->is_last_in_batch ) ) {
     473             :     /* This means the previous batch didn't parse properly.  So this is
     474             :        a bad block.  We should refuse to replay down the fork. */
     475           0 :     FD_LOG_WARNING(( "invariant violation: block->fec_eob set but getting another FEC set that is last in batch fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
     476           0 :                      FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
     477           0 :     block->dying = 1;//FIXME inform replay/banks that it's dead?
     478           0 :     sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
     479           0 :     return;
     480           0 :   }
     481           0 :   if( FD_UNLIKELY( block->child_idx!=null_idx ) ) {
     482             :     /* This means something is wrong upstream.  FEC sets are not being
     483             :        delivered in replay order. */
     484           0 :     FD_LOG_CRIT(( "invariant violation: block->child_idx %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
     485           0 :                   block->child_idx, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
     486           0 :   }
     487             : 
     488           0 :   FD_TEST( block->fec_buf_sz>=block->fec_buf_soff );
     489           0 :   if( FD_LIKELY( block->fec_buf_sz>block->fec_buf_soff ) ) {
     490             :     /* If there is residual data from the previous FEC set within the
     491             :        same batch, we move it to the beginning of the buffer and append
     492             :        the new FEC set. */
     493           0 :     memmove( block->fec_buf, block->fec_buf+block->fec_buf_soff, block->fec_buf_sz-block->fec_buf_soff );
     494           0 :   }
     495           0 :   block->fec_buf_sz  -= block->fec_buf_soff;
     496           0 :   block->fec_buf_soff = 0;
     497             :   /* Addition is safe and won't overflow because we checked the FEC
     498             :      set size above. */
     499           0 :   if( FD_UNLIKELY( block->fec_buf_sz-block->fec_buf_soff+fec->fec->data_sz>FD_SCHED_MAX_FEC_BUF_SZ ) ) {
     500             :     /* In a conformant block, there shouldn't be more than a
     501             :        transaction's worth of residual data left over from the previous
     502             :        FEC set within the same batch.  So if this condition doesn't
     503             :        hold, it's a bad block.  Instead of crashing, we should refuse to
     504             :        replay down the fork. */
     505           0 :     FD_LOG_WARNING(( "bad block: fec_buf_sz %u, fec_buf_soff %u, fec->data_sz %lu, fec->mr %s, slot %lu, prime %lu, parent slot %lu, parent prime %lu",
     506           0 :                      block->fec_buf_sz, block->fec_buf_soff, fec->fec->data_sz, FD_BASE58_ENC_32_ALLOCA( fec->fec->key.mr.hash ), (ulong)fec->block_id.slot, (ulong)fec->block_id.prime, (ulong)fec->parent_block_id.slot, (ulong)fec->parent_block_id.prime ));
     507           0 :     block->dying = 1;//FIXME inform replay/banks that it's dead?
     508           0 :     sched->metrics->bytes_dropped_cnt += fec->fec->data_sz;
     509           0 :     return;
     510           0 :   }
     511             : 
     512           0 :   block->shred_cnt += fec->shred_cnt;
     513           0 :   sched->metrics->fec_cnt++;
     514             : 
     515             :   /* Append the new FEC set to the end of the buffer. */
     516           0 :   fd_memcpy( block->fec_buf+block->fec_buf_sz, fec->fec->data, fec->fec->data_sz );
     517           0 :   block->fec_buf_sz += (uint)fec->fec->data_sz;
     518           0 :   sched->metrics->bytes_ingested_cnt += fec->fec->data_sz;
     519             : 
     520           0 :   block->fec_eob = fec->is_last_in_batch;
     521           0 :   block->fec_eos = fec->is_last_in_block;
     522             : 
     523           0 :   fd_sched_parse( sched, block, fec->alut_ctx );
     524             : 
     525             :   /* Check if we need to set the active block. */
     526           0 :   if( FD_UNLIKELY( sched->active_block_idx==null_idx ) ) {
     527           0 :     try_activate_block( sched );
     528           0 :   } else {
     529           0 :     fd_sched_block_t * active_block = block_pool_ele( sched->block_pool, sched->active_block_idx );
     530           0 :     if( FD_UNLIKELY( !block_is_activatable( active_block ) ) ) {
     531           0 :       FD_LOG_CRIT(( "invariant violation: active_block_idx %lu is not activatable, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u, dying %u, slot %lu, prime %lu",
     532           0 :                     sched->active_block_idx, active_block->txn_parsed_cnt, active_block->txn_done_cnt, (uint)active_block->fec_eos, (uint)active_block->dying, (ulong)active_block->block_id.slot, (ulong)active_block->block_id.prime ));
     533           0 :     }
     534           0 :   }
     535             : 
     536           0 :   return;
     537           0 : }
     538             : 
     539             : ulong
     540           0 : fd_sched_txn_next_ready( fd_sched_t * sched, fd_sched_txn_ready_t * out_txn ) {
     541           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
     542           0 :   if( FD_UNLIKELY( sched->active_block_idx==null_idx ) ) {
     543             :     /* No need to try activating a block.  If we're in this state,
     544             :        there's truly nothing to execute.  We will activate something
     545             :        when we ingest a FEC set with transactions. */
     546           0 :     return 0UL;
     547           0 :   }
     548             : 
     549           0 :   out_txn->txn_id      = FD_SCHED_TXN_ID_NULL;
     550           0 :   out_txn->block_start = 0;
     551           0 :   out_txn->block_end   = 0;
     552             : 
     553             :   /* We could in theory reevaluate staging lane allocation here and do
     554             :      promotion/demotion as needed.  It's a policy decision to minimize
     555             :      fork churn for now and just execute down the same active fork. */
     556             : 
     557           0 :   fd_sched_block_t * block = block_pool_ele( sched->block_pool, sched->active_block_idx );
     558           0 :   if( FD_UNLIKELY( !block_is_activatable( block ) ) ) {
     559           0 :     FD_LOG_CRIT(( "invariant violation: active_block_idx %lu is not activatable, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u, dying %u, slot %lu, prime %lu",
     560           0 :                   sched->active_block_idx, block->txn_parsed_cnt, block->txn_done_cnt, (uint)block->fec_eos, (uint)block->dying, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
     561           0 :   }
     562             : 
     563           0 :   if( FD_UNLIKELY( !block->block_start_signaled ) ) {
     564           0 :     out_txn->txn_id           = FD_SCHED_TXN_ID_BLOCK_START;
     565           0 :     out_txn->block_id         = block->block_id;
     566           0 :     out_txn->parent_block_id  = block_pool_ele( sched->block_pool, block->parent_idx )->block_id;
     567           0 :     out_txn->block_start      = 1;
     568             : 
     569           0 :     block->block_start_signaled = 1;
     570           0 :     return 1UL;
     571           0 :   }
     572             : 
     573           0 :   ulong txn_queued_cnt = block->txn_parsed_cnt-block->txn_in_flight_cnt-block->txn_done_cnt;
     574           0 :   if( FD_LIKELY( txn_queued_cnt>0 ) ) { /* Optimize for no fork switching. */
     575           0 :     out_txn->txn_id          = fd_rdisp_get_next_ready( sched->rdisp, block->block_id.id );
     576           0 :     if( FD_UNLIKELY( out_txn->txn_id==0UL ) ) {
     577             :       /* There are transactions queued but none ready for execution.
     578             :          This implies that there must be in-flight transactions on whose
     579             :          completion the queued transactions depend. So we return and
     580             :          wait for those in-flight transactions to retire.  This is a
     581             :          policy decision to execute as much as we can down the current
     582             :          fork. */
     583           0 :       if( FD_UNLIKELY( !block->txn_in_flight_cnt ) ) {
     584           0 :         FD_LOG_CRIT(( "invariant violation: no ready transaction found but block->txn_in_flight_cnt==0, txn_parsed_cnt %u, txn_queued_cnt %lu, fec_eos %u, slot %lu, prime %lu",
     585           0 :                       block->txn_parsed_cnt, txn_queued_cnt, (uint)block->fec_eos, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
     586           0 :       }
     587           0 :       return 0UL;
     588           0 :     }
     589           0 :     out_txn->block_id         = block->block_id;
     590           0 :     out_txn->parent_block_id  = block_pool_ele( sched->block_pool, block->parent_idx )->block_id;
     591             : 
     592           0 :     long now = fd_tickcount();
     593           0 :     ulong delta = (ulong)(now-sched->txn_in_flight_last_tick);
     594           0 :     sched->metrics->txn_none_in_flight_tickcount     += fd_ulong_if( block->txn_in_flight_cnt==0U && sched->txn_in_flight_last_tick!=LONG_MAX, delta, 0UL );
     595           0 :     sched->metrics->txn_weighted_in_flight_tickcount += fd_ulong_if( block->txn_in_flight_cnt!=0U, delta, 0UL );
     596           0 :     sched->metrics->txn_weighted_in_flight_cnt       += delta*block->txn_in_flight_cnt;
     597           0 :     sched->txn_in_flight_last_tick = now;
     598             : 
     599           0 :     block->txn_in_flight_cnt++;
     600           0 :     txn_queued_cnt--;
     601           0 :     sched->metrics->txn_max_in_flight_cnt = fd_uint_max( sched->metrics->txn_max_in_flight_cnt, block->txn_in_flight_cnt );
     602           0 :     return 1UL;
     603           0 :   }
     604             : 
     605           0 :   if( FD_UNLIKELY( block_should_signal_end( block ) ) ) {
     606           0 :     FD_TEST( block->block_start_signaled );
     607           0 :     out_txn->txn_id           = FD_SCHED_TXN_ID_BLOCK_END;
     608           0 :     out_txn->block_id         = block->block_id;
     609           0 :     out_txn->parent_block_id  = block_pool_ele( sched->block_pool, block->parent_idx )->block_id;
     610           0 :     out_txn->block_end        = 1;
     611             : 
     612           0 :     block->block_end_signaled = 1;
     613           0 :     return 1UL;
     614           0 :   }
     615             : 
     616             :   /* Nothing queued for the active block.  If we haven't received all
     617             :      the FEC sets for it, then return and wait for more FEC sets, while
     618             :      there are in-flight transactions.  This is a policy decision to
     619             :      minimize fork churn and allow for executing down the current fork
     620             :      as much as we can.  If we have received all the FEC sets for it,
     621             :      then we'd still like to return and wait for the in-flight
     622             :      transactions to retire, before switching to a different block.
     623             : 
     624             :      Either way, there should be in-flight transactions.  We deactivate
     625             :      the active block the moment we exhausted transactions from it. */
     626           0 :   if( FD_UNLIKELY( !block->txn_in_flight_cnt ) ) {
     627           0 :     FD_LOG_CRIT(( "invariant violation: expected in-flight transactions but none, txn_parsed_cnt %u, txn_done_cnt %u, fec_eos %u, slot %lu, prime %lu",
     628           0 :                   block->txn_parsed_cnt, block->txn_done_cnt, (uint)block->fec_eos, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
     629           0 :   }
     630             : 
     631           0 :   return 0UL;
     632           0 : }
     633             : 
/* fd_sched_txn_done retires a previously dispatched item.  txn_id is
   either a real transaction id handed out by fd_sched_txn_next_ready,
   or one of the synthetic FD_SCHED_TXN_ID_BLOCK_START/BLOCK_END
   markers (which always refer to the currently active block).
   Retiring the last outstanding work of a block may finish the block
   and hand the staging lane to a child, release the lane entirely, or
   reap a dying block's subtree once its in-flight count drains. */
void
fd_sched_txn_done( fd_sched_t * sched, ulong txn_id ) {
  FD_TEST( txn_id!=FD_SCHED_TXN_ID_NULL );

  ulong              null_idx  = block_pool_idx_null( sched->block_pool );
  /* Synthetic markers are not tracked in txn_to_block_idx; map them to
     the active block instead. */
  ulong              block_idx = fd_ulong_if( txn_id==FD_SCHED_TXN_ID_BLOCK_START||txn_id==FD_SCHED_TXN_ID_BLOCK_END, sched->active_block_idx, sched->txn_to_block_idx[ txn_id ] );
  fd_sched_block_t * block     = block_pool_ele( sched->block_pool, block_idx );

  if( FD_UNLIKELY( !block->staged ) ) {
    /* Invariant: only staged blocks can have in-flight transactions. */
    FD_LOG_CRIT(( "invariant violation: block->staged==0, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }
  if( FD_UNLIKELY( !block->in_rdisp ) ) {
    /* Invariant: staged blocks must be in the dispatcher. */
    FD_LOG_CRIT(( "invariant violation: block->in_rdisp==0, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  /* Real transaction (not a start/end marker): update in-flight
     occupancy metrics for the elapsed interval (weighted by the
     in-flight count before the decrement below), then retire the
     transaction in the dispatcher and free its slot. */
  if( FD_LIKELY( txn_id!=FD_SCHED_TXN_ID_BLOCK_START && txn_id!=FD_SCHED_TXN_ID_BLOCK_END ) ) {
    FD_TEST( txn_id<FD_SCHED_MAX_DEPTH );
    long now = fd_tickcount();
    ulong delta = (ulong)(now-sched->txn_in_flight_last_tick);
    sched->metrics->txn_weighted_in_flight_tickcount += delta;
    sched->metrics->txn_weighted_in_flight_cnt       += delta*block->txn_in_flight_cnt;
    sched->txn_in_flight_last_tick = now;

    block->txn_done_cnt++;
    block->txn_in_flight_cnt--;
    fd_rdisp_complete_txn( sched->rdisp, txn_id );
    sched->txn_pool_free_cnt++;
    sched->metrics->txn_done_cnt++;
  }

  /* A dying block is reaped only after its last in-flight transaction
     has drained. */
  if( FD_UNLIKELY( block->dying && block->txn_in_flight_cnt==0U ) ) {
    if( FD_UNLIKELY( sched->active_block_idx==block_idx ) ) {
      FD_LOG_CRIT(( "invariant violation: active block shouldn't be dying, block_idx %lu, slot %lu, prime %lu",
                    block_idx, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
    }
    subtree_abandon( sched, block );
    return;
  }

  if( FD_UNLIKELY( !block->dying && sched->active_block_idx!=block_idx ) ) {
    /* Block is not dead.  So we should be actively replaying it. */
    fd_sched_block_t * active_block = block_pool_ele( sched->block_pool, sched->active_block_idx );
    FD_LOG_CRIT(( "invariant violation: sched->active_block_idx %lu, slot %lu, prime %lu, block_idx %lu, slot %lu, prime %lu",
                  sched->active_block_idx, (ulong)active_block->block_id.slot, (ulong)active_block->block_id.prime,
                  block_idx, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  if( FD_UNLIKELY( block_is_done( block ) ) ) {
    /* Block fully replayed: pull it out of the dispatcher. */
    block->in_rdisp = 0;
    block->staged   = 0;
    fd_rdisp_remove_block( sched->rdisp, block->block_id.id );
    sched->metrics->block_removed_cnt++;

    /* See if there is a child block down the same staging lane.  This
       is a policy decision to minimize fork churn.  We could in theory
       reevaluate staging lane allocation here and do promotion/demotion
       as needed. */
    ulong child_idx = block->child_idx;
    while( child_idx != null_idx ) {
      fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
      if( FD_LIKELY( child->staged && child->staging_lane==block->staging_lane ) ) {
        /* There is a child block down the same staging lane.  So switch
           the active block to it, and have the child inherit the head
           status of the lane.  This is the common case. */
        sched->active_block_idx = child_idx;
        sched->staged_head_block_idx[ block->staging_lane ] = child_idx;
        if( FD_UNLIKELY( !fd_ulong_extract_bit( sched->staged_bitset, (int)block->staging_lane ) ) ) {
          FD_LOG_CRIT(( "invariant violation: staged_bitset 0x%lx bit %lu is not set, slot %lu, prime %lu, child slot %lu, prime %lu",
                        sched->staged_bitset, block->staging_lane, (ulong)block->block_id.slot, (ulong)block->block_id.prime, (ulong)child->block_id.slot, (ulong)child->block_id.prime ));
        }
        return;
      }
      child_idx = child->sibling_idx;
    }
    /* There isn't a child block down the same staging lane.  This is
       the last block in the staging lane.  Release the staging lane. */
    sched->staged_bitset = fd_ulong_clear_bit( sched->staged_bitset, (int)block->staging_lane );
    sched->staged_head_block_idx[ block->staging_lane ] = null_idx;

    /* Reset the active block. */
    sched->active_block_idx = null_idx;
    sched->metrics->deactivate_no_child_cnt++;
    try_activate_block( sched );
  } else if( !block_is_activatable( block ) ) {
    /* We exhausted the active block, but it's not fully done yet.  We
       are just not getting FEC sets for it fast enough.  This could
       happen when the network path is congested, or when the leader
       simply went down.  Reset the active block. */
    sched->active_block_idx = null_idx;
    sched->metrics->deactivate_no_txn_cnt++;
    try_activate_block( sched );
  }
}
     731             : 
     732             : void
     733           0 : fd_sched_block_abandon( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
     734           0 :   fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
     735           0 :   if( FD_UNLIKELY( !block ) ) {
     736           0 :     FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
     737           0 :                   (ulong)block_id->slot, (ulong)block_id->prime ));
     738           0 :   }
     739           0 :   ulong block_idx = block_pool_idx( sched->block_pool, block );
     740           0 :   if( FD_UNLIKELY( block_idx!=sched->active_block_idx ) ) {
     741             :     /* Invariant: abandoning should only be performed on actively
     742             :        replayed blocks.  We impose this requirement on the caller
     743             :        because the dispatcher expects blocks to be abandoned in the same
     744             :        order that they were added, and having this requirement makes it
     745             :        easier to please the dispatcher. */
     746           0 :     FD_LOG_CRIT(( "invariant violation: active_block_idx %lu, block_idx %lu, slot %lu, prime %lu",
     747           0 :                   sched->active_block_idx, block_idx, (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
     748           0 :   }
     749             : 
     750           0 :   subtree_abandon( sched, block );
     751             : 
     752             :   /* Reset the active block. */
     753           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
     754           0 :   sched->active_block_idx = null_idx;
     755           0 :   sched->metrics->deactivate_abandoned_cnt++;
     756           0 :   try_activate_block( sched );
     757           0 : }
     758             : 
     759             : int
     760           0 : fd_sched_block_is_done( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
     761           0 :   fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
     762           0 :   if( FD_UNLIKELY( !block ) ) {
     763           0 :     FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
     764           0 :                   (ulong)block_id->slot, (ulong)block_id->prime ));
     765           0 :   }
     766           0 :   return block_is_done( block );
     767           0 : }
     768             : 
     769             : void
     770           0 : fd_sched_block_add_done( fd_sched_t * sched, fd_sched_block_id_t * block_id, fd_sched_block_id_t * parent_block_id ) {
     771           0 :   fd_sched_block_t * block        = NULL;
     772           0 :   fd_sched_block_t * parent_block = NULL;
     773           0 :   add_block( sched, block_id, parent_block_id, &block, &parent_block );
     774           0 :   block->txn_done_cnt = block->txn_parsed_cnt = UINT_MAX;
     775           0 :   block->fec_eos = 1;
     776           0 :   block->block_start_signaled = 1;
     777           0 :   block->block_end_signaled   = 1;
     778           0 :   if( FD_UNLIKELY( !parent_block_id ) ) {
     779             :     /* Assumes that a NULL parent implies the snapshot slot. */
     780           0 :     sched->root_idx = block_pool_idx( sched->block_pool, block );
     781           0 :     block->rooted = 1;
     782           0 :   }
     783           0 : }
     784             : 
/* fd_sched_root_publish prunes everything that is not on the path to,
   or a descendant of, the given new root.  It walks the tree from the
   old root breadth-first using an intrusive work list threaded through
   each block's next field, removing pruned blocks from the map and
   releasing them back to the pool.  The new root becomes parentless
   and sched->root_idx is repointed at it. */
void
fd_sched_root_publish( fd_sched_t * sched, fd_sched_block_id_t * root ) {
  ulong null_idx = block_pool_idx_null( sched->block_pool );

  fd_sched_block_t * new_root = block_map_ele_query( sched->block_map, root, NULL, sched->block_pool );
  if( FD_UNLIKELY( !new_root ) ) {
    FD_LOG_CRIT(( "invariant violation: new_root not found slot %lu, prime %lu",
                  (ulong)root->slot, (ulong)root->prime ));
  }

  fd_sched_block_t * old_root = block_pool_ele( sched->block_pool, sched->root_idx );
  if( FD_UNLIKELY( !old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root not found" ));
  }
  if( FD_UNLIKELY( !old_root->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root is not rooted, slot %lu, prime %lu",
                  (ulong)old_root->block_id.slot, (ulong)old_root->block_id.prime ));
  }

  /* Early exit if the new root is the same as the old root. */
  if( FD_UNLIKELY( old_root->block_id.id==new_root->block_id.id ) ) {
    FD_LOG_WARNING(( "new root is the same as the old root, slot %lu, prime %lu",
                     (ulong)new_root->block_id.slot, (ulong)new_root->block_id.prime ));
    return;
  }

  /* Seed the prune list with the old root.  Each block is removed from
     the map as it is enqueued, so it can never be enqueued twice. */
  fd_sched_block_t * head = block_map_ele_remove( sched->block_map, &old_root->block_id, NULL, sched->block_pool );
  head->next              = null_idx;
  fd_sched_block_t * tail = head;

  while( head ) {
    /* Enqueue all children of the current block except the new root
       (and therefore its entire subtree, which survives). */
    fd_sched_block_t * child = block_pool_ele( sched->block_pool, head->child_idx );
    while( child ) {
      /* Add children to be pruned. */
      if( child!=new_root ) {
        tail->next = block_map_idx_remove( sched->block_map, &child->block_id, null_idx, sched->block_pool );
        tail       = block_pool_ele( sched->block_pool, tail->next );
        tail->next = null_idx;
      }
      child = block_pool_ele( sched->block_pool, child->sibling_idx );
    }

    /* Prune the current block.  We will never publish halfway into a
       staging lane, because anything on the rooted fork should have
       finished replaying gracefully and be out of the dispatcher.  In
       fact, anything that we are publishing away should be out of the
       dispatcher at this point.  And there should be no more in-flight
       transactions. */
    if( FD_UNLIKELY( head->txn_in_flight_cnt ) ) {
      FD_LOG_CRIT(( "invariant violation: block has transactions in flight, slot %lu, prime %lu",
                    (ulong)head->block_id.slot, (ulong)head->block_id.prime ));
    }
    if( FD_UNLIKELY( head->in_rdisp ) ) {
      /* We should have removed it from the dispatcher when we were
         notified of the new root, or when in-flight transactions were
         drained. */
      FD_LOG_CRIT(( "invariant violation: block is in the dispatcher, slot %lu, prime %lu",
                    (ulong)head->block_id.slot, (ulong)head->block_id.prime ));
    }
    /* Capture the successor before releasing head back to the pool. */
    fd_sched_block_t * next = block_pool_ele( sched->block_pool, head->next );
    block_pool_ele_release( sched->block_pool, head );
    head = next;
  }

  /* The new root is now the top of the surviving tree. */
  new_root->parent_idx = null_idx;
  sched->root_idx = block_pool_idx( sched->block_pool, new_root );
}
     852             : 
/* fd_sched_root_notify: advance the scheduler's root from the current
   root to the block identified by root.  Preconditions (enforced with
   FD_LOG_CRIT): the new root must exist in the block map, every block
   on the path from it up to the old root must be done executing, not
   dying, not staged, and not in the dispatcher, and the new root must
   be a descendant of the old root.  On success, all blocks on that
   path are marked rooted and every fork branching off the path (a
   minority fork) is abandoned.  If abandoning pruned the currently
   active block, a new block is activated if possible.  Note: this does
   not release any block pool elements; pruning of the old root chain
   is presumably handled by the publish path — TODO confirm. */
void
fd_sched_root_notify( fd_sched_t * sched, fd_sched_block_id_t * root ) {
  ulong null_idx = block_pool_idx_null( sched->block_pool );

  fd_sched_block_t * block = block_map_ele_query( sched->block_map, root, NULL, sched->block_pool );
  if( FD_UNLIKELY( !block ) ) {
    FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
                  (ulong)root->slot, (ulong)root->prime ));
  }

  fd_sched_block_t * old_root = block_pool_ele( sched->block_pool, sched->root_idx );
  if( FD_UNLIKELY( !old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root not found" ));
  }
  if( FD_UNLIKELY( !old_root->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: old_root is not rooted, slot %lu, prime %lu",
                  (ulong)old_root->block_id.slot, (ulong)old_root->block_id.prime ));
  }

  /* Early exit if the new root is the same as the old root. */
  if( FD_UNLIKELY( old_root->block_id.id==block->block_id.id ) ) {
    FD_LOG_WARNING(( "new root is the same as the old root, slot %lu, prime %lu",
                     (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
    return;
  }

  /* Mark every node from the new root up through its parents to the
     old root as being rooted.  The walk terminates at the old root
     because the root's parent_idx is the null idx. */
  fd_sched_block_t * curr = block;
  fd_sched_block_t * prev = NULL;
  while( curr ) {
    if( FD_UNLIKELY( !block_is_done( curr ) ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is not done, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    if( FD_UNLIKELY( curr->dying ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is dying, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    if( FD_UNLIKELY( curr->staged ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is staged, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    if( FD_UNLIKELY( curr->in_rdisp ) ) {
      FD_LOG_CRIT(( "invariant violation: rooting a block that is in the dispatcher, slot %lu, prime %lu",
                    (ulong)curr->block_id.slot, (ulong)curr->block_id.prime ));
    }
    curr->rooted = 1;
    prev = curr;
    curr = block_pool_ele( sched->block_pool, curr->parent_idx );
  }

  /* If we didn't reach the old root, the new root is not a descendant. */
  if( FD_UNLIKELY( prev!=old_root ) ) {
    FD_LOG_CRIT(( "invariant violation: new root is not a descendant of old root, new root slot %lu, prime %lu, old root slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime, (ulong)old_root->block_id.slot, (ulong)old_root->block_id.prime ));
  }

  /* Remember the active block so we can count a pruned deactivation
     below (subtree_abandon may null out active_block_idx). */
  ulong old_active_block_idx = sched->active_block_idx;

  /* Now traverse from old root towards new root, and abandon all
     minority forks. */
  curr = old_root;
  while( curr && curr->rooted && curr!=block ) { /* curr!=block to avoid abandoning good forks. */
    fd_sched_block_t * rooted_child_block = NULL;
    ulong              child_idx          = curr->child_idx;
    while( child_idx!=null_idx ) {
      fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
      if( child->rooted ) {
        rooted_child_block = child;
      } else {
        /* This is a minority fork. */
        subtree_abandon( sched, child );
      }
      child_idx = child->sibling_idx;
    }
    curr = rooted_child_block;
  }

  /* If the active block got abandoned, we need to reset it. */
  if( sched->active_block_idx==null_idx ) {
    sched->metrics->deactivate_pruned_cnt += fd_uint_if( old_active_block_idx!=null_idx, 1U, 0U );
    try_activate_block( sched );
  }
}
     938             : 
     939             : fd_txn_p_t *
     940           0 : fd_sched_get_txn( fd_sched_t * sched, ulong txn_id ) {
     941           0 :   if( FD_UNLIKELY( txn_id>=FD_SCHED_MAX_DEPTH ) ) {
     942           0 :     return NULL;
     943           0 :   }
     944           0 :   return sched->txn_pool + txn_id;
     945           0 : }
     946             : 
     947             : fd_hash_t *
     948           0 : fd_sched_get_poh( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
     949           0 :   fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
     950           0 :   if( FD_UNLIKELY( !block ) ) {
     951           0 :     FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
     952           0 :                   (ulong)block_id->slot, (ulong)block_id->prime ));
     953           0 :   }
     954           0 :   return &block->poh;
     955           0 : }
     956             : 
     957             : uint
     958           0 : fd_sched_get_shred_cnt( fd_sched_t * sched, fd_sched_block_id_t * block_id ) {
     959           0 :   fd_sched_block_t * block = block_map_ele_query( sched->block_map, block_id, NULL, sched->block_pool );
     960           0 :   if( FD_UNLIKELY( !block ) ) {
     961           0 :     FD_LOG_CRIT(( "invariant violation: block not found slot %lu, prime %lu",
     962           0 :                   (ulong)block_id->slot, (ulong)block_id->prime ));
     963           0 :   }
     964           0 :   return block->shred_cnt;
     965           0 : }
     966             : 
/* fd_sched_leave / fd_sched_delete: trivial leave/delete steps of the
   usual fd_* object lifecycle.  No resources are released here; both
   simply echo their argument back to the caller. */
void * fd_sched_leave ( fd_sched_t * sched ) { return sched; }
void * fd_sched_delete( void * mem         ) { return   mem; }
     969             : 
     970             : 
     971             : /* Internal helpers. */
     972             : 
/* add_block: acquire a block element from the pool, initialize it as a
   fresh leaf for block_id, insert it into the block map, and link it
   under parent_block_id (if non-NULL) in the fork tree.  Crashes
   (FD_LOG_CRIT) if the pool is exhausted or the parent is unknown.
   out_block is always set; out_parent_block is set only when
   parent_block_id is non-NULL. */
static void
add_block( fd_sched_t *          sched,
           fd_sched_block_id_t * block_id,
           fd_sched_block_id_t * parent_block_id,
           fd_sched_block_t * *  out_block,
           fd_sched_block_t * *  out_parent_block ) {
  if( FD_UNLIKELY( !block_pool_free( sched->block_pool ) ) ) {
    FD_LOG_CRIT(( "block_pool is full" ));
  }
  fd_sched_block_t * block = block_pool_ele_acquire( sched->block_pool );
  block->block_id = *block_id;
  block_map_ele_insert( sched->block_map, block, sched->block_pool );
  *out_block = block;

  /* Transaction/shred progress counters. */
  block->txn_parsed_cnt    = 0U;
  block->txn_in_flight_cnt = 0U;
  block->txn_done_cnt      = 0U;
  block->shred_cnt         = 0U;

  /* Streaming parser state: empty buffer, positioned at the start of a
     batch (fec_sob set). */
  block->mblks_rem    = 0UL;
  block->txns_rem     = 0UL;
  block->fec_buf_sz   = 0U;
  block->fec_buf_soff = 0U;
  block->fec_eob      = 0;
  block->fec_sob      = 1;

  /* Lifecycle flags. */
  block->fec_eos              = 0;
  block->rooted               = 0;
  block->dying                = 0;
  block->in_rdisp             = 0;
  block->block_start_signaled = 0;
  block->block_end_signaled   = 0;
  block->staged               = 0;

  block->luf_depth = 0UL;

  /* New leaf node, no child, no sibling. */
  ulong null_idx     = block_pool_idx_null( sched->block_pool );
  block->child_idx   = null_idx;
  block->sibling_idx = null_idx;
  block->parent_idx  = null_idx;

  /* node->parent link */
  if( FD_LIKELY( parent_block_id ) ) {
    fd_sched_block_t * parent_block = block_map_ele_query( sched->block_map, parent_block_id, NULL, sched->block_pool );
    if( FD_UNLIKELY( !parent_block ) ) {
      FD_LOG_CRIT(( "invariant violation: parent block not found slot %lu, prime %lu",
                    (ulong)parent_block_id->slot, (ulong)parent_block_id->prime ));
    }
    block->parent_idx = block_pool_idx( sched->block_pool, parent_block );
    *out_parent_block = parent_block;

    /* parent->node and sibling->node links; append at the tail of the
       sibling list so child order reflects insertion order. */
    ulong child_idx = block_pool_idx( sched->block_pool, block );
    if( FD_LIKELY( parent_block->child_idx == null_idx ) ) { /* Optimize for no forking. */
      parent_block->child_idx = child_idx;
    } else {
      fd_sched_block_t * curr_block = block_pool_ele( sched->block_pool, parent_block->child_idx );
      while( curr_block->sibling_idx != null_idx ) {
        curr_block = block_pool_ele( sched->block_pool, curr_block->sibling_idx );
      }
      curr_block->sibling_idx = child_idx;
    }

    /* A child of a dying block is itself dying. */
    if( FD_UNLIKELY( parent_block->dying ) ) {
      block->dying = 1;
    }
  }
}
    1042             : 
/* CHECK( cond ): early-return from the enclosing (void) parser function
   when cond does not hold, i.e. when there is not yet enough buffered
   data to continue parsing. */
#define CHECK( cond )  do {      \
  if( FD_UNLIKELY( !(cond) ) ) { \
    return;                      \
  }                              \
} while( 0 )

/* CHECK that it is safe to read at least n more bytes. */
#define CHECK_LEFT( n ) CHECK( (n)<=(block->fec_buf_sz-block->fec_buf_soff) )
    1051             : 
/* Consume as much as possible from the buffer.  By the end of this
   function, we will either have residual data that is unparseable only
   because it is a batch that straddles FEC set boundaries, or we will
   have reached the end of a batch.  In the former case, any remaining
   bytes should be concatenated with the next FEC set for further
   parsing.  In the latter case, any remaining bytes should be thrown
   away.

   The loop implements a small state machine over (txns_rem, mblks_rem,
   fec_sob): drain transactions of the current microblock, then read the
   next microblock header, then (at the start of a batch) read the
   ulong microblock count.  CHECK/CHECK_LEFT return early when the
   buffer runs dry mid-unit. */
static void
fd_sched_parse( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx ) {
  while( 1 ) {
    /* Drain all transactions of the current microblock; bail if a
       transaction straddles the end of the buffered data. */
    while( block->txns_rem>0UL ) {
      if( FD_UNLIKELY( !fd_sched_parse_txn( sched, block, alut_ctx ) ) ) {
        return;
      }
    }
    if( block->txns_rem==0UL && block->mblks_rem>0UL ) {
      /* Next unit is a microblock header. */
      CHECK_LEFT( sizeof(fd_microblock_hdr_t) );
      fd_microblock_hdr_t * hdr = (fd_microblock_hdr_t *)fd_type_pun( block->fec_buf+block->fec_buf_soff );
      block->fec_buf_soff      += (uint)sizeof(fd_microblock_hdr_t);

      /* Track the most recent PoH hash seen for this block. */
      memcpy( block->poh.hash, hdr->hash, sizeof(block->poh.hash) );
      block->txns_rem = hdr->txn_cnt;
      block->mblks_rem--;
      continue;
    }
    if( block->txns_rem==0UL && block->mblks_rem==0UL && block->fec_sob ) {
      /* Start of a batch: first ulong is the microblock count. */
      CHECK_LEFT( sizeof(ulong) );
      FD_TEST( block->fec_buf_soff==0U );
      block->mblks_rem     = FD_LOAD( ulong, block->fec_buf );
      block->fec_buf_soff += (uint)sizeof(ulong);
      /* FIXME what happens if someone sends us mblks_rem==0UL here? */

      block->fec_sob = 0;
      continue;
    }
    if( block->txns_rem==0UL && block->mblks_rem==0UL ) {
      /* End of batch reached. */
      break;
    }
  }
  if( block->fec_eob ) {
    /* Ignore trailing bytes at the end of a batch. */
    sched->metrics->bytes_ingested_unparsed_cnt += block->fec_buf_sz-block->fec_buf_soff;
    block->fec_buf_soff = 0U;
    block->fec_buf_sz   = 0U;
    block->fec_sob      = 1;
    block->fec_eob      = 0;
  }
}
    1100             : 
/* fd_sched_parse_txn: attempt to parse a single transaction at the
   current buffer offset and hand it to the dispatcher.  Returns 1 on
   success (buffer offset and counters advanced), 0 when the buffered
   bytes do not contain a complete transaction (caller should wait for
   more data).  On success the payload and parsed txn are copied into
   the txn pool slot returned by the dispatcher, and the txn->block
   reverse mapping is recorded. */
static int
fd_sched_parse_txn( fd_sched_t * sched, fd_sched_block_t * block, fd_sched_alut_ctx_t * alut_ctx ) {
  fd_txn_t * txn = fd_type_pun( block->txn );

  ulong pay_sz = 0UL;
  ulong txn_sz = fd_txn_parse_core( block->fec_buf+block->fec_buf_soff,
                                    fd_ulong_min( FD_TXN_MTU, block->fec_buf_sz-block->fec_buf_soff ),
                                    txn,
                                    NULL,
                                    &pay_sz );

  if( FD_UNLIKELY( !pay_sz || !txn_sz ) ) {
    /* Can't parse out a full transaction. */
    return 0;
  }

  /* Try to expand ALUTs.  If lookup fails (missing slot hashes sysvar
     or unresolved tables), fall back to dispatching the transaction in
     serializing mode. */
  int has_aluts   = txn->transaction_version==FD_TXN_V0 && txn->addr_table_adtl_cnt>0;
  int serializing = 0;
  if( has_aluts ) {
    /* FIXME: statically size out slot hashes decode footprint. */
    FD_SPAD_FRAME_BEGIN( alut_ctx->runtime_spad ) {
    fd_slot_hashes_global_t const * slot_hashes_global = fd_sysvar_slot_hashes_read( alut_ctx->funk, alut_ctx->funk_txn, alut_ctx->runtime_spad );
    if( FD_LIKELY( slot_hashes_global ) ) {
      fd_slot_hash_t * slot_hash = deq_fd_slot_hash_t_join( (uchar *)slot_hashes_global + slot_hashes_global->hashes_offset );
      serializing = !!fd_runtime_load_txn_address_lookup_tables( txn, block->fec_buf+block->fec_buf_soff, alut_ctx->funk, alut_ctx->funk_txn, alut_ctx->els, slot_hash, block->aluts );
      sched->metrics->alut_success_cnt += (uint)!serializing;
    } else {
      serializing = 1;
    }
    } FD_SPAD_FRAME_END;
  }

  /* Hand the transaction to the dispatcher; txn_idx 0 is presumably
     the failure sentinel — TODO confirm against fd_rdisp docs. */
  ulong txn_idx = fd_rdisp_add_txn( sched->rdisp, block->block_id.id, txn, block->fec_buf+block->fec_buf_soff, serializing ? NULL : block->aluts, serializing );
  FD_TEST( txn_idx!=0UL );
  sched->metrics->txn_parsed_cnt++;
  sched->metrics->alut_serializing_cnt += (uint)serializing;
  sched->txn_pool_free_cnt--;
  fd_txn_p_t * txn_p = sched->txn_pool + txn_idx;
  txn_p->payload_sz  = pay_sz;
  fd_memcpy( txn_p->payload, block->fec_buf+block->fec_buf_soff, pay_sz );
  fd_memcpy( TXN(txn_p),     txn,                                txn_sz );
  sched->txn_to_block_idx[ txn_idx ] = block_pool_idx( sched->block_pool, block );

  block->fec_buf_soff += (uint)pay_sz;
  block->txn_parsed_cnt++;
  block->txns_rem--;
  return 1;
}
    1150             : 
    1151             : #undef CHECK
    1152             : #undef CHECK_LEFT
    1153             : 
    1154             : static void
    1155           0 : try_activate_block( fd_sched_t * sched ) {
    1156           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
    1157             : 
    1158             :   /* See if there are any allocated staging lanes that we can activate
    1159             :      for scheduling ... */
    1160           0 :   ulong staged_bitset = sched->staged_bitset;
    1161           0 :   while( staged_bitset ) {
    1162           0 :     int lane_idx  = fd_ulong_find_lsb( staged_bitset );
    1163           0 :     staged_bitset = fd_ulong_pop_lsb( staged_bitset );
    1164             : 
    1165           0 :     ulong              head_idx     = sched->staged_head_block_idx[ lane_idx ];
    1166           0 :     fd_sched_block_t * head_block   = block_pool_ele( sched->block_pool, head_idx );
    1167           0 :     fd_sched_block_t * parent_block = block_pool_ele( sched->block_pool, head_block->parent_idx );
    1168           0 :     if( FD_UNLIKELY( parent_block->dying ) ) {
    1169             :       /* Invariant: no child of a dying block should be staged. */
    1170           0 :       FD_LOG_CRIT(( "invariant violation: staged_head_block_idx %lu, slot %lu, prime %lu on lane %d has parent_block->dying set, slot %lu, prime %lu",
    1171           0 :                     head_idx, (ulong)head_block->block_id.slot, (ulong)head_block->block_id.prime, lane_idx, (ulong)parent_block->block_id.slot, (ulong)parent_block->block_id.prime ));
    1172           0 :     }
    1173             :     //FIXME: restore this invariant check when we have immediate demotion of dying blocks
    1174             :     // if( FD_UNLIKELY( head_block->dying ) ) {
    1175             :     //   /* Invariant: no dying block should be staged. */
    1176             :     //   FD_LOG_CRIT(( "invariant violation: staged_head_block_idx %lu, slot %lu, prime %lu on lane %u has head_block->dying set",
    1177             :     //                 head_idx, (ulong)head_block->block_id.slot, (ulong)head_block->block_id.prime, lane_idx ));
    1178             :     // }
    1179           0 :     if( block_is_done( parent_block ) && block_is_activatable( head_block ) ) {
    1180             :       /* ... Yes, on this staging lane the parent block is done.  So we
    1181             :          can switch to the staged child. */
    1182           0 :       sched->active_block_idx = head_idx;
    1183           0 :       sched->metrics->lane_switch_cnt++;
    1184           0 :       return;
    1185           0 :     }
    1186           0 :   }
    1187             : 
    1188             :   /* ... No, promote unstaged blocks. */
    1189           0 :   ulong root_idx = sched->root_idx;
    1190           0 :   if( FD_UNLIKELY( root_idx==null_idx ) ) {
    1191           0 :     FD_LOG_CRIT(( "invariant violation: root_idx==null_idx indicating fd_sched is unintialized" ));
    1192           0 :   }
    1193             :   /* Find and stage the longest stageable unstaged fork.  This is a
    1194             :      policy decision. */
    1195           0 :   ulong depth = compute_longest_unstaged_fork( sched, root_idx );
    1196           0 :   if( FD_LIKELY( depth>0UL ) ) {
    1197           0 :     if( FD_UNLIKELY( sched->staged_bitset==fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) ) ) {
    1198             :       /* No more staging lanes available.  All of them are occupied by
    1199             :          slow squatters.  Demote one of them. */
    1200             :       //FIXME implement this
    1201           0 :       FD_LOG_CRIT(( "unimplemented" ));
    1202           0 :       sched->metrics->lane_demoted_cnt++;
    1203             :       // sched->metrics->block_demoted_cnt++; for every demoted block
    1204           0 :     }
    1205           0 :     FD_TEST( sched->staged_bitset!=fd_ulong_mask_lsb( FD_SCHED_MAX_STAGING_LANES ) );
    1206           0 :     int lane_idx = fd_ulong_find_lsb( ~sched->staged_bitset );
    1207           0 :     if( FD_UNLIKELY( lane_idx>=(int)FD_SCHED_MAX_STAGING_LANES ) ) {
    1208           0 :       FD_LOG_CRIT(( "invariant violation: lane_idx %d, sched->staged_bitset %lx",
    1209           0 :                     lane_idx, sched->staged_bitset ));
    1210           0 :     }
    1211           0 :     ulong head_block_idx = stage_longest_unstaged_fork( sched, root_idx, lane_idx );
    1212           0 :     if( FD_UNLIKELY( head_block_idx==null_idx ) ) {
    1213             :       /* We found a promotable fork depth>0.  This should not happen. */
    1214           0 :       FD_LOG_CRIT(( "invariant violation: head_block_idx==null_idx" ));
    1215           0 :     }
    1216           0 :     sched->active_block_idx = head_block_idx;
    1217           0 :     return;
    1218           0 :   }
    1219             :   /* No unstaged blocks to promote.  So we're done.  Yay. */
    1220           0 : }
    1221             : 
/* subtree_abandon: mark block and its entire subtree as dying and, when
   the dispatcher's ordering constraints permit, remove each block from
   the dispatcher, recycle its txn pool slots, and free its staging
   lane.  It's safe to call this function more than once on the same
   block (re-entry only re-sets the dying flag for blocks already
   removed from the dispatcher).  Must never be called on the root or on
   a rooted block. */
static void
subtree_abandon( fd_sched_t * sched, fd_sched_block_t * block ) {
  if( FD_UNLIKELY( block->rooted ) ) {
    FD_LOG_CRIT(( "invariant violation: rooted block should not be abandoned, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }
  /* All minority fork nodes pass through this function eventually.  So
     this is a good point to check per-node invariants for minority
     forks. */
  if( FD_UNLIKELY( block->staged && !block->in_rdisp ) ) {
    FD_LOG_CRIT(( "invariant violation: staged block is not in the dispatcher, slot %lu, prime %lu",
                  (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
  }

  ulong null_idx  = block_pool_idx_null( sched->block_pool );

  /* Setting the flag is non-optional and can happen more than once. */
  block->dying = 1;

  /* Removal from dispatcher should only happen once. */
  if( block->in_rdisp ) {
    fd_sched_block_t * parent = block_pool_ele( sched->block_pool, block->parent_idx );
    if( FD_UNLIKELY( !parent ) ) {
      /* Only the root has no parent.  Abandon should never be called on
         the root.  So any block we are trying to abandon should have a
         parent. */
      FD_LOG_CRIT(( "invariant violation: parent not found slot %lu, prime %lu",
                    (ulong)block->block_id.slot, (ulong)block->block_id.prime ));
    }

    /* The dispatcher expects blocks to be abandoned in the same order
       that they were added on each lane.  There are no requirements on
       the order of abandoning if two blocks are not on the same lane,
       or if a block is unstaged.  This means that in general we
       shouldn't abandon a child block if the parent hasn't been
       abandoned yet, if and only if they are on the same lane.  So wait
       until we can abandon the parent, and then descend down the fork
       tree to ensure orderly abandoning. */
    int abandon = !parent->in_rdisp || /* parent is not in the dispatcher */
                  !parent->staged   || /* parent is in the dispatcher but not staged */
                  !block->staged    || /* parent is in the dispatcher and staged but this block is unstaged */
                  block->staging_lane!=parent->staging_lane; /* this block is on a different staging lane than its parent */

    /* We inform the dispatcher of an abandon only when there are no
       more in-flight transactions.  Otherwise, if the dispatcher
       recycles the same txn_id that was just abandoned, and we receive
       completion of an in-flight transaction whose txn_id was just
       recycled, we would basically be aliasing the same txn_id and end
       up indexing into txn_to_block_idx[] that is already overwritten
       with new blocks. */
    abandon = abandon && block->txn_in_flight_cnt==0;

    if( abandon ) {
      block->in_rdisp = 0;
      fd_rdisp_abandon_block( sched->rdisp, block->block_id.id );
      sched->txn_pool_free_cnt += block->txn_parsed_cnt-block->txn_done_cnt; /* in_flight_cnt==0 */
      sched->metrics->block_abandoned_cnt++;
      sched->metrics->txn_abandoned_parsed_cnt += block->txn_parsed_cnt;
      sched->metrics->txn_abandoned_done_cnt   += block->txn_done_cnt;

      /* Now release the staging lane. */
      //FIXME when demote supports non-empty blocks, we should demote
      //the block from the lane unconditionally and immediately,
      //regardless of whether it's safe to abandon or not.  So a block
      //would go immediately from staged to unstaged and eventually to
      //abandoned.
      if( FD_LIKELY( block->staged ) ) {
        block->staged = 0;
        sched->staged_bitset = fd_ulong_clear_bit( sched->staged_bitset, (int)block->staging_lane );
        sched->staged_head_block_idx[ block->staging_lane ] = null_idx;
      }
    }

    if( FD_UNLIKELY( block->staged && sched->active_block_idx==sched->staged_head_block_idx[ block->staging_lane ] ) ) {
      /* Dying blocks should not be active. */
      sched->active_block_idx = null_idx;
    }
  }

  /* Abandon the entire fork chaining off of this block. */
  ulong child_idx = block->child_idx;
  while( child_idx != null_idx ) {
    fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
    subtree_abandon( sched, child );
    child_idx = child->sibling_idx;
  }
}
    1310             : 
    1311             : FD_FN_UNUSED static ulong
    1312           0 : find_and_stage_longest_unstaged_fork( fd_sched_t * sched, int lane_idx ) {
    1313           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
    1314           0 :   ulong root_idx = sched->root_idx;
    1315           0 : 
    1316           0 :   if( FD_UNLIKELY( root_idx==null_idx ) ) {
    1317           0 :     FD_LOG_CRIT(( "invariant violation: root_idx==null_idx indicating fd_sched is unintialized" ));
    1318           0 :   }
    1319           0 : 
    1320           0 :   /* First pass: compute the longest unstaged fork depth for each node
    1321           0 :      in the fork tree. */
    1322           0 :   ulong depth = compute_longest_unstaged_fork( sched, root_idx );
    1323           0 : 
    1324           0 :   /* Second pass: stage blocks on the longest unstaged fork. */
    1325           0 :   ulong head_block_idx = stage_longest_unstaged_fork( sched, root_idx, lane_idx );
    1326           0 : 
    1327           0 :   if( FD_UNLIKELY( (depth>0UL && head_block_idx==null_idx) || (depth==0UL && head_block_idx!=null_idx) ) ) {
    1328           0 :     FD_LOG_CRIT(( "invariant violation: depth %lu, head_block_idx %lu",
    1329           0 :                   depth, head_block_idx ));
    1330           0 :   }
    1331           0 : 
    1332           0 :   return head_block_idx;
    1333           0 : }
    1334             : 
    1335             : /* Returns length of the longest stageable unstaged fork, if there is
    1336             :    one, and 0 otherwise. */
    1337             : static ulong
    1338           0 : compute_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx ) {
    1339           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
    1340           0 :   if( FD_UNLIKELY( block_idx==null_idx ) ) {
    1341           0 :     FD_LOG_CRIT(( "invariant violation: block_idx==null_idx" ));
    1342           0 :   }
    1343             : 
    1344           0 :   fd_sched_block_t * block = block_pool_ele( sched->block_pool, block_idx );
    1345             : 
    1346           0 :   ulong max_child_depth = 0UL;
    1347           0 :   ulong child_idx       = block->child_idx;
    1348           0 :   while( child_idx!=null_idx ) {
    1349           0 :     ulong child_depth = compute_longest_unstaged_fork( sched, child_idx );
    1350           0 :     if( child_depth > max_child_depth ) {
    1351           0 :       max_child_depth = child_depth;
    1352           0 :     }
    1353           0 :     fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
    1354           0 :     child_idx = child->sibling_idx;
    1355           0 :   }
    1356             : 
    1357           0 :   block->luf_depth = max_child_depth + fd_ulong_if( block_is_promotable( block ), 1UL, 0UL );
    1358           0 :   return block->luf_depth;
    1359           0 : }
    1360             : 
    1361             : static ulong
    1362           0 : stage_longest_unstaged_fork_helper( fd_sched_t * sched, ulong block_idx, int lane_idx ) {
    1363           0 :   ulong null_idx = block_pool_idx_null( sched->block_pool );
    1364           0 :   if( FD_UNLIKELY( block_idx==null_idx ) ) {
    1365           0 :     FD_LOG_CRIT(( "invariant violation: block_idx==null_idx" ));
    1366           0 :   }
    1367             : 
    1368           0 :   fd_sched_block_t * block = block_pool_ele( sched->block_pool, block_idx );
    1369             : 
    1370           0 :   int   stage_it = fd_int_if( block_is_promotable( block ), 1, 0 );
    1371           0 :   ulong rv       = fd_ulong_if( stage_it, block_idx, null_idx );
    1372           0 :   if( FD_LIKELY( stage_it ) ) {
    1373           0 :     block->staged = 1;
    1374           0 :     block->staging_lane = (ulong)lane_idx;
    1375           0 :     fd_rdisp_promote_block( sched->rdisp, block->block_id.id, block->staging_lane );
    1376           0 :     sched->metrics->block_promoted_cnt++;
    1377           0 :   }
    1378             : 
    1379             :   /* Base case: leaf node. */
    1380           0 :   if( block->child_idx==null_idx ) return rv;
    1381             : 
    1382           0 :   ulong max_depth      = 0UL;
    1383           0 :   ulong best_child_idx = null_idx;
    1384           0 :   ulong child_idx      = block->child_idx;
    1385           0 :   while( child_idx!=null_idx ) {
    1386           0 :     fd_sched_block_t * child = block_pool_ele( sched->block_pool, child_idx );
    1387           0 :     if( child->luf_depth>max_depth ) {
    1388           0 :       max_depth      = child->luf_depth;
    1389           0 :       best_child_idx = child_idx;
    1390           0 :     }
    1391           0 :     child_idx = child->sibling_idx;
    1392           0 :   }
    1393             : 
    1394             :   /* Recursively stage descendants. */
    1395           0 :   if( best_child_idx!=null_idx ) {
    1396           0 :     ulong head_block_idx = stage_longest_unstaged_fork_helper( sched, best_child_idx, lane_idx );
    1397           0 :     rv = fd_ulong_if( rv!=null_idx, rv, head_block_idx );
    1398           0 :   }
    1399             : 
    1400           0 :   return rv;
    1401           0 : }
    1402             : 
    1403             : /* Returns idx of head block of staged lane on success, idx_null
    1404             :    otherwise. */
    1405             : static ulong
    1406           0 : stage_longest_unstaged_fork( fd_sched_t * sched, ulong block_idx, int lane_idx ) {
    1407           0 :   ulong head_block_idx = stage_longest_unstaged_fork_helper( sched, block_idx, lane_idx );
    1408           0 :   if( FD_LIKELY( head_block_idx!=block_pool_idx_null( sched->block_pool ) ) ) {
    1409           0 :     sched->metrics->lane_promoted_cnt++;
    1410           0 :     sched->staged_bitset = fd_ulong_set_bit( sched->staged_bitset, lane_idx );
    1411           0 :     sched->staged_head_block_idx[ lane_idx ] = head_block_idx;
    1412           0 :   }
    1413           0 :   return head_block_idx;
    1414           0 : }

Generated by: LCOV version 1.14