LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 420 665 63.2 %
Date: 2025-08-05 05:04:49 Functions: 30 35 85.7 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #define _GNU_SOURCE /* SOL_TCP */
       4             : #include "fd_bundle_auth.h"
       5             : #include "fd_bundle_tile_private.h"
       6             : #include "proto/block_engine.pb.h"
       7             : #include "proto/bundle.pb.h"
       8             : #include "proto/packet.pb.h"
       9             : #include "../fd_txn_m_t.h"
      10             : #include "../plugin/fd_plugin.h"
      11             : #include "../../waltz/h2/fd_h2_conn.h"
      12             : #include "../../waltz/http/fd_url.h" /* fd_url_unescape */
      13             : #include "../../ballet/base58/fd_base58.h"
      14             : #include "../../ballet/nanopb/pb_decode.h"
      15             : #include "../../util/net/fd_ip4.h"
      16             : 
      17             : #include <fcntl.h>
      18             : #include <errno.h>
      19             : #include <unistd.h> /* close */
      20             : #include <poll.h> /* poll */
      21             : #include <sys/socket.h> /* socket */
      22             : #include <netinet/in.h>
      23             : #include <netinet/ip.h>
      24             : #include <netinet/tcp.h>
      25             : 
      26           9 : #define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      27             : 
      28             : __attribute__((weak)) long
      29         642 : fd_bundle_now( void ) {
      30         642 :   return fd_log_wallclock();
      31         642 : }
      32             : 
      33             : void
      34           3 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      35           3 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
      36           3 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      37           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      38           0 :     }
      39           3 :     ctx->tcp_sock = -1;
      40           3 :     ctx->tcp_sock_connected = 0;
      41           3 :   }
      42           3 :   ctx->defer_reset = 0;
      43             : 
      44           3 :   ctx->builder_info_avail       = 0;
      45           3 :   ctx->builder_info_wait        = 0;
      46           3 :   ctx->packet_subscription_live = 0;
      47           3 :   ctx->packet_subscription_wait = 0;
      48           3 :   ctx->bundle_subscription_live = 0;
      49           3 :   ctx->bundle_subscription_wait = 0;
      50             : 
      51           3 :   memset( ctx->rtt, 0, sizeof(fd_rtt_estimate_t) );
      52             : 
      53           3 : # if FD_HAS_OPENSSL
      54           3 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      55           0 :     SSL_free( ctx->ssl );
      56           0 :     ctx->ssl = NULL;
      57           0 :   }
      58           3 : # endif
      59             : 
      60           3 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
      61             : 
      62           3 :   fd_bundle_auther_reset( &ctx->auther );
      63           3 :   fd_grpc_client_reset( ctx->grpc_client );
      64           3 : }
      65             : 
      66             : static int
      67             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      68           0 :                              uint                     ip4_addr ) {
      69           0 :   struct sockaddr_in addr = {
      70           0 :     .sin_family      = AF_INET,
      71           0 :     .sin_addr.s_addr = ip4_addr,
      72           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port )
      73           0 :   };
      74           0 :   errno = 0;
      75           0 :   connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
      76           0 :   return errno;
      77           0 : }
      78             : 
      79             : static void
      80           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      81           0 :   fd_bundle_client_reset( ctx );
      82             : 
      83             :   /* FIXME IPv6 support */
      84           0 :   fd_addrinfo_t hints = {0};
      85           0 :   hints.ai_family = AF_INET;
      86           0 :   fd_addrinfo_t * res = NULL;
      87           0 :   uchar scratch[ 4096 ];
      88           0 :   void * pscratch = scratch;
      89           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
      90           0 :   if( FD_UNLIKELY( err ) ) {
      91           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
      92           0 :     fd_bundle_client_reset( ctx );
      93           0 :     ctx->metrics.transport_fail_cnt++;
      94           0 :     return;
      95           0 :   }
      96           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
      97           0 :   ctx->server_ip4_addr = ip4_addr;
      98             : 
      99           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
     100           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
     101           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     102           0 :   }
     103           0 :   ctx->tcp_sock = tcp_sock;
     104             : 
     105           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
     106           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
     107           0 :   }
     108             : 
     109           0 :   int tcp_nodelay = 1;
     110           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) {
     111           0 :     FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     112           0 :   }
     113             : 
     114           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
     115           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     116           0 :   }
     117             : 
     118           0 :   char const * scheme = "http";
     119           0 : # if FD_HAS_OPENSSL
     120           0 :   if( ctx->is_ssl ) scheme = "https";
     121           0 : # endif
     122             : 
     123           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     124           0 :                 scheme,
     125           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     126           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     127             : 
     128           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     129           0 :   if( FD_UNLIKELY( connect_err ) ) {
     130           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
     131           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     132           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     133           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     134           0 :       fd_bundle_client_reset( ctx );
     135           0 :       ctx->metrics.transport_fail_cnt++;
     136           0 :       return;
     137           0 :     }
     138           0 :   }
     139             : 
     140           0 : # if FD_HAS_OPENSSL
     141           0 :   if( ctx->is_ssl ) {
     142           0 :     BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
     143           0 :     if( FD_UNLIKELY( !bio ) ) {
     144           0 :       FD_LOG_ERR(( "BIO_new_socket failed" ));
     145           0 :     }
     146             : 
     147           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     148           0 :     if( FD_UNLIKELY( !ssl ) ) {
     149           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     150           0 :     }
     151             : 
     152           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     153           0 :     SSL_set_connect_state( ssl );
     154             : 
     155             :     /* Indicate to endpoint which server name we want */
     156           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     157           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     158           0 :     }
     159             : 
     160             :     /* Enable hostname verification */
     161           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     162           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     163           0 :     }
     164             : 
     165           0 :     ctx->ssl = ssl;
     166           0 :   }
     167           0 : # endif /* FD_HAS_OPENSSL */
     168             : 
     169           0 :   fd_grpc_client_reset( ctx->grpc_client );
     170           0 :   fd_keepalive_init( ctx->keepalive, ctx->rng, ctx->keepalive_interval, ctx->keepalive_interval, fd_bundle_now() );
     171           0 : }
     172             : 
     173             : static int
     174             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     175          27 :                            int *              charge_busy ) {
     176          27 : # if FD_HAS_OPENSSL
     177          27 :   if( ctx->is_ssl ) {
     178           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     179           0 :   }
     180          27 : # endif /* FD_HAS_OPENSSL */
     181             : 
     182          27 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
     183          27 : }
     184             : 
     185             : static void
     186           3 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     187           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     188             : 
     189           3 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     190           3 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     191           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     192           3 :       ctx->grpc_client,
     193           3 :       path, sizeof(path)-1,
     194           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     195           3 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     196           3 :       ctx->auther.access_token, ctx->auther.access_token_sz
     197           3 :   );
     198           3 :   if( FD_UNLIKELY( !request ) ) return;
     199           3 :   fd_grpc_client_deadline_set(
     200           3 :       request,
     201           3 :       FD_GRPC_DEADLINE_RX_END,
     202           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     203             : 
     204           3 :   ctx->builder_info_wait = 1;
     205           3 : }
     206             : 
     207             : static void
     208           3 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     209           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     210             : 
     211           3 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     212           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     213           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     214           3 :       ctx->grpc_client,
     215           3 :       path, sizeof(path)-1,
     216           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     217           3 :       &block_engine_SubscribePacketsRequest_msg, &req,
     218           3 :       ctx->auther.access_token, ctx->auther.access_token_sz
     219           3 :   );
     220           3 :   if( FD_UNLIKELY( !request ) ) return;
     221           3 :   fd_grpc_client_deadline_set(
     222           3 :       request,
     223           3 :       FD_GRPC_DEADLINE_HEADER,
     224           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     225             : 
     226           3 :   ctx->packet_subscription_wait = 1;
     227           3 : }
     228             : 
     229             : static void
     230           3 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     231           3 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     232             : 
     233           3 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     234           3 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     235           3 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     236           3 :       ctx->grpc_client,
     237           3 :       path, sizeof(path)-1,
     238           3 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     239           3 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     240           3 :       ctx->auther.access_token, ctx->auther.access_token_sz
     241           3 :   );
     242           3 :   if( FD_UNLIKELY( !request ) ) return;
     243           3 :   fd_grpc_client_deadline_set(
     244           3 :       request,
     245           3 :       FD_GRPC_DEADLINE_HEADER,
     246           3 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     247             : 
     248           3 :   ctx->bundle_subscription_wait = 1;
     249           3 : }
     250             : 
     251             : void
     252           3 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     253           3 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     254           3 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     255           3 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     256           3 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     257           3 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     258             : 
     259           3 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
     260           3 :     long now = fd_bundle_now();
     261           3 :     fd_keepalive_tx( ctx->keepalive, ctx->rng, now );
     262           3 :     FD_LOG_DEBUG(( "Keepalive TX (deadline=+%gs)", (double)( ctx->keepalive->ts_deadline-now )/1e9 ));
     263           3 :   }
     264           3 : }
     265             : 
     266             : int
     267             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     268          18 :                                  long               now ) {
     269             :   /* Drive auth */
     270          18 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     271           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     272           0 :     return 1;
     273           0 :   }
     274          18 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
     275             : 
     276             :   /* Request block builder info */
     277          18 :   int const builder_info_expired = ( ctx->builder_info_valid_until - now )<0;
     278          18 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     279          18 :                      ( !builder_info_expired    ) ) &
     280          18 :                    ( !ctx->builder_info_wait      ) ) ) {
     281           3 :     fd_bundle_client_request_builder_info( ctx );
     282           3 :     return 1;
     283           3 :   }
     284             : 
     285             :   /* Subscribe to packets */
     286          15 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     287           3 :     fd_bundle_client_subscribe_packets( ctx );
     288           3 :     return 1;
     289           3 :   }
     290             : 
     291             :   /* Subscribe to bundles */
     292          12 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     293           3 :     fd_bundle_client_subscribe_bundles( ctx );
     294           3 :     return 1;
     295           3 :   }
     296             : 
     297             :   /* Send a PING */
     298           9 :   if( FD_UNLIKELY( fd_keepalive_should_tx( ctx->keepalive, now ) ) ) {
     299           3 :     fd_bundle_client_send_ping( ctx );
     300           3 :     return 1;
     301           3 :   }
     302             : 
     303           6 :   return 0;
     304           9 : }
     305             : 
     306             : static void
     307             : fd_bundle_client_step1( fd_bundle_tile_t * ctx,
     308          30 :                         int *              charge_busy ) {
     309             : 
     310             :   /* Wait for TCP socket to connect */
     311          30 :   if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
     312           0 :     if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;
     313             : 
     314           0 :     struct pollfd pfds[1] = {
     315           0 :       { .fd = ctx->tcp_sock, .events = POLLOUT }
     316           0 :     };
     317           0 :     int poll_res = fd_syscall_poll( pfds, 1, 0 );
     318           0 :     if( FD_UNLIKELY( poll_res<0 ) ) {
     319           0 :       FD_LOG_ERR(( "fd_syscall_poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     320           0 :     }
     321           0 :     if( poll_res==0 ) return;
     322             : 
     323           0 :     if( pfds[0].revents & (POLLERR|POLLHUP) ) {
     324           0 :       int connect_err = fd_bundle_client_do_connect( ctx, 0 );
     325           0 :       FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_err, fd_io_strerror( connect_err ) ));
     326           0 :       fd_bundle_client_reset( ctx );
     327           0 :       ctx->metrics.transport_fail_cnt++;
     328           0 :       *charge_busy = 1;
     329           0 :       return;
     330           0 :     }
     331           0 :     if( pfds[0].revents & POLLOUT ) {
     332           0 :       FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
     333           0 :       ctx->tcp_sock_connected = 1;
     334           0 :       *charge_busy = 1;
     335           0 :       return;
     336           0 :     }
     337           0 :     return;
     338           0 :   }
     339             : 
     340             :   /* gRPC conn died? */
     341          30 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) {
     342           0 :   reconnect:
     343           0 :     if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, fd_bundle_now() ) ) ) {
     344           0 :       return;
     345           0 :     }
     346           0 :     fd_bundle_client_create_conn( ctx );
     347           0 :     *charge_busy = 1;
     348           0 :     return;
     349           0 :   }
     350             : 
     351             :   /* Did a HTTP/2 PING time out */
     352          30 :   long check_ts = ctx->cached_ts = fd_bundle_now();
     353          30 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, check_ts ) ) ) {
     354           3 :     FD_LOG_WARNING(( "Bundle gRPC timed out (HTTP/2 PING went unanswered for %.2f seconds)",
     355           3 :                      (double)( check_ts - ctx->keepalive->ts_last_tx )/1e9 ));
     356           3 :     ctx->keepalive->inflight = 0;
     357           3 :     ctx->defer_reset = 1;
     358           3 :     *charge_busy = 1;
     359           3 :     return;
     360           3 :   }
     361             : 
     362             :   /* Drive I/O, SSL handshake, and any inflight requests */
     363          27 :   if( FD_UNLIKELY( !fd_bundle_client_drive_io( ctx, charge_busy ) ||
     364          27 :                    ctx->defer_reset /* new error? */ ) ) {
     365           0 :     fd_bundle_client_reset( ctx );
     366           0 :     ctx->metrics.transport_fail_cnt++;
     367           0 :     *charge_busy = 1;
     368           0 :     return;
     369           0 :   }
     370             : 
     371             :   /* Are we ready to issue a new request? */
     372          27 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     373          18 :   long io_ts = fd_bundle_now();
     374          18 :   if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;
     375             : 
     376          18 :   *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
     377          18 : }
     378             : 
     379             : static void
     380          30 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     381          30 :   int status = fd_bundle_client_status( ctx );
     382             : 
     383          30 :   int const connected_now    = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     384          30 :   int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     385             : 
     386          30 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) {
     387           3 :     long ts = fd_log_wallclock();
     388           3 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
     389           3 :       if( connected_now ) {
     390           3 :         FD_LOG_NOTICE(( "Connected to bundle server" ));
     391           3 :       } else {
     392           0 :         FD_LOG_WARNING(( "Disconnected from bundle server" ));
     393           0 :       }
     394           3 :       ctx->last_bundle_status_log_nanos = ts;
     395           3 :       ctx->bundle_status_logged = (uchar)status;
     396           3 :     }
     397           3 :   }
     398          30 : }
     399             : 
     400             : void
     401             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     402          30 :                        int *              charge_busy ) {
     403             :   /* Edge-trigger logging with rate limiting */
     404          30 :   fd_bundle_client_step1( ctx, charge_busy );
     405          30 :   fd_bundle_client_log_status( ctx );
     406          30 : }
     407             : 
     408             : void
     409             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     410          12 :                         long               now ) {
     411          12 :   uint iter = ctx->backoff_iter;
     412          12 :   if( now < ctx->backoff_reset ) iter = 0U;
     413          12 :   iter++;
     414             : 
     415             :   /* FIXME proper backoff */
     416          12 :   long wait_ns = (long)2e9;
     417          12 :   wait_ns = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ns, 0 ))-1UL ) );
     418             : 
     419          12 :   ctx->backoff_until = now +   wait_ns;
     420          12 :   ctx->backoff_reset = now + 2*wait_ns;
     421             : 
     422          12 :   ctx->backoff_iter = iter;
     423          12 : }
     424             : 
     425             : static void
     426           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     427           0 :   (void)app_ctx;
     428           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     429           0 : }
     430             : 
     431             : static void
     432             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     433             :                                  uint   h2_err,
     434           0 :                                  int    closed_by ) {
     435           0 :   fd_bundle_tile_t * ctx = app_ctx;
     436           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     437           0 :                 closed_by ? "by peer" : "due to error",
     438           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     439           0 :   ctx->defer_reset = 1;
     440           0 : }
     441             : 
     442             : /* Forwards a bundle transaction to the tango message bus. */
     443             : 
     444             : static void
     445             : fd_bundle_tile_publish_bundle_txn(
     446             :     fd_bundle_tile_t * ctx,
     447             :     void const *       txn,
     448             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     449             :     ulong              bundle_txn_cnt
     450          15 : ) {
     451          15 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     452           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     453           0 :     return;
     454           0 :   }
     455             : 
     456          15 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
     457          15 :   *txnm = (fd_txn_m_t) {
     458          15 :     .reference_slot = 0UL,
     459          15 :     .payload_sz     = (ushort)txn_sz,
     460          15 :     .txn_t_sz       = 0,
     461          15 :     .block_engine   = {
     462          15 :       .bundle_id      = ctx->bundle_seq,
     463          15 :       .bundle_txn_cnt = bundle_txn_cnt,
     464          15 :       .commission     = (uchar)ctx->builder_commission
     465          15 :     },
     466          15 :   };
     467          15 :   memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL );
     468          15 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     469             : 
     470          15 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     471          15 :   ulong sig = 1UL;
     472             : 
     473          15 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     474           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     475           0 :   }
     476             : 
     477          15 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
     478          15 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     479          15 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
     480          15 :   ctx->metrics.txn_received_cnt++;
     481          15 : }
     482             : 
     483             : /* Forwards a regular transaction to the tango message bus. */
     484             : 
     485             : static void
     486             : fd_bundle_tile_publish_txn(
     487             :     fd_bundle_tile_t * ctx,
     488             :     void const *       txn,
     489             :     ulong              txn_sz  /* <=FD_TXN_MTU */
     490           9 : ) {
     491           9 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
     492           9 :   *txnm = (fd_txn_m_t) {
     493           9 :     .reference_slot = 0UL,
     494           9 :     .payload_sz     = (ushort)txn_sz,
     495           9 :     .txn_t_sz       = 0,
     496           9 :     .block_engine   = {
     497           9 :       .bundle_id         = 0UL,
     498           9 :       .bundle_txn_cnt    = 1UL,
     499           9 :       .commission        = 0,
     500           9 :       .commission_pubkey = {0}
     501           9 :     },
     502           9 :   };
     503           9 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     504             : 
     505           9 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     506           9 :   ulong sig = 0UL;
     507             : 
     508           9 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     509           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     510           0 :   }
     511             : 
     512           9 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_bundle_now() );
     513           9 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     514           9 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
     515           9 :   ctx->metrics.txn_received_cnt++;
     516           9 : }
     517             : 
     518             : /* Called for each transaction in a bundle.  Simply counts up
     519             :    bundle_txn_cnt, but does not publish anything. */
     520             : 
     521             : static bool
     522             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     523             :     pb_istream_t *     istream,
     524             :     pb_field_t const * field,
     525             :     void **            arg
     526          15 : ) {
     527          15 :   (void)istream; (void)field;
     528          15 :   fd_bundle_tile_t * ctx = *arg;
     529          15 :   ctx->bundle_txn_cnt++;
     530          15 :   return true;
     531          15 : }
     532             : 
     533             : /* Called for each transaction in a bundle.  Publishes each transaction
     534             :    to the tango message bus. */
     535             : 
     536             : static bool
     537             : fd_bundle_client_visit_pb_bundle_txn(
     538             :     pb_istream_t *     istream,
     539             :     pb_field_t const * field,
     540             :     void **            arg
     541          15 : ) {
     542          15 :   (void)field;
     543          15 :   fd_bundle_tile_t * ctx = *arg;
     544             : 
     545          15 :   packet_Packet packet = packet_Packet_init_default;
     546          15 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     547           0 :     ctx->metrics.decode_fail_cnt++;
     548           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     549           0 :     return false;
     550           0 :   }
     551             : 
     552          15 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     553           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     554           0 :     return true;
     555           0 :   }
     556             : 
     557          15 :   fd_bundle_tile_publish_bundle_txn(
     558          15 :       ctx,
     559          15 :       packet.data.bytes, packet.data.size,
     560          15 :       ctx->bundle_txn_cnt
     561          15 :   );
     562             : 
     563          15 :   return true;
     564          15 : }
     565             : 
     566             : static void
     567             : fd_bundle_client_sample_rx_delay(
     568             :     fd_bundle_tile_t *                ctx,
     569             :     google_protobuf_Timestamp const * ts
     570           9 : ) {
     571           9 :   ulong tsorig = (ulong)ts->seconds*(ulong)1e9 + (ulong)ts->nanos;
     572           9 :   fd_histf_sample( ctx->metrics.msg_rx_delay, fd_ulong_sat_sub( (ulong)ctx->cached_ts, tsorig ) );
     573           9 : }
     574             : 
     575             : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
     576             : 
     577             : static bool
     578             : fd_bundle_client_visit_pb_bundle_uuid(
     579             :     pb_istream_t *     istream,
     580             :     pb_field_t const * field,
     581             :     void **            arg
     582           3 : ) {
     583           3 :   (void)field;
     584           3 :   fd_bundle_tile_t * ctx = *arg;
     585             : 
     586             :   /* Reset bundle state */
     587             : 
     588           3 :   ctx->bundle_txn_cnt = 0UL;
     589           3 :   ctx->bundle_seq++;
     590             : 
     591             :   /* Do two decode passes.  This is required because we need to know the
     592             :      number of transactions in a bundle ahead of time.  However, due to
     593             :      the Protobuf wire encoding, we don't know the number of txns that
     594             :      will come until we've parsed everything.
     595             : 
     596             :      First pass: Count number of bundles. */
     597             : 
     598           3 :   pb_istream_t peek = *istream;
     599           3 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     600           3 :   bundle.bundle.packets = (pb_callback_t) {
     601           3 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     602           3 :     .arg          = ctx
     603           3 :   };
     604           3 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     605           0 :     ctx->metrics.decode_fail_cnt++;
     606           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     607           0 :     return false;
     608           0 :   }
     609             : 
     610             :   /* At this opint, ctx->bundle_txn_cnt is correctly set.
     611             :      Second pass: Actually publish bundle packets */
     612             : 
     613           3 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
     614           3 :   bundle.bundle.packets = (pb_callback_t) {
     615           3 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     616           3 :     .arg          = ctx
     617           3 :   };
     618             : 
     619           3 :   ctx->metrics.bundle_received_cnt++;
     620             : 
     621           3 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     622           0 :     ctx->metrics.decode_fail_cnt++;
     623           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     624           0 :     return false;
     625           0 :   }
     626             : 
     627           3 :   fd_bundle_client_sample_rx_delay( ctx, &bundle.bundle.header.ts );
     628             : 
     629           3 :   return true;
     630           3 : }
     631             : 
     632             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
     633             : 
     634             : static void
     635             : fd_bundle_client_handle_bundle_batch(
     636             :     fd_bundle_tile_t * ctx,
     637             :     pb_istream_t *     istream
     638           6 : ) {
     639           6 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     640           3 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     641           3 :     return;
     642           3 :   }
     643             : 
     644           3 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     645           3 :   res.bundles = (pb_callback_t) {
     646           3 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     647           3 :     .arg          = ctx
     648           3 :   };
     649           3 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     650           0 :     ctx->metrics.decode_fail_cnt++;
     651           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     652           0 :     return;
     653           0 :   }
     654           3 : }
     655             : 
     656             : /* Called for each 'Packet' (a regular transaction) of a
     657             :    SubscribePacketsResponse. */
     658             : 
     659             : static bool
     660             : fd_bundle_client_visit_pb_packet(
     661             :     pb_istream_t *     istream,
     662             :     pb_field_t const * field,
     663             :     void **            arg
     664           9 : ) {
     665           9 :   (void)field;
     666           9 :   fd_bundle_tile_t * ctx = *arg;
     667             : 
     668           9 :   packet_Packet packet = packet_Packet_init_default;
     669           9 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     670           0 :     ctx->metrics.decode_fail_cnt++;
     671           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     672           0 :     return false;
     673           0 :   }
     674             : 
     675           9 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     676           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     677           0 :     return true;
     678           0 :   }
     679             : 
     680           9 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size );
     681           9 :   ctx->metrics.packet_received_cnt++;
     682             : 
     683           9 :   return true;
     684           9 : }
     685             : 
     686             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
     687             : 
     688             : static void
     689             : fd_bundle_client_handle_packet_batch(
     690             :     fd_bundle_tile_t * ctx,
     691             :     pb_istream_t *     istream
     692           6 : ) {
     693           6 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     694           6 :   res.batch.packets = (pb_callback_t) {
     695           6 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     696           6 :     .arg          = ctx
     697           6 :   };
     698           6 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     699           0 :     ctx->metrics.decode_fail_cnt++;
     700           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
     701           0 :     return;
     702           0 :   }
     703             : 
     704           6 :   fd_bundle_client_sample_rx_delay( ctx, &res.header.ts );
     705           6 : }
     706             : 
     707             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     708             :    gRPC call. */
     709             : 
     710             : static void
     711             : fd_bundle_client_handle_builder_fee_info(
     712             :     fd_bundle_tile_t * ctx,
     713             :     pb_istream_t *     istream
     714           9 : ) {
     715           9 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     716           9 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     717           0 :     ctx->metrics.decode_fail_cnt++;
     718           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     719           0 :     return;
     720           0 :   }
     721           9 :   if( FD_UNLIKELY( res.commission > 100 ) ) {
     722           3 :     ctx->metrics.decode_fail_cnt++;
     723           3 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     724           3 :     return;
     725           3 :   }
     726             : 
     727           6 :   ctx->builder_commission = (uchar)res.commission;
     728           6 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
     729           3 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     730           3 :     return;
     731           3 :   }
     732             : 
     733           3 :   long validity_duration_ns = (long)( 60e9 * 5. ); /* 5 minutes */
     734           3 :   ctx->builder_info_avail = 1;
     735           3 :   ctx->builder_info_valid_until = fd_bundle_now() + validity_duration_ns;
     736           3 : }
     737             : 
     738             : static void
     739             : fd_bundle_client_grpc_tx_complete(
     740             :     void * app_ctx,
     741             :     ulong  request_ctx
     742           9 : ) {
     743           9 :   (void)app_ctx; (void)request_ctx;
     744           9 : }
     745             : 
     746             : void
     747             : fd_bundle_client_grpc_rx_start(
     748             :     void * app_ctx,
     749             :     ulong  request_ctx
     750           9 : ) {
     751           9 :   fd_bundle_tile_t * ctx = app_ctx;
     752           9 :   switch( request_ctx ) {
     753           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     754           3 :     ctx->packet_subscription_live = 1;
     755           3 :     ctx->packet_subscription_wait = 0;
     756           3 :     break;
     757           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     758           3 :     ctx->bundle_subscription_live = 1;
     759           3 :     ctx->bundle_subscription_wait = 0;
     760           3 :     break;
     761           9 :   }
     762           9 : }
     763             : 
     764             : void
     765             : fd_bundle_client_grpc_rx_msg(
     766             :     void *       app_ctx,
     767             :     void const * protobuf,
     768             :     ulong        protobuf_sz,
     769             :     ulong        request_ctx
     770          21 : ) {
     771          21 :   fd_bundle_tile_t * ctx = app_ctx;
     772          21 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
     773          21 :   switch( request_ctx ) {
     774           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     775           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     776           0 :       ctx->metrics.decode_fail_cnt++;
     777           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     778           0 :     }
     779           0 :     break;
     780           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     781           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     782           0 :       ctx->metrics.decode_fail_cnt++;
     783           0 :       fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     784           0 :     }
     785           0 :     break;
     786           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     787           6 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     788           6 :     break;
     789           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     790           6 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     791           6 :     break;
     792           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     793           9 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     794           9 :     break;
     795           0 :   default:
     796           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
     797          21 :   }
     798          21 : }
     799             : 
     800             : static void
     801             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     802           0 :                                  ulong              request_ctx ) {
     803           0 :   fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     804           0 :   switch( request_ctx ) {
     805           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     806           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     807           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     808           0 :     break;
     809           0 :   }
     810           0 : }
     811             : 
     812             : void
     813             : fd_bundle_client_grpc_rx_end(
     814             :     void *                app_ctx,
     815             :     ulong                 request_ctx,
     816             :     fd_grpc_resp_hdrs_t * resp
     817          12 : ) {
     818          12 :   fd_bundle_tile_t * ctx = app_ctx;
     819          12 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
     820           0 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     821           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     822           0 :     return;
     823           0 :   }
     824             : 
     825          12 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     826          12 :   if( !resp->grpc_msg_len ) {
     827          12 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 );
     828          12 :     resp->grpc_msg_len = 13;
     829          12 :   }
     830             : 
     831          12 :   switch( request_ctx ) {
     832           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     833           0 :     ctx->packet_subscription_live = 0;
     834           0 :     ctx->packet_subscription_wait = 0;
     835           0 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     836           0 :     ctx->defer_reset = 1;
     837           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     838           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     839           0 :     return;
     840           9 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     841           9 :     ctx->bundle_subscription_live = 0;
     842           9 :     ctx->bundle_subscription_wait = 0;
     843           9 :     fd_bundle_tile_backoff( ctx, fd_bundle_now() );
     844           9 :     ctx->defer_reset = 1;
     845           9 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     846           9 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     847           9 :     return;
     848           3 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     849           3 :     ctx->builder_info_wait = 0;
     850           3 :     break;
     851           0 :   default:
     852           0 :     break;
     853          12 :   }
     854             : 
     855           3 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     856           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     857           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     858           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     859           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     860           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     861           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     862           0 :       fd_bundle_auther_reset( &ctx->auther );
     863           0 :     }
     864           0 :     return;
     865           0 :   }
     866           3 : }
     867             : 
     868             : void
     869             : fd_bundle_client_grpc_rx_timeout(
     870             :     void * app_ctx,
     871             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     872             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     873           6 : ) {
     874           6 :   (void)deadline_kind;
     875           6 :   FD_LOG_WARNING(( "Request timed out: %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     876           6 :   fd_bundle_tile_t * ctx = app_ctx;
     877           6 :   ctx->defer_reset = 1;
     878           6 : }
     879             : 
     880             : static void
     881           3 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     882           3 :   fd_bundle_tile_t * ctx = app_ctx;
     883           3 :   long rtt_sample = fd_keepalive_rx( ctx->keepalive, fd_bundle_now() );
     884           3 :   if( FD_LIKELY( rtt_sample ) ) {
     885           3 :     fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     886           3 :     FD_LOG_DEBUG(( "Keepalive ACK" ));
     887           3 :   }
     888           3 :   ctx->metrics.ping_ack_cnt++;
     889           3 : }
     890             : 
     891             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
     892             :   .conn_established = fd_bundle_client_grpc_conn_established,
     893             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     894             :   .tx_complete      = fd_bundle_client_grpc_tx_complete,
     895             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     896             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     897             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     898             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     899             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     900             : };
     901             : 
     902             : /* Decrease verbosity */
     903          12 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
     904          54 : #define CONNECTING   FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
     905         108 : #define CONNECTED    FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
     906             : 
     907             : int
     908         174 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     909         174 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
     910         174 :                    ( !ctx->grpc_client        ) ) ) {
     911           3 :     return DISCONNECTED;
     912           3 :   }
     913             : 
     914         171 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     915         171 :   if( FD_UNLIKELY( !conn ) ) {
     916           0 :     return DISCONNECTED; /* no conn */
     917           0 :   }
     918         171 :   if( FD_UNLIKELY( conn->flags &
     919         171 :       ( FD_H2_CONN_FLAGS_DEAD |
     920         171 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     921           6 :     return DISCONNECTED;
     922           6 :   }
     923             : 
     924         165 :   if( FD_UNLIKELY( conn->flags &
     925         165 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     926         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     927         165 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     928         165 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     929          12 :     return CONNECTING; /* connection is not ready */
     930          12 :   }
     931             : 
     932         153 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     933          12 :     return CONNECTING; /* not authenticated */
     934          12 :   }
     935             : 
     936         141 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) |
     937         141 :                    ( !ctx->packet_subscription_live ) |
     938         141 :                    ( !ctx->bundle_subscription_live ) ) ) {
     939          27 :     return CONNECTING; /* not fully connected */
     940          27 :   }
     941             : 
     942         114 :   if( FD_UNLIKELY( fd_keepalive_is_timeout( ctx->keepalive, fd_bundle_now() ) ) ) {
     943           3 :     return DISCONNECTED; /* possible timeout */
     944           3 :   }
     945             : 
     946         111 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
     947           3 :     return CONNECTING;
     948           3 :   }
     949             : 
     950             :   /* As far as we know, the bundle connection is alive and well. */
     951         108 :   return CONNECTED;
     952         111 : }
     953             : 
     954             : #undef DISCONNECTED
     955             : #undef CONNECTING
     956             : #undef CONNECTED
     957             : 
     958             : FD_FN_CONST char const *
     959           6 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
     960           6 :   switch( request_ctx ) {
     961           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     962           0 :     return "GenerateAuthChallenge";
     963           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     964           0 :     return "GenerateAuthTokens";
     965           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     966           0 :     return "SubscribePackets";
     967           6 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     968           6 :     return "SubscribeBundles";
     969           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     970           0 :     return "GetBlockBuilderFeeInfo";
     971           0 :   default:
     972           0 :     return "unknown";
     973           6 :   }
     974           6 : }

Generated by: LCOV version 1.14