Line data Source code
1 : #include "fd_grpc_client.h"
2 : #include "fd_grpc_client_private.h"
3 : #include "../../ballet/nanopb/pb_encode.h" /* pb_msgdesc_t */
4 : #include <sys/socket.h>
5 : #include "../h2/fd_h2_rbuf_sock.h"
6 : #include "fd_grpc_codec.h"
7 : #if FD_HAS_OPENSSL
8 : #include "../openssl/fd_openssl.h"
9 : #include <openssl/ssl.h>
10 : #include <openssl/err.h>
11 : #include "../h2/fd_h2_rbuf_ossl.h"
12 : #endif
13 :
14 : ulong
15 405 : fd_grpc_client_align( void ) {
16 405 : return fd_ulong_max( alignof(fd_grpc_client_t), fd_grpc_h2_stream_pool_align() );
17 405 : }
18 :
19 : ulong
20 102 : fd_grpc_client_footprint( ulong buf_max ) {
21 102 : ulong l = FD_LAYOUT_INIT;
22 102 : l = FD_LAYOUT_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
23 102 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
24 102 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_scratch */
25 102 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
26 102 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
27 102 : l = FD_LAYOUT_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
28 102 : l = FD_LAYOUT_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
29 102 : return FD_LAYOUT_FINI( l, fd_grpc_client_align() );
30 102 : }
31 :
32 : static void
33 129 : fd_grpc_h2_stream_reset( fd_grpc_h2_stream_t * stream ) {
34 129 : memset( &stream->s, 0, sizeof(fd_h2_stream_t) );
35 129 : stream->request_ctx = 0UL;
36 129 : memset( &stream->hdrs, 0, sizeof(fd_grpc_resp_hdrs_t) );
37 129 : stream->hdrs.grpc_status = FD_GRPC_STATUS_UNKNOWN;
38 129 : stream->hdrs_received = 0;
39 129 : stream->msg_buf_used = 0UL;
40 129 : stream->msg_sz = 0UL;
41 129 : stream->has_header_deadline = 0;
42 129 : stream->has_rx_end_deadline = 0;
43 129 : }
44 :
45 : fd_grpc_client_t *
46 : fd_grpc_client_new( void * mem,
47 : fd_grpc_client_callbacks_t const * callbacks,
48 : fd_grpc_client_metrics_t * metrics,
49 : void * app_ctx,
50 : ulong buf_max,
51 51 : ulong rng_seed ) {
52 51 : if( FD_UNLIKELY( !mem ) ) {
53 0 : FD_LOG_WARNING(( "NULL mem" ));
54 0 : return NULL;
55 0 : }
56 51 : if( FD_UNLIKELY( buf_max<4096UL ) ) {
57 0 : FD_LOG_WARNING(( "undersz buf_max" ));
58 0 : return NULL;
59 0 : }
60 51 : if( FD_UNLIKELY( !fd_ulong_is_aligned( (ulong)mem, fd_grpc_client_align() ) ) ) {
61 0 : FD_LOG_WARNING(( "unaligned mem" ));
62 0 : return NULL;
63 0 : }
64 :
65 51 : FD_SCRATCH_ALLOC_INIT( l, mem );
66 51 : void * client_mem = FD_SCRATCH_ALLOC_APPEND( l, alignof(fd_grpc_client_t), sizeof(fd_grpc_client_t) );
67 51 : void * nanopb_tx = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* nanopb_tx */
68 51 : void * frame_scratch = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_scratch */
69 51 : void * frame_rx_buf = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_rx_buf */
70 51 : void * frame_tx_buf = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max ); /* frame_tx_buf */
71 51 : void * stream_pool_mem = FD_SCRATCH_ALLOC_APPEND( l, fd_grpc_h2_stream_pool_align(), fd_grpc_h2_stream_pool_footprint( FD_GRPC_CLIENT_MAX_STREAMS ) );
72 51 : void * stream_buf_mem = FD_SCRATCH_ALLOC_APPEND( l, 1UL, buf_max*FD_GRPC_CLIENT_MAX_STREAMS );
73 51 : ulong end = FD_SCRATCH_ALLOC_FINI( l, fd_grpc_client_align() );
74 51 : FD_TEST( end-(ulong)mem == fd_grpc_client_footprint( buf_max ) );
75 :
76 51 : fd_grpc_client_t * client = client_mem;
77 :
78 51 : fd_grpc_h2_stream_t * stream_pool =
79 51 : fd_grpc_h2_stream_pool_join( fd_grpc_h2_stream_pool_new( stream_pool_mem, FD_GRPC_CLIENT_MAX_STREAMS ) );
80 51 : if( FD_UNLIKELY( !stream_pool ) ) FD_LOG_CRIT(( "Failed to create stream pool" )); /* unreachable */
81 :
82 51 : *client = (fd_grpc_client_t){
83 51 : .callbacks = callbacks,
84 51 : .ctx = app_ctx,
85 51 : .stream_pool = stream_pool,
86 51 : .stream_bufs = stream_buf_mem,
87 51 : .nanopb_tx = nanopb_tx,
88 51 : .nanopb_tx_max = buf_max,
89 51 : .frame_scratch = frame_scratch,
90 51 : .frame_scratch_max = buf_max,
91 51 : .frame_rx_buf = frame_rx_buf,
92 51 : .frame_rx_buf_max = buf_max,
93 51 : .frame_tx_buf = frame_tx_buf,
94 51 : .frame_tx_buf_max = buf_max,
95 51 : .metrics = metrics
96 51 : };
97 :
98 : /* FIXME for performance, cache this? */
99 51 : fd_h2_hdr_matcher_init( client->matcher, rng_seed );
100 51 : fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_STATUS, "grpc-status" );
101 51 : fd_h2_hdr_matcher_insert_literal( client->matcher, FD_GRPC_HDR_MESSAGE, "grpc-message" );
102 :
103 51 : client->version_len = 5;
104 51 : memcpy( client->version, "0.0.0", 5 );
105 :
106 459 : for( ulong i=0UL; i<FD_GRPC_CLIENT_MAX_STREAMS; i++ ) {
107 408 : fd_grpc_h2_stream_t * stream = &client->stream_pool[ i ];
108 408 : stream->msg_buf = (uchar *)stream_buf_mem + (i*buf_max);
109 408 : stream->msg_buf_max = buf_max;
110 408 : FD_TEST( (ulong)( stream->msg_buf + stream->msg_buf_max )<=end );
111 408 : }
112 51 : fd_grpc_client_reset( client );
113 :
114 51 : return client;
115 51 : }
116 :
117 : void *
118 3 : fd_grpc_client_delete( fd_grpc_client_t * client ) {
119 3 : return client;
120 3 : }
121 :
122 : void
123 : fd_grpc_client_set_version( fd_grpc_client_t * client,
124 : char const * version,
125 0 : ulong version_len ) {
126 0 : if( FD_UNLIKELY( version_len > FD_GRPC_CLIENT_VERSION_LEN_MAX ) ) {
127 0 : FD_LOG_WARNING(( "Version string too long (%lu chars), ignoring", version_len ));
128 0 : return;
129 0 : }
130 0 : client->version_len = (uchar)version_len;
131 0 : memcpy( client->version, version, version_len );
132 0 : }
133 :
134 : void
135 : fd_grpc_client_set_authority( fd_grpc_client_t * client,
136 : char const * host,
137 : ulong host_len,
138 0 : ushort port ) {
139 0 : host_len = fd_ulong_min( host_len, sizeof(client->host)-1 );
140 0 : fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( client->host ), host, host_len ) );
141 0 : client->host_len = (uchar)host_len;
142 0 : client->port = (ushort)port;
143 0 : }
144 :
145 : int
146 120 : fd_grpc_client_stream_acquire_is_safe( fd_grpc_client_t * client ) {
147 : /* Sufficient quota to start a stream? */
148 120 : if( FD_UNLIKELY( client->conn->stream_active_cnt[1]+1 > client->conn->peer_settings.max_concurrent_streams ) ) {
149 18 : return 0;
150 18 : }
151 :
152 : /* Free stream object available? */
153 102 : if( FD_UNLIKELY( !fd_grpc_h2_stream_pool_free( client->stream_pool ) ) ) {
154 0 : return 0;
155 0 : }
156 102 : if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
157 0 : return 0;
158 0 : }
159 :
160 102 : return 1;
161 102 : }
162 :
163 : fd_grpc_h2_stream_t *
164 : fd_grpc_client_stream_acquire( fd_grpc_client_t * client,
165 129 : ulong request_ctx ) {
166 129 : if( FD_UNLIKELY( client->stream_cnt >= FD_GRPC_CLIENT_MAX_STREAMS ) ) {
167 0 : FD_LOG_CRIT(( "stream pool exhausted" ));
168 0 : }
169 :
170 129 : fd_h2_conn_t * conn = client->conn;
171 129 : uint const stream_id = client->conn->tx_stream_next;
172 129 : conn->tx_stream_next += 2U;
173 :
174 129 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_pool_ele_acquire( client->stream_pool );
175 129 : fd_grpc_h2_stream_reset( stream );
176 129 : stream->request_ctx = request_ctx;
177 :
178 129 : fd_h2_stream_open( fd_h2_stream_init( &stream->s ), conn, stream_id );
179 129 : client->request_stream = stream;
180 129 : client->stream_ids[ client->stream_cnt ] = stream_id;
181 129 : client->streams [ client->stream_cnt ] = stream;
182 129 : client->stream_cnt++;
183 :
184 129 : return stream;
185 129 : }
186 :
187 : void
188 : fd_grpc_client_stream_release( fd_grpc_client_t * client,
189 54 : fd_grpc_h2_stream_t * stream ) {
190 54 : if( FD_UNLIKELY( !client->stream_cnt ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
191 :
192 : /* Deallocate tx_op */
193 54 : if( FD_UNLIKELY( stream == client->request_stream ) ) {
194 36 : client->request_stream = NULL;
195 36 : *client->request_tx_op = (fd_h2_tx_op_t){0};
196 36 : }
197 :
198 : /* Remove stream from map */
199 54 : int map_idx = -1;
200 153 : for( uint i=0UL; i<(client->stream_cnt); i++ ) {
201 99 : if( client->stream_ids[ i ] == stream->s.stream_id ) {
202 54 : map_idx = (int)i;
203 54 : }
204 99 : }
205 54 : if( FD_UNLIKELY( map_idx<0 ) ) FD_LOG_CRIT(( "stream map corrupt" )); /* unreachable */
206 54 : if( (ulong)map_idx+1 < client->stream_cnt ) {
207 6 : client->stream_ids[ map_idx ] = client->stream_ids[ client->stream_cnt-1 ];
208 6 : client->streams [ map_idx ] = client->streams [ client->stream_cnt-1 ];
209 6 : }
210 54 : client->stream_cnt--;
211 :
212 54 : fd_grpc_h2_stream_pool_ele_release( client->stream_pool, stream );
213 54 : }
214 :
215 : void
216 69 : fd_grpc_client_reset( fd_grpc_client_t * client ) {
217 69 : fd_h2_rbuf_init( client->frame_rx, client->frame_rx_buf, client->frame_rx_buf_max );
218 69 : fd_h2_rbuf_init( client->frame_tx, client->frame_tx_buf, client->frame_tx_buf_max );
219 69 : fd_h2_conn_init_client( client->conn );
220 69 : client->conn->ctx = client;
221 69 : client->h2_hs_done = 0;
222 69 : client->ssl_hs_done = 0;
223 69 : client->request_stream = NULL;
224 69 : *client->request_tx_op = (fd_h2_tx_op_t){0};
225 :
226 : /* Disable RX flow control */
227 69 : client->conn->self_settings.initial_window_size = (1U<<31)-1U;
228 69 : client->conn->rx_wnd_max = (1U<<31)-1U;
229 69 : client->conn->rx_wnd_wmark = client->conn->rx_wnd_max - (1U<<20);
230 :
231 : /* Free all stream objects */
232 78 : while( client->stream_cnt ) {
233 9 : fd_grpc_h2_stream_t * stream = client->streams[ client->stream_cnt-1 ];
234 9 : fd_grpc_client_stream_release( client, stream );
235 9 : }
236 69 : }
237 :
238 : /* fd_grpc_client_send_stream_quota writes a WINDOW_UPDATE frame, which
239 : eventually allows the peer to send more data bytes. */
240 :
241 : static void
242 : fd_grpc_client_send_stream_quota( fd_h2_rbuf_t * rbuf_tx,
243 : fd_grpc_h2_stream_t * stream,
244 3 : uint bump ) {
245 3 : fd_h2_window_update_t window_update = {
246 3 : .hdr = {
247 3 : .typlen = fd_h2_frame_typlen( FD_H2_FRAME_TYPE_WINDOW_UPDATE, 4UL ),
248 3 : .r_stream_id = fd_uint_bswap( stream->s.stream_id )
249 3 : },
250 3 : .increment = fd_uint_bswap( bump )
251 3 : };
252 3 : fd_h2_rbuf_push( rbuf_tx, &window_update, sizeof(fd_h2_window_update_t) );
253 3 : stream->s.rx_wnd += bump;
254 3 : }
255 :
256 : /* fd_grpc_client_send_timeout is called when a stream timeout triggers.
257 : Calls back to the user, writes a RST_STREAM frame, and frees the
258 : stream object. */
259 :
260 : static void
261 : fd_grpc_client_send_timeout( fd_h2_rbuf_t * rbuf_tx,
262 : fd_grpc_client_t * client,
263 : fd_grpc_h2_stream_t * stream,
264 12 : int deadline_kind ) {
265 12 : client->callbacks->rx_timeout( client->ctx, stream->request_ctx, deadline_kind );
266 12 : fd_h2_tx_rst_stream( rbuf_tx, stream->s.stream_id, FD_H2_ERR_CANCEL );
267 12 : fd_grpc_client_stream_release( client, stream );
268 12 : }
269 :
270 : void
271 : fd_grpc_client_service_streams( fd_grpc_client_t * client,
272 51 : long ts_nanos ) {
273 51 : ulong const meta_frame_max =
274 51 : fd_ulong_max( sizeof(fd_h2_window_update_t), sizeof(fd_h2_rst_stream_t) );
275 51 : fd_h2_conn_t * conn = client->conn;
276 51 : fd_h2_rbuf_t * rbuf_tx = client->frame_tx;
277 51 : if( FD_UNLIKELY( conn->flags ) ) return;
278 51 : uint const wnd_max = conn->self_settings.initial_window_size;
279 51 : uint const wnd_thres = wnd_max / 2;
280 141 : for( ulong i=0UL; i<(client->stream_cnt); i++ ) {
281 90 : if( FD_UNLIKELY( fd_h2_rbuf_free_sz( rbuf_tx )<meta_frame_max ) ) break;
282 90 : fd_grpc_h2_stream_t * stream = client->streams[ i ];
283 :
284 90 : if( FD_UNLIKELY( ( stream->has_header_deadline ) &
285 90 : ( stream->header_deadline_nanos - ts_nanos <= 0L ) ) ) {
286 6 : fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_HEADER );
287 6 : i--; /* stream removed */
288 6 : continue;
289 6 : }
290 :
291 84 : if( FD_UNLIKELY( ( stream->has_rx_end_deadline ) &
292 84 : ( stream->rx_end_deadline_nanos - ts_nanos <= 0L ) ) ) {
293 6 : fd_grpc_client_send_timeout( rbuf_tx, client, stream, FD_GRPC_DEADLINE_RX_END );
294 6 : i--; /* stream removed */
295 6 : continue;
296 6 : }
297 :
298 78 : if( FD_UNLIKELY( stream->s.rx_wnd < wnd_thres ) ) {
299 3 : uint const bump = wnd_max - stream->s.rx_wnd;
300 3 : fd_grpc_client_send_stream_quota( rbuf_tx, stream, bump );
301 3 : }
302 78 : }
303 51 : }
304 :
305 : #if FD_HAS_OPENSSL
306 :
307 : static int
308 : fd_ossl_log_error( char const * str,
309 : ulong len,
310 0 : void * ctx ) {
311 0 : (void)ctx;
312 0 : if( len>0 && str[ len-1 ]=='\n' ) len--;
313 0 : FD_LOG_INFO(( "%.*s", (int)len, str ));
314 0 : return 0;
315 0 : }
316 :
317 : int
318 : fd_grpc_client_rxtx_ossl( fd_grpc_client_t * client,
319 : SSL * ssl,
320 0 : int * charge_busy ) {
321 0 : if( FD_UNLIKELY( !client->ssl_hs_done ) ) {
322 0 : int res = SSL_do_handshake( ssl );
323 0 : if( res<=0 ) {
324 0 : int error = SSL_get_error( ssl, res );
325 0 : if( FD_LIKELY( error==SSL_ERROR_WANT_READ || error==SSL_ERROR_WANT_WRITE ) ) return 1;
326 0 : FD_LOG_INFO(( "SSL_do_handshake failed (%i-%s)", error, fd_openssl_ssl_strerror( error ) ));
327 0 : long verify_result = SSL_get_verify_result( ssl );
328 0 : if( error == SSL_ERROR_SSL && verify_result != X509_V_OK ) {
329 0 : FD_LOG_WARNING(( "Certificate verification failed: %s", X509_verify_cert_error_string( verify_result ) ));
330 0 : }
331 0 : ERR_print_errors_cb( fd_ossl_log_error, NULL );
332 0 : return 0;
333 0 : } else {
334 0 : client->ssl_hs_done = 1;
335 0 : }
336 0 : }
337 :
338 0 : fd_h2_conn_t * conn = client->conn;
339 0 : int ssl_err = 0;
340 0 : ulong read_sz = fd_h2_rbuf_ssl_read( client->frame_rx, ssl, &ssl_err );
341 0 : if( FD_UNLIKELY( ssl_err && ssl_err!=SSL_ERROR_WANT_READ ) ) {
342 0 : if( ssl_err==SSL_ERROR_ZERO_RETURN ) {
343 0 : FD_LOG_WARNING(( "gRPC server closed connection" ));
344 0 : return 0;
345 0 : }
346 0 : FD_LOG_WARNING(( "SSL_read_ex failed (%i-%s)", ssl_err, fd_openssl_ssl_strerror( ssl_err ) ));
347 0 : ERR_print_errors_cb( fd_ossl_log_error, NULL );
348 0 : return 0;
349 0 : }
350 0 : if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
351 0 : fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
352 0 : fd_grpc_client_service_streams( client, fd_log_wallclock() );
353 0 : ulong write_sz = fd_h2_rbuf_ssl_write( client->frame_tx, ssl );
354 0 : client->metrics->stream_chunks_rx_bytes += read_sz;
355 0 : client->metrics->stream_chunks_tx_bytes += write_sz;
356 :
357 0 : if( read_sz!=0 || write_sz!=0 ) *charge_busy = 1;
358 0 : return 1;
359 0 : }
360 :
361 : #endif /* FD_HAS_OPENSSL */
362 :
363 : #if FD_H2_HAS_SOCKETS
364 :
365 : int
366 : fd_grpc_client_rxtx_socket( fd_grpc_client_t * client,
367 : int sock_fd,
368 27 : int * charge_busy ) {
369 27 : fd_h2_conn_t * conn = client->conn;
370 27 : ulong const frame_rx_lo_0 = client->frame_rx->lo_off;
371 27 : ulong const frame_rx_hi_0 = client->frame_rx->hi_off;
372 27 : ulong const frame_tx_lo_1 = client->frame_tx->lo_off;
373 27 : ulong const frame_tx_hi_1 = client->frame_tx->hi_off;
374 :
375 27 : int rx_err = fd_h2_rbuf_recvmsg( client->frame_rx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
376 27 : if( FD_UNLIKELY( rx_err ) ) {
377 0 : FD_LOG_INFO(( "Disconnected: recvmsg error (%i-%s)", rx_err, fd_io_strerror( rx_err ) ));
378 0 : errno = rx_err;
379 0 : return -1;
380 0 : }
381 :
382 27 : if( FD_UNLIKELY( conn->flags ) ) fd_h2_tx_control( conn, client->frame_tx, &fd_grpc_client_h2_callbacks );
383 27 : fd_h2_rx( conn, client->frame_rx, client->frame_tx, client->frame_scratch, client->frame_scratch_max, &fd_grpc_client_h2_callbacks );
384 27 : fd_grpc_client_service_streams( client, fd_log_wallclock() );
385 :
386 27 : int tx_err = fd_h2_rbuf_sendmsg( client->frame_tx, sock_fd, MSG_NOSIGNAL|MSG_DONTWAIT );
387 27 : if( FD_LIKELY( tx_err && tx_err==EAGAIN ) ) return 1;
388 27 : if( FD_UNLIKELY( tx_err ) ) {
389 0 : FD_LOG_WARNING(( "fd_h2_rbuf_sendmsg failed (%i-%s)", tx_err, fd_io_strerror( tx_err ) ));
390 0 : errno = tx_err;
391 0 : return -1;
392 0 : }
393 :
394 27 : ulong const frame_rx_lo_1 = client->frame_rx->lo_off;
395 27 : ulong const frame_rx_hi_1 = client->frame_rx->hi_off;
396 27 : ulong const frame_tx_lo_0 = client->frame_tx->lo_off;
397 27 : ulong const frame_tx_hi_0 = client->frame_tx->hi_off;
398 :
399 27 : client->metrics->stream_chunks_rx_bytes += frame_rx_hi_1 - frame_rx_hi_0;
400 27 : client->metrics->stream_chunks_tx_bytes += frame_tx_lo_0 - frame_tx_lo_1;
401 :
402 27 : if( frame_rx_lo_0!=frame_rx_lo_1 || frame_rx_hi_0!=frame_rx_hi_1 ||
403 27 : frame_tx_lo_0!=frame_tx_lo_1 || frame_tx_hi_0!=frame_tx_hi_1 ) {
404 3 : *charge_busy = 1;
405 3 : }
406 :
407 27 : return 0;
408 27 : }
409 :
410 : #endif /* FD_H2_HAS_SOCKETS */
411 :
412 : /* fd_grpc_client_request continue attempts to write a request data
413 : frame. */
414 :
415 : static int
416 9 : fd_grpc_client_request_continue1( fd_grpc_client_t * client ) {
417 9 : fd_grpc_h2_stream_t * stream = client->request_stream;
418 9 : fd_h2_stream_t * h2_stream = &stream->s;
419 9 : fd_h2_tx_op_copy( client->conn, h2_stream, client->frame_tx, client->request_tx_op );
420 9 : if( FD_UNLIKELY( client->request_tx_op->chunk_sz ) ) return 0;
421 9 : if( FD_UNLIKELY( h2_stream->state != FD_H2_STREAM_STATE_CLOSING_TX ) ) return 0;
422 9 : client->metrics->stream_chunks_tx_cnt++;
423 : /* Request finished */
424 9 : client->request_stream = NULL;
425 9 : client->callbacks->tx_complete( client->ctx, stream->request_ctx );
426 9 : return 1;
427 9 : }
428 :
429 : static int
430 0 : fd_grpc_client_request_continue( fd_grpc_client_t * client ) {
431 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
432 0 : if( FD_UNLIKELY( !client->request_stream ) ) return 0;
433 0 : if( FD_UNLIKELY( !client->request_tx_op->chunk_sz ) ) return 0;
434 0 : return fd_grpc_client_request_continue1( client );
435 0 : }
436 :
437 : int
438 111 : fd_grpc_client_is_connected( fd_grpc_client_t * client ) {
439 111 : if( FD_UNLIKELY( !client ) ) return 0;
440 111 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
441 111 : if( FD_UNLIKELY( !client->h2_hs_done ) ) return 0;
442 108 : return 1;
443 111 : }
444 :
445 : int
446 108 : fd_grpc_client_request_is_blocked( fd_grpc_client_t * client ) {
447 108 : if( FD_UNLIKELY( !client ) ) return 1;
448 108 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 1;
449 108 : if( FD_UNLIKELY( !client->h2_hs_done ) ) return 1;
450 108 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 1;
451 108 : if( FD_UNLIKELY( !fd_grpc_client_stream_acquire_is_safe( client ) ) ) return 1;
452 90 : return 0;
453 108 : }
454 :
455 : int
456 0 : fd_grpc_client_request_stream_busy( fd_grpc_client_t * client ) {
457 0 : return client->request_stream != NULL;
458 0 : }
459 :
460 : fd_grpc_h2_stream_t *
461 : fd_grpc_client_request_start(
462 : fd_grpc_client_t * client,
463 : char const * path,
464 : ulong path_len,
465 : ulong request_ctx,
466 : pb_msgdesc_t const * fields,
467 : void const * message,
468 : char const * auth_token,
469 : ulong auth_token_sz,
470 : int is_streaming
471 9 : ) {
472 9 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
473 :
474 : /* Encode message */
475 9 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
476 9 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
477 9 : pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
478 9 : if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
479 0 : FD_LOG_WARNING(( "Failed to encode Protobuf message (%.*s). This is a bug (insufficient buffer space?)", (int)path_len, path ));
480 0 : return NULL;
481 0 : }
482 9 : ulong const serialized_sz = ostream.bytes_written;
483 :
484 : /* Create gRPC length prefix */
485 9 : fd_grpc_hdr_t hdr = {
486 9 : .compressed=0,
487 9 : .msg_sz=fd_uint_bswap( (uint)serialized_sz )
488 9 : };
489 9 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
490 9 : ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
491 :
492 : /* Allocate stream descriptor */
493 9 : fd_grpc_h2_stream_t * stream = fd_grpc_client_stream_acquire( client, request_ctx );
494 9 : uint const stream_id = stream->s.stream_id;
495 :
496 : /* Write HTTP/2 request headers */
497 9 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
498 9 : fd_grpc_req_hdrs_t req_meta = {
499 9 : .host = client->host,
500 9 : .host_len = client->host_len,
501 9 : .port = client->port,
502 9 : .path = path,
503 9 : .path_len = path_len,
504 9 : .https = 1, /* grpc_client assumes TLS encryption for now */
505 :
506 9 : .bearer_auth = auth_token,
507 9 : .bearer_auth_len = auth_token_sz
508 9 : };
509 9 : if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
510 9 : &req_meta,
511 9 : client->frame_tx,
512 9 : client->version,
513 9 : client->version_len
514 9 : ) ) ) {
515 0 : FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
516 0 : fd_grpc_client_stream_release( client, stream );
517 0 : return NULL;
518 0 : }
519 9 : fd_h2_tx_commit( client->conn, client->frame_tx );
520 :
521 : /* Queue request payload for send
522 : (Protobuf message might have to be fragmented into multiple HTTP/2
523 : DATA frames if the client gets blocked)
524 : For streaming requests, don't set END_STREAM flag yet */
525 9 : uint flags = is_streaming ? 0U : FD_H2_FLAG_END_STREAM;
526 9 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, flags );
527 9 : fd_grpc_client_request_continue1( client );
528 9 : client->metrics->requests_sent++;
529 9 : client->metrics->streams_active++;
530 :
531 9 : FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu streaming=%d", (int)path_len, path, serialized_sz, is_streaming ));
532 :
533 9 : return stream;
534 9 : }
535 :
536 : fd_grpc_h2_stream_t *
537 : fd_grpc_client_request_start1(
538 : fd_grpc_client_t * client,
539 : char const * path,
540 : ulong path_len,
541 : ulong request_ctx,
542 : uchar const * protobuf,
543 : ulong protobuf_sz,
544 : char const * auth_token,
545 : ulong auth_token_sz,
546 : int is_streaming
547 0 : ) {
548 0 : if( FD_UNLIKELY( fd_grpc_client_request_is_blocked( client ) ) ) return NULL;
549 :
550 : /* Validate protobuf size */
551 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
552 0 : ulong const max_proto_sz = client->nanopb_tx_max - sizeof(fd_grpc_hdr_t);
553 0 : if( FD_UNLIKELY( protobuf_sz > max_proto_sz ) ) {
554 0 : FD_LOG_WARNING(( "Protobuf message too large (%lu bytes) for path (%.*s). Max size is %lu bytes", protobuf_sz, (int)path_len, path, max_proto_sz ));
555 0 : return NULL;
556 0 : }
557 :
558 : /* Copy protobuf to buffer after gRPC header */
559 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
560 0 : memcpy( proto_buf, protobuf, protobuf_sz );
561 :
562 : /* Create gRPC length prefix */
563 0 : fd_grpc_hdr_t hdr = {
564 0 : .compressed=0,
565 0 : .msg_sz=fd_uint_bswap( (uint)protobuf_sz )
566 0 : };
567 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
568 0 : ulong const payload_sz = protobuf_sz + sizeof(fd_grpc_hdr_t);
569 :
570 : /* Allocate stream descriptor */
571 0 : fd_grpc_h2_stream_t * stream = fd_grpc_client_stream_acquire( client, request_ctx );
572 0 : uint const stream_id = stream->s.stream_id;
573 :
574 : /* Write HTTP/2 request headers */
575 0 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_HEADERS, FD_H2_FLAG_END_HEADERS, stream_id );
576 0 : fd_grpc_req_hdrs_t req_meta = {
577 0 : .host = client->host,
578 0 : .host_len = client->host_len,
579 0 : .port = client->port,
580 0 : .path = path,
581 0 : .path_len = path_len,
582 0 : .https = 1, /* grpc_client assumes TLS encryption for now */
583 :
584 0 : .bearer_auth = auth_token,
585 0 : .bearer_auth_len = auth_token_sz
586 0 : };
587 0 : if( FD_UNLIKELY( !fd_grpc_h2_gen_request_hdrs(
588 0 : &req_meta,
589 0 : client->frame_tx,
590 0 : client->version,
591 0 : client->version_len
592 0 : ) ) ) {
593 0 : FD_LOG_WARNING(( "Failed to generate gRPC request headers (%.*s). This is a bug", (int)path_len, path ));
594 0 : fd_grpc_client_stream_release( client, stream );
595 0 : return NULL;
596 0 : }
597 0 : fd_h2_tx_commit( client->conn, client->frame_tx );
598 :
599 : /* Queue request payload for send
600 : (Protobuf message might have to be fragmented into multiple HTTP/2
601 : DATA frames if the client gets blocked)
602 : For streaming requests, don't set END_STREAM flag yet */
603 0 : uint flags = is_streaming ? 0U : FD_H2_FLAG_END_STREAM;
604 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, flags );
605 0 : fd_grpc_client_request_continue1( client );
606 0 : client->metrics->requests_sent++;
607 0 : client->metrics->streams_active++;
608 :
609 0 : FD_LOG_DEBUG(( "gRPC request path=%.*s sz=%lu streaming=%d", (int)path_len, path, protobuf_sz, is_streaming ));
610 :
611 0 : return stream;
612 0 : }
613 :
614 : int
615 : fd_grpc_client_stream_send_msg(
616 : fd_grpc_client_t * client,
617 : fd_grpc_h2_stream_t * stream,
618 : pb_msgdesc_t const * fields,
619 : void const * message
620 0 : ) {
621 0 : if( FD_UNLIKELY( !client || !stream ) ) return 0;
622 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
623 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
624 0 : if( FD_UNLIKELY( client->request_stream != NULL && client->request_stream != stream ) ) return 0; /* Another stream has a request in progress */
625 :
626 : /* Encode message */
627 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
628 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
629 0 : pb_ostream_t ostream = pb_ostream_from_buffer( proto_buf, client->nanopb_tx_max - sizeof(fd_grpc_hdr_t) );
630 0 : if( FD_UNLIKELY( !pb_encode( &ostream, fields, message ) ) ) {
631 0 : FD_LOG_WARNING(( "Failed to encode Protobuf message for stream_id=%u. This is a bug (insufficient buffer space?)", stream->s.stream_id ));
632 0 : return 0;
633 0 : }
634 0 : ulong const serialized_sz = ostream.bytes_written;
635 :
636 : /* Create gRPC length prefix */
637 0 : fd_grpc_hdr_t hdr = {
638 0 : .compressed=0,
639 0 : .msg_sz=fd_uint_bswap( (uint)serialized_sz )
640 0 : };
641 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
642 0 : ulong const payload_sz = serialized_sz + sizeof(fd_grpc_hdr_t);
643 :
644 : /* Queue message payload for send (without END_STREAM flag) */
645 0 : client->request_stream = stream;
646 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, 0U );
647 0 : fd_grpc_client_request_continue1( client );
648 :
649 0 : FD_LOG_DEBUG(( "gRPC stream_send_msg stream_id=%u sz=%lu", stream->s.stream_id, serialized_sz ));
650 :
651 0 : return 1;
652 0 : }
653 :
654 : int
655 : fd_grpc_client_stream_send_msg1(
656 : fd_grpc_client_t * client,
657 : fd_grpc_h2_stream_t * stream,
658 : uchar const * protobuf,
659 : ulong protobuf_sz
660 0 : ) {
661 0 : if( FD_UNLIKELY( !client || !stream ) ) return 0;
662 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
663 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
664 0 : if( FD_UNLIKELY( client->request_stream != NULL && client->request_stream != stream ) ) return 0; /* Another stream has a request in progress */
665 :
666 : /* Validate protobuf size */
667 0 : FD_TEST( client->nanopb_tx_max > sizeof(fd_grpc_hdr_t) );
668 0 : ulong const max_proto_sz = client->nanopb_tx_max - sizeof(fd_grpc_hdr_t);
669 0 : if( FD_UNLIKELY( protobuf_sz > max_proto_sz ) ) {
670 0 : FD_LOG_WARNING(( "Protobuf message too large (%lu bytes) for stream_id=%u. Max size is %lu bytes", protobuf_sz, stream->s.stream_id, max_proto_sz ));
671 0 : return 0;
672 0 : }
673 :
674 : /* Copy protobuf to buffer after gRPC header */
675 0 : uchar * proto_buf = client->nanopb_tx + sizeof(fd_grpc_hdr_t);
676 0 : memcpy( proto_buf, protobuf, protobuf_sz );
677 :
678 : /* Create gRPC length prefix */
679 0 : fd_grpc_hdr_t hdr = {
680 0 : .compressed=0,
681 0 : .msg_sz=fd_uint_bswap( (uint)protobuf_sz )
682 0 : };
683 0 : memcpy( client->nanopb_tx, &hdr, sizeof(fd_grpc_hdr_t) );
684 0 : ulong const payload_sz = protobuf_sz + sizeof(fd_grpc_hdr_t);
685 :
686 : /* Queue message payload for send (without END_STREAM flag) */
687 0 : client->request_stream = stream;
688 0 : fd_h2_tx_op_init( client->request_tx_op, client->nanopb_tx, payload_sz, 0U );
689 0 : fd_grpc_client_request_continue1( client );
690 :
691 0 : FD_LOG_DEBUG(( "gRPC stream_send_msg stream_id=%u sz=%lu", stream->s.stream_id, protobuf_sz ));
692 :
693 0 : return 1;
694 0 : }
695 :
696 : int
697 : fd_grpc_client_stream_close(
698 : fd_grpc_client_t * client,
699 : fd_grpc_h2_stream_t * stream
700 0 : ) {
701 0 : if( FD_UNLIKELY( !client || !stream ) ) return 0;
702 0 : if( FD_UNLIKELY( client->conn->flags & FD_H2_CONN_FLAGS_DEAD ) ) return 0;
703 0 : if( FD_UNLIKELY( !fd_h2_rbuf_is_empty( client->frame_tx ) ) ) return 0;
704 0 : if( FD_UNLIKELY( client->request_stream != NULL ) ) return 0; /* Another request in progress */
705 :
706 : /* Send empty DATA frame with END_STREAM flag to close the stream */
707 0 : fd_h2_tx_prepare( client->conn, client->frame_tx, FD_H2_FRAME_TYPE_DATA, FD_H2_FLAG_END_STREAM, stream->s.stream_id );
708 0 : fd_h2_tx_commit( client->conn, client->frame_tx );
709 :
710 0 : FD_LOG_DEBUG(( "gRPC stream_close stream_id=%u", stream->s.stream_id ));
711 :
712 0 : return 1;
713 0 : }
714 :
715 : void
716 : fd_grpc_client_deadline_set( fd_grpc_h2_stream_t * stream,
717 : int deadline_kind,
718 18 : long ts_nanos ) {
719 18 : switch( deadline_kind ) {
720 12 : case FD_GRPC_DEADLINE_HEADER:
721 12 : stream->header_deadline_nanos = ts_nanos;
722 12 : stream->has_header_deadline = 1;
723 12 : break;
724 6 : case FD_GRPC_DEADLINE_RX_END:
725 6 : stream->rx_end_deadline_nanos = ts_nanos;
726 6 : stream->has_rx_end_deadline = 1;
727 6 : break;
728 18 : }
729 18 : }
730 :
731 : /* Lookup stream by ID */
732 :
733 : static fd_h2_stream_t *
734 : fd_grpc_h2_stream_query( fd_h2_conn_t * conn,
735 0 : uint stream_id ) {
736 0 : fd_grpc_client_t * client = conn->ctx;
737 0 : for( ulong i=0UL; i<client->stream_cnt; i++ ) {
738 0 : if( client->stream_ids[ i ] == stream_id ) {
739 0 : return &client->streams[ i ]->s;
740 0 : }
741 0 : }
742 0 : return NULL;
743 0 : }
744 :
745 : static void
746 0 : fd_grpc_h2_conn_established( fd_h2_conn_t * conn ) {
747 0 : fd_grpc_client_t * client = conn->ctx;
748 0 : client->h2_hs_done = 1;
749 0 : client->callbacks->conn_established( client->ctx );
750 0 : }
751 :
752 : static void
753 : fd_grpc_h2_conn_final( fd_h2_conn_t * conn,
754 : uint h2_err,
755 0 : int closed_by ) {
756 0 : fd_grpc_client_t * client = conn->ctx;
757 0 : client->callbacks->conn_dead( client->ctx, h2_err, closed_by );
758 0 : }
759 :
760 : /* React to response data */
761 :
762 : void
763 : fd_grpc_h2_cb_headers(
764 : fd_h2_conn_t * conn,
765 : fd_h2_stream_t * h2_stream,
766 : void const * data,
767 : ulong data_sz,
768 : ulong flags
769 18 : ) {
770 18 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
771 18 : fd_grpc_client_t * client = conn->ctx;
772 :
773 18 : int h2_status = fd_grpc_h2_read_response_hdrs( &stream->hdrs, client->matcher, data, data_sz );
774 18 : if( FD_UNLIKELY( h2_status!=FD_H2_SUCCESS ) ) {
775 : /* Failed to parse HTTP/2 headers */
776 3 : fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_PROTOCOL );
777 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
778 3 : fd_grpc_client_stream_release( client, stream );
779 3 : return;
780 3 : }
781 :
782 15 : if( !stream->hdrs_received && !!( flags & FD_H2_FLAG_END_HEADERS) ) {
783 : /* Got initial response header */
784 12 : stream->hdrs_received = 1;
785 12 : stream->has_header_deadline = 0;
786 12 : if( FD_LIKELY( ( stream->hdrs.h2_status==200 ) &
787 12 : ( !!stream->hdrs.is_grpc_proto ) ) ) {
788 6 : client->callbacks->rx_start( client->ctx, stream->request_ctx );
789 6 : }
790 12 : }
791 :
792 15 : if( ( flags & (FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) )
793 15 : ==(FD_H2_FLAG_END_HEADERS|FD_H2_FLAG_END_STREAM) ) {
794 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
795 3 : fd_grpc_client_stream_release( client, stream );
796 3 : return;
797 3 : }
798 15 : }
799 :
800 : static void
801 : fd_grpc_h2_cb_data(
802 : fd_h2_conn_t * conn,
803 : fd_h2_stream_t * h2_stream,
804 : void const * data,
805 : ulong data_sz,
806 : ulong flags
807 3 : ) {
808 3 : fd_grpc_client_t * client = conn->ctx;
809 3 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
810 3 : if( FD_UNLIKELY( ( stream->hdrs.h2_status!=200 ) |
811 3 : ( !stream->hdrs.is_grpc_proto ) ) ) {
812 0 : return;
813 0 : }
814 :
815 3 : do {
816 :
817 : /* Read header bytes */
818 3 : if( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) {
819 3 : ulong hdr_frag_sz = fd_ulong_min( sizeof(fd_grpc_hdr_t) - stream->msg_buf_used, data_sz );
820 3 : fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, hdr_frag_sz );
821 3 : stream->msg_buf_used += hdr_frag_sz;
822 3 : data = (void const *)( (ulong)data + (ulong)hdr_frag_sz );
823 3 : data_sz -= hdr_frag_sz;
824 3 : if( FD_UNLIKELY( stream->msg_buf_used < sizeof(fd_grpc_hdr_t) ) ) return;
825 :
826 : /* Header complete */
827 3 : stream->msg_sz = fd_uint_bswap( FD_LOAD( uint, (void *)( (ulong)stream->msg_buf+1 ) ) );
828 3 : if( FD_UNLIKELY( sizeof(fd_grpc_hdr_t) + stream->msg_sz > stream->msg_buf_max ) ) {
829 3 : FD_LOG_WARNING(( "Received oversized gRPC message (%lu bytes), killing request", stream->msg_sz ));
830 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
831 3 : fd_h2_stream_error( h2_stream, client->frame_tx, FD_H2_ERR_INTERNAL );
832 3 : fd_grpc_client_stream_release( client, stream );
833 3 : return;
834 3 : }
835 3 : }
836 :
837 : /* Read payload bytes */
838 0 : ulong wmark = sizeof(fd_grpc_hdr_t) + stream->msg_sz;
839 0 : ulong chunk_sz = fd_ulong_min( stream->msg_buf_used+data_sz, wmark ) - stream->msg_buf_used;
840 0 : if( FD_UNLIKELY( chunk_sz>data_sz ) ) FD_LOG_CRIT(( "integer underflow" )); /* unreachable */
841 0 : fd_memcpy( stream->msg_buf + stream->msg_buf_used, data, chunk_sz );
842 0 : stream->msg_buf_used += chunk_sz;
843 0 : data = (void const *)( (ulong)data + (ulong)chunk_sz );
844 0 : data_sz -= chunk_sz;
845 :
846 0 : client->metrics->stream_chunks_rx_cnt++;
847 :
848 0 : if( stream->msg_buf_used >= wmark ) {
849 : /* Data complete */
850 0 : void const * msg_ptr = stream->msg_buf + sizeof(fd_grpc_hdr_t);
851 0 : client->callbacks->rx_msg( client->ctx, msg_ptr, stream->msg_sz, stream->request_ctx );
852 0 : stream->msg_buf_used = 0UL;
853 0 : stream->msg_sz = 0UL;
854 0 : }
855 :
856 0 : } while( data_sz );
857 :
858 0 : if( flags & FD_H2_FLAG_END_STREAM ) {
859 : /* FIXME incomplete gRPC message */
860 0 : if( FD_UNLIKELY( stream->msg_buf_used ) ) {
861 0 : FD_LOG_WARNING(( "Received incomplete gRPC message" ));
862 0 : }
863 0 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs );
864 0 : fd_grpc_client_stream_release( client, stream );
865 0 : }
866 0 : }
867 :
868 : /* Server might kill our request */
869 :
870 : static void
871 : fd_grpc_h2_rst_stream( fd_h2_conn_t * conn,
872 : fd_h2_stream_t * h2_stream,
873 : uint error_code,
874 3 : int closed_by ) {
875 3 : if( closed_by==1 ) {
876 3 : FD_LOG_WARNING(( "Server terminated request stream_id=%u (%u-%s)",
877 3 : h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
878 3 : } else {
879 0 : FD_LOG_WARNING(( "Stream failed stream_id=%u (%u-%s)",
880 0 : h2_stream->stream_id, error_code, fd_h2_strerror( error_code ) ));
881 0 : }
882 3 : fd_grpc_client_t * client = conn->ctx;
883 3 : fd_grpc_h2_stream_t * stream = fd_grpc_h2_stream_upcast( h2_stream );
884 3 : client->callbacks->rx_end( client->ctx, stream->request_ctx, &stream->hdrs ); /* invalidates stream->hdrs */
885 3 : fd_grpc_client_stream_release( client, stream );
886 3 : }
887 :
888 : /* A HTTP/2 flow control change might unblock a queued request send op */
889 :
890 : void
891 : fd_grpc_h2_window_update( fd_h2_conn_t * conn,
892 0 : uint increment ) {
893 0 : (void)increment;
894 0 : fd_grpc_client_request_continue( conn->ctx );
895 0 : }
896 :
897 : void
898 : fd_grpc_h2_stream_window_update( fd_h2_conn_t * conn,
899 : fd_h2_stream_t * stream,
900 0 : uint increment ) {
901 0 : (void)stream; (void)increment;
902 0 : fd_grpc_client_request_continue( conn->ctx );
903 0 : }
904 :
905 : void
906 3 : fd_grpc_h2_ping_ack( fd_h2_conn_t * conn ) {
907 3 : fd_grpc_client_t * client = conn->ctx;
908 3 : client->callbacks->ping_ack( client->ctx );
909 3 : }
910 :
911 : fd_h2_rbuf_t *
912 6 : fd_grpc_client_rbuf_tx( fd_grpc_client_t * client ) {
913 6 : return client->frame_tx;
914 6 : }
915 :
916 : fd_h2_rbuf_t *
917 0 : fd_grpc_client_rbuf_rx( fd_grpc_client_t * client ) {
918 0 : return client->frame_rx;
919 0 : }
920 :
921 : fd_h2_conn_t *
922 228 : fd_grpc_client_h2_conn( fd_grpc_client_t * client ) {
923 228 : return client->conn;
924 228 : }
925 :
926 : /* fd_grpc_client_h2_callbacks specifies h2->grpc_client callbacks.
927 : Stored in .rodata for security. Must be kept in sync with fd_h2 to
928 : avoid NULL pointers. */
929 :
930 : fd_h2_callbacks_t const fd_grpc_client_h2_callbacks = {
931 : .stream_create = fd_h2_noop_stream_create,
932 : .stream_query = fd_grpc_h2_stream_query,
933 : .conn_established = fd_grpc_h2_conn_established,
934 : .conn_final = fd_grpc_h2_conn_final,
935 : .headers = fd_grpc_h2_cb_headers,
936 : .data = fd_grpc_h2_cb_data,
937 : .rst_stream = fd_grpc_h2_rst_stream,
938 : .window_update = fd_grpc_h2_window_update,
939 : .stream_window_update = fd_grpc_h2_stream_window_update,
940 : .ping_ack = fd_grpc_h2_ping_ack,
941 : };
|