LCOV - code coverage report
Current view: top level - waltz/grpc - fd_grpc_client.c (source / functions) Hit Total Coverage
Test: cov.lcov Lines: 351 613 57.3 %
Date: 2026-02-13 06:06:24 Functions: 24 40 60.0 %

          Line data    Source code
       1             : #include "fd_grpc_client.h"
       2             : #include "fd_grpc_client_private.h"
       3             : #include "../../ballet/nanopb/pb_encode.h" /* pb_msgdesc_t */
       4             : #include <sys/socket.h>
       5             : #include "../h2/fd_h2_rbuf_sock.h"
       6             : #include "fd_grpc_codec.h"
       7             : #if FD_HAS_OPENSSL
       8             : #include "../openssl/fd_openssl.h"
       9             : #include <openssl/ssl.h>
      10             : #include <openssl/err.h>
      11             : #include "../h2/fd_h2_rbuf_ossl.h"
      12             : #endif
      13             : 
      14             : ulong
      15         405 : fd_grpc_client_align( void ) {
      16         405 :   return fd_ulong_max( alignof(fd_grpc_client_t), fd_grpc_h2_stream_pool_align() );
      17         405 : }
      18             : 
      19             : ulong
      20         102 : fd_grpc_client_footprint( ulong buf_max ) {
      21         102 :   ulong l = FD_LAYOUT_INIT;
      22         102 :   l = FD_LAYOUT_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
      23         102 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
      24         102 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_scratch */
      25         102 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
      26         102 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
      27         102 :   l = FD_LAYOUT_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
      28         102 :   l = FD_LAYOUT_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
      29         102 :   return FD_LAYOUT_FINI( l, fd_grpc_client_align() );
      30         102 : }
      31             : 
      32             : static void
      33         129 : fd_grpc_h2_stream_reset( fd_grpc_h2_stream_t * stream ) {
      34         129 :   memset( &stream->s, 0, sizeof(fd_h2_stream_t) );
      35         129 :   stream->request_ctx = 0UL;
      36         129 :   memset( &stream->hdrs, 0, sizeof(fd_grpc_resp_hdrs_t) );
      37         129 :   stream->hdrs.grpc_status    = FD_GRPC_STATUS_UNKNOWN;
      38         129 :   stream->hdrs_received       = 0;
      39         129 :   stream->msg_buf_used        = 0UL;
      40         129 :   stream->msg_sz              = 0UL;
      41         129 :   stream->has_header_deadline = 0;
      42         129 :   stream->has_rx_end_deadline = 0;
      43         129 : }
      44             : 
      45             : fd_grpc_client_t *
      46             : fd_grpc_client_new( void *                             mem,
      47             :                     fd_grpc_client_callbacks_t const * callbacks,
      48             :                     fd_grpc_client_metrics_t *         metrics,
      49             :                     void *                             app_ctx,
      50             :                     ulong                              buf_max,
      51          51 :                     ulong                              rng_seed ) {
      52          51 :   if( FD_UNLIKELY( !mem ) ) {
      53           0 :     FD_LOG_WARNING(( "NULL mem" ));
      54           0 :     return NULL;
      55           0 :   }
      56          51 :   if( FD_UNLIKELY( buf_max<4096UL ) ) {
      57           0 :     FD_LOG_WARNING(( "undersz buf_max" ));
      58           0 :     return NULL;
      59           0 :   }
      60          51 :   if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_grpc_client_align() ) ) ) {
      61           0 :     FD_LOG_WARNING(( "unaligned mem" ));
      62           0 :     return NULL;
      63           0 :   }
      64             : 
      65          51 :   FD_SCRATCH_ALLOC_INIT( l, mem );
      66          51 :   void * client_mem      = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
      67          51 :   void * nanopb_tx       = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
      68          51 :   void * frame_scratch   = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_scratch */
      69          51 :   void * frame_rx_buf    = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
      70          51 :   void * frame_tx_buf    = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
      71          51 :   void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
      72          51 :   void * stream_buf_mem  = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
      73          51 :   ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_grpc_client_align() );
      74          51 :   FD_TEST( end-(ulong)mem == fd_grpc_client_footprint( buf_max ) );
      75             : 
      76          51 :   fd_grpc_client_t * client = client_mem;
      77             : 
      78          51 :   fd_grpc_h2_stream_t * stream_pool =
      79          51 :     fd_grpc_h2_stream_pool_join( fd_grpc_h2_stream_pool_new( stream_pool_mem, FD_GRPC_CLIENT_MAX_STREAMS ) );
      80          51 :   if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */
      81             : 
      82          51 :   *client = (fd_grpc_client_t){
      83          51 :     .callbacks         = callbacks,
      84          51 :     .ctx               = app_ctx,
      85          51 :     .stream_pool       = stream_pool,
      86          51 :     .stream_bufs       = stream_buf_mem,
      87          51 :     .nanopb_tx         = nanopb_tx,
      88          51 :     .nanopb_tx_max     = buf_max,
      89          51 :     .frame_scratch     = frame_scratch,
      90          51 :     .frame_scratch_max = buf_max,
      91          51 :     .frame_rx_buf      = frame_rx_buf,
      92          51 :     .frame_rx_buf_max  = buf_max,
      93          51 :     .frame_tx_buf      = frame_tx_buf,
      94          51 :     .frame_tx_buf_max  = buf_max,
      95          51 :     .metrics           = metrics
      96          51 :   };
      97             : 
      98             :   /* FIXME for performance, cache this? */
      99          51 :   fd_h2_hdr_matcher_init( client->matcher, rng_seed );
     100          51 :   fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_STATUS,  "grpc-status"  );
     101          51 :   fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_MESSAGE, "grpc-message" );
     102             : 
     103          51 :   client->version_len = 5;
     104          51 :   memcpy( client->version, "0.0.0", 5 );
     105             : 
     106         459 :   for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
     107         408 :     fd_grpc_h2_stream_t * stream = &client->stream_pool[ i ];
     108         408 :     stream->msg_buf     = (uchar *)stream_buf_mem + (i*buf_max);
     109         408 :     stream->msg_buf_max = buf_max;
     110         408 :     FD_TEST( (ulong)( stream->msg_buf + stream->msg_buf_max )<=end );
     111         408 :   }
     112          51 :   fd_grpc_client_reset( client );
     113             : 
     114          51 :   return client;
     115          51 : }
     116             : 
     117             : void *
     118           3 : fd_grpc_client_delete( fd_grpc_client_t * client ) {
     119           3 :   return client;
     120           3 : }
     121             : 
     122             : void
     123             : fd_grpc_client_set_version( fd_grpc_client_t * client,
     124             :                             char const *       version,
     125           0 :                             ulong              version_len ) {
     126           0 :   if( FD_UNLIKELY( version_len > FD_GRPC_CLIENT_VERSION_LEN_MAX ) ) {
     127           0 :     FD_LOG_WARNING(( "Version string too long (%lu chars), ignoring", version_len ));
     128           0 :     return;
     129           0 :   }
     130           0 :   client->version_len = (uchar)version_len;
     131           0 :   memcpy( client->version, version, version_len );
     132           0 : }
     133             : 
     134             : void
     135             : fd_grpc_client_set_authority( fd_grpc_client_t * client,
     136             :                               char const *       host,
     137             :                               ulong              host_len,
     138           0 :                               ushort             port ) {
     139           0 :   host_len = fd_ulong_min( host_len, sizeof(client->host)-1 );
     140           0 :   fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->host ), host, host_len ) );
     141           0 :   client->host_len = (uchar)host_len;
     142           0 :   client->port     = (ushort)port;
     143           0 : }
     144             : 
     145             : int
     146         120 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ) {
     147             :   /* Sufficient quota to start a stream? */
     148         120 :   if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 > client->conn->peer_settings.max_concurrent_streams ) ) {
     149          18 :     return 0;
     150          18 :   }
     151             : 
     152             :   /* Free stream object available? */
     153         102 :   if( FD_UNLIKELY( !fd_grpc_h2_stream_pool_free( client->stream_pool ) ) ) {
     154           0 :     return 0;
     155           0 :   }
     156         102 :   if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
     157           0 :     return 0;
     158           0 :   }
     159             : 
     160         102 :   return 1;
     161         102 : }
     162             : 
     163             : fd_grpc_h2_stream_t *
     164             : fd_grpc_client_stream_acquire( fd_grpc_client_t * client,
     165         129 :                                ulong              request_ctx ) {
     166         129 :   if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
     167           0 :     FD_LOG_CRIT(( "stream pool exhausted" ));
     168           0 :   }
     169             : 
     170         129 :   fd_h2_conn_t * conn = client->conn;
     171         129 :   uint const stream_id = client->conn->tx_stream_next;
     172         129 :   conn->tx_stream_next += 2U;
     173             : 
     174         129 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_pool_ele_acquire( client->stream_pool );
     175         129 :   fd_grpc_h2_stream_reset( stream );
     176         129 :   stream->request_ctx = request_ctx;
     177             : 
     178         129 :   fd_h2_stream_open( fd_h2_stream_init( &stream->s ), conn, stream_id );
     179         129 :   client->request_stream = stream;
     180         129 :   client->stream_ids[ client->stream_cnt ] = stream_id;
     181         129 :   client->streams   [ client->stream_cnt ] = stream;
     182         129 :   client->stream_cnt++;
     183             : 
     184         129 :   return stream;
     185         129 : }
     186             : 
     187             : void
     188             : fd_grpc_client_stream_release( fd_grpc_client_t *    client,
     189          54 :                                fd_grpc_h2_stream_t * stream ) {
     190          54 :   if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
     191             : 
     192             :   /* Deallocate tx_op */
     193          54 :   if( FD_UNLIKELY( stream == client->request_stream ) ) {
     194          36 :     client->request_stream = NULL;
     195          36 :     *client->request_tx_op = (fd_h2_tx_op_t){0};
     196          36 :   }
     197             : 
     198             :   /* Remove stream from map */
     199          54 :   int map_idx = -1;
     200         153 :   for( uint i=0UL; i<(client->stream_cnt); i++ ) {
     201          99 :     if( client->stream_ids[ i ] == stream->s.stream_id ) {
     202          54 :       map_idx = (int)i;
     203          54 :     }
     204          99 :   }
     205          54 :   if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
     206          54 :   if( (ulong)map_idx+1 < client->stream_cnt ) {
     207           6 :     client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ];
     208           6 :     client->streams   [ map_idx ] = client->streams   [ client->stream_cnt-1 ];
     209           6 :   }
     210          54 :   client->stream_cnt--;
     211             : 
     212          54 :   fd_grpc_h2_stream_pool_ele_release( client->stream_pool, stream );
     213          54 : }
     214             : 
     215             : void
     216          69 : fd_grpc_client_reset( fd_grpc_client_t * client ) {
     217          69 :   fd_h2_rbuf_init( client->frame_rx, client->frame_rx_buf, client->frame_rx_buf_max );
     218          69 :   fd_h2_rbuf_init( client->frame_tx, client->frame_tx_buf, client->frame_tx_buf_max );
     219          69 :   fd_h2_conn_init_client( client->conn );
     220          69 :   client->conn->ctx      = client;
     221          69 :   client->h2_hs_done     = 0;
     222          69 :   client->ssl_hs_done    = 0;
     223          69 :   client->request_stream = NULL;
     224          69 :   *client->request_tx_op = (fd_h2_tx_op_t){0};
     225             : 
     226             :   /* Disable RX flow control */
     227          69 :   client->conn->self_settings.initial_window_size = (1U<<31)-1U;
     228          69 :   client->conn->rx_wnd_max   = (1U<<31)-1U;
     229          69 :   client->conn->rx_wnd_wmark = client->conn->rx_wnd_max - (1U<<20);
     230             : 
     231             :   /* Free all stream objects */
     232          78 :   while( client->stream_cnt ) {
     233           9 :     fd_grpc_h2_stream_t * stream = client->streams[ client->stream_cnt-1 ];
     234           9 :     fd_grpc_client_stream_release( client, stream );
     235           9 :   }
     236          69 : }
     237             : 
     238             : /* fd_grpc_client_send_stream_quota writes a WINDOW_UPDATE frame, which
     239             :    eventually allows the peer to send more data bytes. */
     240             : 
     241             : static void
     242             : fd_grpc_client_send_stream_quota( fd_h2_rbuf_t *        rbuf_tx,
     243             :                                   fd_grpc_h2_stream_t * stream,
     244           3 :                                   uint                  bump ) {
     245           3 :   fd_h2_window_update_t window_update = {
     246           3 :     .hdr = {
     247           3 :       .typlen      = fd_h2_frame_typlen( FD_H2_FRAME_TYPE_WINDOW_UPDATE, 4UL ),
     248           3 :       .r_stream_id = fd_uint_bswap( stream->s.stream_id )
     249           3 :     },
     250           3 :     .increment = fd_uint_bswap( bump )
     251           3 :   };
     252           3 :   fd_h2_rbuf_push( rbuf_tx, &window_update, sizeof(fd_h2_window_update_t) );
     253           3 :   stream->s.rx_wnd += bump;
     254           3 : }
     255             : 
     256             : /* fd_grpc_client_send_timeout is called when a stream timeout triggers.
     257             :    Calls back to the user, writes a RST_STREAM frame, and frees the
     258             :    stream object. */
     259             : 
     260             : static void
     261             : fd_grpc_client_send_timeout( fd_h2_rbuf_t *        rbuf_tx,
     262             :                              fd_grpc_client_t *    client,
     263             :                              fd_grpc_h2_stream_t * stream,
     264          12 :                              int                   deadline_kind ) {
     265          12 :   client->callbacks->rx_timeout( client->ctx, stream->request_ctx, deadline_kind );
     266          12 :   fd_h2_tx_rst_stream( rbuf_tx, stream->s.stream_id, FD_H2_ERR_CANCEL );
     267          12 :   fd_grpc_client_stream_release( client, stream );
     268          12 : }
     269             : 
     270             : void
     271             : fd_grpc_client_service_streams( fd_grpc_client_t * client,
     272          51 :                                 long               ts_nanos ) {
     273          51 :   ulong const meta_frame_max =
     274          51 :     fd_ulong_max( sizeof(fd_h2_window_update_t), sizeof(fd_h2_rst_stream_t) );
     275          51 :   fd_h2_conn_t * conn    = client->conn;
     276          51 :   fd_h2_rbuf_t * rbuf_tx = client->frame_tx;
     277          51 :   if( FD_UNLIKELY( conn->flags ) ) return;
     278          51 :   uint  const wnd_max    = conn->self_settings.initial_window_size;
     279          51 :   uint  const wnd_thres  = wnd_max / 2;
     280         141 :   for( ulong i=0UL; i<(client->stream_cnt); i++ ) {
     281          90 :     if( FD_UNLIKELY( fd_h2_rbuf_free_sz( rbuf_tx )<meta_frame_max ) ) break;
     282          90 :     fd_grpc_h2_stream_t * stream = client->streams[ i ];
     283             : 
     284          90 :     if( FD_UNLIKELY( ( stream->has_header_deadline ) &
     285          90 :                      ( stream->header_deadline_nanos - ts_nanos <= 0L ) ) ) {
     286           6 :       fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_HEADER );
     287           6 :       i--; /* stream removed */
     288           6 :       continue;
     289           6 :     }
     290             : 
     291          84 :     if( FD_UNLIKELY( ( stream->has_rx_end_deadline ) &
     292          84 :                      ( stream->rx_end_deadline_nanos - ts_nanos <= 0L ) ) ) {
     293           6 :       fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_RX_END );
     294           6 :       i--; /* stream removed */
     295           6 :       continue;
     296           6 :     }
     297             : 
     298          78 :     if( FD_UNLIKELY( stream->s.rx_wnd < wnd_thres ) ) {
     299           3 :       uint const bump = wnd_max - stream->s.rx_wnd;
     300           3 :       fd_grpc_client_send_stream_quota( rbuf_tx, stream, bump );
     301           3 :     }
     302          78 :   }
     303          51 : }
     304             : 
     305             : #if FD_HAS_OPENSSL
     306             : 
     307             : static int
     308             : fd_ossl_log_error( char const * str,
     309             :                    ulong        len,
     310           0 :                    void *       ctx ) {
     311           0 :   (void)ctx;
     312           0 :   if( len>0 && str[ len-1 ]=='\n' ) len--;
     313           0 :   FD_LOG_INFO(( "%.*s", (int)len, str ));
     314           0 :   return 0;
     315           0 : }
     316             : 
     317             : int
     318             : fd_grpc_client_rxtx_ossl( fd_grpc_client_t * client,
     319             :                           SSL *              ssl,
     320           0 :                           int *              charge_busy ) {
     321           0 :   if( FD_UNLIKELY( !client->ssl_hs_done ) ) {
     322           0 :     int res = SSL_do_handshake( ssl );
     323           0 :     if( res<=0 ) {
     324           0 :       int error = SSL_get_error( ssl, res );
     325           0 :       if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return 1;
     326           0 :       FD_LOG_INFO(( "SSL_do_handshake failed (%i-%s)", error, fd_openssl_ssl_strerror( error ) ));
     327           0 :       long verify_result = SSL_get_verify_result( ssl );
     328           0 :       if( error == SSL_ERROR_SSL && verify_result != X509_V_OK ) {
     329           0 :         FD_LOG_WARNING(( "Certificate verification failed: %s", X509_verify_cert_error_string( verify_result ) ));
     330           0 :       }
     331           0 :       ERR_print_errors_cb( fd_ossl_log_error, NULL );
     332           0 :       return 0;
     333           0 :     } else {
     334           0 :       client->ssl_hs_done = 1;
     335           0 :     }
     336           0 :   }
     337             : 
     338           0 :   fd_h2_conn_t * conn = client->conn;
     339           0 :   int ssl_err = 0;
     340           0 :   ulong read_sz = fd_h2_rbuf_ssl_read( client->frame_rx, ssl, &ssl_err );
     341           0 :   if( FD_UNLIKELY( ssl_err && ssl_err!=SSL_ERROR_WANT_READ ) ) {
     342           0 :     if( ssl_err==SSL_ERROR_ZERO_RETURN ) {
     343           0 :       FD_LOG_WARNING(( "gRPC server closed connection" ));
     344           0 :       return 0;
     345           0 :     }
     346           0 :     FD_LOG_WARNING(( "SSL_read_ex failed (%i-%s)", ssl_err, fd_openssl_ssl_strerror( ssl_err ) ));
     347           0 :     ERR_print_errors_cb( fd_ossl_log_error, NULL );
     348           0 :     return 0;
     349           0 :   }
     350           0 :   if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
     351           0 :   fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
     352           0 :   fd_grpc_client_service_streams( client, fd_log_wallclock() );
     353           0 :   ulong write_sz = fd_h2_rbuf_ssl_write( client->frame_tx, ssl );
     354           0 :   client->metrics->stream_chunks_rx_bytes += read_sz;
     355           0 :   client->metrics->stream_chunks_tx_bytes += write_sz;
     356             : 
     357           0 :   if( read_sz!=0 || write_sz!=0 ) *charge_busy = 1;
     358           0 :   return 1;
     359           0 : }
     360             : 
     361             : #endif /* FD_HAS_OPENSSL */
     362             : 
     363             : #if FD_H2_HAS_SOCKETS
     364             : 
     365             : int
     366             : fd_grpc_client_rxtx_socket( fd_grpc_client_t * client,
     367             :                             int                sock_fd,
     368          27 :                             int *              charge_busy ) {
     369          27 :   fd_h2_conn_t * conn = client->conn;
     370          27 :   ulong const frame_rx_lo_0 = client->frame_rx->lo_off;
     371          27 :   ulong const frame_rx_hi_0 = client->frame_rx->hi_off;
     372          27 :   ulong const frame_tx_lo_1 = client->frame_tx->lo_off;
     373          27 :   ulong const frame_tx_hi_1 = client->frame_tx->hi_off;
     374             : 
     375          27 :   int rx_err = fd_h2_rbuf_recvmsg( client->frame_rx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
     376          27 :   if( FD_UNLIKELY( rx_err ) ) {
     377           0 :     FD_LOG_INFO(( "Disconnected: recvmsg error (%i-%s)", rx_err, fd_io_strerror( rx_err ) ));
     378           0 :     errno = rx_err;
     379           0 :     return -1;
     380           0 :   }
     381             : 
     382          27 :   if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
     383          27 :   fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
     384          27 :   fd_grpc_client_service_streams( client, fd_log_wallclock() );
     385             : 
     386          27 :   int tx_err = fd_h2_rbuf_sendmsg( client->frame_tx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
     387          27 :   if( FD_LIKELY( tx_err && tx_err==EAGAIN ) ) return 1;
     388          27 :   if( FD_UNLIKELY( tx_err ) ) {
     389           0 :     FD_LOG_WARNING(( "fd_h2_rbuf_sendmsg failed (%i-%s)", tx_err, fd_io_strerror( tx_err ) ));
     390           0 :     errno = tx_err;
     391           0 :     return -1;
     392           0 :   }
     393             : 
     394          27 :   ulong const frame_rx_lo_1 = client->frame_rx->lo_off;
     395          27 :   ulong const frame_rx_hi_1 = client->frame_rx->hi_off;
     396          27 :   ulong const frame_tx_lo_0 = client->frame_tx->lo_off;
     397          27 :   ulong const frame_tx_hi_0 = client->frame_tx->hi_off;
     398             : 
     399          27 :   client->metrics->stream_chunks_rx_bytes += frame_rx_hi_1 - frame_rx_hi_0;
     400          27 :   client->metrics->stream_chunks_tx_bytes += frame_tx_lo_0 - frame_tx_lo_1;
     401             : 
     402          27 :   if( frame_rx_lo_0!=frame_rx_lo_1 || frame_rx_hi_0!=frame_rx_hi_1 ||
     403          27 :       frame_tx_lo_0!=frame_tx_lo_1 || frame_tx_hi_0!=frame_tx_hi_1 ) {
     404           3 :     *charge_busy = 1;
     405           3 :   }
     406             : 
     407          27 :   return 0;
     408          27 : }
     409             : 
     410             : #endif /* FD_H2_HAS_SOCKETS */
     411             : 
     412             : /* fd_grpc_client_request continue attempts to write a request data
     413             :    frame. */
     414             : 
     415             : static int
     416           9 : fd_grpc_client_request_continue1( fd_grpc_client_t * client ) {
     417           9 :   fd_grpc_h2_stream_t * stream    = client->request_stream;
     418           9 :   fd_h2_stream_t *      h2_stream = &stream->s;
     419           9 :   fd_h2_tx_op_copy( client->conn, h2_stream, client->frame_tx, client->request_tx_op );
     420           9 :   if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0;
     421           9 :   if( FD_UNLIKELY( h2_stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0;
     422           9 :   client->metrics->stream_chunks_tx_cnt++;
     423             :   /* Request finished */
     424           9 :   client->request_stream = NULL;
     425           9 :   client->callbacks->tx_complete( client->ctx, stream->request_ctx );
     426           9 :   return 1;
     427           9 : }
     428             : 
     429             : static int
     430           0 : fd_grpc_client_request_continue( fd_grpc_client_t * client ) {
     431           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
     432           0 :   if( FD_UNLIKELY( !client->request_stream ) ) return 0;
     433           0 :   if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0;
     434           0 :   return fd_grpc_client_request_continue1( client );
     435           0 : }
     436             : 
     437             : int
     438         111 : fd_grpc_client_is_connected( fd_grpc_client_t * client ) {
     439         111 :   if( FD_UNLIKELY( !client                                          ) ) return 0;
     440         111 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD      ) ) return 0;
     441         111 :   if( FD_UNLIKELY( !client->h2_hs_done                              ) ) return 0;
     442         108 :   return 1;
     443         111 : }
     444             : 
     445             : int
     446         108 : fd_grpc_client_request_is_blocked( fd_grpc_client_t * client ) {
     447         108 :   if( FD_UNLIKELY( !client                                          ) ) return 1;
     448         108 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD      ) ) return 1;
     449         108 :   if( FD_UNLIKELY( !client->h2_hs_done                              ) ) return 1;
     450         108 :   if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx )         ) ) return 1;
     451         108 :   if( FD_UNLIKELY( !fd_grpc_client_stream_acquire_is_safe( client ) ) ) return 1;
     452          90 :   return 0;
     453         108 : }
     454             : 
     455             : int
     456           0 : fd_grpc_client_request_stream_busy( fd_grpc_client_t * client ) {
     457           0 :   return client->request_stream != NULL;
     458           0 : }
     459             : 
     460             : fd_grpc_h2_stream_t *
     461             : fd_grpc_client_request_start(
     462             :     fd_grpc_client_t *   client,
     463             :     char const *         path,
     464             :     ulong                path_len,
     465             :     ulong                request_ctx,
     466             :     pb_msgdesc_t const * fields,
     467             :     void const *         message,
     468             :     char const *         auth_token,
     469             :     ulong                auth_token_sz,
     470             :     int                  is_streaming
     471           9 : ) {
     472           9 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
     473             : 
     474             :   /* Encode message */
     475           9 :   FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
     476           9 :   uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
     477           9 :   pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
     478           9 :   if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
     479           0 :     FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path ));
     480           0 :     return NULL;
     481           0 :   }
     482           9 :   ulong const serialized_sz = ostream.bytes_written;
     483             : 
     484             :   /* Create gRPC length prefix */
     485           9 :   fd_grpc_hdr_t hdr = {
     486           9 :     .compressed=0,
     487           9 :     .msg_sz=fd_uint_bswap( (uint)serialized_sz )
     488           9 :   };
     489           9 :   memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
     490           9 :   ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
     491             : 
     492             :   /* Allocate stream descriptor */
     493           9 :   fd_grpc_h2_stream_t * stream    = fd_grpc_client_stream_acquire( client, request_ctx );
     494           9 :   uint const            stream_id = stream->s.stream_id;
     495             : 
     496             :   /* Write HTTP/2 request headers */
     497           9 :   fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
     498           9 :   fd_grpc_req_hdrs_t req_meta = {
     499           9 :     .host     = client->host,
     500           9 :     .host_len = client->host_len,
     501           9 :     .port     = client->port,
     502           9 :     .path     = path,
     503           9 :     .path_len = path_len,
     504           9 :     .https    = 1, /* grpc_client assumes TLS encryption for now */
     505             : 
     506           9 :     .bearer_auth     = auth_token,
     507           9 :     .bearer_auth_len = auth_token_sz
     508           9 :   };
     509           9 :   if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
     510           9 :       &req_meta,
     511           9 :       client->frame_tx,
     512           9 :       client->version,
     513           9 :       client->version_len
     514           9 :   ) ) ) {
     515           0 :     FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
     516           0 :     fd_grpc_client_stream_release( client, stream );
     517           0 :     return NULL;
     518           0 :   }
     519           9 :   fd_h2_tx_commit( client->conn, client->frame_tx );
     520             : 
     521             :   /* Queue request payload for send
     522             :      (Protobuf message might have to be fragmented into multiple HTTP/2
     523             :      DATA frames if the client gets blocked)
     524             :      For streaming requests, don't set END_STREAM flag yet */
     525           9 :   uint flags = is_streaming ? 0U : FD_H2_FLAG_END_STREAM;
     526           9 :   fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, flags );
     527           9 :   fd_grpc_client_request_continue1( client );
     528           9 :   client->metrics->requests_sent++;
     529           9 :   client->metrics->streams_active++;
     530             : 
     531           9 :   FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu streaming=%d", (int)path_len, path, serialized_sz, is_streaming ));
     532             : 
     533           9 :   return stream;
     534           9 : }
     535             : 
     536             : fd_grpc_h2_stream_t *
     537             : fd_grpc_client_request_start1(
     538             :     fd_grpc_client_t *   client,
     539             :     char const *         path,
     540             :     ulong                path_len,
     541             :     ulong                request_ctx,
     542             :     uchar const *        protobuf,
     543             :     ulong                protobuf_sz,
     544             :     char const *         auth_token,
     545             :     ulong                auth_token_sz,
     546             :     int                  is_streaming
     547           0 : ) {
     548           0 :   if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
     549             : 
     550             :   /* Validate protobuf size */
     551           0 :   FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
     552           0 :   ulong const max_proto_sz = client->nanopb_tx_max - sizeof(fd_grpc_hdr_t);
     553           0 :   if( FD_UNLIKELY( protobuf_sz > max_proto_sz ) ) {
     554           0 :     FD_LOG_WARNING(( "Protobuf message too large (%lu bytes) for path (%.*s). Max size is %lu bytes", protobuf_sz, (int)path_len, path, max_proto_sz ));
     555           0 :     return NULL;
     556           0 :   }
     557             : 
     558             :   /* Copy protobuf to buffer after gRPC header */
     559           0 :   uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
     560           0 :   memcpy( proto_buf, protobuf, protobuf_sz );
     561             : 
     562             :   /* Create gRPC length prefix */
     563           0 :   fd_grpc_hdr_t hdr = {
     564           0 :     .compressed=0,
     565           0 :     .msg_sz=fd_uint_bswap( (uint)protobuf_sz )
     566           0 :   };
     567           0 :   memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
     568           0 :   ulong const payload_sz = protobuf_sz + sizeof(fd_grpc_hdr_t);
     569             : 
     570             :   /* Allocate stream descriptor */
     571           0 :   fd_grpc_h2_stream_t * stream    = fd_grpc_client_stream_acquire( client, request_ctx );
     572           0 :   uint const            stream_id = stream->s.stream_id;
     573             : 
     574             :   /* Write HTTP/2 request headers */
     575           0 :   fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
     576           0 :   fd_grpc_req_hdrs_t req_meta = {
     577           0 :     .host     = client->host,
     578           0 :     .host_len = client->host_len,
     579           0 :     .port     = client->port,
     580           0 :     .path     = path,
     581           0 :     .path_len = path_len,
     582           0 :     .https    = 1, /* grpc_client assumes TLS encryption for now */
     583             : 
     584           0 :     .bearer_auth     = auth_token,
     585           0 :     .bearer_auth_len = auth_token_sz
     586           0 :   };
     587           0 :   if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
     588           0 :       &req_meta,
     589           0 :       client->frame_tx,
     590           0 :       client->version,
     591           0 :       client->version_len
     592           0 :   ) ) ) {
     593           0 :     FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
     594           0 :     fd_grpc_client_stream_release( client, stream );
     595           0 :     return NULL;
     596           0 :   }
     597           0 :   fd_h2_tx_commit( client->conn, client->frame_tx );
     598             : 
     599             :   /* Queue request payload for send
     600             :      (Protobuf message might have to be fragmented into multiple HTTP/2
     601             :      DATA frames if the client gets blocked)
     602             :      For streaming requests, don't set END_STREAM flag yet */
     603           0 :   uint flags = is_streaming ? 0U : FD_H2_FLAG_END_STREAM;
     604           0 :   fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, flags );
     605           0 :   fd_grpc_client_request_continue1( client );
     606           0 :   client->metrics->requests_sent++;
     607           0 :   client->metrics->streams_active++;
     608             : 
     609           0 :   FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu streaming=%d", (int)path_len, path, protobuf_sz, is_streaming ));
     610             : 
     611           0 :   return stream;
     612           0 : }
     613             : 
     614             : int
     615             : fd_grpc_client_stream_send_msg(
     616             :     fd_grpc_client_t *   client,
     617             :     fd_grpc_h2_stream_t * stream,
     618             :     pb_msgdesc_t const * fields,
     619             :     void const *         message
     620           0 : ) {
     621           0 :   if( FD_UNLIKELY( !client || !stream ) ) return 0;
     622           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
     623           0 :   if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
     624           0 :   if( FD_UNLIKELY( client->request_stream != NULL && client->request_stream != stream ) ) return 0; /* Another stream has a request in progress */
     625             : 
     626             :   /* Encode message */
     627           0 :   FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
     628           0 :   uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
     629           0 :   pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
     630           0 :   if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
     631           0 :     FD_LOG_WARNING(( "Failed to encode Protobuf message for stream_id=%u. This is a bug (insufficient buffer space?)", stream->s.stream_id ));
     632           0 :     return 0;
     633           0 :   }
     634           0 :   ulong const serialized_sz = ostream.bytes_written;
     635             : 
     636             :   /* Create gRPC length prefix */
     637           0 :   fd_grpc_hdr_t hdr = {
     638           0 :     .compressed=0,
     639           0 :     .msg_sz=fd_uint_bswap( (uint)serialized_sz )
     640           0 :   };
     641           0 :   memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
     642           0 :   ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
     643             : 
     644             :   /* Queue message payload for send (without END_STREAM flag) */
     645           0 :   client->request_stream = stream;
     646           0 :   fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, 0U );
     647           0 :   fd_grpc_client_request_continue1( client );
     648             : 
     649           0 :   FD_LOG_DEBUG(( "gRPC stream_send_msg stream_id=%u sz=%lu", stream->s.stream_id, serialized_sz ));
     650             : 
     651           0 :   return 1;
     652           0 : }
     653             : 
     654             : int
     655             : fd_grpc_client_stream_send_msg1(
     656             :     fd_grpc_client_t *    client,
     657             :     fd_grpc_h2_stream_t * stream,
     658             :     uchar const *         protobuf,
     659             :     ulong                 protobuf_sz
     660           0 : ) {
     661           0 :   if( FD_UNLIKELY( !client || !stream ) ) return 0;
     662           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
     663           0 :   if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
     664           0 :   if( FD_UNLIKELY( client->request_stream != NULL && client->request_stream != stream ) ) return 0; /* Another stream has a request in progress */
     665             : 
     666             :   /* Validate protobuf size */
     667           0 :   FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
     668           0 :   ulong const max_proto_sz = client->nanopb_tx_max - sizeof(fd_grpc_hdr_t);
     669           0 :   if( FD_UNLIKELY( protobuf_sz > max_proto_sz ) ) {
     670           0 :     FD_LOG_WARNING(( "Protobuf message too large (%lu bytes) for stream_id=%u. Max size is %lu bytes", protobuf_sz, stream->s.stream_id, max_proto_sz ));
     671           0 :     return 0;
     672           0 :   }
     673             : 
     674             :   /* Copy protobuf to buffer after gRPC header */
     675           0 :   uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
     676           0 :   memcpy( proto_buf, protobuf, protobuf_sz );
     677             : 
     678             :   /* Create gRPC length prefix */
     679           0 :   fd_grpc_hdr_t hdr = {
     680           0 :     .compressed=0,
     681           0 :     .msg_sz=fd_uint_bswap( (uint)protobuf_sz )
     682           0 :   };
     683           0 :   memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
     684           0 :   ulong const payload_sz = protobuf_sz + sizeof(fd_grpc_hdr_t);
     685             : 
     686             :   /* Queue message payload for send (without END_STREAM flag) */
     687           0 :   client->request_stream = stream;
     688           0 :   fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, 0U );
     689           0 :   fd_grpc_client_request_continue1( client );
     690             : 
     691           0 :   FD_LOG_DEBUG(( "gRPC stream_send_msg stream_id=%u sz=%lu", stream->s.stream_id, protobuf_sz ));
     692             : 
     693           0 :   return 1;
     694           0 : }
     695             : 
     696             : int
     697             : fd_grpc_client_stream_close(
     698             :     fd_grpc_client_t *    client,
     699             :     fd_grpc_h2_stream_t * stream
     700           0 : ) {
     701           0 :   if( FD_UNLIKELY( !client || !stream ) ) return 0;
     702           0 :   if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
     703           0 :   if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
     704           0 :   if( FD_UNLIKELY( client->request_stream != NULL ) ) return 0; /* Another request in progress */
     705             : 
     706             :   /* Send empty DATA frame with END_STREAM flag to close the stream */
     707           0 :   fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_DATA, FD_H2_FLAG_END_STREAM, stream->s.stream_id );
     708           0 :   fd_h2_tx_commit( client->conn, client->frame_tx );
     709             : 
     710           0 :   FD_LOG_DEBUG(( "gRPC stream_close stream_id=%u", stream->s.stream_id ));
     711             : 
     712           0 :   return 1;
     713           0 : }
     714             : 
     715             : void
     716             : fd_grpc_client_deadline_set( fd_grpc_h2_stream_t * stream,
     717             :                              int                   deadline_kind,
     718          18 :                              long                  ts_nanos ) {
     719          18 :   switch( deadline_kind ) {
     720          12 :   case FD_GRPC_DEADLINE_HEADER:
     721          12 :     stream->header_deadline_nanos = ts_nanos;
     722          12 :     stream->has_header_deadline   = 1;
     723          12 :     break;
     724           6 :   case FD_GRPC_DEADLINE_RX_END:
     725           6 :     stream->rx_end_deadline_nanos = ts_nanos;
     726           6 :     stream->has_rx_end_deadline   = 1;
     727           6 :     break;
     728          18 :   }
     729          18 : }
     730             : 
     731             : /* Lookup stream by ID */
     732             : 
     733             : static fd_h2_stream_t *
     734             : fd_grpc_h2_stream_query( fd_h2_conn_t * conn,
     735           0 :                          uint           stream_id ) {
     736           0 :   fd_grpc_client_t * client = conn->ctx;
     737           0 :   for( ulong i=0UL; i<client->stream_cnt; i++ ) {
     738           0 :     if( client->stream_ids[ i ] == stream_id ) {
     739           0 :       return &client->streams[ i ]->s;
     740           0 :     }
     741           0 :   }
     742           0 :   return NULL;
     743           0 : }
     744             : 
     745             : static void
     746           0 : fd_grpc_h2_conn_established( fd_h2_conn_t * conn ) {
     747           0 :   fd_grpc_client_t * client = conn->ctx;
     748           0 :   client->h2_hs_done = 1;
     749           0 :   client->callbacks->conn_established( client->ctx );
     750           0 : }
     751             : 
     752             : static void
     753             : fd_grpc_h2_conn_final( fd_h2_conn_t * conn,
     754             :                        uint           h2_err,
     755           0 :                        int            closed_by ) {
     756           0 :   fd_grpc_client_t * client = conn->ctx;
     757           0 :   client->callbacks->conn_dead( client->ctx, h2_err, closed_by );
     758           0 : }
     759             : 
     760             : /* React to response data */
     761             : 
     762             : void
     763             : fd_grpc_h2_cb_headers(
     764             :     fd_h2_conn_t *   conn,
     765             :     fd_h2_stream_t * h2_stream,
     766             :     void const *     data,
     767             :     ulong            data_sz,
     768             :     ulong            flags
     769          18 : ) {
     770          18 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     771          18 :   fd_grpc_client_t * client = conn->ctx;
     772             : 
     773          18 :   int h2_status = fd_grpc_h2_read_response_hdrs( &stream->hdrs, client->matcher, data, data_sz );
     774          18 :   if( FD_UNLIKELY( h2_status!=FD_H2_SUCCESS ) ) {
     775             :     /* Failed to parse HTTP/2 headers */
     776           3 :     fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_PROTOCOL );
     777           3 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
     778           3 :     fd_grpc_client_stream_release( client, stream );
     779           3 :     return;
     780           3 :   }
     781             : 
     782          15 :   if( !stream->hdrs_received && !!( flags & FD_H2_FLAG_END_HEADERS) ) {
     783             :     /* Got initial response header */
     784          12 :     stream->hdrs_received = 1;
     785          12 :     stream->has_header_deadline = 0;
     786          12 :     if( FD_LIKELY( ( stream->hdrs.h2_status==200  ) &
     787          12 :                    ( !!stream->hdrs.is_grpc_proto ) ) ) {
     788           6 :       client->callbacks->rx_start( client->ctx, stream->request_ctx );
     789           6 :     }
     790          12 :   }
     791             : 
     792          15 :   if( ( flags & (FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) )
     793          15 :               ==(FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM)   ) {
     794           3 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     795           3 :     fd_grpc_client_stream_release( client, stream );
     796           3 :     return;
     797           3 :   }
     798          15 : }
     799             : 
     800             : static void
     801             : fd_grpc_h2_cb_data(
     802             :     fd_h2_conn_t *   conn,
     803             :     fd_h2_stream_t * h2_stream,
     804             :     void const *     data,
     805             :     ulong            data_sz,
     806             :     ulong            flags
     807           3 : ) {
     808           3 :   fd_grpc_client_t *    client = conn->ctx;
     809           3 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     810           3 :   if( FD_UNLIKELY( ( stream->hdrs.h2_status!=200 ) |
     811           3 :                    ( !stream->hdrs.is_grpc_proto ) ) ) {
     812           0 :     return;
     813           0 :   }
     814             : 
     815           3 :   do {
     816             : 
     817             :     /* Read header bytes */
     818           3 :     if( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) {
     819           3 :       ulong hdr_frag_sz = fd_ulong_min( sizeof(fd_grpc_hdr_t) - stream->msg_buf_used, data_sz );
     820           3 :       fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, hdr_frag_sz );
     821           3 :       stream->msg_buf_used += hdr_frag_sz;
     822           3 :       data     = (void const *)( (ulong)data + (ulong)hdr_frag_sz );
     823           3 :       data_sz -= hdr_frag_sz;
     824           3 :       if( FD_UNLIKELY( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) ) return;
     825             : 
     826             :       /* Header complete */
     827           3 :       stream->msg_sz = fd_uint_bswap( FD_LOAD( uint, (void *)( (ulong)stream->msg_buf+1 ) ) );
     828           3 :       if( FD_UNLIKELY( sizeof(fd_grpc_hdr_t)  + stream->msg_sz > stream->msg_buf_max ) ) {
     829           3 :         FD_LOG_WARNING(( "Received oversized gRPC message (%lu bytes), killing request", stream->msg_sz ));
     830           3 :         client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     831           3 :         fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_INTERNAL );
     832           3 :         fd_grpc_client_stream_release( client, stream );
     833           3 :         return;
     834           3 :       }
     835           3 :     }
     836             : 
     837             :     /* Read payload bytes */
     838           0 :     ulong wmark    = sizeof(fd_grpc_hdr_t) + stream->msg_sz;
     839           0 :     ulong chunk_sz = fd_ulong_min( stream->msg_buf_used+data_sz, wmark ) - stream->msg_buf_used;
     840           0 :     if( FD_UNLIKELY( chunk_sz>data_sz ) ) FD_LOG_CRIT(( "integer underflow" )); /* unreachable */
     841           0 :     fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, chunk_sz );
     842           0 :     stream->msg_buf_used += chunk_sz;
     843           0 :     data     = (void const *)( (ulong)data + (ulong)chunk_sz );
     844           0 :     data_sz -= chunk_sz;
     845             : 
     846           0 :     client->metrics->stream_chunks_rx_cnt++;
     847             : 
     848           0 :     if( stream->msg_buf_used >= wmark ) {
     849             :       /* Data complete */
     850           0 :       void const * msg_ptr = stream->msg_buf + sizeof(fd_grpc_hdr_t);
     851           0 :       client->callbacks->rx_msg( client->ctx, msg_ptr, stream->msg_sz, stream->request_ctx );
     852           0 :       stream->msg_buf_used = 0UL;
     853           0 :       stream->msg_sz       = 0UL;
     854           0 :     }
     855             : 
     856           0 :   } while( data_sz );
     857             : 
     858           0 :   if( flags & FD_H2_FLAG_END_STREAM ) {
     859             :     /* FIXME incomplete gRPC message */
     860           0 :     if( FD_UNLIKELY( stream->msg_buf_used ) ) {
     861           0 :       FD_LOG_WARNING(( "Received incomplete gRPC message" ));
     862           0 :     }
     863           0 :     client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
     864           0 :     fd_grpc_client_stream_release( client, stream );
     865           0 :   }
     866           0 : }
     867             : 
     868             : /* Server might kill our request */
     869             : 
     870             : static void
     871             : fd_grpc_h2_rst_stream( fd_h2_conn_t *   conn,
     872             :                        fd_h2_stream_t * h2_stream,
     873             :                        uint             error_code,
     874           3 :                        int              closed_by ) {
     875           3 :   if( closed_by==1 ) {
     876           3 :     FD_LOG_WARNING(( "Server terminated request stream_id=%u (%u-%s)",
     877           3 :                      h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
     878           3 :   } else {
     879           0 :     FD_LOG_WARNING(( "Stream failed stream_id=%u (%u-%s)",
     880           0 :                      h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
     881           0 :   }
     882           3 :   fd_grpc_client_t *    client = conn->ctx;
     883           3 :   fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
     884           3 :   client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
     885           3 :   fd_grpc_client_stream_release( client, stream );
     886           3 : }
     887             : 
     888             : /* A HTTP/2 flow control change might unblock a queued request send op */
     889             : 
     890             : void
     891             : fd_grpc_h2_window_update( fd_h2_conn_t * conn,
     892           0 :                           uint           increment ) {
     893           0 :   (void)increment;
     894           0 :   fd_grpc_client_request_continue( conn->ctx );
     895           0 : }
     896             : 
     897             : void
     898             : fd_grpc_h2_stream_window_update( fd_h2_conn_t *   conn,
     899             :                                  fd_h2_stream_t * stream,
     900           0 :                                  uint             increment ) {
     901           0 :   (void)stream; (void)increment;
     902           0 :   fd_grpc_client_request_continue( conn->ctx );
     903           0 : }
     904             : 
     905             : void
     906           3 : fd_grpc_h2_ping_ack( fd_h2_conn_t * conn ) {
     907           3 :   fd_grpc_client_t * client = conn->ctx;
     908           3 :   client->callbacks->ping_ack( client->ctx );
     909           3 : }
     910             : 
     911             : fd_h2_rbuf_t *
     912           6 : fd_grpc_client_rbuf_tx( fd_grpc_client_t * client ) {
     913           6 :   return client->frame_tx;
     914           6 : }
     915             : 
     916             : fd_h2_rbuf_t *
     917           0 : fd_grpc_client_rbuf_rx( fd_grpc_client_t * client ) {
     918           0 :   return client->frame_rx;
     919           0 : }
     920             : 
     921             : fd_h2_conn_t *
     922         228 : fd_grpc_client_h2_conn( fd_grpc_client_t * client ) {
     923         228 :   return client->conn;
     924         228 : }
     925             : 
     926             : /* fd_grpc_client_h2_callbacks specifies h2->grpc_client callbacks.
     927             :    Stored in .rodata for security.  Must be kept in sync with fd_h2 to
     928             :    avoid NULL pointers. */
     929             : 
     930             : fd_h2_callbacks_t const fd_grpc_client_h2_callbacks = {
     931             :   .stream_create        = fd_h2_noop_stream_create,
     932             :   .stream_query         = fd_grpc_h2_stream_query,
     933             :   .conn_established     = fd_grpc_h2_conn_established,
     934             :   .conn_final           = fd_grpc_h2_conn_final,
     935             :   .headers              = fd_grpc_h2_cb_headers,
     936             :   .data                 = fd_grpc_h2_cb_data,
     937             :   .rst_stream           = fd_grpc_h2_rst_stream,
     938             :   .window_update        = fd_grpc_h2_window_update,
     939             :   .stream_window_update = fd_grpc_h2_stream_window_update,
     940             :   .ping_ack             = fd_grpc_h2_ping_ack,
     941             : };

Generated by: LCOV version 1.14