LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 430 685 62.8 %
Date: 2025-09-19 04:41:14 Functions: 30 35 85.7 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #define _GNU_SOURCE /* SOL_TCP */
       4             : #include "fd_bundle_auth.h"
       5             : #include "fd_bundle_tile_private.h"
       6             : #include "proto/block_engine.pb.h"
       7             : #include "proto/bundle.pb.h"
       8             : #include "proto/packet.pb.h"
       9             : #include "../fd_txn_m_t.h"
      10             : #include "../plugin/fd_plugin.h"
      11             : #include "../../waltz/h2/fd_h2_conn.h"
      12             : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
      13             : #include "../../ballet/base58/fd_base58.h"
      14             : #include "../../ballet/nanopb/pb_decode.h"
      15             : #include "../../util/net/fd_ip4.h"
      16             : 
      17             : #include <fcntl.h>
      18             : #include <errno.h>
      19             : #include <unistd.h> /* close */
      20             : #include <poll.h> /* poll */
      21             : #include <sys/socket.h> /* socket */
      22             : #include <netinet/in.h>
      23             : #include <netinet/ip.h>
      24             : #include <netinet/tcp.h>
      25             : 
      26           9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      27             : 
      28             : #define FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE (5UL)
      29             : 
      30             : __attribute__((weak)) long /* Clock source used throughout the bundle client.  Declared weak, so another definition of this symbol can override it at link time. */
      31         684 : fd_bundle_now( void ) {
      32         684 :   return fd_log_wallclock(); /* default implementation: forward to fd_log_wallclock */
      33         684 : }
      34             : 
      35             : void /* Tears down the current connection state held in ctx: closes the TCP socket (if open), clears the request/subscription flags and RTT estimate, frees the SSL object (OpenSSL builds), arms a backoff, and resets the auther and gRPC client. */
      36           3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      37           3 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) { /* a negative tcp_sock means "no socket" */
      38           3 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      39           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      40           0 :     }
      41           3 :     ctx->tcp_sock = -1;
      42           3 :     ctx->tcp_sock_connected = 0; /* note: only cleared when a socket was actually open */
      43           3 :   }
      44           3 :   ctx->defer_reset = 0; /* the deferred reset request (if any) is being serviced now */
      45             : 
      46           3 :   ctx->builder_info_avail       = 0; /* forget builder info and all subscription state so it is re-requested */
      47           3 :   ctx->builder_info_wait        = 0;
      48           3 :   ctx->packet_subscription_live = 0;
      49           3 :   ctx->packet_subscription_wait = 0;
      50           3 :   ctx->bundle_subscription_live = 0;
      51           3 :   ctx->bundle_subscription_wait = 0;
      52             : 
      53           3 :   memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) ); /* zero the RTT estimate */
      54             : 
      55           3 : # if FD_HAS_OPENSSL
      56           3 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      57           0 :     SSL_free( ctx->ssl );
      58           0 :     ctx->ssl = NULL; /* avoid a dangling pointer / double free on the next reset */
      59           0 :   }
      60           3 : # endif
      61             : 
      62           3 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() ); /* every reset schedules a randomized wait before the next connect attempt */
      63             : 
      64           3 :   fd_bundle_auther_reset( &ctx->auther );
      65           3 :   fd_grpc_client_reset( ctx->grpc_client );
      66           3 : }
      67             : 
      68             : static int /* Calls connect() on ctx->tcp_sock towards ip4_addr:ctx->server_tcp_port and returns the value of errno afterwards (0 if connect() did not set it). */
      69             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      70           0 :                              uint                     ip4_addr ) {
      71           0 :   struct sockaddr_in addr = {
      72           0 :     .sin_family      = AF_INET,
      73           0 :     .sin_addr.s_addr = ip4_addr, /* stored as given; NOTE(review): presumably already in network byte order - confirm */
      74           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port ) /* byte-swapped before use as sin_port */
      75           0 :   };
      76           0 :   errno = 0; /* clear first so that a successful connect() is reported as 0 */
      77           0 :   connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) ); /* return value intentionally unused; the result is conveyed through errno */
      78           0 :   return errno;
      79           0 : }
      80             : 
      81             : static void /* Starts a fresh connection attempt: resets state, resolves ctx->server_fqdn (IPv4 only), creates and configures a non-blocking TCP socket, issues connect(), prepares the SSL object (OpenSSL builds with is_ssl set), and re-initializes the gRPC client and keepalive timer.  Resolve/connect failures reset ctx and bump metrics.transport_fail_cnt. */
      82           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      83           0 :   fd_bundle_client_reset( ctx ); /* start from a clean slate (this also arms a backoff) */
      84             : 
      85             :   /* FIXME IPv6 support */
      86           0 :   fd_addrinfo_t hints = {0};
      87           0 :   hints.ai_family = AF_INET;
      88           0 :   fd_addrinfo_t * res = NULL;
      89           0 :   uchar scratch[ 4096 ]; /* stack buffer handed to fd_getaddrinfo; NOTE(review): presumably backs the returned result - confirm */
      90           0 :   void * pscratch = scratch;
      91           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
      92           0 :   if( FD_UNLIKELY( err ) ) {
      93           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
      94           0 :     fd_bundle_client_reset( ctx );
      95           0 :     ctx->metrics.transport_fail_cnt++;
      96           0 :     return;
      97           0 :   }
      98           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr; /* only the first result is used */
      99           0 :   ctx->server_ip4_addr = ip4_addr;
     100             : 
     101           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
     102           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
     103           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     104           0 :   }
     105           0 :   ctx->tcp_sock = tcp_sock; /* from here on fd_bundle_client_reset is responsible for closing it */
     106             : 
     107           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
     108           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
     109           0 :   }
     110             : 
     111           0 :   int tcp_nodelay = 1;
     112           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     113           0 :     FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     114           0 :   }
     115             : 
     116           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) { /* non-blocking, so the connect() below may report EINPROGRESS */
     117           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     118           0 :   }
     119             : 
     120           0 :   char const * scheme = "http"; /* used only for the log line below */
     121           0 : # if FD_HAS_OPENSSL
     122           0 :   if( ctx->is_ssl ) scheme = "https";
     123           0 : # endif
     124             : 
     125           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     126           0 :                 scheme,
     127           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     128           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     129             : 
     130           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     131           0 :   if( FD_UNLIKELY( connect_err ) ) {
     132           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) { /* EINPROGRESS is tolerated: completion is detected later by polling for POLLOUT */
     133           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     134           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     135           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     136           0 :       fd_bundle_client_reset( ctx );
     137           0 :       ctx->metrics.transport_fail_cnt++;
     138           0 :       return;
     139           0 :     }
     140           0 :   }
     141             : 
     142           0 : # if FD_HAS_OPENSSL
     143           0 :   if( ctx->is_ssl ) {
     144           0 :     BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE ); /* BIO_NOCLOSE: the socket stays owned by ctx, not by the BIO */
     145           0 :     if( FD_UNLIKELY( !bio ) ) {
     146           0 :       FD_LOG_ERR(( "BIO_new_socket failed" ));
     147           0 :     }
     148             : 
     149           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     150           0 :     if( FD_UNLIKELY( !ssl ) ) {
     151           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     152           0 :     }
     153             : 
     154           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     155           0 :     SSL_set_connect_state( ssl ); /* act as the TLS client */
     156             : 
     157             :     /* Indicate to endpoint which server name we want */
     158           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     159           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     160           0 :     }
     161             : 
     162             :     /* Enable hostname verification */
     163           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     164           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     165           0 :     }
     166             : 
     167           0 :     ctx->ssl = ssl; /* released by fd_bundle_client_reset via SSL_free */
     168           0 :   }
     169           0 : # endif /* FD_HAS_OPENSSL */
     170             : 
     171           0 :   fd_grpc_client_reset( ctx->grpc_client );
     172           0 :   fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() ); /* keepalive_interval is passed for both interval arguments */
     173           0 : }
     174             : 
     175             : static int /* Drives gRPC client I/O over the active transport (OpenSSL or plain socket) and returns that rxtx call's result; the caller treats a zero return as a transport failure. */
     176             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     177          27 :                            int *              charge_busy ) {
     178          27 : # if FD_HAS_OPENSSL
     179          27 :   if( ctx->is_ssl ) { /* TLS connections go through the SSL object */
     180           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     181           0 :   }
     182          27 : # endif /* FD_HAS_OPENSSL */
     183             : 
     184          27 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy ); /* plaintext: operate directly on the TCP socket */
     185          27 : }
     186             : 
     187             : static void /* Starts a GetBlockBuilderFeeInfo request on the gRPC client and, if it was started, sets ctx->builder_info_wait.  Does nothing when the client is blocked or the request could not be started. */
     188           3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     189           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     190             : 
     191           3 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     192           3 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     193           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     194           3 :       ctx->grpc_client,
     195           3 :       path, sizeof(path)-1, /* length excludes the terminating NUL */
     196           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     197           3 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     198           3 :       ctx->auther.access_token, ctx->auther.access_token_sz
     199           3 :   );
     200           3 :   if( FD_UNLIKELY( !request ) ) return; /* not started: leave builder_info_wait clear so a later step retries */
     201           3 :   fd_grpc_client_deadline_set(
     202           3 :       request,
     203           3 :       FD_GRPC_DEADLINE_RX_END, /* unlike the subscriptions, this deadline kind is RX_END */
     204           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ); /* NOTE(review): uses fd_log_wallclock rather than the overridable fd_bundle_now - confirm this is intended */
     205             : 
     206           3 :   ctx->builder_info_wait = 1;
     207           3 : }
     208             : 
     209             : static void /* Starts a SubscribePackets request on the gRPC client and, if it was started, sets ctx->packet_subscription_wait.  Does nothing when the client is blocked or the request could not be started. */
     210           3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     211           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     212             : 
     213           3 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     214           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     215           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     216           3 :       ctx->grpc_client,
     217           3 :       path, sizeof(path)-1, /* length excludes the terminating NUL */
     218           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     219           3 :       &block_engine_SubscribePacketsRequest_msg, &req,
     220           3 :       ctx->auther.access_token, ctx->auther.access_token_sz
     221           3 :   );
     222           3 :   if( FD_UNLIKELY( !request ) ) return; /* not started: leave the wait flag clear so a later step retries */
     223           3 :   fd_grpc_client_deadline_set(
     224           3 :       request,
     225           3 :       FD_GRPC_DEADLINE_HEADER, /* deadline kind is HEADER for this streaming subscription */
     226           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     227             : 
     228           3 :   ctx->packet_subscription_wait = 1;
     229           3 : }
     230             : 
     231             : static void /* Starts a SubscribeBundles request on the gRPC client and, if it was started, sets ctx->bundle_subscription_wait.  Does nothing when the client is blocked or the request could not be started. */
     232           3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     233           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     234             : 
     235           3 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     236           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     237           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     238           3 :       ctx->grpc_client,
     239           3 :       path, sizeof(path)-1, /* length excludes the terminating NUL */
     240           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     241           3 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     242           3 :       ctx->auther.access_token, ctx->auther.access_token_sz
     243           3 :   );
     244           3 :   if( FD_UNLIKELY( !request ) ) return; /* not started: leave the wait flag clear so a later step retries */
     245           3 :   fd_grpc_client_deadline_set(
     246           3 :       request,
     247           3 :       FD_GRPC_DEADLINE_HEADER, /* deadline kind is HEADER for this streaming subscription */
     248           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     249             : 
     250           3 :   ctx->bundle_subscription_wait = 1;
     251           3 : }
     252             : 
     253             : void /* Attempts to queue an HTTP/2 PING on the gRPC client's connection; on success records the transmit in ctx->keepalive.  Silently returns if there is no client, no connection, or any connection flag is set. */
     254           3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     255           3 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     256           3 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     257           3 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     258           3 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     259           3 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     260             : 
     261           3 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) { /* keepalive state is only updated when the PING was accepted */
     262           3 :     long now = fd_bundle_now();
     263           3 :     fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
     264           3 :     FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
     265           3 :   }
     266           3 : }
     267             : 
     268             : int /* Issues at most one piece of connection work per call, in priority order: auth polling, block builder info request, packet subscription, bundle subscription, keepalive PING.  Returns 1 if an action was attempted, 0 if there was nothing to do. */
     269             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     270          18 :                                  long               now ) {
     271             :   /* Drive auth */
     272          18 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     273           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     274           0 :     return 1;
     275           0 :   }
     276          18 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0; /* nothing below may run until auth has reached DONE_WAIT */
     277             : 
     278             :   /* Request block builder info if it is missing or has expired, and no request is already in flight */
     279          18 :   int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0; /* 1 once now is past valid_until */
     280          18 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     281          18 :                      ( !!builder_info_expired   ) ) & /* fixed: was !builder_info_expired, which re-requested while the info was still valid and never refreshed it after expiry */
     282          18 :                    ( !ctx->builder_info_wait      ) ) ) {
     283           3 :     fd_bundle_client_request_builder_info( ctx );
     284           3 :     return 1;
     285           3 :   }
     286             : 
     287             :   /* Subscribe to packets */
     288          15 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     289           3 :     fd_bundle_client_subscribe_packets( ctx );
     290           3 :     return 1;
     291           3 :   }
     292             : 
     293             :   /* Subscribe to bundles */
     294          12 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     295           3 :     fd_bundle_client_subscribe_bundles( ctx );
     296           3 :     return 1;
     297           3 :   }
     298             : 
     299             :   /* Send a PING */
     300           9 :   if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
     301           3 :     fd_bundle_client_send_ping( ctx );
     302           3 :     return 1;
     303           3 :   }
     304             : 
     305           6 :   return 0; /* fully set up and no keepalive due */
     306           9 : }
     307             : 
     308             : static void /* One iteration of the connection state machine: wait for the TCP connect to finish, (re)create the connection after backoff, detect keepalive timeouts, drive I/O, and finally issue the next request via fd_bundle_client_step_reconnect.  *charge_busy is set when work was done. */
     309             : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
     310          30 :                         int *              charge_busy ) {
     311             : 
     312             :   /* Wait for TCP socket to connect */
     313          30 :   if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
     314           0 :     if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect; /* no socket at all: jump into the reconnect path below */
     315             : 
     316           0 :     struct pollfd pfds[1] = {
     317           0 :       { .fd = ctx->tcp_sock, .events = POLLOUT }
     318           0 :     };
     319           0 :     int poll_res = fd_syscall_poll( pfds, 1, 0 ); /* timeout 0: never blocks */
     320           0 :     if( FD_UNLIKELY( poll_res<0 ) ) {
     321           0 :       FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     322           0 :     }
     323           0 :     if( poll_res==0 ) return; /* connect still in progress */
     324             : 
     325           0 :     if( pfds[0].revents & (POLLERR|POLLHUP) ) {
     326           0 :       int connect_err = fd_bundle_client_do_connect( ctx, 0 ); /* NOTE(review): presumably re-issues connect() only to obtain an errno for the log line - confirm */
     327           0 :       FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_err, fd_io_strerror( connect_err ) ));
     328           0 :       fd_bundle_client_reset( ctx );
     329           0 :       ctx->metrics.transport_fail_cnt++;
     330           0 :       *charge_busy = 1;
     331           0 :       return;
     332           0 :     }
     333           0 :     if( pfds[0].revents & POLLOUT ) { /* writable without error: the connect completed */
     334           0 :       FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
     335           0 :       ctx->tcp_sock_connected = 1;
     336           0 :       *charge_busy = 1;
     337           0 :       return;
     338           0 :     }
     339           0 :     return;
     340           0 :   }
     341             : 
     342             :   /* gRPC conn died? */
     343          30 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) {
     344           0 :     long sleep_start; /* declared before the label so the goto above does not skip a declaration */
     345           0 :   reconnect:
     346           0 :     sleep_start = fd_bundle_now();
     347           0 :     if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
     348           0 :       long wait_dur = ctx->backoff_until - sleep_start;
     349           0 :       fd_log_sleep( fd_long_min( wait_dur, 1e6 ) ); /* sleep for the remaining backoff, capped at 1e6 */
     350           0 :       return;
     351           0 :     }
     352           0 :     fd_bundle_client_create_conn( ctx );
     353           0 :     *charge_busy = 1;
     354           0 :     return;
     355           0 :   }
     356             : 
     357             :   /* Did a HTTP/2 PING time out */
     358          30 :   long check_ts = ctx->cached_ts = fd_bundle_now(); /* also cached in ctx for use by fd_bundle_client_sample_rx_delay */
     359          30 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
     360           3 :     FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
     361           3 :                      (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
     362           3 :     ctx->keepalive->inflight = 0;
     363           3 :     ctx->defer_reset = 1; /* the reset itself happens on a later call, in the drive-I/O check below */
     364           3 :     *charge_busy = 1;
     365           3 :     return;
     366           3 :   }
     367             : 
     368             :   /* Drive I/O, SSL handshake, and any inflight requests */
     369          27 :   if( FD_UNLIKELY( !fd_bundle_client_drive_io( ctx, charge_busy ) ||
     370          27 :                    ctx->defer_reset /* new error? */ ) ) {
     371           0 :     fd_bundle_client_reset( ctx );
     372           0 :     ctx->metrics.transport_fail_cnt++;
     373           0 :     *charge_busy = 1;
     374           0 :     return;
     375           0 :   }
     376             : 
     377             :   /* Are we ready to issue a new request? */
     378          27 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     379          18 :   long io_ts = fd_bundle_now();
     380          18 :   if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return; /* still inside the backoff window */
     381             : 
     382          18 :   *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
     383          18 : }
     384             : 
     385             : static void /* Logs a message when the connected/disconnected state differs from the last logged one, provided at least 1e6 ns have passed since the previous status log. */
     386          30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     387          30 :   int status = fd_bundle_client_status( ctx );
     388             : 
     389          30 :   int const connected_now    = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     390          30 :   int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     391             : 
     392          30 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) { /* only transitions into or out of CONNECTED are logged */
     393           3 :     long ts = fd_log_wallclock();
     394           3 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) { /* rate limit */
     395           3 :       if( connected_now ) {
     396           3 :         FD_LOG_NOTICE(( "Connected to bundle server" ));
     397           3 :       } else {
     398           0 :         FD_LOG_WARNING(( "Disconnected from bundle server" ));
     399           0 :       }
     400           3 :       ctx->last_bundle_status_log_nanos = ts;
     401           3 :       ctx->bundle_status_logged = (uchar)status; /* updated only when a message was emitted, so a rate-limited transition is retried on the next call */
     402           3 :     }
     403           3 :   }
     404          30 : }
     405             : 
     406             : void /* Public step entry point: runs one state machine iteration, then logs any connection status change. */
     407             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     408          30 :                        int *              charge_busy ) {
     409             :   /* Run the state machine first, then do edge-triggered, rate-limited status logging */
     410          30 :   fd_bundle_client_step1( ctx, charge_busy );
     411          30 :   fd_bundle_client_log_status( ctx );
     412          30 : }
     413             : 
     414             : void /* Arms a randomized backoff starting at now: sets ctx->backoff_until and ctx->backoff_reset, and updates the ctx->backoff_iter counter. */
     415             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     416          12 :                         long               now ) {
     417          12 :   uint iter = ctx->backoff_iter;
     418          12 :   if( now < ctx->backoff_reset ) iter = 0U; /* NOTE(review): restarts the count when called before backoff_reset - confirm the comparison direction is intended */
     419          12 :   iter++;
     420             : 
     421             :   /* FIXME proper backoff (iter is tracked but does not yet influence the wait) */
     422          12 :   long wait_ns = (long)2e9;
     423          12 :   wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) ); /* random value masked to the bits below wait_ns's top bit; assumes find_msb returns the index of the highest set bit - TODO confirm */
     424             : 
     425          12 :   ctx->backoff_until = now +   wait_ns;
     426          12 :   ctx->backoff_reset = now + 2*wait_ns;
     427             : 
     428          12 :   ctx->backoff_iter = iter;
     429          12 : }
     430             : 
     431             : static void /* Connection-established handler (NOTE(review): presumably registered as a gRPC client callback - confirm); only logs. */
     432           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     433           0 :   (void)app_ctx; /* context not needed here */
     434           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     435           0 : }
     436             : 
     437             : static void /* Connection-dead handler: logs the HTTP/2 error and who closed the connection, then requests a deferred reset via ctx->defer_reset. */
     438             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     439             :                                  uint   h2_err,
     440           0 :                                  int    closed_by ) {
     441           0 :   fd_bundle_tile_t * ctx = app_ctx; /* app_ctx is the bundle tile context */
     442           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     443           0 :                 closed_by ? "by peer" : "due to error", /* nonzero closed_by is reported as "by peer" */
     444           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     445           0 :   ctx->defer_reset = 1; /* no teardown here; fd_bundle_client_step1 performs the reset when it sees this flag */
     446           0 : }
     447             : 
     448             : /* Forwards a bundle transaction to the tango message bus.  Writes an fd_txn_m_t header plus payload at the current verify_out chunk, tags it with the current bundle sequence number and builder commission info, publishes it with sig=1, and advances the chunk.  Drops the transaction (counting it in missing_builder_info_fail_cnt) if no builder info is available. */
     449             : 
     450             : static void
     451             : fd_bundle_tile_publish_bundle_txn(
     452             :     fd_bundle_tile_t * ctx,
     453             :     void const *       txn,
     454             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     455             :     ulong              bundle_txn_cnt,
     456             :     uint               source_ipv4
     457          30 : ) {
     458          30 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     459           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     460           0 :     return;
     461           0 :   }
     462             : 
     463          30 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk ); /* message is built in place in the output region */
     464          30 :   *txnm = (fd_txn_m_t) {
     465          30 :     .reference_slot = 0UL,
     466          30 :     .payload_sz     = (ushort)txn_sz,
     467          30 :     .txn_t_sz       = 0U,
     468          30 :     .source_ipv4      = source_ipv4,
     469          30 :     .source_tpu       = FD_TXN_M_TPU_SOURCE_BUNDLE,
     470          30 :     .block_engine   = {
     471          30 :       .bundle_id      = ctx->bundle_seq,
     472          30 :       .bundle_txn_cnt = bundle_txn_cnt,
     473          30 :       .commission     = (uchar)ctx->builder_commission
     474          30 :     },
     475          30 :   };
     476          30 :   memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL ); /* 32-byte builder pubkey */
     477          30 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     478             : 
     479          30 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     480          30 :   ulong sig = 1UL; /* bundle transactions are published with sig=1 (regular packets use 0) */
     481             : 
     482          30 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     483           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     484           0 :   }
     485             : 
     486          30 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
     487          30 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     488          30 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark ); /* advance to the next output chunk */
     489          30 :   ctx->metrics.txn_received_cnt++;
     490          30 : }
     491             : 
     492             : /* Forwards a regular transaction to the tango message bus.  Same flow as the bundle variant, but with zeroed block engine fields (bundle_id 0, bundle_txn_cnt 1, no commission) and sig=0; it does not require builder info. */
     493             : 
     494             : static void
     495             : fd_bundle_tile_publish_txn(
     496             :     fd_bundle_tile_t * ctx,
     497             :     void const *       txn,
     498             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     499             :     uint               source_ipv4
     500           9 : ) {
     501           9 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk ); /* message is built in place in the output region */
     502           9 :   *txnm = (fd_txn_m_t) {
     503           9 :     .reference_slot = 0UL,
     504           9 :     .payload_sz     = (ushort)txn_sz,
     505           9 :     .txn_t_sz       = 0U,
     506           9 :     .source_ipv4    = source_ipv4,
     507           9 :     .source_tpu     = FD_TXN_M_TPU_SOURCE_BUNDLE,
     508           9 :     .block_engine   = {
     509           9 :       .bundle_id         = 0UL,
     510           9 :       .bundle_txn_cnt    = 1UL,
     511           9 :       .commission        = 0U,
     512           9 :       .commission_pubkey = {0U}
     513           9 :     },
     514           9 :   };
     515           9 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     516             : 
     517           9 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     518           9 :   ulong sig = 0UL; /* regular packets are published with sig=0 (bundle transactions use 1) */
     519             : 
     520           9 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     521           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     522           0 :   }
     523             : 
     524           9 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
     525           9 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     526           9 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark ); /* advance to the next output chunk */
     527           9 :   ctx->metrics.txn_received_cnt++;
     528           9 : }
     529             : 
     530             : /* Called for each transaction in a bundle.  Simply counts up
     531             :    bundle_txn_cnt, but does not publish anything.  The stream is not read here.  Always returns true. */
     532             : 
     533             : static bool
     534             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     535             :     pb_istream_t *     istream,
     536             :     pb_field_t const * field,
     537             :     void **            arg
     538          48 : ) {
     539          48 :   (void)istream; (void)field;
     540          48 :   fd_bundle_tile_t * ctx = *arg; /* *arg carries the bundle tile context */
     541          48 :   ctx->bundle_txn_cnt++;
     542          48 :   return true;
     543          48 : }
     544             : 
     545             : /* Called for each transaction in a bundle.  Publishes each transaction
     546             :    to the tango message bus.  Returns false only if the packet fails to decode; empty and oversize packets are logged and skipped with a true return. */
     547             : 
     548             : static bool
     549             : fd_bundle_client_visit_pb_bundle_txn(
     550             :     pb_istream_t *     istream,
     551             :     pb_field_t const * field,
     552             :     void **            arg
     553          30 : ) {
     554          30 :   (void)field;
     555          30 :   fd_bundle_tile_t * ctx = *arg; /* *arg carries the bundle tile context */
     556             : 
     557          30 :   packet_Packet packet = packet_Packet_init_default;
     558          30 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     559           0 :     ctx->metrics.decode_fail_cnt++;
     560           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     561           0 :     return false;
     562           0 :   }
     563             : 
     564          30 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     565           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     566           0 :     return true;
     567           0 :   }
     568             : 
     569          30 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) { /* enforces the txn_sz<=FD_TXN_MTU precondition of the publish helper */
     570           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     571           0 :     return true;
     572           0 :   }
     573             : 
     574          30 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr ); /* source address: the packet's meta.addr when present and parseable, otherwise the bundle server's address (assumes fd_cstr_to_ip4_addr returns nonzero on success - TODO confirm) */
     575          30 :   fd_bundle_tile_publish_bundle_txn(
     576          30 :       ctx,
     577          30 :       packet.data.bytes, packet.data.size,
     578          30 :       ctx->bundle_txn_cnt, /* total counted earlier by the preflight visitor */
     579          30 :       ip4
     580          30 :   );
     581             : 
     582          30 :   return true;
     583          30 : }
     584             : /* Samples (ctx->cached_ts - ts) into the msg_rx_delay histogram, with ts converted to nanoseconds. */
     585             : static void
     586             : fd_bundle_client_sample_rx_delay(
     587             :     fd_bundle_tile_t *                ctx,
     588             :     google_protobuf_Timestamp const * ts
     589          12 : ) {
     590          12 :   ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos; /* seconds+nanos -> ns */
     591          12 :   fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) ); /* presumably clamps at 0 if ts is ahead of cached_ts -- confirm fd_ulong_sat_sub */
     592          12 : }
     593             : 
     594             : /* Called for each BundleUuid in a SubscribeBundlesResponse.  Counts the
     595             :    bundle's transactions in a first pass, then publishes them in a second. */
     596             : static bool
     597             : fd_bundle_client_visit_pb_bundle_uuid(
     598             :     pb_istream_t *     istream,
     599             :     pb_field_t const * field,
     600             :     void **            arg
     601           9 : ) {
     602           9 :   (void)field;
     603           9 :   fd_bundle_tile_t * ctx = *arg;
     604             : 
     605             :   /* Reset bundle state */
     606             : 
     607           9 :   ctx->bundle_txn_cnt = 0UL;
     608             : 
     609             :   /* Do two decode passes.  This is required because we need to know the
     610             :      number of transactions in a bundle ahead of time.  However, due to
     611             :      the Protobuf wire encoding, we don't know the number of txns that
     612             :      will come until we've parsed everything.
     613             : 
     614             :      First pass: Count the number of transactions in this bundle. */
     615             : 
     616           9 :   pb_istream_t peek = *istream; /* decode from a copy so the first pass does not advance istream */
     617           9 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     618           9 :   bundle.bundle.packets = (pb_callback_t) {
     619           9 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     620           9 :     .arg          = ctx
     621           9 :   };
     622           9 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     623           0 :     ctx->metrics.decode_fail_cnt++;
     624           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     625           0 :     return false;
     626           0 :   }
     627             : 
     628             :   /* At this point, ctx->bundle_txn_cnt is correctly set.  A bundle with
     629             :      more than FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE txns is treated as a NOP.
     630             : 
     631             :      Second pass: Actually publish bundle packets */
     632             : 
     633           9 :   if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
     634             : 
     635           6 :   ctx->bundle_seq++;
     636           6 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
     637           6 :   bundle.bundle.packets = (pb_callback_t) {
     638           6 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     639           6 :     .arg          = ctx
     640           6 :   };
     641             : 
     642           6 :   ctx->metrics.bundle_received_cnt++;
     643             :   /* The same bytes decoded cleanly in the first pass, hence "internal error" below */
     644           6 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     645           0 :     ctx->metrics.decode_fail_cnt++;
     646           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     647           0 :     return false;
     648           0 :   }
     649             : 
     650           6 :   fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
     651             : 
     652           6 :   return true;
     653           6 : }
     654             : 
     655             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call.
     656             :    The message is dropped (and counted) if builder info is not yet available. */
     657             : static void
     658             : fd_bundle_client_handle_bundle_batch(
     659             :     fd_bundle_tile_t * ctx,
     660             :     pb_istream_t *     istream
     661          12 : ) {
     662          12 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     663           3 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     664           3 :     return;
     665           3 :   }
     666             :   /* Each bundle in the response is handled by fd_bundle_client_visit_pb_bundle_uuid */
     667           9 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     668           9 :   res.bundles = (pb_callback_t) {
     669           9 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     670           9 :     .arg          = ctx
     671           9 :   };
     672           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     673           0 :     ctx->metrics.decode_fail_cnt++;
     674           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     675           0 :     return;
     676           0 :   }
     677           9 : }
     678             : 
     679             : /* Called for each 'Packet' (a regular transaction) of a
     680             :    SubscribePacketsResponse.  Returns false only on decode failure. */
     681             : 
     682             : static bool
     683             : fd_bundle_client_visit_pb_packet(
     684             :     pb_istream_t *     istream,
     685             :     pb_field_t const * field,
     686             :     void **            arg
     687           9 : ) {
     688           9 :   (void)field;
     689           9 :   fd_bundle_tile_t * ctx = *arg;
     690             : 
     691           9 :   packet_Packet packet = packet_Packet_init_default;
     692           9 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     693           0 :     ctx->metrics.decode_fail_cnt++;
     694           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     695           0 :     return false;
     696           0 :   }
     697             :   /* Empty and oversize payloads are logged and skipped without failing the batch */
     698           9 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     699           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     700           0 :     return true;
     701           0 :   }
     702             : 
     703           9 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     704           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     705           0 :     return true;
     706           0 :   }
     707             : 
     708             :   /* Source address: packet.meta.addr when meta is present and parses as IPv4, otherwise 0 */
     709           9 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
     710           9 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
     711           9 :   ctx->metrics.packet_received_cnt++; /* counts published packets only */
     712             : 
     713           9 :   return true;
     714           9 : }
     715             : 
     716             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call.
     717             :    Each packet is published by fd_bundle_client_visit_pb_packet. */
     718             : static void
     719             : fd_bundle_client_handle_packet_batch(
     720             :     fd_bundle_tile_t * ctx,
     721             :     pb_istream_t *     istream
     722           6 : ) {
     723           6 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     724           6 :   res.batch.packets = (pb_callback_t) {
     725           6 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     726           6 :     .arg          = ctx
     727           6 :   };
     728           6 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     729           0 :     ctx->metrics.decode_fail_cnt++;
     730           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
     731           0 :     return;
     732           0 :   }
     733             :   /* The delay is sampled only after the whole response decoded successfully */
     734           6 :   fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
     735           6 : }
     736             : 
     737             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     738             :    gRPC call.  On success stores the builder commission and pubkey in
     739             :    ctx and marks builder info as available for the next 5 minutes. */
     740             : static void
     741             : fd_bundle_client_handle_builder_fee_info(
     742             :     fd_bundle_tile_t * ctx,
     743             :     pb_istream_t *     istream
     744           9 : ) {
     745           9 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     746           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     747           0 :     ctx->metrics.decode_fail_cnt++;
     748           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     749           0 :     return;
     750           0 :   }
     751           9 :   if( FD_UNLIKELY( res.commission > 100 ) ) { /* out-of-range commission is counted as a decode failure */
     752           3 :     ctx->metrics.decode_fail_cnt++;
     753           3 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     754           3 :     return;
     755           3 :   }
     756             :   /* NOTE(review): the commission is stored before the pubkey is validated, so a bad pubkey still leaves builder_commission updated */
     757           6 :   ctx->builder_commission = (uchar)res.commission;
     758           6 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
     759           3 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     760           3 :     return; /* builder_info_avail and builder_info_valid_until are left unchanged */
     761           3 :   }
     762             : 
     763           3 :   long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
     764           3 :   ctx->builder_info_avail = 1;
     765           3 :   ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
     766           3 : }
     767             : /* gRPC tx_complete callback.  Intentionally a no-op; both arguments are ignored. */
     768             : static void
     769             : fd_bundle_client_grpc_tx_complete(
     770             :     void * app_ctx,
     771             :     ulong  request_ctx
     772           9 : ) {
     773           9 :   (void)app_ctx; (void)request_ctx;
     774           9 : }
     775             : /* gRPC rx_start callback.  Marks the matching subscription as live and no longer waiting; other request kinds are ignored. */
     776             : void
     777             : fd_bundle_client_grpc_rx_start(
     778             :     void * app_ctx,
     779             :     ulong  request_ctx
     780           9 : ) {
     781           9 :   fd_bundle_tile_t * ctx = app_ctx; /* app_ctx is the bundle tile context */
     782           9 :   switch( request_ctx ) {
     783           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     784           3 :     ctx->packet_subscription_live = 1;
     785           3 :     ctx->packet_subscription_wait = 0;
     786           3 :     break;
     787           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     788           3 :     ctx->bundle_subscription_live = 1;
     789           3 :     ctx->bundle_subscription_wait = 0;
     790           3 :     break;
     791           9 :   }
     792           9 : }
     793             : /* gRPC rx_msg callback.  Dispatches one received Protobuf message to the handler selected by request_ctx. */
     794             : void
     795             : fd_bundle_client_grpc_rx_msg(
     796             :     void *       app_ctx,
     797             :     void const * protobuf,
     798             :     ulong        protobuf_sz,
     799             :     ulong        request_ctx
     800          27 : ) {
     801          27 :   fd_bundle_tile_t * ctx = app_ctx;
     802          27 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz ); /* used by the Bundle_* handlers; the auth handlers take the raw buffer */
     803          27 :   switch( request_ctx ) {
     804           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     805           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     806           0 :       ctx->metrics.decode_fail_cnt++;
     807           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() ); /* auth response rejected: back off */
     808           0 :     }
     809           0 :     break;
     810           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     811           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     812           0 :       ctx->metrics.decode_fail_cnt++;
     813           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     814           0 :     }
     815           0 :     break;
     816          12 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     817          12 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     818          12 :     break;
     819           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     820           6 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     821           6 :     break;
     822           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     823           9 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     824           9 :     break;
     825           0 :   default:
     826           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx )); /* NOTE(review): presumably fatal -- confirm FD_LOG_ERR semantics */
     827          27 :   }
     828          27 : }
     829             : /* Common failure path for a gRPC request: always backs off, and additionally notifies the auther for auth requests. */
     830             : static void
     831             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     832           0 :                                  ulong              request_ctx ) {
     833           0 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     834           0 :   switch( request_ctx ) {
     835           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     836           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     837           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     838           0 :     break;
     839           0 :   }
     840           0 : }
     841             : /* gRPC rx_end callback: a response ended.  Handles HTTP errors, ended subscription streams, and gRPC error statuses. */
     842             : void
     843             : fd_bundle_client_grpc_rx_end(
     844             :     void *                app_ctx,
     845             :     ulong                 request_ctx,
     846             :     fd_grpc_resp_hdrs_t * resp
     847          12 : ) {
     848          12 :   fd_bundle_tile_t * ctx = app_ctx;
     849          12 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
     850           0 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     851           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     852           0 :     return;
     853           0 :   }
     854             :   /* URL-unescape the status message (fd_url_unescape returns the new length); substitute a placeholder if it is empty */
     855          12 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     856          12 :   if( !resp->grpc_msg_len ) {
     857          12 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 ); /* 13 bytes, no NUL terminator copied */
     858          12 :     resp->grpc_msg_len = 13;
     859          12 :   }
     860             :   /* For the two subscription streams any end of stream is handled as a failure: clear live/wait flags, back off, request a deferred reset */
     861          12 :   switch( request_ctx ) {
     862           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     863           0 :     ctx->packet_subscription_live = 0;
     864           0 :     ctx->packet_subscription_wait = 0;
     865           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     866           0 :     ctx->defer_reset = 1;
     867           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     868           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     869           0 :     return;
     870           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     871           9 :     ctx->bundle_subscription_live = 0;
     872           9 :     ctx->bundle_subscription_wait = 0;
     873           9 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     874           9 :     ctx->defer_reset = 1;
     875           9 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     876           9 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     877           9 :     return;
     878           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     879           3 :     ctx->builder_info_wait = 0; /* request finished, regardless of status */
     880           3 :     break;
     881           0 :   default:
     882           0 :     break;
     883          12 :   }
     884             :   /* Only non-subscription requests reach this point */
     885           3 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     886           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     887           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     888           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     889           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     890           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     891           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     892           0 :       fd_bundle_auther_reset( &ctx->auther ); /* credentials rejected: restart authentication */
     893           0 :     }
     894           0 :     return;
     895           0 :   }
     896           3 : }
     897             : /* gRPC rx_timeout callback.  Logs the timed-out request and requests a deferred reset; deadline_kind is ignored. */
     898             : void
     899             : fd_bundle_client_grpc_rx_timeout(
     900             :     void * app_ctx,
     901             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     902             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     903           6 : ) {
     904           6 :   (void)deadline_kind;
     905           6 :   FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     906           6 :   fd_bundle_tile_t * ctx = app_ctx;
     907           6 :   ctx->defer_reset = 1;
     908           6 : }
     909             : /* gRPC ping_ack callback.  Feeds the keepalive tracker, samples the RTT when one is reported, and counts the ACK. */
     910             : static void
     911           3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     912           3 :   fd_bundle_tile_t * ctx = app_ctx;
     913           3 :   long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
     914           3 :   if( FD_LIKELY( rtt_sample ) ) { /* a zero return is not sampled */
     915           3 :     fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     916           3 :     FD_LOG_DEBUG(( "Keepalive ACK" ));
     917           3 :   }
     918           3 :   ctx->metrics.ping_ack_cnt++; /* counted even when no RTT sample was taken */
     919           3 : }
     920             : /* Callback table wiring the gRPC client events to the fd_bundle_client_grpc_* handlers. */
     921             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
     922             :   .conn_established = fd_bundle_client_grpc_conn_established,
     923             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     924             :   .tx_complete      = fd_bundle_client_grpc_tx_complete,
     925             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     926             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     927             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     928             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     929             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     930             : };
     931             : 
     932             : /* Short local aliases for the plugin status codes (decreases verbosity) */
     933          12 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
     934          54 : #define CONNECTING   FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
     935         108 : #define CONNECTED    FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
     936             : /* Returns the block engine connection status (one of the three codes above), derived from socket, HTTP/2, auth, subscription and keepalive state. */
     937             : int
     938         174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     939         174 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
     940         174 :                    ( !ctx->grpc_client        ) ) ) {
     941           3 :     return DISCONNECTED;
     942           3 :   }
     943             : 
     944         171 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     945         171 :   if( FD_UNLIKELY( !conn ) ) {
     946           0 :     return DISCONNECTED; /* no conn */
     947           0 :   }
     948         171 :   if( FD_UNLIKELY( conn->flags &
     949         171 :       ( FD_H2_CONN_FLAGS_DEAD |
     950         171 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     951           6 :     return DISCONNECTED; /* conn is dead or has a GOAWAY pending */
     952           6 :   }
     953             :   /* Any of these flags means the HTTP/2 handshake has not completed yet */
     954         165 :   if( FD_UNLIKELY( conn->flags &
     955         165 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     956         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     957         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     958         165 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     959          12 :     return CONNECTING; /* connection is not ready */
     960          12 :   }
     961             : 
     962         153 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     963          12 :     return CONNECTING; /* not authenticated */
     964          12 :   }
     965             :   /* Builder info and both subscriptions must be up before reporting CONNECTED */
     966         141 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) |
     967         141 :                    ( !ctx->packet_subscription_live ) |
     968         141 :                    ( !ctx->bundle_subscription_live ) ) ) {
     969          27 :     return CONNECTING; /* not fully connected */
     970          27 :   }
     971             : 
     972         114 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
     973           3 :     return DISCONNECTED; /* possible timeout */
     974           3 :   }
     975             : 
     976         111 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
     977           3 :     return CONNECTING; /* gRPC client does not report itself connected */
     978           3 :   }
     979             : 
     980             :   /* As far as we know, the bundle connection is alive and well. */
     981         108 :   return CONNECTED;
     982         111 : }
     983             : 
     984             : #undef DISCONNECTED
     985             : #undef CONNECTING
     986             : #undef CONNECTED
     987             : /* Returns a static, human-readable name for a FD_BUNDLE_CLIENT_REQ_* value, or "unknown" for any other value. */
     988             : FD_FN_CONST char const *
     989           6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
     990           6 :   switch( request_ctx ) {
     991           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     992           0 :     return "GenerateAuthChallenge";
     993           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     994           0 :     return "GenerateAuthTokens";
     995           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     996           0 :     return "SubscribePackets";
     997           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     998           6 :     return "SubscribeBundles";
     999           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
    1000           0 :     return "GetBlockBuilderFeeInfo";
    1001           0 :   default:
    1002           0 :     return "unknown";
    1003           6 :   }
    1004           6 : }

Generated by: LCOV version 1.14