LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 414 687 60.3 %
Date: 2026-04-01 06:30:45 Functions: 30 36 83.3 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #define _GNU_SOURCE /* SOL_TCP */
       4             : #include "fd_bundle_auth.h"
       5             : #include "fd_bundle_tile_private.h"
       6             : #include "fd_bundle_tile.h"
       7             : #include "proto/block_engine.pb.h"
       8             : #include "proto/bundle.pb.h"
       9             : #include "proto/packet.pb.h"
      10             : #include "../fd_txn_m.h"
      11             : #include "../../waltz/h2/fd_h2_conn.h"
      12             : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
      13             : #include "../../waltz/openssl/fd_openssl.h"
      14             : #include "../../ballet/base58/fd_base58.h"
      15             : #include "../../ballet/nanopb/pb_decode.h"
      16             : #include "../../util/net/fd_ip4.h"
      17             : 
      18             : #include <fcntl.h>
      19             : #include <errno.h>
      20             : #include <unistd.h> /* close */
      21             : #include <poll.h> /* poll */
      22             : #include <sys/socket.h> /* socket */
      23             : #include <netinet/in.h>
      24             : #include <netinet/ip.h>
      25             : #include <netinet/tcp.h>
      26             : 
      27           9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      28             : 
      29             : 
      30             : __attribute__((weak)) long
      31         858 : fd_bundle_now( void ) {
      32         858 :   return fd_log_wallclock();
      33         858 : }
      34             : 
      35             : void
      36           3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      37           3 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
      38           3 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      39           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      40           0 :     }
      41           3 :     ctx->tcp_sock = -1;
      42           3 :     ctx->tcp_sock_connected = 0;
      43           3 :   }
      44           3 :   ctx->defer_reset = 0;
      45             : 
      46           3 :   ctx->builder_info_avail       = 0;
      47           3 :   ctx->builder_info_wait        = 0;
      48           3 :   ctx->packet_subscription_live = 0;
      49           3 :   ctx->packet_subscription_wait = 0;
      50           3 :   ctx->bundle_subscription_live = 0;
      51           3 :   ctx->bundle_subscription_wait = 0;
      52             : 
      53           3 :   fd_memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
      54             : 
      55           3 : # if FD_HAS_OPENSSL
      56           3 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      57           0 :     SSL_free( ctx->ssl );
      58           0 :     ctx->ssl = NULL;
      59           0 :   }
      60           3 : # endif
      61             : 
      62           3 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
      63             : 
      64           3 :   fd_bundle_auther_reset( &ctx->auther );
      65           3 :   fd_grpc_client_reset( ctx->grpc_client );
      66           3 : }
      67             : 
      68             : static int
      69             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      70           0 :                              uint                     ip4_addr ) {
      71           0 :   struct sockaddr_in addr = {
      72           0 :     .sin_family      = AF_INET,
      73           0 :     .sin_addr.s_addr = ip4_addr,
      74           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port )
      75           0 :   };
      76           0 :   int err = connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
      77             :   /* FD_LIKELY is used here as EINPROGRESS is expected even to local tcp ports */
      78           0 :   if( FD_LIKELY( err==-1 ) ) {
      79           0 :     return errno;
      80           0 :   }
      81           0 :   return 0;
      82           0 : }
      83             : 
      84             : static int
      85           0 : fd_bundle_client_get_connect_result( fd_bundle_tile_t const * ctx ) {
      86           0 :   int so_err = 0;
      87           0 :   socklen_t so_err_sz = sizeof(so_err);
      88           0 :   if( FD_UNLIKELY( getsockopt( ctx->tcp_sock, SOL_SOCKET, SO_ERROR, &so_err, &so_err_sz )==-1 ) ) {
      89           0 :     return errno;
      90           0 :   }
      91           0 :   return so_err;
      92           0 : }
      93             : 
      94             : static void
      95           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      96           0 :   fd_bundle_client_reset( ctx );
      97             : 
      98             :   /* FIXME IPv6 support */
      99           0 :   fd_addrinfo_t hints = {0};
     100           0 :   hints.ai_family = AF_INET;
     101           0 :   fd_addrinfo_t * res = NULL;
     102           0 :   uchar scratch[ 4096 ];
     103           0 :   void * pscratch = scratch;
     104           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
     105           0 :   if( FD_UNLIKELY( err ) ) {
     106           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
     107           0 :     fd_bundle_client_reset( ctx );
     108           0 :     ctx->metrics.transport_fail_cnt++;
     109           0 :     return;
     110           0 :   }
     111           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
     112           0 :   ctx->server_ip4_addr = ip4_addr;
     113             : 
     114           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
     115           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
     116           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     117           0 :   }
     118           0 :   ctx->tcp_sock = tcp_sock;
     119             : 
     120           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
     121           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
     122           0 :   }
     123             : 
     124           0 :   int tcp_nodelay = 1;
     125           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     126           0 :     FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     127           0 :   }
     128             : 
     129           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
     130           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     131           0 :   }
     132             : 
     133           0 :   char const * scheme = "http";
     134           0 : # if FD_HAS_OPENSSL
     135           0 :   if( ctx->is_ssl ) scheme = "https";
     136           0 : # endif
     137             : 
     138           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     139           0 :                 scheme,
     140           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     141           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     142             : 
     143           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     144             :   /* FD_LIKELY as EINPROGRESS is expected */
     145           0 :   if( FD_LIKELY( connect_err ) ) {
     146           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
     147           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     148           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     149           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     150           0 :       fd_bundle_client_reset( ctx );
     151           0 :       ctx->metrics.transport_fail_cnt++;
     152           0 :       return;
     153           0 :     }
     154           0 :   }
     155             : 
     156           0 : # if FD_HAS_OPENSSL
     157           0 :   if( ctx->is_ssl ) {
     158           0 :     BIO * bio = fd_openssl_bio_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
     159           0 :     if( FD_UNLIKELY( !bio ) ) {
     160           0 :       FD_LOG_ERR(( "fd_openssl_bio_new_socket failed" ));
     161           0 :     }
     162             : 
     163           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     164           0 :     if( FD_UNLIKELY( !ssl ) ) {
     165           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     166           0 :     }
     167             : 
     168           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     169           0 :     SSL_set_connect_state( ssl );
     170             : 
     171             :     /* Indicate to endpoint which server name we want */
     172           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     173           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     174           0 :     }
     175             : 
     176             :     /* Enable hostname verification */
     177           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     178           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     179           0 :     }
     180             : 
     181           0 :     ctx->ssl = ssl;
     182           0 :   }
     183           0 : # endif /* FD_HAS_OPENSSL */
     184             : 
     185           0 :   fd_grpc_client_reset( ctx->grpc_client );
     186           0 :   fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
     187           0 : }
     188             : 
     189             : static int
     190             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     191          27 :                            int *              charge_busy ) {
     192          27 : # if FD_HAS_OPENSSL
     193          27 :   if( ctx->is_ssl ) {
     194           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     195           0 :   }
     196          27 : # endif /* FD_HAS_OPENSSL */
     197             : 
     198          27 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
     199          27 : }
     200             : 
     201             : static void
     202           3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     203           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     204             : 
     205           3 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     206           3 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     207           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     208           3 :       ctx->grpc_client,
     209           3 :       path, sizeof(path)-1,
     210           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     211           3 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     212           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     213           3 :       0 /* is_streaming */
     214           3 :   );
     215           3 :   if( FD_UNLIKELY( !request ) ) return;
     216           3 :   fd_grpc_client_deadline_set(
     217           3 :       request,
     218           3 :       FD_GRPC_DEADLINE_RX_END,
     219           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     220             : 
     221           3 :   ctx->builder_info_wait = 1;
     222           3 : }
     223             : 
     224             : static void
     225           3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     226           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     227             : 
     228           3 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     229           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     230           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     231           3 :       ctx->grpc_client,
     232           3 :       path, sizeof(path)-1,
     233           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     234           3 :       &block_engine_SubscribePacketsRequest_msg, &req,
     235           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     236           3 :       0 /* is_streaming */
     237           3 :   );
     238           3 :   if( FD_UNLIKELY( !request ) ) return;
     239           3 :   fd_grpc_client_deadline_set(
     240           3 :       request,
     241           3 :       FD_GRPC_DEADLINE_HEADER,
     242           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     243             : 
     244           3 :   ctx->packet_subscription_wait = 1;
     245           3 : }
     246             : 
     247             : static void
     248           3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     249           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     250             : 
     251           3 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     252           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     253           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     254           3 :       ctx->grpc_client,
     255           3 :       path, sizeof(path)-1,
     256           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     257           3 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     258           3 :       ctx->auther.access_token, ctx->auther.access_token_sz,
     259           3 :       0 /* is_streaming */
     260           3 :   );
     261           3 :   if( FD_UNLIKELY( !request ) ) return;
     262           3 :   fd_grpc_client_deadline_set(
     263           3 :       request,
     264           3 :       FD_GRPC_DEADLINE_HEADER,
     265           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     266             : 
     267           3 :   ctx->bundle_subscription_wait = 1;
     268           3 : }
     269             : 
     270             : void
     271           3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     272           3 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     273           3 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     274           3 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     275           3 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     276           3 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     277             : 
     278           3 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
     279           3 :     long now = fd_bundle_now();
     280           3 :     fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
     281           3 :     FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
     282           3 :   }
     283           3 : }
     284             : 
     285             : int
     286             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     287          18 :                                  long               now ) {
     288             :   /* Drive auth */
     289          18 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     290           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     291           0 :     return 1;
     292           0 :   }
     293          18 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
     294             : 
     295             :   /* Request block builder info */
     296          18 :   int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
     297          18 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     298          18 :                      ( builder_info_expired     ) ) &
     299          18 :                    ( !ctx->builder_info_wait      ) ) ) {
     300           3 :     fd_bundle_client_request_builder_info( ctx );
     301           3 :     return 1;
     302           3 :   }
     303             : 
     304             :   /* Subscribe to packets */
     305          15 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     306           3 :     fd_bundle_client_subscribe_packets( ctx );
     307           3 :     return 1;
     308           3 :   }
     309             : 
     310             :   /* Subscribe to bundles */
     311          12 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     312           3 :     fd_bundle_client_subscribe_bundles( ctx );
     313           3 :     return 1;
     314           3 :   }
     315             : 
     316             :   /* Send a PING */
     317           9 :   if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
     318           3 :     fd_bundle_client_send_ping( ctx );
     319           3 :     return 1;
     320           3 :   }
     321             : 
     322           6 :   return 0;
     323           9 : }
     324             : 
     325             : static void
     326             : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
     327          30 :                         int *              charge_busy ) {
     328             : 
     329             :   /* Wait for TCP socket to connect */
     330          30 :   if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
     331           0 :     if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
     332             : 
     333           0 :     struct pollfd pfds[1] = {
     334           0 :       { .fd = ctx->tcp_sock, .events = POLLOUT }
     335           0 :     };
     336           0 :     int poll_res = fd_syscall_poll( pfds, 1, 0 );
     337           0 :     if( FD_UNLIKELY( poll_res<0 ) ) {
     338           0 :       FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     339           0 :     }
     340           0 :     if( poll_res==0 ) return;
     341             : 
     342           0 :     int connect_result = 0;
     343           0 :     if( pfds[0].revents & (POLLERR|POLLHUP) ) {
     344           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     345           0 :     connect_failed:
     346           0 :       FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_result, fd_io_strerror( connect_result ) ));
     347           0 :       fd_bundle_client_reset( ctx );
     348           0 :       ctx->metrics.transport_fail_cnt++;
     349           0 :       *charge_busy = 1;
     350           0 :       return;
     351           0 :     }
     352           0 :     if( pfds[0].revents & POLLOUT ) {
     353           0 :       connect_result = fd_bundle_client_get_connect_result( ctx );
     354           0 :       if( FD_UNLIKELY( connect_result!=0 ) ) {
     355           0 :         goto connect_failed;
     356           0 :       }
     357           0 :       FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
     358           0 :       ctx->tcp_sock_connected = 1;
     359           0 :       *charge_busy = 1;
     360           0 :       return;
     361           0 :     }
     362           0 :     return;
     363           0 :   }
     364             : 
     365             :   /* gRPC conn died? */
     366          30 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) {
     367           0 :     long sleep_start;
     368           0 :   reconnect:
     369           0 :     sleep_start = fd_bundle_now();
     370           0 :     if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, sleep_start ) ) ) {
     371           0 :       long wait_dur = ctx->backoff_until - sleep_start;
     372           0 :       fd_log_sleep( fd_long_min( wait_dur, 1e6 ) );
     373           0 :       return;
     374           0 :     }
     375           0 :     fd_bundle_client_create_conn( ctx );
     376           0 :     *charge_busy = 1;
     377           0 :     return;
     378           0 :   }
     379             : 
     380             :   /* Did a HTTP/2 PING time out */
     381          30 :   long check_ts = ctx->cached_ts = fd_bundle_now();
     382          30 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
     383           3 :     FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
     384           3 :                      (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
     385           3 :     ctx->keepalive->inflight = 0;
     386           3 :     ctx->defer_reset = 1;
     387           3 :     *charge_busy = 1;
     388           3 :     return;
     389           3 :   }
     390             : 
     391             :   /* Drive I/O, SSL handshake, and any inflight requests */
     392          27 :   if( FD_UNLIKELY( -1==fd_bundle_client_drive_io( ctx, charge_busy ) || ctx->defer_reset /* new error? */ ) ) {
     393           0 :     fd_bundle_client_reset( ctx );
     394           0 :     ctx->metrics.transport_fail_cnt++;
     395           0 :     *charge_busy = 1;
     396           0 :     return;
     397           0 :   }
     398             : 
     399             :   /* Are we ready to issue a new request? */
     400          27 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     401          18 :   long io_ts = fd_bundle_now();
     402          18 :   if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
     403             : 
     404          18 :   *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
     405          18 : }
     406             : 
     407             : static void
     408          30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     409          30 :   int status = fd_bundle_client_status( ctx );
     410             : 
     411          30 :   int const connected_now    = ( status==FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED );
     412          30 :   int const connected_before = ( ctx->bundle_status_logged==FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED );
     413             : 
     414          30 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) {
     415           3 :     long ts = fd_log_wallclock();
     416           3 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
     417           3 :       if( connected_now ) {
     418           3 :         FD_LOG_NOTICE(( "Connected to bundle server" ));
     419           3 :       } else {
     420           0 :         FD_LOG_WARNING(( "Disconnected from bundle server" ));
     421           0 :       }
     422           3 :       ctx->last_bundle_status_log_nanos = ts;
     423           3 :       ctx->bundle_status_logged = (uchar)status;
     424           3 :     }
     425           3 :   }
     426          30 : }
     427             : 
     428             : void
     429             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     430          30 :                        int *              charge_busy ) {
     431             :   /* Edge-trigger logging with rate limiting */
     432          30 :   fd_bundle_client_step1( ctx, charge_busy );
     433          30 :   fd_bundle_client_log_status( ctx );
     434          30 : }
     435             : 
     436             : void
     437             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     438          12 :                         long               now ) {
     439          12 :   uint iter = ctx->backoff_iter;
     440          12 :   if( now < ctx->backoff_reset ) iter = 0U;
     441          12 :   iter++;
     442             : 
     443             :   /* FIXME proper backoff */
     444          12 :   long wait_ns = (long)2e9;
     445          12 :   wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
     446             : 
     447          12 :   ctx->backoff_until = now +   wait_ns;
     448          12 :   ctx->backoff_reset = now + 2*wait_ns;
     449             : 
     450          12 :   ctx->backoff_iter = iter;
     451          12 : }
     452             : 
     453             : static void
     454           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     455           0 :   (void)app_ctx;
     456           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     457           0 : }
     458             : 
     459             : static void
     460             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     461             :                                  uint   h2_err,
     462           0 :                                  int    closed_by ) {
     463           0 :   fd_bundle_tile_t * ctx = app_ctx;
     464           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     465           0 :                 closed_by ? "by peer" : "due to error",
     466           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     467           0 :   ctx->defer_reset = 1;
     468           0 : }
     469             : 
     470             : /* Buffers a bundle transaction for deferred publishing by after_credit. */
     471             : 
     472             : static void
     473             : fd_bundle_tile_publish_bundle_txn(
     474             :     fd_bundle_tile_t * ctx,
     475             :     void const *       txn,
     476             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     477             :     ulong              bundle_txn_cnt,
     478             :     uint               source_ipv4
     479          72 : ) {
     480          72 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     481           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     482           0 :     return;
     483           0 :   }
     484             : 
     485          72 :   if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
     486           0 :     ctx->metrics.backpressure_drop_cnt++;
     487           0 :     return;
     488           0 :   }
     489             : 
     490          72 :   fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
     491          72 :   fd_memcpy( entry->payload, txn, txn_sz );
     492          72 :   entry->payload_sz     = (ushort)txn_sz;
     493          72 :   entry->source_ipv4    = source_ipv4;
     494          72 :   entry->sig            = 1UL;
     495          72 :   entry->bundle_seq     = ctx->bundle_seq;
     496          72 :   entry->bundle_txn_cnt = bundle_txn_cnt;
     497          72 :   entry->commission     = (uchar)ctx->builder_commission;
     498          72 :   fd_memcpy( entry->commission_pubkey, ctx->builder_pubkey, 32UL );
     499          72 :   ctx->metrics.txn_received_cnt++;
     500          72 : }
     501             : 
     502             : /* Buffers a regular transaction for deferred publishing by after_credit. */
     503             : 
     504             : static void
     505             : fd_bundle_tile_publish_txn(
     506             :     fd_bundle_tile_t * ctx,
     507             :     void const *       txn,
     508             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     509             :     uint               source_ipv4
     510          42 : ) {
     511          42 :   if( FD_UNLIKELY( pending_txn_full( ctx->pending_txns ) ) ) {
     512           3 :     ctx->metrics.backpressure_drop_cnt++;
     513           3 :     return;
     514           3 :   }
     515             : 
     516          39 :   fd_bundle_pending_txn_t * entry = pending_txn_push_tail_nocopy( ctx->pending_txns );
     517          39 :   fd_memcpy( entry->payload, txn, txn_sz );
     518          39 :   entry->payload_sz     = (ushort)txn_sz;
     519          39 :   entry->source_ipv4    = source_ipv4;
     520          39 :   entry->sig            = 0UL;
     521          39 :   entry->bundle_seq     = 0UL;
     522          39 :   entry->bundle_txn_cnt = 1UL;
     523          39 :   entry->commission     = 0U;
     524          39 :   fd_memset( entry->commission_pubkey, 0, 32UL );
     525          39 :   ctx->metrics.txn_received_cnt++;
     526          39 : }
     527             : 
     528             : /* Called for each transaction in a bundle.  Simply counts up
     529             :    bundle_txn_cnt, but does not publish anything. */
     530             : 
     531             : static bool
     532             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     533             :     pb_istream_t *     istream,
     534             :     pb_field_t const * field,
     535             :     void **            arg
     536          90 : ) {
     537          90 :   (void)istream; (void)field;
     538          90 :   fd_bundle_tile_t * ctx = *arg;
     539          90 :   ctx->bundle_txn_cnt++;
     540          90 :   return true;
     541          90 : }
     542             : 
     543             : /* Publishing nanopb decode callback for the packets of a bundle.  Decodes one packet.Packet and publishes it
     544             :    to the tango message bus via fd_bundle_tile_publish_bundle_txn.  Returns false only if the decode fails. */
     545             : 
     546             : static bool
     547             : fd_bundle_client_visit_pb_bundle_txn(
     548             :     pb_istream_t *     istream,
     549             :     pb_field_t const * field,
     550             :     void **            arg
     551          72 : ) {
     552          72 :   (void)field;
     553          72 :   fd_bundle_tile_t * ctx = *arg;
     554             :   /* Decode the packet.Packet message from the stream handed to this callback. */
     555          72 :   packet_Packet packet = packet_Packet_init_default;
     556          72 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     557           0 :     ctx->metrics.decode_fail_cnt++;
     558           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     559           0 :     return false;
     560           0 :   }
     561             :   /* Empty and oversize payloads are skipped with a warning; returning true still reports success to the decoder. */
     562          72 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     563           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     564           0 :     return true;
     565           0 :   }
     566             :   /* NOTE(review): skipped packets were still counted by the preflight pass, so fewer txns than ctx->bundle_txn_cnt may get published -- confirm consumers tolerate this. */
     567          72 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     568           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     569           0 :     return true;
     570           0 :   }
     571             :   /* Source address: meta.addr if meta is present and fd_cstr_to_ip4_addr accepts it, otherwise ctx->server_ip4_addr. */
     572          72 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : ctx->server_ip4_addr, ctx->server_ip4_addr );
     573          72 :   fd_bundle_tile_publish_bundle_txn(
     574          72 :       ctx,
     575          72 :       packet.data.bytes, packet.data.size,
     576          72 :       ctx->bundle_txn_cnt, /* txn count left in ctx by the preflight pass */
     577          72 :       ip4
     578          72 :   );
     579             : 
     580          72 :   return true;
     581          72 : }
     582             : /* Converts ts to nanoseconds and samples ctx->cached_ts minus that value (via fd_ulong_sat_sub) into the msg_rx_delay histogram.  Presumably cached_ts is also in ns -- confirm. */
     583             : static void
     584             : fd_bundle_client_sample_rx_delay(
     585             :     fd_bundle_tile_t *                ctx,
     586             :     google_protobuf_Timestamp const * ts
     587          45 : ) {
     588          45 :   ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos; /* seconds and nanos folded into one ns count */
     589          45 :   fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
     590          45 : }
     591             : 
     592             : /* Called for each BundleUuid in a SubscribeBundlesResponse.  Counts the bundle's txns on a copy of the stream, then decodes the stream again to publish them. */
     593             : 
     594             : static bool
     595             : fd_bundle_client_visit_pb_bundle_uuid(
     596             :     pb_istream_t *     istream,
     597             :     pb_field_t const * field,
     598             :     void **            arg
     599          24 : ) {
     600          24 :   (void)field;
     601          24 :   fd_bundle_tile_t * ctx = *arg;
     602             : 
     603             :   /* Reset bundle state */
     604             : 
     605          24 :   ctx->bundle_txn_cnt = 0UL;
     606             : 
     607             :   /* Do two decode passes.  This is required because we need to know the
     608             :      number of transactions in a bundle ahead of time.  However, due to
     609             :      the Protobuf wire encoding, we don't know the number of txns that
     610             :      will come until we've parsed everything.
     611             : 
     612             :      First pass: Count the number of transactions in this bundle. */
     613             : 
     614          24 :   pb_istream_t peek = *istream; /* stream copy: the first pass must not consume istream */
     615          24 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     616          24 :   bundle.bundle.packets = (pb_callback_t) {
     617          24 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     618          24 :     .arg          = ctx
     619          24 :   };
     620          24 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     621           0 :     ctx->metrics.decode_fail_cnt++;
     622           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     623           0 :     return false;
     624           0 :   }
     625             : 
     626             :   /* At this point, ctx->bundle_txn_cnt is correctly set.  Too many txns
     627             :      is treated as a NOP.
     628             : 
     629             :      Second pass: Actually publish bundle packets */
     630             : 
     631          24 :   if( FD_UNLIKELY( ctx->bundle_txn_cnt>FD_BUNDLE_CLIENT_MAX_TXN_PER_BUNDLE ) ) return true;
     632             :   /* Not enough room for the whole bundle (per pending_txn_avail): drop it and count its txns as backpressure drops. */
     633          21 :   if( FD_UNLIKELY( pending_txn_avail( ctx->pending_txns )<ctx->bundle_txn_cnt ) ) {
     634           0 :     ctx->metrics.backpressure_drop_cnt += ctx->bundle_txn_cnt;
     635           0 :     return true;
     636           0 :   }
     637             :   /* Committed to publishing: bump the bundle sequence and re-arm the decoder with the publishing callback. */
     638          21 :   ctx->bundle_seq++;
     639          21 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
     640          21 :   bundle.bundle.packets = (pb_callback_t) {
     641          21 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     642          21 :     .arg          = ctx
     643          21 :   };
     644             : 
     645          21 :   ctx->metrics.bundle_received_cnt++;
     646             : 
     647          21 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     648           0 :     ctx->metrics.decode_fail_cnt++;
     649           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     650           0 :     return false;
     651           0 :   }
     652             :   /* Reached only for bundles that went through the second pass; the early returns above skip the delay sample. */
     653          21 :   fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
     654             : 
     655          21 :   return true;
     656          21 : }
     657             : 
     658             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call.  The message is dropped (and counted) while builder info is not available. */
     659             : 
     660             : static void
     661             : fd_bundle_client_handle_bundle_batch(
     662             :     fd_bundle_tile_t * ctx,
     663             :     pb_istream_t *     istream
     664          24 : ) {
     665          24 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     666           3 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     667           3 :     return;
     668           3 :   }
     669             :   /* Entries of res.bundles are handled by fd_bundle_client_visit_pb_bundle_uuid while pb_decode runs. */
     670          21 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     671          21 :   res.bundles = (pb_callback_t) {
     672          21 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     673          21 :     .arg          = ctx
     674          21 :   };
     675          21 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     676           0 :     ctx->metrics.decode_fail_cnt++;
     677           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     678           0 :     return;
     679           0 :   }
     680          21 : }
     681             : 
     682             : /* Called for each 'Packet' (a regular transaction) of a
     683             :    SubscribePacketsResponse.  Decodes it, publishes it via fd_bundle_tile_publish_txn and bumps packet_received_cnt. */
     684             : 
     685             : static bool
     686             : fd_bundle_client_visit_pb_packet(
     687             :     pb_istream_t *     istream,
     688             :     pb_field_t const * field,
     689             :     void **            arg
     690          42 : ) {
     691          42 :   (void)field;
     692          42 :   fd_bundle_tile_t * ctx = *arg;
     693             :   /* Decode the packet.Packet message from the stream handed to this callback. */
     694          42 :   packet_Packet packet = packet_Packet_init_default;
     695          42 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     696           0 :     ctx->metrics.decode_fail_cnt++;
     697           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     698           0 :     return false;
     699           0 :   }
     700             :   /* Empty and oversize payloads are skipped with a warning and are not counted in packet_received_cnt. */
     701          42 :   if( FD_UNLIKELY( packet.data.size == 0 ) ) {
     702           0 :     FD_LOG_WARNING(( "Bundle server delivered an empty packet, ignoring" ));
     703           0 :     return true;
     704           0 :   }
     705             : 
     706          42 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     707           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     708           0 :     return true;
     709           0 :   }
     710             : 
     711             :   /* Source address: meta.addr if meta is present and fd_cstr_to_ip4_addr accepts it, otherwise 0 (no fallback to the server address here). */
     712          42 :   uint _ip4; uint ip4 = fd_uint_if( packet.has_meta, fd_cstr_to_ip4_addr( packet.meta.addr, &_ip4 ) ? _ip4 : 0U, 0U );
     713          42 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size, ip4 );
     714          42 :   ctx->metrics.packet_received_cnt++;
     715             : 
     716          42 :   return true;
     717          42 : }
     718             : 
     719             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call.  Packets are handled by the decode callback; the rx delay is sampled afterwards from res.header.ts. */
     720             : 
     721             : static void
     722             : fd_bundle_client_handle_packet_batch(
     723             :     fd_bundle_tile_t * ctx,
     724             :     pb_istream_t *     istream
     725          24 : ) {
     726          24 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     727          24 :   res.batch.packets = (pb_callback_t) {
     728          24 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     729          24 :     .arg          = ctx
     730          24 :   };
     731          24 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     732           0 :     ctx->metrics.decode_fail_cnt++;
     733           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
     734           0 :     return;
     735           0 :   }
     736             :   /* Reached only when the whole response decoded successfully. */
     737          24 :   fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
     738          24 : }
     739             : 
     740             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     741             :    gRPC call.  On success stores the commission and decoded pubkey in ctx and marks builder info available for 5 minutes. */
     742             : 
     743             : static void
     744             : fd_bundle_client_handle_builder_fee_info(
     745             :     fd_bundle_tile_t * ctx,
     746             :     pb_istream_t *     istream
     747           9 : ) {
     748           9 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     749           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     750           0 :     ctx->metrics.decode_fail_cnt++;
     751           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     752           0 :     return;
     753           0 :   }
     754           9 :   if( FD_UNLIKELY( res.commission > 100 ) ) { /* accepted range is 0-100; the value is narrowed to uchar below */
     755           3 :     ctx->metrics.decode_fail_cnt++;
     756           3 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     757           3 :     return;
     758           3 :   }
     759             :   /* Decode into a scratch buffer so ctx->builder_pubkey is only written once the key has decoded successfully. */
     760           6 :   uchar decoded_builder_pubkey[ 32 ];
     761           6 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, decoded_builder_pubkey ) ) ) {
     762           3 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     763           3 :     return; /* note: decode_fail_cnt is not incremented on this path */
     764           3 :   }
     765             :   /* Both fields passed validation: store them together. */
     766           3 :   ctx->builder_commission = (uchar)res.commission; /* Apply update atomically */
     767           3 :   fd_memcpy( ctx->builder_pubkey, decoded_builder_pubkey, sizeof(ctx->builder_pubkey) );
     768             :   /* Mark the info available and record when it stops being valid. */
     769           3 :   long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
     770           3 :   ctx->builder_info_avail = 1;
     771           3 :   ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
     772           3 : }
     773             : /* gRPC tx_complete callback.  No-op: both arguments are ignored. */
     774             : static void
     775             : fd_bundle_client_grpc_tx_complete(
     776             :     void * app_ctx,
     777             :     ulong  request_ctx
     778           9 : ) {
     779           9 :   (void)app_ctx; (void)request_ctx;
     780           9 : }
     781             : /* gRPC rx_start callback.  For the two subscription requests, sets the matching *_live flag and clears the matching *_wait flag; other request kinds are ignored. */
     782             : void
     783             : fd_bundle_client_grpc_rx_start(
     784             :     void * app_ctx,
     785             :     ulong  request_ctx
     786           9 : ) {
     787           9 :   fd_bundle_tile_t * ctx = app_ctx; /* app_ctx is the bundle tile context */
     788           9 :   switch( request_ctx ) {
     789           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     790           3 :     ctx->packet_subscription_live = 1;
     791           3 :     ctx->packet_subscription_wait = 0;
     792           3 :     break;
     793           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     794           3 :     ctx->bundle_subscription_live = 1;
     795           3 :     ctx->bundle_subscription_wait = 0;
     796           3 :     break;
     797           9 :   }
     798           9 : }
     799             : /* gRPC rx_msg callback.  Adds protobuf_sz to metrics.proto_received_bytes and dispatches the message to the handler selected by request_ctx. */
     800             : void
     801             : fd_bundle_client_grpc_rx_msg(
     802             :     void *       app_ctx,
     803             :     void const * protobuf,
     804             :     ulong        protobuf_sz,
     805             :     ulong        request_ctx
     806          57 : ) {
     807          57 :   fd_bundle_tile_t * ctx = app_ctx;
     808          57 :   ctx->metrics.proto_received_bytes += protobuf_sz;
     809          57 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz ); /* used by the Bundle_* handlers; the auth handlers take the raw buffer */
     810          57 :   switch( request_ctx ) {
     811           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     812           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     813           0 :       ctx->metrics.decode_fail_cnt++;
     814           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() ); /* a rejected auth response triggers a backoff */
     815           0 :     }
     816           0 :     break;
     817           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     818           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     819           0 :       ctx->metrics.decode_fail_cnt++;
     820           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     821           0 :     }
     822           0 :     break;
     823          24 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     824          24 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     825          24 :     break;
     826          24 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     827          24 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     828          24 :     break;
     829           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     830           9 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     831           9 :     break;
     832           0 :   default: /* NOTE(review): FD_LOG_ERR presumably terminates the process -- confirm */
     833           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
     834          57 :   }
     835          57 : }
     836             : /* Shared failure path: calls fd_bundle_tile_backoff with the current time and, for the two auth requests, notifies the auther of the failure. */
     837             : static void
     838             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     839           0 :                                  ulong              request_ctx ) {
     840           0 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     841           0 :   switch( request_ctx ) {
     842           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     843           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     844           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     845           0 :     break;
     846           0 :   }
     847           0 : }
     848             : /* gRPC rx_end callback.  Handles the end of a response: HTTP-level failure, end of a subscription stream, or the gRPC status of any other request. */
     849             : void
     850             : fd_bundle_client_grpc_rx_end(
     851             :     void *                app_ctx,
     852             :     ulong                 request_ctx,
     853             :     fd_grpc_resp_hdrs_t * resp
     854          12 : ) {
     855          12 :   fd_bundle_tile_t * ctx = app_ctx;
     856          12 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) { /* any HTTP status other than 200 is a failed request */
     857           0 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     858           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     859           0 :     return;
     860           0 :   }
     861             :   /* Normalize the status message: unescape it in place (presumably URL percent-decoding -- confirm) and substitute a placeholder when it is empty. */
     862          12 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     863          12 :   if( !resp->grpc_msg_len ) {
     864          12 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 ); /* NOTE(review): assumes grpc_msg can hold at least 13 bytes -- confirm */
     865          12 :     resp->grpc_msg_len = 13;
     866          12 :   }
     867             :   /* Subscription streams: the stream ending (whatever the gRPC status) clears the live/wait flags, backs off and sets defer_reset. */
     868          12 :   switch( request_ctx ) {
     869           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     870           0 :     ctx->packet_subscription_live = 0;
     871           0 :     ctx->packet_subscription_wait = 0;
     872           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     873           0 :     ctx->defer_reset = 1;
     874           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     875           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     876           0 :     return;
     877           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     878           9 :     ctx->bundle_subscription_live = 0;
     879           9 :     ctx->bundle_subscription_wait = 0;
     880           9 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     881           9 :     ctx->defer_reset = 1;
     882           9 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     883           9 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     884           9 :     return;
     885           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     886           3 :     ctx->builder_info_wait = 0; /* the builder info request is no longer in flight */
     887           3 :     break;
     888           0 :   default:
     889           0 :     break;
     890          12 :   }
     891             :   /* Only requests that did not return above reach the gRPC status check. */
     892           3 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     893           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     894           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     895           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     896           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     897           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     898           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     899           0 :       fd_bundle_auther_reset( &ctx->auther ); /* auth-related statuses additionally reset the auther */
     900           0 :     }
     901           0 :     return;
     902           0 :   }
     903           3 : }
     904             : /* gRPC rx_timeout callback.  Logs the name of the timed-out request and sets ctx->defer_reset; deadline_kind is ignored. */
     905             : void
     906             : fd_bundle_client_grpc_rx_timeout(
     907             :     void * app_ctx,
     908             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     909             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     910           6 : ) {
     911           6 :   (void)deadline_kind;
     912           6 :   FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     913           6 :   fd_bundle_tile_t * ctx = app_ctx;
     914           6 :   ctx->defer_reset = 1;
     915           6 : }
     916             : /* gRPC ping_ack callback.  Passes the current time to fd_keepalive_rx; a nonzero return value is recorded as an RTT sample.  Always bumps ping_ack_cnt. */
     917             : static void
     918           3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     919           3 :   fd_bundle_tile_t * ctx = app_ctx;
     920           3 :   long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
     921           3 :   if( FD_LIKELY( rtt_sample ) ) { /* a zero return yields no RTT sample */
     922           3 :     fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     923           3 :     FD_LOG_DEBUG(( "Keepalive ACK" ));
     924           3 :   }
     925           3 :   ctx->metrics.ping_ack_cnt++;
     926           3 : }
     927             : /* gRPC client callback table: binds each fd_grpc_client_callbacks_t slot to the matching fd_bundle_client_grpc_* handler. */
     928             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
     929             :   .conn_established = fd_bundle_client_grpc_conn_established,
     930             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     931             :   .tx_complete      = fd_bundle_client_grpc_tx_complete,
     932             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     933             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     934             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     935             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     936             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     937             : };
     938             : /* Returns one of FD_BUNDLE_BLOCK_ENGINE_STATUS_{DISCONNECTED,CONNECTING,CONNECTED} derived from ctx.  The checks run in order and the first one that matches decides the result. */
     939             : int
     940         174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     941         174 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) | /* bitwise |: both operands are evaluated */
     942         174 :                    ( !ctx->grpc_client        ) ) ) {
     943           3 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED;
     944           3 :   }
     945             :   /* Inspect the HTTP/2 connection object of the gRPC client. */
     946         171 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     947         171 :   if( FD_UNLIKELY( !conn ) ) {
     948           0 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* no conn */
     949           0 :   }
     950         171 :   if( FD_UNLIKELY( conn->flags &
     951         171 :       ( FD_H2_CONN_FLAGS_DEAD |
     952         171 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     953           6 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* dead, or a GOAWAY is pending */
     954           6 :   }
     955             :   /* Any of the initial / wait-for-SETTINGS flags still being set means the connection is not ready yet. */
     956         165 :   if( FD_UNLIKELY( conn->flags &
     957         165 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     958         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     959         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     960         165 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     961          12 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* connection is not ready */
     962          12 :   }
     963             :   /* The auther must have reached FD_BUNDLE_AUTH_STATE_DONE_WAIT. */
     964         153 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     965          12 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* not authenticated */
     966          12 :   }
     967             :   /* Builder info and both subscriptions are required before CONNECTED can be reported. */
     968         141 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) |
     969         141 :                    ( !ctx->packet_subscription_live ) |
     970         141 :                    ( !ctx->bundle_subscription_live ) ) ) {
     971          27 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING; /* not fully connected */
     972          27 :   }
     973             :   /* A keepalive timeout is reported as DISCONNECTED even though every check above passed. */
     974         114 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
     975           3 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_DISCONNECTED; /* possible timeout */
     976           3 :   }
     977             : 
     978         111 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
     979           3 :     return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTING;
     980           3 :   }
     981             : 
     982             :   /* As far as we know, the bundle connection is alive and well. */
     983         108 :   return FD_BUNDLE_BLOCK_ENGINE_STATUS_CONNECTED;
     984         111 : }
     985             : 
     986             : #undef DISCONNECTED
     987             : #undef CONNECTING
     988             : #undef CONNECTED
     989             : /* Maps a FD_BUNDLE_CLIENT_REQ_* identifier to the name of its gRPC method as a string literal; returns "unknown" for any other value. */
     990             : FD_FN_CONST char const *
     991           6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
     992           6 :   switch( request_ctx ) {
     993           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     994           0 :     return "GenerateAuthChallenge";
     995           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     996           0 :     return "GenerateAuthTokens";
     997           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     998           0 :     return "SubscribePackets";
     999           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
    1000           6 :     return "SubscribeBundles";
    1001           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
    1002           0 :     return "GetBlockBuilderFeeInfo";
    1003           0 :   default: /* any request id not listed above */
    1004           0 :     return "unknown";
    1005           6 :   }
    1006           6 : }

Generated by: LCOV version 1.14