LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 433 704 61.5 %
Date: 2026-02-13 06:06:24 Functions: 30 36 83.3 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #define _GNU_SOURCE /* SOL_TCP */
       4             : #include "fd_bundle_auth.h"
       5             : #include "fd_bundle_tile_private.h"
       6             : #include "proto/block_engine.pb.h"
       7             : #include "proto/bundle.pb.h"
       8             : #include "proto/packet.pb.h"
       9             : #include "../fd_txn_m.h"
      10             : #include "../plugin/fd_plugin.h"
      11             : #include "../../waltz/h2/fd_h2_conn.h"
      12             : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
      13             : #include "../../ballet/base58/fd_base58.h"
      14             : #include "../../ballet/nanopb/pb_decode.h"
      15             : #include "../../util/net/fd_ip4.h"
      16             : 
      17             : #include <fcntl.h>
      18             : #include <errno.h>
      19             : #include <unistd.h> /* close */
      20             : #include <poll.h> /* poll */
      21             : #include <sys/socket.h> /* socket */
      22             : #include <netinet/in.h>
      23             : #include <netinet/ip.h>
      24             : #include <netinet/tcp.h>
      25             : 
      26           9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      27             : 
      28             : #define FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE (5UL)
      29             : 
      30             : __attribute__((weak)) long
      31         756 : fd_bundle_now( void ) {
      32         756 :   return fd_log_wallclock();
      33         756 : }
      34             : 
      35             : void
      36           3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      37           3 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
      38           3 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      39           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      40           0 :     }
      41           3 :     ctx->tcp_sock = -1;
      42           3 :     ctx->tcp_sock_connected = 0;
      43           3 :   }
      44           3 :   ctx->defer_reset = 0;
      45             : 
      46           3 :   ctx->builder_info_avail       = 0;
      47           3 :   ctx->builder_info_wait        = 0;
      48           3 :   ctx->packet_subscription_live = 0;
      49           3 :   ctx->packet_subscription_wait = 0;
      50           3 :   ctx->bundle_subscription_live = 0;
      51           3 :   ctx->bundle_subscription_wait = 0;
      52             : 
      53           3 :   memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
      54             : 
      55           3 : # if FD_HAS_OPENSSL
      56           3 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      57           0 :     SSL_free( ctx->ssl );
      58           0 :     ctx->ssl = NULL;
      59           0 :   }
      60           3 : # endif
      61             : 
      62           3 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
      63             : 
      64           3 :   fd_bundle_auther_reset( &ctx->auther );
      65           3 :   fd_grpc_client_reset( ctx->grpc_client );
      66           3 : }
      67             : 
      68             : static int
      69             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      70           0 :                              uint                     ip4_addr ) {
      71           0 :   struct sockaddr_in addr = {
      72           0 :     .sin_family      = AF_INET,
      73           0 :     .sin_addr.s_addr = ip4_addr,
      74           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port )
      75           0 :   };
      76           0 :   int err = connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
      77             :   /* FD_LIKELY is used here as EINPROGRESS is expected even to local tcp ports */
      78           0 :   if( FD_LIKELY( err==-1 ) ) {
      79           0 :     return errno;
      80           0 :   }
      81           0 :   return 0;
      82           0 : }
      83             : 
      84             : static int
      85           0 : fd_bundle_client_get_connect_result( fd_bundle_tile_t const * ctx ) {
      86           0 :   int so_err = 0;
      87           0 :   socklen_t so_err_sz = sizeof(so_err);
      88           0 :   if( FD_UNLIKELY( getsockopt( ctx->tcp_sock, SOL_SOCKET, SO_ERROR, &so_err, &so_err_sz )==-1 ) ) {
      89           0 :     return errno;
      90           0 :   }
      91           0 :   return so_err;
      92           0 : }
      93             : 
      94             : static void
      95           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      96           0 :   fd_bundle_client_reset( ctx );
      97             : 
      98             :   /* FIXME IPv6 support */
      99           0 :   fd_addrinfo_t hints = {0};
     100           0 :   hints.ai_family = AF_INET;
     101           0 :   fd_addrinfo_t * res = NULL;
     102           0 :   uchar scratch[ 4096 ];
     103           0 :   void * pscratch = scratch;
     104           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
     105           0 :   if( FD_UNLIKELY( err ) ) {
     106           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
     107           0 :     fd_bundle_client_reset( ctx );
     108           0 :     ctx->metrics.transport_fail_cnt++;
     109           0 :     return;
     110           0 :   }
     111           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
     112           0 :   ctx->server_ip4_addr = ip4_addr;
     113             : 
     114           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
     115           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
     116           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     117           0 :   }
     118           0 :   ctx->tcp_sock = tcp_sock;
     119             : 
     120           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
     121           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
     122           0 :   }
     123             : 
     124           0 :   int tcp_nodelay = 1;
     125           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     126           0 :     FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     127           0 :   }
     128             : 
     129           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
     130           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     131           0 :   }
     132             : 
     133           0 :   char const * scheme = "http";
     134           0 : # if FD_HAS_OPENSSL
     135           0 :   if( ctx->is_ssl ) scheme = "https";
     136           0 : # endif
     137             : 
     138           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     139           0 :                 scheme,
     140           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     141           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     142             : 
     143           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     144             :   /* FD_LIKELY as EINPROGRESS is expected */
     145           0 :   if( FD_LIKELY( connect_err ) ) {
     146           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
     147           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     148           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     149           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     150           0 :       fd_bundle_client_reset( ctx );
     151           0 :       ctx->metrics.transport_fail_cnt++;
     152           0 :       return;
     153           0 :     }
     154           0 :   }
     155             : 
     156           0 : # if FD_HAS_OPENSSL
     157           0 :   if( ctx->is_ssl ) {
     158           0 :     BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
     159           0 :     if( FD_UNLIKELY( !bio ) ) {
     160           0 :       FD_LOG_ERR(( "BIO_new_socket failed" ));
     161           0 :     }
     162             : 
     163           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     164           0 :     if( FD_UNLIKELY( !ssl ) ) {
     165           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     166           0 :     }
     167             : 
     168           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     169           0 :     SSL_set_connect_state( ssl );
     170             : 
     171             :     /* Indicate to endpoint which server name we want */
     172           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     173           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     174           0 :     }
     175             : 
     176             :     /* Enable hostname verification */
     177           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     178           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     179           0 :     }
     180             : 
     181           0 :     ctx->ssl = ssl;
     182           0 :   }
     183           0 : # endif /* FD_HAS_OPENSSL */
     184             : 
     185           0 :   fd_grpc_client_reset( ctx->grpc_client );
     186           0 :   fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
     187           0 : }
     188             : 
     189             : static int
     190             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     191          27 :                            int *              charge_busy ) {
     192          27 : # if FD_HAS_OPENSSL
     193          27 :   if( ctx->is_ssl ) {
     194           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     195           0 :   }
     196          27 : # endif /* FD_HAS_OPENSSL */
     197             : 
     198          27 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
     199          27 : }
     200             : 
     201             : static void
     202           3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     203           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     204             : 
     205           3 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     206           3 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     207           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     208           3 :       ctx->grpc_client,
     209           3 :       path, sizeof(path)-1,
     210           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     211           3 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     212           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     213           3 :       0 /* is_streaming */
     214           3 :   );
     215           3 :   if( FD_UNLIKELY( !request ) ) return;
     216           3 :   fd_grpc_client_deadline_set(
     217           3 :       request,
     218           3 :       FD_GRPC_DEADLINE_RX_END,
     219           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     220             : 
     221           3 :   ctx->builder_info_wait = 1;
     222           3 : }
     223             : 
     224             : static void
     225           3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     226           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     227             : 
     228           3 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     229           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     230           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     231           3 :       ctx->grpc_client,
     232           3 :       path, sizeof(path)-1,
     233           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     234           3 :       &block_engine_SubscribePacketsRequest_msg, &req,
     235           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     236           3 :       0 /* is_streaming */
     237           3 :   );
     238           3 :   if( FD_UNLIKELY( !request ) ) return;
     239           3 :   fd_grpc_client_deadline_set(
     240           3 :       request,
     241           3 :       FD_GRPC_DEADLINE_HEADER,
     242           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     243             : 
     244           3 :   ctx->packet_subscription_wait = 1;
     245           3 : }
     246             : 
     247             : static void
     248           3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     249           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     250             : 
     251           3 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     252           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     253           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     254           3 :       ctx->grpc_client,
     255           3 :       path, sizeof(path)-1,
     256           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     257           3 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     258           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     259           3 :       0 /* is_streaming */
     260           3 :   );
     261           3 :   if( FD_UNLIKELY( !request ) ) return;
     262           3 :   fd_grpc_client_deadline_set(
     263           3 :       request,
     264           3 :       FD_GRPC_DEADLINE_HEADER,
     265           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     266             : 
     267           3 :   ctx->bundle_subscription_wait = 1;
     268           3 : }
     269             : 
     270             : void
     271           3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     272           3 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     273           3 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     274           3 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     275           3 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     276           3 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     277             : 
     278           3 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
     279           3 :     long now = fd_bundle_now();
     280           3 :     fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
     281           3 :     FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
     282           3 :   }
     283           3 : }
     284             : 
     285             : int
     286             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     287          18 :                                  long               now ) {
     288             :   /* Drive auth */
     289          18 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     290           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     291           0 :     return 1;
     292           0 :   }
     293          18 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
     294             : 
     295             :   /* Request block builder info */
     296          18 :   int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
     297          18 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     298          18 :                      ( builder_info_expired     ) ) &
     299          18 :                    ( !ctx->builder_info_wait      ) ) ) {
     300           3 :     fd_bundle_client_request_builder_info( ctx );
     301           3 :     return 1;
     302           3 :   }
     303             : 
     304             :   /* Subscribe to packets */
     305          15 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     306           3 :     fd_bundle_client_subscribe_packets( ctx );
     307           3 :     return 1;
     308           3 :   }
     309             : 
     310             :   /* Subscribe to bundles */
     311          12 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     312           3 :     fd_bundle_client_subscribe_bundles( ctx );
     313           3 :     return 1;
     314           3 :   }
     315             : 
     316             :   /* Send a PING */
     317           9 :   if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
     318           3 :     fd_bundle_client_send_ping( ctx );
     319           3 :     return 1;
     320           3 :   }
     321             : 
     322           6 :   return 0;
     323           9 : }
     324             : 
     325             : static void
     326             : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
     327          30 :                         int *              charge_busy ) {
     328             : 
     329             :   /* Wait for TCP socket to connect */
     330          30 :   if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
     331           0 :     if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
     332             : 
     333           0 :     struct pollfd pfds[1] = {
     334           0 :       { .fd = ctx->tcp_sock, .events = POLLOUT }
     335           0 :     };
     336           0 :     int poll_res = fd_syscall_poll( pfds, 1, 0 );
     337           0 :     if( FD_UNLIKELY( poll_res<0 ) ) {
     338           0 :       FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     339           0 :     }
     340           0 :     if( poll_res==0 ) return;
     341             : 
     342           0 :     int connect_result = 0;
     343           0 :     if( pfds[0].revents & (POLLERR|POLLHUP) ) {
     344           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     345           0 :     connect_failed:
     346           0 :       FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_result, fd_io_strerror( connect_result ) ));
     347           0 :       fd_bundle_client_reset( ctx );
     348           0 :       ctx->metrics.transport_fail_cnt++;
     349           0 :       *charge_busy = 1;
     350           0 :       return;
     351           0 :     }
     352           0 :     if( pfds[0].revents & POLLOUT ) {
     353           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     354           0 :       if( FD_UNLIKELY( connect_result!=0 ) ) {
     355           0 :         goto connect_failed;
     356           0 :       }
     357           0 :       FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
     358           0 :       ctx->tcp_sock_connected = 1;
     359           0 :       *charge_busy = 1;
     360           0 :       return;
     361           0 :     }
     362           0 :     return;
     363           0 :   }
     364             : 
     365             :   /* gRPC conn died? */
     366          30 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) {
     367           0 :     long sleep_start;
     368           0 :   reconnect:
     369           0 :     sleep_start = fd_bundle_now();
     370           0 :     if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
     371           0 :       long wait_dur = ctx->backoff_until - sleep_start;
     372           0 :       fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
     373           0 :       return;
     374           0 :     }
     375           0 :     fd_bundle_client_create_conn( ctx );
     376           0 :     *charge_busy = 1;
     377           0 :     return;
     378           0 :   }
     379             : 
     380             :   /* Did a HTTP/2 PING time out */
     381          30 :   long check_ts = ctx->cached_ts = fd_bundle_now();
     382          30 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
     383           3 :     FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
     384           3 :                      (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
     385           3 :     ctx->keepalive->inflight = 0;
     386           3 :     ctx->defer_reset = 1;
     387           3 :     *charge_busy = 1;
     388           3 :     return;
     389           3 :   }
     390             : 
     391             :   /* Drive I/O, SSL handshake, and any inflight requests */
     392          27 :   if( FD_UNLIKELY( -1==fd_bundle_client_drive_io( ctx, charge_busy ) || ctx->defer_reset /* new error? */ ) ) {
     393           0 :     fd_bundle_client_reset( ctx );
     394           0 :     ctx->metrics.transport_fail_cnt++;
     395           0 :     *charge_busy = 1;
     396           0 :     return;
     397           0 :   }
     398             : 
     399             :   /* Are we ready to issue a new request? */
     400          27 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     401          18 :   long io_ts = fd_bundle_now();
     402          18 :   if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
     403             : 
     404          18 :   *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
     405          18 : }
     406             : 
     407             : static void
     408          30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     409          30 :   int status = fd_bundle_client_status( ctx );
     410             : 
     411          30 :   int const connected_now    = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     412          30 :   int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     413             : 
     414          30 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) {
     415           3 :     long ts = fd_log_wallclock();
     416           3 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
     417           3 :       if( connected_now ) {
     418           3 :         FD_LOG_NOTICE(( "Connected to bundle server" ));
     419           3 :       } else {
     420           0 :         FD_LOG_WARNING(( "Disconnected from bundle server" ));
     421           0 :       }
     422           3 :       ctx->last_bundle_status_log_nanos = ts;
     423           3 :       ctx->bundle_status_logged = (uchar)status;
     424           3 :     }
     425           3 :   }
     426          30 : }
     427             : 
     428             : void
     429             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     430          30 :                        int *              charge_busy ) {
     431             :   /* Edge-trigger logging with rate limiting */
     432          30 :   fd_bundle_client_step1( ctx, charge_busy );
     433          30 :   fd_bundle_client_log_status( ctx );
     434          30 : }
     435             : 
     436             : void
     437             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     438          12 :                         long               now ) {
     439          12 :   uint iter = ctx->backoff_iter;
     440          12 :   if( now < ctx->backoff_reset ) iter = 0U;
     441          12 :   iter++;
     442             : 
     443             :   /* FIXME proper backoff */
     444          12 :   long wait_ns = (long)2e9;
     445          12 :   wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
     446             : 
     447          12 :   ctx->backoff_until = now +   wait_ns;
     448          12 :   ctx->backoff_reset = now + 2*wait_ns;
     449             : 
     450          12 :   ctx->backoff_iter = iter;
     451          12 : }
     452             : 
     453             : static void
     454           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     455           0 :   (void)app_ctx;
     456           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     457           0 : }
     458             : 
     459             : static void
     460             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     461             :                                  uint   h2_err,
     462           0 :                                  int    closed_by ) {
     463           0 :   fd_bundle_tile_t * ctx = app_ctx;
     464           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     465           0 :                 closed_by ? "by peer" : "due to error",
     466           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     467           0 :   ctx->defer_reset = 1;
     468           0 : }
     469             : 
     470             : /* Forwards a bundle transaction to the tango message bus. */
     471             : 
     472             : static void
     473             : fd_bundle_tile_publish_bundle_txn(
     474             :     fd_bundle_tile_t * ctx,
     475             :     void const *       txn,
     476             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     477             :     ulong              bundle_txn_cnt,
     478             :     uint               source_ipv4
     479          30 : ) {
     480          30 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     481           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     482           0 :     return;
     483           0 :   }
     484             : 
     485          30 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
     486          30 :   *txnm = (fd_txn_m_t) {
     487          30 :     .reference_slot = 0UL,
     488          30 :     .payload_sz     = (ushort)txn_sz,
     489          30 :     .txn_t_sz       = 0U,
     490          30 :     .source_ipv4      = source_ipv4,
     491          30 :     .source_tpu       = FD_TXN_M_TPU_SOURCE_BUNDLE,
     492          30 :     .block_engine   = {
     493          30 :       .bundle_id      = ctx->bundle_seq,
     494          30 :       .bundle_txn_cnt = bundle_txn_cnt,
     495          30 :       .commission     = (uchar)ctx->builder_commission
     496          30 :     },
     497          30 :   };
     498          30 :   memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL );
     499          30 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     500             : 
     501          30 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     502          30 :   ulong sig = 1UL;
     503             : 
     504          30 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     505           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     506           0 :   }
     507             : 
     508          30 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
     509          30 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     510          30 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
     511          30 :   ctx->metrics.txn_received_cnt++;
     512          30 : }
     513             : 
     514             : /* Forwards a regular transaction to the tango message bus. */
     515             : 
     516             : static void
     517             : fd_bundle_tile_publish_txn(
     518             :     fd_bundle_tile_t * ctx,
     519             :     void const *       txn,
     520             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     521             :     uint               source_ipv4
     522           9 : ) {
     523           9 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
     524           9 :   *txnm = (fd_txn_m_t) {
     525           9 :     .reference_slot = 0UL,
     526           9 :     .payload_sz     = (ushort)txn_sz,
     527           9 :     .txn_t_sz       = 0U,
     528           9 :     .source_ipv4    = source_ipv4,
     529           9 :     .source_tpu     = FD_TXN_M_TPU_SOURCE_BUNDLE,
     530           9 :     .block_engine   = {
     531           9 :       .bundle_id         = 0UL,
     532           9 :       .bundle_txn_cnt    = 1UL,
     533           9 :       .commission        = 0U,
     534           9 :       .commission_pubkey = {0U}
     535           9 :     },
     536           9 :   };
     537           9 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     538             : 
     539           9 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     540           9 :   ulong sig = 0UL;
     541             : 
     542           9 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     543           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     544           0 :   }
     545             : 
     546           9 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
     547           9 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     548           9 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
     549           9 :   ctx->metrics.txn_received_cnt++;
     550           9 : }
     551             : 
     552             : /* Called for each transaction in a bundle.  Simply counts up
     553             :    bundle_txn_cnt, but does not publish anything. */
     554             : 
     555             : static bool
     556             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     557             :     pb_istream_t *     istream,
     558             :     pb_field_t const * field,
     559             :     void **            arg
     560          48 : ) {
     561          48 :   (void)istream; (void)field;
     562          48 :   fd_bundle_tile_t * ctx = *arg;
     563          48 :   ctx->bundle_txn_cnt++;
     564          48 :   return true;
     565          48 : }
     566             : 
     567             : /* Called for each transaction in a bundle.  Publishes each transaction
     568             :    to the tango message bus. */
     569             : 
     570             : static bool
     571             : fd_bundle_client_visit_pb_bundle_txn(
     572             :     pb_istream_t *     istream,
     573             :     pb_field_t const * field,
     574             :     void **            arg
     575          30 : ) {
     576          30 :   (void)field;
     577          30 :   fd_bundle_tile_t * ctx = *arg;
     578             : 
     579          30 :   packet_Packet packet = packet_Packet_init_default;
     580          30 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     581           0 :     ctx->metrics.decode_fail_cnt++;
     582           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     583           0 :     return false;
     584           0 :   }
     585             : 
     586          30 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     587           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     588           0 :     return true;
     589           0 :   }
     590             : 
     591          30 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     592           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     593           0 :     return true;
     594           0 :   }
     595             : 
     596          30 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr );
     597          30 :   fd_bundle_tile_publish_bundle_txn(
     598          30 :       ctx,
     599          30 :       packet.data.bytes, packet.data.size,
     600          30 :       ctx->bundle_txn_cnt,
     601          30 :       ip4
     602          30 :   );
     603             : 
     604          30 :   return true;
     605          30 : }
     606             : 
     607             : static void
     608             : fd_bundle_client_sample_rx_delay(
     609             :     fd_bundle_tile_t *                ctx,
     610             :     google_protobuf_Timestamp const * ts
     611          12 : ) {
     612          12 :   ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
     613          12 :   fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
     614          12 : }
     615             : 
     616             : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
     617             : 
     618             : static bool
     619             : fd_bundle_client_visit_pb_bundle_uuid(
     620             :     pb_istream_t *     istream,
     621             :     pb_field_t const * field,
     622             :     void **            arg
     623           9 : ) {
     624           9 :   (void)field;
     625           9 :   fd_bundle_tile_t * ctx = *arg;
     626             : 
     627             :   /* Reset bundle state */
     628             : 
     629           9 :   ctx->bundle_txn_cnt = 0UL;
     630             : 
     631             :   /* Do two decode passes.  This is required because we need to know the
     632             :      number of transactions in a bundle ahead of time.  However, due to
     633             :      the Protobuf wire encoding, we don't know the number of txns that
     634             :      will come until we've parsed everything.
     635             : 
     636             :      First pass: Count number of bundles. */
     637             : 
     638           9 :   pb_istream_t peek = *istream;
     639           9 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     640           9 :   bundle.bundle.packets = (pb_callback_t) {
     641           9 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     642           9 :     .arg          = ctx
     643           9 :   };
     644           9 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     645           0 :     ctx->metrics.decode_fail_cnt++;
     646           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     647           0 :     return false;
     648           0 :   }
     649             : 
     650             :   /* At this opint, ctx->bundle_txn_cnt is correctly set.  Too many txns
     651             :      is treated as a NOP.
     652             : 
     653             :      Second pass: Actually publish bundle packets */
     654             : 
     655           9 :   if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
     656             : 
     657           6 :   ctx->bundle_seq++;
     658           6 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
     659           6 :   bundle.bundle.packets = (pb_callback_t) {
     660           6 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     661           6 :     .arg          = ctx
     662           6 :   };
     663             : 
     664           6 :   ctx->metrics.bundle_received_cnt++;
     665             : 
     666           6 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     667           0 :     ctx->metrics.decode_fail_cnt++;
     668           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     669           0 :     return false;
     670           0 :   }
     671             : 
     672           6 :   fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
     673             : 
     674           6 :   return true;
     675           6 : }
     676             : 
     677             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
     678             : 
     679             : static void
     680             : fd_bundle_client_handle_bundle_batch(
     681             :     fd_bundle_tile_t * ctx,
     682             :     pb_istream_t *     istream
     683          12 : ) {
     684          12 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     685           3 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     686           3 :     return;
     687           3 :   }
     688             : 
     689           9 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     690           9 :   res.bundles = (pb_callback_t) {
     691           9 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     692           9 :     .arg          = ctx
     693           9 :   };
     694           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     695           0 :     ctx->metrics.decode_fail_cnt++;
     696           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     697           0 :     return;
     698           0 :   }
     699           9 : }
     700             : 
     701             : /* Called for each 'Packet' (a regular transaction) of a
     702             :    SubscribePacketsResponse. */
     703             : 
     704             : static bool
     705             : fd_bundle_client_visit_pb_packet(
     706             :     pb_istream_t *     istream,
     707             :     pb_field_t const * field,
     708             :     void **            arg
     709           9 : ) {
     710           9 :   (void)field;
     711           9 :   fd_bundle_tile_t * ctx = *arg;
     712             : 
     713           9 :   packet_Packet packet = packet_Packet_init_default;
     714           9 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     715           0 :     ctx->metrics.decode_fail_cnt++;
     716           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     717           0 :     return false;
     718           0 :   }
     719             : 
     720           9 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     721           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     722           0 :     return true;
     723           0 :   }
     724             : 
     725           9 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     726           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     727           0 :     return true;
     728           0 :   }
     729             : 
     730             : 
     731           9 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
     732           9 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
     733           9 :   ctx->metrics.packet_received_cnt++;
     734             : 
     735           9 :   return true;
     736           9 : }
     737             : 
     738             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
     739             : 
     740             : static void
     741             : fd_bundle_client_handle_packet_batch(
     742             :     fd_bundle_tile_t * ctx,
     743             :     pb_istream_t *     istream
     744           6 : ) {
     745           6 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     746           6 :   res.batch.packets = (pb_callback_t) {
     747           6 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     748           6 :     .arg          = ctx
     749           6 :   };
     750           6 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     751           0 :     ctx->metrics.decode_fail_cnt++;
     752           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
     753           0 :     return;
     754           0 :   }
     755             : 
     756           6 :   fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
     757           6 : }
     758             : 
     759             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     760             :    gRPC call. */
     761             : 
     762             : static void
     763             : fd_bundle_client_handle_builder_fee_info(
     764             :     fd_bundle_tile_t * ctx,
     765             :     pb_istream_t *     istream
     766           9 : ) {
     767           9 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     768           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     769           0 :     ctx->metrics.decode_fail_cnt++;
     770           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     771           0 :     return;
     772           0 :   }
     773           9 :   if( FD_UNLIKELY( res.commission > 100 ) ) {
     774           3 :     ctx->metrics.decode_fail_cnt++;
     775           3 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     776           3 :     return;
     777           3 :   }
     778             : 
     779           6 :   ctx->builder_commission = (uchar)res.commission;
     780           6 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
     781           3 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     782           3 :     return;
     783           3 :   }
     784             : 
     785           3 :   long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
     786           3 :   ctx->builder_info_avail = 1;
     787           3 :   ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
     788           3 : }
     789             : 
     790             : static void
     791             : fd_bundle_client_grpc_tx_complete(
     792             :     void * app_ctx,
     793             :     ulong  request_ctx
     794           9 : ) {
     795           9 :   (void)app_ctx; (void)request_ctx;
     796           9 : }
     797             : 
     798             : void
     799             : fd_bundle_client_grpc_rx_start(
     800             :     void * app_ctx,
     801             :     ulong  request_ctx
     802           9 : ) {
     803           9 :   fd_bundle_tile_t * ctx = app_ctx;
     804           9 :   switch( request_ctx ) {
     805           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     806           3 :     ctx->packet_subscription_live = 1;
     807           3 :     ctx->packet_subscription_wait = 0;
     808           3 :     break;
     809           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     810           3 :     ctx->bundle_subscription_live = 1;
     811           3 :     ctx->bundle_subscription_wait = 0;
     812           3 :     break;
     813           9 :   }
     814           9 : }
     815             : 
     816             : void
     817             : fd_bundle_client_grpc_rx_msg(
     818             :     void *       app_ctx,
     819             :     void const * protobuf,
     820             :     ulong        protobuf_sz,
     821             :     ulong        request_ctx
     822          27 : ) {
     823          27 :   fd_bundle_tile_t * ctx = app_ctx;
     824          27 :   ctx->metrics.proto_received_bytes += protobuf_sz;
     825          27 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
     826          27 :   switch( request_ctx ) {
     827           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     828           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     829           0 :       ctx->metrics.decode_fail_cnt++;
     830           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     831           0 :     }
     832           0 :     break;
     833           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     834           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     835           0 :       ctx->metrics.decode_fail_cnt++;
     836           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     837           0 :     }
     838           0 :     break;
     839          12 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     840          12 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     841          12 :     break;
     842           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     843           6 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     844           6 :     break;
     845           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     846           9 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     847           9 :     break;
     848           0 :   default:
     849           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
     850          27 :   }
     851          27 : }
     852             : 
     853             : static void
     854             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     855           0 :                                  ulong              request_ctx ) {
     856           0 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     857           0 :   switch( request_ctx ) {
     858           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     859           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     860           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     861           0 :     break;
     862           0 :   }
     863           0 : }
     864             : 
     865             : void
     866             : fd_bundle_client_grpc_rx_end(
     867             :     void *                app_ctx,
     868             :     ulong                 request_ctx,
     869             :     fd_grpc_resp_hdrs_t * resp
     870          12 : ) {
     871          12 :   fd_bundle_tile_t * ctx = app_ctx;
     872          12 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
     873           0 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     874           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     875           0 :     return;
     876           0 :   }
     877             : 
     878          12 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     879          12 :   if( !resp->grpc_msg_len ) {
     880          12 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 );
     881          12 :     resp->grpc_msg_len = 13;
     882          12 :   }
     883             : 
     884          12 :   switch( request_ctx ) {
     885           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     886           0 :     ctx->packet_subscription_live = 0;
     887           0 :     ctx->packet_subscription_wait = 0;
     888           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     889           0 :     ctx->defer_reset = 1;
     890           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     891           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     892           0 :     return;
     893           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     894           9 :     ctx->bundle_subscription_live = 0;
     895           9 :     ctx->bundle_subscription_wait = 0;
     896           9 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     897           9 :     ctx->defer_reset = 1;
     898           9 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     899           9 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     900           9 :     return;
     901           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     902           3 :     ctx->builder_info_wait = 0;
     903           3 :     break;
     904           0 :   default:
     905           0 :     break;
     906          12 :   }
     907             : 
     908           3 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     909           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     910           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     911           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     912           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     913           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     914           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     915           0 :       fd_bundle_auther_reset( &ctx->auther );
     916           0 :     }
     917           0 :     return;
     918           0 :   }
     919           3 : }
     920             : 
     921             : void
     922             : fd_bundle_client_grpc_rx_timeout(
     923             :     void * app_ctx,
     924             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     925             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     926           6 : ) {
     927           6 :   (void)deadline_kind;
     928           6 :   FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     929           6 :   fd_bundle_tile_t * ctx = app_ctx;
     930           6 :   ctx->defer_reset = 1;
     931           6 : }
     932             : 
     933             : static void
     934           3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     935           3 :   fd_bundle_tile_t * ctx = app_ctx;
     936           3 :   long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
     937           3 :   if( FD_LIKELY( rtt_sample ) ) {
     938           3 :     fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     939           3 :     FD_LOG_DEBUG(( "Keepalive ACK" ));
     940           3 :   }
     941           3 :   ctx->metrics.ping_ack_cnt++;
     942           3 : }
     943             : 
     944             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
     945             :   .conn_established = fd_bundle_client_grpc_conn_established,
     946             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     947             :   .tx_complete      = fd_bundle_client_grpc_tx_complete,
     948             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     949             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     950             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     951             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     952             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     953             : };
     954             : 
     955             : /* Decrease verbosity */
     956          12 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
     957          54 : #define CONNECTING   FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
     958         108 : #define CONNECTED    FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
     959             : 
     960             : int
     961         174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     962         174 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
     963         174 :                    ( !ctx->grpc_client        ) ) ) {
     964           3 :     return DISCONNECTED;
     965           3 :   }
     966             : 
     967         171 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     968         171 :   if( FD_UNLIKELY( !conn ) ) {
     969           0 :     return DISCONNECTED; /* no conn */
     970           0 :   }
     971         171 :   if( FD_UNLIKELY( conn->flags &
     972         171 :       ( FD_H2_CONN_FLAGS_DEAD |
     973         171 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     974           6 :     return DISCONNECTED;
     975           6 :   }
     976             : 
     977         165 :   if( FD_UNLIKELY( conn->flags &
     978         165 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     979         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     980         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     981         165 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     982          12 :     return CONNECTING; /* connection is not ready */
     983          12 :   }
     984             : 
     985         153 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     986          12 :     return CONNECTING; /* not authenticated */
     987          12 :   }
     988             : 
     989         141 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) |
     990         141 :                    ( !ctx->packet_subscription_live ) |
     991         141 :                    ( !ctx->bundle_subscription_live ) ) ) {
     992          27 :     return CONNECTING; /* not fully connected */
     993          27 :   }
     994             : 
     995         114 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
     996           3 :     return DISCONNECTED; /* possible timeout */
     997           3 :   }
     998             : 
     999         111 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
    1000           3 :     return CONNECTING;
    1001           3 :   }
    1002             : 
    1003             :   /* As far as we know, the bundle connection is alive and well. */
    1004         108 :   return CONNECTED;
    1005         111 : }
    1006             : 
    1007             : #undef DISCONNECTED
    1008             : #undef CONNECTING
    1009             : #undef CONNECTED
    1010             : 
    1011             : FD_FN_CONST char const *
    1012           6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
    1013           6 :   switch( request_ctx ) {
    1014           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
    1015           0 :     return "GenerateAuthChallenge";
    1016           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
    1017           0 :     return "GenerateAuthTokens";
    1018           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
    1019           0 :     return "SubscribePackets";
    1020           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
    1021           6 :     return "SubscribeBundles";
    1022           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
    1023           0 :     return "GetBlockBuilderFeeInfo";
    1024           0 :   default:
    1025           0 :     return "unknown";
    1026           6 :   }
    1027           6 : }

Generated by: LCOV version 1.14