LCOV - code coverage report
Current view: top level - waltz/grpc - fd_grpc_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 215 490 43.9 %
Date: 2025-07-01 05:00:49 Functions: 14 35 40.0 %

          Line data    Source code
       1             : #include "fd_grpc_client.h"
       2             : #include "fd_grpc_client_private.h"
       3             : #include "../../ballet/nanopb/pb_encode.h" /* pb_msgdesc_t */
       4             : #include <sys/socket.h>
       5             : #include "../h2/fd_h2_rbuf_sock.h"
       6             : #include "fd_grpc_codec.h"
       7             : #if FD_HAS_OPENSSL
       8             : #include "../openssl/fd_openssl.h"
       9             : #include <openssl/ssl.h>
      10             : #include <openssl/err.h>
      11             : #include "../h2/fd_h2_rbuf_ossl.h"
      12             : #endif
      13             : 
      14             : ulong
      15          21 : fd_grpc_client_align( void ) {
      16          21 :   return fd_ulong_max( alignof(fd_grpc_client_t), fd_grpc_h2_stream_pool_align() );
      17          21 : }
      18             : 
      19             : ulong
      20           6 : fd_grpc_client_footprint( ulong buf_max ) {
      21           6 :   ulong l = FD_LAYOUT_INIT;
      22           6 :   l = FD_LAYOUT_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
      23           6 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
      24           6 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_scratch */
      25           6 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
      26           6 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
      27           6 :   l = FD_LAYOUT_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
      28           6 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
      29           6 :   return FD_LAYOUT_FINI( l, fd_grpc_client_align() );
      30           6 : }
      31             : 
      32             : static void
      33          36 : fd_grpc_h2_stream_reset( fd_grpc_h2_stream_t * stream ) {
      34          36 :   memset( &stream->s, 0, sizeof(fd_h2_stream_t) );
      35          36 :   stream->request_ctx = 0UL;
      36          36 :   memset( &stream->hdrs, 0, sizeof(fd_grpc_resp_hdrs_t) );
      37          36 :   stream->hdrs.grpc_status    = FD_GRPC_STATUS_UNKNOWN;
      38          36 :   stream->hdrs_received       = 0;
      39          36 :   stream->msg_buf_used        = 0UL;
      40          36 :   stream->msg_sz              = 0UL;
      41          36 :   stream->has_header_deadline = 0;
      42          36 :   stream->has_rx_end_deadline = 0;
      43          36 : }
      44             : 
      45             : fd_grpc_client_t *
      46             : fd_grpc_client_new( void *                             mem,
      47             :                     fd_grpc_client_callbacks_t const * callbacks,
      48             :                     fd_grpc_client_metrics_t *         metrics,
      49             :                     void *                             app_ctx,
      50             :                     ulong                              buf_max,
      51           3 :                     ulong                              rng_seed ) {
      52           3 :   if( FD_UNLIKELY( !mem ) ) {
      53           0 :     FD_LOG_WARNING(( "NULL mem" ));
      54           0 :     return NULL;
      55           0 :   }
      56           3 :   if( FD_UNLIKELY( buf_max<4096UL ) ) {
      57           0 :     FD_LOG_WARNING(( "undersz buf_max" ));
      58           0 :     return NULL;
      59           0 :   }
      60           3 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_grpc_client_align() ) ) ) {
      61           0 :     FD_LOG_WARNING(( "unaligned mem" ));
      62           0 :     return NULL;
      63           0 :   }
      64             : 
      65           3 :   FD_SCRATCH_ALLOC_INIT( l, mem );
      66           3 :   void * client_mem      = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
      67           3 :   void * nanopb_tx       = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
      68           3 :   void * frame_scratch   = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_scratch */
      69           3 :   void * frame_rx_buf    = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
      70           3 :   void * frame_tx_buf    = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
      71           3 :   void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
      72           3 :   void * stream_buf_mem  = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
      73           3 :   ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_grpc_client_align() );
      74           3 :   FD_TEST( end-(ulong)mem == fd_grpc_client_footprint( buf_max ) );
      75             : 
      76           3 :   fd_grpc_client_t * client = client_mem;
      77             : 
      78           3 :   fd_grpc_h2_stream_t * stream_pool =
      79           3 :     fd_grpc_h2_stream_pool_join( fd_grpc_h2_stream_pool_new( stream_pool_mem, FD_GRPC_CLIENT_MAX_STREAMS ) );
      80           3 :   if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */
      81             : 
      82           3 :   *client = (fd_grpc_client_t){
      83           3 :     .callbacks         = callbacks,
      84           3 :     .ctx               = app_ctx,
      85           3 :     .stream_pool       = stream_pool,
      86           3 :     .stream_bufs       = stream_buf_mem,
      87           3 :     .nanopb_tx         = nanopb_tx,
      88           3 :     .nanopb_tx_max     = buf_max,
      89           3 :     .frame_scratch     = frame_scratch,
      90           3 :     .frame_scratch_max = buf_max,
      91           3 :     .frame_rx_buf      = frame_rx_buf,
      92           3 :     .frame_rx_buf_max  = buf_max,
      93           3 :     .frame_tx_buf      = frame_tx_buf,
      94           3 :     .frame_tx_buf_max  = buf_max,
      95           3 :     .metrics           = metrics
      96           3 :   };
      97             : 
      98             :   /* FIXME for performance, cache this? */
      99           3 :   fd_h2_hdr_matcher_init( client->matcher, rng_seed );
     100           3 :   fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_STATUS,  "grpc-status"  );
     101           3 :   fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_MESSAGE, "grpc-message" );
     102             : 
     103           3 :   client->version_len = 5;
     104           3 :   memcpy( client->version, "0.0.0", 5 );
     105             : 
     106          27 :   for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
     107          24 :     fd_grpc_h2_stream_t * stream = &client->stream_pool[ i ];
     108          24 :     stream->msg_buf     = (uchar *)stream_buf_mem + (i*buf_max);
     109          24 :     stream->msg_buf_max = buf_max;
     110          24 :     FD_TEST( (ulong)( stream->msg_buf + stream->msg_buf_max )<=end );
     111          24 :   }
     112           3 :   fd_grpc_client_reset( client );
     113             : 
     114           3 :   return client;
     115           3 : }
     116             : 
     117             : void *
     118           3 : fd_grpc_client_delete( fd_grpc_client_t * client ) {
     119           3 :   return client;
     120           3 : }
     121             : 
     122             : void
     123             : fd_grpc_client_set_version( fd_grpc_client_t * client,
     124             :                             char const *       version,
     125           0 :                             ulong              version_len ) {
     126           0 :   if( FD_UNLIKELY( version_len > FD_GRPC_CLIENT_VERSION_LEN_MAX ) ) {
     127           0 :     FD_LOG_WARNING(( "Version string too long (%lu chars), ignoring", version_len ));
     128           0 :     return;
     129           0 :   }
     130           0 :   client->version_len = (uchar)version_len;
     131           0 :   memcpy( client->version, version, version_len );
     132           0 : }
     133             : 
     134             : void
     135             : fd_grpc_client_set_authority( fd_grpc_client_t * client,
     136             :                               char const *       host,
     137             :                               ulong              host_len,
     138           0 :                               ushort             port ) {
     139           0 :   host_len = fd_ulong_min( host_len, sizeof(client->host)-1 );
     140           0 :   fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->host ), host, host_len ) );
     141           0 :   client->host_len = (uchar)host_len;
     142           0 :   client->port     = (ushort)port;
     143           0 : }
     144             : 
     145             : int
     146          12 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ) {
     147             :   /* Sufficient quota to start a stream? */
     148          12 :   if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 > client->conn->peer_settings.max_concurrent_streams ) ) {
     149           0 :     return 0;
     150           0 :   }
     151             : 
     152             :   /* Free stream object available? */
     153          12 :   if( FD_UNLIKELY( !fd_grpc_h2_stream_pool_free( client->stream_pool ) ) ) {
     154           0 :     return 0;
     155           0 :   }
     156          12 :   if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
     157           0 :     return 0;
     158           0 :   }
     159             : 
     160          12 :   return 1;
     161          12 : }
     162             : 
     163             : fd_grpc_h2_stream_t *
     164             : fd_grpc_client_stream_acquire( fd_grpc_client_t * client,
     165          36 :                                ulong              request_ctx ) {
     166          36 :   if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
     167           0 :     FD_LOG_CRIT(( "stream pool exhausted" ));
     168           0 :   }
     169             : 
     170          36 :   fd_h2_conn_t * conn = client->conn;
     171          36 :   uint const stream_id = client->conn->tx_stream_next;
     172          36 :   conn->tx_stream_next += 2U;
     173             : 
     174          36 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_pool_ele_acquire( client->stream_pool );
     175          36 :   fd_grpc_h2_stream_reset( stream );
     176          36 :   stream->request_ctx = request_ctx;
     177             : 
     178          36 :   fd_h2_stream_open( fd_h2_stream_init( &stream->s ), conn, stream_id );
     179          36 :   client->request_stream = stream;
     180          36 :   client->stream_ids[ client->stream_cnt ] = stream_id;
     181          36 :   client->streams   [ client->stream_cnt ] = stream;
     182          36 :   client->stream_cnt++;
     183             : 
     184          36 :   return stream;
     185          36 : }
     186             : 
     187             : void
     188             : fd_grpc_client_stream_release( fd_grpc_client_t *    client,
     189          36 :                                fd_grpc_h2_stream_t * stream ) {
     190          36 :   if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
     191             : 
     192             :   /* Deallocate tx_op */
     193          36 :   if( FD_UNLIKELY( stream == client->request_stream ) ) {
     194          24 :     client->request_stream = NULL;
     195          24 :     *client->request_tx_op = (fd_h2_tx_op_t){0};
     196          24 :   }
     197             : 
     198             :   /* Remove stream from map */
     199          36 :   int map_idx = -1;
     200          90 :   for( uint i=0UL; i<(client->stream_cnt); i++ ) {
     201          54 :     if( client->stream_ids[ i ] == stream->s.stream_id ) {
     202          36 :       map_idx = (int)i;
     203          36 :     }
     204          54 :   }
     205          36 :   if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
     206          36 :   if( (ulong)map_idx+1 < client->stream_cnt ) {
     207           6 :     client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ];
     208           6 :     client->streams   [ map_idx ] = client->streams   [ client->stream_cnt-1 ];
     209           6 :   }
     210          36 :   client->stream_cnt--;
     211             : 
     212          36 :   fd_grpc_h2_stream_pool_ele_release( client->stream_pool, stream );
     213          36 : }
     214             : 
     215             : void
     216          18 : fd_grpc_client_reset( fd_grpc_client_t * client ) {
     217          18 :   fd_h2_rbuf_init( client->frame_rx, client->frame_rx_buf, client->frame_rx_buf_max );
     218          18 :   fd_h2_rbuf_init( client->frame_tx, client->frame_tx_buf, client->frame_tx_buf_max );
     219          18 :   fd_h2_conn_init_client( client->conn );
     220          18 :   client->conn->ctx      = client;
     221          18 :   client->h2_hs_done     = 0;
     222          18 :   client->ssl_hs_done    = 0;
     223          18 :   client->request_stream = NULL;
     224          18 :   *client->request_tx_op = (fd_h2_tx_op_t){0};
     225             : 
     226             :   /* Disable RX flow control */
     227          18 :   client->conn->self_settings.initial_window_size = (1U<<31)-1U;
     228          18 :   client->conn->rx_wnd_max   = (1U<<31)-1U;
     229          18 :   client->conn->rx_wnd_wmark = client->conn->rx_wnd_max - (1U<<20);
     230             : 
     231             :   /* Free all stream objects */
     232          21 :   while( client->stream_cnt ) {
     233           3 :     fd_grpc_h2_stream_t * stream = client->streams[ client->stream_cnt-1 ];
     234           3 :     fd_grpc_client_stream_release( client, stream );
     235           3 :   }
     236          18 : }
     237             : 
     238             : /* fd_grpc_client_send_stream_quota writes a WINDOW_UPDATE frame, which
     239             :    eventually allows the peer to send more data bytes. */
     240             : 
     241             : static void
     242             : fd_grpc_client_send_stream_quota( fd_h2_rbuf_t *        rbuf_tx,
     243             :                                   fd_grpc_h2_stream_t * stream,
     244           3 :                                   uint                  bump ) {
     245           3 :   fd_h2_window_update_t window_update = {
     246           3 :     .hdr = {
     247           3 :       .typlen      = fd_h2_frame_typlen( FD_H2_FRAME_TYPE_WINDOW_UPDATE, 4UL ),
     248           3 :       .r_stream_id = fd_uint_bswap( stream->s.stream_id )
     249           3 :     },
     250           3 :     .increment = fd_uint_bswap( bump )
     251           3 :   };
     252           3 :   fd_h2_rbuf_push( rbuf_tx, &window_update, sizeof(fd_h2_window_update_t) );
     253           3 :   stream->s.rx_wnd += bump;
     254           3 : }
     255             : 
     256             : /* fd_grpc_client_send_timeout is called when a stream timeout triggers.
     257             :    Calls back to the user, writes a RST_STREAM frame, and frees the
     258             :    stream object. */
     259             : 
     260             : static void
     261             : fd_grpc_client_send_timeout( fd_h2_rbuf_t *        rbuf_tx,
     262             :                              fd_grpc_client_t *    client,
     263             :                              fd_grpc_h2_stream_t * stream,
     264           6 :                              int                   deadline_kind ) {
     265           6 :   client->callbacks->rx_timeout( client->ctx, stream->request_ctx, deadline_kind );
     266           6 :   fd_h2_tx_rst_stream( rbuf_tx, stream->s.stream_id, FD_H2_ERR_CANCEL );
     267           6 :   fd_grpc_client_stream_release( client, stream );
     268           6 : }
     269             : 
     270             : void
     271             : fd_grpc_client_service_streams( fd_grpc_client_t * client,
     272          18 :                                 long               ts_nanos ) {
     273          18 :   ulong const meta_frame_max =
     274          18 :     fd_ulong_max( sizeof(fd_h2_window_update_t), sizeof(fd_h2_rst_stream_t) );
     275          18 :   fd_h2_conn_t * conn    = client->conn;
     276          18 :   fd_h2_rbuf_t * rbuf_tx = client->frame_tx;
     277          18 :   if( FD_UNLIKELY( conn->flags ) ) return;
     278          18 :   uint  const wnd_max    = conn->self_settings.initial_window_size;
     279          18 :   uint  const wnd_thres  = wnd_max / 2;
     280          36 :   for( ulong i=0UL; i<(client->stream_cnt); i++ ) {
     281          18 :     if( FD_UNLIKELY( fd_h2_rbuf_free_sz( rbuf_tx )<meta_frame_max ) ) break;
     282          18 :     fd_grpc_h2_stream_t * stream = client->streams[ i ];
     283             : 
     284          18 :     if( FD_UNLIKELY( ( stream->has_header_deadline ) &
     285          18 :                      ( stream->header_deadline_nanos - ts_nanos <= 0L ) ) ) {
     286           3 :       fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_HEADER );
     287           3 :       i--; /* stream removed */
     288           3 :       continue;
     289           3 :     }
     290             : 
     291          15 :     if( FD_UNLIKELY( ( stream->has_rx_end_deadline ) &
     292          15 :                      ( stream->rx_end_deadline_nanos - ts_nanos <= 0L ) ) ) {
     293           3 :       fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_RX_END );
     294           3 :       i--; /* stream removed */
     295           3 :       continue;
     296           3 :     }
     297             : 
     298          12 :     if( FD_UNLIKELY( stream->s.rx_wnd < wnd_thres ) ) {
     299           3 :       uint const bump = wnd_max - stream->s.rx_wnd;
     300           3 :       fd_grpc_client_send_stream_quota( rbuf_tx, stream, bump );
     301           3 :     }
     302          12 :   }
     303          18 : }
     304             : 
     305             : #if FD_HAS_OPENSSL
     306             : 
     307             : static int
     308             : fd_ossl_log_error( char const * str,
     309             :                    ulong        len,
     310           0 :                    void *       ctx ) {
     311           0 :   (void)ctx;
     312           0 :   if( len>0 && str[ len-1 ]=='\n' ) len--;
     313           0 :   FD_LOG_INFO(( "%.*s", (int)len, str ));
     314           0 :   return 0;
     315           0 : }
     316             : 
     317             : int
     318             : fd_grpc_client_rxtx_ossl( fd_grpc_client_t * client,
     319             :                           SSL *              ssl,
     320           0 :                           int *              charge_busy ) {
     321           0 :   if( FD_UNLIKELY( !client->ssl_hs_done ) ) {
     322           0 :     int res = SSL_do_handshake( ssl );
     323           0 :     if( res<=0 ) {
     324           0 :       int error = SSL_get_error( ssl, res );
     325           0 :       if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return 1;
     326           0 :       FD_LOG_INFO(( "SSL_do_handshake failed (%i-%s)", error, fd_openssl_ssl_strerror( error ) ));
     327           0 :       long verify_result = SSL_get_verify_result( ssl );
     328           0 :       if( error == SSL_ERROR_SSL && verify_result != X509_V_OK ) {
     329           0 :         FD_LOG_WARNING(( "Certificate verification failed: %s", X509_verify_cert_error_string( verify_result ) ));
     330           0 :       }
     331           0 :       ERR_print_errors_cb( fd_ossl_log_error, NULL );
     332           0 :       return 0;
     333           0 :     } else {
     334           0 :       client->ssl_hs_done = 1;
     335           0 :     }
     336           0 :   }
     337             : 
     338           0 :   fd_h2_conn_t * conn = client->conn;
     339           0 :   int ssl_err = 0;
     340           0 :   ulong read_sz = fd_h2_rbuf_ssl_read( client->frame_rx, ssl, &ssl_err );
     341           0 :   if( FD_UNLIKELY( ssl_err && ssl_err!=SSL_ERROR_WANT_READ ) ) {
     342           0 :     if( ssl_err==SSL_ERROR_ZERO_RETURN ) {
     343           0 :       FD_LOG_WARNING(( "gRPC server closed connection" ));
     344           0 :       return 0;
     345           0 :     }
     346           0 :     FD_LOG_WARNING(( "SSL_read_ex failed (%i-%s)", ssl_err, fd_openssl_ssl_strerror( ssl_err ) ));
     347           0 :     ERR_print_errors_cb( fd_ossl_log_error, NULL );
     348           0 :     return 0;
     349           0 :   }
     350           0 :   if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
     351           0 :   fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
     352           0 :   fd_grpc_client_service_streams( client, fd_log_wallclock() );
     353           0 :   ulong write_sz = fd_h2_rbuf_ssl_write( client->frame_tx, ssl );
     354             : 
     355           0 :   if( read_sz!=0 || write_sz!=0 ) *charge_busy = 1;
     356           0 :   return 1;
     357           0 : }
     358             : 
     359             : #endif /* FD_HAS_OPENSSL */
     360             : 
     361             : #if FD_H2_HAS_SOCKETS
     362             : 
     363             : int
     364             : fd_grpc_client_rxtx_socket( fd_grpc_client_t * client,
     365             :                             int                sock_fd,
     366           0 :                             int *              charge_busy ) {
     367           0 :   fd_h2_conn_t * conn = client->conn;
     368           0 :   ulong const frame_rx_lo_0 = client->frame_rx->lo_off;
     369           0 :   ulong const frame_rx_hi_0 = client->frame_rx->hi_off;
     370           0 :   ulong const frame_tx_lo_1 = client->frame_tx->lo_off;
     371           0 :   ulong const frame_tx_hi_1 = client->frame_tx->hi_off;
     372             : 
     373           0 :   int rx_err = fd_h2_rbuf_recvmsg( client->frame_rx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
     374           0 :   if( FD_UNLIKELY( rx_err ) ) {
     375           0 :     FD_LOG_INFO(( "Disconnected: recvmsg error (%i-%s)", rx_err, fd_io_strerror( rx_err ) ));
     376           0 :     return 0;
     377           0 :   }
     378             : 
     379           0 :   if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
     380           0 :   fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
     381           0 :   fd_grpc_client_service_streams( client, fd_log_wallclock() );
     382             : 
     383           0 :   int tx_err = fd_h2_rbuf_sendmsg( client->frame_tx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
     384           0 :   if( FD_UNLIKELY( tx_err ) ) {
     385           0 :     FD_LOG_WARNING(( "fd_h2_rbuf_sendmsg failed (%i-%s)", tx_err, fd_io_strerror( tx_err ) ));
     386           0 :     return 0;
     387           0 :   }
     388             : 
     389           0 :   ulong const frame_rx_lo_1 = client->frame_rx->lo_off;
     390           0 :   ulong const frame_rx_hi_1 = client->frame_rx->hi_off;
     391           0 :   ulong const frame_tx_lo_0 = client->frame_tx->lo_off;
     392           0 :   ulong const frame_tx_hi_0 = client->frame_tx->hi_off;
     393             : 
     394           0 :   if( frame_rx_lo_0!=frame_rx_lo_1 || frame_rx_hi_0!=frame_rx_hi_1 ||
     395           0 :       frame_tx_lo_0!=frame_tx_lo_1 || frame_tx_hi_0!=frame_tx_hi_1 ) {
     396           0 :     *charge_busy = 1;
     397           0 :   }
     398             : 
     399           0 :   return 1;
     400           0 : }
     401             : 
     402             : #endif /* FD_H2_HAS_SOCKETS */
     403             : 
     404             : /* fd_grpc_client_request continue attempts to write a request data
     405             :    frame. */
     406             : 
     407             : static int
     408           0 : fd_grpc_client_request_continue1( fd_grpc_client_t * client ) {
     409           0 :   fd_grpc_h2_stream_t * stream    = client->request_stream;
     410           0 :   fd_h2_stream_t *      h2_stream = &stream->s;
     411           0 :   fd_h2_tx_op_copy( client->conn, h2_stream, client->frame_tx, client->request_tx_op );
     412           0 :   if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0;
     413           0 :   if( FD_UNLIKELY( h2_stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0;
     414           0 :   client->metrics->stream_chunks_tx_cnt++;
     415             :   /* Request finished */
     416           0 :   client->request_stream = NULL;
     417           0 :   client->callbacks->tx_complete( client->ctx, stream->request_ctx );
     418           0 :   return 1;
     419           0 : }
     420             : 
     421             : static int
     422           0 : fd_grpc_client_request_continue( fd_grpc_client_t * client ) {
     423           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
     424           0 :   if( FD_UNLIKELY( !client->request_stream ) ) return 0;
     425           0 :   if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0;
     426           0 :   return fd_grpc_client_request_continue1( client );
     427           0 : }
     428             : 
     429             : int
     430           0 : fd_grpc_client_is_connected( fd_grpc_client_t * client ) {
     431           0 :   if( FD_UNLIKELY( !client                                          ) ) return 0;
     432           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD      ) ) return 0;
     433           0 :   if( FD_UNLIKELY( !client->h2_hs_done                              ) ) return 0;
     434           0 :   return 1;
     435           0 : }
     436             : 
     437             : int
     438           0 : fd_grpc_client_request_is_blocked( fd_grpc_client_t * client ) {
     439           0 :   if( FD_UNLIKELY( !client                                          ) ) return 1;
     440           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD      ) ) return 1;
     441           0 :   if( FD_UNLIKELY( !client->h2_hs_done                              ) ) return 1;
     442           0 :   if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx )         ) ) return 1;
     443           0 :   if( FD_UNLIKELY( !fd_grpc_client_stream_acquire_is_safe( client ) ) ) return 1;
     444           0 :   return 0;
     445           0 : }
     446             : 
     447             : fd_grpc_h2_stream_t *
     448             : fd_grpc_client_request_start(
     449             :     fd_grpc_client_t *   client,
     450             :     char const *         path,
     451             :     ulong                path_len,
     452             :     ulong                request_ctx,
     453             :     pb_msgdesc_t const * fields,
     454             :     void const *         message,
     455             :     char const *         auth_token,
     456             :     ulong                auth_token_sz
     457           0 : ) {
     458           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
     459             : 
     460             :   /* Encode message */
     461           0 :   FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
     462           0 :   uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
     463           0 :   pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
     464           0 :   if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
     465           0 :     FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path ));
     466           0 :     return NULL;
     467           0 :   }
     468           0 :   ulong const serialized_sz = ostream.bytes_written;
     469             : 
     470             :   /* Create gRPC length prefix */
     471           0 :   fd_grpc_hdr_t hdr = {
     472           0 :     .compressed=0,
     473           0 :     .msg_sz=fd_uint_bswap( (uint)serialized_sz )
     474           0 :   };
     475           0 :   memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
     476           0 :   ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
     477             : 
     478             :   /* Allocate stream descriptor */
     479           0 :   fd_grpc_h2_stream_t * stream    = fd_grpc_client_stream_acquire( client, request_ctx );
     480           0 :   uint const            stream_id = stream->s.stream_id;
     481             : 
     482             :   /* Write HTTP/2 request headers */
     483           0 :   fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
     484           0 :   fd_grpc_req_hdrs_t req_meta = {
     485           0 :     .host     = client->host,
     486           0 :     .host_len = client->host_len,
     487           0 :     .port     = client->port,
     488           0 :     .path     = path,
     489           0 :     .path_len = path_len,
     490           0 :     .https    = 1, /* grpc_client assumes TLS encryption for now */
     491             : 
     492           0 :     .bearer_auth     = auth_token,
     493           0 :     .bearer_auth_len = auth_token_sz
     494           0 :   };
     495           0 :   if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
     496           0 :       &req_meta,
     497           0 :       client->frame_tx,
     498           0 :       client->version,
     499           0 :       client->version_len
     500           0 :   ) ) ) {
     501           0 :     FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
     502           0 :     return NULL;
     503           0 :   }
     504           0 :   fd_h2_tx_commit( client->conn, client->frame_tx );
     505             : 
     506             :   /* Queue request payload for send
     507             :      (Protobuf message might have to be fragmented into multiple HTTP/2
     508             :      DATA frames if the client gets blocked) */
     509           0 :   fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, FD_H2_FLAG_END_STREAM );
     510           0 :   fd_grpc_client_request_continue1( client );
     511           0 :   client->metrics->requests_sent++;
     512           0 :   client->metrics->streams_active++;
     513             : 
     514           0 :   FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu", (int)path_len, path, serialized_sz ));
     515             : 
     516           0 :   return stream;
     517           0 : }
     518             : 
     519             : void
     520             : fd_grpc_client_deadline_set( fd_grpc_h2_stream_t * stream,
     521             :                              int                   deadline_kind,
     522           9 :                              long                  ts_nanos ) {
     523           9 :   switch( deadline_kind ) {
     524           6 :   case FD_GRPC_DEADLINE_HEADER:
     525           6 :     stream->header_deadline_nanos = ts_nanos;
     526           6 :     stream->has_header_deadline   = 1;
     527           6 :     break;
     528           3 :   case FD_GRPC_DEADLINE_RX_END:
     529           3 :     stream->rx_end_deadline_nanos = ts_nanos;
     530           3 :     stream->has_rx_end_deadline   = 1;
     531           3 :     break;
     532           9 :   }
     533           9 : }
     534             : 
     535             : /* Lookup stream by ID */
     536             : 
     537             : static fd_h2_stream_t *
     538             : fd_grpc_h2_stream_query( fd_h2_conn_t * conn,
     539           0 :                          uint           stream_id ) {
     540           0 :   fd_grpc_client_t * client = conn->ctx;
     541           0 :   for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
     542           0 :     if( client->stream_ids[ i ] == stream_id ) {
     543           0 :       return &client->streams[ i ]->s;
     544           0 :     }
     545           0 :   }
     546           0 :   return NULL;
     547           0 : }
     548             : 
     549             : static void
     550           0 : fd_grpc_h2_conn_established( fd_h2_conn_t * conn ) {
     551           0 :   fd_grpc_client_t * client = conn->ctx;
     552           0 :   client->h2_hs_done = 1;
     553           0 :   client->callbacks->conn_established( client->ctx );
     554           0 : }
     555             : 
     556             : static void
     557             : fd_grpc_h2_conn_final( fd_h2_conn_t * conn,
     558             :                        uint           h2_err,
     559           0 :                        int            closed_by ) {
     560           0 :   fd_grpc_client_t * client = conn->ctx;
     561           0 :   client->callbacks->conn_dead( client->ctx, h2_err, closed_by );
     562           0 : }
     563             : 
     564             : /* React to response data */
     565             : 
     566             : void
     567             : fd_grpc_h2_cb_headers(
     568             :     fd_h2_conn_t *   conn,
     569             :     fd_h2_stream_t * h2_stream,
     570             :     void const *     data,
     571             :     ulong            data_sz,
     572             :     ulong            flags
     573          18 : ) {
     574          18 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     575          18 :   fd_grpc_client_t * client = conn->ctx;
     576             : 
     577          18 :   int h2_status = fd_grpc_h2_read_response_hdrs( &stream->hdrs, client->matcher, data, data_sz );
     578          18 :   if( FD_UNLIKELY( h2_status!=FD_H2_SUCCESS ) ) {
     579             :     /* Failed to parse HTTP/2 headers */
     580           3 :     fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_PROTOCOL );
     581           3 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
     582           3 :     fd_grpc_client_stream_release( client, stream );
     583           3 :     return;
     584           3 :   }
     585             : 
     586          15 :   if( !stream->hdrs_received && !!( flags & FD_H2_FLAG_END_HEADERS) ) {
     587             :     /* Got initial response header */
     588          12 :     stream->hdrs_received = 1;
     589          12 :     stream->has_header_deadline = 0;
     590          12 :     if( FD_LIKELY( ( stream->hdrs.h2_status==200  ) &
     591          12 :                    ( !!stream->hdrs.is_grpc_proto ) ) ) {
     592           6 :       client->callbacks->rx_start( client->ctx, stream->request_ctx );
     593           6 :     }
     594          12 :   }
     595             : 
     596          15 :   if( ( flags & (FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) )
     597          15 :               ==(FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM)   ) {
     598           3 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     599           3 :     fd_grpc_client_stream_release( client, stream );
     600           3 :     return;
     601           3 :   }
     602          15 : }
     603             : 
     604             : static void
     605             : fd_grpc_h2_cb_data(
     606             :     fd_h2_conn_t *   conn,
     607             :     fd_h2_stream_t * h2_stream,
     608             :     void const *     data,
     609             :     ulong            data_sz,
     610             :     ulong            flags
     611           0 : ) {
     612           0 :   fd_grpc_client_t *    client = conn->ctx;
     613           0 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     614           0 :   if( FD_UNLIKELY( ( stream->hdrs.h2_status!=200 ) |
     615           0 :                    ( !stream->hdrs.is_grpc_proto ) ) ) {
     616           0 :     return;
     617           0 :   }
     618             : 
     619           0 :   do {
     620             : 
     621             :     /* Read header bytes */
     622           0 :     if( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) {
     623           0 :       ulong hdr_frag_sz = fd_ulong_min( sizeof(fd_grpc_hdr_t) - stream->msg_buf_used, data_sz );
     624           0 :       fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, hdr_frag_sz );
     625           0 :       stream->msg_buf_used += hdr_frag_sz;
     626           0 :       data     = (void const *)( (ulong)data + (ulong)hdr_frag_sz );
     627           0 :       data_sz -= hdr_frag_sz;
     628           0 :       if( FD_UNLIKELY( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) ) return;
     629             : 
     630             :       /* Header complete */
     631           0 :       stream->msg_sz = fd_uint_bswap( FD_LOAD( uint, (void *)( (ulong)stream->msg_buf+1 ) ) );
     632           0 :       if( FD_UNLIKELY( sizeof(fd_grpc_hdr_t)  + stream->msg_sz > stream->msg_buf_max ) ) {
     633           0 :         FD_LOG_WARNING(( "Received oversized gRPC message (%lu bytes), killing request", stream->msg_sz ));
     634           0 :         fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_INTERNAL );
     635           0 :         fd_grpc_client_stream_release( client, stream );
     636           0 :         return;
     637           0 :       }
     638           0 :     }
     639             : 
     640             :     /* Read payload bytes */
     641           0 :     ulong wmark    = sizeof(fd_grpc_hdr_t) + stream->msg_sz;
     642           0 :     ulong chunk_sz = fd_ulong_min( stream->msg_buf_used+data_sz, wmark ) - stream->msg_buf_used;
     643           0 :     if( FD_UNLIKELY( chunk_sz>data_sz ) ) FD_LOG_CRIT(( "integer underflow" )); /* unreachable */
     644           0 :     fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, chunk_sz );
     645           0 :     stream->msg_buf_used += chunk_sz;
     646           0 :     data     = (void const *)( (ulong)data + (ulong)chunk_sz );
     647           0 :     data_sz -= chunk_sz;
     648             : 
     649           0 :     client->metrics->stream_chunks_rx_cnt++;
     650           0 :     client->metrics->stream_chunks_rx_bytes += chunk_sz;
     651             : 
     652           0 :     if( stream->msg_buf_used >= wmark ) {
     653             :       /* Data complete */
     654           0 :       void const * msg_ptr = stream->msg_buf + sizeof(fd_grpc_hdr_t);
     655           0 :       client->callbacks->rx_msg( client->ctx, msg_ptr, stream->msg_sz, stream->request_ctx );
     656           0 :       stream->msg_buf_used = 0UL;
     657           0 :       stream->msg_sz       = 0UL;
     658           0 :     }
     659             : 
     660           0 :   } while( data_sz );
     661             : 
     662           0 :   if( flags & FD_H2_FLAG_END_STREAM ) {
     663             :     /* FIXME incomplete gRPC message */
     664           0 :     if( FD_UNLIKELY( !stream->msg_buf_used ) ) {
     665           0 :       FD_LOG_WARNING(( "Received incomplete gRPC message" ));
     666           0 :     }
     667           0 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     668           0 :   }
     669           0 : }
     670             : 
     671             : /* Server might kill our request */
     672             : 
     673             : static void
     674             : fd_grpc_h2_rst_stream( fd_h2_conn_t *   conn,
     675             :                        fd_h2_stream_t * stream,
     676             :                        uint             error_code,
     677           0 :                        int              closed_by ) {
     678           0 :   if( closed_by==1 ) {
     679           0 :     FD_LOG_WARNING(( "Server terminated request stream_id=%u (%u-%s)",
     680           0 :                      stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
     681           0 :   } else {
     682           0 :     FD_LOG_WARNING(( "Stream failed stream_id=%u (%u-%s)",
     683           0 :                      stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
     684           0 :   }
     685           0 :   fd_grpc_client_t * client = conn->ctx;
     686           0 :   fd_grpc_client_stream_release( client, fd_grpc_h2_stream_upcast( stream ) );
     687           0 : }
     688             : 
     689             : /* A HTTP/2 flow control change might unblock a queued request send op */
     690             : 
     691             : void
     692             : fd_grpc_h2_window_update( fd_h2_conn_t * conn,
     693           0 :                           uint           increment ) {
     694           0 :   (void)increment;
     695           0 :   fd_grpc_client_request_continue( conn->ctx );
     696           0 : }
     697             : 
     698             : void
     699             : fd_grpc_h2_stream_window_update( fd_h2_conn_t *   conn,
     700             :                                  fd_h2_stream_t * stream,
     701           0 :                                  uint             increment ) {
     702           0 :   (void)stream; (void)increment;
     703           0 :   fd_grpc_client_request_continue( conn->ctx );
     704           0 : }
     705             : 
     706             : void
     707           0 : fd_grpc_h2_ping_ack( fd_h2_conn_t * conn ) {
     708           0 :   fd_grpc_client_t * client = conn->ctx;
     709           0 :   client->callbacks->ping_ack( client->ctx );
     710           0 : }
     711             : 
     712             : fd_h2_rbuf_t *
     713           0 : fd_grpc_client_rbuf_tx( fd_grpc_client_t * client ) {
     714           0 :   return client->frame_tx;
     715           0 : }
     716             : 
     717             : fd_h2_rbuf_t *
     718           0 : fd_grpc_client_rbuf_rx( fd_grpc_client_t * client ) {
     719           0 :   return client->frame_rx;
     720           0 : }
     721             : 
     722             : fd_h2_conn_t *
     723           0 : fd_grpc_client_h2_conn( fd_grpc_client_t * client ) {
     724           0 :   return client->conn;
     725           0 : }
     726             : 
     727             : /* fd_grpc_client_h2_callbacks specifies h2->grpc_client callbacks.
     728             :    Stored in .rodata for security.  Must be kept in sync with fd_h2 to
     729             :    avoid NULL pointers. */
     730             : 
     731             : fd_h2_callbacks_t const fd_grpc_client_h2_callbacks = {
     732             :   .stream_create        = fd_h2_noop_stream_create,
     733             :   .stream_query         = fd_grpc_h2_stream_query,
     734             :   .conn_established     = fd_grpc_h2_conn_established,
     735             :   .conn_final           = fd_grpc_h2_conn_final,
     736             :   .headers              = fd_grpc_h2_cb_headers,
     737             :   .data                 = fd_grpc_h2_cb_data,
     738             :   .rst_stream           = fd_grpc_h2_rst_stream,
     739             :   .window_update        = fd_grpc_h2_window_update,
     740             :   .stream_window_update = fd_grpc_h2_stream_window_update,
     741             :   .ping_ack             = fd_grpc_h2_ping_ack,
     742             : };

Generated by: LCOV version 1.14