LCOV - code coverage report
Current view: top level - disco/events - fd_event_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 0 539 0.0 %
Date: 2026-06-29 05:51:35 Functions: 0 26 0.0 %

          Line data    Source code
       1             : #define _GNU_SOURCE
       2             : #include "fd_event_client.h"
       3             : 
       4             : #include "../../waltz/resolv/fd_netdb.h"
       5             : #include "../../waltz/http/fd_url.h"
       6             : #include "../../waltz/grpc/fd_grpc_client.h"
       7             : #include "../../waltz/grpc/fd_grpc_client_private.h"
       8             : #include "../../ballet/pb/fd_pb_tokenize.h"
       9             : #include "../../ballet/pb/fd_pb_encode.h"
      10             : #include "../../ballet/hex/fd_hex.h"
      11             : #include "../../util/net/fd_ip4.h"
      12             : #include "../../util/log/fd_log.h"
      13             : #include "../keyguard/fd_keyguard.h"
      14             : 
      15             : #if FD_HAS_OPENSSL
      16             : #include "../../waltz/openssl/fd_openssl.h"
      17             : #include <openssl/ssl.h>
      18             : #include <openssl/err.h>
      19             : #endif
      20             : 
      21             : #include <netinet/tcp.h>
      22             : #include <unistd.h>
      23             : #include <errno.h>
      24             : #include <sys/socket.h>
      25             : #include <netinet/in.h>
      26             : 
      27           0 : #define DISCONNECT_REASON_IDENTITY_CHANGED   (0)
      28           0 : #define DISCONNECT_REASON_CONNECT_FAILED     (1)
      29           0 : #define DISCONNECT_REASON_DNS_RESOLVE_FAILED (2)
      30           0 : #define DISCONNECT_REASON_TIMEOUT            (3)
      31           0 : #define DISCONNECT_REASON_TRANSPORT_FAILED   (4)
      32           0 : #define DISCONNECT_REASON_PEER_CLOSED        (5)
      33           0 : #define DISCONNECT_REASON_INVALID_CURSOR     (6)
      34           0 : #define DISCONNECT_REASON_AUTH_FAILED        (7)
      35           0 : #define DISCONNECT_REASON_INVALID_PROTOBUF   (8)
      36             : 
      37           0 : #define FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE  (1UL)
      38             : #define FD_EVENT_CLIENT_REQ_CTX_CONFIRM_AUTH  (2UL)
      39           0 : #define FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS (3UL)
      40             : 
      41           0 : #define FD_EVENT_CLIENT_TOKEN_SZ (217UL)
      42             : 
      43             : struct fd_event_client {
      44             :   fd_grpc_client_t * grpc_client;
      45             :   fd_grpc_client_metrics_t grpc_metrics[1];
      46             :   fd_grpc_h2_stream_t * event_stream;
      47             : 
      48             :   char client_version[ 10UL ];
      49             :   char commit_hash[ 41UL ];
      50             :   char action[ 16UL ];
      51             :   uchar identity_pubkey[ 32UL ];
      52             : 
      53             :   int       has_genesis_hash;
      54             :   fd_hash_t genesis_hash[1];
      55             : 
      56             :   ushort has_shred_version;
      57             :   ushort shred_version;
      58             : 
      59             :   ulong event_id;
      60             : 
      61             :   ulong instance_id;
      62             :   ulong boot_id;
      63             :   ulong machine_id;
      64             : 
      65             :   int defer_disconnect;
      66             :   ulong consecutive_failure_count;
      67             : 
      68             :   int auth_send_pending;
      69             : 
      70             :   ulong state;
      71             :   union {
      72             :     struct {
      73             :       long reconnect_deadline;
      74             :     } disconnected;
      75             : 
      76             :     struct {
      77             :       long connect_deadline;
      78             :     } connecting;
      79             : 
      80             :     struct {
      81             :       long connected_timestamp;
      82             :     } connected;
      83             :   };
      84             : 
      85             :   int so_sndbuf;
      86             :   int sockfd;
      87             : 
      88             :   int    use_tls;
      89             : #if FD_HAS_OPENSSL
      90             :   SSL_CTX * ssl_ctx;
      91             :   SSL     * ssl;
      92             : #endif
      93             : 
      94             :   /* wallclock deadline for auth handshake, LONG_MAX if not
      95             :      authenticating. */
      96             :   long auth_deadline;
      97             : 
      98             :   char   server_fqdn[ 256 ]; /* cstr */
      99             :   ulong  server_fqdn_len;
     100             :   uint   server_ip4_addr;
     101             :   ushort server_tcp_port;
     102             : 
     103             :   fd_rng_t * rng;
     104             :   fd_circq_t * circq;
     105             :   fd_keyguard_client_t * keyguard_client;
     106             : 
     107             :   /* Stateless-auth bearer value: "hex(challenge_token).hex(signature)",
     108             :      built from the challenge token returned by Authenticate and our
     109             :      ed25519 signature over it.  Presented as the `authorization: Bearer
     110             :      <...>` header on the StreamEvents request. */
     111             :   char  auth_bearer[ 2UL*FD_EVENT_CLIENT_TOKEN_SZ + 1UL + 2UL*64UL + 1UL ];
     112             :   ulong auth_bearer_len;
     113             : 
     114             :   fd_event_client_metrics_t metrics;
     115             : };
     116             : 
     117             : FD_FN_CONST ulong
     118           0 : fd_event_client_align( void ) {
     119           0 :   return alignof( fd_event_client_t );
     120           0 : }
     121             : 
     122             : FD_FN_CONST ulong
     123           0 : fd_event_client_footprint( ulong buf_max ) {
     124           0 :   ulong l;
     125           0 :   l = FD_LAYOUT_INIT;
     126           0 :   l = FD_LAYOUT_APPEND( l, alignof(fd_event_client_t), sizeof(fd_event_client_t)           );
     127           0 :   l = FD_LAYOUT_APPEND( l, fd_grpc_client_align(),     fd_grpc_client_footprint( buf_max ) );
     128           0 :   return FD_LAYOUT_FINI( l, alignof(fd_event_client_t) );
     129           0 : }
     130             : 
     131             : void *
     132             : fd_event_client_new( void *                 shmem,
     133             :                      fd_keyguard_client_t * keyguard_client,
     134             :                      fd_rng_t *             rng,
     135             :                      fd_circq_t *           circq,
     136             :                      int                    so_sndbuf,
     137             :                      char const *           _url,
     138             :                      uchar const *          identity_pubkey,
     139             :                      char const *           client_version,
     140             :                      char const *           commit_hash,
     141             :                      char const *           action,
     142             :                      ulong                  instance_id,
     143             :                      ulong                  boot_id,
     144             :                      ulong                  machine_id,
     145             :                      ulong                  buf_max,
     146             :                      int                    use_tls,
     147           0 :                      void *                 ssl_ctx ) {
     148           0 :   if( FD_UNLIKELY( !shmem ) ) {
     149           0 :     FD_LOG_WARNING(( "NULL shmem" ));
     150           0 :     return NULL;
     151           0 :   }
     152             : 
     153           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shmem, fd_event_client_align() ) ) ) {
     154           0 :     FD_LOG_WARNING(( "misaligned shmem" ));
     155           0 :     return NULL;
     156           0 :   }
     157             : 
     158           0 :   FD_SCRATCH_ALLOC_INIT( l, shmem );
     159           0 :   fd_event_client_t * client = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_event_client_t), sizeof(fd_event_client_t)          );
     160           0 :   void * grpc_client_mem     = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_client_align(),     fd_grpc_client_footprint( buf_max ) );
     161             : 
     162           0 :   fd_url_t url[1];
     163           0 :   _Bool _is_ssl = 0;
     164           0 :   if( FD_UNLIKELY( fd_url_parse_endpoint( url,
     165           0 :                                           _url,
     166           0 :                                           strlen( _url ),
     167           0 :                                           &client->server_tcp_port,
     168           0 :                                           &_is_ssl,
     169           0 :                                           "[tiles.event.url]" ) ) ) {
     170           0 :     FD_LOG_ERR(( "Could not parse [tiles.event.url]" ));
     171           0 :   }
     172           0 :   if( FD_UNLIKELY( url->host_len > 255 ) ) {
     173           0 :     FD_LOG_CRIT(( "Invalid url->host_len" )); /* unreachable */
     174           0 :   }
     175           0 :   fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->server_fqdn ), url->host, url->host_len ) );
     176           0 :   client->server_fqdn_len = url->host_len;
     177             : 
     178           0 :   fd_memcpy( client->identity_pubkey, identity_pubkey, 32UL );
     179           0 :   strncpy( client->client_version, client_version, sizeof( client->client_version ) );
     180           0 :   client->client_version[ sizeof( client->client_version ) - 1UL ] = '\0';
     181           0 :   fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->commit_hash ), commit_hash, fd_ulong_min( strlen( commit_hash ), sizeof( client->commit_hash )-1UL ) ) );
     182           0 :   fd_cstr_ncpy( client->action, action, sizeof( client->action ) );
     183             : 
     184           0 :   client->event_id = 0UL;
     185             : 
     186           0 :   client->instance_id = instance_id;
     187           0 :   client->boot_id     = boot_id;
     188           0 :   client->machine_id  = machine_id;
     189             : 
     190           0 :   client->has_genesis_hash = 0;
     191           0 :   client->has_shred_version = 0;
     192             : 
     193           0 :   client->so_sndbuf = so_sndbuf;
     194           0 :   client->sockfd = -1;
     195           0 :   client->use_tls = use_tls;
     196           0 : #if FD_HAS_OPENSSL
     197           0 :   client->ssl_ctx = (SSL_CTX *)ssl_ctx;
     198           0 :   client->ssl = NULL;
     199             : #else
     200             :   (void)ssl_ctx;
     201             :   if( FD_UNLIKELY( use_tls ) ) {
     202             :     FD_LOG_ERR(( "TLS requested for event service but this build does not include OpenSSL. "
     203             :                  "To install OpenSSL, re-run ./deps.sh and do a clean rebuild." ));
     204             :   }
     205             : #endif
     206           0 :   client->auth_deadline = LONG_MAX;
     207           0 :   client->auth_send_pending = 0;
     208           0 :   client->state = FD_EVENT_CLIENT_STATE_DISCONNECTED;
     209           0 :   client->disconnected.reconnect_deadline = 0L;
     210             : 
     211           0 :   client->defer_disconnect = INT_MAX;
     212           0 :   client->consecutive_failure_count = 7UL; /* Start high, so if server is down we don't keep retrying on boot */
     213             : 
     214           0 :   client->circq = circq;
     215           0 :   client->rng = rng;
     216           0 :   client->keyguard_client = keyguard_client;
     217             : 
     218           0 :   extern fd_grpc_client_callbacks_t fd_event_client_grpc_callbacks;
     219           0 :   client->grpc_client = fd_grpc_client_new( grpc_client_mem, &fd_event_client_grpc_callbacks, client->grpc_metrics, client, buf_max, fd_rng_ulong( rng ) );
     220           0 :   FD_TEST( client->grpc_client );
     221             : 
     222           0 :   memset( &client->metrics, 0, sizeof(client->metrics) );
     223           0 :   memset( client->grpc_metrics, 0, sizeof(fd_grpc_client_metrics_t) );
     224             : 
     225           0 :   fd_grpc_client_set_version( client->grpc_client, client->client_version, strlen( client->client_version ) );
     226           0 :   fd_grpc_client_set_authority( client->grpc_client, client->server_fqdn, client->server_fqdn_len, client->server_tcp_port );
     227             : 
     228           0 :   return (void *)client;
     229           0 : }
     230             : 
     231             : fd_event_client_t *
     232           0 : fd_event_client_join( void * shec ) {
     233           0 :   if( FD_UNLIKELY( !shec ) ) {
     234           0 :     FD_LOG_WARNING(( "NULL shec" ));
     235           0 :     return NULL;
     236           0 :   }
     237             : 
     238           0 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)shec, fd_event_client_align() ) ) ) {
     239           0 :     FD_LOG_WARNING(( "misaligned shec" ));
     240           0 :     return NULL;
     241           0 :   }
     242             : 
     243           0 :   fd_event_client_t * client = (fd_event_client_t *)shec;
     244             : 
     245           0 :   return client;
     246           0 : }
     247             : 
     248             : fd_event_client_metrics_t const *
     249           0 : fd_event_client_metrics( fd_event_client_t const * client ) {
     250             :   /* Update bytes from grpc metrics */
     251           0 :   ((fd_event_client_t *)client)->metrics.bytes_written = client->grpc_metrics->stream_chunks_tx_bytes;
     252           0 :   ((fd_event_client_t *)client)->metrics.bytes_read = client->grpc_metrics->stream_chunks_rx_bytes;
     253           0 :   return &client->metrics;
     254           0 : }
     255             : 
     256             : ulong
     257           0 : fd_event_client_state( fd_event_client_t const * client ) {
     258           0 :   return client->state;
     259           0 : }
     260             : 
     261             : ulong
     262           0 : fd_event_client_id_reserve( fd_event_client_t * client ) {
     263           0 :   return client->event_id++;
     264           0 : }
     265             : 
     266             : void
     267             : fd_event_client_init_genesis( fd_event_client_t *       client,
     268           0 :                               fd_genesis_meta_t const * meta ) {
     269           0 :   *client->genesis_hash = meta->genesis_hash;
     270           0 :   client->has_genesis_hash = 1;
     271           0 : }
     272             : 
     273             : void
     274             : fd_event_client_init_shred_version( fd_event_client_t * client,
     275           0 :                                     ushort              shred_version ) {
     276           0 :   client->shred_version = shred_version;
     277           0 :   client->has_shred_version = 1;
     278           0 : }
     279             : 
     280             : static void
     281           0 : backoff( fd_event_client_t * client ) {
     282           0 :   long now = fd_log_wallclock();
     283           0 :   ulong backoff_base = 1UL << fd_ulong_min( client->consecutive_failure_count, 7UL ); /* max 4 mins */
     284           0 :   ulong backoff_jitter = fd_rng_ulong_roll( client->rng, backoff_base );
     285           0 :   client->disconnected.reconnect_deadline = now + (long)( backoff_base + backoff_jitter )*(long)1e9;
     286           0 :   if( FD_UNLIKELY( client->consecutive_failure_count < 8UL ) ) client->consecutive_failure_count++;
     287           0 : }
     288             : 
     289             : static void
     290             : disconnect( fd_event_client_t * client,
     291             :             int                 reason,
     292             :             int                 err,
     293           0 :             int                 _backoff ) {
     294           0 : #if FD_HAS_OPENSSL
     295           0 :   if( FD_UNLIKELY( client->ssl ) ) {
     296           0 :     SSL_free( client->ssl );
     297           0 :     client->ssl = NULL;
     298           0 :   }
     299           0 : #endif
     300           0 :   if( FD_LIKELY( -1!=client->sockfd ) ) {
     301           0 :     if( FD_UNLIKELY( -1==close( client->sockfd ) ) ) FD_LOG_ERR(( "close() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     302           0 :     client->sockfd = -1;
     303           0 :     client->state = FD_EVENT_CLIENT_STATE_DISCONNECTED;
     304           0 :     fd_circq_reset_cursor( client->circq );
     305           0 :   }
     306             : 
     307           0 :   client->event_stream = NULL;
     308           0 :   client->auth_deadline = LONG_MAX;
     309           0 :   client->auth_send_pending = 0;
     310             : 
     311           0 :   client->auth_bearer[ 0 ] = '\0';
     312           0 :   client->auth_bearer_len  = 0UL;
     313             : 
     314           0 :   switch( reason ) {
     315           0 :     case DISCONNECT_REASON_IDENTITY_CHANGED:
     316           0 :       FD_LOG_INFO(( "disconnected: identity changed" ));
     317           0 :       break;
     318           0 :     case DISCONNECT_REASON_CONNECT_FAILED:
     319           0 :       FD_LOG_WARNING(( "connecting to " FD_IP4_ADDR_FMT ":%u failed (%i-%s)", FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port, errno, fd_io_strerror( errno ) ));
     320           0 :       client->metrics.transport_fail_cnt++;
     321           0 :       break;
     322           0 :     case DISCONNECT_REASON_DNS_RESOLVE_FAILED:
     323           0 :       FD_LOG_WARNING(( "failed to resolve host `%.*s` (%d-%s)", (int)client->server_fqdn_len, client->server_fqdn, err, fd_gai_strerror( err ) ));
     324           0 :       client->metrics.transport_fail_cnt++;
     325           0 :       break;
     326           0 :     case DISCONNECT_REASON_TIMEOUT:
     327           0 :       FD_LOG_INFO(( "connection failed: timeout" ));
     328           0 :       client->metrics.transport_fail_cnt++;
     329           0 :       break;
     330           0 :     case DISCONNECT_REASON_TRANSPORT_FAILED:
     331           0 :       FD_LOG_WARNING(( "disconnected: transport failed (%d-%s)", err, fd_io_strerror( err ) ));
     332           0 :       client->metrics.transport_fail_cnt++;
     333           0 :       break;
     334           0 :     case DISCONNECT_REASON_PEER_CLOSED:
     335           0 :       FD_LOG_WARNING(( "disconnected: peer closed connection" ));
     336           0 :       client->metrics.transport_fail_cnt++;
     337           0 :       break;
     338           0 :     case DISCONNECT_REASON_INVALID_CURSOR:
     339           0 :       FD_LOG_WARNING(( "disconnected: invalid cursor" ));
     340           0 :       client->metrics.transport_fail_cnt++;
     341           0 :       break;
     342           0 :     case DISCONNECT_REASON_AUTH_FAILED:
     343           0 :       FD_LOG_WARNING(( "disconnected: authentication failed" ));
     344           0 :       client->metrics.transport_fail_cnt++;
     345           0 :       break;
     346           0 :     case DISCONNECT_REASON_INVALID_PROTOBUF:
     347           0 :       FD_LOG_WARNING(( "disconnected: invalid protobuf message received" ));
     348           0 :       client->metrics.transport_fail_cnt++;
     349           0 :       break;
     350           0 :     default:
     351           0 :       FD_LOG_WARNING(( "disconnected: unknown reason %d", reason ));
     352           0 :       client->metrics.transport_fail_cnt++;
     353           0 :       break;
     354           0 :   }
     355             : 
     356           0 :   if( FD_LIKELY( _backoff ) ) backoff( client );
     357           0 : }
     358             : 
     359             : void
     360             : fd_event_client_set_identity( fd_event_client_t * client,
     361           0 :                               uchar const *       identity_pubkey ) {
     362           0 :   fd_memcpy( client->identity_pubkey, identity_pubkey, 32UL );
     363           0 :   disconnect( client, DISCONNECT_REASON_IDENTITY_CHANGED, 0, 0 );
     364           0 : }
     365             : 
     366             : static void
     367             : reconnect( fd_event_client_t * client,
     368           0 :            int *               charge_busy ) {
     369           0 :   FD_TEST( client->state==FD_EVENT_CLIENT_STATE_DISCONNECTED );
     370             : 
     371           0 :   long now = fd_log_wallclock();
     372           0 :   if( FD_UNLIKELY( now<client->disconnected.reconnect_deadline ) ) return;
     373             : 
     374           0 :   *charge_busy = 1;
     375           0 :   client->metrics.connect_attempt_cnt++;
     376             : 
     377           0 :   FD_LOG_INFO(( "connecting to event server %s://%.*s:%u", client->use_tls ? "https" : "http", (int)client->server_fqdn_len, client->server_fqdn, client->server_tcp_port ));
     378             : 
     379             :   /* FIXME IPv6 support */
     380           0 :   fd_addrinfo_t hints = {0};
     381           0 :   hints.ai_family = AF_INET;
     382           0 :   fd_addrinfo_t * res = NULL;
     383           0 :   uchar scratch[ 4096 ];
     384           0 :   void * pscratch = scratch;
     385           0 :   int err = fd_getaddrinfo( client->server_fqdn, &hints, &res, &pscratch, sizeof(scratch) );
     386           0 :   if( FD_UNLIKELY( err ) ) {
     387           0 :     disconnect( client, DISCONNECT_REASON_DNS_RESOLVE_FAILED, err, 1 );
     388           0 :     return;
     389           0 :   }
     390             : 
     391           0 :   if( FD_UNLIKELY( !res || !res->ai_addr ) ) {
     392           0 :     disconnect( client, DISCONNECT_REASON_DNS_RESOLVE_FAILED, 0, 1 );
     393           0 :     return;
     394           0 :   }
     395             : 
     396           0 :   uint const ip4_addr = ((struct sockaddr_in *)res->ai_addr)->sin_addr.s_addr;
     397           0 :   client->server_ip4_addr = ip4_addr;
     398             : 
     399           0 :   client->sockfd = socket( AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 );
     400           0 :   if( FD_UNLIKELY( -1==client->sockfd ) ) FD_LOG_ERR(( "socket() failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     401             : 
     402           0 :   struct sockaddr_in addr;
     403           0 :   fd_memset( &addr, 0, sizeof( addr ) );
     404           0 :   addr.sin_family = AF_INET;
     405           0 :   addr.sin_port   = fd_ushort_bswap( client->server_tcp_port );
     406           0 :   addr.sin_addr.s_addr = ip4_addr;
     407             : 
     408           0 :   int tcp_nodelay = 1;
     409           0 :   if( FD_UNLIKELY( -1==setsockopt( client->sockfd, SOL_TCP, TCP_NODELAY, &tcp_nodelay, sizeof(int) ) ) ) FD_LOG_ERR(( "setsockopt failed (%d-%s)", errno, fd_io_strerror( errno ) ));
     410           0 :   if( FD_UNLIKELY( -1==setsockopt( client->sockfd, SOL_SOCKET, SO_SNDBUF, &client->so_sndbuf, sizeof(int) ) ) ) FD_LOG_ERR(( "setsockopt(SOL_SOCKET,SO_SNDBUF,%i) failed (%i-%s)", client->so_sndbuf, errno, fd_io_strerror( errno ) ));
     411             : 
     412           0 :   if( FD_UNLIKELY( -1==connect( client->sockfd, fd_type_pun_const( &addr ), sizeof(struct sockaddr_in) ) && errno!=EINPROGRESS ) ) {
     413           0 :     disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, errno, 1 );
     414           0 :     return;
     415           0 :   }
     416             : 
     417           0 : # if FD_HAS_OPENSSL
     418           0 :   if( client->use_tls ) {
     419           0 :     BIO * bio = fd_openssl_bio_new_socket( client->sockfd, BIO_NOCLOSE );
     420           0 :     if( FD_UNLIKELY( !bio ) ) {
     421           0 :       FD_LOG_WARNING(( "fd_openssl_bio_new_socket failed" ));
     422           0 :       disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
     423           0 :       return;
     424           0 :     }
     425             : 
     426           0 :     SSL * ssl = SSL_new( client->ssl_ctx );
     427           0 :     if( FD_UNLIKELY( !ssl ) ) {
     428           0 :       FD_LOG_WARNING(( "SSL_new failed" ));
     429           0 :       BIO_free( bio );
     430           0 :       disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
     431           0 :       return;
     432           0 :     }
     433             : 
     434           0 :     SSL_set_bio( ssl, bio, bio ); /* moves ownership of bio */
     435           0 :     SSL_set_connect_state( ssl );
     436             : 
     437             :     /* SNI and hostname verification */
     438           0 :     if( FD_UNLIKELY( !SSL_set_tlsext_host_name( ssl, client->server_fqdn ) ) ) {
     439           0 :       FD_LOG_WARNING(( "SSL_set_tlsext_host_name failed" ));
     440           0 :       SSL_free( ssl );
     441           0 :       disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
     442           0 :       return;
     443           0 :     }
     444           0 :     if( FD_UNLIKELY( !SSL_set1_host( ssl, client->server_fqdn ) ) ) {
     445           0 :       FD_LOG_WARNING(( "SSL_set1_host failed" ));
     446           0 :       SSL_free( ssl );
     447           0 :       disconnect( client, DISCONNECT_REASON_CONNECT_FAILED, 0, 1 );
     448           0 :       return;
     449           0 :     }
     450             : 
     451           0 :     client->ssl = ssl;
     452           0 :   }
     453           0 : # endif /* FD_HAS_OPENSSL */
     454             : 
     455           0 :   fd_grpc_client_reset( client->grpc_client );
     456             : 
     457           0 :   client->state = FD_EVENT_CLIENT_STATE_CONNECTING;
     458           0 :   client->connecting.connect_deadline = now+(long)1L*(long)1e9; /* 1 second to connect */
     459           0 : }
     460             : 
     461             : static int
     462           0 : fd_event_client_try_send_authenticate( fd_event_client_t * client ) {
     463           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client->grpc_client ) ) ) return 0;
     464           0 :   if( FD_UNLIKELY( fd_grpc_client_request_stream_busy( client->grpc_client ) ) ) return 0;
     465             : 
     466           0 :   fd_pb_encoder_t auth_req[1];
     467           0 :   uchar buffer[ 256UL ];
     468           0 :   fd_pb_encoder_init( auth_req, buffer, sizeof(buffer) );
     469             : 
     470           0 :   fd_pb_push_bytes( auth_req, 1U, client->identity_pubkey, 32UL );
     471           0 :   fd_pb_push_string( auth_req, 2U, client->client_version, strlen( client->client_version ) );
     472           0 :   fd_pb_push_string( auth_req, 3U, client->commit_hash, strlen( client->commit_hash ) );
     473           0 :   fd_pb_push_bytes( auth_req, 4U, client->genesis_hash, 32UL );
     474           0 :   fd_pb_push_uint64( auth_req, 5U, client->shred_version );
     475           0 :   fd_pb_push_uint64( auth_req, 6U, client->instance_id );
     476           0 :   fd_pb_push_uint64( auth_req, 7U, client->machine_id );
     477           0 :   fd_pb_push_uint64( auth_req, 8U, client->boot_id );
     478           0 :   fd_pb_push_string( auth_req, 9U, client->action, strlen( client->action ) );
     479             : 
     480           0 :   fd_grpc_h2_stream_t * stream = fd_grpc_client_request_start1(
     481           0 :       client->grpc_client,
     482           0 :       "/events.v1.EventService/Authenticate", strlen("/events.v1.EventService/Authenticate"),
     483           0 :       FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE,
     484           0 :       buffer, fd_pb_encoder_out_sz( auth_req ),
     485           0 :       NULL, 0UL,
     486           0 :       0 /* not streaming */ );
     487             : 
     488           0 :   if( FD_UNLIKELY( !stream ) ) return 0;
     489             : 
     490           0 :   long now = fd_log_wallclock();
     491           0 :   fd_grpc_client_deadline_set( stream, FD_GRPC_DEADLINE_HEADER, now+(long)2e9 );
     492           0 :   fd_grpc_client_deadline_set( stream, FD_GRPC_DEADLINE_RX_END, now+(long)2e9 );
     493             : 
     494           0 :   client->auth_send_pending = 0;
     495           0 :   FD_LOG_INFO(( "Requesting auth challenge from event server " FD_IP4_ADDR_FMT ":%u (%.*s)",
     496           0 :                 FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port,
     497           0 :                 (int)client->server_fqdn_len, client->server_fqdn ));
     498           0 :   return 1;
     499           0 : }
     500             : 
     501             : static void
     502           0 : fd_event_client_grpc_conn_established( void * app_ctx ) {
     503           0 :   fd_event_client_t * client = app_ctx;
     504             : 
     505           0 :   long now = fd_log_wallclock();
     506           0 :   client->state             = FD_EVENT_CLIENT_STATE_AUTHENTICATING;
     507           0 :   client->auth_deadline     = now + (long)2e9;
     508           0 :   client->auth_send_pending = 1;
     509             : 
     510           0 :   fd_event_client_try_send_authenticate( client );
     511           0 : }
     512             : 
     513             : static void
     514             : fd_event_client_handle_auth_challenge_resp( fd_event_client_t * client,
     515             :                                             void const *        protobuf,
     516           0 :                                             ulong               protobuf_sz ) {
     517           0 :   fd_pb_inbuf_t inbuf[1];
     518           0 :   fd_pb_inbuf_init( inbuf, protobuf, protobuf_sz );
     519             : 
     520           0 :   if( FD_UNLIKELY( protobuf_sz==0UL ) ) {
     521           0 :     FD_LOG_WARNING(( "Empty auth challenge response" ));
     522           0 :     client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     523           0 :     return;
     524           0 :   }
     525             : 
     526           0 :   fd_pb_tlv_t challenge_tlv;
     527           0 :   if( FD_UNLIKELY( !fd_pb_read_tlv( inbuf, &challenge_tlv ) ) ) {
     528           0 :     FD_LOG_WARNING(( "Failed to parse auth challenge response" ));
     529           0 :     client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     530           0 :     return;
     531           0 :   }
     532             : 
     533           0 :   if( FD_UNLIKELY( challenge_tlv.field_id!=1U || challenge_tlv.wire_type!=FD_PB_WIRE_TYPE_LEN ) ) {
     534           0 :     FD_LOG_WARNING(( "Unexpected field in auth challenge response" ));
     535           0 :     client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     536           0 :     return;
     537           0 :   }
     538             : 
     539           0 :   ulong challenge_len = challenge_tlv.len;
     540           0 :   if( FD_UNLIKELY( challenge_len!=FD_EVENT_CLIENT_TOKEN_SZ ) ) {
     541           0 :     FD_LOG_WARNING(( "Invalid challenge token size: %lu bytes (expected %lu)", challenge_len, FD_EVENT_CLIENT_TOKEN_SZ ));
     542           0 :     client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     543           0 :     return;
     544           0 :   }
     545             : 
     546           0 :   if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf )<challenge_len ) ) {
     547           0 :     FD_LOG_WARNING(( "Truncated auth challenge response" ));
     548           0 :     client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     549           0 :     return;
     550           0 :   }
     551             : 
     552           0 :   uchar challenge_token[ FD_EVENT_CLIENT_TOKEN_SZ ];
     553           0 :   memcpy( challenge_token, inbuf->cur, challenge_len );
     554           0 :   inbuf->cur += challenge_len;
     555             : 
     556           0 :   if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf ) ) ) {
     557           0 :     FD_LOG_WARNING(( "Trailing data in auth challenge response" ));
     558           0 :     client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     559           0 :     return;
     560           0 :   }
     561             : 
     562           0 :   uchar sign_request[ 100UL + FD_EVENT_CLIENT_TOKEN_SZ ];
     563           0 :   static char const sign_prefix[ 100 ] =
     564           0 :     "                                "  /* 32 spaces */
     565           0 :     "                                "  /* 32 spaces */
     566           0 :     "Firedancer event challenge-response";
     567           0 :   memcpy( sign_request,     sign_prefix,     sizeof(sign_prefix) );
     568           0 :   memcpy( sign_request+100, challenge_token, challenge_len       );
     569             : 
     570           0 :   uchar signature[ 64UL ];
     571           0 :   fd_keyguard_client_sign( client->keyguard_client,
     572           0 :                            signature,
     573           0 :                            sign_request, 100UL+challenge_len,
     574           0 :                            FD_KEYGUARD_SIGN_TYPE_ED25519 );
     575             : 
     576             :   /* Build "hex(challenge_token).hex(signature)" for the bearer token. */
     577           0 :   fd_hex_encode( client->auth_bearer, challenge_token, FD_EVENT_CLIENT_TOKEN_SZ );
     578           0 :   client->auth_bearer[ 2UL*FD_EVENT_CLIENT_TOKEN_SZ ] = '.';
     579           0 :   fd_hex_encode( client->auth_bearer + 2UL*FD_EVENT_CLIENT_TOKEN_SZ+1UL, signature, 64UL );
     580           0 :   client->auth_bearer_len = 2UL*FD_EVENT_CLIENT_TOKEN_SZ + 1UL + 2UL*64UL;
     581           0 :   client->auth_bearer[ client->auth_bearer_len ] = '\0';
     582             : 
     583           0 :   client->event_stream = NULL;
     584           0 :   client->metrics.transport_success_cnt++;
     585           0 :   client->state = FD_EVENT_CLIENT_STATE_CONNECTED;
     586           0 :   client->connected.connected_timestamp = fd_log_wallclock();
     587           0 :   FD_LOG_NOTICE(( "connected to event server " FD_IP4_ADDR_FMT ":%u (%.*s)",
     588           0 :                   FD_IP4_ADDR_FMT_ARGS( client->server_ip4_addr ), client->server_tcp_port,
     589           0 :                   (int)client->server_fqdn_len, client->server_fqdn ));
     590           0 : }
     591             : 
     592             : static void
     593             : fd_event_client_grpc_conn_dead( void * app_ctx,
     594             :                                 uint   h2_err,
     595           0 :                                 int    closed_by ) {
     596           0 :   fd_event_client_t * client = app_ctx;
     597           0 :   FD_LOG_WARNING(( "Event gRPC connection closed %s (%u-%s)",
     598           0 :                    closed_by ? "by peer" : "due to error",
     599           0 :                    h2_err, fd_h2_strerror( h2_err ) ));
     600           0 :   disconnect( client, DISCONNECT_REASON_PEER_CLOSED, 0, 1 );
     601           0 : }
     602             : 
     603             : static void
     604             : fd_event_client_grpc_tx_complete( void * app_ctx,
     605           0 :                                   ulong  request_ctx ) {
     606           0 :   (void)app_ctx; (void)request_ctx;
     607           0 : }
     608             : 
     609             : void
     610             : fd_event_client_grpc_rx_start( void * app_ctx,
     611           0 :                                ulong  request_ctx ) {
     612           0 :   (void)app_ctx; (void)request_ctx;
     613           0 : }
     614             : 
     615             : static void
     616             : fd_event_client_handle_stream_events_resp( fd_event_client_t * client,
     617             :                                            void const *        protobuf,
     618           0 :                                            ulong               protobuf_sz ) {
     619           0 :   fd_pb_inbuf_t inbuf[1];
     620           0 :   fd_pb_inbuf_init( inbuf, protobuf, protobuf_sz );
     621             : 
     622           0 :   ulong nonce_ack = 0UL;
     623           0 :   if( FD_LIKELY( protobuf_sz ) ) {
     624           0 :     fd_pb_tlv_t event_id;
     625           0 :     if( FD_UNLIKELY( !fd_pb_read_tlv( inbuf, &event_id ) ||
     626           0 :                      event_id.field_id!=1U /* event_id */ ||
     627           0 :                      event_id.wire_type!=FD_PB_WIRE_TYPE_VARINT ) ) {
     628           0 :       FD_LOG_WARNING(( "Event gRPC rx msg: invalid Protobuf" ));
     629           0 :       client->defer_disconnect = DISCONNECT_REASON_INVALID_PROTOBUF;
     630           0 :       return;
     631           0 :     }
     632           0 :     nonce_ack = event_id.varint;
     633             : 
     634           0 :     if( FD_UNLIKELY( fd_pb_inbuf_sz( inbuf ) ) ) {
     635           0 :       FD_LOG_WARNING(( "Event gRPC rx msg: trailing data in StreamEventsResponse" ));
     636           0 :       client->defer_disconnect = DISCONNECT_REASON_INVALID_PROTOBUF;
     637           0 :       return;
     638           0 :     }
     639           0 :   }
     640             : 
     641           0 :   client->metrics.events_acked++;
     642           0 :   if( FD_UNLIKELY( nonce_ack==ULONG_MAX ) ) return;
     643             : 
     644           0 :   client->metrics.last_acked_id = nonce_ack;
     645             : 
     646           0 :   int err = fd_circq_pop_until( client->circq, nonce_ack );
     647           0 :   if( FD_UNLIKELY( -1==err ) ) {
     648           0 :     FD_LOG_WARNING(( "Event gRPC rx msg: invalid cursor ack %lu", nonce_ack ));
     649           0 :     client->defer_disconnect = DISCONNECT_REASON_INVALID_CURSOR;
     650           0 :   }
     651           0 : }
     652             : 
     653             : void
     654             : fd_event_client_grpc_rx_msg( void *       app_ctx,
     655             :                              void const * protobuf,
     656             :                              ulong        protobuf_sz,
     657           0 :                              ulong        request_ctx ) {
     658           0 :   fd_event_client_t * client = app_ctx;
     659             : 
     660           0 :   switch( request_ctx ) {
     661           0 :     case FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE:
     662           0 :       fd_event_client_handle_auth_challenge_resp( client, protobuf, protobuf_sz );
     663           0 :       break;
     664           0 :     case FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS:
     665           0 :       fd_event_client_handle_stream_events_resp( client, protobuf, protobuf_sz );
     666           0 :       break;
     667           0 :     default:
     668           0 :       FD_LOG_WARNING(( "Unknown request_ctx: %lu, disconnecting", request_ctx ));
     669           0 :       client->defer_disconnect = DISCONNECT_REASON_INVALID_PROTOBUF;
     670           0 :       break;
     671           0 :   }
     672           0 : }
     673             : 
     674             : void
     675             : fd_event_client_grpc_rx_end( void *                app_ctx,
     676             :                              ulong                 request_ctx,
     677           0 :                              fd_grpc_resp_hdrs_t * resp ) {
     678           0 :   fd_event_client_t * client = app_ctx;
     679             : 
     680           0 :   if( FD_UNLIKELY( resp->h2_status!=200 ) ) {
     681           0 :     FD_LOG_WARNING(( "Event gRPC request failed (HTTP status %u)", resp->h2_status ));
     682           0 :     client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
     683           0 :     return;
     684           0 :   }
     685             : 
     686           0 :   resp->grpc_msg_len = (uint)fd_url_unescape( resp->grpc_msg, resp->grpc_msg_len );
     687           0 :   if( !resp->grpc_msg_len ) {
     688           0 :     fd_memcpy( resp->grpc_msg, "unknown error", 13 );
     689           0 :     resp->grpc_msg_len = 13;
     690           0 :   }
     691             : 
     692           0 :   if( FD_UNLIKELY( resp->grpc_status!=FD_GRPC_STATUS_OK ) ) {
     693           0 :     switch( request_ctx ) {
     694           0 :     case FD_EVENT_CLIENT_REQ_CTX_AUTHENTICATE:
     695           0 :       FD_LOG_WARNING(( "Event authentication failed (gRPC status %u-%s): %.*s",
     696           0 :                        resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     697           0 :                        (int)resp->grpc_msg_len, resp->grpc_msg ));
     698           0 :       client->defer_disconnect = DISCONNECT_REASON_AUTH_FAILED;
     699           0 :       return;
     700           0 :     case FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS:
     701           0 :       FD_LOG_WARNING(( "Event stream failed (gRPC status %u-%s): %.*s",
     702           0 :                        resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     703           0 :                        (int)resp->grpc_msg_len, resp->grpc_msg ));
     704           0 :       client->defer_disconnect = DISCONNECT_REASON_PEER_CLOSED;
     705           0 :       return;
     706           0 :     default:
     707           0 :       FD_LOG_WARNING(( "Event gRPC request failed (gRPC status %u-%s): %.*s",
     708           0 :                        resp->grpc_status, fd_grpc_status_cstr( resp->grpc_status ),
     709           0 :                        (int)resp->grpc_msg_len, resp->grpc_msg ));
     710           0 :       client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
     711           0 :       return;
     712           0 :     }
     713           0 :   }
     714             : 
     715           0 :   if( request_ctx==FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS ) {
     716           0 :     FD_LOG_INFO(( "Event gRPC stream ended gracefully" ));
     717           0 :     client->defer_disconnect = DISCONNECT_REASON_PEER_CLOSED;
     718           0 :   }
     719           0 : }
     720             : 
     721             : void
     722             : fd_event_client_grpc_rx_timeout( void * app_ctx,
     723             :                                  ulong  request_ctx FD_PARAM_UNUSED,
     724           0 :                                  int    deadline_kind FD_PARAM_UNUSED ) {
     725           0 :   FD_LOG_WARNING(( "Event gRPC rx timeout" ));
     726           0 :   fd_event_client_t * client = (fd_event_client_t *)app_ctx;
     727           0 :   client->defer_disconnect = DISCONNECT_REASON_TRANSPORT_FAILED;
     728           0 :   client->event_stream     = NULL;
     729           0 : }
     730             : 
     731             : static void
     732           0 : fd_event_client_grpc_ping_ack( void * app_ctx ) {
     733           0 :   (void)app_ctx;
     734           0 :   FD_LOG_WARNING(( "Event gRPC ping ack" ));
     735           0 : }
     736             : 
     737             : static void
     738             : tx( fd_event_client_t * client,
     739           0 :     int *               charge_busy ) {
     740           0 :   FD_TEST( client->state==FD_EVENT_CLIENT_STATE_CONNECTED );
     741             : 
     742           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client->grpc_client ) ) ) return;
     743           0 :   if( FD_UNLIKELY( client->event_stream && client->grpc_client->request_stream != NULL && client->grpc_client->request_stream!=client->event_stream ) ) return;
     744             : 
     745           0 :   if( FD_UNLIKELY( !client->event_stream ) ) {
     746           0 :     client->event_stream = fd_grpc_client_request_start1(
     747           0 :         client->grpc_client,
     748           0 :         "/events.v1.EventService/StreamEvents", strlen("/events.v1.EventService/StreamEvents"),
     749           0 :         FD_EVENT_CLIENT_REQ_CTX_STREAM_EVENTS,
     750           0 :         NULL, 0UL, /* headers only; first message sent later */
     751           0 :         client->auth_bearer, client->auth_bearer_len,
     752           0 :         1 /* streaming */ );
     753           0 :     if( FD_UNLIKELY( !client->event_stream ) ) return; /* transient; retry next poll */
     754           0 :     fd_grpc_client_deadline_set( client->event_stream, FD_GRPC_DEADLINE_HEADER, fd_log_wallclock()+(long)10e9 /* 10s */ );
     755           0 :     *charge_busy = 1;
     756           0 :     return;
     757           0 :   }
     758             : 
     759           0 :   ulong msg_sz;
     760           0 :   uchar const * msg = fd_circq_cursor_advance( client->circq, &msg_sz );
     761           0 :   if( FD_LIKELY( !msg ) ) return;
     762             : 
     763           0 :   int result = fd_grpc_client_stream_send_msg1( client->grpc_client, client->event_stream, msg, msg_sz );
     764           0 :   if( FD_UNLIKELY( !result ) ) return; /* Only reason for failure is too big message, so just skip it */
     765             : 
     766           0 :   client->metrics.events_sent++;
     767           0 :   *charge_busy = 1;
     768           0 : }
     769             : 
     770             : void
     771             : fd_event_client_poll( fd_event_client_t * client,
     772           0 :                       int *               charge_busy ) {
     773           0 :   if( FD_UNLIKELY( !client->has_genesis_hash || !client->has_shred_version ) ) return;
     774             : 
     775           0 :   long now = fd_log_wallclock();
     776             : 
     777           0 :   if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_DISCONNECTED ) ) reconnect( client, charge_busy );
     778           0 :   if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_CONNECTING ) ) {
     779           0 :     if( FD_UNLIKELY( now>client->connecting.connect_deadline ) ) {
     780           0 :       disconnect( client, DISCONNECT_REASON_TIMEOUT, 0, 1 );
     781           0 :       return;
     782           0 :     }
     783           0 :   }
     784             :   /* Check auth handshake timeout */
     785           0 :   if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_AUTHENTICATING && now>client->auth_deadline ) ) {
     786           0 :     FD_LOG_WARNING(( "auth handshake timed out" ));
     787           0 :     client->metrics.handshake_timeout_cnt++;
     788           0 :     disconnect( client, DISCONNECT_REASON_TIMEOUT, 0, 1 );
     789           0 :     return;
     790           0 :   }
     791           0 :   if( FD_LIKELY( client->state!=FD_EVENT_CLIENT_STATE_DISCONNECTED ) ) {
     792           0 :     int rxtx_err;
     793           0 : #   if FD_HAS_OPENSSL
     794           0 :     if( client->use_tls )
     795           0 :       rxtx_err = fd_grpc_client_rxtx_ossl( client->grpc_client, client->ssl, charge_busy );
     796           0 :     else
     797           0 : #   endif
     798           0 :       rxtx_err = fd_grpc_client_rxtx_socket( client->grpc_client, client->sockfd, charge_busy );
     799           0 :     if( FD_UNLIKELY( -1==rxtx_err ) ) {
     800           0 :       disconnect( client, DISCONNECT_REASON_TRANSPORT_FAILED, errno, 1 );
     801           0 :       return;
     802           0 :     }
     803           0 :   }
     804             : 
     805           0 :   if( FD_UNLIKELY( client->state==FD_EVENT_CLIENT_STATE_AUTHENTICATING && client->auth_send_pending ) ) {
     806           0 :     fd_event_client_try_send_authenticate( client );
     807           0 :   }
     808             : 
     809           0 :   if( FD_UNLIKELY( client->defer_disconnect!=INT_MAX ) ) {
     810           0 :     int reason = client->defer_disconnect;
     811           0 :     client->defer_disconnect = INT_MAX;
     812           0 :     if( reason==DISCONNECT_REASON_AUTH_FAILED ) client->metrics.auth_fail_cnt++;
     813           0 :     if( reason==DISCONNECT_REASON_INVALID_PROTOBUF ) client->metrics.invalid_msg_cnt++;
     814           0 :     disconnect( client, reason, 0, 1 );
     815           0 :     return;
     816           0 :   }
     817             : 
     818           0 :   if( FD_LIKELY( client->state==FD_EVENT_CLIENT_STATE_CONNECTED ) ) {
     819           0 :     if( FD_UNLIKELY( client->consecutive_failure_count && (now-client->connected.connected_timestamp>10L*(long)1e9 ) ) ) client->consecutive_failure_count = 0UL;
     820           0 :     tx( client, charge_busy );
     821           0 :   }
     822           0 : }
     823             : 
     824             : fd_grpc_client_callbacks_t fd_event_client_grpc_callbacks = {
     825             :   .conn_established = fd_event_client_grpc_conn_established,
     826             :   .conn_dead        = fd_event_client_grpc_conn_dead,
     827             :   .tx_complete      = fd_event_client_grpc_tx_complete,
     828             :   .rx_start         = fd_event_client_grpc_rx_start,
     829             :   .rx_msg           = fd_event_client_grpc_rx_msg,
     830             :   .rx_end           = fd_event_client_grpc_rx_end,
     831             :   .rx_timeout       = fd_event_client_grpc_rx_timeout,
     832             :   .ping_ack         = fd_event_client_grpc_ping_ack,
     833             : };

Generated by: LCOV version 1.14