LCOV - code coverage report
Current view: top level - waltz/grpc - fd_grpc_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 347 493 70.4 %
Date: 2025-08-05 05:04:49 Functions: 24 35 68.6 %

          Line data    Source code
       1             : #include "fd_grpc_client.h"
       2             : #include "fd_grpc_client_private.h"
       3             : #include "../../ballet/nanopb/pb_encode.h" /* pb_msgdesc_t */
       4             : #include <sys/socket.h>
       5             : #include "../h2/fd_h2_rbuf_sock.h"
       6             : #include "fd_grpc_codec.h"
       7             : #if FD_HAS_OPENSSL
       8             : #include "../openssl/fd_openssl.h"
       9             : #include <openssl/ssl.h>
      10             : #include <openssl/err.h>
      11             : #include "../h2/fd_h2_rbuf_ossl.h"
      12             : #endif
      13             : 
                       /* fd_grpc_client_align returns the alignment required for the
                          memory region backing a client object: the larger of
                          alignof(fd_grpc_client_t) and the stream pool alignment. */
      14             : ulong
      15         357 : fd_grpc_client_align( void ) {
      16         357 :   return fd_ulong_max( alignof(fd_grpc_client_t), fd_grpc_h2_stream_pool_align() );
      17         357 : }
      18             : 
                       /* fd_grpc_client_footprint returns the size in bytes of the memory
                          region laid out for a client whose buffers each hold buf_max
                          bytes.  The region holds, in order: the client struct, four
                          buf_max sized buffers, the stream pool (sized for
                          FD_GRPC_CLIENT_MAX_STREAMS elements), and buf_max bytes per
                          stream.  fd_grpc_client_new carves up memory in this same order
                          and asserts that it ends up with this exact footprint, so the two
                          functions must be kept in sync.
                          NOTE(review): buf_max*FD_GRPC_CLIENT_MAX_STREAMS is not checked
                          for overflow here. */
      19             : ulong
      20          90 : fd_grpc_client_footprint( ulong buf_max ) {
      21          90 :   ulong l = FD_LAYOUT_INIT;
      22          90 :   l = FD_LAYOUT_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
      23          90 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
      24          90 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_scratch */
      25          90 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
      26          90 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
      27          90 :   l = FD_LAYOUT_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
      28          90 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS ); /* per-stream message buffers */
      29          90 :   return FD_LAYOUT_FINI( l, fd_grpc_client_align() );
      30          90 : }
      31             : 
                       /* fd_grpc_h2_stream_reset returns a stream object to its initial
                          state: the embedded h2 stream and the response headers are
                          zeroed, the gRPC status is set to FD_GRPC_STATUS_UNKNOWN, and the
                          request context, message counters and deadline flags are
                          cleared.  msg_buf and msg_buf_max are not written here, so the
                          buffer assignment made in fd_grpc_client_new stays in place. */
      32             : static void
      33         129 : fd_grpc_h2_stream_reset( fd_grpc_h2_stream_t * stream ) {
      34         129 :   memset( &stream->s, 0, sizeof(fd_h2_stream_t) );
      35         129 :   stream->request_ctx = 0UL;
      36         129 :   memset( &stream->hdrs, 0, sizeof(fd_grpc_resp_hdrs_t) );
      37         129 :   stream->hdrs.grpc_status    = FD_GRPC_STATUS_UNKNOWN;
      38         129 :   stream->hdrs_received       = 0;
      39         129 :   stream->msg_buf_used        = 0UL;
      40         129 :   stream->msg_sz              = 0UL;
      41         129 :   stream->has_header_deadline = 0;
      42         129 :   stream->has_rx_end_deadline = 0;
      43         129 : }
      44             : 
                       /* fd_grpc_client_new formats the memory region at mem as a gRPC
                          client and returns a pointer to the client object.

                          mem       start of the region; must be non-NULL and aligned to
                                    fd_grpc_client_align().  Its layout must match
                                    fd_grpc_client_footprint( buf_max ).
                          callbacks stored as-is in the client (not validated here)
                          metrics   stored as-is in the client (not validated here)
                          app_ctx   stored as the client's ctx; passed back to callbacks
                          buf_max   capacity of each internal buffer and of each
                                    per-stream message buffer; must be >= 4096
                          rng_seed  seed for the header matcher

                          Returns NULL (after logging a warning) if mem is NULL, buf_max
                          is below 4096, or mem is misaligned.  The version string is
                          initialized to "0.0.0". */
      45             : fd_grpc_client_t *
      46             : fd_grpc_client_new( void *                             mem,
      47             :                     fd_grpc_client_callbacks_t const * callbacks,
      48             :                     fd_grpc_client_metrics_t *         metrics,
      49             :                     void *                             app_ctx,
      50             :                     ulong                              buf_max,
      51          45 :                     ulong                              rng_seed ) {
      52          45 :   if( FD_UNLIKELY( !mem ) ) {
      53           0 :     FD_LOG_WARNING(( "NULL mem" ));
      54           0 :     return NULL;
      55           0 :   }
      56          45 :   if( FD_UNLIKELY( buf_max<4096UL ) ) {
      57           0 :     FD_LOG_WARNING(( "undersz buf_max" ));
      58           0 :     return NULL;
      59           0 :   }
      60          45 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_grpc_client_align() ) ) ) {
      61           0 :     FD_LOG_WARNING(( "unaligned mem" ));
      62           0 :     return NULL;
      63           0 :   }
      64             : 
                         /* Carve up mem in the same order as fd_grpc_client_footprint and
                            verify that both agree on the total size. */
      65          45 :   FD_SCRATCH_ALLOC_INIT( l, mem );
      66          45 :   void * client_mem      = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
      67          45 :   void * nanopb_tx       = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
      68          45 :   void * frame_scratch   = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_scratch */
      69          45 :   void * frame_rx_buf    = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
      70          45 :   void * frame_tx_buf    = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
      71          45 :   void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
      72          45 :   void * stream_buf_mem  = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
      73          45 :   ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_grpc_client_align() );
      74          45 :   FD_TEST( end-(ulong)mem == fd_grpc_client_footprint( buf_max ) );
      75             : 
      76          45 :   fd_grpc_client_t * client = client_mem;
      77             : 
      78          45 :   fd_grpc_h2_stream_t * stream_pool =
      79          45 :     fd_grpc_h2_stream_pool_join( fd_grpc_h2_stream_pool_new( stream_pool_mem, FD_GRPC_CLIENT_MAX_STREAMS ) );
      80          45 :   if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */
      81             : 
                         /* Members not named in this initializer (e.g. stream_cnt) are
                            zero-initialized by the compound literal. */
      82          45 :   *client = (fd_grpc_client_t){
      83          45 :     .callbacks         = callbacks,
      84          45 :     .ctx               = app_ctx,
      85          45 :     .stream_pool       = stream_pool,
      86          45 :     .stream_bufs       = stream_buf_mem,
      87          45 :     .nanopb_tx         = nanopb_tx,
      88          45 :     .nanopb_tx_max     = buf_max,
      89          45 :     .frame_scratch     = frame_scratch,
      90          45 :     .frame_scratch_max = buf_max,
      91          45 :     .frame_rx_buf      = frame_rx_buf,
      92          45 :     .frame_rx_buf_max  = buf_max,
      93          45 :     .frame_tx_buf      = frame_tx_buf,
      94          45 :     .frame_tx_buf_max  = buf_max,
      95          45 :     .metrics           = metrics
      96          45 :   };
      97             : 
      98             :   /* FIXME for performance, cache this? */
      99          45 :   fd_h2_hdr_matcher_init( client->matcher, rng_seed );
     100          45 :   fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_STATUS,  "grpc-status"  );
     101          45 :   fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_MESSAGE, "grpc-message" );
     102             : 
                         /* Default version string; not NUL terminated, length is tracked
                            in version_len. */
     103          45 :   client->version_len = 5;
     104          45 :   memcpy( client->version, "0.0.0", 5 );
     105             : 
                         /* Give each pool element its own buf_max sized slice of
                            stream_buf_mem and check that the slice stays inside the
                            region. */
     106         405 :   for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
     107         360 :     fd_grpc_h2_stream_t * stream = &client->stream_pool[ i ];
     108         360 :     stream->msg_buf     = (uchar *)stream_buf_mem + (i*buf_max);
     109         360 :     stream->msg_buf_max = buf_max;
     110         360 :     FD_TEST( (ulong)( stream->msg_buf + stream->msg_buf_max )<=end );
     111         360 :   }
                         /* Initializes the frame buffers and the h2 connection state. */
     112          45 :   fd_grpc_client_reset( client );
     113             : 
     114          45 :   return client;
     115          45 : }
     116             : 
                       /* fd_grpc_client_delete returns the client pointer unchanged.  No
                          cleanup is performed in this function. */
     117             : void *
     118           3 : fd_grpc_client_delete( fd_grpc_client_t * client ) {
     119           3 :   return client;
     120           3 : }
     121             : 
                       /* fd_grpc_client_set_version replaces the client's version string
                          with the first version_len bytes of version.  If version_len
                          exceeds FD_GRPC_CLIENT_VERSION_LEN_MAX a warning is logged and the
                          current version is left unchanged.  No NUL terminator is written;
                          the length is stored in client->version_len. */
     122             : void
     123             : fd_grpc_client_set_version( fd_grpc_client_t * client,
     124             :                             char const *       version,
     125           0 :                             ulong              version_len ) {
     126           0 :   if( FD_UNLIKELY( version_len > FD_GRPC_CLIENT_VERSION_LEN_MAX ) ) {
     127           0 :     FD_LOG_WARNING(( "Version string too long (%lu chars), ignoring", version_len ));
     128           0 :     return;
     129           0 :   }
     130           0 :   client->version_len = (uchar)version_len;
     131           0 :   memcpy( client->version, version, version_len );
     132           0 : }
     133             : 
                       /* fd_grpc_client_set_authority stores the host name and port of
                          the server in the client.  These are later passed to the request
                          header generator by fd_grpc_client_request_start.

                          host_len is clamped to sizeof(client->host)-1, so an over-long
                          host name is truncated without any warning.
                          NOTE(review): the (uchar) cast of host_len assumes
                          sizeof(client->host)-1 fits in a uchar -- confirm against the
                          struct definition. */
     134             : void
     135             : fd_grpc_client_set_authority( fd_grpc_client_t * client,
     136             :                               char const *       host,
     137             :                               ulong              host_len,
     138           0 :                               ushort             port ) {
     139           0 :   host_len = fd_ulong_min( host_len, sizeof(client->host)-1 );
     140           0 :   fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->host ), host, host_len ) ); /* presumably NUL terminates client->host -- TODO confirm */
     141           0 :   client->host_len = (uchar)host_len;
     142           0 :   client->port     = (ushort)port;
     143           0 : }
     144             : 
                       /* fd_grpc_client_stream_acquire_is_safe returns 1 if a new stream
                          can be acquired right now and 0 otherwise.  It returns 0 if one
                          more stream would exceed the peer's max_concurrent_streams
                          setting, if the stream pool has no free element, or if the
                          stream map already holds FD_GRPC_CLIENT_MAX_STREAMS entries.
                          NOTE(review): stream_active_cnt[1] is presumably the count of
                          streams opened by this side -- confirm against fd_h2_conn_t. */
     145             : int
     146         120 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ) {
     147             :   /* Sufficient quota to start a stream? */
     148         120 :   if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 > client->conn->peer_settings.max_concurrent_streams ) ) {
     149          18 :     return 0;
     150          18 :   }
     151             : 
     152             :   /* Free stream object available? */
     153         102 :   if( FD_UNLIKELY( !fd_grpc_h2_stream_pool_free( client->stream_pool ) ) ) {
     154           0 :     return 0;
     155           0 :   }
     156         102 :   if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
     157           0 :     return 0;
     158           0 :   }
     159             : 
     160         102 :   return 1;
     161         102 : }
     162             : 
                       /* fd_grpc_client_stream_acquire allocates a stream object from the
                          pool, assigns it the connection's next outgoing stream ID, opens
                          the h2 stream and appends it to the client's stream map.  The new
                          stream also becomes client->request_stream.  request_ctx is
                          stored on the stream and later handed back to callbacks.

                          Logs at CRIT level if the stream map is already full.  Only
                          stream_cnt is checked here; the peer's concurrent stream limit
                          and the pool's free count are checked by
                          fd_grpc_client_stream_acquire_is_safe, and the pool acquire
                          result is used without a NULL check. */
     163             : fd_grpc_h2_stream_t *
     164             : fd_grpc_client_stream_acquire( fd_grpc_client_t * client,
     165         129 :                                ulong              request_ctx ) {
     166         129 :   if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
     167           0 :     FD_LOG_CRIT(( "stream pool exhausted" ));
     168           0 :   }
     169             : 
     170         129 :   fd_h2_conn_t * conn = client->conn;
     171         129 :   uint const stream_id = client->conn->tx_stream_next;
     172         129 :   conn->tx_stream_next += 2U; /* stream IDs advance in steps of two */
     173             : 
     174         129 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_pool_ele_acquire( client->stream_pool );
     175         129 :   fd_grpc_h2_stream_reset( stream );
     176         129 :   stream->request_ctx = request_ctx;
     177             : 
     178         129 :   fd_h2_stream_open( fd_h2_stream_init( &stream->s ), conn, stream_id );
     179         129 :   client->request_stream = stream;
                         /* stream_ids[] and streams[] are parallel arrays forming the
                            stream map; new entries go at the end. */
     180         129 :   client->stream_ids[ client->stream_cnt ] = stream_id;
     181         129 :   client->streams   [ client->stream_cnt ] = stream;
     182         129 :   client->stream_cnt++;
     183             : 
     184         129 :   return stream;
     185         129 : }
     186             : 
                       /* fd_grpc_client_stream_release removes stream from the client's
                          stream map and returns the stream object to the pool.  If stream
                          is the current request stream, the pending request tx op is
                          cleared as well.

                          Removal fills the vacated map slot with the last map entry, so
                          the order of the map is not preserved.  Logs at CRIT level if the
                          map is empty or does not contain the stream's ID. */
     187             : void
     188             : fd_grpc_client_stream_release( fd_grpc_client_t *    client,
     189          54 :                                fd_grpc_h2_stream_t * stream ) {
     190          54 :   if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
     191             : 
     192             :   /* Deallocate tx_op */
     193          54 :   if( FD_UNLIKELY( stream == client->request_stream ) ) {
     194          36 :     client->request_stream = NULL;
     195          36 :     *client->request_tx_op = (fd_h2_tx_op_t){0};
     196          36 :   }
     197             : 
     198             :   /* Remove stream from map */
     199          54 :   int map_idx = -1;
                         /* The scan does not stop at the first match; if an ID appeared
                            more than once the last matching index would be used. */
     200         153 :   for( uint i=0UL; i<(client->stream_cnt); i++ ) {
     201          99 :     if( client->stream_ids[ i ] == stream->s.stream_id ) {
     202          54 :       map_idx = (int)i;
     203          54 :     }
     204          99 :   }
     205          54 :   if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
                         /* Unless the stream is already the last entry, move the last
                            entry into its slot. */
     206          54 :   if( (ulong)map_idx+1 < client->stream_cnt ) {
     207           6 :     client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ];
     208           6 :     client->streams   [ map_idx ] = client->streams   [ client->stream_cnt-1 ];
     209           6 :   }
     210          54 :   client->stream_cnt--;
     211             : 
     212          54 :   fd_grpc_h2_stream_pool_ele_release( client->stream_pool, stream );
     213          54 : }
     214             : 
                       /* fd_grpc_client_reset puts the client back into its initial
                          connection state: the rx/tx frame buffers and the h2 connection
                          are re-initialized, the handshake flags and the pending request
                          are cleared, and every stream still in the stream map is
                          released.  No callbacks are invoked for the released streams in
                          this function. */
     215             : void
     216          63 : fd_grpc_client_reset( fd_grpc_client_t * client ) {
     217          63 :   fd_h2_rbuf_init( client->frame_rx, client->frame_rx_buf, client->frame_rx_buf_max );
     218          63 :   fd_h2_rbuf_init( client->frame_tx, client->frame_tx_buf, client->frame_tx_buf_max );
     219          63 :   fd_h2_conn_init_client( client->conn );
     220          63 :   client->conn->ctx      = client; /* lets the h2 callbacks recover the client from the conn */
     221          63 :   client->h2_hs_done     = 0;
     222          63 :   client->ssl_hs_done    = 0;
     223          63 :   client->request_stream = NULL;
     224          63 :   *client->request_tx_op = (fd_h2_tx_op_t){0};
     225             : 
     226             :   /* Disable RX flow control */
                         /* (1U<<31)-1U is the largest window size HTTP/2 permits. */
     227          63 :   client->conn->self_settings.initial_window_size = (1U<<31)-1U;
     228          63 :   client->conn->rx_wnd_max   = (1U<<31)-1U;
     229          63 :   client->conn->rx_wnd_wmark = client->conn->rx_wnd_max - (1U<<20);
     230             : 
     231             :   /* Free all stream objects */
     232          72 :   while( client->stream_cnt ) {
     233           9 :     fd_grpc_h2_stream_t * stream = client->streams[ client->stream_cnt-1 ];
     234           9 :     fd_grpc_client_stream_release( client, stream );
     235           9 :   }
     236          63 : }
     237             : 
     238             : /* fd_grpc_client_send_stream_quota writes a WINDOW_UPDATE frame, which
     239             :    eventually allows the peer to send more data bytes.
                          The frame targets stream and carries bump as its increment; both
                          the stream ID and the increment are byte swapped before being
                          written.  bump is also added to the stream's local rx_wnd.  Free
                          space in rbuf_tx is not checked here; the only visible caller,
                          fd_grpc_client_service_streams, checks it beforehand. */
     240             : 
     241             : static void
     242             : fd_grpc_client_send_stream_quota( fd_h2_rbuf_t *        rbuf_tx,
     243             :                                   fd_grpc_h2_stream_t * stream,
     244           3 :                                   uint                  bump ) {
     245           3 :   fd_h2_window_update_t window_update = {
     246           3 :     .hdr = {
     247           3 :       .typlen      = fd_h2_frame_typlen( FD_H2_FRAME_TYPE_WINDOW_UPDATE, 4UL ), /* 4 byte payload */
     248           3 :       .r_stream_id = fd_uint_bswap( stream->s.stream_id )
     249           3 :     },
     250           3 :     .increment = fd_uint_bswap( bump )
     251           3 :   };
     252           3 :   fd_h2_rbuf_push( rbuf_tx, &window_update, sizeof(fd_h2_window_update_t) );
     253           3 :   stream->s.rx_wnd += bump;
     254           3 : }
     255             : 
     256             : /* fd_grpc_client_send_timeout is called when a stream timeout triggers.
     257             :    Calls back to the user, writes a RST_STREAM frame, and frees the
     258             :    stream object.
                          The rx_timeout callback receives the client's ctx, the stream's
                          request_ctx and deadline_kind.  The RST_STREAM frame carries
                          FD_H2_ERR_CANCEL.  The callback runs before the stream is
                          released.  Releasing the stream reorders the client's stream
                          map, which matters to callers that are iterating over it. */
     259             : 
     260             : static void
     261             : fd_grpc_client_send_timeout( fd_h2_rbuf_t *        rbuf_tx,
     262             :                              fd_grpc_client_t *    client,
     263             :                              fd_grpc_h2_stream_t * stream,
     264          12 :                              int                   deadline_kind ) {
     265          12 :   client->callbacks->rx_timeout( client->ctx, stream->request_ctx, deadline_kind );
     266          12 :   fd_h2_tx_rst_stream( rbuf_tx, stream->s.stream_id, FD_H2_ERR_CANCEL );
     267          12 :   fd_grpc_client_stream_release( client, stream );
     268          12 : }
     269             : 
                       /* fd_grpc_client_service_streams does per-stream housekeeping at
                          time ts_nanos.  For each stream in the stream map it:
                            - times the stream out if its header deadline has passed,
                            - otherwise times it out if its rx_end deadline has passed,
                            - otherwise tops up the stream's receive window once it has
                              dropped below half of the initial window size.
                          Does nothing if any connection flag is set.  Stops early once
                          the tx buffer no longer has room for one WINDOW_UPDATE or
                          RST_STREAM frame; remaining streams are not visited in that
                          call. */
     270             : void
     271             : fd_grpc_client_service_streams( fd_grpc_client_t * client,
     272          51 :                                 long               ts_nanos ) {
     273          51 :   ulong const meta_frame_max =
     274          51 :     fd_ulong_max( sizeof(fd_h2_window_update_t), sizeof(fd_h2_rst_stream_t) );
     275          51 :   fd_h2_conn_t * conn    = client->conn;
     276          51 :   fd_h2_rbuf_t * rbuf_tx = client->frame_tx;
     277          51 :   if( FD_UNLIKELY( conn->flags ) ) return;
     278          51 :   uint  const wnd_max    = conn->self_settings.initial_window_size;
     279          51 :   uint  const wnd_thres  = wnd_max / 2;
     280         141 :   for( ulong i=0UL; i<(client->stream_cnt); i++ ) {
     281          90 :     if( FD_UNLIKELY( fd_h2_rbuf_free_sz( rbuf_tx )<meta_frame_max ) ) break;
     282          90 :     fd_grpc_h2_stream_t * stream = client->streams[ i ];
     283             : 
                           /* Deadlines are compared via a difference (deadline - now <= 0).
                              The bitwise & evaluates both operands unconditionally. */
     284          90 :     if( FD_UNLIKELY( ( stream->has_header_deadline ) &
     285          90 :                      ( stream->header_deadline_nanos - ts_nanos <= 0L ) ) ) {
     286           6 :       fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_HEADER );
                             /* The release moved the last map entry into slot i, so slot i
                                must be visited again.  When i is 0 the unsigned decrement
                                wraps, and the loop's i++ brings it back to 0. */
     287           6 :       i--; /* stream removed */
     288           6 :       continue;
     289           6 :     }
     290             : 
     291          84 :     if( FD_UNLIKELY( ( stream->has_rx_end_deadline ) &
     292          84 :                      ( stream->rx_end_deadline_nanos - ts_nanos <= 0L ) ) ) {
     293           6 :       fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_RX_END );
     294           6 :       i--; /* stream removed */
     295           6 :       continue;
     296           6 :     }
     297             : 
                           /* Refill the stream's receive window back up to wnd_max. */
     298          78 :     if( FD_UNLIKELY( stream->s.rx_wnd < wnd_thres ) ) {
     299           3 :       uint const bump = wnd_max - stream->s.rx_wnd;
     300           3 :       fd_grpc_client_send_stream_quota( rbuf_tx, stream, bump );
     301           3 :     }
     302          78 :   }
     303          51 : }
     304             : 
     305             : #if FD_HAS_OPENSSL
     306             : 
                       /* fd_ossl_log_error is the per-line callback handed to
                          ERR_print_errors_cb.  It logs one line of the OpenSSL error
                          queue at INFO level, dropping a single trailing newline if
                          present.  ctx is unused.

                          Returns 1 so that ERR_print_errors_cb keeps going: OpenSSL stops
                          walking the error queue as soon as the callback returns a value
                          <= 0, which would limit the log to the first queued error. */
     307             : static int
     308             : fd_ossl_log_error( char const * str,
     309             :                    ulong        len,
     310           0 :                    void *       ctx ) {
     311           0 :   (void)ctx;
     312           0 :   if( len>0 && str[ len-1 ]=='\n' ) len--;
     313           0 :   FD_LOG_INFO(( "%.*s", (int)len, str ));
     314           0 :   return 1;
     315           0 : }
     316             : 
                       /* fd_grpc_client_rxtx_ossl drives one round of I/O over a TLS
                          connection: it advances the TLS handshake if that is not done
                          yet, reads incoming bytes into frame_rx, processes HTTP/2
                          frames, services stream deadlines and windows, and writes
                          pending bytes from frame_tx.

                          Returns 1 if the connection is still usable (including the case
                          where the handshake needs more I/O) and 0 on a handshake
                          failure, a read error, or when the server closed the
                          connection.  *charge_busy is set to 1 if any bytes were read or
                          written; it is never cleared here.
                          NOTE(review): any read error other than SSL_ERROR_WANT_READ
                          ends the connection, which includes SSL_ERROR_WANT_WRITE --
                          confirm this is intended.  The result of the SSL write is only
                          used for charge_busy. */
     317             : int
     318             : fd_grpc_client_rxtx_ossl( fd_grpc_client_t * client,
     319             :                           SSL *              ssl,
     320           0 :                           int *              charge_busy ) {
     321           0 :   if( FD_UNLIKELY( !client->ssl_hs_done ) ) {
     322           0 :     int res = SSL_do_handshake( ssl );
     323           0 :     if( res<=0 ) {
     324           0 :       int error = SSL_get_error( ssl, res );
                             /* Handshake still in progress; try again on the next call. */
     325           0 :       if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return 1;
     326           0 :       FD_LOG_INFO(( "SSL_do_handshake failed (%i-%s)", error, fd_openssl_ssl_strerror( error ) ));
     327           0 :       long verify_result = SSL_get_verify_result( ssl );
     328           0 :       if( error == SSL_ERROR_SSL && verify_result != X509_V_OK ) {
     329           0 :         FD_LOG_WARNING(( "Certificate verification failed: %s", X509_verify_cert_error_string( verify_result ) ));
     330           0 :       }
     331           0 :       ERR_print_errors_cb( fd_ossl_log_error, NULL );
     332           0 :       return 0;
     333           0 :     } else {
     334           0 :       client->ssl_hs_done = 1;
     335           0 :     }
     336           0 :   }
     337             : 
     338           0 :   fd_h2_conn_t * conn = client->conn;
     339           0 :   int ssl_err = 0;
     340           0 :   ulong read_sz = fd_h2_rbuf_ssl_read( client->frame_rx, ssl, &ssl_err );
     341           0 :   if( FD_UNLIKELY( ssl_err && ssl_err!=SSL_ERROR_WANT_READ ) ) {
     342           0 :     if( ssl_err==SSL_ERROR_ZERO_RETURN ) {
     343           0 :       FD_LOG_WARNING(( "gRPC server closed connection" ));
     344           0 :       return 0;
     345           0 :     }
     346           0 :     FD_LOG_WARNING(( "SSL_read_ex failed (%i-%s)", ssl_err, fd_openssl_ssl_strerror( ssl_err ) ));
     347           0 :     ERR_print_errors_cb( fd_ossl_log_error, NULL );
     348           0 :     return 0;
     349           0 :   }
     350           0 :   if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
     351           0 :   fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
     352           0 :   fd_grpc_client_service_streams( client, fd_log_wallclock() );
     353           0 :   ulong write_sz = fd_h2_rbuf_ssl_write( client->frame_tx, ssl );
     354             : 
     355           0 :   if( read_sz!=0 || write_sz!=0 ) *charge_busy = 1;
     356           0 :   return 1;
     357           0 : }
     358             : 
     359             : #endif /* FD_HAS_OPENSSL */
     360             : 
     361             : #if FD_H2_HAS_SOCKETS
     362             : 
                       /* fd_grpc_client_rxtx_socket drives one round of I/O over a plain
                          socket: it receives into frame_rx without blocking, processes
                          HTTP/2 frames, services stream deadlines and windows, and sends
                          pending bytes from frame_tx without blocking.

                          Returns 1 if the connection is still usable and 0 if the receive
                          or the send reported an error.  *charge_busy is set to 1 if the
                          lo/hi offsets of either frame buffer changed during the call; it
                          is never cleared here. */
     363             : int
     364             : fd_grpc_client_rxtx_socket( fd_grpc_client_t * client,
     365             :                             int                sock_fd,
     366          27 :                             int *              charge_busy ) {
     367          27 :   fd_h2_conn_t * conn = client->conn;
                         /* Snapshot of the buffer offsets before any I/O.  Note that the
                            tx snapshot uses the suffix _1 here and _0 below, the reverse
                            of the rx naming.  The comparison at the end is symmetric, so
                            this does not affect the result. */
     368          27 :   ulong const frame_rx_lo_0 = client->frame_rx->lo_off;
     369          27 :   ulong const frame_rx_hi_0 = client->frame_rx->hi_off;
     370          27 :   ulong const frame_tx_lo_1 = client->frame_tx->lo_off;
     371          27 :   ulong const frame_tx_hi_1 = client->frame_tx->hi_off;
     372             : 
     373          27 :   int rx_err = fd_h2_rbuf_recvmsg( client->frame_rx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
     374          27 :   if( FD_UNLIKELY( rx_err ) ) {
     375           0 :     FD_LOG_INFO(( "Disconnected: recvmsg error (%i-%s)", rx_err, fd_io_strerror( rx_err ) ));
     376           0 :     return 0;
     377           0 :   }
     378             : 
     379          27 :   if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
     380          27 :   fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
     381          27 :   fd_grpc_client_service_streams( client, fd_log_wallclock() );
     382             : 
     383          27 :   int tx_err = fd_h2_rbuf_sendmsg( client->frame_tx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
     384          27 :   if( FD_UNLIKELY( tx_err ) ) {
     385           0 :     FD_LOG_WARNING(( "fd_h2_rbuf_sendmsg failed (%i-%s)", tx_err, fd_io_strerror( tx_err ) ));
     386           0 :     return 0;
     387           0 :   }
     388             : 
                         /* Snapshot of the buffer offsets after I/O. */
     389          27 :   ulong const frame_rx_lo_1 = client->frame_rx->lo_off;
     390          27 :   ulong const frame_rx_hi_1 = client->frame_rx->hi_off;
     391          27 :   ulong const frame_tx_lo_0 = client->frame_tx->lo_off;
     392          27 :   ulong const frame_tx_hi_0 = client->frame_tx->hi_off;
     393             : 
     394          27 :   if( frame_rx_lo_0!=frame_rx_lo_1 || frame_rx_hi_0!=frame_rx_hi_1 ||
     395          27 :       frame_tx_lo_0!=frame_tx_lo_1 || frame_tx_hi_0!=frame_tx_hi_1 ) {
     396           3 :     *charge_busy = 1;
     397           3 :   }
     398             : 
     399          27 :   return 1;
     400          27 : }
     401             : 
     402             : #endif /* FD_H2_HAS_SOCKETS */
     403             : 
     404             : /* fd_grpc_client_request_continue1 attempts to write request data
     405             :    frames for the current request stream.
                          Returns 1 if the request was sent completely: the pending tx op
                          has no bytes left and the stream is in state
                          FD_H2_STREAM_STATE_CLOSING_TX.  In that case the
                          stream_chunks_tx_cnt metric is incremented, request_stream is
                          cleared and the tx_complete callback is invoked.  Returns 0 if
                          the request is not finished yet.  Assumes client->request_stream
                          is non-NULL; this is not checked here. */
     406             : 
     407             : static int
     408           9 : fd_grpc_client_request_continue1( fd_grpc_client_t * client ) {
     409           9 :   fd_grpc_h2_stream_t * stream    = client->request_stream;
     410           9 :   fd_h2_stream_t *      h2_stream = &stream->s;
     411           9 :   fd_h2_tx_op_copy( client->conn, h2_stream, client->frame_tx, client->request_tx_op );
     412           9 :   if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0; /* bytes still pending */
     413           9 :   if( FD_UNLIKELY( h2_stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0;
     414           9 :   client->metrics->stream_chunks_tx_cnt++;
     415             :   /* Request finished */
     416           9 :   client->request_stream = NULL;
     417           9 :   client->callbacks->tx_complete( client->ctx, stream->request_ctx );
     418           9 :   return 1;
     419           9 : }
     420             : 
                       /* fd_grpc_client_request_continue resumes sending a partially
                          written request.  Returns 0 without doing anything if the
                          connection is flagged dead, if there is no request stream, or if
                          the pending tx op has no bytes left.  Otherwise returns the
                          result of fd_grpc_client_request_continue1. */
     421             : static int
     422           0 : fd_grpc_client_request_continue( fd_grpc_client_t * client ) {
     423           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
     424           0 :   if( FD_UNLIKELY( !client->request_stream ) ) return 0;
     425           0 :   if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0;
     426           0 :   return fd_grpc_client_request_continue1( client );
     427           0 : }
     428             : 
                       /* fd_grpc_client_is_connected returns 1 if client is non-NULL, its
                          connection is not flagged dead, and the HTTP/2 handshake has
                          completed.  Returns 0 otherwise. */
     429             : int
     430         111 : fd_grpc_client_is_connected( fd_grpc_client_t * client ) {
     431         111 :   if( FD_UNLIKELY( !client                                          ) ) return 0;
     432         111 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD      ) ) return 0;
     433         111 :   if( FD_UNLIKELY( !client->h2_hs_done                              ) ) return 0;
     434         108 :   return 1;
     435         111 : }
     436             : 
                       /* fd_grpc_client_request_is_blocked returns 1 if a new request
                          cannot be started right now and 0 if it can.  A request is
                          blocked if client is NULL, the connection is flagged dead, the
                          HTTP/2 handshake has not completed, the tx frame buffer still
                          holds unsent bytes, or no stream can be acquired. */
     437             : int
     438         108 : fd_grpc_client_request_is_blocked( fd_grpc_client_t * client ) {
     439         108 :   if( FD_UNLIKELY( !client                                          ) ) return 1;
     440         108 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD      ) ) return 1;
     441         108 :   if( FD_UNLIKELY( !client->h2_hs_done                              ) ) return 1;
     442         108 :   if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx )         ) ) return 1;
     443         108 :   if( FD_UNLIKELY( !fd_grpc_client_stream_acquire_is_safe( client ) ) ) return 1;
     444          90 :   return 0;
     445         108 : }
     446             : 
                       /* fd_grpc_client_request_start begins a unary gRPC request.

                          path, path_len   request path written to the request headers
                          request_ctx      opaque value stored on the stream and handed
                                           back to callbacks
                          fields, message  nanopb descriptor and message to serialize
                          auth_token,      bearer token passed to the header generator
                          auth_token_sz

                          The message is serialized into the client's nanopb_tx buffer
                          behind a gRPC length prefix, a stream is acquired, the request
                          headers are written, and the payload is queued with END_STREAM.
                          One attempt to send the payload is made immediately; its result
                          is not checked here.

                          Returns the new stream on success.  Returns NULL if the client
                          is blocked (see fd_grpc_client_request_is_blocked), if
                          serialization fails, or if header generation fails.
                          NOTE(review): the header generation failure path returns after
                          the stream was acquired and after fd_h2_tx_prepare, without
                          releasing the stream -- check whether that leaks the stream. */
     447             : fd_grpc_h2_stream_t *
     448             : fd_grpc_client_request_start(
     449             :     fd_grpc_client_t *   client,
     450             :     char const *         path,
     451             :     ulong                path_len,
     452             :     ulong                request_ctx,
     453             :     pb_msgdesc_t const * fields,
     454             :     void const *         message,
     455             :     char const *         auth_token,
     456             :     ulong                auth_token_sz
     457           9 : ) {
     458           9 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
     459             : 
     460             :   /* Encode message */
                         /* The first sizeof(fd_grpc_hdr_t) bytes of nanopb_tx are left
                            free for the length prefix written below. */
     461           9 :   FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
     462           9 :   uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
     463           9 :   pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
     464           9 :   if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
     465           0 :     FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path ));
     466           0 :     return NULL;
     467           0 :   }
     468           9 :   ulong const serialized_sz = ostream.bytes_written;
     469             : 
     470             :   /* Create gRPC length prefix */
     471           9 :   fd_grpc_hdr_t hdr = {
     472           9 :     .compressed=0,
     473           9 :     .msg_sz=fd_uint_bswap( (uint)serialized_sz )
     474           9 :   };
     475           9 :   memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
     476           9 :   ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
     477             : 
     478             :   /* Allocate stream descriptor */
     479           9 :   fd_grpc_h2_stream_t * stream    = fd_grpc_client_stream_acquire( client, request_ctx );
     480           9 :   uint const            stream_id = stream->s.stream_id;
     481             : 
     482             :   /* Write HTTP/2 request headers */
     483           9 :   fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
     484           9 :   fd_grpc_req_hdrs_t req_meta = {
     485           9 :     .host     = client->host,
     486           9 :     .host_len = client->host_len,
     487           9 :     .port     = client->port,
     488           9 :     .path     = path,
     489           9 :     .path_len = path_len,
     490           9 :     .https    = 1, /* grpc_client assumes TLS encryption for now */
     491             : 
     492           9 :     .bearer_auth     = auth_token,
     493           9 :     .bearer_auth_len = auth_token_sz
     494           9 :   };
     495           9 :   if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
     496           9 :       &req_meta,
     497           9 :       client->frame_tx,
     498           9 :       client->version,
     499           9 :       client->version_len
     500           9 :   ) ) ) {
     501           0 :     FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
     502           0 :     return NULL;
     503           0 :   }
     504           9 :   fd_h2_tx_commit( client->conn, client->frame_tx );
     505             : 
     506             :   /* Queue request payload for send
     507             :      (Protobuf message might have to be fragmented into multiple HTTP/2
     508             :      DATA frames if the client gets blocked) */
     509           9 :   fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, FD_H2_FLAG_END_STREAM );
     510           9 :   fd_grpc_client_request_continue1( client );
     511           9 :   client->metrics->requests_sent++;
     512           9 :   client->metrics->streams_active++;
     513             : 
     514           9 :   FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu", (int)path_len, path, serialized_sz ));
     515             : 
     516           9 :   return stream;
     517           9 : }
     518             : 
     519             : void
     520             : fd_grpc_client_deadline_set( fd_grpc_h2_stream_t * stream,
     521             :                              int                   deadline_kind,
     522          18 :                              long                  ts_nanos ) {
     523          18 :   switch( deadline_kind ) {
     524          12 :   case FD_GRPC_DEADLINE_HEADER:
     525          12 :     stream->header_deadline_nanos = ts_nanos;
     526          12 :     stream->has_header_deadline   = 1;
     527          12 :     break;
     528           6 :   case FD_GRPC_DEADLINE_RX_END:
     529           6 :     stream->rx_end_deadline_nanos = ts_nanos;
     530           6 :     stream->has_rx_end_deadline   = 1;
     531           6 :     break;
     532          18 :   }
     533          18 : }
     534             : 
     535             : /* Lookup stream by ID */
     536             : 
     537             : static fd_h2_stream_t *
     538             : fd_grpc_h2_stream_query( fd_h2_conn_t * conn,
     539           0 :                          uint           stream_id ) {
     540           0 :   fd_grpc_client_t * client = conn->ctx;
     541           0 :   for( ulong i=0UL; i<client->stream_cnt; i++ ) {
     542           0 :     if( client->stream_ids[ i ] == stream_id ) {
     543           0 :       return &client->streams[ i ]->s;
     544           0 :     }
     545           0 :   }
     546           0 :   return NULL;
     547           0 : }
     548             : 
     549             : static void
     550           0 : fd_grpc_h2_conn_established( fd_h2_conn_t * conn ) {
     551           0 :   fd_grpc_client_t * client = conn->ctx;
     552           0 :   client->h2_hs_done = 1;
     553           0 :   client->callbacks->conn_established( client->ctx );
     554           0 : }
     555             : 
     556             : static void
     557             : fd_grpc_h2_conn_final( fd_h2_conn_t * conn,
     558             :                        uint           h2_err,
     559           0 :                        int            closed_by ) {
     560           0 :   fd_grpc_client_t * client = conn->ctx;
     561           0 :   client->callbacks->conn_dead( client->ctx, h2_err, closed_by );
     562           0 : }
     563             : 
     564             : /* React to response data */
     565             : 
     566             : void
     567             : fd_grpc_h2_cb_headers(
     568             :     fd_h2_conn_t *   conn,
     569             :     fd_h2_stream_t * h2_stream,
     570             :     void const *     data,
     571             :     ulong            data_sz,
     572             :     ulong            flags
     573          18 : ) {
     574          18 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     575          18 :   fd_grpc_client_t * client = conn->ctx;
     576             : 
     577          18 :   int h2_status = fd_grpc_h2_read_response_hdrs( &stream->hdrs, client->matcher, data, data_sz );
     578          18 :   if( FD_UNLIKELY( h2_status!=FD_H2_SUCCESS ) ) {
     579             :     /* Failed to parse HTTP/2 headers */
     580           3 :     fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_PROTOCOL );
     581           3 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
     582           3 :     fd_grpc_client_stream_release( client, stream );
     583           3 :     return;
     584           3 :   }
     585             : 
     586          15 :   if( !stream->hdrs_received && !!( flags & FD_H2_FLAG_END_HEADERS) ) {
     587             :     /* Got initial response header */
     588          12 :     stream->hdrs_received = 1;
     589          12 :     stream->has_header_deadline = 0;
     590          12 :     if( FD_LIKELY( ( stream->hdrs.h2_status==200  ) &
     591          12 :                    ( !!stream->hdrs.is_grpc_proto ) ) ) {
     592           6 :       client->callbacks->rx_start( client->ctx, stream->request_ctx );
     593           6 :     }
     594          12 :   }
     595             : 
     596          15 :   if( ( flags & (FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) )
     597          15 :               ==(FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM)   ) {
     598           3 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     599           3 :     fd_grpc_client_stream_release( client, stream );
     600           3 :     return;
     601           3 :   }
     602          15 : }
     603             : 
     604             : static void
     605             : fd_grpc_h2_cb_data(
     606             :     fd_h2_conn_t *   conn,
     607             :     fd_h2_stream_t * h2_stream,
     608             :     void const *     data,
     609             :     ulong            data_sz,
     610             :     ulong            flags
     611           3 : ) {
     612           3 :   fd_grpc_client_t *    client = conn->ctx;
     613           3 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     614           3 :   if( FD_UNLIKELY( ( stream->hdrs.h2_status!=200 ) |
     615           3 :                    ( !stream->hdrs.is_grpc_proto ) ) ) {
     616           0 :     return;
     617           0 :   }
     618             : 
     619           3 :   do {
     620             : 
     621             :     /* Read header bytes */
     622           3 :     if( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) {
     623           3 :       ulong hdr_frag_sz = fd_ulong_min( sizeof(fd_grpc_hdr_t) - stream->msg_buf_used, data_sz );
     624           3 :       fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, hdr_frag_sz );
     625           3 :       stream->msg_buf_used += hdr_frag_sz;
     626           3 :       data     = (void const *)( (ulong)data + (ulong)hdr_frag_sz );
     627           3 :       data_sz -= hdr_frag_sz;
     628           3 :       if( FD_UNLIKELY( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) ) return;
     629             : 
     630             :       /* Header complete */
     631           3 :       stream->msg_sz = fd_uint_bswap( FD_LOAD( uint, (void *)( (ulong)stream->msg_buf+1 ) ) );
     632           3 :       if( FD_UNLIKELY( sizeof(fd_grpc_hdr_t)  + stream->msg_sz > stream->msg_buf_max ) ) {
     633           3 :         FD_LOG_WARNING(( "Received oversized gRPC message (%lu bytes), killing request", stream->msg_sz ));
     634           3 :         client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     635           3 :         fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_INTERNAL );
     636           3 :         fd_grpc_client_stream_release( client, stream );
     637           3 :         return;
     638           3 :       }
     639           3 :     }
     640             : 
     641             :     /* Read payload bytes */
     642           0 :     ulong wmark    = sizeof(fd_grpc_hdr_t) + stream->msg_sz;
     643           0 :     ulong chunk_sz = fd_ulong_min( stream->msg_buf_used+data_sz, wmark ) - stream->msg_buf_used;
     644           0 :     if( FD_UNLIKELY( chunk_sz>data_sz ) ) FD_LOG_CRIT(( "integer underflow" )); /* unreachable */
     645           0 :     fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, chunk_sz );
     646           0 :     stream->msg_buf_used += chunk_sz;
     647           0 :     data     = (void const *)( (ulong)data + (ulong)chunk_sz );
     648           0 :     data_sz -= chunk_sz;
     649             : 
     650           0 :     client->metrics->stream_chunks_rx_cnt++;
     651           0 :     client->metrics->stream_chunks_rx_bytes += chunk_sz;
     652             : 
     653           0 :     if( stream->msg_buf_used >= wmark ) {
     654             :       /* Data complete */
     655           0 :       void const * msg_ptr = stream->msg_buf + sizeof(fd_grpc_hdr_t);
     656           0 :       client->callbacks->rx_msg( client->ctx, msg_ptr, stream->msg_sz, stream->request_ctx );
     657           0 :       stream->msg_buf_used = 0UL;
     658           0 :       stream->msg_sz       = 0UL;
     659           0 :     }
     660             : 
     661           0 :   } while( data_sz );
     662             : 
     663           0 :   if( flags & FD_H2_FLAG_END_STREAM ) {
     664             :     /* FIXME incomplete gRPC message */
     665           0 :     if( FD_UNLIKELY( !stream->msg_buf_used ) ) {
     666           0 :       FD_LOG_WARNING(( "Received incomplete gRPC message" ));
     667           0 :     }
     668           0 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     669           0 :   }
     670           0 : }
     671             : 
     672             : /* Server might kill our request */
     673             : 
     674             : static void
     675             : fd_grpc_h2_rst_stream( fd_h2_conn_t *   conn,
     676             :                        fd_h2_stream_t * h2_stream,
     677             :                        uint             error_code,
     678           3 :                        int              closed_by ) {
     679           3 :   if( closed_by==1 ) {
     680           3 :     FD_LOG_WARNING(( "Server terminated request stream_id=%u (%u-%s)",
     681           3 :                      h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
     682           3 :   } else {
     683           0 :     FD_LOG_WARNING(( "Stream failed stream_id=%u (%u-%s)",
     684           0 :                      h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
     685           0 :   }
     686           3 :   fd_grpc_client_t *    client = conn->ctx;
     687           3 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     688           3 :   client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
     689           3 :   fd_grpc_client_stream_release( client, stream );
     690           3 : }
     691             : 
     692             : /* A HTTP/2 flow control change might unblock a queued request send op */
     693             : 
     694             : void
     695             : fd_grpc_h2_window_update( fd_h2_conn_t * conn,
     696           0 :                           uint           increment ) {
     697           0 :   (void)increment;
     698           0 :   fd_grpc_client_request_continue( conn->ctx );
     699           0 : }
     700             : 
     701             : void
     702             : fd_grpc_h2_stream_window_update( fd_h2_conn_t *   conn,
     703             :                                  fd_h2_stream_t * stream,
     704           0 :                                  uint             increment ) {
     705           0 :   (void)stream; (void)increment;
     706           0 :   fd_grpc_client_request_continue( conn->ctx );
     707           0 : }
     708             : 
     709             : void
     710           3 : fd_grpc_h2_ping_ack( fd_h2_conn_t * conn ) {
     711           3 :   fd_grpc_client_t * client = conn->ctx;
     712           3 :   client->callbacks->ping_ack( client->ctx );
     713           3 : }
     714             : 
     715             : fd_h2_rbuf_t *
     716           6 : fd_grpc_client_rbuf_tx( fd_grpc_client_t * client ) {
     717           6 :   return client->frame_tx;
     718           6 : }
     719             : 
     720             : fd_h2_rbuf_t *
     721           0 : fd_grpc_client_rbuf_rx( fd_grpc_client_t * client ) {
     722           0 :   return client->frame_rx;
     723           0 : }
     724             : 
     725             : fd_h2_conn_t *
     726         222 : fd_grpc_client_h2_conn( fd_grpc_client_t * client ) {
     727         222 :   return client->conn;
     728         222 : }
     729             : 
     730             : /* fd_grpc_client_h2_callbacks specifies h2->grpc_client callbacks.
     731             :    Stored in .rodata for security.  Must be kept in sync with fd_h2 to
     732             :    avoid NULL pointers. */
     733             : 
     734             : fd_h2_callbacks_t const fd_grpc_client_h2_callbacks = {
     735             :   .stream_create        = fd_h2_noop_stream_create,
     736             :   .stream_query         = fd_grpc_h2_stream_query,
     737             :   .conn_established     = fd_grpc_h2_conn_established,
     738             :   .conn_final           = fd_grpc_h2_conn_final,
     739             :   .headers              = fd_grpc_h2_cb_headers,
     740             :   .data                 = fd_grpc_h2_cb_data,
     741             :   .rst_stream           = fd_grpc_h2_rst_stream,
     742             :   .window_update        = fd_grpc_h2_window_update,
     743             :   .stream_window_update = fd_grpc_h2_stream_window_update,
     744             :   .ping_ack             = fd_grpc_h2_ping_ack,
     745             : };

Generated by: LCOV version 1.14