LCOV - code coverage report
Current view: top level - discof/repair - fd_repair_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 615 0.0 %
Date: 2026-01-01 05:20:18 Functions: 0 48 0.0 %

          Line data    Source code
       1             : /* The repair tile is responsible for repairing missing shreds that were
       2             :    not received via Turbine.
       3             : 
       4             :    Generally there are two distinct traffic patterns:
       5             : 
       6             :    1. Firedancer boots up and fires off a large number of repairs to
       7             :       recover all the blocks between the snapshot on which it is booting
       8             :       and the head of the chain.  In this mode, repair tile utilization
       9             :       is very high along with net and sign utilization.
      10             : 
      11             :    2. Firedancer catches up to the head of the chain and enters steady
      12             :       state where most shred traffic is delivered over turbine.  In this
      13             :       state, repairs are only occasionally needed to recover shreds lost
      14             :       due to anomalies like packet loss, transmitter (leader) never sent
      15             :       them or even a malicious leader etc. */
      16             : 
      17             : #define _GNU_SOURCE
      18             : 
      19             : #include "../genesis/fd_genesi_tile.h"
      20             : #include "../../disco/topo/fd_topo.h"
      21             : #include "generated/fd_repair_tile_seccomp.h"
      22             : #include "../../disco/fd_disco.h"
      23             : #include "../../disco/keyguard/fd_keyload.h"
      24             : #include "../../disco/keyguard/fd_keyguard.h"
      25             : #include "../../disco/net/fd_net_tile.h"
      26             : #include "../../flamenco/gossip/fd_gossip_types.h"
      27             : #include "../tower/fd_tower_tile.h"
      28             : #include "../../discof/restore/utils/fd_ssmsg.h"
      29             : #include "../../util/net/fd_net_headers.h"
      30             : #include "../../tango/fd_tango_base.h"
      31             : 
      32             : #include "../forest/fd_forest.h"
      33             : #include "fd_repair_metrics.h"
      34             : #include "fd_inflight.h"
      35             : #include "fd_repair.h"
      36             : #include "fd_policy.h"
      37             : 
      38             : #define LOGGING       1
      39             : #define DEBUG_LOGGING 0
      40             : 
      41             : #define IN_KIND_CONTACT (0)
      42           0 : #define IN_KIND_NET     (1)
      43           0 : #define IN_KIND_TOWER   (2)
      44           0 : #define IN_KIND_SHRED   (3)
      45           0 : #define IN_KIND_SIGN    (4)
      46           0 : #define IN_KIND_SNAP    (5)
      47           0 : #define IN_KIND_STAKE   (6)
      48           0 : #define IN_KIND_GOSSIP  (7)
      49           0 : #define IN_KIND_GENESIS (8)
      50             : 
      51             : #define MAX_IN_LINKS    (32)
      52             : 
      53             : #define MAX_REPAIR_PEERS   40200UL
      54             : #define MAX_BUFFER_SIZE    ( MAX_REPAIR_PEERS * sizeof( fd_shred_dest_wire_t ) )
      55             : #define MAX_SHRED_TILE_CNT ( 16UL )
      56             : #define MAX_SIGN_TILE_CNT  ( 16UL )
      57             : 
      58             : /* Maximum size of a network packet */
      59           0 : #define FD_REPAIR_MAX_PACKET_SIZE 1232
      60             : /* Max number of validators that can be actively queried */
      61           0 : #define FD_ACTIVE_KEY_MAX (FD_CONTACT_INFO_TABLE_SIZE)
      62             : /* Max number of pending shred requests */
      63           0 : #define FD_NEEDED_KEY_MAX (1<<20)
      64             : 
/* static map from request type to metric array index.  Indexed by
   fd_repair_msg kind; kinds without an explicit designator are
   zero-initialized per C designated-initializer semantics.  Used by
   fd_repair_send_sign_request to bump the per-kind sent counter. */
static uint metric_index[FD_REPAIR_KIND_ORPHAN + 1] = {
  [FD_REPAIR_KIND_SHRED]         = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_WINDOW_IDX,
  [FD_REPAIR_KIND_HIGHEST_SHRED] = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_HIGHEST_WINDOW_IDX,
  [FD_REPAIR_KIND_ORPHAN]        = FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_V_NEEDED_ORPHAN_IDX,
};
      71             : 
/* in_ctx_t describes one incoming link.  Most link kinds use the
   struct view (dcache bounds checked in during_frag); net links
   alias the same storage as an fd_net_rx_bounds_t consumed by
   fd_net_rx_translate_frag. */
typedef union {
  struct {
    fd_wksp_t * mem;    /* workspace holding the link's dcache */
    ulong       chunk0; /* first valid chunk */
    ulong       wmark;  /* watermark (last valid chunk) */
    ulong       mtu;    /* max frag payload size accepted from this link */
  };
  fd_net_rx_bounds_t net_rx; /* net-link view (see during_frag IN_KIND_NET) */
} in_ctx_t;
      81             : 
/* out_ctx_t describes one outgoing link's dcache cursor plus, for the
   repair_sign links only, manually managed flow-control credits (see
   the long rationale below). */
struct out_ctx {
  ulong         idx;    /* stem out-link index used with fd_stem_publish */
  fd_wksp_t *   mem;    /* workspace holding the link's dcache */
  ulong         chunk0; /* first valid chunk */
  ulong         wmark;  /* watermark (last valid chunk) */
  ulong         chunk;  /* next chunk to write */

  /* Repair tile directly tracks credit outside of stem for these
     asynchronous sign links.  In particular, credits tracks the RETURN
     sign_repair link.  This is because repair_sign is reliable, and
     sign_repair is unreliable.  If both links were reliable, and the
     links filled completely, stem would get into a deadlock. Neither
     repair or sign would have credits, which would prevent frags from
     getting polled in repair or sign, which would prevent any credits
     from getting returned back to the tiles.  So the sign_repair return
     link must be unreliable. credits / max_credits are used by the
     repair_sign link.  In particular, credits manages the RETURN
     sign_repair link.

     Consider the scenario:

             repair_sign (depth 128)        sign_repair (depth 128)
     repair  ---------------------->  sign ------------------------> repair
             [rest free, r130, r129]       [r128, r127, ... , r1] (full)

     If repair is publishing too many requests too fast(common in
     catchup), and not polling enough frags from sign, without manual
     management the sign_repair link would be overrun.  Nothing is
     stopping repair from publishing more requests, because sign is
     functioning fast enough to handle the requests. However, nothing is
     stopping sign from polling the next request and signing it, and
     PUBLISHING it on the sign_repair link that is already full, because
     the sign_repair link is unreliable.

     This is why we need to manually track credits for the sign_repair
     link. We must ensure that there are never more than 128 items in
     the ENTIRE repair_sign -> sign tile -> sign_repair work queue, else
     there is always a possibility of an overrun in the sign_repair
     link.

     We can furthermore ensure some nice properties by having the
     repair_sign link have a greater depth than the sign_repair link.
     This way, we exclusively use manual credit management to control
     the rate at which we publish requests to sign.  We can then avoid
     being stem backpressured, which allows us to keep polling frags and
     reading incoming shreds, even when the repair sign link is "full."
     This is a non-necessary property for good performance.

     To lose a frag to overrun isn't necessarily critical, but in
     general the repair tile relies on the fact that a signing task
     published to sign tile will always come back.  If we lose a frag to
     overrun, then there will be an entry in the pending signs structure
     that is never removed, and theoretically the map could fill up.
     Conceptually, with a reliable sign->repair->sign structure, there
     should be no eviction needed in this pending signs structure. */

  ulong in_idx;      /* index of the incoming link */
  ulong credits;     /* available credits for link */
  ulong max_credits; /* maximum credits (depth) */
};
typedef struct out_ctx out_ctx_t;
     143             : 
/* fd_fec_sig_t maps a (slot, fec_set_idx) pair, packed into a single
   ulong key, to the Ed25519 signature identifying that FEC set. */
struct fd_fec_sig {
  ulong            key; /* map key. 32 msb = slot, 32 lsb = fec_set_idx */
  fd_ed25519_sig_t sig; /* Ed25519 sig identifier of the FEC. */
};
typedef struct fd_fec_sig fd_fec_sig_t;

/* Instantiate the fd_fec_sig_* dynamic hash map over fd_fec_sig_t,
   keyed on the packed .key field (default MAP_KEY). */
#define MAP_NAME    fd_fec_sig
#define MAP_T       fd_fec_sig_t
#define MAP_MEMOIZE 0
#include "../../util/tmpl/fd_map_dynamic.c"
     154             : 
/* Data needed to sign and send a pong that is not contained in the
   pong msg itself. */

struct pong_data {
  fd_ip4_port_t  peer_addr; /* peer the pong is destined for -- presumably used at send time; not read in this chunk */
  fd_hash_t      hash;      /* ping token; folded into the pong signing preimage (see fd_repair_send_sign_request) */
  uint           daddr;     /* NOTE(review): looks like a destination IP4 addr -- confirm against send path */
};
typedef struct pong_data pong_data_t;

/* sign_req_t is one in-flight asynchronous sign request, i.e. a
   message currently traversing repair_sign -> sign tile ->
   sign_repair. */
struct sign_req {
  ulong       key;        /* map key, ctx->pending_key_next */
  ulong       buflen;     /* serialized size of msg, from fd_repair_sz */
  union {
    uchar           buf[sizeof(fd_repair_msg_t)]; /* raw-byte view of msg */
    fd_repair_msg_t msg;                          /* the repair message awaiting its signature */
  };
  pong_data_t  pong_data; /* populated only for pong msgs */
};
typedef struct sign_req sign_req_t;

/* Instantiate the fd_signs_map_* dynamic hash map of pending sign
   requests, keyed by the nonce in .key; ULONG_MAX is the null key. */
#define MAP_NAME         fd_signs_map
#define MAP_KEY          key
#define MAP_KEY_NULL     ULONG_MAX
#define MAP_KEY_INVAL(k) (k==ULONG_MAX)
#define MAP_T            sign_req_t
#define MAP_MEMOIZE      0
#include "../../util/tmpl/fd_map_dynamic.c"
     183             : 
     184             : /* Because the sign tiles could be all busy when a contact info arrives,
     185             :    we need to save ping messages to be signed in a queue and dispatched
     186             :    in after_credit when there are sign tiles available.  The size of the
     187             :    queue was determined by the following: we can limit the size of this
     188             :    queue to be the maximum number of active keys - which is equal to the
     189             :    number of warm up requests we might queue.  The queue will also hold
     190             :    pongs, but in order for the ping to arrive the warm up request must
     191             :    have left the queue.  It is possible that we start up and get
     192             :    FD_ACTIVE_KEY_MAX peers gossiped to us, and as we are queueing up
     193             :    their pings they all drop and another FD_ACTIVE_KEY_MAX new peers
     194             :    gossip to us, causing us to fill up the queue.  Idk overall this
     195             :    scenario is highly unlikely and it's not the end of the world if we
     196             :    drop a warmup req or ping  to a peer because the first req to them
     197             :    will retrigger it anyway.
     198             : 
     199             :    Typical flow is that a pong will get added to the sign_queue during
     200             :    an after_frag call.  Then on the following after_credit will get
     201             :    popped from the sign_queue and added to sign_map, and then dispatched
     202             :    to the sign tile. */
     203             : 
/* sign_pending_t is one message waiting in the FIFO below for a sign
   tile to become available (see the rationale above). */
struct sign_pending {
  fd_repair_msg_t msg;       /* message awaiting dispatch to repair_sign */
  pong_data_t     pong_data; /* populated only for pong msgs */
};
typedef struct sign_pending sign_pending_t;

/* Instantiate the fd_signs_queue_* FIFO, sized per the analysis in
   the comment above (2x the max number of active repair peers). */
#define QUEUE_NAME       fd_signs_queue
#define QUEUE_T          sign_pending_t
#define QUEUE_MAX        2*FD_ACTIVE_KEY_MAX
#include "../../util/tmpl/fd_queue.c"
     214             : 
/* ctx is the repair tile's per-run state, carved out of the tile
   scratch region (see scratch_footprint for the layout; the init code
   is outside this chunk). */
struct ctx {
  long tsdebug; /* timestamp for debug printing */

  ulong repair_seed; /* seed value; consumer not visible in this chunk -- TODO confirm use */

  fd_ip4_port_t repair_intake_addr; /* local repair-intake addr:port (paired with intake_hdr) */
  fd_ip4_port_t repair_serve_addr;  /* local serve-repair addr:port (paired with serve_hdr) */

  fd_forest_t    * forest;    /* tree of slots being repaired; rooted from the snapshot manifest in after_snap */
  fd_fec_sig_t   * fec_sigs;  /* map (slot, fec_set_idx) -> FEC set signature */
  fd_policy_t    * policy;    /* peer table / request-policy state (see after_contact) */
  fd_inflights_t * inflights; /* tracker of outstanding repair requests */
  fd_repair_t    * protocol;  /* repair protocol state */

  fd_pubkey_t identity_public_key; /* this validator's identity pubkey */

  fd_wksp_t * wksp; /* workspace backing this tile */

  fd_stem_context_t * stem; /* cached stem context, used by publish helpers */

  uchar    in_kind[ MAX_IN_LINKS ];  /* IN_KIND_* tag per in-link index */
  in_ctx_t in_links[ MAX_IN_LINKS ]; /* per-in-link dcache bounds */

  int skip_frag; /* cleared in during_frag; presumably consulted by after_frag (not visible here) -- TODO confirm */

  /* net out link dcache cursor (written by send_packet) */
  uint        net_out_idx;
  fd_wksp_t * net_out_mem;
  ulong       net_out_chunk0;
  ulong       net_out_wmark;
  ulong       net_out_chunk;

  ulong snap_out_chunk; /* chunk of the latest non-DONE snapshot frag, saved in during_frag */

  uint      shred_tile_cnt;                      /* number of shred tiles / out links below */
  out_ctx_t shred_out_ctx[ MAX_SHRED_TILE_CNT ];

  /* repair_sign links (to sign tiles 1+) - for round-robin distribution */

  ulong     repair_sign_cnt;
  out_ctx_t repair_sign_out_ctx[ MAX_SIGN_TILE_CNT ];

  ulong     sign_rrobin_idx; /* round-robin cursor over sign tiles -- not used in this chunk; TODO confirm */

  /* Pending sign requests for async operations */

  uint             pending_key_next; /* monotonically increasing nonce for signs_map keys */
  sign_req_t     * signs_map;  /* contains any request currently in the repair->sign or sign->repair dcache */
  sign_pending_t * sign_queue; /* contains any request waiting to be dispatched to repair->sign */

  ushort net_id; /* rolling IP4 identification counter for outgoing packets (see send_packet) */
  uchar buffer[ MAX_BUFFER_SIZE ]; /* includes Ethernet, IP, UDP headers */
  fd_ip4_udp_hdrs_t intake_hdr[1]; /* pre-built header template for intake-bound packets */
  fd_ip4_udp_hdrs_t serve_hdr [1]; /* pre-built header template for serve-bound packets */

  ulong manifest_slot; /* snapshot manifest slot -- not written in this chunk; TODO confirm */
  struct {
    ulong send_pkt_cnt;                                                  /* total packets published by send_packet */
    ulong sent_pkt_types[FD_METRICS_ENUM_REPAIR_SENT_REQUEST_TYPES_CNT]; /* per-request-kind send counts */
    ulong repaired_slots;
    ulong current_slot;
    ulong sign_tile_unavail;
    ulong rerequest;
    ulong malformed_ping;
    fd_histf_t slot_compl_time[ 1 ];
    fd_histf_t response_latency[ 1 ];
  } metrics[ 1 ];

  /* Slot-level metrics */

  fd_repair_metrics_t * slot_metrics;
  ulong turbine_slot0;  // catchup considered complete after this slot
  struct {
    int   enabled;
    ulong end_slot;
    int   complete;
  } profiler;
};
typedef struct ctx ctx_t;
     293             : 
     294             : FD_FN_CONST static inline ulong
     295           0 : scratch_align( void ) {
     296           0 :   return 128UL;
     297           0 : }
     298             : 
     299             : FD_FN_PURE static inline ulong
     300           0 : loose_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED ) {
     301           0 :   return 1UL * FD_SHMEM_GIGANTIC_PAGE_SZ;
     302           0 : }
     303             : 
/* scratch_footprint returns the total size in bytes of the tile's
   scratch region: the ctx struct followed by every repair data
   structure, composed with the FD_LAYOUT helpers and finalized to
   scratch_align.  Must stay in sync with the corresponding allocation
   sequence in the tile init (not visible in this chunk). */
FD_FN_PURE static inline ulong
scratch_footprint( fd_topo_tile_t const * tile ) {
  /* signs_map must be able to hold every request simultaneously
     in-flight across all repair_sign links; the +1 doubles the
     power-of-2 capacity -- presumably headroom to keep the map
     sparse, TODO confirm. */
  ulong total_sign_depth = tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt;
  int   lg_sign_depth    = fd_ulong_find_msb( fd_ulong_pow2_up(total_sign_depth) ) + 1;

  ulong l = FD_LAYOUT_INIT;
  l = FD_LAYOUT_APPEND( l, alignof(ctx_t),            sizeof(ctx_t)                                                    );
  l = FD_LAYOUT_APPEND( l, fd_repair_align(),         fd_repair_footprint     ()                                       );
  l = FD_LAYOUT_APPEND( l, fd_forest_align(),         fd_forest_footprint     ( tile->repair.slot_max )                );
  l = FD_LAYOUT_APPEND( l, fd_policy_align(),         fd_policy_footprint     ( FD_NEEDED_KEY_MAX, FD_ACTIVE_KEY_MAX ) );
  l = FD_LAYOUT_APPEND( l, fd_inflights_align(),      fd_inflights_footprint  ()                                       );
  l = FD_LAYOUT_APPEND( l, fd_fec_sig_align(),        fd_fec_sig_footprint    ( 20 )                                   );
  l = FD_LAYOUT_APPEND( l, fd_signs_map_align(),      fd_signs_map_footprint  ( lg_sign_depth )                        );
  l = FD_LAYOUT_APPEND( l, fd_signs_queue_align(),    fd_signs_queue_footprint()                                       );
  l = FD_LAYOUT_APPEND( l, fd_repair_metrics_align(), fd_repair_metrics_footprint()                                    );
  return FD_LAYOUT_FINI( l, scratch_align() );
}
     321             : 
     322             : /* Below functions manage the current pending sign requests. */
     323             : 
     324             : sign_req_t *
     325             : sign_map_insert( ctx_t *                 ctx,
     326             :                  fd_repair_msg_t const * msg,
     327           0 :                  pong_data_t const     * opt_pong_data ) {
     328             : 
     329             :   /* Check if there is any space for a new pending sign request. Should never fail as long as credit management is working. */
     330           0 :   if( FD_UNLIKELY( fd_signs_map_key_cnt( ctx->signs_map )==fd_signs_map_key_max( ctx->signs_map ) ) ) return NULL;
     331             : 
     332           0 :   sign_req_t * pending = fd_signs_map_insert( ctx->signs_map, ctx->pending_key_next++ );
     333           0 :   if( FD_UNLIKELY( !pending ) ) return NULL; // Not possible, unless the same nonce is used twice.
     334           0 :   pending->msg    = *msg;
     335           0 :   pending->buflen = fd_repair_sz( msg );
     336           0 :   if( FD_UNLIKELY( opt_pong_data ) ) pending->pong_data = *opt_pong_data;
     337           0 :   return pending;
     338           0 : }
     339             : 
     340             : int
     341             : sign_map_remove( ctx_t * ctx,
     342           0 :                  ulong   key  ) {
     343           0 :   sign_req_t * pending = fd_signs_map_query( ctx->signs_map, key, NULL );
     344           0 :   if( FD_UNLIKELY( !pending ) ) return -1;
     345           0 :   fd_signs_map_remove( ctx->signs_map, pending );
     346           0 :   return 0;
     347           0 : }
     348             : 
/* send_packet assembles an IP4/UDP packet around payload directly in
   the net out dcache and publishes it via stem.  is_intake selects the
   pre-built header template (intake_hdr vs serve_hdr); the IP4 header
   is then patched with the real addresses/length and its checksum
   recomputed.  The UDP checksum is left 0 (optional over IPv4). */
static void
send_packet( ctx_t             * ctx,
             fd_stem_context_t * stem,
             int                 is_intake,
             uint                dst_ip_addr,
             ushort              dst_port,
             uint                src_ip_addr,
             uchar const *       payload,
             ulong               payload_sz,
             ulong               tsorig ) {
  ctx->metrics->send_pkt_cnt++;
  uchar * packet = fd_chunk_to_laddr( ctx->net_out_mem, ctx->net_out_chunk );
  fd_ip4_udp_hdrs_t * hdr = (fd_ip4_udp_hdrs_t *)packet;
  *hdr = *(is_intake ? ctx->intake_hdr : ctx->serve_hdr); /* copy the header template */

  /* Patch IP4 fields for this packet; checksum must be zeroed before
     recomputation. */
  fd_ip4_hdr_t * ip4 = hdr->ip4;
  ip4->saddr       = src_ip_addr;
  ip4->daddr       = dst_ip_addr;
  ip4->net_id      = fd_ushort_bswap( ctx->net_id++ ); /* rolling IP identification */
  ip4->check       = 0U;
  ip4->net_tot_len = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_ip4_hdr_t)+sizeof(fd_udp_hdr_t)) );
  ip4->check       = fd_ip4_hdr_check_fast( ip4 );

  fd_udp_hdr_t * udp = hdr->udp;
  udp->net_dport = dst_port;
  udp->net_len   = fd_ushort_bswap( (ushort)(payload_sz + sizeof(fd_udp_hdr_t)) );
  fd_memcpy( packet+sizeof(fd_ip4_udp_hdrs_t), payload, payload_sz );
  hdr->udp->check = 0U; /* UDP checksum optional over IPv4 */

  /* NOTE(review): dst_ip_addr is passed for both the src and dst slots
     of the netmux sig -- confirm this is intended for outgoing frags. */
  ulong tspub     = fd_frag_meta_ts_comp( fd_tickcount() );
  ulong sig       = fd_disco_netmux_sig( dst_ip_addr, dst_port, dst_ip_addr, DST_PROTO_OUTGOING, sizeof(fd_ip4_udp_hdrs_t) );
  ulong packet_sz = payload_sz + sizeof(fd_ip4_udp_hdrs_t);
  ulong chunk     = ctx->net_out_chunk;
  fd_stem_publish( stem, ctx->net_out_idx, sig, chunk, packet_sz, 0UL, tsorig, tspub );
  ctx->net_out_chunk = fd_dcache_compact_next( chunk, packet_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
}
     385             : 
     386             : /* Returns a sign_out context with max available credits.
     387             :    If no sign_out context has available credits, returns NULL. */
     388             : static out_ctx_t *
     389           0 : sign_avail_credits( ctx_t * ctx ) {
     390           0 :   out_ctx_t * sign_out = NULL;
     391           0 :   ulong max_credits = 0;
     392           0 :   for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
     393           0 :     if( ctx->repair_sign_out_ctx[i].credits > max_credits ) {
     394           0 :       max_credits =  ctx->repair_sign_out_ctx[i].credits;
     395           0 :       sign_out    = &ctx->repair_sign_out_ctx[i];
     396           0 :     }
     397           0 :   }
     398           0 :   return sign_out;
     399           0 : }
     400             : 
     401             : /* Prepares the signing preimage and publishes a signing request that
     402             :    will be signed asynchronously by the sign tile.  The signed data will
     403             :    be returned via dcache as a frag. */
static void
fd_repair_send_sign_request( ctx_t                 * ctx,
                             out_ctx_t             * sign_out,
                             fd_repair_msg_t const * msg,
                             pong_data_t     const * opt_pong_data ){
  /* New sign request; silently dropped if the map is full (should not
     happen while credit management holds -- see sign_map_insert). */
  sign_req_t * pending = sign_map_insert( ctx, msg, opt_pong_data );
  if( FD_UNLIKELY( !pending ) ) return;

  /* sig encodes the map nonce in the upper 32 bits so the returning
     sign_repair frag can be matched back to this request; the low bits
     carry the keyguard sign type. */
  ulong   sig         = 0;
  ulong   preimage_sz = 0;
  uchar * dst         = fd_chunk_to_laddr( sign_out->mem, sign_out->chunk );

  if( FD_UNLIKELY( msg->kind == FD_REPAIR_KIND_PONG ) ) {
    /* Pongs sign a SHA256 preimage built from the ping token. */
    uchar pre_image[FD_REPAIR_PONG_PREIMAGE_SZ];
    preimage_pong( &opt_pong_data->hash, pre_image, sizeof(pre_image) );
    preimage_sz = FD_REPAIR_PONG_PREIMAGE_SZ;
    fd_memcpy( dst, pre_image, preimage_sz );
    sig = ((ulong)pending->key << 32) | (uint)FD_KEYGUARD_SIGN_TYPE_SHA256_ED25519;
  } else {
    /* Sign and prepare the message directly into the pending buffer */
    uchar * preimage = preimage_req( &pending->msg, &preimage_sz );
    fd_memcpy( dst, preimage, preimage_sz );
    sig = ((ulong)pending->key << 32) | (uint)FD_KEYGUARD_SIGN_TYPE_ED25519;
  }

  fd_stem_publish( ctx->stem, sign_out->idx, sig, sign_out->chunk, preimage_sz, 0UL, 0UL, 0UL );
  sign_out->chunk = fd_dcache_compact_next( sign_out->chunk, preimage_sz, sign_out->chunk0, sign_out->wmark );

  ctx->metrics->sent_pkt_types[metric_index[msg->kind]]++;
  sign_out->credits--; /* credit returned when the signed frag comes back on sign_repair */
}
     436             : 
/* before_frag filters incoming frags before they are copied:
   - NET: pass only frags destined for the repair protocol;
   - SHRED: return -1 until the forest has been rooted from the
     snapshot (NOTE(review): -1 vs nonzero-skip semantics come from
     stem -- confirm -1 means "leave frag unconsumed");
   - GOSSIP: pass only contact-info add/remove updates;
   - everything else passes.  Any nonzero return skips the frag. */
static inline int
before_frag( ctx_t * ctx,
             ulong   in_idx,
             ulong   seq FD_PARAM_UNUSED,
             ulong   sig ) {
  uint in_kind = ctx->in_kind[ in_idx ];
  if( FD_LIKELY  ( in_kind==IN_KIND_NET   ) ) return fd_disco_netmux_sig_proto( sig )!=DST_PROTO_REPAIR;
  if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) return fd_int_if( fd_forest_root_slot( ctx->forest )==ULONG_MAX, -1, 0 ); /* not ready to read frag */
  if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
    return sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO &&
           sig!=FD_GOSSIP_UPDATE_TAG_CONTACT_INFO_REMOVE;
  }
  return 0;
}
     451             : 
     452             : static void
     453             : during_frag( ctx_t * ctx,
     454             :              ulong                  in_idx,
     455             :              ulong                  seq FD_PARAM_UNUSED,
     456             :              ulong                  sig FD_PARAM_UNUSED,
     457             :              ulong                  chunk,
     458             :              ulong                  sz,
     459           0 :              ulong                  ctl ) {
     460           0 :   ctx->skip_frag = 0;
     461             : 
     462           0 :   uint             in_kind =  ctx->in_kind[ in_idx ];
     463           0 :   in_ctx_t const * in_ctx  = &ctx->in_links[ in_idx ];
     464             : 
     465           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
     466           0 :     if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
     467           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
     468           0 :     }
     469           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
     470           0 :     fd_memcpy( ctx->buffer, dcache_entry, sz );
     471           0 :     return;
     472           0 :   }
     473             : 
     474           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS ) ) {
     475           0 :     return;
     476           0 :   }
     477           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_NET ) ) {
     478           0 :     uchar const * dcache_entry = fd_net_rx_translate_frag( &in_ctx->net_rx, chunk, ctl, sz );
     479           0 :     fd_memcpy( ctx->buffer, dcache_entry, sz );
     480           0 :     return;
     481           0 :   }
     482             : 
     483           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
     484           0 :     if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
     485           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
     486           0 :     }
     487           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
     488           0 :     fd_memcpy( ctx->buffer, dcache_entry, sz );
     489           0 :     return;
     490           0 :   }
     491             : 
     492           0 :   if( FD_LIKELY  ( in_kind==IN_KIND_SHRED  ) ) {
     493           0 :     if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
     494           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
     495           0 :     }
     496           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
     497           0 :     if( FD_LIKELY( sz > 0 ) ) fd_memcpy( ctx->buffer, dcache_entry, sz );
     498           0 :     return;
     499           0 :   }
     500             : 
     501           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
     502           0 :     return;
     503           0 :   }
     504             : 
     505           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
     506           0 :     if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) ctx->snap_out_chunk = chunk;
     507           0 :     return;
     508           0 :   }
     509             : 
     510           0 :   if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
     511           0 :     if( FD_UNLIKELY( chunk<in_ctx->chunk0 || chunk>in_ctx->wmark || sz>in_ctx->mtu ) ) {
     512           0 :       FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, in_ctx->chunk0, in_ctx->wmark ));
     513           0 :     }
     514           0 :     uchar const * dcache_entry = fd_chunk_to_laddr_const( in_ctx->mem, chunk );
     515           0 :     fd_memcpy( ctx->buffer, dcache_entry, sz );
     516           0 :     return;
     517           0 :   }
     518             : 
     519           0 :   FD_LOG_ERR(( "Frag from unknown link (kind=%u in_idx=%lu)", in_kind, in_idx ));
     520           0 : }
     521             : 
     522             : static inline void
     523             : after_snap( ctx_t * ctx,
     524             :                  ulong                  sig,
     525           0 :                  uchar const          * chunk ) {
     526           0 :   if( FD_UNLIKELY( fd_ssmsg_sig_message( sig )!=FD_SSMSG_DONE ) ) return;
     527           0 :   fd_snapshot_manifest_t * manifest = (fd_snapshot_manifest_t *)chunk;
     528             : 
     529           0 :   fd_forest_init( ctx->forest, manifest->slot );
     530           0 :   FD_TEST( fd_forest_root_slot( ctx->forest )!=ULONG_MAX );
     531             : 
     532           0 : }
     533             : 
     534             : static inline void
     535           0 : after_contact( ctx_t * ctx, fd_gossip_update_message_t const * msg ) {
     536           0 :   fd_contact_info_t const * contact_info = msg->contact_info.contact_info;
     537           0 :   fd_ip4_port_t repair_peer = contact_info->sockets[ FD_CONTACT_INFO_SOCKET_SERVE_REPAIR ];
     538           0 :   if( FD_UNLIKELY( !repair_peer.addr || !repair_peer.port ) ) return;
     539           0 :   fd_policy_peer_t const * peer = fd_policy_peer_insert( ctx->policy, &contact_info->pubkey, &repair_peer );
     540           0 :   if( peer ) {
     541             :     /* The repair process uses a Ping-Pong protocol that incurs one
     542             :        round-trip time (RTT) for the initial repair request.  To
     543             :        optimize this, we proactively send a placeholder repair request
     544             :        as soon as we receive a peer's contact information for the first
     545             :        time, effectively prepaying the RTT cost. */
     546           0 :     fd_repair_msg_t * init = fd_repair_shred( ctx->protocol, &contact_info->pubkey, (ulong)fd_log_wallclock()/1000000L, 0, 0, 0 );
     547           0 :     fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = *init } );
     548           0 :   }
     549           0 : }
     550             : 
/* after_sign completes an asynchronous signing round trip: a message
   previously handed to a sign tile returns with its signature and can
   now be sent over the network.  Pongs are answered directly; shred
   requests are delivered to their target peer, or re-queued /
   parked in the inflight table if that peer disappeared meanwhile. */
static inline void
after_sign( ctx_t             * ctx,
            ulong               in_idx,
            ulong               sig,
            fd_stem_context_t * stem ) {
  ulong pending_key = sig >> 32; /* upper 32 bits of sig identify the pending request */
  /* Look up the pending request. Since the repair_sign links are
     reliable, the incoming sign_repair fragments represent a complete
     set of the previously sent outgoing messages. However, with
     multiple sign tiles, the responses may arrive interleaved. */

  /* Find which sign tile sent this response and increment its credits */
  for( uint i = 0; i < ctx->repair_sign_cnt; i++ ) {
    if( ctx->repair_sign_out_ctx[i].in_idx == in_idx ) {
      if( ctx->repair_sign_out_ctx[i].credits < ctx->repair_sign_out_ctx[i].max_credits ) {
        ctx->repair_sign_out_ctx[i].credits++;
      }
      break;
    }
  }

  sign_req_t * pending_ = fd_signs_map_query( ctx->signs_map, pending_key, NULL );
  if( FD_UNLIKELY( !pending_ ) ) FD_LOG_CRIT(( "No pending request found for key %lu", pending_key ));

  sign_req_t   pending[1] = { *pending_ }; /* Make a copy of the pending request so we can sign_map_remove immediately. */
  sign_map_remove( ctx, pending_key );

  /* This is a pong message */
  if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_PONG ) ) {
    /* Copy the fresh 64-byte signature into the pong and reply to the
       address captured when the ping arrived. */
    fd_memcpy( pending->msg.pong.sig, ctx->buffer, 64UL );
    send_packet( ctx, stem, 1, pending->pong_data.peer_addr.addr, pending->pong_data.peer_addr.port, pending->pong_data.daddr, pending->buf, fd_repair_sz( &pending->msg ), fd_frag_meta_ts_comp( fd_tickcount() ) );
    return;
  }

  /* Inject the signature into the pending request */
  fd_memcpy( pending->buf + 4, ctx->buffer, 64UL ); /* signature sits 4 bytes into the wire message */
  uint  src_ip4 = 0U; /* NOTE(review): 0 presumably means "unspecified source addr"; confirm against send_packet */

  /* This is a warmup message (slot 0 is the placeholder request sent
     on first contact to prepay the ping-pong RTT). */
  if( FD_UNLIKELY( pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.slot == 0 ) ) {
    fd_policy_peer_t * active = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
    if( FD_UNLIKELY( active ) ) send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
    else { /* This is a warmup request for a peer that is no longer active.  There's no reason to pick another peer for a warmup rq, so just drop it. */ }
    return;
  }

  /* This is a regular repair shred request

     TODO: anyways to make this less complicated? Essentially we need to
     ensure we always send out any shred requests we have, because policy_next
     has no way to revisit a shred.  But the fact that peers can drop out
     of the active peer list makes this complicated.

     1. If the peer is still there (common), it's fine.
     2. If the peer is not there, we can select another peer and send the request.
     3. If the peer is not there, and we have no other peers, we can add
        this request to the inflights table, pretend we've sent it and
        let the inflight timeout request it down the line.
  */
  fd_policy_peer_t * active         = fd_policy_peer_query( ctx->policy, &pending->msg.shred.to );
  int                is_regular_req = pending->msg.kind == FD_REPAIR_KIND_SHRED && pending->msg.shred.nonce > 0; // not a highest/orphan request

  if( FD_UNLIKELY( !active ) ) {
    fd_pubkey_t const * new_peer = fd_policy_peer_select( ctx->policy );
    if( FD_LIKELY( new_peer ) ) {
      /* We have a new peer, so we can send the request.  Re-queueing
         for signing is required because the message body (the target
         pubkey) changed. */
      pending->msg.shred.to = *new_peer;
      fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = pending->msg } );
    }

    if( FD_UNLIKELY( !new_peer && is_regular_req ) ) {
      /* This is real devastation - we clearly had a peer at the time of
         making this request, but for some reason we now have ZERO
         peers. The only thing we can do is to add this artificially to
         the inflights table, pretend we've sent it and let the inflight
         timeout request it down the line. */
      fd_inflights_request_insert( ctx->inflights, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
    }
    return;
  }
  /* Happy path - all is well, our peer didn't drop out from beneath us. */
  if( FD_LIKELY( is_regular_req ) ) {
    /* Track the outstanding request so the response can be matched by
       nonce, and record the request against the peer for policy. */
    fd_inflights_request_insert( ctx->inflights, pending->msg.shred.nonce, &pending->msg.shred.to, pending->msg.shred.slot, pending->msg.shred.shred_idx );
    fd_policy_peer_request_update( ctx->policy, &pending->msg.shred.to );
  }
  send_packet( ctx, stem, 1, active->ip4, active->port, src_ip4, pending->buf, pending->buflen, fd_frag_meta_ts_comp( fd_tickcount() ) );
}
     638             : 
/* after_shred records a newly received shred (via turbine or repair)
   into the forest.  For repaired data shreds it also settles the
   matching inflight request and credits the responding peer's RTT. */
static inline void
after_shred( ctx_t      * ctx,
             ulong        sig,
             fd_shred_t * shred,
             ulong        nonce ) {
  /* Insert the shred sig (shared by all shred members in the FEC set)
      into the map. */
  int is_code = fd_shred_is_code( fd_shred_type( shred->variant ) );
  /* sig encodes whether the shred arrived over turbine or repair. */
  int src = fd_disco_shred_out_shred_sig_is_turbine( sig ) ? SHRED_SRC_TURBINE : SHRED_SRC_REPAIR;
  if( FD_LIKELY( !is_code ) ) {
    long rtt = 0;
    fd_pubkey_t peer;
    /* A repaired data shred answers an outstanding request: remove it
       from the inflight table (assignment inside the condition) and,
       if a positive RTT was measured, credit the responding peer. */
    if( FD_UNLIKELY( src == SHRED_SRC_REPAIR && ( rtt = fd_inflights_request_remove( ctx->inflights, nonce, &peer ) ) > 0 ) ) {
      fd_policy_peer_response_update( ctx->policy, &peer, rtt );
      fd_histf_sample( ctx->metrics->response_latency, (ulong)rtt );
    }

    int slot_complete = !!(shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE);
    int ref_tick      = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;
    /* Ensure the slot exists in the forest (parent derived from the
       shred's parent offset), then buffer the data shred. */
    fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
    /* The profiler pre-inserts end_slot with a synthetic parent
       (end_slot-1); refresh to the true parent once a real shred for
       that slot arrives. */
    if( FD_UNLIKELY( ctx->profiler.enabled && shred->slot == ctx->profiler.end_slot ) ) fd_forest_blk_parent_update( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
    fd_forest_data_shred_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick, src );
  } else {
    /* Coding shreds are tracked by slot and index only. */
    fd_forest_code_shred_insert( ctx->forest, shred->slot, shred->idx );
  }
}
     665             : 
/* after_fec handles a "FEC set complete" message from the shred tile:
   it marks the whole FEC set buffered in the forest, drops the cached
   FEC signature, emits slot-completion metrics when this was the last
   outstanding FEC set of the slot, and (profiler mode) flags catchup
   completion. */
static inline void
after_fec( ctx_t      * ctx,
           fd_shred_t * shred ) {

  /* When this is a FEC completes msg, it is implied that all the
     other shreds in the FEC set can also be inserted.  Shred inserts
     into the forest are idempotent so it is fine to insert the same
     shred multiple times. */

  int slot_complete = !!( shred->data.flags & FD_SHRED_DATA_FLAG_SLOT_COMPLETE );
  int ref_tick      = shred->data.flags & FD_SHRED_DATA_REF_TICK_MASK;

  fd_forest_blk_t * ele = fd_forest_blk_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off );
  fd_forest_fec_insert( ctx->forest, shred->slot, shred->slot - shred->data.parent_off, shred->idx, shred->fec_set_idx, slot_complete, ref_tick );
  /* The cached signature (kept for force-completion) is no longer
     needed once the FEC set actually completed. */
  fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
  if( FD_LIKELY( fec_sig ) ) fd_fec_sig_remove( ctx->fec_sigs, fec_sig );
  FD_TEST( ele ); /* must be non-empty */

  /* metrics for completed slots: all data shreds buffered AND every
     FEC set marked complete */
  if( FD_UNLIKELY( ele->complete_idx != UINT_MAX && ele->buffered_idx==ele->complete_idx &&
                   0==memcmp( ele->cmpl, ele->fecs, sizeof(fd_forest_blk_idxs_t) * fd_forest_blk_idxs_word_cnt ) ) ) {
    long now = fd_tickcount();
    /* Slots at/after turbine_slot0 (or with no recorded request) are
       timed from the first shred; catchup slots from the first repair
       request. */
    long start_ts = ele->first_req_ts == 0 || ele->slot >= ctx->turbine_slot0 ? ele->first_shred_ts : ele->first_req_ts;
    ulong duration_ticks = (ulong)(now - start_ts);
    fd_histf_sample( ctx->metrics->slot_compl_time, duration_ticks );
    fd_repair_metrics_add_slot( ctx->slot_metrics, ele->slot, start_ts, now, ele->repair_cnt, ele->turbine_cnt );
    FD_LOG_INFO(( "slot is complete %lu. num_data_shreds: %u, num_repaired: %u, num_turbine: %u, num_recovered: %u, duration: %.2f ms", ele->slot, ele->complete_idx + 1, ele->repair_cnt, ele->turbine_cnt, ele->recovered_cnt, (double)fd_metrics_convert_ticks_to_nanoseconds(duration_ticks) / 1e6 ));
  }

  if( FD_UNLIKELY( ctx->profiler.enabled ) ) {
    // If turbine slot 0 is in the consumed frontier, and it satisfies the
    // above conditions for completions, then catchup is complete
    fd_forest_blk_t * turbine0     = fd_forest_query( ctx->forest, ctx->turbine_slot0 );
    ulong             turbine0_idx = fd_forest_pool_idx( fd_forest_pool( ctx->forest ), turbine0 );
    fd_forest_ref_t * consumed     = fd_forest_consumed_ele_query( fd_forest_consumed( ctx->forest ), &turbine0_idx, NULL, fd_forest_conspool( ctx->forest ) );
    if( FD_UNLIKELY( consumed && turbine0->complete_idx != UINT_MAX && turbine0->complete_idx == turbine0->buffered_idx &&
                     0==memcmp( turbine0->cmpl, turbine0->fecs, sizeof(fd_forest_blk_idxs_t) * fd_forest_blk_idxs_word_cnt ) ) ) {
      FD_COMPILER_MFENCE();
      /* NOTE(review): fence + volatile store suggests profiler.complete
         is polled from outside this tile; confirm. */
      FD_VOLATILE( ctx->profiler.complete ) = 1;
    }
  }
}
     708             : 
     709             : static inline void
     710             : after_net( ctx_t * ctx,
     711           0 :            ulong   sz  ) {
     712           0 :   fd_eth_hdr_t * eth; fd_ip4_hdr_t * ip4; fd_udp_hdr_t * udp;
     713           0 :   uchar * data; ulong data_sz;
     714           0 :   FD_TEST( fd_ip4_udp_hdr_strip( ctx->buffer, sz, &data, &data_sz, &eth, &ip4, &udp ) );
     715           0 :   fd_ip4_port_t peer_addr = { .addr=ip4->saddr, .port=udp->net_sport };
     716           0 :   if( FD_UNLIKELY( data_sz != sizeof(fd_repair_ping_t) ) ) {
     717           0 :     ctx->metrics->malformed_ping++;
     718           0 :     return;
     719           0 :   }
     720           0 :   fd_repair_ping_t * res = (fd_repair_ping_t *)fd_type_pun( data );
     721           0 :   if( FD_UNLIKELY( res->kind != FD_REPAIR_KIND_PING ) ) {
     722           0 :     ctx->metrics->malformed_ping++;
     723           0 :     return;
     724           0 :   }
     725           0 :   fd_repair_msg_t * pong = fd_repair_pong( ctx->protocol, &res->ping.hash );
     726           0 :   fd_signs_queue_push( ctx->sign_queue, (sign_pending_t){ .msg = *pong, .pong_data = { .peer_addr = peer_addr, .hash = res->ping.hash, .daddr = ip4->daddr } } );
     727           0 : }
     728             : 
     729             : static inline void
     730             : after_evict( ctx_t * ctx,
     731           0 :              ulong   sig ) {
     732           0 :   ulong spilled_slot        = fd_disco_shred_out_shred_sig_slot       ( sig );
     733           0 :   uint  spilled_fec_set_idx = fd_disco_shred_out_shred_sig_fec_set_idx( sig );
     734           0 :   uint  spilled_max_idx     = fd_disco_shred_out_shred_sig_data_cnt   ( sig );
     735             : 
     736           0 :   fd_forest_fec_clear( ctx->forest, spilled_slot, spilled_fec_set_idx, spilled_max_idx );
     737           0 : }
     738             : 
/* after_frag is the main frag dispatcher: it routes a received frag
   (already copied to ctx->buffer by during_frag where applicable) to
   the appropriate handler based on the input link kind. */
static void
after_frag( ctx_t * ctx,
            ulong                  in_idx,
            ulong                  seq    FD_PARAM_UNUSED,
            ulong                  sig,
            ulong                  sz,
            ulong                  tsorig FD_PARAM_UNUSED,
            ulong                  tspub  FD_PARAM_UNUSED,
            fd_stem_context_t *    stem ) {
  if( FD_UNLIKELY( ctx->skip_frag ) ) return;

  ctx->stem = stem;

  uint in_kind = ctx->in_kind[ in_idx ];
  /* Genesis bootstrap: start the forest rooted at slot 0. */
  if( FD_UNLIKELY( in_kind==IN_KIND_GENESIS && sig==GENESI_SIG_BOOTSTRAP_COMPLETED ) ) {
    fd_forest_init( ctx->forest, 0 );
    return;
  }

  /* Gossip: contact info inserts/updates a peer; any other gossip
     update for an origin removes it from the peer table. */
  if( FD_UNLIKELY( in_kind==IN_KIND_GOSSIP ) ) {
    fd_gossip_update_message_t const * msg = (fd_gossip_update_message_t const *)fd_type_pun_const( ctx->buffer );
    if( FD_LIKELY( sig==FD_GOSSIP_UPDATE_TAG_CONTACT_INFO ) ){
      after_contact( ctx, msg );
    } else {
      fd_policy_peer_remove( ctx->policy, (fd_pubkey_t const *)fd_type_pun_const( msg->origin_pubkey ) );
    }
    return;
  }

  /* Tower: a finished slot may advance the forest root (publish). */
  if( FD_UNLIKELY( in_kind==IN_KIND_TOWER ) ) {
    if( FD_LIKELY( sig==FD_TOWER_SIG_SLOT_DONE ) ) {
      fd_tower_slot_done_t const * msg = (fd_tower_slot_done_t const *)fd_type_pun_const( ctx->buffer );
      if( FD_LIKELY( msg->root_slot!=ULONG_MAX ) ) fd_forest_publish( ctx->forest, msg->root_slot );
    }
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_SIGN ) ) {
    after_sign( ctx, in_idx, sig, stem );
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_SHRED ) ) {

    /* There are 3 message types from shred:
        1. resolver evict - incomplete FEC set is evicted by resolver
        2. fec complete   - FEC set is completed by resolver. Also contains a shred.
        3. shred          - new shred

        Msgs 2 and 3 have a shred header in ctx->buffer */

    /* Message type is distinguished purely by frag size. */
    int resolver_evicted = sz == 0;
    int fec_completes    = sz == FD_SHRED_DATA_HEADER_SZ + sizeof(fd_hash_t) + sizeof(fd_hash_t) + sizeof(int);
    if( FD_UNLIKELY( resolver_evicted ) ) {
      after_evict( ctx, sig );
      return;
    }

    fd_shred_t * shred = (fd_shred_t *)fd_type_pun( ctx->buffer );
    uint         nonce = FD_LOAD(uint, ctx->buffer + fd_shred_header_sz( shred->variant ) ); /* repair nonce trails the shred header */
    /* Shreds at or below the root are already finalized; drop them. */
    if( FD_UNLIKELY( shred->slot <= fd_forest_root_slot( ctx->forest ) ) ) {
      FD_LOG_INFO(( "shred %lu %u %u too old, ignoring", shred->slot, shred->idx, shred->fec_set_idx ));
      return;
    };

    /* In profiler mode ignore anything beyond the fixed end slot. */
    if( FD_UNLIKELY( ctx->profiler.enabled && ctx->turbine_slot0 != ULONG_MAX && shred->slot > ctx->turbine_slot0 ) ) return;
#   if LOGGING
    if( FD_UNLIKELY( shred->slot > ctx->metrics->current_slot ) ) {
      FD_LOG_INFO(( "\n\n[Turbine]\n"
                    "slot:             %lu\n"
                    "root:             %lu\n",
                    shred->slot,
                    fd_forest_root_slot( ctx->forest ) ));
    }
#   endif
    ctx->metrics->current_slot  = fd_ulong_max( shred->slot, ctx->metrics->current_slot );
    /* First shred ever seen: latch turbine_slot0, the boundary between
       catchup (repair-driven) slots and live (turbine-driven) slots. */
    if( FD_UNLIKELY( ctx->turbine_slot0 == ULONG_MAX ) ) {

      if( FD_UNLIKELY( ctx->profiler.enabled ) ) {
        /* we wait until the first turbine shred arrives to kick off
           the profiler.  This is to let gossip peers accumulate similar
           to a regular Firedancer run. */
        fd_forest_blk_insert( ctx->forest, ctx->profiler.end_slot, ctx->profiler.end_slot - 1 );
        fd_forest_code_shred_insert( ctx->forest, ctx->profiler.end_slot, 0 );

        ctx->turbine_slot0 = ctx->profiler.end_slot;
        fd_repair_metrics_set_turbine_slot0( ctx->slot_metrics, ctx->profiler.end_slot );
        fd_policy_set_turbine_slot0( ctx->policy, ctx->profiler.end_slot );
        return;
      }

      ctx->turbine_slot0 = shred->slot;
      fd_repair_metrics_set_turbine_slot0( ctx->slot_metrics, shred->slot );
      fd_policy_set_turbine_slot0( ctx->policy, shred->slot );
    }

    if( FD_UNLIKELY( fec_completes ) ) {
      after_fec( ctx, shred );
    } else {
      /* Don't want to reinsert the shred sig for an already complete FEC set */
      fd_fec_sig_t * fec_sig = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx, NULL );
      if( FD_UNLIKELY( !fec_sig ) ) {
        fec_sig = fd_fec_sig_insert( ctx->fec_sigs, (shred->slot << 32) | shred->fec_set_idx );
        memcpy( fec_sig->sig, shred->signature, sizeof(fd_ed25519_sig_t) );
      }
      after_shred( ctx, sig, shred, nonce );
    }

    /* Check if there are FECs to force complete. Algorithm: window
       through the idxs in interval [i, j). If j = next fec_set_idx
       then we know we can force complete the FEC set interval [i, j)
       (assuming it wasn't already completed based on `cmpl`). */

    fd_forest_blk_t * blk = fd_forest_query( ctx->forest, shred->slot );
    if( blk ) {
      uint i = blk->consumed_idx + 1;
      for( uint j = i; j < blk->buffered_idx + 1; j++ ) {
        if( FD_UNLIKELY( fd_forest_blk_idxs_test( blk->fecs, j ) ) ) {
          if( FD_UNLIKELY( fd_forest_blk_idxs_test( blk->cmpl, j ) ) ) {
            /* already been completed without force complete */
          } else {
            /* force completeable */
            fd_fec_sig_t * fec_sig  = fd_fec_sig_query( ctx->fec_sigs, (shred->slot << 32) | i, NULL );
            if( FD_LIKELY( fec_sig ) ) {
              /* NOTE(review): this local `sig` shadows the function
                 parameter `sig`; intentional here, but rename would
                 silence -Wshadow. */
              ulong          sig      = fd_ulong_load_8( fec_sig->sig );
              ulong          tile_idx = sig % ctx->shred_tile_cnt; /* route to the shred tile owning this FEC set */
              uint           last_idx = j - i;

              /* Publish the FEC signature back to the shred tile to
                 request a force-complete of the [i, j) window. */
              uchar * chunk = fd_chunk_to_laddr( ctx->shred_out_ctx[tile_idx].mem, ctx->shred_out_ctx[tile_idx].chunk );
              memcpy( chunk, fec_sig->sig, sizeof(fd_ed25519_sig_t) );
              fd_fec_sig_remove( ctx->fec_sigs, fec_sig );
              fd_stem_publish( stem, ctx->shred_out_ctx[tile_idx].idx, last_idx, ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), 0UL, 0UL, 0UL );
              ctx->shred_out_ctx[tile_idx].chunk = fd_dcache_compact_next( ctx->shred_out_ctx[tile_idx].chunk, sizeof(fd_ed25519_sig_t), ctx->shred_out_ctx[tile_idx].chunk0, ctx->shred_out_ctx[tile_idx].wmark );
            }
          }
          /* advance consumed */
          blk->consumed_idx = j;
          i = j + 1;
        }
      }
    }
    /* update metrics */
    ctx->metrics->repaired_slots = fd_forest_highest_repaired_slot( ctx->forest );
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_STAKE ) ) {
    return;
  }

  /* Snapshot: the manifest chunk was stashed in during_frag. */
  if( FD_UNLIKELY( in_kind==IN_KIND_SNAP ) ) {
    after_snap( ctx, sig, fd_chunk_to_laddr( ctx->in_links[ in_idx ].mem, ctx->snap_out_chunk ) );
    return;
  }

  if( FD_UNLIKELY( in_kind==IN_KIND_NET ) ) {
    after_net( ctx, sz );
    return;
  }
}
     899             : 
/* after_credit is the idle-loop worker.  Priority order: (1) bail if
   no sign tile has credits, (2) flush one queued signable message,
   (3) re-request one timed-out inflight shred, (4) ask the policy for
   the next repair request to issue. */
static inline void
after_credit( ctx_t *             ctx,
              fd_stem_context_t * stem FD_PARAM_UNUSED,
              int *               opt_poll_in FD_PARAM_UNUSED,
              int *               charge_busy ) {
  long now = fd_log_wallclock();

  /* Verify that there is at least one sign tile with available credits.
     If not, we can't send any requests and leave early. */
  out_ctx_t * sign_out = sign_avail_credits( ctx );
  if( FD_UNLIKELY( !sign_out ) ) {
    ctx->metrics->sign_tile_unavail++;
    return;
  }
  /* Drain at most one queued message per credit to the sign tile. */
  if( FD_UNLIKELY( !fd_signs_queue_empty( ctx->sign_queue ) ) ) {
    sign_pending_t signable = fd_signs_queue_pop( ctx->sign_queue );
    fd_repair_send_sign_request( ctx, sign_out, &signable.msg, signable.msg.kind == FD_REPAIR_KIND_PONG ? &signable.pong_data : NULL );
    *charge_busy = 1;
    return;
  }

  /* Timed-out inflight requests get re-issued (to a possibly different
     peer), but only if the shred is still missing from the forest. */
  if( FD_UNLIKELY( fd_inflights_should_drain( ctx->inflights, now ) ) ) {
    ulong nonce; ulong slot; ulong shred_idx;
    *charge_busy = 1;
    fd_inflights_request_pop( ctx->inflights, &nonce, &slot, &shred_idx );
    fd_forest_blk_t * blk = fd_forest_query( ctx->forest, slot );
    if( FD_UNLIKELY( blk && !fd_forest_blk_idxs_test( blk->idxs, shred_idx ) ) ) {
      fd_pubkey_t const * peer = fd_policy_peer_select( ctx->policy );
      ctx->metrics->rerequest++;
      if( FD_UNLIKELY( !peer ) ) {
        /* No peers. But we CANNOT lose this request. */
        /* Add this request to the inflights table, pretend we've sent it and let the inflight timeout request it down the line. */
        fd_hash_t hash = { .ul[0] = 0 };
        fd_inflights_request_insert( ctx->inflights, ctx->policy->nonce++, &hash, slot, shred_idx );
      } else {
        /* NOTE(review): 1e6L is a long-double literal, so this is a
           floating-point division (other call sites use integer
           /1000000L); result is the same wallclock-ms value, but
           confirm the inconsistency is intentional. */
        fd_repair_msg_t * msg = fd_repair_shred( ctx->protocol, peer, (ulong)((ulong)now / 1e6L), ctx->policy->nonce++, slot, shred_idx );
        fd_repair_send_sign_request( ctx, sign_out, msg, NULL );
        return;
      }
    }
  }

  /* Finally, let the policy decide the next repair request (if any). */
  fd_repair_msg_t const * cout = fd_policy_next( ctx->policy, ctx->forest, ctx->protocol, now, ctx->metrics->current_slot, charge_busy );
  if( FD_UNLIKELY( !cout ) ) return;

  fd_repair_send_sign_request( ctx, sign_out, cout, NULL );
}
     947             : 
/* during_housekeeping periodically dumps the forest state when
   DEBUG_LOGGING is compiled in; otherwise it is a no-op. */
static inline void
during_housekeeping( ctx_t * ctx ) {
  (void)ctx;
# if DEBUG_LOGGING
  long now = fd_log_wallclock();
  /* Rate-limit the forest dump to roughly once every 10 seconds. */
  if( FD_UNLIKELY( now - ctx->tsdebug > (long)10e9 ) ) {
    fd_forest_print( ctx->forest );
    ctx->tsdebug = fd_log_wallclock();
  }
# endif
}
     959             : 
     960             : static void
     961             : privileged_init( fd_topo_t *      topo,
     962           0 :                  fd_topo_tile_t * tile ) {
     963           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     964             : 
     965           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     966           0 :   ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(ctx_t), sizeof(ctx_t) );
     967           0 :   fd_memset( ctx, 0, sizeof(ctx_t) );
     968             : 
     969           0 :   uchar const * identity_key = fd_keyload_load( tile->repair.identity_key_path, /* pubkey only: */ 0 );
     970           0 :   fd_memcpy( ctx->identity_public_key.uc, identity_key + 32UL, sizeof(fd_pubkey_t) );
     971             : 
     972           0 :   FD_TEST( fd_rng_secure( &ctx->repair_seed, sizeof(ulong) ) );
     973           0 : }
     974             : 
/* unprivileged_init: runs after privileged_init, inside the sandbox.
   Carves all repair data structures out of the tile's scratch region
   (layout must match scratch_footprint exactly), joins them, then wires
   up every input and output link declared in the topology, including
   the manual credit accounting that pairs each repair_sign output link
   with its sign_repair input link. */
static void
unprivileged_init( fd_topo_t *      topo,
                   fd_topo_tile_t * tile ) {
  void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );

  /* Size the pending-signs map across all sign tiles combined.
     find_msb(pow2_up(x))+1 is lg(next power of two >= x) plus one, so
     the map holds at least 2x the total outstanding sign requests —
     presumably headroom for hash map load; TODO confirm intent. */
  ulong total_sign_depth = tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt;
  int   lg_sign_depth    = fd_ulong_find_msb( fd_ulong_pow2_up(total_sign_depth) ) + 1;

  /* Scratch carve-out.  Order and sizes must mirror scratch_footprint;
     the FD_TEST below verifies the layouts agree. */
  FD_SCRATCH_ALLOC_INIT( l, scratch );
  ctx_t * ctx       = FD_SCRATCH_ALLOC_APPEND( l, alignof(ctx_t),            sizeof(ctx_t)                                                    );
  ctx->protocol     = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_align(),         fd_repair_footprint     ()                                       );
  ctx->forest       = FD_SCRATCH_ALLOC_APPEND( l, fd_forest_align(),         fd_forest_footprint     ( tile->repair.slot_max )                );
  ctx->policy       = FD_SCRATCH_ALLOC_APPEND( l, fd_policy_align(),         fd_policy_footprint     ( FD_NEEDED_KEY_MAX, FD_ACTIVE_KEY_MAX ) );
  ctx->inflights    = FD_SCRATCH_ALLOC_APPEND( l, fd_inflights_align(),      fd_inflights_footprint  ()                                       );
  ctx->fec_sigs     = FD_SCRATCH_ALLOC_APPEND( l, fd_fec_sig_align(),        fd_fec_sig_footprint    ( 20 )                                   );
  ctx->signs_map    = FD_SCRATCH_ALLOC_APPEND( l, fd_signs_map_align(),      fd_signs_map_footprint  ( lg_sign_depth )                        );
  ctx->sign_queue   = FD_SCRATCH_ALLOC_APPEND( l, fd_signs_queue_align(),    fd_signs_queue_footprint()                                       );
  ctx->slot_metrics = FD_SCRATCH_ALLOC_APPEND( l, fd_repair_metrics_align(), fd_repair_metrics_footprint()                                    );
  FD_TEST( FD_SCRATCH_ALLOC_FINI( l, scratch_align() ) == (ulong)scratch + scratch_footprint( tile ) );

  /* Format and join each structure in place.  repair_seed was drawn in
     privileged_init before the sandbox came up. */
  ctx->protocol     = fd_repair_join        ( fd_repair_new        ( ctx->protocol, &ctx->identity_public_key                              ) );
  ctx->forest       = fd_forest_join        ( fd_forest_new        ( ctx->forest,   tile->repair.slot_max, ctx->repair_seed                ) );
  ctx->policy       = fd_policy_join        ( fd_policy_new        ( ctx->policy,   FD_NEEDED_KEY_MAX, FD_ACTIVE_KEY_MAX, ctx->repair_seed ) );
  ctx->inflights    = fd_inflights_join     ( fd_inflights_new     ( ctx->inflights                                                        ) );
  ctx->fec_sigs     = fd_fec_sig_join       ( fd_fec_sig_new       ( ctx->fec_sigs, 20, 0UL                                                ) );
  ctx->signs_map    = fd_signs_map_join     ( fd_signs_map_new     ( ctx->signs_map, lg_sign_depth, 0UL                                    ) );
  ctx->sign_queue   = fd_signs_queue_join   ( fd_signs_queue_new   ( ctx->sign_queue                                                       ) );
  ctx->slot_metrics = fd_repair_metrics_join( fd_repair_metrics_new( ctx->slot_metrics                                                     ) );

  /* Process in links */

  if( FD_UNLIKELY( tile->in_cnt > MAX_IN_LINKS ) ) FD_LOG_ERR(( "repair tile has too many input links" ));

  /* Track the in-link index and depth of each sign_repair link so the
     out-link loop below can pair every repair_sign output with its
     corresponding sign_repair input for credit accounting. */
  uint  sign_repair_in_idx[ MAX_SIGN_TILE_CNT ] = {0};
  uint  sign_repair_idx  = 0;
  ulong sign_link_depth  = 0;

  for( uint in_idx=0U; in_idx<(tile->in_cnt); in_idx++ ) {
    fd_topo_link_t * link = &topo->links[ tile->in_link_id[ in_idx ] ];
    if( 0==strcmp( link->name, "net_repair" ) ) {
      /* Net links use the fd_net_rx bounds mechanism instead of the
         chunk0/wmark bookkeeping below, hence the continue. */
      ctx->in_kind[ in_idx ] = IN_KIND_NET;
      fd_net_rx_bounds_init( &ctx->in_links[ in_idx ].net_rx, link->dcache );
      continue;
    } else if( 0==strcmp( link->name, "sign_repair" ) ) {
      ctx->in_kind[ in_idx ]                  = IN_KIND_SIGN;
      sign_repair_in_idx[ sign_repair_idx++ ] = in_idx;
      sign_link_depth                         = link->depth;
    }
    else if( 0==strcmp( link->name, "gossip_out"   ) ) ctx->in_kind[ in_idx ] = IN_KIND_GOSSIP;
    else if( 0==strcmp( link->name, "tower_out"    ) ) ctx->in_kind[ in_idx ] = IN_KIND_TOWER;
    else if( 0==strcmp( link->name, "shred_out"    ) ) ctx->in_kind[ in_idx ] = IN_KIND_SHRED;
    else if( 0==strcmp( link->name, "snapin_manif" ) ) ctx->in_kind[ in_idx ] = IN_KIND_SNAP;
    else if( 0==strcmp( link->name, "replay_stake" ) ) ctx->in_kind[ in_idx ] = IN_KIND_STAKE;
    else if( 0==strcmp( link->name, "genesi_out"   ) ) ctx->in_kind[ in_idx ] = IN_KIND_GENESIS;
    else FD_LOG_ERR(( "repair tile has unexpected input link %s", link->name ));

    /* Common dcache bookkeeping for all non-net in links. */
    ctx->in_links[ in_idx ].mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
    ctx->in_links[ in_idx ].chunk0 = fd_dcache_compact_chunk0( ctx->in_links[ in_idx ].mem, link->dcache );
    ctx->in_links[ in_idx ].wmark  = fd_dcache_compact_wmark ( ctx->in_links[ in_idx ].mem, link->dcache, link->mtu );
    ctx->in_links[ in_idx ].mtu    = link->mtu;

    FD_TEST( fd_dcache_compact_is_safe( ctx->in_links[in_idx].mem, link->dcache, link->mtu, link->depth ) );
  }

  /* Process out links */

  ctx->net_out_idx       = UINT_MAX; /* sentinel: no repair_net link seen yet */
  ctx->shred_tile_cnt    = 0;
  ctx->repair_sign_cnt   = 0;
  ctx->sign_rrobin_idx   = 0;

  for( uint out_idx=0U; out_idx<(tile->out_cnt); out_idx++ ) {
    fd_topo_link_t * link = &topo->links[ tile->out_link_id[ out_idx ] ];

    if( 0==strcmp( link->name, "repair_net" ) ) {

      if( ctx->net_out_idx!=UINT_MAX ) continue; /* only use first net link */
      ctx->net_out_idx    = out_idx;
      ctx->net_out_mem    = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      ctx->net_out_chunk0 = fd_dcache_compact_chunk0( ctx->net_out_mem, link->dcache );
      ctx->net_out_wmark  = fd_dcache_compact_wmark( ctx->net_out_mem, link->dcache, link->mtu );
      ctx->net_out_chunk  = ctx->net_out_chunk0;

    } else if( 0==strcmp( link->name, "repair_shred" ) ) {

      /* One repair_shred link per shred tile (count verified below). */
      out_ctx_t * shred_out = &ctx->shred_out_ctx[ ctx->shred_tile_cnt++ ];
      shred_out->idx        = out_idx;
      shred_out->mem        = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      shred_out->chunk0     = fd_dcache_compact_chunk0( shred_out->mem, link->dcache );
      shred_out->wmark      = fd_dcache_compact_wmark( shred_out->mem, link->dcache, link->mtu );
      shred_out->chunk      = shred_out->chunk0;

    } else if( 0==strcmp( link->name, "repair_sign" ) ) {

      /* Pair this output with the i-th sign_repair input link and seed
         its manual credit counter to the full sign link depth. */
      out_ctx_t * repair_sign_out  = &ctx->repair_sign_out_ctx[ ctx->repair_sign_cnt ];
      repair_sign_out->idx         = out_idx;
      repair_sign_out->mem         = topo->workspaces[ topo->objs[ link->dcache_obj_id ].wksp_id ].wksp;
      repair_sign_out->chunk0      = fd_dcache_compact_chunk0( repair_sign_out->mem, link->dcache );
      repair_sign_out->wmark       = fd_dcache_compact_wmark( repair_sign_out->mem, link->dcache, link->mtu );
      repair_sign_out->chunk       = repair_sign_out->chunk0;
      repair_sign_out->in_idx      = sign_repair_in_idx[ ctx->repair_sign_cnt++ ]; /* match to the sign_repair input link */
      repair_sign_out->max_credits = sign_link_depth;
      repair_sign_out->credits     = sign_link_depth;

    } else {
      FD_LOG_ERR(( "repair tile has unexpected output link %s", link->name ));
    }
  }
  if( FD_UNLIKELY( ctx->net_out_idx==UINT_MAX       ) ) FD_LOG_ERR(( "Missing repair_net link" ));
  if( FD_UNLIKELY( ctx->repair_sign_cnt!=sign_repair_idx ) ) {
    FD_LOG_ERR(( "Mismatch between repair_sign output links (%lu) and sign_repair input links (%u)", ctx->repair_sign_cnt, sign_repair_idx ));
  }

  FD_TEST( ctx->shred_tile_cnt == fd_topo_tile_name_cnt( topo, "shred" ) );

  /* Capacity sanity check, compiled only in debug builds. */
# if DEBUG_LOGGING
  if( fd_signs_map_key_max( ctx->signs_map ) < tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt ) {
    FD_LOG_ERR(( "repair pending signs tracking map is too small: %lu < %lu.  Increase the key_max", fd_signs_map_key_max( ctx->signs_map ), tile->repair.repair_sign_depth * tile->repair.repair_sign_cnt ));
  }
# endif

  /* Ports are stored byte-swapped (network byte order). */
  ctx->wksp = topo->workspaces[ topo->objs[ tile->tile_obj_id ].wksp_id ].wksp;
  ctx->repair_intake_addr.port = fd_ushort_bswap( tile->repair.repair_intake_listen_port );
  ctx->repair_serve_addr.port  = fd_ushort_bswap( tile->repair.repair_serve_listen_port  );

  ctx->net_id = (ushort)0;
  fd_ip4_udp_hdr_init( ctx->intake_hdr, FD_REPAIR_MAX_PACKET_SIZE, 0, tile->repair.repair_intake_listen_port );
  fd_ip4_udp_hdr_init( ctx->serve_hdr,  FD_REPAIR_MAX_PACKET_SIZE, 0, tile->repair.repair_serve_listen_port  );

  /* Repair set up */

  ctx->turbine_slot0 = ULONG_MAX;
  FD_LOG_INFO(( "repair my addr - intake addr: " FD_IP4_ADDR_FMT ":%u, serve_addr: " FD_IP4_ADDR_FMT ":%u",
    FD_IP4_ADDR_FMT_ARGS( ctx->repair_intake_addr.addr ), fd_ushort_bswap( ctx->repair_intake_addr.port ),
    FD_IP4_ADDR_FMT_ARGS( ctx->repair_serve_addr.addr ), fd_ushort_bswap( ctx->repair_serve_addr.port ) ));

  /* NOTE(review): sizeof(ctx->metrics) assumes metrics is an inline
     member (e.g. array-of-1 struct), not a bare pointer — confirm
     against ctx_t's declaration. */
  memset( ctx->metrics, 0, sizeof(ctx->metrics) );

  fd_histf_join( fd_histf_new( ctx->metrics->slot_compl_time, FD_MHIST_SECONDS_MIN( REPAIR, SLOT_COMPLETE_TIME ),
                                                              FD_MHIST_SECONDS_MAX( REPAIR, SLOT_COMPLETE_TIME ) ) );
  fd_histf_join( fd_histf_new( ctx->metrics->response_latency, FD_MHIST_MIN( REPAIR, RESPONSE_LATENCY ),
                                                               FD_MHIST_MAX( REPAIR, RESPONSE_LATENCY ) ) );

  /* A non-zero end_slot enables profiling mode. */
  ctx->tsdebug = fd_log_wallclock();
  ctx->pending_key_next = 0;
  ctx->profiler.enabled  = tile->repair.end_slot != 0UL;
  ctx->profiler.end_slot = tile->repair.end_slot;
  if( ctx->profiler.enabled ) {
    ctx->metrics->current_slot = tile->repair.end_slot + 1; /* +1 to allow the turbine slot 0 to be completed */
    ctx->profiler.complete     = 0;
  }
}
    1125             : 
    1126             : static ulong
    1127             : populate_allowed_seccomp( fd_topo_t const *      topo FD_PARAM_UNUSED,
    1128             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
    1129             :                           ulong                  out_cnt,
    1130           0 :                           struct sock_filter *   out ) {
    1131           0 :   populate_sock_filter_policy_fd_repair_tile(
    1132           0 :     out_cnt, out, (uint)fd_log_private_logfile_fd(), (uint)-1 );
    1133           0 :   return sock_filter_policy_fd_repair_tile_instr_cnt;
    1134           0 : }
    1135             : 
    1136             : static ulong
    1137             : populate_allowed_fds( fd_topo_t const *      topo FD_PARAM_UNUSED,
    1138             :                       fd_topo_tile_t const * tile FD_PARAM_UNUSED,
    1139             :                       ulong                  out_fds_cnt,
    1140           0 :                       int *                  out_fds ) {
    1141           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
    1142             : 
    1143           0 :   ulong out_cnt = 0UL;
    1144           0 :   out_fds[ out_cnt++ ] = 2; /* stderr */
    1145           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
    1146           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
    1147           0 :   return out_cnt;
    1148           0 : }
    1149             : 
    1150             : static inline void
    1151           0 : metrics_write( ctx_t * ctx ) {
    1152           0 :   FD_MCNT_SET( REPAIR, CURRENT_SLOT,      ctx->metrics->current_slot );
    1153           0 :   FD_MCNT_SET( REPAIR, REPAIRED_SLOTS,    ctx->metrics->repaired_slots );
    1154           0 :   FD_MCNT_SET( REPAIR, REQUEST_PEERS,     fd_peer_pool_used( ctx->policy->peers.pool ) );
    1155           0 :   FD_MCNT_SET( REPAIR, SIGN_TILE_UNAVAIL, ctx->metrics->sign_tile_unavail );
    1156           0 :   FD_MCNT_SET( REPAIR, REREQUEST_QUEUE,   ctx->metrics->rerequest );
    1157             : 
    1158           0 :   FD_MCNT_SET      ( REPAIR, TOTAL_PKT_COUNT, ctx->metrics->send_pkt_cnt   );
    1159           0 :   FD_MCNT_ENUM_COPY( REPAIR, SENT_PKT_TYPES,  ctx->metrics->sent_pkt_types );
    1160             : 
    1161           0 :   FD_MHIST_COPY( REPAIR, SLOT_COMPLETE_TIME, ctx->metrics->slot_compl_time );
    1162           0 :   FD_MHIST_COPY( REPAIR, RESPONSE_LATENCY,   ctx->metrics->response_latency );
    1163           0 : }
    1164             : 
    1165             : #undef DEBUG_LOGGING
    1166             : 
    1167             : /* TODO: This is not correct, but is temporary and will be fixed
    1168             :    when fixed FEC 32 goes in, and we can finally get rid of force
    1169             :    completes BS. */
    1170           0 : #define STEM_BURST (64UL)
    1171             : 
    1172             : /* Sign manual credit management, backpressuring, sign tile count, &
   sign speed affect this lazy value. The main goal of repair's highest
    1174             :    workload (catchup) is to have high send packet rate.  Repair is
    1175             :    regularly idle, and mostly waiting for dispatched signs to come
    1176             :    in. Processing shreds from shred tile is a relatively fast operation.
    1177             :    Thus we only worry about fully utilizing the sign tiles' capacity.
    1178             : 
    1179             :    Assuming standard 2 sign tiles & reasonably fast signing rate & if
    1180             :    repair_sign_depth==sign_repair_depth: the lower the LAZY, the less
    1181             :    time is spent in backpressure, and the higher the packet send rate
    1182             :    gets.  As expected, up until a certain point, credit return is slower
    1183             :    than signing. This starts to plateau at ~10k LAZY (for a box that can
    1184             :    sign at ~20k repair pps, but is fully dependent on the sign tile's
    1185             :    speed).
    1186             : 
    1187             :    At this point we start returning credits faster than we actually get
    1188             :    them from the sign tile, so signing becomes the bottleneck.  The
    1189             :    extreme case is when we set it to standard lazy (289 ns);
    1190             :    housekeeping time spikes, but backpressure time drops (to a lower but
    1191             :    inconsistent value). But because we are usually idling in the repair
   tile, higher housekeeping doesn't really affect the send packet rate.
    1193             : 
    1194             :    Recall that repair_sign_depth is actually > sign_repair_depth (see
    1195             :    long comment in ctx_t struct).  So repair_sign is NEVER
    1196             :    backpressuring the repair tile.  When we set
    1197             :    repair_sign_depth>sign_repair_depth, we spend very little time in
    1198             :    backpressure (repair_sign always has available credits), and most of
    1199             :    the time idling.  Theoretically, this uncouples repair tile with
    1200             :    credit return and basically sends at rate as close to as we can sign.
    1201             :    This is a small improvement over the first case (low lazy,
    1202             :    repair_sign_depth==sign_repair_depth).
    1203             : 
    1204             :    Since we don't ever fill up repair_sign link, we can set LAZY to any
    1205             :    reasonable value that keeps housekeeping time low. */
    1206           0 : #define STEM_LAZY  (64000)
    1207             : 
    1208           0 : #define STEM_CALLBACK_CONTEXT_TYPE  ctx_t
    1209           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(ctx_t)
    1210             : 
    1211           0 : #define STEM_CALLBACK_AFTER_CREDIT        after_credit
    1212           0 : #define STEM_CALLBACK_BEFORE_FRAG         before_frag
    1213           0 : #define STEM_CALLBACK_DURING_FRAG         during_frag
    1214           0 : #define STEM_CALLBACK_AFTER_FRAG          after_frag
    1215           0 : #define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
    1216           0 : #define STEM_CALLBACK_METRICS_WRITE       metrics_write
    1217             : 
    1218             : #include "../../disco/stem/fd_stem.c"
    1219             : 
    1220             : fd_topo_run_tile_t fd_tile_repair = {
    1221             :   .name                     = "repair",
    1222             :   .loose_footprint          = loose_footprint,
    1223             :   .populate_allowed_seccomp = populate_allowed_seccomp,
    1224             :   .populate_allowed_fds     = populate_allowed_fds,
    1225             :   .scratch_align            = scratch_align,
    1226             :   .scratch_footprint        = scratch_footprint,
    1227             :   .unprivileged_init        = unprivileged_init,
    1228             :   .privileged_init          = privileged_init,
    1229             :   .run                      = stem_run,
    1230             : };

Generated by: LCOV version 1.14