LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 432 695 62.2 %
Date: 2026-06-29 05:51:35 Functions: 31 36 86.1 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #define _GNU_SOURCE /* SOL_TCP */
       4             : #include "fd_bundle_auth.h"
       5             : #include "fd_bundle_tile_private.h"
       6             : #include "fd_bundle_tile.h"
       7             : #include "proto/block_engine.pb.h"
       8             : #include "proto/bundle.pb.h"
       9             : #include "proto/packet.pb.h"
      10             : #include "../fd_txn_m.h"
      11             : #include "../../waltz/h2/fd_h2_conn.h"
      12             : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
      13             : #include "../../waltz/openssl/fd_openssl.h"
      14             : #include "../../ballet/base58/fd_base58.h"
      15             : #include "../../ballet/nanopb/pb_decode.h"
      16             : #include "../../util/net/fd_ip4.h"
      17             : 
      18             : #include <fcntl.h>
      19             : #include <errno.h>
      20             : #include <unistd.h> /* close */
      21             : #include <poll.h> /* poll */
      22             : #include <sys/socket.h> /* socket */
      23             : #include <netinet/in.h>
      24             : #include <netinet/ip.h>
      25             : #include <netinet/tcp.h>
      26             : 
      27           9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      28             : 
      29             : 
      30             : __attribute__((weak)) long
      31         930 : fd_bundle_now( void ) {
      32         930 :   return fd_log_wallclock();
      33         930 : }
      34             : 
      35             : void
      36           3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      37           3 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
      38           3 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      39           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      40           0 :     }
      41           3 :     ctx->tcp_sock = -1;
      42           3 :     ctx->tcp_sock_connected = 0;
      43           3 :   }
      44           3 :   ctx->defer_reset = 0;
      45             : 
      46           3 :   ctx->builder_info_avail       = 0;
      47           3 :   ctx->builder_info_wait        = 0;
      48           3 :   ctx->packet_subscription_live = 0;
      49           3 :   ctx->packet_subscription_wait = 0;
      50           3 :   ctx->bundle_subscription_live = 0;
      51           3 :   ctx->bundle_subscription_wait = 0;
      52             : 
      53           3 :   fd_memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
      54             : 
      55           3 : # if FD_HAS_OPENSSL
      56           3 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      57           0 :     SSL_free( ctx->ssl );
      58           0 :     ctx->ssl = NULL;
      59           0 :   }
      60           3 : # endif
      61             : 
      62           3 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
      63             : 
      64           3 :   fd_bundle_auther_reset( &ctx->auther );
      65           3 :   fd_grpc_client_reset( ctx->grpc_client );
      66           3 : }
      67             : 
      68             : static int
      69             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      70           0 :                              uint                     ip4_addr ) {
      71           0 :   struct sockaddr_in addr = {
      72           0 :     .sin_family      = AF_INET,
      73           0 :     .sin_addr.s_addr = ip4_addr,
      74           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port )
      75           0 :   };
      76           0 :   int err = connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
      77             :   /* FD_LIKELY is used here as EINPROGRESS is expected even to local tcp ports */
      78           0 :   if( FD_LIKELY( err==-1 ) ) {
      79           0 :     return errno;
      80           0 :   }
      81           0 :   return 0;
      82           0 : }
      83             : 
      84             : static int
      85           0 : fd_bundle_client_get_connect_result( fd_bundle_tile_t const * ctx ) {
      86           0 :   int so_err = 0;
      87           0 :   socklen_t so_err_sz = sizeof(so_err);
      88           0 :   if( FD_UNLIKELY( getsockopt( ctx->tcp_sock, SOL_SOCKET, SO_ERROR, &so_err, &so_err_sz )==-1 ) ) {
      89           0 :     return errno;
      90           0 :   }
      91           0 :   return so_err;
      92           0 : }
      93             : 
      94             : static void
      95           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      96           0 :   fd_bundle_client_reset( ctx );
      97             : 
      98             :   /* FIXME IPv6 support */
      99           0 :   fd_addrinfo_t hints = {0};
     100           0 :   hints.ai_family = AF_INET;
     101           0 :   fd_addrinfo_t * res = NULL;
     102           0 :   uchar scratch[ 4096 ];
     103           0 :   void * pscratch = scratch;
     104           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
     105           0 :   if( FD_UNLIKELY( err ) ) {
     106           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
     107           0 :     fd_bundle_client_reset( ctx );
     108           0 :     ctx->metrics.transport_fail_cnt++;
     109           0 :     return;
     110           0 :   }
     111           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
     112           0 :   ctx->server_ip4_addr = ip4_addr;
     113             : 
     114           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
     115           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
     116           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     117           0 :   }
     118           0 :   ctx->tcp_sock = tcp_sock;
     119             : 
     120           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
     121           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
     122           0 :   }
     123             : 
     124           0 :   int tcp_nodelay = 1;
     125           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     126           0 :     FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     127           0 :   }
     128             : 
     129           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
     130           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     131           0 :   }
     132             : 
     133           0 :   char const * scheme = "http";
     134           0 : # if FD_HAS_OPENSSL
     135           0 :   if( ctx->is_ssl ) scheme = "https";
     136           0 : # endif
     137             : 
     138           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     139           0 :                 scheme,
     140           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     141           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     142             : 
     143           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     144             :   /* FD_LIKELY as EINPROGRESS is expected */
     145           0 :   if( FD_LIKELY( connect_err ) ) {
     146           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
     147           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     148           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     149           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     150           0 :       fd_bundle_client_reset( ctx );
     151           0 :       ctx->metrics.transport_fail_cnt++;
     152           0 :       return;
     153           0 :     }
     154           0 :   }
     155             : 
     156           0 : # if FD_HAS_OPENSSL
     157           0 :   if( ctx->is_ssl ) {
     158           0 :     BIO * bio = fd_openssl_bio_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
     159           0 :     if( FD_UNLIKELY( !bio ) ) {
     160           0 :       FD_LOG_ERR(( "fd_openssl_bio_new_socket failed" ));
     161           0 :     }
     162             : 
     163           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     164           0 :     if( FD_UNLIKELY( !ssl ) ) {
     165           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     166           0 :     }
     167             : 
     168           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     169           0 :     SSL_set_connect_state( ssl );
     170             : 
     171             :     /* Indicate to endpoint which server name we want */
     172           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     173           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     174           0 :     }
     175             : 
     176             :     /* Enable hostname verification */
     177           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     178           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     179           0 :     }
     180             : 
     181           0 :     ctx->ssl = ssl;
     182           0 :   }
     183           0 : # endif /* FD_HAS_OPENSSL */
     184             : 
     185           0 :   fd_grpc_client_reset( ctx->grpc_client );
     186           0 :   fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
     187           0 : }
     188             : 
     189             : static int
     190             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     191          27 :                            int *              charge_busy ) {
     192          27 : # if FD_HAS_OPENSSL
     193          27 :   if( ctx->is_ssl ) {
     194           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     195           0 :   }
     196          27 : # endif /* FD_HAS_OPENSSL */
     197             : 
     198          27 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
     199          27 : }
     200             : 
     201             : static void
     202           3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     203           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     204             : 
     205           3 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     206           3 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     207           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     208           3 :       ctx->grpc_client,
     209           3 :       path, sizeof(path)-1,
     210           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     211           3 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     212           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     213           3 :       0 /* is_streaming */
     214           3 :   );
     215           3 :   if( FD_UNLIKELY( !request ) ) return;
     216           3 :   fd_grpc_client_deadline_set(
     217           3 :       request,
     218           3 :       FD_GRPC_DEADLINE_RX_END,
     219           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     220             : 
     221           3 :   ctx->builder_info_wait = 1;
     222           3 : }
     223             : 
     224             : static void
     225           3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     226           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     227             : 
     228           3 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     229           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     230           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     231           3 :       ctx->grpc_client,
     232           3 :       path, sizeof(path)-1,
     233           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     234           3 :       &block_engine_SubscribePacketsRequest_msg, &req,
     235           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     236           3 :       0 /* is_streaming */
     237           3 :   );
     238           3 :   if( FD_UNLIKELY( !request ) ) return;
     239           3 :   fd_grpc_client_deadline_set(
     240           3 :       request,
     241           3 :       FD_GRPC_DEADLINE_HEADER,
     242           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     243             : 
     244           3 :   ctx->packet_subscription_wait = 1;
     245           3 : }
     246             : 
     247             : static void
     248           3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     249           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     250             : 
     251           3 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     252           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     253           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     254           3 :       ctx->grpc_client,
     255           3 :       path, sizeof(path)-1,
     256           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     257           3 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     258           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     259           3 :       0 /* is_streaming */
     260           3 :   );
     261           3 :   if( FD_UNLIKELY( !request ) ) return;
     262           3 :   fd_grpc_client_deadline_set(
     263           3 :       request,
     264           3 :       FD_GRPC_DEADLINE_HEADER,
     265           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     266             : 
     267           3 :   ctx->bundle_subscription_wait = 1;
     268           3 : }
     269             : 
     270             : void
     271           3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     272           3 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     273           3 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     274           3 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     275           3 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     276           3 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     277             : 
     278           3 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
     279           3 :     long now = fd_bundle_now();
     280           3 :     fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
     281           3 :     FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
     282           3 :   }
     283           3 : }
     284             : 
     285             : int
     286             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     287          18 :                                  long               now ) {
     288             :   /* Drive auth */
     289          18 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     290           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     291           0 :     return 1;
     292           0 :   }
     293          18 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
     294             : 
     295             :   /* Request block builder info */
     296          18 :   int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
     297          18 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     298          18 :                      ( builder_info_expired     ) ) &
     299          18 :                    ( !ctx->builder_info_wait      ) ) ) {
     300           3 :     fd_bundle_client_request_builder_info( ctx );
     301           3 :     return 1;
     302           3 :   }
     303             : 
     304             :   /* Subscribe to packets */
     305          15 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     306           3 :     fd_bundle_client_subscribe_packets( ctx );
     307           3 :     return 1;
     308           3 :   }
     309             : 
     310             :   /* Subscribe to bundles */
     311          12 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     312           3 :     fd_bundle_client_subscribe_bundles( ctx );
     313           3 :     return 1;
     314           3 :   }
     315             : 
     316             :   /* Send a PING */
     317           9 :   if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
     318           3 :     fd_bundle_client_send_ping( ctx );
     319           3 :     return 1;
     320           3 :   }
     321             : 
     322           6 :   return 0;
     323           9 : }
     324             : 
     325             : static void
     326             : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
     327          30 :                         int *              charge_busy ) {
     328             : 
     329             :   /* Wait for TCP socket to connect */
     330          30 :   if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
     331           0 :     if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
     332             : 
     333           0 :     struct pollfd pfds[1] = {
     334           0 :       { .fd = ctx->tcp_sock, .events = POLLOUT }
     335           0 :     };
     336           0 :     int poll_res = fd_syscall_poll( pfds, 1, 0 );
     337           0 :     if( FD_UNLIKELY( poll_res<0 ) ) {
     338           0 :       FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     339           0 :     }
     340           0 :     if( poll_res==0 ) return;
     341             : 
     342           0 :     int connect_result = 0;
     343           0 :     if( pfds[0].revents & (POLLERR|POLLHUP) ) {
     344           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     345           0 :     connect_failed:
     346           0 :       FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_result, fd_io_strerror( connect_result ) ));
     347           0 :       fd_bundle_client_reset( ctx );
     348           0 :       ctx->metrics.transport_fail_cnt++;
     349           0 :       *charge_busy = 1;
     350           0 :       return;
     351           0 :     }
     352           0 :     if( pfds[0].revents & POLLOUT ) {
     353           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     354           0 :       if( FD_UNLIKELY( connect_result!=0 ) ) {
     355           0 :         goto connect_failed;
     356           0 :       }
     357           0 :       FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
     358           0 :       ctx->tcp_sock_connected = 1;
     359           0 :       *charge_busy = 1;
     360           0 :       return;
     361           0 :     }
     362           0 :     return;
     363           0 :   }
     364             : 
     365             :   /* gRPC conn died? */
     366          30 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) {
     367           0 :     long sleep_start;
     368           0 :   reconnect:
     369           0 :     sleep_start = fd_bundle_now();
     370           0 :     if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
     371           0 :       long wait_dur = ctx->backoff_until - sleep_start;
     372           0 :       fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
     373           0 :       return;
     374           0 :     }
     375           0 :     fd_bundle_client_create_conn( ctx );
     376           0 :     *charge_busy = 1;
     377           0 :     return;
     378           0 :   }
     379             : 
     380             :   /* Did a HTTP/2 PING time out */
     381          30 :   long check_ts = ctx->cached_ts = fd_bundle_now();
     382          30 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
     383           3 :     FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
     384           3 :                      (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
     385           3 :     ctx->keepalive->inflight = 0;
     386           3 :     ctx->defer_reset = 1;
     387           3 :     *charge_busy = 1;
     388           3 :     return;
     389           3 :   }
     390             : 
     391             :   /* Drive I/O, SSL handshake, and any inflight requests */
     392          27 :   if( FD_UNLIKELY( -1==fd_bundle_client_drive_io( ctx, charge_busy ) || ctx->defer_reset /* new error? */ ) ) {
     393           0 :     fd_bundle_client_reset( ctx );
     394           0 :     ctx->metrics.transport_fail_cnt++;
     395           0 :     *charge_busy = 1;
     396           0 :     return;
     397           0 :   }
     398             : 
     399             :   /* Are we ready to issue a new request? */
     400          27 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     401          18 :   long io_ts = fd_bundle_now();
     402          18 :   if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
     403             : 
     404          18 :   *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
     405          18 : }
     406             : 
     407             : static void
     408          30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     409          30 :   int status = fd_bundle_client_status( ctx );
     410             : 
     411          30 :   int const connected_now    = ( status==FD_BUNDLE_STATE_CONNECTED );
     412          30 :   int const connected_before = ( ctx->bundle_status_logged==FD_BUNDLE_STATE_CONNECTED );
     413             : 
     414          30 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) {
     415           3 :     long ts = fd_log_wallclock();
     416           3 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
     417           3 :       if( connected_now ) FD_LOG_INFO(( "Connected to bundle server" ));
     418           0 :       else                FD_LOG_INFO(( "Disconnected from bundle server" ));
     419           3 :       ctx->last_bundle_status_log_nanos = ts;
     420           3 :       ctx->bundle_status_logged = (uchar)status;
     421           3 :     }
     422           3 :   }
     423          30 : }
     424             : 
     425             : void
     426             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     427          30 :                        int *              charge_busy ) {
     428             :   /* Edge-trigger logging with rate limiting */
     429          30 :   fd_bundle_client_step1( ctx, charge_busy );
     430          30 :   fd_bundle_client_log_status( ctx );
     431          30 : }
     432             : 
     433             : void
     434             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     435          24 :                         long               now ) {
     436          24 :   uint iter = ctx->backoff_iter;
     437          24 :   if( now < ctx->backoff_reset ) iter = 0U;
     438          24 :   iter++;
     439             : 
     440             :   /* FIXME proper backoff */
     441          24 :   long wait_ns = (long)2e9;
     442          24 :   wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
     443             : 
     444          24 :   ctx->backoff_until = now +   wait_ns;
     445          24 :   ctx->backoff_reset = now + 2*wait_ns;
     446             : 
     447          24 :   ctx->backoff_iter = iter;
     448          24 : }
     449             : 
     450             : static void
     451           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     452           0 :   (void)app_ctx;
     453           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     454           0 : }
     455             : 
     456             : static void
     457             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     458             :                                  uint   h2_err,
     459           0 :                                  int    closed_by ) {
     460           0 :   fd_bundle_tile_t * ctx = app_ctx;
     461           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     462           0 :                 closed_by ? "by peer" : "due to error",
     463           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     464           0 :   ctx->defer_reset = 1;
     465           0 : }
     466             : 
     467             : /* Buffers a bundle transaction for deferred publishing by after_credit. */
     468             : 
     469             : static void
     470             : fd_bundle_tile_publish_bundle_txn(
     471             :     fd_bundle_tile_t * ctx,
     472             :     void const *       txn,
     473             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     474             :     ulong              bundle_txn_cnt,
     475             :     uint               source_ipv4
     476          72 : ) {
     477          72 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     478           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     479           0 :     return;
     480           0 :   }
     481             : 
     482          72 :   if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
     483           0 :     ctx->metrics.backpressure_drop_cnt++;
     484           0 :     return;
     485           0 :   }
     486             : 
     487          72 :   fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
     488          72 :   fd_memcpy( entry->payload, txn, txn_sz );
     489          72 :   entry->payload_sz     = (ushort)txn_sz;
     490          72 :   entry->source_ipv4    = source_ipv4;
     491          72 :   entry->sig            = 1UL;
     492          72 :   entry->bundle_seq     = ctx->bundle_seq;
     493          72 :   entry->bundle_txn_cnt = bundle_txn_cnt;
     494          72 :   entry->commission     = (uchar)ctx->builder_commission;
     495          72 :   fd_memcpy( entry->commission_pubkey, ctx->builder_pubkey, 32UL );
     496          72 :   ctx->metrics.txn_received_cnt++;
     497          72 : }
     498             : 
     499             : /* Buffers a regular transaction for deferred publishing by after_credit. */
     500             : 
     501             : static void
     502             : fd_bundle_tile_publish_txn(
     503             :     fd_bundle_tile_t * ctx,
     504             :     void const *       txn,
     505             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     506             :     uint               source_ipv4
     507          42 : ) {
     508          42 :   if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
     509           3 :     ctx->metrics.backpressure_drop_cnt++;
     510           3 :     return;
     511           3 :   }
     512             : 
     513          39 :   fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
     514          39 :   fd_memcpy( entry->payload, txn, txn_sz );
     515          39 :   entry->payload_sz     = (ushort)txn_sz;
     516          39 :   entry->source_ipv4    = source_ipv4;
     517          39 :   entry->sig            = 0UL;
     518          39 :   entry->bundle_seq     = 0UL;
     519          39 :   entry->bundle_txn_cnt = 1UL;
     520          39 :   entry->commission     = 0U;
     521          39 :   fd_memset( entry->commission_pubkey, 0, 32UL );
     522          39 :   ctx->metrics.txn_received_cnt++;
     523          39 : }
     524             : 
     525             : /* Called for each transaction in a bundle.  Simply counts up
     526             :    bundle_txn_cnt, but does not publish anything. */
     527             : 
     528             : static bool
     529             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     530             :     pb_istream_t *     istream,
     531             :     pb_field_t const * field,
     532             :     void **            arg
     533          90 : ) {
     534          90 :   (void)istream; (void)field;
     535          90 :   fd_bundle_tile_t * ctx = *arg;
     536          90 :   ctx->bundle_txn_cnt++;
     537          90 :   return true;
     538          90 : }
     539             : 
     540             : /* Called for each transaction in a bundle.  Publishes each transaction
     541             :    to the tango message bus. */
     542             : 
     543             : static bool
     544             : fd_bundle_client_visit_pb_bundle_txn(
     545             :     pb_istream_t *     istream,
     546             :     pb_field_t const * field,
     547             :     void **            arg
     548          72 : ) {
     549          72 :   (void)field;
     550          72 :   fd_bundle_tile_t * ctx = *arg;
     551             : 
     552          72 :   packet_Packet packet = packet_Packet_init_default;
     553          72 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     554           0 :     ctx->metrics.decode_fail_cnt++;
     555           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     556           0 :     return false;
     557           0 :   }
     558             : 
     559          72 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     560           0 :     FD_LOG_INFO(( "Bundle server delivered an empty packet, ignoring" ));
     561           0 :     return true;
     562           0 :   }
     563             : 
     564          72 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     565           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     566           0 :     return true;
     567           0 :   }
     568             : 
     569          72 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr );
     570          72 :   fd_bundle_tile_publish_bundle_txn(
     571          72 :       ctx,
     572          72 :       packet.data.bytes, packet.data.size,
     573          72 :       ctx->bundle_txn_cnt,
     574          72 :       ip4
     575          72 :   );
     576             : 
     577          72 :   return true;
     578          72 : }
     579             : 
     580             : static void
     581             : fd_bundle_client_sample_rx_delay(
     582             :     fd_bundle_tile_t *                ctx,
     583             :     google_protobuf_Timestamp const * ts
     584          45 : ) {
     585          45 :   ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
     586          45 :   fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
     587          45 : }
     588             : 
     589             : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
     590             : 
     591             : static bool
     592             : fd_bundle_client_visit_pb_bundle_uuid(
     593             :     pb_istream_t *     istream,
     594             :     pb_field_t const * field,
     595             :     void **            arg
     596          24 : ) {
     597          24 :   (void)field;
     598          24 :   fd_bundle_tile_t * ctx = *arg;
     599             : 
     600             :   /* Reset bundle state */
     601             : 
     602          24 :   ctx->bundle_txn_cnt = 0UL;
     603             : 
     604             :   /* Do two decode passes.  This is required because we need to know the
     605             :      number of transactions in a bundle ahead of time.  However, due to
     606             :      the Protobuf wire encoding, we don't know the number of txns that
     607             :      will come until we've parsed everything.
     608             : 
     609             :      First pass: Count number of bundles. */
     610             : 
     611          24 :   pb_istream_t peek = *istream;
     612          24 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     613          24 :   bundle.bundle.packets = (pb_callback_t) {
     614          24 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     615          24 :     .arg          = ctx
     616          24 :   };
     617          24 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     618           0 :     ctx->metrics.decode_fail_cnt++;
     619           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     620           0 :     return false;
     621           0 :   }
     622             : 
     623             :   /* At this opint, ctx->bundle_txn_cnt is correctly set.  Too many txns
     624             :      is treated as a NOP.
     625             : 
     626             :      Second pass: Actually publish bundle packets */
     627             : 
     628          24 :   if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
     629             : 
     630          21 :   if( FD_UNLIKELY( pending_txn_avail( ctx->pending_txns )<ctx->bundle_txn_cnt ) ) {
     631           0 :     ctx->metrics.backpressure_drop_cnt += ctx->bundle_txn_cnt;
     632           0 :     return true;
     633           0 :   }
     634             : 
     635          21 :   ctx->bundle_seq++;
     636          21 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
     637          21 :   bundle.bundle.packets = (pb_callback_t) {
     638          21 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     639          21 :     .arg          = ctx
     640          21 :   };
     641             : 
     642          21 :   ctx->metrics.bundle_received_cnt++;
     643             : 
     644          21 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     645           0 :     ctx->metrics.decode_fail_cnt++;
     646           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     647           0 :     return false;
     648           0 :   }
     649             : 
     650          21 :   fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
     651             : 
     652          21 :   return true;
     653          21 : }
     654             : 
     655             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
     656             : 
     657             : static void
     658             : fd_bundle_client_handle_bundle_batch(
     659             :     fd_bundle_tile_t * ctx,
     660             :     pb_istream_t *     istream
     661          24 : ) {
     662          24 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     663           3 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     664           3 :     return;
     665           3 :   }
     666             : 
     667          21 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     668          21 :   res.bundles = (pb_callback_t) {
     669          21 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     670          21 :     .arg          = ctx
     671          21 :   };
     672          21 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     673           0 :     ctx->metrics.decode_fail_cnt++;
     674           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     675           0 :     return;
     676           0 :   }
     677          21 : }
     678             : 
     679             : /* Called for each 'Packet' (a regular transaction) of a
     680             :    SubscribePacketsResponse. */
     681             : 
     682             : static bool
     683             : fd_bundle_client_visit_pb_packet(
     684             :     pb_istream_t *     istream,
     685             :     pb_field_t const * field,
     686             :     void **            arg
     687          42 : ) {
     688          42 :   (void)field;
     689          42 :   fd_bundle_tile_t * ctx = *arg;
     690             : 
     691          42 :   packet_Packet packet = packet_Packet_init_default;
     692          42 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     693           0 :     ctx->metrics.decode_fail_cnt++;
     694           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     695           0 :     return false;
     696           0 :   }
     697             : 
     698          42 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     699           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     700           0 :     return true;
     701           0 :   }
     702             : 
     703          42 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     704           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     705           0 :     return true;
     706           0 :   }
     707             : 
     708             : 
     709          42 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
     710          42 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
     711          42 :   ctx->metrics.packet_received_cnt++;
     712             : 
     713          42 :   return true;
     714          42 : }
     715             : 
     716             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
     717             : 
     718             : static void
     719             : fd_bundle_client_handle_packet_batch(
     720             :     fd_bundle_tile_t * ctx,
     721             :     pb_istream_t *     istream
     722          24 : ) {
     723          24 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     724          24 :   res.batch.packets = (pb_callback_t) {
     725          24 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     726          24 :     .arg          = ctx
     727          24 :   };
     728          24 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     729           0 :     ctx->metrics.decode_fail_cnt++;
     730           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
     731           0 :     return;
     732           0 :   }
     733             : 
     734          24 :   fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
     735          24 : }
     736             : 
     737             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     738             :    gRPC call. */
     739             : 
     740             : static void
     741             : fd_bundle_client_handle_builder_fee_info(
     742             :     fd_bundle_tile_t * ctx,
     743             :     pb_istream_t *     istream
     744           9 : ) {
     745           9 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     746           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     747           0 :     ctx->metrics.decode_fail_cnt++;
     748           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     749           0 :     return;
     750           0 :   }
     751           9 :   if( FD_UNLIKELY( res.commission > 100 ) ) {
     752           3 :     ctx->metrics.decode_fail_cnt++;
     753           3 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     754           3 :     return;
     755           3 :   }
     756             : 
     757           6 :   uchar decoded_builder_pubkey[ 32 ];
     758           6 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, decoded_builder_pubkey ) ) ) {
     759           3 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     760           3 :     return;
     761           3 :   }
     762             : 
     763           3 :   ctx->builder_commission = (uchar)res.commission; /* Apply update atomically */
     764           3 :   fd_memcpy( ctx->builder_pubkey, decoded_builder_pubkey, sizeof(ctx->builder_pubkey) );
     765             : 
     766           3 :   long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
     767           3 :   ctx->builder_info_avail = 1;
     768           3 :   ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
     769           3 : }
     770             : 
     771             : static void
     772             : fd_bundle_client_grpc_tx_complete(
     773             :     void * app_ctx,
     774             :     ulong  request_ctx
     775           9 : ) {
     776           9 :   (void)app_ctx; (void)request_ctx;
     777           9 : }
     778             : 
     779             : void
     780             : fd_bundle_client_grpc_rx_start(
     781             :     void * app_ctx,
     782             :     ulong  request_ctx
     783           9 : ) {
     784           9 :   fd_bundle_tile_t * ctx = app_ctx;
     785           9 :   switch( request_ctx ) {
     786           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     787           3 :     ctx->packet_subscription_live = 1;
     788           3 :     ctx->packet_subscription_wait = 0;
     789           3 :     break;
     790           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     791           3 :     ctx->bundle_subscription_live = 1;
     792           3 :     ctx->bundle_subscription_wait = 0;
     793           3 :     break;
     794           9 :   }
     795           9 : }
     796             : 
     797             : void
     798             : fd_bundle_client_grpc_rx_msg(
     799             :     void *       app_ctx,
     800             :     void const * protobuf,
     801             :     ulong        protobuf_sz,
     802             :     ulong        request_ctx
     803          57 : ) {
     804          57 :   fd_bundle_tile_t * ctx = app_ctx;
     805          57 :   ctx->metrics.proto_received_bytes += protobuf_sz;
     806          57 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
     807          57 :   switch( request_ctx ) {
     808           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     809           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     810           0 :       ctx->metrics.decode_fail_cnt++;
     811           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     812           0 :     }
     813           0 :     break;
     814           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     815           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     816           0 :       ctx->metrics.decode_fail_cnt++;
     817           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     818           0 :     }
     819           0 :     break;
     820          24 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     821          24 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     822          24 :     break;
     823          24 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     824          24 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     825          24 :     break;
     826           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     827           9 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     828           9 :     break;
     829           0 :   default:
     830           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
     831          57 :   }
     832          57 : }
     833             : 
     834             : static void
     835             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     836           9 :                                  ulong              request_ctx ) {
     837           9 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     838           9 :   switch( request_ctx ) {
     839           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     840           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     841           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     842           0 :     break;
     843           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     844           3 :     ctx->builder_info_wait = 0;
     845           3 :     break;
     846           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     847           3 :     ctx->packet_subscription_live = 0;
     848           3 :     ctx->packet_subscription_wait = 0;
     849           3 :     break;
     850           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     851           3 :     ctx->bundle_subscription_live = 0;
     852           3 :     ctx->bundle_subscription_wait = 0;
     853           3 :     break;
     854           9 :   }
     855           9 : }
     856             : 
     857             : void
     858             : fd_bundle_client_grpc_rx_end(
     859             :     void *                app_ctx,
     860             :     ulong                 request_ctx,
     861             :     fd_grpc_resp_hdrs_t * resp
     862          24 : ) {
     863          24 :   fd_bundle_tile_t * ctx = app_ctx;
     864          24 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
     865           9 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     866           9 :     fd_bundle_client_request_failed( ctx, request_ctx );
     867           9 :     return;
     868           9 :   }
     869             : 
     870          15 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     871          15 :   if( !resp->grpc_msg_len ) {
     872          15 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 );
     873          15 :     resp->grpc_msg_len = 13;
     874          15 :   }
     875             : 
     876          15 :   switch( request_ctx ) {
     877           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     878           0 :     ctx->packet_subscription_live = 0;
     879           0 :     ctx->packet_subscription_wait = 0;
     880           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     881           0 :     ctx->defer_reset = 1;
     882           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     883           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     884           0 :     return;
     885          12 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     886          12 :     ctx->bundle_subscription_live = 0;
     887          12 :     ctx->bundle_subscription_wait = 0;
     888          12 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     889          12 :     ctx->defer_reset = 1;
     890          12 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     891          12 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     892          12 :     return;
     893           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     894           3 :     ctx->builder_info_wait = 0;
     895           3 :     break;
     896           0 :   default:
     897           0 :     break;
     898          15 :   }
     899             : 
     900           3 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     901           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     902           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     903           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     904           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     905           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     906           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     907           0 :       fd_bundle_auther_reset( &ctx->auther );
     908           0 :     }
     909           0 :     return;
     910           0 :   }
     911           3 : }
     912             : 
     913             : void
     914             : fd_bundle_client_grpc_rx_timeout(
     915             :     void * app_ctx,
     916             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     917             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     918           6 : ) {
     919           6 :   (void)deadline_kind;
     920           6 :   FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     921           6 :   fd_bundle_tile_t * ctx = app_ctx;
     922           6 :   ctx->defer_reset = 1;
     923           6 : }
     924             : 
     925             : static void
     926           3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     927           3 :   fd_bundle_tile_t * ctx = app_ctx;
     928           3 :   long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
     929           3 :   if( FD_LIKELY( rtt_sample ) ) {
     930           3 :     fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     931           3 :     FD_LOG_DEBUG(( "Keepalive ACK" ));
     932           3 :   }
     933           3 :   ctx->metrics.ping_ack_cnt++;
     934           3 : }
     935             : 
     936             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
     937             :   .conn_established = fd_bundle_client_grpc_conn_established,
     938             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     939             :   .tx_complete      = fd_bundle_client_grpc_tx_complete,
     940             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     941             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     942             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     943             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     944             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     945             : };
     946             : 
     947             : int
     948         177 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     949         177 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
     950         177 :                    ( !ctx->grpc_client        ) ) ) {
     951           3 :     return FD_BUNDLE_STATE_DISCONNECTED;
     952           3 :   }
     953             : 
     954         174 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     955         174 :   if( FD_UNLIKELY( !conn ) ) {
     956           0 :     return FD_BUNDLE_STATE_DISCONNECTED; /* no conn */
     957           0 :   }
     958         174 :   if( FD_UNLIKELY( conn->flags &
     959         174 :       ( FD_H2_CONN_FLAGS_DEAD |
     960         174 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     961           6 :     return FD_BUNDLE_STATE_DISCONNECTED;
     962           6 :   }
     963             : 
     964         168 :   if( FD_UNLIKELY( conn->flags &
     965         168 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     966         168 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     967         168 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     968         168 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     969          12 :     return FD_BUNDLE_STATE_CONNECTING; /* connection is not ready */
     970          12 :   }
     971             : 
     972         156 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     973          12 :     return FD_BUNDLE_STATE_CONNECTING; /* not authenticated */
     974          12 :   }
     975             : 
     976         144 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) |
     977         144 :                    ( !ctx->packet_subscription_live ) |
     978         144 :                    ( !ctx->bundle_subscription_live ) ) ) {
     979          27 :     return FD_BUNDLE_STATE_CONNECTING; /* not fully connected */
     980          27 :   }
     981             : 
     982         117 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
     983           3 :     return FD_BUNDLE_STATE_DISCONNECTED; /* possible timeout */
     984           3 :   }
     985             : 
     986         114 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
     987           3 :     return FD_BUNDLE_STATE_CONNECTING;
     988           3 :   }
     989             : 
     990             :   /* As far as we know, the bundle connection is alive and well. */
     991         111 :   return FD_BUNDLE_STATE_CONNECTED;
     992         114 : }
     993             : 
     994             : #undef DISCONNECTED
     995             : #undef CONNECTING
     996             : #undef CONNECTED
     997             : 
     998             : FD_FN_CONST char const *
     999           6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
    1000           6 :   switch( request_ctx ) {
    1001           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
    1002           0 :     return "GenerateAuthChallenge";
    1003           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
    1004           0 :     return "GenerateAuthTokens";
    1005           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
    1006           0 :     return "SubscribePackets";
    1007           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
    1008           6 :     return "SubscribeBundles";
    1009           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
    1010           0 :     return "GetBlockBuilderFeeInfo";
    1011           0 :   default:
    1012           0 :     return "unknown";
    1013           6 :   }
    1014           6 : }

Generated by: LCOV version 1.14