LCOV - code coverage report
Current view: top level - disco/bundle - fd_bundle_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 682 0.0 %
Date: 2025-07-01 05:00:49 Functions: 0 36 0.0 %

          Line data    Source code
       1             : /* fd_bundle_client.c steps gRPC related tasks. */
       2             : 
       3             : #include "fd_bundle_auth.h"
       4             : #include "fd_bundle_tile_private.h"
       5             : #include "proto/block_engine.pb.h"
       6             : #include "proto/bundle.pb.h"
       7             : #include "proto/packet.pb.h"
       8             : #include "../fd_txn_m_t.h"
       9             : #include "../plugin/fd_plugin.h"
      10             : #include "../../waltz/h2/fd_h2_conn.h"
      11             : #include "../../ballet/base58/fd_base58.h"
      12             : #include "../../ballet/nanopb/pb_decode.h"
      13             : #include "../../util/net/fd_ip4.h"
      14             : 
      15             : #include <fcntl.h>
      16             : #include <errno.h>
      17             : #include <unistd.h> /* close */
      18             : #include <poll.h> /* poll */
      19             : #include <sys/socket.h> /* socket */
      20             : #include <netinet/in.h>
      21             : 
/* FD_BUNDLE_CLIENT_REQUEST_TIMEOUT is added to fd_log_wallclock() to
   form the deadline of every gRPC request started in this file. */
#define FD_BUNDLE_CLIENT_REQUEST_TIMEOUT ((long)8e9) /* 8 seconds */
      23             : 
      24             : static void
      25           0 : fd_bundle_client_reset( fd_bundle_tile_t * ctx ) {
      26           0 :   if( FD_UNLIKELY( ctx->tcp_sock >= 0 ) ) {
      27           0 :     if( FD_UNLIKELY( 0!=close( ctx->tcp_sock ) ) ) {
      28           0 :       FD_LOG_ERR(( "close(tcp_sock=%i) failed (%i-%s)", ctx->tcp_sock, errno, fd_io_strerror( errno ) ));
      29           0 :     }
      30           0 :     ctx->tcp_sock = -1;
      31           0 :     ctx->tcp_sock_connected = 0;
      32           0 :   }
      33           0 :   ctx->defer_reset = 0;
      34             : 
      35           0 :   ctx->builder_info_avail       = 0;
      36           0 :   ctx->builder_info_wait        = 0;
      37           0 :   ctx->packet_subscription_live = 0;
      38           0 :   ctx->packet_subscription_wait = 0;
      39           0 :   ctx->bundle_subscription_live = 0;
      40           0 :   ctx->bundle_subscription_wait = 0;
      41             : 
      42           0 :   ctx->rtt->is_rtt_valid = 0;
      43             : 
      44           0 : # if FD_HAS_OPENSSL
      45           0 :   if( FD_UNLIKELY( ctx->ssl ) ) {
      46           0 :     SSL_free( ctx->ssl );
      47           0 :     ctx->ssl = NULL;
      48           0 :   }
      49           0 : # endif
      50             : 
      51           0 :   fd_bundle_tile_backoff( ctx, fd_tickcount() );
      52             : 
      53           0 :   fd_bundle_auther_handle_request_fail( &ctx->auther );
      54           0 : }
      55             : 
      56             : static int
      57             : fd_bundle_client_do_connect( fd_bundle_tile_t const * ctx,
      58           0 :                              uint                     ip4_addr ) {
      59           0 :   struct sockaddr_in addr = {
      60           0 :     .sin_family      = AF_INET,
      61           0 :     .sin_addr.s_addr = ip4_addr,
      62           0 :     .sin_port        = fd_ushort_bswap( ctx->server_tcp_port )
      63           0 :   };
      64           0 :   errno = 0;
      65           0 :   connect( ctx->tcp_sock, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) );
      66           0 :   return errno;
      67           0 : }
      68             : 
      69             : static void
      70           0 : fd_bundle_client_create_conn( fd_bundle_tile_t * ctx ) {
      71           0 :   fd_bundle_client_reset( ctx );
      72             : 
      73             :   /* FIXME IPv6 support */
      74           0 :   fd_addrinfo_t hints = {0};
      75           0 :   hints.ai_family = AF_INET;
      76           0 :   fd_addrinfo_t * res = NULL;
      77           0 :   uchar scratch[ 4096 ];
      78           0 :   void * pscratch = scratch;
      79           0 :   int err = fd_getaddrinfo( ctx->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
      80           0 :   if( FD_UNLIKELY( err ) ) {
      81           0 :     FD_LOG_WARNING(( "fd_getaddrinfo `%s` failed (%d-%s)", ctx->server_fqdn, err, fd_gai_strerror( err ) ));
      82           0 :     fd_bundle_client_reset( ctx );
      83           0 :     ctx->metrics.transport_fail_cnt++;
      84           0 :     return;
      85           0 :   }
      86           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
      87           0 :   ctx->server_ip4_addr = ip4_addr;
      88             : 
      89           0 :   int tcp_sock = socket( AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0 );
      90           0 :   if( FD_UNLIKELY( tcp_sock<0 ) ) {
      91           0 :     FD_LOG_ERR(( "socket(AF_INET,SOCK_STREAM|SOCK_CLOEXEC,0) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
      92           0 :   }
      93           0 :   ctx->tcp_sock = tcp_sock;
      94             : 
      95           0 :   if( FD_UNLIKELY( 0!=setsockopt( tcp_sock, SOL_SOCKET, SO_RCVBUF, &ctx->so_rcvbuf, sizeof(int) ) ) ) {
      96           0 :     FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_RCVBUF,%i) failed (%i-%s)", ctx->so_rcvbuf, errno, fd_io_strerror( errno ) ));
      97           0 :   }
      98             : 
      99           0 :   if( FD_UNLIKELY( fcntl( tcp_sock, F_SETFL, O_NONBLOCK )==-1 ) ) {
     100           0 :     FD_LOG_ERR(( "fcntl(tcp_sock,F_SETFL,O_NONBLOCK) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
     101           0 :   }
     102             : 
     103           0 :   char const * scheme = "http";
     104           0 : # if FD_HAS_OPENSSL
     105           0 :   if( ctx->is_ssl ) scheme = "https";
     106           0 : # endif
     107             : 
     108           0 :   FD_LOG_INFO(( "Connecting to %s://" FD_IP4_ADDR_FMT ":%hu (%.*s)",
     109           0 :                 scheme,
     110           0 :                 FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     111           0 :                 (int)ctx->server_sni_len, ctx->server_sni ));
     112             : 
     113           0 :   int connect_err = fd_bundle_client_do_connect( ctx, ip4_addr );
     114           0 :   if( FD_UNLIKELY( connect_err ) ) {
     115           0 :     if( FD_UNLIKELY( connect_err!=EINPROGRESS ) ) {
     116           0 :       FD_LOG_WARNING(( "connect(tcp_sock," FD_IP4_ADDR_FMT ":%u) failed (%i-%s)",
     117           0 :                       FD_IP4_ADDR_FMT_ARGS( ip4_addr ), ctx->server_tcp_port,
     118           0 :                       connect_err, fd_io_strerror( connect_err ) ));
     119           0 :       fd_bundle_client_reset( ctx );
     120           0 :       ctx->metrics.transport_fail_cnt++;
     121           0 :       return;
     122           0 :     }
     123           0 :   }
     124             : 
     125           0 : # if FD_HAS_OPENSSL
     126           0 :   if( ctx->is_ssl ) {
     127           0 :     BIO * bio = BIO_new_socket( ctx->tcp_sock, BIO_NOCLOSE );
     128           0 :     if( FD_UNLIKELY( !bio ) ) {
     129           0 :       FD_LOG_ERR(( "BIO_new_socket failed" ));
     130           0 :     }
     131             : 
     132           0 :     SSL * ssl = SSL_new( ctx->ssl_ctx );
     133           0 :     if( FD_UNLIKELY( !ssl ) ) {
     134           0 :       FD_LOG_ERR(( "SSL_new failed" ));
     135           0 :     }
     136             : 
     137           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     138           0 :     SSL_set_connect_state( ssl );
     139             : 
     140             :     /* Indicate to endpoint which server name we want */
     141           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, ctx->server_sni ) ) ) {
     142           0 :       FD_LOG_ERR(( "SSL_set_tlsext_host_name failed" ));
     143           0 :     }
     144             : 
     145             :     /* Enable hostname verification */
     146           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, ctx->server_sni ) ) ) {
     147           0 :       FD_LOG_ERR(( "SSL_set1_host failed" ));
     148           0 :     }
     149             : 
     150           0 :     ctx->ssl = ssl;
     151           0 :   }
     152           0 : # endif /* FD_HAS_OPENSSL */
     153             : 
     154           0 :   fd_grpc_client_reset( ctx->grpc_client );
     155           0 : }
     156             : 
     157             : static int
     158             : fd_bundle_client_drive_io( fd_bundle_tile_t * ctx,
     159           0 :                            int *              charge_busy ) {
     160           0 : # if FD_HAS_OPENSSL
     161           0 :   if( ctx->is_ssl ) {
     162           0 :     return fd_grpc_client_rxtx_ossl( ctx->grpc_client, ctx->ssl, charge_busy );
     163           0 :   }
     164           0 : # endif /* FD_HAS_OPENSSL */
     165             : 
     166           0 :   return fd_grpc_client_rxtx_socket( ctx->grpc_client, ctx->tcp_sock, charge_busy );
     167           0 : }
     168             : 
     169             : static void
     170           0 : fd_bundle_client_request_builder_info( fd_bundle_tile_t * ctx ) {
     171           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     172             : 
     173           0 :   block_engine_BlockBuilderFeeInfoRequest req = block_engine_BlockBuilderFeeInfoRequest_init_default;
     174           0 :   static char const path[] = "/block_engine.BlockEngineValidator/GetBlockBuilderFeeInfo";
     175           0 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     176           0 :       ctx->grpc_client,
     177           0 :       path, sizeof(path)-1,
     178           0 :       FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo,
     179           0 :       &block_engine_BlockBuilderFeeInfoRequest_msg, &req,
     180           0 :       ctx->auther.access_token, ctx->auther.access_token_sz
     181           0 :   );
     182           0 :   if( FD_UNLIKELY( !request ) ) return;
     183           0 :   fd_grpc_client_deadline_set(
     184           0 :       request,
     185           0 :       FD_GRPC_DEADLINE_RX_END,
     186           0 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     187             : 
     188           0 :   ctx->builder_info_wait = 1;
     189           0 : }
     190             : 
     191             : static void
     192           0 : fd_bundle_client_subscribe_packets( fd_bundle_tile_t * ctx ) {
     193           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     194             : 
     195           0 :   block_engine_SubscribePacketsRequest req = block_engine_SubscribePacketsRequest_init_default;
     196           0 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribePackets";
     197           0 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     198           0 :       ctx->grpc_client,
     199           0 :       path, sizeof(path)-1,
     200           0 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets,
     201           0 :       &block_engine_SubscribePacketsRequest_msg, &req,
     202           0 :       ctx->auther.access_token, ctx->auther.access_token_sz
     203           0 :   );
     204           0 :   if( FD_UNLIKELY( !request ) ) return;
     205           0 :   fd_grpc_client_deadline_set(
     206           0 :       request,
     207           0 :       FD_GRPC_DEADLINE_HEADER,
     208           0 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     209             : 
     210           0 :   ctx->packet_subscription_wait = 1;
     211           0 : }
     212             : 
     213             : static void
     214           0 : fd_bundle_client_subscribe_bundles( fd_bundle_tile_t * ctx ) {
     215           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
     216             : 
     217           0 :   block_engine_SubscribeBundlesRequest req = block_engine_SubscribeBundlesRequest_init_default;
     218           0 :   static char const path[] = "/block_engine.BlockEngineValidator/SubscribeBundles";
     219           0 :   fd_grpc_h2_stream_t * request = fd_grpc_client_request_start(
     220           0 :       ctx->grpc_client,
     221           0 :       path, sizeof(path)-1,
     222           0 :       FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles,
     223           0 :       &block_engine_SubscribeBundlesRequest_msg, &req,
     224           0 :       ctx->auther.access_token, ctx->auther.access_token_sz
     225           0 :   );
     226           0 :   if( FD_UNLIKELY( !request ) ) return;
     227           0 :   fd_grpc_client_deadline_set(
     228           0 :       request,
     229           0 :       FD_GRPC_DEADLINE_HEADER,
     230           0 :       fd_log_wallclock() + FD_BUNDLE_CLIENT_REQUEST_TIMEOUT );
     231             : 
     232           0 :   ctx->bundle_subscription_wait = 1;
     233           0 : }
     234             : 
     235             : static void
     236           0 : fd_bundle_client_send_ping( fd_bundle_tile_t * ctx ) {
     237           0 :   if( FD_UNLIKELY( !ctx->grpc_client ) ) return; /* no client */
     238           0 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     239           0 :   if( FD_UNLIKELY( !conn ) ) return; /* no conn */
     240           0 :   if( FD_UNLIKELY( conn->flags ) ) return; /* conn busy */
     241           0 :   fd_h2_rbuf_t * rbuf_tx = fd_grpc_client_rbuf_tx( ctx->grpc_client );
     242             : 
     243           0 :   if( FD_LIKELY( fd_h2_tx_ping( conn, rbuf_tx ) ) ) {
     244           0 :     ctx->last_ping_tx_ticks = fd_tickcount();
     245           0 :     ctx->last_ping_tx_nanos = fd_log_wallclock();
     246           0 :     ctx->ping_randomize     = fd_rng_ulong( ctx->rng );
     247           0 :   }
     248           0 : }
     249             : 
     250             : FD_FN_PURE static int
     251             : fd_bundle_client_keepalive_due( fd_bundle_tile_t const * ctx,
     252           0 :                                 long                     now_ticks ) {
     253           0 :   ulong delay_min = ctx->ping_threshold_ticks>>1;
     254           0 :   ulong delay_rng = ctx->ping_threshold_ticks & ctx->ping_randomize;
     255           0 :   ulong delay     = delay_min + delay_rng;
     256           0 :   long  deadline  = ctx->last_ping_tx_ticks + (long)delay;
     257           0 :   return now_ticks >= deadline;
     258           0 : }
     259             : 
     260             : int
     261             : fd_bundle_client_step_reconnect( fd_bundle_tile_t * ctx,
     262           0 :                                  long               io_ticks ) {
     263             :   /* Drive auth */
     264           0 :   if( FD_UNLIKELY( ctx->auther.needs_poll ) ) {
     265           0 :     fd_bundle_auther_poll( &ctx->auther, ctx->grpc_client, ctx->keyguard_client );
     266           0 :     return 1;
     267           0 :   }
     268           0 :   if( FD_UNLIKELY( ctx->auther.state!=FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) return 0;
     269             : 
     270             :   /* Request block builder info */
     271           0 :   int const builder_info_expired = ( ctx->builder_info_valid_until_ticks - io_ticks )<0;
     272           0 :   if( FD_UNLIKELY( ( ( !ctx->builder_info_avail ) |
     273           0 :                      ( !builder_info_expired    ) ) &
     274           0 :                    ( !ctx->builder_info_wait      ) ) ) {
     275           0 :     fd_bundle_client_request_builder_info( ctx );
     276           0 :     return 1;
     277           0 :   }
     278             : 
     279             :   /* Subscribe to packets */
     280           0 :   if( FD_UNLIKELY( !ctx->packet_subscription_live && !ctx->packet_subscription_wait ) ) {
     281           0 :     fd_bundle_client_subscribe_packets( ctx );
     282           0 :     return 1;
     283           0 :   }
     284             : 
     285             :   /* Subscribe to bundles */
     286           0 :   if( FD_UNLIKELY( !ctx->bundle_subscription_live && !ctx->bundle_subscription_wait ) ) {
     287           0 :     fd_bundle_client_subscribe_bundles( ctx );
     288           0 :     return 1;
     289           0 :   }
     290             : 
     291             :   /* Send a PING */
     292           0 :   if( FD_UNLIKELY( fd_bundle_client_keepalive_due( ctx, io_ticks ) ) ) {
     293           0 :     fd_bundle_client_send_ping( ctx );
     294           0 :     return 1;
     295           0 :   }
     296             : 
     297           0 :   return 0;
     298           0 : }
     299             : 
/* fd_bundle_client_step1 advances the connection state machine by one
   step.  Depending on state it: polls a connecting TCP socket for
   completion, (re)creates the connection once the backoff allows,
   drives I/O on an established connection, or hands off to
   fd_bundle_client_step_reconnect to issue the next request.
   *charge_busy is set to 1 whenever a state change or reset happened
   here, and is OR-ed with the result of step_reconnect. */

static void
fd_bundle_client_step1( fd_bundle_tile_t * ctx,
                        int *              charge_busy ) {

  /* Wait for TCP socket to connect */
  if( FD_UNLIKELY( !ctx->tcp_sock_connected ) ) {
    /* No socket at all: jump into the reconnect path below */
    if( FD_UNLIKELY( ctx->tcp_sock < 0 ) ) goto reconnect;

    /* Zero timeout: check for writability without blocking */
    struct pollfd pfds[1] = {
      { .fd = ctx->tcp_sock, .events = POLLOUT }
    };
    int poll_res = poll( pfds, 1, 0 );
    if( FD_UNLIKELY( poll_res<0 ) ) {
      FD_LOG_ERR(( "poll(tcp_sock) failed (%i-%s)", errno, fd_io_strerror( errno ) ));
    }
    if( poll_res==0 ) return;

    if( pfds[0].revents & (POLLERR|POLLHUP) ) {
      /* Call connect again only to obtain an errno value for the log
         message below */
      int connect_err = fd_bundle_client_do_connect( ctx, 0 );
      FD_LOG_INFO(( "Bundle gRPC connect attempt failed (%i-%s)", connect_err, fd_io_strerror( connect_err ) ));
      fd_bundle_client_reset( ctx );
      ctx->metrics.transport_fail_cnt++;
      *charge_busy = 1;
      return;
    }
    if( pfds[0].revents & POLLOUT ) {
      FD_LOG_DEBUG(( "Bundle TCP socket connected" ));
      ctx->tcp_sock_connected = 1;
      *charge_busy = 1;
      return;
    }
    /* Any other revents combination: nothing to do this step */
    return;
  }

  /* gRPC conn died? */
  if( FD_UNLIKELY( !ctx->grpc_client ) ) {
  reconnect: /* also reached via goto when there is no TCP socket */
    if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, fd_tickcount() ) ) ) {
      return;
    }
    fd_bundle_client_create_conn( ctx );
    *charge_busy = 1;
    return;
  }

  /* Drive I/O, SSL handshake, and any inflight requests */
  if( FD_UNLIKELY( !fd_bundle_client_drive_io( ctx, charge_busy ) ||
                   ctx->defer_reset /* new error? */ ) ) {
    fd_bundle_client_reset( ctx );
    ctx->metrics.transport_fail_cnt++;
    *charge_busy = 1;
    return;
  }

  /* Are we ready to issue a new request? */
  if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( ctx->grpc_client ) ) ) return;
  long io_ts = fd_tickcount();
  if( FD_UNLIKELY( fd_bundle_tile_should_stall( ctx, io_ts ) ) ) return;

  *charge_busy |= fd_bundle_client_step_reconnect( ctx, io_ts );
}
     361             : 
     362             : static void
     363           0 : fd_bundle_client_log_status( fd_bundle_tile_t * ctx ) {
     364           0 :   int status = fd_bundle_client_status( ctx );
     365             : 
     366           0 :   int const connected_now    = ( status==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     367           0 :   int const connected_before = ( ctx->bundle_status_logged==FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED );
     368             : 
     369           0 :   if( FD_UNLIKELY( connected_now!=connected_before ) ) {
     370           0 :     long ts = fd_log_wallclock();
     371           0 :     if( FD_LIKELY( ts-(ctx->last_bundle_status_log_nanos) >= (long)1e6 ) ) {
     372           0 :       if( connected_now ) {
     373           0 :         FD_LOG_NOTICE(( "Connected to bundle server" ));
     374           0 :       } else {
     375           0 :         FD_LOG_WARNING(( "Disconnected from bundle server" ));
     376           0 :       }
     377           0 :       ctx->last_bundle_status_log_nanos = ts;
     378           0 :       ctx->bundle_status_logged = (uchar)status;
     379           0 :     }
     380           0 :   }
     381           0 : }
     382             : 
     383             : void
     384             : fd_bundle_client_step( fd_bundle_tile_t * ctx,
     385           0 :                        int *              charge_busy ) {
     386             :   /* Edge-trigger logging with rate limiting */
     387           0 :   fd_bundle_client_step1( ctx, charge_busy );
     388           0 :   fd_bundle_client_log_status( ctx );
     389           0 : }
     390             : 
     391             : void
     392             : fd_bundle_tile_backoff( fd_bundle_tile_t * ctx,
     393           0 :                         long               ts_ticks ) {
     394           0 :   uint iter = ctx->backoff_iter;
     395           0 :   if( ts_ticks < ctx->backoff_reset ) iter = 0U;
     396           0 :   iter++;
     397             : 
     398             :   /* FIXME proper backoff */
     399           0 :   long wait_ticks = (long)( 2e9 * fd_tempo_tick_per_ns( NULL ) );
     400           0 :   wait_ticks = (long)( fd_rng_ulong( ctx->rng ) & ( (1UL<<fd_ulong_find_msb_w_default( (ulong)wait_ticks, 0 ))-1UL ) );
     401             : 
     402           0 :   ctx->backoff_until = ts_ticks +   wait_ticks;
     403           0 :   ctx->backoff_reset = ts_ticks + 2*wait_ticks;
     404             : 
     405           0 :   ctx->backoff_iter = iter;
     406           0 : }
     407             : 
     408             : static void
     409           0 : fd_bundle_client_grpc_conn_established( void * app_ctx ) {
     410           0 :   (void)app_ctx;
     411           0 :   FD_LOG_INFO(( "Bundle gRPC connection established" ));
     412           0 : }
     413             : 
     414             : static void
     415             : fd_bundle_client_grpc_conn_dead( void * app_ctx,
     416             :                                  uint   h2_err,
     417           0 :                                  int    closed_by ) {
     418           0 :   fd_bundle_tile_t * ctx = app_ctx;
     419           0 :   FD_LOG_INFO(( "Bundle gRPC connection closed %s (%u-%s)",
     420           0 :                 closed_by ? "by peer" : "due to error",
     421           0 :                 h2_err, fd_h2_strerror( h2_err ) ));
     422           0 :   ctx->defer_reset = 1;
     423           0 : }
     424             : 
     425             : /* Forwards a bundle transaction to the tango message bus. */
     426             : 
     427             : static void
     428             : fd_bundle_tile_publish_bundle_txn(
     429             :     fd_bundle_tile_t * ctx,
     430             :     void const *       txn,
     431             :     ulong              txn_sz,  /* <=FD_TXN_MTU */
     432             :     ulong              bundle_txn_cnt
     433           0 : ) {
     434           0 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     435           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     436           0 :     return;
     437           0 :   }
     438             : 
     439           0 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
     440           0 :   *txnm = (fd_txn_m_t) {
     441           0 :     .reference_slot = 0UL,
     442           0 :     .payload_sz     = (ushort)txn_sz,
     443           0 :     .txn_t_sz       = 0,
     444           0 :     .block_engine   = {
     445           0 :       .bundle_id      = ctx->bundle_seq,
     446           0 :       .bundle_txn_cnt = bundle_txn_cnt,
     447           0 :       .commission     = (uchar)ctx->builder_commission
     448           0 :     },
     449           0 :   };
     450           0 :   memcpy( txnm->block_engine.commission_pubkey, ctx->builder_pubkey, 32UL );
     451           0 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     452             : 
     453           0 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     454           0 :   ulong sig = 1UL;
     455             : 
     456           0 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     457           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     458           0 :   }
     459             : 
     460           0 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
     461           0 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     462           0 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
     463           0 :   ctx->metrics.txn_received_cnt++;
     464           0 : }
     465             : 
     466             : /* Forwards a regular transaction to the tango message bus. */
     467             : 
     468             : static void
     469             : fd_bundle_tile_publish_txn(
     470             :     fd_bundle_tile_t * ctx,
     471             :     void const *       txn,
     472             :     ulong              txn_sz  /* <=FD_TXN_MTU */
     473           0 : ) {
     474           0 :   fd_txn_m_t * txnm = fd_chunk_to_laddr( ctx->verify_out.mem, ctx->verify_out.chunk );
     475           0 :   *txnm = (fd_txn_m_t) {
     476           0 :     .reference_slot = 0UL,
     477           0 :     .payload_sz     = (ushort)txn_sz,
     478           0 :     .txn_t_sz       = 0,
     479           0 :     .block_engine   = {
     480           0 :       .bundle_id         = 0UL,
     481           0 :       .bundle_txn_cnt    = 1UL,
     482           0 :       .commission        = 0,
     483           0 :       .commission_pubkey = {0}
     484           0 :     },
     485           0 :   };
     486           0 :   fd_memcpy( fd_txn_m_payload( txnm ), txn, txn_sz );
     487             : 
     488           0 :   ulong sz  = fd_txn_m_realized_footprint( txnm, 0, 0 );
     489           0 :   ulong sig = 0UL;
     490             : 
     491           0 :   if( FD_UNLIKELY( !ctx->stem ) ) {
     492           0 :     FD_LOG_CRIT(( "ctx->stem not set. This is a bug." ));
     493           0 :   }
     494             : 
     495           0 :   ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
     496           0 :   fd_stem_publish( ctx->stem, ctx->verify_out.idx, sig, ctx->verify_out.chunk, sz, 0UL, 0UL, tspub );
     497           0 :   ctx->verify_out.chunk = fd_dcache_compact_next( ctx->verify_out.chunk, sz, ctx->verify_out.chunk0, ctx->verify_out.wmark );
     498           0 :   ctx->metrics.txn_received_cnt++;
     499           0 : }
     500             : 
     501             : /* Called for each transaction in a bundle.  Simply counts up
     502             :    bundle_txn_cnt, but does not publish anything. */
     503             : 
     504             : static bool
     505             : fd_bundle_client_visit_pb_bundle_txn_preflight(
     506             :     pb_istream_t *     istream,
     507             :     pb_field_t const * field,
     508             :     void **            arg
     509           0 : ) {
     510           0 :   (void)istream; (void)field;
     511           0 :   fd_bundle_tile_t * ctx = *arg;
     512           0 :   ctx->bundle_txn_cnt++;
     513           0 :   return true;
     514           0 : }
     515             : 
     516             : /* Called for each transaction in a bundle.  Publishes each transaction
     517             :    to the tango message bus. */
     518             : 
     519             : static bool
     520             : fd_bundle_client_visit_pb_bundle_txn(
     521             :     pb_istream_t *     istream,
     522             :     pb_field_t const * field,
     523             :     void **            arg
     524           0 : ) {
     525           0 :   (void)field;
     526           0 :   fd_bundle_tile_t * ctx = *arg;
     527             : 
     528           0 :   packet_Packet packet = packet_Packet_init_default;
     529           0 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     530           0 :     ctx->metrics.decode_fail_cnt++;
     531           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     532           0 :     return false;
     533           0 :   }
     534             : 
     535           0 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     536           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     537           0 :     return true;
     538           0 :   }
     539             : 
     540           0 :   fd_bundle_tile_publish_bundle_txn(
     541           0 :       ctx,
     542           0 :       packet.data.bytes, packet.data.size,
     543           0 :       ctx->bundle_txn_cnt
     544           0 :   );
     545             : 
     546           0 :   return true;
     547           0 : }
     548             : 
     549             : /* Called for each BundleUuid in a SubscribeBundlesResponse. */
     550             : 
     551             : static bool
     552             : fd_bundle_client_visit_pb_bundle_uuid(
     553             :     pb_istream_t *     istream,
     554             :     pb_field_t const * field,
     555             :     void **            arg
     556           0 : ) {
     557           0 :   (void)field;
     558           0 :   fd_bundle_tile_t * ctx = *arg;
     559             : 
     560             :   /* Reset bundle state */
     561             : 
     562           0 :   ctx->bundle_txn_cnt = 0UL;
     563           0 :   ctx->bundle_seq++;
     564             : 
     565             :   /* Do two decode passes.  This is required because we need to know the
     566             :      number of transactions in a bundle ahead of time.  However, due to
     567             :      the Protobuf wire encoding, we don't know the number of txns that
     568             :      will come until we've parsed everything.
     569             : 
     570             :      First pass: Count number of bundles. */
     571             : 
     572           0 :   pb_istream_t peek = *istream;
     573           0 :   bundle_BundleUuid bundle = bundle_BundleUuid_init_default;
     574           0 :   bundle.bundle.packets = (pb_callback_t) {
     575           0 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn_preflight,
     576           0 :     .arg          = ctx
     577           0 :   };
     578           0 :   if( FD_UNLIKELY( !pb_decode( &peek, &bundle_BundleUuid_msg, &bundle ) ) ) {
     579           0 :     ctx->metrics.decode_fail_cnt++;
     580           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed: %s", peek.errmsg ));
     581           0 :     return false;
     582           0 :   }
     583             : 
     584             :   /* At this point, ctx->bundle_txn_cnt is correctly set.
     585             :      Second pass: Actually publish bundle packets */
     586             : 
     587           0 :   bundle = (bundle_BundleUuid)bundle_BundleUuid_init_default;
     588           0 :   bundle.bundle.packets = (pb_callback_t) {
     589           0 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_txn,
     590           0 :     .arg          = ctx
     591           0 :   };
     592             : 
     593           0 :   ctx->metrics.bundle_received_cnt++;
     594             : 
     595           0 :   if( FD_UNLIKELY( !pb_decode( istream, &bundle_BundleUuid_msg, &bundle ) ) ) {
     596           0 :     ctx->metrics.decode_fail_cnt++;
     597           0 :     FD_LOG_WARNING(( "Protobuf decode of (bundle.BundleUuid) failed (internal error): %s", istream->errmsg ));
     598           0 :     return false;
     599           0 :   }
     600             : 
     601           0 :   return true;
     602           0 : }
     603             : 
     604             : /* Handle a SubscribeBundlesResponse from a SubscribeBundles gRPC call. */
     605             : 
     606             : static void
     607             : fd_bundle_client_handle_bundle_batch(
     608             :     fd_bundle_tile_t * ctx,
     609             :     pb_istream_t *     istream
     610           0 : ) {
     611           0 :   if( FD_UNLIKELY( !ctx->builder_info_avail ) ) {
     612           0 :     ctx->metrics.missing_builder_info_fail_cnt++; /* unreachable */
     613           0 :     return;
     614           0 :   }
     615             : 
     616           0 :   block_engine_SubscribeBundlesResponse res = block_engine_SubscribeBundlesResponse_init_default;
     617           0 :   res.bundles = (pb_callback_t) {
     618           0 :     .funcs.decode = fd_bundle_client_visit_pb_bundle_uuid,
     619           0 :     .arg          = ctx
     620           0 :   };
     621           0 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribeBundlesResponse_msg, &res ) ) ) {
     622           0 :     ctx->metrics.decode_fail_cnt++;
     623           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribeBundlesResponse) failed: %s", istream->errmsg ));
     624           0 :     return;
     625           0 :   }
     626           0 : }
     627             : 
     628             : /* Called for each 'Packet' (a regular transaction) of a
     629             :    SubscribePacketsResponse. */
     630             : 
     631             : static bool
     632             : fd_bundle_client_visit_pb_packet(
     633             :     pb_istream_t *     istream,
     634             :     pb_field_t const * field,
     635             :     void **            arg
     636           0 : ) {
     637           0 :   (void)field;
     638           0 :   fd_bundle_tile_t * ctx = *arg;
     639             : 
     640           0 :   packet_Packet packet = packet_Packet_init_default;
     641           0 :   if( FD_UNLIKELY( !pb_decode( istream, &packet_Packet_msg, &packet ) ) ) {
     642           0 :     ctx->metrics.decode_fail_cnt++;
     643           0 :     FD_LOG_WARNING(( "Protobuf decode of (packet.Packet) failed" ));
     644           0 :     return false;
     645           0 :   }
     646             : 
     647           0 :   if( FD_UNLIKELY( packet.data.size > FD_TXN_MTU ) ) {
     648           0 :     FD_LOG_WARNING(( "Bundle server delivered an oversize transaction, ignoring" ));
     649           0 :     return true;
     650           0 :   }
     651             : 
     652           0 :   fd_bundle_tile_publish_txn( ctx, packet.data.bytes, packet.data.size );
     653           0 :   ctx->metrics.packet_received_cnt++;
     654             : 
     655           0 :   return true;
     656           0 : }
     657             : 
     658             : /* Handle a SubscribePacketsResponse from a SubscribePackets gRPC call. */
     659             : 
     660             : static void
     661             : fd_bundle_client_handle_packet_batch(
     662             :     fd_bundle_tile_t * ctx,
     663             :     pb_istream_t *     istream
     664           0 : ) {
     665           0 :   block_engine_SubscribePacketsResponse res = block_engine_SubscribePacketsResponse_init_default;
     666           0 :   res.batch.packets = (pb_callback_t) {
     667           0 :     .funcs.decode = fd_bundle_client_visit_pb_packet,
     668           0 :     .arg          = ctx
     669           0 :   };
     670           0 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_SubscribePacketsResponse_msg, &res ) ) ) {
     671           0 :     ctx->metrics.decode_fail_cnt++;
     672           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.SubscribePacketsResponse) failed" ));
     673           0 :     return;
     674           0 :   }
     675           0 : }
     676             : 
     677             : /* Handle a BlockBuilderFeeInfoResponse from a GetBlockBuilderFeeInfo
     678             :    gRPC call. */
     679             : 
     680             : static void
     681             : fd_bundle_client_handle_builder_fee_info(
     682             :     fd_bundle_tile_t * ctx,
     683             :     pb_istream_t *     istream
     684           0 : ) {
     685           0 :   block_engine_BlockBuilderFeeInfoResponse res = block_engine_BlockBuilderFeeInfoResponse_init_default;
     686           0 :   if( FD_UNLIKELY( !pb_decode( istream, &block_engine_BlockBuilderFeeInfoResponse_msg, &res ) ) ) {
     687           0 :     ctx->metrics.decode_fail_cnt++;
     688           0 :     FD_LOG_WARNING(( "Protobuf decode of (block_engine.BlockBuilderFeeInfoResponse) failed" ));
     689           0 :     return;
     690           0 :   }
     691           0 :   if( FD_UNLIKELY( res.commission > 100 ) ) {
     692           0 :     ctx->metrics.decode_fail_cnt++;
     693           0 :     FD_LOG_WARNING(( "BlockBuilderFeeInfoResponse commission out of range (0-100): %lu", res.commission ));
     694           0 :     return;
     695           0 :   }
     696             : 
     697           0 :   ctx->builder_commission = (uchar)res.commission;
     698           0 :   if( FD_UNLIKELY( !fd_base58_decode_32( res.pubkey, ctx->builder_pubkey ) ) ) {
     699           0 :     FD_LOG_HEXDUMP_WARNING(( "Invalid pubkey in BlockBuilderFeeInfoResponse", res.pubkey, strnlen( res.pubkey, sizeof(res.pubkey) ) ));
     700           0 :     return;
     701           0 :   }
     702             : 
     703           0 :   long validity_duration_ticks = (long)( fd_tempo_tick_per_ns( NULL ) * (60e9 * 5.) ); /* 5 minutes */
     704           0 :   ctx->builder_info_avail = 1;
     705           0 :   ctx->builder_info_valid_until_ticks = fd_tickcount() + validity_duration_ticks;
     706           0 : }
     707             : 
     708             : static void
     709             : fd_bundle_client_grpc_tx_complete(
     710             :     void * app_ctx,
     711             :     ulong  request_ctx
     712           0 : ) {
     713           0 :   (void)app_ctx; (void)request_ctx;
     714           0 : }
     715             : 
     716             : static void
     717             : fd_bundle_client_grpc_rx_start(
     718             :     void * app_ctx,
     719             :     ulong  request_ctx
     720           0 : ) {
     721           0 :   fd_bundle_tile_t * ctx = app_ctx;
     722           0 :   switch( request_ctx ) {
     723           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     724           0 :     ctx->packet_subscription_live = 1;
     725           0 :     ctx->packet_subscription_wait = 0;
     726           0 :     break;
     727           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     728           0 :     ctx->bundle_subscription_live = 1;
     729           0 :     ctx->bundle_subscription_wait = 0;
     730           0 :     break;
     731           0 :   }
     732           0 : }
     733             : 
     734             : void
     735             : fd_bundle_client_grpc_rx_msg(
     736             :     void *       app_ctx,
     737             :     void const * protobuf,
     738             :     ulong        protobuf_sz,
     739             :     ulong        request_ctx
     740           0 : ) {
     741           0 :   fd_bundle_tile_t * ctx = app_ctx;
     742           0 :   pb_istream_t istream = pb_istream_from_buffer( protobuf, protobuf_sz );
     743           0 :   switch( request_ctx ) {
     744           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     745           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_challenge_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     746           0 :       ctx->metrics.decode_fail_cnt++;
     747           0 :       fd_bundle_tile_backoff( ctx, fd_tickcount() );
     748           0 :     }
     749           0 :     break;
     750           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     751           0 :     if( FD_UNLIKELY( !fd_bundle_auther_handle_tokens_resp( &ctx->auther, protobuf, protobuf_sz ) ) ) {
     752           0 :       ctx->metrics.decode_fail_cnt++;
     753           0 :       fd_bundle_tile_backoff( ctx, fd_tickcount() );
     754           0 :     }
     755           0 :     break;
     756           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     757           0 :     fd_bundle_client_handle_bundle_batch( ctx, &istream );
     758           0 :     break;
     759           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     760           0 :     fd_bundle_client_handle_packet_batch( ctx, &istream );
     761           0 :     break;
     762           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     763           0 :     fd_bundle_client_handle_builder_fee_info( ctx, &istream );
     764           0 :     break;
     765           0 :   default:
     766           0 :     FD_LOG_ERR(( "Received unexpected gRPC message (request_ctx=%lu)", request_ctx ));
     767           0 :   }
     768           0 : }
     769             : 
     770             : static void
     771             : fd_bundle_client_request_failed( fd_bundle_tile_t * ctx,
     772           0 :                                  ulong              request_ctx ) {
     773           0 :   fd_bundle_tile_backoff( ctx, fd_tickcount() );
     774           0 :   switch( request_ctx ) {
     775           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     776           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     777           0 :     fd_bundle_auther_handle_request_fail( &ctx->auther );
     778           0 :     break;
     779           0 :   }
     780           0 : }
     781             : 
     782             : static inline int
     783           0 : fd_hex_unhex( int c ) {
     784           0 :   if( c>='0' && c<='9' ) return c-'0';
     785           0 :   if( c>='a' && c<='f' ) return c-'a'+0xa;
     786           0 :   if( c>='A' && c<='F' ) return c-'A'+0xa;
     787           0 :   return -1;
     788           0 : }
     789             : 
     790             : static ulong
     791             : fd_url_unescape( char * const msg,
     792           0 :                  ulong  const len ) {
     793           0 :   char * end = msg+len;
     794           0 :   int state = 0;
     795           0 :   char * dst = msg;
     796           0 :   for( char * src=msg; src<end; src++ ) {
     797             :     /* invariant: dst<=src */
     798           0 :     switch( state ) {
     799           0 :     case 0:
     800           0 :       if( FD_LIKELY( (*src)!='%' ) ) {
     801           0 :         *dst = *src;
     802           0 :         dst++;
     803           0 :       } else {
     804           0 :         state = 1;
     805           0 :       }
     806           0 :       break;
     807           0 :     case 1:
     808           0 :       if( FD_LIKELY( (*src)!='%' ) )  {
     809           0 :         *dst = (char)( ( fd_hex_unhex( *src )&0xf )<<4 );
     810           0 :         state = 2;
     811           0 :       } else {
     812             :         /* FIXME is 'aa%%aa' a valid escape? */
     813           0 :         *(dst++) = '%';
     814           0 :         state = 0;
     815           0 :       }
     816           0 :       break;
     817           0 :     case 2:
     818           0 :       *dst = (char)( (*dst) | ( fd_hex_unhex( *src )&0xf ) );
     819           0 :       dst++;
     820           0 :       state = 0;
     821           0 :       break;
     822           0 :     }
     823           0 :   }
     824           0 :   return (ulong)( dst-msg );
     825           0 : }
     826             : 
     827             : void
     828             : fd_bundle_client_grpc_rx_end(
     829             :     void *                app_ctx,
     830             :     ulong                 request_ctx,
     831             :     fd_grpc_resp_hdrs_t * resp
     832           0 : ) {
     833           0 :   fd_bundle_tile_t * ctx = app_ctx;
     834           0 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
     835           0 :     FD_LOG_WARNING(( "gRPC request failed (HTTP status %u)", resp->h2_status ));
     836           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     837           0 :     return;
     838           0 :   }
     839             : 
     840           0 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     841           0 :   if( !resp->grpc_msg_len ) {
     842           0 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 );
     843           0 :     resp->grpc_msg_len = 13;
     844           0 :   }
     845             : 
     846           0 :   switch( request_ctx ) {
     847           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     848           0 :     ctx->packet_subscription_live = 0;
     849           0 :     ctx->packet_subscription_wait = 0;
     850           0 :     fd_bundle_tile_backoff( ctx, fd_tickcount() );
     851           0 :     ctx->defer_reset = 1;
     852           0 :     FD_LOG_INFO(( "SubscribePackets stream failed (gRPC status %u-%s). Reconnecting ...",
     853           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     854           0 :     return;
     855           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     856           0 :     ctx->bundle_subscription_live = 0;
     857           0 :     ctx->bundle_subscription_wait = 0;
     858           0 :     fd_bundle_tile_backoff( ctx, fd_tickcount() );
     859           0 :     ctx->defer_reset = 1;
     860           0 :     FD_LOG_INFO(( "SubscribeBundles stream failed (gRPC status %u-%s). Reconnecting ...",
     861           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ) ));
     862           0 :     return;
     863           0 :   default:
     864           0 :     break;
     865           0 :   }
     866             : 
     867           0 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     868           0 :     FD_LOG_INFO(( "gRPC request failed (gRPC status %u-%s): %.*s",
     869           0 :                   resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     870           0 :                   (int)resp->grpc_msg_len, resp->grpc_msg ));
     871           0 :     fd_bundle_client_request_failed( ctx, request_ctx );
     872           0 :     if( resp->grpc_status==FD_GRPC_STATUS_UNAUTHENTICATED ||
     873           0 :         resp->grpc_status==FD_GRPC_STATUS_PERMISSION_DENIED ) {
     874           0 :       fd_bundle_auther_reset( &ctx->auther );
     875           0 :     }
     876           0 :     return;
     877           0 :   }
     878           0 : }
     879             : 
     880             : void
     881             : fd_bundle_client_grpc_rx_timeout(
     882             :     void * app_ctx,
     883             :     ulong  request_ctx,  /* FD_BUNDLE_CLIENT_REQ_{...} */
     884             :     int    deadline_kind /* FD_GRPC_DEADLINE_{HEADER|RX_END} */
     885           0 : ) {
     886           0 :   (void)deadline_kind;
     887           0 :   FD_LOG_WARNING(( "Request timed out %s", fd_bundle_request_ctx_cstr( request_ctx ) ));
     888           0 :   fd_bundle_tile_t * ctx = app_ctx;
     889           0 :   ctx->defer_reset = 1;
     890           0 : }
     891             : 
     892             : static void
     893           0 : fd_bundle_client_grpc_ping_ack( void * app_ctx ) {
     894           0 :   fd_bundle_tile_t * ctx = app_ctx;
     895           0 :   ctx->last_ping_rx_ts = fd_tickcount();
     896           0 :   ctx->metrics.ping_ack_cnt++;
     897           0 :   long rtt_sample = fd_log_wallclock() - ctx->last_ping_tx_nanos;
     898           0 :   fd_rtt_sample( ctx->rtt, (float)rtt_sample, 0 );
     899           0 : }
     900             : 
     901             : fd_grpc_client_callbacks_t fd_bundle_client_grpc_callbacks = {
     902             :   .conn_established = fd_bundle_client_grpc_conn_established,
     903             :   .conn_dead        = fd_bundle_client_grpc_conn_dead,
     904             :   .tx_complete      = fd_bundle_client_grpc_tx_complete,
     905             :   .rx_start         = fd_bundle_client_grpc_rx_start,
     906             :   .rx_msg           = fd_bundle_client_grpc_rx_msg,
     907             :   .rx_end           = fd_bundle_client_grpc_rx_end,
     908             :   .rx_timeout       = fd_bundle_client_grpc_rx_timeout,
     909             :   .ping_ack         = fd_bundle_client_grpc_ping_ack,
     910             : };
     911             : 
     912             : /* Decrease verbosity */
     913           0 : #define DISCONNECTED FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_DISCONNECTED
     914           0 : #define CONNECTING   FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTING
     915           0 : #define CONNECTED    FD_PLUGIN_MSG_BLOCK_ENGINE_UPDATE_STATUS_CONNECTED
     916             : 
     917             : int
     918           0 : fd_bundle_client_status( fd_bundle_tile_t const * ctx ) {
     919           0 :   if( FD_UNLIKELY( ( !ctx->tcp_sock_connected ) |
     920           0 :                    ( !ctx->grpc_client        ) ) ) {
     921           0 :     return DISCONNECTED;
     922           0 :   }
     923             : 
     924           0 :   fd_h2_conn_t * conn = fd_grpc_client_h2_conn( ctx->grpc_client );
     925           0 :   if( FD_UNLIKELY( !conn ) ) {
     926           0 :     return DISCONNECTED; /* no conn */
     927           0 :   }
     928           0 :   if( FD_UNLIKELY( conn->flags &
     929           0 :       ( FD_H2_CONN_FLAGS_DEAD |
     930           0 :         FD_H2_CONN_FLAGS_SEND_GOAWAY ) ) ) {
     931           0 :     return DISCONNECTED;
     932           0 :   }
     933             : 
     934           0 :   if( FD_UNLIKELY( conn->flags &
     935           0 :       ( FD_H2_CONN_FLAGS_CLIENT_INITIAL      |
     936           0 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_ACK_0 |
     937           0 :         FD_H2_CONN_FLAGS_WAIT_SETTINGS_0     |
     938           0 :         FD_H2_CONN_FLAGS_SERVER_INITIAL ) ) ) {
     939           0 :     return CONNECTING; /* connection is not ready */
     940           0 :   }
     941             : 
     942           0 :   if( FD_UNLIKELY( ctx->auther.state != FD_BUNDLE_AUTH_STATE_DONE_WAIT ) ) {
     943           0 :     return CONNECTING; /* not authenticated */
     944           0 :   }
     945             : 
     946           0 :   if( FD_UNLIKELY( ( !ctx->builder_info_avail       ) |
     947           0 :                    ( !ctx->packet_subscription_live ) |
     948           0 :                    ( !ctx->bundle_subscription_live ) ) ) {
     949           0 :     return CONNECTING; /* not fully connected */
     950           0 :   }
     951             : 
     952           0 :   long ping_timeout = (long)( 3UL * ctx->ping_threshold_ticks );
     953           0 :   if( FD_UNLIKELY( fd_tickcount() > ctx->last_ping_rx_ts + ping_timeout ) ) {
     954           0 :     return DISCONNECTED; /* possible timeout */
     955           0 :   }
     956             : 
     957           0 :   if( FD_UNLIKELY( !fd_grpc_client_is_connected( ctx->grpc_client ) ) ) {
     958           0 :     return DISCONNECTED;
     959           0 :   }
     960             : 
     961             :   /* As far as we know, the bundle connection is alive and well. */
     962           0 :   return CONNECTED;
     963           0 : }
     964             : 
     965             : #undef DISCONNECTED
     966             : #undef CONNECTING
     967             : #undef CONNECTED
     968             : 
     969             : FD_FN_CONST char const *
     970           0 : fd_bundle_request_ctx_cstr( ulong request_ctx ) {
     971           0 :   switch( request_ctx ) {
     972           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthChallenge:
     973           0 :     return "GenerateAuthChallenge";
     974           0 :   case FD_BUNDLE_CLIENT_REQ_Auth_GenerateAuthTokens:
     975           0 :     return "GenerateAuthTokens";
     976           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribePackets:
     977           0 :     return "SubscribePackets";
     978           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_SubscribeBundles:
     979           0 :     return "SubscribeBundles";
     980           0 :   case FD_BUNDLE_CLIENT_REQ_Bundle_GetBlockBuilderFeeInfo:
     981           0 :     return "GetBlockBuilderFeeInfo";
     982           0 :   default:
     983           0 :     return "unknown";
     984           0 :   }
     985           0 : }

Generated by: LCOV version 1.14