LCOV - code coverage report
Current view: top level - discof/send - fd_send_tile.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 409 0.0 %
Date: 2025-07-01 05:00:49 Functions: 0 23 0.0 %

          Line data    Source code
       1             : #include "fd_send_tile.h"
       2             : 
       3             : #include <errno.h>
       4             : #include <sys/random.h>
       5             : 
       6             : /* map leader pubkey to quic conn
       7             :    a peer entry can be in 3 states:
       8             :    - UNSTAKED: no element in map
       9             :    - NO_CONN: pubkey maps to null conn. staked, but no active conn
      10             :    - CONN: pubkey maps to conn. staked, connection initiated
      11             : 
      12             : The state machine works as follows:
      13             : 
      14             : receive stake msg including pubkey:
      15             :   - if UNSTAKED, create new entry in NO_CONN state
      16             : 
      17             : receive contact info:
      18             :   - Update contact info. NO_CONN -> CONN. If CONN and contact info changed,
      19             :     restart the conn.
      20             :   - touch last_ci_ticks
      21             : 
      22             : Conn closed:
      23             :   - reconnect, unless contact info stale
      24             : */
      25           0 : #define CONTACT_INFO_STALE_TICKS (60e9L) /* ~60 seconds */
      26             : 
      27             : #define MAP_NAME               fd_send_conn_map
      28           0 : #define MAP_T                  fd_send_conn_entry_t
      29           0 : #define MAP_LG_SLOT_CNT        17
      30           0 : #define MAP_KEY                pubkey
      31           0 : #define MAP_KEY_T              fd_pubkey_t
      32           0 : #define MAP_KEY_NULL           (fd_pubkey_t){0}
      33           0 : #define MAP_KEY_EQUAL(k0,k1)   (!(memcmp((k0).key,(k1).key,sizeof(fd_pubkey_t))))
      34           0 : #define MAP_KEY_INVAL(k)       (MAP_KEY_EQUAL((k),MAP_KEY_NULL))
      35             : #define MAP_KEY_EQUAL_IS_SLOW  1
      36           0 : #define MAP_KEY_HASH(key)      ((key).ui[3])
      37             : #include "../../util/tmpl/fd_map.c"
      38             : 
      39             : fd_quic_limits_t quic_limits = {
      40             :   .conn_cnt                    = MAX_STAKED_LEADERS,
      41             :   .handshake_cnt               = MAX_STAKED_LEADERS,
      42             :   .conn_id_cnt                 = FD_QUIC_MIN_CONN_ID_CNT,
      43             :   .inflight_frame_cnt          = 16UL * MAX_STAKED_LEADERS,
      44             :   .min_inflight_frame_cnt_conn = 4UL,
      45             :   .stream_id_cnt               = 16UL,
      46             :   .tx_buf_sz                   = FD_TXN_MTU,
      47             :   .stream_pool_cnt             = 2048UL
      48             : };
      49             : 
      50             : FD_FN_CONST static inline ulong
      51           0 : scratch_align( void ) {
      52           0 :   return fd_ulong_max( fd_ulong_max( 128UL, fd_quic_align() ), fd_send_conn_map_align() );
      53           0 : }
      54             : 
      55             : FD_FN_PURE static inline ulong
      56           0 : scratch_footprint( fd_topo_tile_t const * tile FD_PARAM_UNUSED) {
      57           0 :   ulong l = FD_LAYOUT_INIT;
      58           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_send_tile_ctx_t), sizeof(fd_send_tile_ctx_t) );
      59           0 :   l = FD_LAYOUT_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) );
      60           0 :   l = FD_LAYOUT_APPEND( l, fd_send_conn_map_align(), fd_send_conn_map_footprint() );
      61           0 :   return FD_LAYOUT_FINI( l, scratch_align() );
      62           0 : }
      63             : 
      64             : /* QUIC callbacks */
      65             : 
      66             : static void
      67             : quic_tls_cv_sign( void *      signer_ctx,
      68             :                   uchar       signature[ static 64 ],
      69           0 :                   uchar const payload[ static 130 ] ) {
      70           0 :   fd_send_tile_ctx_t * ctx = signer_ctx;
      71           0 :   fd_sha512_t * sha512 = fd_sha512_join( ctx->sha512 );
      72           0 :   fd_ed25519_sign( signature, payload, 130UL, ctx->tls_pub_key, ctx->tls_priv_key, sha512 );
      73           0 :   fd_sha512_leave( sha512 );
      74           0 : }
      75             : 
      76             : /* quic_now is used by quic to get current time */
      77             : static ulong
      78           0 : quic_now( void * _ctx ) {
      79           0 :   fd_send_tile_ctx_t * ctx = (fd_send_tile_ctx_t *)_ctx;
      80           0 :   return (ulong)ctx->now;
      81           0 : }
      82             : 
      83             : /* quic_hs_complete is called when the QUIC handshake is complete
      84             :    It is currently used only for debug logging */
      85             : static void
      86             : quic_hs_complete( fd_quic_conn_t * conn,
      87           0 :                   void *           quic_ctx FD_PARAM_UNUSED ) {
      88           0 :   fd_send_conn_entry_t * entry = fd_type_pun( fd_quic_conn_get_context( conn ) );
      89           0 :   if( FD_UNLIKELY( !entry ) ) return;
      90           0 :   FD_LOG_DEBUG(("send_tile: QUIC handshake complete for leader %s", FD_BASE58_ENC_32_ALLOCA( entry->pubkey.key )));
      91           0 : }
      92             : 
      93             : /* quic_conn_final is called when the QUIC connection dies.
      94             :    Reconnects if contact info is recent enough. */
      95             : static void
      96             : quic_conn_final( fd_quic_conn_t * conn,
      97           0 :                  void *           quic_ctx ) {
      98           0 :   fd_send_tile_ctx_t * ctx = quic_ctx;
      99             : 
     100           0 :   fd_send_conn_entry_t * entry = fd_type_pun( fd_quic_conn_get_context( conn ) );
     101           0 :   if( FD_UNLIKELY( !entry ) ) {
     102           0 :     FD_LOG_ERR(( "send_tile: Conn map entry not found in conn_final" ));
     103           0 :   }
     104             : 
     105           0 :   if( ctx->now - entry->last_ci_ticks > CONTACT_INFO_STALE_TICKS ) {
     106             :     /* stale contact info, don't reconnect */
     107           0 :     entry->conn = NULL;
     108           0 :     ctx->metrics.contact_stale++;
     109           0 :     return;
     110           0 :   }
     111             : 
     112           0 :   uint ip4_addr = entry->ip4_addr;
     113           0 :   FD_LOG_DEBUG(("send_tile: Quic conn final: %p to peer %u.%u.%u.%u:%u", (void*)conn, ip4_addr&0xFF, (ip4_addr>>8)&0xFF, (ip4_addr>>16)&0xFF, (ip4_addr>>24)&0xFF, entry->udp_port));
     114           0 :   entry->conn = NULL;
     115           0 :   quic_connect( ctx, entry );
     116           0 : }
     117             : 
     118             : static int
     119             : quic_tx_aio_send( void *                    _ctx,
     120             :                   fd_aio_pkt_info_t const * batch,
     121             :                   ulong                     batch_cnt,
     122             :                   ulong *                   opt_batch_idx,
     123           0 :                   int                       flush ) {
     124           0 :   (void)flush;
     125             : 
     126           0 :   fd_send_tile_ctx_t * ctx = _ctx;
     127             : 
     128           0 :   for( ulong i=0; i<batch_cnt; i++ ) {
     129           0 :     if( FD_UNLIKELY( batch[ i ].buf_sz<FD_NETMUX_SIG_MIN_HDR_SZ ) ) continue;
     130             : 
     131           0 :     uint const ip_dst = FD_LOAD( uint, batch[ i ].buf+offsetof( fd_ip4_hdr_t, daddr_c ) );
     132             : 
     133           0 :     fd_send_link_out_t * net_out_link = ctx->net_out;
     134           0 :     uchar * packet_l2 = fd_chunk_to_laddr( net_out_link->mem, net_out_link->chunk );
     135           0 :     uchar * packet_l3 = packet_l2 + sizeof(fd_eth_hdr_t);
     136           0 :     memset( packet_l2, 0, 12 );
     137           0 :     FD_STORE( ushort, packet_l2+offsetof( fd_eth_hdr_t, net_type ), fd_ushort_bswap( FD_ETH_HDR_TYPE_IP ) );
     138           0 :     fd_memcpy( packet_l3, batch[ i ].buf, batch[ i ].buf_sz );
     139           0 :     ulong sz_l2 = sizeof(fd_eth_hdr_t) + batch[ i ].buf_sz;
     140             : 
     141             :     /* send packets are just round-robined by sequence number, so for now
     142             :        just indicate where they came from so they don't bounce back */
     143           0 :     ulong sig = fd_disco_netmux_sig( ip_dst, 0U, ip_dst, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );
     144             : 
     145           0 :     ulong tspub = (ulong)ctx->now;
     146             : 
     147           0 :     fd_stem_publish( ctx->stem, net_out_link->idx, sig, net_out_link->chunk, sz_l2, 0UL, 0, tspub );
     148           0 :     net_out_link->chunk = fd_dcache_compact_next( net_out_link->chunk, sz_l2, net_out_link->chunk0, net_out_link->wmark );
     149           0 :   }
     150             : 
     151           0 :   if( FD_LIKELY( opt_batch_idx ) ) {
     152           0 :     *opt_batch_idx = batch_cnt;
     153           0 :   }
     154             : 
     155           0 :   return FD_AIO_SUCCESS;
     156           0 : }
     157             : 
     158             : /* QUIC conn management and wrappers */
     159             : 
     160             : fd_quic_conn_t *
     161             : quic_connect( fd_send_tile_ctx_t   * ctx,
     162           0 :               fd_send_conn_entry_t * entry ) {
     163           0 :   uint   dst_ip   = entry->ip4_addr;
     164           0 :   ushort dst_port = entry->udp_port;
     165             : 
     166           0 :   FD_TEST( entry->conn == NULL );
     167             : 
     168             :   // if( FD_UNLIKELY( entry->conn ) ) {
     169             :   //   /* close existing conn */
     170             :   //   fd_quic_conn_close( entry->conn, 0 );
     171             :   // }
     172             : 
     173           0 :   fd_quic_conn_t * conn = fd_quic_connect( ctx->quic, dst_ip, dst_port, ctx->src_ip_addr, ctx->src_port );
     174           0 :   if( FD_UNLIKELY( !conn ) ) {
     175           0 :     ctx->metrics.quic_conn_create_failed++;
     176           0 :     FD_LOG_WARNING(( "send_tile: Failed to create QUIC connection to %u.%u.%u.%u:%u", dst_ip&0xFF, (dst_ip>>8)&0xFF, (dst_ip>>16)&0xFF, (dst_ip>>24)&0xFF, dst_port ));
     177           0 :     return NULL;
     178           0 :   }
     179             : 
     180           0 :   FD_LOG_DEBUG(("send_tile: Quic conn created: %p to peer %u.%u.%u.%u:%u", (void*)conn, dst_ip&0xFF, (dst_ip>>8)&0xFF, (dst_ip>>16)&0xFF, (dst_ip>>24)&0xFF, dst_port));
     181             : 
     182           0 :   entry->conn = conn;
     183           0 :   fd_quic_conn_set_context( conn, entry );
     184             : 
     185           0 :   return conn;
     186           0 : }
     187             : 
     188             : /* get_quic_conn looks up a QUIC connection for a given pubkey. */
     189             : static fd_quic_conn_t *
     190             : get_quic_conn( fd_send_tile_ctx_t * ctx,
     191           0 :                fd_pubkey_t const *  pubkey ) {
     192           0 :   fd_send_conn_entry_t * entry = fd_send_conn_map_query( ctx->conn_map, *pubkey, NULL );
     193           0 :   if( FD_LIKELY( entry ) ) {
     194           0 :     return entry->conn;
     195           0 :   }
     196           0 :   return NULL;
     197           0 : }
     198             : 
     199             : void
     200             : quic_send( fd_send_tile_ctx_t  *  ctx,
     201             :            fd_pubkey_t const   *  pubkey,
     202             :            uchar const         *  payload,
     203           0 :            ulong                  payload_sz ) {
     204             : 
     205           0 :   fd_quic_conn_t * conn = get_quic_conn( ctx, pubkey );
     206           0 :   if( FD_UNLIKELY( !conn ) ) {
     207           0 :     ctx->metrics.quic_send_result_cnt[FD_METRICS_ENUM_TXN_QUIC_SEND_RESULT_V_NO_CONN_IDX]++;
     208           0 :     FD_LOG_DEBUG(("send_tile: Quic conn not found for leader %s", FD_BASE58_ENC_32_ALLOCA( pubkey->key )));
     209           0 :     return;
     210           0 :   }
     211             : 
     212           0 :   fd_quic_stream_t * stream = fd_quic_conn_new_stream( conn );
     213           0 :   if( FD_UNLIKELY( !stream ) ) {
     214           0 :     ctx->metrics.quic_send_result_cnt[FD_METRICS_ENUM_TXN_QUIC_SEND_RESULT_V_NO_STREAM_IDX]++;
     215           0 :     FD_LOG_DEBUG(("send_tile: Quic stream unavailable for leader %s", FD_BASE58_ENC_32_ALLOCA( pubkey->key )));
     216           0 :     return;
     217           0 :   }
     218             : 
     219           0 :   FD_LOG_DEBUG(("send_tile: Sending txn to leader %s", FD_BASE58_ENC_32_ALLOCA( pubkey->key )));
     220           0 :   ctx->metrics.quic_send_result_cnt[FD_METRICS_ENUM_TXN_QUIC_SEND_RESULT_V_SUCCESS_IDX]++;
     221             : 
     222           0 :   fd_quic_stream_send( stream, payload, payload_sz, 1 );
     223           0 : }
     224             : 
     225             : 
     226             : /* handle_new_contact_info handles a new contact. Validates contact info
     227             :    and starts/restarts a connection if necessary. */
     228             : static inline void
     229             : handle_new_contact_info( fd_send_tile_ctx_t   * ctx,
     230           0 :                          fd_shred_dest_wire_t * contact ) {
     231           0 :   uint    new_ip   = contact->ip4_addr;
     232           0 :   ushort  new_port = contact->udp_port;
     233           0 :   if( FD_UNLIKELY( new_ip==0 || new_port==0 ) ) {
     234           0 :     ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_UNROUTABLE_IDX]++;
     235           0 :     return;
     236           0 :   }
     237             : 
     238           0 :   fd_send_conn_entry_t * entry  = fd_send_conn_map_query( ctx->conn_map, *contact->pubkey, NULL );
     239           0 :   if( FD_UNLIKELY( !entry ) ) {
     240             :     /* Skip if UNSTAKED */
     241           0 :     FD_LOG_DEBUG(("send_tile: Skipping unstaked pubkey %s at %u.%u.%u.%u:%u", FD_BASE58_ENC_32_ALLOCA( contact->pubkey->key ), new_ip&0xFF, (new_ip>>8)&0xFF, (new_ip>>16)&0xFF, (new_ip>>24)&0xFF, new_port));
     242           0 :     ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_UNSTAKED_IDX]++;
     243           0 :     return;
     244           0 :   }
     245             : 
     246           0 :   int info_changed = (entry->ip4_addr != new_ip) | (entry->udp_port != new_port);
     247           0 :   entry->ip4_addr  = new_ip;
     248           0 :   entry->udp_port  = new_port;
     249             : 
     250           0 :   if( entry->conn && !info_changed ) {
     251           0 :     ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_NO_CHANGE_IDX]++;
     252           0 :     return;
     253           0 :   }
     254             : 
     255           0 :   if( FD_UNLIKELY( entry->conn && info_changed ) ) {
     256             :     /* close conn, will restart with new contact info */
     257           0 :     fd_quic_conn_close( entry->conn, 0 );
     258           0 :     entry->conn = NULL;
     259           0 :     ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_CHANGED_IDX]++;
     260           0 :   }
     261             : 
     262           0 :   entry->last_ci_ticks = ctx->now;
     263           0 :   quic_connect( ctx, entry );
     264           0 :   ctx->metrics.new_contact_info[FD_METRICS_ENUM_NEW_CONTACT_OUTCOME_V_CONNECT_IDX]++;
     265           0 :   return;
     266           0 : }
     267             : 
     268             : static inline void
     269           0 : finalize_new_cluster_contact_info( fd_send_tile_ctx_t * ctx ) {
     270           0 :   for( ulong i=0UL; i<ctx->contact_cnt; i++ ) {
     271           0 :     handle_new_contact_info( ctx, &ctx->contact_buf[i] );
     272           0 :   }
     273           0 : }
     274             : 
     275             : /* Called during after_frag for stake messages. */
     276             : static void
     277           0 : finalize_stake_msg( fd_send_tile_ctx_t * ctx ) {
     278             : 
     279           0 :   fd_multi_epoch_leaders_stake_msg_fini( ctx->mleaders );
     280             : 
     281             :   /* Get the current stake destinations */
     282           0 :   fd_stake_weight_t const * stakes    = fd_multi_epoch_leaders_get_stake_weights( ctx->mleaders );
     283           0 :   ulong                     stake_cnt = fd_multi_epoch_leaders_get_stake_cnt( ctx->mleaders );
     284           0 :   if( FD_UNLIKELY( !stakes ) ) {
     285           0 :     FD_LOG_WARNING(( "No stake destinations available for current slot" ));
     286           0 :     return;
     287           0 :   }
     288             : 
     289             :   /* populate staked validators in connection map */
     290           0 :   for( ulong i=0UL; i<stake_cnt; i++ ) {
     291           0 :     fd_stake_weight_t const * stake_info = &stakes[i];
     292           0 :     fd_pubkey_t       const   pubkey     = stake_info->key;
     293             : 
     294           0 :     fd_send_conn_entry_t * entry = fd_send_conn_map_query( ctx->conn_map, pubkey, NULL );
     295             :     /* UNSTAKED -> NO_CONN: create new entry in NO_CONN state */
     296           0 :     if( FD_UNLIKELY( !entry ) ) {
     297           0 :       FD_LOG_DEBUG(("send_tile: creating new entry for pubkey %s", FD_BASE58_ENC_32_ALLOCA( pubkey.key )));
     298           0 :       entry = fd_send_conn_map_insert( ctx->conn_map, pubkey );
     299           0 :       entry->conn = NULL;
     300           0 :     }
     301           0 :   }
     302           0 : }
     303             : 
     304             : /* Stem callbacks */
     305             : 
     306             : static inline void
     307             : before_credit( fd_send_tile_ctx_t * ctx,
     308             :                fd_stem_context_t  * stem,
     309           0 :                int *                charge_busy ) {
     310           0 :   ctx->stem = stem;
     311             : 
     312           0 :   ctx->now = fd_tickcount();
     313             : 
     314             :   /* Publishes to mcache via callbacks */
     315           0 :   *charge_busy = fd_quic_service( ctx->quic );
     316           0 : }
     317             : 
     318             : static void
     319             : during_frag( fd_send_tile_ctx_t * ctx,
     320             :              ulong                  in_idx,
     321             :              ulong                  seq FD_PARAM_UNUSED,
     322             :              ulong                  sig FD_PARAM_UNUSED,
     323             :              ulong                  chunk,
     324             :              ulong                  sz,
     325           0 :              ulong                  ctl ) {
     326             : 
     327           0 :   fd_send_link_in_t * in_link = &ctx->in_links[ in_idx ];
     328           0 :   if( FD_UNLIKELY( chunk<in_link->chunk0 || chunk>in_link->wmark ) ) {
     329           0 :     FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu] on link %lu", chunk, sz, in_link->chunk0, in_link->wmark, in_idx ));
     330           0 :   }
     331             : 
     332           0 :   uchar const * dcache_entry = fd_chunk_to_laddr_const( in_link->mem, chunk );
     333           0 :   ulong         kind         = in_link->kind;
     334             : 
     335           0 :   if( FD_UNLIKELY( kind==IN_KIND_NET ) ) {
     336           0 :     void const * src = fd_net_rx_translate_frag( &ctx->net_in_bounds, chunk, ctl, sz );
     337           0 :     fd_memcpy( ctx->quic_buf, src, sz );
     338           0 :   }
     339             : 
     340           0 :   if( FD_UNLIKELY( kind==IN_KIND_STAKE ) ) {
     341           0 :     if( sz>sizeof(fd_stake_weight_t)*(MAX_STAKED_LEADERS+1UL) ) {
     342           0 :       FD_LOG_ERR(( "sz %lu >= max expected stake update size %lu", sz, sizeof(fd_stake_weight_t) * (MAX_STAKED_LEADERS+1UL) ));
     343           0 :     }
     344           0 :     fd_multi_epoch_leaders_stake_msg_init( ctx->mleaders, fd_type_pun_const( dcache_entry ) );
     345           0 :   }
     346             : 
     347           0 :   if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) {
     348           0 :     if( sz>sizeof(fd_shred_dest_wire_t)*MAX_STAKED_LEADERS ) {
     349           0 :       FD_LOG_ERR(( "sz %lu >= max expected gossip update size %lu", sz, sizeof(fd_shred_dest_wire_t) * MAX_STAKED_LEADERS ));
     350           0 :     }
     351           0 :     ctx->contact_cnt = sz / sizeof(fd_shred_dest_wire_t);
     352           0 :     fd_memcpy( ctx->contact_buf, dcache_entry, sz );
     353           0 :   }
     354             : 
     355           0 :   if( FD_UNLIKELY( kind==IN_KIND_TOWER ) ) {
     356           0 :     if( sz!=sizeof(fd_txn_p_t) ) {
     357           0 :       FD_LOG_ERR(( "sz %lu != expected txn size %lu", sz, sizeof(fd_txn_p_t) ));
     358           0 :     }
     359           0 :     fd_memcpy( ctx->txn_buf, dcache_entry, sz );
     360           0 :   }
     361           0 : }
     362             : 
     363             : static void
     364             : after_frag( fd_send_tile_ctx_t * ctx,
     365             :             ulong                in_idx,
     366             :             ulong                seq FD_PARAM_UNUSED,
     367             :             ulong                sig,
     368             :             ulong                sz,
     369             :             ulong                tsorig FD_PARAM_UNUSED,
     370             :             ulong                tspub FD_PARAM_UNUSED,
     371           0 :             fd_stem_context_t *  stem ) {
     372             : 
     373           0 :   ctx->stem = stem;
     374             : 
     375           0 :   fd_send_link_in_t * in_link = &ctx->in_links[ in_idx ];
     376           0 :   ulong                 kind  = in_link->kind;
     377             : 
     378           0 :   if( FD_UNLIKELY( kind==IN_KIND_NET ) ) {
     379           0 :     uchar     * ip_pkt = ctx->quic_buf + sizeof(fd_eth_hdr_t);
     380           0 :     ulong       ip_sz  = sz - sizeof(fd_eth_hdr_t);
     381           0 :     fd_quic_t * quic   = ctx->quic;
     382             : 
     383           0 :     long dt = -fd_tickcount();
     384           0 :     fd_quic_process_packet( quic, ip_pkt, ip_sz );
     385           0 :     dt += fd_tickcount();
     386           0 :     fd_histf_sample( quic->metrics.receive_duration, (ulong)dt );
     387           0 :   }
     388             : 
     389           0 :   if( FD_UNLIKELY( kind==IN_KIND_TOWER ) ) {
     390             : 
     391           0 :     fd_txn_p_t * txn = (fd_txn_p_t *)fd_type_pun(ctx->txn_buf);
     392             : 
     393             :     /* sign the txn */
     394           0 :     uchar * signature = txn->payload + TXN(txn)->signature_off;
     395           0 :     uchar * message   = txn->payload + TXN(txn)->message_off;
     396           0 :     ulong message_sz  = txn->payload_sz - TXN(txn)->message_off;
     397           0 :     fd_keyguard_client_sign( ctx->keyguard_client, signature, message, message_sz, FD_KEYGUARD_SIGN_TYPE_ED25519 );
     398             : 
     399           0 :     ulong poh_slot = sig;
     400             : 
     401             :     /* send to leader for next few slots */
     402           0 :     for( ulong i=0UL; i<SEND_TO_LEADER_CNT; i++ ) {
     403           0 :       fd_pubkey_t const * leader = fd_multi_epoch_leaders_get_leader_for_slot( ctx->mleaders, poh_slot );
     404           0 :       if( FD_LIKELY( leader ) ) {
     405           0 :         quic_send( ctx, leader, txn->payload, txn->payload_sz );
     406           0 :       } else {
     407           0 :         ctx->metrics.leader_not_found++;
     408           0 :         FD_LOG_DEBUG(("send_tile: Failed to get leader contact"));
     409           0 :       }
     410           0 :     }
     411             : 
     412             :     /* send to gossip and dedup */
     413           0 :     fd_send_link_out_t * gossip_verify_out = ctx->gossip_verify_out;
     414           0 :     uchar * msg_to_gossip = fd_chunk_to_laddr( gossip_verify_out->mem, gossip_verify_out->chunk );
     415           0 :     fd_memcpy( msg_to_gossip, txn->payload, txn->payload_sz );
     416           0 :     fd_stem_publish( stem, gossip_verify_out->idx, 1UL, gossip_verify_out->chunk, txn->payload_sz, 0UL, 0, 0 );
     417           0 :     gossip_verify_out->chunk = fd_dcache_compact_next( gossip_verify_out->chunk, txn->payload_sz, gossip_verify_out->chunk0,
     418           0 :         gossip_verify_out->wmark );
     419           0 :   }
     420             : 
     421           0 :   if( FD_UNLIKELY( kind==IN_KIND_GOSSIP ) ) {
     422           0 :     finalize_new_cluster_contact_info( ctx );
     423           0 :     return;
     424           0 :   }
     425             : 
     426           0 :   if( FD_UNLIKELY( kind==IN_KIND_STAKE ) ) {
     427           0 :     finalize_stake_msg( ctx );
     428           0 :     return;
     429           0 :   }
     430           0 : }
     431             : 
     432             : static void
     433             : privileged_init( fd_topo_t *      topo,
     434           0 :                  fd_topo_tile_t * tile ) {
     435           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     436             : 
     437           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     438           0 :   fd_send_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_send_tile_ctx_t), sizeof(fd_send_tile_ctx_t) );
     439             : 
     440             :   /* identity not used yet, but will be soon to sign quic txns */
     441           0 :   if( FD_UNLIKELY( !strcmp( tile->send.identity_key_path, "" ) ) )
     442           0 :     FD_LOG_ERR(( "identity_key_path not set" ));
     443             : 
     444           0 :   ctx->identity_key[ 0 ] = *(fd_pubkey_t const *)fd_type_pun_const( fd_keyload_load( tile->send.identity_key_path, /* pubkey only: */ 1 ) );
     445           0 : }
     446             : 
     447             : static fd_send_link_in_t *
     448             : setup_input_link( fd_send_tile_ctx_t * ctx,
     449             :                   fd_topo_t          * topo,
     450             :                   fd_topo_tile_t     * tile,
     451             :                   ulong                kind,
     452           0 :                   const char         * name ) {
     453           0 :   ulong in_idx = fd_topo_find_tile_in_link( topo, tile, name, 0 );
     454           0 :   FD_TEST( in_idx!=ULONG_MAX );
     455           0 :   fd_topo_link_t * in_link = &topo->links[ tile->in_link_id[ in_idx ] ];
     456           0 :   fd_send_link_in_t * in_link_desc = &ctx->in_links[ in_idx ];
     457           0 :   in_link_desc->mem    = topo->workspaces[ topo->objs[ in_link->dcache_obj_id ].wksp_id ].wksp;
     458           0 :   in_link_desc->chunk0 = fd_dcache_compact_chunk0( in_link_desc->mem, in_link->dcache );
     459           0 :   in_link_desc->wmark  = fd_dcache_compact_wmark( in_link_desc->mem, in_link->dcache, in_link->mtu );
     460           0 :   in_link_desc->dcache = in_link->dcache;
     461           0 :   in_link_desc->kind   = kind;
     462           0 :   return in_link_desc;
     463           0 : }
     464             : 
     465             : static void
     466             : setup_output_link( fd_send_link_out_t * desc,
     467             :                    fd_topo_t           * topo,
     468             :                    fd_topo_tile_t      * tile,
     469           0 :                    const char          * name ) {
     470           0 :   ulong out_idx = fd_topo_find_tile_out_link( topo, tile, name, 0 );
     471           0 :   FD_TEST( out_idx!=ULONG_MAX );
     472           0 :   fd_topo_link_t * out_link = &topo->links[ tile->out_link_id[ out_idx ] ];
     473           0 :   desc->idx    = out_idx;
     474           0 :   desc->mcache = out_link->mcache;
     475           0 :   desc->sync   = fd_mcache_seq_laddr( desc->mcache );
     476           0 :   desc->depth  = fd_mcache_depth( desc->mcache );
     477           0 :   desc->mem    = topo->workspaces[ topo->objs[ out_link->dcache_obj_id ].wksp_id ].wksp;
     478           0 :   desc->chunk0 = fd_dcache_compact_chunk0( desc->mem, out_link->dcache );
     479           0 :   desc->wmark  = fd_dcache_compact_wmark( desc->mem, out_link->dcache, out_link->mtu );
     480           0 :   desc->chunk  = desc->chunk0;
     481           0 : }
     482             : 
     483             : static void
     484             : unprivileged_init( fd_topo_t *      topo,
     485           0 :                    fd_topo_tile_t * tile ) {
     486           0 :   void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
     487             : 
     488           0 :   if( FD_UNLIKELY( !tile->out_cnt ) ) FD_LOG_ERR(( "send has no primary output link" ));
     489             : 
     490             :   /* Scratch mem setup */
     491             : 
     492           0 :   FD_SCRATCH_ALLOC_INIT( l, scratch );
     493           0 :   fd_send_tile_ctx_t * ctx = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_send_tile_ctx_t), sizeof(fd_send_tile_ctx_t) );
     494           0 :   fd_memset( ctx, 0, sizeof(fd_send_tile_ctx_t) );
     495             : 
     496           0 :   ctx->mleaders = fd_multi_epoch_leaders_join( fd_multi_epoch_leaders_new( ctx->mleaders_mem ) );
     497           0 :   FD_TEST( ctx->mleaders );
     498             : 
     499           0 :   if( FD_UNLIKELY( getrandom( ctx->tls_priv_key, ED25519_PRIV_KEY_SZ, 0 )!=ED25519_PRIV_KEY_SZ ) ) {
     500           0 :     FD_LOG_ERR(( "getrandom failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     501           0 :   }
     502           0 :   fd_sha512_t * sha512 = fd_sha512_join( fd_sha512_new( ctx->sha512 ) );
     503           0 :   fd_ed25519_public_from_private( ctx->tls_pub_key, ctx->tls_priv_key, sha512 );
     504           0 :   fd_sha512_leave( sha512 );
     505             : 
     506           0 :   fd_aio_t * quic_tx_aio = fd_aio_join( fd_aio_new( ctx->quic_tx_aio, ctx, quic_tx_aio_send ) );
     507           0 :   if( FD_UNLIKELY( !quic_tx_aio ) ) FD_LOG_ERR(( "fd_aio_join failed" ));
     508             : 
     509           0 :   fd_quic_t * quic = fd_quic_join( fd_quic_new( FD_SCRATCH_ALLOC_APPEND( l, fd_quic_align(), fd_quic_footprint( &quic_limits ) ), &quic_limits ) );
     510           0 :   if( FD_UNLIKELY( !quic ) ) FD_LOG_ERR(( "fd_quic_new failed" ));
     511             : 
     512           0 :   quic->config.role          = FD_QUIC_ROLE_CLIENT;
     513           0 :   quic->config.idle_timeout  = QUIC_IDLE_TIMEOUT_NS;
     514           0 :   quic->config.ack_delay     = QUIC_ACK_DELAY_NS;
     515           0 :   quic->config.keep_alive    = 1;
     516           0 :   quic->config.sign          = quic_tls_cv_sign;
     517           0 :   quic->config.sign_ctx      = ctx;
     518           0 :   fd_memcpy( quic->config.identity_public_key, ctx->tls_pub_key, ED25519_PUB_KEY_SZ );
     519             : 
     520           0 :   quic->cb.conn_hs_complete  = quic_hs_complete;
     521           0 :   quic->cb.conn_final        = quic_conn_final;
     522           0 :   quic->cb.now               = quic_now;
     523           0 :   quic->cb.now_ctx           = ctx;
     524           0 :   quic->cb.quic_ctx          = ctx;
     525             : 
     526           0 :   fd_quic_set_aio_net_tx( quic, quic_tx_aio );
     527           0 :   fd_quic_set_clock_tickcount( quic );
     528           0 :   FD_TEST_CUSTOM( fd_quic_init( quic ), "fd_quic_init failed" );
     529             : 
     530           0 :   fd_histf_join( fd_histf_new( quic->metrics.service_duration, FD_MHIST_SECONDS_MIN( SEND, SERVICE_DURATION_SECONDS ),
     531           0 :                                                                     FD_MHIST_SECONDS_MAX( SEND, SERVICE_DURATION_SECONDS ) ) );
     532           0 :   fd_histf_join( fd_histf_new( quic->metrics.receive_duration, FD_MHIST_SECONDS_MIN( SEND, RECEIVE_DURATION_SECONDS ),
     533           0 :                                                                     FD_MHIST_SECONDS_MAX( SEND, RECEIVE_DURATION_SECONDS ) ) );
     534             : 
     535           0 :   ctx->quic = quic;
     536             : 
     537             :   /* Initialize connection map */
     538           0 :   void * conn_map_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_send_conn_map_align(), fd_send_conn_map_footprint() );
     539           0 :   ctx->conn_map = fd_send_conn_map_join( fd_send_conn_map_new( conn_map_mem ) );
     540           0 :   if( FD_UNLIKELY( !ctx->conn_map ) ) FD_LOG_ERR(( "fd_send_conn_map_join failed" ));
     541             : 
     542           0 :   ctx->src_ip_addr = tile->send.ip_addr;
     543           0 :   ctx->src_port    = tile->send.send_src_port;
     544           0 :   fd_ip4_udp_hdr_init( ctx->packet_hdr, FD_TXN_MTU, ctx->src_ip_addr, ctx->src_port );
     545             : 
     546           0 :   setup_input_link( ctx, topo, tile, IN_KIND_GOSSIP, "gossip_send" );
     547           0 :   setup_input_link( ctx, topo, tile, IN_KIND_STAKE,  "stake_out"   );
     548           0 :   setup_input_link( ctx, topo, tile, IN_KIND_TOWER,  "tower_send"  );
     549             : 
     550           0 :   fd_send_link_in_t * net_in = setup_input_link( ctx, topo, tile, IN_KIND_NET, "net_send" );
     551           0 :   fd_net_rx_bounds_init( &ctx->net_in_bounds, net_in->dcache );
     552             : 
     553           0 :   setup_output_link( ctx->gossip_verify_out, topo, tile, "send_txns" );
     554           0 :   setup_output_link( ctx->net_out,           topo, tile, "send_net"  );
     555             : 
     556             :   /* Set up keyguard(s) */
     557           0 :   ulong             sign_in_idx  =  fd_topo_find_tile_in_link(  topo, tile, "sign_send", 0 );
     558           0 :   ulong             sign_out_idx =  fd_topo_find_tile_out_link( topo, tile, "send_sign", 0 );
     559           0 :   fd_topo_link_t  * sign_in      =  &topo->links[ tile->in_link_id[  sign_in_idx  ] ];
     560           0 :   fd_topo_link_t  * sign_out     =  &topo->links[ tile->out_link_id[ sign_out_idx ] ];
     561             : 
     562           0 :   if ( fd_keyguard_client_join( fd_keyguard_client_new( ctx->keyguard_client,
     563           0 :                                                             sign_out->mcache,
     564           0 :                                                             sign_out->dcache,
     565           0 :                                                             sign_in->mcache,
     566           0 :                                                             sign_in->dcache ) )==NULL ) {
     567           0 :     FD_LOG_ERR(( "Keyguard join failed" ));
     568           0 :   }
     569             : 
     570             :   /* init metrics */
     571           0 :   fd_memset( &ctx->metrics, 0, sizeof(ctx->metrics) );
     572             : 
     573           0 :   ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, scratch_align() );
     574           0 :   if( FD_UNLIKELY( scratch_top != (ulong)scratch + scratch_footprint( tile ) ) ) {
     575           0 :     FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
     576           0 :   }
     577           0 : }
     578             : 
     579             : static ulong
     580             : populate_allowed_seccomp( fd_topo_t      const * topo FD_PARAM_UNUSED,
     581             :                           fd_topo_tile_t const * tile FD_PARAM_UNUSED,
     582             :                           ulong                  out_cnt,
     583           0 :                           struct sock_filter   * out ) {
     584             :   /* Seccomp hook for the send tile.  topo and tile are marked unused.  Hands out_cnt, out and the log file descriptor (cast to uint) to the send tile policy populator. */
     585           0 :   populate_sock_filter_policy_fd_send_tile( out_cnt, out, (uint)fd_log_private_logfile_fd() );
     586           0 :   return sock_filter_policy_fd_send_tile_instr_cnt; /* presumably the instruction count of that policy -- confirm in the policy header */
     587           0 : }
     588             : 
     589             : static ulong
     590             : populate_allowed_fds( fd_topo_t const *      topo,
     591             :                       fd_topo_tile_t const * tile,
     592             :                       ulong                  out_fds_cnt,
     593           0 :                       int *                  out_fds ) {
     594           0 :   (void)topo;
     595           0 :   (void)tile;
     596             :   /* Stores fd 2 (stderr) and, when fd_log_private_logfile_fd() is not -1, the log file fd into out_fds.  Returns the number of entries stored.  topo and tile are unused. */
     597           0 :   if( FD_UNLIKELY( out_fds_cnt<2UL ) ) FD_LOG_ERR(( "out_fds_cnt %lu", out_fds_cnt ));
     598             :   /* Up to two entries are written below, hence the minimum of two checked above.  NOTE(review): relies on FD_LOG_ERR not returning -- confirm. */
     599           0 :   ulong out_cnt = 0;
     600           0 :   out_fds[ out_cnt++ ] = 2UL; /* stderr */
     601           0 :   if( FD_LIKELY( -1!=fd_log_private_logfile_fd() ) )
     602           0 :     out_fds[ out_cnt++ ] = fd_log_private_logfile_fd(); /* logfile */
     603           0 :   return out_cnt;
     604           0 : }
     605             : 
     606             : static void
     607           0 : metrics_write( fd_send_tile_ctx_t * ctx ) {
     608             :   /* Publishes the tile's own counters (ctx->metrics) and the QUIC instance's counters (ctx->quic->metrics) under the SEND metric group.  Reads ctx only; returns nothing. */
     609           0 :   FD_MCNT_SET( SEND, LEADER_NOT_FOUND,        ctx->metrics.leader_not_found        );
     610           0 :   FD_MCNT_SET( SEND, CONTACT_STALE,           ctx->metrics.contact_stale           );
     611           0 :   FD_MCNT_SET( SEND, QUIC_CONN_CREATE_FAILED, ctx->metrics.quic_conn_create_failed );
     612             :   /* Tile counters published with the ENUM_COPY variant (presumably one counter per enum value -- confirm against the metrics macros) */
     613           0 :   FD_MCNT_ENUM_COPY( SEND, NEW_CONTACT_INFO, ctx->metrics.new_contact_info );
     614           0 :   FD_MCNT_ENUM_COPY( SEND, QUIC_SEND_RESULT, ctx->metrics.quic_send_result_cnt );
     615             : 
     616             :   /* QUIC metrics */
     617           0 :   FD_MCNT_SET(       SEND, RECEIVED_BYTES,         ctx->quic->metrics.net_rx_byte_cnt );
     618           0 :   FD_MCNT_ENUM_COPY( SEND, RECEIVED_FRAMES,        ctx->quic->metrics.frame_rx_cnt );
     619           0 :   FD_MCNT_SET(       SEND, RECEIVED_PACKETS,       ctx->quic->metrics.net_rx_pkt_cnt );
     620           0 :   FD_MCNT_SET(       SEND, STREAM_RECEIVED_BYTES,  ctx->quic->metrics.stream_rx_byte_cnt );
     621           0 :   FD_MCNT_SET(       SEND, STREAM_RECEIVED_EVENTS, ctx->quic->metrics.stream_rx_event_cnt );
     622             :   /* QUIC transmit-side counters */
     623           0 :   FD_MCNT_SET(       SEND, SENT_PACKETS,     ctx->quic->metrics.net_tx_pkt_cnt );
     624           0 :   FD_MCNT_SET(       SEND, SENT_BYTES,       ctx->quic->metrics.net_tx_byte_cnt );
     625           0 :   FD_MCNT_SET(       SEND, RETRY_SENT,       ctx->quic->metrics.retry_tx_cnt );
     626           0 :   FD_MCNT_ENUM_COPY( SEND, ACK_TX,           ctx->quic->metrics.ack_tx );
     627             :   /* QUIC connection counters; CONNECTIONS_ACTIVE is the only value in this function published as a gauge */
     628           0 :   FD_MGAUGE_SET( SEND, CONNECTIONS_ACTIVE,          ctx->quic->metrics.conn_active_cnt );
     629           0 :   FD_MCNT_SET(   SEND, CONNECTIONS_CREATED,         ctx->quic->metrics.conn_created_cnt );
     630           0 :   FD_MCNT_SET(   SEND, CONNECTIONS_CLOSED,          ctx->quic->metrics.conn_closed_cnt );
     631           0 :   FD_MCNT_SET(   SEND, CONNECTIONS_ABORTED,         ctx->quic->metrics.conn_aborted_cnt );
     632           0 :   FD_MCNT_SET(   SEND, CONNECTIONS_TIMED_OUT,       ctx->quic->metrics.conn_timeout_cnt );
     633           0 :   FD_MCNT_SET(   SEND, CONNECTIONS_RETRIED,         ctx->quic->metrics.conn_retry_cnt );
     634           0 :   FD_MCNT_SET(   SEND, CONNECTION_ERROR_NO_SLOTS,   ctx->quic->metrics.conn_err_no_slots_cnt );
     635           0 :   FD_MCNT_SET(   SEND, CONNECTION_ERROR_RETRY_FAIL, ctx->quic->metrics.conn_err_retry_fail_cnt );
     636             :   /* QUIC packet counters, plus the frame tx allocation counter */
     637           0 :   FD_MCNT_ENUM_COPY( SEND, PKT_CRYPTO_FAILED,       ctx->quic->metrics.pkt_decrypt_fail_cnt );
     638           0 :   FD_MCNT_ENUM_COPY( SEND, PKT_NO_KEY,              ctx->quic->metrics.pkt_no_key_cnt );
     639           0 :   FD_MCNT_SET(       SEND, PKT_NO_CONN,             ctx->quic->metrics.pkt_no_conn_cnt );
     640           0 :   FD_MCNT_ENUM_COPY( SEND, FRAME_TX_ALLOC,          ctx->quic->metrics.frame_tx_alloc_cnt );
     641           0 :   FD_MCNT_SET(       SEND, PKT_NET_HEADER_INVALID,  ctx->quic->metrics.pkt_net_hdr_err_cnt );
     642           0 :   FD_MCNT_SET(       SEND, PKT_QUIC_HEADER_INVALID, ctx->quic->metrics.pkt_quic_hdr_err_cnt );
     643           0 :   FD_MCNT_SET(       SEND, PKT_UNDERSZ,             ctx->quic->metrics.pkt_undersz_cnt );
     644           0 :   FD_MCNT_SET(       SEND, PKT_OVERSZ,              ctx->quic->metrics.pkt_oversz_cnt );
     645           0 :   FD_MCNT_SET(       SEND, PKT_VERNEG,              ctx->quic->metrics.pkt_verneg_cnt );
     646           0 :   FD_MCNT_SET(       SEND, PKT_RETRANSMISSIONS,     ctx->quic->metrics.pkt_retransmissions_cnt );
     647             :   /* QUIC handshake counters */
     648           0 :   FD_MCNT_SET(   SEND, HANDSHAKES_CREATED,          ctx->quic->metrics.hs_created_cnt );
     649           0 :   FD_MCNT_SET(   SEND, HANDSHAKE_ERROR_ALLOC_FAIL,  ctx->quic->metrics.hs_err_alloc_fail_cnt );
     650           0 :   FD_MCNT_SET(   SEND, HANDSHAKE_EVICTED,           ctx->quic->metrics.hs_evicted_cnt );
     651             :   /* FRAME_FAIL_PARSE is sourced from the QUIC frame_rx_err_cnt counter */
     652           0 :   FD_MCNT_SET(   SEND, FRAME_FAIL_PARSE,            ctx->quic->metrics.frame_rx_err_cnt );
     653             :   /* Duration histograms kept in the QUIC metrics */
     654           0 :   FD_MHIST_COPY( SEND, SERVICE_DURATION_SECONDS,    ctx->quic->metrics.service_duration );
     655           0 :   FD_MHIST_COPY( SEND, RECEIVE_DURATION_SECONDS,    ctx->quic->metrics.receive_duration );
     656           0 : }
     657             : 
     658             : 
     659           0 : #define STEM_BURST                  1UL /* send_txns */
     660             : /* Context type and its alignment, defined for the stem include below */
     661           0 : #define STEM_CALLBACK_CONTEXT_TYPE  fd_send_tile_ctx_t
     662           0 : #define STEM_CALLBACK_CONTEXT_ALIGN alignof(fd_send_tile_ctx_t)
     663             : /* Callback names handed to the stem include below.  Presumably fd_stem.c consumes these macros to generate the run loop -- confirm against fd_stem.c. */
     664           0 : #define STEM_CALLBACK_DURING_FRAG   during_frag
     665           0 : #define STEM_CALLBACK_AFTER_FRAG    after_frag
     666           0 : #define STEM_CALLBACK_METRICS_WRITE metrics_write
     667           0 : #define STEM_CALLBACK_BEFORE_CREDIT before_credit
     668             : #include "../../disco/stem/fd_stem.c"
     669             : 
     670             : fd_topo_run_tile_t fd_tile_send = { /* Run descriptor for the tile named "send"; each field below points at a hook from this translation unit */
     671             :   .name                     = "send",
     672             :   .populate_allowed_seccomp = populate_allowed_seccomp,
     673             :   .populate_allowed_fds     = populate_allowed_fds,
     674             :   .scratch_align            = scratch_align,
     675             :   .scratch_footprint        = scratch_footprint,
     676             :   .privileged_init          = privileged_init,
     677             :   .unprivileged_init        = unprivileged_init,
     678             :   .run                      = stem_run, /* presumably provided by the fd_stem.c include above -- confirm */
     679             : };

Generated by: LCOV version 1.14